In [1]:
# Import libraries
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, max, min, month, year, format_number, col, count, round
import time

In [2]:
# Start a Spark session
from pyspark.sql import SparkSession
# change number of workers in master argument below (1,2,4,*)
spark = SparkSession.builder \
    .config("spark.jars", "/usr/share/java/mysql-connector-j-8.0.31.jar") \
    .master("local[*]").appName("AirportQueries").getOrCreate()
# Read in data
file_path = 'subset_1.csv'
data = spark.read.csv(file_path, header=True)

23/11/29 00:56:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

Average baseFare during the summer months (June, July, August 2022) per startingAirport

In [3]:
# convert 'flightDate' column type to date
data = data.withColumn('flightDate', data['flightDate'].cast('date'))
# filter flights for June, July, August (summer months) in 2022
data_filtered = data.filter((month(data.flightDate).isin(6, 7, 8)) \
    & (year(data.flightDate) == 2022))
# group by startingAirport and find the average baseFare
avg_flight_prices = data_filtered.groupBy("startingAirport") \
    .agg(format_number(avg("baseFare"), 2).alias("average_baseFare"))
# result
avg_flight_prices.show()

[Stage 1:>                                                          (0 + 2) / 2]

+---------------+----------------+
|startingAirport|average_baseFare|
+---------------+----------------+
|            OAK|          472.28|
|            LGA|          278.70|
|            BOS|          257.92|
|            EWR|          280.52|
|            DEN|          338.53|
|            IAD|          345.85|
|            CLT|          291.22|
|            MIA|          267.14|
|            DFW|          277.27|
|            SFO|          457.08|
|            ATL|          282.64|
|            ORD|          266.12|
|            DTW|          287.27|
|            LAX|          386.33|
|            JFK|          320.49|
|            PHL|          313.33|
+---------------+----------------+



                                                                                

Average baseFare when startingAirport = 'SFO' and destinationAirport = 'LAX'

In [4]:
average_basefare = data.filter((data.startingAirport == 'SFO') & (data.destinationAirport == 'LAX'))\
    .select(avg(data.baseFare).alias('average_baseFare'))
average_basefare.show()



+------------------+
|  average_baseFare|
+------------------+
|167.99256745904322|
+------------------+



                                                                                

Average number of flights and average cost per starting airport

In [5]:
# average number of flights per starting airport
avg_flights_per_day = data.groupBy("startingAirport", "flightDate") \
    .agg(count("legId").alias("numFlightsPerDay")) \
    .groupBy("startingAirport") \
    .agg(round(avg("numFlightsPerDay")).cast("integer").alias("avgFlightsPerDay"))
# average cost per starting airport rounded to two decimal places
avg_cost_per_airport = data.groupBy("startingAirport") \
    .agg(round(avg("baseFare"), 2).alias("avgCost"))
# join the two results based on startingAirport
result = avg_flights_per_day.join(avg_cost_per_airport, "startingAirport")
# result
result.show()



+---------------+----------------+-------+
|startingAirport|avgFlightsPerDay|avgCost|
+---------------+----------------+-------+
|            OAK|            2806|  468.8|
|            LGA|            4913| 271.47|
|            BOS|            4793|  258.6|
|            EWR|            3279|  271.4|
|            DEN|            3880| 310.66|
|            IAD|            2897| 340.65|
|            CLT|            4341| 281.34|
|            MIA|            4040| 264.34|
|            DFW|            4507| 258.69|
|            SFO|            4686| 419.31|
|            ATL|            4242| 271.78|
|            ORD|            4411| 249.54|
|            DTW|            3695| 289.21|
|            LAX|            6665| 360.35|
|            JFK|            3493| 340.25|
|            PHL|            3837| 309.48|
+---------------+----------------+-------+



                                                                                

Airport with the maximum baseFare in the dataset

In [6]:
# group by destinationAirport and find the maximum baseFare
max_basefare_per_destination = data.groupBy("destinationAirport").agg(max("baseFare").alias("max_baseFare"))
# destination(s) with the highest baseFare
max_expensive_destinations = max_basefare_per_destination.filter(max_basefare_per_destination.max_baseFare\
     == max_basefare_per_destination.agg(max("max_baseFare")).collect()[0][0])
# result
max_expensive_destinations.show()



+------------------+------------+
|destinationAirport|max_baseFare|
+------------------+------------+
|               LGA|      999.99|
|               SFO|      999.99|
+------------------+------------+



                                                                                

Airport with the minimum baseFare in the dataset

