## Initializing SQL Dataframe

In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import col

sc = SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)

In [2]:
flight_df = sqlCtx.read.parquet("data/full_df.parquet")

## Logistic Regression

In [3]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import VectorAssembler 
from pyspark.mllib.linalg import Vectors

In [4]:
flight_df.show(5)

+-----+---+-----------+-------+--------------+-------------------+--------------+--------+-----------------+---------------+-----+
|MONTH|DAY|DAY_OF_WEEK|AIRLINE|ORIGIN_AIRPORT|SCHEDULED_DEPARTURE|SCHEDULED_TIME|DISTANCE|SCHEDULED_ARRIVAL|DEPARTURE_DELAY|DELAY|
+-----+---+-----------+-------+--------------+-------------------+--------------+--------+-----------------+---------------+-----+
|    3| 22|          7|     13|            95|               1405|         190.0|    1216|             1715|          -23.0|    0|
|   11| 29|          7|      1|             5|               1521|          64.0|     192|             1625|           12.0|    1|
|    7| 22|          3|      9|           223|               1245|          85.0|     331|             1410|           -5.0|    0|
|   11| 17|          2|      2|            15|                815|         207.0|    1547|             1242|            1.0|    1|
|   11| 30|          1|      2|           163|                635|         183.0|  

In [5]:
assembler = VectorAssembler(inputCols=['MONTH', 'DAY', 'DAY_OF_WEEK','AIRLINE',
                                       'SCHEDULED_DEPARTURE', 'ORIGIN_AIRPORT', 'SCHEDULED_TIME', 'DISTANCE', 
                                       'SCHEDULED_ARRIVAL'], outputCol="features")

In [6]:
transformed = assembler.transform(flight_df)

In [7]:
transformed.select(['DELAY', 'features']).show(5)

