In [None]:
# Installing pyspark
!pip install pyspark py4j

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 28 kB/s 
[?25hCollecting py4j
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[K     |████████████████████████████████| 200 kB 61.8 MB/s 
[?25h  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 14.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=3da28261e8fa0b1edeee4aae862fdcaf190a7a23dba1bb03b4b0f10770a526dd
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [None]:
%%bash
# Do not change or modify this cell
# Need to install pyspark
# if pyspark is already installed, will print a message indicating pyspark already installed
# pip install pyspark &> /dev/null

# Download the data files from github
# If the data file does not exist in the colab environment
data_file_1=NZ_Airfares.csv

if [[ ! -f ./${data_file_1} ]]; then 
   # download the data file from github and save it in this colab environment instance
   wget https://raw.githubusercontent.com/jasumonga17/New-Zealand-AirFare/main/NZ_Airfares.csv
fi

--2022-12-03 04:07:59--  https://raw.githubusercontent.com/jasumonga17/New-Zealand-AirFare/main/NZ_Airfares.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13271399 (13M) [text/plain]
Saving to: ‘NZ_Airfares.csv’

     0K .......... .......... .......... .......... ..........  0% 2.82M 4s
    50K .......... .......... .......... .......... ..........  0% 5.21M 3s
   100K .......... .......... .......... .......... ..........  1% 9.41M 3s
   150K .......... .......... .......... .......... ..........  1% 33.7M 2s
   200K .......... .......... .......... .......... ..........  1% 12.9M 2s
   250K .......... .......... .......... .......... ..........  2% 29.3M 2s
   300K .......... .......... .......... .......... ..........  2% 9.08M 2s
   350K ..........

In [None]:
# Importing the Necesaary libraries
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as fn
import matplotlib.pyplot as plt

In [None]:
# Initiating Spark Session
# appName is the name of the application, which will be displayed in the spark UI.
# getOrCreate() will either create an existing SparkSession or, if none exists, will create one.
spark=SparkSession.builder.appName('spark-intro').getOrCreate()
sc=spark.sparkContext

In [None]:
# Reading the CSV file
# inferSchema- is to understand automatically that the data is string or integer or double.
data=spark.read.csv('NZ_Airfares.csv',header=True,inferSchema=True)

In [None]:
data.show(5)

+-----------+------------+---------+------------+---------+--------+--------+-------+-------+---------------+------------+
|Travel Date|Dep. airport|Dep. time|Arr. airport|Arr. time|Duration|  Direct|Transit|Baggage|        Airline|Airfare(NZ$)|
+-----------+------------+---------+------------+---------+--------+--------+-------+-------+---------------+------------+
|    9/19/19|         AKL|  1:35 PM|         CHC|  3:00 PM|  1h 25m|(Direct)|   null|   null|        Jetstar|         111|
|    9/19/19|         AKL|  3:55 PM|         CHC|  5:20 PM|  1h 25m|(Direct)|   null|   null|        Jetstar|         111|
|    9/19/19|         AKL| 11:40 AM|         CHC|  1:05 PM|  1h 25m|(Direct)|   null|   null|        Jetstar|         132|
|    9/19/19|         AKL|  8:00 PM|         CHC|  9:25 PM|  1h 25m|(Direct)|   null|   null|        Jetstar|         132|
|    9/19/19|         AKL|  9:00 AM|         CHC| 10:25 AM|  1h 25m|(Direct)|   null|   null|Air New Zealand|         133|
+-----------+---

In [None]:
# Schema
# printSchema()-gives what kind of fields are in the dataset.
print("Schema:", data.printSchema())

root
 |-- Travel Date: string (nullable = true)
 |-- Dep. airport: string (nullable = true)
 |-- Dep. time: string (nullable = true)
 |-- Arr. airport: string (nullable = true)
 |-- Arr. time: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Direct: string (nullable = true)
 |-- Transit: string (nullable = true)
 |-- Baggage: string (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Airfare(NZ$): integer (nullable = true)

Schema: None


In [None]:
# Shape of DataFrame
print("Rows:",data.count(),"Columns:",len(data.columns))

Rows: 162833 Columns: 11


In [None]:
# Updated DataFrame
data_df=data.toPandas()

In [None]:
# Pandas DataFrame
data_df.head()

Unnamed: 0,Travel Date,Dep. airport,Dep. time,Arr. airport,Arr. time,Duration,Direct,Transit,Baggage,Airline,Airfare(NZ$)
0,9/19/19,AKL,1:35 PM,CHC,3:00 PM,1h 25m,(Direct),,,Jetstar,111
1,9/19/19,AKL,3:55 PM,CHC,5:20 PM,1h 25m,(Direct),,,Jetstar,111
2,9/19/19,AKL,11:40 AM,CHC,1:05 PM,1h 25m,(Direct),,,Jetstar,132
3,9/19/19,AKL,8:00 PM,CHC,9:25 PM,1h 25m,(Direct),,,Jetstar,132
4,9/19/19,AKL,9:00 AM,CHC,10:25 AM,1h 25m,(Direct),,,Air New Zealand,133


In [None]:
# Changing data type of Travel
import datetime 
import pandas as pd
dt=datetime.datetime.today()
data_df['Travel Date']=pd.to_datetime(data_df['Travel Date'])
data_df['Day']=data_df['Travel Date'].dt.day_name()
data_df['Month']=data_df['Travel Date'].dt.strftime('%B')
# New column Quarter 
data_df['Quarter']=data_df['Travel Date'].dt.quarter
# Direct
data_df['Direct'] = data_df['Direct'].str.strip('()')

In [None]:
# Spark data frame
data_spark_df=spark.createDataFrame(data_df)
data_spark_df.show(5)

+-------------------+------------+---------+------------+---------+--------+------+-------+-------+---------------+------------+--------+---------+-------+
|        Travel Date|Dep. airport|Dep. time|Arr. airport|Arr. time|Duration|Direct|Transit|Baggage|        Airline|Airfare(NZ$)|     Day|    Month|Quarter|
+-------------------+------------+---------+------------+---------+--------+------+-------+-------+---------------+------------+--------+---------+-------+
|2019-09-19 00:00:00|         AKL|  1:35 PM|         CHC|  3:00 PM|  1h 25m|Direct|   null|   null|        Jetstar|         111|Thursday|September|      3|
|2019-09-19 00:00:00|         AKL|  3:55 PM|         CHC|  5:20 PM|  1h 25m|Direct|   null|   null|        Jetstar|         111|Thursday|September|      3|
|2019-09-19 00:00:00|         AKL| 11:40 AM|         CHC|  1:05 PM|  1h 25m|Direct|   null|   null|        Jetstar|         132|Thursday|September|      3|
|2019-09-19 00:00:00|         AKL|  8:00 PM|         CHC|  9:25 

In [None]:
# Data Pre-Processing
from pyspark.sql import functions as fn
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType,BooleanType,DateType,LongType
from pyspark.sql.functions import *
from pyspark.sql.types import StructField
from pyspark.sql.functions import expr

In [None]:
# Changing Column Names
data_updated=data_spark_df.withColumnRenamed("Dep. airport","Departure_Airport").withColumnRenamed("Dep. time","Departure_Time").withColumnRenamed("Arr. airport","Arrival_Airport").withColumnRenamed("Arr. time","Arrival_Time").withColumnRenamed("Airfare(NZ$)","Airfare(NZ)").withColumnRenamed('Travel Date','Travel_Date')
data_updated.show(5)

+-------------------+-----------------+--------------+---------------+------------+--------+------+-------+-------+---------------+-----------+--------+---------+-------+
|        Travel_Date|Departure_Airport|Departure_Time|Arrival_Airport|Arrival_Time|Duration|Direct|Transit|Baggage|        Airline|Airfare(NZ)|     Day|    Month|Quarter|
+-------------------+-----------------+--------------+---------------+------------+--------+------+-------+-------+---------------+-----------+--------+---------+-------+
|2019-09-19 00:00:00|              AKL|       1:35 PM|            CHC|     3:00 PM|  1h 25m|Direct|   null|   null|        Jetstar|        111|Thursday|September|      3|
|2019-09-19 00:00:00|              AKL|       3:55 PM|            CHC|     5:20 PM|  1h 25m|Direct|   null|   null|        Jetstar|        111|Thursday|September|      3|
|2019-09-19 00:00:00|              AKL|      11:40 AM|            CHC|     1:05 PM|  1h 25m|Direct|   null|   null|        Jetstar|        132|Th

In [None]:
# Null Value check in the dataset
print("Schema:", data_updated.printSchema())

root
 |-- Travel_Date: timestamp (nullable = true)
 |-- Departure_Airport: string (nullable = true)
 |-- Departure_Time: string (nullable = true)
 |-- Arrival_Airport: string (nullable = true)
 |-- Arrival_Time: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Direct: string (nullable = true)
 |-- Transit: string (nullable = true)
 |-- Baggage: string (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Airfare(NZ): long (nullable = true)
 |-- Day: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Quarter: long (nullable = true)

Schema: None


In [None]:
#Checking for Null Values in all the columns
from pyspark.sql.functions import col,isnan, when, count
data_updated.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_updated.columns[1:]]
   ).show()


+-----------------+--------------+---------------+------------+--------+------+-------+-------+-------+-----------+---+-----+-------+
|Departure_Airport|Departure_Time|Arrival_Airport|Arrival_Time|Duration|Direct|Transit|Baggage|Airline|Airfare(NZ)|Day|Month|Quarter|
+-----------------+--------------+---------------+------------+--------+------+-------+-------+-------+-----------+---+-----+-------+
|               24|             0|             24|           5|       0|     0|  39756| 160522|      5|          0|  0|    0|      0|
+-----------------+--------------+---------------+------------+--------+------+-------+-------+-------+-----------+---+-----+-------+



In [None]:
# Dropping Records from Departure Airport and Arrival Airport
data_updated=data_updated.na.drop(subset=['Departure_Airport','Arrival_Airport','Arrival_Time'])

In [None]:
# Replacing Null Values in Transit, Baggage and Airline Column
data_updated=data_updated.na.fill('No Transit',['Transit']).na.fill('Not Defined',['Baggage']).na.fill('Other',['Airline'])

In [None]:
# Modifying the airport names
data_updated=data_updated.replace('CHC','ChristChurch').replace('WLG','Wellington').replace('AKL','Auckland').replace('ZQN','Queenstown').replace('NPL','New Plymouth Airport').replace('DUD','Dunedin').replace('NPE',"Hawke's").replace('NSN','Nelson').replace('PMR','Palmerston')

In [None]:
# Shape of DataFrame
print("Rows:",data_updated.count(),"Columns:",len(data_updated.columns))

Rows: 162804 Columns: 14


In [None]:
#Checking for Null Values in all the columns
from pyspark.sql.functions import col,isnan, when, count
data_updated.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_updated.columns[1:]]
   ).show()

+-----------------+--------------+---------------+------------+--------+------+-------+-------+-------+-----------+---+-----+-------+
|Departure_Airport|Departure_Time|Arrival_Airport|Arrival_Time|Duration|Direct|Transit|Baggage|Airline|Airfare(NZ)|Day|Month|Quarter|
+-----------------+--------------+---------------+------------+--------+------+-------+-------+-------+-----------+---+-----+-------+
|                0|             0|              0|           0|       0|     0|      0|      0|      0|          0|  0|    0|      0|
+-----------------+--------------+---------------+------------+--------+------+-------+-------+-------+-----------+---+-----+-------+



In [None]:
# Creating Average Fare column 
# If Avg Fare>Fare then 1 else 0
data_updated=data_updated.withColumn('Average_Fare',when(fn.lit(data_updated.approxQuantile('Airfare(NZ)',[0.5],0)[0])>col('Airfare(NZ)'),1).otherwise(0))


In [None]:
df=data_updated.toPandas()

In [None]:
# Converting into pandas DataFrame
import pandas as pd
from pyspark.sql.functions import sum,avg,max,mean

# Converting into Pandas Data Frame
mean_fare=data_updated.groupby('Airline').agg(fn.round(mean('Airfare(NZ)'),2).alias('Average_Fare'))

# Creating pandas dataframe
mean_fare_df=mean_fare.toPandas()
mean_fare_df=mean_fare_df.sort_values(by='Average_Fare',ascending=False)

In [None]:
import plotly.express as px
fig=px.bar(mean_fare_df,x='Airline',y='Average_Fare',color='Airline',text_auto=True)
fig.update_layout(title_text="Airline's Average Fare")
fig.show()

In [None]:
# Converting into Pandas Data Frame
direct_fare=data_updated.groupby('Airline','Direct').agg(fn.round(mean('Airfare(NZ)'),2).alias('Average_Fare'))

# Regex
from pyspark.sql.functions import regexp_replace

# Creating pandas dataframe
direct_fare_df=direct_fare.toPandas()
direct_fare_df=direct_fare_df.sort_values(by='Average_Fare',ascending=False)
direct_fare_df.head()


Unnamed: 0,Airline,Direct,Average_Fare
0,Air New Zealand,2 stops,585.98
6,Air New Zealand,3 stops,553.3
4,Sounds Air,2 stops,553.27
2,Air New Zealand,1 stop,415.51
1,Sounds Air,1 stop,356.34


In [None]:
# Direct vs Average Fare
fig = px.bar(direct_fare_df, x="Direct", y="Average_Fare", color="Airline",text_auto=True)
fig.update_layout(title_text="Direct vs Average Fare")
fig.show()

In [None]:
# Converting into Pandas Data Frame
month_fare=data_updated.groupby('Month').agg(fn.round(mean('Airfare(NZ)'),2).alias('Average_Fare'))

# Regex
from pyspark.sql.functions import regexp_replace

# Creating pandas dataframe
month_fare_df=month_fare.toPandas()
month_fare_df=month_fare_df.sort_values(by='Average_Fare',ascending=False)
month_fare_df.head()

Unnamed: 0,Month,Average_Fare
2,November,421.74
0,October,417.15
1,September,413.4
3,December,365.03


In [None]:
# Line Chart
# Average Fare vs Month
fig = px.line(month_fare_df, x="Month", y="Average_Fare",text='Average_Fare')
fig.update_traces(textposition="bottom right")
fig.update_layout(title_text="Month vs Average Fare")
fig.show()

In [None]:
# Day and Month

# Converting into Pandas Data Frame
dm_fare=data_updated.groupby('Day','Month').agg(fn.round(mean('Airfare(NZ)'),2).alias('Average_Fare'))

# Regex
from pyspark.sql.functions import regexp_replace

# Creating pandas dataframe
dm_fare_df=dm_fare.toPandas()
dm_fare_df=dm_fare_df.sort_values(by='Average_Fare',ascending=False)
dm_fare_df.head()

Unnamed: 0,Day,Month,Average_Fare
5,Saturday,September,484.5
8,Saturday,October,483.44
7,Sunday,October,480.74
27,Saturday,November,479.19
16,Sunday,November,469.11


In [None]:
# Line Chart
# Day vs Average Fare
fig = px.line(dm_fare_df, x="Day", y="Average_Fare",color='Month',text='Average_Fare',markers=True)
fig.update_traces(textposition="bottom right")
fig.update_layout(title_text="Day vs Average Fare")
fig.show()

In [None]:
# Pipeline
from pyspark.sql.functions import col, when
from pyspark.ml.pipeline import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression
from pyspark.ml import feature

# Creating Airfare_pipe
airfare_pipe=Pipeline(stages=[feature.StringIndexer(inputCols=('Departure_Airport','Arrival_Airport','Direct','Transit','Baggage','Airline','Day','Month'),
                                                    outputCols=('encoded_Departure_Airport','encoded_Arrival_Airport','encoded_Direct','encoded_Transit','encoded_Baggage','encoded_Airline','enoded_Day','encoded_Month'))])

In [None]:
# Fitting Pipeline
air_fit=airfare_pipe.fit(data_updated)

# Transform
data_updated=air_fit.transform(data_updated)

# Dropping Columns
data_updated = data_updated.drop('Travel_Date','Departure_Time','Arrival_Time','Duration','Departure_Airport','Arrival_Airport','Direct','Transit','Baggage','Airline','Day','Month')

# Arranging the columns
data_updated = data_updated.select('Quarter','encoded_Departure_Airport','encoded_Arrival_Airport', 'encoded_Direct','encoded_Transit','encoded_Airline','enoded_Day','encoded_Month','Average_Fare','Airfare(NZ)')

In [None]:
# Train-Test Split
train, validation, test = data_updated.randomSplit([0.6, 0.2, 0.2],seed=0)
train.show(5)

+-------+-------------------------+-----------------------+--------------+---------------+---------------+----------+-------------+------------+-----------+
|Quarter|encoded_Departure_Airport|encoded_Arrival_Airport|encoded_Direct|encoded_Transit|encoded_Airline|enoded_Day|encoded_Month|Average_Fare|Airfare(NZ)|
+-------+-------------------------+-----------------------+--------------+---------------+---------------+----------+-------------+------------+-----------+
|      3|                      0.0|                    0.0|           0.0|            2.0|            0.0|       0.0|          2.0|           1|        307|
|      3|                      0.0|                    0.0|           0.0|            2.0|            0.0|       1.0|          2.0|           1|        307|
|      3|                      0.0|                    0.0|           0.0|            2.0|            0.0|       1.0|          2.0|           1|        307|
|      3|                      0.0|                    0.0

In [None]:
# Vetcor Assembler
va=feature.VectorAssembler(inputCols=['Quarter','encoded_Departure_Airport','encoded_Arrival_Airport', 'encoded_Direct','encoded_Transit','encoded_Airline','enoded_Day','encoded_Month','Average_Fare'], outputCol='features')

In [None]:
# Standard Scaler
sc=feature.StandardScaler(withMean=True,inputCol='features',outputCol='sc_features')

**Linear Regression**

In [None]:
# Model Building

# Linear Regression
from pyspark.sql.functions import col, when
from pyspark.ml.pipeline import Pipeline, PipelineModel
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import feature

# Regression object
lr=LinearRegression(featuresCol='sc_features',labelCol='Airfare(NZ)')

# Pipeline
lr_pipeline=Pipeline(stages=[va,sc,lr])

# Fit on Train set
train_lr=lr_pipeline.fit(train)

# Validation Set
val_lr=train_lr.transform(validation)

# Test set
test_lr=train_lr.transform(test)

evaluator = RegressionEvaluator(labelCol='Airfare(NZ)',predictionCol='prediction')
lr_validation_mse=evaluator.evaluate(val_lr,{evaluator.metricName: "mse"})
print('MSE_LR_Validation:',lr_validation_mse)
lr_test_mse=evaluator.evaluate(test_lr,{evaluator.metricName: "mse"})
print('MSE_LR_Test:',lr_test_mse)
print('R2_LR:',evaluator.evaluate(val_lr,{evaluator.metricName: "r2"}))

MSE_LR_Validation: 11112.697940423024
MSE_LR_Test: 10971.035526181646
R2_LR: 0.6110991169929199


In [None]:
# 

**Random Forest Regressor**

In [None]:
# Random Forest Regressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Regression object
rf=RandomForestRegressor(featuresCol='sc_features',labelCol='Airfare(NZ)',maxBins=1221)

# Pipeline
rf_pipeline=Pipeline(stages=[va,sc,rf])

# Fit on Train set
train_rf=rf_pipeline.fit(train)

# Validation Set
val_rf=train_rf.transform(validation)

# Test set
test_rf=train_rf.transform(test)

evaluator = RegressionEvaluator(labelCol='Airfare(NZ)',predictionCol='prediction')
print('MSE_RF_Validation:',evaluator.evaluate(val_rf,{evaluator.metricName: "mse"}))
rf_test_mse=evaluator.evaluate(test_rf,{evaluator.metricName: "mse"})
print('MSE_LR_Test:',rf_test_mse)
print('R2_RF:',evaluator.evaluate(val_rf,{evaluator.metricName: "r2"}))

MSE_RF_Validation: 9487.656149909903
MSE_LR_Test: 9466.756141938326
R2_RF: 0.6679692119637458


**Gradient Boosting Regressor**

In [None]:
# GBT Regressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Regression object
gb=GBTRegressor(featuresCol='sc_features',labelCol='Airfare(NZ)',maxBins=1221)

# Pipeline
gb_pipeline=Pipeline(stages=[va,sc,gb])

# Fit on Train set
train_gb=gb_pipeline.fit(train)

# Validation Set
val_gb=train_gb.transform(validation)

# Test set
test_gb=train_gb.transform(test)

evaluator = RegressionEvaluator(labelCol='Airfare(NZ)',predictionCol='prediction')
print('MSE_GB_Validation:',evaluator.evaluate(val_gb,{evaluator.metricName: "mse"}))
gb_test_mse=evaluator.evaluate(test_gb,{evaluator.metricName: "mse"})
print('MSE_GB_Test:',gb_test_mse)
print('R2_GB:',evaluator.evaluate(val_gb,{evaluator.metricName: "r2"}))

MSE_GB_Validation: 7619.505151853937
MSE_GB_Test: 7523.448256659913
R2_GB: 0.7333471765794984


**Cross Validation-Random Forest Regressor**

In [None]:
# Cross-Validation Random Forest Regressor

from pyspark.ml.tuning import ParamGridBuilder
import numpy as np
from pyspark.ml.tuning import CrossValidator
paramGrid = ParamGridBuilder() \
.addGrid(rf.numTrees, [5,10]) \
.addGrid(rf.maxDepth, [5,10]) \
.build()
crossval = CrossValidator(estimator=rf_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol='Airfare(NZ)',predictionCol='prediction'),
                          numFolds=3)
