# - Experiment with AIRLINE opendata 

Data source : http://transtats.bts.gov/DL_SelectFields.asp?Table_ID=236  
TASK : Finding the airports that has the most delayed arrival  


### Import data from S3

In [1]:
import pyspark as ps
import os
sc = ps.SparkContext()

In [2]:
link = 's3n://mortar-example-data/airline-data' # Detail : http://transtats.bts.gov/DL_SelectFields.asp?Table_ID=236

In [3]:
airline = sc.textFile(link)

In [4]:
# Clean data - remove back slash and double quatation
airline_no_quote = airline.map(lambda line: line.replace('\'','').replace('\"', '').split(','))

In [5]:
airline_no_quote.cache() # populate data on in-memory cache

PythonRDD[2] at RDD at PythonRDD.scala:43

In [6]:
# Measure the time to run the cell
# Alternativily use %time 
import time

start = time.time()

results = airline_no_quote.take(2)

end = time.time()

print "TIME Elapsed: ", start - end

TIME Elapsed:  -100.724103928


In [8]:
import time

start = time.time()

results = airline_no_quote.take(2)

end = time.time()

print "TIME Elapsed: ", start - end

TIME Elapsed:  -0.0282990932465


In [9]:
# Check data : first two lines - header & data row
for line in results:
    print line

[u'YEAR', u'MONTH', u'UNIQUE_CARRIER', u'ORIGIN_AIRPORT_ID', u'DEST_AIRPORT_ID', u'DEP_DELAY', u'DEP_DELAY_NEW', u'ARR_DELAY', u'ARR_DELAY_NEW', u'CANCELLED', u'']
[u'2012', u'4', u'AA', u'12478', u'12892', u'-4.00', u'0.00', u'-21.00', u'0.00', u'0.00', u'']


In [10]:
# Get header
header_line = airline_no_quote.first()
header_list = header_line[0:-1] # Remove last empty item
print header_list

[u'YEAR', u'MONTH', u'UNIQUE_CARRIER', u'ORIGIN_AIRPORT_ID', u'DEST_AIRPORT_ID', u'DEP_DELAY', u'DEP_DELAY_NEW', u'ARR_DELAY', u'ARR_DELAY_NEW', u'CANCELLED']


In [11]:
# Get data without header
airline_no_header = airline_no_quote.filter(lambda row: row != header_line)

In [12]:
# Check data format
airline_no_header.take(3)

[[u'2012',
  u'4',
  u'AA',
  u'12478',
  u'12892',
  u'-4.00',
  u'0.00',
  u'-21.00',
  u'0.00',
  u'0.00',
  u''],
 [u'2012',
  u'4',
  u'AA',
  u'12478',
  u'12892',
  u'-7.00',
  u'0.00',
  u'-65.00',
  u'0.00',
  u'0.00',
  u''],
 [u'2012',
  u'4',
  u'AA',
  u'12478',
  u'12892',
  u'-6.00',
  u'0.00',
  u'-63.00',
  u'0.00',
  u'0.00',
  u'']]

In [13]:
# bind header and data to create dict
def row_creation(row):
    row_list = row[0:-1]
    d = dict(zip(header_list, row_list)) # bind with header
    return d

In [14]:
# Create new format
airline_rows = airline_no_header.map(row_creation)

In [15]:
# Check data format
airline_rows.take(2)

[{u'ARR_DELAY': u'-21.00',
  u'ARR_DELAY_NEW': u'0.00',
  u'CANCELLED': u'0.00',
  u'DEP_DELAY': u'-4.00',
  u'DEP_DELAY_NEW': u'0.00',
  u'DEST_AIRPORT_ID': u'12892',
  u'MONTH': u'4',
  u'ORIGIN_AIRPORT_ID': u'12478',
  u'UNIQUE_CARRIER': u'AA',
  u'YEAR': u'2012'},
 {u'ARR_DELAY': u'-65.00',
  u'ARR_DELAY_NEW': u'0.00',
  u'CANCELLED': u'0.00',
  u'DEP_DELAY': u'-7.00',
  u'DEP_DELAY_NEW': u'0.00',
  u'DEST_AIRPORT_ID': u'12892',
  u'MONTH': u'4',
  u'ORIGIN_AIRPORT_ID': u'12478',
  u'UNIQUE_CARRIER': u'AA',
  u'YEAR': u'2012'}]

## Analyze delays

In [16]:
# Convert str to float
def convert_to_float(s):
    try:
        s = float(s)
    except ValueError:
        s = 0
    return s

In [17]:
# Get destination/origin rdd (airport id, mean delay)
destination_rdd = airline_rows.map(lambda row: (row['DEST_AIRPORT_ID'], convert_to_float(row['ARR_DELAY'])))
origin_rdd = airline_rows.map(lambda row: (row['ORIGIN_AIRPORT_ID'], convert_to_float(row['DEP_DELAY'])))

In [18]:
# Check data (airport id, delay)
print destination_rdd.take(2)
print origin_rdd.take(2)

[(u'12892', -21.0), (u'12892', -65.0)]
[(u'12478', -4.0), (u'12478', -7.0)]


### Mean delay for take off and landing

In [19]:
# Create new format (airport id, mean delay)
import numpy as np
mean_delays_destination = destination_rdd.groupByKey().mapValues(lambda delays: np.mean(delays.data))
mean_delays_origin = origin_rdd.groupByKey().mapValues(lambda delays: np.mean(delays.data))

In [21]:
# Check data (airport id, mean delay)
print mean_delays_destination.take(1)
print mean_delays_origin.take(1)

[(u'10141', -4.8770491803278686)]
[(u'10141', -1.8319672131147542)]


### Get an airport which has the largestmean delay for take-off and landing

In [22]:
# Take-off earlier !?
mean_delays_origin.sortBy(lambda t: t[1], ascending=True).take(5)

[(u'12129', -6.7547169811320753),
 (u'15991', -6.0978441127694856),
 (u'12888', -5.9056603773584904),
 (u'14113', -5.3462002412545235),
 (u'10779', -5.1457627118644069)]

In [23]:
# Take-off delayed...more than 30 mins in worst cases
mean_delays_origin.sortBy(lambda t: t[1], ascending=False).take(5)

[(u'13541', 33.845454545454544),
 (u'10930', 32.533490011750878),
 (u'13964', 30.258196721311474),
 (u'10157', 24.69469773725093),
 (u'15295', 20.405063291139239)]

In [24]:
# Arrival earlier... good.
mean_delays_destination.sortBy(lambda t: t[1], ascending=True).take(5)

[(u'12343', -16.925233644859812),
 (u'12888', -8.9444444444444446),
 (u'11415', -7.5650793650793648),
 (u'10466', -7.4375),
 (u'12335', -5.6448598130841123)]

In [25]:
# Arrival delayed...umm.
mean_delays_destination.sortBy(lambda t: t[1], ascending=False).take(5)

[(u'10930', 24.724705882352943),
 (u'13424', 20.856232939035486),
 (u'14487', 20.795507358636716),
 (u'13459', 20.609195402298852),
 (u'10157', 18.995945945945945)]

In [26]:
# Stop spark context
sc.stop()

### *Result
Airport ID: 12541 has the largest delay for take-off  
Airport ID: 10903 has the largest delay for arrival