In [1]:
spark

In [2]:
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression, GeneralizedLinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import numpy as np

In [3]:
#read the parquet from cleaned folder
sdf=spark.read.parquet("gs://my-bigdata-project-sj/cleaned/cleaned_flight_delay.parquet/", header=True, inferSchema=True)
sdf.printSchema()

                                                                                

root
 |-- FlightDate: date (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)
 |-- AirTime: double (nullable = true)
 |-- CRSElapsedTime: double (nullable = true)
 |-- ActualElapsedTime: double (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- Operated_or_Branded_Code_Share_Partners: string (nullable = true)
 |

In [4]:
#Encode Boolean values as True=1 and False=0
sdf = sdf.withColumn("Cancelled_value", when(col("Cancelled") == True, 1.0).otherwise(0.0))
sdf = sdf.withColumn("Diverted_value", when(col("Diverted") == True, 1.0).otherwise(0.0))

In [5]:
# Create an indexer for the string based columns.
string_columns=["Airline","Origin","Dest"]
string_output=["AirlineIndex","OriginIndex","DestIndex"]
indexer = StringIndexer(inputCols=string_columns, outputCols=string_output, handleInvalid="keep")

In [6]:
# Create an encoder for the three indexes and the integer column.
indexed_columns=["AirlineIndex","OriginIndex","DestIndex","Year","Quarter","Month","DayofMonth","DayOfWeek"]
indexed_output=["AirlineVector","OriginVector","DestVector","YearVector","QuarterVector","MonthVector","DayofMonthVector","DayOfWeekVector"]
encoder = OneHotEncoder(inputCols=indexed_columns,
                        outputCols=indexed_output, dropLast=True, handleInvalid="keep")

In [7]:
# Create an assembler for the individual feature vectors and the float/double columns
vector_columns=["AirlineVector","OriginVector","DestVector","Cancelled_value","Diverted_value","YearVector","QuarterVector","MonthVector","DayofMonthVector","DayOfWeekVector","ArrDelayMinutes"]
assembler = VectorAssembler(inputCols=vector_columns, outputCol="features")

In [8]:
# Create a Ridge Regression Estimator
ridge_reg = LinearRegression(labelCol='DepDelayMinutes',  elasticNetParam=0, regParam=0.1)

In [9]:
# Create a regression evaluator (to get RMSE, R2, RME, etc.)
evaluator = RegressionEvaluator(labelCol='DepDelayMinutes')

In [10]:
# Create the pipeline Indexer
regression_pipe = Pipeline(stages=[indexer, encoder, assembler, ridge_reg])

In [12]:
transformed_sdf = regression_pipe.fit(sdf).transform(sdf)

                                                                                

In [13]:
# Review the transformed features
print("Transformed features")
transformed_sdf.select("Airline","Cancelled_value","Diverted_value","DepDelayMinutes","features").show(10, truncate=False)

Transformed features
+----------------------+---------------+--------------+---------------+--------------------------------------------------------------------------------------+
|Airline               |Cancelled_value|Diverted_value|DepDelayMinutes|features                                                                              |
+----------------------+---------------+--------------+---------------+--------------------------------------------------------------------------------------+
|Southwest Airlines Co.|0.0            |0.0           |0.0            |(2891,[0,31,428,2830,2836,2848,2878,2889],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])          |
|Southwest Airlines Co.|0.0            |0.0           |6.0            |(2891,[0,31,428,2830,2836,2848,2878,2889],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])          |
|Southwest Airlines Co.|0.0            |0.0           |22.0           |(2891,[0,31,428,2830,2836,2848,2878,2889],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])          |
|Southwest Airlines Co.|0

In [None]:
#Saving feature data to trusted foler
output_file_path="gs://my-bigdata-project-sj/trusted/trusted_flight_delay.parquet"
transformed_sdf.write.parquet(output_file_path)

In [19]:
#Take sample of data
sdf=sdf.sample(False, 0.01)
# Split the data into training and test sets
trainingData, testData = sdf.randomSplit([0.70, 0.3], seed=42)

In [15]:
# Create a grid to hold hyperparameters
grid = ParamGridBuilder()
grid=grid.addGrid(ridge_reg.regParam, [0.01, 0.1, 1])
grid = grid.addGrid(ridge_reg.elasticNetParam, [0, 0.5,1])

In [16]:
# Build the parameter grid
grid = grid.build()

In [17]:
# Create the CrossValidator using the hyperparameter grid
cv = CrossValidator(estimator=regression_pipe,
                    estimatorParamMaps=grid,
                    evaluator=evaluator,
                    numFolds=3)

In [18]:
# Train the models
all_models  = cv.fit(trainingData)

24/12/03 01:05:13 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [20]:
# Show the average performance over the three folds
print(f"Average metric {all_models.avgMetrics}")

Average metric [9.34824445911156, 9.342048534858444, 9.338060693528917, 9.348723725523813, 9.337842975146755, 9.34675806399755, 9.398792777732064, 9.417431256874302, 9.435117080376394]


In [21]:
# Get the best model from all of the models trained
bestModel = all_models.bestModel

In [22]:
# Use the model 'bestModel' to predict the test set
test_results = bestModel.transform(testData)

In [23]:
# Show the predicted flight delay
test_results.select("Airline","Origin","Dest","Cancelled_value","Diverted_value","DepDelayMinutes", "prediction").show(truncate=False)

[Stage 332:>                                                        (0 + 1) / 1]

+----------------------+------+----+---------------+--------------+---------------+--------------------+
|Airline               |Origin|Dest|Cancelled_value|Diverted_value|DepDelayMinutes|prediction          |
+----------------------+------+----+---------------+--------------+---------------+--------------------+
|Spirit Air Lines      |MSP   |LAS |0.0            |0.0           |0.0            |5.456554827920326   |
|SkyWest Airlines Inc. |JMS   |DVL |0.0            |0.0           |0.0            |-0.31558602479796166|
|Endeavor Air Inc.     |STL   |LGA |0.0            |0.0           |0.0            |10.359867457838737  |
|SkyWest Airlines Inc. |CVG   |ORD |0.0            |0.0           |0.0            |-0.31558602479796166|
|Southwest Airlines Co.|LAS   |DEN |0.0            |0.0           |0.0            |1.144276666180858   |
|Southwest Airlines Co.|MDW   |LGA |0.0            |0.0           |12.0           |1.671122488102112   |
|United Air Lines Inc. |BOS   |EWR |0.0            |0.0

                                                                                

In [24]:
# Calculate RMSE and R2
rmse = evaluator.evaluate(test_results, {evaluator.metricName:'rmse'})
r2 =evaluator.evaluate(test_results,{evaluator.metricName:'r2'})
print(f"RMSE: {rmse}  R-squared:{r2}")



RMSE: 8.087789221750366  R-squared:0.9610862005703247


                                                                                

In [25]:
model_path =  'gs://my-bigdata-project-sj/models/flight_delay_ridge_regression_model'
bestModel.write().overwrite().save(model_path)

                                                                                