In [1]:
from pyspark.sql import SparkSession
# import pyspark.sql.functions as F
from pyspark.sql.functions import col, isnan, when, count, udf, to_date, year, month, date_format, size, split, datediff, regexp_extract, lit, abs
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, MinMaxScaler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
spark

In [3]:
cleaned_data_path = 'gs://my-bigdata-project-rn/cleaned/cleaned_itineraries.parquet'
#Sample used to collect only 5% of random data. Will be adjusted
sdf = spark.read.parquet(cleaned_data_path, header=True, inferSchema=True)

                                                                                

In [4]:
#Ensure search and flight date are datetime values
sdf.printSchema()

root
 |-- searchDate: date (nullable = true)
 |-- flightDate: date (nullable = true)
 |-- startingAirport: string (nullable = true)
 |-- destinationAirport: string (nullable = true)
 |-- travelDuration: string (nullable = true)
 |-- elapsedDays: integer (nullable = true)
 |-- isBasicEconomy: boolean (nullable = true)
 |-- isNonStop: boolean (nullable = true)
 |-- totalFare: double (nullable = true)
 |-- seatsRemaining: integer (nullable = true)
 |-- totalTravelDistance: integer (nullable = true)
 |-- segmentsArrivalAirportCode: string (nullable = true)
 |-- segmentsDepartureAirportCode: string (nullable = true)
 |-- segmentsAirlineCode: string (nullable = true)
 |-- segmentsEquipmentDescription: string (nullable = true)
 |-- segmentsDurationInSeconds: string (nullable = true)
 |-- segmentsDistance: string (nullable = true)
 |-- segmentsCabinCode: string (nullable = true)



In [5]:
#Remove totalFare values over $5500. Considered outliers
sdf = sdf.filter(col('totalFare') < 5500)

In [6]:
sdf.summary().show()

24/12/13 00:55:11 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+---------------+------------------+--------------+-------------------+------------------+------------------+-------------------+--------------------------+----------------------------+-------------------+----------------------------+-------------------------+------------------+--------------------+
|summary|startingAirport|destinationAirport|travelDuration|        elapsedDays|         totalFare|    seatsRemaining|totalTravelDistance|segmentsArrivalAirportCode|segmentsDepartureAirportCode|segmentsAirlineCode|segmentsEquipmentDescription|segmentsDurationInSeconds|  segmentsDistance|   segmentsCabinCode|
+-------+---------------+------------------+--------------+-------------------+------------------+------------------+-------------------+--------------------------+----------------------------+-------------------+----------------------------+-------------------------+------------------+--------------------+
|  count|       74754181|          74754181|      74754181|           747

In [7]:
# Engineer additional date feature columns based on the order_date
# Goal is to have a flightDate_OnWeekend column
sdf = sdf.withColumn("flightDate_DayOfWeek", date_format(col("flightDate"), "EEEE"))         # 'Monday' 'Tuesday' etc.
sdf = sdf.withColumn("flightDate_OnWeekend", when(sdf.flightDate_DayOfWeek == 'Saturday',1.0).when(sdf.flightDate_DayOfWeek == 'Sunday', 1.0).otherwise(0).cast('int'))

# Check columns to see if we got good values
sdf.select(['flightDate','flightDate_DayOfWeek', 'flightDate_OnWeekend']).show(10)

+----------+--------------------+--------------------+
|flightDate|flightDate_DayOfWeek|flightDate_OnWeekend|
+----------+--------------------+--------------------+
|2022-07-01|              Friday|                   0|
|2022-07-01|              Friday|                   0|
|2022-07-01|              Friday|                   0|
|2022-07-01|              Friday|                   0|
|2022-07-01|              Friday|                   0|
|2022-07-01|              Friday|                   0|
|2022-07-01|              Friday|                   0|
|2022-07-01|              Friday|                   0|
|2022-07-01|              Friday|                   0|
|2022-07-01|              Friday|                   0|
+----------+--------------------+--------------------+
only showing top 10 rows



In [8]:
#Create daysBetweenFlight column
sdf = sdf.withColumn("daysBetweenFlight", datediff(sdf.flightDate, sdf.searchDate))

In [9]:
#Create total travel duration column in a proper format
sdf = sdf.withColumn("travelduration_hours", regexp_extract(col("travelDuration"), r"(\d+)H",1).cast("int"))
sdf = sdf.withColumn("travelduration_minutes", regexp_extract(col("travelDuration"), r"(\d+)M",1).cast("int"))
sdf = sdf.na.fill(value=0.0,subset=["travelduration_minutes", "travelduration_hours"])
sdf = sdf.withColumn("travelduration_total_minutes", col("travelduration_hours") * 60 + col("travelduration_minutes"))

