In [1]:
# Setting the environment variables

In [2]:
import os
import sys
# PySpark runtime configuration: Python 3 interpreter for the driver and the
# executors, extra driver Python options, and the JDK / Spark install locations.
os.environ.update({
    "PYSPARK_PYTHON": "/usr/bin/python3",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
    "PYSPARK_DRIVER_PYTHON_OPTS": "notebook --no-browser",
    "JAVA_HOME": "/usr/java/jdk1.8.0_161/jre",
    "SPARK_HOME": "/home/ec2-user/spark-2.4.4-bin-hadoop2.7",
})

# Make the pyspark and py4j archives shipped with Spark importable.
# py4j is inserted first, so pyspark.zip ends up ahead of it at the front of sys.path.
spark_python_lib = os.environ["SPARK_HOME"] + "/python/lib"
os.environ["PYLIB"] = spark_python_lib
for archive in ("py4j-0.10.7-src.zip", "pyspark.zip"):
    sys.path.insert(0, spark_python_lib + "/" + archive)

# Ecommerce Churn Assignment

The aim of the assignment is to build a model that predicts whether a person purchases an item after it has been added to the cart. Since this is a classification problem, you are expected to apply all three classification models covered so far, select the most robust one, and provide a solution that predicts churn (a carted item that is not purchased) as accurately as possible.

For this assignment, you are provided the data associated with an e-commerce company for the month of October 2019. Your task is to first analyse the data, and then perform multiple steps towards the model building process.

The broad tasks are:
- Data Exploration
- Feature Engineering
- Model Selection
- Model Inference

### Data description

The dataset stores information about customer sessions on the e-commerce platform. It records each activity together with its associated parameters.

- **event_time**: Date and time when user accesses the platform
- **event_type**: Action performed by the customer
    - View
    - Cart
    - Purchase
    - Remove from cart
- **product_id**: Unique number to identify the product in the event
- **category_id**: Unique number to identify the category of the product
- **category_code**: Stores primary and secondary categories of the product
- **brand**: Brand associated with the product
- **price**: Price of the product
- **user_id**: Unique ID for a customer
- **user_session**: Session ID for a user


### Initialising the SparkSession

The dataset provided is 5 GB in size, so you should increase the Spark driver memory accordingly. Refer to notebook 1 for the steps involved.

In [4]:
# Spark environment
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [5]:
# Start (or reuse, via getOrCreate) a SparkSession with 14 GB of driver memory,
# since the source dataset is several GB in size.
MAX_MEMORY = "14G"

spark = (
    SparkSession.builder
    .appName("demo")
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)

spark

In [6]:
# Confirm the driver-memory value recorded in the SparkContext configuration
spark_conf = spark.sparkContext.getConf()
spark_conf.get("spark.driver.memory")

'14G'

In [11]:
# Load the cleaned dataset (presumably written by the earlier cleaning /
# feature-engineering step — TODO confirm its provenance).
df = spark.read.format('parquet').load('cleaned_df.parquet')

<hr>

## Task 3: Model Selection
Three classification models are built and compared:
- Logistic Regression
- Decision Tree
- Random Forest

### Model 1: Logistic Regression

In [12]:
# Additional steps for Logistic regression - Feature selection, Correlation, etc.

from pyspark.ml.feature import OneHotEncoderEstimator,StringIndexer,VectorAssembler

#### Feature Transformation

In [13]:
# Check that only the columns required to build the model are present.
# Instead of relying on a visual check of the printed schema, the assertion below
# fails loudly and lists any missing or redundant columns (drop those upstream).
df.printSchema()

required_columns = {
    'brand', 'sub_categ', 'event_day',
    'price', 'user_product_count', 'sub_categ_user_count',
    'prod_avg_spend', 'user_sess_count', 'user_session_activity',
    'label',
}
missing_columns = required_columns - set(df.columns)
redundant_columns = set(df.columns) - required_columns
assert not missing_columns and not redundant_columns, \
    "missing: {}, redundant: {}".format(sorted(missing_columns), sorted(redundant_columns))

