# Predicting Airline Data using a Generalized Linear Model (GLM) in Spark

In particular, we will predict the probability that a flight is late based on its departure date/time, the expected flight time and distance, the origin and destitation airports.

The core library for the dataframe part is [Spark DataFrame](https://spark.apache.org/docs/latest/sql-programming-guide.html).<br>
The core library for the machine learning part is [Spark MLIB](https://spark.apache.org/docs/latest/ml-guide.html).

Spark can be used from R and python, but has it is originally written in scala, we prefer to use it directly from the scala language.

The [jupyter-scala kernel](https://github.com/alexarchambault/jupyter-scala#spark) is used for this notebook

### Considerations

The objective of this notebook is to define a simple model offerring a point of comparison in terms of computing performances across datascience language and libraries.  In otherwords, this notebook is not for you if you are looking for the most accurate model in airline predictions.  

## Install and Load useful libraries

In [1]:
import $ivy.`org.plotly-scala::plotly-jupyter-scala:0.3.0`

import plotly._
import plotly.element._
import plotly.layout._
import plotly.JupyterScala._

[32mimport [39m[36m$ivy.$                                             

[39m
[32mimport [39m[36mplotly._
[39m
[32mimport [39m[36mplotly.element._
[39m
[32mimport [39m[36mplotly.layout._
[39m
[32mimport [39m[36mplotly.JupyterScala._[39m

In [2]:
import $ivy.`org.slf4j:slf4j-nop:1.7.12`         // for cleaner logs
import $ivy.`org.apache.spark::spark-sql:2.0.2`  // adjust spark version - spark >= 1.6 should be fine, possibly >= 1.3 too
import $ivy.`org.apache.spark::spark-mllib:2.0.2`
import $ivy.`org.jupyter-scala::spark:0.4.0-RC5` // JupyterSparkContext-s (SparkContext aware of the jupyter-scala kernel)
//import $ivy.`co.theasi::plotly:0.2.0`


import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.classification.LogisticRegression

import jupyter.spark._

// The conf can be tweaked a bit before use.
// Mark it transient to prevent serialization issues.
@transient val sparkConf = new SparkConf()
  .setAppName("SBTB")
  .setMaster("local[*]") // use all local CPUs
  .set("spark.executor.memory", "2g")
  .set("spark.driver.memory", "2g")

@transient val sc = new JupyterSparkContext(sparkConf)

val sqlContext = new SQLContext(sc)

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/loicus/.coursier/cache/v1/https/repo1.maven.org/maven2/org/slf4j/slf4j-nop/1.7.12/slf4j-nop-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/loicus/.m2/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.helpers.NOPLoggerFactory]
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.


[32mimport [39m[36m$ivy.$                                    // for cleaner logs
[39m
[32mimport [39m[36m$ivy.$                                    // adjust spark version - spark >= 1.6 should be fine, possibly >= 1.3 too
[39m
[32mimport [39m[36m$ivy.$                                    
[39m
[32mimport [39m[36m$ivy.$                                    // JupyterSparkContext-s (SparkContext aware of the jupyter-scala kernel)
//import $ivy.`co.theasi::plotly:0.2.0`


[39m
[32mimport [39m[36morg.apache.spark._
[39m
[32mimport [39m[36morg.apache.spark.sql._
[39m
[32mimport [39m[36morg.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
[39m
[32mimport [39m[36morg.apache.spark.ml.{Pipeline, PipelineModel}
[39m
[32mimport [39m[36morg.apache.spark.ml.linalg.Vectors
[39m
[32mimport [39m[36morg.apache.spark.ml.classification.LogisticRegression

[39m
[32mimport [39m[36mjupyter.spark._

// The conf can be tweaked a bit before use.
//

## Load the data

- The dataset is taken from [http://stat-computing.org](http://stat-computing.org/dataexpo/2009/the-data.html).  We take the data corresponding to year 2008.
- We restrict the dataset to the first million rows
- We print all column names and the first 5 rows of the dataset

In [3]:
val dffull = sqlContext
  .read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("2008.csv")
val count = dffull.count

                                                                                

[36mdffull[39m: [32mDataFrame[39m = [Year: int, Month: int ... 27 more fields]
[36mcount[39m: [32mLong[39m = [32m7009728L[39m

In [4]:
val df = dffull.sample(false, 1000000.toFloat/count) //spark way of randomly keeping (approximately) 1M rows

[36mdf[39m: [32mDataset[39m[[32mRow[39m] = [Year: int, Month: int ... 27 more fields]

In [5]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

## Data preparation for training

- We turn origin/destination categorical data to a "one-hot" encoding representation
- We create a new "binary" column indicating if the flight was delayed or not.
- We show the first 5 rows of the modified dataset
- We split the dataset in two parts:  a training dataset and a testing dataset containing 80% and 20% of the rows, respectively.

In [6]:
val df2 = df
          .filter(row => row.getAs[Double]("ArrDelay")!=null)
          .withColumn("IsArrDelayed", (df("ArrDelay")>0).cast("int"))
          .withColumn("DepTime", df("DepTime").cast("int"))
          .withColumn("AirTime", df("AirTime").cast("int"))
          .filter(row => row.getAs[Int]("DepTime")!=null && row.getAs[Int]("AirTime")!=null)
          .select("Year","Month",  "DayofMonth" ,"DayOfWeek", "DepTime", "AirTime", "Distance", "Origin", "Dest", "ArrDelay", "IsArrDelayed")        
df2.show(5)

+----+-----+----------+---------+-------+-------+--------+------+----+--------+------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|AirTime|Distance|Origin|Dest|ArrDelay|IsArrDelayed|
+----+-----+----------+---------+-------+-------+--------+------+----+--------+------------+
|2008|    1|         3|        4|    617|     70|     451|   IND| MCI|       2|           1|
|2008|    1|         3|        4|   1954|    155|    1093|   ISP| FLL|       4|           1|
|2008|    1|         3|        4|    712|    142|     972|   ISP| MCO|      -7|           0|
|2008|    1|         3|        4|   1312|    140|     972|   ISP| MCO|      -4|           0|
|2008|    1|         3|        4|    634|    142|    1034|   ISP| TPA|     -28|           0|
+----+-----+----------+---------+-------+-------+--------+------+----+--------+------------+
only showing top 5 rows



[36mdf2[39m: [32mDataFrame[39m = [Year: int, Month: int ... 9 more fields]

In [7]:
val OriginIndexer = new StringIndexer()
  .setInputCol("Origin")
  .setOutputCol("OriginIndex")

val OriginEncoder = new OneHotEncoder()
  .setInputCol("OriginIndex")
  .setOutputCol("OriginVec")

val DestIndexer = new StringIndexer()
  .setInputCol("Dest")
  .setOutputCol("DestIndex")

val DestEncoder = new OneHotEncoder()
  .setInputCol("DestIndex")
  .setOutputCol("DestVec")

val Assembler = new VectorAssembler()
  .setInputCols(Array("Year","Month",  "DayofMonth" ,"DayOfWeek", "DepTime", "AirTime", "Distance", "OriginVec", "DestVec"))
  .setOutputCol("Features")

val pipeline = new Pipeline()
  .setStages(Array(OriginIndexer, OriginEncoder, DestIndexer, DestEncoder, Assembler))

val Preparator = pipeline.fit(df2)
val dfPrepared = Preparator.transform(df2).cache()


                                                                                

[36mOriginIndexer[39m: [32mStringIndexer[39m = strIdx_1ce73ff23548
[36mOriginEncoder[39m: [32mOneHotEncoder[39m = oneHot_dc35da52f7f1
[36mDestIndexer[39m: [32mStringIndexer[39m = strIdx_6aa37c5fb316
[36mDestEncoder[39m: [32mOneHotEncoder[39m = oneHot_ed0606908b4e
[36mAssembler[39m: [32mVectorAssembler[39m = vecAssembler_e82c463f074a
[36mpipeline[39m: [32mPipeline[39m = pipeline_07787b63f053
[36mPreparator[39m: [32mPipelineModel[39m = pipeline_07787b63f053
[36mdfPrepared[39m: [32mDataset[39m[[32mRow[39m] = [Year: int, Month: int ... 14 more fields]

In [8]:
val Array(train, test) = dfPrepared.randomSplit(Array(0.8,0.2))
train.select("Features","IsArrDelayed").show(5)

+--------------------+------------+
|            Features|IsArrDelayed|
+--------------------+------------+
|(610,[0,1,2,3,4,5...|           1|
|(610,[0,1,2,3,4,5...|           1|
|(610,[0,1,2,3,4,5...|           1|
|(610,[0,1,2,3,4,5...|           1|
|(610,[0,1,2,3,4,5...|           0|
+--------------------+------------+
only showing top 5 rows



[36mtrain[39m: [32mDataset[39m[[32mRow[39m] = [Year: int, Month: int ... 14 more fields]
[36mtest[39m: [32mDataset[39m[[32mRow[39m] = [Year: int, Month: int ... 14 more fields]

## Model building

- We define the generalized linear model using a binomial function --> Logistic regression.
- We train the model and measure the training time --> ~15sec using the 8 cores of an intel i7-6700K (4.0 GHz) for 800K rows 	
- We show the model coefficients
- We show the 10 most important variables

In [9]:
import org.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression}

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)
  .setLabelCol("IsArrDelayed")
  .setFeaturesCol("Features") 

val lrModel = lr.fit(train)

                                                                                

[32mimport [39m[36morg.apache.spark.ml.classification.{BinaryLogisticRegressionSummary, LogisticRegression}

[39m
[36mlr[39m: [32mLogisticRegression[39m = logreg_c5e7f54d4193
[36mlrModel[39m: [32mml[39m.[32mclassification[39m.[32mLogisticRegressionModel[39m = logreg_c5e7f54d4193

In [24]:
lrModel.coefficients.toArray.zipWithIndex.map(_.swap).sortBy(a => -math.abs(a._2)).take(10)

[36mres23[39m: [32mArray[39m[([32mInt[39m, [32mDouble[39m)] = [33mArray[39m(
  ([32m609[39m, [32m-18.64603660363778[39m),
  ([32m308[39m, [32m13.4801612068689[39m),
  ([32m306[39m, [32m-12.036568560945861[39m),
  ([32m304[39m, [32m-11.126498828344797[39m),
  ([32m287[39m, [32m-6.719433480968447[39m),
  ([32m298[39m, [32m-6.321784069744478[39m),
  ([32m268[39m, [32m-3.665292029649553[39m),
  ([32m221[39m, [32m-2.8777270416209544[39m),
  ([32m263[39m, [32m-2.869963711124296[39m),
  ([32m302[39m, [32m2.656357508300985[39m)
)

## Model testing

- We add a model prediction column to the testing dataset
- We show the first 10 rows of the test dataset (with the new column)
- We show the model ROC curve
- We measure the model Area Under Curve (AUC) to be 0.706 on the testing dataset.  

This is telling us that our model is not super accurate  (we generally assume that a model is raisonable at predicting when it has an AUC above 0.8).  But, since we are not trying to build the best possible model, but just show comparison of data science code/performance accross languages/libraries.
If none the less you are willing to improve this result, you should try adding more feature column into the model.

In [30]:
val testWithPred = lrModel.transform(test.select("IsArrDelayed","Features"))
testWithPred.show(10)

+------------+--------------------+--------------------+--------------------+----------+
|IsArrDelayed|            Features|       rawPrediction|         probability|prediction|
+------------+--------------------+--------------------+--------------------+----------+
|           1|(610,[0,1,2,3,4,5...|[1.86870909738161...|[0.86630883860640...|       0.0|
|           1|(610,[0,1,2,3,4,5...|[1.47930175230465...|[0.81446709135184...|       0.0|
|           0|(610,[0,1,2,3,4,5...|[1.78182058195163...|[0.85592152534880...|       0.0|
|           1|(610,[0,1,2,3,4,5...|[1.87177312743705...|[0.86666330980389...|       0.0|
|           1|(610,[0,1,2,3,4,5...|[0.98950749209980...|[0.72899063184229...|       0.0|
|           0|(610,[0,1,2,3,4,5...|[0.98053362392551...|[0.72721408615902...|       0.0|
|           1|(610,[0,1,2,3,4,5...|[-0.0419693560510...|[0.48950920083996...|       1.0|
|           0|(610,[0,1,2,3,4,5...|[0.86983454048872...|[0.70471126814918...|       0.0|
|           0|(610,[0

[36mtestWithPred[39m: [32mDataFrame[39m = [IsArrDelayed: int, Features: vector ... 3 more fields]

In [31]:
val trainingSummary = lrModel.evaluate(test)
val binarySummary = trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary]
val roc = binarySummary.roc

[36mtrainingSummary[39m: [32mml[39m.[32mclassification[39m.[32mLogisticRegressionSummary[39m = org.apache.spark.ml.classification.BinaryLogisticRegressionSummary@636ee3ee
[36mbinarySummary[39m: [32mBinaryLogisticRegressionSummary[39m = org.apache.spark.ml.classification.BinaryLogisticRegressionSummary@636ee3ee
[36mroc[39m: [32mDataFrame[39m = [FPR: double, TPR: double]

In [32]:
plotly.JupyterScala.init()
val fpr = roc.select("FPR").rdd.map(_.getDouble(0)).collect.toSeq;
val tpr = roc.select("TPR").rdd.map(_.getDouble(0)).collect.toSeq;

plotly.Scatter(fpr, tpr, name = "ROC").plot(title = "ROC Curve")

[36mfpr[39m: [32mSeq[39m[[32mDouble[39m] = [33mArray[39m(
  [32m0.0[39m,
  [32m0.0027274200226834935[39m,
  [32m0.007246115901848885[39m,
  [32m0.012061857526059013[39m,
  [32m0.017498694798999046[39m,
  [32m0.023061551478927755[39m,
  [32m0.02920949826273246[39m,
  [32m0.03558247970187409[39m,
  [32m0.041874448665094426[39m,
  [32m0.048463463373359494[39m,
  [32m0.055187498874826726[39m,
[33m...[39m
[36mtpr[39m: [32mSeq[39m[[32mDouble[39m] = [33mArray[39m(
  [32m0.0[39m,
  [32m0.01946560412516114[39m,
  [32m0.03659908590179304[39m,
  [32m0.05334583382163366[39m,
  [32m0.06928395640454706[39m,
  [32m0.0850580100785187[39m,
  [32m0.10007031524668933[39m,
  [32m0.1147896402203211[39m,
  [32m0.12961443806398687[39m,
  [32m0.14405250205086137[39m,
  [32m0.15831477792101253[39m,
[33m...[39m
[36mres31_3[39m: [32mString[39m = [32m"plot-709911267"[39m

In [33]:
println(s"areaUnderROC: ${binarySummary.areaUnderROC}")

areaUnderROC: 0.6451332096083849


## Key takeaways

- We built a GLM model predicting airline delay probability
- We train it on 800K rows in ~15sec on an intel i7-6700K (4.0 GHz)
- We measure an AUC of 0.702, which is not super accurate but reasonable
- We demonstrated a typical workflow in python language in a Jupyter notebook

I might be biased, but I find the [pandas](http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html)/[scikit-learn](http://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LogisticRegression.html#sklearn.linear_model.LogisticRegression) documentation particularly complete and easy to read.  In addition they are thousdands of recent examples/tutorials all over the web.