In [7]:
# group by destinationAirport and find the minimum baseFare
min_basefare_per_destination = data.groupBy("destinationAirport").agg(min("baseFare").alias("min_baseFare"))
# destination(s) with the lowet baseFare
min_expensive_destinations = min_basefare_per_destination.filter(min_basefare_per_destination.min_baseFare == \
    min_basefare_per_destination.agg(min("min_baseFare")).collect()[0][0])
# result
min_expensive_destinations.show()



+------------------+------------+
|destinationAirport|min_baseFare|
+------------------+------------+
|               ATL|        0.41|
|               BOS|        0.41|
|               DFW|        0.41|
|               DTW|        0.41|
|               EWR|        0.41|
|               LGA|        0.41|
|               MIA|        0.41|
|               ORD|        0.41|
+------------------+------------+



                                                                                

Least expensive flight, most expensive flight, average flight cost per destination airport

In [8]:
# group by destinationAirport, find the maximum baseFare
max_min_avg_expensive_destinations = data.groupBy("destinationAirport").agg(min("baseFare").alias("min_baseFare"), \
    max("baseFare").alias("max_baseFare"), format_number(avg("baseFare"),2).alias("avg_baseFare"))
# show the result
max_min_avg_expensive_destinations.show()



+------------------+------------+------------+------------+
|destinationAirport|min_baseFare|max_baseFare|avg_baseFare|
+------------------+------------+------------+------------+
|               ATL|        0.41|      999.95|      275.45|
|               BOS|        0.41|      999.07|      273.08|
|               CLT|        10.0|      999.07|      281.73|
|               DEN|        10.2|      998.14|      316.83|
|               DFW|        0.41|      999.02|      261.74|
|               DTW|        0.41|      998.15|      298.94|
|               EWR|        0.41|      999.07|      284.63|
|               IAD|      100.46|      999.07|      339.03|
|               JFK|      100.46|      999.07|      336.71|
|               LAX|         1.0|      997.21|      335.76|
|               LGA|        0.41|      999.99|      269.55|
|               MIA|        0.41|      999.08|      260.17|
|               OAK|         1.0|      999.42|      480.79|
|               ORD|        0.41|      9

                                                                                

For each airline, show the most expensive flight and the corresponding start and destination airport. Only show the flights with one leg (meaning no layovers). This was done because some flights involved multiple different airlines.

In [9]:
# filter out one leg flights only
filtered_data_oneleg = data.filter(~col("segmentsAirlineName").contains("||"))
# find the most expensive flight, grouped by airline name
most_expensive_per_airline = filtered_data_oneleg.groupBy("segmentsAirlineName") \
    .agg(max("baseFare").alias("maxBaseFare"))
# retrieve the rows with the highest baseFare for each airline and remove duplicates
expensive_flights_info_oneleg = filtered_data_oneleg.join(most_expensive_per_airline.withColumnRenamed("segmentsAirlineName", "max_segmentsAirlineName"),
                                            (filtered_data_oneleg.baseFare == most_expensive_per_airline.maxBaseFare) &
                                            (filtered_data_oneleg.segmentsAirlineName == most_expensive_per_airline.segmentsAirlineName),
                                            "inner") \
    .select("segmentsAirlineName", "startingAirport", "destinationAirport", "baseFare").dropDuplicates(["segmentsAirlineName"]) 
# show the result
expensive_flights_info_oneleg.show()

                                                                                

+-------------------+---------------+------------------+--------+
|segmentsAirlineName|startingAirport|destinationAirport|baseFare|
+-------------------+---------------+------------------+--------+
|    Alaska Airlines|            JFK|               SFO|   99.53|
|  American Airlines|            CLT|               MIA|   99.53|
|              Delta|            JFK|               SFO|   99.53|
|  Frontier Airlines|            ATL|               PHL|   99.89|
|    JetBlue Airways|            EWR|               SFO|   99.53|
|    Spirit Airlines|            ATL|               DTW|    99.0|
|             United|            EWR|               SFO|   99.53|
+-------------------+---------------+------------------+--------+



                                                                                

Flights with two legs, could involve the same airline twice or two different airlines.

In [10]:
# airline name column contains '||' once means its a two leg flight
filtered_data_twoleg = data.filter(col("segmentsAirlineName").rlike(r'^[^|]*\|{2}[^|]*$'))
# find most expensive flight, grouped by airline
most_expensive_per_airline = filtered_data_twoleg.groupBy("segmentsAirlineName") \
    .agg(max("baseFare").alias("maxBaseFare"))
