In [1]:
# Setting the environment variables

In [2]:
import os
import sys
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"]="notebook --no-browser"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/home/ec2-user/spark-2.4.4-bin-hadoop2.7"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

# Ecommerce Churn Assignment

The aim of the assignment is to build a model that predicts whether a person purchases an item after it has been added to the cart or not. Being a classification problem, you are expected to use your understanding of all the three models covered till now. You must select the most robust model and provide a solution that predicts the churn in the most suitable manner. 

For this assignment, you are provided the data associated with an e-commerce company for the month of October 2019. Your task is to first analyse the data, and then perform multiple steps towards the model building process.

The broad tasks are:
- Data Exploration
- Feature Engineering
- Model Selection
- Model Inference

### Data description

The dataset stores the information of a customer session on the e-commerce platform. It records the activity and the associated parameters with it.

- **event_time**: Date and time when user accesses the platform
- **event_type**: Action performed by the customer
            - View
            - Cart
            - Purchase
            - Remove from cart
- **product_id**: Unique number to identify the product in the event
- **category_id**: Unique number to identify the category of the product
- **category_code**: Stores primary and secondary categories of the product
- **brand**: Brand associated with the product
- **price**: Price of the product
- **user_id**: Unique ID for a customer
- **user_session**: Session ID for a user


### Initialising the SparkSession

The dataset provided is 5 GBs in size. Therefore, it is expected that you increase the driver memory to a greater number. You can refer to notebook 1 for the steps involved here.

In [3]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# initialising the session with 14 GB driver memory
MAX_MEMORY = "14G"

spark = SparkSession \
    .builder \
    .appName("log_reg") \
    .config("spark.driver.memory", MAX_MEMORY) \
    .getOrCreate()

spark

In [4]:
print('spark.driver.memory =', spark.sparkContext.getConf().get('spark.driver.memory'))


spark.driver.memory = 14G


In [5]:
# Flag to force only 10% data to be selected from the dataset
# This was used only during the initial development of the code
# DONOT set this to True and finalize any inference 
use_small_sample = False
load_from_existing_features_dataset = True
save_features_dataset=False

In [6]:
# Loading the clean data
if use_small_sample == True:
    df= spark.read.parquet("cleaned_df.parquet")
    df = df.randomSplit([.05, .95], seed=12)[0]
else:
    df= spark.read.parquet("cleaned_df.parquet")

In [7]:
from pyspark.sql import functions as F
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator


In [8]:
from pyspark.mllib.evaluation import MulticlassMetrics, BinaryClassificationMetrics
from pyspark.sql.types import FloatType

In [9]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


<hr>

## Task 3: Model Selection
3 models for classification:	
- Logistic Regression
- Decision Tree
- Random Forest

### Model 2: Decision Trees

In [10]:
# Additional steps for Decision Trees, if any


#### Feature Transformation (Code will be same; check for the columns)

In [11]:
# Check if only the required columns are present to build the model
# If not, drop the redundant columns
df = df.drop('product_id', 'user_id', 'hour')
df.columns

['cat_0_cln',
 'price',
 'category_level',
 'day_of_week',
 'cat_1_cln',
 'brand_cln',
 'session_count',
 'activity_count',
 'product_view_count',
 'second_cat_view_count',
 'avg_price',
 'hour_bins',
 'is_purchased']

In [12]:
# Categorising the attributes into its type - Continuous and Categorical
numeric_cols = ['price', 'session_count', 'activity_count',
                'product_view_count', 'second_cat_view_count',
                'avg_price'
               ]

# List of categorical string columns
categorical_str_cols = ['cat_0_cln', 'cat_1_cln', 'brand_cln']

# list of categorical numeric columns
categorical_num_cols = ['category_level', 'day_of_week', 'hour_bins']

In [13]:
# Feature transformation for categorical features
#
stages = []
# Creating a pipeline for Feature Transformations
# Starting with the categorical string cols
for col in categorical_str_cols:
    # Encode the strings in the col with an index
    indexer = StringIndexer(inputCol=col,
                            outputCol = col + '_ix',
                           )
    # OHE the indices in the col
    encoder = OneHotEncoder(inputCol=indexer.getOutputCol(),
                            outputCol=col + '_enc'
                           )
    stages +=[indexer, encoder]
    

# Processing thhe Categorical numeric cols
for col in categorical_num_cols:
    encoder = OneHotEncoder(inputCol=col,
                            outputCol=col + '_enc'
                           )
    stages += [encoder]
    
# Vector assembler to combine all the features
# Add a Vector Assembler to the pipeline stages
vector_input_cols = [c + '_enc' for c in categorical_str_cols] + \
                    [c + '_enc' for c in categorical_num_cols] + \
                    numeric_cols
