In [0]:
from pyspark.sql.functions import col, when, avg, count, sum as _sum, max, min, stddev, substring

import time

# Check Spark version and cluster info
print(f"Spark version: {spark.version}")
print(f"Cluster configured successfully!")


Spark version: 4.0.0
Cluster configured successfully!


In [0]:
# Create large dataset by duplicating
print("Creating large dataset (10x duplication)...")

df_base = spark.read.csv("/databricks-datasets/flights/departuredelays.csv", 
                          header=True, inferSchema=True)

df_large = df_base
for i in range(9):
    df_large = df_large.union(df_base)

print("Created ~1 GB dataset with 14M rows")

Creating large dataset (10x duplication)...
Created ~1 GB dataset with 14M rows


In [0]:
# applies 3 filters to make it so we only have rows that originated from SFO, 
# had a delay of more than 60 minutes, and flew more than 500 miles
filtered_df = (
    df_base
    .filter(col("origin") == "SFO")
    .filter(col("delay") > 60)
    .filter(col("distance") > 500)
)

print(f"Filtered rows: {filtered_df.count():,}")

# using .withColumn to add a column named "delay_category"
transformed_df = (
    filtered_df
    .withColumn(
        "delay_category",
        when(col("delay") >= 180, "Severe Delay")
        .when((col("delay") >= 60) & (col("delay") < 180), "Moderate Delay")
        .otherwise("Minor or No Delay")
    )
)

# Repartition before heavy aggregations
transformed_df = transformed_df.repartition(8, "delay_category")


Filtered rows: 1,600


In [0]:
# my complex aggregation (since I am only using 1 dataset) that groups by destination and computes the following metrics:
# - total_flights
# - flights_with_delay
# - avg_delay
# - delay_stddev
# - max_delay
# - min_delay
complex_agg = (
    transformed_df
    .groupBy("destination")
    .agg(
        count("*").alias("total_flights"),
        _sum(when(col("delay") > 0, 1).otherwise(0)).alias("flights_with_delay"),
        avg("delay").alias("avg_delay"),
        stddev("delay").alias("delay_stddev"),
        max("delay").alias("max_delay"),
        min("delay").alias("min_delay")
    )
    .orderBy(col("avg_delay").desc())
)

display(complex_agg.limit(10))

destination,total_flights,flights_with_delay,avg_delay,delay_stddev,max_delay,min_delay
MIA,18,18,179.66666666666666,158.47100310002904,740,63
DTW,6,6,170.83333333333334,89.94980081504721,291,71
HNL,46,46,141.65217391304347,77.93507765122466,329,63
JFK,148,148,139.25675675675674,94.71770600889091,636,61
LIH,8,8,138.375,73.51372563775324,282,69
STL,6,6,136.0,106.81947388,307,62
ORD,138,138,134.36231884057972,143.72611535138554,1638,61
MSP,28,28,132.71428571428572,85.45427936875873,446,65
ATL,25,25,129.8,74.63075773432828,376,62
SUN,11,11,127.36363636363636,56.25526238010579,226,61


In [0]:
# group by delay_category and compute the following metrics:
# - avg_delay
# - num_flights
month_delay_summary = (
    transformed_df
    .groupBy("delay_category")
    .agg(
        avg("delay").alias("avg_delay"),
        count("*").alias("num_flights")
    )
    .orderBy(col("avg_delay").desc())
)

display(month_delay_summary)


delay_category,avg_delay,num_flights
Severe Delay,259.484693877551,196
Moderate Delay,100.18162393162392,1404


In [0]:
# SQL Queries

# Register as a temporary view for Spark SQL
transformed_df.createOrReplaceTempView("flight_delays")

top_delays = spark.sql("""
    SELECT destination, 
           AVG(delay) AS avg_delay, 
           COUNT(*) AS num_flights
    FROM flight_delays
    WHERE delay > 60 AND origin = 'SFO'
    GROUP BY destination
    ORDER BY avg_delay DESC
    LIMIT 10
""")

display(top_delays)

delay_category_summary = spark.sql("""
    SELECT delay_category, 
           COUNT(*) AS num_flights,
           AVG(delay) AS avg_delay
    FROM flight_delays
    GROUP BY delay_category
    ORDER BY avg_delay DESC
""")