root
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- event_day: integer (nullable = true)
 |-- sub_categ: string (nullable = true)
 |-- user_session_activity: long (nullable = true)
 |-- user_product_count: long (nullable = true)
 |-- sub_categ_user_count: long (nullable = true)
 |-- prod_avg_spend: double (nullable = true)
 |-- user_sess_count: long (nullable = true)
 |-- label: integer (nullable = true)



In [14]:
# Split the model inputs by type: categorical columns get indexed and one-hot
# encoded, continuous columns go into the feature vector unchanged.
categ_features = [
    'brand',
    'sub_categ',
    'event_day',
]
cont_features = [
    'price',
    'user_product_count',
    'sub_categ_user_count',
    'prod_avg_spend',
    'user_sess_count',
    'user_session_activity',
]
output_label = 'label'

In [16]:
stages = list()  # ML pipeline stages, appended to by the following cells

In [17]:
# Feature transformation for categorical features: each column is label-indexed
# into <col>_ind and then one-hot encoded into <col>_enc.
# handleInvalid="keep" gives unseen/invalid values their own index instead of raising.
# `stages` is rebuilt here (not appended to with +=) so that re-running this cell
# does not duplicate the indexer/encoder stages.
stages = []
for categoricalCol in categ_features:
    stringIndexer = StringIndexer(inputCol=categoricalCol,
                                  outputCol=categoricalCol + '_ind').setHandleInvalid("keep")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "_enc"])
    stages += [stringIndexer, encoder]

In [19]:
# Combine the one-hot encoded categorical vectors and the raw continuous
# columns into a single "features" vector column.
assemblerInputs = [name + "_enc" for name in categ_features]
assemblerInputs.extend(cont_features)
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [20]:
# Chain the indexer, encoder and assembler stages into one Pipeline
from pyspark.ml import Pipeline

pipeline = Pipeline().setStages(stages)

In [21]:
# Fit the feature pipeline on df and apply it to produce df_transform.
# NOTE(review): the pipeline (e.g. StringIndexer vocabularies) is fit on the full
# dataset, before the train/test split further down.
pipeline_model = pipeline.fit(df)
df_transform = pipeline_model.transform(df)

In [22]:
# Schema of the transformed df: the original columns plus <col>_ind / <col>_enc
# for each categorical feature and the assembled `features` vector column

df_transform.printSchema()

root
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- event_day: integer (nullable = true)
 |-- sub_categ: string (nullable = true)
 |-- user_session_activity: long (nullable = true)
 |-- user_product_count: long (nullable = true)
 |-- sub_categ_user_count: long (nullable = true)
 |-- prod_avg_spend: double (nullable = true)
 |-- user_sess_count: long (nullable = true)
 |-- label: integer (nullable = true)
 |-- brand_ind: double (nullable = false)
 |-- brand_enc: vector (nullable = true)
 |-- sub_categ_ind: double (nullable = false)
 |-- sub_categ_enc: vector (nullable = true)
 |-- event_day_ind: double (nullable = false)
 |-- event_day_enc: vector (nullable = true)
 |-- features: vector (nullable = true)



In [23]:
# Peek at the first 20 rows of the transformed df without truncating wide columns
df_transform.show(20, False)

+--------+-------+---------+----------+---------------------+------------------+--------------------+------------------+---------------+-----+---------+--------------+-------------+--------------+-------------+-------------+--------------------------------------------------------------------------------------------+
|brand   |price  |event_day|sub_categ |user_session_activity|user_product_count|sub_categ_user_count|prod_avg_spend    |user_sess_count|label|brand_ind|brand_enc     |sub_categ_ind|sub_categ_enc |event_day_ind|event_day_enc|features                                                                                    |
+--------+-------+---------+----------+---------------------+------------------+--------------------+------------------+---------------+-----+---------+--------------+-------------+--------------+-------------+-------------+--------------------------------------------------------------------------------------------+
|samsung |283.62 |3        |smartphone|2      

