# Spark ML Assignment

## Import Libraries and Data

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.types as tp
from pyspark.sql import functions as F
import matplotlib.pyplot as plt

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
train_data = spark.read.csv("train.csv",inferSchema=True, header=True)
test_data = spark.read.csv("test.csv",inferSchema=True, header=True)

In [4]:
train_data.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



## 1. Calculate Average Purchase Amount

In [5]:
train_data.createOrReplaceTempView('train_data_view')

spark.sql(
    """
    select round(avg(Purchase),2) as avg_purch from train_data_view
    """)

avg_purch
9263.97


To validate:

In [6]:
df = train_data.toPandas().describe()
df['Purchase']

count    550068.000000
mean       9263.968713
std        5023.065394
min          12.000000
25%        5823.000000
50%        8047.000000
75%       12054.000000
max       23961.000000
Name: Purchase, dtype: float64

The average purchase amount is $9263.97.

## 2. Count and Remove Null Values

In [7]:
for c in train_data.columns:
    missing_values = F.isnull(c)
    missing_values = train_data.filter(missing_values).count()
    print(c, missing_values)

User_ID 0
Product_ID 0
Gender 0
Age 0
Occupation 0
City_Category 0
Stay_In_Current_City_Years 0
Marital_Status 0
Product_Category_1 0
Product_Category_2 173638
Product_Category_3 383247
Purchase 0


In [8]:
train_data = train_data.fillna({
            "Product_Category_2" : 0,
            "Product_Category_3": 0
        })

In [9]:
for c in ['Product_Category_2', 'Product_Category_3']:
    missing_values = F.isnull(c)
    missing_values = train_data.filter(missing_values).count()
    print(c, missing_values)

Product_Category_2 0
Product_Category_3 0


## 3. Count Distinct Values per Column

In [10]:
from pyspark.sql.functions import col, countDistinct
train_data.agg(*(F.countDistinct(F.col(c)).alias(c) for c in train_data.columns))

User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,Product_Category_3,Purchase
5891,3631,2,7,21,3,5,2,20,18,16,18105


## 4. Count category values within each of the following columns:
- Gender
- Age
- City_Category
- Stay_In_Current_City_Years

In [11]:
df = train_data.groupBy("Gender").agg(F.count("Gender").alias("gender_count"))
df.show()

df = train_data.groupBy("Age").agg(F.count("Age").alias("age_count"))
df.show()

df = train_data.groupBy("City_Category").agg(F.count("City_Category").alias("citycat_count"))
df.show()

df = train_data.groupBy("Stay_In_Current_City_Years").agg(F.count("Stay_In_Current_City_Years").alias("stayinyears_count"))
df.show()

df = train_data.groupBy("Marital_Status").agg(F.count("Marital_Status").alias("marital_status_count"))
df.show()

+------+------------+
|Gender|gender_count|
+------+------------+
|     F|      135809|
|     M|      414259|
+------+------------+

+-----+---------+
|  Age|age_count|
+-----+---------+
|18-25|    99660|
|26-35|   219587|
| 0-17|    15102|
|46-50|    45701|
|51-55|    38501|
|36-45|   110013|
|  55+|    21504|
+-----+---------+

+-------------+-------------+
|City_Category|citycat_count|
+-------------+-------------+
|            B|       231173|
|            C|       171175|
|            A|       147720|
+-------------+-------------+

+--------------------------+-----------------+
|Stay_In_Current_City_Years|stayinyears_count|
+--------------------------+-----------------+
|                         3|            95285|
|                         0|            74398|
|                        4+|            84726|
|                         1|           193821|
|                         2|           101838|
+--------------------------+-----------------+

+--------------+-----------------

## 5. Calculate average Purchase for each of the following columns:

- Gender
- Age
- City_Category
- Stay_In_Current_City_Years
- Marital_Status

In [12]:
df = train_data.groupBy("Gender").agg(F.round(F.avg("Purchase"),2).alias("avg_purch_by_gender"))
df.show()

