# Import libraries

In [1]:
from tabulate import tabulate
import sys, os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import col, isnan, when, count, concat, lit, substring, udf, desc
import pyspark.sql.functions as F

from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator



**Arguments from command line**

In [2]:
print("\nArguments passed:", end = " ")
for i in range(1, len(sys.argv)):
    print(sys.argv[i], end = " ")


Arguments passed: -f C:\Users\nerim\AppData\Roaming\jupyter\runtime\kernel-105f4721-4114-4fd1-a086-94d2b0aad1bf.json 

*python app.py --path ./Data*

In [3]:
if '--path' in sys.argv:
    dir_path = sys.argv[sys.argv.index('--path')+1]
else:
    print('Add the argument "--path" followed by the address where the data is located.')
    exit(-1)

Add the argument "--path" followed by the address where the data is located.


**We create our app**

In [5]:
spark = SparkSession.builder.appName("BigDataApp").getOrCreate()
spark.sparkContext

# LOADING THE DATA

**Create a schema for the DataFrame**

In [6]:
schema = StructType([
    StructField("Year",IntegerType(),nullable=True),
    StructField("Month",IntegerType(),nullable=True),
    StructField("DayofMonth",IntegerType(),nullable=True),
    StructField("DayOfWeek",IntegerType(),nullable=True),
    StructField("DepTime",IntegerType(),nullable=True),
    StructField("CRSDepTime",IntegerType(),nullable=True),
    StructField("ArrTime",IntegerType(),nullable=True),
    StructField("CRSArrTime",IntegerType(),nullable=True),
    StructField("UniqueCarrier",StringType(),nullable=True),
    StructField("FlightNum",IntegerType(),nullable=True),
    StructField("TailNum",StringType(),nullable=True),
    StructField("ActualElapsedTime",IntegerType(),nullable=True),
    StructField("CRSElapsedTime",IntegerType(),nullable=True),
    StructField("AirTime",IntegerType(),nullable=True),
    StructField("ArrDelay",IntegerType(),nullable=True),
    StructField("DepDelay",IntegerType(),nullable=True),
    StructField("Origin",StringType(),nullable=True),
    StructField("Dest",StringType(),nullable=True),
    StructField("Distance",IntegerType(),nullable=True),
    StructField("TaxiIn",IntegerType(),nullable=True),
    StructField("TaxiOut",IntegerType(),nullable=True),
    StructField("Cancelled",IntegerType(),nullable=True),
    StructField("CancellationCode",StringType(),nullable=True),
    StructField("Diverted",IntegerType(),nullable=True),
    StructField("CarrierDelay",IntegerType(),nullable=True),
    StructField("WeatherDelay",IntegerType(),nullable=True),
    StructField("NASDelay",IntegerType(),nullable=True),
    StructField("SecurityDelay",IntegerType(),nullable=True),
    StructField("LateAircraftDelay",IntegerType(),nullable=True)
])

**Load data into DataFrame**

In [7]:
arg = './src/main/resources/data/2008.csv.bz2' #'./resources/data/2008.csv.bz2'
dir_path = './src/main/resources/data' #'./resources/data'

if dir_path[-1] != '/':
    dir_path+='/'

In [8]:
files_path = []
for path in os.listdir(dir_path):
    # check if current path is a file
    if os.path.isfile(os.path.join(dir_path, path)) and path.endswith(".csv.bz2"):
        files_path.append(dir_path+path)
        
files_path#df1.union(df2)

['./src/main/resources/data/2008.csv.bz2']

In [9]:
df = spark.read.csv(path=files_path[0], schema=schema, header=True)

for f in files_path[1:]:
    df = df.union(spark.read.csv(path=f, schema=schema, header=True))
display(df.take(5))

