<div class="alert alert-block alert-info">
<center> <h1> Customer Segmentation and Sales Forecast</h1> </center> <br>
<center> Big Data Analytics 2025</center><br>
<center> NOVA IMS MDSAA</center>

### [NOTE]
In this project, we are going to work on 3 notebooks:
- 1. **Data Preprocessing**: For EDA, Data Preprocessing, Creating DataFrames, and Feature Engineering
- 2. **Clustering**: For clustering 
- 3. **Sales Forecasting**: For Sales Forecast 
- 4. **Graph**: For Graph Visualization for Clusters<br>
##### This notebook is 1. Cleaning.

# Group 77

|   | Student Name          |  Student ID | 
|---|-----------------------|    ---      |
| 1 | Hassan Bhatti       |  20241023 |
| 2 | Moeko Mitani          |   20240670  | 
| 3 | Oumayma Ben Hfaiedh   |   20240699  | 
| 4 | Ricardo Pereira      |  20240745  | 

# 1. Business Understanding

## 1.1. Business Background

The retail industry is undergoing a significant transformation. Online retail shopping has become an absolute necessity to compete for business, and with that change comes new challenges, especially in niche areas such as gift items. As customer expectations rise and buying habits become more complex, retailers can no longer rely solely on intuition to gauge demand or manage inventory. They must become data-driven.

The company is a UK-based online retailer of giftware, primarily serving wholesale customers. This segment of the business has additional operational complexities, including high-volume purchases, unpredictable seasonality (especially during the holiday season), and a customer base divided between loyal repeat buyers and one-time, resourceful purchasers. What appears to be a simple transaction flow is in fact a rich and dynamic stream of behavioral data waiting to be deciphered.

In this environment, traditional data tools are not enough. Forecasting demand and understanding customers requires a scalable and intelligent approach. This project reflects how large companies are beginning to process huge, fast-moving data sets. Even though the current dataset is limited in size, it mirrors the volume, velocity, and variety challenges faced by growing online retailers.

## 1.2. Business Objectives

The overarching goal of this project is to empower a growing online retailer with the analytical tools needed to make smarter, data-driven decisions in two critical areas: customer understanding and demand forecasting.

### Customer Segmentation

The first objective is to uncover meaningful customer segments based on purchasing behavior. Not all customers bring the same value or behave in the same way - some make frequent low-volume purchases, others buy in bulk seasonally, and some show irregular patterns that suggest churn risk or opportunistic buying. 

By applying clustering techniques, we aim to:
- Identify distinct customer personas (e.g., "Loyal Wholesalers", "Occasional Retailers", "Holiday Shoppers")
- Reveal behavioral patterns that can inform targeted marketing and personalized recommendations
- Provide insights to improve customer retention and customer lifetime value (CLV)

This segmentation can serve as the foundation for a more customized engagement strategy, allowing retailers to move away from one-size-fits-all campaigns toward data-driven personalization.

### Sales Forecasting

The second objective is to develop a predictive model that estimates future sales based on historical transaction data.

Accurate forecasting is essential for:
- Optimizing inventory levels and reducing both stockouts and overstock situations
- Aligning operational planning with expected demand spikes (e.g., during the holiday season)
- Informing pricing, promotional, and procurement strategies

By implementing time-series forecasting models, we will simulate a pipeline that can eventually evolve into a real-time prediction engine in a production environment.

## 1.3. Business Success Criteria

Success for this project will be evaluated using both **quantitative** and **qualitative** criteria:

### Quantitative Criteria
- **Clustering Performance**: Metrics such as silhouette score, Davies-Bouldin index, or within-cluster sum of squares (WCSS) will be used to assess the quality of customer segmentation.
- **Forecast Accuracy**: Measured using MAE (Mean Absolute Error), RMSE (Root Mean Squared Error), and MAPE (Mean Absolute Percentage Error) on sales predictions.
- **Actionable Insights**: The identification of at least 3 meaningful and distinct customer segments, and 1-2 sales forecasting trends that could support operational decisions.

### Qualitative Criteria
- **Interpretability**: Clear and intuitive visualization of clusters and forecast trends for presentation to non-technical stakeholders.
- **Engagement**: The fun internal article should effectively raise awareness about the value of data analytics and be positively received by company staff.
- **Scalability Potential**: The approach should be adaptable to larger datasets and scalable for production-level deployment in a real business context.

By combining rigorous analytics with creative storytelling, this project aims not only to deliver strategic insights but also to shift the company mindset toward becoming truly data-driven.

# 2. Metadata

| Features | Descriptions |
|---------|---------------------|
| *ID* | Customer ID |
| *Invoice* | Invoice number. Nominal. A 6-digit integral number uniquely assigned to each transaction. If this code starts with the letter 'c', it indicates a cancellation. |
| *StockCode* | Product (item) code. Nominal. A 5-digit integral number uniquely assigned to each distinct product. |
| *Description* | Product (item) name. Nominal. |
| *Quantity* | The quantities of each product (item) per transaction. Numeric. |
| *InvoiceDate* | Invoice date and time. Numeric. The day and time when a transaction was generated. |
| *Price* | Unit price. Numeric. Product price per unit in sterling (Â£). |
| *Customer ID* | Customer number. Nominal. A 5-digit integral number uniquely assigned to each customer. |
| *Country* | Country name. Nominal. The name of the country where a customer resides. |

# 3. Data Integration 

## Import Libraries

In [0]:
# ─────────────────────────────────────────────
# Spark Core
# ─────────────────────────────────────────────
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F

from pyspark.sql.functions import (
    col, lit, to_timestamp, to_date, date_format, year, month, dayofmonth,
    count, sin, cos, sum, avg, min, max, stddev, lag, when, countDistinct,
    round, concat_ws, sum as spark_sum, max as spark_max, monotonically_increasing_id,
    datediff, current_date, array_except, array, collect_set, array_sort, size, last
)
from pyspark.sql.functions import max as spark_max
from pyspark.sql.functions import date_format, sum as spark_sum
import math
from pyspark.sql.window import Window

