In [2]:
# tools
import copy
import subprocess
import time
import requests

import mlflow
import pandas as pd

# data
from utilsforecast.data import generate_series

# feature engineer
from utilsforecast.feature_engineering import fourier
from mlforecast.lag_transforms import ExponentiallyWeightedMean
from mlforecast.utils import PredictionIntervals

# forecast
import mlforecast.flavor
from mlforecast import MLForecast

# models
import lightgbm as lgb
from sklearn.linear_model import LinearRegression

# evaluate
from utilsforecast.losses import rmse, smape
from utilsforecast.evaluation import evaluate

# data

In [10]:
n_series = 5
freq = "h"
horizon = 10

In [11]:
series = generate_series(n_series = n_series, freq = freq)
print(f"series data:")
print("-" * 40)
print(series.head())
print()
print(series.shape)
print()
print(series["unique_id"].value_counts())

valid = series.groupby("unique_id", observed = True).tail(horizon)
print(f"valid data:")
print("-" * 40)
print(valid.head())
print()
print(valid.shape)
print()
print(valid["unique_id"].value_counts())
train = series.drop(valid.index)
print(f"train data:")
print("-" * 40)
print(train.head())
print()
print(train.shape)
print()
print(train["unique_id"].value_counts())

train, X_df = fourier(train, freq = freq, season_length = 24, k = 2, h = horizon)
print(f"fourier train data:")
print("-" * 40)
print(train)
print(f"fourier X_df data:")
print("-" * 40)
print(X_df)

series data:
----------------------------------------
  unique_id                  ds         y
0         0 2000-01-01 00:00:00  0.428973
1         0 2000-01-01 01:00:00  0.423626
2         0 2000-01-01 02:00:00  0.311782
3         0 2000-01-01 03:00:00  0.192191
4         0 2000-01-01 04:00:00  0.148767

(1101, 3)

unique_id
4    373
3    242
0    222
2    167
1     97
Name: count, dtype: int64
valid data:
----------------------------------------
    unique_id                  ds         y
212         0 2000-01-09 20:00:00  0.114610
213         0 2000-01-09 21:00:00  0.440793
214         0 2000-01-09 22:00:00  0.452212
215         0 2000-01-09 23:00:00  0.322892
216         0 2000-01-10 00:00:00  0.162341

(50, 3)

unique_id
0    10
1    10
2    10
3    10
4    10
Name: count, dtype: int64
train data:
----------------------------------------
  unique_id                  ds         y
0         0 2000-01-01 00:00:00  0.428973
1         0 2000-01-01 01:00:00  0.423626
2         0 2000-01

# params

In [19]:
params = {
    "init": {
        "models": {
            "lgb": lgb.LGBMRegressor(n_estimators=50, num_leaves = 16, verbosity = -1),
            "lr": LinearRegression(),
        },
        "freq": freq,
        "lags": [24],
        "lag_transforms": {
            1: [ExponentiallyWeightedMean(0.9)],
        },
        "num_threads": 2,
    },
    "fit": {
        "static_features": ["unique_id"],
        "prediction_intervals": PredictionIntervals(n_windows = 2, h = horizon),
    },
}

# logging

In [21]:
mlflow.set_experiment("mlforecast")
with mlflow.start_run() as run:
    train_ds = mlflow.data.from_pandas(train)
    valid_ds = mlflow.data.from_pandas(valid)
    mlflow.log_input(train_ds, context="training")
    mlflow.log_input(valid_ds, context="validation")
    logged_params = copy.deepcopy(params) 
    logged_params['init']['models'] = {
        k: (v.__class__.__name__, v.get_params())
        for k, v in params['init']['models'].items()
    }
    mlflow.log_params(logged_params)
    mlf = MLForecast(**params['init'])
    mlf.fit(train, **params['fit'])
    preds = mlf.predict(horizon, X_df=X_df)
    eval_result = evaluate(
        valid.merge(preds, on=['unique_id', 'ds']),
        metrics=[rmse, smape],
        agg_fn='mean',
    )
    models = mlf.models_.keys()
    logged_metrics = {}
    for _, row in eval_result.iterrows():
        metric = row['metric']
        for model in models:
            logged_metrics[f'{metric}_{model}'] = row[model]
    mlflow.log_metrics(logged_metrics)
    mlforecast.flavor.log_model(model=mlf, artifact_path="model")
    model_uri = mlflow.get_artifact_uri("model")
    run_id = run.info.run_id



