In [2]:
# import libraries 
from pyspark import SparkFiles
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer,IndexToString, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, sum
from pyspark.sql.functions import collect_set, count
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import StringIndexer
from pyspark.sql import functions as F
from pyspark.sql.functions import split, slice, size, concat_ws, when

In [3]:
# Create the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('Spark-Group1-Flights')
    .getOrCreate()
)

In [4]:
# Load the raw itineraries CSV from the group's GCS bucket.
gcs_path = "gs://msca-bdp-student-gcs/Group1/itineraries.csv"

# First line is a header; column types are inferred by Spark.
# NOTE: inferSchema costs an extra full pass over the file to decide types,
# which is expensive at this size -- an explicit schema would avoid it.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(gcs_path)
)

# Preview the first 10 rows.
df.show(10)

24/11/13 20:26:05 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode| segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segments

In [4]:
# pandas-style (rows, columns) shape of the raw data.
# df.count() runs a full Spark job; the column count comes from the schema alone.
num_rows, num_columns = df.count(), len(df.columns)
shape = (num_rows, num_columns)

                                                                                

In [5]:
# Display the (rows, columns) tuple computed in the previous cell.
shape

(82138753, 27)

In [5]:
# Columns judged to duplicate, or add little beyond, the columns that are kept.
redundant_columns = [
    "baseFare",
    "isBasicEconomy",
    "segmentsArrivalAirportCode",
    "segmentsDepartureTimeEpochSeconds",
    "segmentsArrivalTimeEpochSeconds",
    "segmentsAirlineCode",
    "fareBasisCode",
    "segmentsDurationInSeconds",
    "segmentsDistance",
    "segmentsDepartureTimeRaw",
]
df = df.drop(*redundant_columns)

In [6]:
# Show the column names and the types Spark inferred (inferSchema=True at load time)
# for the remaining columns after the redundant ones were dropped.
df.printSchema()