In [10]:
#Mapping true/false columns to 1 and 0
sdf = sdf.withColumn('isBasicEconomy', when(sdf.isBasicEconomy == 'true', 1).otherwise(0))
sdf = sdf.withColumn('isNonStop', when(sdf.isNonStop == 'true', 1).otherwise(0))

In [11]:
#Remove outlier elapsedDays values greater than 1
sdf = sdf.filter(col('elapsedDays') < 2)

In [12]:
#Verify no more outliers exist
sdf.groupBy('elapsedDays').count().show()

[Stage 7:>                                                          (0 + 1) / 1]

+-----------+--------+
|elapsedDays|   count|
+-----------+--------+
|          1|10943529|
|          0|63810591|
+-----------+--------+



                                                                                

In [14]:
#Apply MinMax to TotalTravelDistance
sdf = sdf.withColumn('totalTravelDistance', sdf.totalTravelDistance.cast('double'))
distance_assembler = VectorAssembler(inputCols=['totalTravelDistance'], outputCol='distanceVector')
totalDistance_scaler = MinMaxScaler(inputCol = 'distanceVector', outputCol = 'totalDistanceScaled')

#Indexer for string columns
indexer = StringIndexer(inputCols=['startingAirport', 'destinationAirport', 'segmentsArrivalAirportCode', 'segmentsCabinCode'], 
                        outputCols=['startAirportIndex', 'destAirportIndex', 'arrivalAirportCodeIndex', 'cabinCodeIndex'], handleInvalid="keep")

#One-Hot encoder
encoder = OneHotEncoder(inputCols=['startAirportIndex', 'destAirportIndex', 'arrivalAirportCodeIndex', 'cabinCodeIndex'],
                        outputCols=['startAirportVector', 'destAirportVector', 'arrivalAirportCodeVector', 'cabinCodeVector'],
                        dropLast=False)

#Creating the assembler for the feature vectors and integer columns used
assembler = VectorAssembler(inputCols=['startAirportVector', 'destAirportVector', 'arrivalAirportCodeVector', 'cabinCodeVector','daysBetweenFlight', 'flightDate_OnWeekend','travelduration_total_minutes', 'elapsedDays', 'isBasicEconomy', 'isNonStop', 'seatsRemaining', 'totalDistanceScaled'], outputCol='features')

# Create a Ridge Regression Estimator
ridge_reg = LinearRegression(labelCol='totalFare',  elasticNetParam=0, regParam=0.1)

# Create a regression evaluator (to get RMSE, R2, RME, etc.)
evaluator = RegressionEvaluator(labelCol='totalFare')

#Creating the pipeline

regression_pipe = Pipeline(stages=[indexer, encoder, distance_assembler, totalDistance_scaler, assembler, ridge_reg])

# Call .fit to transform the data
transformed_sdf = regression_pipe.fit(sdf).transform(sdf)

# Review the transformed features
print("Transformed features")
transformed_sdf.select('daysBetweenFlight', 'flightDate_OnWeekend','startingAirport','destinationAirport', 'travelduration_total_minutes', 'elapsedDays', 'isBasicEconomy', 'isNonStop', 'seatsRemaining', 'totalTravelDistance', 'segmentsArrivalAirportCode', 'segmentsCabinCode', 'totalFare', 'features').show(30, truncate=False)

ERROR:root:KeyboardInterrupt while sending command.               (0 + 18) / 18]
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/miniconda3/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 



In [15]:
# Split the data into 70% training and 30% test sets  
trainingData, testData = sdf.randomSplit([0.7, 0.3], seed=42)

# Create the pipeline   Indexer is stage 0 and Ridge Regression (ridge_reg)  is stage 5
regression_pipe = Pipeline(stages=[indexer, encoder, distance_assembler, totalDistance_scaler, assembler, ridge_reg])

# Create a grid to hold hyperparameters 
grid = ParamGridBuilder()

# Two ways to try .fitIntercept
params = ParamGridBuilder() \
.addGrid(ridge_reg.fitIntercept, [True, False]) \
.addGrid(ridge_reg.regParam, [0.001, 0.01, 0.1, 1, 10]) \
.addGrid(ridge_reg.elasticNetParam, [0, 0.25, 0.5, 0.75, 1]) \
.build()

# Build the parameter grid
grid = grid.build()

