In [None]:
######  SPARK ASSIGNMENT - P2822003 - VRETTEAS STYLIANOS ######

## PREPARATION  

# load basic packages
# findspark.init() must run BEFORE pyspark is imported: it locates the Spark
# installation and adds its Python libraries to sys.path so that the pyspark
# imports below can resolve.
import findspark
findspark.init()

from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F
# from pyspark.sql import Window

spark = SparkSession.builder.appName("test").getOrCreate()

# load data
# header: the first CSV line holds the column names.
# inferSchema: Spark makes an extra pass over the file to guess column types
# (DEP_DELAY is used as the numeric regression label later on).
flights_data = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("671009038_T_ONTIME_REPORTING.csv")
)

In [None]:
# count flights_data last check
# Deduplicate once and reuse that result: dropDuplicates() is a full shuffle,
# and the deduplicated row count (n2) is also the final row count.
n1 = flights_data.count()
print("number of rows all data:", n1)

flights_data_dedup = flights_data.dropDuplicates()
n2 = flights_data_dedup.count()
print("number of rows after deleting duplicates:", n2)

n3 = n1 - n2
print("duplicate_data:", n3)

# drop duplicates
flights_data = flights_data_dedup
print("number of rows final:", n2)


In [None]:
## TASK 3

# load packages
import pyspark.mllib
import pyspark.mllib.regression
from pyspark.ml.feature import StringIndexer

In [None]:
# create df3: keep only the columns Task 3 uses, then drop rows without a
# departure delay (DEP_DELAY is the regression label further down).
# The filter is applied to the *selected* frame so the column selection is kept.
df3 = (
    flights_data
    .select("DEP_DELAY", "ORIGIN", "CARRIER", "DEP_TIME")
    .filter(F.col("DEP_DELAY").isNotNull())
)

In [None]:
# Number of distinct ORIGIN airports in df3 (the recorded run reported 360).
distinct_origins = df3.select("ORIGIN").distinct()
distinct_origins.count()


In [None]:
# Number of distinct CARRIER codes in df3 (the recorded run reported 17).
distinct_carriers = df3.select("CARRIER").distinct()
distinct_carriers.count()

In [None]:
# check types: list of (column name, Spark type string) pairs for df3
df3.dtypes

In [None]:
# Convert DEP_TIME to a string column first (the next cells re-parse it and
# zero-pad it).
dep_time_as_str = F.col("DEP_TIME").cast("string")
df3 = df3.withColumn("DEP_TIME", dep_time_as_str)

In [None]:
# converted DEP_TIME: confirm it is now listed as 'string'
df3.dtypes

In [None]:
# Left-pad DEP_TIME with zeros to four characters (e.g. 545 -> "0545") so the
# first two characters can be taken as the hour.
# NOTE(review): a null DEP_TIME appears to be formatted as the text "null" by
# format_string; this is the "nu" value handled a few cells below.
padded_dep_time = F.format_string("%04d", F.col("DEP_TIME").cast("int"))
df3 = df3.withColumn("DEP_TIME", padded_dep_time)
df3.show(10)

In [None]:
# check dtypes: format_string already returns a string column, so DEP_TIME
# needs no further cast. Assert that instead of re-casting.
assert dict(df3.dtypes)["DEP_TIME"] == "string", "DEP_TIME should be a string column"
print(df3.dtypes)

In [None]:
# Create DEP_TIME_HOUR from the first two characters of the zero-padded
# DEP_TIME (presumably HHMM clock times, so this yields "HH").
hour_prefix = F.substring("DEP_TIME", 1, 2)
df3 = df3.withColumn("DEP_TIME_HOUR", hour_prefix)
df3.show()

df3.select("DEP_TIME_HOUR").distinct().count()


In [None]:
# List the distinct hour values; the "nu" value (left over from a null
# DEP_TIME) shows up here.
hour_values = df3.select("DEP_TIME_HOUR").distinct()
hour_values.show()


In [None]:
# Replace the "nu" value with "cancelled". "nu" is what remains of a DEP_TIME
# that was formatted as "null" and cut to two characters.
# An exact-value comparison is used rather than regexp_replace: regexp_replace
# would rewrite any regex match *inside* a value, not only the value "nu".
df3 = df3.withColumn(
    "DEP_TIME_HOUR",
    F.when(F.col("DEP_TIME_HOUR") == "nu", "cancelled")
     .otherwise(F.col("DEP_TIME_HOUR")),
)

df3.select("DEP_TIME_HOUR").distinct().show()

In [None]:
# check outliers: number of flights per ORIGIN airport, smallest counts first
df_outliers3 = df3.select("ORIGIN")
df_outliers_group3 = df_outliers3.groupBy("ORIGIN").count()
df_outliers_group3.orderBy(F.col("count").asc()).show()

In [None]:
# CHECK OUTLIERS in the number of flights: the exact (relativeError=0.0) 1%
# quantile of flights per airport. It is presumably used to pick the
# low-traffic airports removed in the next cell.
outliers3 = df_outliers_group3.approxQuantile(
    col="count",
    probabilities=[0.01],
    relativeError=0.0,
)
outliers3