df = train_data.groupBy("Age").agg(F.round(F.avg("Purchase"),2).alias("avg_purch_by_age"))
df.show()

df = train_data.groupBy("City_Category").agg(F.round(F.avg("Purchase"),2).alias("avg_purch_by_citycat"))
df.show()

df = train_data.groupBy("Stay_In_Current_City_Years").agg(F.round(F.avg("Purchase"),2).alias("avg_purch_by_stayinyears"))
df.show()

df = train_data.groupBy("Marital_Status").agg(F.round(F.avg("Purchase"),2).alias("avg_purch_by_marital_status"))
df.show()

+------+-------------------+
|Gender|avg_purch_by_gender|
+------+-------------------+
|     F|            8734.57|
|     M|            9437.53|
+------+-------------------+

+-----+----------------+
|  Age|avg_purch_by_age|
+-----+----------------+
|18-25|         9169.66|
|26-35|         9252.69|
| 0-17|         8933.46|
|46-50|         9208.63|
|51-55|         9534.81|
|36-45|         9331.35|
|  55+|         9336.28|
+-----+----------------+

+-------------+--------------------+
|City_Category|avg_purch_by_citycat|
+-------------+--------------------+
|            B|              9151.3|
|            C|             9719.92|
|            A|             8911.94|
+-------------+--------------------+

+--------------------------+------------------------+
|Stay_In_Current_City_Years|avg_purch_by_stayinyears|
+--------------------------+------------------------+
|                         3|                  9286.9|
|                         0|                 9180.08|
|                  

Both item #6 and item #7 are addressed below:

## 6. Label encode the following columns:
- Age
- Gender
- Stay_In_Current_City_Years
- City_Category

## 7. One-Hot encode following columns:
- Gender
- City_Category
- Occupation

In [13]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml import Pipeline

In [14]:
stage_1 = StringIndexer(inputCol= 'Gender', outputCol= 'gender_le')
stage_2 = StringIndexer(inputCol= 'City_Category', outputCol= 'citycat_le')
stage_3 = StringIndexer(inputCol= 'Age', outputCol= 'age_le')
stage_4 = StringIndexer(inputCol= 'Stay_In_Current_City_Years', outputCol= 'staycity_le')

stage_5 = OneHotEncoderEstimator(inputCols= ['citycat_le', 'age_le', 'staycity_le', 'Product_Category_1', 'Occupation'], 
                                 outputCols= ['citycat_ohe', 'age_ohe', 'staycity_ohe', 'prodcat1_ohe', 'occupation_ohe'])

## 8. Build a Baseline Model

### Vector Assembler:

In [15]:
from pyspark.ml.feature import VectorAssembler

In [16]:
stage_6 = VectorAssembler(inputCols= ["citycat_ohe",
                                      "age_ohe",
                                      "staycity_ohe",
                                      "prodcat1_ohe",
                                      "occupation_ohe",
                                      "Marital_Status"],

                         outputCol=  "feature_vector")

### Linear Regression

In [17]:
from pyspark.ml import regression

In [18]:
pipeline = Pipeline(stages= [stage_1,
                             stage_2,
                             stage_3,
                             stage_4,
                             stage_5,
                             stage_6])

In [19]:
pipeline_model = pipeline.fit(train_data)
pipelined_data = pipeline_model.transform(train_data)

In [20]:
lr = regression.LinearRegression(featuresCol='feature_vector', labelCol="Purchase")

In [21]:
lr_model = lr.fit(pipelined_data)

In [22]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="Purchase", metricName="rmse") 
mse = evaluator.evaluate(lr_model.transform(pipelined_data))

evaluator = RegressionEvaluator(labelCol="Purchase", metricName="r2") 
r2 = evaluator.evaluate(lr_model.transform(pipelined_data))

print('mse:', round(mse,2))
print('r2:', round(r2,4))

mse: 3013.47
r2: 0.6401


## Cross-Validation

