In [1]:
# Analysis of departure delay
import bisect
import csv
import math
import os
import StringIO
from csv import reader

import matplotlib
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import numpy as np
from numpy import array
from pyspark.mllib.regression import LabeledPoint

In [2]:
# Load every CSV file one directory below /mnt/airlines as a single RDD of text lines
textFiles = sc.textFile(name="/mnt/airlines/*/*.csv")

In [3]:
def csvLine(line):
  """Parse one CSV-formatted text line into a list of field strings.

  Every U+FFFD (Unicode replacement character) in the line is first turned
  into '?', because the Python 2 csv module cannot handle that non-ASCII
  character in a unicode string.

  Returns the fields of the single record on the line (an empty list for an
  empty line).
  """
  line = line.replace(u"\ufffd", "?")
  # next() works on the csv reader in both Python 2 and 3; reader.next() is Python 2 only
  return next(reader([line]))

In [4]:
def isFloat(string):
  """Tell whether float() accepts the given value.

  Returns True if float(string) succeeds, False if it raises ValueError.
  """
  try:
    float(string)
  except ValueError:
    return False
  else:
    return True

In [5]:
def checkFeats(x, headDict, impInd):
  """Decide whether a projected row is usable for modelling.

  Every field must parse as a float (per isFloat), except the DEP_TIME_BLK
  field, which is exempt from the float check and only has to be non-empty.

  x        -- list of field values, ordered like impInd
  headDict -- column name -> raw CSV column position
  impInd   -- raw CSV column positions that were kept in x
  Returns True when the row passes, False otherwise.
  """
  blockPos = impInd.index(headDict["DEP_TIME_BLK"])
  for pos, value in enumerate(x):
    if pos == blockPos:
      # Time-block field: only reject it when it is missing
      if value == '':
        return False
    elif not isFloat(value):
      return False
  return True
  

In [6]:
def createLabeledPoint(featList):
  """Convert one filtered row into a LabeledPoint whose label is the departure delay.

  featList holds the 15 fields selected by impInd, in that order:
    - positions 0-11 are categorical; each is replaced by its position in the
      matching sorted list of distinct values (year, quart, mon, day, airline,
      origAirport, origCity, origState, destAirport, destCity, destState, depBlk)
    - positions 12-13 are numeric features, parsed with float()
    - position 14 is the label (departure delay), parsed with float()

  featList itself is not modified.
  Raises ValueError if a categorical value is absent from its lookup list
  (as list.index did), or if a numeric field does not parse.
  """
  levelLists = (year, quart, mon, day, airline, origAirport,
                origCity, origState, destAirport, destCity, destState, depBlk)
  features = []
  for levels, value in zip(levelLists, featList):
    # The lookup lists are built with sorted(), so a binary search finds the
    # same position as list.index without scanning the whole list every row
    pos = bisect.bisect_left(levels, value)
    if pos == len(levels) or levels[pos] != value:
      raise ValueError("%r is not in list" % (value,))
    features.append(pos)
  features.append(float(featList[12]))
  features.append(float(featList[13]))
  return LabeledPoint(float(featList[14]), array(features))

In [7]:
# Column names from the first line (surrounding quotes stripped) and a
# name -> column-position lookup
header = textFiles.first()
headList = [head.strip('"') for head in csvLine(header)]
headDict = dict((element, index) for index, element in enumerate(headList))

In [8]:
# Drop every line equal to the header (each input file presumably starts with
# the same header), parse the rest as CSV and tidy each field
def _cleanFields(fields):
  """Strip surrounding double quotes, then whitespace, from every field."""
  return [field.strip('"').strip() for field in fields]

textFiles = textFiles.filter(lambda line: line != header).map(csvLine).map(_cleanFields)

In [9]:
# Number of parsed rows (flights) per value of the first column (year).
# Only the hard-coded window below is plotted; range() excludes its end, so the
# last year shown is 2014. Years with no rows are plotted as 0.
numFlightsYear = textFiles.map(lambda row: row[0]).countByValue()
x = range(1988, 2015)
y = [numFlightsYear.get(str(year), 0) for year in x]
fig, ax = plt.subplots()
ax.plot(x, y, 'bo')
ax.set_title("Number of flights per year")
ax.set_xlabel("Year")
ax.set_ylabel("Number of flights")
display(fig)


In [10]:
# Raw CSV column positions to keep, in the order used by every later cell;
# the last one (31) ends up as the label column
impInd = [0, 1, 2, 4, 7, 11, 13, 17, 20, 22, 26, 35, 36, 54, 31]
# Project each row onto those columns, then keep only rows that pass checkFeats
textFiles = textFiles.map(lambda row: [row[col] for col in impInd])
textFiles = textFiles.filter(lambda row: checkFeats(row, headDict, impInd))
textFiles.cache()

In [11]:
def _isDelayed(row):
  """True when the row's DEP_DELAY field is strictly positive."""
  return float(row[impInd.index(headDict["DEP_DELAY"])]) > 0

# Keep only flights that departed late
airlinesDelay = textFiles.filter(_isDelayed)

In [12]:
# Peek at one delayed row: the 15 impInd fields, still as strings
airlinesDelay.first()

In [13]:
# Sorted distinct values of each of the first 12 (categorical) fields among
# delayed flights; createLabeledPoint encodes a value as its position in these lists
def _sortedLevels(col):
  """Sorted list of the distinct values in field ``col`` of airlinesDelay."""
  return sorted(airlinesDelay.map(lambda row: row[col]).distinct().collect())