# ─────────────────────────────────────────────
# Spark MLlib
# ─────────────────────────────────────────────
from pyspark.ml.feature import (
    VectorAssembler, PCA, StringIndexer, StandardScaler,
    MinMaxScaler
)
from pyspark.ml.clustering import KMeans
from pyspark.ml.stat import Correlation
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml import Pipeline

# ─────────────────────────────────────────────
# Python Built-ins and Data Science Libraries
# ─────────────────────────────────────────────
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from matplotlib.colors import ListedColormap
from sklearn.metrics import (
    silhouette_samples, silhouette_score,
    confusion_matrix
)

# ─────────────────────────────────────────────
# Utilities
# ─────────────────────────────────────────────
from itertools import combinations
from datetime import datetime
from dateutil.relativedelta import relativedelta



In [0]:
# Start Spark session
spark = SparkSession.builder.appName("Project_Group77").getOrCreate()

## Import CSV File

In [0]:
# File location and type
file_location = "/FileStore/tables/the_online_retail.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df.limit(10).display()
df.schema


Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2022-12-01T07:45:00.000+0000,6.95,13085.0,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2022-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2022-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2022-12-01T07:45:00.000+0000,2.1,13085.0,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2022-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2022-12-01T07:45:00.000+0000,1.65,13085.0,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2022-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2022-12-01T07:45:00.000+0000,5.95,13085.0,United Kingdom
489435,22350,CAT BOWL,12,2022-12-01T07:46:00.000+0000,2.55,13085.0,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2022-12-01T07:46:00.000+0000,3.75,13085.0,United Kingdom


Out[3]: StructType([StructField('Invoice', StringType(), True), StructField('StockCode', StringType(), True), StructField('Description', StringType(), True), StructField('Quantity', IntegerType(), True), StructField('InvoiceDate', TimestampType(), True), StructField('Price', DoubleType(), True), StructField('Customer ID', DoubleType(), True), StructField('Country', StringType(), True)])

# 4. Data Exploration (EDA)

## 4.1. Summary Statistics

In [0]:
df.describe().show()

+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+-----------+
|summary|          Invoice|         StockCode|         Description|          Quantity|             Price|       Customer ID|    Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+-----------+
|  count|          1067371|           1067371|             1062989|           1067371|           1067371|            824364|    1067371|
|   mean|537608.1499316233|29011.161534536903|            21848.25|   9.9388984711033| 4.649387727415796| 15324.63850435002|       null|
| stddev|26662.45044690759|  18822.9428661892|   922.9197780233488|172.70579407675396|123.55305872146369|1697.4644503793133|       null|
|    min|           489434|             10002|  DOORMAT UNION J...|            -80995|         -53594.36|           12346.0|  Australia|
|    max|          C581569|              

##### Number of rows in the DataFrame

In [0]:
row_count = df.count()
print(f"Number of rows: {row_count}")


Number of rows: 1067371


##### The time range of the DataFrame

In [0]:

df.agg(F.min("InvoiceDate"), F.max("InvoiceDate")).show()

+-------------------+-------------------+
|   min(InvoiceDate)|   max(InvoiceDate)|
+-------------------+-------------------+
|2022-12-01 07:45:00|2024-12-09 12:50:00|
+-------------------+-------------------+



## 4.2. Unique Value Counts

In [0]:
# Get distinct counts (unique value) for all columns
for col_name in df.columns:
    unique_count = df.select(countDistinct(col_name)).collect()[0][0]
    print(f"Unique values in '{col_name}': {unique_count}")

Unique values in 'Invoice': 53628
Unique values in 'StockCode': 5305
Unique values in 'Description': 5698
Unique values in 'Quantity': 1057
Unique values in 'InvoiceDate': 47635
Unique values in 'Price': 2807
Unique values in 'Customer ID': 5942
Unique values in 'Country': 43


In [0]:
## Check if there are StockCodes contain any letters
# Filter rows where StockCode contains any letters
df.filter(col("StockCode").rlike("[A-Za-z]")) \
  .select("StockCode") \
  .distinct() \
  .show()

+---------+
|StockCode|
+---------+
|   85014B|
|   79323W|
|   84970S|
|   35004B|
|   85132C|
|   84519A|
|   85183B|
|   84507B|
|   84596L|
|   84031A|
|   16161P|
|   84519B|
|   84596F|
|   85014A|
|   84031B|
|   85183A|
|   84032B|
|   48173C|
|   84032A|
|   79323P|
+---------+
only showing top 20 rows



The dataset contains 1,067,371 rows and 8 features.

## 4.3. Data Type
| Features | Data Types | Need to be changed? |
|---|--------| ---- |
| Invoice | String | No - "C" stands for cancellation |
| StockCode | String | No - Some code contains a string |
| Description| String | No |
| Quantity | String | Yes - Integer |
| InvoiceDate | String | Yes - Timestamp |
| Price | String | Yes - Decimal |
| Customer ID | String | Yes - Integer |
| Country | String | No |

## 4.4. Data Anomalies
| Features | Anomalies | Steps |
|---|--------| ---- |
| StockCode | Some code contains only a string | Check if the same description shares same stock code or not |
| Quantity | It contains negative values | Check it after changing to the correct data type |
| Price | It contains negative values | Check it after changing to the correct data type |

## 4.5. Missing Value

In [0]:
# Check if there are any missing values
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

+-------+---------+-----------+--------+-----------+-----+-----------+-------+
|Invoice|StockCode|Description|Quantity|InvoiceDate|Price|Customer ID|Country|
+-------+---------+-----------+--------+-----------+-----+-----------+-------+
|      0|        0|       4382|       0|          0|    0|     243007|      0|
+-------+---------+-----------+--------+-----------+-----+-----------+-------+



There are:
- 4,382 missing values in *Description* (approximately 0.4% of the data)
- 243,007 missing values in *Customer ID* (approximately 22.8% of the data)

Possible solution:
- *Description*: Use *StockCode* --> *StockCode* and *Description* should match.
- *Customer ID*: Use *Invoice* --> The same invoices belong to the same customer.

## 4.6. Change The Data Types
Before moving to create visualizations, we are going to change the data type. As we identified previously, we are going to change the data types of the following features:

| Features | Data Types | Need to be changed? |
|---|--------| ---- |
| Quantity | String | Yes - Integer |
| InvoiceDate | String | Yes - Timestamp |
| Price | String | Yes - Decimal |
| Customer ID | String | Yes - Integer |

In [0]:
# Change the data types
df = df \
    .withColumn("Quantity", col("Quantity").cast("int")) \
    .withColumn("InvoiceDate", to_timestamp(col("InvoiceDate"), "dd/MM/yyyy HH:mm")) \
    .withColumn("Price", col("Price").cast("decimal(20,2)")) \
    .withColumn("Customer ID", col("Customer ID").cast("int"))

## 4.7. Visual EDA

### 4.7.1. Numerical Features' Visualization

In [0]:
display(df)

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2022-12-01T07:45:00.000+0000,6.95,13085.0,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2022-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2022-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2022-12-01T07:45:00.000+0000,2.1,13085.0,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2022-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2022-12-01T07:45:00.000+0000,1.65,13085.0,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2022-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2022-12-01T07:45:00.000+0000,5.95,13085.0,United Kingdom
489435,22350,CAT BOWL,12,2022-12-01T07:46:00.000+0000,2.55,13085.0,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2022-12-01T07:46:00.000+0000,3.75,13085.0,United Kingdom


### 4.7.2. Timestamp Feature's Visualization

In [0]:
# Creates a new column called 'InvoiceDateOnly' with just the date part from InvoiceDate (drop the time)
# Group all rows by the date --> Count how many invoices happened for each date and name the column 'NumInvoices'
df_daily_invoices = df.withColumn("InvoiceDateOnly", to_date("InvoiceDate"))\
                      .groupBy("InvoiceDateOnly")\
                      .agg(count("Invoice").alias("NumInvoices"))\
                      .orderBy("InvoiceDateOnly")  # Sort the results by the date in ascending order


### 4.7.3. Categorical Features' Visualization

In [0]:
display(df)

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2022-12-01T07:45:00.000+0000,6.95,13085.0,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2022-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2022-12-01T07:45:00.000+0000,6.75,13085.0,United Kingdom
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2022-12-01T07:45:00.000+0000,2.1,13085.0,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2022-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2022-12-01T07:45:00.000+0000,1.65,13085.0,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2022-12-01T07:45:00.000+0000,1.25,13085.0,United Kingdom
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2022-12-01T07:45:00.000+0000,5.95,13085.0,United Kingdom
489435,22350,CAT BOWL,12,2022-12-01T07:46:00.000+0000,2.55,13085.0,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2022-12-01T07:46:00.000+0000,3.75,13085.0,United Kingdom


# 5. Data Anomalies Treatment

## 5.1. Duplicates Treatment

### Check Duplicates

In [0]:
# Count total rows and distinct rows
total_rows = df.count()
distinct_rows = df.distinct().count()

# Check for duplicates
if total_rows > distinct_rows:
    print(f"Duplicates found: {total_rows - distinct_rows}")
else:
    print("No duplicates found.")


Duplicates found: 34335


We found there are 34,335 duplicates in our dataset.

### Remove Duplicates


We are going to remove the duplicates for more precise clustering.

In [0]:
# Drop Dulpicates
df_no_duplicates = df.dropDuplicates()
df_no_duplicates.limit(10).display()

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2022-12-01T07:45:00.000+0000,5.95,13085,United Kingdom
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2022-12-01T07:46:00.000+0000,3.75,13085,United Kingdom
489434,79323P,PINK CHERRY LIGHTS,12,2022-12-01T07:45:00.000+0000,6.75,13085,United Kingdom
489435,22350,CAT BOWL,12,2022-12-01T07:46:00.000+0000,2.55,13085,United Kingdom
489434,21871,SAVE THE PLANET MUG,24,2022-12-01T07:45:00.000+0000,1.25,13085,United Kingdom
489434,22064,PINK DOUGHNUT TRINKET POT,24,2022-12-01T07:45:00.000+0000,1.65,13085,United Kingdom
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2022-12-01T07:45:00.000+0000,6.95,13085,United Kingdom
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2022-12-01T07:45:00.000+0000,2.1,13085,United Kingdom
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2022-12-01T07:45:00.000+0000,1.25,13085,United Kingdom
489434,79323W,WHITE CHERRY LIGHTS,12,2022-12-01T07:45:00.000+0000,6.75,13085,United Kingdom


## 5.2. Missing Values Treatment

#### Recap
There are:
- 4,382 missing values in *Description* (approximately 0.4% of the data)
- 243,007 missing values in *Customer ID* (approximately 22.8% of the data)

Possible solution:
- *Description*: Use *StockCode* --> *StockCode* and *Description* should match.
- *Customer ID*: Use *Invoice* --> The same invoices belong to the same customer.

#### *Customer ID*

We are going to fill in missing Customer IDs by assigning new unique IDs based on their Invoice numbers.<br>
If the max Cusotmer ID is 9823 in the dataset, then new customer IDs will start from 9823 + 1 = 9824.

NOTE:
```monotonically_increasing_id()``` <br>
This Spark function generates a unique and increasing ID for each row. <br>
It’s:
- Not guaranteed to be consecutive (e.g., might go 0, 4, 9…),
- But each row will get a different number,
- Safe to use in distributed environments (like Spark/Databricks).

In [0]:
#### Step 1: Extract existing Customer IDs and find the max
#### We are going to add +1 to max Customer ID to fill the missing values
# Ensure all Customer IDs are integers., Cast to integer if needed
df_filled = df_no_duplicates.withColumn("Customer ID", col("Customer ID").cast("int"))

# spark_max --> Get max existing Customer ID 
# .first()[0] --> Get the actual value (not a Row object).
# or 10000 --> If that value is None (i.e., no customer IDs exist at all), then it uses 10000 instead.
max_existing_id = df_filled.select(spark_max("Customer ID")).first()[0] or 10000

#### Step 2: Get invoices with missing Customer ID
# Filter out all rows where Customer ID is missing
# Then selects the distinct invoices (each invoice will get a new fake ID later)
missing_cus_df = df_filled.filter(col("Customer ID").isNull()).select("Invoice").distinct()

