In [39]:
# Import required libraries
import mlflow
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import col, dayofmonth, month, year,  to_date, to_timestamp, weekofyear, dayofweek
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import VectorAssembler, Tokenizer, HashingTF, IDF, StringIndexer, OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder # in case we have compute ressources

In [25]:
# Initialize Spark session
my_spark = SparkSession.builder.appName("SalesForecast").getOrCreate()

In [26]:
# Importing sales data
sales_data = my_spark.read.csv("Online Retail.csv", 
                               header=True, 
                               inferSchema=True, 
                               sep=",")
# Display the schema
sales_data.printSchema()

[Stage 40:>                                                         (0 + 2) / 2]

root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Week: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)



                                                                                

In [27]:
# Define renaming dictionary
renaming_dict = {
    "InvoiceNo": "invoice_no",
    "StockCode": "stock_code",
    "Description": "description",
    "Quantity": "quantity",
    "UnitPrice": "unit_price",
    "CustomerID": "customer_id",
    "Country": "country",
    "InvoiceDate": "invoice_date",
    "Year": "year",
    "Month": "month",
    "Week": "week",
    "Day": "day",
    "DayOfWeek": "day_of_week"
}

In [28]:
class RenameColumns(Transformer):
    def __init__(self, renaming_dict):
        super(RenameColumns, self).__init__()
        self.renaming_dict = renaming_dict

    def _transform(self, dataset: DataFrame) -> DataFrame:
        return dataset.select([col(c).alias(self.renaming_dict.get(c, c)) for c in dataset.columns])

In [29]:
class MeanEncoder(Transformer):
    def __init__(self, inputCol=None, targetCol=None, outputCol=None):
        super(MeanEncoder, self).__init__()
        self.inputCol = inputCol
        self.targetCol = targetCol
        self.outputCol = outputCol

    def _transform(self, df: DataFrame) -> DataFrame:
        # Calculate the mean of the target column for each unique value in the input column
        encoding_df = df.groupBy(self.inputCol).agg(F.mean(self.targetCol).alias(self.outputCol))
        # Join this mean encoding back to the original DataFrame
        return df.join(encoding_df, on=self.inputCol, how="left")

In [30]:
# stage 1 : Create an instance of the RenameColumns transformer
rename_transformer = RenameColumns(renaming_dict)

# stage 2 : Create the country StringIndexer
country_indexer = StringIndexer(inputCol="country",
                                outputCol="country_index")

# stage 3 : Create the country OneHotEncoder
country_encoder = OneHotEncoder(inputCol="country_index",
                                outputCol="country_fact")

# stage 4 : Create the stock_code MeanEncoder
mean_encoder = MeanEncoder(inputCol="stock_code", 
                           targetCol="quantity", 
                           outputCol="stock_code_mean")

# stage 5 : Create the descripotion Tokenizer
tokenizer = Tokenizer(inputCol="description", 
                      outputCol="description_words")

# stage 6 : Create the HashingTF 
hashing_tf = HashingTF(inputCol="description_words", 
                       outputCol="description_tf", 
                       numFeatures=1000)
# stage 7 : Create the IDF
idf = IDF(inputCol="description_tf", outputCol="description_tfidf")

# stage 8 : Create the day_of_week_encoder
day_of_week_encoder = OneHotEncoder(inputCols=["day_of_week"], outputCols=["day_of_week_encoded"])

# stage 9 : Create the month_encoder
month_encoder = OneHotEncoder(inputCols=["month"], outputCols=["month_encoded"])

# stage 10 : Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["invoice_no",
                                           "country_fact", 
                                           "unit_price", 
                                           "customer_id", 
                                           "year", 
                                           "month_encoded",
                                           "week",
                                           "day",
                                           "day_of_week_encoded",
                                           "stock_code_mean",
                                           "description_tfidf"], 
                                outputCol="features")
# Make the pipeline
demand_pipe = Pipeline(stages=[rename_transformer,
                               country_indexer, 
                               country_encoder,
                               mean_encoder,
                               tokenizer,
                               hashing_tf,
                               idf,
                               day_of_week_encoder,
                               month_encoder,
                               vec_assembler])

In [31]:
# Create a RandomForest Estimator
rf_regressor = RandomForestRegressor(featuresCol="features", 
                                     labelCol="quantity",
                                     maxDepth =2,
                                     numTrees=20)
# Create the RegressionEvaluator instances
rmse_evaluator = RegressionEvaluator(labelCol="quantity", predictionCol="prediction", metricName="rmse")
mae_evaluator = RegressionEvaluator(labelCol="quantity", predictionCol="prediction", metricName="mae")
r2_evaluator = RegressionEvaluator(labelCol="quantity", predictionCol="prediction", metricName="r2")

In [32]:
# Fit and transform your data with the pipeline
pipeline_model = demand_pipe.fit(sales_data)
transformed_data = pipeline_model.transform(sales_data)

