In [1]:
!pip install pyspark

from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .master("local[*]")\
    .config("spark.driver.memory", "16g")\
    .config("spark.executor.memory", "16g")\
    .getOrCreate()
spark

Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317146 sha256=da001ca24d94954b937890520408635681464e82c4376b7a4b0434e3838206f6
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
[0m

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/26 17:55:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql.functions import col
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql.functions import *
from datetime import datetime
from dateutil import parser
import pyarrow.parquet as pq
from pyspark.ml.feature import VectorAssembler
from PIL import Image
from pyspark.sql.types import IntegerType



In [3]:
# spark.debug.maxToStringFields=100
df = spark.read.option("header", "True").csv("/kaggle/input/flightprices/itineraries.csv")

filtered_df = df.select('searchDate', 'flightDate', 'startingAirport', 'destinationAirport', 'fareBasisCode'\
                        , 'travelDuration', 'elapsedDays', 'isBasicEconomy', 'isRefundable','isNonStop', 'baseFare'\
                        , 'seatsRemaining', 'totalTravelDistance', 'segmentsDepartureTimeRaw'\
                        , 'segmentsArrivalTimeRaw', 'segmentsArrivalAirportCode', 'segmentsDepartureAirportCode'\
                        , 'segmentsAirlineCode', 'segmentsCabinCode')
filtered_df.show(10)



                                                                                

+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+--------------+-------------------+------------------------+----------------------+--------------------------+----------------------------+-------------------+-----------------+
|searchDate|flightDate|startingAirport|destinationAirport|fareBasisCode|travelDuration|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|seatsRemaining|totalTravelDistance|segmentsDepartureTimeRaw|segmentsArrivalTimeRaw|segmentsArrivalAirportCode|segmentsDepartureAirportCode|segmentsAirlineCode|segmentsCabinCode|
+----------+----------+---------------+------------------+-------------+--------------+-----------+--------------+------------+---------+--------+--------------+-------------------+------------------------+----------------------+--------------------------+----------------------------+-------------------+-----------------+
|2022-04-16|2022-04-17|     

In [4]:
from pyspark.sql.functions import expr, regexp_extract
from pyspark.sql.types import BooleanType

#add stop count
filtered_df = filtered_df.withColumn("stopCount",
                   (length("segmentsArrivalAirportCode") - length(expr("regexp_replace(segmentsArrivalAirportCode, '[\|]', '')"))) / 2)

#add search month, day, hour, minute and flight month, day, hour, minute
filtered_df = filtered_df.withColumn('searchMonth', month(filtered_df.searchDate))
filtered_df = filtered_df.withColumn("searchDate",to_timestamp(col("searchDate")))\
                        .withColumn("searchDayOfMonth", date_format(col("searchDate"), "d").cast('int'))
filtered_df = filtered_df.withColumn('flightMonth', month(filtered_df.flightDate))
filtered_df = filtered_df.withColumn("flightDate",to_timestamp(col("flightDate")))\
                        .withColumn("flightDayOfMonth", date_format(col("flightDate"), "d").cast('int'))
filtered_df = filtered_df.drop("searchDate", "flightDate")

#add departure hour and minute and arrival hour and minute
filtered_df = filtered_df.withColumn("departureHour", substring(df["segmentsDepartureTimeRaw"], 12, 2).cast("int"))
filtered_df = filtered_df.withColumn("departureMinute", substring(df["segmentsDepartureTimeRaw"], 15, 2).cast("int"))
filtered_df = filtered_df.withColumn("arrivalHour", substring(df["segmentsArrivalTimeRaw"], -18, 2).cast("int"))
filtered_df = filtered_df.withColumn("arrivalMinute", substring(df["segmentsArrivalTimeRaw"], -15, 2).cast("int"))
filtered_df = filtered_df.drop("segmentsDepartureTimeRaw", "segmentsArrivalTimeRaw")


filtered_df = filtered_df.withColumn("airline", split(df["segmentsAirlineCode"], "\|\|")[0])
filtered_df = filtered_df.withColumn("cabin", split(df["segmentsCabinCode"], "\|\|")[0])
filtered_df = filtered_df.drop("segmentsDepartureAirportCode", "segmentsArrivalAirportCode", "segmentsAirlineCode", "segmentsCabinCode")

#add duration(minutes)
filtered_df = filtered_df.withColumn("durationHour", regexp_extract(filtered_df.travelDuration, r"(\d+)", 1).cast('int'))
filtered_df = filtered_df.withColumn("durationMinute", regexp_extract(filtered_df.travelDuration, r"(\d+)H(\d+)", 2).cast('int'))
filtered_df = filtered_df.withColumn("duration", filtered_df.durationHour*60 + filtered_df.durationMinute)
filtered_df = filtered_df.drop("travelDuration", "durationHour", "durationMinute")

#change data type
filtered_df = filtered_df.withColumn("elapsedDays", filtered_df["elapsedDays"].cast('int'))
filtered_df = filtered_df.withColumn("baseFare", filtered_df["baseFare"].cast('double'))
filtered_df = filtered_df.withColumn("seatsRemaining", filtered_df["seatsRemaining"].cast('int'))
filtered_df = filtered_df.withColumn("totalTravelDistance", filtered_df["totalTravelDistance"].cast('int'))
filtered_df = filtered_df.withColumn("seatsRemaining", filtered_df["seatsRemaining"].cast('int'))
filtered_df = filtered_df.withColumn("isBasicEconomy", filtered_df["isBasicEconomy"].cast(BooleanType()).cast('int'))
filtered_df = filtered_df.withColumn("isRefundable", filtered_df["isRefundable"].cast(BooleanType()).cast('int'))
filtered_df = filtered_df.withColumn("isNonStop", filtered_df["isNonStop"].cast(BooleanType()).cast('int'))

#drop null
filtered_df = filtered_df.dropna()


filtered_df.show(5)

+---------------+------------------+-------------+-----------+--------------+------------+---------+--------+--------------+-------------------+---------+-----------+----------------+-----------+----------------+-------------+---------------+-----------+-------------+-------+-----+--------+
|startingAirport|destinationAirport|fareBasisCode|elapsedDays|isBasicEconomy|isRefundable|isNonStop|baseFare|seatsRemaining|totalTravelDistance|stopCount|searchMonth|searchDayOfMonth|flightMonth|flightDayOfMonth|departureHour|departureMinute|arrivalHour|arrivalMinute|airline|cabin|duration|
+---------------+------------------+-------------+-----------+--------------+------------+---------+--------+--------------+-------------------+---------+-----------+----------------+-----------+----------------+-------------+---------------+-----------+-------------+-------+-----+--------+
|            ATL|               BOS|     LA0NX0MC|          0|             0|           0|        1|  217.67|             9|

In [5]:
print(filtered_df)

DataFrame[startingAirport: string, destinationAirport: string, fareBasisCode: string, elapsedDays: int, isBasicEconomy: int, isRefundable: int, isNonStop: int, baseFare: double, seatsRemaining: int, totalTravelDistance: int, stopCount: double, searchMonth: int, searchDayOfMonth: int, flightMonth: int, flightDayOfMonth: int, departureHour: int, departureMinute: int, arrivalHour: int, arrivalMinute: int, airline: string, cabin: string, duration: int]


In [6]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

starting_airport_indexer = StringIndexer(inputCol="startingAirport", outputCol="startingAirportIndex")
destination_airport_indexer = StringIndexer(inputCol="destinationAirport", outputCol="destinationAirportIndex")
fare_basis_code_indexer = StringIndexer(inputCol="fareBasisCode", outputCol="fareBasisCodeIndex")
airline_indexer = StringIndexer(inputCol="airline", outputCol="airineIndex")
cabin_indexer = StringIndexer(inputCol="cabin", outputCol="cabinIndex")

starting_airport_encoder = OneHotEncoder(inputCol="startingAirportIndex", outputCol="startingAirportFact")
destination_airport_encoder = OneHotEncoder(inputCol="destinationAirportIndex", outputCol="destinationAirportFact")
fare_basis_code_encoder = OneHotEncoder(inputCol="fareBasisCodeIndex", outputCol="fareBasisCodeFact")
airline_encoder = OneHotEncoder(inputCol="airineIndex", outputCol="airlineFact")
cabin_encoder = OneHotEncoder(inputCol="cabinIndex", outputCol="cabinFact")

In [7]:
# |startingAirport|destinationAirport|fareBasisCode|elapsedDays|isBasicEconomy|isRefundable|
# isNonStop|baseFare|seatsRemaining|totalTravelDistance|stopCount|searchMonth|searchDayOfMonth|
# flightMonth|flightDayOfMonth|departureHour|departureMinute|arrivalHour|arrivalMinute|airline|cabin|duration|

from pyspark.ml import Pipeline

vec_assembler = VectorAssembler(inputCols=["startingAirportFact", "destinationAirportFact", "fareBasisCodeFact"\
                                           , "elapsedDays", "isBasicEconomy", "isRefundable", "isNonStop"\
                                           , "seatsRemaining", "totalTravelDistance", "stopCount"\
                                           , "searchMonth", "searchDayOfMonth", "flightMonth", "flightDayOfMonth"\
                                           , "departureHour", "departureMinute", "arrivalHour", "arrivalMinute"\
                                           , "arrivalHour", "arrivalMinute", "airlineFact", "cabinFact", "duration"]\
                                    , outputCol="features", handleInvalid="keep")

flights_pipe = Pipeline(stages=[starting_airport_indexer, destination_airport_indexer, fare_basis_code_indexer\
                                , airline_indexer, cabin_indexer, starting_airport_encoder, destination_airport_encoder\
                                , fare_basis_code_encoder, airline_encoder, cabin_encoder, vec_assembler])

In [8]:
all_data = flights_pipe.fit(filtered_df).transform(filtered_df)


23/05/26 18:39:21 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


AnalysisException: [DATATYPE_MISMATCH.BINARY_OP_WRONG_TYPE] Cannot resolve "((searchMonth = 9) & (searchDayOfMonth <= 18))" due to data type mismatch: the binary operator requires the input type "INTEGRAL", not "BOOLEAN".; line 1 pos 0;
'Filter ((searchMonth#208 = 9) & (searchDayOfMonth#252 <= 18))
+- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#852, isRefundable#875, isNonStop#898, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620, startingAirportIndex#6220, destinationAirportIndex#6250, ... 9 more fields]
   +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#852, isRefundable#875, isNonStop#898, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620, startingAirportIndex#6220, destinationAirportIndex#6250, ... 8 more fields]
      +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#852, isRefundable#875, isNonStop#898, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620, startingAirportIndex#6220, destinationAirportIndex#6250, ... 7 more fields]
         +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#852, isRefundable#875, isNonStop#898, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620, startingAirportIndex#6220, destinationAirportIndex#6250, ... 6 more fields]
            +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#852, isRefundable#875, isNonStop#898, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620, startingAirportIndex#6220, destinationAirportIndex#6250, ... 5 more fields]
               +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#852, isRefundable#875, isNonStop#898, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620, startingAirportIndex#6220, destinationAirportIndex#6250, ... 4 more fields]
                  +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#852, isRefundable#875, isNonStop#898, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620, startingAirportIndex#6220, destinationAirportIndex#6250, ... 3 more fields]
                     +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#852, isRefundable#875, isNonStop#898, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620, startingAirportIndex#6220, destinationAirportIndex#6250, ... 2 more fields]
                        +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#852, isRefundable#875, isNonStop#898, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620, startingAirportIndex#6220, destinationAirportIndex#6250, UDF(cast(fareBasisCode#22 as string)) AS fareBasisCodeIndex#6281]
                           +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#852, isRefundable#875, isNonStop#898, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620, startingAirportIndex#6220, UDF(cast(destinationAirport#21 as string)) AS destinationAirportIndex#6250]
                              +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#852, isRefundable#875, isNonStop#898, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620, UDF(cast(startingAirport#20 as string)) AS startingAirportIndex#6220]
                                 +- Filter atleastnnonnulls(22, startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#852, isRefundable#875, isNonStop#898, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620)
                                    +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#852, isRefundable#875, cast(cast(isNonStop#27 as boolean) as int) AS isNonStop#898, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620]
                                       +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#852, cast(cast(isRefundable#26 as boolean) as int) AS isRefundable#875, isNonStop#27, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620]
                                          +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, cast(cast(isBasicEconomy#25 as boolean) as int) AS isBasicEconomy#852, isRefundable#26, isNonStop#27, baseFare#691, seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620]
                                             +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#691, cast(seatsRemaining#806 as int) AS seatsRemaining#829, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620]
                                                +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#691, cast(seatsRemaining#783 as int) AS seatsRemaining#806, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620]
                                                   +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#691, cast(seatsRemaining#760 as int) AS seatsRemaining#783, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620]
                                                      +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#691, cast(seatsRemaining#714 as int) AS seatsRemaining#760, totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620]
                                                         +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#691, seatsRemaining#714, cast(totalTravelDistance#31 as int) AS totalTravelDistance#737, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620]
                                                            +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#691, cast(seatsRemaining#30 as int) AS seatsRemaining#714, totalTravelDistance#31, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620]
                                                               +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#668, isBasicEconomy#25, isRefundable#26, isNonStop#27, cast(baseFare#28 as double) AS baseFare#691, seatsRemaining#30, totalTravelDistance#31, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620]
                                                                  +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, cast(elapsedDays#24 as int) AS elapsedDays#668, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620]
                                                                     +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, duration#620]
                                                                        +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, durationHour#571, durationMinute#595, ((durationHour#571 * 60) + durationMinute#595) AS duration#620]
                                                                           +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, durationHour#571, cast(regexp_extract(travelDuration#23, (\d+)H(\d+), 2) as int) AS durationMinute#595]
                                                                              +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522, cast(regexp_extract(travelDuration#23, (\d+), 1) as int) AS durationHour#571]
                                                                                 +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, airline#496, cabin#522]
                                                                                    +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, ... 2 more fields]
                                                                                       +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445, split(segmentsAirlineCode#39, \|\|, -1)[0] AS airline#496]
                                                                                          +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, arrivalHour#419, arrivalMinute#445]
                                                                                             +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsDepartureTimeRaw#33, segmentsArrivalTimeRaw#35, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, ... 2 more fields]
                                                                                                +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsDepartureTimeRaw#33, segmentsArrivalTimeRaw#35, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, departureMinute#394, cast(substring(segmentsArrivalTimeRaw#35, -18, 2) as int) AS arrivalHour#419]
                                                                                                   +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsDepartureTimeRaw#33, segmentsArrivalTimeRaw#35, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, departureHour#370, cast(substring(segmentsDepartureTimeRaw#33, 15, 2) as int) AS departureMinute#394]
                                                                                                      +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsDepartureTimeRaw#33, segmentsArrivalTimeRaw#35, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323, cast(substring(segmentsDepartureTimeRaw#33, 12, 2) as int) AS departureHour#370]
                                                                                                         +- Project [startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsDepartureTimeRaw#33, segmentsArrivalTimeRaw#35, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, flightDayOfMonth#323]
                                                                                                            +- Project [searchDate#230, flightDate#299, startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsDepartureTimeRaw#33, segmentsArrivalTimeRaw#35, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275, cast(date_format(flightDate#299, d, Some(Etc/UTC)) as int) AS flightDayOfMonth#323]
                                                                                                               +- Project [searchDate#230, to_timestamp(flightDate#19, None, TimestampType, Some(Etc/UTC), false) AS flightDate#299, startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsDepartureTimeRaw#33, segmentsArrivalTimeRaw#35, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, stopCount#187, searchMonth#208, searchDayOfMonth#252, flightMonth#275]
                                                                                                                  +- Project [searchDate#230, flightDate#19, startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsDepartureTimeRaw#33, segmentsArrivalTimeRaw#35, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, stopCount#187, searchMonth#208, searchDayOfMonth#252, month(cast(flightDate#19 as date)) AS flightMonth#275]
                                                                                                                     +- Project [searchDate#230, flightDate#19, startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsDepartureTimeRaw#33, segmentsArrivalTimeRaw#35, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, stopCount#187, searchMonth#208, cast(date_format(searchDate#230, d, Some(Etc/UTC)) as int) AS searchDayOfMonth#252]
                                                                                                                        +- Project [to_timestamp(searchDate#18, None, TimestampType, Some(Etc/UTC), false) AS searchDate#230, flightDate#19, startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsDepartureTimeRaw#33, segmentsArrivalTimeRaw#35, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, stopCount#187, searchMonth#208]
                                                                                                                           +- Project [searchDate#18, flightDate#19, startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsDepartureTimeRaw#33, segmentsArrivalTimeRaw#35, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, stopCount#187, month(cast(searchDate#18 as date)) AS searchMonth#208]
                                                                                                                              +- Project [searchDate#18, flightDate#19, startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsDepartureTimeRaw#33, segmentsArrivalTimeRaw#35, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43, (cast((length(segmentsArrivalAirportCode#36) - length(regexp_replace(segmentsArrivalAirportCode#36, [|], , 1))) as double) / cast(2 as double)) AS stopCount#187]
                                                                                                                                 +- Project [searchDate#18, flightDate#19, startingAirport#20, destinationAirport#21, fareBasisCode#22, travelDuration#23, elapsedDays#24, isBasicEconomy#25, isRefundable#26, isNonStop#27, baseFare#28, seatsRemaining#30, totalTravelDistance#31, segmentsDepartureTimeRaw#33, segmentsArrivalTimeRaw#35, segmentsArrivalAirportCode#36, segmentsDepartureAirportCode#37, segmentsAirlineCode#39, segmentsCabinCode#43]
                                                                                                                                    +- Relation [legId#17,searchDate#18,flightDate#19,startingAirport#20,destinationAirport#21,fareBasisCode#22,travelDuration#23,elapsedDays#24,isBasicEconomy#25,isRefundable#26,isNonStop#27,baseFare#28,totalFare#29,seatsRemaining#30,totalTravelDistance#31,segmentsDepartureTimeEpochSeconds#32,segmentsDepartureTimeRaw#33,segmentsArrivalTimeEpochSeconds#34,segmentsArrivalTimeRaw#35,segmentsArrivalAirportCode#36,segmentsDepartureAirportCode#37,segmentsAirlineName#38,segmentsAirlineCode#39,segmentsEquipmentDescription#40,... 3 more fields] csv


In [10]:
(trainingData, testData) = all_data.randomSplit([0.7, 0.3])

In [None]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(labelCol='baseFare', featuresCol='features')
rf_model = rf.fit(trainingData)
predictions = rf_model.transform(testData)


23/05/26 18:48:11 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/26 18:48:45 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/26 19:26:42 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
23/05/26 20:04:29 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB
23/05/26 20:05:29 WARN MemoryStore: Not enough space to cache rdd_72_0 in memory! (computed 1420.3 MiB so far)
23/05/26 20:05:30 WARN BlockManager: Persisting block rdd_72_0 to disk instead.
23/05/26 20:05:30 WARN MemoryStore: Not enough space to cache rdd_72_1 in memory! (computed 2.1 GiB so far)
23/05/26 20:05:30 WARN BlockManager: Persisting block rdd_72_1 to disk instead.
23/05/26 20:05:30 WARN MemoryStore: Not enough space to cache rdd_72_3 in memory! (computed 2.1 GiB so far)
23/05/26 20:05:30 WARN BlockManager: Persisting block rdd_72_3 to disk instead.
23/05/26 20:05:30 WARN MemoryStore: Not enough space to cache rdd_72_2 in memory! (comput

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

mse_evaluator = RegressionEvaluator(
    labelCol="baseFare", predictionCol="prediction", metricName="mse")
mae_evaluator = RegressionEvaluator(
    labelCol="baseFare", predictionCol="prediction", metricName="mae")
mape_evaluator = RegressionEvaluator(
    labelCol="baseFare", predictionCol="prediction", metricName="mape")
mse = mse_evaluator.evaluate(predictions)
mae = mae_evaluator.evaluate(predictions)
mape = mape_evaluator.evaluate(predictions)

In [None]:
print("MSE on test data = %g" % mse)
print("MAE on test data = %g" % mae)
print("MAPE on test data = %g" % mape)