Step 1: Import Libraries

In [0]:
# Spark + Pandas
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Obtain the notebook's Spark session (an already-running session is reused,
# otherwise a new one named "FlightDelayPreprocessing" is created).
spark = (
    SparkSession.builder
    .appName("FlightDelayPreprocessing")
    .getOrCreate()
)


Step 2: Load Dataset

In [0]:
# Location of the raw flight-delay CSV
file_path = "/Volumes/workspace/default/airlines/Flight_delay.csv"

# Spark copy (for large-scale processing): the first row supplies the column
# names and column types are inferred from the data.
csv_read_options = {"header": True, "inferSchema": True}
df_spark = spark.read.csv(file_path, **csv_read_options)

# Pandas copy (for quick exploration)
df = pd.read_csv(file_path)

print("✅ Dataset loaded successfully")


✅ Dataset loaded successfully


Step 3: Explore Schema & Data Summary

In [0]:
# Pandas overview
print("\n📌 Pandas Info:")
# DataFrame.info() writes its report to stdout itself and returns None, so it
# is called directly; wrapping it in print() would append a stray "None" line.
df.info()
print("\n📌 Null values (Pandas):")
print(df.isnull().sum())

# Spark overview
print("\n📌 Spark Schema:")
df_spark.printSchema()

print("\n📌 First 5 rows (Spark):")
df_spark.show(5)

print("\n📌 Null values (Spark):")
# F.when(...) without an otherwise() yields NULL for rows that do not match,
# and F.count() skips NULLs, so each aggregate is the number of null entries
# in that column.
df_spark.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_spark.columns
]).show()



📌 Pandas Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 484551 entries, 0 to 484550
Data columns (total 29 columns):
 #   Column             Non-Null Count   Dtype 
---  ------             --------------   ----- 
 0   DayOfWeek          484551 non-null  int64 
 1   Date               484551 non-null  object
 2   DepTime            484551 non-null  int64 
 3   ArrTime            484551 non-null  int64 
 4   CRSArrTime         484551 non-null  int64 
 5   UniqueCarrier      484551 non-null  object
 6   Airline            484551 non-null  object
 7   FlightNum          484551 non-null  int64 
 8   TailNum            484551 non-null  object
 9   ActualElapsedTime  484551 non-null  int64 
 10  CRSElapsedTime     484551 non-null  int64 
 11  AirTime            484551 non-null  int64 
 12  ArrDelay           484551 non-null  int64 
 13  DepDelay           484551 non-null  int64 
 14  Origin             484551 non-null  object
 15  Org_Airport        483374 non-null  object
 16  Dest

Step 4: Handle Missing Values

In [0]:
# Fill Org_Airport and Dest_Airport with "Unknown" where the value is null
df_spark = df_spark.fillna({"Org_Airport": "Unknown", "Dest_Airport": "Unknown"})

# Handle CancellationCode → replace missing values with "Not Cancelled".
# Spark's CSV reader loads unquoted empty fields as NULL by default, and
# `NULL == ""` evaluates to NULL (not True), so an equality check against ""
# alone would leave those rows untouched. Nulls are therefore tested
# explicitly, and whitespace-only strings are treated as empty as well.
cancellation_code = F.col("CancellationCode")
is_missing_code = cancellation_code.isNull() | (F.trim(cancellation_code) == "")

df_spark = df_spark.withColumn(
    "CancellationCode",
    F.when(is_missing_code, "Not Cancelled").otherwise(cancellation_code)
)

print("✅ Missing values handled")


✅ Missing values handled


Step 5: Feature Engineering

In [0]:
# Ensure Date is in date format (values that do not match yyyy-MM-dd become null)
df_spark = df_spark.withColumn("Date", F.to_date("Date", "yyyy-MM-dd"))

# Extract Month (1-12)
df_spark = df_spark.withColumn("Month", F.month("Date"))

# Extract Day of Week (1 = Sunday … 7 = Saturday)
df_spark = df_spark.withColumn("DayNumber", F.dayofweek("Date"))

# Extract Hour from DepTime (e.g. 1829 → 18).
# The trailing `% 24` folds a DepTime of 2400 (midnight written as hour 24)
# back to hour 0 so Hour always stays within 0-23; all other values are
# unchanged. NOTE(review): assumes DepTime is hhmm-encoded with 2400 as the
# only value at or above 2400 — confirm against the data.
df_spark = df_spark.withColumn(
    "Hour",
    (F.col("DepTime") / 100).cast("int") % 24
)

# Create Route column (Origin-Dest); concat_ws skips null parts
df_spark = df_spark.withColumn("Route", F.concat_ws("-", F.col("Origin"), F.col("Dest")))

print("✅ Feature engineering completed")


✅ Feature engineering completed


Step 6: Verify New Features

In [0]:
# Show the engineered columns next to the source columns they were derived from
preview_columns = ["Date", "Month", "DayNumber", "Hour", "Origin", "Dest", "Route"]

print("📌 Sample with new features:")
feature_preview = df_spark.select(*preview_columns)
feature_preview.show(10, truncate=False)


