## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [2]:
# File location and type
file_location = "/FileStore/tables/2008.csv"
file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2008,1,3,4,2003.0,1955,2211.0,2225,WN,335,N712SW,128.0,150,116.0,-14.0,8.0,IAD,TPA,810,4.0,8.0,0,,0,,,,,
2008,1,3,4,754.0,735,1002.0,1000,WN,3231,N772SW,128.0,145,113.0,2.0,19.0,IAD,TPA,810,5.0,10.0,0,,0,,,,,
2008,1,3,4,628.0,620,804.0,750,WN,448,N428WN,96.0,90,76.0,14.0,8.0,IND,BWI,515,3.0,17.0,0,,0,,,,,
2008,1,3,4,926.0,930,1054.0,1100,WN,1746,N612SW,88.0,90,78.0,-6.0,-4.0,IND,BWI,515,3.0,7.0,0,,0,,,,,
2008,1,3,4,1829.0,1755,1959.0,1925,WN,3920,N464WN,90.0,90,77.0,34.0,34.0,IND,BWI,515,3.0,10.0,0,,0,2.0,0.0,0.0,0.0,32.0
2008,1,3,4,1940.0,1915,2121.0,2110,WN,378,N726SW,101.0,115,87.0,11.0,25.0,IND,JAX,688,4.0,10.0,0,,0,,,,,
2008,1,3,4,1937.0,1830,2037.0,1940,WN,509,N763SW,240.0,250,230.0,57.0,67.0,IND,LAS,1591,3.0,7.0,0,,0,10.0,0.0,0.0,0.0,47.0
2008,1,3,4,1039.0,1040,1132.0,1150,WN,535,N428WN,233.0,250,219.0,-18.0,-1.0,IND,LAS,1591,7.0,7.0,0,,0,,,,,
2008,1,3,4,617.0,615,652.0,650,WN,11,N689SW,95.0,95,70.0,2.0,2.0,IND,MCI,451,6.0,19.0,0,,0,,,,,
2008,1,3,4,1620.0,1620,1639.0,1655,WN,810,N648SW,79.0,95,70.0,-16.0,0.0,IND,MCI,451,3.0,6.0,0,,0,,,,,


In [3]:
# File location and type
file_location = "/FileStore/tables/carriers__1_-6545c.csv"
file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
carriers = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)
#Adding Carrier description details to dataframe
carriers=carriers.withColumnRenamed("Code", "UniqueCarrier")
df = df.join(carriers, 'UniqueCarrier')
#display(carriers)
display(df)

UniqueCarrier,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,Description
WN,2008,1,3,4,2003.0,1955,2211.0,2225,335,N712SW,128.0,150,116.0,-14.0,8.0,IAD,TPA,810,4.0,8.0,0,,0,,,,,,Southwest Airlines Co.
WN,2008,1,3,4,754.0,735,1002.0,1000,3231,N772SW,128.0,145,113.0,2.0,19.0,IAD,TPA,810,5.0,10.0,0,,0,,,,,,Southwest Airlines Co.
WN,2008,1,3,4,628.0,620,804.0,750,448,N428WN,96.0,90,76.0,14.0,8.0,IND,BWI,515,3.0,17.0,0,,0,,,,,,Southwest Airlines Co.
WN,2008,1,3,4,926.0,930,1054.0,1100,1746,N612SW,88.0,90,78.0,-6.0,-4.0,IND,BWI,515,3.0,7.0,0,,0,,,,,,Southwest Airlines Co.
WN,2008,1,3,4,1829.0,1755,1959.0,1925,3920,N464WN,90.0,90,77.0,34.0,34.0,IND,BWI,515,3.0,10.0,0,,0,2.0,0.0,0.0,0.0,32.0,Southwest Airlines Co.
WN,2008,1,3,4,1940.0,1915,2121.0,2110,378,N726SW,101.0,115,87.0,11.0,25.0,IND,JAX,688,4.0,10.0,0,,0,,,,,,Southwest Airlines Co.
WN,2008,1,3,4,1937.0,1830,2037.0,1940,509,N763SW,240.0,250,230.0,57.0,67.0,IND,LAS,1591,3.0,7.0,0,,0,10.0,0.0,0.0,0.0,47.0,Southwest Airlines Co.
WN,2008,1,3,4,1039.0,1040,1132.0,1150,535,N428WN,233.0,250,219.0,-18.0,-1.0,IND,LAS,1591,7.0,7.0,0,,0,,,,,,Southwest Airlines Co.
WN,2008,1,3,4,617.0,615,652.0,650,11,N689SW,95.0,95,70.0,2.0,2.0,IND,MCI,451,6.0,19.0,0,,0,,,,,,Southwest Airlines Co.
WN,2008,1,3,4,1620.0,1620,1639.0,1655,810,N648SW,79.0,95,70.0,-16.0,0.0,IND,MCI,451,3.0,6.0,0,,0,,,,,,Southwest Airlines Co.


