In [0]:
pip install darts

In [0]:
# Core
import pandas as pd
import numpy as np

# Spark
from pyspark.sql import functions as F

# MLflow
import mlflow

# Statsmodels
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.forecasting.theta import ThetaModel


In [0]:
# --- Source table: {CATALOG}.{SCHEMA}.{RAW_TABLE} ---
CATALOG, SCHEMA, RAW_TABLE = "dev_ml", "raw_data", "sales_transactions"

# --- Time-series layout ---
TIME_COL, TARGET_COL = "ds", "y"  # date column / value column of the daily series
HORIZON = 14                      # number of steps forecast, and size of the hold-out split

# --- MLflow identifiers ---
EXPERIMENT_NAME = "/Shared/python312-forecasting"
MODEL_NAME = "demand_forecast_model"


In [0]:
# Point the MLflow client at the Databricks-hosted tracking server.
mlflow.set_tracking_uri("databricks")
# Use the Unity Catalog model registry ("databricks-uc") for registered models.
mlflow.set_registry_uri("databricks-uc")
# Select the experiment that later runs are logged under. Kept as the last
# statement, after the tracking URI has been configured.
mlflow.set_experiment(EXPERIMENT_NAME)


In [0]:
from pyspark.sql import functions as F

# Load raw transactional data from Unity Catalog
spark_df = spark.table(f"{CATALOG}.{SCHEMA}.{RAW_TABLE}")

# Aggregate to a DAILY time series.
# Column names come from TIME_COL / TARGET_COL so this cell stays in sync with
# the downstream cells that index and model on those same constants.
daily_df = (
    spark_df
    .withColumn(TIME_COL, F.to_date("dateTime"))       # truncate timestamp to calendar date
    # Rows whose dateTime is missing/unparseable yield a NULL date; they cannot be
    # placed on any day and would otherwise form a NULL-keyed group.
    .where(F.col(TIME_COL).isNotNull())
    .groupBy(TIME_COL)
    .agg(
        F.sum("totalPrice").alias(TARGET_COL)          # target variable: total sales per day
    )
    .orderBy(TIME_COL)
)

display(daily_df.limit(10))


In [0]:
pdf = daily_df.toPandas()

# Spark DATE columns typically reach pandas as python `date` objects (object
# dtype); convert explicitly so the index is a real DatetimeIndex for asfreq().
pdf[TIME_COL] = pd.to_datetime(pdf[TIME_COL])
# A Spark SUM over a DECIMAL column comes back as Decimal objects; the
# statsmodels estimators need a float series. This is a no-op for float input.
pdf[TARGET_COL] = pdf[TARGET_COL].astype(float)

# Index by date, enforce chronological order, and expand to a gap-free daily grid.
pdf = pdf.set_index(TIME_COL).sort_index().asfreq("D")
# Days with no transactions appear as NaN after asfreq(); carry the last value forward.
pdf[TARGET_COL] = pdf[TARGET_COL].ffill()

# The last HORIZON days are held out, so there must be something left to train on.
if len(pdf) <= HORIZON:
    raise ValueError(
        f"Need more than {HORIZON} daily observations to hold out a test window; got {len(pdf)}."
    )

train = pdf.iloc[:-HORIZON]
test = pdf.iloc[-HORIZON:]


In [0]:
def mape(y_true, y_pred):
    """Mean absolute percentage error, in percent.

    Values are compared positionally (inputs are converted to NumPy arrays), so
    two pandas Series with different indexes are not silently re-aligned by label.

    Observations whose actual value is exactly zero are excluded, because the
    percentage error is undefined there and would otherwise turn the whole
    metric into inf/NaN. NaN values in either input propagate to the result.

    Parameters
    ----------
    y_true : array-like
        Actual values.
    y_pred : array-like
        Predicted values, broadcastable against ``y_true``.

    Returns
    -------
    float
        MAPE in percent, or NaN when there is no non-zero actual to score against
        (including empty input).
    """
    actual, predicted = np.broadcast_arrays(
        np.asarray(y_true, dtype=float),
        np.asarray(y_pred, dtype=float),
    )

    scorable = actual != 0
    if not scorable.any():
        return float("nan")

    pct_error = np.abs((actual[scorable] - predicted[scorable]) / actual[scorable])
    return float(np.mean(pct_error) * 100)


In [0]:
results = []


def _fit_score_and_log(label, fit_model):
    """Fit one candidate model inside its own nested MLflow run.

    The model is fitted on ``train[TARGET_COL]``, asked for a HORIZON-step
    forecast, and scored with ``mape`` against ``test[TARGET_COL]``. The run
    records a ``model`` param and a ``MAPE`` metric, and a
    ``(label, fitted_model, score)`` tuple is appended to ``results``.

    Parameters
    ----------
    label : str
        Used as the nested run name, the ``model`` param value and the
        first element of the tuple stored in ``results``.
    fit_model : callable
        Takes the training series and returns a fitted model exposing
        ``forecast(steps)``.

    Returns
    -------
    tuple
        ``(fitted_model, forecast, score)``.
    """
    with mlflow.start_run(run_name=label, nested=True):
        fitted = fit_model(train[TARGET_COL])
        predicted = fitted.forecast(HORIZON)
        score = mape(test[TARGET_COL], predicted)

        mlflow.log_param("model", label)
        mlflow.log_metric("MAPE", score)

        results.append((label, fitted, score))

    return fitted, predicted, score


# Parent run groups the per-model child runs; every run is closed when this cell finishes.
with mlflow.start_run(run_name="notebook_train"):

    # ---- Exponential Smoothing (additive trend, no seasonal component) ----
    es, es_forecast, es_mape = _fit_score_and_log(
        "ExponentialSmoothing",
        lambda series: ExponentialSmoothing(series, trend="add", seasonal=None).fit(),
    )

    # ---- Theta Model (library defaults) ----
    theta, theta_forecast, theta_mape = _fit_score_and_log(
        "Theta",
        lambda series: ThetaModel(series).fit(),
    )


In [0]:
# Only candidates with a finite score can be ranked: NaN never compares as
# smaller or larger, so min() over a list containing NaN depends on list order.
scored_candidates = [candidate for candidate in results if np.isfinite(candidate[2])]
if not scored_candidates:
    raise ValueError("No candidate model produced a finite MAPE; cannot select a best model.")

# Each entry is (name, fitted_model, mape); the lowest MAPE wins.
best_model_name, best_model, best_score = min(scored_candidates, key=lambda x: x[2])

# The training runs are already closed when this cell executes. Without an
# active run MLflow would silently open an anonymous one on the first log call,
# so open a named run explicitly instead. It is intentionally left active so
# that later logging lands in this same run.
if mlflow.active_run() is None:
    mlflow.start_run(run_name="best_model_selection")

mlflow.log_param("best_model", best_model_name)
mlflow.log_metric("best_mape", best_score)

print("✅ Best model:", best_model_name)
print("✅ Best MAPE:", best_score)


In [0]:
import mlflow
import mlflow.pyfunc
from mlflow.models import infer_signature
import pandas as pd

class ForecastModel(mlflow.pyfunc.PythonModel):
    """pyfunc wrapper that serves a fixed-length forecast from a fitted model.

    The wrapped object only needs a ``forecast(steps)`` method whose result
    exposes ``.values``.
    """

    def __init__(self, model, horizon):
        """Store the fitted model and the number of steps to forecast."""
        self.horizon = horizon
        self.model = model

    def predict(self, context, model_input):
        """Return a one-column ``forecast`` DataFrame with ``horizon`` rows.

        ``context`` and ``model_input`` are not used; ``model_input`` exists
        only because the MLflow signature requires an input.
        """
        steps_ahead = self.model.forecast(self.horizon)
        forecast_frame = pd.DataFrame({"forecast": steps_ahead.values})
        return forecast_frame


# ---- Wrap the selected model so it can be logged as a pyfunc ----

wrapped_model = ForecastModel(best_model, HORIZON)

# ---- Signature: one-row placeholder input -> HORIZON-row "forecast" column ----

example_input = pd.DataFrame({"dummy": [0]})  # placeholder; the wrapper ignores its input
example_output = pd.DataFrame({"forecast": best_model.forecast(HORIZON).values})

signature = infer_signature(model_input=example_input, model_output=example_output)

# ---- Log the wrapped model, with signature and input example, under "model" ----

mlflow.pyfunc.log_model(
    python_model=wrapped_model,
    artifact_path="model",
    input_example=example_input,
    signature=signature,
)


In [0]:
from mlflow.tracking import MlflowClient

# Registered models live in the "models" schema. A dedicated constant is used
# instead of overwriting SCHEMA, so re-running the data-loading cell still
# reads from the raw-data schema.
MODEL_SCHEMA = "models"
uc_model_name = f"{CATALOG}.{MODEL_SCHEMA}.{MODEL_NAME}"

# The registration URI is built from the active run's id; fail with a clear
# message rather than an AttributeError on None when no run is active.
active_run = mlflow.active_run()
if active_run is None:
    raise RuntimeError("No active MLflow run found; log the model before registering it.")
run_id = active_run.info.run_id

# 1) Register the logged model. The alias can only be set once a version exists.
model_version = mlflow.register_model(
    model_uri=f"runs:/{run_id}/model",
    name=uc_model_name
)

print("✅ Model registered:", uc_model_name)

# 2) Point the "prod" alias at the version that was just created, rather than
#    a hard-coded version number that is only right for the first registration.
client = MlflowClient()

client.set_registered_model_alias(
    name=uc_model_name,
    alias="prod",
    version=model_version.version
)

print("✅ Alias 'prod' assigned to model")


In [0]:
# Ask the selected model for the next HORIZON steps after its training data.
# NOTE(review): best_model was fitted on `train` only, so these steps cover the
# hold-out window rather than dates after the last observation. Confirm whether
# a refit on the full series is intended before treating this as a future forecast.
future_forecast = best_model.forecast(HORIZON)

# Single-column frame that keeps the forecast's own index.
forecast_df = future_forecast.to_frame(name="forecast")

display(forecast_df)


In [0]:
# Resolve the registered model through its "prod" alias and load it as a pyfunc.
model_uri = "models:/{}@prod".format(uc_model_name)
loaded_model = mlflow.pyfunc.load_model(model_uri)
print(loaded_model)

# The wrapper ignores its input; a one-row placeholder frame is passed to match
# the input example the model was logged with.
placeholder_input = pd.DataFrame({"dummy": [0]})
forecast = loaded_model.predict(placeholder_input)

forecast
