# SparkR flights data load

Load libraries:

In [1]:
library(magrittr)

In [2]:
library(SparkR)


Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    cov, filter, lag, na.omit, predict, sd, var, window

The following objects are masked from ‘package:base’:

    as.data.frame, colnames, colnames<-, drop, endsWith, intersect,
    rank, rbind, sample, startsWith, subset, summary, transform, union



In [3]:
sc <- sparkR.session(master='spark://s01:7077')

Spark package found in SPARK_HOME: /opt/spark-2.1.0-bin-hadoop2.7


Launching java with spark-submit command /opt/spark-2.1.0-bin-hadoop2.7/bin/spark-submit   sparkr-shell /tmp/Rtmp5CVbI3/backend_port88955bac67d 


Define CSV schema (attributes types):

In [4]:
#sqlContext <- sparkRSQL.init(sc)

In [5]:
schema <- structType(
structField("Year", "integer"),
structField("Month", "integer"),
structField("DayofMonth", "integer"),
structField("DayOfWeek", "integer"),
structField("DepTime", "integer"),
structField("CRSDepTime", "integer"),
structField("ArrTime", "integer"),
structField("CRSArrTime", "integer"),
structField("UniqueCarrier", "string"),
structField("FlightNum", "integer"),
structField("TailNum", "string"),
structField("ActualElapsedTime", "integer"),
structField("CRSElapsedTime", "integer"),
structField("AirTime", "integer"),
structField("ArrDelay", "integer"),
structField("DepDelay", "integer"),
structField("Origin", "string"),
structField("Dest", "string"),
structField("Distance", "integer"),
structField("TaxiIn", "integer"),
structField("TaxiOut", "integer"),
structField("Cancelled", "integer"),
structField("CancellationCode", "string"),
structField("Diverted", "integer"),
structField("CarrierDelay", "string"),
structField("WeatherDelay", "string"),
structField("NASDelay", "string"),
structField("SecurityDelay", "string"),
structField("LateAircraftDelay", "string"))

In [6]:
schema

StructType
|-name = "Year", type = "IntegerType", nullable = TRUE
|-name = "Month", type = "IntegerType", nullable = TRUE
|-name = "DayofMonth", type = "IntegerType", nullable = TRUE
|-name = "DayOfWeek", type = "IntegerType", nullable = TRUE
|-name = "DepTime", type = "IntegerType", nullable = TRUE
|-name = "CRSDepTime", type = "IntegerType", nullable = TRUE
|-name = "ArrTime", type = "IntegerType", nullable = TRUE
|-name = "CRSArrTime", type = "IntegerType", nullable = TRUE
|-name = "UniqueCarrier", type = "StringType", nullable = TRUE
|-name = "FlightNum", type = "IntegerType", nullable = TRUE
|-name = "TailNum", type = "StringType", nullable = TRUE
|-name = "ActualElapsedTime", type = "IntegerType", nullable = TRUE
|-name = "CRSElapsedTime", type = "IntegerType", nullable = TRUE
|-name = "AirTime", type = "IntegerType", nullable = TRUE
|-name = "ArrDelay", type = "IntegerType", nullable = TRUE
|-name = "DepDelay", type = "IntegerType", nullable = TRUE
|-name = "Origin", type = "Str

Read CSV and save in parquet file:

In [6]:
#sdf <- read.df(path = "/data/2000.csv", header='true', source = "com.databricks.spark.csv", inferSchema = "true")

In [7]:
df <- read.df("/data/2000.csv", source="csv", header="true", schema = schema, na.strings = "NA")

In [8]:
write.df(df, path = "/data/2000.parquet", source = "parquet", mode = "overwrite")

Read parquet file:

In [7]:
df3 <- loadDF("/data/2000.parquet", "parquet", mergeSchema = "true")

Check read parquet file schema:

In [8]:
printSchema(df3)

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)

# Data Curation

In [9]:
df4 <- select(df3, "Month", "DayofMonth", "DayOfWeek", "DepTime", "UniqueCarrier", "DepDelay", "Origin", "Dest", "Distance")

In [10]:
head(df4)

Month,DayofMonth,DayOfWeek,DepTime,UniqueCarrier,DepDelay,Origin,Dest,Distance
3,10,5,1027,HP,3,PHX,BWI,1999
3,11,6,1035,HP,11,PHX,BWI,1999
3,12,7,1026,HP,2,PHX,BWI,1999
3,13,1,1041,HP,17,PHX,BWI,1999
3,14,2,1035,HP,11,PHX,BWI,1999
3,15,3,1107,HP,43,PHX,BWI,1999


Filter NAs from DepDelay

In [11]:
#df4 <- filter(df4, isNotNull(df4$DepDelay))
df4 <- dropna(df4)

# Data Normalization

In [12]:
createOrReplaceTempView(df4, "df4")

Find Unique Carriers:

In [13]:
df6 <- sql("SELECT DISTINCT(UniqueCarrier) FROM df4 ORDER BY UniqueCarrier ASC")
df6 <- collect(df6)
df6_len <- nrow(df6)

