Start up Spark

In [2]:
import os
import sys

if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/opt/spark-1.5/spark-1.5.0-SNAPSHOT-bin-hadoop2.6'
spark_home = os.environ.get('SPARK_HOME', None)

if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.1-src.zip'))
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.0-SNAPSHOT
      /_/

Using Python version 2.7.10 (default, Jul 14 2015 19:46:27)
SparkContext available as sc, HiveContext available as sqlContext.


Load flight dataset sample. It is in Parquet and the file it is a copy of an actual RapidMiner Spark Scripting input.

In [68]:
#flights = sqlContext.read.parquet("/opt/data/flight_sample.rm.parquet").sample(False,0.01)
flights = sqlContext.read.parquet("/opt/data/flight_sample.rm.1023.parquet")

flights.toPandas().head()

Unnamed: 0,year,month,dayofmonth,dayofweek,deptime,crsdeptime,arrtime,crsarrtime,uniquecarrier,flightnum,...,taxiin,taxiout,cancelled,cancellationcode,diverted,carrierdelay,weatherdelay,nasdelay,securitydelay,lateaircraftdelay
0,1992,3,24,2,2109,2105,2145,2145,WN,359,...,,,0,,0,,,,,
1,1998,6,25,4,838,830,1053,955,US,163,...,5.0,71.0,0,,0,,,,,
2,1997,3,10,1,1657,1610,1836,1804,AA,729,...,3.0,15.0,0,,0,,,,,
3,1993,3,24,3,1502,1432,1619,1548,CO,624,...,,,0,,0,,,,,
4,2007,8,8,3,1835,1715,2004,1844,YV,2687,...,14.0,12.0,0,,0,80.0,0.0,0.0,0.0,0.0


only keep numerical fields and get rid of records containing null values:

In [69]:
from pyspark.sql.types import StringType
for i in flights.schema.fields:
    if type(i.dataType) == StringType:
        flights = flights.drop(i.name) # + " " + i.name
flights = flights.dropna()

Generate training and test data:

In [70]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

is_late = udf(lambda delay: 1 if delay > 0 else 0, IntegerType())


flights = flights.withColumn("is_late",is_late(flights.arrdelay))
flights = flights.drop("arrdelay")

#flights.write.parquet("/opt/data/flight-sample.nonulls.onlyints.parquet")

training, test = flights.randomSplit([0.8, 0.2],273)
#test = test.drop("is_late")
training.write.parquet("/opt/data/flight-training.nonulls.onlyints.parquet")
test.write.parquet("/opt/data/flight-test.nonulls.onlyints.parquet")


In [62]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline


def rm_main(training, test):

    ignored_columns = ["id"] # These columns will be ignored by the Logistic Regression algorithm
    label_column = "is_late" # Name of the label column
    
    feature_assembler = VectorAssembler(
        inputCols=[x for x in training.columns if x not in ignored_columns],
        outputCol="features")

    training = training.withColumn(label_column, training[label_column].cast("double"))
    reg = LogisticRegression().setParams(
        maxIter = 10, regParam=0.01,
        labelCol=label_column, predictionCol="prediction")

    model = Pipeline(stages=[feature_assembler, reg]).fit(training)
    predicted = model.transform(test)

    # remove extra fields generated by the Feature Assembler
    result = predicted
    for extra_field in ["features","rawPrediction","probability"]:
        result = result.drop(extra_field) 
    
    return result


In [63]:
rm_main(training, test).toPandas()

Unnamed: 0,year,month,dayofmonth,dayofweek,deptime,crsdeptime,arrtime,crsarrtime,actualelapsetime,crselapsetime,...,taxiout,cancelled,diverted,carrierdelay,weatherdelay,nasdelay,securitydelay,lateaircraftdelay,is_late,prediction
0,2004,5,26,3,1325,1330,1649,1639,144,129,...,32,0,0,0,0,0,0,0,1,0
1,2003,7,11,5,1100,1108,1144,1155,36,47,...,5,0,0,0,0,0,0,0,0,0
2,2005,7,11,1,1204,1150,1318,1315,134,145,...,17,0,0,0,0,0,0,0,1,0
3,2005,8,7,7,1956,1950,2246,2255,350,365,...,28,0,0,0,0,0,0,0,0,0
4,2005,6,26,7,925,920,1025,1035,60,75,...,8,0,0,0,0,0,0,0,0,0
5,2003,6,24,2,1655,1701,1749,1755,54,54,...,16,0,0,0,0,0,0,0,0,0
6,2006,8,17,4,1750,1755,1924,1931,94,96,...,11,0,0,0,0,0,0,0,0,0
7,2007,6,30,6,918,910,1135,1120,77,70,...,25,0,0,8,0,7,0,0,1,0
8,2006,4,12,3,807,815,933,955,86,100,...,18,0,0,0,0,0,0,0,0,0
9,2007,2,27,2,1051,1057,1218,1229,87,92,...,18,0,0,0,0,0,0,0,0,0


In [49]:
training.first()

Row(year=2007, month=8, dayofmonth=8, dayofweek=3, deptime=1835, crsdeptime=1715, arrtime=2004, crsarrtime=1844, actualelapsetime=89, crselapsetime=89, airtime=63, arrdelay=80, depdelay=80, distance=430, taxiin=14, taxiout=12, cancelled=0, diverted=0, carrierdelay=80, weatherdelay=0, nasdelay=0, securitydelay=0, lateaircraftdelay=0, is_late=1)

DataFrame[age: bigint, name: string, x: string]