# Predicting Airline Delay with Weather Data Using PySpark

### Checking sparkcontext is working

In [1]:
sc

<pyspark.context.SparkContext at 0x100675550>

## Importing libraries

In [2]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

### Building SparkSession

In [3]:
sqlContext = SparkSession.builder.master("local[*]").appName("appName").config("spark.sql.warehouse.dir", "./spark-warehouse").getOrCreate()

# Reads flight 2007 data

In [None]:
flight_2007 = sqlContext.read.load('/Users/jayantitrivedi/Desktop/Resume/LOL/airline/2007.csv', 
                          format='com.databricks.spark.csv', 
                          header='true', 
                          inferSchema='true')

### Viewing Schema for 2007 Airline data

In [5]:
flight_2007.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- C

In [6]:
flight_2007.cache()

DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: string, CRSDepTime: int, ArrTime: string, CRSArrTime: int, UniqueCarrier: string, FlightNum: int, TailNum: string, ActualElapsedTime: string, CRSElapsedTime: string, AirTime: string, ArrDelay: string, DepDelay: string, Origin: string, Dest: string, Distance: int, TaxiIn: int, TaxiOut: int, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: int, WeatherDelay: int, NASDelay: int, SecurityDelay: int, LateAircraftDelay: int]

# Reads flight 2008 data

In [None]:
flight_2008 = sqlContext.read.load('/Users/jayantitrivedi/Desktop/Resume/LOL/airline/2008.csv', 
                          format='com.databricks.spark.csv', 
                          header='true', 
                          inferSchema='true')
flight_2008.cache()

### Viewing Schema for 2008 Airline data

In [7]:
flight_2008.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

# Checking Count of rows in Datasets

### Checks type of data object created

In [8]:
type(flight_2007)

pyspark.sql.dataframe.DataFrame

### Checks total flights in 2007 data

In [9]:
flight_2007.count()

7453215

### Checks the number of rows which have null in DepDelay column

In [10]:
flight_2007.where(flight_2007.DepDelay.isNull()).count()

0

# Pre-processing with Hadoop and Spark

### To get clean data and remove rows which have NAs in DepDelay

In [11]:
filter_2007 = flight_2007.dropna(subset=['DepDelay'])
filter_2008 = flight_2008.dropna(subset=['DepDelay'])

### Filters out flights that originate in ORD

In [12]:
filter_2007 = filter_2007[filter_2007['Origin']=='ORD']
filter_2008 = filter_2008[filter_2008['Origin']=='ORD']

### Number of flights originating from ORD for Year 2007

In [96]:
filter_2007.count()

375784

### Number of flights originating from ORD for Year 2008

In [13]:
filter_2008.count()

350380

## Creates a flag for delayed flights

In [14]:
filter_2007 = filter_2007.withColumn("DelayedFlag", when(filter_2007['DepDelay'] > 15, 1).otherwise(0))
filter_2008 = filter_2008.withColumn("DelayedFlag", when(filter_2008['DepDelay'] > 15, 1).otherwise(0))

### Verifies that the newly created column is correct for Year 2007

In [98]:
filter_2007.head(2)

[Row(Year=2007, Month=1, DayofMonth=25, DayOfWeek=4, DepTime=u'1052', CRSDepTime=1100, ArrTime=u'1359', CRSArrTime=1414, UniqueCarrier=u'XE', FlightNum=1202, TailNum=u'N12167', ActualElapsedTime=u'127', CRSElapsedTime=u'134', AirTime=u'105', ArrDelay=u'-15', DepDelay=u'-8', Origin=u'ORD', Dest=u'EWR', Distance=719, TaxiIn=5, TaxiOut=17, Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=0, WeatherDelay=0, NASDelay=0, SecurityDelay=0, LateAircraftDelay=0, DelayedFlag=0, HourOfDay=11),
 Row(Year=2007, Month=1, DayofMonth=28, DayOfWeek=7, DepTime=u'1541', CRSDepTime=1500, ArrTime=u'1811', CRSArrTime=1750, UniqueCarrier=u'XE', FlightNum=2836, TailNum=u'N12163', ActualElapsedTime=u'150', CRSElapsedTime=u'170', AirTime=u'128', ArrDelay=u'21', DepDelay=u'41', Origin=u'ORD', Dest=u'IAH', Distance=925, TaxiIn=7, TaxiOut=15, Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=0, WeatherDelay=0, NASDelay=5, SecurityDelay=0, LateAircraftDelay=16, DelayedFlag=1, HourOfDay=15)]

