In [0]:
%pip install holidays tqdm skforecast==0.14.0 scikit-learn==1.5.0 xgboost==2.1.3 matplotlib==3.10.0

In [0]:
dbutils.library.restartPython()

In [0]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from functools import partial
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error
from tqdm import tqdm

from exog import get_day, get_dayofweek, get_month
from cust_holidays import get_all_holidays
from ts_ops import resample

pd.options.display.float_format = "{:,.2f}".format

In [0]:
DATE_COL = "SETTLEMENT_DATE"
AMOUNT_COL = "AMOUNT"
INPUT_DATA_PATH = "/Volumes/ts_catalog/ts_data/ts_input/serie_clean"
MODEL_PATH = "/notebooks/dbx/forecaster.joblib"

In [0]:
loaded_spark_df = spark.read.format("delta").load(INPUT_DATA_PATH)
data = loaded_spark_df.toPandas()

In [0]:
data.index = data[DATE_COL]
data = data.asfreq('B')

In [0]:
data

In [0]:
features_exog = {
    "day": partial(get_day, date_col=DATE_COL),
    "month": partial(get_month, date_col=DATE_COL),
    "day_of_week": partial(get_dayofweek, date_col=DATE_COL)
}

lags = []

In [0]:
def apply_functions(y, features_to_func):
    cols = []
    for fn in features_to_func:
        cols.append(y.apply(features_to_func[fn], axis=1).rename(fn))
    df_out = pd.concat(cols, axis=1)
    df_out.index = y.index
    return df_out

#### experiment parameters

In [0]:
date_thr_0 = "2024-08-01"  # train - test_1 threshold
length_window_0 = 120  # approx steps for test_1
length_window_1 = 60  # approx steps for test_2

In [0]:
X = apply_functions(data, features_exog)
y = data[AMOUNT_COL]

In [0]:
X

In [0]:
y

#### Variabili rolling

In [0]:
from skforecast.preprocessing import RollingFeatures

# for custom window features..
# https://skforecast.org/0.14.0/user_guides/window-features-and-custom-features.html

window_features = RollingFeatures(
    stats=["mean", "mean", "std", "std", "coef_variation"], window_sizes=[10, 5, 10, 5, 10]
)

In [0]:
y.index.min(), y.index.max()

#### Train/test split

In [0]:
years_of_interest = list(range(pd.Timestamp.today().year - 5, pd.Timestamp.today().year + 2))
print(years_of_interest)
list_holidays = [x.strftime("%Y-%m-%d") for x in get_all_holidays(years_of_interest)]

In [0]:
date_thr_1 = pd.to_datetime(date_thr_0) + pd.offsets.CustomBusinessDay(length_window_0, holidays=list_holidays)
date_thr_2 = pd.to_datetime(date_thr_1) + pd.offsets.CustomBusinessDay(length_window_1, holidays=list_holidays)

y_train = y.loc[y.index < date_thr_0]
y_test_1 = y.loc[(y.index >= date_thr_0) & (y.index < date_thr_1)]
y_test_2 = y.loc[(y.index >= date_thr_1) & (y.index < date_thr_2)]

X_train = X.loc[y_train.index]
X_test_1 = X.loc[y_test_1.index]
X_test_2 = X.loc[y_test_2.index]

In [0]:
y_train.shape, y_test_1.shape, y_test_2.shape

In [0]:
window_features.transform_batch(y).head(10)

#### Definizione Forecaster

In [0]:
from skforecast.model_selection import TimeSeriesFold

lags_grid = [11, 22]
param_grid = {
    "learning_rate": [0.05], #, 0.1],
    "max_depth": [5, 6], #, 7],
    "n_estimators": [50], #, 100, 200],
    "colsample_bytree": [0.5], #, 1],
}


cv = TimeSeriesFold(
    steps=60,
    initial_train_size=60,
    gap=0,
    refit=True,
    fixed_train_size=False,
)

