### DataFrame creation

In [0]:
# Load the employees CSV: the first row is treated as the header and column types are inferred.
df_employees = (
    spark.read.format("csv")
    .option("inferSchema", True)
    .option("Header", True)
    .load("/Volumes/workspace/default/join/employees.csv")
)

# Load the departments CSV with the same header / schema-inference settings.
df_department = (
    spark.read.format("csv")
    .option("inferschema", True)
    .option("header", True)
    .load("/Volumes/workspace/default/join/departments.csv")
)



#df_parquet = spark.read.format("parquet").load("/mnt/data/sample.parquet")

#df = spark.read.format("json").option("multiline", "true").load("/mnt/data/sample.json")



In [0]:
# Render the departments DataFrame in the notebook output.
df_department.display()

### JOINS / remove duplicate column from DF

In [0]:
# Left join: keep every employee row and attach department columns where dept_id matches.
# The department-side dept_id is dropped so the result carries a single dept_id column.
df_leftjoin = (
    df_employees
    .join(df_department, on=df_employees.dept_id == df_department.dept_id, how="left")
    .drop(df_department.dept_id)
)
display(df_leftjoin)



In [0]:
# Right join: keep every department row and attach employee columns where dept_id matches.
# NOTE(review): nothing is dropped here, so the result has two columns named dept_id
# (one from each side) — confirm that is intended.
df_rightjoin = df_employees.join(
    df_department,
    on=df_employees.dept_id == df_department.dept_id,
    how="right",
)

display(df_rightjoin)

### Broadcast join for data skew (performance)

In [0]:
from pyspark.sql.functions import broadcast

# Left join employees to departments, marking the departments DataFrame with a
# broadcast hint (presumably it is the small side — TODO confirm).
df_broadcast = (
    df_employees
    .join(broadcast(df_department), df_employees.dept_id == df_department.dept_id, "left")
    # Drop the department-side copy of the join key: in a left join it is NULL for
    # employees without a matching department, whereas the employee-side dept_id
    # always holds the original value.
    .drop(df_department.dept_id)
)

# Persist the joined result as a Delta table, replacing any existing contents.
df_broadcast.write.format("delta").mode("overwrite").saveAsTable("workspace.default.test")

# Also write the untouched employees DataFrame back out as CSV files.
df_employees.write.format("csv").mode("overwrite").save("/Volumes/workspace/default/join/transform")



In [0]:
# Write the joined result as CSV, partitioned on disk by dept_name; existing output is replaced.
(
    df_broadcast.write
    .format("csv")
    .mode("overwrite")
    .partitionBy("dept_name")
    .save("/Volumes/workspace/default/join/test")
)

#display(df_broadcast)

In [0]:
# Load every CSV under the uber volume, using the header row and inferred column types.
df_uber = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferschema", True)
    .load("/Volumes/workspace/default/uber")
)

### **REGEXP_REPLACE: value replacement**

In [0]:
from pyspark.sql.functions import *

# Step 1: strip every double-quote character from "Booking ID".
dfclean_uber = df_uber.withColumn(
    "Booking ID",
    regexp_replace(col("Booking ID"), '"', ''),
)

# Step 2: replace the text "null" in "Customer Rating" with "0.0" so the
# string column can be converted to double in the next step.
df_clean = dfclean_uber.withColumn(
    "Customer Rating",
    regexp_replace(col("Customer Rating"), "null", "0.0"),
)

# Step 3: cast "Customer Rating" to double.
df_cast = df_clean.withColumn(
    "Customer Rating",
    col("Customer Rating").cast("double"),
)

# Step 4: add "Bookingstar" — "Good" when the rating is at least 4.5, otherwise "bad".
df_ubercolumn = df_cast.withColumn(
    "Bookingstar",
    when(col("Customer Rating") >= 4.5, "Good").otherwise("bad"),
)



In [0]:
# Optional visual checks (left disabled):
#display(df_uber) #before regexp_replace
#display(dfclean_uber) #after regexp_replace
# Print the column names and types of the DataFrame with quotes stripped from "Booking ID".
dfclean_uber.printSchema()


### **LOWERCASE DATA/ UPPERCASE DATA**

In [0]:
from pyspark.sql.functions import *

# Lower-case every "Vehicle Type" value and show the result.
df_lower = df_uber.withColumn(
    "Vehicle Type",
    lower(col("Vehicle Type")),
)
display(df_lower)