[Row(Year=2008, Month=1, DayofMonth=3, DayOfWeek=4, DepTime=1343, CRSDepTime=1325, ArrTime=1451, CRSArrTime=1435, UniqueCarrier='WN', FlightNum=588, TailNum='N240WN', ActualElapsedTime=68, CRSElapsedTime=70, AirTime=55, ArrDelay=16, DepDelay=18, Origin='HOU', Dest='LIT', Distance=393, TaxiIn=4, TaxiOut=9, Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=16, WeatherDelay=0, NASDelay=0, SecurityDelay=0, LateAircraftDelay=0),
 Row(Year=2008, Month=1, DayofMonth=3, DayOfWeek=4, DepTime=1125, CRSDepTime=1120, ArrTime=1247, CRSArrTime=1245, UniqueCarrier='WN', FlightNum=1343, TailNum='N523SW', ActualElapsedTime=82, CRSElapsedTime=85, AirTime=71, ArrDelay=2, DepDelay=5, Origin='HOU', Dest='MAF', Distance=441, TaxiIn=3, TaxiOut=8, Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=None, WeatherDelay=None, NASDelay=None, SecurityDelay=None, LateAircraftDelay=None),
 Row(Year=2008, Month=1, DayofMonth=3, DayOfWeek=4, DepTime=2009, CRSDepTime=2015, ArrTime=2136, CRSArrTime=2

**Remove forbidden variables**

In [10]:
df = df.drop("ArrTime").drop("ActualElapsedTime"
        ).drop("AirTime").drop("TaxiIn").drop("Diverted"
        ).drop("CarrierDelay").drop("WeatherDelay").drop("NASDelay"
        ).drop("SecurityDelay").drop("LateAircraftDelay")

In [11]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)



In [12]:
df.show(5, False)

+----+-----+----------+---------+-------+----------+----------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+---------+----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|CRSElapsedTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiOut|Cancelled|CancellationCode|
+----+-----+----------+---------+-------+----------+----------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+---------+----------------+
|2008|1    |3         |4        |1343   |1325      |1435      |WN           |588      |N240WN |70            |16      |18      |HOU   |LIT |393     |9      |0        |null            |
|2008|1    |3         |4        |1125   |1120      |1245      |WN           |1343     |N523SW |85            |2       |5       |HOU   |MAF |441     |8      |0        |null            |
|2008|1    |3         |4        |2009   |2015      |2140      |WN          

# PROCESSING THE DATA

In [13]:
(df.count(), len(df.columns))

(2389217, 19)

## Unique values of the variables

In [14]:
for c in df.columns:
    print(df.groupBy(c).count().show())

+----+-------+
|Year|  count|
+----+-------+
|2008|2389217|
+----+-------+

None
+-----+------+
|Month| count|
+-----+------+
|    1|605765|
|    2|569236|
|    3|616090|
|    4|598126|
+-----+------+

None
+----------+-----+
|DayofMonth|count|
+----------+-----+
|        31|40905|
|        28|81974|
|        26|74226|
|        27|79527|
|        12|73690|
|        22|77730|
|         1|76749|
|        13|79091|
|         6|79879|
|        16|76898|
|         3|81108|
|        20|79565|
|         5|75303|
|        19|74270|
|        15|77716|
|         9|76023|
|        17|80508|
|         4|82101|
|         8|77320|
|        23|76860|
+----------+-----+
only showing top 20 rows

None
+---------+------+
|DayOfWeek| count|
+---------+------+
|        1|347984|
|        6|288097|
|        3|365560|
|        5|350566|
|        4|349831|
|        7|328237|
|        2|358942|
+---------+------+

None
+-------+-----+
|DepTime|count|
+-------+-----+
|   1829| 2055|
|   1238| 2056|
|    833| 2

In [15]:
# check if Year is same as title of csv
# check Month in 1-12 interval
# check DayofMonth in 1-31 interval
# check DayOfWeek in 1-7 interval
# Que es UniqueCarrier?
# CRSElapsedTime = CRSArrTime - CRSDepTime

## Removing noise

**Remove duplicated rows**

In [16]:
df = df.distinct()

In [17]:
df.groupBy(df.columns).count().filter("count > 1").show()

+----+-----+----------+---------+-------+----------+----------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+---------+----------------+-----+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|CRSElapsedTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiOut|Cancelled|CancellationCode|count|
+----+-----+----------+---------+-------+----------+----------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+---------+----------------+-----+
+----+-----+----------+---------+-------+----------+----------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+---------+----------------+-----+



**Remove instances of cancelled flights**

In [18]:
df.groupBy('Cancelled').count().show()

+---------+-------+
|Cancelled|  count|
+---------+-------+
|        1|  64442|
|        0|2324771|
+---------+-------+



In [19]:
df.groupBy('CancellationCode').count().show()

+----------------+-------+
|CancellationCode|  count|
+----------------+-------+
|            null|2324771|
|               B|  25744|
|               C|  12617|
|               A|  26075|
|               D|      6|
+----------------+-------+



In [20]:
df = df.filter(df.Cancelled == 0)

In [21]:
df = df.filter(df.CancellationCode.isNull())

In [22]:
df = df.drop('CancellationCode', 'Cancelled')

**Analyze missing values**

In [23]:
print(tabulate([[c, df.filter(col(c).isNull()).count()/df.count()] for c in df.columns], headers=['Name', 'Count %']))

Name                Count %
--------------  -----------
Year            0
Month           0
DayofMonth      0
DayOfWeek       0
DepTime         0
CRSDepTime      0
CRSArrTime      0
UniqueCarrier   0
FlightNum       0
TailNum         1.29045e-06
CRSElapsedTime  0.000123883
ArrDelay        0.00243207
DepDelay        0
Origin          0
Dest            0
Distance        0
TaxiOut         0


**TailNum and CRSElapsedTime can not be imputed from any other column, and ArrDelay is the target variable, but their number of missing values is not significant taking into account the total number, so the rows containing those missing values are removed**

In [24]:
df = df.na.drop()

In [25]:
(df.count(), len(df.columns))

(2319115, 17)

## !!!!!Date preprocess

In [26]:
df = df.withColumn(
    "Date", F.date_format(F.expr("make_date(Year, Month, DayofMonth)"), "MM/dd/yyyy")
)

df.groupBy('Date').count().show()

+----------+-----+
|      Date|count|
+----------+-----+
|04/05/2008|16939|
|03/28/2008|20534|
|01/15/2008|19167|
|04/24/2008|20524|
|01/08/2008|19104|
|03/29/2008|17168|
|03/02/2008|19095|
|03/14/2008|20591|
|04/13/2008|19442|
|01/13/2008|18549|
|03/27/2008|19997|
|02/10/2008|18438|
|04/23/2008|20320|
|03/11/2008|19882|
|02/05/2008|18322|
|01/24/2008|19899|
|04/30/2008|20311|
|03/08/2008|15670|
|03/23/2008|19309|
|04/29/2008|19964|
+----------+-----+
only showing top 20 rows



In [27]:
df = df.withColumn(
    "Season", when((df.Month>2) & (df.Month<6), 1).when((df.Month>5) & (df.Month<9), 2
        ).when((df.Month>8) & (df.Month<12), 3).otherwise(4)
)
df.groupBy('Season').count().show()

+------+-------+
|Season|  count|
+------+-------+
|     1|1185064|
|     4|1134051|
+------+-------+



**Check that DepDelay = DepTime - CRSDepTime**

In [28]:
df = df.withColumn("DepTimeNew", when(F.length(df.DepTime) == 3, concat(lit("0"),df.DepTime)) \
        .when(F.length(df.DepTime) == 2, concat(lit("00"),df.DepTime)) \
        .otherwise(df.DepTime))

In [29]:
df = df.withColumn("CRSDepTimeNew", when(F.length(df.CRSDepTime) == 3, concat(lit("0"),df.CRSDepTime)) \
        .when(F.length(df.CRSDepTime) == 2, concat(lit("00"),df.CRSDepTime)) \
        .otherwise(df.CRSDepTime))

In [30]:
df = df.withColumn("CRSArrTimeNew", when(F.length(df.CRSArrTime) == 3, concat(lit("0"),df.CRSArrTime)) \
        .when(F.length(df.CRSArrTime) == 2, concat(lit("00"),df.CRSArrTime)) \
        .otherwise(df.CRSArrTime))

In [31]:
df.select("DepTime","DepTimeNew","CRSDepTime","CRSDepTimeNew","CRSArrTime","CRSArrTimeNew").show()

+-------+----------+----------+-------------+----------+-------------+
|DepTime|DepTimeNew|CRSDepTime|CRSDepTimeNew|CRSArrTime|CRSArrTimeNew|
+-------+----------+----------+-------------+----------+-------------+
|    813|      0813|       810|         0810|       925|         0925|
|   1210|      1210|      1200|         1200|      1325|         1325|
|   2154|      2154|      1829|         1829|      1930|         1930|
|   1940|      1940|      1950|         1950|      2108|         2108|
|   2300|      2300|      2005|         2005|      2055|         2055|
|   2045|      2045|      2019|         2019|      2212|         2212|
|    607|      0607|       600|         0600|       706|         0706|
|    747|      0747|       750|         0750|       955|         0955|
|   1155|      1155|      1155|         1155|      1249|         1249|
|   1224|      1224|      1225|         1225|      1330|         1330|
|   1225|      1225|      1235|         1235|      1309|         1309|
|   14

In [32]:
df.select("DepTimeNew","CRSDepTimeNew","DepDelay").show()

+----------+-------------+--------+
|DepTimeNew|CRSDepTimeNew|DepDelay|
+----------+-------------+--------+
|      0813|         0810|       3|
|      1210|         1200|      10|
|      2154|         1829|     205|
|      1940|         1950|     -10|
|      2300|         2005|     175|
|      2045|         2019|      26|
|      0607|         0600|       7|
|      0747|         0750|      -3|
|      1155|         1155|       0|
|      1224|         1225|      -1|
|      1225|         1235|     -10|
|      1417|         1415|       2|
|      1451|         1455|      -4|
|      1558|         1600|      -2|
|      1739|         1730|       9|
|      1757|         1800|      -3|
|      1904|         1855|       9|
|      2016|         2020|      -4|
|      2044|         2050|      -6|
|      2116|         2120|      -4|
+----------+-------------+--------+
only showing top 20 rows



In [33]:
df = df.withColumn("CRSDepTimeNewHour", substring(df.CRSDepTimeNew, 1,2)) \
    .withColumn("CRSDepTimeNewMinute", substring(df.CRSDepTimeNew, 3,2)) \
    .withColumn("DepTimeNewHour", substring(df.DepTimeNew, 1,2)) \
    .withColumn("DepTimeNewMinute", substring(df.DepTimeNew, 3,2))

df = df.withColumn("CRSArrTimeNewHour", substring(df.CRSArrTimeNew, 1,2)) \
    .withColumn("CRSArrTimeNewMinute", substring(df.CRSArrTimeNew, 3,2))

In [34]:
df.select("CRSDepTimeNew","CRSDepTimeNewHour","CRSDepTimeNewMinute","DepTimeNew","DepTimeNewHour","DepTimeNewMinute").show()

+-------------+-----------------+-------------------+----------+--------------+----------------+
|CRSDepTimeNew|CRSDepTimeNewHour|CRSDepTimeNewMinute|DepTimeNew|DepTimeNewHour|DepTimeNewMinute|
+-------------+-----------------+-------------------+----------+--------------+----------------+
|         0810|               08|                 10|      0813|            08|              13|
|         1200|               12|                 00|      1210|            12|              10|
|         1829|               18|                 29|      2154|            21|              54|
|         1950|               19|                 50|      1940|            19|              40|
|         2005|               20|                 05|      2300|            23|              00|
|         2019|               20|                 19|      2045|            20|              45|
|         0600|               06|                 00|      0607|            06|              07|
|         0750|               

In [35]:
df = df.withColumn(
    "datetime",
    F.date_format(
        F.expr("make_timestamp(Year, Month, DayofMonth, CRSDepTimeNewHour, CRSDepTimeNewMinute, 0)"),
        "dd/MM/yyyy HH:mm"
    )
)

df.select("datetime").show()

+----------------+
|        datetime|
+----------------+
|07/01/2008 08:10|
|03/01/2008 12:00|
|02/01/2008 18:29|
|04/01/2008 19:50|
|31/01/2008 20:05|
|03/01/2008 20:19|
|27/01/2008 06:00|
|13/01/2008 07:50|
|09/01/2008 11:55|
|29/01/2008 12:25|
|12/01/2008 12:35|
|11/01/2008 14:15|
|09/01/2008 14:55|
|28/01/2008 16:00|
|28/01/2008 17:30|
|20/01/2008 18:00|
|23/01/2008 18:55|
|22/01/2008 20:20|
|29/01/2008 20:50|
|30/01/2008 21:20|
+----------------+
only showing top 20 rows



#   !!!!!!

In [36]:
#df = df.withColumn("DepDelayNew", when(abs((col("DepTimeNewHour").cast('long')*60 + col("DepTimeNewMinute")) - (col("CRSDepTimeNewHour").cast('long')*60 + col("CRSDepTimeNewMinute"))) < abs((col("CRSDepTimeNewHour").cast('long')*60 + col("CRSDepTimeNewMinute")) - (col("DepTimeNewHour").cast('long')*60 + col("DepTimeNewMinute"))), 
#    (col("DepTimeNewHour").cast('long')*60 + col("DepTimeNewMinute")) - (col("CRSDepTimeNewHour").cast('long')*60 + col("CRSDepTimeNewMinute"))) \
#    .otherwise((col("CRSDepTimeNewHour").cast('long')*60 + col("CRSDepTimeNewMinute")) - (col("DepTimeNewHour").cast('long')*60 + col("DepTimeNewMinute"))))
                   
df = df.withColumn("Duration", 
    (col("CRSArrTimeNewHour").cast('long')*60 + col("CRSArrTimeNewMinute").cast('long')) - (col("CRSDepTimeNewHour").cast('long')*60 + col("CRSDepTimeNewMinute").cast('long')))

In [37]:
#df = df.withColumn("DepDelayNew",col("DepDelayNew").cast(IntegerType()))
df = df.withColumn("Duration",col("Duration").cast(IntegerType()))

In [38]:
#df.select("DepTimeNew","CRSDepTimeNew","DepDelay","DepDelayNew").show()

In [39]:
df.select("CRSArrTimeNew","CRSDepTimeNew","Duration","CRSElapsedTime").show()

+-------------+-------------+--------+--------------+
|CRSArrTimeNew|CRSDepTimeNew|Duration|CRSElapsedTime|
+-------------+-------------+--------+--------------+
|         0925|         0810|      75|            75|
|         1325|         1200|      85|            85|
|         1930|         1829|      61|            61|
|         2108|         1950|      78|           138|
|         2055|         2005|      50|            50|
|         2212|         2019|     113|           173|
|         0706|         0600|      66|            66|
|         0955|         0750|     125|           125|
|         1249|         1155|      54|            54|
|         1330|         1225|      65|            65|
|         1309|         1235|      34|            34|
|         1510|         1415|      55|           115|
|         1635|         1455|     100|           160|
|         1725|         1600|      85|            85|
|         1845|         1730|      75|            75|
|         1915|         1800

In [40]:
df = df.drop("CRSDepTime",'CRSDepTimeNew','CRSDepTimeNewHour','CRSDepTimeNewMinute',"CRSArrTime",'CRSArrTimeNew',
             'CRSArrTimeNewHour', 'CRSArrTimeNewMinute',"DepTime", 'DepTimeNew', 'DepTimeNewHour',  'DepTimeNewMinute', 
             'DepDelayNew')
# por que no se deja nada de CRSDepTime, CRSArrTime y DepTime?

## Concordancy between related variables

**No flights with same Origin and Destination**

In [41]:
df = df.filter(df.Origin != df.Dest)

In [42]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Season: integer (nullable = false)
 |-- datetime: string (nullable = true)
 |-- Duration: integer (nullable = true)



## Input

In [43]:
print(tabulate([[c, df.filter(col(c).isNull()).count()] for c in df.columns], headers=['Name', 'Count %']))

Name              Count %
--------------  ---------
Year                    0
Month                   0
DayofMonth              0
DayOfWeek               0
UniqueCarrier           0
FlightNum               0
TailNum                 0
CRSElapsedTime          0
ArrDelay                0
DepDelay                0
Origin                  0
Dest                    0
Distance                0
TaxiOut                 0
Date                    0
Season                  0
datetime               32
Duration             3879


In [44]:
df.filter(df.Duration.isNull()).show(5)

+----+-----+----------+---------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+----------+------+----------------+--------+
|Year|Month|DayofMonth|DayOfWeek|UniqueCarrier|FlightNum|TailNum|CRSElapsedTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiOut|      Date|Season|        datetime|Duration|
+----+-----+----------+---------+-------------+---------+-------+--------------+--------+--------+------+----+--------+-------+----------+------+----------------+--------+
|2008|    1|        14|        1|           WN|      904| N650SW|           105|       1|      -1|   LAX| SLC|     590|     12|01/14/2008|     4|14/01/2008 21:20|    null|
|2008|    1|        28|        1|           WN|       61| N700GS|           290|      19|      31|   LAS| MHT|    2356|     23|01/28/2008|     4|28/01/2008 16:15|    null|
|2008|    1|        22|        2|           DL|     1586| N373DA|           142|       1|      12|   ATL| JFK|     760|     27|01/22/2008|  

In [45]:
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['Duration'], 
    outputCols=['Duration']
)