In [4]:
df.columns

In [5]:
df.select('ArrTime','DepTime','ArrDelay','DepDelay','Distance').describe().show()

In [6]:
#Total cancelled flights
Cancelled=df.filter("Cancelled == 1").count()
print(Cancelled)

In [7]:
#Total Flights 
Total=df.count()
#Percentage of flights cancelled
percentage= Cancelled/Total*100
print("Percentage of Flights cancelled in a year ", percentage)



In [8]:
#Which day of the week has more cancellations?
from pyspark.sql.functions import *
import pyspark.sql.functions as f
from pyspark.sql.functions import format_number
df_week_cancelled=df.select('DayOfWeek','Cancelled').groupBy('DayOfWeek').agg(f.count("Cancelled"),f.sum("Cancelled"))\
.withColumn("DayOfWeek",regexp_replace('DayOfWeek', '1', 'Monday'))\
.withColumn("DayOfWeek",regexp_replace('DayOfWeek', '2', 'Tuesday'))\
.withColumn("DayOfWeek",regexp_replace('DayOfWeek', '3', 'Wednesday'))\
.withColumn("DayOfWeek",regexp_replace('DayOfWeek', '4', 'Thursday'))\
.withColumn("DayOfWeek",regexp_replace('DayOfWeek', '5', 'Friday'))\
.withColumn("DayOfWeek",regexp_replace('DayOfWeek', '6', 'Saturday'))\
.withColumn("DayOfWeek",regexp_replace('DayOfWeek', '7', 'Sunday'))\
.withColumnRenamed('count(Cancelled)','scheduled')\
.withColumnRenamed('sum(Cancelled)','Times_cancelled')\
.withColumn('Percentage_cancelled',format_number(col('Times_cancelled')/col('scheduled')*100,2))\
.orderBy('Percentage_cancelled')
display(df_week_cancelled['DayOfWeek','Percentage_cancelled'])

DayOfWeek,Percentage_cancelled
Saturday,1.58
Thursday,1.73
Sunday,1.86
Wednesday,1.94
Monday,1.98
Tuesday,2.24
Friday,2.31


In [9]:
day=df_week_cancelled.toPandas()['DayOfWeek'].values.tolist()
percentage=df_week_cancelled.toPandas()['Percentage_cancelled'].values.tolist()
print(day,percentage)

In [10]:
import matplotlib.pyplot as plt
plt.bar(day,percentage,color='orange')
plt.xlabel('Day of the week')
plt.ylabel('Percentage Cancellations')
plt.show()
display()





In [11]:
#Flight number with more cancellations?
df_flightnum_cancelled=df.select('FlightNum','Cancelled').groupBy('FlightNum').agg(f.count("Cancelled"),f.sum("Cancelled"))\
.withColumnRenamed('count(Cancelled)','scheduled')\
.withColumnRenamed('sum(Cancelled)','Times_cancelled')\
.withColumn('Percentage_cancelled',format_number(col('Times_cancelled')/col('scheduled')*100,2))\
.orderBy('Percentage_cancelled', ascending=False)
df_flightnum_cancelled.show(n=50)

