## Retai Clickstream Analysis and Prediction

In [1]:
#Install Pyspark and circlify
!pip install pyspark
!pip install circlify

Collecting circlify
  Downloading circlify-0.15.0-py2.py3-none-any.whl.metadata (8.4 kB)
Downloading circlify-0.15.0-py2.py3-none-any.whl (11 kB)
Installing collected packages: circlify
Successfully installed circlify-0.15.0


In [2]:
import os

# List all files in the input directory
input_dir = "/kaggle/input"
for root, dirs, files in os.walk(input_dir):
    for file in files:
        print(os.path.join(root, file))

/kaggle/input/ecommerce-behavior-data-from-multi-category-store/2019-Nov.csv
/kaggle/input/ecommerce-behavior-data-from-multi-category-store/2019-Oct.csv


In [3]:
# from google.colab import drive
# drive.mount('/content/drive')

In [4]:
# path = "../input/2019-oct/2019-Oct.csv"
# # !unzip "/content/drive/MyDrive/Awesom_Big_Data_Project/nk_data.zip"
# !unzip "/content/drive/MyDrive/BigDataSet/archive.zip"

In [5]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
from pyspark.sql.window import Window
from pyspark.sql.functions import element_at, split, col
import pandas as pd
import os
import plotly.express as px

In [6]:
# Initialize Spark configuration
configuration = pyspark.SparkConf()
configuration.set('spark.sql.repl.eagerEval.enabled', True)

# Set up Spark Context
spark_context = pyspark.SparkContext(conf=configuration)

# Create or retrieve the existing Spark SQL Context
sql_context = pyspark.SQLContext.getOrCreate(spark_context)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/07 18:48:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
#Creating a subset of
# def read_with_pandas(input_filepath):
#   raw_data = pd.read_csv(input_filepath, nrows = 500000)
#   output_filepath = "small_oct_2019.csv"
#   raw_data.to_csv(output_filepath)
#   return output_filepath

# input_filepath = "/content/2019-Oct.csv"
# input_filepath = read_with_pandas(input_filepath)

In [8]:
import pandas as pd

# Set the path to the dataset in Kaggle
path_file_2019 = "/kaggle/input/ecommerce-behavior-data-from-multi-category-store/2019-Oct.csv"
output_show = True  # it makes .show() on/off
small_data_testing = False  # make it false when testing on whole dataset

if small_data_testing:
    # Read a subset of the data for testing
    raw_data = pd.read_csv(path_file_2019, nrows=5000000)
    raw_data.head(2)
    # Save the subset to Kaggle's working directory
    raw_data.to_csv("/kaggle/working/small_oct_2019.csv")
    path_file_2019 = "/kaggle/working/small_oct_2019.csv"

print("the file chosen is ", path_file_2019)
print("small data testing is ", small_data_testing)

the file chosen is  /kaggle/input/ecommerce-behavior-data-from-multi-category-store/2019-Oct.csv
small data testing is  False


In [9]:
# Load the dataset from the chosen path (either full or subset)
initial_df = sql_context.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(path_file_2019)

                                                                                                    

In [10]:
# Just keeping original_df in case we do some operation on df and loose track of original df
# add hyperloglog. Thats faster(Daniel)
# https://mungingdata.com/apache-spark/hyperloglog-count-distinct/
preprocessed_df = initial_df

# **Dataset Overview**

In [11]:
preprocessed_df.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



1) ***event_time*** : denotes the date and time of the user session.

2) ***event_type***: There are 3 events in this dataset (viewing, adding to cart, and purchase).

3) ***product_id***: indicates the specific product id.

4) ***category_id***: indicates the specific category id.

5) ***user_id***: pertains to the specific user.

6) ***user session***: a single user can have multiple sessions indicating various events like view, purchase, add to cart.

7) ***brand***: indicates the brand associated with the product and category.

8) ***cateogry***: indicates a nested string having a structure liek electronics.smartphone.andorid which helps us in specifying the range of purchased item.



# Data Summary and Pre-processing

In [12]:
# Remove rows where 'event_type' field is missing
cleaned_df = preprocessed_df.na.drop(subset=["event_type"])

# Data Imputation

## Extracting Catgeory and Product

In [13]:
# Fill missing values in 'category_code' and 'brand' columns with the placeholder "empty"
filled_df = cleaned_df.fillna("empty", subset=["category_code", "brand"])

In [14]:
if output_show:
  preprocessed_df.show(5)

