# Spark ML: Predicting Avocado Prices

This notebook shows how to train a machine learning model with Spark ML. It is based on the Towards Data Science article [First Steps in Machine Learning with Apache Spark](https://towardsdatascience.com/first-steps-in-machine-learning-with-apache-spark-672fe31799a3) and uses the [Avocado Prices dataset](https://www.kaggle.com/datasets/neuromusic/avocado-prices) from Kaggle.

The objective of this model is to predict the average price of avocados given the date, supply amounts, and avocado type, which are the features used in the pipeline below.

## Spark Cluster Preparation

In [None]:
try:
  import google.colab
  IN_COLAB = True
except ImportError:
  IN_COLAB = False

In [None]:
if IN_COLAB:
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
    !tar xf spark-3.3.2-bin-hadoop3.tgz
    !mv spark-3.3.2-bin-hadoop3 spark
    !pip install -q findspark
    import os
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["SPARK_HOME"] = "/content/spark"

In [None]:
import findspark
findspark.init()

In [None]:
spark_url = 'local'

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [None]:
spark = SparkSession.builder\
        .master(spark_url)\
        .appName('Spark SQL')\
        .getOrCreate()

## Data Preparation

First, we read the CSV file. The reader accepts options such as the delimiter and whether the file has a header row. We then rename the columns, replacing spaces (' ') in the names with underscores ('_').

In [None]:
path = 'avocado.csv'

In [None]:
df_avocado = spark.read.csv(path, header=True, inferSchema=True)

In [None]:
cols = [c.replace(' ', '_') for c in df_avocado.columns]
df_avocado = df_avocado.toDF(*cols)

In [None]:
df_avocado.printSchema()

We then split data into training and testing datasets.

In [None]:
(df_avocado_train, df_avocado_test) = df_avocado.randomSplit([0.75, 0.25], seed=214)

## Create ML Pipeline
For this pipeline, we will create several transformers using built-in estimators/transformers.  These include:


| SparkML Feature | Feature Type | Data Type |
|:-----------------|:--------------:|:--------------:|
| SQLTransformer  | Transformer  | Numerical |
| MinMaxScaler    | Estimator    | Numerical |
| StandardScaler  | Estimator    | Numerical |
| StringIndexer   | Estimator    | Categorical |
| VectorAssembler | Transformer  | Both |
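
The difference between the two feature types in the table can be illustrated with a toy example in plain Python. The classes below are hypothetical and are not part of pyspark; they only mimic the pattern in which an estimator's `fit` learns statistics from the data and returns a model that applies them in `transform`.

```python
# Toy illustration only: these classes are hypothetical, not pyspark classes.
class ToyCenterModel:
    """Plays the role of a fitted transformer: applies a stored statistic."""

    def __init__(self, mean):
        self.mean = mean

    def transform(self, values):
        # No learning happens here; the stored mean is simply reused.
        return [v - self.mean for v in values]


class ToyCenterEstimator:
    """Plays the role of an estimator: fit() learns from data, returns a model."""

    def fit(self, values):
        return ToyCenterModel(sum(values) / len(values))


train_values = [1.0, 2.0, 3.0]
model = ToyCenterEstimator().fit(train_values)  # learn the mean from training data
centered_new = model.transform([4.0])           # reuse it on unseen data
```

A stateless step such as column selection needs no `fit` at all, which is why SQLTransformer and VectorAssembler are listed as transformers.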

Using these components, we create the following pipeline:

| Pipeline Stage | SparkML Feature |
|:----------|:----------|
| sql_transformer | SQLTransformer |
| month_vec_asm_transformer | VectorAssembler |
| month_scaler_transformer | MinMaxScaler |
| numerical_vec_asm_transformer | VectorAssembler |
| std_scaler_transformer | StandardScaler |
| type_indexer_transformer | StringIndexer |
| categorical_vec_asm_transformer | VectorAssembler |
| all_vec_asm_transformer | VectorAssembler |

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer, MinMaxScaler, StandardScaler
from pyspark.ml.feature import VectorAssembler, StringIndexer

### Numerical Feature Transformers

#### sql_transformer: numeric column selection and log-transform
Create a transformer that selects columns and log-transforms some numerical columns.

In [None]:
cols = ['AveragePrice', 'type']
cols = [f"`{c}`" for c in cols]  # wrap each name in backticks for Spark SQL
cols

In [None]:
log_cols = ['4225', '4770', 'Small_Bags', 'Large_Bags', 'XLarge_Bags']
log_cols = [f"LOG(`{c}`+1) AS `LOG_{c}`" for c in log_cols]
log_cols

In [None]:
statement = f"""SELECT {', '.join(cols)}, {', '.join(log_cols)},
    YEAR(__THIS__.Date)-2000 AS year, MONTH(__THIS__.Date) AS month
    FROM __THIS__
    """
statement

In [None]:
sql_transformer = SQLTransformer(statement=statement)

In [None]:
df_avocado_train.show(4)

In [None]:
sql_transformer.transform(df_avocado_train).show(4)
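
The effect of the `LOG(x+1)` expression in the SQL statement can be checked in plain Python. This is a minimal sketch with made-up volumes (they are not taken from the dataset); it relies on Spark SQL's single-argument `LOG` being the natural logarithm, which `math.log1p` reproduces as ln(1 + x).

```python
import math

# Hypothetical sales volumes, chosen only to show the effect of the transform.
volumes = [0.0, 9.0, 99.0, 99999.0]

# math.log1p(x) computes ln(1 + x), the same quantity as LOG(x + 1) in the statement.
log_volumes = [math.log1p(v) for v in volumes]

for raw, logged in zip(volumes, log_volumes):
    print(f"{raw:>10.1f} -> {logged:.4f}")
```

Adding 1 before taking the logarithm keeps zero volumes defined (they map to 0), and the transform compresses the long right tail of the volume columns.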

#### month_vec_asm_transformer / month_scaler_transformer: create month vectors and normalize their values

After applying the SQLTransformer, we assemble the *'month'* column into a vector and then normalize its values.

In [None]:
month_vec_asm_transformer = VectorAssembler(inputCols=['month'], outputCol='month_vec')

df_avocado_month_ass = month_vec_asm_transformer.transform(sql_transformer.transform(df_avocado_train))
df_avocado_month_ass.show(4)

Create a transformer that normalizes the month vector by fitting the *"MinMaxScaler"* estimator.

In [None]:
month_scaler_estimator = MinMaxScaler(inputCol='month_vec', outputCol='month_scaled')
month_scaler_transformer = month_scaler_estimator.fit(df_avocado_month_ass)

month_scaler_transformer.transform(df_avocado_month_ass)\
    .select( ['month', 'month_vec', 'month_scaled'] )\
    .show(10)
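
The rescaling that MinMaxScaler applies to each feature can be sketched in plain Python. The helper name `min_max_scale` is hypothetical and this is an illustration of the formula, not Spark's implementation; the default output range of [0, 1] is assumed.

```python
def min_max_scale(values, lo=0.0, hi=1.0):
    """Rescale values linearly so the smallest maps to lo and the largest to hi."""
    v_min, v_max = min(values), max(values)
    if v_max == v_min:
        # A constant feature has no spread; use the midpoint of the target range.
        return [0.5 * (lo + hi) for _ in values]
    return [(v - v_min) / (v_max - v_min) * (hi - lo) + lo for v in values]


months = [1, 4, 12]
scaled = min_max_scale(months)  # month 1 maps to 0.0 and month 12 maps to 1.0
```

Because the minimum and maximum are learned during `fit`, the same values are reused later when the fitted model transforms other data.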

#### numerical_vec_asm_transformer / std_scaler_transformer: assemble the numerical feature vector and scale all numerical features

In [None]:
numerical_vec_asm_transformer = VectorAssembler(
    inputCols=[
      'year', 'month_scaled', 'LOG_4225', 
      'LOG_4770', 'LOG_Small_Bags', 
      'LOG_Large_Bags', 'LOG_XLarge_Bags'
    ],
    outputCol='features_num'
)
df_avocado_numerical = numerical_vec_asm_transformer.transform(month_scaler_transformer.transform(df_avocado_month_ass))
df_avocado_numerical.select('year', 'month_scaled', 'LOG_4225','features_num').show(4)

In [None]:
# Scaling the numerical features using a StandardScaler
std_scaler_estimator = StandardScaler(
    inputCol="features_num",
    outputCol="features_scaled",
    withStd=True,
    withMean=True
)

std_scaler_transformer = std_scaler_estimator.fit(df_avocado_numerical)
std_scaler_transformer.transform(df_avocado_numerical).select(['features_scaled']).show(5, False)
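
With `withMean=True` and `withStd=True`, each feature is centered and then divided by its standard deviation. The plain-Python sketch below illustrates that calculation for a single feature; the helper name `standardize` is hypothetical, and it assumes the sample standard deviation (divisor n - 1), which is what Spark's StandardScaler documentation describes.

```python
import statistics


def standardize(values):
    """Center values on their mean and scale them to unit sample standard deviation."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample standard deviation, divisor n - 1
    if std == 0:
        # A constant feature carries no information; map it to zeros.
        return [0.0 for _ in values]
    return [(v - mean) / std for v in values]


z_scores = standardize([2.0, 4.0, 6.0])
```

After this step the features share a comparable scale, which matters for the regularized linear regression trained later.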

### Categorical Feature Transformers
Transforming categorical features usually involves encoding text values as numbers, e.g. with index encoding or one-hot encoding.

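#### type_indexer_transformer: encoding categorical data
We create a transformer using "StringIndexer", an estimator that produces a StringIndexerModel. It maps each distinct string to a numeric index, with the most frequent value receiving index 0 by default. Unlike one-hot encoding, the result is a single numeric column rather than one binary column per category.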

In [None]:
type_indexer_estimator = StringIndexer(inputCol="type", outputCol="type_index")
type_indexer_transformer = type_indexer_estimator.fit(df_avocado_train)

type_indexer_transformer.transform(df_avocado_train)\
  .select( ["type", "type_index"] ).show(4)
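
The index assignment performed by StringIndexer can be sketched in plain Python. The helper `fit_string_index` and the sample labels are hypothetical; the sketch assumes the default ordering by descending frequency, with ties broken alphabetically.

```python
from collections import Counter


def fit_string_index(labels):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


types = ["organic", "conventional", "conventional", "organic", "conventional"]
mapping = fit_string_index(types)       # learned once, like StringIndexer.fit
indexed = [mapping[t] for t in types]   # applied per row, like the model's transform
```

With only two avocado types, the resulting index behaves like a single binary feature.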

In [None]:
categorical_vec_asm_transformer = VectorAssembler(
    inputCols=['type_index'],
    outputCol='features_cat'
)
categorical_vec_asm_transformer.transform(
    type_indexer_transformer.transform(df_avocado_train)
).select('type', 'type_index', 'features_cat').show(4)

### Create a pipeline: merge both numerical and categorical features

In [None]:
all_vec_asm_transformer = VectorAssembler(
        inputCols=['features_scaled', 'features_cat'],
        outputCol='features')

In [None]:
feature_prep_pipeline = Pipeline(stages=[sql_transformer, month_vec_asm_transformer,
                                         month_scaler_transformer, 
                                         numerical_vec_asm_transformer,
                                         std_scaler_transformer,
                                         type_indexer_transformer,
                                         categorical_vec_asm_transformer,
                                         all_vec_asm_transformer])

In [None]:
pipeline_model = feature_prep_pipeline.fit(df_avocado_train)

### Transform training dataset using the pipeline

In [None]:
df_avocado_train_transformed = pipeline_model.transform(df_avocado_train)

In [None]:
df_avocado_train_transformed.select('features', 'AveragePrice').show(5, False)

## Model Training
We will train a linear regression model on the transformed training dataset. To do this, we fit the *'LinearRegression'* estimator to the transformed training data. The result is a model, which is a *transformer* that can generate predictions for the testing dataset.

Note that this example focuses on how to create a pipeline. Spark also provides hyperparameter tuning utilities, but they are outside the scope of this example. Please refer to [First Steps in Machine Learning with Apache Spark](https://towardsdatascience.com/first-steps-in-machine-learning-with-apache-spark-672fe31799a3) for more details.

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
linear_reg_estimator = LinearRegression(
    featuresCol='features',
    labelCol='AveragePrice',
    predictionCol='prediction',

    # Hyperparameters
    maxIter=1000,
    regParam=0.3,       # Regularization
    elasticNetParam=0.8 # Regularization mixing parameter. 1 for L1, 0 for L2.
)
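
The roles of `regParam` and `elasticNetParam` can be made concrete with a small calculation. The sketch below computes the elastic-net penalty term as described in the Spark ML documentation, regParam * (alpha * L1 + (1 - alpha) / 2 * L2 squared); the helper name and the coefficient values are hypothetical and only illustrate the arithmetic.

```python
import math


def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty added to the loss: a mix of L1 and squared L2 norms of the weights."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    mix = elastic_net_param * l1 + (1.0 - elastic_net_param) / 2.0 * l2_squared
    return reg_param * mix


# Hypothetical coefficients, with the hyperparameters used in this notebook.
penalty = elastic_net_penalty([1.0, -2.0], reg_param=0.3, elastic_net_param=0.8)
print(round(penalty, 4))
```

With `elasticNetParam=0.8` the penalty is dominated by the L1 term, which tends to push some coefficients to exactly zero.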

In [None]:
linear_reg_model = linear_reg_estimator.fit(df_avocado_train_transformed)

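### Inference on the testing dataset

In [None]:
# Keep the training predictions for the evaluation step below.
df_avocado_train_pred = linear_reg_model.transform(df_avocado_train_transformed)
# Apply the fitted feature pipeline and the model to the held-out test split.
df_avocado_test_pred = linear_reg_model.transform(pipeline_model.transform(df_avocado_test))
df_avocado_test_pred.select(
  ['AveragePrice', 'prediction']
).sample(False, 0.1, 0).show(5, False)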

## Model Evaluation
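Spark provides several evaluators, and the choice depends on the task. For a regression model we use `RegressionEvaluator`, here with root mean squared error (RMSE) as the metric. The cell below scores the training predictions stored in `df_avocado_train_pred`.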

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
reg_eval = RegressionEvaluator(
    labelCol='AveragePrice',
    predictionCol='prediction',
    metricName='rmse' # Root mean squared error
)

In [None]:
reg_eval.evaluate(df_avocado_train_pred)
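
The RMSE metric selected above is the square root of the mean squared difference between labels and predictions. The following plain-Python sketch shows the calculation on made-up numbers; the helper name `rmse` is hypothetical and the values are not results from this notebook.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equally long sequences."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("labels and predictions must be non-empty and equally long")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical prices and predictions, used only to illustrate the formula.
score = rmse([1.0, 2.0, 3.0], [1.0, 2.0, 5.0])
print(round(score, 4))
```

RMSE is expressed in the same unit as the label, so here it can be read as a typical prediction error in price units.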

## THE END

In [None]:
spark.stop()