# Spark Play 02: First ML algorithms with Spark

At the core of `pyspark.ml` are the `Transformer` and `Estimator` classes.

`Transformer` classes have a `.transform()` method that takes a DataFrame and returns a new DataFrame with columns added or existing columns transformed. Examples are the `Bucketizer` class for binning continuous data, or the `PCAModel` class (the fitted result of `PCA`) for creating principal components.

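`Estimator` classes have a `.fit()` method that takes a DataFrame and returns a model object. Examples are the `RandomForestClassifier` and `RandomForestRegressor` classes, for fitting RF classification or regression, or the `StringIndexer` class, for including categorical data saved as strings in your model (like factors in R). The model objects they return, such as `RandomForestClassificationModel` or `StringIndexerModel`, are themselves `Transformer` classes.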

In [25]:
import numpy as np
import pandas as pd

from pyspark import SparkContext
from pyspark.sql import SparkSession

import pyspark.sql.functions as psf
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline # Chains Estimators and Transformers into a single workflow

In [2]:
sc = SparkContext('local[*]')
spark = SparkSession.builder.getOrCreate()

In [3]:
flights = spark.read.csv("./data/dc/flights.csv", inferSchema=True, header=True)
planes = spark.read.csv("./data/dc/planes.csv", inferSchema=True, header=True)
airports = spark.read.csv("./data/dc/airports.csv", inferSchema=True, header=True)

--------------------------------------------------

Let's predict whether a flight will be delayed (classification).

In [4]:
planes.show(5)

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
only showing top 5 rows



In [5]:
planes = planes.withColumnRenamed("year", "plane_year") # flights has a year column too, so keep this one distinct

In [6]:
model_data = flights.join(planes, on="tailnum", how="leftouter")

Spark models require all data in numeric form. You can modify columns using `withColumn(colName, col)` as we've seen, but within that call you can use the `cast(dataType)` method on a column object to coerce its data to a different type:

In [8]:
model_data = model_data.withColumn("arr_delay", model_data.arr_delay.cast("integer"))
model_data = model_data.withColumn("air_time", model_data.air_time.cast("integer"))
model_data = model_data.withColumn("month", model_data.month.cast("integer"))
model_data = model_data.withColumn("plane_year", model_data.plane_year.cast("integer"))

We can engineer new columns using `withColumn()` too:

In [9]:
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)

We need to create our response/target variable too: was the flight delayed? We code it as an integer.

In [14]:
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)
model_data = model_data.withColumn("label", model_data.is_late.cast("integer")) # label is Spark's default name for response

I named the response variable `label` because that is Spark's default name for it in its ML routines.

We can remove missing values using SQL syntax like so:

In [16]:
model_data = model_data.filter("arr_delay is not NULL and dep_delay is not NULL and air_time is not NULL and plane_year is not NULL")

## One-hot encoding

We can use the `StringIndexer` class to create integer variables from categorical data. There are several steps to this:
1. `StringIndexer` is an `Estimator` class that takes a DataFrame and maps numbers to unique values of its categorical variable
2. The `Estimator` returns a `Transformer` that takes a DataFrame and returns the DataFrame with an appended column of the mapped numbers
3. This new numeric variable can then be encoded as a one-hot vector using a `OneHotEncoder`. In Spark 3.0 and later this works like the `StringIndexer`, by creating an `Estimator` followed by a `Transformer` to first _map_ the data to new encodings and then _apply_ those new encodings to the data.

Basically, you need a `StringIndexer` and then a `OneHotEncoder`.

Let's transform the carrier categorical variable as an example. In each step supply the input column name and a name for the new output column.

In [29]:
carrier_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")
carrier_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")

In [30]:
dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")

## Creating a `Pipeline`

Every Spark ML routine expects your explanatory variables to be combined into **one column**, so that each observation has one target variable column (`label`) and one explanatory variable column (`features`) which contains a vector of explanatory variables for each observation.

To make this step happen, we can use the `VectorAssembler` transformer, which takes a list of input columns and the name for the output column.

In [31]:
vec_assembler = VectorAssembler(inputCols=['month', 'air_time', 'carrier_fact', 'dest_fact', 'plane_age'],
                                outputCol='features')

The next step is to wrap up all the transformation and feature engineering steps, plus the vector assembly, into one `Pipeline` object which lets Spark handle all the detail.

In [32]:
model_pipe = Pipeline(stages=[carrier_indexer, carrier_encoder,
                              dest_indexer, dest_encoder,
                              vec_assembler])

Once you have your `Pipeline` sorted, you can pass your data through it to get your piped data, using `Pipeline`'s `.fit(dataset)` and `.transform(dataset)` methods.

In [33]:
piped_data = model_pipe.fit(model_data).transform(model_data)

## Test and train splits

**Always make train/test splits after all transformations**, because `StringIndexer` assigns indices by label frequency by default, so fitting it separately on two subsets may give different indices even when both contain the same set of strings. We can use the `randomSplit(weights, seed=None)` method on our piped data, like so:

In [34]:
train, test = piped_data.randomSplit([0.8, 0.2])