In [1]:
# Locate the local Spark installation before PySpark is imported.
import findspark
# NOTE(review): init() runs before `import pyspark` below; presumably it is what
# makes the pyspark package importable in this kernel — keep this ordering.
findspark.init()

# NOTE(review): the return value is discarded here, so this call only has an
# effect if find() has side effects — confirm whether it is still needed.
findspark.find()
import pyspark

In [2]:
# show which PySpark version this notebook was run against
pyspark.__version__

'3.0.1'

In [142]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Tokenizer, StopWordsRemover, HashingTF, IDF, OneHotEncoder, Bucketizer
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression, GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [4]:
import numpy as np
import pandas as pd

# Loading Data

## Loading `flights` data

In [5]:
# start (or reuse) a Spark session on the local master 'local[*]'
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)

In [6]:
# Load the flights CSV: the first row holds the column names, column types
# are inferred, and the literal string 'NA' is read as null.
flights = spark.read.csv(
    'flights.csv',
    sep=',',
    header=True,
    inferSchema=True,
    nullValue='NA',
)

In [7]:
# how many rows were loaded?
print(f'The data contains {flights.count()} records.')

# peek at the first five rows
flights.show(n=5)

# (column name, Spark type) pairs
print(flights.dtypes)

The data contains 50000 records.
+---+---+---+-------+------+---+----+------+--------+-----+
|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|
+---+---+---+-------+------+---+----+------+--------+-----+
| 11| 20|  6|     US|    19|JFK|2153|  9.48|     351| null|
|  0| 22|  2|     UA|  1107|ORD| 316| 16.33|      82|   30|
|  2| 20|  4|     UA|   226|SFO| 337|  6.17|      82|   -8|
|  9| 13|  1|     AA|   419|ORD|1236| 10.33|     195|   -5|
|  4|  2|  5|     AA|   325|ORD| 258|  8.92|      65| null|
+---+---+---+-------+------+---+----+------+--------+-----+
only showing top 5 rows

[('mon', 'int'), ('dom', 'int'), ('dow', 'int'), ('carrier', 'string'), ('flight', 'int'), ('org', 'string'), ('mile', 'int'), ('depart', 'double'), ('duration', 'int'), ('delay', 'int')]


## Loading `SMS` spam data

In [8]:
# Explicit schema for the SMS file: integer id, message text, integer label.
schema = StructType([
    StructField('id', IntegerType()),
    StructField('text', StringType()),
    StructField('label', IntegerType())
])

# Load the semicolon-delimited file. It has no header row, so `header` is the
# boolean False (the original passed the string 'False', which only worked
# because Spark parses the option text); column names come from `schema`.
sms = spark.read.csv('sms.csv',
                     sep=';',
                     header=False,
                     schema=schema)

# confirm the schema was applied as declared
sms.printSchema()

# preview the first five messages (long text is truncated by default)
sms.show(5)

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)

+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  1|Sorry, I'll call ...|    0|
|  2|Dont worry. I gue...|    0|
|  3|Call FREEPHONE 08...|    1|
|  4|Win a 1000 cash p...|    1|
|  5|Go until jurong p...|    0|
+---+--------------------+-----+
only showing top 5 rows



# Data Preparation

## Removing columns and rows

In [9]:
# discard the 'flight' column
flights_drop_col = flights.drop('flight')

# report how many records have no 'delay' value
print(f'missing "delay" values: {flights_drop_col.filter("delay IS NULL").count()}')

# keep only the records whose 'delay' is present
flights_valid_delay = flights_drop_col.where('delay IS NOT NULL')

# drop records with a null in any remaining column, then count what is left
flights_none_missing = flights_valid_delay.na.drop()

print(f'remaining rows: {flights_none_missing.count()}')

missing "delay" values: 2978
remaining rows: 47022


## Column manipulation

In [10]:
# derive 'km' from 'mile' (rounded to whole kilometres), then discard 'mile'
flights_km = (
    flights_none_missing
    .withColumn('km', F.round(F.col('mile') * 1.60934))
    .drop('mile')
)

# binary 'label': 1 when the delay is at least 15, otherwise 0
flights_km = flights_km.withColumn(
    'label',
    (F.col('delay') >= 15).cast('integer'),
)

flights_km.show(n=5)

