#### Setting the environment variables

In [1]:
import os, sys, warnings, platform

osname = platform.system()
print('Running on', osname)

if (osname == 'Windows'):
    # Definitions for Windows 10 instance
    os.environ["PYSPARK_PYTHON"] = "D:/Anaconda3/python"
    os.environ["PYSPARK_DRIVER_PYTHON"]="D:/Anaconda3/python"
    os.environ["PYSPARK_DRIVER_PYTHON_OPTS"]="notebook --no-browser"
    os.environ["JAVA_HOME"] = "C:/Program Files/Java/jdk1.8.0_251/jre"
    os.environ["SPARK_HOME"] = "D:/spark-2.4.4-bin-hadoop2.7"
    os.environ["HADOOP_HOME"] = "D:/spark-2.4.4-bin-hadoop2.7"
    os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
    sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
    sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")
else:
    # Definitions for EC2 Linux instance
    os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/bin/python3"
    os.environ["PYSPARK_DRIVER_PYTHON_OPTS"]="notebook --no-browser"
    os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
    os.environ["SPARK_HOME"] = "/home/ec2-user/spark-2.4.4-bin-hadoop2.7"
    os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
    sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.7-src.zip")
    sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

Running on Linux


# Ecommerce Churn Assignment

The aim of this assignment is to build a model that predicts whether a customer purchases an item after adding it to the cart. Since this is a classification problem, you are expected to apply all three models covered so far, select the most robust one, and use it to predict churn as accurately as possible.

For this assignment, you are provided the data associated with an e-commerce company for the month of October 2019. Your task is to first analyse the data, and then perform multiple steps towards the model building process.

The broad tasks are:
- Data Exploration
- Feature Engineering
- Model Selection
- Model Inference

### Data description

The dataset stores information about customer sessions on the e-commerce platform. Each record captures one activity and its associated attributes.

- **event_time**: Date and time when user accesses the platform
- **event_type**: Action performed by the customer
    - View
    - Cart
    - Purchase
    - Remove from cart
- **product_id**: Unique number to identify the product in the event
- **category_id**: Unique number to identify the category of the product
- **category_code**: Stores primary and secondary categories of the product
- **brand**: Brand associated with the product
- **price**: Price of the product
- **user_id**: Unique ID for a customer
- **user_session**: Session ID for a user


### Initialising the SparkSession

The dataset provided is 5 GB in size, so you are expected to increase the driver memory accordingly. Refer to notebook 1 for the steps involved.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

MAX_MEMORY = "40G"
spark = SparkSession.builder.appName("Notebook_4").config("spark.driver.memory", MAX_MEMORY).getOrCreate()
spark

In [3]:
# Spark session with 40 GB driver memory

spark.sparkContext.getConf().get('spark.driver.memory')

'40G'

In [59]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

warnings.filterwarnings('ignore')
pd.set_option('float_format', '{:.4f}'.format)
pd.options.display.max_colwidth = 100
%matplotlib inline
plt.rcParams['font.size'] = '14'

In [4]:
%%time
# Loading the clean data

df = spark.read.parquet('task3_transformed_df.parquet')
print('transformed_df row count =', df.count())

transformed_df row count = 1033889
CPU times: user 2.24 ms, sys: 1.71 ms, total: 3.95 ms
Wall time: 3.6 s


## Task 3: Model Selection
Three models are considered for classification:
- Logistic Regression
- Decision Tree
- Random Forest

### Model 3: Random Forest

In [5]:
# Additional steps for Random Forest, if any

#### Feature Transformation (Code will be same; check for the columns)

In [6]:
# Check if only the required columns are present to build the model
# If not, drop the redundant columns

# This step is already completed as part of Notebook_2

In [7]:
# Categorising the attributes by type - Continuous and Categorical

# This step is already completed as part of Notebook_2

In [8]:
# Feature transformation for categorical features

# This step is already completed as part of Notebook_2

In [9]:
# Vector assembler to combine all the features

# This step is already completed as part of Notebook_2

In [10]:
# Pipeline for the tasks

# This step is already completed as part of Notebook_2

In [11]:
# Transforming the dataframe df

# This step is already completed as part of Notebook_2

In [12]:
%%time
# Schema of the transformed df

df.printSchema()

root
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- category: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- user_session_activity_count: long (nullable = true)
 |-- product_count_for_user: long (nullable = true)
 |-- sub_category_count_for_user: long (nullable = true)
 |-- avg_expense_for_sub_category: double (nullable = true)
 |-- user_sessions_count: long (nullable = true)
 |-- day_quadrant: string (nullable = true)
 |-- is_purchased: integer (nullable = true)
 |-- day_of_week_idx: double (nullable = true)
 |-- day_of_week_enc: vector (nullable = true)
 |-- day_quadrant_idx: double (nullable = true)
 |-- day_quadrant_enc: vector (nullable = true)
 |-- category_idx: double (nullable = true)
 |-- category_enc: vector (nullable = true)
 |-- sub_category_idx: double (nullable = true)
 |-- sub_category_enc: vector (nullable = true)
 |-- brand_idx: double (nullable = true)
 |-- brand_enc:

In [13]:
%%time
# Checking the elements of the transformed df - Top 20 rows

df.show(20)

+-------+-------+-----------+------------+------------+---------------------------+----------------------+---------------------------+----------------------------+-------------------+------------+------------+---------------+---------------+----------------+----------------+------------+--------------+----------------+----------------+---------+--------------+--------------------+
|  brand|  price|day_of_week|    category|sub_category|user_session_activity_count|product_count_for_user|sub_category_count_for_user|avg_expense_for_sub_category|user_sessions_count|day_quadrant|is_purchased|day_of_week_idx|day_of_week_enc|day_quadrant_idx|day_quadrant_enc|category_idx|  category_enc|sub_category_idx|sub_category_enc|brand_idx|     brand_enc|            features|
+-------+-------+-----------+------------+------------+---------------------------+----------------------+---------------------------+----------------------------+-------------------+------------+------------+---------------+-------

In [14]:
# Storing the transformed df in an S3 bucket to avoid repeating the steps above

# This step is already completed as part of Notebook_2

#### Train-test split

In [15]:
%%time
# Splitting the data into train and test (remember, you are expected to compare the models later)

df_train, df_test = df.randomSplit([0.7, 0.3], seed=42)

CPU times: user 1.61 ms, sys: 1.22 ms, total: 2.82 ms
Wall time: 23.1 ms


In [16]:
%%time
# Number of rows in train and test data

print('Train Dataset Count:', df_train.count(), '| Test Dataset Count:', df_test.count())

Train Dataset Count: 723091 | Test Dataset Count: 310798
CPU times: user 5.17 ms, sys: 3.92 ms, total: 9.09 ms
Wall time: 28 s
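
The split above is close to, but not exactly, 70/30 (723091 of 1033889 rows is about 69.9%). This is expected, because `randomSplit` assigns rows to splits by random sampling rather than cutting at an exact row count. The sketch below illustrates that idea in plain Python; it is not Spark's implementation, and `bernoulli_split` is a hypothetical helper introduced only for illustration.

```python
import random

def bernoulli_split(rows, train_weight=0.7, seed=42):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        # Each row lands in train with probability train_weight
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(100_000))  # stand-in for dataframe rows
train, test = bernoulli_split(rows)
train_fraction = len(train) / len(rows)
print(len(train), len(test), round(train_fraction, 4))
```

The train fraction hovers around 0.7 but varies slightly with the seed, which mirrors the small deviation seen in the counts above.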


#### Model Fitting

In [17]:
%%time
# Building the model with hyperparameter tuning
# Create ParamGrid for Cross Validation

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

CPU times: user 166 ms, sys: 5.27 ms, total: 171 ms
Wall time: 169 ms


In [39]:
%%time

# Given the EC2 instance's performance and processing-time constraints,
# most RandomForestClassifier hyperparameters are fixed up front, based on the earlier DecisionTree results,
# and only 'impurity' ('gini' or 'entropy') is tuned through the ParamGrid

rf = RandomForestClassifier(featuresCol='features', labelCol='is_purchased', maxDepth=30, maxBins=15, numTrees=10, seed=42)
rf

rfParamGrid = ParamGridBuilder() \
              .addGrid(rf.impurity, ['gini','entropy']) \
              .build()

rfEvaluator = BinaryClassificationEvaluator(labelCol='is_purchased')

multiEvaluator = MulticlassClassificationEvaluator(labelCol='is_purchased', predictionCol='prediction')

CPU times: user 2.04 ms, sys: 819 µs, total: 2.86 ms
Wall time: 11.5 ms


In [40]:
%%time
# Run cross-validation steps

crossval = CrossValidator(estimator=rf, estimatorParamMaps=rfParamGrid, evaluator=rfEvaluator, numFolds=3)

CPU times: user 368 µs, sys: 0 ns, total: 368 µs
Wall time: 355 µs


In [41]:
%%time
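# Fitting the cross-validator on the full transformed df
# Note: df also contains the rows in df_test, so the test metrics below are optimistic;
# fitting on df_train instead would avoid this leakage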

cvModel = crossval.fit(df)

CPU times: user 1.89 s, sys: 736 ms, total: 2.63 s
Wall time: 34min 1s
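
The 34-minute wall time is easier to interpret once the number of training runs is counted. `CrossValidator` fits one model per parameter combination per fold, then refits the best combination once on the whole input. A rough sketch of that count, using the grid and fold values from the cells above (`count_cv_fits` is a hypothetical helper, not part of PySpark):

```python
from itertools import product

def count_cv_fits(param_grid, num_folds):
    """One fit per (parameter combination, fold), plus one final refit of the best combination."""
    combos = list(product(*param_grid.values()))
    return len(combos) * num_folds + 1

# Values taken from the cells above: one tuned parameter, three folds
grid = {"impurity": ["gini", "entropy"]}
total_fits = count_cv_fits(grid, num_folds=3)
print(total_fits)  # 7
```

Adding a second tuned parameter with, say, three values would raise the count to 19 fits, which is why the grid here is kept deliberately small.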


In [42]:
# Best model from the results of cross-validation

rfBestModel = cvModel.bestModel
rfBestModel

RandomForestClassificationModel (uid=RandomForestClassifier_5f491d09dcb5) with 10 trees

#### Model Analysis

Required Steps:
- Predict on test data
- Performance analysis
    - Appropriate Metric with reasoning

In [43]:
%%time
predictions = rfBestModel.transform(df_test)

CPU times: user 3.36 ms, sys: 5.66 ms, total: 9.02 ms
Wall time: 60.3 ms


In [44]:
%%time
multiEvaluator.setMetricName('accuracy')
accuracy = multiEvaluator.evaluate(predictions)
print('Test data Accuracy =', accuracy)

Test data Accuracy = 0.7175046171468284
CPU times: user 32 ms, sys: 9.51 ms, total: 41.5 ms
Wall time: 34.8 s


In [45]:
%%time
multiEvaluator.setMetricName('weightedPrecision')
precision = multiEvaluator.evaluate(predictions)
print('Test data Precision =', precision)

Test data Precision = 0.7223446670705886
CPU times: user 41.4 ms, sys: 16.4 ms, total: 57.8 ms
Wall time: 50.1 s


In [46]:
%%time
multiEvaluator.setMetricName('weightedRecall')
recall = multiEvaluator.evaluate(predictions)
print('Test data Recall =', recall)

Test data Recall = 0.7175046171468284
CPU times: user 35.2 ms, sys: 10.6 ms, total: 45.8 ms
Wall time: 33.9 s


In [47]:
# F1 as the harmonic mean of the weighted precision and weighted recall computed above
F1_score = (2 * precision * recall) / (precision + recall)
print('Test data F1_score =', F1_score)

Test data F1_score = 0.7199165072005896
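
Note that the weighted recall printed above is identical to the accuracy (both 0.7175046171468284). This is not a coincidence: when per-class recall is weighted by class frequency, the sum reduces to overall accuracy. The sketch below shows this with a hypothetical confusion matrix; the counts are made up for illustration and are not taken from this dataset.

```python
# Hypothetical confusion matrix: rows = true label, columns = predicted label
conf = [[50, 10],   # true 0: 50 predicted as 0, 10 predicted as 1
        [20, 20]]   # true 1: 20 predicted as 0, 20 predicted as 1

n = len(conf)
total = sum(sum(row) for row in conf)
accuracy = sum(conf[i][i] for i in range(n)) / total

weighted_precision = weighted_recall = weighted_f1 = 0.0
for c in range(n):
    support = sum(conf[c])                          # rows whose true label is c
    predicted = sum(conf[r][c] for r in range(n))   # rows predicted as c
    recall_c = conf[c][c] / support
    precision_c = conf[c][c] / predicted
    f1_c = 2 * precision_c * recall_c / (precision_c + recall_c)
    weight = support / total                        # class frequency among true labels
    weighted_recall += weight * recall_c
    weighted_precision += weight * precision_c
    weighted_f1 += weight * f1_c

# Harmonic mean of the two weighted values, as computed in the cell above
harmonic_f1 = 2 * weighted_precision * weighted_recall / (weighted_precision + weighted_recall)

print(accuracy, weighted_recall)
print(weighted_f1, harmonic_f1)
```

The last line also shows that a frequency-weighted average of per-class F1 scores generally differs from the harmonic mean of weighted precision and weighted recall. `MulticlassClassificationEvaluator` offers an `f1` metric of the former kind, so its value can differ from the `F1_score` computed here.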


In [48]:
%%time
areaUnderROC = rfEvaluator.evaluate(predictions, {rfEvaluator.metricName: 'areaUnderROC'})
print('Test data ROC_AUC =', areaUnderROC)

Test data ROC_AUC = 0.7299482874299179
CPU times: user 18.6 ms, sys: 7.38 ms, total: 26 ms
Wall time: 18.6 s
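
One way to read the ROC AUC value is as the probability that a randomly chosen purchased row receives a higher score than a randomly chosen non-purchased row. The sketch below computes AUC that way on a handful of hypothetical labels and scores. Spark derives the metric from the ROC curve rather than by enumerating pairs, so this is an illustration of the interpretation, not of the evaluator's internals.

```python
def roc_auc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for q in neg:
            if p > q:
                wins += 1.0
            elif p == q:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Hypothetical labels and predicted scores, for illustration only
labels = [1, 0, 1, 0, 1]
scores = [0.9, 0.4, 0.6, 0.7, 0.5]
auc = roc_auc(labels, scores)
print(round(auc, 4))  # 0.6667
```

Under this reading, the value of roughly 0.73 reported above means the model ranks a purchased row above a non-purchased one in about 73% of such pairs.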


#### Summary of the best Random Forest model

In [49]:
# Defining a function to extract features along with the feature importance score

import pandas as pd
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    # Attribute metadata is grouped by type; flatten it into one list of {idx, name} entries
    attrs = dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]
    list_extract = []
    for attr_type in attrs:
        list_extract = list_extract + attrs[attr_type]
    varlist = pd.DataFrame(list_extract)
    # Look up each feature's importance score by its index in the feature vector
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return varlist.sort_values('score', ascending=False)
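
The function above depends on the metadata that the vector assembler attaches to the `features` column: a dictionary of attribute groups, each holding a list of entries with an `idx` and a `name`. The standard-library sketch below mimics that lookup on hypothetical metadata and importance values; the group keys, feature names, and scores are assumptions chosen for illustration.

```python
# Hypothetical metadata, shaped like schema["features"].metadata["ml_attr"]["attrs"]
attrs = {
    "numeric": [{"idx": 0, "name": "price"},
                {"idx": 1, "name": "user_sessions_count"}],
    "binary": [{"idx": 2, "name": "day_of_week_enc_Mon"}],
}

# Hypothetical importance vector, indexed by feature position
importances = [0.5, 0.3, 0.2]

# Flatten all attribute groups into one list
flat = [entry for group in attrs.values() for entry in group]

# Attach the score for each feature index and sort in descending order
ranked = sorted(
    ({"idx": e["idx"], "name": e["name"], "score": importances[e["idx"]]} for e in flat),
    key=lambda e: e["score"],
    reverse=True,
)

for entry in ranked:
    print(entry)
```

Because one-hot encoded columns contribute one entry per category, a single categorical column can appear many times in such a ranking, each with its own score.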

In [61]:
%%time
# Printing the feature importance scores

rfFeatureImp = ExtractFeatureImp(rfBestModel.featureImportances, predictions, 'features').head(5)
rfFeatureImp.to_csv('randomForestFeatureImp.csv')
rfFeatureImp.head(5)

CPU times: user 0 ns, sys: 8.08 ms, total: 8.08 ms
Wall time: 10.7 ms


Unnamed: 0,idx,name,score
1,99,user_session_activity_count,0.1173
2,100,product_count_for_user,0.1097
0,98,price,0.0624
3,101,sub_category_count_for_user,0.0609
4,102,avg_expense_for_sub_category,0.0608


__*Based on the RandomForest feature importance scores, the top 5 features for predicting the `is_purchased` target variable are:*__
1. user_session_activity_count
2. product_count_for_user
3. price
4. sub_category_count_for_user
5. avg_expense_for_sub_category

In [53]:
print('Number of features used by the best model =', rfBestModel.numFeatures)

Number of features used by the best model = 104


In [54]:
print('Number of nodes in the best model =', rfBestModel.totalNumNodes)

Number of nodes in the best model = 226646


In [52]:
rfBestModel.trees

[DecisionTreeClassificationModel (uid=dtc_185cfc2c8b7a) of depth 30 with 19459 nodes,
 DecisionTreeClassificationModel (uid=dtc_78844dbf8df5) of depth 30 with 31475 nodes,
 DecisionTreeClassificationModel (uid=dtc_7c5c3ae97171) of depth 30 with 17313 nodes,
 DecisionTreeClassificationModel (uid=dtc_3b24764493a3) of depth 30 with 22229 nodes,
 DecisionTreeClassificationModel (uid=dtc_861fb84ce010) of depth 30 with 19245 nodes,
 DecisionTreeClassificationModel (uid=dtc_f2a8b9048909) of depth 30 with 18747 nodes,
 DecisionTreeClassificationModel (uid=dtc_e3ba3c78001c) of depth 30 with 15511 nodes,
 DecisionTreeClassificationModel (uid=dtc_bae43bf50e46) of depth 30 with 27941 nodes,
 DecisionTreeClassificationModel (uid=dtc_0b838e577a74) of depth 30 with 32125 nodes,
 DecisionTreeClassificationModel (uid=dtc_e8f8d0f87be7) of depth 30 with 22601 nodes]
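
As a quick consistency check, the per-tree node counts listed above should add up to the `totalNumNodes` value reported earlier (226646). The sketch below verifies that with the numbers copied from the output and, as an aside, compares the largest tree with the node count of a full binary tree of depth 30.

```python
# Node counts copied from the rfBestModel.trees output above
node_counts = [19459, 31475, 17313, 22229, 19245,
               18747, 15511, 27941, 32125, 22601]

total_nodes = sum(node_counts)
print(total_nodes)  # 226646

# A full binary tree of depth d has 2**(d + 1) - 1 nodes
max_nodes_per_tree = 2 ** (30 + 1) - 1
largest_tree_share = max(node_counts) / max_nodes_per_tree
print(largest_tree_share)
```

Every tree reaches the maximum depth of 30 yet uses only a tiny fraction of the nodes a full tree of that depth could hold, so the trees are deep but narrow.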

In [60]:
# Collate all the model evaluation metrics
randomForestMetrics = [{'ModelType'    : 'RandomForest',
                        'Accuracy'     : accuracy,
                        'Precision'    : precision,
                        'Recall'       : recall,
                        'F1_score'     : F1_score,
                        'AreaUnderROC' : areaUnderROC}]
  
# Convert the metrics to a Pandas dataframe 
randomForestMetrics_df = pd.DataFrame(randomForestMetrics)

# Save the dataframe as csv for future model comparison
randomForestMetrics_df.to_csv('randomForestMetrics.csv')

randomForestMetrics_df

Unnamed: 0,ModelType,Accuracy,Precision,Recall,F1_score,AreaUnderROC
0,RandomForest,0.7175,0.7223,0.7175,0.7199,0.7299


### Evaluation metrics for the best RandomForestClassifier model
1. Accuracy = 0.7175046171468284
2. Precision (weighted) = 0.7223446670705886
3. Recall (weighted) = 0.7175046171468284
4. F1_score = 0.7199165072005896
5. AreaUnderROC = 0.7299482874299179