# Models

In [3]:
from pyspark.sql.functions import *
import seaborn as sns
import matplotlib
import matplotlib.pyplot as plt

In [4]:
#spark ML imports
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StringIndexer, CountVectorizer, IDF
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [5]:
import pyspark.ml.feature
from pyspark.sql.functions import when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
from pyspark.ml.feature import StandardScaler

In [8]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
import tempfile
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import col, explode, array, lit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier

In [9]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Airline Cancellation Prediction

### Data processing and feature engineering

In [26]:
#Load data into PySpark
df = spark.read.csv("/user/nhu2/airline/Data_logistics_clean.csv", inferSchema=True, header=True)

In [27]:
# check data type
df.dtypes

[('FL_DATE', 'timestamp'),
 ('OP_CARRIER', 'string'),
 ('ORIGIN', 'string'),
 ('CRS_DEP_TIME', 'double'),
 ('CRS_ELAPSED_TIME', 'double'),
 ('CANCELLED', 'double')]

In [13]:
# convert to date datatype
df = df.withColumn("FL_DATE",to_date(col("FL_DATE"),"yyyy-MM-dd"))

In [14]:
# construct seasonal variable based on date 
df = df.withColumn("SEASON", \
   when((month(df.FL_DATE) >= 3) & (month(df.FL_DATE) <= 5), lit("Spring")) \
     .when((month(df.FL_DATE) >= 6) & (month(df.FL_DATE) <= 8), lit("Summer")) \
.when((month(df.FL_DATE) >= 9) & (month(df.FL_DATE) <= 11), lit("Fall")) \
     .otherwise(lit("Winter")) \
  )

In [15]:
#convert relevant categorical into one hot encoded
indexer1 = StringIndexer(inputCol="OP_CARRIER", outputCol="OP_CARRIERIDX").setHandleInvalid("skip")
indexer2 = StringIndexer(inputCol="ORIGIN", outputCol="ORIGINIDX").setHandleInvalid("skip")
indexer3 = StringIndexer(inputCol="SEASON", outputCol="SEASONIDX").setHandleInvalid("skip")

#gather all indexers as inputs to the One Hot Encoder
inputs = [indexer1.getOutputCol(), indexer2.getOutputCol(), \
          indexer3.getOutputCol()]

#create the one hot encoder
encoder = OneHotEncoder(inputCols=inputs,  \
                                 outputCols=["OP_CARRIERVec", "ORIGINVec", \
                                             "SEASONVec"])

#run it through a pipeline
pipeline = Pipeline(stages=[indexer1, indexer2, indexer3, encoder])
encodedData = pipeline.fit(df).transform(df)

#we have removed NAs so dont need to impute missing values.
#pipeline = pipeline.na.fill(0) 

encodedData.show(5)

                                                                                

+----------+----------+------+------------+----------------+---------+------+-------------+---------+---------+---------------+----------------+---------+
|   FL_DATE|OP_CARRIER|ORIGIN|CRS_DEP_TIME|CRS_ELAPSED_TIME|CANCELLED|SEASON|OP_CARRIERIDX|ORIGINIDX|SEASONIDX|  OP_CARRIERVec|       ORIGINVec|SEASONVec|
+----------+----------+------+------------+----------------+---------+------+-------------+---------+---------+---------------+----------------+---------+
|2009-01-01|        XE|   DCA|      1100.0|            62.0|      0.0|Winter|         11.0|     22.0|      3.0|(22,[11],[1.0])|(379,[22],[1.0])|(3,[],[])|
|2009-01-01|        XE|   EWR|      1510.0|            82.0|      0.0|Winter|         11.0|     13.0|      3.0|(22,[11],[1.0])|(379,[13],[1.0])|(3,[],[])|
|2009-01-01|        XE|   EWR|      1100.0|            70.0|      0.0|Winter|         11.0|     13.0|      3.0|(22,[11],[1.0])|(379,[13],[1.0])|(3,[],[])|
|2009-01-01|        XE|   DCA|      1240.0|            77.0|      0.0|

In [16]:
#gather feature vector and identify features
assembler = VectorAssembler(inputCols = ["OP_CARRIERVec", "ORIGINVec", \
                                             "SEASONVec"], \
                            outputCol = 'features')

encodedData = assembler.transform(encodedData)

In [17]:
#scaler = StandardScaler(inputCol = "features", outputCol = "scaledFeatures", withStd = True, withMean = False)

