# Download Data

In [1]:
!wget  https://s3.amazonaws.com/imcbucket/data/flights/2008.csv

--2020-04-13 03:19:16--  https://s3.amazonaws.com/imcbucket/data/flights/2008.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.1.182
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.1.182|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 689413344 (657M) [binary/octet-stream]
Saving to: ‘2008.csv’


2020-04-13 03:19:26 (63.2 MB/s) - ‘2008.csv’ saved [689413344/689413344]



# Upload to HDFS

In [6]:
# Create the HDFS input directory; -p makes this a no-op if it already exists.
!hdfs dfs -mkdir -p /user/peerapolntl2/input

In [10]:
!hadoop fs -put 2008.csv /user/peerapolntl2/input

In [11]:
# Verify the upload: list the HDFS input directory.
!hdfs dfs -ls /user/peerapolntl2/input

Found 1 items
-rw-r--r--   2 root hadoop  689413344 2020-04-13 03:23 /user/peerapolntl2/input/2008.csv


# Load the CSV to a DataFrame

In [36]:
# Read the raw 2008 flights CSV from HDFS. The first line holds column names,
# and DROPMALFORMED discards rows that fail to parse. No schema inference is
# requested, so every column arrives as a string.
airline_df = spark.read.csv(
    '/user/peerapolntl2/input/2008.csv',
    header=True,
    mode='DROPMALFORMED',
)

In [39]:
# Without schema inference every column is loaded as a string.
airline_df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay:

# Modeling (and some data transformations)
## Convert some string fields to double

In [53]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf

# Cast the numeric columns used later from string to double. Unparseable
# text such as the literal 'NA' presumably becomes null under Spark's default
# (non-ANSI) casting.
for numeric_column in ('DepTime', 'TaxiOut', 'TaxiIn', 'DepDelay',
                       'DayOfWeek', 'Distance', 'ArrDelay'):
    airline_df = airline_df.withColumn(
        numeric_column, col(numeric_column).cast(DoubleType()))