+---+---+---+-------+---+------+--------+-----+------+-----+
|mon|dom|dow|carrier|org|depart|duration|delay|    km|label|
+---+---+---+-------+---+------+--------+-----+------+-----+
|  0| 22|  2|     UA|ORD| 16.33|      82|   30| 509.0|    1|
|  2| 20|  4|     UA|SFO|  6.17|      82|   -8| 542.0|    0|
|  9| 13|  1|     AA|ORD| 10.33|     195|   -5|1989.0|    0|
|  5|  2|  1|     UA|SFO|  7.98|     102|    2| 885.0|    0|
|  7|  2|  6|     AA|ORD| 10.83|     135|   54|1180.0|    1|
+---+---+---+-------+---+------+--------+-----+------+-----+
only showing top 5 rows



## Categorical columns

In [11]:
# indexer mapping each 'carrier' string to a numeric index
indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')

# learn the mapping from the data and append 'carrier_idx'
flights_indexed = indexer.fit(flights_km).transform(flights_km)

# same treatment for the origin airport ('org' -> 'org_idx')
flights_indexed = (
    StringIndexer(inputCol='org', outputCol='org_idx')
    .fit(flights_indexed)
    .transform(flights_indexed)
)

## Assembling columns

In [12]:
# predictor columns to be merged into a single vector column
input_cols = [
    'mon', 'dom', 'dow',
    'carrier_idx', 'org_idx',
    'km', 'depart', 'duration',
]
assembler = VectorAssembler(inputCols=input_cols, outputCol='features')

# append the 'features' vector to every row
flights_assembled = assembler.transform(flights_indexed)

# inspect the assembled vectors next to the raw delay
flights_assembled.select(['features', 'delay']).show(n=5, truncate=False)

+-----------------------------------------+-----+
|features                                 |delay|
+-----------------------------------------+-----+
|[0.0,22.0,2.0,0.0,0.0,509.0,16.33,82.0]  |30   |
|[2.0,20.0,4.0,0.0,1.0,542.0,6.17,82.0]   |-8   |
|[9.0,13.0,1.0,1.0,0.0,1989.0,10.33,195.0]|-5   |
|[5.0,2.0,1.0,0.0,1.0,885.0,7.98,102.0]   |2    |
|[7.0,2.0,6.0,1.0,0.0,1180.0,10.83,135.0] |54   |
+-----------------------------------------+-----+
only showing top 5 rows



# Decision Tree

In [13]:
# 80/20 train/test split with a fixed seed
flights_train, flights_test = flights_assembled.randomSplit(weights=[0.8, 0.2], seed=42)

## Build a Decision Tree

In [14]:
# decision tree with default settings, trained on the training split
tree = DecisionTreeClassifier()
tree_model = tree.fit(flights_train)