In [12]:
'''import pyspark.sql.functions as f
from pyspark.sql.window import Window
df_flightnum_cancelled.withColumn('Percentage_Cancelled',f.col('Times_Cancelled')/f.sum('Times_Cancelled').over(Window.partitionBy())).\
withColumn('Percentage_Cancelled',col('Percentage_Cancelled')*100).orderBy('Percentage_Cancelled',ascending=False).show(n=50)
'''

In [13]:
#Flight number with more cancellations?
df_carrier_cancelled=df.select('UniqueCarrier','Cancelled','Description').groupBy('UniqueCarrier','Description').agg(f.count("Cancelled"),f.sum("Cancelled"))\
.withColumnRenamed('count(Cancelled)','scheduled')\
.withColumnRenamed('sum(Cancelled)','Times_cancelled')\
.withColumn('Percentage_cancelled',format_number(col('Times_cancelled')/col('scheduled')*100,2))\
.orderBy('Percentage_cancelled', ascending=False)
df_carrier_cancelled.show(n=50)

In [14]:
#df.select('FlightNum','Cancelled','Description').show()

In [15]:
'''df_fcarrier_cancelled.withColumn('Percentage_Cancelled',f.col('Times_Cancelled')/f.sum('Times_Cancelled').over(Window.partitionBy())).\
withColumn('Percentage_Cancelled',col('Percentage_Cancelled')*100).orderBy('Percentage_Cancelled',ascending=False).show(n=50)'''

In [16]:
# Total records with arrival delays
Total=df.select('ArrDelay').count()
Delayed=df.select('ArrDelay').filter("ArrDelay> 0").count()
print(Delayed)

In [17]:
#Total records with no arrival delays
Ontime=df.select('ArrDelay').filter("ArrDelay<= 0").count()
print(Ontime)

In [18]:
print(Ontime/Total)

In [19]:
df_delay_Ontime = df.withColumn(
    'status',
    #f.when(f.col("ArrDelay").between(5, 10), '05-10')\
    f.when(f.col("ArrDelay")> 10, 'Delayed')\
      .otherwise('On time')).select('Month','status','ArrDelay')
display(df_delay_Ontime)

Month,status,ArrDelay
1,On time,-14.0
1,On time,2.0
1,Delayed,14.0
1,On time,-6.0
1,Delayed,34.0
1,Delayed,11.0
1,Delayed,57.0
1,On time,-18.0
1,On time,2.0
1,On time,-16.0


In [20]:
df_delvsot=df_delay_Ontime.groupBy('status').count()\
               .withColumnRenamed('count','Frequency')
print(df_delvsot.show())
display(df_delvsot)

status,Frequency
On time,5207374
Delayed,1802354


In [21]:
#using Pandas
status= df_delvsot.toPandas()['status'].values.tolist()
frequency=df_delvsot.toPandas()['Frequency'].values.tolist()
plt.bar(status,frequency,color='orange')
plt.xlabel('Status')
plt.ylabel('Frequency')
plt.show()
display()

In [22]:
df_avgdelay_month=df_delay_Ontime.select('ArrDelay','Month').filter("status == 'Delayed'").groupBy('Month').agg(f.avg("ArrDelay"))\
.withColumnRenamed('avg(ArrDelay)','AVG_Delay')\
.orderBy('AVG_Delay',ascending=False)
df_avgdelay_month.show()
display(df_avgdelay_month)


Month,AVG_Delay
12,56.27801608996271
7,55.54181525788889
6,53.542344849312194
8,51.829989091037056
2,51.437643597414834
3,49.73489744006061
1,49.62634558966222
4,45.78731315484697
11,45.5107056936648
5,44.87128855776104


In [23]:
#Using pandas
Months= df_avgdelay_month.toPandas()['Month'].values.tolist()
Delay=df_avgdelay_month.toPandas()['AVG_Delay'].values.tolist()
plt.bar(Months,Delay,color='orange')
plt.xlabel('Months')
plt.ylabel('Delay')
plt.show()
display()

In [24]:
df_delay_month=df_delay_Ontime.filter("status == 'Delayed'").groupBy('Month').count()\
.withColumnRenamed('count','Times_Delayed')\
.orderBy('Times_Delayed',ascending=False)
display(df_delay_month)