df = imputer.fit(df).transform(df)

## Fix format of variables

In [46]:
df = df.drop('Date',  'datetime', 'FlightNum', 'TailNum')

In [47]:
df.columns

['Year',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'UniqueCarrier',
 'CRSElapsedTime',
 'ArrDelay',
 'DepDelay',
 'Origin',
 'Dest',
 'Distance',
 'TaxiOut',
 'Season',
 'Duration']

In [48]:
df.show(5)

+----+-----+----------+---------+-------------+--------------+--------+--------+------+----+--------+-------+------+--------+
|Year|Month|DayofMonth|DayOfWeek|UniqueCarrier|CRSElapsedTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiOut|Season|Duration|
+----+-----+----------+---------+-------------+--------------+--------+--------+------+----+--------+-------+------+--------+
|2008|    1|         7|        1|           WN|            75|      -9|       3|   BUR| OAK|     325|      4|     4|      75|
|2008|    1|         3|        4|           WN|            85|       2|      10|   MSY| DAL|     437|      4|     4|      85|
|2008|    1|         2|        3|           YV|            61|     195|     205|   SPI| ORD|     174|      4|     4|      61|
|2008|    1|         4|        5|           OO|           138|     -12|     -10|   BDL| MKE|     780|      4|     4|      78|
|2008|    1|        31|        4|           WN|            50|     169|     175|   HRL| SAT|     233|      4|     4|  

