# Training a Machine Learning Model in Spark

## Connect to Spark

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

import seaborn as sns
import matplotlib.pyplot as plt

In [2]:
# SparkSession
URL_SPARK = "spark://spark:7077"

spark = (
    SparkSession.builder
    .appName("spark-ml")
    .config("executor.memory", "4g")
    .master(URL_SPARK)
    .getOrCreate()
)

## Load Data

In [3]:
df_avocado = spark.read.csv("/data/avocado.csv", header=True, inferSchema=True)
# cache data
df_avocado.cache()
df_avocado.show(4)

+---+-------------------+------------+------------+-------+---------+-----+----------+----------+----------+-----------+------------+----+------+
|_c0|               Date|AveragePrice|Total Volume|   4046|     4225| 4770|Total Bags|Small Bags|Large Bags|XLarge Bags|        type|year|region|
+---+-------------------+------------+------------+-------+---------+-----+----------+----------+----------+-----------+------------+----+------+
|  0|2015-12-27 00:00:00|        1.33|    64236.62|1036.74| 54454.85|48.16|   8696.87|   8603.62|     93.25|        0.0|conventional|2015|Albany|
|  1|2015-12-20 00:00:00|        1.35|    54876.98| 674.28| 44638.81|58.33|   9505.56|   9408.07|     97.49|        0.0|conventional|2015|Albany|
|  2|2015-12-13 00:00:00|        0.93|   118220.22|  794.7|109149.67|130.5|   8145.35|   8042.21|    103.14|        0.0|conventional|2015|Albany|
|  3|2015-12-06 00:00:00|        1.08|    78992.15| 1132.0| 71976.41|72.58|   5811.16|    5677.4|    133.76|        0.0|conv

## Preprocess data

In [88]:
# Import machine learning libraries
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Machine learning pipeline
from pyspark.ml import Pipeline

# Import SQL transformer
from pyspark.ml.feature import SQLTransformer, StandardScaler, StringIndexer

In [89]:
df_avocado_train, df_avocado_test = df_avocado.randomSplit([0.75, 0.25], seed=214)

**Selecting columns with SQLTransformer**

This is a quite powerful transformer, which allows you to select and transform columns using SQL syntax.

In [90]:
COLUMNS = ['AveragePrice', 'type']
COLUMNS = [f"`{col}`" for col in COLUMNS]

LOG_COLUMNS =  ['4225', '4770', 'Small Bags', 'Large Bags', 'XLarge Bags']
LOG_COLUMNS = [f"LOG(`{col}`+1) AS `LOG {col}`" for col in LOG_COLUMNS]

sql_trans = SQLTransformer(
    statement=f"""
    
    SELECT
    {', '.join(COLUMNS)}
    , {', '.join(LOG_COLUMNS)}
    ,YEAR(__THIS__.Date)-2000 AS year
    ,MONTH(__THIS__.Date) AS month

    FROM __THIS__

    """
)

# Visualize the data
sql_trans.transform(df_avocado_train).show(4)

+------------+------------+------------------+------------------+------------------+------------------+-----------------+----+-----+
|AveragePrice|        type|          LOG 4225|          LOG 4770|    LOG Small Bags|    LOG Large Bags|  LOG XLarge Bags|year|month|
+------------+------------+------------------+------------------+------------------+------------------+-----------------+----+-----+
|        0.49|conventional|12.566747374652527| 9.362499927974252|11.166908098190957|10.313733879047971|              0.0|  15|   12|
|        0.71|conventional|11.860764002611406| 9.647818872531012| 11.72123326879331| 10.40627082310141|9.322865162818028|  15|   12|
|         0.8|conventional| 12.53017497505446|11.349393905288467|11.824526973139381| 9.415621332905047|9.658771095406955|  15|   12|
|         0.8|conventional|13.028501871764691|11.364461534887267|13.490872079413348| 11.21667384527801|9.342104328605496|  15|   12|
+------------+------------+------------------+------------------+----

**MinMaxScaler**

The MinMaxScaler is a trasformer of type estimator. This kind of object needs to be fitted to the data before being used to transform it. So, the conlumns need to be converted to a vector first.

The MinMaxScaler is a transformer that scales each feature individually to a given range (often [0, 1]).

In [91]:
from pyspark.ml.feature import MinMaxScaler

month_vec_ass = VectorAssembler(inputCols=['month'], outputCol='month_vec')
month_vec_ass.transform(sql_trans.transform(df_avocado_train))