In [24]:
# Persist the transformed df so the feature pipeline need not be re-run.
# The relative path resolves against Spark's default filesystem.
# NOTE(review): an earlier comment said "S3 bucket" — confirm the cluster's default FS.
# mode("overwrite") keeps the cell re-runnable (the default save mode raises if the
# path already exists). The CSV-only "header" option was dropped: Parquet ignores it.
df_transform.coalesce(1).write.mode("overwrite").parquet("LR_Transform_Output.parquet")

In [25]:
from pyspark.ml.feature import MinMaxScaler

# Rescale every entry of the assembled "features" vector to the [0, 1] range.
# NOTE(review): the scaler is fit on the full dataset before the train/test split,
# so test-set min/max values leak into training — consider fitting on traindata only.
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(df_transform)
df_logistic = scaler_model.transform(df_transform)

In [26]:
# First row: raw assembled features next to their min-max scaled version
df_logistic.select(["features", "scaled_features"]).first()

Row(features=SparseVector(71, {0: 1.0, 20: 1.0, 59: 1.0, 65: 283.62, 66: 2.0, 67: 3.0, 68: 270.4633, 69: 2.0, 70: 2.0}), scaled_features=DenseVector([1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.1098, 0.0013, 0.0006, 0.1047, 0.0005, 0.0031]))

#### Train-test split

In [27]:
# 70/30 train/test split with a fixed seed, so the same rows can be reused
# when the models are compared later
traindata, testdata = df_logistic.randomSplit(weights=[0.7, 0.3], seed=100)

In [28]:
# Row count of the training split
train_rows = traindata.count()
train_rows

549095

In [29]:
# Row count of the test split
test_rows = testdata.count()
test_rows

235266

#### Model Fitting

In [30]:
# Logistic regression on the min-max scaled feature vector
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression().setFeaturesCol('scaled_features').setLabelCol('label')

In [31]:
# Fit the logistic regression on the training split only;
# the test split is held out for evaluation below

model = lr.fit(dataset=traindata)

#### Model Analysis

Required Steps:
- Predict on the test data (the model is fitted on the training split only)
- Performance analysis
    - Appropriate Metric with reasoning
    - Threshold selection

In [32]:
# Fitted weights (one per scaled feature) and the intercept
print("Coefficients: {}".format(model.coefficients))
print("Intercept: {}".format(model.intercept))

Coefficients: [-0.016364044126548516,0.024748699218964852,-0.28785411121093746,-0.5358607494044827,-0.12917960496283934,0.011084880414660566,-0.15600180236201014,-0.09730783673971405,-0.22149768095428427,-0.24177574521820927,-0.25333938435742426,-0.19464539597003264,-0.1661891541166122,-0.24003551972653012,-0.3894712611289704,-0.13530529335577177,-0.31371923167152166,-0.3802748846488473,-0.4285691598795496,0.2820335114163284,0.007787971362826653,-0.3031394080317867,-0.40911510160185927,-0.2760195166770672,-0.22582200766893348,-0.3757056101442078,-0.2518806030280961,-0.39938026186328257,-0.1910907039604234,-0.479464854549623,-0.28593963054690597,-0.33670391225922214,-0.051894608494017135,-0.5367769536868628,-0.2882989288266407,-0.17619677321155902,-0.5437305869579929,-0.4027020140216677,-0.6813934772732959,-0.8823411058744571,-0.5454344061012408,-0.39700131305355363,-0.5762834944594801,-0.9708166145313164,0.11174865396798954,-0.5629084306812733,-0.6307816949116098,-1.1923134522613483,-0

In [33]:
# Score the training split (adds rawPrediction / probability / prediction columns)
predictions_train = model.transform(dataset=traindata)

In [34]:
# Score the held-out test split
predictions_test = model.transform(dataset=testdata)

In [35]:
# Compare actual vs predicted label for the first 10 test rows
predictions_test.select(['label', 'prediction']).show(n=10)

+-----+----------+
|label|prediction|
+-----+----------+
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    0|       1.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    1|       1.0|
|    0|       1.0|
|    0|       1.0|
+-----+----------+
only showing top 10 rows



In [36]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve, computed from the model's rawPrediction column, on the
# training split and the held-out test split. Similar values on both suggest the
# model is not overfitting. The first line previously mislabelled the training AUC
# as "Test"; the evaluator parameters below are spelled out but equal the defaults.
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction',
                                          labelCol='label',
                                          metricName='areaUnderROC')
