In [1]:
# Setting the environment variables

In [2]:
import os
import sys
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"]="notebook --no-browser"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/home/ec2-user/spark-2.4.4-bin-hadoop2.7"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

# Ecommerce Churn Assignment

The aim of the assignment is to build a model that predicts whether a person purchases an item after it has been added to the cart or not. Being a classification problem, you are expected to use your understanding of all the three models covered till now. You must select the most robust model and provide a solution that predicts the churn in the most suitable manner. 

For this assignment, you are provided the data associated with an e-commerce company for the month of October 2019. Your task is to first analyse the data, and then perform multiple steps towards the model building process.

The broad tasks are:
- Data Exploration
- Feature Engineering
- Model Selection
- Model Inference

### Data description

The dataset stores the information of a customer session on the e-commerce platform. It records the activity and the associated parameters with it.

- **event_time**: Date and time when user accesses the platform
- **event_type**: Action performed by the customer
            - View
            - Cart
            - Purchase
            - Remove from cart
- **product_id**: Unique number to identify the product in the event
- **category_id**: Unique number to identify the category of the product
- **category_code**: Stores primary and secondary categories of the product
- **brand**: Brand associated with the product
- **price**: Price of the product
- **user_id**: Unique ID for a customer
- **user_session**: Session ID for a user


### Initialising the SparkSession

The dataset provided is 5 GBs in size. Therefore, it is expected that you increase the driver memory to a greater number.

In [3]:
# Spark environment
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [4]:
# initialising the session with 14 GB driver memory
MAX_MEMORY = "14G"

spark = SparkSession \
    .builder \
    .appName("demo") \
    .config("spark.driver.memory", MAX_MEMORY) \
    .getOrCreate()

spark

In [5]:
# Spark session with 14 GB driver memory
spark.sparkContext.getConf().get('spark.driver.memory')

'14G'

In [6]:
import pandas as pd
from pyspark.sql.functions import col
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import to_timestamp, hour, dayofweek

In [7]:
# loading the dataset from the EC2 instance - 2019-Oct.csv
df = spark.read.csv("inputdata/2019-Oct.csv", header=True, inferSchema=True)

In [8]:
# exploring the dataframe - top 20 rows
df.show(20)

+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 00:00:...|      view|  44600062|2103807459595387724|                null|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 00:00:...|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|
|2019-10-01 00:00:...|      view|  17200506|2053013559792632471|furniture.living_...|    null|  543.1|519107250|566511c2-e2e3-422...|
|2019-10-01 00:00:...|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 00:00:...|      view|   1004237|205301355563188265

In [9]:
# exploring the dataframe - schema
df.printSchema()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [10]:
# Number of rows and columns in the dataset
print((df.count(), len(df.columns)))

(42448764, 9)


So, we have a huge dataset that contains 42 million rows.

Now, tha dataset is loaded in Spark environment. Let's proceed with the desired tasks.

<hr>

## Task 1 - Data Exploration

**Find**
- 5 most popular:
    - Products sold by the e-commerce company in the month
    - Brands on the platform
    - Product categories
- Number of unique users and the most active user on the platform
- Average and maximum price for smartphones purchased by the customers
<br>

**Plot and comment**
- Event-type funnel distribution in e-commerce shopping journey
- Traffic on different days of the week

Provide the results in a separate report.

In [11]:
# 5 most popular products sold
df[df["event_type"]=="purchase"].groupby('product_id').count().sort(col("count").desc()).show(5)

+----------+-----+
|product_id|count|
+----------+-----+
|   1004856|28944|
|   1004767|21806|
|   1004833|12697|
|   1005115|12543|
|   4804056|12381|
+----------+-----+
only showing top 5 rows



In [12]:
# 5 most popular brands
df[df["event_type"]=="purchase"].groupby('brand').count().sort(col("count").desc()).show(6) #showing 6 as null is also there