# score the held-out split and compare labels with predictions
preds = tree_model.transform(flights_test)
preds.select(['label', 'prediction', 'probability']).show(n=5, truncate=False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|0    |1.0       |[0.3568002951303492,0.6431997048696507] |
|0    |1.0       |[0.3568002951303492,0.6431997048696507] |
|1    |1.0       |[0.3568002951303492,0.6431997048696507] |
|1    |0.0       |[0.5777899945024739,0.42221000549752613]|
|1    |1.0       |[0.3568002951303492,0.6431997048696507] |
+-----+----------+----------------------------------------+
only showing top 5 rows



## Evaluate the Decision Tree

In [15]:
# tally every (actual label, predicted label) pair: the confusion-matrix counts
(preds
 .groupBy('label', 'prediction')
 .count()
 .show())

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1374|
|    0|       0.0| 2542|
|    1|       1.0| 3512|
|    0|       1.0| 1993|
+-----+----------+-----+



In [16]:
# confusion-matrix cells, each counted with its own filter over `preds`
TN = preds.where('prediction = 0 AND label = prediction').count()
TP = preds.where('prediction = 1 AND label = prediction').count()
FN = preds.where('prediction = 0 AND label = 1').count()
FP = preds.where('prediction = 1 AND label = 0').count()

# accuracy = correct predictions / all predictions
print(f'test acc: {(TN + TP) / (TN + TP + FN + FP): .1%}')

test acc:  64.3%


# Logistic Regression

## Build a Logistic Regression model

In [17]:
# logistic regression with default settings, fitted on the training split
lr = (
    LogisticRegression()
    .fit(flights_train)
)

# score the test split and tabulate label vs prediction
preds = lr.transform(flights_test)
(preds
 .groupBy('label', 'prediction')
 .count()
 .show())

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0| 1678|
|    0|       0.0| 2584|
|    1|       1.0| 3208|
|    0|       1.0| 1951|
+-----+----------+-----+



## Evaluate the Logistic Regression model

In [18]:
# Count the four confusion-matrix cells in the order TN, TP, FN, FP; each
# condition is applied to `preds` as a SQL filter.
TN, TP, FN, FP = (
    preds.filter(condition).count()
    for condition in (
        'prediction = 0 AND label = prediction',
        'prediction = 1 AND label = prediction',
        'prediction = 0 AND label = 1',
        'prediction = 1 AND label = 0',
    )
)

# share of test rows that were classified correctly
print(f'test acc: {(TN + TP) / (TN + TP + FN + FP): .1%}')

test acc:  61.5%


In [19]:
# recall: share of actual positives that were predicted positive
recall = TP / (TP + FN)
# precision: share of positive predictions that were actually positive
precision = TP / (TP + FP)
print(f'precision: {precision: .1%}\nrecall: {recall: .1%}')

precision:  62.2%
recall:  65.7%


In [20]:
# weighted precision; the metric is selected per call through a param map
multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(
    preds,
    {multi_evaluator.metricName: 'weightedPrecision'},
)

# area under the ROC curve, selected the same way
binary_evaluator = BinaryClassificationEvaluator()
auc = binary_evaluator.evaluate(
    preds,
    {binary_evaluator.metricName: 'areaUnderROC'},
)

print(f'weighted precision: {weighted_precision: .1%}\nAUC: {auc: .1%}')

weighted precision:  61.4%
AUC:  65.4%


# Turning Text into Tables

## Punctuation, numbers, and tokens

In [21]:
# show the first five messages with the full, untruncated text
sms.show(n=5, truncate=False)

+---+---------------------------------------------------------------------------------------------------------------+-----+
|id |text                                                                                                           |label|
+---+---------------------------------------------------------------------------------------------------------------+-----+
|1  |Sorry, I'll call later in meeting                                                                              |0    |
|2  |Dont worry. I guess he's busy.                                                                                 |0    |
|3  |Call FREEPHONE 0800 542 0578 now!                                                                              |1    |
|4  |Win a 1000 cash prize or a prize worth 5000                                                                    |1    |
|5  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...|0    |
+---+---

In [22]:
# Clean the 'text' column in three passes, each replacing matches with a space:
# listed punctuation characters, then digits, then runs of spaces.
wrangled = (
    sms
    .withColumn('text', F.regexp_replace(F.col('text'), '[_():;,.!?\\-]', ' '))
    .withColumn('text', F.regexp_replace(F.col('text'), '[0-9]', ' '))
    .withColumn('text', F.regexp_replace(F.col('text'), ' +', ' '))
)

# tokenise the cleaned text into a 'words' array column
wrangled = Tokenizer(inputCol='text', outputCol='words').transform(wrangled)

wrangled.show(n=5, truncate=False)

+---+-------------------------------------------------------------------------------------------------------+-----+---------------------------------------------------------------------------------------------------------------------------+
|id |text                                                                                                   |label|words                                                                                                                      |
+---+-------------------------------------------------------------------------------------------------------+-----+---------------------------------------------------------------------------------------------------------------------------+
|1  |Sorry I'll call later in meeting                                                                       |0    |[sorry, i'll, call, later, in, meeting]                                                                                    |
|2  |Dont worry I guess he's busy       

## Stop words and hashing

In [23]:
# keep only the id, the token list and the label for the next stages
sms_ = wrangled.select(['id', 'words', 'label'])

In [24]:
# filter stop words out of 'words', producing 'terms'
wrangled = (
    StopWordsRemover(inputCol='words', outputCol='terms')
    .transform(sms_)
)

# hash the terms into a 1024-slot term-frequency vector
wrangled = (
    HashingTF(inputCol='terms', outputCol='hash', numFeatures=1024)
    .transform(wrangled)
)

# fit IDF on the hashed counts and write the result to 'features'
tf_idf = (
    IDF(inputCol='hash', outputCol='features')
    .fit(wrangled)
    .transform(wrangled)
)

tf_idf.select(['terms', 'features']).show(n=4, truncate=False)

+--------------------------------+----------------------------------------------------------------------------------------------------+
|terms                           |features                                                                                            |
+--------------------------------+----------------------------------------------------------------------------------------------------+
|[sorry, call, later, meeting]   |(1024,[138,384,577,996],[2.273418200008753,3.6288353225642043,3.5890949939146903,4.104259019279279])|
|[dont, worry, guess, busy]      |(1024,[215,233,276,329],[3.9913186080986836,3.3790235241678332,4.734227298217693,4.58299632849377]) |
|[call, freephone]               |(1024,[133,138],[5.367951058306837,2.273418200008753])                                              |
|[win, cash, prize, prize, worth]|(1024,[31,47,62,389],[3.6632029660684124,4.754846585420428,4.072170704727778,7.064594791043114])    |
+--------------------------------+--------------

## Training a spam classifier

In [25]:
# 80/20 train/test split of the TF-IDF data with a fixed seed
sms_train, sms_test = tf_idf.randomSplit(weights=[0.8, 0.2], seed=13)