# retrieve the rows with the highest baseFare for each airline and remove duplicates
expensive_flights_info_twoleg = filtered_data_twoleg.join(most_expensive_per_airline.withColumnRenamed("segmentsAirlineName", "max_segmentsAirlineName"),
                                            (filtered_data_twoleg.baseFare == most_expensive_per_airline.maxBaseFare) &
                                            (filtered_data_twoleg.segmentsAirlineName == most_expensive_per_airline.segmentsAirlineName),
                                            "inner") \
    .select("segmentsAirlineName", "startingAirport", "destinationAirport", "baseFare").dropDuplicates(["segmentsAirlineName"]) 

expensive_flights_info_twoleg.show()



+--------------------+---------------+------------------+--------+
| segmentsAirlineName|startingAirport|destinationAirport|baseFare|
+--------------------+---------------+------------------+--------+
|Alaska Airlines||...|            OAK|               JFK|  997.58|
|Alaska Airlines||...|            SFO|               CLT|  998.14|
|Alaska Airlines||...|            SFO|               OAK|  719.21|
|Alaska Airlines||...|            JFK|               LAX|  378.61|
|American Airlines...|            CLT|               PHL|  378.61|
|American Airlines...|            MIA|               JFK|  3163.0|
|  Cape Air||Cape Air|            JFK|               BOS|  747.72|
|     Cape Air||Delta|            BOS|               JFK|  625.95|
|Cape Air||JetBlue...|            BOS|               LGA|  632.15|
|    Cape Air||United|            BOS|               EWR|  672.46|
|Delta||Alaska Air...|            MIA|               SFO|  996.28|
|     Delta||Cape Air|            LGA|               BOS|  685

                                                                                

In [11]:
# if the column contains "||" twice, it's a three leg flight
filtered_data_threeleg = data.filter(col("segmentsAirlineName").rlike(r'^([^|]*\|{2}[^|]*){2}$'))
# find the most expensive, group by airline
most_expensive_per_airline = filtered_data_threeleg.groupBy("segmentsAirlineName") \
    .agg(max("baseFare").alias("maxBaseFare"))
# retrieve the rows with the highest baseFare for each airline and remove duplicates
expensive_flights_info_threeleg = filtered_data_threeleg.join(most_expensive_per_airline.withColumnRenamed("segmentsAirlineName", "max_segmentsAirlineName"),
                                            (filtered_data_threeleg.baseFare == most_expensive_per_airline.maxBaseFare) &
                                            (filtered_data_threeleg.segmentsAirlineName == most_expensive_per_airline.segmentsAirlineName),
                                            "inner") \
    .select("segmentsAirlineName", "startingAirport", "destinationAirport", "baseFare").dropDuplicates(["segmentsAirlineName"]) 

expensive_flights_info_threeleg.show()

                                                                                

+--------------------+---------------+------------------+--------+
| segmentsAirlineName|startingAirport|destinationAirport|baseFare|
+--------------------+---------------+------------------+--------+
|Alaska Airlines||...|            MIA|               OAK|  997.21|
|Alaska Airlines||...|            OAK|               LGA|  686.51|
|Alaska Airlines||...|            OAK|               LGA|  746.05|
|Alaska Airlines||...|            SFO|               CLT|   489.3|
|Alaska Airlines||...|            SFO|               ORD|  657.68|
|Alaska Airlines||...|            SFO|               LAX|   610.5|
|Alaska Airlines||...|            IAD|               OAK|  885.58|
|Alaska Airlines||...|            EWR|               OAK|  885.59|
|Alaska Airlines||...|            OAK|               DTW|  885.58|
|American Airlines...|            CLT|               BOS|  746.05|
|American Airlines...|            DFW|               BOS|  519.44|
|American Airlines...|            MIA|               IAD|  330

In [12]:
#  average total travel distance per starting airport
avg_distance_per_starting_airport = data.groupBy("startingAirport") \
    .agg(avg("totalTravelDistance").alias("avgTotalTravelDistance"))
# sort by highest average total travel distance
sorted_avg_distance = avg_distance_per_starting_airport.orderBy("avgTotalTravelDistance", ascending=False)
# result
sorted_avg_distance.show()



+---------------+----------------------+
|startingAirport|avgTotalTravelDistance|
+---------------+----------------------+
|            OAK|     2374.689045971551|
|            SFO|    2320.9646963630603|
|            LAX|    2203.1726953873203|
|            MIA|     1624.295997371626|
|            DEN|    1610.6157763633134|
|            JFK|     1459.748781266927|
|            EWR|     1458.600701776437|
|            LGA|     1426.191106078016|
|            PHL|    1421.3307705924203|
|            DFW|    1396.6878585451097|
|            BOS|     1390.902419422506|
|            IAD|     1340.716838688424|
|            DTW|    1337.3763149289214|
|            ATL|    1296.7258822217589|
|            CLT|    1252.5702858668121|
|            ORD|    1167.2054200298282|
+---------------+----------------------+



                                                                                