In [0]:
from pyspark.sql import SparkSession

# Start a spark session
spark = SparkSession.builder \
    .appName("Airline Performance Analysis") \
    .getOrCreate()

# Load the data from a csv file
file_path = "/FileStore/shared_uploads/manjooranjoseph13@gmail.com/q1.csv"
df = spark.read.csv(file_path, header = True, inferSchema = True)

# Show the schema to understand the structure of the data
df.printSchema()

# Display a few records to preview the data
df.show(5)


root
 |-- YEAR: integer (nullable = true)
 |-- QUARTER: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- FL_DATE: date (nullable = true)
 |-- OP_UNIQUE_CARRIER: string (nullable = true)
 |-- OP_CARRIER_AIRLINE_ID: integer (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- TAIL_NUM: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN_AIRPORT_ID: integer (nullable = true)
 |-- ORIGIN_AIRPORT_SEQ_ID: integer (nullable = true)
 |-- ORIGIN_CITY_MARKET_ID: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- ORIGIN_STATE_ABR: string (nullable = true)
 |-- ORIGIN_STATE_FIPS: integer (nullable = true)
 |-- ORIGIN_STATE_NM: string (nullable = true)
 |-- ORIGIN_WAC: integer (nullable = true)
 |-- DEST_AIRPORT_ID: integer (nullable = true)
 |-- DEST_AIRPORT_SEQ_ID: integer (null

In [0]:
# List of columns to drop
columns_to_drop = [
    "TAIL_NUM", "OP_CARRIER_FL_NUM", "ORIGIN_AIRPORT_SEQ_ID", "ORIGIN_CITY_MARKET_ID", "ORIGIN_STATE_FIPS",
    "ORIGIN_WAC", "DEST_AIRPORT_SEQ_ID", "DEST_CITY_MARKET_ID", "DEST_STATE_FIPS", "DEST_WAC", "CRS_DEP_TIME",
    "CRS_ARR_TIME", "TAXI_OUT", "TAXI_IN", "WHEELS_OFF", "WHEELS_ON", "CANCELLATION_CODE", "CRS_ELAPSED_TIME",
    "ACTUAL_ELAPSED_TIME", "AIR_TIME", "FLIGHTS", "DISTANCE_GROUP", "CARRIER_DELAY", "WEATHER_DELAY", 
    "NAS_DELAY", "SECURITY_DELAY", "LATE_AIRCRAFT_DELAY", "FIRST_DEP_TIME", "TOTAL_ADD_GTIME", "LONGEST_ADD_GTIME", "_c64"
]

# Drop the columns
df_cleaned = df.drop(*columns_to_drop)

# Show the cleaned DataFrame schema
df_cleaned.printSchema()

root
 |-- YEAR: integer (nullable = true)
 |-- QUARTER: integer (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- DAY_OF_MONTH: integer (nullable = true)
 |-- DAY_OF_WEEK: integer (nullable = true)
 |-- FL_DATE: date (nullable = true)
 |-- OP_UNIQUE_CARRIER: string (nullable = true)
 |-- OP_CARRIER_AIRLINE_ID: integer (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- ORIGIN_AIRPORT_ID: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- ORIGIN_STATE_ABR: string (nullable = true)
 |-- ORIGIN_STATE_NM: string (nullable = true)
 |-- DEST_AIRPORT_ID: integer (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- DEST_STATE_ABR: string (nullable = true)
 |-- DEST_STATE_NM: string (nullable = true)
 |-- DEP_TIME: string (nullable = true)
 |-- DEP_DELAY: string (nullable = true)
 |-- DEP_DELAY_NEW: string (nullable = true)
 |-- DEP_DEL15: string (nullabl

In [0]:
from pyspark.sql.functions import col

# Remove canceled and diverted flights, and rows with missing delay data
df_filtered = df_cleaned.filter((col("CANCELLED") == 0) & (col("DIVERTED") == 0)) \
    .filter(col("DEP_DELAY").isNotNull() & col("ARR_DELAY").isNotNull())

df_filtered.show(1)

+----+-------+-----+------------+-----------+----------+-----------------+---------------------+----------+-----------------+------+----------------+----------------+---------------+---------------+----+----------------+--------------+-------------+--------+---------+-------------+---------+---------------+------------+--------+---------+-------------+---------+---------------+------------+---------+--------+--------+
|YEAR|QUARTER|MONTH|DAY_OF_MONTH|DAY_OF_WEEK|   FL_DATE|OP_UNIQUE_CARRIER|OP_CARRIER_AIRLINE_ID|OP_CARRIER|ORIGIN_AIRPORT_ID|ORIGIN|ORIGIN_CITY_NAME|ORIGIN_STATE_ABR|ORIGIN_STATE_NM|DEST_AIRPORT_ID|DEST|  DEST_CITY_NAME|DEST_STATE_ABR|DEST_STATE_NM|DEP_TIME|DEP_DELAY|DEP_DELAY_NEW|DEP_DEL15|DEP_DELAY_GROUP|DEP_TIME_BLK|ARR_TIME|ARR_DELAY|ARR_DELAY_NEW|ARR_DEL15|ARR_DELAY_GROUP|ARR_TIME_BLK|CANCELLED|DIVERTED|DISTANCE|
+----+-------+-----+------------+-----------+----------+-----------------+---------------------+----------+-----------------+------+----------------+-------

In [0]:
from pyspark.sql.functions import avg, round

# Calculate average delays by airline
airline_delay_df = df_filtered.groupBy("OP_UNIQUE_CARRIER") \
    .agg(
        round(avg("DEP_DELAY"), 2).alias("Avg_Departure_Delay"),
        round(avg("ARR_DELAY"), 2).alias("Avg_Arrival_Delay")
    )
    
airline_delay_df.show(10)

+-----------------+-------------------+-----------------+
|OP_UNIQUE_CARRIER|Avg_Departure_Delay|Avg_Arrival_Delay|
+-----------------+-------------------+-----------------+
|               UA|               6.89|             0.43|
|               NK|               6.48|             0.49|
|               AA|               7.43|             0.84|
|               EV|               8.45|             5.22|
|               B6|              10.31|             4.05|
|               DL|               4.93|            -2.94|
|               OO|               9.93|             2.99|
|               F9|               6.51|            -0.33|
|               YV|               8.64|             4.49|
|               MQ|                6.3|              3.1|
+-----------------+-------------------+-----------------+
only showing top 10 rows



In [0]:
# Calculate average delays for each day of the week
day_of_week_delay_df = df_filtered.groupBy("DAY_OF_WEEK") \
    .agg(
        round(avg("DEP_DELAY"), 2).alias("Avg_Departure_Delay"),
        round(avg("ARR_DELAY"), 2).alias("Avg_Arrival_Delay")
    )

day_of_week_delay_df.show()

+-----------+-------------------+-----------------+
|DAY_OF_WEEK|Avg_Departure_Delay|Avg_Arrival_Delay|
+-----------+-------------------+-----------------+
|          1|               6.59|            -0.29|
|          6|               6.61|            -0.36|
|          3|               5.65|            -0.81|
|          5|               7.99|             1.91|
|          4|                8.5|             2.66|
|          7|               7.01|             0.24|
|          2|                4.2|            -2.98|
+-----------+-------------------+-----------------+



In [0]:
# Analyze delays based on origin and destination pairs
route_delay_df = df_filtered.groupBy("ORIGIN", "DEST") \
    .agg(
        round(avg("DEP_DELAY"), 2).alias("Avg_Departure_Delay"),
        round(avg("ARR_DELAY"), 2).alias("Avg_Arrival_Delay")
    ) \
    .orderBy(col("Avg_Departure_Delay").desc())

route_delay_df.show()

+------+----+-------------------+-----------------+
|ORIGIN|DEST|Avg_Departure_Delay|Avg_Arrival_Delay|
+------+----+-------------------+-----------------+
|   CHA| PHL|              258.2|            252.6|
|   CMH| AUS|              218.0|            200.0|
|   AUS| CMH|              217.0|            216.0|
|   PHL| CHA|             207.33|           216.67|
|   BHM| SFO|              140.5|            148.0|
|   AUS| GRR|             127.33|           123.67|
|   SRQ| CMH|             124.33|           116.67|
|   VPS| LCK|             123.33|           120.11|
|   LCK| VPS|             121.89|           117.22|
|   PVU| SNA|             101.71|            98.93|
|   SNA| BOI|               99.5|           109.29|
|   AVL| LAS|              98.75|             92.5|
|   GRR| LAX|              98.43|            87.36|
|   PGD| HTS|               93.0|             76.0|
|   TWF| SFO|              85.14|            71.69|
|   MSY| GSP|              83.67|            77.33|
|   ORD| ERI

In [0]:
# Analyze delay trends by flight distance
df_distance_analysis = df_filtered.groupBy("DISTANCE") \
    .agg(
        round(avg("DEP_DELAY"), 2).alias("Avg_Departure_Delay"),
        round(avg("ARR_DELAY"), 2).alias("Avg_Arrival_Delay")
    )

df_distance_analysis.show(10)

+--------+-------------------+-----------------+
|DISTANCE|Avg_Departure_Delay|Avg_Arrival_Delay|
+--------+-------------------+-----------------+
|    1829|               8.07|             0.16|
|     496|               6.07|            -0.79|
|    1342|               3.14|            -5.44|
|     148|               5.63|            -2.04|
|     471|               5.84|             0.12|
|    1959|               9.77|            -0.44|
|     833|                7.2|             -1.0|
|     463|              11.42|             5.24|
|    1088|              -0.65|            -9.15|
|    1721|               9.33|              1.7|
+--------+-------------------+-----------------+
only showing top 10 rows



In [0]:
# Save individual DataFrames
airline_delay_df.write.csv("/FileStore/shared_uploads/manjooranjoseph13@gmail.com/airline_delay_data.csv", header=True)
day_of_week_delay_df.write.csv("/FileStore/shared_uploads/manjooranjoseph13@gmail.com/day_of_week_delay_data.csv", header=True)
route_delay_df.write.csv("/FileStore/shared_uploads/manjooranjoseph13@gmail.com/route_delay_data.csv", header=True)
df_distance_analysis.write.csv("/FileStore/shared_uploads/manjooranjoseph13@gmail.com/distance_analysis_data.csv", header=True)