In [1]:
#Enable Spark Context
#System Variables - SPARK_HOME, PACKAGES, PYSPARK_SUBMIT_ARGS
#PACKAGES=com.databricks:spark-csv_2.11:1.4.0
#PYSUBMIT_SUBMIT_ARGS=--packages %PACKAGES% pyspark-shell 

import os
import sys

spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, spark_home + "/python")
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))

filename = os.path.join(spark_home, 'python/pyspark/shell.py')
exec(compile(open(filename, "rb").read(), filename, 'exec'))

spark_release_file = spark_home + "/RELEASE"

if os.path.exists(spark_release_file) and "Spark 1.6.1" in open(spark_release_file).read():
    pyspark_submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "")
    if not "pyspark-shell" in pyspark_submit_args: 
        pyspark_submit_args += " pyspark-shell"
        os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 3.5.1 (default, Feb 16 2016 09:49:46)
SparkContext available as sc, HiveContext available as sqlContext.


In [2]:
#Load Airline and Weather data and make 1 DF for each Airline and Weather
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df_airline_2007 = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load(r"C:\Users\Neil-Laptop\Documents\datasets\airline_2007.csv")

df_airline_2008 = sqlContext.read.format('com.databricks.spark.csv')\
    .options(header='true', inferschema='true') \
    .load(r"C:\Users\Neil-Laptop\Documents\datasets\airline_2008.csv")
    
    
df_weather_2007 = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='false', inferschema='true') \
    .load(r"C:\Users\Neil-Laptop\Documents\datasets\weather_2007.csv")

df_weather_2008 = sqlContext.read.format('com.databricks.spark.csv')\
    .options(header='false', inferschema='true') \
    .load(r"C:\Users\Neil-Laptop\Documents\datasets\weather_2008.csv") 
    
    
df_airline_raw = df_airline_2007.unionAll(df_airline_2008)
df_weather_raw = df_weather_2007.unionAll(df_weather_2008)
#df = sqlContext.csvcoload(r"C:\Users\Neil-Laptop\Downloads\Parking_Citations.csv")

In [3]:
#Function to create year,month,day into date for airline to join on to weather
def to_date(year,month,day): 
    dt = "%04d%02d%02d" % (year, month, day)
    return dt

sqlContext.udf.register("to_date", to_date)

In [4]:
#Function to discrentize time in airline
def discretize_tod(val):
    hour = int(val[:2])
    if hour < 8:
        return 0
    if hour < 16:
        return 1
    return 2

sqlContext.udf.register("discretize_tod", discretize_tod)

In [5]:
df_airline_raw.registerTempTable("df_airpline_raw")
df_weather_raw.registerTempTable("df_weather_raw")

In [6]:
#Create Final Airline transformation
df_airline = sqlContext.sql("""SELECT 
                            Year as year, Month as month, DayofMonth as day, DayOfWeek as dow,
                            CarrierDelay as carrier, Origin as origin, Dest as dest, Distance as distance, 
                            discretize_tod(DepTime) as tod, CASE WHEN DepDelay >= 15 THEN 1 ELSE 0 END as delay, 
                            to_date(Year, Month, DayofMonth) As date 
                            FROM df_airpline_raw
                            WHERE Cancelled = 0 AND Origin = 'ORD'""")

In [7]:
#Create Base Weather Transformation Table
df_weather = sqlContext.sql("""SELECT 
                                C0 AS station,
                                C1 As date,
                                C2 As metric,
                                C3 As value, 
                                C4 As t1, 
                                C5 As t2, 
                                C6 As t3, 
                                C7 As time
                                FROM df_weather_raw
                                """)

#Create Tmin and Tmax Weather DF
df_weather.registerTempTable("df_weather")

#Create DFs for Weather Tmin and Tmax Values 
df_weather_tmin = sqlContext.sql("""SELECT 
                                        date, 
                                        value as temp_min 
                                    FROM df_weather 
                                    WHERE station = 'USW00094846' 
                                    AND metric = 'TMIN'""")
                                    
df_weather_tmax = sqlContext.sql("""SELECT 
                                        date, 
                                        value as temp_max 
                                    FROM df_weather 
                                    WHERE station = 'USW00094846' 
                                    AND metric = 'TMAX'""")

In [8]:
#Join Airline with Weather Tmin and Tmax Dataframes
df_airline_tmin = df_airline.join(df_weather_tmin, 
                                  df_weather_tmin.date == df_airline.date, 
                                  "inner").drop(df_weather_tmin.date)

df_airline_tmin_and_tmax = df_airline_tmin.join(df_weather_tmax, 
                                                df_weather_tmax.date == df_airline_tmin.date, 
                                                "inner").drop(df_weather_tmax.date)

In [9]:
df_airline_tmin_and_tmax.registerTempTable("df_airline_tmin_and_tmax")
df_all = sqlContext.sql("""SELECT 
                                delay,
                                year,
                                month, 
                                day, 
                                dow, 
                                cast (tod AS int) tod, 
                                distance, 
                                temp_min, 
                                temp_max
                            FROM df_airline_tmin_and_tmax""")