Month,Times_Delayed
12,192791
6,190460
3,184770
2,181497
1,175202
7,162919
4,147649
5,145496
8,142085
11,99760


In [25]:
#Using pandas
Month= df_delay_month.toPandas()['Month'].values.tolist()
frequency=df_delay_month.toPandas()['Times_Delayed'].values.tolist()
plt.bar(Month,frequency,color='orange')
plt.xlabel('Month')
plt.ylabel('Frequency')
plt.show()
display()

In [26]:
df_delay = df.withColumn(
    'DelayRange',
    #f.when(f.col("ArrDelay").between(5, 10), '05-10')\
    f.when(f.col("ArrDelay").between(11, 15), '10-15')
     .when(f.col("ArrDelay").between(16, 20), '15-20')\
     .when(f.col("ArrDelay").between(21, 25), '20-25')\
     .when(f.col("ArrDelay").between(25, 30), '25-30')\
     .when(f.col("ArrDelay").between(31, 35), '30-35')\
     .when(f.col("ArrDelay").between(36,40), '35-40')\
     .when(f.col("ArrDelay").between(41, 45), '40-45')\
     .when(f.col("ArrDelay").between(46,50), '45-50')\
     .when(f.col("ArrDelay").between(51,55), '51-55')\
     .when(f.col("ArrDelay").between(56,60), '55-60')\
     .when(f.col("ArrDelay").between(61, 65), '60-65')\
     .when(f.col("ArrDelay").between(66, 70), '65-70')\
     .when(f.col("ArrDelay").between(71, 75), '70-75')\
     .when(f.col("ArrDelay").between(75, 80), '75-80')\
     .when(f.col("ArrDelay").between(81, 85), '80-85')\
     .when(f.col("ArrDelay").between(85, 90), '85-90')\
     .when(f.col("ArrDelay").between(91, 95), '90-95')\
     .when(f.col("ArrDelay").between(96 ,100), '95-100')
     .when(f.col("ArrDelay")>105, '>105')\
    .otherwise('On time')
)
display(df_delay)

UniqueCarrier,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,Description,DelayRange
WN,2008,1,3,4,2003.0,1955,2211.0,2225,335,N712SW,128.0,150,116.0,-14.0,8.0,IAD,TPA,810,4.0,8.0,0,,0,,,,,,Southwest Airlines Co.,On time
WN,2008,1,3,4,754.0,735,1002.0,1000,3231,N772SW,128.0,145,113.0,2.0,19.0,IAD,TPA,810,5.0,10.0,0,,0,,,,,,Southwest Airlines Co.,On time
WN,2008,1,3,4,628.0,620,804.0,750,448,N428WN,96.0,90,76.0,14.0,8.0,IND,BWI,515,3.0,17.0,0,,0,,,,,,Southwest Airlines Co.,10-15
WN,2008,1,3,4,926.0,930,1054.0,1100,1746,N612SW,88.0,90,78.0,-6.0,-4.0,IND,BWI,515,3.0,7.0,0,,0,,,,,,Southwest Airlines Co.,On time
WN,2008,1,3,4,1829.0,1755,1959.0,1925,3920,N464WN,90.0,90,77.0,34.0,34.0,IND,BWI,515,3.0,10.0,0,,0,2.0,0.0,0.0,0.0,32.0,Southwest Airlines Co.,30-35
WN,2008,1,3,4,1940.0,1915,2121.0,2110,378,N726SW,101.0,115,87.0,11.0,25.0,IND,JAX,688,4.0,10.0,0,,0,,,,,,Southwest Airlines Co.,10-15
WN,2008,1,3,4,1937.0,1830,2037.0,1940,509,N763SW,240.0,250,230.0,57.0,67.0,IND,LAS,1591,3.0,7.0,0,,0,10.0,0.0,0.0,0.0,47.0,Southwest Airlines Co.,55-60
WN,2008,1,3,4,1039.0,1040,1132.0,1150,535,N428WN,233.0,250,219.0,-18.0,-1.0,IND,LAS,1591,7.0,7.0,0,,0,,,,,,Southwest Airlines Co.,On time
WN,2008,1,3,4,617.0,615,652.0,650,11,N689SW,95.0,95,70.0,2.0,2.0,IND,MCI,451,6.0,19.0,0,,0,,,,,,Southwest Airlines Co.,On time
WN,2008,1,3,4,1620.0,1620,1639.0,1655,810,N648SW,79.0,95,70.0,-16.0,0.0,IND,MCI,451,3.0,6.0,0,,0,,,,,,Southwest Airlines Co.,On time