# CREATING THE MODEL

In [49]:
from pyspark.ml.feature import VectorAssembler
training = df.drop( 'Date',  'datetime', 'FlightNum', 'TailNum', 'UniqueCarrier', 'Origin', 'Dest')
training = training.select('Year', 'Month', 'DayofMonth', 'DayOfWeek', 'CRSElapsedTime',
 'DepDelay', 'Distance', 'TaxiOut', 'Season', 'Duration', 'ArrDelay')

vectorAssembler = VectorAssembler(inputCols = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'CRSElapsedTime',
 'DepDelay', 'Distance', 'TaxiOut', 'Season', 'Duration'], outputCol = 'features')
vdf = vectorAssembler.transform(training)
vdf = vdf.select(['features', 'ArrDelay'])
vdf.show(3)

+--------------------+--------+
|            features|ArrDelay|
+--------------------+--------+
|[2008.0,1.0,7.0,1...|      -9|
|[2008.0,1.0,3.0,4...|       2|
|[2008.0,1.0,2.0,3...|     195|
+--------------------+--------+
only showing top 3 rows



In [50]:
training.show(5)

+----+-----+----------+---------+--------------+--------+--------+-------+------+--------+--------+
|Year|Month|DayofMonth|DayOfWeek|CRSElapsedTime|DepDelay|Distance|TaxiOut|Season|Duration|ArrDelay|
+----+-----+----------+---------+--------------+--------+--------+-------+------+--------+--------+
|2008|    1|         7|        1|            75|       3|     325|      4|     4|      75|      -9|
|2008|    1|         3|        4|            85|      10|     437|      4|     4|      85|       2|
|2008|    1|         2|        3|            61|     205|     174|      4|     4|      61|     195|
|2008|    1|         4|        5|           138|     -10|     780|      4|     4|      78|     -12|
|2008|    1|        31|        4|            50|     175|     233|      4|     4|      50|     169|
+----+-----+----------+---------+--------------+--------+--------+-------+------+--------+--------+
only showing top 5 rows



In [51]:
splits = vdf.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [52]:
lr = LinearRegression(featuresCol = 'features', labelCol='ArrDelay', maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [53]:
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [0.0,0.0,0.0,0.0,-0.02741521907070583,0.9865102087300102,0.0,0.7979935767670143,0.0,0.0]
Intercept: -10.72275178717567


In [54]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 10.937266
r2: 0.923502


In [56]:
from pyspark.ml.classification import LogisticRegression
training = df.drop( 'Date',  'datetime', 'FlightNum', 'TailNum', 'UniqueCarrier', 'Origin', 'Dest')
training = training.select('Year', 'Month', 'DayofMonth', 'DayOfWeek', 'CRSElapsedTime',
 'DepDelay', 'Distance', 'TaxiOut', 'Season', 'Duration', 'ArrDelay')
trainingData=training.rdd.map(lambda x:(Vectors.dense(x[0:-1]), x[-1])).toDF(["features", "label"])


lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(trainingData)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# We can also use the multinomial family for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# Fit the model
mlrModel = mlr.fit(training)

# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 370.0 failed 1 times, most recent failure: Lost task 0.0 in stage 370.0 (TID 1079) (192.168.1.130 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:551)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:519)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
	... 14 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:551)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:519)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
	... 14 more


# Split

In [58]:
# split into training(60%), validation(20%) and test(20%) datasets
trainingDf, validationDf, testDf = df.randomSplit([7, 1, 2])

#print(trainingDf.take(1))

#lets cache these datasets
trainingDf.cache()
validationDf.cache()
testDf.cache()

print("Num of training observations : %s" % trainingDf.count())
print("Num of validation observations : %s" % validationRdd.count())
print("Num of test observations : %s" % testDf.count())

Py4JJavaError: An error occurred while calling o550.randomSplit.
: java.lang.ClassCastException: class java.lang.Integer cannot be cast to class java.lang.Double (java.lang.Integer and java.lang.Double are in module java.base of loader 'bootstrap')
	at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:116)
	at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:76)
	at scala.collection.IterableLike.copyToArray(IterableLike.scala:256)
	at scala.collection.IterableLike.copyToArray$(IterableLike.scala:251)
	at scala.collection.AbstractIterable.copyToArray(Iterable.scala:56)
	at scala.collection.TraversableOnce.copyToArray(TraversableOnce.scala:334)
	at scala.collection.TraversableOnce.copyToArray$(TraversableOnce.scala:333)
	at scala.collection.AbstractTraversable.copyToArray(Traversable.scala:108)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:342)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at scala.collection.AbstractTraversable.toArray(Traversable.scala:108)
	at org.apache.spark.sql.Dataset.randomSplit(Dataset.scala:2378)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)