df_avocado_month_ass = month_vec_ass.transform(sql_trans.transform(df_avocado_train))

month_scaler = MinMaxScaler(inputCol='month_vec', outputCol='month_scaled')
month_scaler = month_scaler.fit(df_avocado_month_ass)

month_scaler.transform(df_avocado_month_ass).select( ['month', 'month_vec', 'month_scaled'] ).show(10)

+-----+---------+------------+
|month|month_vec|month_scaled|
+-----+---------+------------+
|   12|   [12.0]|       [1.0]|
|   12|   [12.0]|       [1.0]|
|   12|   [12.0]|       [1.0]|
|   12|   [12.0]|       [1.0]|
|   12|   [12.0]|       [1.0]|
|   12|   [12.0]|       [1.0]|
|   12|   [12.0]|       [1.0]|
|   12|   [12.0]|       [1.0]|
|   12|   [12.0]|       [1.0]|
|   12|   [12.0]|       [1.0]|
+-----+---------+------------+
only showing top 10 rows



**StringIndexer**

Assigns a numerical value to each category in a column. As the column "type" only has two categories, it will be transformed into a column with only two values: 0 and 1, which is equivalent to applying a OneHotEncoder.

In [92]:
str_indexer = StringIndexer(inputCol="type", outputCol="type_index")

str_indexer = str_indexer.fit(df_avocado_train)

str_indexer.transform(df_avocado_train).select( ["type", "type_index"] ).show(4)

+------------+----------+
|        type|type_index|
+------------+----------+
|conventional|       0.0|
|conventional|       0.0|
|conventional|       0.0|
|conventional|       0.0|
+------------+----------+
only showing top 4 rows



**VectorAssembler**

This transformer combines a given list of columns into a single vector column. The vector column is named "features" by default, and it will be used later to train the model.


In [93]:
# Apply transformations
## SQL transformer
df_avocado_train_transformed = sql_trans.transform(df_avocado_train)

## String indexer
df_avocado_train_transformed = str_indexer.transform(df_avocado_train_transformed)

## Month scaler (vector assembler + minmax scaler)
df_avocado_train_transformed = month_vec_ass.transform(df_avocado_train_transformed)
df_avocado_train_transformed = month_scaler.transform(df_avocado_train_transformed)

# Join all features into a single vector
numerical_vec_ass = VectorAssembler(
    inputCols=['year', 'month_scaled', 'LOG 4225', 'LOG 4770', 'LOG Small Bags', 'LOG Large Bags', 'LOG XLarge Bags'],
    outputCol='features_num'
)
df_avocado_train_transformed = numerical_vec_ass.transform(df_avocado_train_transformed)

# Join all categorical features into a single vector
categorical_vec_ass = VectorAssembler(
    inputCols=['type_index'],
    outputCol='features_cat'
)
df_avocado_train_transformed = categorical_vec_ass.transform(df_avocado_train_transformed)


# See the result
df_avocado_train_transformed.select(['features_cat', 'features_num', 'AveragePrice']).show(4, False)

+------------+-------------------------------------------------------------------------------------------------------+------------+
|features_cat|features_num                                                                                           |AveragePrice|
+------------+-------------------------------------------------------------------------------------------------------+------------+
|[0.0]       |[15.0,1.0,12.566747374652527,9.362499927974252,11.166908098190957,10.313733879047971,0.0]              |0.49        |
|[0.0]       |[15.0,1.0,11.860764002611406,9.647818872531012,11.72123326879331,10.40627082310141,9.322865162818028]  |0.71        |
|[0.0]       |[15.0,1.0,12.53017497505446,11.349393905288467,11.824526973139381,9.415621332905047,9.658771095406955] |0.8         |
|[0.0]       |[15.0,1.0,13.028501871764691,11.364461534887267,13.490872079413348,11.21667384527801,9.342104328605496]|0.8         |
+------------+--------------------------------------------------------------

**StandardScaler**

It is a transformer that standardizes features by removing the mean and scaling to unit variance. It is very similar to the MinMaxScaler, but it does not bound the values to a specific range. It is also a type of estimator, so it needs to be fitted to the data before being used to transform it.

In [94]:
std_scaler = StandardScaler(
    inputCol="features_num",
    outputCol="features_scaled",
    withStd=True,
    withMean=True
)

