In [1]:
# Setting the environment variables
import os
import sys
import pandas as pd
import numpy as np
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"]="notebook --no-browser"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/home/ec2-user/spark-2.4.4-bin-hadoop2.7"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

# Ecommerce Churn Assignment

The aim of the assignment is to build a model that predicts whether a person purchases an item after it has been added to the cart or not. Being a classification problem, you are expected to use your understanding of all the three models covered till now. You must select the most robust model and provide a solution that predicts the churn in the most suitable manner. 

For this assignment, you are provided the data associated with an e-commerce company for the month of October 2019. Your task is to first analyse the data, and then perform multiple steps towards the model building process.

The broad tasks are:
- Data Exploration
- Feature Engineering
- Model Selection
- Model Inference

### Data description

The dataset stores the information of a customer session on the e-commerce platform. It records the activity and the associated parameters with it.

- **event_time**: Date and time when user accesses the platform
- **event_type**: Action performed by the customer
            - View
            - Cart
            - Purchase
            - Remove from cart
- **product_id**: Unique number to identify the product in the event
- **category_id**: Unique number to identify the category of the product
- **category_code**: Stores primary and secondary categories of the product
- **brand**: Brand associated with the product
- **price**: Price of the product
- **user_id**: Unique ID for a customer
- **user_session**: Session ID for a user


### Initialising the SparkSession

The dataset provided is 5 GBs in size. Therefore, it is expected that you increase the driver memory to a greater number. You can refer to notebook 1 for the steps involved here.

In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct ,col, avg, mean, isnan, when, count, col, to_timestamp
from pyspark.sql.types import IntegerType
import matplotlib.pyplot as plt
import datetime
import seaborn as sns
from datetime import timedelta
import pandas as pd
import numpy as np
import pyspark.sql.functions as f

In [3]:
# initialising the session with 14 GB driver memory
MAX_MEMORY = "14G"

spark = SparkSession \
    .builder \
    .appName("demo") \
    .config("spark.driver.memory", MAX_MEMORY) \
    .getOrCreate()

spark

In [4]:
# Loading the clean data
df_clean = spark.read.csv('/home/ec2-user/df_updated2', header= False, inferSchema= True)

In [5]:
df_clean.show(5)

+----+---+---+-----------+-----------+---+---+---+----+---+----+------+----+----+
| _c0|_c1|_c2|        _c3|        _c4|_c5|_c6|_c7| _c8|_c9|_c10|  _c11|_c12|_c13|
+----+---+---+-----------+-----------+---+---+---+----+---+----+------+----+----+
|view|540|  6|electronics| smartphone| 35|  1| 35|null| 35| 3.0|huawei|   0|   0|
|view|114|  5| appliances|environment|  1|  1|  1|null|  4| 0.0|others|   0|   0|
|view| 39|  3|electronics|     clocks|  2|  2|  2|null|  2| 3.0| casio|   0|   0|
|view|167|  6|electronics|      audio|  2|  2|  2|null| 14| 2.0|others|   0|   0|
|view|161|  6|electronics|      audio| 12|  1|  1|null|121| 2.0| apple|   0|   0|
+----+---+---+-----------+-----------+---+---+---+----+---+----+------+----+----+
only showing top 5 rows



In [6]:
#renaming the column names
df_clean = df_clean.select(col("_c0").alias("event_type"), col("_c1").alias("price"), col("_c2").alias("day_of_week"), col("_c3").alias("category1"), col("_c4").alias("category2"), col("_c5").alias("activity_countval"), col("_c6").alias("product_view_counts"), col("_c7").alias("category2_view_counts"), col("_c8").alias("average_shopping_expense"), col("_c9").alias("session_counts"), col("_c10").alias("binnedhour"), col("_c11").alias("brand_new"), col("_c12").alias("is_purchased"), col("_c13").alias("label"))

In [7]:
df_clean.show(5)