print('Train Area Under ROC', evaluator.evaluate(predictions_train))
print('Test Area Under ROC', evaluator.evaluate(predictions_test))

Test Area Under ROC 0.7082473906942639
Test Area Under ROC 0.708299445894665


In [37]:
# Per-label F1 scores, ordered [label 0, label 1], on both splits
result_train = model.evaluate(traindata)
result_test = model.evaluate(testdata)

for split_name, split_summary in (('Training', result_train), ('Test', result_test)):
    print('fMeasure on {} data'.format(split_name), split_summary.fMeasureByLabel())

fMeasure on Training data [0.44304921672628583, 0.7666291946091923]
fMeasure on Test data [0.4420633551799471, 0.7657130786408353]


In [40]:
# Keep only the columns needed to derive confusion-matrix counts.
# NOTE(review): these are predictions on the *training* split, while the
# "Model Analysis" checklist asks for test-data performance — confirm intent.
conf_matrix = predictions_train.select(['label', 'probability', 'prediction'])

In [43]:
from pyspark.sql.functions import udf, when
from pyspark.sql.types import DoubleType, FloatType

# Decision threshold applied to P(label == 1); change this for threshold selection.
THRESHOLD = 0.5

# Extract P(label == 1) (index 1 of the probability vector). DoubleType replaces the
# previous FloatType, which could round probabilities just below THRESHOLD up to it.
element_extrac = udf(lambda v: float(v[1]), DoubleType())

# label_p: thresholded prediction (1 if P(label == 1) >= THRESHOLD, else 0)
final_result = conf_matrix.withColumn(
    'label_p',
    when(element_extrac(conf_matrix['probability']) >= THRESHOLD, 1).otherwise(0),
)

# One aggregation pass over (label, label_p) instead of four separate filter+count
# jobs, each of which re-evaluated the whole upstream lineage.
cell_counts = {
    (row['label'], row['label_p']): row['count']
    for row in final_result.groupBy('label', 'label_p').count().collect()
}
TP = cell_counts.get((1, 1), 0)
FP = cell_counts.get((0, 1), 0)
FN = cell_counts.get((1, 0), 0)
TN = cell_counts.get((0, 0), 0)

In [44]:
# Accuracy: fraction of rows whose thresholded prediction matches the label
correct = TP + TN
total = TP + TN + FP + FN
accuracy = correct / total
accuracy

0.6710805962538359

In [45]:
# Precision: share of rows predicted positive (label_p == 1) whose label is 1.
# Guarded so that a threshold which predicts no positives yields 0.0 instead of
# raising ZeroDivisionError.
predicted_positive = TP + FP
precision = TP / predicted_positive if predicted_positive else 0.0
precision

0.6843002468224493

In [46]:
# Recall: share of rows with label 1 that were predicted positive.
# Guarded so that a split without positive labels yields 0.0 instead of
# raising ZeroDivisionError.
actual_positive = TP + FN
recall = TP / actual_positive if actual_positive else 0.0
recall

0.8714776733254994

In [47]:
# F1 score: harmonic mean of precision and recall. Defined as 0.0 when both are 0
# (no true positives), which would otherwise raise ZeroDivisionError.
fscore = (2 * precision * recall) / (precision + recall) if (precision + recall) else 0.0
fscore

0.7666291946091923

#### Updated model, if any
Repeat the steps 