std_scaler = std_scaler.fit(df_avocado_train_transformed)
std_scaler.transform(df_avocado_train_transformed).select(['features_scaled']).show(5, False)

+----------------------------------------------------------------------------------------------------------------------------------------+
|features_scaled                                                                                                                         |
+----------------------------------------------------------------------------------------------------------------------------------------+
|[-1.2177154955881637,1.6482225355667333,0.9527463109714546,1.0269649008115518,0.5657377199959452,0.8334134211814762,-0.6436162273445295]|
|[-1.2177154955881637,1.6482225355667333,0.7058305701685025,1.0954357394643428,0.7803295242390127,0.8574417380503548,2.012648481596976]  |
|[-1.2177154955881637,1.6482225355667333,0.9399552148956506,1.5037797059140563,0.8203168521795554,0.6002078289352569,2.1083545825302594] |
|[-1.2177154955881637,1.6482225355667333,1.1142436751287843,1.5073956355774096,1.4653967110976907,1.0678725104034048,2.0181300922626053] |
|[-1.2177154955881637,1.648

**Join everything together**

In [95]:
# Create a pipeline
prepro_pipe = Pipeline(stages=[
    sql_trans,
    str_indexer,
    month_vec_ass,
    month_scaler,
    numerical_vec_ass,
    categorical_vec_ass,
    std_scaler,

    # Join all features into a single vector
    VectorAssembler(
        inputCols=['features_scaled', 'features_cat'],
        outputCol='features'
    ),
])


# Fit the pipeline
pipeline_model = prepro_pipe.fit(df_avocado_train)

# Transform the data
df_avocado_train_transformed = pipeline_model.transform(df_avocado_train)

# See the result
df_avocado_train_transformed.select(['features', 'AveragePrice']).show(4, False)

+--------------------------------------------------------------------------------------------------------------------------------------------+------------+
|features                                                                                                                                    |AveragePrice|
+--------------------------------------------------------------------------------------------------------------------------------------------+------------+
|[-1.2177154955881637,1.6482225355667333,0.9527463109714546,1.0269649008115518,0.5657377199959452,0.8334134211814762,-0.6436162273445295,0.0]|0.49        |
|[-1.2177154955881637,1.6482225355667333,0.7058305701685025,1.0954357394643428,0.7803295242390127,0.8574417380503548,2.012648481596976,0.0]  |0.71        |
|[-1.2177154955881637,1.6482225355667333,0.9399552148956506,1.5037797059140563,0.8203168521795554,0.6002078289352569,2.1083545825302594,0.0] |0.8         |
|[-1.2177154955881637,1.6482225355667333,1.1142436751287843,1.50

## Define the model

In [99]:
from pyspark.ml.regression import LinearRegression, RandomForestRegressor

from pyspark.ml.evaluation import RegressionEvaluator

### Training a Linear Regression Model

**Training a linear regression model**

In [97]:
# Create a linear regression model
lin_reg = LinearRegression(
    featuresCol='features',
    labelCol='AveragePrice',
    predictionCol='prediction',
    maxIter=1000,
    regParam=0.3,       # Regularization
    elasticNetParam=0.8 # Regularization mixing parameter. 1 for L1, 0 for L2.
)

# Explain parameter 
print(lin_reg.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0, current: 0.8)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber (default: 1.35)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: AveragePrice)
loss: The loss function to be optimized. Supported options: squaredError, huber. (default: squaredError)
maxBlockSizeInMB: maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition then it is adjusted to the data size. Default 0.0 represents choosing optimal value, depends on specific algorithm. Must be >= 0. (default: 0.0)
maxIter

In [98]:
# Fit the model
lin_reg_model = lin_reg.fit(df_avocado_train_transformed)

# See the output
df_avocado_train_pred = lin_reg_model.transform(df_avocado_train_transformed)
df_avocado_train_pred.select(['features', 'AveragePrice', 'prediction']).show(4, False)

+--------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------+
|features                                                                                                                                    |AveragePrice|prediction        |
+--------------------------------------------------------------------------------------------------------------------------------------------+------------+------------------+
|[-1.2177154955881637,1.6482225355667333,0.9527463109714546,1.0269649008115518,0.5657377199959452,0.8334134211814762,-0.6436162273445295,0.0]|0.49        |1.4003505112793717|
|[-1.2177154955881637,1.6482225355667333,0.7058305701685025,1.0954357394643428,0.7803295242390127,0.8574417380503548,2.012648481596976,0.0]  |0.71        |1.4003505112793717|
|[-1.2177154955881637,1.6482225355667333,0.9399552148956506,1.5037797059140563,0.8203168521795554,0.6002078289352569,2.108354

**Evaluating the model**

The evaluator is a transformer that takes a dataset and returns a single value. In this case, it will return the RMSE value.

In [105]:
reg_eval = RegressionEvaluator(
    labelCol='AveragePrice',
    predictionCol='prediction',
    metricName='rmse' # Root mean squared error
)

# Evaluate the model
reg_eval.evaluate(df_avocado_train_pred)

0.3978489578943717

### Creating the Full Pipeline - Hyperparameter Tuning with Cross Validation

In [106]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.ml.evaluation import RegressionEvaluator

Define the pipeline

In [109]:
ml_pipeline = Pipeline(stages=[
    prepro_pipe, # Preprocessing pipeline
    lin_reg     # Linear regression model
])

Define the parameter grid. To build a parameter grid, we use the ParamGridBuilder class. We use the original regression object to reference the parameters we want to tune.

In [110]:
param_grid = ParamGridBuilder() \
    .addGrid(lin_reg.regParam, [0.0, 0.1, 0.3, 0.5]) \
    .addGrid(lin_reg.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

Define the optimize metric.

In [111]:
reg_eval = RegressionEvaluator(
    labelCol='AveragePrice',
    predictionCol='prediction',
    metricName='rmse' # Root mean squared error
)

Join everything together using a CrossValidator object.

In [112]:
crossval_ml = CrossValidator(
    estimator=ml_pipeline, 
    estimatorParamMaps=param_grid, 
    evaluator=reg_eval, 
    numFolds=4
)

Train the model

In [113]:
crossval_ml_model = crossval_ml.fit(df_avocado_train)

In [125]:
best_model = crossval_ml_model.bestModel
best_score = crossval_ml_model.avgMetrics[0]

print("Best model: ", best_model)
print("Best score: ", best_score)

Best model:  PipelineModel_dc90de555ac1
Best score:  0.2833541578138277


In [146]:
best_lin_reg_params = best_model.stages[-1].extractParamMap()

print("Best score (RMSE):", best_score, end="\n\n")
for parameter, value in best_lin_reg_params.items():
    print(f"{str(parameter):50s}, {value}")

Best score (RMSE): 0.2833541578138277

LinearRegression_eeaa1d8bf6ea__aggregationDepth   , 2
LinearRegression_eeaa1d8bf6ea__elasticNetParam    , 0.0
LinearRegression_eeaa1d8bf6ea__epsilon            , 1.35
LinearRegression_eeaa1d8bf6ea__featuresCol        , features
LinearRegression_eeaa1d8bf6ea__fitIntercept       , True
LinearRegression_eeaa1d8bf6ea__labelCol           , AveragePrice
LinearRegression_eeaa1d8bf6ea__loss               , squaredError
LinearRegression_eeaa1d8bf6ea__maxBlockSizeInMB   , 0.0
LinearRegression_eeaa1d8bf6ea__maxIter            , 1000
LinearRegression_eeaa1d8bf6ea__predictionCol      , prediction
LinearRegression_eeaa1d8bf6ea__regParam           , 0.0
LinearRegression_eeaa1d8bf6ea__solver             , auto
LinearRegression_eeaa1d8bf6ea__standardization    , True
LinearRegression_eeaa1d8bf6ea__tol                , 1e-06


**Defining a baseline model**

In [149]:
# Creating a dummy regression model

# Mean price
mean_price = df_avocado_train.agg(F.mean('AveragePrice')).collect()[0][0]
median_price = df_avocado_train.approxQuantile('AveragePrice', [0.5], 0.25)[0]

mean_dummy_df = df_avocado_train.select('AveragePrice').withColumn('prediction', F.lit(mean_price))
median_dummy_df = df_avocado_train.select('AveragePrice').withColumn('prediction', F.lit(median_price))

# Evaluate the dummy models
print(reg_eval.evaluate(mean_dummy_df))
print(reg_eval.evaluate(median_dummy_df))

0.40128919777533023
0.5046357636762134


### Evaluate the best model on the test set

In [154]:
df_avocado_test_pred = best_model.transform(df_avocado_test)

# show scores
print(reg_eval.evaluate(df_avocado_test_pred))

0.28368085199676235
