# Working with Spark 
- Uses the flights dataset from the DataCamp course on machine learning with Apache Spark, to get more familiar with PySpark

## Imports and Setup

In [40]:
# Import the PySpark module
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# OneHotEncoderEstimator is the Spark 2.3/2.4 name; from Spark 3.0 on it is OneHotEncoder
from pyspark.ml.feature import (StringIndexer,
                                OneHotEncoderEstimator,
                                VectorAssembler)
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from datetime import datetime

In [2]:
import multiprocessing

multiprocessing.cpu_count()

4

With `local[*]` as the master URL, Spark runs in local mode with one worker thread per logical core, so the SparkSession can use all 4 cores reported above.

In [3]:
# Create SparkSession object
spark = SparkSession.builder \
                    .master('local[*]') \
                    .appName('test') \
                    .getOrCreate()

In [4]:
flights = spark.read.csv('flights-larger.csv',
                        sep=',',
                        header=True,
                        inferSchema=True,
                        nullValue='NA')

## Peruse the dataset

The dataset contains 275,000 flight records.

In [5]:
flights.count()

275000

In [6]:
flights.dtypes

[('mon', 'int'),
 ('dom', 'int'),
 ('dow', 'int'),
 ('carrier', 'string'),
 ('flight', 'int'),
 ('org', 'string'),
 ('mile', 'int'),
 ('depart', 'double'),
 ('duration', 'int'),
 ('delay', 'int')]

Info on columns (from DataCamp: https://bit.ly/2YcnE39)

Data dictionary:

- mon — month (integer between 1 and 12)
- dom — day of month (integer between 1 and 31)
- dow — day of week (integer; 1 = Monday and 7 = Sunday)
- org — origin airport (IATA code)
- mile — distance (miles)
- carrier — carrier (IATA code)
- depart — departure time (decimal hour)
- duration — expected duration (minutes)
- delay — delay (minutes)
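Because `depart` is a decimal hour, a value like 8.18 means 8 hours plus 0.18 × 60 ≈ 11 minutes (08:11), not 8:18. A small hypothetical helper, not part of the notebook, that converts it to clock time:

```python
def decimal_hour_to_hhmm(depart: float) -> str:
    """Convert a decimal-hour departure time (e.g. 8.18) to an 'HH:MM' string."""
    # Work in whole minutes so rounding can never produce ':60'
    total_minutes = round(depart * 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}"


# Departure times from the first rows of the dataset
for depart in [8.18, 15.5, 7.17, 21.17, 12.92]:
    print(depart, "->", decimal_hour_to_hhmm(depart))
```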

## Prepare the Data

The flight number is only an identifier and is unlikely to help predict duration, so we drop it.

In [7]:
flights = flights.drop('flight')

In [8]:
flights.show(5)

+---+---+---+-------+---+----+------+--------+-----+
|mon|dom|dow|carrier|org|mile|depart|duration|delay|
+---+---+---+-------+---+----+------+--------+-----+
| 10| 10|  1|     OO|ORD| 157|  8.18|      51|   27|
|  1|  4|  1|     OO|ORD| 466|  15.5|     102| null|
| 11| 22|  1|     OO|ORD| 738|  7.17|     127|  -19|
|  2| 14|  5|     B6|JFK|2248| 21.17|     365|   60|
|  5| 25|  3|     WN|SJC| 386| 12.92|      85|   22|
+---+---+---+-------+---+----+------+--------+-----+
only showing top 5 rows



In [9]:
print("Rows before removing NAs: ", flights.count())
flights = flights.dropna()
print("Rows after removing NAs: ", flights.count())

Rows before removing NAs:  275000
Rows after removing NAs:  258289


- Removed 16,711 rows (about 6%) that contained at least one missing value
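With its defaults (`how='any'`, no `subset`), `dropna()` drops a row if any column is null. Here the nulls come from the `NA` strings that `nullValue='NA'` mapped to null when reading the CSV. The plain-Python sketch below is an illustration, not Spark code, and mimics that rule on the five rows shown earlier. It also shows the `subset` parameter: the second row is dropped only because of its null `delay`, a column none of the models below use. Restricting the check with `flights.dropna(subset=[...])` to the model columns could therefore keep more rows.

```python
# The five rows printed by flights.show(5); None plays the role of Spark's null
columns = ["mon", "dom", "dow", "carrier", "org", "mile", "depart", "duration", "delay"]
rows = [
    (10, 10, 1, "OO", "ORD", 157, 8.18, 51, 27),
    (1, 4, 1, "OO", "ORD", 466, 15.5, 102, None),
    (11, 22, 1, "OO", "ORD", 738, 7.17, 127, -19),
    (2, 14, 5, "B6", "JFK", 2248, 21.17, 365, 60),
    (5, 25, 3, "WN", "SJC", 386, 12.92, 85, 22),
]


def dropna(rows, columns, subset=None):
    """Drop rows with a None in any checked column (like dropna(how='any', subset=...))."""
    checked = [columns.index(c) for c in (subset or columns)]
    return [r for r in rows if all(r[i] is not None for i in checked)]


print(len(dropna(rows, columns)))                               # row 2 dropped for its null delay
print(len(dropna(rows, columns, subset=["mile", "duration"])))  # delay not checked, all rows kept
```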

In [10]:
# Split into training (60%), validation (20%) and test (20%) sets
flights_train, flights_val, flights_test = flights.randomSplit([0.6, 0.2, 0.2], seed=42)
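`randomSplit` does not return exact 60/20/20 row counts. Each row is assigned independently, so the split sizes vary slightly around the requested proportions, and weights that do not sum to 1 are normalized. The sketch below is a simplified pure-Python illustration of that idea, not Spark's implementation, which samples each partition with a seeded Bernoulli sampler:

```python
import random


def random_split(rows, weights, seed=None):
    """Simplified sketch of the idea behind DataFrame.randomSplit."""
    total = sum(weights)
    # Normalize the weights into cumulative upper bounds, e.g. [0.6, 0.8, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # one uniform draw in [0, 1) per row
        for i, upper in enumerate(bounds):
            # The last bucket also catches any float rounding just below 1.0
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, val, test = random_split(range(10_000), [0.6, 0.2, 0.2], seed=42)
print(len(train), len(val), len(test))  # close to, but not exactly, 6000 / 2000 / 2000
```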

## Linear Regression (Distance Only)

### Pipeline Setup

In [11]:
assembler = VectorAssembler(inputCols=['mile'], outputCol="features")

### Model Setup

In [12]:
regression = LinearRegression(labelCol='duration')

In [13]:
pipeline_lr = Pipeline(stages=[assembler, regression])

In [14]:
model_lr = pipeline_lr.fit(flights_train)

In [15]:
predictions_lr = model_lr.transform(flights_val)
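`Pipeline.fit()` returns a separate `PipelineModel` and leaves the `Pipeline` itself unfitted. Inside the model, each estimator stage (here `LinearRegression`) has been replaced by its fitted model. A toy pure-Python sketch of those mechanics, with hypothetical class names: estimators are fitted in order, each stage's output feeds the next stage, and `transform` on the result replays the fitted stages. Spark additionally skips the transform after the last estimator.

```python
class CenterModel:
    """Fitted transformer returned by Center.fit(): subtracts the learned mean."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, data):
        return [x - self.mean for x in data]


class Center:
    """Toy estimator: learns the mean of its input during fit()."""
    def fit(self, data):
        return CenterModel(sum(data) / len(data))


class Scale:
    """Toy transformer: multiplies every value by a fixed factor; nothing to fit."""
    def __init__(self, factor):
        self.factor = factor

    def transform(self, data):
        return [x * self.factor for x in data]


class ToyPipelineModel:
    """Holds only fitted transformers; transform() applies them in order."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):      # estimator: fit it on the current data
                stage = stage.fit(data)
            fitted.append(stage)
            data = stage.transform(data)   # output feeds the next stage
        return ToyPipelineModel(fitted)    # a new object; the ToyPipeline is unchanged


model = ToyPipeline([Scale(2), Center()]).fit([1.0, 2.0, 3.0])
print(model.transform([4.0]))  # 4.0 * 2 - mean([2.0, 4.0, 6.0]) = 8.0 - 4.0
```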

In [16]:
predictions_lr.select(['duration', 'prediction']).show(5)

+--------+------------------+
|duration|        prediction|
+--------+------------------+
|     175|134.21303811191063|
|     170|152.11301362630496|
|     250| 213.2408211652571|
|      65| 75.52060139124345|
|      90| 84.77501050072624|
+--------+------------------+
only showing top 5 rows



We now have predictions of `duration` based only on the flight distance (`mile`).

In [17]:
rmse_lr = RegressionEvaluator(labelCol='duration', metricName='rmse').evaluate(predictions_lr)
print("RMSE for LR using only distance: ", rmse_lr)

RMSE for LR using only distance:  17.12497621729543
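RMSE is the square root of the mean squared difference between label and prediction. It is in the label's units, so 17.1 means the distance-only model misses by roughly 17 minutes in a root-mean-square sense. A plain-Python sketch of the metric, applied to the five validation rows printed above:

```python
import math


def rmse(labels, predictions):
    """Root mean squared error: sqrt(mean((label - prediction) ** 2))."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


# The five validation rows shown above (duration in minutes)
durations = [175, 170, 250, 65, 90]
preds = [134.21303811191063, 152.11301362630496, 213.2408211652571,
         75.52060139124345, 84.77501050072624]
print(rmse(durations, preds))
```

On these five rows the value is about 26 minutes, higher than the full validation figure. Five rows are far too few to estimate the metric reliably.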


## Linear Regression (More Features)

Let's add more columns to better predict how long a flight will take.

Features used:
- dow (day of week)
- org (origin airport)
- carrier (airline operating the flight)
- mile (distance of flight)

### Pipeline Setup

In [21]:
indexer_dow = StringIndexer(inputCol='dow', outputCol='dow_idx')
indexer_org = StringIndexer(inputCol='org', outputCol='org_idx')
indexer_carrier = StringIndexer(inputCol='carrier', outputCol='carrier_idx')

onehot = OneHotEncoderEstimator(inputCols=['dow_idx', 'org_idx', 'carrier_idx'], 
                                outputCols=['dow_dummy', 'org_dummy', 'carrier_dummy'])

assembler = VectorAssembler(inputCols=['dow_dummy', 'org_dummy', 'carrier_dummy', 'mile'], 
                            outputCol='features')
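The three stages above turn the categorical columns into numbers and then into one feature vector. `StringIndexer` casts numeric input such as `dow` to string and, by default (`frequencyDesc`), gives index 0 to the most frequent value. `OneHotEncoderEstimator` uses `dropLast=True` by default, so the last (least frequent) category is encoded as all zeros and serves as the baseline. `VectorAssembler` concatenates everything. A pure-Python sketch of that chain follows. The function names are hypothetical, and breaking frequency ties alphabetically is an assumption that matches recent Spark versions.

```python
from collections import Counter


def string_index(values):
    """Map distinct values to indices, most frequent first (ties alphabetical)."""
    counts = Counter(str(v) for v in values)  # numeric input is cast to string
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Dummy-encode an index; with drop_last the last category becomes all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if i == index else 0.0 for i in range(size)]


def assemble(*parts):
    """Concatenate vectors and scalars into one flat list of floats."""
    features = []
    for part in parts:
        if isinstance(part, list):
            features.extend(float(x) for x in part)
        else:
            features.append(float(part))
    return features


orgs = ["ORD", "ORD", "JFK", "SJC", "ORD", "JFK"]
org_index = string_index(orgs)                          # ORD -> 0, JFK -> 1, SJC -> 2
org_dummy = one_hot(org_index["SJC"], len(org_index))   # least frequent: all zeros
print(assemble(org_dummy, 386))                         # dummies followed by mile
```

Dropping one level per categorical column avoids perfect collinearity between the dummies and the regression intercept.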

### Model Setup

In [42]:
pipeline_lr_more_features = Pipeline(stages=[indexer_dow, 
                                             indexer_org,
                                             indexer_carrier, 
                                             onehot, assembler, 
                                             regression])

In [23]:
model_lr_more_features = pipeline_lr_more_features.fit(flights_train)

In [24]:
predictions_lr_more_features = model_lr_more_features.transform(flights_val)

In [25]:
predictions_lr_more_features.select('duration', 'prediction').show(5)

+--------+------------------+
|duration|        prediction|
+--------+------------------+
|     175| 158.6845406529181|
|     170|168.04581416725657|
|     250|228.47116576268348|
|      65| 72.88209181301005|
|      90| 82.03015301072409|
+--------+------------------+
only showing top 5 rows



In [28]:
rmse_lr_more_features = RegressionEvaluator(labelCol='duration', 
                                            metricName='rmse').evaluate(predictions_lr_more_features)
print("RMSE for LR with more features: ", rmse_lr_more_features)

RMSE for LR with more features:  10.76661416893813
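Adding the categorical features cuts the validation RMSE by a bit more than a third. The arithmetic, using the two RMSE values reported above:

```python
rmse_distance_only = 17.12497621729543   # distance-only model
rmse_more_features = 10.76661416893813   # dow, org, carrier and mile

reduction = (rmse_distance_only - rmse_more_features) / rmse_distance_only
print(f"RMSE reduced by {reduction:.1%}")
```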


## Grid Search

- Tune the regularization of the multi-feature linear regression above with a grid search and 5-fold cross-validation

In [43]:
pipeline_lr_more_features = Pipeline(stages=[indexer_dow, 
                                             indexer_org,
                                             indexer_carrier, 
                                             onehot, assembler, 
                                             regression])

In [44]:
params = ParamGridBuilder() \
            .addGrid(regression.regParam, [0.01, 0.1, 1, 10]) \
            .addGrid(regression.elasticNetParam, [0, 0.5, 1]) \
            .build()

evaluator = RegressionEvaluator(labelCol='duration', metricName='rmse')
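`ParamGridBuilder` takes the Cartesian product of the value lists, so this grid has 4 × 3 = 12 param maps. `elasticNetParam` mixes the penalties: 0 is pure L2 (ridge) and 1 is pure L1 (lasso). With 5 folds, CrossValidator fits the whole pipeline 12 × 5 = 60 times, then once more with the best param map on the full training set. That is far more work than the single fit earlier. A plain-Python sketch of the count, where the dict keys are just labels:

```python
from itertools import product

reg_params = [0.01, 0.1, 1, 10]     # regularization strength (regParam)
elastic_net_params = [0, 0.5, 1]    # 0 = pure L2 (ridge), 1 = pure L1 (lasso)

# One param map per combination, as ParamGridBuilder.build() produces
grid = [{"regParam": r, "elasticNetParam": a}
        for r, a in product(reg_params, elastic_net_params)]

num_folds = 5
# One fit per (param map, fold), plus a final refit of the best map on all the data
fits = len(grid) * num_folds + 1
print(len(grid), "param maps,", fits, "model fits")
```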

In [45]:
cv = CrossValidator(estimator=pipeline_lr_more_features, 
                    estimatorParamMaps=params, 
                    evaluator=evaluator,
                    numFolds=5)
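CrossValidator splits the training data into `numFolds` folds. For each param map it trains on the other folds, computes RMSE on the held-out fold, and averages over folds (Spark exposes these averages as `CrossValidatorModel.avgMetrics`). It keeps the param map with the lowest average, since RMSE is smaller-is-better. The pure-Python sketch below shows that loop with a toy one-feature ridge regression standing in for the Spark pipeline. The functions, the random fold assignment, and the penalty scaling are assumptions for illustration, not Spark's internals.

```python
import math
import random


def fit_ridge_1d(xs, ys, lam):
    """Toy one-feature ridge regression: penalize the slope, not the intercept."""
    mx, my = sum(xs) / len(xs), sum(ys) / len(ys)
    sxy = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sxx = sum((x - mx) ** 2 for x in xs)
    w = sxy / (sxx + lam)
    return w, my - w * mx


def rmse(ys, preds):
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(ys, preds)) / len(ys))


def cross_validate(xs, ys, lams, num_folds=5, seed=42):
    """Return (best_lam, average held-out RMSE per lam); lower RMSE is better."""
    rng = random.Random(seed)
    fold_of = [rng.randrange(num_folds) for _ in xs]  # random fold per row
    avg_metrics = []
    for lam in lams:
        scores = []
        for k in range(num_folds):
            train = [i for i, f in enumerate(fold_of) if f != k]
            held_out = [i for i, f in enumerate(fold_of) if f == k]
            if not held_out:  # an empty fold is possible with tiny datasets
                continue
            w, b = fit_ridge_1d([xs[i] for i in train], [ys[i] for i in train], lam)
            scores.append(rmse([ys[i] for i in held_out],
                               [w * xs[i] + b for i in held_out]))
        avg_metrics.append(sum(scores) / len(scores))
    best = min(range(len(lams)), key=lambda j: avg_metrics[j])
    return lams[best], avg_metrics


# Hypothetical noiseless data: no penalty should win
xs = [float(x) for x in range(50)]
ys = [2 * x + 1 for x in xs]
best_lam, avg = cross_validate(xs, ys, [0.0, 1.0, 100.0, 10000.0])
print(best_lam, avg)
```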

In [46]:
start = datetime.now()
cv_model = cv.fit(flights_train)
print("Time to train: ", datetime.now() - start)

Time to train:  0:01:13.415644


In [47]:
best_model = cv_model.bestModel

In [50]:
predictions_cv = best_model.transform(flights_val)

In [53]:
rmse_cv = evaluator.evaluate(predictions_cv)
print("RMSE for LR with grid search: ", rmse_cv)

RMSE for LR with grid search:  10.766750293651768


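- Tuning made no meaningful difference: the validation RMSE with the best grid parameters (10.7668) is essentially the same as the untuned model's (10.7666). The grid does not include `regParam=0`, the `LinearRegression` default used by the untuned model, so regularization does not appear to help here.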

In [None]:
# Stop the SparkSession (and its underlying SparkContext)
spark.stop()