print('Number of models to be tested: ', len(params))

# Create the CrossValidator using the hyperparameter grid
cv = CrossValidator(estimator=regression_pipe, 
                    estimatorParamMaps=grid, 
                    evaluator=evaluator, 
                    numFolds=3,seed=42)

# Train the models
all_models  = cv.fit(trainingData)

# Show the average performance over the three folds for each grid combination
print(f"Average metric {all_models.avgMetrics}")

# Get the best model from all of the models trained
bestModel = all_models.bestModel

# Use the model 'bestModel' to predict the test set
test_results = bestModel.transform(testData)

# Show the predicted totalFare
test_results.select('daysBetweenFlight','flightDate_OnWeekend','startingAirport','destinationAirport', 'travelduration_total_minutes', 'elapsedDays', 'isBasicEconomy', 'isNonStop', 'seatsRemaining', 'totalTravelDistance', 'segmentsArrivalAirportCode', 'segmentsCabinCode', 'totalFare', 'prediction').show(truncate=False)

Number of models to be tested:  50


24/12/13 01:14:06 WARN DAGScheduler: Broadcasting large task binary with size 1749.7 KiB
24/12/13 01:14:32 WARN DAGScheduler: Broadcasting large task binary with size 1750.9 KiB
24/12/13 01:14:32 WARN DAGScheduler: Broadcasting large task binary with size 1750.2 KiB
24/12/13 01:14:59 WARN DAGScheduler: Broadcasting large task binary with size 1751.4 KiB
24/12/13 01:14:59 WARN DAGScheduler: Broadcasting large task binary with size 1750.2 KiB
24/12/13 01:14:59 WARN DAGScheduler: Broadcasting large task binary with size 1751.4 KiB
24/12/13 01:14:59 WARN DAGScheduler: Broadcasting large task binary with size 1750.2 KiB
24/12/13 01:15:00 WARN DAGScheduler: Broadcasting large task binary with size 1751.4 KiB
24/12/13 01:15:00 WARN DAGScheduler: Broadcasting large task binary with size 1750.2 KiB
24/12/13 01:15:00 WARN DAGScheduler: Broadcasting large task binary with size 1751.4 KiB
24/12/13 01:15:00 WARN DAGScheduler: Broadcasting large task binary with size 1750.2 KiB
24/12/13 01:15:01 WAR

Average metric [126.28749819892796]


24/12/13 01:29:30 WARN DAGScheduler: Broadcasting large task binary with size 1858.2 KiB
[Stage 446:>                                                        (0 + 1) / 1]

+-----------------+--------------------+---------------+------------------+----------------------------+-----------+--------------+---------+--------------+-------------------+--------------------------+-------------------+---------+------------------+
|daysBetweenFlight|flightDate_OnWeekend|startingAirport|destinationAirport|travelduration_total_minutes|elapsedDays|isBasicEconomy|isNonStop|seatsRemaining|totalTravelDistance|segmentsArrivalAirportCode|segmentsCabinCode  |totalFare|prediction        |
+-----------------+--------------------+---------------+------------------+----------------------------+-----------+--------------+---------+--------------+-------------------+--------------------------+-------------------+---------+------------------+
|40               |1                   |ORD            |MIA               |186                         |0          |0             |1        |2             |1192.0             |MIA                       |coach              |338.61   |230.2773

                                                                                

In [16]:
# RMSE measures the differences between what the model predicted ('prediction') and the actual values ('totalFare').
rmse = evaluator.evaluate(test_results, {evaluator.metricName:'rmse'})
# R-Squared measures how much of the variability in the target variable (totalFare) can be explained by the model
r2 =evaluator.evaluate(test_results,{evaluator.metricName:'r2'})
print(f"RMSE: {rmse}  R-squared:{r2}")

# bestModel.coeff.
# bestModel = all_models.bestModel
# print("Coeeff ", bestModel.coeff)
# bestModel = all_models.bestModel.stages(4)

print(bestModel.stages)

coefficients = bestModel.stages[5].coefficients
# print("bestModel coefficients", coefficients)
intercept = bestModel.stages[5].intercept
print("bestModel intercept", intercept)

#Finding the most important features
feature_names = ['daysBetweenFlight','flightDate_OnWeekend','startingAirport','destinationAirport', 'travelduration_total_minutes', 'elapsedDays', 'isBasicEconomy', 'isNonStop', 'seatsRemaining', 'totalTravelDistance', 'segmentsArrivalAirportCode', 'segmentsCabinCode']