+-----+--------------------+
|DELAY|            features|
+-----+--------------------+
|    0|[3.0,22.0,7.0,13....|
|    1|[11.0,29.0,7.0,1....|
|    0|[7.0,22.0,3.0,9.0...|
|    1|[11.0,17.0,2.0,2....|
|    1|[11.0,30.0,1.0,2....|
+-----+--------------------+
only showing top 5 rows



In [8]:
dataRDD = transformed.select(['DELAY','features']).rdd.map(tuple)

In [9]:
lp = dataRDD.map(lambda row : (0 if row[0] == 0 else 1, Vectors.dense(row[1])))    \
            .map(lambda row : LabeledPoint(row[0], row[1]))

In [10]:
lp.take(11)

[LabeledPoint(0.0, [3.0,22.0,7.0,13.0,1405.0,95.0,190.0,1216.0,1715.0]),
 LabeledPoint(1.0, [11.0,29.0,7.0,1.0,1521.0,5.0,64.0,192.0,1625.0]),
 LabeledPoint(0.0, [7.0,22.0,3.0,9.0,1245.0,223.0,85.0,331.0,1410.0]),
 LabeledPoint(1.0, [11.0,17.0,2.0,2.0,815.0,15.0,207.0,1547.0,1242.0]),
 LabeledPoint(1.0, [11.0,30.0,1.0,2.0,635.0,163.0,183.0,952.0,938.0]),
 LabeledPoint(0.0, [6.0,20.0,6.0,3.0,1945.0,5.0,95.0,447.0,2120.0]),
 LabeledPoint(1.0, [2.0,23.0,1.0,10.0,2000.0,21.0,73.0,184.0,2113.0]),
 LabeledPoint(1.0, [2.0,27.0,5.0,7.0,623.0,133.0,154.0,723.0,757.0]),
 LabeledPoint(1.0, [11.0,2.0,1.0,4.0,1055.0,15.0,119.0,666.0,1254.0]),
 LabeledPoint(1.0, [4.0,10.0,5.0,9.0,2145.0,15.0,92.0,448.0,2217.0]),
 LabeledPoint(0.0, [2.0,13.0,5.0,3.0,1205.0,142.0,75.0,220.0,1320.0])]

In [11]:
split = lp.randomSplit([0.8, 0.2], 314)
training = split[0]
test = split[1]

In [12]:
LR_model = LogisticRegressionWithLBFGS.train(training)

In [13]:
LR_LAP = test.map(lambda lp: (float(LR_model.predict(lp.features)), lp.label))

In [14]:
LR_acc = 1.0 * LR_LAP.filter(lambda x:x[0] == x[1]).count()/test.count()
print(LR_acc)

0.584769860654


## Random Forest

In [15]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier

In [16]:
RF_model = RandomForest.trainClassifier(training, numClasses = 2,
                                       categoricalFeaturesInfo = {}, 
                                       numTrees = 5, featureSubsetStrategy = "auto", 
                                       impurity = 'gini', maxDepth = 4, maxBins = 32)

In [17]:
RF_pred = RF_model.predict(test.map(lambda x: x.features))

In [18]:
RF_LAP = test.map(lambda lp: lp.label).zip(RF_pred)

In [19]:
RF_testErr = RF_LAP.filter(lambda x: x[0] == x[1]).count() / float(test.count())
print(RF_testErr)

0.600604108409


## Decision Tree

In [20]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

In [21]:
DT_model = DecisionTree.trainClassifier(training, numClasses=2, categoricalFeaturesInfo={},
                                     impurity='gini', maxDepth=5, maxBins=32)

In [22]:
DT_pred = DT_model.predict(test.map(lambda x: x.features))

In [23]:
DT_LAP = test.map(lambda lp: lp.label).zip(DT_pred)

In [24]:
DT_LAP = test.map(lambda lp: lp.label).zip(DT_pred)
DT_testErr = DT_LAP.filter(lambda x: x[0] == x[1]).count() / float(test.count())
print(DT_testErr)

0.605099524501


## SVM

In [25]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

In [26]:
SVM_model = SVMWithSGD.train(training, iterations = 1000)

In [27]:
SVM_LAP = test.map(lambda x: (float(SVM_model.predict(x.features)), x.label))

In [28]:
SVM_testErr = SVM_LAP.filter(lambda x: x[0] == x[1]).count()/float(test.count())
print(SVM_testErr)

0.501716753389


## K-Fold Cross Validation

### Logistic Regression

In [29]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.types import DoubleType

In [30]:
lr_k = LogisticRegression()

In [31]:
grid_k = ParamGridBuilder().addGrid(lr_k.maxIter, [0, 1]).build()

In [32]:
evaluator_k = BinaryClassificationEvaluator()

In [33]:
cv_lr = CrossValidator(estimator = lr_k, estimatorParamMaps = grid_k, evaluator = evaluator_k)

In [34]:
flight_cv = transformed.select(['DELAY', 'features'])
flight_cv = flight_cv.withColumnRenamed('DELAY', 'label')
flight_cv = flight_cv.select(flight_cv.label.cast(DoubleType()).alias('label'), 
                                 'features')
flight_cv.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[3.0,22.0,7.0,13....|
|  1.0|[11.0,29.0,7.0,1....|
|  0.0|[7.0,22.0,3.0,9.0...|
|  1.0|[11.0,17.0,2.0,2....|
|  1.0|[11.0,30.0,1.0,2....|
|  0.0|[6.0,20.0,6.0,3.0...|
|  1.0|[2.0,23.0,1.0,10....|
|  1.0|[2.0,27.0,5.0,7.0...|
|  1.0|[11.0,2.0,1.0,4.0...|
|  1.0|[4.0,10.0,5.0,9.0...|
|  0.0|[2.0,13.0,5.0,3.0...|
|  1.0|[7.0,25.0,6.0,4.0...|
|  0.0|[4.0,22.0,3.0,2.0...|
|  0.0|[6.0,19.0,5.0,2.0...|
|  1.0|[3.0,26.0,4.0,9.0...|
|  1.0|[4.0,1.0,3.0,7.0,...|
|  0.0|[1.0,17.0,6.0,9.0...|
|  1.0|[7.0,11.0,6.0,1.0...|
|  0.0|[1.0,11.0,7.0,3.0...|
|  0.0|[8.0,23.0,7.0,4.0...|
+-----+--------------------+
only showing top 20 rows



In [35]:
train_cv, test_cv = flight_cv.randomSplit([0.8, 0.2], 314)

In [36]:
cvmodel_lr = cv_lr.fit(train_cv)



In [37]:
evaluator_k.evaluate(cvmodel_lr.transform(train_cv))

0.6222040497876321

### Random Forest

In [91]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorIndexer, IndexToString

In [88]:
labelIndexer = StringIndexer(inputCol = "label", 
                             outputCol = "indexedLabel").fit(flight_cv)

In [87]:
featureIndexer = VectorIndexer(inputCol="features", 
                              outputCol="indexedFeatures", 
                              maxCategories=4).fit(flight_cv)

In [89]:
rf_k = RandomForestClassifier(labelCol = "indexedLabel", 
                              featuresCol = "indexedFeatures")

In [93]:
labelConverter = IndexToString(inputCol="prediction",
                               outputCol="predictedLabel", 
                               labels=labelIndexer.labels)

In [112]:
evaluator_rf = MulticlassClassificationEvaluator(labelCol="indexedLabel",
                                                 predictionCol="prediction")
numFolds = 5

In [113]:
pipeline_rf = Pipeline(stages=[labelIndexer, 
                               featureIndexer,
                               rf_k,
                               labelConverter])

In [114]:
cv_rf = CrossValidator(estimator = pipeline_rf, 
                       estimatorParamMaps = grid_k, 
                       evaluator = evaluator_rf, 
                       numFolds = numFolds)

In [115]:
cvmodel_rf = cv_rf.fit(train_cv)

In [116]:
predictions_rf = cvmodel_rf.transform(test_cv)

In [117]:
predictions_rf.select("predictedLabel", "label", "features").show(5)

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|           0.0|  0.0|[1.0,1.0,4.0,1.0,...|
|           0.0|  0.0|[1.0,1.0,4.0,1.0,...|
|           1.0|  0.0|[1.0,1.0,4.0,2.0,...|
|           1.0|  0.0|[1.0,1.0,4.0,3.0,...|
|           1.0|  0.0|[1.0,1.0,4.0,3.0,...|
+--------------+-----+--------------------+
only showing top 5 rows



In [118]:
evaluator_rf.evaluate(predictions_rf)

0.6071491674276406

In [98]:
#evaluator_rf.evaluate(cvmodel_rf.transform(train_cv))

0.3861788226173339

### Decision Tree