# Flight Delay Prediction Demo Using SystemML

This notebook is based on datascientistworkbench.com's tutorial notebook for predicting flight delays.

## Loading SystemML 

To use one of the released versions, use "%AddDeps org.apache.systemml systemml 0.9.0-incubating". To use the nightly build, use "%AddJar https://sparktc.ibmcloud.com/repo/latest/SystemML.jar".

Alternatively, provide SystemML.jar and its dependencies on the command line when starting the notebook (for example: --packages com.databricks:spark-csv_2.10:1.4.0 --jars SystemML.jar).

In [1]:
%AddJar https://sparktc.ibmcloud.com/repo/latest/SystemML.jar

Using cached version of SystemML.jar


Use the spark-csv package to load the CSV file.

In [2]:
%AddDeps com.databricks spark-csv_2.10 1.4.0

:: loading settings :: url = jar:file:/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/lib/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
:: resolving dependencies :: com.ibm.spark#spark-kernel;working [not transitive]
	confs: [default]
	found com.databricks#spark-csv_2.10;1.4.0 in central
downloading https://repo1.maven.org/maven2/com/databricks/spark-csv_2.10/1.4.0/spark-csv_2.10-1.4.0.jar ...
	[SUCCESSFUL ] com.databricks#spark-csv_2.10;1.4.0!spark-csv_2.10.jar (66ms)
:: resolution report :: resolve 459ms :: artifacts dl 70ms
	:: modules in use:
	com.databricks#spark-csv_2.10;1.4.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   1   |   1   |   0   ||   1 

## Import Data

Download the 2007 airline dataset from stat-computing.org if it has not already been downloaded.

In [3]:
import sys.process._
import java.net.URL
import java.io.File

// Download the 2007 airline data once and keep it as a local compressed file
val url = "http://stat-computing.org/dataexpo/2009/2007.csv.bz2"
val localFilePath = "airline2007.csv.bz2"
if (!new File(localFilePath).exists) {
    (new URL(url) #> new File(localFilePath)).!!
}

Load the dataset into a DataFrame using the Spark CSV package, replacing the string NA with 0.0.

In [4]:
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
val sqlContext = new SQLContext(sc)
val fmt = sqlContext.read.format("com.databricks.spark.csv")
val opt = fmt.options(Map("header"->"true", "inferSchema"->"true"))
val airline = opt.load(localFilePath).na.replace( "*", Map("NA" -> "0.0") )

In [5]:
airline.printSchema

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- C

## Data Exploration
Which airports have the highest average departure delay?

In [6]:
airline.registerTempTable("airline")
sqlContext.sql("""SELECT Origin, count(*) conFlight, avg(DepDelay) delay
                    FROM airline
                    GROUP BY Origin
                    ORDER BY delay DESC""").show

+------+---------+------------------+
|Origin|conFlight|             delay|
+------+---------+------------------+
|   PIR|        4|              45.5|
|   ACK|      314|45.296178343949045|
|   SOP|      195| 34.02051282051282|
|   HHH|      997| 22.58776328986961|
|   MCN|      992|22.496975806451612|
|   AKN|      235|21.123404255319148|
|   CEC|     1055|20.807582938388627|
|   GNV|     1927| 20.69797612869746|
|   EYW|     1052|20.224334600760457|
|   ACY|      735|20.141496598639456|
|   SPI|     1745|19.545558739255014|
|   GST|       90|19.233333333333334|
|   EWR|   154113|18.800853918877706|
|   BRW|      726| 18.02754820936639|
|   AGS|     2286|17.728346456692915|
|   ORD|   375784|17.695756072637472|
|   TRI|     1207| 17.63628831814416|
|   SBN|     5128|17.505850234009362|
|   FAY|     2185| 17.48970251716247|
|   PHL|   104063|17.067776250924922|
+------+---------+------------------+
only showing top 20 rows
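The query above groups flights by origin airport, counts them, and averages DepDelay. The plain-Scala sketch below reproduces that aggregation on an in-memory collection so the computation can be checked by hand. The `Flight` case class and its fields are hypothetical, and the sketch assumes delays are already parsed to `Double`, whereas the notebook's SQL lets Spark convert the string DepDelay column.

```scala
// Plain-Scala sketch of: SELECT Origin, count(*), avg(DepDelay) ... GROUP BY Origin ORDER BY delay DESC
// The Flight case class is a hypothetical stand-in for a row of the airline table.
object DelayByOrigin {
  final case class Flight(origin: String, depDelay: Double)

  /** Returns (origin, flight count, average departure delay), highest average delay first. */
  def summarize(flights: Seq[Flight]): Seq[(String, Int, Double)] =
    flights
      .groupBy(_.origin)
      .toSeq
      .map { case (origin, fs) => (origin, fs.size, fs.map(_.depDelay).sum / fs.size) }
      .sortBy(row => -row._3) // ORDER BY delay DESC
}
```

Because the ranking uses the mean alone, an origin with very few flights can top the list; PIR leads the output above with only 4 flights.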



## Modeling: Logistic Regression

Predict whether flights departing from JFK are delayed by more than 15 minutes.

In [7]:
// Label: 1.0 = departure delayed by more than 15 minutes, 2.0 = not delayed (unparseable values get 1.0).
// SystemML's MultiLogReg works with positive integer class labels (1, 2, ...).
sqlContext.udf.register("checkDelay", (depDelay: String) =>
  try { if (depDelay.toDouble > 15) 1.0 else 2.0 } catch { case _: Exception => 1.0 })
val tempSmallAirlineData = sqlContext.sql("SELECT *, checkDelay(DepDelay) label FROM airline WHERE Origin = 'JFK'").persist(StorageLevel.MEMORY_AND_DISK)

// Keep only destinations with more than 1000 departures from JFK
val popularDest = tempSmallAirlineData.select("Dest").map(y => (y.get(0).toString, 1)).reduceByKey(_ + _).filter(_._2 > 1000).collect.toMap
sqlContext.udf.register("onlyUsePopularDest", (x: String) => popularDest.contains(x))
tempSmallAirlineData.registerTempTable("tempAirline")
val smallAirlineData = sqlContext.sql("SELECT * FROM tempAirline WHERE onlyUsePopularDest(Dest)")

// Random 70/30 split into training and test sets
val datasets = smallAirlineData.randomSplit(Array(0.7, 0.3))
val trainDataset = datasets(0).cache
val testDataset = datasets(1).cache
trainDataset.count
testDataset.count
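The labeling and destination filter above run as Spark SQL UDFs. As a rough plain-Scala restatement (not the notebook's code), `checkDelay` below mirrors the UDF's mapping and `popularDest` mirrors the count-and-filter on `Dest`; the object name and the sample values in the checks are hypothetical. Since the load step already replaced the string NA with 0.0, missing delays reach the UDF as 0.0 and are labeled 2.0 rather than hitting the exception branch.

```scala
import scala.util.Try

// Sketch of the checkDelay / onlyUsePopularDest logic on plain Scala values.
object DelayLabels {
  /** 1.0 if the delay parses to more than 15 minutes, 2.0 otherwise; unparseable input gives 1.0. */
  def checkDelay(depDelay: String): Double =
    Try(depDelay.toDouble).map(d => if (d > 15) 1.0 else 2.0).getOrElse(1.0)

  /** Destinations that occur more than minFlights times, mapped to their counts. */
  def popularDest(dests: Seq[String], minFlights: Int): Map[String, Int] =
    dests.groupBy(identity).toSeq
      .map { case (d, ds) => (d, ds.size) }
      .filter(_._2 > minFlights)
      .toMap
}
```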

### Feature selection

One-hot encode the destination airport (Dest) and combine it with the Year, Month, DayofMonth, DayOfWeek, and Distance columns into a single feature vector.

In [8]:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

val indexer = new StringIndexer().setInputCol("Dest").setOutputCol("DestIndex") // .setHandleInvalid("skip") // Only works on Spark 1.6 or later
val encoder = new OneHotEncoder().setInputCol("DestIndex").setOutputCol("DestVec")
val assembler = new VectorAssembler().setInputCols(Array("Year","Month","DayofMonth","DayOfWeek","Distance","DestVec")).setOutputCol("features")
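The three stages chain as follows: `StringIndexer` maps each destination to an index ordered by frequency (the most frequent value gets 0), `OneHotEncoder` turns that index into a binary vector (by default it drops the last category, so k destinations yield vectors of length k - 1), and `VectorAssembler` concatenates the numeric columns with that vector. The sketch below imitates this on plain Scala collections; breaking frequency ties alphabetically and using `Array[Double]` instead of Spark's vector types are assumptions of the sketch.

```scala
// Plain-Scala imitation of StringIndexer -> OneHotEncoder -> VectorAssembler for one column.
object DestFeatures {
  /** Index categories by descending frequency (ties broken alphabetically, an assumption). */
  def fitIndexer(values: Seq[String]): Map[String, Int] =
    values.groupBy(identity).toSeq
      .sortBy { case (v, occurrences) => (-occurrences.size, v) }
      .zipWithIndex
      .map { case ((v, _), i) => v -> i }
      .toMap

  /** One-hot encode an index, dropping the last category (it maps to the all-zeros vector). */
  def oneHot(index: Int, numCategories: Int): Array[Double] = {
    val vec = Array.fill(numCategories - 1)(0.0)
    if (index < numCategories - 1) vec(index) = 1.0
    vec
  }

  /** Concatenate the numeric columns and the encoded destination into one feature vector. */
  def assemble(numeric: Seq[Double], destVec: Array[Double]): Array[Double] =
    numeric.toArray ++ destVec
}
```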

### Build the model using SystemML's MLPipeline wrapper

This wrapper invokes MultiLogReg.dml (for training) and GLM-predict.dml (for prediction). These DML algorithms are available at https://github.com/apache/incubator-systemml/tree/master/scripts/algorithms
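MultiLogReg.dml fits the coefficients by minimizing an L2-regularized negative log-likelihood with a trust-region Newton method whose inner steps use conjugate gradient, which is what the Objective, Trust Delta, and CG-iteration lines in the training log below appear to report. The sketch evaluates that objective for the two-class case used here, treating class 2 as the baseline category; omitting the intercept and identifying regParam with the weight lambda are simplifying assumptions, not a transcription of the DML script. With all coefficients at zero every row contributes ln 2, so if the script starts from zero, the initial Objective of 56210.77 would correspond to roughly 81,000 training rows.

```scala
// Sketch of the regularized negative log-likelihood for k = 2 classes, class 2 as baseline.
// Assumptions: no intercept term; lambda stands in for regParam.
object BinaryMultiLogRegObjective {
  private def dot(a: Seq[Double], b: Seq[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  /** P(y = 1 | x) = exp(x.beta) / (1 + exp(x.beta)); class 2 gets 1 - P(y = 1 | x). */
  def probClass1(x: Seq[Double], beta: Seq[Double]): Double =
    1.0 / (1.0 + math.exp(-dot(x, beta)))

  /** -sum_i log P(y_i | x_i) + (lambda / 2) * ||beta||^2, with labels y_i in {1.0, 2.0}. */
  def objective(xs: Seq[Seq[Double]], ys: Seq[Double], beta: Seq[Double], lambda: Double): Double = {
    val negLogLik = xs.zip(ys).map { case (x, y) =>
      val p1 = probClass1(x, beta)
      -math.log(if (y == 1.0) p1 else 1.0 - p1)
    }.sum
    negLogLik + lambda / 2.0 * dot(beta, beta)
  }
}
```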

In [9]:
import org.apache.spark.ml.Pipeline
import org.apache.sysml.api.ml.LogisticRegression

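// "log" is the estimator's uid; regParam = L2 regularization, tol = convergence tolerance,
// maxInnerIter = max conjugate-gradient iterations per outer step (0 = no limit), maxOuterIter = max outer iterations
val lr = new LogisticRegression("log", sc).setRegParam(1e-4).setTol(1e-2).setMaxInnerIter(0).setMaxOuterIter(100)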

val pipeline = new Pipeline().setStages(Array(indexer, encoder, assembler, lr))
val model = pipeline.fit(trainDataset)


Welcome to Apache SystemML!

BEGIN MULTINOMIAL LOGISTIC REGRESSION SCRIPT
Reading X...
Reading Y...
-- Initially:  Objective = 56210.77060750876,  Gradient Norm = 4.4526526969600074E7,  Trust Delta = 0.001024586722033724
-- Outer Iteration 1: Had 1 CG iterations
   -- Obj.Reduction:  Actual = 9231.806553563853,  Predicted = 8882.93092452256  (A/P: 1.0393),  Trust Delta = 4.153060819933389E-4
   -- New Objective = 46978.96405394491,  Beta Change Norm = 3.989950049590561E-4,  Gradient Norm = 3495966.22664141
 
-- Outer Iteration 2: Had 2 CG iterations
   -- Obj.Reduction:  Actual = 114.78674011961266,  Predicted = 112.77285709513916  (A/P: 1.0179),  Trust Delta = 4.153060819933389E-4
   -- New Objective = 46864.1773138253,  Beta Change Norm = 1.1145435198192975E-4,  Gradient Norm = 90515.83715717653
Termination / Convergence condition satisfied.


### Evaluate the model 

Compute the root-mean-square (RMS) error of the predictions on the test data.

In [10]:
val predictions = model.transform(testDataset.withColumnRenamed("label", "OriginalLabel"))
predictions.select("prediction", "OriginalLabel").show
sqlContext.udf.register("square", (x:Double) => Math.pow(x, 2.0))

+----------+-------------+
|prediction|OriginalLabel|
+----------+-------------+
|       2.0|          2.0|
|       2.0|          2.0|
|       2.0|          2.0|
|       2.0|          1.0|
|       2.0|          2.0|
|       2.0|          1.0|
|       2.0|          1.0|
|       2.0|          2.0|
|       2.0|          1.0|
|       2.0|          2.0|
|       2.0|          2.0|
|       2.0|          2.0|
|       2.0|          2.0|
|       2.0|          1.0|
|       2.0|          1.0|
|       2.0|          1.0|
|       2.0|          2.0|
|       2.0|          1.0|
|       2.0|          1.0|
|       2.0|          1.0|
+----------+-------------+
only showing top 20 rows



In [11]:
predictions.registerTempTable("predictions")
sqlContext.sql("SELECT sqrt(avg(square(OriginalLabel - prediction))) FROM predictions").show

+------------------+
|               _c0|
+------------------+
|0.5181495365908705|
+------------------+
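Because both the labels and the predictions take only the values 1.0 and 2.0, each squared difference is either 0 or 1, so the RMS error above is simply the square root of the misclassification rate. The small sketch below checks that identity on hypothetical values; the object and method names are illustrative.

```scala
// With labels and predictions in {1.0, 2.0}, RMS error == sqrt(fraction of wrong predictions).
object RmsVsErrorRate {
  /** Root-mean-square difference between labels and predictions. */
  def rms(labels: Seq[Double], predictions: Seq[Double]): Double = {
    val squared = labels.zip(predictions).map { case (y, p) => (y - p) * (y - p) }
    math.sqrt(squared.sum / squared.size)
  }

  /** Fraction of rows where the prediction differs from the label. */
  def errorRate(labels: Seq[Double], predictions: Seq[Double]): Double =
    labels.zip(predictions).count { case (y, p) => y != p }.toDouble / labels.size
}
```

Applied to the value above, 0.5181495365908705² ≈ 0.268, so roughly 27% of the test flights are misclassified (about 73% accuracy).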



### Perform k-fold cross-validation to tune the hyperparameters

Perform cross-validation to tune the regularization parameter (regParam) of the logistic regression model.

In [12]:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}

val crossval = new CrossValidator().setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator)
val paramGrid = new ParamGridBuilder().addGrid(lr.regParam, Array(0.1, 1e-3, 1e-6)).build()
crossval.setEstimatorParamMaps(paramGrid)
crossval.setNumFolds(2) // Setting k = 2
val cvmodel = crossval.fit(trainDataset)

BEGIN MULTINOMIAL LOGISTIC REGRESSION SCRIPT
Reading X...
Reading Y...
-- Initially:  Objective = 28149.40014971994,  Gradient Norm = 2.219077877076751E7,  Trust Delta = 0.001024586722033724
-- Outer Iteration 1: Had 1 CG iterations
   -- Obj.Reduction:  Actual = 4575.598456876014,  Predicted = 4404.375311516109  (A/P: 1.0389),  Trust Delta = 4.130115632741793E-4
   -- New Objective = 23573.801692843925,  Beta Change Norm = 3.9695545226363187E-4,  Gradient Norm = 1740806.9734678708
Termination / Convergence condition satisfied.
BEGIN MULTINOMIAL LOGISTIC REGRESSION SCRIPT
Reading X...
Reading Y...
-- Initially:  Objective = 28149.40014971994,  Gradient Norm = 2.219077877076751E7,  Trust Delta = 0.001024586722033724
-- Outer Iteration 1: Had 1 CG iterations
   -- Obj.Reduction:  Actual = 4575.598456884985,  Predicted = 4404.375311523911  (A/P: 1.0389),  Trust Delta = 4.1301156327499524E-4
   -- New Objective = 23573.801692834953,  Beta Change Norm = 3.969554522643349E-4,  Gradient Norm 

Name: java.lang.IllegalArgumentException
Message: Field "rawPrediction" does not exist.
StackTrace: org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:212)
org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:212)
scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
scala.collection.AbstractMap.getOrElse(Map.scala:58)
org.apache.spark.sql.types.StructType.apply(StructType.scala:211)
org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:40)
org.apache.spark.ml.evaluation.BinaryClassificationEvaluator.evaluate(BinaryClassificationEvaluator.scala:82)
org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1.apply(CrossValidator.scala:109)
org.apache.spark.ml.tuning.CrossValidator$$anonfun$fit$1.apply(CrossValidator.scala:99)
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
org.apache.spark.ml.tuning.CrossValidator.f
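The run above stops because `BinaryClassificationEvaluator` reads a `rawPrediction` column by default and, as the error message shows, the SystemML model's output has no such column; `cvmodel` is therefore never defined, which explains the compile errors in the next cells. Conceptually, `CrossValidator` splits the training data into k folds, fits each parameter setting on k - 1 folds, scores it on the held-out fold, keeps the setting with the best average score, and refits it on all of the training data. The plain-Scala sketch below shows that selection loop with an error metric (lower is better, like the RMS error used earlier) that needs only labels and predictions. The round-robin fold assignment and the `trainAndScore` callback are assumptions of the sketch; Spark assigns rows to folds randomly.

```scala
// Sketch of k-fold model selection: average held-out error per candidate, keep the lowest.
object KFoldSketch {
  /** Round-robin assignment of row indices 0 until n to k folds (an assumption of this sketch). */
  def folds(n: Int, k: Int): Seq[Seq[Int]] =
    (0 until k).map(f => (0 until n).filter(_ % k == f))

  /** Returns the candidate with the lowest average validation error, and that error. */
  def selectBest[P](candidates: Seq[P], n: Int, k: Int)(
      trainAndScore: (P, Seq[Int], Seq[Int]) => Double): (P, Double) = {
    val allFolds = folds(n, k)
    val averaged = candidates.map { p =>
      val errors = allFolds.map { validation =>
        val heldOut = validation.toSet
        val training = (0 until n).filterNot(heldOut.contains)
        trainAndScore(p, training, validation) // fit on training rows, score on validation rows
      }
      (p, errors.sum / k)
    }
    averaged.minBy(_._2)
  }
}
```

For example, `KFoldSketch.selectBest(Seq(0.1, 1e-3, 1e-6), n, 2)(trainAndScore)` mirrors the 2-fold search over regParam in the cell above, given some `trainAndScore` that fits a model and returns its RMS error.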

In [15]:
cvmodel

Name: Compile Error
Message: <console>:37: error: not found: value cvmodel
              cvmodel
              ^
StackTrace: 

### Evaluate the cross-validated model

In [14]:
val cvpredictions = cvmodel.transform(testDataset.withColumnRenamed("label", "OriginalLabel"))
cvpredictions.registerTempTable("cvpredictions")
sqlContext.sql("SELECT sqrt(avg(square(OriginalLabel - prediction))) FROM cvpredictions").show

Name: Compile Error
Message: <console>:46: error: not found: value cvmodel
         val cvpredictions = cvmodel.transform(testDataset.withColumnRenamed("label", "OriginalLabel"))
                             ^
StackTrace: 

## Homework ;)

Read http://apache.github.io/incubator-systemml/algorithms-classification.html#multinomial-logistic-regression and perform cross-validation on other hyperparameters, for example icpt, tol, maxOuterIter, and maxInnerIter.
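`ParamGridBuilder` builds the Cartesian product of every value list passed to `addGrid`, and `CrossValidator` fits each combination once per fold, so grids over several hyperparameters grow quickly. The sketch below shows that product in plain Scala with parameters keyed by name; the string keys stand in for Spark `Param` objects, and candidate values such as icpt in {0, 1, 2} are illustrative only.

```scala
// Sketch of a parameter grid as the Cartesian product of per-parameter candidate values.
object ParamGridSketch {
  def cartesian(grid: Seq[(String, Seq[Double])]): Seq[Map[String, Double]] =
    grid.foldLeft(Seq(Map.empty[String, Double])) { case (combos, (name, values)) =>
      // extend every partial combination with each candidate value of the next parameter
      for (combo <- combos; v <- values) yield combo + (name -> v)
    }
}
```

With three icpt values and two tol values, the grid has 3 × 2 = 6 settings, so 2-fold cross-validation would train 12 models before the final refit.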