📌 Sample with new features:
+----------+-----+---------+----+------+----+-------+
|Date      |Month|DayNumber|Hour|Origin|Dest|Route  |
+----------+-----+---------+----+------+----+-------+
|2019-01-03|1    |5        |18  |IND   |BWI |IND-BWI|
|2019-01-03|1    |5        |19  |IND   |LAS |IND-LAS|
|2019-01-03|1    |5        |16  |IND   |MCO |IND-MCO|
|2019-01-03|1    |5        |14  |IND   |PHX |IND-PHX|
|2019-01-03|1    |5        |13  |IND   |TPA |IND-TPA|
|2019-01-03|1    |5        |14  |ISP   |BWI |ISP-BWI|
|2019-01-03|1    |5        |16  |ISP   |BWI |ISP-BWI|
|2019-01-03|1    |5        |14  |ISP   |FLL |ISP-FLL|
|2019-01-03|1    |5        |21  |ISP   |MCO |ISP-MCO|
|2019-01-03|1    |5        |18  |ISP   |MDW |ISP-MDW|
+----------+-----+---------+----+------+----+-------+
only showing top 10 rows


Step 7: Save Cleaned Dataset

In [0]:
# Convert Spark dataframe to Pandas.
# toPandas() collects every row onto the driver, so this step is only
# suitable while the whole dataset fits in driver memory.
df_pandas = df_spark.toPandas()

# Save as a single CSV
output_path = "/Volumes/workspace/default/airlines/Flight_delay_cleaned.csv"
df_pandas.to_csv(output_path, index=False)

# Verify the file on disk is complete: an interrupted or cancelled write can
# leave a truncated CSV at output_path that later cells would read silently.
# Only the first column is read back to keep the check cheap.
written_rows = len(pd.read_csv(output_path, usecols=[0]))
assert written_rows == len(df_pandas), (
    f"Row count mismatch after save: wrote {written_rows} rows, "
    f"expected {len(df_pandas)}"
)

print(f"✅ Cleaned CSV saved at: {output_path}")


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:132)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:132)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
import pandas as pd

# Read the saved CSV back and report its dimensions plus a short preview
cleaned_csv_path = "/Volumes/workspace/default/airlines/Flight_delay_cleaned.csv"
df_cleaned = pd.read_csv(cleaned_csv_path)

print("📌 Cleaned CSV Shape:", df_cleaned.shape)
print("\n📌 First 5 rows:", df_cleaned.head(), sep="\n")


📌 Cleaned CSV Shape: (403795, 33)

📌 First 5 rows:
   DayOfWeek        Date  DepTime  ArrTime  ...  Month DayNumber Hour    Route
0          4  2019-01-03     1829     1959  ...      1         5   18  IND-BWI
1          4  2019-01-03     1937     2037  ...      1         5   19  IND-LAS
2          4  2019-01-03     1644     1845  ...      1         5   16  IND-MCO
3          4  2019-01-03     1452     1640  ...      1         5   14  IND-PHX
4          4  2019-01-03     1323     1526  ...      1         5   13  IND-TPA

[5 rows x 33 columns]


Step 8: Final Summary

In [0]:
import pandas as pd

# Reload the cleaned dataset from disk so the summary reflects the saved file
cleaned_csv_path = "/Volumes/workspace/default/airlines/Flight_delay_cleaned.csv"
df_cleaned = pd.read_csv(cleaned_csv_path)

print("📌 FINAL SUMMARY (Pandas)")

# Dimensions
row_count, column_count = df_cleaned.shape
print(f"Rows: {row_count}, Columns: {column_count}")

# Preview of the first rows
print("\n📌 Sample 5 rows:")
print(df_cleaned.head())

# Missing values, counted per column
null_counts = df_cleaned.isnull().sum()
print("\n📌 Null values per column:")
print(null_counts)

# Descriptive statistics (describe() covers the numeric columns by default)
numeric_summary = df_cleaned.describe()
print("\n📌 Summary statistics for numeric columns:")
print(numeric_summary)


📌 FINAL SUMMARY (Pandas)
Rows: 403795, Columns: 33

📌 Sample 5 rows:
   DayOfWeek        Date  DepTime  ArrTime  ...  Month DayNumber Hour    Route
0          4  2019-01-03     1829     1959  ...      1         5   18  IND-BWI
1          4  2019-01-03     1937     2037  ...      1         5   19  IND-LAS
2          4  2019-01-03     1644     1845  ...      1         5   16  IND-MCO
3          4  2019-01-03     1452     1640  ...      1         5   14  IND-PHX
4          4  2019-01-03     1323     1526  ...      1         5   13  IND-TPA

[5 rows x 33 columns]

📌 Null values per column:
DayOfWeek               0
Date                    0
DepTime                 0
ArrTime                 0
CRSArrTime              0
UniqueCarrier           0
Airline                 0
FlightNum               0
TailNum                 0
ActualElapsedTime       0
CRSElapsedTime          0
AirTime                 0
ArrDelay                0
DepDelay                0
Origin                  0
Org_Airport      