cvModel=crossval.fit(train)
predictions=cvModel.transform(validation)
best=cvModel.bestModel
bestModel = best.stages[2]
print('numTrees - ', bestModel.getNumTrees)
print('maxDepth - ', bestModel.getOrDefault('maxDepth'))


numTrees -  10
maxDepth -  10


**Best Model-Random Forest**

In [None]:
# Best Model-Random Forest
rf_best=RandomForestRegressor(numTrees=15, maxDepth=15,labelCol='Airfare(NZ)',predictionCol='prediction',maxBins=1221)
best_rf_pipe=Pipeline(stages=[va,sc,rf_best])

# Train
train_rfe=best_rf_pipe.fit(train)

# Validation
val_rfe=train_rfe.transform(validation)

# Test Data
test_rfe=train_rfe.transform(test)

#MSE Scoring Metric
evaluator = RegressionEvaluator(labelCol='Airfare(NZ)',predictionCol='prediction')
rf_validation_mse=evaluator.evaluate(val_rfe,{evaluator.metricName: "mse"}) 
#rf_train_mse=evaluator.evaluate(train_rfe_transform,{evaluator.metricName: "mse"}) 
rf_test_mse=evaluator.evaluate(test_rfe,{evaluator.metricName: "mse"}) 
print('RF_Validation_MSE:',rf_validation_mse)
print('RF_Test_MSE:',rf_test_mse)
print('R2_LR:',evaluator.evaluate(val_rfe,{evaluator.metricName: "r2"}))