# Upper-case the same column (starting from the lower-cased DataFrame) and show the result.
df_upper = df_lower.withColumn(
    "Vehicle Type",
    upper(col("Vehicle Type")),
)
display(df_upper)


### **DROP NULL BASED ON COLUMN / select only not-null values**

In [0]:
from pyspark.sql.functions import *

# Keep only the rows whose "Booking ID" is not null.
df_null = df_uber.filter(
    col("Booking ID").isNotNull()
)

# Look up a single booking in the quote-stripped DataFrame and render it.
(
    dfclean_uber
    .filter(col("Booking ID") == "CNR9742182")
    .display()
)

# Row count after removing null booking IDs (shown as the cell result).
df_null.count()

### **GroupBy/Count/OrderBy** — Find the total number of rides based on Booking Status and Vehicle Type.

In [0]:
# Number of rides for each (Booking Status, Vehicle Type) pair, ordered by Booking Status.
(
    df_uber
    .groupBy("Booking Status", "Vehicle Type")
    .count()
    .orderBy("Booking Status")
    .display()
)

Count unique customers:
- `select` + `distinct` on the column
- `dropDuplicates` on the column to get unique values

In [0]:
# Two ways to count unique customers. Both counts are captured and printed:
# a notebook cell only echoes its last expression, so a bare first .count()
# would run a Spark job whose result is never shown.
unique_via_distinct = df_uber.select("Customer ID").distinct().count()
unique_via_drop_duplicates = df_uber.dropDuplicates(subset=["Customer ID"]).count()

print("select().distinct():", unique_via_distinct)
print("dropDuplicates():", unique_via_drop_duplicates)

Get the top 5 pickup locations with the most completed rides.
groupBy / count / orderBy / filter / show


In [0]:
# Count rides per (Booking Status, Pickup Location), sort by count descending,
# keep only "Completed" bookings and print the first 5 rows.
(
    df_uber
    .groupBy("Booking Status", "Pickup Location")
    .count()
    .orderBy(desc("count"))
    .filter(col("Booking Status") == "Completed")
    .show(5)
)

- Change Datatype of "Booking Value","Ride Distance" from string to double
-  Calculate average ride distance & booking value per Vehicle Type
-  group by/ avg / `display`

In [0]:
from pyspark.sql.functions import col, regexp_replace

# Convert "Booking Value" and "Ride Distance" to double: the text "null" is first
# replaced with "0" and the result is then cast.
df_master = df_uber.withColumn(
    "Booking Value", regexp_replace(col("Booking Value"), "null", "0").cast("double")
).withColumn(
    "Ride Distance", regexp_replace(col("Ride Distance"), "null", "0").cast("double")
)

# Average booking value and ride distance for each vehicle type.
(
    df_master
    .groupBy("Vehicle Type")
    .avg("Booking Value", "Ride Distance")
    .display()
)

Repartition / Coalesce

In [0]:
from pyspark.sql.functions import *

# Load the US accidents CSV, using the header row and inferred column types.
dfus = (
    spark.read.format("csv")
    .option("header", "True")
    .option("inferschema", "True")
    .load("/Volumes/workspace/default/ecom/US_Accidents_March23.csv")
)


In [0]:
# Render the accidents DataFrame in the notebook output.
display(dfus)

In [0]:
# Second read of the US accidents CSV with the same header / schema-inference options.
# NOTE(review): this loads the same path as `dfus`, and `df_withre` is not referenced
# in the rest of the visible notebook — confirm it is still needed.
df_withre = (
    spark.read.format("csv")
    .option("header", "True")
    .option("inferschema", "True")
    .load("/Volumes/workspace/default/ecom/US_Accidents_March23.csv")
)

In [0]:
# Repartition the accidents data by the "State" column; used below to compare
# the per-state count against the same aggregation on the original DataFrame.
df_withpartition = dfus.repartition("State")


In [0]:
# Accident count per state on the DataFrame as loaded (no explicit repartition).
(
    dfus
    .groupBy("State")
    .count()
    .display()
)

In [0]:
# Same per-state accident count, this time on the DataFrame repartitioned by "State".
(
    df_withpartition
    .groupBy("State")
    .count()
    .display()
)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import time

