# Project: Walmart Sales Forecasting
**Phase III: Model Training**

@author: Syed Shahzad Raza

<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Import-required-dependencies" data-toc-modified-id="Import-required-dependencies-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Import required dependencies</a></span></li><li><span><a href="#Load-Data" data-toc-modified-id="Load-Data-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Load Data</a></span></li><li><span><a href="#Indexing,-Encoding-and-Scaling" data-toc-modified-id="Indexing,-Encoding-and-Scaling-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Indexing, Encoding and Scaling</a></span></li><li><span><a href="#Create-Checkpoint-for-Phase-IV" data-toc-modified-id="Create-Checkpoint-for-Phase-IV-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Create Checkpoint for Phase IV</a></span></li><li><span><a href="#Model-Training" data-toc-modified-id="Model-Training-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Model Training</a></span><ul class="toc-item"><li><span><a href="#Linear-Regression" data-toc-modified-id="Linear-Regression-5.1"><span class="toc-item-num">5.1&nbsp;&nbsp;</span>Linear Regression</a></span></li><li><span><a href="#Hyper-parameter-tuning-for-Linear-Regression" data-toc-modified-id="Hyper-parameter-tuning-for-Linear-Regression-5.2"><span class="toc-item-num">5.2&nbsp;&nbsp;</span>Hyper-parameter tuning for Linear Regression</a></span></li><li><span><a href="#Generalized-Linear-Regression" data-toc-modified-id="Generalized-Linear-Regression-5.3"><span class="toc-item-num">5.3&nbsp;&nbsp;</span>Generalized Linear Regression</a></span></li></ul></li></ul></div>

## Import required dependencies

In [39]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, GeneralizedLinearRegression, DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Reuse an already-running SparkContext if there is one; otherwise start one
# with the master set to "local[*]".
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local[*]")
)
# Obtain (or create) the SparkSession used for all DataFrame work below.
spark = SparkSession.builder.getOrCreate()

import pandas as pd

## Load Data

In [40]:
# Read the Phase II outputs: the first row holds the column names and
# column types are inferred from the data.
df = spark.read.csv("phase_2_train_df.csv", header=True, inferSchema=True)
test_df = spark.read.csv("phase_2_test_df.csv", header=True, inferSchema=True)

## Indexing, Encoding and Scaling

In [41]:
# Split data into train (80%) and validate (20%) sets.
# A fixed seed keeps the split identical across kernel restarts, so the saved
# checkpoints and trained models can be reproduced.
(train_df, validate_df) = df.randomSplit([0.8, 0.2], seed=42)

In [42]:
all_vars = train_df.columns
cat_vars = ['Store', 'Dept', 'IsHoliday', 'Type']
target_value = 'label'
# Continuous features are all columns that are neither categorical nor the target.
# The target is excluded by exact name: `x not in target_value` would be a
# substring test against the string 'label' and would also drop any column
# whose name happens to be a substring of it (e.g. 'a', 'el', 'lab').
excluded_vars = set(cat_vars) | {target_value}
cont_vars = [x for x in all_vars if x not in excluded_vars]

In [43]:
# One StringIndexer per categorical column, writing to "<col>_indexed".
# handleInvalid="keep" maps labels that were not seen during fit to an extra
# index instead of raising, so the fitted pipeline can still transform
# validation/test rows containing a category absent from the training split.
indexers = [
    StringIndexer(inputCol=c,
                  outputCol="{0}_indexed".format(c),
                  handleInvalid="keep")
    for c in cat_vars
]

# One OneHotEncoder per indexed column, writing to "<col>_indexed_encoded".
encoders = [
    OneHotEncoder(inputCol=indexer.getOutputCol(),
                  outputCol="{0}_encoded".format(indexer.getOutputCol()))
    for indexer in indexers
]

In [44]:
# Assemble a single "features" vector: the one-hot encoded categorical
# columns first, followed by the continuous columns.
encoded_cols = [encoder.getOutputCol() for encoder in encoders]
assembler = VectorAssembler(inputCols=encoded_cols + cont_vars,
                            outputCol="features")

In [45]:
# Rescale the assembled feature vector into "scaledfeatures"
# (MinMaxScaler with its default output range).
scaler = MinMaxScaler(
    inputCol="features",
    outputCol="scaledfeatures",
)

## Create Checkpoint for Phase IV

