In [1]:
import os
import sys
import re
from pyspark import SparkContext
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import *
%matplotlib inline
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import pyspark.sql.functions as func
import matplotlib.patches as mpatches
import time as time

from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint

# To start testing we focus on a single year of the airline data.
# The location can be overridden with the AIRLINE_DATA_PATH environment
# variable (e.g. "/datasets/airline/1994.csv" on the cluster) instead of
# editing this absolute local path.
input_path = os.environ.get(
    "AIRLINE_DATA_PATH",
    "/Users/quentinleroy/Documents/Eurecom/AML/Lab5+6/data/1994.csv",
)
# Reuse the SparkContext the pyspark kernel already created, or create one when
# the notebook is run in a plain Python kernel where `sc` is not predefined.
sc = SparkContext.getOrCreate()
raw_data = sc.textFile(input_path)

In [2]:
# extract the header: the first line of the CSV file holds the column names,
# and it is filtered out of the data in the cleaning step below
header = raw_data.first()

def deleteNA(row):
    """Blank out missing values in one raw CSV line.

    Every comma-separated field that is exactly 'NA' is replaced by an empty
    string; all other fields are kept unchanged.

    Args:
        row: one line of the CSV file.

    Returns:
        The same line, re-joined with commas, with its 'NA' fields emptied.
    """
    return ','.join('' if field == 'NA' else field for field in row.split(','))

# Drop the header line, then turn every "NA" field into an empty string so the
# parsing step can treat empty fields as missing values.
cleaned_data = (
    raw_data
    .filter(lambda line: line != header)
    .map(deleteNA)
)

In [3]:
sqlContext = SQLContext(sc)

# Declare the data schema: one nullable field per CSV column, in file order.
# see http://stat-computing.org/dataexpo/2009/the-data.html
# for more information
airline_data_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("year",                     IntegerType()),
        ("month",                    IntegerType()),
        ("day_of_month",             IntegerType()),
        ("day_of_week",              IntegerType()),
        ("departure_time",           IntegerType()),
        ("scheduled_departure_time", IntegerType()),
        ("arrival_time",             IntegerType()),
        ("scheduled_arrival_time",   IntegerType()),
        ("carrier",                  StringType()),
        ("flight_number",            StringType()),
        ("tail_number",              StringType()),
        ("actual_elapsed_time",      IntegerType()),
        ("scheduled_elapsed_time",   IntegerType()),
        ("air_time",                 IntegerType()),
        ("arrival_delay",            IntegerType()),
        ("departure_delay",          IntegerType()),
        ("src_airport",              StringType()),
        ("dest_airport",             StringType()),
        ("distance",                 IntegerType()),
        ("taxi_in_time",             IntegerType()),
        ("taxi_out_time",            IntegerType()),
        ("cancelled",                StringType()),
        ("cancellation_code",        StringType()),
        ("diverted",                 StringType()),
        ("carrier_delay",            IntegerType()),
        ("weather_delay",            IntegerType()),
        ("nas_delay",                IntegerType()),
        ("security_delay",           IntegerType()),
        ("late_aircraft_delay",      IntegerType()),
    ]
])

In [4]:
# CSV positions whose values are kept as strings: carrier (8), flight number (9),
# tail number (10), source airport (16), destination airport (17),
# cancelled (21), cancellation code (22) and diverted (23).
# Every other position is parsed as an integer.
STRING_COLUMN_POSITIONS = frozenset([8, 9, 10, 16, 17, 21, 22, 23])
NUM_AIRLINE_COLUMNS = 29


def parse_airline_fields(cols):
    """Convert the split fields of one cleaned CSV line into a typed tuple.

    The tuple lines up with `airline_data_schema`: raw strings at the positions
    in STRING_COLUMN_POSITIONS, integers everywhere else, and None for empty
    fields. Fields are read left to right, so an IndexError is raised for a
    line with fewer than 29 fields and a ValueError for a non-numeric value
    in an integer column.
    """
    typed = []
    for position in range(NUM_AIRLINE_COLUMNS):
        field = cols[position]
        if not field:
            typed.append(None)
        elif position in STRING_COLUMN_POSITIONS:
            typed.append(field)
        else:
            typed.append(int(field))
    return tuple(typed)


# convert each line into a tuple of features (columns)
cleaned_data_to_columns = (
    cleaned_data
    .map(lambda line: line.split(","))
    .map(parse_airline_fields)
)