In [0]:
from xgboost import XGBRegressor
from skforecast.recursive import ForecasterRecursive

In [0]:
forecaster = ForecasterRecursive(
    regressor=XGBRegressor(random_state=321),
    lags=100,
    window_features=window_features,
    forecaster_id="forecasting_series_y",
)

"""
forecaster.fit(
    y=y_train,
    exog=X_train,
    store_last_window=False,
    store_in_sample_residuals=True,
    random_state=412,
)
"""

#### Grid search

In [0]:
from skforecast.model_selection import grid_search_forecaster

In [0]:


results_grid = grid_search_forecaster(
    forecaster=forecaster,
    y=y_train,
    exog=X_train,
    cv=cv,
    param_grid=param_grid,
    lags_grid=lags_grid,
    metric="mean_absolute_error",
    return_best=True,
    verbose=False,
    show_progress=True,
    n_jobs=6,
)



In [0]:
import tempfile
import mlflow
import mlflow.sklearn
from skforecast.utils import save_forecaster
from skforecast.utils import load_forecaster

def train_and_log_forecaster(
    forecaster,
    y_train,
    X_train,
    cv,
    param_grid,
    lags_grid,
    window_features,
    experiment_name,
    model_name
):
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(run_name="xgb_forecaster_tuning") as run:
        # Grid search
        results_grid = grid_search_forecaster(
            forecaster=forecaster,
            y=y_train,
            exog=X_train,
            cv=cv,
            param_grid=param_grid,
            lags_grid=lags_grid,
            metric="mean_absolute_error",
            return_best=True,
            verbose=False,
            show_progress=True,
            n_jobs=6,
        )
        # Log best parameters and metric
        best_params = results_grid.loc[0, "params"]
        best_lags = results_grid.loc[0, "lags"]
        best_metric = results_grid.loc[0, "mean_absolute_error"]
        mlflow.log_params(best_params)
        mlflow.log_param("lags", best_lags)
        mlflow.log_metric("mean_absolute_error", best_metric)
        
        with tempfile.TemporaryDirectory() as tmp_dir:
            model_path = os.path.join(tmp_dir, "forecaster.pkl")
            save_forecaster(forecaster, file_name=model_path, verbose=False)
            mlflow.log_artifact(model_path, artifact_path="model")

            feature_importance = forecaster.get_feature_importances(sort_importance=True)
            fi_path = os.path.join(tmp_dir, "feature_importance.csv")
            feature_importance.to_csv(fi_path)
            mlflow.log_artifact(fi_path, artifact_path="feature_importance")

        mlflow.register_model(
                model_uri=f"runs:/{run.info.run_id}/model/forecaster.pkl",
                name=model_name
            )

    return results_grid


In [0]:
import os
from datetime import datetime
os.environ['MLFLOW_USE_DATABRICKS_SDK_MODEL_ARTIFACTS_REPO_FOR_UC'] = 'True'

mlflow.set_registry_uri("databricks-uc")

catalog = "ts_catalog"  # Replace with your Unity Catalog name
schema = "models"    # Replace with your Unity Catalog schema name

user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
experiment_name = f"/Users/{user}/my_forecasting_experiment_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
model_name = f"{catalog}.{schema}.xgb_forecaster"

results_grid = train_and_log_forecaster(
    forecaster=forecaster,
    y_train=y_train,
    X_train=X_train,
    cv=cv,
    param_grid=param_grid,
    lags_grid=lags_grid,
    window_features=window_features,
    experiment_name=experiment_name,
    model_name=model_name
)
display(results_grid)

### PRove da chatgpt

In [0]:
import mlflow.pyfunc
import pickle

