In [1]:
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [2]:
# Start (or reuse) the Spark session for this preprocessing job.
spark = (
    SparkSession.builder
    .appName("FlightPricePrep")
    .getOrCreate()
)
# Use the Spark 3 datetime parser for the to_date / to_timestamp patterns below.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")
# Last expression: show the SparkContext summary in the notebook output.
spark.sparkContext

In [3]:
# Load the raw itineraries. Parquet embeds its own schema, so CSV-only reader
# options such as `header` / `inferSchema` do not apply and are not passed.
ITINERARIES_PATH = "gs://msca-bdp-student-gcs/Group7_Final_Project/flight_pricing/itineraries.parquet"
price_df = spark.read.parquet(ITINERARIES_PATH)

23/11/23 01:58:24 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
23/11/23 01:58:39 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
23/11/23 01:58:54 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
23/11/23 01:59:09 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
                                                                                

In [4]:
# Peek at the first row to sanity-check the load.
price_df.show(n=1)

23/11/23 01:59:18 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+---------+--------------+-------------------+---------------------------------+------------------------+-------------------------------+----------------------+--------------------------+----------------------------+--------------------+-------------------+----------------------------+-------------------------+----------------+-----------------+
|               legId|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|totalFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeEpochSeconds|segmentsDepartureTimeRaw|segmentsArrivalTimeEpochSeconds|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode| segmentsAirlineName|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|segments

                                                                                

In [5]:
# Print the column names and types read from the Parquet file.
price_df.printSchema()

