In [1]:
# Setting the environment variables

In [2]:
import os
import sys
# Interpreter used by PySpark and how the driver is launched (Jupyter, no browser),
# plus the Java and Spark installations this notebook was run against.
os.environ.update({
    "PYSPARK_PYTHON": "/usr/bin/python3",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
    "PYSPARK_DRIVER_PYTHON_OPTS": "notebook --no-browser",
    "JAVA_HOME": "/usr/java/jdk1.8.0_161/jre",
    "SPARK_HOME": "/home/ec2-user/spark-2.4.4-bin-hadoop2.7",
})
# Directory holding the Python archives shipped with this Spark distribution.
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"

# Put py4j and pyspark at the front of the import path. Each insert goes to
# position 0, so pyspark.zip ends up ahead of the py4j archive.
for archive in ("py4j-0.10.7-src.zip", "pyspark.zip"):
    sys.path.insert(0, os.environ["PYLIB"] + "/" + archive)

# Ecommerce Churn Assignment

The aim of the assignment is to build a model that predicts whether or not a person purchases an item after it has been added to the cart. Since this is a classification problem, you are expected to apply your understanding of all three models covered so far. You must select the most robust model and provide a solution that predicts churn in the most suitable manner.

For this assignment, you are provided the data associated with an e-commerce company for the month of October 2019. Your task is to first analyse the data, and then perform multiple steps towards the model building process.

The broad tasks are:
- Data Exploration
- Feature Engineering
- Model Selection
- Model Inference

### Data description

The dataset stores information about customer sessions on the e-commerce platform. It records each activity together with its associated parameters.

- **event_time**: Date and time when user accesses the platform
- **event_type**: Action performed by the customer
    - View
    - Cart
    - Purchase
    - Remove from cart
- **product_id**: Unique number to identify the product in the event
- **category_id**: Unique number to identify the category of the product
- **category_code**: Stores primary and secondary categories of the product
- **brand**: Brand associated with the product
- **price**: Price of the product
- **user_id**: Unique ID for a customer
- **user_session**: Session ID for a user


### Initialising the SparkSession

The dataset provided is 5 GB in size, so the Spark driver memory should be increased above its default value. Refer to notebook 1 for the steps involved.

In [3]:
# Spark environment
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [4]:
# Driver memory requested for this session; the raw dataset is ~5 GB.
# NOTE(review): spark.driver.memory only takes effect if the driver JVM has
# not already been started in this kernel (getOrCreate reuses a live session).
MAX_MEMORY = "14G"

spark = (
    SparkSession.builder
    .appName("demo")
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)

spark

In [5]:
# Read back the driver-memory entry from the SparkContext configuration
# (None if it was never set).
dict(spark.sparkContext.getConf().getAll()).get("spark.driver.memory")

'14G'

In [6]:
#importing required libraries
import pandas as pd
from matplotlib import pyplot as plt
from pyspark.sql.functions import *

In [7]:
# Load the cleaned, feature-engineered dataset produced by the earlier notebook.
df = spark.read.format("parquet").load("final_df.parquet")

<hr>

## Task 3: Model Selection
Three models are compared for this classification task:
- Logistic Regression
- Decision Tree
- Random Forest

### Model 2: Decision Trees

#### Train-test split

In [10]:
# 70/30 train/test split. The fixed seed makes the split reproducible (for the
# same input data and partitioning), so the models compared later are scored
# on the same rows.
train_data, test_data = df.randomSplit(weights=[0.7, 0.3], seed=100)

In [11]:
# Number of rows in the training split (roughly 70% of df, per the
# randomSplit weights above; the exact size varies with the sampling).
train_data.count()

548387

In [12]:
# Number of rows in the test split (roughly 30% of df, per the randomSplit
# weights above; the exact size varies with the sampling).
test_data.count()

235974

In [13]:
# Importing the DecisionTreeClassifier
from pyspark.ml.classification import DecisionTreeClassifier

In [14]:
# Decision tree classifier on the assembled "features" vector and "label" column.
# An explicit seed is set for reproducibility: PySpark's default seed is derived
# from Python's hash() of the class name, which is randomized per interpreter
# process (unless PYTHONHASHSEED is fixed). Spark uses the seed in its internal
# sampling, e.g. when choosing candidate split thresholds for continuous features.
d_tree = DecisionTreeClassifier(featuresCol="features", labelCol="label", seed=100)

#### Model Fitting

In [16]:
# Building the model with hyperparameter tuning

# Importing all the required libraries
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
import numpy as np
# Create ParamGrid for Cross Validation: every combination of the values below,
# i.e. 3 depths x 4 bin counts = 12 candidate trees.
# The lists are the integer-truncated values of np.linspace(10, 20, 3) and
# np.linspace(20, 70, 4), written out explicitly.
d_tree_paramGrid = (
    ParamGridBuilder()
    .addGrid(d_tree.maxDepth, [10, 15, 20])
    .addGrid(d_tree.maxBins, [20, 36, 53, 70])
    .build()
)

In [17]:
# Binary evaluator used both for model selection and for reporting.
# The metric is area under the ROC curve (also the evaluator's default),
# computed from the model's raw prediction scores.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="label",
    metricName="areaUnderROC",
)

In [18]:
# Run cross-validation steps
# 3-fold CrossValidator over d_tree_paramGrid, scored with the area-under-ROC
# evaluator. The explicit seed fixes the random fold assignment. Without it,
# PySpark derives the default seed from Python's per-process hash of the class
# name, so the folds (and possibly the selected model) can change between
# kernel restarts.
d_tree_cv = CrossValidator(estimator=d_tree,
                           estimatorParamMaps=d_tree_paramGrid,
                           evaluator=evaluator,
                           numFolds=3,
                           seed=100)

In [19]:
# Fit every grid candidate on each fold of the training split. The returned
# CrossValidatorModel keeps the best candidate as .bestModel.
d_tree_cvModel = d_tree_cv.fit(dataset=train_data)

In [20]:
# maxDepth chosen by cross-validation (shown as "current" in the explanation)
d_tree_cvModel.bestModel.explainParam(param='maxDepth')

'maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5, current: 20)'

In [21]:
# maxBins chosen by cross-validation (shown as "current" in the explanation)
d_tree_cvModel.bestModel.explainParam(param='maxBins')

'maxBins: Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature. (default: 32, current: 70)'

In [22]:
# Score the training split with the best tree found by cross-validation
# (this is what d_tree_cvModel.transform delegates to).
predictions_train = d_tree_cvModel.bestModel.transform(train_data)


#### Model Analysis

Required steps:
- Apply the fitted model to the test data (the model is not re-fitted on it)
- Performance analysis
    - Choice of an appropriate metric, with reasoning

In [23]:
# Score the held-out test split with the best tree found by cross-validation
# (this is what d_tree_cvModel.transform delegates to).
predictions_test = d_tree_cvModel.bestModel.transform(test_data)

In [24]:
# Area under ROC on both splits; both prediction frames come from the best
# cross-validated model. A large train/test gap would point to overfitting.
for auc_label, scored_df in (('Area under ROC for training set:', predictions_train),
                             ('Area under ROC for test set:', predictions_test)):
    print(auc_label, evaluator.evaluate(scored_df))

Area under ROC for training set: 0.7381586204349175
Area under ROC for test set: 0.7002588091384397


In [25]:
# Actual class balance in the test split: number of rows per label value
predictions_test.groupBy("label").count().show()

+-----+------+
|label| count|
+-----+------+
|    1|146232|
|    0| 89742|
+-----+------+



In [26]:
# Predicted class balance in the test split: number of rows per predicted value
predictions_test.groupBy("prediction").count().show()

+----------+------+
|prediction| count|
+----------+------+
|       0.0| 73940|
|       1.0|162034|
+----------+------+



In [27]:
# Confusion Matrix
from pyspark.sql.types import FloatType
from pyspark.mllib.evaluation import MulticlassMetrics
import pyspark.sql.functions as F

# The RDD-based MulticlassMetrics takes (prediction, label) pairs of floating-point
# values, so the label column is cast to float. No ordering is applied: the
# metrics are aggregates over all rows, so sorting the test set (as the previous
# orderBy did) only added a full shuffle.
preds_and_labels = predictions_test.select(
    'prediction', F.col('label').cast(FloatType()).alias('label'))
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

# Rows are actual labels and columns are predicted labels, both in ascending
# label order, i.e. [[TN, FP], [FN, TP]] with label 1 as the positive class.
print(metrics.confusionMatrix().toArray())

[[ 51524.  38218.]
 [ 22416. 123816.]]


In [32]:
# Accuracy value: (TN + TP) / total. It is read from the MulticlassMetrics object
# built for the confusion matrix instead of hard-coding the matrix counts, so it
# stays correct when the data, split or model changes.
accuracy = metrics.accuracy
print("Accuracy score =", accuracy)

Accuracy score = 0.7430479629111681


In [33]:
# Precision for the positive class (label 1): TP / (TP + FP).
# It is computed from the MulticlassMetrics object rather than from counts copied
# out of the printed confusion matrix, so re-running the notebook keeps it in sync.
precision = metrics.precision(1.0)
print("Precision =", precision)

Precision = 0.7641359220904255


In [34]:
# Recall for the positive class (label 1): TP / (TP + FN).
# It is computed from the MulticlassMetrics object rather than from counts copied
# out of the printed confusion matrix, so re-running the notebook keeps it in sync.
recall = metrics.recall(1.0)
print("Recall =",recall)

Recall = 0.8467093385852618


In [35]:
# F1 score for the positive class (label 1): harmonic mean of precision and recall.
# Spark's fMeasure returns 0.0 when precision + recall == 0, where the manual
# formula 2*p*r/(p+r) would raise ZeroDivisionError.
fscore = metrics.fMeasure(1.0)
print("f score =",fscore)

f score = 0.8033062355238658


#### Summary of the best Decision Tree model

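**Recall** is the appropriate metric.
<br>Since we are concentrating on identifying churns (the 1s), recall is the most relevant metric, and this model achieves a recall of about **0.85** on the test set.

In terms of recall, the decision tree model is better than logistic regression, which had a recall of **0.76**.

Note that the best hyperparameters found (maxDepth = 20, maxBins = 70) both lie at the upper edge of the search grid, so extending the grid may improve the model further. The gap between training AUC (0.738) and test AUC (0.700) should also be watched for overfitting.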