In [18]:
#scalerdf = scaler.fit(df)

In [19]:
# drop categorical variables 
encodedData = encodedData.drop("OP_CARRIER", "ORIGIN", "SEASON", "OP_CARRIERIDX", "ORIGINIDX", "SEASONIDX", "FL_DATE")

In [20]:
encodedData

CRS_DEP_TIME,CRS_ELAPSED_TIME,CANCELLED,OP_CARRIERVec,ORIGINVec,SEASONVec,features
1100.0,62.0,0.0,"(22,[11],[1.0])","(379,[22],[1.0])","(3,[],[])","(404,[11,44],[1.0..."
1510.0,82.0,0.0,"(22,[11],[1.0])","(379,[13],[1.0])","(3,[],[])","(404,[11,35],[1.0..."
1100.0,70.0,0.0,"(22,[11],[1.0])","(379,[13],[1.0])","(3,[],[])","(404,[11,35],[1.0..."
1240.0,77.0,0.0,"(22,[11],[1.0])","(379,[22],[1.0])","(3,[],[])","(404,[11,44],[1.0..."
1715.0,105.0,0.0,"(22,[11],[1.0])","(379,[27],[1.0])","(3,[],[])","(404,[11,49],[1.0..."
1915.0,147.0,0.0,"(22,[11],[1.0])","(379,[0],[1.0])","(3,[],[])","(404,[11,22],[1.0..."
1645.0,117.0,0.0,"(22,[11],[1.0])","(379,[37],[1.0])","(3,[],[])","(404,[11,59],[1.0..."
1915.0,80.0,0.0,"(22,[11],[1.0])","(379,[22],[1.0])","(3,[],[])","(404,[11,44],[1.0..."
1715.0,83.0,0.0,"(22,[11],[1.0])","(379,[13],[1.0])","(3,[],[])","(404,[11,35],[1.0..."
1300.0,68.0,0.0,"(22,[11],[1.0])","(379,[13],[1.0])","(3,[],[])","(404,[11,35],[1.0..."


In [21]:
# rename CANCELLED to label
encodedData = encodedData.withColumn("label", encodedData.CANCELLED)
encodedData = encodedData.drop("CANCELLED")

In [22]:
encodedData.dtypes

[('CRS_DEP_TIME', 'double'),
 ('CRS_ELAPSED_TIME', 'double'),
 ('OP_CARRIERVec', 'vector'),
 ('ORIGINVec', 'vector'),
 ('SEASONVec', 'vector'),
 ('features', 'vector'),
 ('label', 'double')]

In [23]:
# undersample majority data 
major_df = encodedData.filter(col("label") == 0)
minor_df = encodedData.filter(col("label") == 1)
ratio = major_df.count()/minor_df.count()
print("ratio: {}".format(ratio))

sampled_majority_df = major_df.sample(False, 1/ratio)
combined_df_2 = sampled_majority_df.unionAll(minor_df)
combined_df_2.show()

                                                                                