### Verifies that the newly created column is correct for Year 2008

In [99]:
filter_2008.head(2)

[Row(Year=2008, Month=1, DayofMonth=8, DayOfWeek=2, DepTime=u'1711', CRSDepTime=1600, ArrTime=u'2031', CRSArrTime=1945, UniqueCarrier=u'XE', FlightNum=1226, TailNum=u'N14953', ActualElapsedTime=u'140', CRSElapsedTime=u'165', AirTime=u'106', ArrDelay=u'46', DepDelay=u'71', Origin=u'ORD', Dest=u'EWR', Distance=719, TaxiIn=u'8', TaxiOut=u'26', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=u'0', WeatherDelay=u'0', NASDelay=u'2', SecurityDelay=u'0', LateAircraftDelay=u'44', DelayedFlag=1, HourOfDay=16),
 Row(Year=2008, Month=1, DayofMonth=25, DayOfWeek=5, DepTime=u'1813', CRSDepTime=1742, ArrTime=u'2040', CRSArrTime=2004, UniqueCarrier=u'XE', FlightNum=2229, TailNum=u'N15509', ActualElapsedTime=u'87', CRSElapsedTime=u'82', AirTime=u'49', ArrDelay=u'36', DepDelay=u'31', Origin=u'ORD', Dest=u'CLE', Distance=316, TaxiIn=u'6', TaxiOut=u'32', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=u'0', WeatherDelay=u'0', NASDelay=u'5', SecurityDelay=u'0', LateAircraftDelay=u

## Computes hour from departure time and casts it to integer

In [17]:
filter_2007 = filter_2007.withColumn('HourOfDay', concat(filter_2007.CRSDepTime.substr(1, 2)).cast("integer"))
filter_2008 = filter_2008.withColumn('HourOfDay', concat(filter_2008.CRSDepTime.substr(1, 2)).cast("integer"))

### Verifies the newly created column values for year 2007

In [100]:
filter_2007.head(2)

[Row(Year=2007, Month=1, DayofMonth=25, DayOfWeek=4, DepTime=u'1052', CRSDepTime=1100, ArrTime=u'1359', CRSArrTime=1414, UniqueCarrier=u'XE', FlightNum=1202, TailNum=u'N12167', ActualElapsedTime=u'127', CRSElapsedTime=u'134', AirTime=u'105', ArrDelay=u'-15', DepDelay=u'-8', Origin=u'ORD', Dest=u'EWR', Distance=719, TaxiIn=5, TaxiOut=17, Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=0, WeatherDelay=0, NASDelay=0, SecurityDelay=0, LateAircraftDelay=0, DelayedFlag=0, HourOfDay=11),
 Row(Year=2007, Month=1, DayofMonth=28, DayOfWeek=7, DepTime=u'1541', CRSDepTime=1500, ArrTime=u'1811', CRSArrTime=1750, UniqueCarrier=u'XE', FlightNum=2836, TailNum=u'N12163', ActualElapsedTime=u'150', CRSElapsedTime=u'170', AirTime=u'128', ArrDelay=u'21', DepDelay=u'41', Origin=u'ORD', Dest=u'IAH', Distance=925, TaxiIn=7, TaxiOut=15, Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=0, WeatherDelay=0, NASDelay=5, SecurityDelay=0, LateAircraftDelay=16, DelayedFlag=1, HourOfDay=15)]

### View the Filtered 2007 data Schema

In [20]:
filter_2007.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- C

### Verifies the newly created column values for year 2008

In [101]:
filter_2008.head(2)

[Row(Year=2008, Month=1, DayofMonth=8, DayOfWeek=2, DepTime=u'1711', CRSDepTime=1600, ArrTime=u'2031', CRSArrTime=1945, UniqueCarrier=u'XE', FlightNum=1226, TailNum=u'N14953', ActualElapsedTime=u'140', CRSElapsedTime=u'165', AirTime=u'106', ArrDelay=u'46', DepDelay=u'71', Origin=u'ORD', Dest=u'EWR', Distance=719, TaxiIn=u'8', TaxiOut=u'26', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=u'0', WeatherDelay=u'0', NASDelay=u'2', SecurityDelay=u'0', LateAircraftDelay=u'44', DelayedFlag=1, HourOfDay=16),
 Row(Year=2008, Month=1, DayofMonth=25, DayOfWeek=5, DepTime=u'1813', CRSDepTime=1742, ArrTime=u'2040', CRSArrTime=2004, UniqueCarrier=u'XE', FlightNum=2229, TailNum=u'N15509', ActualElapsedTime=u'87', CRSElapsedTime=u'82', AirTime=u'49', ArrDelay=u'36', DepDelay=u'31', Origin=u'ORD', Dest=u'CLE', Distance=316, TaxiIn=u'6', TaxiOut=u'32', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=u'0', WeatherDelay=u'0', NASDelay=u'5', SecurityDelay=u'0', LateAircraftDelay=u

### View the Filtered 2008 data Schema

In [21]:
filter_2008.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

# Weather data

### Defining the Schema for the weather data, as header's are missing in the csv

In [69]:
schema = StructType([
        StructField("Station", StringType()),
        StructField("MeasurementDate", DateType()),
        StructField("Type", StringType()),
        StructField("Measurement", DoubleType()),
        StructField("measurement_Flag", StringType()),
        StructField("Latitude", StringType()),
        StructField("Longitude", StringType()),
        StructField("Elevation", IntegerType()),
    ])

## Reads 2007 weather data into a spark dataframe

In [70]:
weather_2007 = sqlContext.read.schema(schema).option("mode","DROPMALFORMED").option("dateFormat","yyyyMMdd").load('/Users/jayantitrivedi/Desktop/Resume/LOL/weather/2007.csv', 
                          format='com.databricks.spark.csv', 
                          header='false', 
                          inferSchema='false')

### Viewing Schema for 2007 Weather data

In [71]:
weather_2007.printSchema()

root
 |-- Station: string (nullable = true)
 |-- MeasurementDate: date (nullable = true)
 |-- Type: string (nullable = true)
 |-- Measurement: double (nullable = true)
 |-- measurement_Flag: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)



In [73]:
weather_2007.head(2)

[Row(Station=u'CA002303986', MeasurementDate=datetime.date(2007, 1, 1), Type=u'TMAX', Measurement=-130.0, measurement_Flag=None, Latitude=None, Longitude=u'G', Elevation=None),
 Row(Station=u'CA002303986', MeasurementDate=datetime.date(2007, 1, 1), Type=u'TMIN', Measurement=-220.0, measurement_Flag=None, Latitude=None, Longitude=u'G', Elevation=None)]

## Reads 2008 weather data into a spark dataframe

In [74]:
weather_2008 = sqlContext.read.schema(schema).option("mode","DROPMALFORMED").option("dateFormat","yyyyMMdd").load('/Users/jayantitrivedi/Desktop/Resume/LOL/weather/2008.csv', 
                          format='com.databricks.spark.csv', 
                          header='false', 
                          inferSchema='false')

### Viewing Schema for 2008 Weather data

In [75]:
weather_2008.printSchema()

root
 |-- Station: string (nullable = true)
 |-- MeasurementDate: date (nullable = true)
 |-- Type: string (nullable = true)
 |-- Measurement: double (nullable = true)
 |-- measurement_Flag: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Elevation: integer (nullable = true)



In [76]:
weather_2008.head(2)

[Row(Station=u'CA002303986', MeasurementDate=datetime.date(2008, 1, 1), Type=u'TMAX', Measurement=-315.0, measurement_Flag=None, Latitude=None, Longitude=u'G', Elevation=None),
 Row(Station=u'CA002303986', MeasurementDate=datetime.date(2008, 1, 1), Type=u'PRCP', Measurement=4.0, measurement_Flag=None, Latitude=None, Longitude=u'G', Elevation=None)]

## Filters only for Chicago o Hare International airport for year 2007

In [81]:
filter_wea_2007 = weather_2007[weather_2007['Station']=='USW00094846']

In [102]:
filter_wea_2007.head(2)

[Row(Station=u'USW00094846', MeasurementDate=datetime.date(2007, 1, 1), Type=u'TMAX', Measurement=61.0, measurement_Flag=None, Latitude=None, Longitude=u'0', Elevation=2400),
 Row(Station=u'USW00094846', MeasurementDate=datetime.date(2007, 1, 1), Type=u'TMIN', Measurement=-17.0, measurement_Flag=None, Latitude=None, Longitude=u'0', Elevation=2400)]

## Filters only for Chicago o Hare International airport for year 2008

In [103]:
filter_wea_2008 = weather_2008[weather_2008['Station']=='USW00094846']

In [104]:
filter_wea_2008.head(2)

[Row(Station=u'USW00094846', MeasurementDate=datetime.date(2008, 1, 1), Type=u'TMAX', Measurement=-28.0, measurement_Flag=None, Latitude=None, Longitude=u'0', Elevation=2400),
 Row(Station=u'USW00094846', MeasurementDate=datetime.date(2008, 1, 1), Type=u'TMIN', Measurement=-156.0, measurement_Flag=None, Latitude=None, Longitude=u'0', Elevation=2400)]

## Transforms rows to columns for year 2007

In [83]:
reshaped_wea_2007 = filter_wea_2007.groupby('MeasurementDate').pivot('Type').max('Measurement').fillna(0)

In [89]:
reshaped_wea_2007.head(2)

[Row(MeasurementDate=datetime.date(2007, 11, 23), AWND=34.0, FMTM=1338.0, PGTM=1338.0, PRCP=0.0, SNOW=0.0, SNWD=0.0, TMAX=0.0, TMIN=-56.0, WDF2=230.0, WDF5=240.0, WESD=0.0, WSF2=63.0, WSF5=76.0, WT01=0.0, WT02=0.0, WT03=0.0, WT04=0.0, WT05=0.0, WT06=0.0, WT08=0.0, WT09=0.0, WT11=0.0, WT13=0.0, WT14=0.0, WT15=0.0, WT16=0.0, WT17=0.0, WT18=0.0, WT19=0.0, WT21=0.0, WT22=0.0),
 Row(MeasurementDate=datetime.date(2007, 11, 15), AWND=60.0, FMTM=901.0, PGTM=913.0, PRCP=0.0, SNOW=0.0, SNWD=0.0, TMAX=44.0, TMIN=-22.0, WDF2=330.0, WDF5=320.0, WESD=0.0, WSF2=112.0, WSF5=148.0, WT01=0.0, WT02=0.0, WT03=0.0, WT04=0.0, WT05=0.0, WT06=0.0, WT08=0.0, WT09=0.0, WT11=0.0, WT13=0.0, WT14=0.0, WT15=0.0, WT16=0.0, WT17=0.0, WT18=1.0, WT19=0.0, WT21=0.0, WT22=0.0)]

## Transforms rows to columns for year 2008

In [105]:
reshaped_wea_2008 = filter_wea_2008.groupby('MeasurementDate').pivot('Type').max('Measurement').fillna(0)

In [106]:
reshaped_wea_2008.head(2)

[Row(MeasurementDate=datetime.date(2008, 7, 15), AWND=33.0, FMTM=1615.0, PGTM=1429.0, PRCP=0.0, SNOW=0.0, SNWD=0.0, TMAX=322.0, TMIN=161.0, WDF2=240.0, WDF5=230.0, WESD=0.0, WSF2=98.0, WSF5=130.0, WT01=0.0, WT02=0.0, WT03=0.0, WT04=0.0, WT05=0.0, WT07=0.0, WT08=0.0, WT09=0.0, WT11=0.0, WT13=0.0, WT14=0.0, WT16=0.0, WT17=0.0, WT18=0.0, WT22=0.0),
 Row(MeasurementDate=datetime.date(2008, 9, 26), AWND=19.0, FMTM=1314.0, PGTM=1214.0, PRCP=0.0, SNOW=0.0, SNWD=0.0, TMAX=261.0, TMIN=150.0, WDF2=70.0, WDF5=70.0, WESD=0.0, WSF2=54.0, WSF5=112.0, WT01=1.0, WT02=0.0, WT03=0.0, WT04=0.0, WT05=0.0, WT07=0.0, WT08=1.0, WT09=0.0, WT11=0.0, WT13=1.0, WT14=0.0, WT16=0.0, WT17=0.0, WT18=0.0, WT22=0.0)]

## Counts total rows in dataframe to check if it's an year for 2007 data

In [85]:
reshaped_wea_2007.count()

365

In [145]:
finalweather_2007 = reshaped_wea_2007.select(year("MeasurementDate").alias('year'), month("MeasurementDate").alias('MonthofYear'), dayofmonth("MeasurementDate").alias('day'), 'TMAX', 'TMIN', 'PRCP', 'SNOW')

In [146]:
finalweather_2007.head(2)

[Row(year=2007, MonthofYear=11, day=23, TMAX=0.0, TMIN=-56.0, PRCP=0.0, SNOW=0.0),
 Row(year=2007, MonthofYear=11, day=15, TMAX=44.0, TMIN=-22.0, PRCP=0.0, SNOW=0.0)]

## Counts total rows in dataframe to check if it's an year for 2008 data

In [107]:
reshaped_wea_2008.count()

366

In [147]:
finalweather_2008 = reshaped_wea_2008.select(year("MeasurementDate").alias('year'), month("MeasurementDate").alias('MonthofYear'), dayofmonth("MeasurementDate").alias('day'), 'TMAX', 'TMIN', 'PRCP', 'SNOW')

In [148]:
finalweather_2008.head(2)

[Row(year=2008, MonthofYear=7, day=15, TMAX=322.0, TMIN=161.0, PRCP=0.0, SNOW=0.0),
 Row(year=2008, MonthofYear=9, day=26, TMAX=261.0, TMIN=150.0, PRCP=0.0, SNOW=0.0)]

# Merging airline and weather datasets for 2007

In [149]:
final_data_2007 = filter_2007.join(finalweather_2007,(filter_2007.Month==finalweather_2007.MonthofYear)&(filter_2007.DayofMonth==finalweather_2007.day))

In [150]:
final_data_2007.head(2)

[Row(Year=2007, Month=3, DayofMonth=22, DayOfWeek=4, DepTime=u'NA', CRSDepTime=2000, ArrTime=u'NA', CRSArrTime=2208, UniqueCarrier=u'XE', FlightNum=2163, TailNum=u'0', ActualElapsedTime=u'NA', CRSElapsedTime=u'68', AirTime=u'NA', ArrDelay=u'NA', DepDelay=u'NA', Origin=u'ORD', Dest=u'CLE', Distance=316, TaxiIn=0, TaxiOut=0, Cancelled=1, CancellationCode=u'C', Diverted=0, CarrierDelay=0, WeatherDelay=0, NASDelay=0, SecurityDelay=0, LateAircraftDelay=0, DelayedFlag=0, HourOfDay=20, year=2007, MonthofYear=3, day=22, TMAX=178.0, TMIN=67.0, PRCP=23.0, SNOW=0.0),
 Row(Year=2007, Month=3, DayofMonth=22, DayOfWeek=4, DepTime=u'1848', CRSDepTime=1745, ArrTime=u'2103', CRSArrTime=1959, UniqueCarrier=u'XE', FlightNum=2023, TailNum=u'N14950', ActualElapsedTime=u'75', CRSElapsedTime=u'74', AirTime=u'49', ArrDelay=u'64', DepDelay=u'63', Origin=u'ORD', Dest=u'CLE', Distance=316, TaxiIn=9, TaxiOut=17, Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=0, WeatherDelay=7, NASDelay=1, SecurityDe

In [151]:
final_data_2007.count()

375784

# Merging airline and weather datasets for 2008

In [152]:
final_data_2008 = filter_2008.join(finalweather_2008,(filter_2008.Month==finalweather_2008.MonthofYear)&(filter_2008.DayofMonth==finalweather_2008.day))

In [153]:
final_data_2008.head(2)

[Row(Year=2008, Month=3, DayofMonth=22, DayOfWeek=6, DepTime=u'700', CRSDepTime=600, ArrTime=u'915', CRSArrTime=813, UniqueCarrier=u'XE', FlightNum=2318, TailNum=u'N14542', ActualElapsedTime=u'75', CRSElapsedTime=u'73', AirTime=u'49', ArrDelay=u'62', DepDelay=u'60', Origin=u'ORD', Dest=u'CLE', Distance=316, TaxiIn=u'10', TaxiOut=u'16', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=u'60', WeatherDelay=u'0', NASDelay=u'2', SecurityDelay=u'0', LateAircraftDelay=u'0', DelayedFlag=1, HourOfDay=60, year=2008, MonthofYear=3, day=22, TMAX=28.0, TMIN=-28.0, PRCP=20.0, SNOW=15.0),
 Row(Year=2008, Month=3, DayofMonth=22, DayOfWeek=6, DepTime=u'1559', CRSDepTime=1600, ArrTime=u'1858', CRSArrTime=1909, UniqueCarrier=u'XE', FlightNum=1226, TailNum=u'N14974', ActualElapsedTime=u'119', CRSElapsedTime=u'129', AirTime=u'101', ArrDelay=u'-11', DepDelay=u'-1', Origin=u'ORD', Dest=u'EWR', Distance=719, TaxiIn=u'4', TaxiOut=u'14', Cancelled=0, CancellationCode=None, Diverted=0, CarrierDelay=u

In [154]:
final_data_2008.count()

350380

# Model Building

In [116]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

In [155]:
train = final_data_2007
test = final_data_2008

In [156]:
assembler = VectorAssembler(
    inputCols = ["Month", "DayofMonth", "DayOfWeek",
                        "HourOfDay", "Distance", "TMAX", "TMIN", "PRCP", "SNOW"],
    outputCol='features')

label_indexer = StringIndexer(inputCol = 'DelayedFlag', outputCol = 'label')

## Random Forest

In [119]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [157]:
classifier = RandomForestClassifier(labelCol = 'label', featuresCol = 'features')
pipeline = Pipeline(stages=[label_indexer, assembler, classifier])
model = pipeline.fit(train)

In [158]:
#Predicts and calculates area under curve
predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator()
auroc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

In [159]:
auroc

0.6911954708117838

In [160]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

In [161]:
print("Accuracy of random forest model = %g" % (accuracy*100))
print("Test Error of random forest model = %g" % ((1.0 - accuracy)*100))

Accuracy of random forest model = 73.8236
Test Error of random forest model = 26.1764


## Decision Tree

In [162]:
#Trains a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Chain indexers and tree in a Pipeline
pipeline_dt = Pipeline(stages=[label_indexer, assembler, dt])

In [163]:
# Train model.  This also runs the indexers.
model_dt = pipeline_dt.fit(train)

In [164]:
# Make predictions.
predictions_dt = model_dt.transform(test)

In [165]:
# Select (prediction, true label) and compute test error
evaluator_dt = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_dt = evaluator_dt.evaluate(predictions_dt)

In [166]:
print("Accuracy of decision tree model = %g" % (accuracy_dt*100))
print("Test Error of decision tree model = %g " % ((1.0 - accuracy_dt)*100))

Accuracy of decision tree model = 73.9868
Test Error of decision tree model = 26.0132 