#### Step 3: Generate new Customer IDs for these invoices
# Add a unique ID starting after max_existing_id
# Cap the generated ID at 100,000 just to keep it manageable
new_ids_df = missing_cus_df.withColumn(
    "Customer ID", 
    (monotonically_increasing_id() % 100000 + max_existing_id + 1).cast("int")
)
#### Step 4:　Join these new IDs back to the original DataFrame
# Rename the generated Customer ID column to avoid conflict
new_ids_df_renamed = new_ids_df.withColumnRenamed("Customer ID", "Generated_Customer_ID")

# Perform the join and update Customer ID
df_final = df_filled.join(
    new_ids_df_renamed,
    on="Invoice", # Join the generated IDs onto the original data by Invoice
    how="left"
).withColumn(
    "Customer ID",
    when(col("Customer ID").isNull(), col("Generated_Customer_ID")).otherwise(col("Customer ID"))
    # Replaces missing Customer IDs with the generated ones
    # Keeps the existing ones as they are
).drop("Generated_Customer_ID")
    # Drops the temporary Generated_Customer_ID column afterward


In [0]:
### Check if there are any missing values
# For each column, checks if the value is null
df_final.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

+-------+---------+-----------+--------+-----------+-----+-----------+-------+
|Invoice|StockCode|Description|Quantity|InvoiceDate|Price|Customer ID|Country|
+-------+---------+-----------+--------+-----------+-----+-----------+-------+
|      0|        0|       4275|       0|          0|    0|          0|      0|
+-------+---------+-----------+--------+-----------+-----+-----------+-------+



### *Description*

#### Remove rows where *Price* = 0

We noticed that ***Description* is empty when the *Price* = 0**. It does not make sense to keep the rows with price = 0, so we are going to removed them.

In [0]:
# Remove the rows where Price is equal to 0
df_final = df_final.filter(col("Price") != 0)

In [0]:
# Check again if there are any missing values in the dataset
df_final.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

+-------+---------+-----------+--------+-----------+-----+-----------+-------+
|Invoice|StockCode|Description|Quantity|InvoiceDate|Price|Customer ID|Country|
+-------+---------+-----------+--------+-----------+-----+-----------+-------+
|      0|        0|          0|       0|          0|    0|          0|      0|
+-------+---------+-----------+--------+-----------+-----+-----------+-------+



It can be seen that there is no missing value after removing *Price* = 0.

## 5.3. Anomalies Treatment

### *Invoice*

We understood that if invoice code starts with the letter 'C', it indicates a cancellation in *Invoice*. To simplify this, we are going to make new feature called *IsReturn* which identify if the order was returned (0) or not (1).

In [0]:
# Create new column that checks for quantity < 0 which means a return
df_final = df_final.withColumn("IsReturn", when(col("Quantity") < 0, 1).otherwise(0))

In [0]:
df_final.limit(10).display()

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country,IsReturn
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2022-12-01T07:45:00.000+0000,5.95,13085,United Kingdom,0
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2022-12-01T07:46:00.000+0000,3.75,13085,United Kingdom,0
489434,79323P,PINK CHERRY LIGHTS,12,2022-12-01T07:45:00.000+0000,6.75,13085,United Kingdom,0
489435,22350,CAT BOWL,12,2022-12-01T07:46:00.000+0000,2.55,13085,United Kingdom,0
489434,21871,SAVE THE PLANET MUG,24,2022-12-01T07:45:00.000+0000,1.25,13085,United Kingdom,0
489434,22064,PINK DOUGHNUT TRINKET POT,24,2022-12-01T07:45:00.000+0000,1.65,13085,United Kingdom,0
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2022-12-01T07:45:00.000+0000,6.95,13085,United Kingdom,0
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2022-12-01T07:45:00.000+0000,2.1,13085,United Kingdom,0
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2022-12-01T07:45:00.000+0000,1.25,13085,United Kingdom,0
489434,79323W,WHITE CHERRY LIGHTS,12,2022-12-01T07:45:00.000+0000,6.75,13085,United Kingdom,0


### *StockCode*

#### Remove rows where we have TEST
We realized that there are some test data in our dataset. Since they are not actual data, we are going to remove them.

In [0]:
# Remove rows that start with "TEST" in StockCode
df_final = df_final.filter(~col('StockCode').startswith('TEST'))

In [0]:
df_final.limit(10).display()

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country,IsReturn
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2022-12-01T07:45:00.000+0000,5.95,13085,United Kingdom,0
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2022-12-01T07:46:00.000+0000,3.75,13085,United Kingdom,0
489434,79323P,PINK CHERRY LIGHTS,12,2022-12-01T07:45:00.000+0000,6.75,13085,United Kingdom,0
489435,22350,CAT BOWL,12,2022-12-01T07:46:00.000+0000,2.55,13085,United Kingdom,0
489434,21871,SAVE THE PLANET MUG,24,2022-12-01T07:45:00.000+0000,1.25,13085,United Kingdom,0
489434,22064,PINK DOUGHNUT TRINKET POT,24,2022-12-01T07:45:00.000+0000,1.65,13085,United Kingdom,0
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2022-12-01T07:45:00.000+0000,6.95,13085,United Kingdom,0
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2022-12-01T07:45:00.000+0000,2.1,13085,United Kingdom,0
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2022-12-01T07:45:00.000+0000,1.25,13085,United Kingdom,0
489434,79323W,WHITE CHERRY LIGHTS,12,2022-12-01T07:45:00.000+0000,6.75,13085,United Kingdom,0


In [0]:
row_count = df_final.count()
print(f"Number of rows: {row_count}")

Number of rows: 1026990


After the Data Anomalies Treatment, now the dataset consists of 1008415 rows and 9 features.

# 6. Creating New DataFrame

## 6.1. Creating Dataframe for Clustering

### 6.1.1. Feature Engineering 
For better cluster identification, we are going to engineer the following features:

| New Feature | Description | Equation |
|-------------|-------------|----------|
| **Total Price** (*TotalPrice*) | Total price of each transaction line | `Quantity * Price` |
| **Number of Products** (*num_products*) | Number of unique products purchased by the customer | `countDistinct(StockCode)` |
| **Total Quantity** (*total_quantity*) | Total number of items purchased by the customer | `sum(Quantity)` |
| **Total Price** (*total_price*) | Total amount spent by the customer | `sum(TotalPrice)` |
| **Average Unit Price** (*avg_unit_price*) | Average price per unit item purchased | `avg(Price)` |
| **First Purchase Date** (*first_purchase_date*) | Date of customer's first recorded purchase | `min(InvoiceDateOnly)` |
| **Last Purchase Date** (*last_purchase_date*) | Date of customer's most recent purchase | `max(InvoiceDateOnly)` |
| **Purchase Span** (*purchase_span_days*) | Time between first and last purchase | `datediff(last_purchase_date, first_purchase_date)` |
| **Average Quantity per Invoice** (*avg_quantity_per_invoice*) | Average number of items per invoice | `total_quantity / num_invoices` |
| **Recency** (*recency_days*) | Days since the customer’s last purchase (as of 09/12/2024) | `datediff(09/12/2024, last_purchase_date)` |
| **Monthly Purchase Counts** | Count of purchases per month (pivoted) | `pivoted year_month with count(Invoice)` |


These engineered features aim to capture customer behavior over time, such as spending habits, purchasing frequency, and recency. This enriched dataset forms the basis for meaningful clustering and segmentation analysis.


In [0]:
# Make a copy
df_cl = df_final

In [0]:
# Create a 'TotalPrice' feature
df_cl = df_cl.withColumn("TotalPrice", col("Quantity") * col("Price"))

In [0]:
# Extract year, month, and day from InvoiceDate and create features
df_cl = df_cl.withColumn("InvoiceYear", year("InvoiceDate")) \
       .withColumn("InvoiceMonth", month("InvoiceDate")) \
       .withColumn("InvoiceDay", dayofmonth("InvoiceDate"))

# Create a feature called InvoiceDateOnly that contain only the date of invoice
df_cl = df_cl.withColumn("InvoiceDateOnly", to_date("InvoiceDate"))

In [0]:
df_cl.limit(10).display()

Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country,IsReturn,TotalPrice,InvoiceYear,InvoiceMonth,InvoiceDay,InvoiceDateOnly
489434,21523,FANCY FONT HOME SWEET HOME DOORMAT,10,2022-12-01T07:45:00.000+0000,5.95,13085,United Kingdom,0,59.5,2022,12,1,2022-12-01
489435,22349,"DOG BOWL , CHASING BALL DESIGN",12,2022-12-01T07:46:00.000+0000,3.75,13085,United Kingdom,0,45.0,2022,12,1,2022-12-01
489434,79323P,PINK CHERRY LIGHTS,12,2022-12-01T07:45:00.000+0000,6.75,13085,United Kingdom,0,81.0,2022,12,1,2022-12-01
489435,22350,CAT BOWL,12,2022-12-01T07:46:00.000+0000,2.55,13085,United Kingdom,0,30.6,2022,12,1,2022-12-01
489434,21871,SAVE THE PLANET MUG,24,2022-12-01T07:45:00.000+0000,1.25,13085,United Kingdom,0,30.0,2022,12,1,2022-12-01
489434,22064,PINK DOUGHNUT TRINKET POT,24,2022-12-01T07:45:00.000+0000,1.65,13085,United Kingdom,0,39.6,2022,12,1,2022-12-01
489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2022-12-01T07:45:00.000+0000,6.95,13085,United Kingdom,0,83.4,2022,12,1,2022-12-01
489434,22041,"""RECORD FRAME 7"""" SINGLE SIZE """,48,2022-12-01T07:45:00.000+0000,2.1,13085,United Kingdom,0,100.8,2022,12,1,2022-12-01
489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2022-12-01T07:45:00.000+0000,1.25,13085,United Kingdom,0,30.0,2022,12,1,2022-12-01
489434,79323W,WHITE CHERRY LIGHTS,12,2022-12-01T07:45:00.000+0000,6.75,13085,United Kingdom,0,81.0,2022,12,1,2022-12-01


In [0]:
# Aggregate features per customer (Customer ID) and change their names
customer_df_final = df_cl.groupBy("Customer ID").agg(
    countDistinct("Invoice").alias("num_invoices"),
    countDistinct("StockCode").alias("num_products"),
    sum("Quantity").alias("total_quantity"),
    sum("TotalPrice").alias("total_price"),
    avg("Price").alias("avg_unit_price"),
    min("InvoiceDateOnly").alias("first_purchase_date"),
    max("InvoiceDateOnly").alias("last_purchase_date")
)

Here we have created 7 features:
- *num_invoices*: How many unique invoices a customer made
- *num_products*: How many unique products they bought
- *total_quantity*: Sum of all quantities purchased
- *total_price*: Total revenue from this customer
- *avg_unit_price*: Average unit price per item
- *first_purchase_date*: When they made their first purchase
- *last_purchase_date*: Their most recent purchase

In [0]:
customer_df_final.limit(10).display()

Customer ID,num_invoices,num_products,total_quantity,total_price,avg_unit_price,first_purchase_date,last_purchase_date
13285,6,182,2457,3364.59,2.334632,2023-03-25,2024-11-16
14570,3,64,431,613.75,3.286765,2023-09-22,2024-03-04
15846,1,27,79,107.01,1.816897,2023-11-19,2023-11-19
25462,1,158,278,1281.03,6.10557,2024-05-24,2024-05-24
17420,8,43,444,943.68,3.410385,2023-11-03,2024-10-20
24171,1,13,22,43.25,2.492308,2024-03-18,2024-03-18
16386,3,127,700,1068.16,2.05446,2023-11-17,2024-11-11
18024,3,21,148,236.78,2.658636,2024-07-10,2024-07-10
15727,15,426,5908,9371.71,3.572131,2023-01-24,2024-11-23
16339,1,17,21,94.05,4.958333,2024-02-28,2024-02-28


In [0]:
### Create new time-based features
# Define and convert last date of dataset (December 9, 2024) to calculate the recency
last_date_of_dataset = to_date(lit("09/12/2024"), "dd/MM/yyyy")

# Create new features: purchase_span_days, avg_quantity_per_invoice, and recency_days
customer_df_final = customer_df_final.withColumn(
    "purchase_span_days", datediff("last_purchase_date", "first_purchase_date")
).withColumn(
    "avg_quantity_per_invoice", col("total_quantity") / col("num_invoices")
).withColumn(
    "recency_days", datediff(last_date_of_dataset, col("last_purchase_date"))
)

# Rounding the new price features to 2 decimal places for better readability and consistency
customer_df_final = customer_df_final.withColumn(
    "total_price", round(col("total_price"), 2)
).withColumn(
    "avg_unit_price", round(col("avg_unit_price"), 2)
).withColumn(
    "avg_quantity_per_invoice", round(col("avg_quantity_per_invoice"), 2)
)

Here we have created
- *purchase_span_days*: Number of days between first and last purchase (customer lifetime)
- *avg_quantity_per_invoice*: Quantity of items per invoice
- *recency_days*: How many days since the customer last purchased 

We are going to add dummy features that will count how many times the client bought during each month.

In [0]:
# Step 1: Extract year-month and join them into a string like "2024-5", then create new feature called year_month
df_monthly = df_cl.withColumn("year_month", concat_ws("-", year("InvoiceDateOnly"), month("InvoiceDateOnly")))

# Step 2: Group by Customer ID and year_month, count purchases
df_monthly_count = df_monthly.groupBy("Customer ID", "year_month").agg(count("Invoice").alias("monthly_purchases"))

# Step 3: Pivot year_month to wide format
# Pivot "turns" rows into columns
df_monthly_pivot = df_monthly_count.groupBy("Customer ID").pivot("year_month").sum("monthly_purchases")

# Step 4: Fill nulls with 0 (meaning no purchases that month)
df_monthly_pivot = df_monthly_pivot.fillna(0)

# Step 5: Join with customer_df_final
df_clustering = customer_df_final.join(df_monthly_pivot, on="Customer ID", how="left")


In [0]:
row_count = df_clustering.count()
print(f"Number of rows: {row_count}")

Number of rows: 9443


After processing, we have 9,443 rows in our DataFrme for clustering.

In [0]:
df_clustering.limit(10).display()

Customer ID,num_invoices,num_products,total_quantity,total_price,avg_unit_price,first_purchase_date,last_purchase_date,purchase_span_days,avg_quantity_per_invoice,recency_days,2022-12,2023-1,2023-10,2023-11,2023-12,2023-2,2023-3,2023-4,2023-5,2023-6,2023-7,2023-8,2023-9,2024-1,2024-10,2024-11,2024-12,2024-2,2024-3,2024-4,2024-5,2024-6,2024-7,2024-8,2024-9
13285,6,182,2457,3364.59,2.33,2023-03-25,2024-11-16,602,409.5,23,0,0,0,0,0,0,21,23,0,0,0,0,0,0,0,50,0,52,0,30,0,0,55,0,0
14570,3,64,431,613.75,3.29,2023-09-22,2024-03-04,164,143.67,280,0,0,0,0,0,0,0,0,0,0,0,0,39,0,0,0,0,0,29,0,0,0,0,0,0
15846,1,27,79,107.01,1.82,2023-11-19,2023-11-19,0,79.0,386,0,0,0,29,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
25462,1,158,278,1281.03,6.11,2024-05-24,2024-05-24,0,278.0,199,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,158,0,0,0,0
17420,8,43,444,943.68,3.41,2023-11-03,2024-10-20,352,55.5,50,0,0,0,22,7,0,0,0,0,0,0,0,0,0,9,0,0,0,0,0,0,14,0,0,0
24171,1,13,22,43.25,2.49,2024-03-18,2024-03-18,0,22.0,266,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,13,0,0,0,0,0,0
16386,3,127,700,1068.16,2.05,2023-11-17,2024-11-11,360,233.33,28,0,0,0,58,0,0,0,0,0,0,0,0,0,0,0,77,0,0,4,0,0,0,0,0,0
18024,3,21,148,236.78,2.66,2024-07-10,2024-07-10,0,49.33,152,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,22,0,0
15727,15,426,5908,9371.71,3.57,2023-01-24,2024-11-23,669,393.87,16,0,99,0,50,66,0,0,0,87,13,116,19,0,32,0,39,0,0,0,46,14,0,30,74,0
16339,1,17,21,94.05,4.96,2024-02-28,2024-02-28,0,21.0,285,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,18,0,0,0,0,0,0,0


### 6.1.2. Exporting CSV File of Clustering DataFrame

In [0]:
df_pandas = df_clustering.toPandas()



In [0]:
df_pandas.to_csv("/tmp/df_clustering.csv", index=False)

In [0]:
dbutils.fs.cp("file:/tmp/df_clustering.csv", "dbfs:/FileStore/df_clustering.csv")

Out[37]: True

## 6.2. Creating Dataframe for Sales Forecasting

### 6.2.1. Feature Engineering 
For more precise sale forcasting, we are going to engineer the following features:

| New Feature | Description |
|-------------|-------------|
| **Year** (*year*) | The year extracted from the date (e.g., 2023, 2024) |
| **Month** (*month*) | The year and month in "YYYY-MM" format (e.g., 2024-01) |
| **Date** (*date*) | The actual calendar date, typically set to the 1st of each month (e.g., 2024-01-01) |
| **Total Quantity** (*total_quantity*) | Total number of items sold (or transacted) for a product (StockCode) in a given month |
| **Number of Months** (*month_num*) | A numeric representation of the month (e.g., 202401 for Jan 2024) used to sort time chronologically |
| **Month Sin** (*month_sin*) | A cyclical transformation of the month using a sine function, helping models capture seasonality patterns |
| **Month Cos** (*month_cos*) | Another cyclical transformation using cosine to pair with month_sin for better seasonal modeling |
| **6 Month Lag Quantity** (*lag_6m_quantity*) | The total quantity value from exactly 6 months earlier for the same product |
| **6 Month Rolling Average** (*rolling_avg_6m*) | The average quantity sold over the last 6 months, including the current one, helps track recent trends |
| **6 Month Standard Deviation Average** (*rolling_std_6m*) | The standard deviation of quantities over the last 6 months, measures sales volatility or stability |

