![Shopping trolley in front of a laptop](./iStock-1249219777.jpg)

It's simple to buy any product with a click and have it delivered to your door. Online shopping has been rapidly evolving over the last few years, making our lives easier. But behind the scenes, e-commerce companies face a complex challenge that needs to be addressed. 

Uncertainty plays a big role in how supply chains plan and organize their operations so that products are delivered on time. It can lead to stockouts, delayed deliveries, and increased operational costs.

You work for the Sales & Operations Planning (S&OP) team at a multinational e-commerce company. The team needs your help planning for the upcoming end-of-the-year sales. They want to use your insights to plan promotional opportunities and manage inventory, so that the right products are in stock when needed and customers receive prompt delivery to their doorstep.


## The Data

You are provided with a sales dataset to use. A summary and preview are provided below.

### Online Retail.csv

| Column     | Description              |
|------------|--------------------------|
| `'InvoiceNo'` | A 6-digit number uniquely assigned to each transaction |
| `'StockCode'` | A 5-digit number uniquely assigned to each distinct product |
| `'Description'` | The product name |
| `'Quantity'` | The quantity of each product (item) per transaction |
| `'UnitPrice'` | Product price per unit |
| `'CustomerID'` | A 5-digit number uniquely assigned to each customer |
| `'Country'` | The name of the country where each customer resides |
| `'InvoiceDate'` | The day and time when each transaction was generated `"MM/DD/YYYY"` |
| `'Year'` | The year when each transaction was generated |
| `'Month'` | The month when each transaction was generated |
| `'Week'` | The week when each transaction was generated (`1`-`52`) |
| `'Day'` | The day of the month when each transaction was generated (`1`-`31`) |
| `'DayOfWeek'` | The day of the week when each transaction was generated <br>(`0` = Monday, `6` = Sunday) |
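
The calendar columns can be reproduced from a date with Python's standard library. The sketch below assumes that `Week` follows ISO 8601 week numbering, which is consistent with the preview rows but is not stated in the dataset description. `calendar_features` is a hypothetical helper name, not part of the dataset or of Spark.

```python
from datetime import date

def calendar_features(d: date) -> dict:
    """Derive the calendar columns for one date (assumes ISO week numbering)."""
    _, iso_week, _ = d.isocalendar()
    return {
        "Year": d.year,
        "Month": d.month,
        "Week": iso_week,
        "Day": d.day,
        "DayOfWeek": d.weekday(),  # 0 = Monday, 6 = Sunday
    }

print(calendar_features(date(2011, 10, 1)))
```

Comparing the result for a few dates against the `Year`, `Month`, `Week`, `Day`, and `DayOfWeek` values in the file is a quick way to confirm the convention before relying on these columns as features.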

In [1]:
# Import required libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, dayofmonth, month, year,  to_date, to_timestamp, weekofyear, dayofweek
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Initialize Spark session
my_spark = SparkSession.builder.appName("SalesForecast").getOrCreate()

print(my_spark)
print(pyspark.__version__)

<pyspark.sql.session.SparkSession object at 0x7f04d8b97100>
3.3.1


In [2]:
# Importing sales data
sales_data = my_spark.read.csv(
    "Online Retail.csv", header=True, inferSchema=True, sep=",")

# Parse InvoiceDate as day/month/year hour:minute and keep only the date part
sales_data = sales_data.withColumn("InvoiceDate", to_date(
    to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")))

sales_data.show(5)

                                                                                

+---------+---------+--------------------+--------+---------+----------+--------------+-----------+----+-----+----+---+---------+
|InvoiceNo|StockCode|         Description|Quantity|UnitPrice|CustomerID|       Country|InvoiceDate|Year|Month|Week|Day|DayOfWeek|
+---------+---------+--------------------+--------+---------+----------+--------------+-----------+----+-----+----+---+---------+
|   536365|   85123A|WHITE HANGING HEA...|       6|     2.55|     17850|United Kingdom| 2010-01-12|2010|    1|   2| 12|        1|
|   536365|    71053| WHITE METAL LANTERN|       6|     3.39|     17850|United Kingdom| 2010-01-12|2010|    1|   2| 12|        1|
|   536365|   84406B|CREAM CUPID HEART...|       8|     2.75|     17850|United Kingdom| 2010-01-12|2010|    1|   2| 12|        1|
|   536365|   84029G|KNITTED UNION FLA...|       6|     3.39|     17850|United Kingdom| 2010-01-12|2010|    1|   2| 12|        1|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|     3.39|     17850|United Kingdom| 20
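
The pattern passed to `to_timestamp` determines which field is read as the day. `"d/M/yyyy H:mm"` reads the day first, whereas the column description lists `"MM/DD/YYYY"`, so it is worth confirming the order against the raw file. The following standard-library sketch (not Spark) shows how the same string yields two different dates; the raw value is a hypothetical example, not a value taken from the file.

```python
from datetime import datetime

raw = "12/1/2010 8:26"  # hypothetical raw value, not taken from the file

# Same string, two interpretations of the first two fields
day_first = datetime.strptime(raw, "%d/%m/%Y %H:%M").date()
month_first = datetime.strptime(raw, "%m/%d/%Y %H:%M").date()

print(day_first)    # read as day/month/year
print(month_first)  # read as month/day/year
```

If the two interpretations disagree for the real data, the derived date range and the train/test split would shift accordingly.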

In [3]:
# Aggregate to one row per country, product, and day:
# total quantity sold and average unit price
group_sales_data = sales_data.groupBy(
    "Country", "StockCode", "InvoiceDate", "Year", "Month", "Day", "Week", "DayOfWeek"
).agg({"Quantity": "sum", "UnitPrice": "avg"})
group_sales_data = group_sales_data.withColumnRenamed("sum(Quantity)", "Quantity")
group_sales_data = group_sales_data.withColumnRenamed("avg(UnitPrice)", "AvgUnitPrice")
group_sales_data.show(5)


+--------------+---------+-----------+----+-----+---+----+---------+------------+--------+
|       Country|StockCode|InvoiceDate|Year|Month|Day|Week|DayOfWeek|AvgUnitPrice|Quantity|
+--------------+---------+-----------+----+-----+---+----+---------+------------+--------+
|United Kingdom|    22912| 2010-01-12|2010|    1| 12|   2|        1|        4.95|       3|
|        France|    22659| 2010-01-12|2010|    1| 12|   2|        1|        1.95|      24|
|United Kingdom|    21544| 2010-01-12|2010|    1| 12|   2|        1|        0.85|      12|
|United Kingdom|    21098| 2010-01-12|2010|    1| 12|   2|        1|        1.25|      16|
|        Norway|    85150| 2010-01-12|2010|    1| 12|   2|        1|        2.55|      12|
+--------------+---------+-----------+----+-----+---+----+---------+------------+--------+
only showing top 5 rows



                                                                                

In [4]:
# Find the date range covered by the data
df = group_sales_data.selectExpr("max(InvoiceDate)", "min(InvoiceDate)")
df.show(5)

+----------------+----------------+
|max(InvoiceDate)|min(InvoiceDate)|
+----------------+----------------+
|      2011-12-10|      2010-01-12|
+----------------+----------------+



                                                                                

The dataset contains records from 12 January 2010 to 10 December 2011. Because the goal is to forecast the upcoming end-of-year sales (the fourth quarter of the calendar year), records up to and including 30 September 2011 are used as training data, and records after that date are used as test data.

In [5]:
# Split the data into training and test sets
# train_data contains data up to and including "2011-09-30"
# test_data contains data after "2011-09-30"
train_data = group_sales_data.filter(group_sales_data.InvoiceDate <= "2011-09-30")
test_data = group_sales_data.filter(group_sales_data.InvoiceDate > "2011-09-30")

In [6]:
train_data.show(5)



+--------------+---------+-----------+----+-----+---+----+---------+------------+--------+
|       Country|StockCode|InvoiceDate|Year|Month|Day|Week|DayOfWeek|AvgUnitPrice|Quantity|
+--------------+---------+-----------+----+-----+---+----+---------+------------+--------+
|United Kingdom|    22912| 2010-01-12|2010|    1| 12|   2|        1|        4.95|       3|
|        France|    22659| 2010-01-12|2010|    1| 12|   2|        1|        1.95|      24|
|United Kingdom|    21544| 2010-01-12|2010|    1| 12|   2|        1|        0.85|      12|
|United Kingdom|    21098| 2010-01-12|2010|    1| 12|   2|        1|        1.25|      16|
|        Norway|    85150| 2010-01-12|2010|    1| 12|   2|        1|        2.55|      12|
+--------------+---------+-----------+----+-----+---+----+---------+------------+--------+
only showing top 5 rows



                                                                                

In [7]:
test_data.show(5)



+--------------+---------+-----------+----+-----+---+----+---------+------------+--------+
|       Country|StockCode|InvoiceDate|Year|Month|Day|Week|DayOfWeek|AvgUnitPrice|Quantity|
+--------------+---------+-----------+----+-----+---+----+---------+------------+--------+
|United Kingdom|    22414| 2011-10-01|2011|   10|  1|  39|        5|        7.95|       1|
|United Kingdom|    22773| 2011-10-01|2011|   10|  1|  39|        5|        1.25|      12|
|United Kingdom|    22180| 2011-10-01|2011|   10|  1|  39|        5|        9.95|       1|
|United Kingdom|    20686| 2011-10-01|2011|   10|  1|  39|        5|        3.25|       2|
|United Kingdom|    82580| 2011-11-01|2011|   11|  1|  44|        1|        0.73|      12|
+--------------+---------+-----------+----+-----+---+----+---------+------------+--------+
only showing top 5 rows



                                                                                

In [8]:
# Convert to pandas to check for missing values (this collects all rows to the driver)
pd_sales_data = group_sales_data.toPandas()
pd_sales_data.info()

                                                                                

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 239976 entries, 0 to 239975
Data columns (total 10 columns):
 #   Column        Non-Null Count   Dtype  
---  ------        --------------   -----  
 0   Country       239976 non-null  object 
 1   StockCode     239976 non-null  object 
 2   InvoiceDate   239976 non-null  object 
 3   Year          239976 non-null  int32  
 4   Month         239976 non-null  int32  
 5   Day           239976 non-null  int32  
 6   Week          239976 non-null  int32  
 7   DayOfWeek     239976 non-null  int32  
 8   AvgUnitPrice  239976 non-null  float64
 9   Quantity      239976 non-null  int64  
dtypes: float64(1), int32(5), int64(1), object(3)
memory usage: 13.7+ MB


The next step is to select the features for training a forecasting model. The categorical columns (`Country` and `StockCode`) are converted to numeric indexes because the model can only process numeric data. The `InvoiceDate` and `AvgUnitPrice` columns are excluded because they are not treated as relevant features for forecasting the sales quantity in a given week.
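
To make the indexing step concrete, here is a plain-Python sketch that approximates what `StringIndexer` does with its default frequency-descending ordering and `handleInvalid="keep"`: the most frequent label gets index 0, and labels not seen during fitting share one extra index. It is an illustration only, not Spark's implementation, and `fit_index` and `to_index` are hypothetical helper names.

```python
from collections import Counter

def fit_index(labels):
    """Map each label to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: idx for idx, label in enumerate(ordered)}

def to_index(mapping, label):
    """Return the label's index; unseen labels share one extra bucket."""
    return mapping.get(label, len(mapping))

countries = ["United Kingdom", "France", "United Kingdom",
             "Norway", "France", "United Kingdom"]
mapping = fit_index(countries)
print(mapping)
print(to_index(mapping, "Spain"))  # "Spain" was not seen during fitting
```

The extra bucket matters for this project because the test period can contain products or countries that never appear in the training period.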

In [9]:
# Create indexers for categorical columns
country_indexer = StringIndexer(inputCol="Country", outputCol="CountryIndex").setHandleInvalid("keep")
stock_code_indexer = StringIndexer(inputCol="StockCode", outputCol="StockCodeIndex").setHandleInvalid("keep")

# Feature selection
feature_col = ['CountryIndex', 'StockCodeIndex', 'Year', 'Month', 'Day', 'Week', 'DayOfWeek']
label_col = ['Quantity']

# Combine all features into a single feature vector
assembler = VectorAssembler(inputCols=feature_col, outputCol="features")

In [10]:
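# Initialize a model
# maxBins must be at least the number of categories in any categorical feature;
# StockCode has well over a thousand distinct codes, so the default is too small
rf = RandomForestRegressor(featuresCol="features", labelCol="Quantity", maxBins=4000, seed=123)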

# Create pipeline 
pipeline = Pipeline(stages=[country_indexer, stock_code_indexer, assembler, rf])

# Create the model by fitting the pipeline to training data
model = pipeline.fit(train_data)

                                                                                



                                                                                

In [11]:
# Predict the test data
test_predictions = model.transform(test_data)
test_predictions = test_predictions.withColumn('prediction', col("prediction").cast("double"))
test_predictions.select("Quantity", "prediction").show()


+--------+------------------+
|Quantity|        prediction|
+--------+------------------+
|       1|7.5129975048714215|
|      12| 13.88000067169404|
|       1| 5.231038280811128|
|       2|  8.37660485614391|
|      12|14.207739397202118|
|      40|15.744511444032792|
|       4|15.940656530932458|
|       9| 6.919204758300566|
|       1| 5.609166345664996|
|       3| 7.064579465577227|
|       4| 6.509400203253241|
|      12|12.927051785975198|
|      24| 37.17114746713155|
|      12| 9.858831971734599|
|       2| 5.449614425206179|
|      12|17.127505961287408|
|       9| 17.70660916005197|
|       4|14.714168995237236|
|      46|22.427825151441656|
|      12|11.677030074670807|
+--------+------------------+
only showing top 20 rows



                                                                                

In [12]:
# Evaluate the model predictions using Mean Absolute Error
mae_evaluator = RegressionEvaluator(labelCol="Quantity", predictionCol="prediction", metricName="mae")
mae = mae_evaluator.evaluate(test_predictions)
print(mae)


9.480718896342323
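
Mean Absolute Error is the average of the absolute differences between actual and predicted values, so a value of about 9.48 means the prediction is off by roughly 9.5 units per country, product, and day on average. The sketch below computes the same metric in plain Python on the first four rows of the prediction output, with predictions rounded to two decimals; `mean_absolute_error` is a hypothetical helper, not the Spark evaluator.

```python
def mean_absolute_error(actual, predicted):
    """Average of |actual - predicted| over paired observations."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)

# First four rows of the prediction output (predictions rounded to two decimals)
quantity = [1, 12, 1, 2]
prediction = [7.51, 13.88, 5.23, 8.38]
print(mean_absolute_error(quantity, prediction))
```

Because the metric is expressed in units sold, it is easiest to interpret next to the typical quantity per row.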


                                                                                

In [13]:
test_predictions.show(2)


+--------------+---------+-----------+----+-----+---+----+---------+------------+--------+------------+--------------+--------------------+------------------+
|       Country|StockCode|InvoiceDate|Year|Month|Day|Week|DayOfWeek|AvgUnitPrice|Quantity|CountryIndex|StockCodeIndex|            features|        prediction|
+--------------+---------+-----------+----+-----+---+----+---------+------------+--------+------------+--------------+--------------------+------------------+
|United Kingdom|    22414| 2011-10-01|2011|   10|  1|  39|        5|        7.95|       1|         0.0|        1345.0|[0.0,1345.0,2011....|7.5129975048714215|
|United Kingdom|    22773| 2011-10-01|2011|   10|  1|  39|        5|        1.25|      12|         0.0|         616.0|[0.0,616.0,2011.0...| 13.88000067169404|
+--------------+---------+-----------+----+-----+---+----+---------+------------+--------+------------+--------------+--------------------+------------------+
only showing top 2 rows



                                                                                

In [16]:
# Aggregate predicted and actual quantities by week for the test period (Q4 2011)
weekly_sales = test_predictions.groupBy("Year", "Week").agg({"prediction":"sum", "Quantity":"sum"})
weekly_sales = weekly_sales.withColumnRenamed("sum(prediction)", "Prediction")
weekly_sales = weekly_sales.withColumnRenamed("sum(Quantity)", "Quantity")
weekly_sales = weekly_sales.withColumn("Prediction", weekly_sales.Prediction.cast("Integer"))
#week39_2011 = weekly_sales.filter("Year = 2011 AND Week = 39")
#week39_2011.show()
weekly_sales.sort(['Year', 'Week']).show()


+----+----+----------+--------+
|Year|Week|Prediction|Quantity|
+----+----+----------+--------+
|2011|  39|     14681|   12591|
|2011|  40|     50928|   41597|
|2011|  41|     86179|   78371|
|2011|  42|     99805|   88704|
|2011|  43|     97370|   91546|
|2011|  44|     59540|   55742|
|2011|  45|     96821|   90898|
|2011|  46|    117027|  114444|
|2011|  47|    107656|  102550|
|2011|  48|     73849|   73760|
|2011|  49|     66907|   61973|
+----+----+----------+--------+



                                                                                

Predicted weekly demand rises through most of October, dips in week 44, and peaks in weeks 46 and 47 (mid- to late November), the only weeks in which the forecast reaches six figures. It then eases in weeks 48 and 49 (late November to early December).
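
The weekly figures can also be compared directly. The following plain-Python sketch copies the values from the weekly summary table and computes a signed percentage error per week, the weeks in which the forecast reaches six figures, and the weeks in which the forecast exceeds the actual quantity. `pct_error` is a hypothetical helper.

```python
# (week, predicted, actual) copied from the weekly summary table
weekly = [
    (39, 14681, 12591), (40, 50928, 41597), (41, 86179, 78371),
    (42, 99805, 88704), (43, 97370, 91546), (44, 59540, 55742),
    (45, 96821, 90898), (46, 117027, 114444), (47, 107656, 102550),
    (48, 73849, 73760), (49, 66907, 61973),
]

def pct_error(predicted, actual):
    """Signed percentage error; positive means the forecast was too high."""
    return 100.0 * (predicted - actual) / actual

six_figure_weeks = [week for week, predicted, _ in weekly if predicted >= 100_000]
over_forecast_weeks = [week for week, predicted, actual in weekly if predicted > actual]

for week, predicted, actual in weekly:
    print(f"week {week}: {pct_error(predicted, actual):+.1f}%")
print("six-figure weeks:", six_figure_weeks)
print("weeks forecast above actual:", len(over_forecast_weeks), "of", len(weekly))
```

A consistently positive error would suggest the model tends to overestimate demand, which is useful context when turning the forecast into inventory targets.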

In [26]:
to_csv = test_predictions.drop('CountryIndex', 'StockCodeIndex', 'features')
to_csv = to_csv.withColumnRenamed('prediction', 'Predicted_Quantity')
to_csv.show()

+--------------+---------+-----------+----+-----+---+----+---------+------------------+--------+------------------+
|       Country|StockCode|InvoiceDate|Year|Month|Day|Week|DayOfWeek|      AvgUnitPrice|Quantity|Predicted_Quantity|
+--------------+---------+-----------+----+-----+---+----+---------+------------------+--------+------------------+
|United Kingdom|    22414| 2011-10-01|2011|   10|  1|  39|        5|              7.95|       1|7.5129975048714215|
|United Kingdom|    22773| 2011-10-01|2011|   10|  1|  39|        5|              1.25|      12| 13.88000067169404|
|United Kingdom|    22180| 2011-10-01|2011|   10|  1|  39|        5|              9.95|       1| 5.231038280811128|
|United Kingdom|    20686| 2011-10-01|2011|   10|  1|  39|        5|              3.25|       2|  8.37660485614391|
|United Kingdom|    82580| 2011-11-01|2011|   11|  1|  44|        1|              0.73|      12|14.207739397202118|
|United Kingdom|    22113| 2011-11-01|2011|   11|  1|  44|        1|3.57

In [28]:
# Export the predictions to CSV (Spark writes a directory of part files at this path)
to_csv.write.csv("Online_Retail_Prediction.csv", header=True)
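
Because Spark writes a directory of `part-*.csv` files rather than a single file, downstream tools may need to combine them. The sketch below is one way to read such a directory with the standard library; `read_spark_csv_dir` is a hypothetical helper, and it assumes every part file starts with a header row, as is the case when `header=True` is used.

```python
import csv
import glob
import os

def read_spark_csv_dir(path):
    """Read all part files in a Spark CSV output directory into a list of dicts."""
    rows = []
    for part in sorted(glob.glob(os.path.join(path, "part-*.csv"))):
        with open(part, newline="", encoding="utf-8") as handle:
            # Each part file carries its own header row when header=True
            rows.extend(csv.DictReader(handle))
    return rows

# Only attempt the read if the export directory exists
if os.path.isdir("Online_Retail_Prediction.csv"):
    predictions = read_spark_csv_dir("Online_Retail_Prediction.csv")
    print(len(predictions), "rows read")
```

All values come back as strings, so numeric columns such as `Predicted_Quantity` need to be converted before further calculations.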

                                                                                