# Inflation Rate Time Series Prediction

A regression (more precisely, time series prediction) example showing the usage of Alibi Explainers, based on a [Kaggle Dataset](https://www.kaggle.com/datasets/federalreserve/interest-rates).
Get the data via `wget https://ibm.box.com/shared/static/lb39fs3htuucg50ikkukg2esvy1xec4k.csv`

## Authors

Natalie Jann [natalie.jann@ibm.com](mailto:natalie.jann@ibm.com)

Sebastian Lehrig [sebastian.lehrig1@ibm.com](mailto:sebastian.lehrig1@ibm.com)

Marvin Giessing [MARVING@de.ibm.com](mailto:MARVING@de.ibm.com)

## License
Apache-2.0 License

## 0.) Imports and Constants

In [15]:
import json
import kfp
from kfp.components import InputPath, OutputPath
import kfp.dsl as dsl
from kfp.dsl import PipelineConf, data_passing_methods
from kubernetes.client.models import V1Volume, V1PersistentVolumeClaimVolumeSource
import os
from pydoc import importfile
import requests
from typing import NamedTuple

%load_ext lab_black

The lab_black extension is already loaded. To reload it, use:
  %reload_ext lab_black


In [3]:
# Container image passed as base_image to every custom component defined in this notebook.
BASE_IMAGE = "quay.io/ibm/kubeflow-notebook-image-ppc64le:latest"

# Local checkout folder, Git URL and branch of the reusable component catalog.
COMPONENT_CATALOG_FOLDER = f"{os.getenv('HOME')}/components"
COMPONENT_CATALOG_GIT = "https://github.com/lehrig/kubeflow-ppc64le-components.git"
COMPONENT_CATALOG_RELEASE = "main"

# Kept as a string (taken from the environment, default "1"); converted with int() where needed.
NUMBER_OF_WORKER = os.getenv("NUMBER_OF_WORKERS", default="1")

# Run arguments; the keys match the parameter names of time_series_pipeline.
ARGUMENTS = {
    "blackboard": "artefacts",
    "model_name": "inflation-time-series",
    "cluster_configuration_secret": os.getenv(
        "CLUSTER_CONFIGURATION_SECRET", default=""
    ),
    "training_gpus": os.getenv("TRAINING_GPUS", default="1"),
    "number_of_workers": NUMBER_OF_WORKER,
    # A single worker runs as a plain "Job"; more than one worker selects "MPI".
    "distribution_type": "Job" if int(NUMBER_OF_WORKER) <= 1 else "MPI",
    "training_node_selector": os.getenv("TRAINING_NODE_SELECTOR", default=""),
}
MODEL_NAME = ARGUMENTS["model_name"]

# The namespace is read from the service-account file mounted into the pod;
# it is later passed to create_run_from_pipeline_func.
with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
    NAMESPACE = f.read()

# Bare expression: displays the resolved arguments as the cell output.
ARGUMENTS

{'blackboard': 'artefacts',
 'model_name': 'inflation-time-series',
 'cluster_configuration_secret': '',
 'training_gpus': '1',
 'number_of_workers': '1',
 'distribution_type': 'Job',
 'training_node_selector': ''}

## 1.) Load catalog with reusable Kubeflow components

In [4]:
!git clone --branch $COMPONENT_CATALOG_RELEASE $COMPONENT_CATALOG_GIT $COMPONENT_CATALOG_FOLDER

fatal: destination path '/home/jovyan/components' already exists and is not an empty directory.


In [5]:
# Import catalog.py from the cloned catalog folder by file path; its components
# are then used as attributes of CATALOG (e.g. CATALOG.upload_model_comp).
CATALOG = importfile(f"{COMPONENT_CATALOG_FOLDER}/catalog.py")

## 2.) Create custom components
### 2.1) Component: Preprocess data (dataset loading & splitting)

In [6]:
def preprocess_dataset(data_dir: InputPath(str), dataset_dir: OutputPath(str)):
    """Clean the raw interest-rate table and split it into train and test sets.

    Rows without an "inflation rate" are dropped, a monthly ``PeriodIndex``
    named "date" is built from the "year", "month" and "day" columns, and the
    target plus all unused columns are removed from the feature frame.
    The split is not shuffled, so the last 33 % of the rows (in stored order)
    form the test set.

    Args:
        data_dir: Path of the Feather file holding the raw table (read with
            ``pd.read_feather``).
        dataset_dir: Directory (created if missing) that receives
            ``x_train.csv``, ``x_test.csv``, ``y_train.csv`` and ``y_test.csv``.
    """
    import os
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_feather(data_dir)
    print(f"Loaded {len(df)} rows:\n{df.head(2)}")

    df.dropna(subset=["inflation rate"], inplace=True)
    # Assemble the dates in one vectorized call instead of a row-wise apply;
    # pd.to_datetime accepts a frame with "year", "month" and "day" columns.
    df["date"] = pd.to_datetime(df[["year", "month", "day"]])
    df.index = pd.PeriodIndex(df["date"], freq="M")
    y = df["inflation rate"].copy(deep=True)
    df.drop(
        columns=[
            "date",
            "inflation rate",
            "federal funds target rate",
            "real gdp (percent change)",
            "federal funds lower target",
            "federal funds upper target",
            "year",
            "month",
            "day",
        ],
        inplace=True,
    )
    print(f"{len(df)} rows after cleansing")

    # shuffle=False keeps the row order, so the test set is the tail of the data.
    x_train, x_test, y_train, y_test = train_test_split(
        df, y, test_size=0.33, shuffle=False
    )

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(dataset_dir, exist_ok=True)

    x_train.to_csv(os.path.join(dataset_dir, "x_train.csv"))
    x_test.to_csv(os.path.join(dataset_dir, "x_test.csv"))
    y_train.to_csv(os.path.join(dataset_dir, "y_train.csv"))
    y_test.to_csv(os.path.join(dataset_dir, "y_test.csv"))

    print(
        f"Pre-processed dataset saved. Contents of '{dataset_dir}':\n {os.listdir(dataset_dir)}"
    )


# Turn preprocess_dataset into a pipeline component that uses BASE_IMAGE.
preprocess_dataset_comp = kfp.components.create_component_from_func(
    func=preprocess_dataset, base_image=BASE_IMAGE
)

### 2.2) Components: model training, evaluation and conversion to ONNX
#### 2.2.1) A Decision Tree Regressor making predictions based on one past inflation rate

In [7]:
def train_lagone_regressor(
    dataset_dir: InputPath(str), model_dir: OutputPath(str)
) -> NamedTuple("Lag1Regressor", [("score", float), ("mae", float), ("mape", float)]):
    """Train a decision tree that predicts the next value from the current one.

    Only ``y_train.csv`` / ``y_test.csv`` are used: every value of the series
    is the single feature, its successor is the target.

    Args:
        dataset_dir: Directory containing ``y_train.csv`` and ``y_test.csv``.
        model_dir: File path the ONNX-serialized model is written to.

    Returns:
        Named tuple ``(score, mae, mape)`` evaluated on the test pairs, each
        rounded to two decimals; ``score`` is the value of
        ``DecisionTreeRegressor.score`` and ``mape`` is given in percent.
    """
    from collections import namedtuple
    import numpy as np
    import pandas as pd
    from sklearn.metrics import mean_absolute_error
    from sklearn.tree import DecisionTreeRegressor
    from skl2onnx import convert_sklearn
    from skl2onnx.common.data_types import FloatTensorType

    def lag_pairs(series):
        # Feature: value at t (as a single column); target: value at t + 1.
        # The last value has no successor and is dropped from both.
        features = series.copy(deep=True)[:-1].values.reshape(-1, 1)
        targets = series.shift(-1)[:-1].values
        return features, targets

    train_series = pd.read_csv(f"{dataset_dir}/y_train.csv", index_col=0).iloc[:, 0]
    test_series = pd.read_csv(f"{dataset_dir}/y_test.csv", index_col=0).iloc[:, 0]

    x_train, y_train = lag_pairs(train_series)
    x_test, y_test = lag_pairs(test_series)
    print(
        f"{len(x_train)} train and {len(x_test)} test records. \n Training and Evaluation starts..."
    )

    regressor = DecisionTreeRegressor().fit(x_train, y_train)
    score = round(regressor.score(x_test, y_test), 2)
    predictions = regressor.predict(x_test)
    mae = round(mean_absolute_error(y_test, predictions), 2)
    relative_errors = np.abs((y_test - predictions) / y_test)
    mape = round(np.mean(relative_errors) * 100, 2)

    print("Converting regressor to ONNX.")
    onnx_model = convert_sklearn(
        regressor,
        initial_types=[("float_input", FloatTensorType([None, 1]))],
        target_opset={"": 15, "ai.onnx.ml": 2},
    )
    with open(model_dir, "wb") as model_file:
        model_file.write(onnx_model.SerializeToString())

    Lag1Regressor = namedtuple("Lag1Regressor", ["score", "mae", "mape"])
    return Lag1Regressor(score, mae, mape)


# Turn train_lagone_regressor into a pipeline component that uses BASE_IMAGE;
# skl2onnx is requested through packages_to_install.
train_lagone_regressor_comp = kfp.components.create_component_from_func(
    func=train_lagone_regressor, base_image=BASE_IMAGE, packages_to_install=["skl2onnx"]
)

#### 2.2.2) A Decision Tree Regressor making predictions based on the Effective Federal Funds Rate and Unemployment Rate

In [8]:
def train_conditioned_regressor(
    dataset_dir: InputPath(str), model_dir: OutputPath(str)
) -> NamedTuple(
    "ConditionedRegressor", [("score", float), ("mae", float), ("mape", float)]
):
    """Train a decision tree on the feature columns of the pre-processed dataset.

    Args:
        dataset_dir: Directory containing ``x_train.csv``, ``x_test.csv``,
            ``y_train.csv`` and ``y_test.csv``.
        model_dir: File path the ONNX-serialized model is written to.

    Returns:
        Named tuple ``(score, mae, mape)`` evaluated on the test set, each
        rounded to two decimals; ``score`` is the value of
        ``DecisionTreeRegressor.score`` and ``mape`` is given in percent.
    """
    from collections import namedtuple
    import numpy as np
    import pandas as pd
    from sklearn.metrics import mean_absolute_error
    from sklearn.tree import DecisionTreeRegressor
    from skl2onnx import convert_sklearn
    from skl2onnx.common.data_types import FloatTensorType

    def load(name):
        # The first CSV column holds the index written by the pre-processing step.
        return pd.read_csv(f"{dataset_dir}/{name}.csv", index_col=0)

    x_train = load("x_train")
    x_test = load("x_test")
    y_train = load("y_train")
    # The test target is kept as a plain list of the first data column.
    y_test = load("y_test").iloc[:, 0].tolist()
    print(
        f"{len(x_train)} train and {len(x_test)} test records. \n Training and Evaluation starts..."
    )

    regressor = DecisionTreeRegressor().fit(x_train, y_train)
    score = round(regressor.score(x_test, y_test), 2)
    predictions = regressor.predict(x_test)
    mae = round(mean_absolute_error(y_test, predictions), 2)
    relative_errors = np.abs((y_test - predictions) / y_test)
    mape = round(np.mean(relative_errors) * 100, 2)

    print("Converting regressor to ONNX.")
    feature_count = len(x_train.columns)
    onnx_model = convert_sklearn(
        regressor,
        initial_types=[("float_input", FloatTensorType([None, feature_count]))],
    )
    with open(model_dir, "wb") as model_file:
        model_file.write(onnx_model.SerializeToString())

    ConditionedRegressor = namedtuple("ConditionedRegressor", ["score", "mae", "mape"])
    return ConditionedRegressor(score, mae, mape)


# Turn train_conditioned_regressor into a pipeline component that uses BASE_IMAGE;
# skl2onnx is requested through packages_to_install.
train_conditioned_regressor_comp = kfp.components.create_component_from_func(
    func=train_conditioned_regressor,
    base_image=BASE_IMAGE,
    packages_to_install=["skl2onnx"],
)

#### 2.2.3) A Decision Tree Regressor making predictions based on five past inflation rates

In [9]:
def train_lagn_regressor(
    dataset_dir: InputPath(str), model_dir: OutputPath(str)
) -> NamedTuple("LagNRegressor", [("score", float), ("mae", float), ("mape", float)]):
    """Train a decision tree on sliding windows of five consecutive values.

    Only ``y_train.csv`` / ``y_test.csv`` are used. Each sample consists of a
    value and its next four successors as features; the value five steps
    ahead of the first one is the target.

    Args:
        dataset_dir: Directory containing ``y_train.csv`` and ``y_test.csv``.
        model_dir: File path the ONNX-serialized model is written to.

    Returns:
        Named tuple ``(score, mae, mape)`` evaluated on the test windows, each
        rounded to two decimals; ``score`` is the value of
        ``DecisionTreeRegressor.score`` and ``mape`` is given in percent.
    """
    import numpy as np
    import pandas as pd
    from sklearn.metrics import mean_absolute_error
    from sklearn.tree import DecisionTreeRegressor
    from collections import namedtuple
    from skl2onnx import convert_sklearn
    from skl2onnx.common.data_types import FloatTensorType

    train = pd.read_csv(f"{dataset_dir}/y_train.csv", index_col=0)
    test = pd.read_csv(f"{dataset_dir}/y_test.csv", index_col=0)

    def window_input(window_length: int, data: pd.DataFrame) -> pd.DataFrame:
        """Append shifted copies of the first column as x_1..x_{n-1} and Y."""
        frame = data.copy(deep=True)
        for lag in range(1, window_length):
            frame[f"x_{lag}"] = frame.iloc[:, 0].shift(-lag)

        # The target column is only added for a window of at least one value.
        if window_length >= 1:
            frame["Y"] = frame.iloc[:, 0].shift(-window_length)
        # Rows at the tail lack some shifted values and are discarded.
        return frame.dropna(axis=0)

    df_train = window_input(5, train)
    df_test = window_input(5, test)
    print("Train data:\n", df_train.head(2), "\nTest data:\n", df_test.head(2))
    print(
        f"{len(train)} train and {len(test)} test records. \n Training and Evaluation starts..."
    )

    # "Y" is the last column; everything before it is a feature.
    train_features, train_target = df_train.iloc[:, :-1], df_train["Y"]
    test_features, test_target = df_test.iloc[:, :-1], df_test["Y"]

    regressor = DecisionTreeRegressor().fit(train_features, train_target)
    score = round(regressor.score(test_features, test_target), 2)
    predictions = regressor.predict(test_features)
    mae = round(mean_absolute_error(test_target, predictions), 2)
    relative_errors = np.abs((test_target - predictions) / test_target)
    mape = round(np.mean(relative_errors) * 100, 2)

    print("Converting regressor to ONNX.")
    feature_count = len(train_features.columns)
    onnx_model = convert_sklearn(
        regressor,
        initial_types=[("float_input", FloatTensorType([None, feature_count]))],
    )
    with open(model_dir, "wb") as model_file:
        model_file.write(onnx_model.SerializeToString())

    LagNRegressor = namedtuple("LagNRegressor", ["score", "mae", "mape"])
    return LagNRegressor(score, mae, mape)


# Turn train_lagn_regressor into a pipeline component that uses BASE_IMAGE;
# skl2onnx is requested through packages_to_install.
train_lagn_regressor_comp = kfp.components.create_component_from_func(
    func=train_lagn_regressor, base_image=BASE_IMAGE, packages_to_install=["skl2onnx"]
)

#### 2.2.4) A SARIMAX Regressor making predictions based on the past inflation rate series

In [10]:
def train_sarimax_regressor(
    dataset_dir: InputPath(str), model_dir: OutputPath(str)
) -> NamedTuple(
    "SARIMAXRegressor", [("score", float), ("mae", float), ("mape", float)]
):
    """Fit a SARIMAX model on the training series and forecast the test period.

    Args:
        dataset_dir: Directory containing ``y_train.csv`` (with a "date"
            column) and ``y_test.csv`` (with an "inflation rate" column).
        model_dir: Declared output path; nothing is written to it here.

    Returns:
        Named tuple ``(score, mae, mape)``: ``score`` is always ``0``, ``mae``
        and ``mape`` (in percent) compare the forecast with the test values
        and are rounded to two decimals.
    """
    import numpy as np
    import pandas as pd
    from collections import namedtuple
    from sklearn.metrics import mean_absolute_error
    from statsmodels.tsa.statespace.sarimax import SARIMAX

    # Append a day to the stored "date" strings so that they parse as full
    # dates, then convert the resulting index back to monthly periods.
    train = pd.read_csv(f"{dataset_dir}/y_train.csv")
    train = train.assign(date=train["date"] + "-01").set_index("date")
    train.index = pd.to_datetime(train.index).to_period("M")

    test = pd.read_csv(f"{dataset_dir}/y_test.csv")["inflation rate"].to_numpy()

    model = SARIMAX(
        train,
        order=(1, 2, 4),
        seasonal_order=(3, 2, 3, 12),
        enforce_stationarity=False,
        enforce_invertibility=False,
        freq="M",
    )
    fitted = model.fit(disp=-1, maxiter=500, low_memory=True)

    # One forecast step per test record.
    forecast = fitted.forecast(len(test))
    mae = round(mean_absolute_error(test, forecast), 2)
    relative_errors = np.abs((test - forecast) / test)
    mape = round(np.mean(relative_errors) * 100, 2)
    print("AIC:", fitted.aic, "BIC:", fitted.bic)

    SARIMAXRegressor = namedtuple("SARIMAXRegressor", ["score", "mae", "mape"])
    return SARIMAXRegressor(0, mae, mape)


# Turn train_sarimax_regressor into a pipeline component that uses BASE_IMAGE.
train_sarimax_regressor_comp = kfp.components.create_component_from_func(
    func=train_sarimax_regressor, base_image=BASE_IMAGE
)

### 2.3) Component: Compare the performance of all regressors

In [19]:
def compare_regressors(
    metrics: list, mlpipeline_ui_metadata_path: OutputPath(str)
) -> str:
    """Publish a comparison table of all regressors and select the best one.

    Args:
        metrics: One ``[name, score, mae, mape]`` entry per regressor. The
            MAPE is parsed with ``float()``, so numbers and numeric strings
            are both accepted.
        mlpipeline_ui_metadata_path: File that receives the UI metadata JSON
            containing the table as inline markdown.

    Returns:
        Name of the regressor with the lowest MAPE. On ties the entry listed
        first wins; entries whose MAPE is NaN are ranked last.

    Raises:
        ValueError: If ``metrics`` is empty.
    """
    from json import dump
    from math import isnan
    import pandas as pd

    if not metrics:
        raise ValueError("metrics must contain at least one regressor entry")

    df = pd.DataFrame({"score": [], "mae": [], "mape": []})
    for m in metrics:
        df.loc[m[0]] = [m[1], m[2], m[3]]
    print(df)

    # Build the markdown table with a single join instead of repeated "+=".
    s = "|Regressor|Score|MAE|MAPE|\n|----|----|----|----|\n" + "".join(
        f"|{m[0]}|{m[1]}|{m[2]}|{m[3]}|\n" for m in metrics
    )
    metadata = {
        "outputs": [
            {
                "storage": "inline",
                "source": s,
                "type": "markdown",
            }
        ]
    }

    def mape_rank(entry):
        # NaN never compares as smaller or equal, which previously could make
        # the selection fail; sort it behind every real number instead.
        mape = float(entry[3])
        return (isnan(mape), mape)

    # min() evaluates each MAPE once and keeps the first of equal minima.
    best_regressor = min(metrics, key=mape_rank)[0]

    with open(mlpipeline_ui_metadata_path, "w") as metadata_file:
        dump(metadata, metadata_file)
    return best_regressor


# Turn compare_regressors into a pipeline component that uses BASE_IMAGE.
compare_regressors_comp = kfp.components.create_component_from_func(
    func=compare_regressors, base_image=BASE_IMAGE
)

## 3.) Create the actual pipeline by combining the components

In [12]:
@dsl.pipeline(
    name="Time Series Prediction",
    description="An example pipeline that performs time series prediction for inflation rates",
)
def time_series_pipeline(
    blackboard: str,
    model_name: str,
    cluster_configuration_secret: str,
    training_gpus: int,
    number_of_workers: int,
    distribution_type: str,
    training_node_selector: str,
):
    """Load, pre-process, train four regressors, compare them and deploy the best.

    Steps: create the blackboard volume, load the "interestrates" table via
    Trino, pre-process it, train the lag-1, conditioned, lag-n and SARIMAX
    regressors, compare their metrics, and upload and deploy the winner with
    KServe. There is no deployment branch for "sarimax".

    Only ``blackboard`` and ``model_name`` are used in the body; the other
    parameters are accepted but not referenced here.
    """
    blackboard_volume = dsl.VolumeOp(
        name="Create Artefacts Blackboard",
        resource_name=blackboard,
        modes=dsl.VOLUME_MODE_RWO,
        size="4Gi",
        set_owner_reference=True,
    )

    trino_task = CATALOG.load_dataframe_via_trino_comp(
        query="SELECT * FROM interestrates",
        columns_query="SHOW COLUMNS FROM interestrates",
        catalog="postgresql",
        schema="public",
    )
    # Loading must wait until the blackboard volume exists.
    trino_task.after(blackboard_volume)

    preprocess_task = preprocess_dataset_comp(
        data_dir=trino_task.outputs["dataframe"]
    )
    # preprocess_dataset_task.add_pvolumes(
    #     {"/tmp/jovyan": dsl.PipelineVolume(pvc="time-series-volume")}
    # )
    # preprocess_dataset_task.after(create_blackboard)
    dataset = preprocess_task.outputs["dataset_dir"]

    # Training tasks are created in the order lagone, conditioned, lagn, sarimax.
    lagone_task = train_lagone_regressor_comp(dataset_dir=dataset)
    conditioned_task = train_conditioned_regressor_comp(dataset_dir=dataset)
    lagn_task = train_lagn_regressor_comp(dataset_dir=dataset)
    sarimax_task = train_sarimax_regressor_comp(dataset_dir=dataset)

    # The insertion order below is the row order of the comparison table.
    trainers = {
        "conditioned": conditioned_task,
        "lagone": lagone_task,
        "lagn": lagn_task,
        "sarimax": sarimax_task,
    }
    regressor_metrics = [
        [
            regressor,
            task.outputs["score"],
            task.outputs["mae"],
            task.outputs["mape"],
        ]
        for regressor, task in trainers.items()
    ]

    selection_task = compare_regressors_comp(metrics=regressor_metrics)

    # One conditional upload + deploy branch per regressor that produces a model.
    for regressor in ("lagone", "lagn", "conditioned"):
        with dsl.Condition(selection_task.output == regressor):
            upload_task = CATALOG.upload_model_comp(
                trainers[regressor].outputs["model_dir"], project_name=model_name
            )
            deploy_task = CATALOG.deploy_model_with_kserve_comp(
                project_name=model_name
            )

            deploy_task.after(upload_task)

## 4.) Run the pipeline within an experiment

Create a pipeline run, using a pipeline configuration that:

- enables data passing via persistent volumes (faster than the default MinIO-based passing)
- disables caching (which currently is not supported for data passing via volumes)

In [14]:
def disable_cache_transformer(op):
    """Set the maximum cache staleness of ``op`` to "P0D" and return it.

    Container ops get the value through their execution options; every other
    op gets it as a pod annotation.
    """
    if not isinstance(op, dsl.ContainerOp):
        op.add_pod_annotation(
            name="pipelines.kubeflow.org/max_cache_staleness", value="P0D"
        )
        return op

    op.execution_options.caching_strategy.max_cache_staleness = "P0D"
    return op


# Pipeline configuration: register the cache-disabling transformer and pass
# data between steps through a Kubernetes volume.
pipeline_conf = PipelineConf()
pipeline_conf.add_op_transformer(disable_cache_transformer)
pipeline_conf.data_passing_method = data_passing_methods.KubernetesVolume(
    volume=V1Volume(
        name=ARGUMENTS["blackboard"],
        # Claim name is "<workflow name>-<blackboard>"; presumably this matches
        # the claim created by the pipeline's VolumeOp — verify.
        persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
            "{{workflow.name}}-%s" % ARGUMENTS["blackboard"]
        ),
    ),
    path_prefix=f'{ARGUMENTS["blackboard"]}/',
)

# Submit a run of the pipeline in NAMESPACE with the arguments defined above.
kfp.Client().create_run_from_pipeline_func(
    time_series_pipeline,
    arguments=ARGUMENTS,
    namespace=NAMESPACE,
    pipeline_conf=pipeline_conf,
)

RunPipelineResult(run_id=c3ed897d-4239-4f42-9c2d-610d8e747562)