In [26]:
# regularised logistic regression (regParam=0.2) trained on the SMS training split
lr = (
    LogisticRegression(regParam=0.2)
    .fit(sms_train)
)

# score the held-out messages
preds = lr.transform(sms_test)

# confusion-matrix counts
(preds
 .groupBy('label', 'prediction')
 .count()
 .show())

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   41|
|    0|       0.0|  948|
|    1|       1.0|  105|
|    0|       1.0|    2|
+-----+----------+-----+



# One-Hot Encoding

## Encoding flight origin

In [37]:
# index the origin airport on the raw flights frame
flights_ = (
    StringIndexer(inputCol='org', outputCol='org_idx')
    .fit(flights)
    .transform(flights)
)

# then discard every row that still has a null in any column
flights_ = flights_.na.drop()

In [38]:
# encoder turning the numeric origin index into a dummy vector
onehot = OneHotEncoder(inputCols=['org_idx'], outputCols=['org_dummy'])

# fit on the data and append 'org_dummy'
flights_onehot = onehot.fit(flights_).transform(flights_)

# one row per airport showing its index and its encoded vector
(flights_onehot
 .select(['org', 'org_idx', 'org_dummy'])
 .distinct()
 .orderBy('org_idx')
 .show())

+---+-------+-------------+
|org|org_idx|    org_dummy|
+---+-------+-------------+
|ORD|    0.0|(7,[0],[1.0])|
|SFO|    1.0|(7,[1],[1.0])|
|JFK|    2.0|(7,[2],[1.0])|
|LGA|    3.0|(7,[3],[1.0])|
|SJC|    4.0|(7,[4],[1.0])|
|SMF|    5.0|(7,[5],[1.0])|
|TUS|    6.0|(7,[6],[1.0])|
|OGG|    7.0|    (7,[],[])|
+---+-------+-------------+



# Regression

In [43]:
# Convert 'mile' to rounded kilometres ('km') and drop 'mile'.
# The frame is rebound to its own name, so a second run of this cell would no
# longer find a 'mile' column; the guard makes the cell safe to re-execute.
if 'mile' in flights_onehot.columns:
    flights_onehot = (
        flights_onehot
        .withColumn('km', F.round(F.col('mile') * 1.60934))
        .drop('mile')
    )

In [47]:
# wrap the single predictor 'km' in a 'features' vector column
flights_prep = (
    VectorAssembler(inputCols=['km'], outputCol='features')
    .transform(flights_onehot)
)

In [49]:
# 80/20 train/test split with a fixed seed
flights_train, flights_test = flights_prep.randomSplit(weights=[0.8, 0.2], seed=42)

## Flight duration model: just using `km`

In [51]:
# linear model predicting 'duration' from the assembled features
lr = (
    LinearRegression(labelCol='duration')
    .fit(flights_train)
)

# compare actual and predicted durations on the test split
preds = lr.transform(flights_test)
preds.select(['duration', 'prediction']).show(n=5, truncate=False)

print(f'RMSE: {RegressionEvaluator(labelCol="duration").evaluate(preds): .2f}')

+--------+------------------+
|duration|prediction        |
+--------+------------------+
|560     |561.0619125749561 |
|310     |346.9912658584909 |
|165     |133.37496080467133|
|120     |133.37496080467133|
|240     |213.26336981983607|
+--------+------------------+
only showing top 5 rows

RMSE:  16.92


## Interpreting the coefficients

In [52]:
# fitted intercept of the model
inter = lr.intercept
print(f'intercept: {inter}')

# fitted coefficient vector (a single entry: 'km')
coefs = lr.coefficients
print(f'coefs: {coefs}')

# the 'km' coefficient is the extra duration (minutes) per kilometre
min_per_km = coefs[0]
print(f'mins per km: {min_per_km}')

# invert minutes-per-km into km per hour
avg_speed = 60 / min_per_km
print(f'avg speed: {avg_speed}')

intercept: 44.02110048439229
coefs: [0.07572361044091444]
mins per km: 0.07572361044091444
avg speed: 792.3552462783949


## Flight duration model: adding origin airport

In [53]:
# predictors: distance plus the one-hot encoded origin airport
flights_prep = (
    VectorAssembler(inputCols=['km', 'org_dummy'], outputCol='features')
    .transform(flights_onehot)
)

# 80/20 train/test split with a fixed seed
flights_train, flights_test = flights_prep.randomSplit(weights=[0.8, 0.2], seed=42)

In [58]:
# refit the duration model on the current training split
lr = (
    LinearRegression(labelCol='duration')
    .fit(flights_train)
)