In [23]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [24]:
params = ParamGridBuilder().build()
model = regression.LinearRegression(featuresCol= "feature_vector",  labelCol="Purchase")

In [25]:
cv = CrossValidator(estimator=model,
                    estimatorParamMaps=params,
                    evaluator=evaluator,
                    numFolds=3)

In [26]:
cv_model = cv.fit(pipelined_data)
evaluator.evaluate(cv_model.transform(pipelined_data))

0.6400869468327336

## 9. Model Improvement with Grid Search CV

In [27]:
updated_params = ParamGridBuilder() \
                .addGrid(model.regParam, [0.1, 0.01, 0.001, 0.0001]) \
                .addGrid(model.elasticNetParam, [0.1, 0.01, 0.001, 0.0001]) \
                .build()

In [28]:
cv = CrossValidator(estimator=model,
                    estimatorParamMaps=updated_params,
                    evaluator=evaluator,
                    numFolds=5)

In [29]:
grid_model = cv.fit(pipelined_data)

In [30]:
print('r2: ', round(evaluator.evaluate(grid_model.transform(pipelined_data)),5))

r2:  0.64005


In [31]:
param_dict = grid_model.bestModel.extractParamMap()

final_dict = {}
for k, v in param_dict.items():
    final_dict[k.name] = v
    
print('EN Param:', final_dict["elasticNetParam"])
print('reg Param:', final_dict["regParam"])

EN Param: 0.0001
reg Param: 0.1


## RandomForestRegressor

In [32]:
model = regression.RandomForestRegressor(featuresCol='feature_vector', labelCol="Purchase")

In [33]:
updated_params = ParamGridBuilder() \
                .addGrid(model.maxDepth, [5,10]) \
                .build()

In [34]:
cv = CrossValidator(estimator=model,
                estimatorParamMaps=updated_params,
                evaluator=evaluator,
                numFolds=3)

grid_model = cv.fit(pipelined_data)

print('r2: ', round(evaluator.evaluate(grid_model.transform(pipelined_data)),5))

r2:  0.61067


In [42]:
param_dict = grid_model.bestModel.extractParamMap()

final_dict = {}
for k, v in param_dict.items():
    final_dict[k.name] = v
    
print('Max Depth:', final_dict["maxDepth"])

Max Depth: 10


## 10. Create a Spark ML Pipeline for the final model

In [35]:
stage_1 = StringIndexer(inputCol= 'Gender', outputCol= 'gender_le')
stage_2 = StringIndexer(inputCol= 'City_Category', outputCol= 'citycat_le')
stage_3 = StringIndexer(inputCol= 'Age', outputCol= 'age_le')
stage_4 = StringIndexer(inputCol= 'Stay_In_Current_City_Years', outputCol= 'staycity_le')

stage_5 = OneHotEncoderEstimator(inputCols= ['citycat_le', 'age_le', 'staycity_le', 'Product_Category_1', 'Occupation'], 
                                 outputCols= ['citycat_ohe', 'age_ohe', 'staycity_ohe', 'prodcat1_ohe', 'occupation_ohe'])

In [38]:
stage_6 = VectorAssembler(inputCols= ["citycat_ohe",
                                      "age_ohe",
                                      "staycity_ohe",
                                      "prodcat1_ohe",
                                      "occupation_ohe",
                                      "Marital_Status"],

                         outputCol=  "feature_vector")

In [43]:
stage_7 = regression.RandomForestRegressor(featuresCol='feature_vector', labelCol="Purchase",
                                          maxDepth=10)

In [44]:
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, stage_4, stage_5, stage_6, stage_7])

In [45]:
pipeline_model = pipeline.fit(train_data)
final_data = pipeline_model.transform(train_data)

In [49]:
final_data.select('prediction')

prediction
9174.276198602702
13173.69040266748
5413.292797433902
5413.292797433902
7299.575543295087
13187.180790398652
14397.232607365566
14397.232607365566
14397.232607365566
7126.30062785148