(year, quart, mon, day, airline, origAirport, origCity, origState,
 destAirport, destCity, destState, depBlk) = [_sortedLevels(col) for col in range(12)]


In [14]:
# Encode every delayed row as a LabeledPoint (label = departure delay) and cache it
airlinesDelay = airlinesDelay.map(createLabeledPoint).cache()
airlinesDelay

In [15]:
# Explore the data, Draw histogram of departure delay
delay = airlinesDelay.map(lambda x: x.label)
x=delay.collect()
x=[i for i in x if i<600]
fig, ax = plt.subplots()
plt.title("Histogram of departure delays")
plt.hist(x, 100, normed=1, facecolor='green', alpha=0.75)
display(fig)

In [16]:
# Mean departure delay per airline code (feature position of AIRLINE_ID).
# Sums and counts are combined per key with reduceByKey, so no airline's full
# list of delays has to be held in memory as groupByKey would require.
airlineIdPos = impInd.index(headDict["AIRLINE_ID"])
airlineMean = (airlinesDelay
               .map(lambda lp: (lp.features[airlineIdPos], (lp.label, 1)))
               .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
               .mapValues(lambda totalCount: totalCount[0] / totalCount[1])
               .collect())

In [17]:
# x: airline code (position in the sorted `airline` list), y: mean departure delay
x = [code for code, mean in airlineMean]
y = [mean for code, mean in airlineMean]
fig, ax = plt.subplots()
ax.plot(x, y, 'bo')
ax.set_title("Mean Departure Delays for different Airlines")
ax.set_xlabel("Airline code")
ax.set_ylabel("Mean departure delay")
display(fig)

In [18]:
# Airline codes with more than one million *delayed* flights (airlinesDelay only
# holds rows with a positive departure delay). The codes are sorted so their
# order, which later re-indexing relies on via impAirlineCodes.index, is
# deterministic.
MIN_DELAYED_FLIGHTS = 1000000
impAirlines = airlinesDelay.map(lambda lp: (lp.features[4], 1)).countByKey()
airlineCodes = sorted(impAirlines, key=impAirlines.get)
impAirlineCodes = sorted(code for code, count in impAirlines.items()
                         if count > MIN_DELAYED_FLIGHTS)
# Print the original AIRLINE_ID for each selected code
for code in impAirlineCodes:
  print(airline[int(code)])


In [19]:
# Show the CSV column names behind the feature positions used for modelling
var = [2, 3, 4, 11, 13]
for featPos in var:
  print(headList[impInd[featPos]])

In [20]:
# Dataset modelling departure delay from month, day, airline, departure-time
# block and distance, restricted to the high-volume airlines in impAirlineCodes
var = [2, 3, 4, 11, 13]
dataset = (airlinesDelay
           .map(lambda lp: LabeledPoint(lp.label, [lp.features[pos] for pos in var]))
           .filter(lambda lp: lp.features[2] in impAirlineCodes))
dataset.cache()
print(dataset.first())


In [21]:
def _reindexAirline(lp):
  """Replace feature 2 (airline code) with its position in impAirlineCodes."""
  feats = list(lp.features)
  feats[2] = impAirlineCodes.index(feats[2])
  return LabeledPoint(lp.label, array(feats))

# Re-number airline codes 0..len(impAirlineCodes)-1 for the categorical model
dataset = dataset.map(_reindexAirline)
dataset.first()

In [22]:
# Number of rows left for modelling (delayed flights of the high-volume airlines)
dataset.count()

In [23]:
# Decision-tree regression of departure delay.
# Fixed-seed 60/20/20 split into training, test and validation sets.
from pyspark.mllib.tree import DecisionTree
train, test, val = dataset.randomSplit(weights=[0.6, 0.2, 0.2], seed=88)

In [24]:
# Feature position -> number of categories for the categorical features
# (month, day, re-indexed airline, departure-time block)
catInfo = dict(enumerate([len(mon), len(day), len(impAirlineCodes), len(depBlk)]))

In [25]:
# Raw CSV column positions of the 15 retained fields (the last one is the label)
impInd

In [26]:
# Choose the tree depth with the lowest RMSE on the validation split; the test
# split is kept out of model selection so it can give an unbiased final estimate.
prevError = float("inf")
bestDepth = 0
bestModel = None
for depth in range(2, 5):
  tree_model = DecisionTree.trainRegressor(data=train, categoricalFeaturesInfo=catInfo,
                                           impurity="variance", maxDepth=depth,
                                           maxBins=1000, minInstancesPerNode=10,
                                           minInfoGain=0.0)
  pred = tree_model.predict(val.map(lambda lp: lp.features))
  predlabel = val.map(lambda lp: lp.label).zip(pred)
  # Mean squared error is computed on the executors instead of collecting
  # every (label, prediction) pair to the driver
  rmse = math.sqrt(predlabel.map(lambda pair: (pair[1] - pair[0]) ** 2).mean())
  if rmse < prevError:
    prevError = rmse
    bestModel = tree_model
    bestDepth = depth

  print("{0} : {1}".format(depth, rmse))

In [27]:
# Show the structure of the depth-selected model; the loop's last tree_model is
# not necessarily the best one
print("Best depth: {0}".format(bestDepth))
print(bestModel.toDebugString())