# 1. Per (State, Severity): number of incidents and average "Distance(mi)".
agg1 = (
    dfus.groupBy("State", "Severity")
    .agg(
        F.count("*").alias("incident_count"),
        F.avg("Distance(mi)").alias("avg_distance"),
    )
)

# 2. Per State: average "Temperature(F)".
agg2 = dfus.groupBy("State").agg(F.avg("Temperature(F)").alias("avg_temp"))

# 3. Join the two aggregates on State.
joined = agg1.join(agg2, on="State", how="inner")

# 4. Rank the severities within each state by incident count (highest first).
windowSpec = Window.partitionBy("State").orderBy(F.desc("incident_count"))
final_df = joined.withColumn("rank_in_state", F.rank().over(windowSpec))

# Measure how long the action takes. perf_counter() is a monotonic, high-resolution
# clock intended for intervals; time.time() can jump if the system clock is adjusted.
t1 = time.perf_counter()
final_df.show(10)
t2 = time.perf_counter()
print("Without repartition:", t2 - t1, "seconds")


In [0]:
# Repartition the data on State before running the same aggregations.
df_repart = dfus.repartition("State")

# Per (State, Severity): number of incidents and average "Distance(mi)".
agg1 = (
    df_repart.groupBy("State", "Severity")
    .agg(
        F.count("*").alias("incident_count"),
        F.avg("Distance(mi)").alias("avg_distance"),
    )
)

# Per State: average "Temperature(F)".
agg2 = df_repart.groupBy("State").agg(F.avg("Temperature(F)").alias("avg_temp"))

# Join the two aggregates on State.
joined = agg1.join(agg2, on="State", how="inner")

# Rank the severities within each state by incident count (highest first).
windowSpec = Window.partitionBy("State").orderBy(F.desc("incident_count"))
final_df = joined.withColumn("rank_in_state", F.rank().over(windowSpec))

# Measure how long the action takes. perf_counter() is a monotonic, high-resolution
# clock intended for intervals; time.time() can jump if the system clock is adjusted.
t1 = time.perf_counter()
final_df.show(10)
t2 = time.perf_counter()
print("With repartition:", t2 - t1, "seconds")


In [0]:
# Reference snippets for handling nulls.
# NOTE(review): `df` and the column names used here ("column_name", "col1", "col2",
# "age", "name") are not defined anywhere in the visible notebook — they look like
# placeholders; substitute a real DataFrame before running.

# Show the rows where the column is null.
df.filter(df["column_name"].isNull()).show()

# Show the rows where the column is not null.
df.filter(df["column_name"].isNotNull()).show()

#drop.null
# The results of the calls below are not assigned, so `df` itself is left unchanged.

df.na.drop()
df.na.drop(subset=["col1", "col2"])


df.na.fill("unknown")  # if columns are strings
df.na.fill(0)          # if columns are numeric

# Per-column replacement values, given as a dict of column name -> fill value.
df.na.fill({"age": 0, "name": "N/A"})

#replace with condition

from pyspark.sql.functions import when, col

# New column "age_cleaned": 0 where "age" is null, otherwise the original "age" value.
df_na = df.withColumn("age_cleaned", when(col("age").isNull(), 0).otherwise(col("age")))


# Sample data the rule below targets. Kept as comments: as bare text these rows
# are not valid Python and would stop the whole cell from parsing.
#   col1   col2       col3
#   car    electric   sedan
#   car    electric   null
#   car    petrol     suv

from pyspark.sql.functions import when, col

# Assuming your DataFrame is called df
# Fill col3 with "sedan" only where col1 is "car", col2 is "electric" and col3 is null;
# every other row keeps its existing col3 value.
df_updated = df.withColumn(
    "col3",
    when(
        (col("col1") == "car") & (col("col2") == "electric") & col("col3").isNull(),
        "sedan",
    ).otherwise(col("col3")),
)




 



In [0]:
# Incremental load using Delta Lake MERGE (upsert).
# A single MERGE handles both cases: rows whose id already exists in the target
# are updated, and rows with a new id are inserted.
# NOTE(review): `source_df` and "target_table" are not defined in the visible
# notebook — they look like placeholders to be replaced.

from delta.tables import DeltaTable

delta_target = DeltaTable.forName(spark, "target_table")

(
    delta_target.alias("target")
    .merge(
        source_df.alias("source"),
        "target.id = source.id",  # join condition
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

      