print('number of features to be given to the moddel = ', len(vector_input_cols))


number of features to be given to the moddel =  12


In [14]:
# Vector assembler to combine all the features
vector = VectorAssembler(inputCols = vector_input_cols,
                         outputCol = 'features'
                        )
stages.append(vector)

print('stages = ')
stages

stages = 


[StringIndexer_e7fe2ec1fbb6,
 OneHotEncoder_11024f76a7da,
 StringIndexer_f039aaba2f9c,
 OneHotEncoder_0cb2df844a8b,
 StringIndexer_fe3a763db57d,
 OneHotEncoder_97e3c3f7b8f3,
 OneHotEncoder_b731cc26a33e,
 OneHotEncoder_1ec78ea3939c,
 OneHotEncoder_79f37b523c3e,
 VectorAssembler_5fcfda09e33f]

In [15]:
# Pipeline for the tasks
pipeline = Pipeline(stages = stages)


In [16]:
# Transforming the dataframe df
if load_from_existing_features_dataset == True:
    transformed_df = spark.read.parquet('features_df.parquet')
else:
    pipeline_model = pipeline.fit(df)
    transformed_df = pipeline_model.transform(df)

In [17]:
# Schema of the transformed df
transformed_df.printSchema()

root
 |-- cat_0_cln: string (nullable = true)
 |-- price: float (nullable = true)
 |-- category_level: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- cat_1_cln: string (nullable = true)
 |-- brand_cln: string (nullable = true)
 |-- session_count: long (nullable = true)
 |-- activity_count: long (nullable = true)
 |-- product_view_count: long (nullable = true)
 |-- second_cat_view_count: long (nullable = true)
 |-- avg_price: double (nullable = true)
 |-- hour_bins: double (nullable = true)
 |-- is_purchased: integer (nullable = true)
 |-- cat_0_cln_ix: double (nullable = true)
 |-- cat_0_cln_enc: vector (nullable = true)
 |-- cat_1_cln_ix: double (nullable = true)
 |-- cat_1_cln_enc: vector (nullable = true)
 |-- brand_cln_ix: double (nullable = true)
 |-- brand_cln_enc: vector (nullable = true)
 |-- category_level_enc: vector (nullable = true)
 |-- day_of_week_enc: vector (nullable = true)
 |-- hour_bins_enc: vector (nullable = true)
 |-- features: vector (

In [18]:
# Checking the elements of the transformed df - Top 20 rows
transformed_df.show(20)

+-----------+-------+--------------+-----------+----------+---------+-------------+--------------+------------------+---------------------+-----------------+---------+------------+------------+--------------+------------+--------------+------------+---------------+------------------+---------------+-------------+--------------------+
|  cat_0_cln|  price|category_level|day_of_week| cat_1_cln|brand_cln|session_count|activity_count|product_view_count|second_cat_view_count|        avg_price|hour_bins|is_purchased|cat_0_cln_ix| cat_0_cln_enc|cat_1_cln_ix| cat_1_cln_enc|brand_cln_ix|  brand_cln_enc|category_level_enc|day_of_week_enc|hour_bins_enc|            features|
+-----------+-------+--------------+-----------+----------+---------+-------------+--------------+------------------+---------------------+-----------------+---------+------------+------------+--------------+------------+--------------+------------+---------------+------------------+---------------+-------------+--------------

In [19]:
# Storing the transformed df in S3 bucket to prevent repetition of steps again


#### Train-test split

In [20]:
## Renaming the target column as label as a workaround for crossValidator to run without error
transformed_df = transformed_df.withColumnRenamed('is_purchased', 'label')

In [21]:
# Splitting the data into train and test (Remember you are expected to compare the model later)
train, test = transformed_df.randomSplit([0.8, 0.2], seed=12)

In [22]:
# Number of rows in train and test data
print(f'Number of rows in train data = {train.count()}')
print(f'Number of rows in test data = {test.count()}')

Number of rows in train data = 718925
Number of rows in test data = 179516


#### Model Fitting

In [23]:
# Building the model with hyperparameter tuning
# Create ParamGrid for Cross Validation
tree = DecisionTreeClassifier()


grid = ParamGridBuilder()\
        .baseOn({tree.featuresCol: 'features'})\
        .baseOn({tree.labelCol: 'label'})\
        .baseOn({tree.seed: 12})\
        .addGrid(tree.maxDepth, [3, 4, 5, 6, 7])\
        .addGrid(tree.minInstancesPerNode, [5, 10, 20, 50, 100])\
        .addGrid(tree.maxBins, [5, 20, 30, 50])\
        .addGrid(tree.impurity, ['gini', 'entropy'])\
        .build()
evaluator = BinaryClassificationEvaluator(labelCol='label',
                                          metricName='areaUnderPR')
# evaluator = MulticlassClassificationEvaluator(labelCol = 'label',
#                                               metricName='recallByLabel')

In [24]:
# Run cross-validation steps
cv = CrossValidator(estimator=tree, 
                    estimatorParamMaps=grid, 
                    evaluator=evaluator, 
                    numFolds = 3,
                    seed = 12
                   )

In [25]:
# Fitting the models on transformed df
cvModel = cv.fit(train)

In [26]:
best_tree = cvModel.bestModel
best_tree

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_b84abb6e14f1) of depth 7 with 113 nodes