RF_Validation_MSE: 5812.5105767952045
RF_Test_MSE: 5750.200415383202
R2_LR: 0.7965849060306953


In [None]:
# Feature Importance
import pandas as pd
best_model=train_rfe.stages[-1]

#best_model.featureImportances.toArray()

rf_feature_importance=pd.DataFrame(list(zip(data_updated.columns, best_model.featureImportances.toArray())),
            columns = ['feature', 'importance']).sort_values('importance',ascending=False)

display(rf_feature_importance)

Unnamed: 0,feature,importance
8,Average_Fare,0.509239
4,encoded_Transit,0.169338
3,encoded_Direct,0.162429
2,encoded_Arrival_Airport,0.057467
6,enoded_Day,0.038395
1,encoded_Departure_Airport,0.029609
5,encoded_Airline,0.025178
7,encoded_Month,0.006937
0,Quarter,0.001409


**Cross Validation-GBT Regressor**

In [None]:
# Cross Validation-GBT Regressor
from pyspark.ml.tuning import ParamGridBuilder
import numpy as np
from pyspark.ml.tuning import CrossValidator
paramGrid_gb = ParamGridBuilder() \
.addGrid(gb.maxIter, [5,10]) \
.addGrid(gb.maxDepth, [5,10]) \
.build()
crossval_gbt = CrossValidator(estimator=gb_pipeline,
                          estimatorParamMaps=paramGrid_gb,
                          evaluator=RegressionEvaluator(labelCol='Airfare(NZ)',predictionCol='prediction'),
                          numFolds=3)