## Categoricals

In [59]:
#convert the categorical attributes to binary features
categoricalAttributes = ['Origin', 'Dest', 'UniqueCarrier']

#Build a list of pipelist stages for the machine learning pipeline. 
#start by the feature transformer of one hot encoder for building the categorical features
pipelineStages = []
for columnName in categoricalAttributes:
    stringIndexer = StringIndexer(inputCol=columnName, outputCol=columnName+ "Index")
    pipelineStages.append(stringIndexer)
    oneHotEncoder = OneHotEncoder(inputCol=columnName+ "Index", outputCol=columnName + "Vec")
    pipelineStages.append(oneHotEncoder)
    
    
print("%s string indexer and one hot encoders transformers" %  len(pipelineStages) )

NameError: name 'StringIndexer' is not defined

In [61]:
# Combine all the feature columns into a single column in the dataframe
numericColumns = ['Year', 'Month', 'DayofMonth', 'DayOfWeek',
 'UniqueCarrier', 
 'CRSElapsedTime', 'ArrDelay', 'DepDelay',
 'Origin', 'Dest', 'Distance', 'TaxiOut', 'Season', 'Duration']

categoricalCols = [s + "Vec" for s in categoricalAttributes]

allFeatureCols =  numericColumns + categoricalCols

vectorAssembler = VectorAssembler(
    inputCols=allFeatureCols,
    outputCol="features")