# score the test split
preds = lr.transform(flights_test)

print(f'RMSE: {RegressionEvaluator(labelCol="duration").evaluate(preds): .2f}')

RMSE:  11.02


## Interpreting coefficients

In [59]:
# pull both fitted quantities first, then report them
inter = lr.intercept
coefs = lr.coefficients

print(f'intercept: {inter}')
print(f'coefs: {coefs}')

intercept: 15.51432820470039
coefs: [0.0743187885664477,28.85360121411286,20.804901599913148,52.63560265547631,47.08803307610951,18.577771019304308,15.957548580589801,18.400432009994915]


In [65]:
# coefs[0] is minutes per km; invert it to get km per hour
avg_speed = 60 / coefs[0]
# JFK and LGA offsets (coefs[3], coefs[4]) are added to the intercept
avg_jfk = inter + coefs[3]
avg_lga = inter + coefs[4]

print(f'avg speed: {avg_speed: .2f}')

# OGG is the reference level of the origin dummy, so the intercept stands for it
print(f'avg mins on ground at OGG: {inter: .2f}')

print(f'avg mins on ground at JFK: {avg_jfk: .2f}')

print(f'avg mins on ground at LGA: {avg_lga: .2f}')

avg speed:  807.33
avg mins on ground at OGG:  15.51
avg mins on ground at JFK:  68.15
avg mins on ground at LGA:  62.60


# Bucketing & Engineering

## Bucketing departure time

In [71]:
# bucket boundaries every 3 hours from 0 to 24
buckets = Bucketizer(
    splits=[0, 3, 6, 9, 12, 15, 18, 21, 24],
    inputCol='depart',
    outputCol='depart_bucket',
)

# assign each departure time to its bucket
bucketed = buckets.transform(flights_onehot)
bucketed.select(['depart', 'depart_bucket']).show(n=5)

# turn the bucket index into a dummy vector
onehot = OneHotEncoder(inputCols=['depart_bucket'], outputCols=['depart_dummy'])

flights_oh = onehot.fit(bucketed).transform(bucketed)
flights_oh.select(['depart', 'depart_bucket', 'depart_dummy']).show(n=5)

+------+-------------+
|depart|depart_bucket|
+------+-------------+
| 16.33|          5.0|
|  6.17|          2.0|
| 10.33|          3.0|
|  7.98|          2.0|
| 10.83|          3.0|
+------+-------------+
only showing top 5 rows

+------+-------------+-------------+
|depart|depart_bucket| depart_dummy|
+------+-------------+-------------+
| 16.33|          5.0|(7,[5],[1.0])|
|  6.17|          2.0|(7,[2],[1.0])|
| 10.33|          3.0|(7,[3],[1.0])|
|  7.98|          2.0|(7,[2],[1.0])|
| 10.83|          3.0|(7,[3],[1.0])|
+------+-------------+-------------+
only showing top 5 rows



## Flight duration model: adding departure time

In [77]:
# predictors: distance, origin dummies and departure-bucket dummies
flights_prep = (
    VectorAssembler(
        inputCols=['km', 'org_dummy', 'depart_dummy'],
        outputCol='features',
    )
    .transform(flights_oh)
)

# 80/20 train/test split with a fixed seed
flights_train, flights_test = flights_prep.randomSplit(weights=[0.8, 0.2], seed=42)

In [78]:
# fit the duration model that now includes departure time
lr = (
    LinearRegression(labelCol='duration')
    .fit(flights_train)
)

# predictions for the held-out flights
preds = lr.transform(flights_test)

print(f'RMSE: {RegressionEvaluator(labelCol="duration").evaluate(preds): .2f}')

RMSE:  10.73


In [80]:
# The intercept is the reference case: origin OGG, departure 21:00-24:00.
avg_eve_ogg = lr.intercept
print(f'avg mins on ground at OGG for 21:00-24:00: {avg_eve_ogg: .2f}')

# coefficient 8 is the 00:00-03:00 departure bucket; add it to the reference
avg_night_ogg = lr.intercept + lr.coefficients[8]
print(f'avg mins on ground at OGG for 00:00-03:00: {avg_night_ogg: .2f}')

# coefficient 3 is the JFK origin dummy; combine it with the 00:00-03:00 bucket
avg_night_jfk = lr.intercept + lr.coefficients[3] + lr.coefficients[8]
print(f'avg mins on ground at JFK for 00:00-03:00: {avg_night_jfk: .2f}')

avg mins on ground at OGG for 21:00-24:00:  10.01
avg mins on ground at OGG for 00:00-03:00: -4.85
avg mins on ground at JFK for 00:00-03:00:  46.91


