In [0]:
%pip install missingno
%pip install mlforecast
%pip install window-ops
%pip install numba
%pip install optuna
%pip install polars
%pip install mapie

In [0]:
%restart_python

In [0]:
# Library
import os
import gc
import numpy as np
import pandas as pd
import polars as pls
from typing import Literal
pd.set_option('display.max_columns', None) 
import matplotlib.pyplot as plt
import missingno as msno

from numba import njit
from window_ops.expanding import expanding_mean, expanding_std
from window_ops.rolling import (rolling_mean, rolling_max, rolling_min, rolling_std, seasonal_rolling_mean, seasonal_rolling_std,
                                rolling_correlation, rolling_cv, rolling_mean_positive_only, rolling_kurtosis, rolling_average_days_with_sales)
from window_ops.ewm import ewm_mean

import math

import sklearn
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OrdinalEncoder
sklearn.set_config(transform_output="pandas")
from sklearn.metrics import mean_squared_error

from mlforecast import MLForecast
from mlforecast.lgb_cv import LightGBMCV
from mlforecast.target_transforms import Differences
from mlforecast.utils import PredictionIntervals
from mlforecast.feature_engineering import transform_exog

from lightgbm import LGBMRegressor
from mapie.regression import MapieRegressor

import optuna
optuna.logging.set_verbosity(optuna.logging.WARNING) # Use this to disable training prints from optuna

import mlflow
from mlflow.models import infer_signature
mlflow.autolog(disable=True)
mlflow.set_registry_uri("databricks-uc")

In [0]:
class CFG:
    horizon=1
    CV=True
    distributed_optuna=False
    n_folds=10
    feat_cols=['current_week_of_year', 'current_year', 'current_month', 'retail', 'week', 'release_year', 'release_month', 'release_day', 'release_day_of_week', 'release_week_of_year', 'price', 'discount']
    customer_cols=['retail_total_sales_qty', 'retail_total_customer_purchase']
    gtrends_cols= [
    "long_sleeve", "culottes", "miniskirt", "short_sleeves",
    "printed_shirt", "short_cardigan", "solid_color_top", 
    "trapeze_dress", "sleeveless", "long_cardigan", 
    "sheath_dress", "short_coat", "medium_coat", 
    "doll_dress", "long_dress", "shorts", 
    "long_coat", "jumpsuit", "drop_sleeve", 
    "patterned_top", "kimono_dress", "medium_cardigan", 
    "shirt_dress", "maxi", "capris", "gitana_skirt", 
    "long_duster", "yellow", "brown", "blue", 
    "grey", "green", "black", "red", 
    "white", "orange", "violet", "acrylic", 
    "scuba_crepe", "tulle", "angora", "faux_leather", 
    "georgette", "lurex", "nice", "crepe", 
    "satin_cotton", "silky_satin", "fur", 
    "matte_jersey", "plisse", "velvet", 
    "lace", "cotton", "piquet", "plush", 
    "bengaline", "jacquard", "frise", 
    "technical", "cady", "dark_jeans", 
    "light_jeans", "ity", "plumetis", 
    "polyviscous", "dainetto", "webbing", 
    "foam_rubber", "chanel", "marocain", 
    "macrame", "embossed", "heavy_jeans", 
    "nylon", "tencel", "paillettes", 
    "chambree", "chine_crepe", 
    "muslin_cotton_or_silk", "linen", 
    "tactel", "viscose_twill", "cloth", 
    "mohair", "mutton", "scottish", 
    "milano_stitch", "devore", "hron", 
    "ottoman", "fluid", "flamed", 
    "fluid_polyviscous", "shiny_jersey", "goose"
    ]
    cat_cols=['category', 'color', 'fabric', 'season_category']


In [0]:
raw_df = spark.read.table("portfolio.end_to_end_demand_forecast.gold_demandforecast2to1_table")


In [0]:
raw_df.createOrReplaceTempView("raw_df")

In [0]:
%sql
SELECT * FROM raw_df

Databricks visualization. Run in Databricks to view.

