In [1]:
#To check whether all the necessary data is mounted, "mnt" stands for the mounted folder
# MOUNT_NAME is also assigned in the S3 mounting cell; defining it here as well lets this
# cell run first on a fresh kernel instead of failing with a NameError. Keep both in sync.
MOUNT_NAME = "cloud4651"
FOLDER = "1_year_data"
PATH = "/mnt/%s/%s" % (MOUNT_NAME, FOLDER)

display(dbutils.fs.ls(PATH))

In [2]:
#By Charlie Chan.C.H
#Mounted data from Amazon S3
#To try this code, import the following data to S3 and mount
#https://drive.google.com/file/d/12iQ-xZF5vMewT3Arrx_p7NduY3vqd1ye/view?usp=sharing
import os

# Never hard-code AWS credentials in a notebook: they leak through exports, revision history
# and shared copies. Read them from the environment (or use dbutils.secrets on Databricks);
# the placeholder defaults only keep this cell usable as a template.
ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "XXXXXXXXX")
SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "XXXXXXXXXXXXXXXXXXXXXXXXXX")
# The secret is embedded in the s3a:// URI below, so any "/" in it must be percent-encoded.
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
AWS_BUCKET_NAME = "cloud4651"
MOUNT_NAME = "cloud4651"
MOUNT_POINT = "/mnt/%s" % MOUNT_NAME

# dbutils.fs.mount fails if the mount point is already in use, so only mount when it is
# not listed yet; this makes the cell safe to re-run.
if not any(m.mountPoint == MOUNT_POINT for m in dbutils.fs.mounts()):
  dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), MOUNT_POINT)

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, FloatType

In [4]:
#Basic loading and test the validity of the dataset
# TODO: also compare file hashes; a row count alone will not catch modified contents.
# Expected number of data rows in 399175305_T_ONTIME_2015_1.csv (header excluded).
EXPECTED_ROWS_2015_1 = 469968

# Build the path from PATH (mount check cell) so the mount/folder names live in one place.
DFTEMP = spark.read.csv("%s/399175305_T_ONTIME_2015_1.csv" % PATH, header=True, inferSchema=True)

assert DFTEMP.count() == EXPECTED_ROWS_2015_1, "There is a problem with your dataset, counts don't match"

In [5]:
#import all csv into a single data-frame (Takes about 10 minutes, do not run frequently)
# spark.read.csv accepts a list of paths, so every file is read by one reader with a single
# schema-inference pass, instead of one read + inference per file chained with unionAll.
# Directory entries (names ending in "/") are skipped.
csvPaths = ["%s/%s" % (PATH, fileInfo.name) for fileInfo in dbutils.fs.ls(PATH)
            if not fileInfo.name.endswith("/")]
if not csvPaths:
  # Fail here with a clear message rather than later with an obscure error on an empty DF.
  raise ValueError("No data files found under %s" % PATH)

DF = spark.read.csv(csvPaths, header=True, inferSchema=True)

print(DF.count())


In [6]:
# Peek at the combined DataFrame: render its first ten rows as a table.
display(DF.limit(num=10))

In [7]:
# printSchema() writes the schema tree itself and returns None; wrapping it in print would add a stray "None".
DF.printSchema()

In [8]:
#Create smaller dataframes for faster testing: for each of 2015 and 2016, a random sample of
# roughly one month's worth of rows (fraction 0.083 ~= 1/12). sample() is approximate, so the
# counts will not match the fraction exactly.
SAMPLE_FRACTION = 0.083
# Fixed seed so the samples, and every figure derived from them, can be regenerated.
SAMPLE_SEED = 42

DF2015 = DF.filter(DF.YEAR == 2015)
DF2016 = DF.filter(DF.YEAR == 2016)
smallDF2015 = DF2015.sample(False, SAMPLE_FRACTION, seed=SAMPLE_SEED)
smallDF2016 = DF2016.sample(False, SAMPLE_FRACTION, seed=SAMPLE_SEED)
smallDF2016.cache()
smallDF2015.cache()

print(smallDF2015.count())
print(smallDF2016.count())