+-------+------+
|  brand| count|
+-------+------+
|samsung|172896|
|  apple|142873|
|   null| 58214|
| xiaomi| 56616|
| huawei| 23501|
|lucente| 11578|
+-------+------+
only showing top 6 rows



In [13]:
# Number of unique users
df.select(countDistinct("user_id")).withColumnRenamed("count(DISTINCT user_id)", "UniqueUsers").show()

+-----------+
|UniqueUsers|
+-----------+
|    3022290|
+-----------+



In [14]:
# The most active user on the platform
df.groupby('user_id').agg(countDistinct("user_session")).withColumnRenamed("count(DISTINCT user_session)","UniqueSessions").sort(col("UniqueSessions").desc()).show(1)

+---------+--------------+
|  user_id|UniqueSessions|
+---------+--------------+
|512475445|          7400|
+---------+--------------+
only showing top 1 row



In [15]:
df.select('category_code').distinct().show()

+--------------------+
|       category_code|
+--------------------+
|apparel.shoes.sli...|
|    computers.ebooks|
|computers.periphe...|
|electronics.video...|
|appliances.kitche...|
|     sport.snowboard|
|electronics.camer...|
|       apparel.shirt|
|electronics.audio...|
|appliances.kitche...|
|appliances.kitche...|
|  electronics.tablet|
|appliances.kitche...|
|auto.accessories....|
|apparel.shoes.moc...|
|       apparel.jeans|
|computers.periphe...|
|furniture.living_...|
| stationery.cartrige|
|furniture.kitchen...|
+--------------------+
only showing top 20 rows



In [16]:
# Average and Maximum price for smartphones purchased by the customers
df.filter((col("event_type")=="purchase")&(col("category_code").like("electronics.smartphone"))).groupBy().max('price').show()

+----------+
|max(price)|
+----------+
|   2110.45|
+----------+



In [17]:
df.filter((col("event_type")=="purchase")&(col("category_code").like("electronics.smartphone"))).groupBy().avg('price').show()

+-----------------+
|       avg(price)|
+-----------------+
|464.6191130945664|
+-----------------+



In [18]:
pip install plotly

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [19]:
# Event-type funnel distribution
# Remember Spark dataframe cannot be visualised directly
events_count = df.groupby('event_type').count().withColumnRenamed("count","event_count")
labels = [row.event_type for row in events_count.collect()]
size = [int(row.event_count)for row in events_count.collect()]
print(labels, size)

['purchase', 'view', 'cart'] [742849, 40779399, 926516]


In [20]:
labels

['purchase', 'view', 'cart']

In [21]:
import plotly
from plotly import version
print (version)
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
init_notebook_mode(connected=True)

<module 'plotly.version' from '/home/ec2-user/.local/lib/python3.7/site-packages/plotly/version.py'>


In [22]:
# from plotly import graph_objects as go
from plotly import graph_objects as go
fig = go.Figure(go.Funnel(y=size, x=labels))
fig.show()

In [23]:
from plotly import express as px
fig = px.funnel_area(names=labels,
                    values=size)
fig.show()

In [24]:
# Traffic on different days of the week
# Remember Spark dataframe cannot be visualised directly


In [25]:
df = df.withColumn("event_time", to_timestamp(df.event_time, 'yyyy-MM-dd HH:mm:ss'))

In [26]:
day_visits = df.groupby(dayofweek(df["event_time"])).agg(countDistinct("user_session"))

In [27]:
day_visits =  day_visits.withColumnRenamed("dayofweek(event_time)","DayOfWeek").withColumnRenamed("count(DISTINCT user_session)","ActivityCount")
day = [row.DayOfWeek for row in day_visits.collect()]
activity = [int(row.ActivityCount) for row in day_visits.collect()]

In [28]:
day_visits.show()

+---------+-------------+
|DayOfWeek|ActivityCount|
+---------+-------------+
|        1|      1236041|
|        6|      1286835|
|        3|      1504737|
|        5|      1413669|
|        4|      1476124|
|        7|      1191891|
|        2|      1173112|
+---------+-------------+



In [29]:
!pip3 install -U matplotlib