class SkforecastWrapper(mlflow.pyfunc.PythonModel):

    def load_context(self, context):
        # Carica il forecaster serializzato
        with open(context.artifacts["forecaster"], "rb") as f:
            self.forecaster = pickle.load(f)

    def predict(self, context, model_input):
        """
        model_input: dict con chiavi:
            - 'last_window': pd.Series
            - 'exog': pd.DataFrame
            - 'steps': int
        """
        # Validazione minima
        if not isinstance(model_input, dict):
            raise ValueError("model_input must be a dict with keys: 'last_window', 'exog', 'steps'.")

        last_window = model_input.get("last_window")
        exog = model_input.get("exog")
        steps = model_input.get("steps")

        if last_window is None or exog is None or steps is None:
            raise ValueError("Missing one or more required inputs: 'last_window', 'exog', 'steps'.")

        # Previsione
        return self.forecaster.predict(
            steps=steps,
            last_window=last_window,
            exog=exog
        )


In [0]:
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, ColSpec

signature = ModelSignature(
    inputs=None,  # oppure una signature adatta al tuo dict serializzato
    outputs=Schema([
        ColSpec(type="double", name="forecast")
    ])
)


In [0]:
import mlflow
import os
import pickle
import tempfile

mlflow.set_registry_uri("databricks") # you need the Model Registry/UC to be enabled 
#model_name_uc = "ts_catalog__models__my_forecaster_pyfunc" # you need the Model Registry/UC to be enabled

user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
experiment_name = f"/Users/{user}/my_forecasting_experiment"
mlflow.set_experiment(experiment_name)
with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as tmp_file:
    with open(tmp_file.name, "wb") as f:
        pickle.dump(forecaster, f)
    pickle_path = tmp_file.name

with mlflow.start_run(run_name='forecaster_train_run'):
    mlflow.pyfunc.log_model(
        artifact_path="skforecast_model",
        python_model=SkforecastWrapper(),
        artifacts={"forecaster": pickle_path},
        # registered_model_name=model_name_uc, # you need the Model Registry/UC to be enabled
        signature=signature
    )

# (opzionale) rimuovi file temporaneo
os.remove(pickle_path)


In [0]:
import mlflow


experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id

# Search for runs with the given run_name, sorted by start_time descending
runs = mlflow.search_runs(
    experiment_ids=[experiment_id],
    filter_string="tags.mlflow.runName = 'forecaster_train_run'",
    order_by=["start_time DESC"]
)

if not runs.empty:
    last_run_id = runs.iloc[0].run_id
    print(f"Last run_id for run_name: {last_run_id}")
    # Load the model
    model = mlflow.pyfunc.load_model(f"runs:/{last_run_id}/skforecast_model")
else:
    print("No runs found with the given run_name.")

In [0]:
input_dict = {
    "last_window": pd.concat([y_train, y_test_1], axis=0),  # pd.Series
    "exog": X_test_2,                # pd.DataFrame
    "steps": len(X_test_2)               # int
}

model.predict(input_dict)

In [0]:
forecaster.regressor.get_params()

In [0]:
X_example, y_example = forecaster.create_train_X_y(y=y_train)
print(X_example.columns.tolist())

#### Feature importance

In [0]:
fi = forecaster.get_feature_importances(sort_importance=True)
fi

#### Model save/load

In [0]:
from skforecast.utils import save_forecaster
from skforecast.utils import load_forecaster

save_forecaster(forecaster, file_name=MODEL_PATH, verbose=True)

In [0]:
forecaster = load_forecaster(MODEL_PATH, verbose=True)

#### Predict su periodo adiacente il training set

In [0]:
from skforecast.model_selection import backtesting_forecaster

bt_cv = TimeSeriesFold(steps=60, initial_train_size=len(y_train), refit=False, allow_incomplete_fold=False)

bt_metric, bt_prediction = backtesting_forecaster(
    forecaster=forecaster,
    y=pd.concat([y_train, y_test_1], axis=0),
    exog=pd.concat([X_train, X_test_1], axis=0),
    cv=bt_cv,
    metric="mean_absolute_error",
    n_jobs=6,
    verbose=True,
    show_progress=True,
)

In [0]:
bt_metric

In [0]:
bt_prediction