ratio: 62.25153589825002
+------------+----------------+---------------+-----------------+---------+--------------------+-----+
|CRS_DEP_TIME|CRS_ELAPSED_TIME|  OP_CARRIERVec|        ORIGINVec|SEASONVec|            features|label|
+------------+----------------+---------------+-----------------+---------+--------------------+-----+
|      2130.0|            68.0|(22,[11],[1.0])| (379,[13],[1.0])|(3,[],[])|(404,[11,35],[1.0...|  0.0|
|      1112.0|            93.0|(22,[11],[1.0])|  (379,[6],[1.0])|(3,[],[])|(404,[11,28],[1.0...|  0.0|
|      1800.0|           112.0|(22,[11],[1.0])| (379,[45],[1.0])|(3,[],[])|(404,[11,67],[1.0...|  0.0|
|      1340.0|            67.0|(22,[11],[1.0])| (379,[78],[1.0])|(3,[],[])|(404,[11,100],[1....|  0.0|
|      1020.0|           144.0|(22,[11],[1.0])| (379,[28],[1.0])|(3,[],[])|(404,[11,50],[1.0...|  0.0|
|      1750.0|            87.0|(22,[11],[1.0])|(379,[101],[1.0])|(3,[],[])|(404,[11,123],[1....|  0.0|
|       840.0|           182.0|(22,[11],[1.0])| 

In [24]:
# check whether data is balanced 
combined_df_2.groupBy('label').count().show()



+-----+------+
|label| count|
+-----+------+
|  0.0|974141|
|  1.0|973209|
+-----+------+



                                                                                

In [25]:
#split data into train and test
train_df, test_df = combined_df_2.randomSplit([.8,.2],seed=1234)
train_df

                                                                                

CRS_DEP_TIME,CRS_ELAPSED_TIME,OP_CARRIERVec,ORIGINVec,SEASONVec,features,label
5.0,261.0,"(22,[1],[1.0])","(379,[15],[1.0])","(3,[1],[1.0])","(404,[1,37,402],[...",0.0
5.0,305.0,"(22,[9],[1.0])","(379,[63],[1.0])","(3,[0],[1.0])","(404,[9,85,401],[...",0.0
5.0,305.0,"(22,[9],[1.0])","(379,[63],[1.0])","(3,[0],[1.0])","(404,[9,85,401],[...",0.0
15.0,170.0,"(22,[2],[1.0])","(379,[4],[1.0])","(3,[],[])","(404,[2,26],[1.0,...",0.0
15.0,215.0,"(22,[21],[1.0])","(379,[4],[1.0])","(3,[0],[1.0])","(404,[21,26,401],...",0.0
20.0,200.0,"(22,[2],[1.0])","(379,[7],[1.0])","(3,[0],[1.0])","(404,[2,29,401],[...",0.0
20.0,200.0,"(22,[2],[1.0])","(379,[7],[1.0])","(3,[0],[1.0])","(404,[2,29,401],[...",0.0
20.0,200.0,"(22,[2],[1.0])","(379,[7],[1.0])","(3,[0],[1.0])","(404,[2,29,401],[...",0.0
20.0,211.0,"(22,[21],[1.0])","(379,[4],[1.0])","(3,[1],[1.0])","(404,[21,26,402],...",0.0
20.0,211.0,"(22,[21],[1.0])","(379,[4],[1.0])","(3,[1],[1.0])","(404,[21,26,402],...",0.0


## Logistic Regression

In [41]:
lr = LogisticRegression(featuresCol = 'features', labelCol='label')
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()

# cross validation
# numFolds=3
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, parallelism=2)
cvModel = cv.fit(train_df)

In [42]:
#make predictions
predictions = cvModel.bestModel.transform(test_df)

In [43]:
#print evaluation metrics
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

print(evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}))
print(evaluator.evaluate(predictions, {evaluator.metricName: "f1"}))

0.630913013696873
0.6292236298813869


## Elastic Net, LASSO, RIDGE

In [44]:
# linear regression with elastic net, lasso, and ridge 
lr = LogisticRegression(featuresCol = 'features', labelCol='label')
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).addGrid(lr.regParam, [0.1, 1, 10, 20, 50, 100]).addGrid(lr.elasticNetParam, [0, 0.1, 0.2, 0.3, 0.5, 0.7, 1]).build()
evaluator = BinaryClassificationEvaluator()

# cross validation
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, parallelism=2)
cvModel = cv.fit(train_df)

In [45]:
#make predictions
predictions = cvModel.bestModel.transform(test_df)

In [46]:
#print evaluation metrics
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

print(evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}))
print(evaluator.evaluate(predictions, {evaluator.metricName: "f1"}))

0.6307488256707979
0.6290733706775817


## Random Forest

In [None]:
# Set parameters for the Random Forest.
rfc = RandomForestClassifier(maxDepth=10, numTrees=500, impurity="gini", labelCol="label", predictionCol="prediction")

# Fit the model to the data.
rfcm = rfc.fit(train_df)

# Given a dataset, predict each point's label, and show the results.
predictions = rfcm.transform(test_df)

22/03/12 05:23:24 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 1397.2 KiB
22/03/12 05:24:13 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 2010.3 KiB
22/03/12 05:25:10 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 2.7 MiB
22/03/12 05:26:18 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 3.5 MiB
22/03/12 05:27:33 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.5 MiB
22/03/12 05:29:00 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 5.6 MiB
22/03/12 05:30:35 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 6.9 MiB
                                                                                

In [None]:
#print evaluation metrics
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

print(evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}))
print(evaluator.evaluate(predictions, {evaluator.metricName: "f1"}))

22/03/12 05:32:21 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.0 MiB
                                                                                

0.6292901771154602


22/03/12 05:35:08 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 4.0 MiB

0.6292455175913692


                                                                                