# Split the data into training and test sets
training, test = transformed_data.randomSplit([.8, .2])

"""
# In case we have computational power 

# Create the parameter grid
grid_regressor = ParamGridBuilder() \
        .addGrid(rf_regressor.maxDepth, [5, 15]) \
        .addGrid(rf_regressor.numTrees, [20, 50]) \
        .build()
# Create CrossValidator
crossval_regressor = CrossValidator(estimator=rf_regressor,
                                     estimatorParamMaps=grid_regressor,
                                     evaluator=regressor_evaluator,
                                     numFolds=3)  # 3-fold cross-validation
# Fit the model
models = crossval_regressor.fit(training)

# Extract the best model
model = models.bestModel
"""

# Fit the model
model = rf_regressor.fit(training)

24/10/26 12:15:07 WARN MemoryStore: Not enough space to cache rdd_189_1 in memory! (computed 28.8 MiB so far)
24/10/26 12:15:07 WARN BlockManager: Persisting block rdd_189_1 to disk instead.
24/10/26 12:15:09 WARN MemoryStore: Not enough space to cache rdd_189_0 in memory! (computed 152.2 MiB so far)
24/10/26 12:15:09 WARN BlockManager: Persisting block rdd_189_0 to disk instead.
24/10/26 12:15:12 WARN MemoryStore: Not enough space to cache rdd_189_1 in memory! (computed 348.8 MiB so far)
24/10/26 12:15:14 WARN MemoryStore: Not enough space to cache rdd_189_0 in memory! (computed 64.9 MiB so far)
24/10/26 12:15:19 WARN MemoryStore: Not enough space to cache rdd_189_0 in memory! (computed 152.2 MiB so far)
24/10/26 12:15:19 WARN MemoryStore: Not enough space to cache rdd_189_1 in memory! (computed 232.4 MiB so far)
                                                                                

In [36]:
# Use the model to predict the test set
test_results = model.transform(test)

# Calculate each metric
mean_quantity = sales_data.agg(F.mean("Quantity").alias("mean_quantity")).collect()[0]['mean_quantity']
rmse = rmse_evaluator.evaluate(test_results)
rrmse = 100*(rmse/mean_quantity)
r2 = r2_evaluator.evaluate(test_results)

# Print the results
print(f"RMSE: {rmse}")
print(f"RRMSE: {rrmse}")
print(f"R²: {r2}")



RMSE: 8.4400017516277
MAE: 5.855641517677259
R²: 0.19130013266924561


                                                                                

Relative Scale of rmse: 101.6082768076977


In [52]:
mlflow.set_experiment("demand_forecasting_random_forest")

# the model parameters
params = {
    'maxDepth': 5,
    'numTrees': 50,
}
run_name = f"maxDepth_{params['maxDepth']}_numTrees_{params['numTrees']}"

# Start MLflow tracking
with mlflow.start_run(run_name=run_name): 
    rf_regressor = RandomForestRegressor(featuresCol="features",
                                        labelCol="quantity",
                                        maxDepth =params["maxDepth"],
                                        numTrees=params["numTrees"])  
    model = rf_regressor.fit(training)
    test_results = model.transform(test)
    rmse = rmse_evaluator.evaluate(test_results)
    rrmse = 100*(rmse/mean_quantity)
    r2 = r2_evaluator.evaluate(test_results)
    
    # Log metrics
    mlflow.log_metric("RMSE", rmse)
    mlflow.log_metric("RRMSE", rrmse)
    mlflow.log_metric("R2", r2)
    # Log model
    mlflow.spark.log_model(model, "RandomForestModel")
    # Log parameters
    mlflow.log_params(params)

# End the run
mlflow.end_run()

24/10/26 12:55:20 WARN MemoryStore: Not enough space to cache rdd_1143_1 in memory! (computed 44.4 MiB so far)
24/10/26 12:55:20 WARN BlockManager: Persisting block rdd_1143_1 to disk instead.
24/10/26 12:55:22 WARN MemoryStore: Not enough space to cache rdd_1143_0 in memory! (computed 156.4 MiB so far)
24/10/26 12:55:22 WARN BlockManager: Persisting block rdd_1143_0 to disk instead.
24/10/26 12:55:26 WARN MemoryStore: Not enough space to cache rdd_1143_1 in memory! (computed 358.2 MiB so far)
24/10/26 12:55:28 WARN MemoryStore: Not enough space to cache rdd_1143_0 in memory! (computed 66.6 MiB so far)
24/10/26 12:55:43 WARN MemoryStore: Not enough space to cache rdd_1143_1 in memory! (computed 156.4 MiB so far)
24/10/26 12:55:44 WARN MemoryStore: Not enough space to cache rdd_1143_0 in memory! (computed 238.7 MiB so far)
24/10/26 12:56:01 WARN MemoryStore: Not enough space to cache rdd_1143_0 in memory! (computed 156.4 MiB so far)
24/10/26 12:56:02 WARN MemoryStore: Not enough space t