+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 00:00:00|      view|  44600062|2103807459595387724|                NULL|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 00:00:00|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|
|2019-10-01 00:00:01|      view|  17200506|2053013559792632471|furniture.living_...|    NULL|  543.1|519107250|566511c2-e2e3-422...|
|2019-10-01 00:00:01|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 00:00:04|      view|   1004237|2053013555631882655|electr

In [15]:
# Define UDFs to extract category and product names from 'category_code' or fallback to 'brand'
from pyspark.sql.functions import udf

@udf
def get_category_name(category_code, brand_name):
    parts = str(category_code).split('.')
    if parts[0] == "empty":
        return brand_name if brand_name != "empty" else "unknown"
    return parts[0]

@udf
def get_product_name(category_code, brand_name):
    parts = str(category_code).split('.')
    if parts[-1] == "empty":
        return brand_name if brand_name != "empty" else "unknown"
    return parts[-1]

In [16]:
# Apply the UDFs to extract category and product names
df_with_extracted_fields = filled_df.select(
    "*",
    get_category_name("category_code", "brand").alias("category"),
    get_product_name("category_code", "brand").alias("product")
)

# Drop the original 'category_code' column
df_with_extracted_fields = df_with_extracted_fields.drop("category_code")

# Display the first 5 rows if output is enabled
if output_show:
    df_with_extracted_fields.show(5)


[Stage 3:>                                                                              (0 + 1) / 1]

+-------------------+----------+----------+-------------------+--------+-------+---------+--------------------+-----------+------------+
|         event_time|event_type|product_id|        category_id|   brand|  price|  user_id|        user_session|   category|     product|
+-------------------+----------+----------+-------------------+--------+-------+---------+--------------------+-----------+------------+
|2019-10-01 00:00:00|      view|  44600062|2103807459595387724|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|   shiseido|    shiseido|
|2019-10-01 00:00:00|      view|   3900821|2053013552326770905|    aqua|   33.2|554748717|9333dfbd-b87a-470...| appliances|water_heater|
|2019-10-01 00:00:01|      view|  17200506|2053013559792632471|   empty|  543.1|519107250|566511c2-e2e3-422...|  furniture|        sofa|
|2019-10-01 00:00:01|      view|   1307067|2053013558920217191|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|  computers|    notebook|
|2019-10-01 00:00:04|      view|   100423

                                                                                                    

## Extracting Time features like Day, Time, Hour from Timestamp

In [17]:
# Extract 'Day' and 'Hour' from the 'event_time' column
df_time_transformed = df_with_extracted_fields \
    .withColumn('date_section', split(col('event_time'), ' ').getItem(0)) \
    .withColumn('time_section', split(col('event_time'), ' ').getItem(1)) \
    .withColumn('Day', split(col('date_section'), '-').getItem(2)) \
    .withColumn('Hour', split(col('time_section'), ':').getItem(0)) \
    .drop('date_section')

# Display top 5 records if output flag is enabled
if output_show:
    df_time_transformed.show(5)


+-------------------+----------+----------+-------------------+--------+-------+---------+--------------------+-----------+------------+------------+---+----+
|         event_time|event_type|product_id|        category_id|   brand|  price|  user_id|        user_session|   category|     product|time_section|Day|Hour|
+-------------------+----------+----------+-------------------+--------+-------+---------+--------------------+-----------+------------+------------+---+----+
|2019-10-01 00:00:00|      view|  44600062|2103807459595387724|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|   shiseido|    shiseido|    00:00:00| 01|  00|
|2019-10-01 00:00:00|      view|   3900821|2053013552326770905|    aqua|   33.2|554748717|9333dfbd-b87a-470...| appliances|water_heater|    00:00:00| 01|  00|
|2019-10-01 00:00:01|      view|  17200506|2053013559792632471|   empty|  543.1|519107250|566511c2-e2e3-422...|  furniture|        sofa|    00:00:01| 01|  00|
|2019-10-01 00:00:01|      view|   1307067|205

In [18]:
df = df_time_transformed

# Journey of a user in one session


In [19]:
# Retrieve all events for a specific user session and sort them by event time
session_events_df = df.filter(col('user_session') == 'b37abd25-7672-4dd7-a098-40e50e314388') \
                      .orderBy('event_time') \
                      .toPandas()

                                                                                                    

## Data Processing

In [20]:
df.groupBy("event_type").count()

                                                                                                    

event_type,count
purchase,742849
view,40779399
cart,926516


In [21]:
# Identify unique users who added products to cart or made purchases
cart_purchase_users_df = df.filter(
    (col('event_type') == 'cart') | (col('event_type') == 'purchase')
)

# Drop duplicate actions based on event type, product, price, user, and session
distinct_cart_purchase_df = cart_purchase_users_df.dropDuplicates(
    subset=['event_type', 'product_id', 'price', 'user_id', 'user_session']
)

# Group by event type and count unique actions
distinct_cart_purchase_df.groupBy('event_type').count().show()


[Stage 17:>                                                                             (0 + 4) / 5]

+----------+------+
|event_type| count|
+----------+------+
|  purchase|690618|
|      cart|628955|
+----------+------+



                                                                                                    

In [22]:
# Capture all user activity for users who added products to cart or made purchases
all_user_activity_df = df.alias('d').join(
    cart_purchase_users_df.alias('c'),
    on=df['user_id'] == cart_purchase_users_df['user_id'],
    how='inner'
).select(
    'd.user_id',
    'd.event_time',
    'd.event_type',
    'd.product_id',
    'd.category_id',
    'd.brand',
    'd.price',
    'd.user_session',
    'd.category',
    'd.product'
)

# Group by event type and count the activities
all_user_activity_df.groupBy('event_type').count().show()


25/05/07 18:55:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:55:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:55:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:55:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:55:46 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:55:46 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:55:46 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:55:46 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:55:47 WARN RowBasedKeyValueBatch: Calling spill() on

+----------+---------+
|event_type|    count|
+----------+---------+
|  purchase| 10733318|
|      view|100913032|
|      cart| 12799325|
+----------+---------+



                                                                                                    

## High Value Customers

In [23]:
# Count the number of activities per user session
activity_per_session_df = all_user_activity_df.groupBy('user_session').count()

# Display the first 5 sessions with their activity counts if output display is enabled
if output_show:
    activity_per_session_df.show(5)


25/05/07 18:57:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:57:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:57:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:57:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:57:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:57:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:57:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:57:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 18:57:25 WARN RowBasedKeyValueBatch: Calling spill() on

+--------------------+-----+
|        user_session|count|
+--------------------+-----+
|ec82d195-6607-47a...|    7|
|f6dd0b68-7280-495...|    4|
|62b9a2b1-c85b-441...|  100|
|bf56200c-93bc-4f0...|    8|
|6a996a01-2154-482...|    1|
+--------------------+-----+
only showing top 5 rows



                                                                                                    

## Label Encoding Target Variable

In [24]:
# Define a UDF to label if an event is a purchase
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def is_purchase_label(event_type):
    if event_type == 'purchase':
        return 1
    else:
        return 0


In [25]:
# Apply the purchase label UDF to create a target column
df_targets = distinct_cart_purchase_df.select(
    "*",
    is_purchase_label('event_type').alias('is_purchased')
)

In [26]:
# Join the target labels with activity counts per session
df_targets = df_targets.join(
    activity_per_session_df,
    on='user_session',
    how='left'
)


In [27]:
# Define a UDF to extract the weekday (0 = Monday, 6 = Sunday) from the event timestamp
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def extract_weekday(event_time):
    try:
        return datetime.strptime(str(event_time)[:10], "%Y-%m-%d").weekday()
    except Exception:
        return None  # Handle parsing errors gracefully


In [28]:
# Apply the weekday extraction UDF to the event_time column
df_targets_week = df_targets.select(
    "*",
    extract_weekday('event_time').alias('week')
)


In [29]:
df_targets_week.printSchema()

root
 |-- user_session: string (nullable = true)
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- brand: string (nullable = false)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- product: string (nullable = true)
 |-- time_section: string (nullable = true)
 |-- Day: string (nullable = true)
 |-- Hour: string (nullable = true)
 |-- is_purchased: integer (nullable = true)
 |-- count: long (nullable = true)
 |-- week: integer (nullable = true)



In [30]:
df_targets_week = df_targets_week.dropDuplicates(["user_session"])

## Feature Selection

In [31]:
features = df_targets_week.select("event_type", "brand", "price", "count","week","category", "product","is_purchased")
features.printSchema()

root
 |-- event_type: string (nullable = true)
 |-- brand: string (nullable = false)
 |-- price: double (nullable = true)
 |-- count: long (nullable = true)
 |-- week: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- product: string (nullable = true)
 |-- is_purchased: integer (nullable = true)



In [32]:
features.count()

                                                                                                    

910797

## Prediction Model - SparkML

In [33]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Index categorical columns
category_indexer = StringIndexer(inputCol='category', outputCol='category_index')
event_type_indexer = StringIndexer(inputCol='event_type', outputCol='event_type_index')
brand_indexer = StringIndexer(inputCol='brand', outputCol='brand_index')
product_indexer = StringIndexer(inputCol='product', outputCol='product_index')
label_indexer = StringIndexer(inputCol='is_purchased', outputCol='label')

# One-hot encode the indexed columns
category_encoder = OneHotEncoder(inputCol='category_index', outputCol='category_vector')
event_type_encoder = OneHotEncoder(inputCol='event_type_index', outputCol='event_type_vector')
brand_encoder = OneHotEncoder(inputCol='brand_index', outputCol='brand_vector')
product_encoder = OneHotEncoder(inputCol='product_index', outputCol='product_vector')

# Assemble the categorical feature vectors
categorical_assembler = VectorAssembler(
    inputCols=[
        'category_vector',
        'event_type_vector',
        'brand_vector',
        'product_vector'
    ],
    outputCol='categorical_features'
)

# Assemble the numerical feature vectors
numerical_features = ['count', 'week', 'price']
numerical_assembler = VectorAssembler(
    inputCols=numerical_features,
    outputCol='numerical_features'
)

# Final feature assembler combining categorical and numerical features
full_feature_assembler = VectorAssembler(
    inputCols=['categorical_features', 'numerical_features'],
    outputCol='features'
)

# Define the complete pipeline
feature_pipeline = Pipeline(stages=[
    category_indexer,
    event_type_indexer,
    brand_indexer,
    product_indexer,
    label_indexer,
    category_encoder,
    event_type_encoder,
    brand_encoder,
    product_encoder,
    categorical_assembler,
    numerical_assembler,
    full_feature_assembler
])


In [34]:
features = features.na.drop()
df_transformed = feature_pipeline.fit(features).transform(features)
if output_show:
  df_transformed.show(2)

25/05/07 19:01:10 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:01:11 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:01:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:01:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:01:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:01:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:01:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:01:13 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:01:13 WARN RowBasedKeyValueBatch: Calling spill() on

+----------+-------+------+-----+----+-----------+----------+------------+--------------+----------------+-----------+-------------+-----+----------------+-----------------+----------------+----------------+--------------------+------------------+--------------------+
|event_type|  brand| price|count|week|   category|   product|is_purchased|category_index|event_type_index|brand_index|product_index|label| category_vector|event_type_vector|    brand_vector|  product_vector|categorical_features|numerical_features|            features|
+----------+-------+------+-----+----+-----------+----------+------------+--------------+----------------+-----------+-------------+-----+----------------+-----------------+----------------+----------------+--------------------+------------------+--------------------+
|      cart|samsung| 171.9|  165|   4|electronics|smartphone|           0|           0.0|             0.0|        0.0|          0.0|  0.0|(1335,[0],[1.0])|    (1,[0],[1.0])|(1969,[0],[1.0])|(14

                                                                                                    

In [35]:
final_data = df_transformed.select("features", "label")
final_data = final_data.na.drop()
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



In [36]:
(trainingData, testData) = final_data.randomSplit([0.7, 0.3])

Random Forest

In [37]:
from pyspark.ml.classification import RandomForestClassifier

# Initialize Random Forest Classifier
rf_classifier = RandomForestClassifier(
    labelCol='label',
    featuresCol='features',
    maxDepth=5,
    seed=42  # Optional: for reproducibility
)

# Train the Random Forest model
rf_model = rf_classifier.fit(trainingData)

# Make predictions on the test set
rf_predictions = rf_model.transform(testData)


25/05/07 19:22:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:22:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:22:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:22:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:22:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:22:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:22:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:22:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/05/07 19:22:03 WARN RowBasedKeyValueBatch: Calling spill() on

Saving and loading model from file

In [38]:
path_to_model = "/kaggle/working/RF_model"
rf_model.save(path_to_model)

                                                                                                    

In [None]:
from pyspark.ml.classification import RandomForestClassificationModel
model_1 = RandomForestClassificationModel.load(path_to_model)
accuracy = rf_predictions.filter(rf_predictions.label == rf_predictions.prediction).count() / float(rf_predictions.count())
print("Accuracy : ",accuracy)

Accuracy :  0.78495521554448