#### Model Analysis

Required Steps:
- Fit on test data
- Performance analysis
    - Appropriate Metric with reasoning

In [27]:
# Generating predictions for the test data

predictions = best_tree.transform(test)

In [28]:
predictions.show(2)

+---------+-----+--------------+-----------+---------+---------+-------------+--------------+------------------+---------------------+-----------------+---------+-----+------------+--------------+------------+--------------+------------+--------------+------------------+---------------+-------------+--------------------+---------------+--------------------+----------+
|cat_0_cln|price|category_level|day_of_week|cat_1_cln|brand_cln|session_count|activity_count|product_view_count|second_cat_view_count|        avg_price|hour_bins|label|cat_0_cln_ix| cat_0_cln_enc|cat_1_cln_ix| cat_1_cln_enc|brand_cln_ix| brand_cln_enc|category_level_enc|day_of_week_enc|hour_bins_enc|            features|  rawPrediction|         probability|prediction|
+---------+-----+--------------+-----------+---------+---------+-------------+--------------+------------------+---------------------+-----------------+---------+-----+------------+--------------+------------+--------------+------------+--------------+------

In [29]:
preds_and_labels=predictions.select(['prediction','label'])
preds_and_labels = preds_and_labels.withColumn('label', preds_and_labels['label'].cast(FloatType()))
preds_and_labels.show(3)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
+----------+-----+
only showing top 3 rows



In [30]:
preds_and_labels.dtypes

[('prediction', 'double'), ('label', 'float')]

In [31]:
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

print(metrics.confusionMatrix().toArray())

[[25262. 45254.]
 [16482. 92518.]]


In [32]:
# accuracy for label 1
print('Accuracy for label 1 = ', metrics.accuracy)
#precision for label 1
print('Precision for label 1 =', metrics.precision(1))

#recall for label 1
print('Recall for label 1 =', metrics.recall(1))

#F1 score for label 1
print('F1 score = ', metrics.fMeasure(1.0, beta = 1.0))



Accuracy for label 1 =  0.6560975066289356
Precision for label 1 = 0.6715297738292251
Recall for label 1 = 0.848788990825688
F1 score =  0.7498257500850987


In [33]:
binary_metrics = BinaryClassificationMetrics(preds_and_labels.rdd.map(tuple))

print('Area Under Curve = ', binary_metrics.areaUnderROC)



Area Under Curve =  0.6035169640724389


#### Summary of the best Decision Tree model

In [34]:
best_tree.featureImportances

SparseVector(91, {13: 0.1303, 18: 0.0002, 52: 0.0116, 53: 0.0014, 55: 0.0106, 78: 0.0002, 80: 0.0004, 83: 0.0007, 85: 0.002, 86: 0.2082, 87: 0.083, 88: 0.482, 89: 0.0694})

In [35]:
best_tree.toDebugString


'DecisionTreeClassificationModel (uid=DecisionTreeClassifier_b84abb6e14f1) of depth 7 with 113 nodes\n  If (feature 88 <= 6.5)\n   If (feature 86 <= 3.5)\n    If (feature 88 <= 2.5)\n     If (feature 87 <= 1.5)\n      If (feature 52 in {1.0})\n       If (feature 13 in {0.0})\n        Predict: 0.0\n       Else (feature 13 not in {0.0})\n        If (feature 89 <= 11.5)\n         Predict: 0.0\n        Else (feature 89 > 11.5)\n         Predict: 1.0\n      Else (feature 52 not in {1.0})\n       Predict: 0.0\n     Else (feature 87 > 1.5)\n      Predict: 0.0\n    Else (feature 88 > 2.5)\n     If (feature 52 in {1.0})\n      If (feature 88 <= 3.5)\n       If (feature 85 <= 251.1050033569336)\n        If (feature 87 <= 1.5)\n         Predict: 1.0\n        Else (feature 87 > 1.5)\n         Predict: 0.0\n       Else (feature 85 > 251.1050033569336)\n        If (feature 83 in {0.0})\n         Predict: 0.0\n        Else (feature 83 not in {0.0})\n         Predict: 1.0\n      Else (feature 88 > 3.5

In [36]:
    best_tree.save('decision_tree_model')