# Machine Learning Pipelines with PySpark

* [Overview](#Overview)
* [Data Preparation](#Data-Preparation)
    - Joining Tables
    - Converting Data Types
    - Assembling a Vector
* [Pipeline](#Pipeline)
* [Model](#Model)

## Overview

**ML Pipelines** provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines. The pipeline concept is mostly inspired by the _scikit-learn_ project.

The ML API uses `DataFrame` from Spark SQL as an ML dataset, which can hold a variety of data types. The `Transformer` and `Estimator` classes are the core of the `pyspark.ml` module, and machine learning pipelines in Spark are made up of them. A Transformer is an algorithm that transforms one DataFrame into another (e.g., an ML model is a Transformer that transforms a DataFrame with features into a DataFrame with predictions). An Estimator is an algorithm that can be fit on a DataFrame to produce a Transformer (e.g., a learning algorithm is an Estimator that trains on a DataFrame and produces a model).

**Transformer** classes have a `.transform()` method that takes a DataFrame and returns a new DataFrame, usually the original one with one or more columns appended. For example, you might use the class `Bucketizer` to create discrete bins from a continuous feature, or a fitted `PCAModel` to reduce the dimensionality of your dataset using principal component analysis.

**Estimator** classes implement a `.fit()` method that also takes a DataFrame, but instead of returning another DataFrame it returns a model object. This can be a `StringIndexerModel` for including categorical data saved as strings in your models, or a `RandomForestClassificationModel` or `RandomForestRegressionModel` that uses the random forest algorithm for classification or regression. The returned _Model_ is itself a **Transformer**.
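
The fit/transform contract described above can be sketched without Spark. The following is a minimal pure-Python illustration, not Spark code: `MeanCenterer`, `MeanCentererModel`, and the list-of-dicts stand-in for a DataFrame are hypothetical names invented for this example.

```python
# Toy illustration of the Estimator -> Model (Transformer) contract.
# Rows are plain dicts; none of these classes exist in pyspark.ml.

class MeanCentererModel:
    """Transformer: holds the learned mean and appends a centered column."""

    def __init__(self, input_col, output_col, mean):
        self.input_col = input_col
        self.output_col = output_col
        self.mean = mean

    def transform(self, rows):
        # Return new rows; the input rows are left unchanged.
        return [
            {**row, self.output_col: row[self.input_col] - self.mean}
            for row in rows
        ]


class MeanCenterer:
    """Estimator: learns a parameter from the data and returns a model."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        mean = sum(row[self.input_col] for row in rows) / len(rows)
        return MeanCentererModel(self.input_col, self.output_col, mean)


rows = [{"air_time": 100}, {"air_time": 140}, {"air_time": 120}]
model = MeanCenterer("air_time", "air_time_centered").fit(rows)  # Estimator -> Model
centered = model.transform(rows)                                 # Model acts as a Transformer
print(centered)
```

The point of the split is that everything learned from the data lives in the model object, so the same fitted model can later be applied to other rows.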

More in the [Apache Spark ML Pipelines guide](https://spark.apache.org/docs/latest/ml-pipeline.html#transformers).

## Data Preparation

* Joining Tables
* Converting Data Types
* Assembling a Vector

In [1]:
# Import necessary modules
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Create an instance of the SparkContext class
sc = SparkContext(appName="myApp")

# Verify SparkContext
print(sc)

# Get or create a SparkSession called "spark" (it reuses the SparkContext above)
spark = SparkSession.builder.getOrCreate()

# Print spark to verify it's a SparkSession
print(spark)

<SparkContext master=local[*] appName=myApp>
<pyspark.sql.session.SparkSession object at 0x116d0fef0>


In [3]:
# Path to the file
file_path1 = "data/flights.csv"
file_path2 = "data/planes.csv"

# Read in the data
flights = spark.read.csv(file_path1, header=True, inferSchema = True)
planes = spark.read.csv(file_path2, header=True, inferSchema = True)

# Show the data (first 3 rows)
flights.show(3)
planes.show(3)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
only showing top 3 rows

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manu

### Joining Tables

We'll build a model that predicts _whether or not a flight will be delayed_ based on the `flights` data. The model will also include information from the `planes` table about the plane that flew the route.

The first step is to **join** the two tables: `flights` and `planes`.

In [4]:
# Rename year column
planes = planes.withColumnRenamed("year", "plane_year")

# Join the DataFrames
model_data = flights.join(planes, on="tailnum", how="leftouter")

In [5]:
# Print the schema of DataFrame
model_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- plane_year: string (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)



### Converting Data Types

Spark needs **numeric values** (doubles or integers) to do machine learning. When we imported our data, we let Spark guess what kind of information each column held. However, Spark doesn't always guess right: in the schema above, columns such as `arr_delay`, `air_time`, and `plane_year` were read as strings.

We can use the `.cast()` method in combination with the `.withColumn()` method to convert necessary columns to _integers_. Note that `.cast()` works on **columns**, while `.withColumn()` works on **DataFrames**.

The only argument we need to pass to `.cast()` is the type of value we want to create, in string form. For example, to create integers we'll pass the argument `"integer"`, and for decimal numbers we'll use `"double"`.

We can put this call to `.cast()` inside a call to `.withColumn()` to overwrite the already existing column.

In [6]:
# Cast the columns to integers
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))

# Print the schema of DataFrame
model_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- plane_year: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)
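
The effect of the casts above can be sketched in plain Python. This is a toy illustration, assuming Spark's non-ANSI casting behaviour in which a string that cannot be parsed as a number becomes null instead of raising an error. `cast_to_int` and the sample rows (including the `"NA"` placeholder) are hypothetical, and the helper does not reproduce every rule of Spark's cast.

```python
def cast_to_int(value):
    """Lenient string-to-integer cast: missing or unparsable input becomes None."""
    if value is None:
        return None
    try:
        return int(value)
    except ValueError:
        return None


# Hypothetical rows; "NA" stands in for a value that is not a number.
rows = [{"arr_delay": "-5"}, {"arr_delay": "NA"}, {"arr_delay": "12"}]

# Overwrite the column with its casted version, as withColumn() does.
casted = [{**row, "arr_delay": cast_to_int(row["arr_delay"])} for row in rows]
print(casted)
```

Under that assumption, a cast can introduce nulls, which is one reason to remove missing values after casting.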



The `plane_year` column holds the year each plane was manufactured. However, the model will use the plane's age, so we need to calculate the age and store it in a new column called `plane_age`.

In [8]:
# Create the column plane_age
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)

We'll need to create another column which indicates whether the flight was late or not. Moreover, we'll need to remove missing values.

In [9]:
# Create is_late
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)

# Convert to an integer
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))

# Remove missing values
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")

We'll also be using the flight's airline (`carrier`) and destination (`dest`) as features in the model. These are coded as _strings_, and we'll convert them to a _numeric_ data type. PySpark has functions for handling this built into the `pyspark.ml.feature` submodule. We need to create a `StringIndexer` and a `OneHotEncoder`, and the `Pipeline` will take care of the rest.

1. The first step to encoding categorical features is to create a `StringIndexer`. Members of this class are `Estimators` that take a DataFrame with a column of strings and map each unique string to a number. Then, the Estimator returns a `Transformer` that takes a DataFrame, attaches the mapping to it as metadata, and returns a new DataFrame with a numeric column corresponding to the string column. (StringIndexer encodes a string column of labels to a column of label indices.)

2. The second step is to encode this numeric column as a one-hot vector using a `OneHotEncoder`. This works exactly the same way as the `StringIndexer` by creating an Estimator and then a Transformer. The end result is a column that encodes categorical features as a vector that's suitable for machine learning routines. (One-hot encoding maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.)
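
The two steps above can be sketched in plain Python. This is a toy illustration rather than Spark's implementation; it assumes the default settings, where `StringIndexer` orders labels by descending frequency and `OneHotEncoder` drops the last category. The helpers `fit_string_index` and `one_hot` and the sample carriers are hypothetical.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct string to an index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a binary vector; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    return [1.0 if position == index else 0.0 for position in range(size)]


carriers = ["AS", "VX", "AS", "DL", "AS", "VX"]        # hypothetical sample column
mapping = fit_string_index(carriers)                   # the "fit" step
encoded = [one_hot(mapping[c], len(mapping)) for c in carriers]  # the "transform" step

print(mapping)
print(encoded)
```

Dropping the last category is why a vector has "at most" a single one: the least frequent label is represented by all zeros.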

In [10]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Create a StringIndexer
carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")

# Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

# Create a StringIndexer
dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")

# Create a OneHotEncoder
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")

### Assembling a Vector

Spark's modeling routines expect the features to be combined into a single vector column. The `pyspark.ml.feature` submodule contains a _Transformer_ called `VectorAssembler`, which takes all of the specified columns and combines them into a new vector column.

In [11]:
from pyspark.ml.feature import VectorAssembler

# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")
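
What the assembler does to a single row can be sketched in plain Python. This is a toy illustration, not Spark code: `assemble` is a hypothetical helper, and the one-hot vectors and `plane_age` value in the sample row are made up.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)   # vector column: append each element
        else:
            features.append(float(value))              # scalar column: append one value
    return features


# Hypothetical row after indexing and one-hot encoding.
row = {
    "month": 12,
    "air_time": 132,
    "carrier_fact": [0.0, 1.0],
    "dest_fact": [1.0, 0.0, 0.0],
    "plane_age": 3,
}
features = assemble(row, ["month", "air_time", "carrier_fact", "dest_fact", "plane_age"])
print(features)
```

The order of `inputCols` determines the position of each feature in the vector, and each one-hot column contributes as many entries as its vector is long.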

## Pipeline

A **Pipeline** combines all the `Estimators` and `Transformers` that we've created into a single, ordered sequence of stages.

In [12]:
# Import Pipeline
from pyspark.ml import Pipeline

# Make the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

In Spark it's important to make sure to **split the data after all the transformations**. This is because an operation like `StringIndexer` learns its mapping from the data it is fit on, so it won't always produce the same index for a given string when fit on different subsets of the data.

In [15]:
# Fit and transform the data
piped_data = flights_pipe.fit(model_data).transform(model_data)

# Split the data into training and test sets
training, test = piped_data.randomSplit([.6, .4])
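
The reason for splitting after the transformations can be illustrated in plain Python. This is a toy sketch that assumes a frequency-ordered index like `StringIndexer`'s default; `fit_string_index` is a hypothetical helper and the destinations are a made-up sample.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct string to an index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: index for index, label in enumerate(ordered)}


dests = ["LAX", "LAX", "SFO", "SFO", "SFO", "HNL"]   # hypothetical sample column

full = fit_string_index(dests)             # index learned from all rows
first_half = fit_string_index(dests[:3])   # index learned from one subset
second_half = fit_string_index(dests[3:])  # index learned from the other subset

print(full)
print(first_half)
print(second_half)
```

Here `"LAX"` gets index 1 on the full data but index 0 on the first subset, and it is missing entirely from the second, so indexers fit separately on each split would not agree.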

## Model

We'll fit a **logistic regression** model and tune it using k-fold cross validation to estimate the model's performance. The default number of folds $k$ in PySpark's `CrossValidator` (the `numFolds` parameter) is three.
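
How k-fold cross validation partitions the rows can be sketched in plain Python. This is a toy illustration: `k_fold_indices` is a hypothetical helper, and it assigns rows to folds deterministically, whereas Spark's `CrossValidator` assigns them randomly.

```python
def k_fold_indices(num_rows, k=3):
    """Yield (train_indices, validation_indices) for each of the k folds."""
    # Row i goes to fold i % k, so the folds are as even as possible.
    folds = [list(range(start, num_rows, k)) for start in range(k)]
    for held_out in range(k):
        validation = folds[held_out]
        train = [i for j, fold in enumerate(folds) if j != held_out for i in fold]
        yield train, validation


splits = list(k_fold_indices(9, k=3))
for train, validation in splits:
    print("train:", train, "validation:", validation)
```

Each row is used for validation exactly once and for training in the other k - 1 rounds; the metric is then averaged over the k rounds.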

In [16]:
# Import LogisticRegression
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression Estimator
lr = LogisticRegression()

The `pyspark.ml.evaluation` submodule is used for evaluating different kinds of models. Our model is a _binary classification model_, so we'll be using the `BinaryClassificationEvaluator` from that submodule. This evaluator calculates the area under the ROC curve (AUC), a metric we can use to compare different models.

In [17]:
# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")
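
One way to read the AUC is as the probability that a randomly chosen late flight receives a higher score than a randomly chosen on-time flight. The sketch below computes it that way in plain Python. It is a toy illustration, not how Spark computes the metric internally; `area_under_roc` and the sample labels and scores are hypothetical.

```python
def area_under_roc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    correct = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                correct += 1.0
            elif p == n:
                correct += 0.5
    return correct / (len(positives) * len(negatives))


labels = [1, 0, 1, 0]              # 1 = late, 0 = on time (made-up data)
scores = [0.9, 0.4, 0.35, 0.1]     # predicted probability of being late
auc = area_under_roc(labels, scores)
print(auc)
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, which is what makes the metric convenient for comparing models.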

We'll be using cross validation to choose the **hyperparameters**: we create a grid of the possible pairs of values for the two hyperparameters, `elasticNetParam` and `regParam`, and use the _cross validation error_ to compare all the different models so we can choose the best one.

The submodule `pyspark.ml.tuning` includes a class called `ParamGridBuilder` to create a grid of values to search over when looking for the optimal hyperparameters. We'll need to use the `.addGrid()` and `.build()` methods to create a grid that we can use for cross validation.

In [19]:
# Import the tuning submodule
import pyspark.ml.tuning as tune

import numpy as np

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameters
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()
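
The built grid is the Cartesian product of the value lists. The plain-Python sketch below shows its size, assuming the ten `regParam` values 0.00 through 0.09 and the two `elasticNetParam` values used above. The dictionary layout and variable names are hypothetical; Spark's grid is a list of parameter maps keyed by `Param` objects.

```python
from itertools import product

reg_params = [round(i * 0.01, 2) for i in range(10)]   # 0.0, 0.01, ..., 0.09
elastic_net_params = [0, 1]

# One entry per (regParam, elasticNetParam) combination.
param_grid = [
    {"regParam": reg, "elasticNetParam": enet}
    for reg, enet in product(reg_params, elastic_net_params)
]

num_folds = 3                                   # CrossValidator's default
num_fits = len(param_grid) * num_folds          # models fit during cross validation

print(len(param_grid), num_fits)
```

With 20 combinations and three folds, cross validation fits 60 models before refitting the best combination once more, so the grid size directly drives the running time.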

`pyspark.ml.tuning` also has a class called `CrossValidator` for performing cross validation. 

In [21]:
# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
               estimatorParamMaps=grid,
               evaluator=evaluator
               )

### Fitting a Model

In [22]:
# Fit cross validation models
models = cv.fit(training)

# Extract the best model
best_lr = models.bestModel

In [23]:
print(best_lr)

LogisticRegressionModel: uid = LogisticRegression_64a3d8e73a19, numClasses = 2, numFeatures = 81


Cross validation selected the parameter values `regParam=0` and `elasticNetParam=0` as the best. These are also the default values of `LogisticRegression`, so we don't need to change anything on `lr` before fitting the model.

In [24]:
# Call lr.fit() to fit a Spark model
best_lr = lr.fit(training)

# Print best_lr
print(best_lr)

LogisticRegressionModel: uid = LogisticRegression_64a3d8e73a19, numClasses = 2, numFeatures = 81


### Evaluating the Model (AUC)

In [25]:
# Use the model to predict the test set
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))

0.6898700478611577