root
 |-- legId: string (nullable = true)
 |-- searchDate: string (nullable = true)
 |-- flightDate: string (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- segmentsArrivalTimeRaw: string (nullable = true)
 |-- segmentsDepartureAirportCode: string (nullable = true)
 |-- segmentsAirlineName: string (nullable = true)
 |-- segmentsEquipmentDescription: string (nullable = true)
 |-- segmentsCabinCode: string (nullable = true)



In [7]:
# Give the working data its own name. This binds a second name to the same
# DataFrame rather than copying any data. That is sufficient here because the
# cells below reassign copy_df from withColumn/drop results instead of mutating
# it, so `df` keeps its current contents.
copy_df = df

In [8]:
# travelDuration is a duration string stored in two shapes: with a day component
# and with hours/minutes only (presumably ISO-8601 like "P1DT2H30M" / "PT2H30M";
# confirm). Each unit is pulled out independently. When a unit is absent,
# regexp_extract yields "", whose int cast is null, so coalesce defaults it to 0.
duration_parts = [
    ("days", r"P(\d+)D"),
    ("hours", r"(\d+)H"),
    ("minutes", r"(\d+)M"),
]
temp_df = copy_df
for part_name, part_pattern in duration_parts:
    extracted = F.regexp_extract("travelDuration", part_pattern, 1).cast("int")
    temp_df = temp_df.withColumn(part_name, F.coalesce(extracted, F.lit(0)))

In [9]:
# Fold the extracted parts into one integer: total travel time in minutes.
# This overwrites the original string column of the same name.
minutes_per_day = 1440
minutes_per_hour = 60
total_minutes = (
    F.col("days") * minutes_per_day
    + F.col("hours") * minutes_per_hour
    + F.col("minutes")
)
copy_df = temp_df.withColumn("travelDuration", total_minutes)

In [10]:
# The per-unit helper columns have been folded into travelDuration; remove them.
duration_helper_cols = ["days", "hours", "minutes"]
copy_df = copy_df.drop(*duration_helper_cols)

In [11]:
# Sanity-check the conversion: travelDuration should now hold integer minutes.
copy_df.select(F.col("travelDuration")).show(n=20)

+--------------+
|travelDuration|
+--------------+
|           149|
|           150|
|           150|
|           152|
|           154|
|           158|
|           252|
|           318|
|           332|
|           398|
|           286|
|           345|
|           359|
|           438|
|           490|
|           339|
|           158|
|           257|
|           276|
|           285|
+--------------+
only showing top 20 rows



In [12]:
# Prototype the cleaning steps on a small cached sample (the full data has ~82M rows)
# before applying them to everything.
sample_rows = 100
smalldf = copy_df.limit(sample_rows).cache()

In [13]:
# Parse searchDate strings ("yyyy-MM-dd") into a Spark date column.
search_date_parsed = F.to_date(F.col("searchDate"), "yyyy-MM-dd")
smalldf = smalldf.withColumn("searchDate", search_date_parsed)

In [14]:
# Parse flightDate strings ("yyyy-MM-dd") into a Spark date column.
flight_date_parsed = F.to_date(F.col("flightDate"), "yyyy-MM-dd")
smalldf = smalldf.withColumn("flightDate", flight_date_parsed)

In [15]:
# Confirm both date columns are now dates and travelDuration is an integer.
smalldf.select(["searchDate", "flightDate", "travelDuration"]).printSchema()

root
 |-- searchDate: date (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- travelDuration: integer (nullable = false)



In [16]:
# Booking lead time: whole days between the search date and the flight date.
lead_time_days = F.datediff(end=F.col("flightDate"), start=F.col("searchDate"))
smalldf = smalldf.withColumn("DaysBeforeFlight", lead_time_days)

In [17]:
# Check that the lead-time calculation is correct.
# The cell above names the new column "DaysBeforeFlight". Selecting the older name
# "DaysPassed" fails with an AnalysisException on a fresh kernel.
smalldf.select("searchDate", "flightDate", "DaysBeforeFlight").show()



+----------+----------+----------+
|searchDate|flightDate|DaysPassed|
+----------+----------+----------+
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
|2022-06-24|2022-08-07|        44|
+----------+----------+----------+
only showing top 20 rows



                                                                                

In [18]:
# Find every airport where a layover occurred and count the stops.
# segmentsDepartureAirportCode joins each segment's departure airport with "||".
# The first entry is the origin, so entries 2.. are the connection airports.
# The regex is a raw string: '\|\|' in a plain literal is an invalid Python escape
# (DeprecationWarning, SyntaxWarning on Python 3.12+). In the regex, "||" must be
# escaped because a bare "|" means alternation.
max_layovers = 10  # slice keeps at most this many connection airports
smalldf = smalldf.withColumn('Layovers', F.split(F.col('segmentsDepartureAirportCode'), r'\|\|'))
smalldf = smalldf.withColumn('Layovers', F.slice(F.col('Layovers'), 2, max_layovers))
# NOTE(review): for a null segmentsDepartureAirportCode, size() of the resulting null
# array may be -1 rather than 0 (depends on spark.sql.legacy.sizeOfNull / ANSI mode).
# Confirm the column has no nulls.
smalldf = smalldf.withColumn('NumStops', F.size(F.col('Layovers')))

In [19]:
# Spot-check the layover parsing: raw airport string vs. derived array and stop count.
layover_check_cols = ["segmentsDepartureAirportCode", "Layovers", "NumStops"]
smalldf.select(*layover_check_cols).show(n=10)

+----------------------------+----------+--------+
|segmentsDepartureAirportCode|  Layovers|NumStops|
+----------------------------+----------+--------+
|               LGA||ATL||SLC|[ATL, SLC]|       2|
|               LGA||ATL||LAX|[ATL, LAX]|       2|
|                         LGA|        []|       0|
|                         LGA|        []|       0|
|                         LGA|        []|       0|
|                         LGA|        []|       0|
|                         LGA|        []|       0|
|                         LGA|        []|       0|
|                         LGA|        []|       0|
|                         LGA|        []|       0|
+----------------------------+----------+--------+
only showing top 10 rows



In [20]:
# The raw "||"-joined departure-airport string has been parsed; drop it.
raw_departure_col = "segmentsDepartureAirportCode"
smalldf = smalldf.drop(raw_departure_col)

In [21]:
# Split the "||"-joined per-segment airline names into an array, then count the
# distinct airlines flown on the itinerary.
# The regex is a raw string: '\|\|' in a plain literal is an invalid Python escape.
smalldf = smalldf.withColumn('AirlineNames', F.split(F.col('segmentsAirlineName'), r'\|\|'))
smalldf = smalldf.withColumn('UniqueAirlineNames', F.array_distinct(F.col('AirlineNames')))
smalldf = smalldf.withColumn('NumUniqueAirlines', F.size(F.col('UniqueAirlineNames')))

In [22]:
# Spot-check airline parsing. truncate=False keeps full airline names visible.
airline_check_cols = ["segmentsAirlineName", "AirlineNames", "UniqueAirlineNames", "NumUniqueAirlines"]
smalldf.select(*airline_check_cols).show(n=10, truncate=False)

+-------------------+---------------------+-------------------+-----------------+
|segmentsAirlineName|AirlineNames         |UniqueAirlineNames |NumUniqueAirlines|
+-------------------+---------------------+-------------------+-----------------+
|Delta||Delta||Delta|[Delta, Delta, Delta]|[Delta]            |1                |
|Delta||Delta||Delta|[Delta, Delta, Delta]|[Delta]            |1                |
|United             |[United]             |[United]           |1                |
|United             |[United]             |[United]           |1                |
|American Airlines  |[American Airlines]  |[American Airlines]|1                |
|JetBlue Airways    |[JetBlue Airways]    |[JetBlue Airways]  |1                |
|JetBlue Airways    |[JetBlue Airways]    |[JetBlue Airways]  |1                |
|American Airlines  |[American Airlines]  |[American Airlines]|1                |
|American Airlines  |[American Airlines]  |[American Airlines]|1                |
|JetBlue Airways

In [23]:
# Remove the raw airline string and the intermediate distinct-airline array.
airline_cols_to_drop = ["segmentsAirlineName", "UniqueAirlineNames"]
smalldf = smalldf.drop(*airline_cols_to_drop)

In [24]:
# Split the "||"-joined per-segment aircraft descriptions into an array, then count
# the distinct aircraft types flown on the itinerary.
# The regex is a raw string: '\|\|' in a plain literal is an invalid Python escape.
smalldf = smalldf.withColumn('EquipmentDescriptions', F.split(F.col('segmentsEquipmentDescription'), r'\|\|'))
smalldf = smalldf.withColumn('UniqueEquipments', F.array_distinct(F.col('EquipmentDescriptions')))
smalldf = smalldf.withColumn('NumUniqueEquipments', F.size(F.col('UniqueEquipments')))

In [25]:
# Spot-check aircraft parsing. truncate=False keeps long aircraft names visible.
equipment_check_cols = ["segmentsEquipmentDescription", "EquipmentDescriptions", "UniqueEquipments", "NumUniqueEquipments"]
smalldf.select(*equipment_check_cols).show(n=10, truncate=False)

+---------------------------------------------------------+-----------------------------------------------------------+----------------------------------------------+-------------------+
|segmentsEquipmentDescription                             |EquipmentDescriptions                                      |UniqueEquipments                              |NumUniqueEquipments|
+---------------------------------------------------------+-----------------------------------------------------------+----------------------------------------------+-------------------+
|Airbus A321||Airbus A321||Airbus A220-100                |[Airbus A321, Airbus A321, Airbus A220-100]                |[Airbus A321, Airbus A220-100]                |2                  |
|Airbus A321||Airbus A321||Embraer 175 (Enhanced Winglets)|[Airbus A321, Airbus A321, Embraer 175 (Enhanced Winglets)]|[Airbus A321, Embraer 175 (Enhanced Winglets)]|2                  |
|Airbus A319                                              |[Airbu

In [26]:
# Remove the raw aircraft string and the intermediate distinct-aircraft array.
equipment_cols_to_drop = ["segmentsEquipmentDescription", "UniqueEquipments"]
smalldf = smalldf.drop(*equipment_cols_to_drop)

In [27]:
# Split the "||"-joined per-segment cabin codes into an array, then count the
# distinct cabin types flown on the itinerary.
# The regex is a raw string: '\|\|' in a plain literal is an invalid Python escape.
smalldf = smalldf.withColumn('CabinCodes', F.split(F.col('segmentsCabinCode'), r'\|\|'))
smalldf = smalldf.withColumn('UniqueCabins', F.array_distinct(F.col('CabinCodes')))
smalldf = smalldf.withColumn('NumUniqueCabins', F.size(F.col('UniqueCabins')))

In [29]:
# Flag itineraries with at least one segment in the "first" cabin: 1 if so, else 0.
# A null array (array_contains -> null) also falls through to 0.
contains_first = F.array_contains(F.col("UniqueCabins"), "first")
first_class_flag = F.when(contains_first, 1).otherwise(0)
smalldf = smalldf.withColumn("hasFirstClass", first_class_flag)

In [31]:
# Spot-check cabin parsing and the first-class flag.
cabin_check_cols = ["segmentsCabinCode", "CabinCodes", "UniqueCabins", "NumUniqueCabins", "hasFirstClass"]
smalldf.select(*cabin_check_cols).show(n=10, truncate=False)

+-------------------+---------------------+------------+---------------+-------------+
|segmentsCabinCode  |CabinCodes           |UniqueCabins|NumUniqueCabins|hasFirstClass|
+-------------------+---------------------+------------+---------------+-------------+
|first||first||first|[first, first, first]|[first]     |1              |1            |
|first||first||first|[first, first, first]|[first]     |1              |1            |
|coach              |[coach]              |[coach]     |1              |0            |
|coach              |[coach]              |[coach]     |1              |0            |
|coach              |[coach]              |[coach]     |1              |0            |
|coach              |[coach]              |[coach]     |1              |0            |
|coach              |[coach]              |[coach]     |1              |0            |
|coach              |[coach]              |[coach]     |1              |0            |
|coach              |[coach]              |

In [32]:
# Remove the raw cabin string and the intermediate distinct-cabin array.
cabin_cols_to_drop = ["segmentsCabinCode", "UniqueCabins"]
smalldf = smalldf.drop(*cabin_cols_to_drop)

In [34]:
# Review the schema of the cleaned sample after all parsing and column drops above.
smalldf.printSchema()

root
 |-- legId: string (nullable = true)
 |-- searchDate: date (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- travelDuration: integer (nullable = false)
 |-- elapsedDays: integer (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- segmentsArrivalTimeRaw: string (nullable = true)
 |-- DaysPassed: integer (nullable = true)
 |-- Layovers: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- NumStops: integer (nullable = false)
 |-- AirlineNames: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- NumUniqueAirlines: integer (nullable = false)
 |-- EquipmentDescriptions: array (nullable = true)
 |    |-- element: string (containsNull = 