# Regularization

## Flight duration model: More features!

In [82]:
# one-hot encode day of week ('dow') and month ('mon')
onehot = OneHotEncoder(inputCols=['dow', 'mon'],
                       outputCols=['dow_dummy', 'mon_dummy'])

# `flights_oh` is rebound to its own transformed version, so on a second run of
# this cell the dummy columns would already be present and the encoder would
# be asked to write to existing output columns. Dropping them first keeps the
# cell re-runnable (drop() ignores names that are not in the schema).
flights_oh = flights_oh.drop('dow_dummy', 'mon_dummy')
flights_oh = onehot.fit(flights_oh).transform(flights_oh)

In [85]:
# predictors: distance plus origin, departure-bucket, weekday and month dummies
flights_prep = (
    VectorAssembler(
        inputCols=['km', 'org_dummy', 'depart_dummy', 'dow_dummy', 'mon_dummy'],
        outputCol='features',
    )
    .transform(flights_oh)
)

# 80/20 train/test split with a fixed seed
flights_train, flights_test = flights_prep.randomSplit(weights=[0.8, 0.2], seed=42)

In [90]:
# unregularised linear regression on the full feature set
lr = (
    LinearRegression(labelCol='duration')
    .fit(flights_train)
)

# evaluate on the test split
preds = lr.transform(flights_test)

print(f'RMSE: {RegressionEvaluator(labelCol="duration").evaluate(preds): .2f}')

RMSE:  10.64


In [91]:
# print the fitted coefficient vector of the unregularised model
coefs = lr.coefficients
print(coefs)

[0.07442209559515502,27.659450151115774,20.45918930623718,51.85957014048378,46.18664461701692,17.845461098188387,15.340633664518776,17.905358631225713,-15.273418440592236,0.7300571207888041,4.131977434639745,7.126546170088426,4.664162537576813,8.83969858386643,8.806754088197247,0.46746560369110074,0.16539185557587052,-0.25012969981315836,0.19119328659810572,0.17211666989806534,0.05606261143314675,-2.1461312335803973,-2.3902244741875878,-2.1572730021883637,-3.787912634052201,-4.2281201026909425,-4.44040786158588,-4.62143727685574,-4.463261506930194,-4.026770099104065,-2.831627030737389,-0.8592174134481364]


## Flight duration model: Regularization!

In [92]:
# Lasso regression: regParam=1 with elasticNetParam=1
lr = (
    LinearRegression(labelCol='duration', regParam=1, elasticNetParam=1)
    .fit(flights_train)
)

# evaluate on the test split
preds = lr.transform(flights_test)

print(f'RMSE: {RegressionEvaluator(labelCol="duration").evaluate(preds): .2f}')

RMSE:  11.55


In [93]:
# coefficient vector of the regularised model
coefs = lr.coefficients
print(coefs)

# count the coefficients that are exactly zero
zero_cs = sum(beta == 0 for beta in coefs)
print(f'No. of zero coefficients: {zero_cs}')

[0.0735413151602253,5.697832374535206,0.0,28.90193980919395,22.063034341417183,0.0,-2.289374478161381,0.0,0.0,0.0,0.0,0.0,0.0,1.005851813807962,1.1028050009787465,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]
No. of zero coefficients: 25


# Pipeline

In [98]:
# start again from the raw frame: drop rows with nulls, add 'km', remove 'mile'
flights_ = (
    flights
    .na.drop()
    .withColumn('km', F.round(F.col('mile') * 1.60934))
    .drop('mile')
)

In [103]:
# 80/20 train/test split of the un-assembled data with a fixed seed
flights_train, flights_test = flights_.randomSplit(weights=[0.8, 0.2], seed=1111)

## Flight duration model: pipeline stages

In [101]:
# stage 1: map origin airport strings to numeric indices
indexer = StringIndexer(inputCol='org', outputCol='org_idx')

# stage 2: one-hot encode the origin index and the day of week
onehot = OneHotEncoder(
    inputCols=['org_idx', 'dow'],
    outputCols=['org_dummy', 'dow_dummy'],
)

# stage 3: gather the predictors into one 'features' vector
assembler = VectorAssembler(
    inputCols=['km', 'org_dummy', 'dow_dummy'],
    outputCol='features',
)

# stage 4: linear regression with 'duration' as the label
regression = LinearRegression(labelCol='duration')

## Flight duration model: pipeline model

In [104]:
# chain the four stages in execution order
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])

# fit every stage on the training split; note that `pipeline` is rebound
# from the unfitted Pipeline to the fitted model here
pipeline = pipeline.fit(flights_train)