#Cache Dataframe because we split it later on
df_all.cache()

DataFrame[delay: int, year: int, month: int, day: int, dow: int, tod: int, distance: int, temp_min: int, temp_max: int]

In [10]:
df_all.show(10)

+-----+----+-----+---+---+---+--------+--------+--------+
|delay|year|month|day|dow|tod|distance|temp_min|temp_max|
+-----+----+-----+---+---+---+--------+--------+--------+
|    0|2007|    5| 24|  4|  2|     925|     172|     311|
|    1|2007|    5| 24|  4|  2|     316|     172|     311|
|    1|2007|    5| 24|  4|  2|     316|     172|     311|
|    0|2007|    5| 24|  4|  1|     925|     172|     311|
|    0|2007|    5| 24|  4|  2|     316|     172|     311|
|    0|2007|    5| 24|  4|  2|     316|     172|     311|
|    1|2007|    5| 24|  4|  1|     316|     172|     311|
|    0|2007|    5| 24|  4|  1|     316|     172|     311|
|    0|2007|    5| 24|  4|  2|     316|     172|     311|
|    0|2007|    5| 24|  4|  1|     654|     172|     311|
+-----+----+-----+---+---+---+--------+--------+--------+
only showing top 10 rows



In [11]:
#Linear Regression
#import necessary librarys
from pyspark.mllib.regression import  LabeledPoint
from pyspark.mllib.tree import DecisionTree, RandomForest
from pyspark.mllib.linalg import DenseVector

In [12]:
#Create labeledPoint Parser
def parseDF(row):
    values = [row.delay, row.month, row.day, row.dow, row.tod, row.distance, row.temp_min, row.temp_max]
    return LabeledPoint(values[0], DenseVector(values[1:]))

In [13]:
#Convert Dataframes to LabeledPoint for modeling
train_data = df_all.filter("year=2007").map(parseDF)
test_data = df_all.filter("year=2008").map(parseDF)

In [14]:
#Train Models
modelCART = DecisionTree.trainClassifier(train_data, numClasses=2, categoricalFeaturesInfo={},
                                     impurity='gini', maxDepth=5)

modelRF = RandomForest.trainClassifier(train_data, numClasses=2, categoricalFeaturesInfo={},
                                      numTrees=500, impurity='gini', maxDepth=5)

In [15]:
#Apply CART model on Test Data
predictionsCART = modelCART.predict(test_data.map(lambda x: x.features))
predictionsAndLabelsCARTRDD = predictionsCART.zip(test_data.map(lambda lp: lp.label))
predictionsAndLabelsCART = predictionsAndLabelsCARTRDD.collect()

In [16]:
#Apply CART model on Test Data
predictionsRF = modelRF.predict(test_data.map(lambda x: x.features))
predictionsAndLabelsRFRDD = predictionsRF.zip(test_data.map(lambda lp: lp.label))
predictionsAndLabelsRF = predictionsAndLabelsRFRDD.collect()

In [19]:
#Create confusion matrix
#Spark COnfusion Matrix can't interface with it
from pyspark.mllib.evaluation import MulticlassMetrics

metrics = MulticlassMetrics(predictionsAndLabelsCARTRDD)

print(metrics.confusionMatrix())

DenseMatrix([[ 229130.,   10764.],
             [  87559.,    7877.]])


In [20]:
#Instead due Pandas crosstab to create confusion matrix instead
import pandas as pd

#Create function

def confusion_matrix(predAndLabel):
    y_actual = pd.Series([x for x, y in predAndLabel], name = 'Actual')
    y_pred = pd.Series([y for x, y in predAndLabel], name = 'Predicted')
    
    matrix = pd.crosstab(y_actual,y_pred)
    accuracy = (matrix[0][0] + matrix[1][1])/ \
                (matrix[0][0] + matrix[0][1] + matrix[1][0] + matrix[1][1])

    return matrix, accuracy

In [21]:
#CART Confusion Matrix and Model Accuracy
df_confusion_CART, accuracy_CART = confusion_matrix(predictionsAndLabelsCART)

print('CART Confusion Matrix:')
print(df_confusion_CART)
print('\nCART Model Accuracy: {0}'.format(accuracy_CART))

CART Confusion Matrix:
Predicted     0.0    1.0
Actual                  
0.0        229130  87559
1.0         10764   7877

CART Model Accuracy: 0.7067873438105746


In [22]:
#RandomForest Confusion Matrix and Model Accuracy
df_confusion_RF, accuracy_RF = confusion_matrix(predictionsAndLabelsRF)

print('RF Confusion Matrix:')
print(df_confusion_RF)
print('\nRF Model Accuracy: {0}'.format(accuracy_RF))

RF Confusion Matrix:
Predicted     0.0    1.0
Actual                  
0.0        238077  93466
1.0          1817   1970

RF Model Accuracy: 0.7158530402886709


In [None]:
#It looks like RF Model is better
#And with more parameter tuning it can get better

In [None]:
sc.stop()