Defaulting to user installation because normal site-packages is not writeable
Requirement already up-to-date: matplotlib in ./.local/lib/python3.7/site-packages (3.4.2)


In [30]:
import matplotlib.pyplot as plt
plt.clf()
day_visits_pd = day_visits.toPandas().sort_values(by=['DayOfWeek'], inplace = True)
day_visits_pd.plot(kind='bar', x='DayOfWeek', y='ActivityCount')
plt.xticks(day_visits_pd.DayOfWeek)
plt.xlim(0, 7)
plt.xlabel('Day')
plt.ylabel('Number of visits')

%matplot plt

AttributeError: 'NoneType' object has no attribute 'plot'

<Figure size 432x288 with 0 Axes>

In [None]:
# drop redundant column(s), if any


<hr>

## Task 2 - Feature Engineering

- Handle missing values (provide justification for approach)
- Generate the category code at 2 levels (Split into 2 columns)
    - Example: electronics.video.tv - electronics, video
- Capture user activity in different columns
    - Total activities (view/cart/etc.) in the session
    - Affinity towards a particular product (Product count for user)
    - Affinity towards a category (Secondary category count for user)
    - Average shopping expense for a product category (secondary)
    - Number of user sessions
- Impact of time: Day and Hour (Binning hours into 4 buckets)
- Reduction in brands for analysis: Top 20 + ‘others’
- Target variable generation: is_purchased


In [31]:
# Handling missing values 
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns if c!='event_time']).show()

+----------+----------+-----------+-------------+-------+-----+-------+------------+
|event_type|product_id|category_id|category_code|  brand|price|user_id|user_session|
+----------+----------+-----------+-------------+-------+-----+-------+------------+
|         0|         0|          0|     13515609|6113008|    0|      0|           2|
+----------+----------+-----------+-------------+-------+-----+-------+------------+



In [32]:
##Check Missing values for each column when the event was Purchase
df.filter(col("event_type")=="purchase").select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns if c!='event_time']).show()

+----------+----------+-----------+-------------+-----+-----+-------+------------+
|event_type|product_id|category_id|category_code|brand|price|user_id|user_session|
+----------+----------+-----------+-------------+-----+-----+-------+------------+
|         0|         0|          0|       173425|58214|    0|      0|           0|
+----------+----------+-----------+-------------+-----+-----+-------+------------+



Reason for the action performed: Since the values having null does not change the distribution and after dropping them we still have enough data to build the model. Hence dropping



In [33]:
df = df.na.drop()

In [34]:
# Checking for duplicate entries
if df.count()>df.dropDuplicates().count():
    print("Data has duplicates")

Data has duplicates


In [35]:
# Take required action here
df = df.dropDuplicates()

In [36]:
# drop redundant column(s), if any
columns_to_drop = ['category_id']
df = df.drop(*columns_to_drop)

In [37]:
# Generating 2 columns from category code
# Columns have only single entity (electronics.video.tv: electronics, video)
# Check if split command works with '.' or Spark treats it as special character

from pyspark.sql.functions import split
df = df.withColumn("Category", split(col("category_code"), "\.").getItem(0)) \
 .withColumn("SubCategory", split(col("category_code"), "\.").getItem(1)).drop("category_code")

In [38]:
# Check if the column is added
df.columns

['event_time',
 'event_type',
 'product_id',
 'brand',
 'price',
 'user_id',
 'user_session',
 'Category',
 'SubCategory']

In [39]:
# Activities in a session by the user
# Window functions can be helpful here
from pyspark.sql.window import Window
w = Window.partitionBy(df.user_session)
df = df.withColumn("ActivityCount", count('event_type').over(w))

In [40]:
# View count for a product by the user
# Window functions can be helpful here
w = Window.partitionBy(df.user_id, df.product_id)
df = df.withColumn("ProductView", count(when(col('event_type')=='view', col('event_type'))).over(w))

