# Assignment: Analyzing Airline Flight Delays 
#### By Brett Hallum, Chris Ficklin, and Ryan Shuhart<br>April 2017

For a full treatment of the unit 14 case study, please review module 14.3. Some points from the video are given below.

Work with the airline data set (use R or Python to manage out-of-core).
Answer the following questions by using the split-apply-combine technique:
* Which airports are most likely to be delayed flying out of or into?
* Which flights with same origin and destination are most likely to be delayed?
* Can you regress how delayed a flight will be before it is delayed?
* What are the most important features for this regression?
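
As a point of reference, the split-apply-combine technique named above can be sketched on a small in-memory table. This is a minimal pandas illustration, not the out-of-core Dask workflow used later; the `flights` frame and its values are invented for the example.

```python
import pandas as pd

# Toy flight records; values are invented for illustration only.
flights = pd.DataFrame({
    "Origin": ["SAN", "SAN", "ATL", "ATL", "ATL"],
    "DepDelay": [11.0, -1.0, 3.0, 11.0, 7.0],
})

# Split the rows by origin, apply the mean to each group,
# and combine the per-group results into one Series.
avg_delay = flights.groupby("Origin")["DepDelay"].mean()
print(avg_delay)
```

The same `groupby(...).mean()` pattern is what the Dask aggregations below use; only the execution is deferred and parallelized.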

Remember to properly cross-validate models.

Use meaningful evaluation criteria.

Create at least one new feature variable for the regression.

In [21]:
import dask.dataframe as dd #http://dask.pydata.org/en/latest/
import pandas as pd
import numpy as np
from datetime import datetime
from bokeh.io import output_notebook

# from dask.distributed import Client
# client = Client(set_as_default=True)
# print(client)

### Other Settings
# Show more rows
pd.options.display.max_rows = 999
pd.options.display.max_columns = 999

# Prevent scientific notation of decimals
pd.set_option('display.precision', 3)
pd.options.display.float_format = '{:,.3f}'.format

In [22]:
# Allow inline display of bokeh graphics
output_notebook()

## [Here is some info about Dask]...

...General facts about Dask... blah blah

## Data

In [23]:
# http://stat-computing.org/dataexpo/2009/the-data.html
var_desc = pd.read_csv("../ref/var_descriptions.csv", index_col='var_id')
var_desc

var_id,Name,Data Type,Description
1,Year,int64,1987-2008
2,Month,int64,1 - 12
3,DayofMonth,int64,1 - 31
4,DayOfWeek,int64,1 (Monday) - 7 (Sunday)
5,DepTime,float64,"actual departure time (local, hhmm)"
6,CRSDepTime,int64,"scheduled departure time (local, hhmm)"
7,ArrTime,float64,"actual arrival time (local, hhmm)"
8,CRSArrTime,int64,"scheduled arrival time (local, hhmm)"
9,UniqueCarrier,O,unique carrier code
10,FlightNum,int64,flight number


In [24]:
# Data Location

#parq_folder = "C:/Users/ryan.shuhart/Downloads/AirlineDelays.tar/AirlineDelays/parquet-tiny/"
#parq_folder = "C:/Users/ryan.shuhart/Downloads/AirlineDelays.tar/AirlineDelays/parquet_25/"
parq_folder = "C:/Users/ryan.shuhart/Downloads/AirlineDelays.tar/AirlineDelays/parquet/"

# Load compressed Parquet format of all years ~2 sec
start = datetime.now()
df = dd.read_parquet(parq_folder)
print("Load parquet time: ", datetime.now() - start)
print()

# Length of dask dataframe ~3 min
start = datetime.now()
print("There are {:,d} rows".format(len(df))) #123,534,969 Matches Eric Larson
print("Time to determine row count: ", datetime.now() - start)

Load parquet time:  0:00:00.633036

There are 123,534,969 rows
Time to determine row count:  0:02:10.348455


### Glance at Beginning and End

In [25]:
print("First 5 rows:")
df.head()

First 5 rows:


,Year,Month,DayOfWeek,DepTime,CRSDepTime,UniqueCarrier,TailNum,ArrDelay,DepDelay,Origin,Dest,Distance
0,1987,10,3,741.0,730,PS,,23.0,11.0,SAN,SFO,447.0
1,1987,10,4,729.0,730,PS,,14.0,-1.0,SAN,SFO,447.0
2,1987,10,6,741.0,730,PS,,29.0,11.0,SAN,SFO,447.0
3,1987,10,7,729.0,730,PS,,-2.0,-1.0,SAN,SFO,447.0
4,1987,10,1,749.0,730,PS,,33.0,19.0,SAN,SFO,447.0


In [26]:
print("Last 5 rows:")
df.tail()

Last 5 rows:


,Year,Month,DayOfWeek,DepTime,CRSDepTime,UniqueCarrier,TailNum,ArrDelay,DepDelay,Origin,Dest,Distance
499212,2008,12,6,1002.0,959,DL,N646DL,14.0,3.0,ATL,IAD,533.0
499213,2008,12,6,834.0,835,DL,N908DL,-2.0,-1.0,ATL,SAT,874.0
499214,2008,12,6,655.0,700,DL,N671DN,0.0,-5.0,PBI,ATL,545.0
499215,2008,12,6,1251.0,1240,DL,N646DL,9.0,11.0,IAD,ATL,533.0
499216,2008,12,6,1110.0,1103,DL,N908DL,-5.0,7.0,SAT,ATL,874.0


## Feature Preparation and Creation

In [27]:
# Create an hour field
# CRSDepTime is in hhmm format; 2400 (midnight) is clipped to 2399 so that integer division by 100 gives hour 23
df = df.assign(Hour=df.CRSDepTime.clip(upper=2399)//100) 

# Make Categories as categorical
df = df.categorize(['DayOfWeek', 'UniqueCarrier', 'Dest', 'Origin'])

# Months from 0 AD
df['FlightAge'] = 12*df['Year']+df['Month']-1

# The number of months since a plane's first recorded flight is considered its approximate age.
# Unfortunately, tail numbers were not tracked until 1995.

# Find the first year and month in each tail number's flight history
tail_births = (df.groupby('TailNum')[['FlightAge']].min().reset_index()
                 .rename(columns={'FlightAge':'FirstFlight'}))

df_with_tails = dd.merge(df, tail_births, how='left', on='TailNum')
df_with_tails['Age'] = df_with_tails['FlightAge'] - df_with_tails['FirstFlight']

df_with_tails = df_with_tails.drop(['FlightAge','FirstFlight'], axis=1)
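
To make the two derived features concrete, here is a small pandas sketch of the same arithmetic on invented rows. The `toy` frame, its tail numbers, and its dates are assumptions for illustration, and it uses `groupby(...).transform("min")` rather than the groupby-then-merge used in the cell above.

```python
import pandas as pd

# Toy records; tail numbers and dates are invented for illustration.
toy = pd.DataFrame({
    "Year": [1995, 1996, 1996, 2000],
    "Month": [1, 3, 6, 12],
    "CRSDepTime": [730, 2400, 1545, 5],
    "TailNum": ["N1", "N1", "N2", "N2"],
})

# hhmm -> hour of day; 2400 is clipped to 2399 so it maps to hour 23.
toy["Hour"] = toy["CRSDepTime"].clip(upper=2399) // 100

# Month index of each flight, then months since that tail number first appears.
toy["FlightAge"] = 12 * toy["Year"] + toy["Month"] - 1
toy["Age"] = toy["FlightAge"] - toy.groupby("TailNum")["FlightAge"].transform("min")

print(toy[["TailNum", "Hour", "Age"]])
```

On a single in-memory frame `transform` avoids the intermediate table; the merge in the cell above is the form the notebook uses with Dask.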




## Flight Delays

A scheduled flight is officially delayed when it departs more than 15 minutes behind schedule. The same logic is applied to arrival times: only arrivals more than 15 minutes past the scheduled time are considered late.

http://aspmhelp.faa.gov/index.php/Types_of_Delay
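
A minimal sketch of how this threshold turns into a per-airport share of delayed flights, assuming a small invented `flights` table. The constant name `DELAY_THRESHOLD_MIN` is hypothetical; flights with a missing delay are dropped so they count neither as delayed nor as on time.

```python
import pandas as pd

DELAY_THRESHOLD_MIN = 15  # threshold used in this notebook (hypothetical constant name)

# Invented rows for illustration; None marks a missing delay value.
flights = pd.DataFrame({
    "Origin": ["SAN", "SAN", "SAN", "ATL", "ATL"],
    "DepDelay": [11.0, 16.0, 45.0, 15.0, None],
})

# Drop flights with no recorded delay so they do not count as on time.
known = flights.dropna(subset=["DepDelay"])

# The mean of a boolean flag is the share of delayed flights per origin.
is_delayed = known["DepDelay"] > DELAY_THRESHOLD_MIN
share_delayed = is_delayed.groupby(known["Origin"]).mean()
print(share_delayed)
```

A delay of exactly 15 minutes is not flagged, matching the strict `> 15` comparison used in the aggregations below.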

### Aggregations

View a visualization of Dask distributed at work:

http://127.0.0.1:8787/

In [28]:
import dask
start = datetime.now()
# Define some aggregations to plot
aggregations = (
    #1 Average departure delay by year
    df.groupby('Year').DepDelay.mean(),
    
    #2 Average departure delay by Month
    df.groupby('Month').DepDelay.mean(), 
    
    #3 Average departure delay by hour of day
    df.groupby('Hour').DepDelay.mean(), 
    
    #4 Average departure delay by Carrier, top 15
    df.groupby('UniqueCarrier').DepDelay.mean().nlargest(15), 
    
    #5 Average arrival delay by destination, top 15
    (df.groupby('Dest').ArrDelay.mean().nlargest(15) 
     .reset_index().rename(columns={'ArrDelay':'AvgArrDelay'})),
    
    #6 Count of arrivals to destinations, excludes missing
    (df.groupby('Dest').ArrDelay.count() 
     .reset_index().rename(columns={'ArrDelay':'ArrCount'})),
    
    #7 Average departure delay by origin, top 15
    (df.groupby('Origin').DepDelay.mean().nlargest(15).reset_index().rename(columns={'DepDelay':'AvgDepDelay'})),
    
    #8 Count of departures by origin, excludes missing
    (df.groupby('Origin').DepDelay.count().reset_index().rename(columns={'DepDelay':'DepCount'})), 
    
    #9 Average departure delay by origin and destination
    (df.groupby(['Origin','Dest']).DepDelay.mean().reset_index().rename(columns={'DepDelay':'AvgDepDelay'})),
    
    #10 Count of departures between origin and destination
    (df.groupby(['Origin','Dest']).DepDelay.count().reset_index().rename(columns={'DepDelay':'DepCount'})),
    
    #11 Percentage of officially delayed flights by origin
    ((df[df.DepDelay>15].groupby('Origin').DepDelay.count() / df.groupby('Origin').DepDelay.count())
     .reset_index().rename(columns={'DepDelay':'PercDepDelay'})),
    
    #12 Percentage of officially late flights by destination
    ((df[df.ArrDelay>15].groupby('Dest').ArrDelay.count() / df.groupby('Dest').ArrDelay.count())
     .reset_index().rename(columns={'ArrDelay':'PercArrDelay'})),
                
    #13 Percentage of officially delayed flights by origin and destination
    ((df[df.DepDelay>15].groupby(['Origin','Dest']).DepDelay.count() / df.groupby(['Origin','Dest']).DepDelay.count())
     .reset_index().rename(columns={'DepDelay':'PercDepDelay'})),
                
    #14 Percentage of officially late flights by origin and destination
    ((df[df.ArrDelay>15].groupby(['Origin','Dest']).ArrDelay.count() / df.groupby(['Origin','Dest']).ArrDelay.count())
     .reset_index().rename(columns={'ArrDelay':'PercArrDelay'})),
    
    #15 Average departure delay by day of week
    df.groupby('DayOfWeek').DepDelay.mean()
)

# Compute them all in a single command
(
delayed_by_year, #1
delayed_by_month, #2
delayed_by_hour, #3
delayed_by_carrier, #4
delayed_by_dest, #5
delayed_by_dest_count, #6
delayed_by_origin, #7
delayed_by_origin_count, #8
delayed_by_origin_dest, #9
delayed_by_origin_dest_count, #10
pct_delayed_by_origin, #11
pct_late_by_dest, #12
pct_delayed_by_origin_dest, #13
pct_late_by_origin_dest, #14
delayed_by_day #15
) = dask.compute(*aggregations)
print(datetime.now() - start)

0:04:13.004471


### Visualization of Average Delay

In [29]:
from bokeh.plotting import figure, show
from bokeh.charts.attributes import cat
from bokeh.charts import Bar
from bokeh.layouts import gridplot

# Average Delay by Year
p1 = Bar(delayed_by_year.reset_index(), 'Year', values= 'DepDelay', 
         legend=False, ylabel="Average Delay in Minutes", 
         title="Average Delay by Year")

# Average Delay by Month
delayed_by_month = delayed_by_month.sort_index()
p2 = Bar(delayed_by_month.reset_index(), 'Month', values= 'DepDelay', 
         legend=False, ylabel="Average Delay in Minutes", 
         title="Average Delay by Month")

# Average Delay by Hour of Day
p3 = Bar(delayed_by_hour.reset_index(), 'Hour', values= 'DepDelay', 
         legend=False, ylabel="Average Delay in Minutes",
         title="Average Delay by Hour of Day")

# Average Delay by Day of Week
p4 = Bar(delayed_by_day.reset_index(), 'DayOfWeek', values= 'DepDelay', 
         legend=False, ylabel="Average Delay in Minutes",
         title="Average Delay by Day of Week")

# Average Delay by Carrier
delayed_by_carrier = delayed_by_carrier.reset_index()
delayed_by_carrier['UniqueCarrier'] = delayed_by_carrier['UniqueCarrier'].astype('O')
p5 = Bar(delayed_by_carrier, label=cat('UniqueCarrier', sort=False), values= 'DepDelay', 
         legend=False, ylabel="Average Delay in Minutes", xlabel="Unique Carrier", title="Average Delay by Carrier")


show(gridplot([[p1,p2],[p3,p4], [p5,None]], plot_width=400, plot_height=300))

## Which airports are most likely to be delayed flying out of or into?

In [30]:
airport_delays_pcts = (pd.merge(pct_delayed_by_origin, pct_late_by_dest, left_on='Origin', right_on='Dest')
                 .assign(AvgDelay= lambda x: (x['PercDepDelay'] + x['PercArrDelay'])/2)
                 .sort_values(by='AvgDelay', ascending=False)
                 .drop('Dest', axis=1)
                )

airport_delays_pcts = pd.merge(airport_delays_pcts, delayed_by_origin_count, on='Origin')

airport_delays_pcts[airport_delays_pcts['DepCount'] > 50].nlargest(15, 'AvgDelay')

,Origin,PercDepDelay,PercArrDelay,AvgDelay,DepCount
3,SOP,0.368,0.414,0.391,307
4,ADK,0.46,0.314,0.387,541
5,OTH,0.317,0.444,0.38,499
6,ACK,0.335,0.403,0.369,1644
7,HHH,0.242,0.387,0.314,1738
8,LMT,0.27,0.287,0.278,552
9,ILG,0.22,0.332,0.276,755
10,DUT,0.289,0.26,0.275,5054
11,MQT,0.155,0.366,0.261,5504
12,PSG,0.233,0.272,0.252,14199
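
The ranking logic can be checked on a toy example. The sketch below uses invented airport codes and shares (`AAA`, `BBB`, `CCC` are not real results) to show why the departure-count filter matters: an airport with a high delay share but very few flights is excluded before ranking.

```python
import pandas as pd

# Invented per-airport shares and counts, for illustration only.
pct_dep = pd.DataFrame({"Origin": ["AAA", "BBB", "CCC"],
                        "PercDepDelay": [0.40, 0.20, 0.50]})
pct_arr = pd.DataFrame({"Dest": ["AAA", "BBB", "CCC"],
                        "PercArrDelay": [0.30, 0.10, 0.70]})
counts = pd.DataFrame({"Origin": ["AAA", "BBB", "CCC"],
                       "DepCount": [500, 9000, 12]})

ranked = (
    pct_dep.merge(pct_arr, left_on="Origin", right_on="Dest")
    .drop(columns="Dest")
    # Unweighted mean of the departure and arrival delay shares.
    .assign(AvgDelay=lambda x: (x["PercDepDelay"] + x["PercArrDelay"]) / 2)
    .merge(counts, on="Origin")
)

# Airports with very few departures are filtered out before ranking.
top = ranked[ranked["DepCount"] > 50].nlargest(2, "AvgDelay")
print(top)
```

Here `CCC` has the highest average share but only 12 departures, so it does not appear in `top`.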


## Which flights with same origin and destination are most likely to be delayed?

In [31]:
org_dest_pcts = (pd.merge(pct_delayed_by_origin_dest, pct_late_by_origin_dest, on=['Origin','Dest'])
                 .assign(AvgDelay= lambda x: (x['PercDepDelay'] + x['PercArrDelay'])/2)
                 .sort_values(by='AvgDelay', ascending=False)
                )

org_dest_pcts = pd.merge(org_dest_pcts, delayed_by_origin_dest_count, on=['Origin','Dest'])

org_dest_pcts[org_dest_pcts['DepCount'] > 50].nlargest(15, 'AvgDelay')

,Origin,Dest,PercDepDelay,PercArrDelay,AvgDelay,DepCount
252,JAN,BTR,0.582,0.709,0.646,79
265,LGA,ICT,0.415,0.694,0.554,193
266,IDA,ORD,0.548,0.557,0.552,292
268,MTJ,ATL,0.5,0.574,0.537,54
269,LGA,BTR,0.465,0.6,0.533,129
271,LAX,RDM,0.13,0.926,0.528,69
272,BTR,EWR,0.46,0.596,0.528,250
273,ASE,ORD,0.52,0.534,0.527,540
274,ASE,SFO,0.553,0.5,0.527,150
275,DEN,RDM,0.22,0.831,0.525,59


In [32]:
from bokeh.charts import Histogram

hist = Histogram(df[df['DepDelay']>15][['DepDelay']].dropna().compute(), values='DepDelay', bins=50)

show(hist)

## Can you regress how delayed a flight will be before it is delayed?

## What are the most important features for this regression?

# Regression of Delay

Dask is a solution for processing "big data"; however, it currently does not include built-in methods for regression or classification, unlike other big data solutions. The following takes a series of simple random samples, each small enough to fit into a pandas dataframe, and estimates the coefficients of a linear model on each one. The coefficients are then averaged to make a final prediction. This process also helps guard against overfitting the model.

#### The following features will be explored to predict a flight's departure delay

##### The predicted variable will be: 
* Departure Delay (DepDelay)

##### The explanatory variables:
* Scheduled departure hour (Hour)
* Flight distance (Distance)
* Age of plane (Age)
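
The sample-and-average idea described above can be sketched with NumPy alone. This is an illustration on synthetic data, not the notebook's scikit-learn code: the data-generating coefficients, the helper name `fit_ols`, and the seeds are assumptions, and unlike the function below it also estimates and averages an intercept.

```python
import numpy as np

rng = np.random.default_rng(0)

# Synthetic stand-in for the full data: y = 2 + 0.7*x1 + 0.001*x2 + noise.
n = 20_000
X = np.column_stack([rng.integers(0, 24, n), rng.uniform(50, 3000, n)])
y = 2.0 + 0.7 * X[:, 0] + 0.001 * X[:, 1] + rng.normal(0, 1.0, n)


def fit_ols(X, y):
    """Least-squares fit; returns [intercept, coef_1, coef_2, ...]."""
    design = np.column_stack([np.ones(len(X)), X])
    beta, *_ = np.linalg.lstsq(design, y, rcond=None)
    return beta


# Fit on several random 10% samples and average the estimates.
estimates = []
for seed in (123, 456, 789):
    idx = np.random.default_rng(seed).choice(n, size=n // 10, replace=False)
    estimates.append(fit_ols(X[idx], y[idx]))

beta_avg = np.mean(estimates, axis=0)
print(beta_avg)
```

Each sample fits in memory on its own, and the averaged vector should land close to the coefficients used to generate the data.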

In [33]:
# https://adventuresindatascience.wordpress.com/2014/12/30/minibatch-learning-for-large-scale-data-using-scikit-learn/


In [34]:
# Sample the entire data set, as large as possible, a few times. Each sample has its own cross-validation sampling.
def sample_coef(Xcols, ycol, df, samp_size = .1, seeds = [123,456,789,101,112]):
    from sklearn import linear_model
    from sklearn.metrics import mean_squared_error
    from sklearn.metrics import r2_score
    import dask
    reg = linear_model.LinearRegression(n_jobs=-1)
    coefs = []
    
    for i in range(len(seeds)):
        start = datetime.now()
        # Take a sample from all the data
        all_cols = [ycol] + Xcols
        Xy = df[all_cols].sample(frac=samp_size, random_state=seeds[i]).compute().dropna(axis=0)
        X = Xy[Xcols]
        y = Xy[ycol].values

        reg.fit(X, y)
        #print('Coefficients: \n', reg.coef_)
        coefs.append(reg.coef_)
        print("Time for Sample {}: ".format((i+1)), datetime.now() - start)
        #print(datetime.now() - start)
    
    del Xy, X, y

    coef_df = pd.DataFrame.from_records(coefs, columns=Xcols)
    coef_avg = coef_df.mean()
    print()
    print(coef_df)
    print()
    print("Average Coefficients:")
    print(coef_avg)
    print()
    
    beta_cols = []
    for m, c in zip(coef_avg.index, coef_avg.values):
        b_col = "Beta_"+m
        df["Beta_"+m] = df[m]*c
        beta_cols.append(b_col)

    # Note: only the averaged slopes are applied here; reg.intercept_ is not included
    df['Predicted'] = df[beta_cols].sum(axis=1)

    #df['SqError'] = (df['Predicted'] - df[ycol])**2
    #mse = df[['SqError']].mean().compute()
    
    df_tmp = df[['Predicted']+[ycol]].compute().dropna()
    y_true = df_tmp[ycol]
    y_pred = df_tmp['Predicted']
    mse = mean_squared_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    print("Mean Squared Error: ", mse)
    print("R-squared: ", r2)
    return coef_df, coef_avg
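
For comparison, a hold-out evaluation can be sketched as follows. This is a simplified illustration on synthetic data with one predictor, not the function above: the split fraction, seed, and coefficients are assumptions. It fits on the training rows only and computes the mean squared error and R-squared on rows the model has not seen, with the intercept included in the prediction.

```python
import numpy as np

rng = np.random.default_rng(1)

# Synthetic data with a known linear relationship plus noise.
n = 5_000
x = rng.uniform(0, 24, n)
y = 3.0 + 0.5 * x + rng.normal(0, 2.0, n)

# Hold out 20% of the rows for evaluation.
order = rng.permutation(n)
test_idx, train_idx = order[: n // 5], order[n // 5:]

# Fit slope and intercept on the training rows only.
slope, intercept = np.polyfit(x[train_idx], y[train_idx], deg=1)

# Score on the held-out rows, including the intercept in the prediction.
y_pred = intercept + slope * x[test_idx]
residual = y[test_idx] - y_pred
mse = np.mean(residual ** 2)
r2 = 1.0 - np.sum(residual ** 2) / np.sum((y[test_idx] - y[test_idx].mean()) ** 2)

print("Mean Squared Error:", mse)
print("R-squared:", r2)
```

Scoring on held-out rows gives an estimate of how the model would perform on flights it was not fitted to.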

In [35]:
Xcols = ['Hour', 'Distance']
ycol =  'DepDelay'
coef_df, coef_avg = sample_coef(Xcols, ycol, df)

Time for Sample 1:  0:03:40.674622
Time for Sample 2:  0:03:40.284599
Time for Sample 3:  0:03:35.933351
Time for Sample 4:  0:03:40.427608
Time for Sample 5:  0:03:52.499298

   Hour  Distance
0 0.726     0.001
1 0.727     0.001
2 0.724     0.001
3 0.725     0.001
4 0.730     0.001

Average Coefficients:
Hour       0.727
Distance   0.001
dtype: float64



MemoryError: 

In [None]:
Xcols = ['Hour', 'Distance', 'Age']
coef_df, coef_avg = sample_coef(Xcols, ycol, df_with_tails)

### Conclusion

Dask is a new "big data" alternative for those who prefer the Python language. Although it is in active development by Continuum.io, it still lacks certain features, such as a drop-in generalized linear model.

### Future Work

* Optimize with an index key based on date, departure time, and TailNum
* Use of alternative compression, such as snappy or LZ4
    * http://java-performance.info/performance-general-compression/
* Use a different big data approach to find a more efficient way of estimating the linear model coefficients:
    * Spark MLLib
    * Dask GLM
    * Turi/Graphlab Create

## Bibliography

* Dask Documentation, http://dask.pydata.org/en/latest/
* Distributed Optimization and Statistical Learning via the Alternating Direction Method of Multipliers, Boyd, et al http://stanford.edu/~boyd/papers/pdf/admm_distr_stats.pdf
* https://www.transtats.bts.gov/OT_Delay/OT_DelayCause1.asp
* Variable Descriptions: http://stat-computing.org/dataexpo/2009/the-data.html
* Dask example using airline data https://jcrist.github.io/dask-sklearn-part-3.html

## Appendices

### Appendix A - CSV to Parquet Conversion

### Appendix B - Benchmark Tests

### Appendix C - Comparison of Dask Files
* Ryan's Hardware: 
    - CPU: Intel i5-4300M @ 2.60GHz
    - Disk: Samsung SSD 850 Pro
    - RAM: 8 GB
    

* Dask using original csv:
    - no conversion
    - size on disk
        - 11.2 gb
    - benchmark of describing 'Distance':
        - Approx. 4 minutes
* Dask using uncompressed parquet: 
    - conversion to parquet
        - approx 10 minutes
    - size on disk:
        - 13.8 gb
    - benchmark of describing 'Distance':
        - 1 loop, best of 3: 6.2 s per loop
* Dask using gzip compressed parquet:
    - conversion to parquet
        - approx 42 minutes
    - size on disk:
        - 1.36 gb <- big difference
    - benchmark of describing 'Distance':
        - 1 loop, best of 3: 8.83 s per loop

#### Summary
Dask allows out-of-core management of data sets. CSV files are universal but slow to process. Converting to the parquet file format speeds up the benchmark by a factor of about 38 (roughly 4 minutes versus 6.2 seconds). Gzip compression reduces the size on disk from 13.8 GB to 1.36 GB, or about 10% of the uncompressed size. This comes in handy for distributed processing in a cluster, since less network bandwidth is needed. The trade-off is a 42.4% increase in processing time; the roughly 3 additional seconds are hardly noticeable here, but might be more of an issue for other tasks.
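
The ratios quoted in the summary follow directly from the Appendix C figures. The short calculation below reproduces them, treating "approx. 4 minutes" as 240 seconds, which is an approximation.

```python
# Figures taken from Appendix C above.
csv_seconds = 4 * 60      # "approx. 4 minutes" for the CSV benchmark
parquet_seconds = 6.2     # uncompressed parquet
gzip_seconds = 8.83       # gzip-compressed parquet
parquet_gb = 13.8
gzip_gb = 1.36

# Speedup of uncompressed parquet over CSV.
speedup = csv_seconds / parquet_seconds

# Compressed size as a fraction of the uncompressed parquet size.
size_ratio = gzip_gb / parquet_gb

# Extra processing time caused by gzip, as a percentage.
slowdown_pct = 100 * (gzip_seconds - parquet_seconds) / parquet_seconds

print(f"speedup: {speedup:.1f}x, size: {size_ratio:.1%}, slowdown: {slowdown_pct:.1f}%")
```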