# 0. Libraries

In [2]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor, LinearRegression, DecisionTreeRegressor, GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, dayofmonth, month, year,  to_date, to_timestamp, weekofyear, dayofweek
from pyspark.ml.feature import StringIndexer
import os
import logging

logging.getLogger("org.apache.spark.scheduler.DAGScheduler").setLevel(logging.ERROR)

In [4]:
# Set JAVA_HOME environment variable explicitly
os.environ['JAVA_HOME'] = '/Library/Java/JavaVirtualMachines/jdk-22.jdk/Contents/Home'

from pyspark.sql import SparkSession


In [5]:
# Initialize Spark session
my_spark = SparkSession.builder.appName("SalesForecast").getOrCreate()


24/06/20 11:49:46 WARN Utils: Your hostname, Geraldines-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.209 instead (on interface en0)
24/06/20 11:49:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/20 11:49:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# 1. Data Aggregation

In [6]:
# Load the CSV file into a DataFrame
sales_data = my_spark.read.csv("online_retail.csv", header=True, inferSchema=True)

# Convert InvoiceDate to a TimestampType
sales_data = sales_data.withColumn("InvoiceDate", to_timestamp(sales_data["InvoiceDate"], "M/d/yyyy H:m"))

# Add the additional date features
sales_data = sales_data.withColumn("Year", year("InvoiceDate")) \
       .withColumn("Month", month("InvoiceDate")) \
       .withColumn("Week", weekofyear("InvoiceDate")) \
       .withColumn("Day", dayofmonth("InvoiceDate")) \
       .withColumn("DayOfWeek", dayofweek("InvoiceDate"))

sales_data.show()


                                                                                

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----+-----+----+---+---------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|Year|Month|Week|Day|DayOfWeek|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----+-----+----+---+---------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|2010|   12|  48|  1|        4|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|2010|   12|  48|  1|        4|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|2010|   12|  48|  1|        4|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|2010|   12|  48|  1|        4|
|   536365|   84029E|RED WOOLLY HO

In [7]:
sales_data.orderBy("InvoiceDate", ascending=False).show(5)



+---------+---------+--------------------+--------+-------------------+---------+----------+-------+----+-----+----+---+---------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|Country|Year|Month|Week|Day|DayOfWeek|
+---------+---------+--------------------+--------+-------------------+---------+----------+-------+----+-----+----+---+---------+
|   581587|    22555|PLASTERS IN TIN S...|      12|2011-12-09 12:50:00|     1.65|     12680| France|2011|   12|  49|  9|        6|
|   581587|    22367|CHILDRENS APRON S...|       8|2011-12-09 12:50:00|     1.95|     12680| France|2011|   12|  49|  9|        6|
|   581587|    22728|ALARM CLOCK BAKEL...|       4|2011-12-09 12:50:00|     3.75|     12680| France|2011|   12|  49|  9|        6|
|   581587|    22631|CIRCUS PARADE LUN...|      12|2011-12-09 12:50:00|     1.95|     12680| France|2011|   12|  49|  9|        6|
|   581587|    22727|ALARM CLOCK BAKEL...|       4|2011-12-09 12:50:00|     3.75|  

                                                                                

In [8]:
# Convert InvoiceDate to datetime 
sales_data = sales_data.withColumn("InvoiceDate", to_date(
    to_timestamp(col("InvoiceDate"), "d/M/yyyy H:mm")))

# Aggregate data into daily intervals
daily_sales_data = sales_data.groupBy("Country", "StockCode", "InvoiceDate", "Year", "Month", "Day", "Week", "DayOfWeek").agg({"Quantity": "sum",                                                                                                           "UnitPrice": "avg"})
# Rename the target column
daily_sales_data = daily_sales_data.withColumnRenamed(
    "sum(Quantity)", "Quantity")


# 2. Train Test Split

In [9]:

# Split the data into two sets based on the spliting date, "2011-09-25". All data up to and including this date should be in the training set, while data after this date should be in the testing set. Return a pandas Dataframe, pd_daily_train_data, containing, at least, the columns ["Country", "StockCode", "InvoiceDate", "Quantity"].

split_date_train_test = "2011-09-25"