In [41]:
# View count for the secondary category by the user
# Window functions can be helpful here
w = Window.partitionBy(df.user_id, df.SubCategory)
df = df.withColumn('SubCategoryViewCount', count(when(col('event_type')=='view', col('event_type'))).over(w))

In [42]:
# Average shopping expense for a product category
# Window functions can be helpful here
from pyspark.sql.functions import avg
w = Window.partitionBy(df.SubCategory)
df = df.withColumn("AvgSpendingOnCategoty", avg(when(col("event_type")=="purchase", df.price)).over(w))

In [43]:
# Session count for a user
# Window functions can be helpful here
df.groupby('user_id').agg(countDistinct("user_session")).withColumnRenamed("count(DISTINCT user_session)", "NumberOfSessions").show()

+---------+----------------+
|  user_id|NumberOfSessions|
+---------+----------------+
|518261950|               3|
|518805657|               5|
|552654400|              30|
|543051276|               2|
|557989376|               2|
|543440365|               2|
|520478523|               2|
|515915152|               4|
|518838046|               1|
|546595834|               1|
|519064626|               6|
|556344837|              16|
|513843094|              30|
|551576406|               6|
|518800215|              15|
|546439895|              13|
|550083694|               1|
|519292858|               5|
|538560377|               2|
|557101376|              29|
+---------+----------------+
only showing top 20 rows



In [44]:
# Generating the hour variable
df = df.withColumn("EventHour", hour(col('event_time')))

In [45]:
# Reduction in brands for analysis: Top 20 + ‘others'
top_twenty = df[df["event_type"]=="purchase"].groupby('brand').count().sort(col("count").desc()).limit(20).select('brand').collect()

In [46]:
top_twenty = [row.brand for row in top_twenty]

In [47]:
df = df.withColumn("brand", when(col('brand').isin(top_twenty), col('brand')).otherwise('others'))

In [48]:
# Generating 'is_purchased' variable
df = df.filter(df['event_type'] !='view')
df = df.withColumn("is_purchased", when(df['event_type']=='purchase', 1).otherwise(0))

from pyspark.sql.functions import max as sparkMax
w = Window.partitionBy(df.user_session, df.product_id)
df = df.withColumn("is_purchased", sparkMax(col('is_purchased')).over(w))


In [49]:
# Dropping redundant rows
df = df.filter(df['event_type'] =='cart')

In [50]:
# Printing the top 20 rows as output
df.show()

+-------------------+----------+----------+-------+-------+---------+--------------------+-----------+-----------+-------------+-----------+--------------------+---------------------+---------+------------+
|         event_time|event_type|product_id|  brand|  price|  user_id|        user_session|   Category|SubCategory|ActivityCount|ProductView|SubCategoryViewCount|AvgSpendingOnCategoty|EventHour|is_purchased|
+-------------------+----------+----------+-------+-------+---------+--------------------+-----------+-----------+-------------+-----------+--------------------+---------------------+---------+------------+
|2019-10-24 09:06:14|      cart|   1004856|samsung| 131.51|513622224|000081ea-9376-4eb...|electronics| smartphone|            3|          1|                   1|   464.33546803888896|        9|           1|
|2019-10-16 13:28:26|      cart|   1002544|  apple| 460.11|543196263|000887f2-b420-4c2...|electronics| smartphone|            8|          2|                   5|   464.3354

In [51]:
# Dropping the redundant columns 
redundant_cols = ["event_time","event_type", "user_id", "user_session","product_id"]
df = df.drop(*redundant_cols)

In [52]:
# Columns in df after feature engineering
df.columns

['brand',
 'price',
 'Category',
 'SubCategory',
 'ActivityCount',
 'ProductView',
 'SubCategoryViewCount',
 'AvgSpendingOnCategoty',
 'EventHour',
 'is_purchased']

In [53]:
# Storing the cleaned df in the instance to prevent repetition of steps again
# Rename the file based on your preferences
df.coalesce(1).write.option("header", "true").parquet("processed.parquet")

Proceed to another notebooks after saving the dataframe.