In [9]:
# Row-level RDD views of the yearly DataFrames, in the style of the Apache log exercise.
# The full-year RDDs are left uncached: keep them for production runs only.
fullRDD2015, fullRDD2016 = DF2015.rdd, DF2016.rdd
# The sampled RDDs are cached because later cells reuse them repeatedly.
smallRDD2015, smallRDD2016 = (smallDF2015.rdd.cache(),
                              smallDF2016.rdd.cache())

In [10]:
#Define simple functions
def reduceAdd (a,b):
  """Return a + b; used as the combine function for reduce/reduceByKey."""
  return a+b

#For a certain key in the RDD, compute the total number of entries
def numberOf(key,RDD):
  """Count the rows of RDD for each distinct value of row[key].

  Returns an RDD of (value, count) pairs sorted by count, largest first.
  """
  return RDD.map(lambda a: (a[key], 1)).reduceByKey(reduceAdd).sortBy(lambda kv: -kv[1])

#For a certain key in the RDD, compute the portion of its entries compared to all
def shareOf(key,RDD):
  """Fraction of RDD's rows that have each distinct value of row[key].

  Returns an RDD of (value, share) pairs, largest share first; the shares sum to 1
  (up to float rounding). An empty RDD yields an empty result instead of raising.
  """
  fieldRDD = numberOf(key, RDD)
  # sum() is 0 for an empty RDD, whereas reduce() raises ValueError on it.
  total = fieldRDD.values().sum()
  # Dividing every count by the same positive total keeps numberOf's descending order,
  # and mapValues keeps element order, so no second sort is needed. For an empty RDD
  # the lambda never runs, so total == 0 is never divided by.
  return fieldRDD.mapValues(lambda n: float(n) / total)

In [11]:
#Toggle for dataset used: 1 = full 2015/2016 RDDs, 0 = the ~1/12 cached samples.
# Note that with FULL set, the smallRDD* names below hold the FULL data.
# Both branches assign smallRDD2015/smallRDD2016, so flipping FULL and re-running this
# cell switches datasets in either direction without re-running earlier cells.
FULL = 1

if FULL:
  smallRDD2015 = fullRDD2015
  smallRDD2016 = fullRDD2016
else:
  smallRDD2015 = smallDF2015.rdd.cache()
  smallRDD2016 = smallDF2016.rdd.cache()

In [12]:
import matplotlib.pyplot as plt

In [13]:
#Library for plotting, type "plotly==2.2.2" in "Workspace/Create (The little arrow on top) /Library/Upload Python Egg... /PyPi Name"

import plotly as py
import plotly.graph_objs as go

# Initialise plotly's offline notebook mode; connected=True makes plotly load plotly.js from
# its CDN instead of the copy bundled with the package. Figures in this notebook are rendered
# via py.offline.plot(..., output_type="div") passed to displayHTML.
py.offline.init_notebook_mode(connected=True)

In [14]:
# Average departure delay per day, including flights with no delay: every non-null
# DEP_DELAY_NEW value (zeros included) is averaged.
# data calculation
# Lambdas index into (key, value) pairs instead of using tuple parameters, which Python 3
# removed (PEP 3113). float() keeps the mean a true division even if a column is ever
# inferred as an integer type (Python 2 would floor-divide).
datesAndDelayCount = smallRDD2015.filter(lambda a: a["DEP_DELAY_NEW"] is not None).map(lambda a: (a["FL_DATE"], 1)).reduceByKey(reduceAdd).sortByKey()
datesAndDelayTotal = smallRDD2015.map(lambda a: (a["FL_DATE"], a["DEP_DELAY_NEW"])).filter(lambda kv: kv[1] is not None).reduceByKey(reduceAdd).sortByKey()
datesAndDelay = datesAndDelayTotal.join(datesAndDelayCount).mapValues(lambda tc: float(tc[0]) / tc[1]).sortByKey()

datesAndDelayCount2 = smallRDD2016.filter(lambda a: a["DEP_DELAY_NEW"] is not None).map(lambda a: (a["FL_DATE"], 1)).reduceByKey(reduceAdd).sortByKey()
datesAndDelayTotal2 = smallRDD2016.map(lambda a: (a["FL_DATE"], a["DEP_DELAY_NEW"])).filter(lambda kv: kv[1] is not None).reduceByKey(reduceAdd).sortByKey()
datesAndDelay2 = datesAndDelayTotal2.join(datesAndDelayCount2).mapValues(lambda tc: float(tc[0]) / tc[1]).sortByKey()