In [0]:
# Make a copy
df_sf = df_final

%md
We are going to extend the Sales Forecasting DataFrame (df_sf) with future months from January to June 2025 for each unique StockCode to be able to predict the quantities of gthese months, preserving the structure and readying it for predictions.

1) Convert month to proper date format for processing.

2) Generate future month dates (Jan–Jun 2025) and create new rows for each future month & stock code combination.

3) Derive date features into *month*, *year*, and *month_num* for modeling/analysis.


In [0]:
# Convert to date and extract month
df_forecasting = df_sf.withColumn("date", to_date(col("InvoiceDate"))) \
       .withColumn("month", date_format(col("date"), "yyyy-MM"))

# Group by SKU (StockCode) and month
df_forecasting = df_forecasting.groupBy("StockCode", "month").agg(
    spark_sum("quantity").alias("total_quantity")
)

In [0]:
# Count the number of rows
row_count = df_forecasting.count()
print(f"Number of rows: {row_count}")

Number of rows: 67445


In [0]:
# Ensure 'date' column exists in df_forecasting
df_forecasting = df_forecasting.withColumn("date", to_date(col("month"), "yyyy-MM"))

# Create list of future months
future_months = []
future_start = datetime(2025, 1, 1)
future_end = datetime(2025, 6, 1)
while future_start <= future_end:
    future_months.append(future_start.strftime("%Y-%m-01"))
    future_start += relativedelta(months=1)

# Get distinct StockCodes from df_forecasting
stockcodes = [row['StockCode'] for row in df_forecasting.select("StockCode").distinct().collect()]

# Create future rows (StockCode + date)
future_rows = []
for code in stockcodes:
    for month_str in future_months:
        future_rows.append(Row(StockCode=code, date=datetime.strptime(month_str, "%Y-%m-%d")))

# Convert future_rows to DataFrame
df_future = spark.createDataFrame(future_rows)
df_future = df_future.withColumn("date", to_date(col("date")))

# Add missing columns from df_forecasting schema
for column in df_forecasting.columns:
    if column not in df_future.columns:
        df_future = df_future.withColumn(column, lit(None).cast(df_forecasting.schema[column].dataType))

# Reorder to match df_forecasting
df_future = df_future.select(df_forecasting.columns)

# Union + sort
df_sf_final = df_forecasting.unionByName(df_future).orderBy("StockCode", "date")

# Add 'month', 'year', and 'month_num' derived from 'date'
df_sf_final = df_sf_final.withColumn("month", date_format(col("date"), "yyyy-MM")) \
                   .withColumn("year", year("date")) \
                   .withColumn("month_num", month("date"))

df_sf_final.limit(10).display()


StockCode,month,total_quantity,date,year,month_num
10002,2022-12,215,2022-12-01,2022,12
10002,2023-01,291,2023-01-01,2023,1
10002,2023-02,257,2023-02-01,2023,2
10002,2023-03,641,2023-03-01,2023,3
10002,2023-04,932,2023-04-01,2023,4
10002,2023-05,464,2023-05-01,2023,5
10002,2023-06,453,2023-06-01,2023,6
10002,2023-07,512,2023-07-01,2023,7
10002,2023-08,574,2023-08-01,2023,8
10002,2023-09,240,2023-09-01,2023,9


%md
We are going to engineer the features to capture:  
> - Trends (via rolling average)
> - Seasonality (via sine/cosine)
> - Anomalies or stability (via standard deviation)
> - Delayed effects (via lag)

In [0]:
# Create month_sin, month_cos 
df_sf_final = df_sf_final.withColumn("month_sin", sin(2 * math.pi * col("month_num") / 12)) \
                   .withColumn("month_cos", cos(2 * math.pi * col("month_num") / 12))

# Define window
# Partition by StockCode, order by date
window_spec = Window.partitionBy("StockCode").orderBy("date").rowsBetween(-5, 0)  # last 6 months including current

# Create lag and rolling features 
df_sf_final = df_sf_final.withColumn("lag_6m_quantity", lag("total_quantity", 6).over(Window.partitionBy("StockCode").orderBy("date"))) \
                   .withColumn("rolling_avg_6m", avg("total_quantity").over(window_spec)) \
                   .withColumn("rolling_std_6m", stddev("total_quantity").over(window_spec))
                   
df_sf_final.limit(10).display()

StockCode,month,total_quantity,date,year,month_num,month_sin,month_cos,lag_6m_quantity,rolling_avg_6m,rolling_std_6m
10002,2022-12,215,2022-12-01,2022,12,-2.4492935982947064e-16,1.0,,215.0,
10002,2023-01,291,2023-01-01,2023,1,0.4999999999999999,0.8660254037844387,,253.0,53.74011537017761
10002,2023-02,257,2023-02-01,2023,2,0.8660254037844386,0.5000000000000001,,254.33333333333331,38.07011076071796
10002,2023-03,641,2023-03-01,2023,3,1.0,6.123233995736766e-17,,351.0,195.816240388789
10002,2023-04,932,2023-04-01,2023,4,0.8660254037844387,-0.4999999999999998,,467.2,310.2743946896037
10002,2023-05,464,2023-05-01,2023,5,0.4999999999999999,-0.8660254037844387,,466.6666666666667,277.52093014161414
10002,2023-06,453,2023-06-01,2023,6,1.2246467991473532e-16,-1.0,215.0,506.3333333333333,249.99973333319116
10002,2023-07,512,2023-07-01,2023,7,-0.4999999999999997,-0.8660254037844388,291.0,543.1666666666666,227.16638542413503
10002,2023-08,574,2023-08-01,2023,8,-0.8660254037844385,-0.5000000000000004,257.0,596.0,179.07205253751908
10002,2023-09,240,2023-09-01,2023,9,-1.0,-1.8369701987210294e-16,641.0,529.1666666666666,227.2640901389101


Since we chose to do the lag of 6 months this created missing values in the column lag_6m_quantity(the first 6 month quantity of each product 12-2022 until 06-2023). For this reason we decided to remove these rows and start our training from 06-2023.