+----------+-----+-----------+-----------+-----------+-----------------+-------------------+---------------------+------------------------+--------------+----------+---------+------------+-----+
|event_type|price|day_of_week|  category1|  category2|activity_countval|product_view_counts|category2_view_counts|average_shopping_expense|session_counts|binnedhour|brand_new|is_purchased|label|
+----------+-----+-----------+-----------+-----------+-----------------+-------------------+---------------------+------------------------+--------------+----------+---------+------------+-----+
|      view|  540|          6|electronics| smartphone|               35|                  1|                   35|                    null|            35|       3.0|   huawei|           0|    0|
|      view|  114|          5| appliances|environment|                1|                  1|                    1|                    null|             4|       0.0|   others|           0|    0|
|      view|   39|       

In [9]:
df_clean.count()

15874983

In [10]:
!pip install spark-sklearn

Defaulting to user installation because normal site-packages is not writeable


In [11]:
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report, confusion_matrix
from sklearn import metrics

In [12]:
#import required libraries
from sklearn import preprocessing
from sklearn.model_selection import KFold
from sklearn.model_selection import GridSearchCV
from pyspark.mllib.util import MLUtils

<hr>

## Task 3: Model Selection
3 models for classification:	
- Logistic Regression
- Decision Tree
- Random Forest

### Model 2: Decision Trees

In [13]:
# Additional steps for Decision Trees, if any

In [14]:
df_clean=df_clean.withColumn("is_purchased",df_clean["is_purchased"].cast(IntegerType()))

In [15]:
df_clean.printSchema()

root
 |-- event_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- category1: string (nullable = true)
 |-- category2: string (nullable = true)
 |-- activity_countval: integer (nullable = true)
 |-- product_view_counts: integer (nullable = true)
 |-- category2_view_counts: integer (nullable = true)
 |-- average_shopping_expense: double (nullable = true)
 |-- session_counts: integer (nullable = true)
 |-- binnedhour: double (nullable = true)
 |-- brand_new: string (nullable = true)
 |-- is_purchased: integer (nullable = true)
 |-- label: integer (nullable = true)



#### Feature Transformation (Code will be same; check for the columns)

In [16]:
# Check if only the required columns are present to build the model
# If not, drop the redundant columns
from pyspark.sql.functions import when, count, col, isnull, isnan
df_clean.select([count(when(isnan(c), c)).alias(c) for c in df_clean.columns]).show()

+----------+-----+-----------+---------+---------+-----------------+-------------------+---------------------+------------------------+--------------+----------+---------+------------+-----+
|event_type|price|day_of_week|category1|category2|activity_countval|product_view_counts|category2_view_counts|average_shopping_expense|session_counts|binnedhour|brand_new|is_purchased|label|
+----------+-----+-----------+---------+---------+-----------------+-------------------+---------------------+------------------------+--------------+----------+---------+------------+-----+
|         0|    0|          0|        0|        0|                0|                  0|                    0|                       0|             0|         0|        0|           0|    0|
+----------+-----+-----------+---------+---------+-----------------+-------------------+---------------------+------------------------+--------------+----------+---------+------------+-----+



In [17]:
!pip3 install xverse

Defaulting to user installation because normal site-packages is not writeable


In [18]:
df_clean.columns

['event_type',
 'price',
 'day_of_week',
 'category1',
 'category2',
 'activity_countval',
 'product_view_counts',
 'category2_view_counts',
 'average_shopping_expense',
 'session_counts',
 'binnedhour',
 'brand_new',
 'is_purchased',
 'label']

In [19]:
df_clean=df_clean.drop('event_type', 'label')

In [20]:
# Categorising the attributes into its type - Continuous and Categorical
# Storing the categorical and continuous columns in different lists
all_categorical_features = ['day_of_week', 'category1', 'category2', 'binnedhour', 'brand_new']
all_continuous_features = ['price', 'activity_countval', 'product_view_counts', 'category2_view_counts', 'session_counts', 'average_shopping_expense']

In [21]:
# Initialising the variable 'stages' to store every step for building a pipeline
stages = []

In [22]:
# Importing the libraries for data transormation
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

In [23]:
# Building a function for encoding all the categorical variables
for categoricalCol in all_categorical_features:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index').setHandleInvalid("keep")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [24]:
# Vector assembler to combine all the features
assemblerInputs = [c + "classVec" for c in all_categorical_features] + all_continuous_features
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features").setHandleInvalid("keep")
stages += [assembler]

In [25]:
# Pipeline for the tasks
# Loading all the steps in a pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)

In [26]:
# Transforming the dataframe df
# Fitting the steps on the dataFrame
pipelineModel = pipeline.fit(df_clean)
df_clean = pipelineModel.transform(df_clean)

In [27]:
# Schema of the transformed df
df_clean.printSchema()

root
 |-- price: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- category1: string (nullable = true)
 |-- category2: string (nullable = true)
 |-- activity_countval: integer (nullable = true)
 |-- product_view_counts: integer (nullable = true)
 |-- category2_view_counts: integer (nullable = true)
 |-- average_shopping_expense: double (nullable = true)
 |-- session_counts: integer (nullable = true)
 |-- binnedhour: double (nullable = true)
 |-- brand_new: string (nullable = true)
 |-- is_purchased: integer (nullable = true)
 |-- day_of_weekIndex: double (nullable = false)
 |-- day_of_weekclassVec: vector (nullable = true)
 |-- category1Index: double (nullable = false)
 |-- category1classVec: vector (nullable = true)
 |-- category2Index: double (nullable = false)
 |-- category2classVec: vector (nullable = true)
 |-- binnedhourIndex: double (nullable = false)
 |-- binnedhourclassVec: vector (nullable = true)
 |-- brand_newIndex: double (nullable = false)
 |-- br

In [28]:
# Checking the elements of the transformed df - Top 20 rows
df_clean.show(20)

+-----+-----------+-----------+-----------+-----------------+-------------------+---------------------+------------------------+--------------+----------+---------+------------+----------------+-------------------+--------------+-----------------+--------------+-----------------+---------------+------------------+--------------+-----------------+--------------------+
|price|day_of_week|  category1|  category2|activity_countval|product_view_counts|category2_view_counts|average_shopping_expense|session_counts|binnedhour|brand_new|is_purchased|day_of_weekIndex|day_of_weekclassVec|category1Index|category1classVec|category2Index|category2classVec|binnedhourIndex|binnedhourclassVec|brand_newIndex|brand_newclassVec|            features|
+-----+-----------+-----------+-----------+-----------------+-------------------+---------------------+------------------------+--------------+----------+---------+------------+----------------+-------------------+--------------+-----------------+-------------

In [29]:
df_clean.count()

15874983

In [30]:
# Checking the elements of the transformed df - Top 20 rows
df_clean.show(20)

+-----+-----------+-----------+-----------+-----------------+-------------------+---------------------+------------------------+--------------+----------+---------+------------+----------------+-------------------+--------------+-----------------+--------------+-----------------+---------------+------------------+--------------+-----------------+--------------------+
|price|day_of_week|  category1|  category2|activity_countval|product_view_counts|category2_view_counts|average_shopping_expense|session_counts|binnedhour|brand_new|is_purchased|day_of_weekIndex|day_of_weekclassVec|category1Index|category1classVec|category2Index|category2classVec|binnedhourIndex|binnedhourclassVec|brand_newIndex|brand_newclassVec|            features|
+-----+-----------+-----------+-----------+-----------------+-------------------+---------------------+------------------------+--------------+----------+---------+------------+----------------+-------------------+--------------+-----------------+-------------

#### Train-test split

In [31]:
# Splitting the data into train and test (Remember you are expected to compare the model later)
traindata, testdata = df_clean.randomSplit([0.7,0.3], seed=100)

In [32]:
# Number of rows in train and test data
traindata.count()

11113821

In [33]:
testdata.count()

4761162

#### Model Fitting

In [34]:
# Building the model with hyperparameter tuning
# Create ParamGrid for Cross Validation