In [57]:
# Confirm the seven cast columns are now double; the rest remain strings.
airline_df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: double (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay:

In [51]:
# Peek at the first five rows (a list of Row objects).
airline_df.head(5)

[Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime=2003.0, CRSDepTime='1955', ArrTime='2211', CRSArrTime='2225', UniqueCarrier='WN', FlightNum='335', TailNum='N712SW', ActualElapsedTime='128', CRSElapsedTime='150', AirTime='116', ArrDelay=-14.0, DepDelay=8.0, Origin='IAD', Dest='TPA', Distance=810.0, TaxiIn=4.0, TaxiOut=8.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime=754.0, CRSDepTime='735', ArrTime='1002', CRSArrTime='1000', UniqueCarrier='WN', FlightNum='3231', TailNum='N772SW', ActualElapsedTime='128', CRSElapsedTime='145', AirTime='113', ArrDelay=2.0, DepDelay=19.0, Origin='IAD', Dest='TPA', Distance=810.0, TaxiIn=5.0, TaxiOut=10.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year='20

## Discretize DepTime from a scalar (double) hhmm value into one of four time-of-day periods (out-of-range values become 'NA').

In [54]:
def t_timeperiod(origin):
    """Map an hhmm departure time to one of four six-hour period labels.

    Parameters
    ----------
    origin : float or None
        Departure time encoded as hhmm (e.g. 2003.0 means 20:03).

    Returns
    -------
    str or None
        ``None`` when ``origin`` is None; '00.01-05.59', '06.00-11.59',
        '12.00-17.59' or '18.00-24.00' for values in (0, 2400]; and
        ``'NA'`` for anything else (including 0 and NaN).
    """
    if origin is None:
        return None
    # Upper bounds are exclusive so that 12:00 and 18:00 land in the period
    # whose label starts at that time (previously 1200 -> '06.00-11.59' and
    # 1800 -> '12.00-17.59').
    if 0 < origin < 600:
        return '00.01-05.59'
    if 600 <= origin < 1200:
        return '06.00-11.59'
    if 1200 <= origin < 1800:
        return '12.00-17.59'
    if 1800 <= origin <= 2400:
        return '18.00-24.00'
    return 'NA'

In [55]:
# Spark UDF wrapper; the lambda resolves t_timeperiod when the UDF is
# serialized, so it picks up a redefined t_timeperiod.
timeperiod = udf(lambda dep_time: t_timeperiod(dep_time), returnType=StringType())

In [59]:
# Replace the numeric DepTime with its time-of-day period label.
airline_df2 = airline_df.withColumn('DepTime', timeperiod(col('DepTime')))

In [60]:
# Check that DepTime now holds period labels.
airline_df2.head(5)

[Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime='18.00-24.00', CRSDepTime='1955', ArrTime='2211', CRSArrTime='2225', UniqueCarrier='WN', FlightNum='335', TailNum='N712SW', ActualElapsedTime='128', CRSElapsedTime='150', AirTime='116', ArrDelay=-14.0, DepDelay=8.0, Origin='IAD', Dest='TPA', Distance=810.0, TaxiIn=4.0, TaxiOut=8.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime='06.00-11.59', CRSDepTime='735', ArrTime='1002', CRSArrTime='1000', UniqueCarrier='WN', FlightNum='3231', TailNum='N772SW', ActualElapsedTime='128', CRSElapsedTime='145', AirTime='113', ArrDelay=2.0, DepDelay=19.0, Origin='IAD', Dest='TPA', Distance=810.0, TaxiIn=5.0, TaxiOut=10.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA')

## Normalization
### Distance is a scalar value with no fixed range, so we min-max normalize it into [0, 1].

In [61]:
from pyspark.sql.functions import *
# NOTE: this star import shadows Python builtins (max, min, abs, round, sum,
# ...) with Spark column functions for the rest of the notebook. It is kept
# because later cells rely on it; the explicit alias below avoids ambiguity.
from pyspark.sql import functions as F

# Fetch both bounds in a single aggregation job instead of two full scans.
# NOTE(review): the bounds come from the full dataset before the train/test
# split, so test rows influence the scaling; consider computing them on
# training_df only.
max_distance, min_distance = airline_df2.agg(
    F.max('Distance'), F.min('Distance')).first()

In [62]:
# Observed (max, min) of Distance.
(max_distance, min_distance)

(4962.0, 11.0)

In [70]:
def t_normalized_distance(origin, lower=None, upper=None):
    """Min-max scale a distance into [0, 1].

    Parameters
    ----------
    origin : float or None
        Distance value to scale.
    lower, upper : float, optional
        Range bounds. When omitted they default to the notebook-level
        globals ``min_distance`` / ``max_distance``, looked up at call time.

    Returns
    -------
    float or None
        ``(origin - lower) / (upper - lower)``; ``None`` when ``origin`` is
        None; ``0.0`` when the range is degenerate (``upper == lower``)
        instead of raising ZeroDivisionError inside a Spark task.
    """
    if origin is None:
        return None
    if lower is None:
        lower = min_distance
    if upper is None:
        upper = max_distance
    span = upper - lower
    if span == 0:
        return 0.0
    return (origin - lower) / span

In [71]:
# Spark UDF wrapper around t_normalized_distance (returns double).
normalized_distance = udf(lambda distance: t_normalized_distance(distance), returnType=DoubleType())

In [72]:
# Replace Distance with its min-max normalized value.
normalized_df = airline_df2.withColumn('Distance', normalized_distance(col('Distance')))

In [73]:
# Check the normalized Distance values.
normalized_df.head(5)

[Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime='18.00-24.00', CRSDepTime='1955', ArrTime='2211', CRSArrTime='2225', UniqueCarrier='WN', FlightNum='335', TailNum='N712SW', ActualElapsedTime='128', CRSElapsedTime='150', AirTime='116', ArrDelay=-14.0, DepDelay=8.0, Origin='IAD', Dest='TPA', Distance=0.16138153908301353, TaxiIn=4.0, TaxiOut=8.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime='06.00-11.59', CRSDepTime='735', ArrTime='1002', CRSArrTime='1000', UniqueCarrier='WN', FlightNum='3231', TailNum='N772SW', ActualElapsedTime='128', CRSElapsedTime='145', AirTime='113', ArrDelay=2.0, DepDelay=19.0, Origin='IAD', Dest='TPA', Distance=0.16138153908301353, TaxiIn=5.0, TaxiOut=10.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='

### Normalize ArrDelay

In [75]:
# Explicit alias so this cell does not depend on builtins being shadowed by
# an earlier star import.
from pyspark.sql import functions as F

# Both bounds in one aggregation job instead of two full scans. ArrDelay is
# untouched by the DepTime/Distance transforms, so airline_df2 is a valid
# source. NOTE(review): computed before the train/test split (leakage).
max_ArrDelay, min_ArrDelay = airline_df2.agg(
    F.max('ArrDelay'), F.min('ArrDelay')).first()

In [76]:
# Observed (max, min) of ArrDelay.
(max_ArrDelay, min_ArrDelay)

(2461.0, -519.0)

In [77]:
def t_normalized_ArrDelay(origin, lower=None, upper=None):
    """Min-max scale an arrival delay into [0, 1].

    Parameters
    ----------
    origin : float or None
        Arrival delay value to scale.
    lower, upper : float, optional
        Range bounds. When omitted they default to the notebook-level
        globals ``min_ArrDelay`` / ``max_ArrDelay``, looked up at call time.

    Returns
    -------
    float or None
        ``(origin - lower) / (upper - lower)``; ``None`` when ``origin`` is
        None; ``0.0`` when the range is degenerate (``upper == lower``)
        instead of raising ZeroDivisionError inside a Spark task.
    """
    if origin is None:
        return None
    if lower is None:
        lower = min_ArrDelay
    if upper is None:
        upper = max_ArrDelay
    span = upper - lower
    if span == 0:
        return 0.0
    return (origin - lower) / span

In [78]:
# Spark UDF wrapper around t_normalized_ArrDelay (returns double).
normalized_ArrDelay = udf(lambda arr_delay: t_normalized_ArrDelay(arr_delay), returnType=DoubleType())

In [79]:
# Rebuild from airline_df2 rather than from normalized_df itself: the
# original self-referential assignment normalized ArrDelay a second time
# whenever this cell was re-run.
normalized_df = (
    airline_df2
    .withColumn('Distance', normalized_distance(col('Distance')))
    .withColumn('ArrDelay', normalized_ArrDelay(col('ArrDelay')))
)

In [80]:
# Check the normalized ArrDelay values.
normalized_df.head(5)

[Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime='18.00-24.00', CRSDepTime='1955', ArrTime='2211', CRSArrTime='2225', UniqueCarrier='WN', FlightNum='335', TailNum='N712SW', ActualElapsedTime='128', CRSElapsedTime='150', AirTime='116', ArrDelay=0.16946308724832215, DepDelay=8.0, Origin='IAD', Dest='TPA', Distance=0.16138153908301353, TaxiIn=4.0, TaxiOut=8.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', NASDelay='NA', SecurityDelay='NA', LateAircraftDelay='NA'),
 Row(Year='2008', Month='1', DayofMonth='3', DayOfWeek=4.0, DepTime='06.00-11.59', CRSDepTime='735', ArrTime='1002', CRSArrTime='1000', UniqueCarrier='WN', FlightNum='3231', TailNum='N772SW', ActualElapsedTime='128', CRSElapsedTime='145', AirTime='113', ArrDelay=0.17483221476510066, DepDelay=19.0, Origin='IAD', Dest='TPA', Distance=0.16138153908301353, TaxiIn=5.0, TaxiOut=10.0, Cancelled='0', CancellationCode=None, Diverted='0', CarrierDelay='NA', WeatherDelay='NA', 

## Choose only the fields of interest

In [81]:
# Keep only the ten columns of interest for modeling.
features_df = normalized_df.select(
    'UniqueCarrier', 'Origin', 'Dest',
    'DepTime', 'TaxiOut', 'TaxiIn', 'DepDelay',
    'DayOfWeek', 'Distance', 'ArrDelay',
)

## Drop records containing null values

In [82]:
# Drop every row that has a null in any of the selected columns.
final_df = features_df.na.drop()

In [83]:
# Number of rows left after dropping nulls in the selected columns.
final_df.count()

6855029

In [84]:
# Tabular preview of the first 20 rows (long values truncated).
final_df.show(n=20, truncate=True)

+-------------+------+----+-----------+-------+------+--------+---------+--------------------+-------------------+
|UniqueCarrier|Origin|Dest|    DepTime|TaxiOut|TaxiIn|DepDelay|DayOfWeek|            Distance|           ArrDelay|
+-------------+------+----+-----------+-------+------+--------+---------+--------------------+-------------------+
|           WN|   IAD| TPA|18.00-24.00|    8.0|   4.0|     8.0|      4.0| 0.16138153908301353|0.16946308724832215|
|           WN|   IAD| TPA|06.00-11.59|   10.0|   5.0|    19.0|      4.0| 0.16138153908301353|0.17483221476510066|
|           WN|   IND| BWI|06.00-11.59|   17.0|   3.0|     8.0|      4.0|  0.1017976166431024|0.17885906040268457|
|           WN|   IND| BWI|06.00-11.59|    7.0|   3.0|    -4.0|      4.0|  0.1017976166431024|0.17214765100671142|
|           WN|   IND| BWI|18.00-24.00|   10.0|   3.0|    34.0|      4.0|  0.1017976166431024| 0.1855704697986577|
|           WN|   IND| JAX|18.00-24.00|   10.0|   4.0|    25.0|      4.0|  0.136

## Divide dataset into training and test sets

In [49]:
# 80/20 train/test split; the fixed seed keeps the split repeatable for the
# same input data and partitioning.
training_df, test_df = final_df.randomSplit(weights=[0.8, 0.2], seed=12)

In [41]:
# Size of the (roughly 80%) training split.
training_df.count()

5608036

## Display the type and schema of the training DataFrame

In [26]:
# NOTE(review): the recorded output (PipelinedRDD) comes from an earlier,
# out-of-order execution (In [26] precedes the split in In [49]). randomSplit
# on a DataFrame returns DataFrames, so a fresh run should show
# pyspark.sql.dataframe.DataFrame; confirm.
type(training_df)

pyspark.rdd.PipelinedRDD

In [50]:
# NOTE(review): the recorded schema is stale. training_df is split from
# final_df, which keeps only the ten selected columns with DepTime as a
# string label, so a fresh run should print just those ten columns.
training_df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: double (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- CRSDepTime: string (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: string (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: double (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- Cancelled: string (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: string (nullable = true)
 |-- CarrierDelay: