### S23B23/027, B24258

PART A: Big Data Platform Setup and Data Preprocessing

1. Justify Big Data

The dataset contains tens of thousands of transaction records (47,293 remain after cleaning), each with multiple attributes covering invoices, customers, products, and logistics. Generating recommendations from continuously growing retail data of this kind requires scalable storage and distributed, parallel processing, which a traditional single-node relational database struggles to provide as volume increases.

2. Tool Selection

Google Colab Setup (Using PySpark)

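Google Colab is chosen because it offers free cloud compute, can run Apache Spark through PySpark, and requires no local installation. A Colab runtime is a single virtual machine, so Spark runs here in local mode; the same PySpark code can later be pointed at a multi-node cluster.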

In [2]:
# Installing Java 17
!apt-get update -qq
!apt-get install -y openjdk-17-jdk-headless > /dev/null

# Downloading Spark 3.5.1
!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar -xzf spark-3.5.1-bin-hadoop3.tgz

# Installing Python dependencies
!pip install -q pyspark findspark


W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [3]:
# Initializing Spark

import os
import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

findspark.init("/content/spark-3.5.1-bin-hadoop3")

from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("RetailRecommendationSystem") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

spark


3. Data Acquisition

The dataset was obtained from Kaggle. It comprises anonymized online sales transactions, capturing product purchases, customer details, and order characteristics.

In [4]:
# Uploading the dataset to colab
from google.colab import files
uploaded = files.upload()


Saving online_sales_dataset.csv to online_sales_dataset.csv


In [5]:
# Loading the csv into spark

df_raw = spark.read.csv("online_sales_dataset.csv", header=True, inferSchema=True)
df_raw.show(5)
df_raw.printSchema()


+---------+---------+-----------+--------+-------------------+---------+----------+--------------+-----------------+-------------+------------+-----------+------------+------------+----------------+-----------------+-------------+
|InvoiceNo|StockCode|Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|         Discount|PaymentMethod|ShippingCost|   Category|SalesChannel|ReturnStatus|ShipmentProvider|WarehouseLocation|OrderPriority|
+---------+---------+-----------+--------+-------------------+---------+----------+--------------+-----------------+-------------+------------+-----------+------------+------------+----------------+-----------------+-------------+
|   221958| SKU_1964|  White Mug|      38|2020-01-01 00:00:00|     1.71|   37039.0|     Australia|             0.47|Bank Transfer|       10.79|    Apparel|    In-store|Not Returned|             UPS|           London|       Medium|
|   771155| SKU_1241|  White Mug|      18|2020-01-01 01:00:00|    41.25|   1

4. Distributed Processing — Cleaning & Transformation

Handling Missing Values

In [6]:
from pyspark.sql.functions import col

# Replacing null values
df_clean = df_raw.fillna({"CustomerID": -1, "ShippingCost": 0})


Removing Negative or Invalid Values

In [7]:
df_clean = df_clean.filter(col("Quantity") > 0)
df_clean = df_clean.filter(col("UnitPrice") > 0)


Converting Date Column

In [8]:
from pyspark.sql.functions import to_timestamp

df_clean = df_clean.withColumn("InvoiceDate", to_timestamp("InvoiceDate"))


Creating Useful Features

In [9]:
# TotalValue: line total (Quantity * UnitPrice) minus Discount plus ShippingCost
from pyspark.sql.functions import expr

df_clean = df_clean.withColumn(
    "TotalValue", expr("Quantity * UnitPrice - Discount + ShippingCost")
)
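As a quick check of the formula above, the sketch below recomputes TotalValue for the first row of the dataset in plain Python. The second function, `total_value_with_rate`, is a hypothetical alternative and not part of the notebook: it assumes Discount is a fractional rate (values such as 0.47 suggest this, but the source does not confirm it).

```python
def total_value(quantity, unit_price, discount, shipping_cost):
    """Formula used in the notebook: Discount is subtracted as an absolute amount."""
    return quantity * unit_price - discount + shipping_cost


def total_value_with_rate(quantity, unit_price, discount_rate, shipping_cost):
    """Hypothetical alternative, assuming Discount is a fraction of the line total."""
    return quantity * unit_price * (1 - discount_rate) + shipping_cost


# First row of the dataset: Quantity=38, UnitPrice=1.71, Discount=0.47, ShippingCost=10.79
as_amount = total_value(38, 1.71, 0.47, 10.79)
as_rate = total_value_with_rate(38, 1.71, 0.47, 10.79)

print(round(as_amount, 2))  # matches the TotalValue Spark reports for this row
print(round(as_rate, 4))
```

Which interpretation is right depends on how the dataset defines Discount; the rest of the notebook uses the first form.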


In [10]:
# Extract date features (Spark's dayofweek returns 1 = Sunday through 7 = Saturday)
from pyspark.sql.functions import year, month, dayofweek

df_clean = df_clean.withColumn("Year", year("InvoiceDate")) \
                   .withColumn("Month", month("InvoiceDate")) \
                   .withColumn("DayOfWeek", dayofweek("InvoiceDate"))
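The DayOfWeek values are easy to misread, because Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), while Python's `isoweekday` numbers them from 1 (Monday) to 7 (Sunday). The sketch below reproduces the three date features for the first invoice timestamp using only the standard library; `spark_style_dayofweek` is a hypothetical helper written for this illustration.

```python
from datetime import datetime


def spark_style_dayofweek(ts):
    """Return 1 for Sunday through 7 for Saturday, mirroring Spark's convention."""
    return ts.isoweekday() % 7 + 1


# Timestamp of the first row shown in the dataset preview
first_invoice = datetime(2020, 1, 1, 0, 0, 0)

features = {
    "Year": first_invoice.year,
    "Month": first_invoice.month,
    "DayOfWeek": spark_style_dayofweek(first_invoice),
}
print(features)
```

1 January 2020 was a Wednesday, so DayOfWeek is 4 under this convention.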


Final Distributed Dataset

In [11]:
df_clean.show(10)
df_clean.printSchema()
df_clean.count()


+---------+---------+--------------+--------+-------------------+---------+----------+--------------+--------+-------------+------------+-----------+------------+------------+----------------+-----------------+-------------+------------------+----+-----+---------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|Discount|PaymentMethod|ShippingCost|   Category|SalesChannel|ReturnStatus|ShipmentProvider|WarehouseLocation|OrderPriority|        TotalValue|Year|Month|DayOfWeek|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+--------+-------------+------------+-----------+------------+------------+----------------+-----------------+-------------+------------------+----+-----+---------+
|   221958| SKU_1964|     White Mug|      38|2020-01-01 00:00:00|     1.71|   37039.0|     Australia|    0.47|Bank Transfer|       10.79|    Apparel|    In-store|Not Returned|             UPS|           Lo

47293

PART B: Data Modelling and Analytics

1. Technique Selection

Chosen Technique: Collaborative Filtering (ALS – Alternating Least Squares)

Justification:
The dataset contains repeated customer–product interactions (purchases), so Collaborative Filtering can learn customer preferences directly from transaction patterns.
ALS (Alternating Least Squares) in Spark is designed for large-scale recommendation problems: it handles sparse user–item matrices efficiently and parallelizes across a cluster, which makes it suitable for a large retail dataset. In this notebook the TotalValue of each transaction serves as the explicit rating.
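To make the "alternating" idea concrete, here is a minimal rank-1 sketch in plain Python. It is not Spark's implementation: it assumes a single latent factor per user and item, a plain L2 penalty, and a made-up toy rating dictionary. With one factor, each update has a closed form, so the code alternates between solving for the user factors with the item factors fixed and the reverse.

```python
def objective(ratings, u, v, reg):
    """Squared error on observed entries plus an L2 penalty on both factor vectors."""
    err = sum((r - u[i] * v[j]) ** 2 for (i, j), r in ratings.items())
    penalty = reg * (sum(x * x for x in u) + sum(x * x for x in v))
    return err + penalty


def als_rank1(ratings, n_users, n_items, reg=0.1, iters=10):
    """Rank-1 ALS: alternate closed-form least-squares updates for u and v."""
    u = [1.0] * n_users
    v = [1.0] * n_items
    for _ in range(iters):
        # Fix item factors, solve for each user factor
        for i in range(n_users):
            rated = [(j, r) for (a, j), r in ratings.items() if a == i]
            u[i] = sum(r * v[j] for j, r in rated) / (reg + sum(v[j] ** 2 for j, _r in rated))
        # Fix user factors, solve for each item factor
        for j in range(n_items):
            rated = [(i, r) for (i, b), r in ratings.items() if b == j]
            v[j] = sum(r * u[i] for i, r in rated) / (reg + sum(u[i] ** 2 for i, _r in rated))
    return u, v


# Toy (user, item) -> rating data, invented for illustration only
ratings = {(0, 0): 5.0, (0, 1): 3.0, (1, 0): 4.0, (1, 2): 1.0, (2, 1): 2.0, (2, 2): 1.0}

initial_loss = objective(ratings, [1.0] * 3, [1.0] * 3, 0.1)
u, v = als_rank1(ratings, n_users=3, n_items=3, reg=0.1, iters=10)
final_loss = objective(ratings, u, v, 0.1)
print("loss before:", initial_loss, "loss after:", final_loss)
```

Each update minimizes the objective exactly for one block of variables, so the loss never increases between steps. Spark's ALS applies the same idea with `rank` factors per user and item and distributes the per-user and per-item solves across partitions.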

2. Model Scalability

Data Preparation

In [14]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col

# Drop rows whose CustomerID was missing (filled with -1 during cleaning)
ratings_df = df_clean.filter(col("CustomerID") != -1)

# Convert StockCode (string) to numeric item IDs
item_indexer = StringIndexer(inputCol="StockCode", outputCol="item_id")
ratings_df = item_indexer.fit(ratings_df).transform(ratings_df)

# Rename columns for ALS usage
ratings_df = ratings_df.withColumnRenamed("CustomerID", "user_id") \
                       .withColumnRenamed("TotalValue", "rating")

ratings_df.select("user_id", "item_id", "rating").show(5)


+-------+-------+------------------+
|user_id|item_id|            rating|
+-------+-------+------------------+
|37039.0|  182.0| 75.30000000000001|
|19144.0|  572.0| 751.8199999999999|
|50472.0|  762.0|           1449.07|
|96586.0|  388.0|1084.4599999999998|
|53887.0|  742.0|           3311.02|
+-------+-------+------------------+
only showing top 5 rows
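The item_id values above come from StringIndexer, which by default assigns indices by label frequency, with the most frequent label receiving index 0. The plain-Python sketch below imitates that mapping on a few invented stock codes; `index_labels` is a hypothetical helper, and breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter


def index_labels(values):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically (assumption)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(position) for position, label in enumerate(ordered)}


# Invented stock codes, for illustration only
stock_codes = ["SKU_2", "SKU_1", "SKU_2", "SKU_3"]
mapping = index_labels(stock_codes)
print(mapping)
```

A consequence of this ordering is that item_id carries no meaning beyond identity, so it should only be used as a key, never as a numeric feature.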



ALS Model Training

In [15]:
# ALS Model Setup
from pyspark.ml.recommendation import ALS

als = ALS(
    userCol="user_id",
    itemCol="item_id",
    ratingCol="rating",
    nonnegative=True,
    coldStartStrategy="drop",
    rank=10,
    maxIter=10,
    regParam=0.1
)

# Train the ALS model
model = als.fit(ratings_df)


3. Model Execution & Optimization

Measure Training Time

In [16]:
import time
start = time.time()

model = als.fit(ratings_df)

end = time.time()
print("Training Time (seconds):", end - start)


Training Time (seconds): 11.464302062988281
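A single wall-clock measurement can be affected by warm-up and other one-off effects, so repeated runs give a more reliable picture. The sketch below shows one possible way to collect several timings and summarize them; `time_runs` is a hypothetical helper, and the summed range is only a stand-in workload.

```python
import statistics
import time


def time_runs(fn, repeats=3):
    """Call fn() `repeats` times and return each wall-clock duration in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        durations.append(time.perf_counter() - start)
    return durations


# Stand-in workload; in the notebook this would be the model fit
durations = time_runs(lambda: sum(range(100_000)), repeats=5)
print("median (s):", statistics.median(durations))
print("min (s):", min(durations))
```

In the notebook this could be called as `time_runs(lambda: als.fit(ratings_df))`, reporting the median rather than a single run.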


Optimization 1 - Parallelism / Partitioning

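Repartitioning the data into more partitions lets Spark spread the ALS work across all available cores. In this run the training time dropped from about 11.5 s to about 10.1 s; because each figure comes from a single run, part of that difference may be run-to-run variation.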

In [17]:
# Repartition dataset to 8 partitions
ratings_df_opt1 = ratings_df.repartition(8)

start = time.time()
model_opt1 = als.fit(ratings_df_opt1)
end = time.time()
print("Execution time (opt1):", end - start, "seconds")


Execution time (opt1): 10.08347225189209 seconds


Optimization 2 - Caching & Lower Rank

In [20]:
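# Cache the ratings DataFrame; cache() is lazy, so the data is materialized by the first action (the fit below)
ratings_df_opt2 = ratings_df.cache()

als_opt2 = ALS(
    userCol="user_id",
    itemCol="item_id",
    ratingCol="rating",
    rank=5,          # half the baseline rank of 10 (10 is also the Spark default)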
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop"
)

start = time.time()
model_opt2 = als_opt2.fit(ratings_df_opt2)
end = time.time()
print("Execution time (opt2):", end - start, "seconds")


Execution time (opt2): 7.111288070678711 seconds


Model performance

In [21]:
# Execution times
baseline_time = 11.464302062988281
opt1_time     = 10.08347225189209
opt2_time     =  7.111288070678711

import pandas as pd

summary_data = {
    "Model": ["Baseline ALS", "Optimization 1", "Optimization 2"],
    "Execution Time (s)": [baseline_time, opt1_time, opt2_time]
}

summary_df = pd.DataFrame(summary_data)
summary_df


            Model  Execution Time (s)
0    Baseline ALS           11.464302
1  Optimization 1           10.083472
2  Optimization 2            7.111288
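The timings are easier to compare as relative figures. The short calculation below derives the speedup factor and the percentage reduction against the baseline from the three measured values; the `results` dictionary is just a convenience for this illustration.

```python
baseline_time = 11.464302062988281
timings = {
    "Optimization 1": 10.08347225189209,
    "Optimization 2": 7.111288070678711,
}

results = {}
for name, seconds in timings.items():
    results[name] = {
        # How many times faster than the baseline
        "speedup": baseline_time / seconds,
        # Share of the baseline time that was saved, in percent
        "reduction_pct": 100 * (baseline_time - seconds) / baseline_time,
    }

for name, stats in results.items():
    print(f"{name}: {stats['speedup']:.2f}x speedup, {stats['reduction_pct']:.1f}% less time")
```

On these single-run measurements, Optimization 1 saves roughly 12% of the baseline time and Optimization 2 roughly 38%. Optimization 2 changes both caching and rank, so its gain cannot be attributed to either change alone.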


4. Result Interpretation: Analyse the model's output.

Generating Sample Recommendations (Using Baseline ALS model)

In [24]:
# Get top 5 recommendations per user
user_recs = model.recommendForAllUsers(5)
user_recs.show(5, truncate=False)


+-------+------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                           |
+-------+------------------------------------------------------------------------------------------+
|10001  |[{679, 2114.2915}, {920, 1983.0413}, {193, 1889.0519}, {956, 1850.3314}, {296, 1784.9824}]|
|10003  |[{697, 545.2588}, {459, 526.82764}, {907, 518.2051}, {574, 512.73956}, {478, 510.15363}]  |
|10010  |[{692, 4759.439}, {416, 4609.7383}, {551, 4502.7446}, {915, 4467.0073}, {786, 4137.5396}] |
|10011  |[{716, 1297.9082}, {390, 1250.7799}, {786, 1233.0704}, {416, 1200.6604}, {685, 1162.1656}]|
|10012  |[{679, 1208.4714}, {405, 1166.0077}, {960, 1156.3242}, {851, 1086.4069}, {420, 1075.8961}]|
+-------+------------------------------------------------------------------------------------------+
only showing top 5 rows



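user_id: Unique customer ID

recommendations: List of (item_id, predicted_rating) pairs, sorted from highest to lowest predicted rating

Higher predicted rating → stronger predicted preference for the product. Because the rating column is TotalValue, each score is a predicted transaction value rather than a purchase probability.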

Analysis:

**Top 5 Recommendations:**

Each user gets 5 recommended items, ordered by predicted rating.

Example: user_id 10001 → top recommendation is item 679 with a predicted rating of 2114.29, followed by items 920, 193, 956, and 296.


**Diversity Across Users:**

Different users get different top items, showing personalized recommendations.

For instance, 10001’s top item is 679, while 10003’s top is 697.

The model captures user-specific preferences.


**Item Popularity Insight:**

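Some items appear for multiple users (679 shows up for 10001 and 10012; 416 and 786 both show up for 10010 and 10011) → these may be popular items or, since the ratings are transaction values, items associated with high-value orders.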
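The overlap between users can be checked mechanically. The sketch below copies the item IDs from the five recommendation rows shown earlier into a plain dictionary and counts how often each item occurs; it covers only these five users, so it illustrates the method rather than measuring popularity across the whole customer base.

```python
from collections import Counter

# Item IDs copied from the five recommendation rows displayed above
recommendations = {
    10001: [679, 920, 193, 956, 296],
    10003: [697, 459, 907, 574, 478],
    10010: [692, 416, 551, 915, 786],
    10011: [716, 390, 786, 416, 685],
    10012: [679, 405, 960, 851, 420],
}

# Count how many of the sampled users each item is recommended to
item_counts = Counter(item for items in recommendations.values() for item in items)
repeated_items = sorted(item for item, count in item_counts.items() if count > 1)
print(repeated_items)
```

The same counting approach could be applied to the full output of `recommendForAllUsers` after flattening the recommendations column.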