<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Objectives" data-toc-modified-id="Objectives-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Objectives</a></span></li><li><span><a href="#Set-Up-Spark-Context" data-toc-modified-id="Set-Up-Spark-Context-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Set Up Spark Context</a></span></li><li><span><a href="#Loading-and-Preprocessing-the-Example-Data" data-toc-modified-id="Loading-and-Preprocessing-the-Example-Data-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Loading and Preprocessing the Example Data</a></span><ul class="toc-item"><li><span><a href="#Process-the-Features" data-toc-modified-id="Process-the-Features-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span>Process the Features</a></span></li></ul></li><li><span><a href="#Train-and-Predict-with-Random-Forest" data-toc-modified-id="Train-and-Predict-with-Random-Forest-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Train and Predict with Random Forest</a></span></li><li><span><a href="#Evaluate-the-Model" data-toc-modified-id="Evaluate-the-Model-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Evaluate the Model</a></span></li><li><span><a href="#Using-Pipeline-and-Performing-a-Grid-Search-for-Optimal-Parameters" data-toc-modified-id="Using-Pipeline-and-Performing-a-Grid-Search-for-Optimal-Parameters-6"><span class="toc-item-num">6&nbsp;&nbsp;</span>Using Pipeline and Performing a Grid Search for Optimal Parameters</a></span><ul class="toc-item"><li><span><a href="#Evaluate-with-Cross-Validation-to-Find-Optimal-Model" data-toc-modified-id="Evaluate-with-Cross-Validation-to-Find-Optimal-Model-6.1"><span class="toc-item-num">6.1&nbsp;&nbsp;</span>Evaluate with Cross Validation to Find Optimal Model</a></span></li></ul></li></ul></div>

<a href="https://colab.research.google.com/github/flatiron-school/DS-Live-022122/blob/main/Phase4/62-spark-ml.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Run for Google Colab environment
!pip install pyspark==3.2.1
!apt install openjdk-8-jdk-headless -qq
!pip install mlflow


In [2]:
import pyspark
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import feature
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

In [3]:
# Get data directly from repo
!wget https://github.com/flatiron-school/ds-spark/releases/download/v1.0/US_births_2000-2014_SSA.csv


# Objectives

- Use `pyspark` to build machine learning models

# Set Up Spark Context

In [5]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Loading and Preprocessing the Example Data

This example assumes that a holdout validation dataset is kept somewhere else, so we don't need to perform a train-test split here; we only need to perform cross-validation.

In [6]:
# Load the CSV we downloaded earlier (without a schema, every column is read as a string)
df = spark.read.format('csv').option('header', 'true')\
    .load('US_births_2000-2014_SSA.csv')

In [8]:
df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5479 entries, 0 to 5478
Data columns (total 5 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   year           5479 non-null   object
 1   month          5479 non-null   object
 2   date_of_month  5479 non-null   object
 3   day_of_week    5479 non-null   object
 4   births         5479 non-null   object
dtypes: object(5)
memory usage: 214.1+ KB


In [9]:
df.toPandas().head(3)

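|   | year | month | date_of_month | day_of_week | births |
|---|------|-------|---------------|-------------|--------|
| 0 | 2000 | 1 | 1 | 6 | 9083 |
| 1 | 2000 | 1 | 2 | 7 | 8006 |
| 2 | 2000 | 1 | 3 | 1 | 11363 |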


## Process the Features

In [11]:
# Fix Types
df = df.withColumn('births',df['births'].cast('int'))

In [14]:
df.dtypes

[('year', 'string'),
 ('month', 'string'),
 ('date_of_month', 'string'),
 ('day_of_week', 'string'),
 ('births', 'int')]

In [15]:
# Cast every column from string to integer
for column in ['births', 'day_of_week', 'date_of_month', 'month', 'year']:
    df = df.withColumn(column, df[column].cast('int'))

In [16]:
df.dtypes

[('year', 'int'),
 ('month', 'int'),
 ('date_of_month', 'int'),
 ('day_of_week', 'int'),
 ('births', 'int')]

In [18]:
df.show()

+----+-----+-------------+-----------+------+
|year|month|date_of_month|day_of_week|births|
+----+-----+-------------+-----------+------+
|2000|    1|            1|          6|  9083|
|2000|    1|            2|          7|  8006|
|2000|    1|            3|          1| 11363|
|2000|    1|            4|          2| 13032|
|2000|    1|            5|          3| 12558|
|2000|    1|            6|          4| 12466|
|2000|    1|            7|          5| 12516|
|2000|    1|            8|          6|  8934|
|2000|    1|            9|          7|  7949|
|2000|    1|           10|          1| 11668|
|2000|    1|           11|          2| 12611|
|2000|    1|           12|          3| 12398|
|2000|    1|           13|          4| 11815|
|2000|    1|           14|          5| 12180|
|2000|    1|           15|          6|  8525|
|2000|    1|           16|          7|  7657|
|2000|    1|           17|          1| 10824|
|2000|    1|           18|          2| 12350|
|2000|    1|           19|        

In [20]:
# One-hot encode the day of the month and the day of the week
ohe = feature.OneHotEncoder(
    inputCols=['date_of_month', 'day_of_week'],
    outputCols=['date_vec', 'day_vec'],
    dropLast=True
)

In [21]:
one_hot_encoded = ohe.fit(df).transform(df)

In [22]:
one_hot_encoded.show()

+----+-----+-------------+-----------+------+---------------+-------------+
|year|month|date_of_month|day_of_week|births|       date_vec|      day_vec|
+----+-----+-------------+-----------+------+---------------+-------------+
|2000|    1|            1|          6|  9083| (31,[1],[1.0])|(7,[6],[1.0])|
|2000|    1|            2|          7|  8006| (31,[2],[1.0])|    (7,[],[])|
|2000|    1|            3|          1| 11363| (31,[3],[1.0])|(7,[1],[1.0])|
|2000|    1|            4|          2| 13032| (31,[4],[1.0])|(7,[2],[1.0])|
|2000|    1|            5|          3| 12558| (31,[5],[1.0])|(7,[3],[1.0])|
|2000|    1|            6|          4| 12466| (31,[6],[1.0])|(7,[4],[1.0])|
|2000|    1|            7|          5| 12516| (31,[7],[1.0])|(7,[5],[1.0])|
|2000|    1|            8|          6|  8934| (31,[8],[1.0])|(7,[6],[1.0])|
|2000|    1|            9|          7|  7949| (31,[9],[1.0])|    (7,[],[])|
|2000|    1|           10|          1| 11668|(31,[10],[1.0])|(7,[1],[1.0])|
|2000|    1|

In [23]:
one_hot_encoded.toPandas().head()

|   | year | month | date_of_month | day_of_week | births | date_vec | day_vec |
|---|------|-------|---------------|-------------|--------|----------|---------|
| 0 | 2000 | 1 | 1 | 6 | 9083 | (0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0) |
| 1 | 2000 | 1 | 2 | 7 | 8006 | (0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0) |
| 2 | 2000 | 1 | 3 | 1 | 11363 | (0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0) |
| 3 | 2000 | 1 | 4 | 2 | 13032 | (0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0) |
| 4 | 2000 | 1 | 5 | 3 | 12558 | (0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0) |


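Note the `SparseVector`s we've created! `(31,[1],[1.0])` is a length-31 vector with a single `1.0` at index 1, and `(7,[],[])` is a length-7 vector of all zeros. Because `OneHotEncoder` uses each value as a category index and `dropLast=True` drops the last category, a `day_of_week` of 7 is encoded as all zeros.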
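To make the encoding rule concrete, here is a pure-Python sketch of the behavior `OneHotEncoder` appears to follow, consistent with the output above: each value is used directly as a category index, the number of categories is the largest value plus one, and `dropLast=True` drops the highest index. The helper `one_hot_sparse` and the category counts are hypothetical illustrations, not Spark's implementation.

```python
def one_hot_sparse(value, num_categories, drop_last=True):
    """Encode an integer category index as (size, indices, values),
    mirroring the layout of Spark's SparseVector repr."""
    if not 0 <= value < num_categories:
        raise ValueError(f"value {value} is outside [0, {num_categories})")
    size = num_categories - 1 if drop_last else num_categories
    if value < size:
        return (size, [value], [1.0])
    # With drop_last=True, the last category becomes the all-zeros vector.
    return (size, [], [])


# Assumption: number of categories = max observed value + 1 (indices start at 0).
NUM_DAY_CATEGORIES = 7 + 1    # day_of_week takes values 1..7
NUM_DATE_CATEGORIES = 31 + 1  # date_of_month takes values 1..31

for day in [6, 7, 1]:
    print(day, one_hot_sparse(day, NUM_DAY_CATEGORIES))
# 6 (7, [6], [1.0])
# 7 (7, [], [])
# 1 (7, [1], [1.0])
```

Two consequences are visible in the outputs above: slot 0 of each vector is never used, and the category removed by `dropLast` is the highest value (day 7, date 31) rather than the lowest. Shifting these columns to start at 0 before encoding would be one way to get a more conventional layout.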

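`VectorAssembler` is often what we want when we're building a model in Spark: Spark ML estimators expect all features in a single vector column, and `VectorAssembler` combines a list of numeric and vector columns into one. [How does the VectorAssembler work?](https://spark.apache.org/docs/2.1.0/ml-features.html#vectorassembler)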
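Conceptually, the assembler concatenates its input columns in the order given in `inputCols`: a numeric column contributes one slot and a vector column contributes all of its slots. The sketch below reproduces that concatenation in plain Python for the first row of the data, using hypothetical helpers `sparse_to_dense` and `assemble` and dense lists instead of Spark vectors. With `year` and `month` (one slot each), `date_vec` (31) and `day_vec` (7), each row ends up with 40 features.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


def assemble(*parts):
    """Concatenate scalars and dense vectors, in order, into one flat list."""
    out = []
    for part in parts:
        if isinstance(part, (int, float)):
            out.append(float(part))  # a numeric column fills one slot
        else:
            out.extend(float(x) for x in part)  # a vector column fills len(part) slots
    return out


# First row: year=2000, month=1, date_vec=(31,[1],[1.0]), day_vec=(7,[6],[1.0])
date_vec = sparse_to_dense(31, [1], [1.0])
day_vec = sparse_to_dense(7, [6], [1.0])
features = assemble(2000, 1, date_vec, day_vec)

print(len(features))  # 40
print(features[:5])   # [2000.0, 1.0, 0.0, 1.0, 0.0]
```

The 40 slots line up with the `numFeatures=40` reported for the fitted random forest at the end of the notebook.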

In [27]:
# Combine the feature columns into a single 'features' vector column

features = ['year', 'month', 'date_vec', 'day_vec']

target = 'births'

vector = VectorAssembler(inputCols=features, outputCol='features')

vectorized_df = vector.transform(one_hot_encoded)

In [28]:
vectorized_df.toPandas().head()



|   | year | month | date_of_month | day_of_week | births | date_vec | day_vec | features |
|---|------|-------|---------------|-------------|--------|----------|---------|----------|
| 0 | 2000 | 1 | 1 | 6 | 9083 | (0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0) | (2000.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.... |
| 1 | 2000 | 1 | 2 | 7 | 8006 | (0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0) | (2000.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.... |
| 2 | 2000 | 1 | 3 | 1 | 11363 | (0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0) | (2000.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.... |
| 3 | 2000 | 1 | 4 | 2 | 13032 | (0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0) | (2000.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.... |
| 4 | 2000 | 1 | 5 | 3 | 12558 | (0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ... | (0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0) | (2000.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.... |


# Train and Predict with Random Forest

In [29]:
# Instantiate and fit the random forest (rfc holds the fitted model)
rfc = RandomForestRegressor(featuresCol='features',
                            labelCol='births',
                            predictionCol='prediction').fit(vectorized_df)

In [30]:
# Predictions
predictions = rfc.transform(vectorized_df).select('births','prediction')
predictions.head(3)

[Row(births=9083, prediction=8616.484662015031),
 Row(births=8006, prediction=10418.488388608532),
 Row(births=11363, prediction=11010.538360520879)]

# Evaluate the Model

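Let's evaluate our model! [Here](https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html) is a reference for the many metrics available in Spark. Keep in mind that `predictions` was computed on the same data the model was trained on, so the score below is an optimistic, in-sample estimate.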
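`RegressionEvaluator` selects its metric through `metricName` (`rmse` is the default; `mse`, `r2`, `mae` and `var` are also accepted). To make the metrics used in this notebook concrete, here is a plain-Python sketch of MAE, RMSE and R² using their standard definitions, with R² = 1 − SS_res / SS_tot. The helper names are hypothetical; Spark computes the same quantities in a distributed way.

```python
from math import sqrt


def mae(y_true, y_pred):
    """Mean absolute error: average of |y - y_hat|."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


def rmse(y_true, y_pred):
    """Root mean squared error: sqrt of the average squared residual."""
    return sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot


y_true = [1.0, 2.0, 3.0]
y_pred = [1.0, 2.0, 4.0]
print(r2(y_true, y_pred))  # 0.5
```

MAE and RMSE are "smaller is better" while R² is "larger is better"; `RegressionEvaluator.isLargerBetter()` reports this, which is how the cross-validation step later knows to pick the lowest MAE.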

In [35]:
# Create it
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='births')

In [34]:
# Evaluate it!
evaluator.evaluate(predictions, {evaluator.metricName:"r2"})

0.5923653168252127

# Using Pipeline and Performing a Grid Search for Optimal Parameters

In [36]:
# Instantiate the pipeline steps
one_hot_encoder = OneHotEncoder(inputCols=['date_of_month', 'day_of_week'],
                                outputCols=['date_vec', 'day_vec'],
                                dropLast=True)

features = ['year', 'month', 'date_vec', 'day_vec']

target = 'births'

vector_assembler = VectorAssembler(inputCols=features,
                                   outputCol='features')

random_forest = RandomForestRegressor(featuresCol='features',
                                      labelCol='births')

In [37]:
# Create Pipeline stages
stages = [one_hot_encoder, vector_assembler, random_forest]

In [38]:
# Instantiate the pipeline
pipeline = Pipeline(stages = stages)

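Note: the stages in a pipeline can be either *Transformers* or *Estimators*. A Transformer maps one DataFrame to another with `.transform()`; an Estimator's `.fit()` learns from a DataFrame and returns a Transformer (a model). Here `OneHotEncoder` and `RandomForestRegressor` are Estimators and `VectorAssembler` is a Transformer.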
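The Estimator/Transformer contract can be illustrated with a toy, pure-Python version of the pattern: an estimator's `fit` learns something from the data and returns a transformer, and a pipeline fits its stages in order, feeding each stage's transformed output to the next. All classes here (`AddOne`, `MeanCenterer`, `ToyPipeline`, ...) are hypothetical stand-ins working on lists of dicts, not Spark's API; Spark's real `Pipeline` is also smarter about skipping unnecessary transforms.

```python
class AddOne:
    """A Transformer: no fit step, only transform."""
    def transform(self, rows):
        return [{**r, "x_plus_one": r["x"] + 1} for r in rows]


class MeanCenterer:
    """An Estimator: fit() learns the mean of `col` and returns a Transformer."""
    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        mean = sum(r[self.col] for r in rows) / len(rows)
        return MeanCentererModel(self.col, mean)


class MeanCentererModel:
    """The Transformer produced by MeanCenterer.fit()."""
    def __init__(self, col, mean):
        self.col, self.mean = col, mean

    def transform(self, rows):
        return [{**r, self.col + "_centered": r[self.col] - self.mean} for r in rows]


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fit on the current data; Transformers are used as-is.
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)  # output feeds the next stage
            fitted.append(model)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    """A fitted pipeline: every stage is now a Transformer."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


data = [{"x": 1.0}, {"x": 2.0}, {"x": 3.0}]
model = ToyPipeline([AddOne(), MeanCenterer("x_plus_one")]).fit(data)
print(model.transform([{"x": 5.0}]))
# [{'x': 5.0, 'x_plus_one': 6.0, 'x_plus_one_centered': 3.0}]
```

The fitted pipeline is itself a Transformer, which is why the `bestModel` inspected later exposes a list of fitted `stages`.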

In [39]:
# Get possible params
random_forest.params

[Param(parent='RandomForestRegressor_4e96bb9176b8', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'),
 Param(parent='RandomForestRegressor_4e96bb9176b8', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'),
 Param(parent='RandomForestRegressor_4e96bb9176b8', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'),
 Param(parent='RandomForestRegressor_4e96bb9176b8', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto'

In [41]:
# Build the parameter grid: 3 x 3 x 2 = 18 combinations
params = ParamGridBuilder() \
    .addGrid(random_forest.maxDepth, [2, 4, 6]) \
    .addGrid(random_forest.numTrees, [20, 50, 100]) \
    .addGrid(random_forest.seed, [1, 42]) \
    .build()

In [43]:
params

[{Param(parent='RandomForestRegressor_4e96bb9176b8', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
  Param(parent='RandomForestRegressor_4e96bb9176b8', name='numTrees', doc='Number of trees to train (>= 1).'): 20,
  Param(parent='RandomForestRegressor_4e96bb9176b8', name='seed', doc='random seed.'): 1},
 {Param(parent='RandomForestRegressor_4e96bb9176b8', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
  Param(parent='RandomForestRegressor_4e96bb9176b8', name='numTrees', doc='Number of trees to train (>= 1).'): 20,
  Param(parent='RandomForestRegressor_4e96bb9176b8', name='seed', doc='random seed.'): 42},
 {Param(parent='RandomForestRegressor_4e96bb9176b8', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth

In [44]:
# Build Evaluator
reg_evaluator = RegressionEvaluator(predictionCol='prediction',labelCol='births',
                                    metricName='mae')

## Evaluate with Cross Validation to Find Optimal Model

In [47]:
# Cross-validate! (numFolds defaults to 3)
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=params,
                    evaluator=reg_evaluator,
                    parallelism=4)  # evaluate up to 4 parameter maps in parallel

In [48]:
cross_validated_model = cv.fit(df.cache())
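`CrossValidator` splits the data into `numFolds` folds; for every parameter map it fits the pipeline on all but one fold, scores it on the held-out fold with the evaluator, and averages those scores into `avgMetrics`. It then refits the winning parameter map on the full dataset to produce `bestModel`. The sketch below shows that loop in plain Python with a deliberately trivial, hypothetical "model" (predict the training mean plus a constant `shift`) so it stays self-contained. The fold assignment here is a shuffled round-robin; Spark instead assigns rows to folds at random, so its fold sizes can differ slightly.

```python
import random


def k_fold_indices(n, k, seed=0):
    """Shuffle row indices and deal them round-robin into k folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]


def mae(y_true, y_pred):
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


def fit_constant(y_train, shift):
    """Toy 'estimator': always predicts the training mean plus `shift`."""
    mean = sum(y_train) / len(y_train)
    return lambda n: [mean + shift] * n


def cross_validate(y, param_grid, k=3, seed=0):
    folds = k_fold_indices(len(y), k, seed)
    avg_metrics = []
    for params in param_grid:
        scores = []
        for i, test_idx in enumerate(folds):
            # Train on every fold except fold i, evaluate on fold i.
            train_idx = [j for f, fold in enumerate(folds) if f != i for j in fold]
            model = fit_constant([y[j] for j in train_idx], **params)
            y_test = [y[j] for j in test_idx]
            scores.append(mae(y_test, model(len(y_test))))
        avg_metrics.append(sum(scores) / k)
    # MAE is smaller-is-better, so the best parameter map has the minimum score.
    best = min(range(len(param_grid)), key=avg_metrics.__getitem__)
    # Refit the winning parameters on all of the data.
    best_model = fit_constant(y, **param_grid[best])
    return avg_metrics, best, best_model


y = [10.0, 12.0, 11.0, 13.0, 9.0, 11.0]
grid = [{"shift": 0.0}, {"shift": 10.0}]
avg, best, best_model = cross_validate(y, grid)
print("best params:", grid[best])  # best params: {'shift': 0.0}
```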

In [50]:
cross_validated_model.avgMetrics

[1576.7857779734686,
 1563.9639095398459,
 1556.3662408176629,
 1535.8115107951228,
 1516.6919071374473,
 1524.1199211996018,
 1244.4413692713872,
 1234.6798564835847,
 1248.1422879084068,
 1251.237382952957,
 1232.3114406907478,
 1249.1002299535885,
 909.1545635884136,
 985.1731077997081,
 939.0724445886767,
 965.0972863210757,
 942.1707822998733,
 942.0539475574835]
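`avgMetrics` follows the order of the parameter maps from `ParamGridBuilder.build()`. As the `params` output above shows, the grids are combined in the order they were added, with the last one (`seed`) varying fastest, which matches a Cartesian product such as `itertools.product`. The sketch below rebuilds that ordering in plain Python and picks the lowest MAE from the values printed above; the variable names are illustrative.

```python
from itertools import product

# Grid values from the ParamGridBuilder cell, in the order they were added.
max_depths = [2, 4, 6]
num_trees = [20, 50, 100]
seeds = [1, 42]
grid = list(product(max_depths, num_trees, seeds))  # last list varies fastest

# avgMetrics (mean MAE across folds) copied from the output above.
avg_metrics = [
    1576.7857779734686, 1563.9639095398459, 1556.3662408176629,
    1535.8115107951228, 1516.6919071374473, 1524.1199211996018,
    1244.4413692713872, 1234.6798564835847, 1248.1422879084068,
    1251.237382952957, 1232.3114406907478, 1249.1002299535885,
    909.1545635884136, 985.1731077997081, 939.0724445886767,
    965.0972863210757, 942.1707822998733, 942.0539475574835,
]

# MAE is smaller-is-better, so the best parameter map has the minimum metric.
best_idx = min(range(len(avg_metrics)), key=avg_metrics.__getitem__)
print(len(grid), best_idx, grid[best_idx])  # 18 12 (6, 20, 1)
```

This recovers `maxDepth=6` and `numTrees=20`, matching the best model inspected below. Neighbouring pairs in the list differ only in `seed`, so comparing them (e.g. 909.2 vs 985.2 for `maxDepth=6, numTrees=20`) gives a rough sense of how much of the spread comes from randomness alone.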

In [52]:
cross_validated_model.bestModel.stages

[OneHotEncoderModel: uid=OneHotEncoder_448a1707916c, dropLast=true, handleInvalid=error, numInputCols=2, numOutputCols=2,
 VectorAssembler_f7fb6b0b4b9c,
 RandomForestRegressionModel: uid=RandomForestRegressor_4e96bb9176b8, numTrees=20, numFeatures=40]

In [66]:
cross_validated_model.bestModel.stages[2].getMaxDepth()

6

In [78]:
cross_validated_model.bestModel.stages[2].numFeatures

40

In [79]:
cross_validated_model.bestModel.stages[2].numTrees

Param(parent='RandomForestRegressor_4e96bb9176b8', name='numTrees', doc='Number of trees to train (>= 1).')

In [None]:
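# `.numTrees` returns the Param object itself (see above); getOrDefault reads its value
cross_validated_model.bestModel.stages[2].getOrDefault('numTrees')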