# 3. Machine Learning Pipelines

This series is based on the DataCamp course [Introduction to PySpark](https://app.datacamp.com/learn/courses/introduction-to-pyspark). The course has the following chapters:

1. Basics: Getting to know PySpark
2. Manipulating data
3. **Getting started with machine learning pipelines**: The current notebook.

In this notebook, basic data processing is performed as a pipeline, and a logistic regression model is tuned with a cross-validated grid search.

**Table of Contents:**

- [Setup: Create Session + Upload Data](#Setup:-Create-Session-+-Upload-Data)
- [3.1 Introduction to Machine Learning in Spark](#3.1-Introduction-to-Machine-Learning-in-Spark)
- [3.2 Data Processing Pipeline](#3.2-Data-Processing-Pipeline)
    - Join tables
    - Cast Types
    - New Features/Columns
    - Remove Missing Values
    - Encode Categoricals
    - Assemble a Vector and Create a Pipeline
    - Fit and Transform the Pipeline
    - Train/Test Split
- [3.3 Model Tuning and Selection](#3.3-Model-Tuning-and-Selection)
    - Instantiate Logistic Regression Model
    - Instantiate Evaluation Metric
    - Instantiate Parameter Grid
    - Cross Validation Object
    - Fit the Model with Grid Search
    - Evaluate the Model

## Setup: Create Session + Upload Data

In [1]:
import findspark
findspark.init()

In [2]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Create or get a (new) SparkSession: session
session = SparkSession.builder.getOrCreate()

# Print session: our SparkSession
print(session)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/15 16:21:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/15 16:21:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


<pyspark.sql.session.SparkSession object at 0x1321ff1d0>


In [3]:
# Load and register flights dataframe
flights = session.read.csv("../data/flights_small.csv", header=True, inferSchema=True)
flights.createOrReplaceTempView("flights")

# Load and register airports dataframe
airports = session.read.csv("../data/airports.csv", header=True, inferSchema=True)
airports.createOrReplaceTempView("airports")

# Load and register planes dataframe
planes = session.read.csv("../data/planes.csv", header=True, inferSchema=True)
planes.createOrReplaceTempView("planes")

print(session.catalog.listTables())

                                                                                

[Table(name='airports', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True), Table(name='flights', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True), Table(name='planes', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]


## 3.1 Introduction to Machine Learning in Spark

The module `pyspark.ml` is built around two main types of classes:

- `Transformer` classes: they take a Spark DataFrame and `.transform()` it into a new DataFrame, typically by appending one or more columns.
- `Estimator` classes: they `.fit()` to a Spark DataFrame and return a fitted model, which is itself a `Transformer` ready to `.transform()` data. For instance, `LogisticRegression` is an `Estimator`: calling `.fit()` on it returns a `LogisticRegressionModel`, a `Transformer` whose `.transform()` scores a dataset by appending prediction columns.
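The contract can be illustrated without Spark. The following is a toy, pure-Python sketch (not PySpark code): rows are plain dicts standing in for DataFrame rows, and the hypothetical classes `MeanImputer` and `MeanImputerModel` mimic the `fit()` then `transform()` pattern. Spark ships a real `Estimator` for this job, `pyspark.ml.feature.Imputer`; the toy only mirrors its shape.

```python
# Toy illustration of the Estimator/Transformer contract (not PySpark code).
# Rows are plain dicts standing in for DataFrame rows; class names are hypothetical.


class MeanImputerModel:
    """Transformer: applies a value learned during fit()."""

    def __init__(self, col, fill_value):
        self.col = col
        self.fill_value = fill_value

    def transform(self, rows):
        # Build new rows; the input data is not modified in place.
        return [
            {**row, self.col: self.fill_value if row[self.col] is None else row[self.col]}
            for row in rows
        ]


class MeanImputer:
    """Estimator: fit() learns from data and returns a Transformer."""

    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        observed = [row[self.col] for row in rows if row[self.col] is not None]
        return MeanImputerModel(self.col, sum(observed) / len(observed))


toy_rows = [{"air_time": 100}, {"air_time": None}, {"air_time": 200}]
imputer_model = MeanImputer("air_time").fit(toy_rows)  # Estimator -> Transformer
print(imputer_model.transform(toy_rows))
# [{'air_time': 100}, {'air_time': 150.0}, {'air_time': 200}]
```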

## 3.2 Data Processing Pipeline

In this section, typical data processing steps are applied to the loaded datasets. Joining, casting, deriving new columns, and removing missing values are done with ordinary DataFrame operations; the categorical encoding and the vector assembly are chained into a Spark ML `Pipeline`. The steps shown are:

- Join tables
- Cast types: numeric values are required for modeling
- New Features/Columns
- Remove Missing Values
- Encode Categoricals
- Assemble a Vector and Create a Pipeline
- Fit and Transform the Pipeline
- Train/Test Split

### Join

In [4]:
# Rename planes.year so it does not clash with flights.year after the join
planes = planes.withColumnRenamed("year", "plane_year")

# Join the DataFrames
model_data = flights.join(planes, on="tailnum", how="leftouter")
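A left outer join keeps every flight, even when its `tailnum` has no match in `planes`; the plane columns of such rows are null (they are dropped later by the `plane_year is not NULL` filter). The toy sketch below shows this in plain Python with a hypothetical helper and made-up rows. It also hints at why `year` was renamed: in Spark, the joined result would otherwise carry two `year` columns and referring to `year` would be ambiguous, whereas this dict-based sketch would silently overwrite one with the other.

```python
# Toy left outer join on "tailnum" (pure Python; helper and rows are hypothetical).
flight_rows = [
    {"tailnum": "N1", "year": 2014, "arr_delay": 5},
    {"tailnum": "N2", "year": 2014, "arr_delay": -3},  # no matching plane
]
plane_rows = [{"tailnum": "N1", "plane_year": 2004, "seats": 150}]


def left_outer_join(left, right, key):
    # Assumes at most one right row per key (presumably true for tailnum in planes).
    index = {row[key]: row for row in right}
    right_cols = sorted({col for row in right for col in row if col != key})
    joined = []
    for row in left:
        match = index.get(row[key])
        # Every left row is kept; right-side columns are None (null) when unmatched.
        extra = {col: match[col] if match else None for col in right_cols}
        joined.append({**row, **extra})
    return joined


toy_joined = left_outer_join(flight_rows, plane_rows, "tailnum")
print(toy_joined[1])
# {'tailnum': 'N2', 'year': 2014, 'arr_delay': -3, 'plane_year': None, 'seats': None}
```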

### Cast Types

In [5]:
model_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- plane_year: string (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)



In [6]:
# Cast the columns to integers
# (month is already an integer, so its cast is a harmless no-op)
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
model_data = model_data.withColumn("month", model_data.month.cast("integer"))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))
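Why were these columns read as strings in the first place? A plausible reason (an assumption; the raw file is not inspected here) is that missing values appear in the CSV as the literal text `NA`, which prevents `inferSchema` from choosing a numeric type. The pure-Python sketch below mimics what `cast("integer")` then does with such values when ANSI mode is off (the default in Spark 3.x): unparseable strings become null instead of raising an error. With `spark.sql.ansi.enabled` set to true (the default in Spark 4.0), the same cast fails instead. The helper `cast_to_int` is hypothetical.

```python
# Toy mimic of a non-ANSI string -> integer cast (hypothetical helper).
# Only plain integer strings are handled here.
def cast_to_int(value):
    if value is None:
        return None
    try:
        return int(value.strip())
    except ValueError:
        return None  # unparseable text such as "NA" becomes null


raw_arr_delay = ["12", "-4", "NA", None]
print([cast_to_int(v) for v in raw_arr_delay])  # [12, -4, None, None]
```

Alternatively, passing `nullValue="NA"` to `session.read.csv(...)` tells the reader to treat `NA` as null at load time, which can let `inferSchema` pick numeric types directly.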

In [7]:
model_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- plane_year: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)



### New Features/Columns

In [17]:
# Create the column plane_age
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)

In [18]:
# Create is_late
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)

# Cast the boolean to an integer (0/1): Spark ML expects a numeric label column
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))
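Both derived columns follow SQL null semantics: any arithmetic or comparison involving a null yields null. The toy sketch below (plain Python, hypothetical rows, `None` standing in for null) reproduces the two expressions row by row. Note that an `arr_delay` of exactly 0 counts as on time, because the condition is strictly `> 0`.

```python
# Toy row-wise version of the plane_age / is_late / label columns (hypothetical helper).
def derive_columns(row):
    year, plane_year, arr_delay = row["year"], row["plane_year"], row["arr_delay"]
    # Arithmetic with a null operand yields null.
    plane_age = None if year is None or plane_year is None else year - plane_year
    # Comparison with a null operand yields null, too.
    is_late = None if arr_delay is None else arr_delay > 0
    # Boolean -> integer: True -> 1, False -> 0, null stays null.
    label = None if is_late is None else int(is_late)
    return {**row, "plane_age": plane_age, "is_late": is_late, "label": label}


toy_rows = [
    {"year": 2014, "plane_year": 2004, "arr_delay": 12},
    {"year": 2014, "plane_year": None, "arr_delay": 0},
    {"year": 2014, "plane_year": 2010, "arr_delay": None},
]
for toy_row in toy_rows:
    out = derive_columns(toy_row)
    print(out["plane_age"], out["is_late"], out["label"])
# 10 True 1
# None False 0
# 4 None None
```

Rows whose label is null are removed by the filter in the next step, since `arr_delay is not NULL` is one of its conditions.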

### Remove Missing Values

In [20]:
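# Remove rows with missing values in the relevant columns
# (dep_delay is still a string column, so only true nulls are dropped for it)
model_data = model_data.filter("""arr_delay is not NULL
                                  and dep_delay is not NULL
                                  and air_time is not NULL
                                  and plane_year is not NULL""")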
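The same filter can also be expressed with the DataFrame API as `model_data.dropna(subset=["arr_delay", "dep_delay", "air_time", "plane_year"])`, which drops every row with a null in any of the listed columns. The pure-Python sketch below (hypothetical helper and rows) shows that logic, including one caveat: because `dep_delay` is still a string column, a literal `"NA"` text value (if the raw data contains one; an assumption) is not null and passes the filter.

```python
# Toy version of "drop rows with a null in any required column" (hypothetical helper).
REQUIRED_COLS = ["arr_delay", "dep_delay", "air_time", "plane_year"]


def drop_missing(rows, cols):
    return [row for row in rows if all(row.get(col) is not None for col in cols)]


toy_rows = [
    {"arr_delay": 3, "dep_delay": "1", "air_time": 90, "plane_year": 2004},
    {"arr_delay": None, "dep_delay": "0", "air_time": 80, "plane_year": 2001},
    {"arr_delay": 7, "dep_delay": "NA", "air_time": 95, "plane_year": 2010},
]
kept = drop_missing(toy_rows, REQUIRED_COLS)
print(len(kept))  # 2: the null arr_delay row is dropped, the "NA" string row is kept
```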

### Encode Categoricals

We need a `StringIndexer` to map each unique categorical level to a number and a `OneHotEncoder` to turn those numbers into dummy variables. These objects are instantiated here, added as stages of a `Pipeline` (together with a `VectorAssembler`), and the pipeline is then `fit()` on the DataFrame. After that, we can `transform()` the data.

In [22]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

In [23]:
# Create a StringIndexer: Estimator that needs to be fit() and returns a Transformer
# StringIndexer: map all unique categorical levels to numbers
carr_indexer = StringIndexer(inputCol="carrier",
                             outputCol="carrier_index")

# Create a OneHotEncoder: Estimator that needs to be fit() and returns a Transformer
carr_encoder = OneHotEncoder(inputCol="carrier_index",
                             outputCol="carrier_fact")

In [24]:
# Create a StringIndexer: Estimator that needs to be fit() and returns a Transformer
# StringIndexer: map all unique categorical levels to numbers
dest_indexer = StringIndexer(inputCol="dest",
                             outputCol="dest_index")

# Create a OneHotEncoder: Estimator that needs to be fit() and returns a Transformer
dest_encoder = OneHotEncoder(inputCol="dest_index",
                             outputCol="dest_fact")
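To make the two encoders concrete, here is a pure-Python sketch of what they compute under what we take to be Spark's defaults: `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`, with ties broken alphabetically in recent versions), and `OneHotEncoder` drops the last category (`dropLast=True`), so k levels yield vectors of length k - 1 and the last level is encoded as all zeros. Spark produces sparse vectors; the sketch uses dense lists, and the helper names and carrier codes are hypothetical.

```python
from collections import Counter


def fit_string_indexer(values):
    # Most frequent label -> 0.0; ties are broken alphabetically.
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    # With drop_last, the last category maps to the all-zero vector.
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec


toy_carriers = ["AS", "DL", "AS", "UA", "DL", "AS", "OO"]
toy_mapping = fit_string_indexer(toy_carriers)
print(toy_mapping)  # {'AS': 0.0, 'DL': 1.0, 'OO': 2.0, 'UA': 3.0}
for carrier, index in toy_mapping.items():
    print(carrier, one_hot(index, len(toy_mapping)))
# AS [1.0, 0.0, 0.0]
# DL [0.0, 1.0, 0.0]
# OO [0.0, 0.0, 1.0]
# UA [0.0, 0.0, 0.0]
```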

### Assemble a Vector and Create a Pipeline

In [25]:
# Make a VectorAssembler: Transformer
vec_assembler = VectorAssembler(inputCols=["month",
                                           "air_time",
                                           "carrier_fact",
                                           "dest_fact",
                                           "plane_age"],
                                outputCol="features")
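`VectorAssembler` concatenates its input columns, scalars and vectors alike, into one flat feature vector in the order given by `inputCols`. By default it rejects nulls (`handleInvalid="error"`), one more reason the missing values were removed earlier. A pure-Python sketch with a hypothetical helper and a toy row:

```python
# Toy version of vector assembly (hypothetical helper, not the PySpark class).
def assemble(row, input_cols):
    # Concatenate scalar and vector columns into one flat list of floats.
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, list):  # vector column, e.g. a one-hot encoding
            features.extend(float(v) for v in value)
        else:  # numeric scalar column
            features.append(float(value))
    return features


toy_row = {"month": 3, "air_time": 120, "carrier_fact": [0.0, 1.0, 0.0],
           "dest_fact": [1.0, 0.0], "plane_age": 10}
toy_cols = ["month", "air_time", "carrier_fact", "dest_fact", "plane_age"]
print(assemble(toy_row, toy_cols))
# [3.0, 120.0, 0.0, 1.0, 0.0, 1.0, 0.0, 10.0]
```

With the default `dropLast=True`, the real feature vector has 3 + (n_carrier - 1) + (n_dest - 1) entries, where n_carrier and n_dest are the numbers of distinct carriers and destinations seen during fitting. The `numFeatures=81` printed for the trained model below would then mean that the two one-hot vectors contribute 78 entries together.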

In [26]:
# Make the pipeline: the stages run in order, so each StringIndexer must come
# before the OneHotEncoder that reads its output, and the VectorAssembler comes last
flights_pipe = Pipeline(stages=[dest_indexer,
                                dest_encoder,
                                carr_indexer,
                                carr_encoder,
                                vec_assembler])

### Fit and Transform the Pipeline

In [27]:
# Fit and transform the data:
# - first, the Estimators are fit, which generate trained Transformers
# - then, the dataset is passed through the trained Transformers
piped_data = flights_pipe.fit(model_data).transform(model_data)
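The fit logic of a pipeline can be sketched in plain Python: stages are visited in order, each `Estimator` is fit on the data as transformed by the stages before it, and the resulting `Transformer`s are collected into a fitted model. The classes below (`ToyPipeline`, `ToyIndexer`, and so on) are hypothetical stand-ins, not `pyspark.ml` classes. The toy indexer orders labels alphabetically for brevity, and Spark's real `Pipeline.fit()` also skips transforming the data after the last `Estimator`, an optimization omitted here.

```python
# Toy pipeline mechanics (hypothetical classes, not pyspark.ml).
class ToyPipelineModel:
    """Fitted pipeline: a list of Transformers applied in order."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):  # Estimator: learn, get a Transformer back
                stage = stage.fit(rows)
            rows = stage.transform(rows)  # output feeds the next stage
            fitted.append(stage)
        return ToyPipelineModel(fitted)


class ToyIndexerModel:
    def __init__(self, col, mapping):
        self.col, self.mapping = col, mapping

    def transform(self, rows):
        return [{**r, self.col + "_index": self.mapping[r[self.col]]} for r in rows]


class ToyIndexer:
    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        labels = sorted({r[self.col] for r in rows})  # alphabetical, for brevity
        return ToyIndexerModel(self.col, {lab: float(i) for i, lab in enumerate(labels)})


class ToyScaler:
    """Plain Transformer: no fit() needed."""

    def __init__(self, col, factor):
        self.col, self.factor = col, factor

    def transform(self, rows):
        return [{**r, self.col + "_scaled": r[self.col] * self.factor} for r in rows]


toy_rows = [{"dest": "SFO", "air_time": 100}, {"dest": "LAX", "air_time": 60}]
# Index dest, then convert air_time from minutes to seconds.
toy_model = ToyPipeline([ToyIndexer("dest"), ToyScaler("air_time", 60)]).fit(toy_rows)
print(toy_model.transform(toy_rows))
```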

                                                                                

In [28]:
piped_data.printSchema()

root
 |-- tailnum: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- plane_year: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- engines: integer (nullable = true)
 |-- seats: integer (nullable = true)
 |-- speed: string (nullable = true)
 |-- engine: string (nullable = true)
 |-- plane_age: integer (nullable = true)
 |-- is_late: boole

### Train/Test Split

In [29]:
# Split the data into training and test sets: 60% train, 40% test
# Always split after the complete dataset has been processed, so that
# both sets share the same categorical encodings
training, test = piped_data.randomSplit([.6, .4])
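`randomSplit()` normalizes the weights and assigns rows at random, so the resulting sizes are only approximately 60/40 and change from run to run unless a `seed` is passed (e.g. `piped_data.randomSplit([.6, .4], seed=42)`). The pure-Python sketch below (hypothetical helper, integers standing in for rows) imitates that per-row sampling; Spark's actual implementation samples within each partition, but the effect on split sizes is similar.

```python
import random


def random_split(rows, weights, seed=None):
    # Normalize weights into cumulative upper bounds, e.g. [0.6, 1.0].
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # one uniform draw per row
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:  # guard against floating-point rounding in the last bound
            splits[-1].append(row)
    return splits


toy_train, toy_test = random_split(range(10_000), [0.6, 0.4], seed=42)
print(len(toy_train), len(toy_test))  # close to 6000 / 4000, but not exact
```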

## 3.3 Model Tuning and Selection

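In this section, a logistic regression model is tuned with a grid search over its regularization hyperparameters, using cross-validation on the training set, and is then evaluated on the test set.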

### Instantiate Logistic Regression Model

In [30]:
# Import LogisticRegression: Estimator
from pyspark.ml.classification import LogisticRegression

# Create a LogisticRegression Estimator; by default it reads the "features"
# and "label" columns created above
lr = LogisticRegression()
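A fitted `LogisticRegressionModel` scores a row by passing the linear margin w · x + b through the logistic (sigmoid) function and predicting 1 when the resulting probability exceeds `threshold` (0.5 by default). The sketch below is plain Python with made-up coefficients for two features (`month`, `air_time`), purely for illustration; the real model learns one coefficient per entry of the `features` vector.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def score(features, coefficients, intercept, threshold=0.5):
    # Linear margin w . x + b, squashed into a probability of the positive class.
    margin = sum(w * x for w, x in zip(coefficients, features)) + intercept
    probability = sigmoid(margin)
    prediction = 1 if probability > threshold else 0
    return probability, prediction


# Hypothetical coefficients for [month, air_time]; not taken from the trained model.
p, pred = score([3, 120], coefficients=[0.01, 0.005], intercept=-1.0)
print(round(p, 3), pred)  # 0.409 0
```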

### Instantiate Evaluation Metric

In [31]:
# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")
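The area under the ROC curve equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one (ties count half): 0.5 corresponds to random guessing and 1.0 to a perfect ranking. The pure-Python sketch below computes it from that pairwise definition on toy scores. Spark's `BinaryClassificationEvaluator` instead integrates the ROC curve built from the `rawPrediction` column (and may down-sample the curve into bins on large data), so its value can differ slightly from an exact computation.

```python
# Exact AUC via pairwise comparison (O(P * N), fine for toy data).
def area_under_roc(scores, labels):
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count half
    return wins / (len(pos) * len(neg))


print(area_under_roc([0.9, 0.8, 0.3, 0.2], [1, 0, 1, 0]))  # 0.75
```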

### Instantiate Parameter Grid

In [32]:
# Import NumPy (for np.arange) and the tuning submodule
import numpy as np
import pyspark.ml.tuning as tune

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameters to be tried in the grid
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))  # regularization strength: 0.00, 0.01, ..., 0.09
grid = grid.addGrid(lr.elasticNetParam, [0, 1])          # 0 = L2 (ridge) penalty, 1 = L1 (lasso) penalty

# Build the grid
grid = grid.build()
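The grid is the Cartesian product of the listed values: `np.arange(0, .1, .01)` gives 10 values for `regParam` and `elasticNetParam` takes 2, so there are 10 × 2 = 20 parameter maps. The sketch below rebuilds that product in plain Python (a list comprehension instead of NumPy, and dicts instead of Spark `ParamMap`s). Note that with `regParam = 0` there is no penalty at all, so `elasticNetParam` has no effect and two of the 20 maps describe the same model.

```python
import itertools

# Same values as np.arange(0, .1, .01), up to floating-point rounding.
reg_params = [i * 0.01 for i in range(10)]
elastic_net_params = [0, 1]  # 0 = L2 (ridge) penalty, 1 = L1 (lasso) penalty

param_maps = [
    {"regParam": r, "elasticNetParam": a}
    for r, a in itertools.product(reg_params, elastic_net_params)
]
print(len(param_maps))  # 20
```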

### Cross Validation Object

In [33]:
# Create the CrossValidator (3 folds by default, see the numFolds parameter)
cv = tune.CrossValidator(estimator=lr,
                         estimatorParamMaps=grid,
                         evaluator=evaluator)
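With its default of 3 folds (`numFolds`), `CrossValidator` fits 20 × 3 = 60 models in this search, each trained on two folds and scored on the held-out one, and then refits the best parameter map on the full training set. The pure-Python sketch below mirrors that loop with a hypothetical one-parameter threshold "model" and accuracy as the metric; like Spark, it assigns rows to folds at random and picks the parameter map with the best average metric (larger is better, as for `areaUnderROC`).

```python
import random
from statistics import mean


def cross_validate(rows, param_maps, fit, evaluate, num_folds=3, seed=0):
    """Toy k-fold grid search; returns (best_model, best_params, avg_metrics)."""
    rng = random.Random(seed)
    folds = [rng.randrange(num_folds) for _ in rows]  # random fold per row
    avg_metrics = []
    for params in param_maps:
        scores = []
        for k in range(num_folds):
            train = [r for r, f in zip(rows, folds) if f != k]
            valid = [r for r, f in zip(rows, folds) if f == k]
            scores.append(evaluate(fit(train, params), valid))
        avg_metrics.append(mean(scores))
    best = max(range(len(param_maps)), key=lambda i: avg_metrics[i])
    best_model = fit(rows, param_maps[best])  # final refit on all rows
    return best_model, param_maps[best], avg_metrics


# Hypothetical toy problem: label is 1 when x > 5; the "model" is just a threshold.
toy_rows = [(x, int(x > 5)) for x in range(10)] * 10
toy_grid = [{"threshold": t} for t in (2, 5, 8)]


def fit_threshold(train, params):
    return params["threshold"]  # toy: nothing is learned from the data


def accuracy(threshold, valid):
    return sum((x > threshold) == bool(y) for x, y in valid) / len(valid)


best_threshold, best_params, cv_metrics = cross_validate(
    toy_rows, toy_grid, fit_threshold, accuracy)
print(best_params, [round(m, 3) for m in cv_metrics])
```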

### Fit the Model with Grid Search

In [34]:
# Fit cross validation models
models = cv.fit(training)

25/04/15 17:01:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/04/15 17:02:04 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


In [35]:
# Extract the best model (refit on the whole training set with the best hyperparameters)
best_lr = models.bestModel

In [36]:
# For comparison, the model can also be trained with its default
# hyperparameters, without cross-validation or grid search
not_best_lr = lr.fit(training)

In [37]:
# Print best_lr
print(best_lr)

LogisticRegressionModel: uid=LogisticRegression_6c9efb1e80cd, numClasses=2, numFeatures=81


### Evaluate the Model

In [38]:
# Use the model to predict the test set.
# Scoring a DataFrame is done with transform(), which appends the
# rawPrediction, probability and prediction columns
# (in recent PySpark versions, predict() only scores a single feature vector)
test_results = best_lr.transform(test)

# Evaluate the predictions
print(evaluator.evaluate(test_results))

0.6978154007164993