In [0]:
%sql
SELECT * FROM raw_df
WHERE raw_df.external_code = '324' AND raw_df.retail = 31

Databricks visualization. Run in Databricks to view.

In [0]:
# custom preprocess pipeline

class Preprocessor(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass
    
    def set_output(self, transform: None | Literal['default'] | Literal['pandas'] = None) -> BaseEstimator:
        pass

    def reduce_memory_usage_pls(self, df):
      """ Reduce memory usage by polars dataframe {df} with name {name} by changing its data types.
          Original pandas version of this function: https://www.kaggle.com/code/arjanso/reducing-dataframe-memory-size-by-65 """
      print(f"Memory usage of dataframe is {round(df.estimated_size('mb'), 2)} MB")
      Numeric_Int_types = [pls.Int8,pls.Int16,pls.Int32,pls.Int64]
      Numeric_Float_types = [pls.Float32,pls.Float64]    
      for col in df.columns:
          col_type = df[col].dtype
          c_min = df[col].min()
          c_max = df[col].max()
          if col_type in Numeric_Int_types:
              if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                  df = df.with_columns(df[col].cast(pls.Int8))
              elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                  df = df.with_columns(df[col].cast(pls.Int16))
              elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                  df = df.with_columns(df[col].cast(pls.Int32))
              elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                  df = df.with_columns(df[col].cast(pls.Int64))
          elif col_type in Numeric_Float_types:
              if c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                  df = df.with_columns(df[col].cast(pls.Float32))
              else:
                  pass
          elif col_type == pls.Utf8:
              df = df.with_columns(df[col].cast(pls.Categorical))
          else:
              pass
      print(f"Memory usage of dataframe became {round(df.estimated_size('mb'), 2)} MB")
      return df

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X['ds'] = pd.to_datetime(X['ds'])

        base_X = X.loc[:, ['ds', 'unique_id', 'y'] + CFG.cat_cols + CFG.feat_cols].copy()
        base_X = pls.from_pandas(base_X)
        X = pls.from_pandas(X)
        gc.collect()

        transformed_feats = transform_exog(X.select(['ds', 'unique_id'] + CFG.customer_cols + CFG.gtrends_cols),
                                           lags=[1, 2], 
                                           #lag_transforms={
                                           #    1: [(rolling_mean, 2), (rolling_average_days_with_sales, 2), (rolling_mean_positive_only, 2)],
                                           #    },
                                           num_threads=4)
        transformed_feats = transformed_feats.fill_nan(-1)

        transformed_X = base_X.join(transformed_feats, on=['unique_id', 'ds'], how='left')
        transformed_X = transformed_X.to_pandas()
        del base_X, transformed_feats;gc.collect()

        # only use 1 lag gtrends to avoid data leakage
        CFG.feat_cols.extend([col for col in transformed_X.columns if col not in ['ds', 'unique_id', 'y'] + CFG.cat_cols + CFG.feat_cols + CFG.gtrends_cols + CFG.customer_cols])

        # fill out of stock quantity to 0 (reqired if tweedie loss)
        transformed_X.loc[transformed_X['y'] < 0, 'y'] = 0

        # convert date column to datetime64[ns] is nixtla requirement
        transformed_X['ds'] = pd.to_datetime(transformed_X['ds'], dayfirst=True)
      
        return transformed_X

    def fit_transform(self, X, y=None):
        self.fit(X, y)
        X = self.transform(X, y)
        return X
    
def make_pipeline(df):

    cat_pipeline = ColumnTransformer([
        ('cat_features', OrdinalEncoder(categories='auto', handle_unknown='error'), CFG.cat_cols)
    ], verbose_feature_names_out=False, remainder='passthrough')

    pipeline = Pipeline([
        ('train_data', Preprocessor()),
        ('cat_features', cat_pipeline),
    ]).set_output(transform='pandas')

    return pipeline

In [0]:
raw_df = raw_df.toPandas()
pipeline = make_pipeline(raw_df)
train_df = pipeline.fit_transform(raw_df)

In [0]:
train_df[train_df['unique_id']=='142510AW17short coatbrownfur2017-10-16']

In [0]:
# train and test split following paper experiment setting
train_path = 'dbfs:/FileStore/tables/stfore_train.csv' 
train_ids = spark.read.options(header=True).csv(train_path).toPandas()
test_path = 'dbfs:/FileStore/tables/stfore_test.csv' 
test_ids = spark.read.options(header=True).csv(test_path).toPandas()

def get_unique_id(df):
  df['release_date'] = pd.to_datetime(df['release_date'], format='%Y-%m-%d')
  df['unique_id'] = df['external_code'].astype(str) + df['retail'].astype(str) + df['season'] + df['category'] + df['color'] + df['fabric'] + df['release_date'].astype(str)

  return df['unique_id']

train_ids = get_unique_id(train_ids)
test_ids = get_unique_id(test_ids)

train_data = train_df[train_df['unique_id'].isin(train_ids)].reset_index(drop=True)
test_data = train_df[train_df['unique_id'].isin(test_ids)].reset_index(drop=True)

del train_ids, test_ids; gc.collect()

In [0]:
# black box hyperparameter optimization
import warnings
warnings.filterwarnings('ignore')

def objective(trial):
    # Suggest values of the hyperparameters using a trial object.
    params = {
            'objective': 'tweedie',
            'tweedie_variance_power': trial.suggest_float('tweedie_variance_power', 1.1, 1.499, log=True),
            #'objective': 'regression_l2',
            'metric': 'rmse',
            'learning_rate': 0.10,
            #'zero_as_missing': False,
            #'linear_tree': True,
            'seed': 7113,
            'boosting_type': 'gbdt',
            'reg_alpha': trial.suggest_float('reg_alpha', 1e-8, 10.0, log=True),
            'reg_lambda': trial.suggest_float('reg_lambda', 1e-8, 10.0, log=True),
            'num_leaves': trial.suggest_int('num_leaves', 2, 30),
            'max_depth': trial.suggest_int('max_depth', 2, 25),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.4, 1.0),
            'subsample': trial.suggest_float('subsample', 0.4, 1.0),
            'subsample_freq': trial.suggest_int('subsample_freq', 1, 7),
            'min_child_samples': trial.suggest_int('min_child_samples', 1, 3),
            #'verbose': -1,
            'force_row_wise': True,
            }

    if CFG.CV:
      cv = LightGBMCV(
          freq='W-MON', # weekly-dayofweek *note incorrect format leads nan loss
          #target_transforms=[Differences([1])],
          lags=[1],
          # since available training window is 2 week, lag_transforms not available
          #lag_transforms={
          #    1: [(rolling_mean, 2), (rolling_average_days_with_sales, 2), (rolling_mean_positive_only, 2)],
          #},
          date_features=None,  # computing date features none for already computed
          num_threads=14,
      )

      cv_hist = cv.fit(
          train_data[['unique_id', 'ds', 'y'] + CFG.cat_cols + CFG.feat_cols],
          static_features=CFG.cat_cols,
          n_windows=CFG.n_folds,
          h=CFG.horizon,
          params=params,
          eval_every=10,
          early_stopping_evals=10,
          num_iterations=1000,    
          #compute_cv_preds=True,
          metric='rmse',
      )

      cv_hist_df = pd.DataFrame(cv_hist, columns=['iterations', 'score'])
      best_rmse = cv_hist_df['score'].min()
      
      return best_rmse
    
    else:
      # custom validation here
      # default cv setting is backtest split (note data leakage may happend depends on how data is split)
      raise NotImplementedError
    