# load model

In [23]:
loaded_model = mlforecast.flavor.load_model(model_uri=model_uri)
results = loaded_model.predict(h=horizon, X_df=X_df, ids=[3])
results.head(2)

Unnamed: 0,unique_id,ds,lgb,lr
0,3,2000-01-10 16:00:00,0.333308,0.243017
1,3,2000-01-10 17:00:00,0.127424,0.249742


# PyFunc

In [25]:
loaded_pyfunc = mlforecast.flavor.pyfunc.load_model(model_uri=model_uri)
# single row dataframe
predict_conf = pd.DataFrame(
    [
        {
            "h": horizon,
            "ids": [0, 2],
            "X_df": X_df,
            "level": [80]
        }
    ]
)
pyfunc_result = loaded_pyfunc.predict(predict_conf)
pyfunc_result.head(2)

Unnamed: 0,unique_id,ds,lgb,lr,lgb-lo-80,lgb-hi-80,lr-lo-80,lr-hi-80
0,0,2000-01-09 20:00:00,0.260544,0.244128,0.140168,0.380921,0.114001,0.374254
1,0,2000-01-09 21:00:00,0.250096,0.247742,0.07282,0.427372,0.047584,0.4479


# model serving

In [27]:
host = 'localhost'
port = '5000'
cmd = f'mlflow models serve -m runs:/{run_id}/model -h {host} -p {port} --env-manager local'
# initialize server
process = subprocess.Popen(cmd.split())
time.sleep(5)
# single row dataframe. must be JSON serializable
predict_conf = pd.DataFrame(
    [
        {
            "h": horizon,
            "ids": [3, 4],
            "X_df": X_df.astype({'ds': 'str'}).to_dict(orient='list'),
            "level": [95]
        }
    ]
)
payload = {'dataframe_split': predict_conf.to_dict(orient='split', index=False)}
resp = requests.post(f'http://{host}:{port}/invocations', json=payload)
print(pd.DataFrame(resp.json()['predictions']).head(2))
process.terminate()
process.wait(timeout=10)

Downloading artifacts: 100%|██████████| 7/7 [00:00<00:00, 19443.79it/s]
2024/12/05 21:28:51 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'
2024/12/05 21:28:51 INFO mlflow.pyfunc.backend: === Running command 'exec gunicorn --timeout=60 -b localhost:5000 -w 1 ${GUNICORN_CMD_ARGS} -- mlflow.pyfunc.scoring_server.wsgi:app'
[2024-12-05 21:28:51 +0800] [3853] [INFO] Starting gunicorn 23.0.0
[2024-12-05 21:28:51 +0800] [3853] [ERROR] Connection in use: ('localhost', 5000)
[2024-12-05 21:28:51 +0800] [3853] [ERROR] connection to ('localhost', 5000) failed: [Errno 48] Address already in use
[2024-12-05 21:28:52 +0800] [3853] [ERROR] Connection in use: ('localhost', 5000)
[2024-12-05 21:28:52 +0800] [3853] [ERROR] connection to ('localhost', 5000) failed: [Errno 48] Address already in use
[2024-12-05 21:28:53 +0800] [3853] [ERROR] Connection in use: ('localhost', 5000)
[2024-12-05 21:28:53 +0800] [3853] [ERROR] connection to ('localhost', 5000) failed: 

   unique_id                   ds       lgb  ...  lgb-hi-95  lr-lo-95  lr-hi-95
0          3  2000-01-10T16:00:00  0.333308  ...   0.492544  0.032451  0.453583
1          3  2000-01-10T17:00:00  0.127424  ...   0.264842  0.045525  0.453959

[2 rows x 8 columns]




-15

[2024-12-05 21:28:56 +0800] [3853] [ERROR] Can't connect to ('localhost', 5000)
