In [1]:
# Install PySpark
!pip install pyspark

# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from pyspark.mllib.evaluation import RegressionMetrics

# Set up Spark Session
spark = SparkSession.builder \
    .appName("Retail Demand Forecasting") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()




In [3]:
# Load dataset from CSV file
file_path = "/content/Online_Retail.csv"  # Update with your file path
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the schema to verify correct loading
df.printSchema()

# Display a sample of the dataset
df.show(5)


root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|  

In [5]:
# Set legacy time parser policy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Convert InvoiceDate to timestamp
df = df.withColumn("InvoiceDate", to_timestamp("InvoiceDate", "MM/dd/yyyy HH:mm"))

# Create a new column for date (excluding time) for aggregation
df = df.withColumn("Date", to_date("InvoiceDate"))

# Filter out rows with missing or invalid values
df = df.na.drop(subset=["StockCode", "Quantity", "InvoiceDate", "UnitPrice"])

# Remove rows with negative or zero Quantity
df = df.filter(df.Quantity > 0)

# Display preprocessed data
df.show(5)


+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|      Date|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|     17850|United Kingdom|2010-12-01|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|2010-12-01|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|     17850|United Kingdom|2010-12-01|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|2010-12-01|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|     17850|United Kingdom|2010-12-01|
+---------+---------+--------------------+------

In [6]:
# Aggregate data at StockCode and Date level
agg_data = df.groupBy("StockCode", "Date") \
    .agg(sum("Quantity").alias("TotalQuantity")) \
    .orderBy("Date")

# Display the aggregated data
agg_data.show(5)

+---------+----------+-------------+
|StockCode|      Date|TotalQuantity|
+---------+----------+-------------+
|    21559|2010-12-01|            8|
|   84029E|2010-12-01|          551|
|   35004C|2010-12-01|          174|
|    22086|2010-12-01|          274|
|    21210|2010-12-01|           13|
+---------+----------+-------------+
only showing top 5 rows



In [9]:
# Add lag features using a window function
window_spec = Window.partitionBy("StockCode").orderBy("Date")
agg_data = agg_data.withColumn("Lag_1", lag("TotalQuantity", 1).over(window_spec)) \
                   .withColumn("Lag_2", lag("TotalQuantity", 2).over(window_spec)) \
                   .withColumn("Lag_3", lag("TotalQuantity", 3).over(window_spec))

# Drop rows with null values created by lag features
agg_data = agg_data.na.drop()

# Display the data
agg_data.show(5)

+---------+----------+-------------+-----+-----+-----+
|StockCode|      Date|TotalQuantity|Lag_1|Lag_2|Lag_3|
+---------+----------+-------------+-----+-----+-----+
|    10002|2010-12-13|           27|   48|   44|   13|
|    10002|2010-12-14|            7|   27|   48|   44|
|    10002|2010-12-16|            5|    7|   27|   48|
|    10002|2010-12-17|            2|    5|    7|   27|
|    10002|2010-12-20|            2|    2|    5|    7|
+---------+----------+-------------+-----+-----+-----+
only showing top 5 rows



In [10]:
# Assemble features into a single vector
feature_cols = ["Lag_1", "Lag_2", "Lag_3"]
agg_data = agg_data.withColumn("features", array(*feature_cols))

# Split the data into training and testing sets
train_data, test_data = agg_data.randomSplit([0.8, 0.2], seed=42)

# Display the sizes of the training and testing datasets
print(f"Training Data Count: {train_data.count()}")
print(f"Testing Data Count: {test_data.count()}")


Training Data Count: 196142
Testing Data Count: 49053


In [11]:
# Convert data to RDD for scaling
train_rdd = train_data.rdd.map(lambda row: LabeledPoint(row["TotalQuantity"], Vectors.dense(row["features"])))
test_rdd = test_data.rdd.map(lambda row: LabeledPoint(row["TotalQuantity"], Vectors.dense(row["features"])))

# Scale the features
scaler = StandardScaler(withMean=True, withStd=True).fit(train_rdd.map(lambda x: x.features))
train_rdd_scaled = train_rdd.map(lambda lp: LabeledPoint(lp.label, scaler.transform(lp.features)))
test_rdd_scaled = test_rdd.map(lambda lp: LabeledPoint(lp.label, scaler.transform(lp.features)))


In [14]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Assemble features into a single vector (convert array to VectorUDT)
assembler = VectorAssembler(inputCols=["Lag_1", "Lag_2", "Lag_3"], outputCol="features_vector")
train_data_ml = assembler.transform(train_data).select(col("TotalQuantity").alias("label"), col("features_vector").alias("features"))
test_data_ml = assembler.transform(test_data).select(col("TotalQuantity").alias("label"), col("features_vector").alias("features"))

# Define the Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="label", maxIter=100, regParam=0.3, elasticNetParam=0.8)

# Train the model
lr_model = lr.fit(train_data_ml)

# Display model coefficients and intercept
print(f"Coefficients: {lr_model.coefficients}")
print(f"Intercept: {lr_model.intercept}")


Coefficients: [0.09469991208579596,0.10811459367712797,0.11838071215482396]
Intercept: 13.80930616836672


In [15]:
from pyspark.ml.evaluation import RegressionEvaluator

# Make predictions on the test set
predictions = lr_model.transform(test_data_ml)

# Evaluate the model using RMSE and MAE
evaluator_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator_rmse.evaluate(predictions)

evaluator_mae = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mae")
mae = evaluator_mae.evaluate(predictions)

# Print evaluation metrics
print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"Mean Absolute Error (MAE): {mae}")

# Display sample predictions
predictions.select("features", "label", "prediction").show(5)


Root Mean Squared Error (RMSE): 62.97587733077631
Mean Absolute Error (MAE): 20.19762219441922
+---------------+-----+------------------+
|       features|label|        prediction|
+---------------+-----+------------------+
|[7.0,27.0,48.0]|    5|23.073573765681296|
| [12.0,2.0,2.0]|   60|15.398695725060175|
|[1.0,60.0,12.0]|   24|21.811450246938083|
| [24.0,1.0,7.0]|   13| 17.01888363718672|
| [14.0,2.0,1.0]|  132|15.469714837076944|
+---------------+-----+------------------+
only showing top 5 rows



In [17]:
# Convert features to a string format for saving
predictions_to_save = predictions.withColumn("features", col("features").cast("string"))

# Save predictions to a CSV file
output_path = "/content/predictions.csv"
predictions_to_save.select("features", "label", "prediction").write.csv(output_path, header=True)

print(f"Predictions saved to {output_path}")


Predictions saved to /content/predictions.csv