In [0]:
fig, ax = plt.subplots(1, 1, figsize=(8, 4))

y_test_1.plot(ax=ax, label="test_1", color="y")
bt_prediction["pred"].plot(ax=ax, label="bt pred", color="b")
fig.legend()

In [0]:
# Predizione in un'unica soluzione
'''
predict_steps = len(y_test_1)
y_pred_1 = forecaster.predict(steps=predict_steps, last_window=y_train, exog=X_test_1)
y_pred_1
'''

#### Predict su periodo non adiacente il training set

In [0]:
predict_steps_2 = len(y_test_2)
last_window = pd.concat([y_train, y_test_1], axis=0)

y_pred_2 = forecaster.predict(steps=predict_steps_2, last_window=last_window, exog=X_test_2)

In [0]:
fig, ax = plt.subplots(1, 1, figsize=(8, 4))

y_test_1.plot(ax=ax, label="test_1", color="y")
bt_prediction["pred"].plot(ax=ax, label="bt pred", color="b")
y_test_2.plot(ax=ax, label="test_2", color="y")
y_pred_2.plot(ax=ax, label="pred_2", color="g")
fig.legend()

#### Predict + prediction interval con bootstrap

https://skforecast.org/0.14.0/user_guides/probabilistic-forecasting.html

In [0]:
def get_coverage(y_true, y_lower, y_upper):
    inside_interval = np.where((y_true >= y_lower) & (y_true <= y_upper), True, False)
    return inside_interval.mean()

In [0]:
def plot_pi(y_true, y_pred, y_lb, y_ub, lbl_pi):

    for ind in (y_pred.index, y_lb.index, y_ub.index):
        assert y_true.index.equals(ind)

    fig, ax = plt.subplots(1, 1, figsize=(8, 4))

    y_true.plot(ax=ax, label="true", color="k", linestyle="--")

    ax.fill_between(y_lb.index, y_lb, y_ub, color="deepskyblue", alpha=0.9, label=lbl_pi)

    out_of_bounds = (y_true < y_lb) | (y_true > y_ub)
    ax.vlines(
        y_true.index[out_of_bounds],
        ymin=y_lb.min(),
        ymax=y_ub.max(),
        linewidth=0.5,
        color="b",
        linestyle="--",
        zorder=5,
        label="True value outside interval",
    )

    y_pred.plot(ax=ax, label="prediction", color="g")
    # ax.set_xlim('2024-11-01', '2024-12-10')
    ax.text(
        0.95,
        0.05,
        f"coverage = {get_coverage(y_true, y_lb, y_ub):.2f}",
        transform=ax.transAxes,
        fontsize=7,
        verticalalignment="bottom",
        horizontalalignment="right",
        color="black",
    )
    ax.text(
        0.95,
        0.00,
        f"mae = {mean_absolute_error(y_true, y_pred):.0f}",
        transform=ax.transAxes,
        fontsize=7,
        verticalalignment="bottom",
        horizontalalignment="right",
        color="black",
    )

    fig.legend(loc="lower left")


In [0]:
forecaster.set_out_sample_residuals(y_true=y_test_1.loc[bt_prediction.index], y_pred=bt_prediction["pred"])

predict_steps_2 = len(y_test_2)
last_window = pd.concat([y_train, y_test_1], axis=0)

In [0]:
pred_2_ord = forecaster.predict_interval(
    steps=predict_steps_2,
    last_window=last_window,
    exog=X_test_2,
    interval=[10, 90],
    n_boot=20,  # default 250, spesso usano 1000
    use_in_sample_residuals=False,  # True -> usa quelli ricavati in training set
    use_binned_residuals=False,  # feature sperimentale, per condizionare i residui alla predizione
)

In [0]:
plot_pi(
    y_true=y_test_2,
    y_pred=pred_2_ord["pred"],
    y_lb=pred_2_ord["lower_bound"],
    y_ub=pred_2_ord["upper_bound"],
    lbl_pi="ordinary bs PI",
)