### Exercise 1: Load a CSV file into a DataFrame

**Expected Result:** Displays the first 5 rows of the Airlines dataset.

In [None]:
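# Assumption: the raw Airlines files mark missing values as "NA"; reading them as nulls lets delay columns be inferred as numeric.
df = (spark.read.option("header", True).option("inferSchema", True)
      .option("nullValue", "NA").csv("/databricks-datasets/airlines/part-00000"))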
df.show(5)

### Exercise 2: Cache a DataFrame for faster access

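**Expected Result:** Marks the DataFrame for caching and then materializes it. `cache()` is lazy, so the `count()` action is what actually populates the cache.

In [None]:
df.cache()   # lazy: nothing is stored yet
df.count()   # action: scans the data and fills the cache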

### Exercise 3: Select specific columns from a DataFrame

**Expected Result:** Shows the first 5 rows of the Year, Month, and DepDelay columns.

In [None]:
df.select("Year", "Month", "DepDelay").show(5)

### Exercise 4: Filter rows where DepDelay > 30 minutes

**Expected Result:** Shows flights delayed more than 30 minutes.

In [None]:
df.filter(df.DepDelay > 30).show(5)

### Exercise 5: Add a new column 'DelayCategory'

**Expected Result:** Adds a column that labels each flight as Late (DepDelay over 30 minutes) or OnTime.

In [None]:
from pyspark.sql.functions import when

df2 = df.withColumn("DelayCategory", when(df.DepDelay > 30, "Late").otherwise("OnTime"))
df2.select("DepDelay", "DelayCategory").show(5)

### Exercise 6: Group by Origin and count flights

**Expected Result:** Shows the 5 origin airports with the most flights.

In [None]:
df.groupBy("Origin").count().orderBy("count", ascending=False).show(5)

### Exercise 7: Sort flights by Departure Delay descending

**Expected Result:** Shows the 5 flights with the longest departure delays, longest first.

In [None]:
df.orderBy(df.DepDelay.desc()).show(5)

### Exercise 8: Rename column DepDelay to DepartureDelayMinutes

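**Expected Result:** Returns a new DataFrame in which DepDelay is named DepartureDelayMinutes. The original `df` is unchanged.

In [None]:
df_renamed = df.withColumnRenamed("DepDelay", "DepartureDelayMinutes")
df_renamed.show(5)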

### Exercise 9: Drop rows with null DepDelay

**Expected Result:** Returns a DataFrame without the rows whose DepDelay is null.

In [None]:
df_clean = df.na.drop(subset=["DepDelay"])
df_clean.show(5)

### Exercise 10: Get distinct Origin airports

**Expected Result:** Shows 5 of the distinct origin airports.

In [None]:
df.select("Origin").distinct().show(5)

### Exercise 11: Save DataFrame as Delta Table

**Expected Result:** Writes the DataFrame in Delta format to `/tmp/airlines_delta`, replacing any data already at that path.

In [None]:
df.write.format("delta").mode("overwrite").save("/tmp/airlines_delta")

### Exercise 12: Load Delta Table into DataFrame

**Expected Result:** Loads Delta format back.

In [None]:
df_delta = spark.read.format("delta").load("/tmp/airlines_delta")
df_delta.show(5)

### Exercise 13: Create a SQL table from Delta location

**Expected Result:** Creates SQL table linked to Delta files.

In [None]:
spark.sql("CREATE TABLE IF NOT EXISTS airlines_delta USING DELTA LOCATION '/tmp/airlines_delta'")

### Exercise 14: Update records in Delta Table

**Expected Result:** Sets DepDelay to 0 in every row where it is NULL.

In [None]:
spark.sql("UPDATE airlines_delta SET DepDelay = 0 WHERE DepDelay IS NULL")

### Exercise 15: Time Travel to an old Delta Table version

**Expected Result:** Reads version 0 of the table, that is, its state after the first commit.

In [None]:
df_old = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/airlines_delta")
df_old.show(5)

### Exercise 16: Merge new data into Delta Table

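**Expected Result:** Upserts data into the Delta table: matching rows are updated and unmatched rows are inserted.

In [None]:
from delta.tables import DeltaTable

# A merge fails if several source rows match the same target row,
# so keep one row per FlightNum in the source.
updates = df.dropDuplicates(["FlightNum"])

deltaTable = DeltaTable.forPath(spark, "/tmp/airlines_delta")
deltaTable.alias("old").merge(
    updates.alias("new"), "old.FlightNum = new.FlightNum"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()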

### Exercise 17: Optimize Delta Table

**Expected Result:** Compacts small files in the Delta table, which can improve query speed.

In [None]:
spark.sql("OPTIMIZE airlines_delta")

### Exercise 18: Vacuum old files from Delta Table

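**Expected Result:** Removes data files that the table no longer references and that are older than the 168-hour (7-day) retention period. Table versions that relied on those files can no longer be read with time travel.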

In [None]:
spark.sql("VACUUM airlines_delta RETAIN 168 HOURS")

### Exercise 19: Add a new column to Delta Table

**Expected Result:** Adds a nullable STRING column named NewColumn to the table.

In [None]:
spark.sql("ALTER TABLE airlines_delta ADD COLUMNS (NewColumn STRING)")

### Exercise 20: Drop a column from Delta Table

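**Expected Result:** Column removed from the Delta table. Delta supports `DROP COLUMN` only when column mapping is enabled, so the table properties are set first.

In [None]:
spark.sql("ALTER TABLE airlines_delta SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name', 'delta.minReaderVersion' = '2', 'delta.minWriterVersion' = '5')")
spark.sql("ALTER TABLE airlines_delta DROP COLUMN NewColumn")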

### Exercise 21: Run SQL query on Delta Table

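**Expected Result:** Shows the average departure delay per carrier, highest first.

In [None]:
# Assumes the carrier code is stored in the UniqueCarrier column, as in the raw Airlines files.
spark.sql("SELECT UniqueCarrier, AVG(DepDelay) AS AvgDepDelay FROM airlines_delta GROUP BY UniqueCarrier ORDER BY AvgDepDelay DESC").show()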

### Exercise 22: Create managed Delta Table from DataFrame

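**Expected Result:** Creates a managed Delta table named `managed_airlines_table`, replacing it if it already exists.

In [None]:
df.write.format("delta").mode("overwrite").saveAsTable("managed_airlines_table")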

### Exercise 23: Drop SQL table

**Expected Result:** Drops the table from the metastore. Because the table is managed, its data is removed along with the metadata.

In [None]:
spark.sql("DROP TABLE IF EXISTS managed_airlines_table")

### Exercise 24: Create a temporary SQL View

**Expected Result:** Registers a session-scoped temporary view and shows its first 5 rows.

In [None]:
df.createOrReplaceTempView("temp_view_flights")
spark.sql("SELECT * FROM temp_view_flights LIMIT 5").show()

### Exercise 25: Check Delta Table history

**Expected Result:** Shows table modification history.

In [None]:
spark.sql("DESCRIBE HISTORY airlines_delta").show(truncate=False)

### Exercise 26: Top 5 flights with highest Arrival Delay

**Expected Result:** Shows the 5 flights with the longest arrival delays.

In [None]:
df.orderBy(df.ArrDelay.desc()).select("FlightNum", "ArrDelay").show(5)

### Exercise 27: Find flights with negative DepDelay (early departures)

**Expected Result:** Early departing flights listed.

In [None]:
df.filter(df.DepDelay < 0).select("FlightNum", "DepDelay").show(5)

### Exercise 28: Group by Carrier and calculate avg DepDelay

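**Expected Result:** Shows the 5 carriers with the lowest average departure delay.

In [None]:
# Assumes the carrier code is stored in the UniqueCarrier column, as in the raw Airlines files.
df.groupBy("UniqueCarrier").avg("DepDelay").orderBy("avg(DepDelay)").show(5)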

### Exercise 29: Register DataFrame as SQL Table then query

**Expected Result:** Saves the DataFrame as the Delta table `airport_summary` and shows the 5 origin airports with the most flights.

In [None]:
df.write.format("delta").mode("overwrite").saveAsTable("airport_summary")
spark.sql("SELECT Origin, COUNT(*) AS FlightCount FROM airport_summary GROUP BY Origin ORDER BY FlightCount DESC").show(5)

### Exercise 30: Perform Delta Time Travel using SQL

**Expected Result:** Shows 5 rows from version 0 of the table.

In [None]:
spark.sql("SELECT * FROM airlines_delta VERSION AS OF 0 LIMIT 5").show()