root
 |-- legId: string (nullable = true)
 |-- searchDate: string (nullable = true)
 |-- flightDate: string (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- fareBasisCode: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isRefundable: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- baseFare: double (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- segmentsDepartureTimeEpochSeconds: string (nullable = true)
 |-- segmentsDepartureTimeRaw: string (nullable = true)
 |-- segmentsArrivalTimeEpochSeconds: string (nullable = true)
 |-- segmentsArrivalTimeRaw: string (nullable = true)
 |-- segmentsArrivalAirportCode: string (nullable = true)
 |-- segmentsDepartureAirportCod

In [6]:
# Drop the itinerary id and the per-segment columns this notebook does not use.
unused_columns = [
    "legId",
    "segmentsDurationInSeconds",
    "segmentsArrivalTimeEpochSeconds",
    "segmentsDepartureTimeEpochSeconds",
    "segmentsArrivalAirportCode",
    "segmentsDepartureAirportCode",
]
price_df = price_df.drop(*unused_columns)

In [7]:
# camelCase source column -> snake_case feature name, applied in this order.
# segmentsDurationInSeconds is intentionally absent: it is dropped in the cell above,
# and withColumnRenamed silently ignores names that no longer exist.
COLUMN_RENAMES = {
    'searchDate': 'search_date',
    'flightDate': 'flight_date',
    'startingAirport': 'origin',
    'destinationAirport': 'destination',
    'fareBasisCode': 'fare_basis_code',
    'travelDuration': 'travel_duration',
    'elapsedDays': 'elapsed_days',
    'isBasicEconomy': 'is_basic_economy',
    'isRefundable': 'is_refundable',
    'isNonStop': 'is_nonstop',
    'baseFare': 'base_fare',
    'totalFare': 'total_fare',
    'seatsRemaining': 'seats_remaining',
    'totalTravelDistance': 'total_distance',
    'segmentsDepartureTimeRaw': 'departure_time',
    'segmentsArrivalTimeRaw': 'arrival_time',
    'segmentsAirlineName': 'airline_name',
    'segmentsAirlineCode': 'airline_code',
    'segmentsEquipmentDescription': 'equipment_description',
    'segmentsDistance': 'distance',
    'segmentsCabinCode': 'cabin_code',
}
for old_name, new_name in COLUMN_RENAMES.items():
    price_df = price_df.withColumnRenamed(old_name, new_name)

In [8]:
# Parse the yyyy-MM-dd date strings, then compute the number of days between
# the search and the flight (datediff(flight_date, search_date)).
price_df = (
    price_df
    .withColumn("flight_date", F.to_date(F.col("flight_date"), "yyyy-MM-dd"))
    .withColumn("search_date", F.to_date(F.col("search_date"), "yyyy-MM-dd"))
    .withColumn("days_until_flight", F.datediff(F.col("flight_date"), F.col("search_date")))
)

In [9]:
# Split the flight date into separate integer calendar features.
flight_date_col = F.col("flight_date")
price_df = (
    price_df
    .withColumn("flight_year", F.year(flight_date_col))
    .withColumn("flight_month", F.month(flight_date_col))
    .withColumn("flight_day", F.dayofmonth(flight_date_col))
)

In [10]:
# Label-encode cabin_code, then one-hot encode that index into cabin_code_category.
# NOTE(review): cabin_code comes from a segments* column; if it holds '||'-joined
# per-segment values (like departure_time), each distinct combination becomes its
# own category — confirm.
cabin_stages = [
    StringIndexer(inputCol='cabin_code', outputCol='cabin_code_index'),
    OneHotEncoder(inputCol='cabin_code_index', outputCol='cabin_code_category'),
]
indexer, encoder = cabin_stages
pipeline = Pipeline(stages=cabin_stages)
model = pipeline.fit(price_df)
price_df = model.transform(price_df)

                                                                                

In [11]:
# Label-encode airline_name, then one-hot encode that index into airline_category.
# NOTE(review): airline_name comes from a segments* column; if it holds '||'-joined
# per-segment airlines, each distinct combination becomes its own category — confirm.
airline_stages = [
    StringIndexer(inputCol='airline_name', outputCol='airline_index'),
    OneHotEncoder(inputCol='airline_index', outputCol='airline_category'),
]
indexer, encoder = airline_stages
pipeline = Pipeline(stages=airline_stages)
model = pipeline.fit(price_df)
price_df = model.transform(price_df)

                                                                                

In [12]:
# "class" = first character of fare_basis_code (presumably the booking-class letter — confirm).
price_df = price_df.withColumn("class", F.substring("fare_basis_code", 1, 1))

In [13]:
# Label-encode the single-letter fare class, then one-hot encode it into class_category.
class_stages = [
    StringIndexer(inputCol='class', outputCol='class_index'),
    OneHotEncoder(inputCol='class_index', outputCol='class_category'),
]
indexer, encoder = class_stages
pipeline = Pipeline(stages=class_stages)
model = pipeline.fit(price_df)
price_df = model.transform(price_df)

                                                                                

In [14]:
def _iso8601_to_total_seconds(str_duration: pd.Series) -> pd.Series:
    """Convert ISO-8601 duration strings (e.g. 'PT2H30M', 'P1DT1H5M') to total seconds.

    ``Timedelta.total_seconds()`` counts every whole-day component, unlike
    ``Timedelta.seconds``, which only holds the sub-day remainder. Null inputs
    become NaT and therefore NaN. Malformed strings raise, as before.
    """
    return pd.to_timedelta(str_duration).dt.total_seconds()


@F.pandas_udf(IntegerType())
def parse_iso8601_duration_minutes(str_duration: pd.Series) -> pd.Series:
    """Total duration in whole minutes (floored), for any number of days; null -> null."""
    # NaN (from null input) passes through // and is returned to Spark as null.
    return _iso8601_to_total_seconds(str_duration) // 60


@F.pandas_udf(IntegerType())
def parse_iso8601_duration_seconds(str_duration: pd.Series) -> pd.Series:
    """Total duration in seconds, including day components; null -> null."""
    return _iso8601_to_total_seconds(str_duration)


price_df = price_df.withColumn("travel_duration_minutes", parse_iso8601_duration_minutes(F.col("travel_duration")))
price_df = price_df.withColumn("travel_duration_seconds", parse_iso8601_duration_seconds(F.col("travel_duration")))

In [15]:
# Stops = number of '||'-separated departure segments minus one.
# The raw string avoids the invalid '\|' escape-sequence warning. The null guard
# matters because size(null) can be -1 under Spark's default legacy setting,
# which would otherwise produce num_stops = -2.
segment_count = F.size(F.split(F.col("departure_time"), r"\|\|"))
stop_count = F.when(F.col("departure_time").isNotNull(), segment_count - 1)
price_df = price_df.withColumn("num_stops", stop_count)

In [16]:
# Raw time strings of the first segment's departure and the last segment's arrival.
# element_at is 1-based and accepts -1 for the last element, so there is no
# second split/size pass. The raw-string regex avoids the invalid '\|' escape warning.
price_df = price_df.withColumn(
    "initial_departure_datetime", F.element_at(F.split("departure_time", r"\|\|"), 1)
)
price_df = price_df.withColumn(
    "final_arrival_datetime", F.element_at(F.split("arrival_time", r"\|\|"), -1)
)

In [17]:
def _wall_clock_part(raw_col: str, group: int):
    """Return the hour (group=1) or minute (group=2) of the 'THH:mm' part of raw_col as int, or null.

    Reads the wall-clock time written in the raw ISO-8601 string (presumably
    airport-local, given the per-value UTC offset — confirm). hour()/minute() on
    the parsed timestamp would instead report the Spark session time zone, mixing
    zones across airports.
    """
    part = F.regexp_extract(F.col(raw_col), r"T(\d{2}):(\d{2})", group)
    # regexp_extract yields '' when the pattern is absent; map that to null.
    return F.when(part != "", part.cast("int"))


# Extract clock features from the raw strings *before* they are converted below.
# They are appended in the same order as before.
price_df = (
    price_df
    .withColumn("initial_departure_hour", _wall_clock_part("initial_departure_datetime", 1))
    .withColumn("initial_departure_minute", _wall_clock_part("initial_departure_datetime", 2))
    .withColumn("final_arrival_hour", _wall_clock_part("final_arrival_datetime", 1))
    .withColumn("final_arrival_minute", _wall_clock_part("final_arrival_datetime", 2))
)

# Convert the raw strings to timestamps in place; XXX consumes the trailing UTC offset.
ISO_OFFSET_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
price_df = price_df.withColumn(
    "initial_departure_datetime", F.to_timestamp("initial_departure_datetime", ISO_OFFSET_FORMAT)
)
price_df = price_df.withColumn(
    "final_arrival_datetime", F.to_timestamp("final_arrival_datetime", ISO_OFFSET_FORMAT)
)

In [18]:
# Full weekday name of the flight date (e.g. 'Monday').
weekday_name = F.date_format('flight_date', 'EEEE')
price_df = price_df.withColumn('day_of_week', weekday_name)

In [19]:
# Weekday encoding used for day_of_week_index: 0 = Sunday ... 6 = Saturday.
category_mapping = {
    'Sunday': 0,
    'Monday': 1,
    'Tuesday': 2,
    'Wednesday': 3,
    'Thursday': 4,
    'Friday': 5,
    'Saturday': 6
}

# Spark's dayofweek() returns 1 = Sunday ... 7 = Saturday, so subtracting 1
# reproduces category_mapping natively. This avoids a row-at-a-time Python UDF,
# a string->int cast, and any dependence on the weekday-name locale.
# A null flight_date still yields null.
price_df = price_df.withColumn("day_of_week_index", F.dayofweek(F.col("flight_date")) - 1)

In [20]:
# Peek at the first row of the engineered feature frame.
price_df.show(n=1)

23/11/23 02:00:56 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 11.0 (TID 236) (hub-msca-bdp-dphub-students-rcsurridge-w-3.c.msca-bdp-student-ap.internal executor 1): org.apache.spark.util.TaskCompletionListenerException: Memory was leaked by query. Memory leaked: (131072)
Allocator(stdin reader for /opt/conda/miniconda3/bin/python) 0/131072/266240/9223372036854775807 (res/actual/peak/limit)

	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:145)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:505)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:508)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Work