coef_map = dict(zip(feature_names, coefficients))

print("Feature Importance:")
for feature, coef in coef_map.items():
    print("  {}: {:.3f}".format(feature, coef))


24/12/13 01:29:57 WARN DAGScheduler: Broadcasting large task binary with size 1864.6 KiB
24/12/13 01:30:46 WARN DAGScheduler: Broadcasting large task binary with size 1865.7 KiB
24/12/13 01:30:47 WARN DAGScheduler: Broadcasting large task binary with size 1864.6 KiB

RMSE: 126.17129284242456  R-squared:0.5901663946609959
[StringIndexerModel: uid=StringIndexer_f783778c07da, handleInvalid=keep, numInputCols=4, numOutputCols=4, OneHotEncoderModel: uid=OneHotEncoder_e79e52983a30, dropLast=false, handleInvalid=error, numInputCols=4, numOutputCols=4, VectorAssembler_a13b24a51454, MinMaxScalerModel: uid=MinMaxScaler_164b3517cfad, numFeatures=1, min=0.0, max=1.0, VectorAssembler_d20db549e9a8, LinearRegressionModel: uid=LinearRegression_564b952d632a, numFeatures=9977]
bestModel intercept 213.7221382855052
Feature Importance:
  daysBetweenFlight: -2.336
  flightDate_OnWeekend: -30.579
  startingAirport: -17.970
  destinationAirport: 21.467
  travelduration_total_minutes: 2.673
  elapsedDays: -11.462
  isBasicEconomy: -21.451
  isNonStop: 9.625
  seatsRemaining: -15.032
  totalTravelDistance: -36.966
  segmentsArrivalAirportCode: 18.315
  segmentsCabinCode: 15.875


24/12/13 01:31:33 WARN DAGScheduler: Broadcasting large task binary with size 1865.7 KiB
                                                                                

In [18]:
transformed_sdf.write.parquet('gs://my-bigdata-project-rn/trusted/trusted_itineraries.parquet')

24/12/13 01:32:03 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB
                                                                                

In [23]:
model_path =  'gs://my-bigdata-project-rn/models/plane_ridgelasso_regression_model'
bestModel.write().overwrite().save(model_path)

                                                                                

In [1]:
# Visualize regression results

# Plot totalFare against predicted totalFare (prediction)
import seaborn as sns
import matplotlib.pyplot as plt

# The Spark dataframe test_results holds the original 'totalFare' as well as the 'prediction'
# Select and convert to a Pandas dataframe
df = test_results.select('totalFare','prediction').toPandas()

plt.figure(figsize=(10,6))

# Set the style for Seaborn plots
sns.set_style("white")
 
# Create a relationship plot between totalFare and prediction
sns.lmplot(x='totalFare', y='prediction', data=df)

plt.savefig('RegressionPlot.png', dpi=600)


NameError: name 'test_results' is not defined

In [None]:
# residuals = bestModel.stages[5].residuals
# test_results.residuals

df = test_results.select('totalFare','prediction').toPandas()
df['residuals'] = df['totalFare'] - df['prediction']

plt.figure(figsize=(10,6))

# Set the style for Seaborn plots
sns.set_style("white")

# Create a relationship plot between totalFare and prediction
sns.regplot(x = 'prediction', y = 'residuals', data = df, scatter = True, color = 'red')

plt.savefig('ResidualPlot.png', dpi=600)

In [None]:
#Distribution of residuals

plt.figure(figsize=(10,6))

sns.histplot(df['residuals'], kde=True)
plt.title("Distribution of Residuals")
plt.xlabel("Residuals")
plt.ylabel("Density")
plt.savefig('ResidualDist.png', dpi=600)

plt.show()

In [None]:
coeff_values = coefficients.toArray()

In [None]:
#import pandas as pd
#coef_df = pd.DataFrame({'Feature': feature_names, 'Coefficient': coeff_values})
plt.barh(feature_names, coeff_values[0:12], color='skyblue')
plt.xlabel("Coefficient Value")
plt.title("Coefficient Plot for Linear Regression")
plt.xticks(rotation=45)
plt.axhline(y=0, color='black', linestyle='--')
plt.show()

In [None]:
import scipy.stats as stats

# Create Q-Q plot
stats.probplot(df['residuals'], dist="norm", plot=plt)
plt.title('Normal Q-Q plot')
plt.xlabel('Theoretical quantiles')
plt.ylabel('Ordered Values')
plt.grid(True)

plt.savefig('QQPlotResiduals.png', dpi=600)

plt.show()