# run the fitted stages over the test split
preds = pipeline.transform(flights_test)

## SMS spam pipeline

In [107]:
# Split the text into lowercase tokens on whitespace. The earlier tokenised
# output in this notebook keeps "i'll" as a single token, so Tokenizer does
# not split at non-word characters.
# NOTE(review): unlike the manual preparation earlier in this notebook, this
# pipeline has no stage that strips punctuation or digits — confirm intended.
tokenizer = Tokenizer(inputCol='text',
                      outputCol='words')

# remove stop words from the tokenizer's output column
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(),
                           outputCol='terms')

# hash the remaining terms, then convert the hashed vector to TF-IDF
hasher = HashingTF(inputCol=remover.getOutputCol(),
                   outputCol='hash')
idf = IDF(inputCol=hasher.getOutputCol(),
          outputCol='features')

# logistic regression as the final stage; the pipeline is only defined here,
# it is not fitted in this cell
logistic = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer,
                            remover,
                            hasher,
                            idf,
                            logistic])

# Cross Validation

In [109]:
# single-predictor feature vector ('km') for the cross-validation example
flights_prep = (
    VectorAssembler(inputCols=['km'], outputCol='features')
    .transform(flights_)
)

# 80/20 train/test split with a fixed seed
flights_train, flights_test = flights_prep.randomSplit(weights=[0.8, 0.2], seed=42)

## Cross validating simple flight duration model

In [112]:
# a grid built without any addGrid() call
params = ParamGridBuilder().build()

# the estimator to validate and the evaluator that scores it
regression = LinearRegression(labelCol='duration')
evaluator = RegressionEvaluator(labelCol='duration')

# 5-fold cross-validator over that grid
cv = CrossValidator(
    estimator=regression,
    estimatorParamMaps=params,
    evaluator=evaluator,
    numFolds=5,
)

# fit on the training split; `cv` is rebound to the fitted model
cv = cv.fit(flights_train)

## Cross validating flight duration model pipeline

In [114]:
# index the origin airport
indexer = StringIndexer(inputCol='org', outputCol='org_idx')

# one-hot encode the origin index
onehot = OneHotEncoder(inputCols=['org_idx'], outputCols=['org_dummy'])

# combine distance and the origin dummies into 'features'
assembler = VectorAssembler(inputCols=['km', 'org_dummy'], outputCol='features')

# Wrap the whole pipeline in a cross-validator. `regression`, `params` and
# `evaluator` are reused from the previous cell; numFolds is left at its default.
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=params,
    evaluator=evaluator,
)

# Grid Search

In [116]:
# rebuild the modelling frame from the raw data: no nulls, 'km' instead of 'mile'
flights_ = flights.na.drop()
flights_ = (
    flights_
    .withColumn('km', F.round(F.col('mile') * 1.60934))
    .drop('mile')
)

In [117]:
# 80/20 train/test split with a fixed seed
flights_train, flights_test = flights_.randomSplit(weights=[0.8, 0.2], seed=1111)

In [119]:
# origin airport strings -> numeric indices
indexer = StringIndexer(inputCol='org', outputCol='org_idx')

# dummy vectors for the origin index and the day of week
onehot = OneHotEncoder(
    inputCols=['org_idx', 'dow'],
    outputCols=['org_dummy', 'dow_dummy'],
)

# predictors merged into one 'features' vector
assembler = VectorAssembler(
    inputCols=['km', 'org_dummy', 'dow_dummy'],
    outputCol='features',
)

# regression on 'duration'
regression = LinearRegression(labelCol='duration')

In [120]:
# assemble the stages into a pipeline (not fitted here)
pipeline = Pipeline(stages=[indexer, onehot, assembler, regression])

# evaluator that scores predictions against 'duration'
evaluator = RegressionEvaluator(labelCol='duration')

## Optimizing flights linear regression

In [125]:
# Grid over two regression parameters: 4 regParam values x 3 elasticNetParam values
params = (
    ParamGridBuilder()
    .addGrid(regression.regParam, [0.01, 0.1, 1.0, 10.0])
    .addGrid(regression.elasticNetParam, [0, 0.5, 1.0])
    .build()
)
print('Number of models to be tested: ', len(params))

# 5-fold cross-validator over the pipeline, seeded for repeatable folds
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=params,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

Number of models to be tested:  12


In [127]:
# run the cross-validated grid search on the training split
models = cv.fit(dataset=flights_train)

## Dissecting the best flight duration model

In [130]:
# the winning pipeline model selected by cross-validation
best_model = models.bestModel

# list its fitted stages
print(best_model.stages)

# score the held-out split with the winning model
preds = best_model.transform(flights_test)
print(f'RMSE: {evaluator.evaluate(preds): .2f}')