# Creating the train and test datasets
train_data = daily_sales_data.filter(
    col("InvoiceDate") <= split_date_train_test)
test_data = daily_sales_data.filter(col("InvoiceDate") > split_date_train_test)

pd_daily_train_data = train_data.toPandas()


  from pandas.core.computation.check import NUMEXPR_INSTALLED
  from pandas.core import (
                                                                                

# 3. Regression Model Building 

In [10]:

# Creating indexer for categorical columns
country_indexer = StringIndexer(
    inputCol="Country", outputCol="CountryIndex").setHandleInvalid("keep")
stock_code_indexer = StringIndexer(
    inputCol="StockCode", outputCol="StockCodeIndex").setHandleInvalid("keep")

# Selectiong features columns
feature_cols = ["CountryIndex", "StockCodeIndex", "Month", "Year",
                "DayOfWeek", "Day", "Week"]

# Using vector assembler to combine features
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Initializing a Random Forest model
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="Quantity",
    maxBins=4000
)

# Create a pipeline for staging the processes
pipeline = Pipeline(stages=[country_indexer, stock_code_indexer, assembler, rf])

# Training the model
model = pipeline.fit(train_data)


24/06/20 11:50:00 WARN DAGScheduler: Broadcasting large task binary with size 1101.6 KiB
24/06/20 11:50:01 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/06/20 11:50:01 WARN DAGScheduler: Broadcasting large task binary with size 1753.5 KiB
                                                                                

# 4. Model Evaluation

In [11]:

# Getting test predictions
test_predictions = model.transform(test_data)
test_predictions = test_predictions.withColumn(
    "prediction", col("prediction").cast("double"))

# Provide the Mean Absolute Error (MAE) for your forecast? Return a double/floar "mae"

# Initializing the evaluator
mae_evaluator = RegressionEvaluator(
    labelCol="Quantity", predictionCol="prediction", metricName="mae")

# Obtaining MAE
mae = mae_evaluator.evaluate(test_predictions)


                                                                                

In [12]:
print(f'Mean Absolute Error (MAE) is {mae}')

Mean Absolute Error (MAE) is 30.468210127710204


# 5. Example of Model Usage

In [13]:
# Lets say we want to know how many will be sold week 42 of 2011? 

weekly_test_predictions = test_predictions.groupBy("Year", "Week").agg({"prediction": "sum"})

promotion_week = weekly_test_predictions.filter(col('Week')==42)

# Get the predicted quantity for week 42 of 2011
predicted_quantity = int(promotion_week.select("sum(prediction)").collect()[0][0])
print(f'Predicted Quantity Sold week 42 of 2011: {predicted_quantity} Units')

# Filter actual data for week 42 of 2011
actual_weekly_sales = test_data.groupBy("Year", "Week").agg({"Quantity": "sum"})
actual_quantity = actual_weekly_sales.filter(col('Week') == 42).select("sum(Quantity)").collect()[0][0]

print(f'Actual Quantity Sold week 42 of 2011: {actual_quantity} Units')

# Compare predicted vs actual
if actual_quantity is not None:
    print(f'Comparison - Predicted vs Actual:')
    print(f'Predicted: {predicted_quantity} Units')
    print(f'Actual: {actual_quantity} Units')
    difference = predicted_quantity - actual_quantity
    print(f'Difference: {difference} Units')
else:
    print('No actual data found for week 42 of 2011 in the test dataset.')


Predicted Quantity Sold week 42 of 2011: 238659 Units
Actual Quantity Sold week 42 of 2011: 150811 Units
Comparison - Predicted vs Actual:
Predicted: 238659 Units
Actual: 150811 Units
Difference: 87848 Units


# 6. We can see using one model leads to a terrible forecaster so lets run other models and pick a better 

In [None]:
logging.getLogger("org.apache.spark.scheduler.DAGScheduler").setLevel(logging.ERROR)
# Initialize regression models
rf = RandomForestRegressor(featuresCol="features", labelCol="Quantity", maxBins=4000)
lr = LinearRegression(featuresCol="features", labelCol="Quantity")
dt = DecisionTreeRegressor(featuresCol="features", labelCol="Quantity", maxBins=4000)  # Increase maxBins parameter
gbt = GBTRegressor(featuresCol="features", labelCol="Quantity", maxBins=4000)