Create a new dataframe containing "Delayed" attribute based on the fact that "DepDelay" is less than 15 minutes or more, also Normalize Month, DayofMonth, DayOfWeek, DepTime, Distance, DepDelay:

In [14]:
df5 <- df4

df5$Delayed <- df5$DepDelay
df5$Delayed <- ifelse(df5$DepDelay > 15, 1, 0)

df5$Month <- df5$Month / 12
df5$DayofMonth <- df5$DayofMonth / 31
df5$DayOfWeek <- df5$DayOfWeek / 7
df5$DepTime <- df5$DepTime / 2359
df5$Distance <- df5$Distance / as.numeric(head(selectExpr(df5,"MAX(Distance)")))
df5$DepDelay <- df5$DepDelay / as.numeric(head(selectExpr(df5,"MAX(DepDelay)")))

Assign Each Unique Carrier an id number from 0 to 1:

In [15]:
df5$UC <- df5$Month

for (i in 1:df6_len){
  tmp <- as.character(df6$UniqueCarrier[i])
  df5$UC <- ifelse(df5$UniqueCarrier == tmp, i/df6_len, df5$UC)
}

Assign Origin and Dest fields a unique id number from 0 to 1:

In [16]:
df6 <- sql("SELECT DISTINCT(Origin) FROM df4 ORDER BY Origin ASC")
df6 <- collect(df6)
df6_len <- nrow(df6)

In [17]:
df5$From <- df5$Month

for (i in 1:df6_len){
  tmp <- as.character(df6$Origin[i])
  df5$From <- ifelse(df5$Origin == tmp, i/df6_len, df5$From)
}

In [18]:
df6 <- sql("SELECT DISTINCT(Dest) FROM df4 ORDER BY Dest ASC")
df6 <- collect(df6)
df6_len <- nrow(df6)

In [19]:
df5$To <- df5$Month

for (i in 1:df6_len){
  tmp <- as.character(df6$Dest[i])
  df5$To <- ifelse(df5$Dest == tmp, i/df6_len, df5$To)
}

In [20]:
head(df5)

Month,DayofMonth,DayOfWeek,DepTime,UniqueCarrier,DepDelay,Origin,Dest,Distance,Delayed,UC,From,To
0.25,0.3225806,0.7142857,0.435354,HP,0.002090592,PHX,BWI,0.4028617,0,0.5454545,0.7524272,0.1456311
0.25,0.3548387,0.8571429,0.4387452,HP,0.007665505,PHX,BWI,0.4028617,0,0.5454545,0.7524272,0.1456311
0.25,0.3870968,1.0,0.4349301,HP,0.001393728,PHX,BWI,0.4028617,0,0.5454545,0.7524272,0.1456311
0.25,0.4193548,0.1428571,0.4412887,HP,0.01184669,PHX,BWI,0.4028617,1,0.5454545,0.7524272,0.1456311
0.25,0.4516129,0.2857143,0.4387452,HP,0.007665505,PHX,BWI,0.4028617,0,0.5454545,0.7524272,0.1456311
0.25,0.483871,0.4285714,0.4692666,HP,0.029965157,PHX,BWI,0.4028617,1,0.5454545,0.7524272,0.1456311


# Linear Regression

In [21]:
model <- spark.glm(df5, DepDelay ~ Month + DayofMonth + DayOfWeek + DepTime + UniqueCarrier + Origin + Dest + Distance, family = "gaussian")

In [22]:
summary(model)


Deviance Residuals: 
(Note: These are approximate quantiles with relative error <= 0.01)
     Min        1Q    Median        3Q       Max  
-0.70351  -0.00977  -0.00519   0.00052   0.99324  

Coefficients:
                  Estimate     Std. Error  t value     Pr(>|t|)  
(Intercept)       -0.01663     0.012176    -1.3658     0.172     
Month             0.0025457    3.4088e-05  74.68       0         
DayofMonth        0.0026692    3.4205e-05  78.035      0         
DayOfWeek         0.0025231    3.4116e-05  73.958      0         
DepTime           0.020703     4.7771e-05  433.38      0         
UniqueCarrier_WN  0.001788     0.00038593  4.633       3.6041e-06
UniqueCarrier_DL  -0.0012858   0.00038599  -3.3311     0.00086499
UniqueCarrier_UA  0.0031904    0.00038458  8.2958      0         
UniqueCarrier_US  -0.00032259  0.00038773  -0.83198    0.40542   
UniqueCarrier_AA  -0.00059631  0.00038645  -1.5431     0.12282   
UniqueCarrier_NW  -0.0014731   0.00039006  -3.7766     0.00015899
U

In [23]:
fitted <- predict(model, df4)

In [24]:
head(select(fitted, "DepDelay", "prediction"),200)
#head(fitted,20)

DepDelay,prediction
3,25.008106
11,25.178923
2,24.997787
17,25.295865
11,25.176838
43,26.672656
4,25.042301
11,25.192415
86,27.578468
65,27.148895