In [0]:
study = optuna.create_study(direction='minimize')
if CFG.distributed_optuna:
  study = optuna.create_study(direction='minimize')
  # install distributed_optuna before turning on distributed_optuna in CFG
  #distributed_study.optimize(objective, n_trials=20)
else:
  study.optimize(objective, n_trials=5)

In [0]:
best_params = study.best_params
best_cv_score = study.best_trial.value
print(best_params)
print(best_cv_score)

In [0]:
best_cv_score = 0.8952321706690088  # stab

In [0]:

# self defined metric
def MdAPE(
    y_true: pd.Series,
    y_pred: pd.Series):
    """Weighs the MAPE by the magnitude of the series values"""
    abs_pct_err = abs(y_true - y_pred) / y_true
    return abs_pct_err.median()

def WAPE(y_true, y_pred, weights=None):
    """
    Calculate Weighted Average Percentage Error (WAPE).

    :param y_true: array-like of true values
    :param y_pred: array-like of predicted values
    :param weights: array-like of weights (optional), defaults to uniform weights
    :return: Weighted Average Percentage Error
    """
    # Ensure inputs are numpy arrays
    if isinstance(y_true, pd.Series):
        y_true = y_true.values
    else:
        y_true = np.asarray(y_true)
        
    if isinstance(y_pred, pd.Series):
        y_pred = y_pred.values
    else:
        y_pred = np.asarray(y_pred)

    # Set weights to ones if not provided
    if weights is None:
        weights = np.ones_like(y_true)
    else:
        weights = np.asarray(weights)

    # Calculate absolute percentage errors
    ape = np.abs((y_true - y_pred) / y_true)

    # Calculate weighted absolute percentage error
    weighted_ape = weights * ape

    # Calculate weighted average percentage error
    wape_value = np.sum(weighted_ape) / np.sum(weights)

    return wape_value

