Basic to Advanced PySpark Queries

# 🔹 Cluster vs. Distributed System

| Feature               | **Cluster**                                      | **Distributed System**                             |
|----------------------|--------------------------------|---------------------------------|
| **Definition**       | A group of interconnected computers (nodes) that work together as a single unit. | A collection of independent computers that work together but appear as a single system to users. |
| **Architecture**     | Usually tightly coupled with shared resources (e.g., shared storage). | Loosely coupled; nodes communicate over a network. |
| **Data Sharing**     | Nodes often have access to shared storage. | Nodes manage data separately and communicate over a network. |
| **Fault Tolerance**  | High availability; failure of one node doesn't affect others significantly. | More fault-tolerant; designed for redundancy and recovery. |
| **Example Technologies** | Databricks, Hadoop Cluster, Kubernetes Cluster | Apache Spark, AWS, Google Cloud, Distributed Databases |

## ✅ Key Differences
1. **A cluster is a type of distributed system**, but not all distributed systems are clusters.
2. **Clusters are typically within a single data center**, while distributed systems **can span across multiple locations**.
3. **Clusters often have a shared resource model**, while distributed systems focus on **scalability and fault tolerance**.


# DATA READING  

In [0]:
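# Read data from CSV
df_csv = spark.read.csv('dbfs:/FileStore/tables/employees.csv', inferSchema=True, header=True)
#display(df_csv.limit(5))        # Alternative method (Databricks)
#df_csv.display()                # Alternative method (Databricks)
df_csv.show(5)  # Prints the first 5 rows; show() itself returns None

# inferSchema=True asks Spark to detect each column's data type from the CSV data.
# header=True treats the first line of the file as column names.

# JSON is read the same way, but it takes no header or inferSchema options:
# the schema is inferred from the data by default.
#df_json = spark.read.json('/FileStore/tables/employees.json')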

+-----------+----------+---------+--------+------------+----------+-------+------+--------------+----------+-------------+
|employee_id|first_name|last_name|   email|phone_number| hire_date| job_id|salary|commission_pct|manager_id|department_id|
+-----------+----------+---------+--------+------------+----------+-------+------+--------------+----------+-------------+
|        100|    Steven|     King|   SKING|515.123.4567|2087-06-17|AD_PRES| 25000|          null|      null|           90|
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2089-09-21|  AD_VP| 17000|          null|       100|           90|
|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|2093-01-13|  AD_VP| 17000|          null|       100|           90|
|        103| Alexander|   Hunold| AHUNOLD|590.423.4567|2090-01-03|IT_PROG|  9000|          null|       102|           60|
|        104|     Bruce|    Ernst|  BERNST|590.423.4568|2091-05-21|IT_PROG|  6000|          null|       103|           60|
+-----------+---

## Data Reading Utils

In [0]:
# List the CSV/JSON files uploaded to Databricks (DBFS)
dbutils.fs.ls('dbfs:/FileStore/tables/')

Out[3]: [FileInfo(path='dbfs:/FileStore/tables/departments-1.csv', name='departments-1.csv', size=687, modificationTime=1740000633000),
 FileInfo(path='dbfs:/FileStore/tables/departments.csv', name='departments.csv', size=687, modificationTime=1738674464000),
 FileInfo(path='dbfs:/FileStore/tables/emp.json', name='emp.json', size=752, modificationTime=1739564468000),
 FileInfo(path='dbfs:/FileStore/tables/employees.csv', name='employees.csv', size=7920, modificationTime=1738674451000)]

**Delete a file from a location**

In [0]:
# Delete a single file
dbutils.fs.rm("dbfs:/FileStore/tables/employees-1.csv")


# Delete all files in the folder (recurse=True removes the folder's contents recursively)
#dbutils.fs.rm("dbfs:/FileStore/tables/", recurse=True)


**Schema Definition**

In [0]:
# Print the DataFrame's schema (column names and data types)
df_csv.printSchema()

# Select query 




In [0]:
from pyspark.sql.functions import col

df_csv.display()   # Renders the DataFrame as an interactive table (Databricks only)

df_csv.show(5)      # Alternative method: print the first 5 rows

df_csv.select('first_name','salary','department_id').show(5)    # Select a few columns by name; columns cannot be renamed this way

df_csv.select(col('first_name').alias('Name'),col('salary'),col('department_id')).show(5)    # Select a few columns, renaming one with alias()

df_csv = spark.read.csv('/FileStore/tables/employees.csv',inferSchema=True, header=True)

df_csv.describe().display()     # Summary statistics: count, mean, stddev, min, max

## Select with filter condition
The filter transformation in PySpark allows you to select rows from a DataFrame based on a condition. 

### 🚀 Summary of `filter()` Use Cases  

| Use Case               | Example  |
|------------------------|----------|
| **Logical operators (AND)** | `(condition1) & (condition2)`; wrap each condition in parentheses |
| **Equality** | `col("column_name") == value` |
| **Inequality** | `col("column_name") != value` |
| **Greater than** | `col("column_name") > value` |
| **Less than** | `col("column_name") < value` |
| **Greater than or equal to** | `col("column_name") >= value` |
| **Less than or equal to** | `col("column_name") <= value` |
| **Handle NULL values** | `col("column_name").isNull()` |
| **Exclude NULL values** | `col("column_name").isNotNull()` |



In [0]:
from pyspark.sql.functions import col

df_csv.filter(col('department_id')==60).show(10)

df_csv.filter(col('department_id').isin(60,80)).show(5)

df_csv.filter(~col('department_id').isin(60,80)).show()    # NOT IN: ~ negates the condition

df_csv.filter((col('department_id').isNotNull()) & (col('commission_pct').isNull())).show(5)

df_csv.filter("department_id IS NOT NULL AND commission_pct IS NULL").show(5)  # Same filter written as a SQL expression string

# withColumn
The withColumn() function in PySpark allows you to add, update, or modify columns in a DataFrame.

### 🚀 Summary of `withColumn()` Use Cases  

| Use Case               | Example  |
|------------------------|----------|
| **Add a new column**   | `withColumn("bonus", lit(500))` |
| **Modify existing column** | `withColumn("salary", col("salary") * 1.1)` |
| **Rename column** | `withColumn("emp_name", col("first_name")).drop("first_name")` |
| **Change data type** | `withColumn("department_id", col("department_id").cast("string"))` |
| **String transformations** | `withColumn("first_name_upper", upper(col("first_name")))` |
| **Replace values** | `withColumn("job_id", regexp_replace(col("job_id"), "FI_ACCOUNT", "ACNT"))` |
| **Handle NULL values** | `withColumn("commission_pct", coalesce(col("commission_pct"), lit(0)))` |
| **Extract date parts** | `withColumn("hire_year", year(col("hire_date")))` |
| **Conditional logic** | `withColumn("salary_category", when(col("salary") > 60000, "High").otherwise("Low"))` |
| **Mathematical operations** | `withColumn("yearly_salary", col("salary") * 12)` |


In [0]:
from pyspark.sql.functions import col, lit, coalesce, round

# withColumn() keeps every existing column and adds (or replaces) the named one
df_csv.withColumn("bonus", lit(500)).show(5)  # Adds a new column with a fixed value

df_csv.withColumn(
    "Full_sal", round(col("salary") * (1 + coalesce(col("commission_pct"), lit(0))), 2) # Round to 2 decimal places
).display()


df_new = df_csv.withColumn("department_id", col("department_id").cast("string"))



# select() returns only the columns you list
df_csv.select(
    col('first_name'),
    round(col("salary") * (1 + coalesce(col("commission_pct"), lit(0))), 2).alias("full_salary")
).show(5)


Changing the column type:

In [0]:
from pyspark.sql.functions import col, to_date

df_csv = df_csv.withColumn("employee_id", col("employee_id").cast("string"))

# Convert a string column to a date. `df` and `date_string` are placeholder names.
df.withColumn('date', to_date(df['date_string']))   # returns a new DataFrame


## StructType() Schema

**Importing libraries**

In [0]:
from pyspark.sql.types import * 
from pyspark.sql.functions import *

# Sorting ascending / descending

In [0]:
from pyspark.sql.types import * 
from pyspark.sql.functions import *

# Method 1: sort by a single column expression
df_csv.sort(col('department_id').desc()).show(5)

df_csv.orderBy(col("salary").desc()).show(5)  # orderBy() is an alias for sort()

df_csv.sort(col('department_id').asc()).show(5) # Ascending order


# Method 2: sort by two or more columns
df_csv.sort(['department_id','salary'], ascending=[True, False]).show(10)     # One flag per column: True = ascending, False = descending

df_csv.sort(['first_name'], ascending=[True]).show(5)

Type casting:

In [0]:
#df_new = df_csv.withColumn('department_id',col('department_id').cast(StringType()))
#df_new.printSchema()

### Drop

In [0]:

# Drop (remove) columns
df_csv.drop('phone_number','email').show(10)

## Drop_Duplicates

In [0]:
# Remove rows that are duplicated across all columns
df_csv.distinct().display()


# dropDuplicates() with no arguments behaves like distinct()
df_csv.dropDuplicates().display()

# Pass subset=[...] to treat rows as duplicates when only those columns match
df_dis = df_csv.dropDuplicates(subset=['employee_id'])
df_dis.display()

### Count number of rows 

In [0]:
# Method 1: Using count() action
row_count = df_csv.count()
print(f"Number of rows: {row_count}")

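# Method 2: Using describe() to get summary statistics (includes count)
# Note: describe() counts non-null values per column, so this equals the row count
# only if the first column (here employee_id) contains no NULLs.
summary = df_csv.describe()
row_count_from_summary = int(summary.filter("summary == 'count'").collect()[0][1])  # extract the count value
print(f"Number of rows (from describe): {row_count_from_summary}")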

# Method 3: Using length (if you've already collected the data - less efficient for large datasets)
# Avoid this for large DataFrames as it brings all data to the driver
# df_collected = df_csv.collect()  # Only use if DataFrame is small
# row_count_collected = len(df_collected)
# print(f"Number of rows (collected): {row_count_collected}")

# Method 4: Using rdd.count()
row_count_rdd = df_csv.rdd.count()
print(f"Number of rows (rdd): {row_count_rdd}")

## Union & UnionByName

### 🔍 Key Differences in PySpark Union Functions

| Method | Handles Column Order? | Handles Extra Columns? | Removes Duplicates? |
|--------|------------------|------------------|------------------|
| `union()` | ❌ No | ❌ No | ❌ No |
| `unionAll()` | ❌ No | ❌ No | ❌ No |
| `union().distinct()` | ❌ No | ❌ No | ✅ Yes |
| `unionByName()` | ✅ Yes | ❌ No | ❌ No |
| `unionByName(allowMissingColumns=True)` | ✅ Yes | ✅ Yes | ❌ No |

---
### **💡 Best Practices**
- Use **`unionByName()`** when working with DataFrames that have **different column orders**.
- Use **`allowMissingColumns=True`** when columns are missing between DataFrames.
- Use **`.distinct()`** to **remove duplicates** after a union.
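
The difference between matching by position and matching by name can be sketched without Spark. This is a plain-Python model, not PySpark code: each "DataFrame" is a `(columns, rows)` pair, the helper names `union_by_position` and `union_by_name` are made up for illustration, and the sample rows are the ones used in this section's notebook cell.

```python
# Plain-Python model of union() vs unionByName(); no Spark session needed.

def union_by_position(left, right):
    """Model of union(): keep the left column names, append right rows as-is."""
    left_cols, left_rows = left
    _, right_rows = right
    return left_cols, left_rows + right_rows


def union_by_name(left, right):
    """Model of unionByName(): reorder each right row to match the left columns."""
    left_cols, left_rows = left
    right_cols, right_rows = right
    index = [right_cols.index(name) for name in left_cols]
    reordered = [tuple(row[i] for i in index) for row in right_rows]
    return left_cols, left_rows + reordered


df1_model = (["name", "id"], [("kad", "1"), ("sid", "2")])
df2_model = (["id", "name"], [("3", "rahul"), ("4", "jas")])

# By position, the ids from the second frame end up under "name".
print(union_by_position(df1_model, df2_model)[1])
# By name, every value stays under its own column.
print(union_by_name(df1_model, df2_model)[1])
```

In the positional result the third row is `("3", "rahul")` under the columns `(name, id)`, which is the silent mix-up `unionByName()` avoids.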

In [0]:
# Data preparation
data1 = [('kad','1'),
         ('sid','2')]
schema1 = 'name STRING, id STRING'

df1 = spark.createDataFrame(data1,schema1)        # DataFrame 1: columns (name, id)


data2 = [('3','rahul'),
         ('4','jas')]
schema2 = 'id STRING, name STRING'

df2 = spark.createDataFrame(data2,schema2)      # DataFrame 2: columns (id, name)

df1.display()
df2.display()

df1.union(df2).show()               # Matches columns by position, so ids from df2 land in the name column

df1.unionByName(df2).show()         # Matches columns by name, so the values line up correctly



# 📌 String Functions

PySpark provides various string functions to manipulate and process string data. Below is a categorized list of commonly used **string functions**:

---

## 🔹 1. Basic String Operations
| Function | Description | Example |
|----------|------------|---------|
| `upper()` | Converts to uppercase | `df.select(upper(col("name"))).show()` |
| `lower()` | Converts to lowercase | `df.select(lower(col("name"))).show()` |
| `length()` | Returns string length | `df.select(length(col("name"))).show()` |
| `trim()` | Removes spaces from both sides | `df.select(trim(col("name"))).show()` |
| `ltrim()` | Removes spaces from the left | `df.select(ltrim(col("name"))).show()` |
| `rtrim()` | Removes spaces from the right | `df.select(rtrim(col("name"))).show()` |

---

## 🔹 2. Substring & Character Extraction
| Function | Description | Example |
|----------|------------|---------|
| `substring(col, start, length)` | Extracts a substring from a column; `start` is 1-based | `df.select(substring(col("name"), 1, 3)).show()` |
| `split(col, pattern)` | Splits string into an array; `pattern` is a regular expression | `df.select(split(col("name"), " ")).show()` |
| `instr(col, substr)` | Finds position of substring | `df.select(instr(col("name"), "a")).show()` |

---

## 🔹 3. String Replacement & Formatting
| Function | Description | Example |
|----------|------------|---------|
| `regexp_replace(col, pattern, replacement)` | Replaces substrings using regex | `df.select(regexp_replace(col("name"), "John", "Mike")).show()` |
| `translate(col, fromStr, toStr)` | Replaces characters | `df.select(translate(col("name"), "abc", "xyz")).show()` |
| `format_string(format, col1, col2, ...)` | Formats string | `df.select(format_string("%s - %s", col("name"), col("job"))).show()` |

---

## 🔹 4. String Case & Padding
| Function | Description | Example |
|----------|------------|---------|
| `initcap(col)` | Capitalizes first letter of each word | `df.select(initcap(col("name"))).show()` |
| `lpad(col, length, pad)` | Left-pads string with characters | `df.select(lpad(col("name"), 10, "0")).show()` |
| `rpad(col, length, pad)` | Right-pads string with characters | `df.select(rpad(col("name"), 10, "0")).show()` |

---

## 🔹 5. String Comparison & Matching
| Function | Description | Example |
|----------|------------|---------|
| `col.like(pattern)` | SQL-like pattern matching (`%`, `_`) | `df.filter(col("name").like("A%")).show()` |
| `col.rlike(regex)` | Regex pattern matching | `df.filter(col("name").rlike("^A.*")).show()` |
| `col.contains(substr)` | Checks if string contains substring | `df.filter(col("name").contains("John")).show()` |
| `col.startswith(prefix)` | Checks if string starts with substring | `df.filter(col("name").startswith("A")).show()` |
| `col.endswith(suffix)` | Checks if string ends with substring | `df.filter(col("name").endswith("z")).show()` |


##  🔹 6. Concat two or more columns

| Method | Use Case | Example |
|--------|---------|---------|
| `concat(col1, col2)` | Joins multiple string columns | `concat(col("first_name"), col("last_name"))` |
| `concat_ws(" ", col1, col2)` | Joins columns with a separator | `concat_ws("-", col("city"), col("state"))` |
| `concat(col1, lit(" "), col2)` | Adds static text between columns | `concat(col("first_name"), lit(" - "), col("last_name"))` |
| `concat(col1, col2.cast("string"))` | Converts non-string columns before joining | `concat(col("first_name"), col("salary").cast("string"))` |
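
One practical difference between the two functions is how they treat NULL inputs: `concat()` returns NULL if any input is NULL, while `concat_ws()` skips NULL inputs. The sketch below models that behaviour in plain Python with `None` standing in for NULL; `concat_model` and `concat_ws_model` are illustrative names, not PySpark functions.

```python
# Plain-Python model of NULL handling in concat() and concat_ws().

def concat_model(*values):
    """Model of concat(): the result is None as soon as any input is None."""
    if any(v is None for v in values):
        return None
    return "".join(values)


def concat_ws_model(sep, *values):
    """Model of concat_ws(): None inputs are skipped, the rest are joined by sep."""
    return sep.join(v for v in values if v is not None)


print(concat_model("Steven", " ", "King"))        # all inputs present
print(concat_model("Steven", " ", None))          # one missing input makes the result None
print(concat_ws_model(" ", "Steven", None, "King"))  # the missing input is simply skipped
```

If a column used in `concat()` may contain NULLs, wrapping it in `coalesce(col, lit(""))` or switching to `concat_ws()` keeps the other parts of the string.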

---


In [0]:
from pyspark.sql.functions import col, lit, concat, upper, lower, length, trim, ltrim, rtrim, substring, split, instr, regexp_replace, translate, format_string, initcap, lpad, rpad

# Alternatively import everything; note that this shadows Python built-ins such as sum, min, max and round
from pyspark.sql.functions import *

df_csv.select(col('first_name'), upper(col("first_name")).alias('upper_name'),
              lower(col("first_name")).alias('lower_name'), length(col("first_name")).alias('name_length'),
              concat(col('first_name'), col('last_name')).alias("full_name_no_space"),
              concat(col('first_name'), lit(' '), col('last_name')).alias("full_name")
            ).show(5)


# 📆 Date Functions  

PySpark provides various **date and time functions** for handling and manipulating date-related data. Below is a categorized list of commonly used **date functions** in PySpark.

---

## 🔹 **1. Extracting Date Components**
| Function | Description | Example |
|----------|------------|---------|
| `year(col("date"))` | Extracts the year from a date | `df.select(year(col("hire_date"))).show()` |
| `month(col("date"))` | Extracts the month from a date | `df.select(month(col("hire_date"))).show()` |
| `dayofmonth(col("date"))` | Extracts the day of the month | `df.select(dayofmonth(col("hire_date"))).show()` |
| `dayofweek(col("date"))` | Extracts the day of the week (1 = Sunday, 7 = Saturday) | `df.select(dayofweek(col("hire_date"))).show()` |
| `dayofyear(col("date"))` | Extracts the day of the year | `df.select(dayofyear(col("hire_date"))).show()` |
| `quarter(col("date"))` | Extracts the quarter of the year | `df.select(quarter(col("hire_date"))).show()` |

---

## 🔹 **2. Date Arithmetic & Manipulation**
| Function | Description | Example |
|----------|------------|---------|
| `date_add(col("date"), n)` | Adds `n` days to a date | `df.select(date_add(col("hire_date"), 10)).show()` |
| `date_sub(col("date"), n)` | Subtracts `n` days from a date | `df.select(date_sub(col("hire_date"), 10)).show()` |
| `add_months(col("date"), n)` | Adds `n` months to a date | `df.select(add_months(col("hire_date"), 3)).show()` |
| `months_between(col("date1"), col("date2"))` | Returns the number of months from `date2` to `date1` | `df.select(months_between(current_date(), col("hire_date"))).show()` |
| `next_day(col("date"), "Day")` | Returns the next occurrence of the specified day | `df.select(next_day(col("hire_date"), "Friday")).show()` |

---

## 🔹 **3. Converting & Formatting Dates**
| Function | Description | Example |
|----------|------------|---------|
| `to_date(col("string_col"), "format")` | Converts string to date | `df.select(to_date(col("date_str"), "yyyy-MM-dd")).show()` |
| `date_format(col("date"), "format")` | Formats a date column | `df.select(date_format(col("hire_date"), "yyyy/MM/dd")).show()` |
| `unix_timestamp(col("date"))` | Converts a date to a Unix timestamp (seconds since 1970-01-01 UTC) | `df.select(unix_timestamp(col("hire_date"))).show()` |
| `from_unixtime(col("unix_col"))` | Converts a Unix timestamp (seconds) to a formatted timestamp string | `df.select(from_unixtime(col("unix_col"))).show()` |

---

## 🔹 **4. Time Functions**
| Function | Description | Example |
|----------|------------|---------|
| `current_date()` | Returns the current system date | `df.select(current_date()).show()` |
| `current_timestamp()` | Returns the current timestamp | `df.select(current_timestamp()).show()` |
| `datediff(col("date1"), col("date2"))` | Returns the number of days from `date2` to `date1` (`date1 - date2`) | `df.select(datediff(current_date(), col("hire_date"))).show()` |
| `timestamp_seconds(col("unix_col"))` | Converts Unix timestamp (seconds) to timestamp | `df.select(timestamp_seconds(col("unix_col"))).show()` |
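
To make the arithmetic behind a few of these functions concrete, here is a plain-Python sketch using only the standard library. It models `add_months()`, `datediff()` and `dayofweek()` as described in the tables above; the `*_model` names are illustrative, and the month-end clamping reflects my understanding of Spark 3.x behaviour rather than anything stated in this notebook.

```python
import calendar
from datetime import date


def add_months_model(d, n):
    """Model of add_months(): shift by n months, clamping to the month's last day."""
    month_index = d.year * 12 + (d.month - 1) + n
    year, month_zero_based = divmod(month_index, 12)
    month = month_zero_based + 1
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, min(d.day, last_day))


def datediff_model(end, start):
    """Model of datediff(end, start): whole days from start to end."""
    return (end - start).days


def dayofweek_model(d):
    """Model of dayofweek(): 1 = Sunday ... 7 = Saturday."""
    return d.isoweekday() % 7 + 1


print(add_months_model(date(2024, 1, 31), 1))    # January 31 has no February twin, so it clamps
print(add_months_model(date(2024, 1, 15), -3))   # negative n moves backwards
print(datediff_model(date(2024, 3, 1), date(2024, 2, 1)))
print(dayofweek_model(date(2024, 1, 7)))         # 7 January 2024 was a Sunday
```

Note the argument order in `datediff_model(end, start)`: swapping the arguments flips the sign, which is why `datediff(current_date(), col("hire_date"))` is the form that gives a positive tenure for past dates.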

---



In [0]:
#from pyspark.sql.functions import col, year, month, dayofmonth, date_add, to_date, current_date, datediff
from pyspark.sql.functions import * 

df_csv.select(
                col('hire_date'),
                year(col('hire_date')).alias('year'),
                month(col('hire_date')).alias('Mon'),
                dayofmonth(col('hire_date')).alias('day'),
                dayofweek('hire_date').alias('day_of_week'),        # 1 = Sunday ... 7 = Saturday
                add_months(col('hire_date'),3).alias('add_mon'),
                add_months(col('hire_date'),-3).alias('sub_mon'),
                date_add(col('hire_date'),5).alias('add_date'),
                date_sub(col('hire_date'),5).alias('sub_date'),
                date_trunc('Month',col('hire_date')).alias('first_day_of_mon'),
                datediff(current_date(),col('hire_date')).alias('date_diff'),
                current_date().alias('today')
            ).show(5)


# Unlike select(), which returns only the listed columns, withColumn() keeps every existing column and appends the new one
df_csv.withColumn('current_Date', current_date()).show(5)



# 🚀 Handling NULLs 

Handling **NULL** values is crucial when working with large datasets in PySpark. Below is a summary of common techniques to manage **NULL** values effectively.  

---

## 🔹 **1. Filtering NULL Values**
| Use Case | Example |
|----------|---------|
| Filter rows where column is NOT NULL | `df.filter(col("salary").isNotNull()).show()` |
| Filter rows where column IS NULL | `df.filter(col("salary").isNull()).show()` |
| Drop rows containing NULLs in any column | `df.na.drop().show()` |
| Drop rows only if specific column(s) are NULL | `df.na.drop(subset=["salary"]).show()` |

---

## 🔹 **2. Replacing NULL Values**
| Use Case | Example |
|----------|---------|
| Replace NULLs with a default value (only in columns whose type matches the value) | `df.na.fill(0).show()` |
| Replace NULLs with per-column defaults | `df.na.fill({"salary": 50000, "department": "Unknown"}).show()` |

---

## 🔹 **3. Using `coalesce()` for Default Values**
| Use Case | Example |
|----------|---------|
| Replace NULLs with a fallback value (a literal or another column) | `df.withColumn("final_salary", coalesce(col("salary"), lit(50000))).show()` |

---

## 🔹 **4. Handling NULLs in Aggregations**
| Use Case | Example |
|----------|---------|
| Ignore NULLs in aggregations (default behavior) | `df.select(avg("salary")).show()` |
| Count only non-null values | `df.select(count("salary")).show()` |
| Count NULL values explicitly | `df.select(count(when(col("salary").isNull(), 1))).show()` |
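
The effect of NULLs on aggregates is easy to miss, so here is a small plain-Python sketch of the rules in the table above, with `None` standing in for NULL. The salary list is made-up sample data, and the variable names are only for illustration.

```python
# Plain-Python model of how NULLs affect count() and avg().
salaries = [7000, None, 4400, None, 13000]   # hypothetical sample values

non_null = [s for s in salaries if s is not None]

count_rows = len(salaries)                            # like count("*"): every row
count_non_null = len(non_null)                        # like count("salary"): NULLs are skipped
count_null = sum(1 for s in salaries if s is None)    # like count(when(col.isNull(), 1))

avg_ignoring_null = sum(non_null) / len(non_null)     # like avg("salary")
avg_nulls_as_zero = sum(s or 0 for s in salaries) / len(salaries)  # like avg after fillna(0)

print(count_rows, count_non_null, count_null)
print(avg_ignoring_null, avg_nulls_as_zero)
```

The two averages differ because filling NULLs with 0 adds rows to the denominator. Whether that is wanted depends on whether a missing salary really means zero.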


In [0]:
#df_csv.dropna(subset=['commission_pct']).display()      # Keep only rows where commission_pct is not NULL

#df_csv.fillna(0).show()             # A numeric fill value only replaces NULLs in numeric columns

df_csv.fillna(0,subset=['commission_pct']).show(5)      # Fill NULLs only in the commission_pct column

## Split, Indexing and Explode

In [0]:
from pyspark.sql.functions import *
df_csv.withColumn('job_id',split('job_id','_')).show()      # Split job_id into an array on '_'

df_csv.withColumn('job_id',split('job_id','_')[1]).show()   # Keep only index 1, i.e. the second element


# Explode the array so that each element gets its own row
(
    df_csv.withColumn('job_id_array', split(col('job_id'), '_'))
          .withColumn('job_id_exploded', explode(col('job_id_array')))
          .display()
)

# 📌 groupBy()
## Aggregation Functions

| Function | Example | Description |
|----------|---------|-------------|
| `count()` | `df.groupBy("dept").count().show()` | Counts rows for each group |
| `sum()` | `df.groupBy("dept").sum("salary").show()` | Sum of a numeric column |
| `avg()` / `mean()` | `df.groupBy("dept").avg("salary").show()` | Average value |
| `min()` | `df.groupBy("dept").min("salary").show()` | Minimum value |
| `max()` | `df.groupBy("dept").max("salary").show()` | Maximum value |
| `agg()` | `df.groupBy("dept").agg(sum("salary").alias("total_salary"))` | Multiple aggregations in one query |
| `collect_list()` | `df.groupBy("dept").agg(collect_list("first_name")).show()` | Collects all names for each group into a list |
| `pivot()` | `df.groupBy("year").pivot("dept").sum("salary")` | Pivot table for better visualization |



In [0]:


df_grp = df_csv.groupBy('department_id')\
                    .agg(
                        count('employee_id').alias('count_emp'),        # number of employees in each department
                        sum('salary').alias('sum_salary'),              # total salary in each department
                        round(avg('salary')).alias('avg_salary'),       # average salary, rounded to a whole number
                        min('salary').alias('min_salary'),
                        max('salary').alias('max_salary'),
                        concat_ws(", ", collect_list('first_name')).alias('collect_list_Names')  # Convert list to string
    )

df_grp.sort(col('department_id').asc()).show()

df_grp.printSchema()


+-------------+---------+----------+----------+----------+----------+--------------------+
|department_id|count_emp|sum_salary|avg_salary|min_salary|max_salary|  collect_list_Names|
+-------------+---------+----------+----------+----------+----------+--------------------+
|         null|        1|      7000|    7000.0|      7000|      7000|           Kimberely|
|           10|        1|      4400|    4400.0|      4400|      4400|            Jennifer|
|           20|        2|     19000|    9500.0|      6000|     13000|        Michael, Pat|
|           30|        6|     24900|    4150.0|      2500|     11000|Den, Alexander, S...|
|           40|        1|      6500|    6500.0|      6500|      6500|               Susan|
|           50|       45|    156400|    3476.0|      2100|      8200|Matthew, Adam, Pa...|
|           60|        5|     28800|    5760.0|      4200|      9000|Alexander, Bruce,...|
|           70|        1|     10000|   10000.0|     10000|     10000|             Hermann|

### Pivot()

In [0]:
df_csv.groupBy('job_id').pivot('department_id').sum('salary').show(5)       # Use pivot() when you need a pivot table format.


## When-Otherwise


In [0]:


from pyspark.sql.functions import *


# when()/otherwise() works like a CASE statement in SQL: branches are checked top to bottom
# and the first match wins, so a salary of exactly 15000 is labelled 'VP', not 'Manager'.
df_csv.withColumn('salary_range',when(col('salary')>20000,'PRESIDENT')
                                .when((col('salary')>=15000) & (col('salary')<=20000) ,'VP')
                                .when((col('salary')>=10000) & (col('salary')<=15000) ,'Manager')
                                .otherwise('employees')).show(5)

+-----------+----------+---------+--------+------------+----------+-------+------+--------------+----------+-------------+------------+
|employee_id|first_name|last_name|   email|phone_number| hire_date| job_id|salary|commission_pct|manager_id|department_id|salary_range|
+-----------+----------+---------+--------+------------+----------+-------+------+--------------+----------+-------------+------------+
|        100|    Steven|     King|   SKING|515.123.4567|2087-06-17|AD_PRES| 25000|          null|      null|           90|   PRESIDENT|
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2089-09-21|  AD_VP| 17000|          null|       100|           90|          VP|
|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|2093-01-13|  AD_VP| 17000|          null|       100|           90|          VP|
|        103| Alexander|   Hunold| AHUNOLD|590.423.4567|2090-01-03|IT_PROG|  9000|          null|       102|           60|   employees|
|        104|     Bruce|    Ernst|  BERNST|590.4
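
Because the salary bands in the cell above share their boundary values, the order of the branches decides the label. The following plain-Python sketch mirrors the same thresholds to show the first-match rule; `salary_range` is an illustrative helper, and the `None` branch models the fact that a NULL salary makes every comparison NULL, so `otherwise()` applies.

```python
# Plain-Python model of a chained when(...).when(...).otherwise(...).

def salary_range(salary):
    """Evaluate branches top to bottom; the first true condition wins."""
    if salary is None:
        return "employees"      # no condition is true for NULL, so otherwise() applies
    if salary > 20000:
        return "PRESIDENT"
    if 15000 <= salary <= 20000:
        return "VP"
    if 10000 <= salary <= 15000:
        return "Manager"
    return "employees"


for value in (25000, 15000, 10000, 9000, None):
    print(value, salary_range(value))
```

A salary of 15000 satisfies both the second and the third condition but is labelled by the earlier one. Making the bands non-overlapping (for example `< 15000` in the last band) would remove the dependence on branch order.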

# Joins

In [0]:
# Read the employee and department tables into DataFrames
#from pyspark.sql.functions import *
df_emp = spark.read.csv('dbfs:/FileStore/tables/employees.csv',inferSchema=True,header=True)
df_emp.show(5)
df_dept = spark.read.csv('dbfs:/FileStore/tables/departments.csv',inferSchema=True,header=True)
df_dept.show(5)

join_cond = df_emp['department_id'] == df_dept['department_id']

# show() and display() return None, so call them on the join result instead of assigning it
df_emp.join(df_dept, join_cond, 'inner').show()       # only rows with a match on both sides

df_emp.join(df_dept, join_cond, 'left').show()        # all employees, NULLs where no department matches

df_emp.join(df_dept, join_cond, 'right').show()       # all departments, NULLs where no employee matches

df_emp.join(df_dept, join_cond, 'full').display()     # all rows from both sides

df_emp.join(df_dept, join_cond, 'left_anti').show()   # employees with no matching department (left columns only)
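
Which rows survive each join type can be sketched in plain Python. This is a model of the semantics only, not of how Spark executes joins. The three employees and their department ids are taken from the outputs shown in this notebook; the department names are hypothetical.

```python
# Plain-Python model of inner, left and left_anti joins on department_id.
employees = [("Steven", 90), ("Bruce", 60), ("Kimberely", None)]
departments = [(60, "IT"), (90, "Executive"), (110, "Accounting")]  # names are assumed

dept_names = dict(departments)  # department_id -> department_name

# inner: only employees whose department_id has a match
inner = [(name, dept_id, dept_names[dept_id])
         for name, dept_id in employees if dept_id in dept_names]

# left: every employee; unmatched ones get None for the department columns
left = [(name, dept_id, dept_names.get(dept_id)) for name, dept_id in employees]

# left_anti: employees with no matching department, left-side columns only
left_anti = [(name, dept_id) for name, dept_id in employees
             if dept_id not in dept_names]

print(inner)
print(left)
print(left_anti)
```

The employee with a NULL `department_id` never matches, because NULL is not equal to anything, so that row appears in the left and anti joins but not in the inner join.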

# Window Functions
## 🔹Common Window Functions in PySpark

| **Function** | **Description** | **Example Usage** |
|-------------|---------------|------------------|
| `row_number()` | Assigns a unique row number within a partition | `df.withColumn("row_num", row_number().over(windowSpec))` |
| `rank()` | Assigns a rank; tied rows share a rank and the following ranks are skipped | `df.withColumn("rank", rank().over(windowSpec))` |
| `dense_rank()` | Assigns a rank without skipping values | `df.withColumn("dense_rank", dense_rank().over(windowSpec))` |
| `lag(col, n)` | Gets the previous row’s value in a partition | `df.withColumn("prev_salary", lag("salary", 1).over(windowSpec))` |
| `lead(col, n)` | Gets the next row’s value in a partition | `df.withColumn("next_salary", lead("salary", 1).over(windowSpec))` |
| `sum(col)` | Running total (cumulative sum) | `df.withColumn("cumulative_salary", sum("salary").over(windowSpec))` |
| `avg(col)` | Running average | `df.withColumn("avg_salary", avg("salary").over(windowSpec))` |
| `max(col)` | Running maximum | `df.withColumn("max_salary", max("salary").over(windowSpec))` |
| `min(col)` | Running minimum | `df.withColumn("min_salary", min("salary").over(windowSpec))` |
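
The three ranking functions differ only in how they treat ties. The plain-Python sketch below computes all three for one partition ordered by salary descending; `rank_rows` is an illustrative helper and the salary values are made up.

```python
# Plain-Python model of row_number(), rank() and dense_rank() within one partition.

def rank_rows(salaries):
    """Return (salary, row_number, rank, dense_rank) tuples, highest salary first."""
    ordered = sorted(salaries, reverse=True)
    result = []
    rank = dense = 0
    previous = object()  # sentinel that is not equal to any salary
    for row_number, salary in enumerate(ordered, start=1):
        if salary != previous:
            rank = row_number   # rank jumps to the current position after a tie
            dense += 1          # dense_rank moves to the next integer only
            previous = salary
        result.append((salary, row_number, rank, dense))
    return result


for row in rank_rows([9000, 6000, 6000, 4800, 4200]):
    print(row)
```

For the two rows tied at 6000, `row_number` still counts 2 and 3, `rank` gives both rows 2 and then skips to 4, and `dense_rank` gives both rows 2 and continues with 3.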


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *

windowSpec = Window.partitionBy('department_id').orderBy(col("salary").desc())  # partitionBy() is optional; without it, all rows are ranked as a single partition

df_csv = df_csv.drop('phone_number','email','last_name')

df_csv = df_csv.withColumn('row_num',row_number().over(windowSpec))\
               .withColumn('rank',rank().over(windowSpec))\
               .withColumn('densc_rnk',dense_rank().over(windowSpec))
df_csv.show(10)


+-----------+----------+----------+--------+------+--------------+----------+-------------+-------+----+---------+
|employee_id|first_name| hire_date|  job_id|salary|commission_pct|manager_id|department_id|row_num|rank|densc_rnk|
+-----------+----------+----------+--------+------+--------------+----------+-------------+-------+----+---------+
|        178| Kimberely|2099-05-24|  SA_REP|  7000|          0.15|       149|         null|      1|   1|        1|
|        200|  Jennifer|2087-09-17| AD_ASST|  4400|          null|       101|           10|      1|   1|        1|
|        201|   Michael|2096-02-17|  MK_MAN| 13000|          null|       100|           20|      1|   1|        1|
|        202|       Pat|2097-08-17|  MK_REP|  6000|          null|       201|           20|      2|   2|        2|
|        114|       Den|2094-12-07|  PU_MAN| 11000|          null|       100|           30|      1|   1|        1|
|        115| Alexander|2095-05-18|PU_CLERK|  3100|          null|       114|   

## Cumulative Sum

In [0]:
# Cumulative sum
from pyspark.sql.functions import sum   # Spark's sum(), which shadows Python's built-in
from pyspark.sql.window import Window

# Define Window for Cumulative Sum (Partition by Department, Ordered by Salary)
windowSpecCumulative = Window.partitionBy("department_id").orderBy("salary").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_csv = df_csv.withColumn("cumulative_salary", sum("salary").over(windowSpecCumulative))
df_csv.show(10)


+-----------+----------+----------+--------+------------------+----------+--------+------+--------------+----------+-------------+-----------------+
|employee_id|first_name| last_name|   email|      phone_number| hire_date|  job_id|salary|commission_pct|manager_id|department_id|cumulative_salary|
+-----------+----------+----------+--------+------------------+----------+--------+------+--------------+----------+-------------+-----------------+
|        178| Kimberely|     Grant|  KGRANT|011.44.1644.429263|2099-05-24|  SA_REP|  7000|          0.15|       149|         null|             7000|
|        200|  Jennifer|    Whalen| JWHALEN|      515.123.4444|2087-09-17| AD_ASST|  4400|          null|       101|           10|             4400|
|        202|       Pat|       Fay|    PFAY|      603.123.6666|2097-08-17|  MK_REP|  6000|          null|       201|           20|             6000|
|        201|   Michael| Hartstein|MHARTSTE|      515.123.5555|2096-02-17|  MK_MAN| 13000|          null| 
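
The window used above (partition by department, order by salary, rows from the start of the partition to the current row) amounts to a running total per group. A plain-Python sketch of the same computation, using the three rows for departments 10 and 20 from the output above:

```python
from itertools import accumulate, groupby

# (department_id, salary) pairs taken from the output above
rows = [(20, 13000), (20, 6000), (10, 4400)]

# partitionBy -> group by department; orderBy -> sort by salary inside each group
ordered = sorted(rows)

cumulative = []
for dept_id, group in groupby(ordered, key=lambda r: r[0]):
    salaries = [salary for _, salary in group]
    # accumulate() yields the running total: unboundedPreceding .. currentRow
    for salary, total in zip(salaries, accumulate(salaries)):
        cumulative.append((dept_id, salary, total))

for row in cumulative:
    print(row)
```

The running total restarts for every department, which is what `partitionBy` contributes; dropping it would produce one running total across the whole DataFrame.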

# User-Defined Function (UDF)

In [0]:
from pyspark.sql.functions import udf, col, coalesce, lit
from pyspark.sql.types import DoubleType

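# Define a Python function
def calculate_full_salary(salary, commission):
    if salary is None:
        salary = 0
    if commission is None:
        commission = 0
    # Caveat: when commission is missing this returns a Python int. The UDF below is
    # declared as DoubleType, and Spark turns a mismatched int into NULL (see the
    # full_salary column in the output). Returning float(salary + commission) avoids that.
    return salary + commission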

# Register as a UDF
calculate_full_salary_udf = udf(calculate_full_salary, DoubleType())

# Use in DataFrame
df_full_sal = df_csv.withColumn(
    "full_salary", 
    calculate_full_salary_udf(col("salary"), col("commission_pct"))
)

# Show the result
df_full_sal.show(10)


+-----------+----------+----------+--------+------+--------------+----------+-------------+-------+----+---------+-----------------+-----------+
|employee_id|first_name| hire_date|  job_id|salary|commission_pct|manager_id|department_id|row_num|rank|densc_rnk|cumulative_salary|full_salary|
+-----------+----------+----------+--------+------+--------------+----------+-------------+-------+----+---------+-----------------+-----------+
|        178| Kimberely|2099-05-24|  SA_REP|  7000|          0.15|       149|         null|      1|   1|        1|             7000|    7000.15|
|        200|  Jennifer|2087-09-17| AD_ASST|  4400|          null|       101|           10|      1|   1|        1|             4400|       null|
|        202|       Pat|2097-08-17|  MK_REP|  6000|          null|       201|           20|      2|   2|        2|             6000|       null|
|        201|   Michael|2096-02-17|  MK_MAN| 13000|          null|       100|           20|      1|   1|        1|            1900

# 📝 DATA WRITING in PySpark

PySpark provides multiple methods to write data to different formats and destinations.

🚀 Best Practices.

✅ Use Parquet/Delta for efficient storage.

✅ Use Partitioning for large datasets.

✅ Use Append Mode for incremental data updates.

✅ Use Overwrite Mode carefully to avoid accidental data loss.


Why is Parquet efficient for storage?

1️⃣ Columnar Storage Format
Unlike CSV (row-based format), Parquet stores data column-wise.
This improves compression and query performance by scanning only required columns.

📌 Example query:

`SELECT salary FROM employees WHERE department_id = 10;`

In CSV, all columns are read before filtering.
In Parquet, only the `salary` and `department_id` columns are read, which makes the query faster.
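
The saving from a columnar layout can be illustrated with a toy model in plain Python. This is only a sketch of the idea (it ignores compression, encoding and Parquet's actual file structure); the three rows are taken from outputs shown earlier in this notebook.

```python
# Toy model: row-oriented vs column-oriented storage for the example query.
rows = [
    {"employee_id": 200, "first_name": "Jennifer", "salary": 4400, "department_id": 10},
    {"employee_id": 201, "first_name": "Michael", "salary": 13000, "department_id": 20},
    {"employee_id": 202, "first_name": "Pat", "salary": 6000, "department_id": 20},
]

# Row layout (CSV-like): answering the query touches every field of every row.
row_values_read = sum(len(row) for row in rows)
row_result = [r["salary"] for r in rows if r["department_id"] == 10]

# Column layout (Parquet-like): one list per column; read only the columns the query names.
columns = {name: [r[name] for r in rows] for name in rows[0]}
needed = ["salary", "department_id"]
column_values_read = sum(len(columns[name]) for name in needed)
column_result = [s for s, d in zip(columns["salary"], columns["department_id"]) if d == 10]

print(row_result, row_values_read)
print(column_result, column_values_read)
```

Both layouts return the same answer, but the columnar one reads half as many values here, and the gap widens as the table gains columns the query does not use.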

In [0]:
## 1. Writing DataFrame to CSV
df_grp.write.format('csv').option('header',True).save('/FileStore/tables/Dept_Grp_csv')

## 2. Writing DataFrame to JSON
# From here on, `df` stands for any DataFrame (for example df_csv or df_grp)
df.write.format("json").save("/FileStore/tables/output_json")


## 3. Writing DataFrame to Parquet (Optimized Storage Format)
df.write.format("parquet").save("/FileStore/tables/output_parquet")


## 4. Writing DataFrame to Delta Format (For Databricks)
df.write.format("delta").save("/mnt/delta/output_delta")

## 5. Writing DataFrame to a Database (JDBC)
df.write.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/srk") \
    .option("dbtable", "employees") \
    .option("user", "root") \
    .option("password", "admin") \
    .save()

In [0]:
## 6. Overwriting & Appending Data
df.write.mode("overwrite").csv("/FileStore/tables/output_csv")

## Append to Existing Data
df.write.mode("append").csv("/FileStore/tables/output_csv")


## 7. Writing Data in Partitioned Format
df.write.partitionBy("department_id").parquet("/FileStore/tables/output_partitioned")

## 8. Writing Data with Bucketing (bucketBy() only works together with saveAsTable())
df.write.bucketBy(4, "employee_id").saveAsTable("bucketed_employees")

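## Error mode (the default): raise an error if the target path already exists
df.write.format('csv').mode('error').option('path','/FileStore/tables/CSV/data.csv').save()

## Ignore mode: silently skip the write if the target path already exists
df.write.format('csv').mode('ignore').option('path','/FileStore/tables/CSV/data.csv').save()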

## Creating a table
df.write.format('parquet').mode('overwrite').saveAsTable('my_table')
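
The four save modes used in this cell differ only in what happens when the target already exists. A plain-Python sketch of that decision, with a dictionary standing in for the file system; `write_model` is a made-up helper, not part of PySpark.

```python
# Plain-Python model of the save modes: error, ignore, overwrite, append.

def write_model(storage, path, rows, mode="error"):
    """Apply one write to `storage` (a dict of path -> list of rows)."""
    exists = path in storage
    if mode in ("error", "errorifexists"):
        if exists:
            raise FileExistsError(f"path already exists: {path}")
        storage[path] = list(rows)
    elif mode == "ignore":
        if not exists:                      # existing data is left untouched
            storage[path] = list(rows)
    elif mode == "overwrite":
        storage[path] = list(rows)          # existing data is replaced
    elif mode == "append":
        storage.setdefault(path, []).extend(rows)
    else:
        raise ValueError(f"unknown mode: {mode}")
    return storage


fs = {}
write_model(fs, "/out", [1, 2], mode="overwrite")
write_model(fs, "/out", [3], mode="append")
write_model(fs, "/out", [9], mode="ignore")   # no effect: /out already exists
print(fs)
```

Only `overwrite` can destroy existing data, which is why the best-practice list above asks for care with it.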


# SPARK SQL

In [0]:
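df_csv.createOrReplaceTempView('emp')   # createTempView('emp') raises an error if the view already exists

# In Databricks, the %sql magic must be the first line of its own cell:
# %sql
# select * from emp where department_id = 60

df_sql = spark.sql("select * from emp where department_id = 60")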