Logistic Regression Example with MLlib and Spark ML
====

Start up Spark
-------------

In [None]:
import os
import sys

spark_home = '/opt/spark'
os.environ['SPARK_HOME'] = spark_home

sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.1-src.zip'))

execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

Spark MLlib
----------


In [None]:
rdd = sc.textFile("data/logreg.txt")
rdd

In [None]:
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint

def parsePoint(line):
    values = [float(s) for s in line.split(' ')]
    return LabeledPoint(values[0], DenseVector(values[1:]))

points = rdd.map(parsePoint)
points.collect()[0:10]

In [None]:
from pyspark.mllib.classification import LogisticRegressionWithSGD

model = LogisticRegressionWithSGD.train(points, 100)

Let's predict:

In [None]:
model.predict([0.6,0.6])

In [None]:
model.setThreshold(0.8)
model.predict([0.6,0.6])

In [None]:
model.predict([100,0.6])

Spark ML
-------

Read training and test data. In this case test data is labeled as well (we will generate our label based on the `arrdelay` field) 

In [1]:
training = sqlContext.read.parquet("data/training.parquet")
test = sqlContext.read.parquet("data/test.parquet")

test.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- deptime: integer (nullable = true)
 |-- crsdeptime: integer (nullable = true)
 |-- arrtime: integer (nullable = true)
 |-- crsarrtime: integer (nullable = true)
 |-- actualelapsetime: integer (nullable = true)
 |-- crselapsetime: integer (nullable = true)
 |-- airtime: integer (nullable = true)
 |-- arrdelay: integer (nullable = true)
 |-- depdelay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- taxiin: integer (nullable = true)
 |-- taxiout: integer (nullable = true)
 |-- cancelled: integer (nullable = true)
 |-- diverted: integer (nullable = true)
 |-- carrierdelay: integer (nullable = true)
 |-- weatherdelay: integer (nullable = true)
 |-- nasdelay: integer (nullable = true)
 |-- securitydelay: integer (nullable = true)
 |-- lateaircraftdelay: integer (nullable = true)



In [2]:
test.first()

Row(year=2006, month=2, dayofmonth=21, dayofweek=2, deptime=902, crsdeptime=905, arrtime=1027, crsarrtime=1030, actualelapsetime=205, crselapsetime=205, airtime=190, arrdelay=-3, depdelay=-3, distance=1162, taxiin=7, taxiout=8, cancelled=0, diverted=0, carrierdelay=0, weatherdelay=0, nasdelay=0, securitydelay=0, lateaircraftdelay=0)

In [3]:
training.cache()
test.cache()

DataFrame[year: int, month: int, dayofmonth: int, dayofweek: int, deptime: int, crsdeptime: int, arrtime: int, crsarrtime: int, actualelapsetime: int, crselapsetime: int, airtime: int, arrdelay: int, depdelay: int, distance: int, taxiin: int, taxiout: int, cancelled: int, diverted: int, carrierdelay: int, weatherdelay: int, nasdelay: int, securitydelay: int, lateaircraftdelay: int]

Generate label column for the training data

In [4]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

is_late = udf(lambda delay: 1.0 if delay > 0 else 0.0, DoubleType())
training = training.withColumn("is_late",is_late(training.arrdelay))


Create and fit Spark ML model

In [5]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Create feature vectors. Ignore arr_delay and it's derivate, is_late
feature_assembler = VectorAssembler(
    inputCols=[x for x in training.columns if x not in ["is_late","arrdelay"]],
    outputCol="features")

reg = LogisticRegression().setParams(
    maxIter = 100,
    labelCol="is_late",
    predictionCol="prediction")

model = Pipeline(stages=[feature_assembler, reg]).fit(training)


Predict whether the aircraft will be late

In [6]:
predicted = model.transform(test)



In [11]:
predicted.take(1)

[Row(year=2006, month=2, dayofmonth=21, dayofweek=2, deptime=902, crsdeptime=905, arrtime=1027, crsarrtime=1030, actualelapsetime=205, crselapsetime=205, airtime=190, arrdelay=-3, depdelay=-3, distance=1162, taxiin=7, taxiout=8, cancelled=0, diverted=0, carrierdelay=0, weatherdelay=0, nasdelay=0, securitydelay=0, lateaircraftdelay=0, features=DenseVector([2006.0, 2.0, 21.0, 2.0, 902.0, 905.0, 1027.0, 1030.0, 205.0, 205.0, 190.0, -3.0, 1162.0, 7.0, 8.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), rawPrediction=DenseVector([0.756, -0.756]), probability=DenseVector([0.6805, 0.3195]), prediction=0.0)]

Check model performance

In [12]:
predicted = predicted.withColumn("is_late",is_late(predicted.arrdelay))
predicted.crosstab("is_late","prediction").show()

+------------------+----+----+
|is_late_prediction| 1.0| 0.0|
+------------------+----+----+
|               1.0|1448|1110|
|               0.0|  62|2805|
+------------------+----+----+