display(delay_category_summary)

destination,avg_delay,num_flights
MIA,179.66666666666666,18
DTW,170.83333333333334,6
HNL,141.65217391304347,46
JFK,139.25675675675674,148
LIH,138.375,8
STL,136.0,6
ORD,134.36231884057972,138
MSP,132.71428571428572,28
ATL,129.8,25
SUN,127.36363636363636,11


delay_category,num_flights,avg_delay
Severe Delay,196,259.484693877551
Moderate Delay,1404,100.18162393162392


In [0]:
# Writing query results to a table

# top_delays DataFrame → managed table
top_delays.write.mode("overwrite").saveAsTable("top_delays_table")

# delay_category_summary DataFrame → managed table
delay_category_summary.write.mode("overwrite").saveAsTable("delay_category_summary_table")

In [0]:
# Explain
filtered_df.explain(True)
transformed_df.explain(True)
complex_agg.explain(True)
month_delay_summary.explain(True)
delay_category_summary.explain(True)
top_delays.explain(True)





== Parsed Logical Plan ==
'Filter '`>`('distance, 500)
+- 'Filter '`>`('delay, 60)
   +- 'Filter '`==`('origin, SFO)
      +- Relation [date#14956,delay#14957,distance#14958,origin#14959,destination#14960] csv

== Analyzed Logical Plan ==
date: int, delay: int, distance: int, origin: string, destination: string
Filter (distance#14958 > 500)
+- Filter (delay#14957 > 60)
   +- Filter (origin#14959 = SFO)
      +- Relation [date#14956,delay#14957,distance#14958,origin#14959,destination#14960] csv

== Optimized Logical Plan ==
Filter (((((isnotnull(delay#14957) AND isnotnull(distance#14958)) AND (delay#14957 > 60)) AND (distance#14958 > 500)) AND isnotnull(origin#14959)) AND (origin#14959 = SFO))
+- Relation [date#14956,delay#14957,distance#14958,origin#14959,destination#14960] csv

== Physical Plan ==
*(1) ColumnarToRow
+- PhotonResultStage
   +- PhotonFilter (((((isnotnull(delay#14957) AND isnotnull(distance#14958)) AND (delay#14957 > 60)) AND (distance#14958 > 500)) AND isnotnull(origin

In [0]:
# Here, we will do a demonstration of transformations vs actions 

print("=== Demonstrating Lazy (Transformations) vs Eager (Actions) Execution ===")

# --- Load dataset (transformation) ---
df_base = spark.read.csv("/databricks-datasets/flights/departuredelays.csv", header=True, inferSchema=True)

# --- Lazy Transformation ---
start_lazy = time.time()

selected_df = long_flights_df.select("date", "origin", "destination", "distance", "delay")

end_lazy = time.time()
print(f"\nTime to define transformations (lazy): {end_lazy - start_lazy:.4f} seconds")

# --- Eager Action ---
print("\nNow performing an ACTION, which will actually trigger computation:")
start_action = time.time()
row_count = selected_df.count()
end_action = time.time()
print(f"Time for action (actual computation): {end_action - start_action:.2f} seconds")

print("We can see here that the time taken for transformations is significatly less than for actions")


=== Demonstrating Lazy (Transformations) vs Eager (Actions) Execution ===


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-8812838240569174>, line 11[0m
[1;32m      8[0m [38;5;66;03m# --- Lazy Transformation ---[39;00m
[1;32m      9[0m start_lazy [38;5;241m=[39m time[38;5;241m.[39mtime()
[0;32m---> 11[0m selected_df [38;5;241m=[39m long_flights_df[38;5;241m.[39mselect([38;5;124m"[39m[38;5;124mdate[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124morigin[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mdestination[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mdistance[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mdelay[39m[38;5;124m"[39m)
[1;32m     13[0m end_lazy [38;5;241m=[39m time[38;5;241m.[39mtime()
[1;32m     14[0m [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;130;01m\n[39;00m[38;5;124mTime to define transformations (lazy): [39m[38;5;132;01m{[39