cvModel_gbt=crossval_gbt.fit(train)
predictions=cvModel_gbt.transform(validation)
best_gbt=cvModel_gbt.bestModel
bestModel_gbt = best_gbt.stages[-1]
print('MaxIter=',bestModel_gbt._java_obj.getMaxIter())
print('MaxDepth=',bestModel_gbt._java_obj.getMaxDepth())


MaxIter= 10
MaxDepth= 10


**Best Model-GB Regressor**

In [None]:
# Best Model-GB Regressor
gb_best=GBTRegressor(maxIter=10,maxDepth=10,labelCol='Airfare(NZ)',predictionCol='prediction',maxBins=1221)
best_gb_pipe=Pipeline(stages=[va,sc,gb_best])

# Train
train_gb=best_gb_pipe.fit(train)

# Validation
val_gb=train_gb.transform(validation)

# Test Data
test_gb=train_gb.transform(test)

#MSE Scoring Metric
evaluator = RegressionEvaluator(labelCol='Airfare(NZ)',predictionCol='prediction')
gbt_validation_mse=evaluator.evaluate(val_gb,{evaluator.metricName: "mse"}) 

#gbt_train_mse=evaluator.evaluate(train_gb_transform,{evaluator.metricName: "mse"}) 
gbt_test_mse=evaluator.evaluate(test_gb,{evaluator.metricName: "mse"}) 
#print(gbt_train_mse)
print('GB_Validation_MSE:',gbt_validation_mse)
print('GB_Test_MSE:',gbt_test_mse)
print('R2_GB:',evaluator.evaluate(val_gb,{evaluator.metricName: "r2"}))

GB_Validation_MSE: 6012.567430714048
GB_Test_MSE: 5888.388311126792
R2_GB: 0.7895837000627326


In [None]:
# Creating a dataframe
import pandas as pd
lr_rf_gbt_mse_compare=pd.DataFrame({'Model':['LR','RF','GBT'],
                                 'Validation MSE':[lr_validation_mse,rf_validation_mse,gbt_validation_mse],
                                 'Test MSE':[lr_test_mse,rf_test_mse,gbt_test_mse],
                                 'R-Squared-Validation':[evaluator.evaluate(val_lr,{evaluator.metricName: "r2"}),
                                              evaluator.evaluate(val_rf,{evaluator.metricName: "r2"}),
                                              evaluator.evaluate(val_gb,{evaluator.metricName: "r2"})]
                                    
                                })
display(lr_rf_gbt_mse_compare)

Unnamed: 0,Model,Validation MSE,Test MSE,R-Squared-Validation
0,LR,11112.69794,10971.035526,0.611099
1,RF,5812.510577,5750.200415,0.667969
2,GBT,6012.567431,5888.388311,0.789584