In [5]:
# Create the typed DataFrame `df` and keep only the columns used by the
# analysis. Each column is listed exactly once: repeating a name would give
# `df` two identically named columns, which breaks writes/exports and makes
# the column list misleading.
df = (
    sqlContext.createDataFrame(cleaned_data_to_columns, schema=airline_data_schema)
    .select([
        "year", "month", "day_of_month", "day_of_week",
        "scheduled_departure_time", "scheduled_arrival_time",
        "arrival_delay", "distance", "src_airport", "dest_airport", "carrier",
    ])
    .cache()
)

In [6]:
# Departure hour: scheduled_departure_time is an hhmm integer (e.g. 1455), so
# the hour is the integer part of hhmm / 100. floor() is required here:
# round() would move every departure at hh:30 or later into the next hour.
df_with_hour = df.withColumn('hour', func.floor(df.scheduled_departure_time / 100))

# is_delay = 1 when the flight arrived at least 15 minutes late, 0 otherwise
# (a null arrival_delay makes the condition null, so it also yields 0).
df_with__hour_delay = df_with_hour.withColumn(
    'is_delay', when(df_with_hour["arrival_delay"] >= 15, 1).otherwise(0)
)

In [7]:
def rank_by_frequency(frame, column):
    """Count the rows per distinct value of `column`, most frequent first.

    Returns a DataFrame with columns (`column`, 'count').
    """
    return (
        frame
        .groupBy(column)
        .agg(func.count("*").alias('count'))
        .orderBy(desc('count'))
    )


# Distinct source airports, destination airports and carriers, sorted by
# descending frequency so that the most common value of each gets index 0.
stat_src = rank_by_frequency(df, "src_airport")
stat_dest = rank_by_frequency(df, "dest_airport")
stat_carrier = rank_by_frequency(df, "carrier")

# Names only, in frequency order.
src_airports = [row[0] for row in stat_src.collect()]
dest_airports = [row[0] for row in stat_dest.collect()]
carriers = [row[0] for row in stat_carrier.collect()]

num_src_airports = len(src_airports)
num_dest_airports = len(dest_airports)
num_carriers = len(carriers)

src_airports_idx = range(num_src_airports)
dest_airport_idx = range(num_dest_airports)
carrier_idx = range(num_carriers)

# name -> integer index (0 = most frequent), used to encode categorical features.
map_src_airport_to_index = {name: position for position, name in zip(src_airports_idx, src_airports)}
map_dest_airport_to_index = {name: position for position, name in zip(dest_airport_idx, dest_airports)}
map_carrier_to_index = {name: position for position, name in zip(carrier_idx, carriers)}

Find the most common value of each categorical feature (`month`, `day_of_month`, `day_of_week`, `scheduled_departure_time`, `scheduled_arrival_time`) and the mean of each numerical feature (`distance`, `arrival_delay`). These values are used below to fill in missing entries.

In [None]:
def most_common_value(frame, column):
    """Return the most frequent non-null value of `column` in `frame`.

    Null values are excluded so the result can be used to impute missing
    entries (a null "most common value" would be useless for that).
    Returns None only if the column contains no non-null value at all.
    """
    top = (
        frame
        .where(func.col(column).isNotNull())
        .groupBy(column)
        .agg(func.count("*").alias('count'))
        .orderBy(func.desc('count'))
        .first()
    )
    return top[0] if top is not None else None


the_most_common_month = most_common_value(df, "month")
the_most_common_day_of_month = most_common_value(df, "day_of_month")
the_most_common_day_of_week = most_common_value(df, "day_of_week")
the_most_common_s_departure_time = most_common_value(df, "scheduled_departure_time")
the_most_common_s_arrival_time = most_common_value(df, "scheduled_arrival_time")

# avg() ignores nulls in both the sum and the row count. The previous
# sum(col) / count("*") divided by rows whose value is null too, which
# biased the means towards zero. Both means are computed in one Spark job.
means = df.agg(
    func.avg("distance").alias("mean_distance"),
    func.avg("arrival_delay").alias("mean_arrival_delay"),
).first()
mean_distance = means["mean_distance"]
mean_arrival_delay = means["mean_arrival_delay"]

print("The most common month:", the_most_common_month)
print("The most common day of month:", the_most_common_day_of_month)
print("The most common day of week:", the_most_common_day_of_week)
print("The most common scheduled departure time:", the_most_common_s_departure_time)
print("The most common scheduled arrival time:", the_most_common_s_arrival_time)
print("mean distance:", mean_distance)
print("mean arrival delay:", mean_arrival_delay)