pipelineStages.append(vectorAssembler)

print("%s feature columns: %s" % (len(allFeatureCols),allFeatureCols))

#Build pipeline for feature extraction
featurePipeline = Pipeline(stages=pipelineStages)
featureOnlyModel = featurePipeline.fit(df)

17 feature columns: ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'UniqueCarrier', 'CRSElapsedTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiOut', 'Season', 'Duration', 'OriginVec', 'DestVec', 'UniqueCarrierVec']


NameError: name 'Pipeline' is not defined

In [62]:
#create list of Dataframes with features
trainingFeaturesDf = featureOnlyModel.transform(df)
validationFeaturesDf = featureOnlyModel.transform(validationDf)
testFeaturesDf = featureOnlyModel.transform(testDf)

#peek
trainingFeaturesDf.select("features", "label").rdd.take(2)

NameError: name 'featureOnlyModel' is not defined

# Building A Machine Learning Model With Spark ML

In [63]:
from pyspark.ml.classification import LogisticRegression

# Configure an machine learning pipeline, which consists of the 
# an estimator (classification) (Logistic regression)
lr = LogisticRegression(maxIter=10, regParam=0.01)
lrPipeline = Pipeline(stages=[lr])

# Fit the pipeline to create a model from the training data
lrPipelineModel = lrPipeline.fit(trainingFeaturesDf)

