It's simple to buy any product with a click and have it delivered to your door. Online shopping has been rapidly evolving over the last few years, making our lives easier. But behind the scenes, e-commerce companies face a complex challenge that needs to be addressed.

Uncertainty plays a big role in how the supply chains plan and organize their operations to ensure that the products are delivered on time. These uncertainties can lead to challenges such as stockouts, delayed deliveries, and increased operational costs.

You work for the Sales & Operations Planning (S&OP) team at a multinational e-commerce company. They need your help to assist in planning for the upcoming end-of-the-year sales. They want to use your insights to plan for promotional opportunities and manage their inventory. This effort is to ensure they have the right products in stock when needed and ensure their customers are satisfied with the prompt delivery to their doorstep.


## The Data

You are provided with a sales dataset to use. A summary and preview are provided below.

# Online Retail.csv

| Column     | Description              |
|------------|--------------------------|
| `'InvoiceNo'` | A 6-digit number uniquely assigned to each transaction |
| `'StockCode'` | A 5-digit number uniquely assigned to each distinct product |
| `'Description'` | The product name |
| `'Quantity'` | The quantity of each product (item) per transaction |
| `'UnitPrice'` | Product price per unit |
| `'CustomerID'` | A 5-digit number uniquely assigned to each customer |
| `'Country'` | The name of the country where each customer resides |
| `'InvoiceDate'` | The day and time when each transaction was generated `"MM/DD/YYYY"` |
| `'Year'` | The year when each transaction was generated |
| `'Month'` | The month when each transaction was generated |
| `'Week'` | The week when each transaction was generated (`1`-`52`) |
| `'Day'` | The day of the month when each transaction was generated (`1`-`31`) |
| `'DayOfWeek'` | The day of the weeke when each transaction was generated <br>(`0` = Monday, `6` = Sunday) |

**Analyze the Online Retail.csv dataset and build a forecasting model to predict 'Quantity' of products sold.**

1. Split the data into two sets based on the splitting date, "2011-09-25". All data up to and including this date should be in the training set, while data after this date should be in the test set. Return a pandas DataFrame, pd_daily_train_data, containing, at least, the columns "Country", "StockCode", "InvoiceDate", "Quantity".

2. Using your test set, calculate the Mean Absolute Error (MAE) for your forecast model for the 'Quantity' sold? Return a double (float) named mae.

3. How many units are expected to be sold during the week 39 of 2011? Store as an integer variable called quantity_sold_w39.

# SET UP

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=4aa653551ae6b114267881209dd91ca74759a486f550974419816e786bd4cb1b
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [4]:
spark = SparkSession.builder.appName("OnlineRetail").getOrCreate()

# READ DATA

In [5]:
df = spark.read.csv('OnlineRetail.csv', header=True, inferSchema=True)
df.show(5)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 5 rows



In [None]:
print("The data contain %d records." % df.count())

The data contain 541909 records.


In [None]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



In [None]:
df.describe().show()

+-------+------------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|summary|         InvoiceNo|         StockCode|         Description|          Quantity|    InvoiceDate|        UnitPrice|        CustomerID|    Country|
+-------+------------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|  count|            541909|            541909|              540455|            541909|         541909|           541909|            406829|     541909|
|   mean|  559965.752026781|27623.240210938104|             20713.0|  9.55224954743324|           NULL|4.611113626082972|15287.690570239585|       NULL|
| stddev|13428.417280800133| 16799.73762842775|                NULL|218.08115785023486|           NULL| 96.7598530611797| 1713.600303321594|       NULL|
|    min|            536365|             10002| 4 PURPLE FLOCK D...|            -8

# CLEANING DATA

In [6]:
#delete the line with the blank customer code
df = df.dropna(subset=['CustomerID'])

In [7]:
#Delete the canceled invoice line
df = df.filter((col('UnitPrice') > 0) & (col('Quantity') > 0))

In [8]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
df = df.withColumn('InvoiceDate', to_timestamp(col('InvoiceDate'), 'MM/dd/yyyy H:mm'))

In [9]:
print("After cleaning data , The data contain %d records." % df.count())

After cleaning data , The data contain 397884 records.


In [10]:
df = df.withColumn('Year', year(col('InvoiceDate')))
df = df.withColumn('Month', month(col('InvoiceDate')))
df = df.withColumn('Day', dayofmonth(col('InvoiceDate')))
df = df.withColumn('Week', weekofyear(col('InvoiceDate')))
df = df.withColumn('DayOfWeek', dayofweek(col('InvoiceDate')))

In [11]:
df = df.withColumn('Revenue',round(df.Quantity * df.UnitPrice, 0))

In [None]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Week: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Revenue: double (nullable = true)



In [12]:
df.describe().show()

