## This notebook is part of Hadoop and Spark training delivered by IT-DB group
### SPARK MLlib Hands-On Lab
_by Prasanth Kothuri_

 ### Import the required libraries

In [1]:
import os
from pyspark import SparkContext, SQLContext, SparkConf
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from collections import namedtuple
from pprint import pprint

### create spark context

In [2]:
# add the Databricks spark-csv package; PYSPARK_SUBMIT_ARGS has to be set
# before the SparkContext is started for the package to be picked up
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.5.0 pyspark-shell'
# create sparkContext and sqlContext
conf = SparkConf().set("spark.executor.memory", "4g")
# getOrCreate() reuses an already running context, so re-running this cell
# does not fail with "Cannot run multiple SparkContexts at once"
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

### load flight data into spark DataFrame and inspect data

In [3]:
# load the flight data CSV (first row is the header) and let the reader
# infer the column types
csv_reader = sqlContext.read.format('com.databricks.spark.csv')
csv_reader = csv_reader.options(header='true', inferSchema='true')
flights_df = csv_reader.load("../data/US_Flight_Data.csv")

In [4]:
# show the column names and the types inferred while loading the CSV
flights_df.printSchema()

root
 |-- dofM: integer (nullable = true)
 |-- dofW: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flnum: integer (nullable = true)
 |-- org_id: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest_id: integer (nullable = true)
 |-- dest: string (nullable = true)
 |-- crsdeptime: integer (nullable = true)
 |-- deptime: integer (nullable = true)
 |-- depdelaymins: double (nullable = true)
 |-- crsarrtime: integer (nullable = true)
 |-- arrtime: integer (nullable = true)
 |-- arrdelay: double (nullable = true)
 |-- crselapsedtime: double (nullable = true)
 |-- dist: double (nullable = true)



In [5]:
# peek at the first 3 rows of the loaded data
flights_df.show(3)

+----+----+-------+-------+-----+------+------+-------+----+----------+-------+------------+----------+-------+--------+--------------+------+
|dofM|dofW|carrier|tailnum|flnum|org_id|origin|dest_id|dest|crsdeptime|deptime|depdelaymins|crsarrtime|arrtime|arrdelay|crselapsedtime|  dist|
+----+----+-------+-------+-----+------+------+-------+----+----------+-------+------------+----------+-------+--------+--------------+------+
|   1|   7|     AA| N3CGAA|  307| 11292|   DEN|  14107| PHX|      1145|   1135|         0.0|      1345|   1328|   -17.0|         120.0| 602.0|
|   1|   7|     AA| N3CGAA|  307| 14107|   PHX|  14057| PDX|      1510|   1502|         0.0|      1701|   1653|    -8.0|         171.0|1009.0|
|   1|   7|     AA| N3EVAA|  309| 11278|   DCA|  13303| MIA|       659|    646|         0.0|       944|    930|   -14.0|         165.0| 919.0|
+----+----+-------+-------+-----+------+------+-------+----+----------+-------+------------+----------+-------+--------+--------------+------+

### Prepare data for ML Algorithm ###
#### 1. create delayed label column ####
_Mark a flight as delayed if depdelaymins > 40 min_

In [6]:
def delayed(record, threshold=40):
    """Return the binary "delayed" label for a departure delay.

    Args:
        record: departure delay in minutes, as a number or a numeric string.
            Falsy values (None, 0, 0.0, "") are labelled as not delayed.
        threshold: delay in minutes above which a flight counts as delayed.
            Defaults to 40.

    Returns:
        1.0 if the delay is strictly greater than ``threshold``, else 0.0.

    Raises:
        ValueError: if ``record`` is a non-empty string that cannot be
            converted to a float.
    """
    # missing or zero delay: nothing to compare, the flight is not delayed
    if not record:
        return 0.0
    return 1.0 if float(record) > threshold else 0.0

In [7]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

# expose the plain-Python delayed() helper as a Spark UDF that yields a float
dlay = udf(delayed, FloatType())

# derive the "delayed" label from the departure delay, then fill nulls with 0
delay_minutes = flights_df.depdelaymins.cast('float')
labelled_df = flights_df.withColumn("delayed", dlay(delay_minutes))
features_df = labelled_df.fillna(0)

In [8]:
# peek at the first 3 rows, now including the new "delayed" column
features_df.show(3)