# Binomial logistic regression

In [25]:
# Load training data
training <- df5
test <- df5

# Fit an binomial logistic regression model with spark.logit
model <- spark.logit(training, Delayed ~ Month + DayofMonth + DayOfWeek + DepTime + UC + From + To + Distance, maxIter = 10, regParam = 0.3, elasticNetParam = 0.8)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
showDF(predictions)

Unnamed: 0,Estimate
(Intercept),-1.413822
Month,0.0
DayofMonth,0.0
DayOfWeek,0.0
DepTime,0.0
UC,0.0
From,0.0
To,0.0
Distance,0.0


+-----+-------------------+-------------------+-------------------+-------------+--------------------+------+----+-------------------+-------+------------------+------------------+-------------------+--------------------+--------------------+----------+
|Month|         DayofMonth|          DayOfWeek|            DepTime|UniqueCarrier|            DepDelay|Origin|Dest|           Distance|Delayed|                UC|              From|                 To|       rawPrediction|         probability|prediction|
+-----+-------------------+-------------------+-------------------+-------------+--------------------+------+----+-------------------+-------+------------------+------------------+-------------------+--------------------+--------------------+----------+
| 0.25| 0.3225806451612903| 0.7142857142857143| 0.4353539635438745|           HP|0.002090592334494...|   PHX| BWI|0.40286174929463925|    0.0|0.5454545454545454|0.7524271844660194|0.14563106796116504|[1.41382246571369...|[0.80436814685026

In [26]:
head(select(predictions,"Delayed","prediction"),300)

Delayed,prediction
0,0.0
0,0.0
0,0.0
1,0.0
0,0.0
1,0.0
0,0.0
0,0.0
1,0.0
1,0.0


In [27]:
pred_df <- select(predictions,"Delayed","prediction")
pred_df$equals <- pred_df$prediction
pred_df$equals <- ifelse(pred_df$Delayed == pred_df$prediction, 1, 0)

createOrReplaceTempView(pred_df, "pred_df")
corrects <- collect(sql("SELECT SUM(equals) FROM pred_df"))

(corrects$"sum(equals)")/nrow(pred_df)

# Random forest classifier

In [29]:
# Load training data
training <- df5
test <- df5

# Fit a random forest classification model with spark.randomForest
model <- spark.randomForest(training, Delayed ~ Month + DayofMonth + DayOfWeek + DepTime + UC + From + To + Distance, "classification", numTrees = 20)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
showDF(predictions)

Formula:  Delayed ~ Month + DayofMonth + DayOfWeek + DepTime + UC + From +     To + Distance
Number of features:  8
Features:  Month DayofMonth DayOfWeek DepTime UC From To Distance
Feature importances:  (8,[0,1,2,3,4,5,6,7],[0.06417822139891059,0.04653327133861529,0.04451299328940076,0.72389434192068,0.10090901924986892,0.006464205594194727,0.007114220890985394,0.006393726317344299])
Number of trees:  20
Tree weights:  1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 RandomForestClassificationModel (uid=rfc_238d895dedda) with 20 trees
  Tree 0 (weight 1.0):
    If (feature 3 <= 0.5680373039423484)
     If (feature 7 <= 0.2543329302700524)
      If (feature 3 <= 0.39465875370919884)
       If (feature 5 <= 0.043689320388349516)
        If (feature 5 <= 0.038834951456310676)
         Predict: 0.0
        Else (feature 5 > 0.038834951456310676)
         Predict: 0.0
       Else (feature 5 > 0.043689320388349516)
        If (feature 1 <= 0.2903225806451613)
         Predict: 0.0
        Else (fea

+-----+-------------------+-------------------+-------------------+-------------+--------------------+------+----+-------------------+-------+------------------+------------------+-------------------+--------------------+--------------------+----------+
|Month|         DayofMonth|          DayOfWeek|            DepTime|UniqueCarrier|            DepDelay|Origin|Dest|           Distance|Delayed|                UC|              From|                 To|       rawPrediction|         probability|prediction|
+-----+-------------------+-------------------+-------------------+-------------+--------------------+------+----+-------------------+-------+------------------+------------------+-------------------+--------------------+--------------------+----------+
| 0.25| 0.3225806451612903| 0.7142857142857143| 0.4353539635438745|           HP|0.002090592334494...|   PHX| BWI|0.40286174929463925|    0.0|0.5454545454545454|0.7524271844660194|0.14563106796116504|[17.0589982964845...|[0.85294991482422

In [30]:
pred_df <- select(predictions,"Delayed","prediction")
pred_df$equals <- pred_df$prediction
pred_df$equals <- ifelse(pred_df$Delayed == pred_df$prediction, 1, 0)

createOrReplaceTempView(pred_df, "pred_df")
corrects <- collect(sql("SELECT SUM(equals) FROM pred_df"))

(corrects$"sum(equals)")/nrow(pred_df)