Prepare the data (an RDD of `LabeledPoint`s) with the chosen features, replacing missing values with the most common value (categorical features) or the mean value (numerical features). Then split the data into `trainingData` (`70%`) and `testData` (`30%`).

In [8]:
def is_valid(value):
    """Return True when a raw CSV field holds a usable value.

    A field is unusable if it is the literal "NA" or the empty string.
    """
    if value == "NA":
        return False
    return len(value) > 0

def to_labeled_point(values):
    """Turn the split fields of one cleaned CSV line into a LabeledPoint.

    Label: arrival delay (CSV column 14), replaced by the mean arrival delay
    when it is missing.

    Features (vector index: meaning):
      0 year, 1 month, 2 day of month, 3 day of week,
      4 scheduled departure time, 5 scheduled arrival time,
      6 carrier index, 7 source airport index, 8 destination airport index,
      9 distance, 10 is_weekend (1 when day of week >= 6, else 0).
    Missing categorical values fall back to the most common value and a
    missing distance falls back to the mean distance.
    """
    def int_or(position, default):
        # Parse the field as int, or use `default` when it is missing.
        return int(values[position]) if is_valid(values[position]) else default

    def index_or(position, mapping, fallback_key):
        # Encode a categorical string as its index; a missing value is
        # encoded as the index of `fallback_key` (the most common value).
        key = values[position] if is_valid(values[position]) else fallback_key
        return mapping[key]

    # NOTE(review): rows without an arrival delay get the mean delay as their
    # label; consider dropping them instead so the model is neither trained
    # nor evaluated on imputed targets.
    label = int_or(14, mean_arrival_delay)
    features = [
        int(values[0]),                                                  # 0 year
        int_or(1, the_most_common_month),                                # 1 month
        int_or(2, the_most_common_day_of_month),                         # 2 day of month
        int_or(3, the_most_common_day_of_week),                          # 3 day of week
        int_or(5, the_most_common_s_departure_time),                     # 4 scheduled departure time
        int_or(7, the_most_common_s_arrival_time),                       # 5 scheduled arrival time
        index_or(8, map_carrier_to_index, carriers[0]),                  # 6 carrier
        index_or(16, map_src_airport_to_index, src_airports[0]),         # 7 src_airport
        index_or(17, map_dest_airport_to_index, dest_airports[0]),       # 8 destination_airport
        int_or(18, mean_distance),                                       # 9 distance
        1 if is_valid(values[3]) and int(values[3]) >= 6 else 0,         # 10 is_weekend
    ]
    return LabeledPoint(label, features)


data = (
    cleaned_data
    .map(lambda line: line.split(','))
    .map(to_labeled_point)
)

# Split the data into training and test sets (30% held out for testing).
# A fixed seed makes the split, and every metric computed from it, reproducible.
SPLIT_SEED = 42
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

trainingData = trainingData.cache()
testData = testData.cache()

We can train a decision tree model with the function `DecisionTree.trainRegressor(<training_data>, categoricalFeaturesInfo=<categorical_info>, impurity=<impurity_function>,  maxDepth=<max_depth>, maxBins=<max_bins>)`.

Where,

* `training_data` : the data used for training
* `categorical_info` : a dictionary that maps the index of each categorical feature to its number of distinct values
* `impurity_function` : the function used to measure the impurity of the data when selecting the best split
* `max_depth`: the maximum depth of the tree
* `max_bins`: the maximum number of bins used to discretize each feature when searching for splits. Note that `max_bins` cannot be smaller than the number of distinct values of any categorical feature.

Complete the code below to train a decision tree model.


In [None]:
# declare information of categorical features
# format:  feature_index : number_distinct_values
# (6 = carrier, 7 = source airport, 8 = destination airport, 10 = is_weekend)
categorical_info = {6: num_carriers, 7: num_src_airports, 8: num_dest_airports, 10: 2}

# MLlib rejects maxBins smaller than the number of categories of any
# categorical feature, so take the larger of the default 255 and the
# biggest cardinality (e.g. the number of airports in a given year).
max_bins = max([255] + list(categorical_info.values()))

t0 = time.perf_counter()
# Train a DecisionTree regression model.
model = DecisionTree.trainRegressor(trainingData,
                                    categoricalFeaturesInfo=categorical_info,
                                    impurity='variance', maxDepth=12, maxBins=max_bins)
t1 = time.perf_counter()
print("finish in %f seconds" % (t1-t0))