+----+----+-------+-------+-----+------+------+-------+----+----------+-------+------------+----------+-------+--------+--------------+------+-------+
|dofM|dofW|carrier|tailnum|flnum|org_id|origin|dest_id|dest|crsdeptime|deptime|depdelaymins|crsarrtime|arrtime|arrdelay|crselapsedtime|  dist|delayed|
+----+----+-------+-------+-----+------+------+-------+----+----------+-------+------------+----------+-------+--------+--------------+------+-------+
|   1|   7|     AA| N3CGAA|  307| 11292|   DEN|  14107| PHX|      1145|   1135|         0.0|      1345|   1328|   -17.0|         120.0| 602.0|    0.0|
|   1|   7|     AA| N3CGAA|  307| 14107|   PHX|  14057| PDX|      1510|   1502|         0.0|      1701|   1653|    -8.0|         171.0|1009.0|    0.0|
|   1|   7|     AA| N3EVAA|  309| 11278|   DCA|  13303| MIA|       659|    646|         0.0|       944|    930|   -14.0|         165.0| 919.0|    0.0|
+----+----+-------+-------+-----+------+------+-------+----+----------+-------+------------+--

#### 2. Encode string column of labels to column of label indices ####

In [9]:
# Map each categorical string column to a numeric index column.
# handleInvalid="skip" drops rows whose value was not seen when the indexer
# was fitted; with the default ("error") transforming data that contains an
# unseen carrier or airport raises an exception.
carrierIdx = StringIndexer(inputCol="carrier", outputCol="carrierIdx", handleInvalid="skip")
originIdx = StringIndexer(inputCol="origin", outputCol="originIdx", handleInvalid="skip")
destIdx = StringIndexer(inputCol="dest", outputCol="destIdx", handleInvalid="skip")
# The indexers are only declared here; they are fitted later as stages of
# the training pipeline.

#### 3. Create feature vector ####

In [11]:
# columns that make up the model input: schedule fields plus the indexed
# carrier / origin / destination columns
feature_cols = [
    "dofM",
    "dofW",
    "crsdeptime",
    "crsarrtime",
    "crselapsedtime",
    "carrierIdx",
    "originIdx",
    "destIdx",
]
# the assembler packs those columns into one vector column named "features"
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [None]:
import pandas as pd  # toPandas() needs pandas to be installed

# Preview the assembled feature vector on a handful of rows.
# The assembler reads the carrierIdx / originIdx / destIdx columns, so the
# string indexers have to be applied first. features_df is used because the
# train/test split is only created further down in the notebook.
indexer_preview = Pipeline(stages=[carrierIdx, originIdx, destIdx]).fit(features_df)
indexed_preview_df = indexer_preview.transform(features_df.limit(5))
assembler.transform(indexed_preview_df).toPandas()

### choose the machine learning algorithm ###
_A decision tree is commonly used for classification_

In [22]:
# decision tree that predicts the "delayed" label from the "features" vector
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="delayed",
    impurity="gini",
    maxDepth=12,
    maxBins=9000,
)
# list every parameter of the classifier with its default and current value
dt_params = dt.explainParams()
print("DecisionTreeClassifier parameters:\n" + dt_params + "\n")

DecisionTreeClassifier parameters:
cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. (default: 10)
featuresCol: features column name. (default: features, current: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini, current: gini)
labelCol: label column name. (default: label, current: delayed)
maxBins: Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature. (default: 32, current: 9000)
maxDepth: Maximum depth of th

### create a sequence of actions / transformation to be run ###
_Spark MLlib represents such a sequence as a pipeline_

In [13]:
# stages run in order: index the three string columns, assemble the feature
# vector, then train the decision tree
pipeline_stages = [
    carrierIdx,
    originIdx,
    destIdx,
    assembler,
    dt,
]
pipeline = Pipeline(stages=pipeline_stages)

###  Split data into training and test ###

In [14]:
# 70% training / 30% test; the fixed seed makes the split, and therefore the
# model and the metrics below, reproducible across runs
(trainingData, testData) = features_df.randomSplit([0.7, 0.3], seed=42)

### train the model by running the pipeline ###

In [15]:
# fit every pipeline stage (indexers, assembler, decision tree) on the training split
model = pipeline.fit(trainingData)

### apply the model to the testdata to get the predictions ###

In [16]:
# run the fitted pipeline on the held-out test split
predictions = model.transform(testData)

In [17]:
# count how many test flights were predicted for each value of "prediction"
predictions.groupby("prediction").count().collect()
# optional: inspect individual predictions
#predictions.select("dofM","dofW","carrier","flnum","origin","dest","deptime","prediction").show(10)

[Row(prediction=0.0, count=131622), Row(prediction=1.0, count=3560)]

### What is the accuracy?

In [18]:
# compare the "prediction" column with the "delayed" label using accuracy
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="delayed",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
# the test error is the share of misclassified flights, i.e. 1 - accuracy
test_error = 1.0 - accuracy
print("Test Error = %g " % test_error)

Test Error = 0.105783 
