In [0]:
from pyspark.sql import SparkSession

In [0]:
from pyspark.sql.functions import col, mean, substring,count, max, mode, avg, udf
from pyspark.sql.types import StringType, FloatType

In [0]:
spark = SparkSession.builder.appName("Airline_Delay_Project").getOrCreate()
file_path = "/FileStore/tables/2019-3.csv"
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .option("quote", '"') \
    .option("escape", '\\') \
    .load(file_path)

df.head(5)

Out[4]: [Row(FL_DATE=datetime.date(2019, 1, 1), OP_UNIQUE_CARRIER='9E', OP_CARRIER_FL_NUM=3280, ORIGIN='GNV', DEST='ATL', DEP_TIME=601, DEP_DELAY=1.0, TAXI_OUT=22.0, WHEELS_OFF=623, WHEELS_ON=714, TAXI_IN=8.0, ARR_TIME=722, ARR_DELAY=-1.0, AIR_TIME=51.0, DISTANCE=300.0, CARRIER_DELAY=None, WEATHER_DELAY=None, NAS_DELAY=None, SECURITY_DELAY=None, LATE_AIRCRAFT_DELAY=None, _c20=None),
 Row(FL_DATE=datetime.date(2019, 1, 1), OP_UNIQUE_CARRIER='9E', OP_CARRIER_FL_NUM=3281, ORIGIN='MSP', DEST='CVG', DEP_TIME=1359, DEP_DELAY=-5.0, TAXI_OUT=15.0, WHEELS_OFF=1414, WHEELS_ON=1629, TAXI_IN=4.0, ARR_TIME=1633, ARR_DELAY=-36.0, AIR_TIME=75.0, DISTANCE=596.0, CARRIER_DELAY=None, WEATHER_DELAY=None, NAS_DELAY=None, SECURITY_DELAY=None, LATE_AIRCRAFT_DELAY=None, _c20=None),
 Row(FL_DATE=datetime.date(2019, 1, 1), OP_UNIQUE_CARRIER='9E', OP_CARRIER_FL_NUM=3282, ORIGIN='DTW', DEST='CVG', DEP_TIME=1215, DEP_DELAY=-5.0, TAXI_OUT=18.0, WHEELS_OFF=1233, WHEELS_ON=1323, TAXI_IN=6.0, ARR_TIME=1329, ARR_DELAY

In [0]:
df.write.saveAsTable("Airline_Data")

In [0]:
df = df.drop("_c20")
col_with_nulls = []
for column in df.columns:
    num = df.filter(col(column).isNull()).count()
    if num!=0:
        col_with_nulls.append(column)
print(col_with_nulls)


['DEP_TIME', 'DEP_DELAY', 'TAXI_OUT', 'WHEELS_OFF', 'WHEELS_ON', 'TAXI_IN', 'ARR_TIME', 'ARR_DELAY', 'AIR_TIME', 'CARRIER_DELAY', 'WEATHER_DELAY', 'NAS_DELAY', 'SECURITY_DELAY', 'LATE_AIRCRAFT_DELAY']


In [0]:
df_null_ops = df
for i in col_with_nulls:
    datatype = df_null_ops.select(i).dtypes[0][1]
    if datatype.lower() == "string":
        mode_val = df_null_ops.select(mode(i)).head()[0]
        df_null_ops = df_null_ops.fillna({i:mode_val})
        print(f"{i}: {mode_val}")
    else:
        mean_val = df_null_ops.select(mean(i)).head()[0]
        df_null_ops = df_null_ops.fillna({i:mean_val})
        print(f"{i}: {mean_val}")


In [0]:
# df.select(avg("dep_delay")).head()[0]
# df.groupBy(col("origin")).count().show()
# df.groupBy(col("op_unique_carrier")).count().show()
# df.groupBy(col("origin")).agg(avg("dep_delay")).show()
# df.groupBy(col("op_unique_carrier")).agg(avg("dep_delay").alias("Avg_Dep_delay")).orderBy(col("Avg_Dep_delay").desc()).show()
# df.groupBy(col("origin")).agg(avg("distance")).show()


+-----------------+------------------+
|op_unique_carrier|     Avg_Dep_delay|
+-----------------+------------------+
|               B6|17.745473360997547|
|               EV| 17.21400657286479|
|               F9|14.577009288343467|
|               YV| 13.80316319064953|
|               UA|13.004563709088917|
|               OO|12.564053392669365|
|               AA|12.114915337571487|
|               NK|10.940950394756443|
|               OH|10.704733524922498|
|               9E|10.245764586889184|
|               WN|10.178762481230244|
|               G4|10.122909563240786|
|               MQ| 9.272981408689438|
|               YX| 8.544063599958992|
|               DL| 8.155754169633253|
|               AS|5.0346367947220125|
|               HA|1.2963997517070143|
+-----------------+------------------+



In [0]:
def localtime_to_minutes(hhmm):
    hours = hhmm // 100
    minutes = hhmm % 100
    return hours * 60 + minutes

def flight_duration(arr_time, dep_time, dep_delay):
    arr_minutes = localtime_to_minutes(arr_time)
    dep_minutes = localtime_to_minutes(dep_time)
    duration = arr_minutes-(dep_minutes+dep_delay)
    # Handle negative durations (e.g., crossing midnight)
    if duration < 0:
        duration += 24 * 60  # Add 24 hours in minutes
    return duration

fl_dur = udf(flight_duration, FloatType())
df = df.withColumn("Flight_Duration", fl_dur(col("arr_time"), col("dep_time"), col("dep_delay")))

df.select(col("Flight_Duration")).head(3)

Out[45]: [Row(Flight_Duration=80.0),
 Row(Flight_Duration=159.0),
 Row(Flight_Duration=79.0)]