def getAccuracyForPipelineModel(featuresDf, model):
    #perform prediction using the featuresdf and pipelineModel
    #compute the accuracy in percentage float
    results = model.transform(featuresDf)
    labelsAndPreds = results.map(lambda p: (p.label, p.prediction))
    return (calculateAccuracy(labelsAndPreds), results) 

# Evaluating the model on training data
lrTrainAccuracy, lrTrainResultDf = getAccuracyForPipelineModel(trainingFeaturesDf, lrPipelineModel)

# Repeat on test data
lrTestAccuracy, lrTestResultDf = getAccuracyForPipelineModel(testFeaturesDf, lrPipelineModel)

# Repeat on validation data
lrValidationAccuracy, lrValidationResultDf = getAccuracyForPipelineModel(validationFeaturesDf, lrPipelineModel)

print("==========================================")
print("LogisticRegression Model training accuracy (%) = " + str(lrTrainAccuracy))
print("LogisticRegression Model test accuracy (%) = " + str(lrTestAccuracy))
print("LogisticRegression Model validation accuracy (%) = " + str(lrValidationAccuracy))
print("==========================================")


NameError: name 'Pipeline' is not defined

**Hyperparameter Tuning with Grid search**

In [None]:
maxIterRange = [5, 10, 30, 50, 100]
regParamRange = [1e-10, 1e-5, 1e-1]
#baseline values from previous section
bestIter = 10
bestRegParam = 0.01
bestModel = lr
bestAccuracy = lrValidationAccuracy