+-------+------------------+------------------+--------------------+------------------+------------------+------------------+-----------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|         InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|    Country|              Year|             Month|               Day|              Week|         DayOfWeek|           Revenue|
+-------+------------------+------------------+--------------------+------------------+------------------+------------------+-----------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|            397884|            397884|              397884|            397884|            397884|            397884|     397884|            397884|            397884|            397884|            397884|            397884|            

In [13]:
#cleaning outlier
column_Quantity = df.select("Quantity").toPandas()
Q25 = column_Quantity.quantile(0.25)
Q75 = column_Quantity.quantile(0.75)
IQR = Q75 - Q25
upper_limit = Q75 + 1.5*IQR
lower_limit = Q25 - 1.5*IQR

In [14]:
print(upper_limit[0])
print(lower_limit[0])

27.0
-13.0


In [15]:
df = df.filter((col('Quantity') > lower_limit[0]) & (col('Quantity') < upper_limit[0]))

In [16]:
df.describe().show()

+-------+------------------+------------------+--------------------+-----------------+------------------+------------------+-----------+------------------+------------------+----------------+------------------+------------------+------------------+
|summary|         InvoiceNo|         StockCode|         Description|         Quantity|         UnitPrice|        CustomerID|    Country|              Year|             Month|             Day|              Week|         DayOfWeek|           Revenue|
+-------+------------------+------------------+--------------------+-----------------+------------------+------------------+-----------+------------------+------------------+----------------+------------------+------------------+------------------+
|  count|            372190|            372190|              372190|           372190|            372190|            372190|     372190|            372190|            372190|          372190|            372190|            372190|            372190|
|   

# PINELINE

In [17]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Week: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Revenue: double (nullable = true)



In [20]:
# StringIndexer for column 'Country' và 'StockCode'
indexer = StringIndexer(inputCols=["Country", "StockCode"], outputCols=["CountryIndex", "StockCodeIndex"], handleInvalid="keep")

# OneHotEncoder : 'CountryIndex' và 'StockCodeIndex'
encoder = OneHotEncoder(inputCols=["CountryIndex", "StockCodeIndex"], outputCols=["CountryVec", "StockCodeVec"])

# Assembling all feature columns
assembler = VectorAssembler(inputCols=["CountryVec", "StockCodeVec"], outputCol="features")

# Defining the linear regression model to predict 'Quantity'
lr = LinearRegression(labelCol="Quantity", featuresCol="features")

# Creating a pipeline with all the above stages
pipeline = Pipeline(stages=[indexer, encoder, assembler,lr])

# Split training and testing data by date "2011-09-25"
split_date = "2011-09-25"

# Convert the split date string to a timestamp
split_timestamp = to_timestamp(lit(split_date))

# Use the timestamp for splitting the data
train_data = df.filter(col("InvoiceDate") <= split_timestamp)
test_data = df.filter(col("InvoiceDate") > split_timestamp)

# Return a pandas DataFrame, pd_daily_train_data, containing, at least, the columns "Country", "StockCode", "InvoiceDate", "Quantity".
pd_daily_train_data = train_data.select("Country", "StockCode", "InvoiceDate", "Quantity").toPandas()

# Training the pipeline on the training data
model = pipeline.fit(train_data)

# Using the trained model to make predictions on the test data
predictions = model.transform(test_data)

# Evaluating the model's performance
evaluator = RegressionEvaluator(labelCol="Quantity", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(predictions)

print(f" Mean Absolute Error on test data = {mae}")


# Calculate expected quantity sold in week 39 of 2011
week_39_data = test_data.filter(col("Week") == 39)
quantity_sold_w39 = week_39_data.select(sum("Quantity").alias("TotalQuantity")).collect()[0]["TotalQuantity"]
print("The number of units sold in week 39 of 2011 was : ",quantity_sold_w39)

#Predicted results from the model compared to reality
predictions.select("prediction", "Quantity").show(30)

 Mean Absolute Error on test data = 4.370810383657595
The number of units sold in week 39 of 2011 was :  72059
+------------------+--------+
|        prediction|Quantity|
+------------------+--------+
| 4.531185844842659|       4|
| 4.378096475856258|       6|
| 7.192972788785883|       6|
| 7.013138005341645|       6|
| 7.692358675429518|       3|
| 6.714733779486458|       6|
| 7.568505154711499|       6|
|  7.21757724212274|       6|
| 7.476933655347669|       6|
|14.068818023695588|      12|
|19.164751545227496|      24|
| 7.171549818623401|       6|
|  5.95119391269419|       2|
|  6.25000070584645|       2|
|13.742559509428137|      24|
| 6.552367185355938|       6|
| 6.974411572492822|       6|
| 5.182153158253581|       5|
| 7.735154100720095|       8|
|10.830245112696058|      10|
|12.433138508030892|      12|
| 6.145297465299245|       5|
| 7.281824926890979|       8|
|3.1240094638766713|       2|
| 7.350108505553226|      12|
| 8.010887982094678|      12|
| 6.853906098906493