# Analyse Customer Behaviour in a Multi Category e-Commerce Website
The dataset contains customer behaviour data from a large multi-category e-commerce website. Customer behaviour is captured in the `event_type` field, which takes one of three values: view, cart or purchase. Each row in the file represents one event, and every event links a product to a user, so the events form a many-to-many relation between products and users. This exercise uses the October 2019 dataset published on Kaggle https://www.kaggle.com/datasets/mkechinov/ecommerce-behavior-data-from-multi-category-store. The dataset was originally collected by Open CDP https://rees46.com/en/open-cdp.
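
To make the many-to-many relation concrete, here is a minimal plain-Python sketch. The sample rows and their values are hypothetical and are not taken from the dataset; only the field names `event_type`, `product_id` and `user_id` come from the data.

```python
from collections import defaultdict

# Hypothetical sample events (values invented for illustration only).
events = [
    {"event_type": "view", "product_id": 1001, "user_id": 1},
    {"event_type": "cart", "product_id": 1001, "user_id": 1},
    {"event_type": "view", "product_id": 1002, "user_id": 1},
    {"event_type": "view", "product_id": 1001, "user_id": 2},
    {"event_type": "purchase", "product_id": 1001, "user_id": 2},
]

# One user can touch many products, and one product can be touched by many users.
products_by_user = defaultdict(set)
users_by_product = defaultdict(set)
for e in events:
    products_by_user[e["user_id"]].add(e["product_id"])
    users_by_product[e["product_id"]].add(e["user_id"])

print(dict(products_by_user))
print(dict(users_by_product))
```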

The selected dataset is approximately 5 GB in size, which makes it difficult to process in our usual RStudio or Colab environment. We therefore need a big data technology to process it. In this exercise, we use a Spark cluster running in a cloud environment. We use Jupyter Notebook as our IDE and connect to the Spark cluster using Python (`pyspark`). The exercise first connects to the Spark cluster, reads the data stored in HDFS, and then runs several descriptive analyses to understand customer behaviour.

## 1. Spark configuration in `pyspark`

In [1]:
import socket
import pyspark
from pyspark.sql import SparkSession

hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
conf = pyspark.SparkConf()
spark = SparkSession.builder.master("localhost") \
.config("spark.driver.port","7077") \
.config("spark.executor.memory", '3g') \
.getOrCreate()
spark

23/07/21 13:38:28 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## 2. Create schema and read data
It is recommended to define a Spark schema before reading data into a Spark dataframe. A schema declares the expected data type of each field we import from HDFS and whether the field is nullable. You can find the available Spark data types at https://spark.apache.org/docs/latest/sql-ref-datatypes.html.

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import col,struct,when
schema = StructType().add('event_time', TimestampType(),True) \
                    .add('event_type',StringType(),True) \
                    .add('product_id', IntegerType(),True) \
                    .add('category_id', StringType(),True) \
                    .add('category_code', StringType(), True) \
                    .add('brand', StringType(),True) \
                    .add('price', FloatType(),True) \
                    .add('user_id', IntegerType(),True) \
                    .add('user_session', StringType(),True)

In [4]:
behaviour = spark.read.schema(schema).csv('hdfs:///data/behaviour_oct.csv', header=True, 
                            multiLine=True, escape="\"")
behaviour.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: string (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: float (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
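
As a rough illustration of what a schema contributes, the sketch below casts raw CSV strings to declared types in plain Python. It only mimics the idea: the sample row is invented, the helper `apply_schema` and the `casters` mapping are hypothetical names, and Spark's CSV reader applies its own parsing rules.

```python
import csv
import io
from datetime import datetime

# Hypothetical raw CSV text: a header and one invented row (not real data).
raw = (
    "event_time,event_type,product_id,price,user_id\n"
    "2019-10-01 00:00:00,view,1001,,42\n"
)

# Column name -> function that converts the raw string to a typed value.
casters = {
    "event_time": datetime.fromisoformat,
    "event_type": str,
    "product_id": int,
    "price": float,
    "user_id": int,
}

def apply_schema(row):
    """Cast each field to its declared type; empty strings become None (nullable)."""
    return {name: (cast(row[name]) if row[name] != "" else None)
            for name, cast in casters.items()}

typed_rows = [apply_schema(r) for r in csv.DictReader(io.StringIO(raw))]
print(typed_rows[0])
```

Without the casts every field would stay a string, so a later sum over `price` or a date calculation on `event_time` would fail or give misleading results.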



## 3. Check the dataset
You can count the total number of records and display the first few records to check that the Spark dataframe was created successfully.

In [5]:
behaviour.count()

                                                                                

42448764

In [None]:
behaviour.show()

## 4. Run `pyspark` query 
A Spark dataframe has a number of data manipulation methods for running queries. Spark executes these method calls through the same Spark SQL engine that runs SQL queries. Query results can be converted to a pandas dataframe, which lets you process them with the usual Python pandas tools.

The following query counts the number of records by `event_type`.

In [None]:
import pandas as pd
from pyspark.sql import *
event_type = behaviour.groupby('event_type').count().toPandas()
event_type
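
The grouped count above has the same shape as a SQL `GROUP BY` query. The sketch below shows that query on a handful of invented rows, using the standard-library `sqlite3` module purely as a stand-in for a SQL engine; it does not run on Spark, and the table contents are hypothetical.

```python
import sqlite3

# Hypothetical event rows (invented for illustration).
rows = [("view",), ("view",), ("cart",), ("view",), ("purchase",)]

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE behaviour (event_type TEXT)")
con.executemany("INSERT INTO behaviour VALUES (?)", rows)

# SQL with the same shape as behaviour.groupby('event_type').count()
query = """
    SELECT event_type, COUNT(*) AS "count"
    FROM behaviour
    GROUP BY event_type
    ORDER BY event_type
"""
counts = dict(con.execute(query).fetchall())
con.close()
print(counts)
```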

## Exercise 01
Using the Spark dataframe, run the following queries.

1. Total value of customer behaviour by event type
2. Top 10 brands purchased by value
3. Top 10 brands purchased by volume

In [None]:
event_value = behaviour.groupby('event_type').agg({'price': 'sum'}).toPandas()
event_value

In [None]:
top_brands_by_value = behaviour.filter('event_type = "purchase"').groupby('brand').agg({'price': 'sum'}).toPandas()
top_brands_by_value.sort_values(['sum(price)'],ascending=[False]).head(10)

In [None]:
top_brands_by_volume = behaviour.filter('event_type = "purchase"').groupby('brand').agg({'product_id': 'count'}).toPandas()
top_brands_by_volume.sort_values(['count(product_id)'],ascending=[False]).head(10)
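
Value and volume can rank brands differently. The following plain-Python sketch shows the two aggregations side by side on invented purchase rows; the brand names, prices and the helper `top_n` are hypothetical.

```python
from collections import Counter, defaultdict

# Hypothetical purchase rows as (brand, price); values are invented.
purchases = [
    ("brand_a", 900.0),
    ("brand_b", 100.0),
    ("brand_b", 120.0),
    ("brand_b", 80.0),
    ("brand_c", 250.0),
]

value_by_brand = defaultdict(float)
volume_by_brand = Counter()
for brand, price in purchases:
    value_by_brand[brand] += price   # value: sum of prices
    volume_by_brand[brand] += 1      # volume: number of purchase events

def top_n(totals, n):
    """Return the n largest (brand, total) pairs, largest first."""
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:n]

top_by_value = top_n(value_by_brand, 2)
top_by_volume = top_n(volume_by_brand, 2)
print(top_by_value)
print(top_by_volume)
```

In this sample, one brand leads by value through a single expensive purchase while another leads by volume through several cheap ones, which is why the exercise asks for both rankings.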

## Exercise 02

The `category_code` variable contains the product category and sub-categories, delimited by a `.`. Using string manipulation, produce the following query results.

1. Extract the main category of the purchased items with the price
2. Total value and volume of purchased items by the main category
3. Top 10 product categories by value
4. Top 10 product categories by volume
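
Spark's `split` function takes a regular expression as its pattern, so the dot has to be escaped. The plain-Python sketch below shows the same idea with the `re` module: the first element of the split is the main category. The sample codes are illustrative values and `main_category` is a hypothetical helper.

```python
import re

def main_category(category_code):
    """Return the part before the first dot, or None when the code is missing."""
    if not category_code:
        return None
    # The dot is escaped because an unescaped "." in a regex matches any character.
    return re.split(r"\.", category_code)[0]

samples = ["electronics.smartphone", "appliances.kitchen.refrigerators", None]
print([main_category(c) for c in samples])

# An unescaped pattern splits on every character and leaves only empty strings.
print(re.split(".", "a.b"))
```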


In [None]:
from pyspark.sql.functions import split
# split() takes a regular expression, so the dot is escaped; item 0 is the main category
categories = behaviour.filter('event_type = "purchase"').select(split('category_code', r'\.').getItem(0).alias('categories'), 'product_id', 'price').toPandas()
categories

In [None]:
# Sum the price for value and count the purchase rows for volume
category_master = categories.groupby('categories').agg(Value=('price', 'sum'), Volume=('product_id', 'count'))
category_master.head()

In [None]:
category_master.sort_values(['Value'],ascending=[False]).head(10)

In [None]:
category_master.sort_values(['Volume'],ascending=[False]).head(10)

## Exercise 03
The dataset includes items viewed by the users. This can be identified using the `event_type`. The company wants to analyse the daily view pattern during the month. First we need to generate the date from the timestamp value. Then the company requires us to generate the following query results.

1. Create daily view pattern of apple products
2. Visualise the timeseries using a line chart
3. Compare the view patterns of apple vs samsung
4. Compare the two frequency distributions
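
The daily view pattern comes down to three steps: keep only view events, reduce each timestamp to its calendar date, and count per brand and date. A minimal plain-Python sketch of these steps, on invented events, could look like this:

```python
from collections import Counter
from datetime import datetime

# Hypothetical events as (event_time, event_type, brand); values are invented.
events = [
    (datetime(2019, 10, 1, 9, 15), "view", "apple"),
    (datetime(2019, 10, 1, 18, 40), "view", "apple"),
    (datetime(2019, 10, 1, 11, 5), "purchase", "apple"),
    (datetime(2019, 10, 2, 8, 30), "view", "apple"),
    (datetime(2019, 10, 2, 8, 45), "view", "samsung"),
]

# Count view events per (brand, date); other event types are skipped.
daily_views = Counter(
    (brand, ts.date())
    for ts, event_type, brand in events
    if event_type == "view"
)
for (brand, day), n in sorted(daily_views.items()):
    print(brand, day, n)
```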

In [None]:
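from pyspark.sql.functions import to_date
# Keep only view events, then derive the calendar date from the timestamp
views = behaviour.filter('event_type = "view"').withColumn('date', to_date('event_time'))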
views.show()

In [None]:
views_by_brand = views.groupby('brand','date').count()
apple_views = views_by_brand.filter('brand="apple"').drop('brand').toPandas().sort_values(['date'])
apple_views.head()

In [None]:
import seaborn as sns
# Set the figure size before plotting so that it applies to this chart
sns.set(rc={'figure.figsize':(20,10)})
sns.lineplot(data=apple_views, x="date", y="count")

In [None]:
apple_vs_samsung = views_by_brand.filter('brand="apple" OR brand="samsung"').toPandas().sort_values(['date'])
apple_vs_samsung.head()

In [None]:
import seaborn as sns
# Set the figure size before plotting so that it applies to this chart
sns.set(rc={'figure.figsize':(30, 10)})
sns.lineplot(data=apple_vs_samsung, x="date", y="count", hue="brand")

In [None]:
# displot is a figure-level function: size it with height and aspect rather than figure.figsize
sns.displot(data=apple_vs_samsung, x="count", hue="brand", element="step", height=20, aspect=2)

In [None]:
# displot is a figure-level function: size it with height and aspect rather than figure.figsize
sns.displot(data=apple_vs_samsung, x="count", hue="brand", kind='kde', multiple="stack", height=20, aspect=2)
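
Alongside the plots, a few summary statistics can make the comparison of the two distributions easier to state. The sketch below uses the standard-library `statistics` module on invented daily counts; the numbers are hypothetical and are not results from the dataset.

```python
import statistics

# Hypothetical daily view counts per brand (invented numbers).
daily_counts = {
    "apple": [120, 150, 130, 170, 160],
    "samsung": [200, 180, 220, 210, 190],
}

# Centre (mean, median) and spread (sample standard deviation) per brand.
summary = {
    brand: {
        "mean": statistics.mean(counts),
        "median": statistics.median(counts),
        "stdev": statistics.stdev(counts),
    }
    for brand, counts in daily_counts.items()
}
for brand, stats in summary.items():
    print(brand, stats)
```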

## 5. Close Spark connection

In [6]:
spark.stop()

23/07/21 13:42:56 WARN GhfsStorageStatistics: Detected potential high latency for operation stream_write_close_operations. latencyMs=133; previousMaxLatencyMs=0; operationCount=1; context=gs://dataproc-temp-us-central1-351061576575-hc5whhgs/a135ab3a-2555-406c-8a00-67662c732070/spark-job-history/application_1689946506511_0002.inprogress
23/07/21 13:42:56 WARN GhfsStorageStatistics: Detected potential high latency for operation op_rename. latencyMs=219; previousMaxLatencyMs=0; operationCount=1; context=rename(gs://dataproc-temp-us-central1-351061576575-hc5whhgs/a135ab3a-2555-406c-8a00-67662c732070/spark-job-history/application_1689946506511_0002.inprogress -> gs://dataproc-temp-us-central1-351061576575-hc5whhgs/a135ab3a-2555-406c-8a00-67662c732070/spark-job-history/application_1689946506511_0002)
