## The Data

# Online Retail.csv

| Column     | Description              |
|------------|--------------------------|
| `'InvoiceNo'` | A 6-digit number uniquely assigned to each transaction |
| `'StockCode'` | A 5-digit number uniquely assigned to each distinct product |
| `'Description'` | The product name |
| `'Quantity'` | The quantity of each product (item) per transaction |
| `'UnitPrice'` | Product price per unit |
| `'CustomerID'` | A 5-digit number uniquely assigned to each customer |
| `'Country'` | The name of the country where each customer resides |
| `'InvoiceDate'` | The day and time when each transaction was generated `"MM/DD/YYYY"` |
| `'Year'` | The year when each transaction was generated |
| `'Month'` | The month when each transaction was generated |
| `'Week'` | The week when each transaction was generated (`1`-`52`) |
| `'Day'` | The day of the month when each transaction was generated (`1`-`31`) |
| `'DayOfWeek'` | The day of the week when each transaction was generated <br>(`0` = Monday, `6` = Sunday) |

In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, dayofmonth, month, year,  to_date, to_timestamp, weekofyear, dayofweek
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Start (or reuse) the Spark session used by every later cell
my_spark = (
    SparkSession.builder
    .appName("SalesForecast")
    .getOrCreate()
)

# Load the raw transactions: the first row is the header and column types
# are inferred from the data
sales_data = my_spark.read.csv(
    "Online Retail.csv",
    sep=",",
    header=True,
    inferSchema=True,
)

# Parse the InvoiceDate text as a "d/M/yyyy H:mm" timestamp, then keep only
# the calendar date (the time of day is dropped).
# NOTE(review): the pattern is day-first; confirm it matches the file's
# actual date layout.
invoice_timestamp = to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")
sales_data = sales_data.withColumn("InvoiceDate", to_date(invoice_timestamp))

In [None]:
# Collapse the transactions to one row per combination of the grouping keys
# (country, product, invoice date and the calendar columns): quantities are
# summed and unit prices averaged within each group
daily_sales_data = (
    sales_data
    .groupBy(
        "Country", "StockCode", "InvoiceDate",
        "Year", "Month", "Day", "Week", "DayOfWeek",
    )
    .agg({"Quantity": "sum", "UnitPrice": "avg"})
    # Restore the plain column name for the summed quantity
    .withColumnRenamed("sum(Quantity)", "Quantity")
)

In [None]:
# Chronological hold-out: rows dated on or before the cutoff form the
# training set, rows dated after it form the test set
split_date_train_test = "2011-09-25"

train_data = daily_sales_data.where(col('InvoiceDate') <= split_date_train_test)
test_data = daily_sales_data.where(col('InvoiceDate') > split_date_train_test)

# Pandas copy of the training rows (this collects them to the driver)
pd_daily_train_data = train_data.toPandas()

In [None]:
# Encode the two string columns as numeric indices. handleInvalid="keep"
# assigns labels not seen during fitting to an extra index instead of
# raising an error at transform time.
country_indexer = StringIndexer(
    inputCol='Country',
    outputCol='CountryIndex',
    handleInvalid="keep",
)
stock_code_indexer = StringIndexer(
    inputCol="StockCode",
    outputCol="StockCodeIndex",
    handleInvalid="keep",
)

In [None]:
# Model inputs: the two indexed categorical columns plus the calendar fields
feature_cols = [
    "CountryIndex",
    "StockCodeIndex",
    "Month",
    "Year",
    "DayOfWeek",
    "Day",
    "Week",
]

# Pack the feature columns into a single vector column named 'features'
assembler = VectorAssembler(outputCol='features', inputCols=feature_cols)

# Random forest regressor that predicts Quantity from the 'features' vector.
# NOTE(review): maxBins=4000 is presumably sized for the number of distinct
# StockCodeIndex values — confirm against the data.
rf = RandomForestRegressor(
    labelCol='Quantity',
    featuresCol='features',
    maxBins=4000,
)

In [None]:
# Chain the stages in execution order: index both categorical columns,
# assemble the feature vector, then run the random forest
pipeline = Pipeline(
    stages=[
        country_indexer,
        stock_code_indexer,
        assembler,
        rf,
    ],
)

In [None]:
# Train the model: fit every pipeline stage on the training split only, so
# the rows in test_data stay unseen until evaluation
model = pipeline.fit(train_data)

In [None]:
# Score the held-out rows with the fitted pipeline and store the
# 'prediction' column explicitly as a double
test_predictions = (
    model.transform(test_data)
    .withColumn('prediction', col('prediction').cast('double'))
)

In [None]:
# Mean absolute error between the actual Quantity and the model's prediction
# on the test split
mae_evaluator = RegressionEvaluator(
    metricName='mae',
    labelCol='Quantity',
    predictionCol='prediction',
)
mae = mae_evaluator.evaluate(test_predictions)

In [None]:
# Forecast, as an integer, how many units will be sold during week 39 of 2011.

# Total predicted quantity for every (Year, Week) pair in the test period.
weekly_test_predictions = test_predictions.groupby("Year", "Week").agg({'prediction': 'sum'})

# Match on Year as well as Week: the table is keyed by (Year, Week), so
# filtering on Week alone could return several rows and make the [0] lookup
# below pick an arbitrary year. The weekly table itself is left untouched.
promotion_week = weekly_test_predictions.filter(
    (col('Year') == 2011) & (col('Week') == 39)
)

# Fail with a clear message when the test split holds no usable rows for
# that week, rather than with an opaque IndexError/TypeError.
promotion_rows = promotion_week.select("sum(prediction)").collect()
if not promotion_rows or promotion_rows[0][0] is None:
    raise ValueError("No predictions available for week 39 of 2011")

# int() truncates the fractional part of the summed predictions.
quantity_sold_w39 = int(promotion_rows[0][0])
print(quantity_sold_w39)

In [None]:
# Stop the Spark session created at the top of the notebook
my_spark.stop()