## Spark cluster (standalone) - Prediction with a Pipeline notebook

> Dockerized env : [JupyterLab server => Spark (master <-> 1 worker) ]  

`docker-compose.yml` was (slightly) adapted from this [article](https://towardsdatascience.com/first-steps-in-machine-learning-with-apache-spark-672fe31799a3), whereas the original notebook was heavily modified:
- random forest regressor instead of the article's linear regression
- use of a Pipeline (pyspark.ml.pipeline) to streamline the whole prediction process
- no more SQL-type queries (personal preference)

#### Connect to Spark

Reminder (as defined in docker-compose.yml):
- This notebook: http://localhost:8888
- Spark master web UI: http://localhost:8080
- Spark worker web UI: http://localhost:8081

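```python
from pyspark.sql import SparkSession


# SparkSession connected to the standalone master (7077 is the default master port)
URL_SPARK = "spark://spark:7077"

spark = (
    SparkSession.builder
    .appName("spark-ml")
    .config("spark.executor.memory", "4g")  # the property name needs the "spark." prefix
    .master(URL_SPARK)
    .getOrCreate()
)
```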

### Run example - pyspark.sql / pyspark.ml, build an ML Pipeline

On the Avocado dataset (how original). If you cloned the git repo, it is in the data/ folder; otherwise, download it from Kaggle.

#### Load data

*Quick description / scope of the dataset:*
- No EDA: this exercise has been done a million times
- Years 2015 to 2018
- Two avocado types: organic or conventional
- Region = region of consumption
- Avocado sizes (PLU codes): 4046 (small-medium), 4225 (large), 4770 (x-large), expressed as the total number of avocados sold

```python
# Cache the dataframe with .cache() so it can be reused across steps.
# Caching is lazy: it only takes place when a Spark action (count, show, take or write) is performed on the same dataframe.
df = spark.read.csv(
    "data/avocado.csv",
    header=True,
    inferSchema=True
).cache()  # marks df for caching; nothing is stored yet

df.printSchema()
df.show(4)  # first action on df: the partitions it computes are now cached in memory
```

```
root
 |-- _c0: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- AveragePrice: double (nullable = true)
 |-- Total Volume: double (nullable = true)
 |-- 4046: double (nullable = true)
 |-- 4225: double (nullable = true)
 |-- 4770: double (nullable = true)
 |-- Total Bags: double (nullable = true)
 |-- Small Bags: double (nullable = true)
 |-- Large Bags: double (nullable = true)
 |-- XLarge Bags: double (nullable = true)
 |-- type: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- region: string (nullable = true)

+---+-------------------+------------+------------+-------+---------+-----+----------+----------+----------+-----------+------------+----+------+
|_c0|               Date|AveragePrice|Total Volume|   4046|     4225| 4770|Total Bags|Small Bags|Large Bags|XLarge Bags|        type|year|region|
+---+-------------------+------------+------------+-------+---------+-----+----------+----------+----------+-----------+------------+----+------+
|  0|
```
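Caching is lazy and happens per partition: `.cache()` only marks the DataFrame, and a partition is stored the first time an action computes it. Since `show(4)` may only need the first partition(s), a larger file could stay partially cached until a full action such as `count()` runs (a small CSV like this one likely fits in a single partition). The toy model below is a pure-Python analogy of that behavior, not Spark's implementation; the class `ToyCachedFrame` and its methods are hypothetical.

```python
class ToyCachedFrame:
    """Hypothetical pure-Python analogy of a cached DataFrame (not Spark's implementation)."""

    def __init__(self, partitions, transform):
        self._partitions = partitions  # raw records, split into partitions
        self._transform = transform    # per-record computation (e.g. parsing the CSV)
        self._cache = {}               # partition index -> computed records
        self.partitions_computed = 0   # how many times a partition was actually computed

    def _compute(self, i):
        # compute a partition only once; later actions reuse the cached result
        if i not in self._cache:
            self.partitions_computed += 1
            self._cache[i] = [self._transform(r) for r in self._partitions[i]]
        return self._cache[i]

    def take(self, n):
        # like show()/take(): scan partitions in order, stop once n records are found
        out = []
        for i in range(len(self._partitions)):
            if len(out) >= n:
                break
            out.extend(self._compute(i))
        return out[:n]

    def count(self):
        # like count(): needs every partition
        return sum(len(self._compute(i)) for i in range(len(self._partitions)))


frame = ToyCachedFrame([[1, 2, 3], [4, 5]], transform=lambda x: x * 10)
print(frame.partitions_computed)  # 0: marking for caching computes nothing
print(frame.take(2))              # [10, 20]
print(frame.partitions_computed)  # 1: only the first partition was needed
print(frame.count())              # 5
print(frame.partitions_computed)  # 2: count() computed the remaining partition only
```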

#### Steps overview

```python
import pyspark.sql.functions as F
from pyspark.sql.functions import col

# multi-column StringIndexer / OneHotEncoder (inputCols / outputCols) require Spark 3.0+
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.pipeline import Pipeline
```

- Steps differ a bit from sklearn: look up Spark ML 'transformers' and 'estimators'.
- No EDA: it has been done a million times on this dataset.
- Format data
  - Feature creation: yy (from 'year') and mm (from 'Date')
  - Drop columns: Total Bags, Total Volume (strongly correlated with their respective subcategories); could this also be done in the pipeline?
- Pipeline (encode, etc.)
  - One-hot encode the categorical 'region' and 'type' columns (after a StringIndexer)
  - Transformed columns (Date, region) can be dropped or simply left out. Note: unlike scikit-learn's column transformers, PySpark adds a new column when transforming instead of replacing the input column
- Consolidate all remaining features into a single vector using VectorAssembler
- Scale the features using StandardScaler (this would come earlier in an sklearn pipeline)
- Predict
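In Spark ML, a transformer maps a DataFrame to a new one (`transform()`), while an estimator learns from data and returns a fitted transformer (`fit()`). `Pipeline.fit()` walks through the stages in order: each estimator is fitted on the output of the previous stages, and the resulting `PipelineModel` just chains `transform()` calls. The pure-Python sketch below mimics that flow on lists of dicts; every class in it is hypothetical, not a PySpark class.

```python
class ToyTransformer:
    """Hypothetical transformer: maps rows to new rows."""
    def transform(self, rows):
        raise NotImplementedError


class ToyEstimator:
    """Hypothetical estimator: learns from rows, returns a fitted ToyTransformer."""
    def fit(self, rows):
        raise NotImplementedError


class MeanCenterer(ToyEstimator):
    """Learns the mean of `col` on the rows it is fitted on."""
    def __init__(self, col, out_col):
        self.col, self.out_col = col, out_col

    def fit(self, rows):
        mean = sum(r[self.col] for r in rows) / len(rows)
        return MeanCentererModel(self.col, self.out_col, mean)


class MeanCentererModel(ToyTransformer):
    def __init__(self, col, out_col, mean):
        self.col, self.out_col, self.mean = col, out_col, mean

    def transform(self, rows):
        # add a new column and keep the input one, as Spark transformers do
        return [{**r, self.out_col: r[self.col] - self.mean} for r in rows]


class Squarer(ToyTransformer):
    """Stateless transformer: nothing to learn."""
    def __init__(self, col, out_col):
        self.col, self.out_col = col, out_col

    def transform(self, rows):
        return [{**r, self.out_col: r[self.col] ** 2} for r in rows]


class ToyPipelineModel(ToyTransformer):
    def __init__(self, stages):
        self.stages = stages  # fitted transformers only

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline(ToyEstimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for i, stage in enumerate(self.stages):
            if isinstance(stage, ToyEstimator):
                stage = stage.fit(rows)  # estimator -> fitted transformer
            fitted.append(stage)
            if i < len(self.stages) - 1:
                rows = stage.transform(rows)  # feed the next stage
        return ToyPipelineModel(fitted)


toy_train = [{"x": 1.0}, {"x": 3.0}]
toy_test = [{"x": 5.0}]
toy_model = ToyPipeline([MeanCenterer("x", "x_c"), Squarer("x_c", "x_c2")]).fit(toy_train)
print(toy_model.transform(toy_test))  # [{'x': 5.0, 'x_c': 3.0, 'x_c2': 9.0}]
```

The mean (2.0) is learned on the training rows only and reused on the test rows, which mirrors fitting the pipeline on `train` and then transforming `test`.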

#### Format Data

```python
# convert 'year' yyyy to yy (yyyy - 2000, since values range from 2015 to 2018)
df = df.withColumn('Year Index', col('year') - 2000)

# extract the month from the 'Date' timestamp column
df = df.withColumn('Month', F.month('Date'))

# drop "useless" columns; following several notebooks on this dataset: "Total Bags", "Total Volume" and the index (_c0)
# /!\ Optional: not really needed, since only the features listed in the VectorAssembler (later in the pipeline) are used
drop_cols = ("Total Bags", "Total Volume", "_c0", "Date")  # we keep "year" just to show that we do not need to drop it
df = df.drop(*drop_cols)

# rename the avocado size columns (PLU codes) for clarity
df = (
    df.withColumnRenamed("4046", "Medium Size")
      .withColumnRenamed("4225", "Large Size")
      .withColumnRenamed("4770", "XLarge Size")
)
df.show(4)
```

```
+------------+-----------+----------+-----------+----------+----------+-----------+------------+----+------+----------+-----+
|AveragePrice|Medium Size|Large Size|XLarge Size|Small Bags|Large Bags|XLarge Bags|        type|year|region|Year Index|Month|
+------------+-----------+----------+-----------+----------+----------+-----------+------------+----+------+----------+-----+
|        1.33|    1036.74|  54454.85|      48.16|   8603.62|     93.25|        0.0|conventional|2015|Albany|        15|   12|
|        1.35|     674.28|  44638.81|      58.33|   9408.07|     97.49|        0.0|conventional|2015|Albany|        15|   12|
|        0.93|      794.7| 109149.67|      130.5|   8042.21|    103.14|        0.0|conventional|2015|Albany|        15|   12|
|        1.08|     1132.0|  71976.41|      72.58|    5677.4|    133.76|        0.0|conventional|2015|Albany|        15|   12|
+------------+-----------+----------+-----------+----------+----------+-----------+------------+----+------+----------+-----+
```

#### Build Pipeline (encode, vectorize, scaling)

```python
# 1. StringIndexer must come before one-hot encoding: OneHotEncoder expects category indices as input
str_indexer = StringIndexer(inputCols=['region', 'type'], outputCols=['region_str', 'type_str'])

# 2. Encode categorical features.
# Unlike sklearn, transformations add new columns and keep the inputs, so we track each step through the output column names.
# By default (dropLast=True), Spark's OneHotEncoder drops the last category,
# whereas sklearn's OneHotEncoder keeps all categories by default. The output vectors are sparse.
ohe = OneHotEncoder(
    inputCols=['region_str', 'type_str'],
    outputCols=['region_str_ohe', 'type_str_ohe']
)

# 3. Assemble the features we use into a single vector.
# Note: 'Year Index' and 'Month' (created above) are not listed here, so the model does not use them;
# add them to inputCols to include them.
assembler = VectorAssembler(
    inputCols=[
        'Medium Size',
        'Large Size',
        'XLarge Size',
        'Small Bags',
        'Large Bags',
        'XLarge Bags',
        'region_str_ohe',
        'type_str_ohe'
    ],
    outputCol='features'
)

# 4. Standardize the assembled feature vector (the one-hot columns are scaled too).
# Defaults: withStd=True, withMean=False, i.e. features are divided by their std but not centered.
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')

# 5. Define the regressor
rf_regressor = RandomForestRegressor(featuresCol='scaled_features', labelCol='AveragePrice', numTrees=50, maxDepth=15)

# 6. Build the Pipeline
pipeline = Pipeline(stages=[str_indexer, ohe, assembler, scaler, rf_regressor])
```
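To make steps 1, 2 and 4 concrete, here is a pure-Python sketch of what each stage computes, under these assumptions: StringIndexer with its default `stringOrderType="frequencyDesc"` (most frequent label gets index 0.0, ties sorted alphabetically as documented for Spark 3.x), OneHotEncoder with its default `dropLast=True`, and StandardScaler with its defaults `withMean=False, withStd=True` (division by the corrected sample standard deviation; constant features are assumed to map to 0.0). Dense Python lists stand in for Spark's sparse vectors, and the function names are hypothetical.

```python
import statistics
from collections import Counter


def fit_string_indexer(labels):
    """frequencyDesc ordering: most frequent label -> 0.0, ties broken alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """One-hot vector for a category index; with drop_last, the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[int(index)] = 1.0
    return vector


def fit_std_scaler(vectors):
    """Per-feature corrected sample standard deviation (withMean=False: no centering)."""
    return [statistics.stdev(column) for column in zip(*vectors)]


def scale(vector, stds):
    # divide each feature by its std; a constant feature (std 0) is mapped to 0.0
    return [x / s if s != 0.0 else 0.0 for x, s in zip(vector, stds)]


type_index = fit_string_indexer(["conventional", "organic", "conventional"])
print(type_index)  # {'conventional': 0.0, 'organic': 1.0}

# 'type' has 2 categories, so with dropLast it becomes a 1-element vector
print(one_hot(type_index["conventional"], 2))  # [1.0]
print(one_hot(type_index["organic"], 2))       # [0.0]

stds = fit_std_scaler([[1.0, 5.0], [3.0, 5.0]])
print(scale([3.0, 5.0], stds))  # approximately [2.1213, 0.0]
```

In the same way, 'region' with k distinct values ends up as k - 1 slots in the assembled vector.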


#### Simple random forest modeling: train-test split, fit the Pipeline on train, apply it to test, evaluate

Crude attempt: no CV, mostly default RF parameters (only numTrees=50 and maxDepth=15 are set).  
For parameter tuning, look up pyspark.ml.tuning (CrossValidator, ParamGridBuilder); not used here.
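`ParamGridBuilder` builds the Cartesian product of the candidate values, and `CrossValidator` fits one model per parameter combination per fold, then (per the Spark docs) refits the best combination on the whole dataset. The pure-Python sketch below only counts the resulting fits; the helper names are hypothetical, and the values (`numTrees` in {20, 50}, `maxDepth` in {5, 10, 15}, 3 folds) are arbitrary examples, not tuned choices. In PySpark, the grid would be built with `ParamGridBuilder().addGrid(rf_regressor.numTrees, [20, 50]).addGrid(rf_regressor.maxDepth, [5, 10, 15]).build()` and passed to `CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=RegressionEvaluator(labelCol='AveragePrice', metricName='rmse'), numFolds=3)`.

```python
from itertools import product


def param_grid(**candidates):
    """Cartesian product of candidate values, one dict per combination (like ParamGridBuilder.build())."""
    names = list(candidates)
    return [dict(zip(names, values)) for values in product(*candidates.values())]


def cross_validation_fits(grid, num_folds):
    """One fit per (param map, fold), plus a final refit of the best param map on all the data."""
    return len(grid) * num_folds + 1


grid = param_grid(numTrees=[20, 50], maxDepth=[5, 10, 15])
print(len(grid))                       # 6
print(grid[0])                         # {'numTrees': 20, 'maxDepth': 5}
print(cross_validation_fits(grid, 3))  # 19
```

That is 19 full pipeline fits instead of the single fit below, which is why this notebook skips tuning.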

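```python
from pyspark.ml.evaluation import RegressionEvaluator

# 80/20 train-test split (no seed: the split, and therefore the RMSE, changes from run to run)
train, test = df.randomSplit([.8, .2])

# fit the whole pipeline on the training set only
model = pipeline.fit(train)

# apply the fitted pipeline to the test set
prediction = model.transform(test)

# named 'evaluator' to avoid shadowing Python's built-in eval()
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='AveragePrice', metricName='rmse')

evaluator.evaluate(prediction)
```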

```
0.1975694758480664
```

For reference, the original article, using linear regression + CV, reports an RMSE of 0.28.
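For context on what `RegressionEvaluator(metricName='rmse')` reports: RMSE is the square root of the mean squared difference between label and prediction, so it is expressed in the same units as `AveragePrice`. A minimal pure-Python version (hypothetical helper, for illustration only):

```python
import math


def rmse(labels, predictions):
    """Root mean squared error: sqrt(mean((label - prediction) ** 2))."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and of equal length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


print(rmse([1.0, 2.0], [1.0, 4.0]))  # sqrt((0 + 4) / 2), about 1.414
```

Because errors are squared before averaging, a few large misses weigh more than many small ones.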