In [0]:
from pyspark.sql import functions as F

# Read the gold-layer feature table through the session's DataFrameReader and
# print its column names/types so the inputs used below can be checked by eye.
gold_df = spark.read.table("walmart_cat.gold.sales_features")
gold_df.printSchema()


root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- IsHoliday: boolean (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Weekly_Sales: double (nullable = true)
 |-- ingestion_ts: timestamp (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: string (nullable = true)
 |-- MarkDown2: string (nullable = true)
 |-- MarkDown3: string (nullable = true)
 |-- MarkDown4: string (nullable = true)
 |-- MarkDown5: string (nullable = true)
 |-- CPI: string (nullable = true)
 |-- Unemployment: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- week_of_year: integer (nullable = true)
 |-- is_holiday: integer (nullable = true)
 |-- lag_1_week: double (nullable = true)
 |-- lag_2_week: double (nullable = true)
 |-- lag_4_week: double (nullable = true)
 |-- rolling_avg_4w: d

In [0]:
# Name of the regression target column; reused by the assembler projection,
# the regressor and the evaluator in the cells below.
label_col: str = "Weekly_Sales"


In [0]:
# Model inputs, in the order they are packed into the feature vector:
# first the sales-history columns, then the calendar columns.
feature_cols = (
    ["lag_1_week", "lag_2_week", "lag_4_week", "rolling_avg_4w"]
    + ["month", "week_of_year", "is_holiday"]
)


In [0]:
from pyspark.ml.feature import VectorAssembler

# Pack the columns listed in feature_cols into the single "features" vector
# column that the regressor reads.
# NOTE(review): VectorAssembler's default handleInvalid="error" raises on null
# inputs; presumably the gold table already excludes rows with missing lag
# values -- confirm before pointing this at a different table.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

# Keep week_of_year alongside the vector and the label: the train/test split
# filters model_df on that column. Projecting it away only worked because
# Spark's analyzer silently re-resolves the column from below the projection;
# carrying it explicitly keeps model_df self-contained. The regressor only
# reads featuresCol/labelCol, so the extra column does not affect training.
model_df = assembler.transform(gold_df).select("features", label_col, "week_of_year")


In [0]:
# Hold out the late-year weeks (week_of_year > 40) for testing and train on
# the rest. Rows with a null week_of_year fall into neither set.
# NOTE(review): this splits on week number across every year in the table,
# not on calendar date, so it is not a chronological split and the model never
# trains on weeks 41+ -- confirm this is the intended evaluation design.
week_col = F.col("week_of_year")
train_df = model_df.where(week_col <= 40)
test_df = model_df.where(week_col > 40)


In [0]:
from pyspark.ml.regression import RandomForestRegressor

# Random-forest regressor over the assembled vector; the seed is fixed so
# repeated runs use the same seed.
rf = RandomForestRegressor(
    labelCol=label_col,
    featuresCol="features",
    maxDepth=10,
    numTrees=50,
    seed=42,
)

# Fit on the training split only.
model = rf.fit(train_df)


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out split; transform() appends a "prediction" column.
predictions = model.transform(test_df)

# Root-mean-squared error between the label column and the prediction column.
evaluator = RegressionEvaluator(
    metricName="rmse",
    predictionCol="prediction",
    labelCol=label_col,
)
rmse = evaluator.evaluate(predictions)

print(f"RMSE: {rmse}")


RMSE: 12120.259620337856


In [0]:
%sql
-- Create the model_artifacts volume under walmart_cat.gold if it is not
-- already there; the model is saved beneath this volume's path afterwards.
CREATE VOLUME IF NOT EXISTS walmart_cat.gold.model_artifacts;


In [0]:
# Persist the fitted model under the model_artifacts volume; overwrite()
# replaces whatever was previously saved at this path.
(
    model.write()
    .overwrite()
    .save("/Volumes/walmart_cat/gold/model_artifacts/rf_sales_model")
)