# lightgbm custom metric
def custom_metric(y_true, y_hat):  
    higher_is_better = False
    score = WAPE(y_true, y_hat)
    return 'WAPE', score, higher_is_better

# mlflow metric
def MdAPE_mlflow(eval_df, _builtin_metrics):
  score = MdAPE(eval_df["prediction"], eval_df["target"])
  return score

def WAPE_mlflow(eval_df, _builtin_metrics):
  score = WAPE(eval_df["prediction"], eval_df["target"])
  return score


In [0]:
gc.collect()  # check memory waste

with mlflow.start_run() as run:
    train_ds = mlflow.data.from_pandas(train_data)
    valid_ds = mlflow.data.from_pandas(test_data)
    mlflow.log_input(train_ds, context="training")
    mlflow.log_input(valid_ds, context="testing")

    lgbm_params = {
                'objective': 'tweedie',
                'tweedie_variance_power': 1.3596343351145384,
                'metric': 'rmse',
                #'zero_as_missing': True,
                #'linear_tree': True,
                'learning_rate': 0.07,
                'seed': 7113,
                'boosting_type': 'gbdt',
                'reg_alpha': 7.682417976653917e-07,
                'reg_lambda': 0.0023065465445564304,
                'num_leaves': 28,
                'max_depth': 11,
                'colsample_bytree': 0.47478083397792714,
                'subsample': 0.7413694185041535,
                'subsample_freq': 7,
                'min_child_samples': 2,
                #'verbose': -1,
                'force_row_wise': True,
                'n_estimators': 250,  # choose number of estimators from result of optuna CV result
                }

    # log hyperparameters
    for param_name, param_value in lgbm_params.items():
        mlflow.log_param(param_name, param_value)
    # log score for this hyperparameters
    mlflow.log_metric("CV_RMSE", best_cv_score)


    fcst = MLForecast(
        models={'dummy_model': LGBMRegressor(**lgbm_params)},
        freq='W-MON',
        lags=[1],
        date_features=None,
        num_threads=14,
    )

    # library managed training for recursive forecasting strategy (not flexible for prediction of fixd window and new sample, but confirmal prediction is not available so required to do it manually with MAPIE)
    #mlf.fit(
    #    train_data[['unique_id', 'ds', 'y'] + CFG.cat_cols + CFG.feat_cols],
    #    static_features=CFG.cat_cols,
    #    prediction_intervals=PredictionIntervals(n_windows=CFG.n_folds, h=CFG.horizon),
        #max_horizon=CFG.horizon, # note!: CV be fitted using recursive strategy, setting max_horizon result in direct strategy and cv score can not be reproduced
    #)

    # custom training
    train_data_prep = fcst.preprocess(
        train_data[['unique_id', 'ds', 'y'] + CFG.cat_cols + CFG.feat_cols],
        static_features=CFG.cat_cols,
        max_horizon=1
        )
    X_tr, y_tr = train_data_prep[CFG.cat_cols + CFG.feat_cols], train_data_prep['y0']

    test_data_prep = fcst.preprocess(
        test_data[['unique_id', 'ds', 'y'] + CFG.cat_cols + CFG.feat_cols],
        static_features=CFG.cat_cols,
        max_horizon=1
        )
    X_te, y_te = test_data_prep[CFG.cat_cols + CFG.feat_cols], test_data_prep['y0']

    regress = LGBMRegressor(**lgbm_params)
    regress.fit(
        X_tr,
        y_tr,
        categorical_feature=CFG.cat_cols,
    )

    y_tr_preds = regress.predict(X_tr)
    signature = infer_signature(X_tr, y_tr_preds)

    # construct an evaluation dataset from the test set
    eval_data = X_te
    eval_data["target"] = y_te

    model_info = mlflow.sklearn.log_model(regress, "LGBM-regress-tweedie", signature=signature) 

    result = mlflow.evaluate(
        model_info.model_uri,
        eval_data,
        targets="target",
        model_type="regressor",
        evaluators=["default"],
        custom_metrics=[
            mlflow.models.make_metric(
                eval_fn=MdAPE_mlflow,
                name="MdAPE",
                greater_is_better=False,
            ),
            mlflow.models.make_metric(
                eval_fn=WAPE_mlflow,
                name="WAPE",
                greater_is_better=False,
            ),
        ]
    )