[StringIndexerModel: uid=StringIndexer_f549479cb0f9, handleInvalid=error, OneHotEncoderModel: uid=OneHotEncoder_8fcc2a9851ca, dropLast=true, handleInvalid=error, numInputCols=2, numOutputCols=2, VectorAssembler_03651ecb7156, LinearRegressionModel: uid=LinearRegression_388900a404f6, numFeatures=14]
RMSE:  10.96


## SMS spam optimized

In [131]:
# One grid covering both the hashing stage (numFeatures, binary) and the
# logistic regression stage (regParam, elasticNetParam) of the SMS pipeline.
params = (
    ParamGridBuilder()
    .addGrid(hasher.numFeatures, [1024, 4096, 16384])
    .addGrid(hasher.binary, [True, False])
    .addGrid(logistic.regParam, [0.01, 0.1, 1.0, 10.0])
    .addGrid(logistic.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# Ensemble

## Delayed flights with Gradient-Boosted Trees

In [134]:
# raw flights without null rows, plus a binary 'label' (1 when delay >= 15)
flights_ = flights.na.drop()
flights_ = flights_.withColumn(
    'label',
    (F.col('delay') >= 15).cast('integer'),
)

In [136]:
# three numeric predictors assembled into 'features'
flights_prep = (
    VectorAssembler(
        inputCols=['mon', 'depart', 'duration'],
        outputCol='features',
    )
    .transform(flights_)
)

# 80/20 train/test split with a fixed seed
flights_train, flights_test = flights_prep.randomSplit(weights=[0.8, 0.2], seed=1111)

In [141]:
# fit a single decision tree and a gradient-boosted ensemble on the same data
tree = (
    DecisionTreeClassifier()
    .fit(flights_train)
)
gbt = (
    GBTClassifier()
    .fit(flights_train)
)

# score both on the test split: first line is the tree, second the GBT
evaluator = BinaryClassificationEvaluator()
print(f'AUROC: {evaluator.evaluate(tree.transform(flights_test)): .1%}')
print(f'AUROC: {evaluator.evaluate(gbt.transform(flights_test)): .1%}')

# ensemble size and per-feature importance of the boosted model
print(f'no. of trees in gbt: {len(gbt.trees)}')
print(f'feature importances: {gbt.featureImportances}')

AUROC:  62.0%
AUROC:  67.1%
no. of trees in gbt: 20
feature importances: (3,[0,1,2],[0.34928613199830205,0.3171833965819637,0.33353047141973424])


## Delayed flights with a Random Forest

In [143]:
# random forest estimator with default settings
forest = RandomForestClassifier()

# grid: 4 feature-subset strategies x 3 maximum depths
params = (
    ParamGridBuilder()
    .addGrid(forest.featureSubsetStrategy, ['all', 'onethird', 'sqrt', 'log2'])
    .addGrid(forest.maxDepth, [2, 5, 10])
    .build()
)

# evaluator for binary classification
evaluator = BinaryClassificationEvaluator()

# 5-fold cross-validator over the grid
cv = CrossValidator(
    estimator=forest,
    estimatorParamMaps=params,
    evaluator=evaluator,
    numFolds=5,
)

In [144]:
# run the cross-validated grid search for the random forest
models = cv.fit(dataset=flights_train)

## Evaluating the Random Forest

In [147]:
# Cross-validated (average) AUC for each parameter combination in the grid
avg_auc = models.avgMetrics

# The highest of those averages
best_model_auc = max(avg_auc)
print(f'avg AUROC for best model: {best_model_auc: .1%}')

# Read the winning hyperparameter *values* from the best model.
# explainParam() returns the parameter's full help text (doc, default, current
# value), which buried the answer in the printed output; getOrDefault()
# returns just the value in use.
opt_max_depth = models.bestModel.getOrDefault('maxDepth')
print(f'optimal max depth: {opt_max_depth}')
opt_feat_substrat = models.bestModel.getOrDefault('featureSubsetStrategy')
print(f'optimal feature subset strat: {opt_feat_substrat}')

# AUC of the best model on the held-out test split
best_auc = evaluator.evaluate(models.bestModel.transform(flights_test))
print(f'test AUROC: {best_auc: .1%}')

avg AUROC for best model:  67.6%
optimal max depth: maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5, current: 10)
optimal feature subset strat: featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the features), 'sqrt' (use sqrt(number of features)), 'log2' (use log2(number of features)), 'n' (when n is in the range (0, 1.0], use n * number of features. When n is in the range (1, number of features), use n features). default = 'auto' (default: auto, current: onethird)
test AUROC:  67.6%
