## The Data

# Online Retail.csv

| Column     | Description              |
|------------|--------------------------|
| `'InvoiceNo'` | A 6-digit number uniquely assigned to each transaction |
| `'StockCode'` | A 5-digit number uniquely assigned to each distinct product |
| `'Description'` | The product name |
| `'Quantity'` | The quantity of each product (item) per transaction |
| `'UnitPrice'` | Product price per unit |
| `'CustomerID'` | A 5-digit number uniquely assigned to each customer |
| `'Country'` | The name of the country where each customer resides |
| `'InvoiceDate'` | The day and time when each transaction was generated `"MM/DD/YYYY"` |
| `'Year'` | The year when each transaction was generated |
| `'Month'` | The month when each transaction was generated |
| `'Week'` | The week when each transaction was generated (`1`-`52`) |
| `'Day'` | The day of the month when each transaction was generated (`1`-`31`) |
| `'DayOfWeek'` | The day of the week when each transaction was generated <br>(`0` = Monday, `6` = Sunday) |

In [1]:
!pip install pyspark



In [7]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, dayofmonth, month, year,  to_date, to_timestamp, weekofyear, dayofweek
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Start (or reuse) the Spark session used by every later cell
my_spark = (
    SparkSession.builder
    .appName("SalesForecast")
    .getOrCreate()
)

# Load the raw transactions: the first row supplies the column names and
# Spark infers each column's type from the data
sales_data = my_spark.read.csv(
    "Online_Retail.csv",
    sep=",",
    header=True,
    inferSchema=True,
)

# Parse InvoiceDate with the "d/M/yyyy H:mm" pattern, then keep only the
# calendar date so transactions can later be grouped per day
invoice_timestamp = to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")
sales_data = sales_data.withColumn("InvoiceDate", to_date(invoice_timestamp))

In [8]:
from pyspark.sql.functions import sum as sum, avg

# Aggregate the transactions into daily intervals: one row per country,
# product and calendar day, holding the total quantity sold and the mean
# unit price. The aliases give the aggregated columns their final names.
grouping_columns = [
    "Country", "StockCode", "InvoiceDate",
    "Year", "Month", "Day", "Week", "DayOfWeek",
]
daily_sales_data = sales_data.groupBy(*grouping_columns).agg(
    sum("Quantity").alias("Quantity"),
    avg("UnitPrice").alias("UnitPrice"),
)

# Rows dated before the split go to training; the split date and everything
# after it are held out for testing
splitting_date = "2011-09-25"
invoice_date = col("InvoiceDate")

train_data = daily_sales_data.filter(invoice_date < splitting_date)
test_data = daily_sales_data.filter(invoice_date >= splitting_date)

# Collect the training rows into a pandas DataFrame for local processing
pd_daily_train_data = train_data.toPandas()

In [9]:
# Index the two categorical columns numerically. handleInvalid="keep" is set
# so that labels the indexer was not fitted on get their own index rather
# than causing an error at transform time.
country_indexer = StringIndexer(
    inputCol="Country", outputCol="country_index", handleInvalid="keep"
)
stock_code_indexer = StringIndexer(
    inputCol="StockCode", outputCol="stock_code_index", handleInvalid="keep"
)

# Collect the calendar fields and the two indexed categories into a single
# "features" vector column
feature_columns = ["Month", "Day", "DayOfWeek", "Week", "country_index", "stock_code_index"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Random forest that predicts Quantity from the feature vector.
# NOTE(review): maxBins=4000 is presumably sized to cover the number of
# distinct stock codes -- confirm against the data.
rf = RandomForestRegressor(labelCol="Quantity", featuresCol="features", maxBins=4000)

# Chain indexing, assembling and regression into one pipeline
pipeline_stages = [country_indexer, stock_code_indexer, assembler, rf]
sales_pipe = Pipeline(stages=pipeline_stages)

# Fit every stage on the training split (a Spark DataFrame)
model = sales_pipe.fit(train_data)

In [10]:
from pyspark.sql.functions import col

# Score the held-out rows with `model`, the pipeline fitted on the training split
scored_test_data = model.transform(test_data)

# Explicitly type the prediction column as double
prediction_as_double = col("prediction").cast("double")
test_predictions = scored_test_data.withColumn("prediction", prediction_as_double)

In [11]:
# Mean absolute error between the actual Quantity and the model's prediction
# on the test split
mae_evaluator = RegressionEvaluator(
    metricName="mae",
    labelCol="Quantity",
    predictionCol="prediction",
)
mae = mae_evaluator.evaluate(test_predictions)

In [12]:
# Sum the predicted quantities per (Year, Week)
weekly_test_predictions = test_predictions.groupBy("Year", "Week").agg({"prediction": "sum"})

# Keep only the row(s) for week 39
promotion_week = weekly_test_predictions.filter(col("Week") == 39)

# Take the first matching row's summed prediction and store it, truncated to
# an integer, as quantity_sold_w39
week_39_rows = promotion_week.select("sum(prediction)").collect()
quantity_sold_w39 = int(week_39_rows[0][0])
print(quantity_sold_w39)

# Stop the Spark session now that the final result has been printed
my_spark.stop()

87357