In [0]:
# Ensure 'month' is in correct format (overwrite only if needed)
if "month" not in df_sf_final.columns:
    df_sf_final = df_sf_final.withColumn("month", date_format(col("date"), "yyyy-MM"))
else:
    df_sf_final = df_sf_final.withColumn("month", date_format(col("date"), "yyyy-MM"))

# Define required months as Spark array
required_months = [f"{y}-{str(m).zfill(2)}" for y in range(2023, 2026) for m in range(1, 13)]
required_months = [m for m in required_months if "2023-06" <= m <= "2025-06"]
required_months_array = array(*[lit(m) for m in required_months])

# Get months available for each StockCode only in the target range
stock_months = df_sf_final.filter(col("month").between("2023-06", "2025-06")) \
    .select("StockCode", "month") \
    .distinct() \
    .groupBy("StockCode") \
    .agg(array_sort(collect_set("month")).alias("months_present"))

# Keep only StockCodes with ALL required months
valid_products = stock_months.filter(
    size(array_except(required_months_array, col("months_present"))) == 0
).select("StockCode")

# Join and KEEP ONLY records in 2023-06 to 2025-06
df_sf_final = df_sf_final.join(valid_products, on="StockCode", how="inner") \
                   .filter(col("month").between("2023-06", "2025-06"))

df_sf_final.limit(10).display()

StockCode,month,total_quantity,date,year,month_num,month_sin,month_cos,lag_6m_quantity,rolling_avg_6m,rolling_std_6m
10135,2023-06,73,2023-06-01,2023,6,1.2246467991473532e-16,-1.0,123,213.83333333333331,321.93130737265466
10135,2023-07,53,2023-07-01,2023,7,-0.4999999999999997,-0.8660254037844388,48,214.66666666666663,321.42225602261374
10135,2023-08,74,2023-08-01,2023,8,-0.8660254037844385,-0.5000000000000004,868,82.33333333333333,29.770231216211048
10135,2023-09,45,2023-09-01,2023,9,-1.0,-1.8369701987210294e-16,101,73.0,31.476975712415573
10135,2023-10,84,2023-10-01,2023,10,-0.8660254037844386,0.5000000000000001,60,77.0,31.016124838541646
10135,2023-11,325,2023-11-01,2023,11,-0.5000000000000004,0.8660254037844384,133,109.0,106.80262169066825
10135,2023-12,411,2023-12-01,2023,12,-2.4492935982947064e-16,1.0,73,165.33333333333334,159.9383214450704
10135,2024-01,613,2024-01-01,2024,1,0.4999999999999999,0.8660254037844387,53,258.6666666666667,229.53053536875365
10135,2024-02,272,2024-02-01,2024,2,0.8660254037844386,0.5000000000000001,74,291.6666666666667,211.16975793580545
10135,2024-03,118,2024-03-01,2024,3,1.0,6.123233995736766e-17,45,303.8333333333333,195.64806839492863


In [0]:
# Count the number of rows
row_count = df_sf_final.count()
print(f"Number of rows: {row_count}")

Number of rows: 24250


Another problem we had is that for features *rolling_avg_6m* and *rolling_std_6m* we had missig values in the month 2025-05 to fix this problem we opted for the forward-fill solution to fill these missing values

In [0]:
# Ensure month_num exists for sorting
df_sf_final = df_sf_final.withColumn(
    "month_num",
    col("month").substr(1, 4).cast("int") * 100 + col("month").substr(6, 2).cast("int")
)

# Define forward-fill window (up to current row)
forward_window = Window.partitionBy("StockCode").orderBy("month_num").rowsBetween(Window.unboundedPreceding, 0)

# Forward-fill rolling_avg_6m and rolling_std_6m
df_sf_final = df_sf_final.withColumn(
    "rolling_avg_6m",
    last("rolling_avg_6m", ignorenulls=True).over(forward_window)
).withColumn(
    "rolling_std_6m",
    last("rolling_std_6m", ignorenulls=True).over(forward_window)
)

df_sf_final.limit(10).display()

StockCode,month,total_quantity,date,year,month_num,month_sin,month_cos,lag_6m_quantity,rolling_avg_6m,rolling_std_6m
10135,2023-06,73,2023-06-01,2023,202306,1.2246467991473532e-16,-1.0,123,213.83333333333331,321.93130737265466
10135,2023-07,53,2023-07-01,2023,202307,-0.4999999999999997,-0.8660254037844388,48,214.66666666666663,321.42225602261374
10135,2023-08,74,2023-08-01,2023,202308,-0.8660254037844385,-0.5000000000000004,868,82.33333333333333,29.770231216211048
10135,2023-09,45,2023-09-01,2023,202309,-1.0,-1.8369701987210294e-16,101,73.0,31.476975712415573
10135,2023-10,84,2023-10-01,2023,202310,-0.8660254037844386,0.5000000000000001,60,77.0,31.016124838541646
10135,2023-11,325,2023-11-01,2023,202311,-0.5000000000000004,0.8660254037844384,133,109.0,106.80262169066825
10135,2023-12,411,2023-12-01,2023,202312,-2.4492935982947064e-16,1.0,73,165.33333333333334,159.9383214450704
10135,2024-01,613,2024-01-01,2024,202401,0.4999999999999999,0.8660254037844387,53,258.6666666666667,229.53053536875365
10135,2024-02,272,2024-02-01,2024,202402,0.8660254037844386,0.5000000000000001,74,291.6666666666667,211.16975793580545
10135,2024-03,118,2024-03-01,2024,202403,1.0,6.123233995736766e-17,45,303.8333333333333,195.64806839492863


In [0]:
# Count the number of rows
row_count = df_sf_final.count()
print(f"Number of rows: {row_count}")


Number of rows: 24250


After all processes, we have 24,250 rows for Sales Forecasting DataFrame.

### 6.2.2. Exporting CSV File of Sales Forecast DataFrame

In [0]:
df_sf_pandas = df_sf_final.toPandas()

In [0]:
df_sf_pandas.to_csv("/tmp/df_sf.csv", index=False)

In [0]:
dbutils.fs.cp("file:/tmp/df_sf.csv", "dbfs:/FileStore/df_sf.csv")

Out[49]: True