In [None]:

#for plotting purpose
iterations = []
regParams = []
accuracies = []
for maxIter in maxIterRange:
    for rp in regParamRange:
        currentLr = LogisticRegression(maxIter=maxIter, regParam=rp)
        pipeline = Pipeline(stages=[currentLr])
        model = pipeline.fit(trainingFeaturesDf)
        
        #use validation dataset test for accuracy
        accuracy, resultDf = getAccuracyForPipelineModel(validationFeaturesDf, model)
        print "maxIter: %s, regParam: %s, accuracy: %s " % (maxIter, rp, accuracy)
        accuracies.append(accuracy)
        regParams.append(log(rp))
        iterations.append(maxIter)
        
        if accuracy > lrValidationAccuracy:
            bestIter = maxIter
            bestRegParam = rp
            bestModel = model
            bestAccuracy = accuracy


print "Best parameters: maxIter %s, regParam %s, accuracy : %s" % (bestIter, bestRegParam, bestAccuracy)

# Repeat on test data
gridTestAccuracy, gridTestResultDf = getAccuracyForPipelineModel(testFeaturesDf, bestModel)

print("==========================================")
print("Grid search Model test accuracy (%) = " + str(gridTestAccuracy))
print("==========================================")


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# We use a ParamGridBuilder to construct a grid of parameters to search over.
grid = (ParamGridBuilder()
        .addGrid(lr.maxIter, maxIterRange) 
        .addGrid(lr.regParam,regParamRange )
        .build())


evaluator = BinaryClassificationEvaluator()

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
crossValidator = CrossValidator(estimator=lrPipeline, 
                                estimatorParamMaps=grid, 
                                numFolds=5,
                                evaluator=evaluator)


# Run cross-validation, and choose the best model
bestCvModel = crossValidator.fit(trainingFeaturesDf)

# verify results on training dataset
cvTrainAccuracy, cvTrainResultDf = getAccuracyForPipelineModel(trainingFeaturesDf, bestCvModel)

# Repeat on test data
cvTestAccuracy, cvTestResultDf = getAccuracyForPipelineModel(testFeaturesDf, bestCvModel)

print("==========================================")
print("CV Model training accuracy (%) = " + str(cvTrainAccuracy))
print("CV Model test accuracy (%) = " + str(cvTestAccuracy))
print("==========================================")

# Evaluating the Model

In [None]:
# Coefficients for the model
linearModel.coefficients

In [None]:
# Intercept for the model
linearModel.intercept

In [None]:
coeff_df = pd.DataFrame({"Feature": ["Intercept"] + featureCols, "Co-efficients": np.insert(linearModel.coefficients.toArray(), 0, linearModel.intercept)})
coeff_df = coeff_df[["Feature", "Co-efficients"]]
coeff_df