In [15]:
# Re-date both years onto the 2016 calendar so the 2015 and 2016 series share one x axis
# (2016 is a leap year, so every 2015 month/day exists in it). The string round-trip turns
# each FL_DATE into a 2016 datetime; map() keeps the date-sorted order.
from datetime import datetime, time
test = datesAndDelay.map(lambda kv: (kv[0].strftime('2016%m%d'), kv[1]))
test2 = test.map(lambda kv: datetime.strptime(kv[0], '%Y%m%d')).collect()
test3 = datesAndDelay2.map(lambda kv: (kv[0].strftime('2016%m%d'), kv[1]))
test4 = test3.map(lambda kv: datetime.strptime(kv[0], '%Y%m%d')).collect()

In [16]:
# Average departure delay per day, 2015 and 2016 overlaid on a shared 2016 calendar
# plotting
# x comes from the re-dated date lists; y is collected from the same date-sorted RDDs,
# so both are in ascending date order.

trace1 = go.Scatter(
    x=test2,
    y=datesAndDelay.values().collect(),
    name = '2015',
    line = dict(
        color = ('rgb(22, 96, 252)')
    )
)
trace2 = go.Scatter(
    x=test4,
    y=datesAndDelay2.values().collect(),
    name = '2016',
    line = dict(
        color = ('rgb(22, 252, 252)')
    )
)
data = [trace1, trace2]
layout = dict(title = 'Average delay time versus day of the year (2015 vs 2016)',
              xaxis = dict(title = 'Date'),
              yaxis = dict(title = 'Average Departure Delay Minutes'),
              )
fig = dict(data=data, layout=layout)
displayHTML(py.offline.plot(fig,output_type="div"))

In [17]:
# Number of flights per day whose departure was delayed by 15 minutes or more (DEP_DEL15 set)
# data calculation
# A null DEP_DEL15 (assumed to mean no departure was recorded, e.g. cancelled flights;
# confirm against the data) satisfies `None != 0`, so nulls are excluded explicitly rather
# than being counted as delayed flights.
delayFlights = smallRDD2015.filter(lambda a: a["DEP_DEL15"] is not None and a["DEP_DEL15"] != 0).map(lambda a: (a["FL_DATE"], 1)).reduceByKey(reduceAdd).sortByKey()
delayFlights2 = smallRDD2016.filter(lambda a: a["DEP_DEL15"] is not None and a["DEP_DEL15"] != 0).map(lambda a: (a["FL_DATE"], 1)).reduceByKey(reduceAdd).sortByKey()

In [18]:
# Re-date both years of daily delay counts onto the 2016 calendar so they share one x axis
# (2016 is a leap year, so every 2015 month/day exists in it). map() keeps date order.
from datetime import datetime, time
test5 = delayFlights.map(lambda kv: (kv[0].strftime('2016%m%d'), kv[1]))
test6 = test5.map(lambda kv: datetime.strptime(kv[0], '%Y%m%d')).collect()
test7 = delayFlights2.map(lambda kv: (kv[0].strftime('2016%m%d'), kv[1]))
test8 = test7.map(lambda kv: datetime.strptime(kv[0], '%Y%m%d')).collect()

In [19]:
# Daily number of departures delayed by 15 minutes or more, 2015 vs 2016
# plotting
# y must come from delayFlights / delayFlights2: test6 / test8 were derived from them, so the
# date-sorted values line up with those x dates.
trace1 = go.Scatter(
    x=test6,
    y=delayFlights.values().collect(),
    name = '2015',
    line = dict(
        color = ('rgb(22, 96, 252)')
    )
)
trace2 = go.Scatter(
    x=test8,
    y=delayFlights2.values().collect(),
    name = '2016',
    line = dict(
        color = ('rgb(22, 252, 252)')
    )
)
data = [trace1, trace2]
layout = dict(title = 'Number of delays versus day of the year (2015 vs 2016)',
              xaxis = dict(title = 'Date'),
              yaxis = dict(title = 'Number of delays of 15 minutes or more'),
              )