In [27]:
#x= df_test.toPandas()['DelayRange'].values.tolist()

In [28]:
df_delay_frequency=df_delay.select('DelayRange').filter("DelayRange !='On time'").groupBy('DelayRange').agg(f.count("DelayRange")).\
withColumnRenamed('count(DelayRange)','Frequency')\
.orderBy('DelayRange')
df_delay_frequency.show()

In [29]:
x1= df_delay_frequency.toPandas()['DelayRange'].values.tolist()
y1=df_delay_frequency.toPandas()['Frequency'].values.tolist()

In [30]:
plt.bar(x1,y1,color='orange')
plt.xlabel('Delay Range')
plt.ylabel('Frequency')
plt.show()
display()





In [31]:
from pyspark.ml.regression import LinearRegression

In [32]:
df_model=df.select('UniqueCarrier','Dest','Origin','Month','DayOfWeek','ArrTime','Distance','DepDelay')
df_model=df_model.filter("DepDelay>0")

In [33]:
df_model.printSchema()

In [34]:
from pyspark.sql.types import IntegerType
# Converting string objects to int
df_model = df_model.withColumn("DepDelay", df_model["DepDelay"].cast(IntegerType())).\
                 withColumn("ArrTime", df_model["ArrTime"].cast(IntegerType()))
df_model.printSchema()

In [35]:
df_model.describe().show()

In [36]:
display(df_model)

UniqueCarrier,Dest,Origin,Month,DayOfWeek,ArrTime,Distance,DepDelay
WN,TPA,IAD,1,4,2211,810,8
WN,TPA,IAD,1,4,1002,810,19
WN,BWI,IND,1,4,804,515,8
WN,BWI,IND,1,4,1959,515,34
WN,JAX,IND,1,4,2121,688,25
WN,LAS,IND,1,4,2037,1591,67
WN,MCI,IND,1,4,652,451,2
WN,MCO,IND,1,4,916,828,6
WN,MCO,IND,1,4,1845,828,94
WN,MDW,IND,1,4,1651,162,2


In [37]:
df_model=df_model.dropna()

In [38]:
#Finding Distinct Categories in each column
print(df_model.select('UniqueCarrier').distinct().count())
print(df_model.select('Dest').distinct().count())
print(df_model.select('Origin').distinct().count())

In [39]:
from pyspark.ml.feature import VectorAssembler,StringIndexer,StandardScaler
from pyspark.ml import Pipeline

In [40]:
# creating categorical variables
UniqueCarrier_indexer = StringIndexer(inputCol='UniqueCarrier',outputCol='UniqueCarrier_index',handleInvalid='keep')
Dest_indexer = StringIndexer(inputCol='Dest',outputCol='Dest_index',handleInvalid='keep')
Origin_indexer = StringIndexer(inputCol='Origin',outputCol='Origin_index',handleInvalid='keep')

In [41]:
# Assembler for creating input feature vector
assembler = VectorAssembler(inputCols=['UniqueCarrier_index','Dest_index','Origin_index','Month','ArrTime','Distance','DayOfWeek'],
                            outputCol="features")

In [42]:
# Linear regression model with prediction label as 'Depdelay'
lr = LinearRegression(labelCol='DepDelay')

In [43]:

pipe = Pipeline(stages=[UniqueCarrier_indexer,Dest_indexer,
                        Origin_indexer,assembler])

In [44]:
pre_model=pipe.fit(df_model)

In [45]:
preprocessed_data = pre_model.transform(df_model)