# Create a list of models to iterate over
models = [rf, lr, dt, gbt]

# Initialize an empty dictionary to store MAEs
mae_dict = {}

# Iterate over models
for model in models:
    # Create a pipeline for each model
    pipeline = Pipeline(stages=[country_indexer, stock_code_indexer, assembler, model])
    
    # Train the model
    trained_model = pipeline.fit(train_data)
    
    # Make predictions on test data
    test_predictions = trained_model.transform(test_data)
    
    # Evaluate MAE
    evaluator = RegressionEvaluator(labelCol="Quantity", predictionCol="prediction", metricName="mae")
    mae = evaluator.evaluate(test_predictions)
    
    # Store MAE in dictionary
    model_name = model.__class__.__name__
    mae_dict[model_name] = mae
    
    # Print MAE for each model
    print(f"{model_name} MAE: {mae}")



In [18]:
# Print the MAE dictionary for comparison
print("\nMAE Comparison:")
for model_name, mae in mae_dict.items():
    print(f"{model_name}: {mae}")


MAE Comparison:
RandomForestRegressor: 30.468210127710204
LinearRegression: 20.78916333892425
DecisionTreeRegressor: 22.999555896800754
GBTRegressor: 49.040119594620734


Linear regression is the best regressor

# 8. Using best regressor

In [15]:

# Initialize a new LinearRegression model
best_regressor = LinearRegression(
    featuresCol="features",
    labelCol="Quantity",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8
)

# Create a new pipeline with the best regressor
pipeline = Pipeline(stages=[country_indexer, stock_code_indexer, assembler, best_regressor])

# Train the model
model = pipeline.fit(train_data)

# Get predictions on test data
test_predictions = model.transform(test_data)

# Evaluate MAE
mae_evaluator = RegressionEvaluator(
    labelCol="Quantity", 
    predictionCol="prediction", 
    metricName="mae"
)

mae = mae_evaluator.evaluate(test_predictions)
print(f"MAE for LinearRegression: {mae}")


MAE for LinearRegression: 20.09193735248746


In [16]:
# Filter predictions for week 42 of 2011
predicted_weekly_sales = test_predictions.groupBy("Year", "Week").agg({"prediction": "sum"})
predicted_quantity = int(predicted_weekly_sales.filter(col('Week')==42).select("sum(prediction)").collect()[0][0])
print(f'Predicted Quantity Sold week 42 of 2011: {predicted_quantity} Units')

# Filter actual data for week 42 of 2011
actual_weekly_sales = test_data.groupBy("Year", "Week").agg({"Quantity": "sum"})
actual_quantity = actual_weekly_sales.filter(col('Week') == 42).select("sum(Quantity)").collect()[0][0]

if actual_quantity is not None:
    print(f'Actual Quantity Sold week 42 of 2011: {actual_quantity} Units')
    
    # Compare predicted vs actual
    print(f'Comparison - Predicted vs Actual:')
    print(f'Predicted: {predicted_quantity} Units')
    print(f'Actual: {actual_quantity} Units')
    difference = predicted_quantity - actual_quantity
    print(f'Difference: {difference} Units')
else:
    print('No actual data found for week 42 of 2011 in the test dataset.')


Predicted Quantity Sold week 42 of 2011: 118329 Units
Actual Quantity Sold week 42 of 2011: 150811 Units
Comparison - Predicted vs Actual:
Predicted: 118329 Units
Actual: 150811 Units
Difference: -32482 Units


# 9. Recommendations

The model could benefit from more features. 

While the LinearRegression model is performing better than other regressors based on the MAE comparison, it's still not perfectly accurate. The negative difference suggests that there may be underlying patterns or factors not captured effectively by the current features and model setup.

Explore additional relevant features that might influence sales, such as promotional activities, economic indicators, seasonality adjustments, or customer demographics.

Model Tuning: Fine-tune the parameters of the LinearRegression model or explore other algorithms that might capture non-linear relationships better.



In [17]:
#