In [0]:
test_data_prep[test_data_prep['unique_id'] == '5198100SS19culottesgreylinen2019-05-06']

In [0]:
# custom conformal prediction

class MapieWrapper(mlflow.pyfunc.PythonModel):
    # Initialize model in the constructor
    def __init__(self, model):
        self.model = model

    def predict(self, context, model_input, alpha=0.15):
        _, y_pis = self.model.predict(model_input, alpha=alpha)
        return y_pis[:, :, 0]


with mlflow.start_run() as run:
  mapie_reg = MapieRegressor(estimator=regress, cv="prefit", random_state=7113)
  mapie_reg = mapie_reg.fit(X_tr, y_tr)

  mapie_reg_wrapped = MapieWrapper(mapie_reg)

  y_tr_pis = mapie_reg_wrapped.predict(None, X_tr)
  signature = infer_signature(X_tr, y_tr_pis)
  model_info = mlflow.pyfunc.log_model("MAPIE-LGBM-regress-tweedie",
                                       python_model=mapie_reg_wrapped,
                                       signature=signature)

  y_te_pis = mapie_reg_wrapped.predict(None, X_te.drop(columns=['target']))
  result_df = pd.DataFrame(y_te.values, columns=['y'])
  result_df['lo-85'] = y_te_pis[:, 0]
  result_df['hi-85'] = y_te_pis[:, 1]

  def flag_coverage(row):
    if (row['y'] < row['hi-85']) & (row['y'] >= row['lo-85']):
        return 1
    else:
        return 0
      
  result_df['covered_flag'] = result_df.apply(flag_coverage, axis=1)
  result_df = pd.concat([result_df, test_data_prep[['unique_id', 'ds']].reset_index(drop=True)], axis=1)

  mlflow.log_metric("coverage_of_interval", result_df['covered_flag'].sum() / len(result_df) * 100)

In [0]:
spark.createDataFrame(result_df).createOrReplaceTempView("conformal_prediction_results")

In [0]:
%sql
SELECT * FROM conformal_prediction_results AS CPR
WHERE CPR.unique_id == '5198100SS19culottesgreylinen2019-05-06';

Databricks visualization. Run in Databricks to view.