In [None]:
# filter df3 (remove the outliers): drop the low-traffic ORIGIN airports.
# NOTE(review): this list is hard-coded. Presumably these are the airports whose
# flight count fell below the 1% quantile computed above; re-check it if the
# input data changes.
OUTLIER_ORIGINS = ["AKN", "PGV", "GST", "DLG"]
df3 = df3.filter(~F.col("ORIGIN").isin(OUTLIER_ORIGINS))
df3.count()


In [None]:
## import Linear Regression packages 

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.mllib.linalg import DenseVector
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
#from pyspark.ml.feature import OneHotEncoderEstimator # not used 
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline


In [None]:
# string indexer ORIGIN: map each airport code to a numeric category index
indexer = StringIndexer(inputCol="ORIGIN", outputCol="ORIGIN_INDEXED")


In [None]:
# one hot encoder ORIGIN; dropLast=False keeps a vector slot for every category
encoder = OneHotEncoder(
    inputCols=["ORIGIN_INDEXED"],
    outputCols=["ORIGIN_ENCODED"],
    dropLast=False,
)


In [None]:
# string indexer CARRIER: map each carrier code to a numeric category index
indexer2 = StringIndexer(inputCol="CARRIER", outputCol="CARRIER_INDEXED")

In [None]:
# one hot encoder CARRIER; dropLast=False keeps a vector slot for every category
encoder2 = OneHotEncoder(
    inputCols=["CARRIER_INDEXED"],
    outputCols=["CARRIER_ENCODED"],
    dropLast=False,
)

In [None]:
# string indexer DEP_TIME_HOUR: treat each hour (and "cancelled") as a category
indexer3 = StringIndexer(inputCol="DEP_TIME_HOUR", outputCol="DEP_TIME_HOUR_INDEXED")

In [None]:
# one hot encoder DEP_TIME_HOUR; dropLast=False keeps a vector slot for every category
encoder3 = OneHotEncoder(
    inputCols=["DEP_TIME_HOUR_INDEXED"],
    outputCols=["DEP_TIME_HOUR_ENCODED"],
    dropLast=False,
)


In [None]:
# vector_assembler: concatenate the three one-hot vectors into one FEATURES column
vector_assembler = VectorAssembler(
    inputCols=["ORIGIN_ENCODED", "CARRIER_ENCODED", "DEP_TIME_HOUR_ENCODED"],
    outputCol="FEATURES",
)


In [None]:
# Build the feature pipeline: index and one-hot encode ORIGIN, CARRIER and
# DEP_TIME_HOUR, then assemble the encodings into a single FEATURES vector.
pipe = Pipeline(stages=[
    indexer, encoder,
    indexer2, encoder2,
    indexer3, encoder3,
    vector_assembler,
])

In [None]:
 # Fit and transform the data
pipeline_model = pipe.fit(df3)  # NOTE(review): fitted on all of df3, i.e. before the train/test split
piped_data = pipeline_model.transform(df3)

In [None]:
# Split the data into training and test sets (70/30 split).
# A fixed seed makes the split, and therefore every metric reported below,
# repeatable across runs on the same input data.
SPLIT_SEED = 42
training, test = piped_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)


In [None]:
# linear model: elastic-net regularized regression of DEP_DELAY on FEATURES.
# regParam=0.7 sets the overall penalty strength; elasticNetParam=0.8 mixes the
# penalty as 80% L1 / 20% L2.
# NOTE(review): the encoders use dropLast=False, so each one-hot group sums to 1
# and is collinear with the intercept. The penalty keeps the fit solvable, but
# individual coefficients are not uniquely interpretable.
lr = LinearRegression(
    featuresCol="FEATURES",
    labelCol="DEP_DELAY",
    regParam=0.7,
    elasticNetParam=0.8,
)

In [None]:
# Fit the model on the training split only (test is held out for evaluation below)
lrModel = lr.fit(training)


In [None]:
# Training-set summary of the fitted model and its RMSE (root mean squared
# error). The RMSE on the test split is computed at the end of the notebook.
summary = lrModel.summary
training_rmse = summary.rootMeanSquaredError
training_rmse

In [None]:
# predictions on the held-out test split - show first 10
predictions = lrModel.transform(dataset=test)
predictions.show(n=10)

In [None]:
# actual (DEP_DELAY) vs fitted (prediction) values, next to the categorical inputs
comparison_cols = ["DEP_DELAY", "ORIGIN", "CARRIER", "DEP_TIME_HOUR", "prediction"]
act_fit = predictions.select(*comparison_cols)

act_fit.show(10)

In [None]:
# Model evaluation on test data: apply the model to the test split and show the
# predictions next to the label and the feature vector.
lr_predictions = lrModel.transform(test)
preview_cols = ["prediction", "DEP_DELAY", "FEATURES"]
lr_predictions.select(preview_cols).show(5)

In [None]:
# R^2 of the model's predictions on the test split
lr_evaluator = RegressionEvaluator(
    labelCol="DEP_DELAY",
    predictionCol="prediction",
    metricName="r2",
)
test_r2 = lr_evaluator.evaluate(lr_predictions)
print("R Squared (R2) on test data = %g" % test_r2)


In [None]:
# RMSE on the held-out test split, via the fitted model's own evaluate()
test_result = lrModel.evaluate(test)
test_rmse = test_result.rootMeanSquaredError
print("Root Mean Squared Error (RMSE) on test data = %g" % test_rmse)