We are going to do the analysis and predicting the score if Airlines delay on ord airport in year 2008. we have downloaded the data airlines and weather data from external sources

# Importing the library/module required from spark

In [31]:
# For SQL-type queries (Spark)
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import udf

# For regression and other possible ML tools (Spark)
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.util import MLUtils

# Typycal Python tools
import sys
import pandas as pd
import os.path
import numpy

1. Loading the data into test and train RDDs after doing the preprocessing, In preprocessing we have merge the Airlines and weather data in order to extact the feature from the raw dataset.

#Features

1. delay : this the delay of the flight while taking off from the airport
2. month : Month of the flight delay, Values should be between  1-12
3. day   : day of the flight delay, values should be between 1-31
4. dow   : day of the week, value should be between 0-7
5. tow   : time of departure, what hour of the day flight take off
6. distance : distance in mile from source to destination Airport
7. temp_min : Minimum temperature of the flight delay's day
8. temp_max : Maximum temperature of the flight delay's day


# Loading the data into Test and Training RDD

In [3]:
# loading the files into spark RDD
trainRDD = sc.textFile("/user/hdfs/ord_2007/*")
testRDD = sc.textFile("/user/hdfs/ord_2008/*")

In [4]:
print "Train Loaded: %s Test Loaded: %s" % (trainRDD.count(),testRDD.count())

Train Loaded: 7292467 Test Loaded: 6872294


# Converting RDD to DataFrame

In [18]:
def parse(r):
    try:
        x=Row(delay=int(r[0]),\
          month=int(r[1]),\
          day=int(r[2]),\
          dow=int(r[3]),\
          tod=int(r[4]), \
          distance=int(r[5]),\
          temp_min=int(r[6]),\
          temp_max=int(r[7]))  
    except:
        x=None  
    return x


rowRDD = trainRDD.map(lambda x: x.split(',')).map(lambda r: parse(r)).filter(lambda x: x != None)
airlinetrain_df = sqlContext.createDataFrame(rowRDD)

rowRDD = testRDD.map(lambda x: x.split(',')).map(lambda r: parse(r)).filter(lambda x: x != None)
airlinetest_df = sqlContext.createDataFrame(rowRDD)

In [19]:
airlinetrain_df.head()

Row(day=1, delay=27, distance=86, dow=1, month=1, temp_max=61, temp_min=-17, tod=2)

In [20]:
airlinetest_df.head()

Row(day=4, delay=-7, distance=1023, dow=5, month=1, temp_max=22, temp_min=-56, tod=2)

=> Adding a Addintion column with binary format like delay as 1 and no delay as 0. Filter out the delay if its more then 15 mins as delay else will consider as no delay.

In [21]:
# adding the depdelay column if delay is more than 15 mins
def delay(x):
    if x > 15 :
        return 1
    else :
        return 0

depdelay = udf(delay, IntegerType())

airlinetrain_df= airlinetrain_df.withColumn("depdelay", depdelay("delay"))
airlinetest_df= airlinetest_df.withColumn("depdelay", depdelay("delay"))

In [22]:
airlinetrain_df.take(1) # you can see if the delay is more than 15 its giving depdelay as 1

[Row(day=1, delay=27, distance=86, dow=1, month=1, temp_max=61, temp_min=-17, tod=2, depdelay=1)]

In [23]:
airlinetest_df.head()  # you can see if the delay is Less than 15 its giving depdelay as 0

Row(day=4, delay=-7, distance=1023, dow=5, month=1, temp_max=22, temp_min=-56, tod=2, depdelay=0)

# Creating the LabeledPoint for logistic regression 

In [24]:
def parsePoint(line):
    return LabeledPoint(line[8], line[:7])

parseTrainData = airlinetrain_df.rdd.map(parsePoint)
parseTestData = airlinetest_df.rdd.map(parsePoint)

In [25]:
parseTrainData.take(3)

[LabeledPoint(1.0, [1.0,27.0,86.0,1.0,1.0,61.0,-17.0]),
 LabeledPoint(0.0, [1.0,0.0,247.0,1.0,1.0,61.0,-17.0]),
 LabeledPoint(0.0, [1.0,-3.0,920.0,1.0,1.0,61.0,-17.0])]

In [26]:
parseTestData.take(3)

[LabeledPoint(0.0, [4.0,-7.0,1023.0,5.0,1.0,22.0,-56.0]),
 LabeledPoint(0.0, [4.0,6.0,1671.0,5.0,1.0,22.0,-56.0]),
 LabeledPoint(0.0, [4.0,4.0,1013.0,5.0,1.0,22.0,-56.0])]

# Building the logistic Regression Model

In [29]:
# Training the model with Training dataset with iteration as 10
model = LogisticRegressionWithLBFGS.train(parseTrainData,iterations=10)

In [30]:
# now as the model is builed we can Evaluate the model on test dataset
l_and_p = parseTestData.map(lambda p: (p.label, model.predict(p.features)))

# Calculating the error value 
trainErr = l_and_p.filter(lambda x: x[0] != x[1]).count() / float(parseTestData.count())
print("Training Error = " + str(trainErr))

Training Error = 0.0192951582106


# Calculating the Accuracy of the model 

In [35]:
def conf(r):
    if r[0] == r[1] ==1: x= 'TP'
    if r[0] == r[1] ==0: x= 'TN'
    if r[0] == 1 and  r[1] ==0: x= 'FN'
    if r[0] == 0 and  r[1] ==1: x= 'FP'
    return (x)
acc1 = l_and_p.map(lambda (v, p): ((v, p),1)).reduceByKey(lambda a, b: a + b).take(5)
acc = [(conf(x[0]),x[1]) for x in acc1]

TP=TN=FP=FN=0.0
for x in acc: 
    if x[0]=='TP': TP= x[1]
    if x[0]=='TN': TN= x[1]
    if x[0]=='FP': FP= x[1]
    if x[0]=='FN': FN= x[1]
eps = sys.float_info.epsilon
Accuracy = (TP+TN) / (TP + TN+ FP+FN+eps) 
print "Model Accuracy for ORD: %1.2f %%" % (Accuracy*100)

Model Accuracy for ORD: 98.07 %


# Thanks 