In [8]:
# Checkpoint the splits produced in this phase as CSV files so the next phase
# can reload exactly the same train / validate / test rows.
# toPandas() collects each Spark DataFrame onto the driver before writing.
for frame, csv_name in (
    (train_df, 'phase_3_train_df.csv'),
    (validate_df, 'phase_3_validate_df.csv'),
    (test_df, 'phase_3_test_df.csv'),
):
    frame.toPandas().to_csv(csv_name, index=False)

## Model Training

### Linear Regression

In [9]:
# Configure the baseline linear regression estimator (fitted further below):
# it reads the scaled feature vector and uses fixed, hand-picked
# regularisation settings.
lr = LinearRegression(
    featuresCol="scaledfeatures",
    labelCol="label",
    maxIter=10,
    regParam=0.1,
    elasticNetParam=0.5,
)

In [10]:
# Pipeline order: index categoricals -> one-hot encode -> assemble -> scale -> regress.
pipeline = Pipeline(stages=[*indexers, *encoders, assembler, scaler, lr])

In [11]:
# Fit every pipeline stage (feature transformers and the regressor) on the training split.
LR_model = pipeline.fit(dataset=train_df)

In [12]:
# Persist the fitted pipeline, replacing any earlier save at the same path.
path = "LR_model"
lr_writer = LR_model.write().overwrite()
lr_writer.save(path)

### Hyper-parameter tuning for Linear Regression

In [13]:
# Define an estimator with default hyper-parameters; nothing is trained here.
lr2 = LinearRegression(
    featuresCol="scaledfeatures",
    labelCol="label",
)

In [14]:
# Same preprocessing stages as before, ending in the untuned estimator lr2.
pipeline = Pipeline(stages=[*indexers, *encoders, assembler, scaler, lr2])

In [15]:
# Build the grid of hyper-parameters to search over (3 * 3 * 2 * 4 = 72 combinations).
# The grid must be keyed on the params of lr2, the estimator that is actually
# inside the tuned pipeline: params are bound to their owning instance, so a
# grid keyed on a different estimator's params would leave lr2 unchanged and
# every candidate would be fitted with identical default settings.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr2.regParam, [0.1, 0.01, 0.001])
    .addGrid(lr2.maxIter, [10, 15, 20])
    .addGrid(lr2.fitIntercept, [False, True])
    .addGrid(lr2.elasticNetParam, [0.0, 0.5, 0.8, 1.0])
    .build()
)

In [16]:
# TrainValidationSplit fits the pipeline once per combination in paramGrid on
# 80% of the data it is given, scores each fit on the remaining 20% with the
# evaluator, and keeps the best model.
# The evaluator settings are spelled out (they equal the defaults) so the
# selection criterion is visible; the seed makes the internal split, and
# therefore the selected model, reproducible.
valid = TrainValidationSplit(estimator=pipeline,
                             estimatorParamMaps=paramGrid,
                             evaluator=RegressionEvaluator(labelCol="label",
                                                           predictionCol="prediction",
                                                           metricName="rmse"),
                             trainRatio=0.80,
                             seed=42)

In [17]:
# Run the search on the training split; the result exposes the best fitted pipeline.
tvs_model = valid.fit(dataset=train_df)

In [18]:
# Keep only the winning pipeline from the search and persist it,
# replacing any earlier save at the same path.
tvs_bestModel = tvs_model.bestModel
path = "tvs_model"
tvs_writer = tvs_bestModel.write().overwrite()
tvs_writer.save(path)

### Generalized Linear Regression

In [19]:
# Configure a generalized linear regression estimator with a gaussian family
# and identity link. labelCol is written out at its default value so the
# target column is explicit.
glr = GeneralizedLinearRegression(
    labelCol="label",
    featuresCol="scaledfeatures",
    family="gaussian",
    link="identity",
    maxIter=10,
    regParam=0.3,
)

In [20]:
# Same preprocessing stages, ending in the generalized linear regression estimator.
pipeline = Pipeline(stages=[*indexers, *encoders, assembler, scaler, glr])

In [21]:
# Fit every pipeline stage, including the GLR estimator, on the training split.
GLR_model = pipeline.fit(dataset=train_df)

In [22]:
# Persist the fitted GLR pipeline, replacing any earlier save at the same path.
path = "GLR_model"
glr_writer = GLR_model.write().overwrite()
glr_writer.save(path)