In [46]:
preprocessed_data.show()

df_vector =preprocessed_data.select('features')
vector_col = "features"

In [47]:
#Finding Correlations
print(preprocessed_data.stat.corr('DepDelay','Month'))
print(preprocessed_data.stat.corr('DepDelay','DayOfWeek'))
print(preprocessed_data.stat.corr('DepDelay','UniqueCarrier_index'))
print(preprocessed_data.stat.corr('DepDelay','Dest_index'))
print(preprocessed_data.stat.corr('DepDelay','Origin_index'))
print(preprocessed_data.stat.corr('DepDelay','ArrTime'))
print(preprocessed_data.stat.corr('DepDelay','Distance'))



In [48]:
'''from pyspark.ml.stat import Correlation
matrix = Correlation.corr(df_vector,vector_col)'''

In [49]:
'''matrix.collect()[0]["pearson({})".format(vector_col)].values'''

In [50]:
train_data,test_data=preprocessed_data.randomSplit([0.8,0.2])

In [51]:
lrModel = lr.fit(train_data)

In [52]:
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

In [53]:
predicted=lrModel.transform(test_data)

In [54]:
predicted.show()

In [55]:
  test_results = lrModel.evaluate(test_data)

In [56]:
test_results.residuals.show()

In [57]:
print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))


In [58]:
# It can be seen that RMSE is very high,hence the model doesn't perform that well for predicting delays.So improving the model using more iterations and onehotencoder in the next step

In [59]:
from pyspark.ml.feature import OneHotEncoderEstimator
data_encoder = OneHotEncoderEstimator(inputCols=['UniqueCarrier_index','Dest_index','Origin_index'],
                                      outputCols=['UniqueCarrier_vec','Dest_vec','Origin_vec',],
                                      handleInvalid='keep')
assembler_enc = VectorAssembler(inputCols=['UniqueCarrier_vec','Dest_vec','Origin_vec','Month','ArrTime','Distance','DayOfWeek'],
                           outputCol="features")

#assembler_enc = VectorAssembler(inputCols=['UniqueCarrier_index','Dest_index','Origin_index'],
 #                        outputCol="features")

In [60]:
#Shuffling the data set
df_model = df_model.orderBy(f.rand(seed=123))
df_model.show(n=10)

In [61]:
# Filtering the dataset to include only Delays less than 200 mins for removing the outliers
df_model=df_model.filter("DepDelay<200")
df_model.describe().show()

In [62]:
pipe_encoder = Pipeline(stages=[UniqueCarrier_indexer,Dest_indexer,
                        Origin_indexer,data_encoder,assembler_enc])
pre_model=pipe_encoder.fit(df_model)
preprocessed_data = pre_model.transform(df_model)

In [63]:
'''from pyspark.ml.feature import PolynomialExpansion
polyExpansion = PolynomialExpansion(degree=2, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(preprocessed_data)
polyDF.select('polyFeatures').show()'''

In [64]:
#Linear Regression model with 100 iterations and regularization parameter set to 0.8
lr = LinearRegression(labelCol='DepDelay',maxIter=100, regParam=0.8, elasticNetParam=0.8)

In [65]:
train_data,test_data=preprocessed_data.randomSplit([0.8,0.2])
lrModel = lr.fit(train_data)
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

In [66]:
predicted=lrModel.transform(test_data)
predicted.select('DepDelay','prediction').show()

In [67]:
#Finding residuals
test_results = lrModel.evaluate(test_data)
test_results.residuals.show()

In [68]:

print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))

In [69]:
#It can be seen that RMSE improved compared to last model. Could be because of iterations set to 100, or removing the outliers and onehot encoder
# In the next step comparing the results with Decision tree model

In [70]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [71]:
train_data,test_data=preprocessed_data.randomSplit([0.8,0.2])

In [72]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'DepDelay')
dt_model = dt.fit(train_data)
dt_predictions = dt_model.transform(test_data)
dt_evaluator = RegressionEvaluator(
    labelCol="DepDelay", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [73]:
# Decision tree performed better than linear regression models above, however RMSE improved only by little amount.