from pyspark.ml.classification import DecisionTreeClassifier

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="is_purchased", featuresCol="features")

dt_model = dt.fit(traindata)

In [35]:
# Run cross-validation steps
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [2,3,5]) \
    .addGrid(dt.minInstancesPerNode, [5,10,20]) \
    .addGrid(dt.impurity, ['entropy', 'gini']) \
    .build()

crossval = CrossValidator(estimator=dt_model,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
crossval = crossval(maxIter=10)
crossval = crossval.fit(traindata)
cvModel = crossval.fit(traindata)

# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(testdata)
selected = prediction.select("is_purchased", "probability", "prediction")
for row in selected.collect():
    print(row)

TypeError: 'CrossValidator' object is not callable

In [None]:
# Fitting the models on transformed df


In [None]:
# Best model from the results of cross-validation


#### Model Analysis

Required Steps:
- Fit on test data
- Performance analysis
    - Appropriate Metric with reasoning

In [36]:
# Applying the model on test set
predictions = dt_model.transform(testdata)

In [37]:
# Printing the required columns
predictions.select('is_purchased', 'rawPrediction', 'prediction', 'probability').show(10)

+------------+--------------------+----------+--------------------+
|is_purchased|       rawPrediction|prediction|         probability|
+------------+--------------------+----------+--------------------+
|           0|[1.0769307E7,3445...|       0.0|[0.96900130027287...|
|           0|[1.0769307E7,3445...|       0.0|[0.96900130027287...|
|           0|[1.0769307E7,3445...|       0.0|[0.96900130027287...|
|           0|[1.0769307E7,3445...|       0.0|[0.96900130027287...|
|           0|[1.0769307E7,3445...|       0.0|[0.96900130027287...|
|           0|[1.0769307E7,3445...|       0.0|[0.96900130027287...|
|           0|[1.0769307E7,3445...|       0.0|[0.96900130027287...|
|           0|[1.0769307E7,3445...|       0.0|[0.96900130027287...|
|           0|[1.0769307E7,3445...|       0.0|[0.96900130027287...|
|           0|[1.0769307E7,3445...|       0.0|[0.96900130027287...|
+------------+--------------------+----------+--------------------+
only showing top 10 rows



In [38]:
# Feature Importance
dt_model.featureImportances

SparseVector(108, {})

In [39]:
# Defining a function to extract features along with the feature importance score
import pandas as pd
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

In [40]:
# Printing the feature importance scores
ExtractFeatureImp(dt_model.featureImportances, predictions, "features").head(10)

Unnamed: 0,idx,name,score
0,102,price,0.0
68,62,category2classVec_lawn_mower,0.0
79,73,category2classVec_skirt,0.0
78,72,category2classVec_tennis,0.0
77,71,category2classVec_furniture,0.0
76,70,category2classVec_sock,0.0
75,69,category2classVec_jumper,0.0
74,68,category2classVec_jeans,0.0
73,67,category2classVec_scarf,0.0
72,66,category2classVec_ski,0.0


In [41]:
# Model evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="is_purchased", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

In [42]:
accuracy

0.9688395395913855

In [43]:
#import the libraries required to print the confusion matrix
from pyspark.sql.types import FloatType
from pyspark.mllib.evaluation import MulticlassMetrics
import pyspark.sql.functions as F

In [44]:
#Place the predictions and labels together
preds_and_labels = predictions.select(['prediction','is_purchased']).withColumn('label', F.col('is_purchased').cast(FloatType())).orderBy('prediction')

In [45]:
preds_and_labels = preds_and_labels.select(['prediction','label'])

In [46]:
#convert into RDD
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
#print in form of an array
print(metrics.confusionMatrix().toArray())

[[4612802.       0.]
 [ 148360.       0.]]


In [47]:
#Calculate recall
recall = metrics.recall()
recall

0.9688395395913855

In [48]:
#Calculate precision
precision = metrics.precision()
precision

0.9688395395913855

#### Summary of the best Decision Tree model

In [None]:
#Accuracy, REcall and Precision are @ 96.88%.  