+-----------+-----------+------+-----------+---------------+---------------+------------+----------------+-------------+----------+---------+----------+---------------+--------------+--------------------+--------------------+--------------------+------------+---------------------+----------+------------+-----------------+-----------+------------+----------+----------------+-------------------+-------------+----------------+-----+-----------+---------------+-----------------------+-----------------------+---------+--------------------------+----------------------+----------------------+------------------------+------------------+--------------------+-----------+-----------------+
|search_date|flight_date|origin|destination|fare_basis_code|travel_duration|elapsed_days|is_basic_economy|is_refundable|is_nonstop|base_fare|total_fare|seats_remaining|total_distance|      departure_time|        arrival_time|        airline_name|airline_code|equipment_description|  distance|  cabin_code|days_un

                                                                                

In [21]:
# Print the final column names and types before writing the output.
price_df.printSchema()

root
 |-- search_date: date (nullable = true)
 |-- flight_date: date (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- fare_basis_code: string (nullable = true)
 |-- travel_duration: string (nullable = true)
 |-- elapsed_days: integer (nullable = true)
 |-- is_basic_economy: boolean (nullable = true)
 |-- is_refundable: boolean (nullable = true)
 |-- is_nonstop: boolean (nullable = true)
 |-- base_fare: double (nullable = true)
 |-- total_fare: double (nullable = true)
 |-- seats_remaining: integer (nullable = true)
 |-- total_distance: integer (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- airline_name: string (nullable = true)
 |-- airline_code: string (nullable = true)
 |-- equipment_description: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- cabin_code: string (nullable = true)
 |-- days_until_flight: integer (nullable = true)
 |-- flight_ye

In [22]:
# Persist the preprocessed features. mode("overwrite") keeps the notebook
# re-runnable; the default save mode raises once the output path already exists.
parquet_output_path = "gs://msca-bdp-student-gcs/Group7_Final_Project/flight_pricing/preprocessed_prices.parquet"
price_df.write.mode("overwrite").parquet(parquet_output_path)

                                                                                