fig = dict(data=data, layout=layout)
displayHTML(py.offline.plot(fig,output_type="div"))

In [20]:
# Compare different types of delay versus day (2015 data)
# data calculation
def dailyDelayStats(RDD, column):
  """Per-day statistics of one delay column of a flight-row RDD.

  Rows whose `column` is null are ignored. Returns a tuple (count, total, mean) of RDDs
  of (FL_DATE, value) pairs, each sorted by date:
    count - number of rows with a non-null `column` on that day
    total - sum of those values
    mean  - total / count, as a float
  """
  count = RDD.filter(lambda a: a[column] is not None).map(lambda a: (a["FL_DATE"], 1)).reduceByKey(reduceAdd).sortByKey()
  total = RDD.map(lambda a: (a["FL_DATE"], a[column])).filter(lambda kv: kv[1] is not None).reduceByKey(reduceAdd).sortByKey()
  # float() keeps this a true division even for an integer-typed column (Python 2 would floor).
  mean = total.join(count).mapValues(lambda tc: float(tc[0]) / tc[1]).sortByKey()
  return count, total, mean

# weather
datesWeatherDelayCount, datesWeatherDelayTotal, datesWeatherDelay = dailyDelayStats(smallRDD2015, "WEATHER_DELAY")
# LateAircraft
datesLateAircraftDelayCount, datesLateAircraftDelayTotal, datesLateAircraftDelay = dailyDelayStats(smallRDD2015, "LATE_AIRCRAFT_DELAY")
# NAS
datesNASDelayCount, datesNASDelayTotal, datesNASDelay = dailyDelayStats(smallRDD2015, "NAS_DELAY")
# Security
datesSecurityDelayCount, datesSecurityDelayTotal, datesSecurityDelay = dailyDelayStats(smallRDD2015, "SECURITY_DELAY")
# Carrier
datesCarrierDelayCount, datesCarrierDelayTotal, datesCarrierDelay = dailyDelayStats(smallRDD2015, "CARRIER_DELAY")


In [21]:
# Average delay per cause of delay versus day (2015 data), with the overall average
# departure delay drawn as a thicker reference line.
# NOTE(review): the overall series averages DEP_DELAY_NEW over every row where it is non-null,
# while each cause series averages only over rows where that cause column is non-null, so the
# curves are on different populations; confirm this is the intended comparison.
# plotting

def _scatter(pairRDD, name, line):
  """Build a go.Scatter trace from an RDD of (date, value) pairs.

  The RDD is collected once, so x and y come from the same result instead of two
  separate Spark jobs that must return rows in the same order.
  """
  points = pairRDD.collect()
  return go.Scatter(
      x=[p[0] for p in points],
      y=[p[1] for p in points],
      name=name,
      line=line,
  )

# Create and style traces
trace1 = _scatter(datesAndDelay, 'Overall Delay Performance', dict(color=('rgb(22, 96, 252)'), width=4))
trace2 = _scatter(datesCarrierDelay, 'Carrier Delay', dict(color=('rgb(22, 252, 252)')))
trace3 = _scatter(datesLateAircraftDelay, 'Late Aircraft Delay', dict(color=('rgb(252, 96, 252)')))
trace4 = _scatter(datesNASDelay, 'NAS Delay', dict(color=('rgb(22, 96, 100)')))
trace5 = _scatter(datesSecurityDelay, 'Security Delay', dict(color=('rgb(22, 96, 22)')))
trace6 = _scatter(datesWeatherDelay, 'Weather Delay', dict(color=('rgb(205, 12, 24)')))

layout = dict(title = 'Different types of delay time versus day of the year (2015)',
              xaxis = dict(title = 'Date'),
              yaxis = dict(title = 'Average delay time of different reasons'),
              )
# The overall trace is listed last so plotly draws it above the per-cause lines.
data = [trace2, trace3, trace4, trace5, trace6, trace1]
fig = dict(data=data, layout=layout)
displayHTML(py.offline.plot(fig,output_type="div"))