# Effortless model deployment with MLflow

## Working with partitioned models (many models) in MLflow

This example demonstrates how to package multiple models with MLflow so that they work together to run inference over partitioned data. In this scenario, the data is partitioned based on a given attribute and an individual model is trained for each slice of the data. This is a common approach for time series data, where training individual models for each partition can outperform training a single larger model over the entire dataset.
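
To make the idea concrete, here is a minimal sketch on synthetic data. The two stores, their trends, and the helper `rmse` are made up for illustration and are not taken from the M5 dataset. It fits one straight line per partition and compares the in-sample error with a single line fitted over all the data:

```python
import numpy as np

rng = np.random.default_rng(0)

# Synthetic demand for two hypothetical stores with different trends.
x = np.arange(50, dtype=float)
demand = {
    "store_a": 2.0 * x + rng.normal(0, 1, x.size),
    "store_b": -1.0 * x + 30 + rng.normal(0, 1, x.size),
}

def rmse(y_true, y_pred):
    """Root mean squared error between two arrays."""
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))

# One global model: a single line fitted over both stores at once.
all_x = np.concatenate([x, x])
all_y = np.concatenate([demand["store_a"], demand["store_b"]])
global_coefs = np.polyfit(all_x, all_y, deg=1)
global_rmse = rmse(all_y, np.polyval(global_coefs, all_x))

# One model per partition: a separate line for each store.
partition_models = {store: np.polyfit(x, y, deg=1) for store, y in demand.items()}
partition_pred = np.concatenate([np.polyval(partition_models[s], x) for s in demand])
partition_rmse = rmse(all_y, partition_pred)

print(f"global RMSE: {global_rmse:.2f}, per-store RMSE: {partition_rmse:.2f}")
```

This only measures in-sample fit on toy data. Whether partitioning helps on a real dataset depends on how different the partitions are and how much data each one has.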

## Working with the M5 forecasting dataset

To demonstrate this scenario, we are going to solve the [M5 Forecasting competition](https://www.kaggle.com/competitions/m5-forecasting-accuracy/overview) problem, which tries to predict the demand for different products at different retail stores across the US. Instead of training one model over the entire dataset, we are going to partition the data by store and train one model per store. The dataset contains 10 different stores, so this will produce 10 different models.

At the end, we will package all 10 models into a single entity that can be deployed using MLflow.

In [1]:
import mlflow
import pandas as pd
import numpy as np
import gc
from typing import List, Dict, Any, Union, Tuple

We start by reading the input data from the 3 CSV files:

In [2]:
sales = pd.read_csv('data/sales_train_validation.csv')
calendar = pd.read_csv('data/calendar.csv')
prices = pd.read_csv('data/sell_prices.csv')

### Combining multiple datasets into one

Let's define a function that can take the three datasets and build a unified one:

In [3]:
def build_dataset(sales, calendar, prices, remove_old: bool = True):
    data = sales
    data.drop(columns=['id'], inplace=True)
    calendar.drop(columns=['weekday', 'year'], inplace=True)

    data = pd.melt(data, id_vars = ['item_id', 'dept_id', 'cat_id', 'store_id', 'state_id'], var_name = 'day', value_name = 'demand')

    data = pd.merge(data, calendar, how='left', left_on=['day'], right_on=['d'])
    data.drop(columns=['day', 'd'], inplace=True)

    data = data.merge(prices, on = ['store_id', 'item_id', 'wm_yr_wk'], how = 'left')
    data.drop(columns=['wm_yr_wk'], inplace=True)

    if remove_old:
        del sales
        del calendar
        del prices
        gc.collect()

    return data

The function `build_dataset` constructs our final dataset. It then deletes the old datasets to save memory on the compute we are using, since this data can grow large.

In [4]:
data = build_dataset(sales, calendar, prices)
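
The two key steps inside `build_dataset` are the wide-to-long reshape and the calendar join. The following sketch reproduces them on a tiny, hand-made frame. The names `sales_wide`, `calendar_toy`, and `long_sales` and all the values are illustrative assumptions:

```python
import pandas as pd

# Wide sales table: one column per day (d_1, d_2), as in the raw sales file.
sales_wide = pd.DataFrame({
    "item_id": ["FOODS_1_001", "FOODS_1_002"],
    "store_id": ["CA_1", "CA_1"],
    "d_1": [3, 0],
    "d_2": [1, 2],
})

# Toy calendar mapping each day label to a date and a pricing week.
calendar_toy = pd.DataFrame({
    "d": ["d_1", "d_2"],
    "date": ["2011-01-29", "2011-01-30"],
    "wm_yr_wk": [11101, 11101],
})

# Wide -> long: one row per (item, store, day).
long_sales = pd.melt(
    sales_wide,
    id_vars=["item_id", "store_id"],
    var_name="day",
    value_name="demand",
)

# Attach the calendar attributes to each row, then drop the join keys.
long_sales = long_sales.merge(
    calendar_toy, how="left", left_on="day", right_on="d"
).drop(columns=["day", "d"])

print(long_sales)
```

After the reshape, each of the two items contributes one row per day, which is why the real dataset grows to tens of millions of rows.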

The data looks as follows:

In [5]:
data.sample(10)

| | item_id | dept_id | cat_id | store_id | state_id | demand | date | wday | month | event_name_1 | event_type_1 | event_name_2 | event_type_2 | snap_CA | snap_TX | snap_WI | sell_price |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 15034829 | HOBBIES_1_217 | HOBBIES_1 | HOBBIES | CA_2 | CA | 0 | 2012-06-05 | 4 | 6 | | | | | 1 | 1 | 1 | 3.47 |
| 28898415 | FOODS_3_821 | FOODS_3 | FOODS | WI_1 | WI | 0 | 2013-09-02 | 3 | 9 | LaborDay | National | | | 1 | 0 | 1 | 4.98 |
| 516347 | HOUSEHOLD_1_511 | HOUSEHOLD_1 | HOUSEHOLD | WI_3 | WI | 0 | 2011-02-14 | 3 | 2 | ValentinesDay | Cultural | | | 0 | 0 | 1 | |
| 58221494 | HOUSEHOLD_1_281 | HOUSEHOLD_1 | HOUSEHOLD | TX_2 | TX | 0 | 2016-04-21 | 6 | 4 | | | | | 0 | 0 | 0 | 4.94 |
| 35581043 | FOODS_3_037 | FOODS_3 | FOODS | WI_3 | WI | 2 | 2014-04-09 | 5 | 4 | | | | | 1 | 1 | 1 | 2.5 |
| 54824175 | HOBBIES_1_111 | HOBBIES_1 | HOBBIES | CA_2 | CA | 0 | 2016-01-01 | 7 | 1 | NewYear | National | | | 1 | 1 | 0 | 2.92 |
| 8432284 | FOODS_1_191 | FOODS_1 | FOODS | TX_2 | TX | 1 | 2011-11-01 | 4 | 11 | | | | | 1 | 1 | 0 | 5.72 |
| 48242199 | HOUSEHOLD_1_364 | HOUSEHOLD_1 | HOUSEHOLD | CA_3 | CA | 0 | 2015-05-30 | 1 | 5 | | | | | 0 | 0 | 0 | 7.48 |
| 53302449 | FOODS_3_656 | FOODS_3 | FOODS | CA_2 | CA | 0 | 2015-11-12 | 6 | 11 | | | | | 0 | 1 | 1 | 3.48 |
| 27719728 | HOUSEHOLD_2_173 | HOUSEHOLD_2 | HOUSEHOLD | CA_2 | CA | 1 | 2013-07-26 | 7 | 7 | | | | | 0 | 0 | 0 | 6.97 |


### Train and validation split

Next, we will separate our data into training and validation sets. Notice that this function doesn't return the splits of the data but boolean masks that mark the rows selected for each set. Since the data is big, you will notice throughout this notebook that we index the data instead of making copies of it.

In [5]:
def split_train_test_idxs(data, cutoff_date):
    data.sort_values('date', inplace=True)

    train_idxs = data['date'] <= cutoff_date
    valid_idxs = data['date'] > cutoff_date

    data.drop(columns=['date'], inplace=True)

    gc.collect()

    return train_idxs, valid_idxs

Let's apply the function to generate the splits. We consider anything up to and including 2014-04-24 as part of the training data:

In [6]:
train_idxs, valid_idxs = split_train_test_idxs(data, cutoff_date='2014-04-24')
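
As a small illustration of the mask-based split, the sketch below applies the same comparison to a four-row toy frame (the frame `toy` and its values are assumptions made for this example):

```python
import pandas as pd

toy = pd.DataFrame({
    "date": ["2014-04-23", "2014-04-24", "2014-04-25", "2014-04-26"],
    "demand": [1, 0, 2, 5],
})
cutoff = "2014-04-24"

# ISO-formatted date strings compare correctly as plain strings.
train_mask = toy["date"] <= cutoff
valid_mask = toy["date"] > cutoff

# The masks share the frame's index, so they can select rows from any
# frame with that index without creating the splits up front.
print(toy.loc[train_mask, "demand"].tolist())
print(toy.loc[valid_mask, "demand"].tolist())
```

Because the masks are boolean Series aligned on the index, they can later be combined with other conditions using `&`, which is how the per-store subsets are selected during training.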

### Feature engineering

We are going to separate our target variable from the features:

In [7]:
features, target = data.loc[:, data.columns != "demand"], data[["demand"]]

del data
gc.collect()

0

Now, let's build a function that can preprocess the input features. To keep it simple, we are only going to focus on categorical encoding of the variables and handling missing values. The function also returns the fitted transformation, which can be used to transform data later. We will put this transformation inside our final model:

In [8]:
from sklearn.preprocessing import OrdinalEncoder
from sklearn.compose import ColumnTransformer
from tqdm import tqdm

def preprocess(data: pd.DataFrame, categorical_features: List[str]) -> Tuple[pd.DataFrame, ColumnTransformer]:
    """
    Preprocess the input data features.

    Parameters
    ----------
    data: pd.DataFrame
        The input data features
    categorical_features: List[str]
        The features that you want to treat as categorical

    Returns
    -------
    Tuple[pd.DataFrame, ColumnTransformer]:
        The transformed data and the associated transformations.
    """
    transformations = ColumnTransformer(
            [
                ('encoding', OrdinalEncoder(handle_unknown='use_encoded_value', unknown_value=np.nan, encoded_missing_value=-1), categorical_features),
            ],
            remainder='passthrough'
        )

    transformed = transformations.fit_transform(data)
    columns = [name.split('__')[1] for name in transformations.get_feature_names_out()]

    return pd.DataFrame(data=transformed, columns=columns, index=data.index).infer_objects(), transformations

Let's run it over the features:

In [9]:
categorical_features = ['item_id', 'dept_id', 'cat_id', 'store_id', 'state_id', 'event_name_1', 'event_type_1', 'event_name_2', 'event_type_2']
features, transforms = preprocess(features, categorical_features)
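
To see what the encoder does to a single column, here is a minimal sketch on a toy `store_id` column. The frame `train` and the unseen store `NY_1` are assumptions for illustration, and only the unknown-value handling is shown:

```python
import numpy as np
import pandas as pd
from sklearn.preprocessing import OrdinalEncoder

train = pd.DataFrame({"store_id": ["CA_1", "TX_1", "CA_1", "WI_1"]})

encoder = OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=np.nan)
codes = encoder.fit_transform(train).ravel()

# Categories are sorted, so CA_1 -> 0, TX_1 -> 1, WI_1 -> 2.
print(codes.tolist())

# A store that was not present during fit is encoded as NaN.
unseen = encoder.transform(pd.DataFrame({"store_id": ["NY_1"]}))
print(unseen)

# categories_ holds the learned labels per column, which allows decoding
# an encoded value back to its original label.
print(encoder.categories_[0][int(codes[1])])
```

The last line is the same lookup that a decoding helper needs: index into `categories_` for the column, then into the labels with the integer code.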

## Preparing the training framework

Now it's time to write a training function. The following function trains a LightGBM model for the given datasets. It receives the input data (containing both training and validation rows) and the train and validation indices within this dataset. That means that the rows selected by `train_idxs` will be used for training while those selected by `valid_idxs` will be used for validation. The argument `params` is used to pass the hyperparameters of the model.

In [10]:
from sklearn import metrics
from typing import List, Dict, Any
from mlflow.models.signature import infer_signature
import lightgbm as lgb

def train_and_evaluate(features: pd.DataFrame, target: pd.DataFrame, train_idxs: pd.Index, valid_idxs: pd.Index,
                       params: Dict[str, Any], model_name: str = None) -> Tuple[lgb.Booster, float]:
    """
    Trains a model using LightGBM.

    Parameters
    ----------
    features: pd.DataFrame
        The input features, already preprocessed.
    target: pd.DataFrame
        The column to predict.
    train_idxs: pd.Index
        The indices of the data corresponding to training.
    valid_idxs: pd.Index
        The indices of the data corresponding to validation.
    params: Dict[str, Any]
        The training parameters for the model.
    model_name: str, optional
        The artifact path to log the trained model under. If not given, the model is not logged.
    """
    X_train, X_valid = features[train_idxs], features[valid_idxs]
    y_train, y_valid = target[train_idxs], target[valid_idxs]

    dtrain = lgb.Dataset(X_train, label=y_train)
    dvalid = lgb.Dataset(X_valid, label=y_valid)

    clf = lgb.train(params, dtrain, 2500, valid_sets = [dtrain, dvalid], callbacks=[lgb.early_stopping(stopping_rounds=10)])

    y_pred_valid = clf.predict(X_valid, num_iteration=clf.best_iteration)

    val_error = np.sqrt(metrics.mean_squared_error(y_valid, y_pred_valid))
    val_score = metrics.r2_score(y_valid, y_pred_valid)

    mlflow.log_metric("valid_rmse", val_error)
    mlflow.log_metric("valid_r2", val_score)

    if model_name:
        mlflow.lightgbm.log_model(clf, model_name, signature=infer_signature(features, target))
    
    return (clf, val_error)
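
For reference, the two validation metrics logged by the function can be computed by hand. The sketch below does so with NumPy on four made-up values (`y_true` and `y_pred` are illustrative, not results from this notebook):

```python
import numpy as np

y_true = np.array([3.0, -0.5, 2.0, 7.0])
y_pred = np.array([2.5, 0.0, 2.0, 8.0])

# RMSE: square root of the mean squared residual.
rmse = np.sqrt(np.mean((y_true - y_pred) ** 2))

# R^2: one minus the ratio of the residual to the total sum of squares.
ss_res = np.sum((y_true - y_pred) ** 2)
ss_tot = np.sum((y_true - y_true.mean()) ** 2)
r2 = 1.0 - ss_res / ss_tot

print(f"rmse={rmse:.4f}, r2={r2:.4f}")
```

RMSE is expressed in the units of the target (units of demand here), while R² is unitless, so the two are useful together when comparing stores with very different sales volumes.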

For the sake of simplicity, we will use the same hyperparameters for every model:

In [11]:
params = {
    'num_leaves': 500,
    'min_child_weight': 0.034,
    'feature_fraction': 0.379,
    'bagging_fraction': 0.418,
    'min_data_in_leaf': 106,
    'objective': 'regression',
    'max_depth': -1,
    'learning_rate': 0.005,
    "boosting_type": "gbdt",
    "bagging_seed": 42,
    "metric": 'rmse',
    "verbosity": -1,
    'reg_alpha': 0.3899,
    'reg_lambda': 0.648,
    'random_state': 222,
}

We are going to need another helper function, which returns the `store_id` label associated with a given categorical encoding of this column. Since our feature dataset is already preprocessed, the store `CA_1` is encoded as a numeric value. Given an encoded value such as `0`, the function returns the associated label, for instance `CA_1`.

In [12]:
def get_encoded_value(transformations: ColumnTransformer, column: str, value: float) -> str:
    """
    Returns the label associated with a given encoded value for a given column.

    Parameters
    ----------
    transformations: ColumnTransformer
        The transformations used to encode the columns.
    column: str
        The name of the column
    value: float
        The encoded value of the column indicated.
    """
    column_encoder_idxs = [i for i in range(len(transformations.transformers_)) if column in transformations.transformers_[i][2]]
    if column_encoder_idxs:
        column_encoder_idx = column_encoder_idxs[0]
        column_index = transformations.transformers_[column_encoder_idx][2].index(column)
        return transformations.transformers_[column_encoder_idx][1].categories_[column_index][int(value)]
    else:
        raise ValueError(f'There is no transformation applied to column ``{column}``')

Now, let's design our model. We want to create a partitioned multi-model: the data will be partitioned by a given key (in this case `store_id`) and we will train one model for each of those partitions. When running inference, we want to apply the correct model based on the partition the data belongs to.

To solve this problem we will create a custom model in MLflow. The `PartitionedModelEnsemble` will be a meta model that will be able to apply multiple models to a given input data set based on a partition key. Those models can be trained independently or all together, like in this example.

> Tip: Notice how models are loaded using `mlflow.pyfunc.load_model`.

In [15]:
from mlflow.pyfunc import PythonModel, PythonModelContext
from sklearn.compose import ColumnTransformer
import pandas as pd
import mlflow

class PartitionedModelEnsemble(PythonModel):
    """
    A custom model that implements a partitioned inferencing strategy over an ensemble of models.
    """
    def __init__(self, key: str, prediction_col: str, transformations: ColumnTransformer):
        """
        Creates a new instance of the model.

        Parameters
        ----------
        key: str
            The name of the column the data will be partitioned on.
        prediction_col: str
            The name of the column the model generates predictions on.
        transformations: ColumnTransformer
            Any given transformation that needs to be applied to the data before sending to the model.
        """
        self.pred_col = prediction_col
        self.key = key
        self.transformations = transformations

    def load_context(self, context: PythonModelContext):
        """
        Loads all the models for the given partitions. This method assumes each model was logged using the
        corresponding value of the column `key` as its artifact key.
        """
        self.models = { key: mlflow.pyfunc.load_model(model_path) for key, model_path in context.artifacts.items() }
        
    def predict(self, context: PythonModelContext, data: pd.DataFrame) -> pd.DataFrame:
        if isinstance(data, pd.DataFrame):
            predictions = pd.DataFrame(0, index=data.index, columns=[self.pred_col])
            
            # Get all the unique partition's value in the input data
            key_ids = data[self.key].unique()

            # We will run 1 predict call per each partition
            for key_id in key_ids:
                input_data_idx = data[self.key] == key_id

                if self.transformations:
                    columns = [name.split('__')[1] for name in self.transformations.get_feature_names_out()]
                    transformed_data = pd.DataFrame(self.transformations.transform(data[input_data_idx]), columns=columns)
                else:
                    transformed_data = data[input_data_idx]
                predictions[input_data_idx] = self.models[key_id].predict(transformed_data.loc[:, transformed_data.columns != self.key]).reshape(-1,1)

            return predictions
        
        raise TypeError("This implementation can only take pandas DataFrame as inputs")
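
The routing logic of `predict` can be tried out without MLflow. The following sketch uses plain callables as hypothetical stand-ins for the per-store models. `toy_models`, `predict_partitioned`, and the arithmetic inside the lambdas are all assumptions for illustration:

```python
import numpy as np
import pandas as pd

# Hypothetical stand-ins for the per-store models: any callable that maps
# a feature frame to a 1-D array of predictions.
toy_models = {
    "CA_1": lambda X: X["sell_price"].to_numpy() * 2.0,
    "TX_1": lambda X: X["sell_price"].to_numpy() + 10.0,
}

def predict_partitioned(data: pd.DataFrame, key: str, models: dict) -> pd.DataFrame:
    """Route each row to the model of its partition and collect the results."""
    predictions = pd.DataFrame(np.nan, index=data.index, columns=["demand"])
    for key_id in data[key].unique():
        mask = data[key] == key_id
        # The partition column only routes rows; it is not a model feature.
        features = data.loc[mask, data.columns != key]
        predictions.loc[mask, "demand"] = models[key_id](features)
    return predictions

batch = pd.DataFrame({
    "store_id": ["TX_1", "CA_1", "TX_1"],
    "sell_price": [1.0, 2.0, 3.0],
})
result = predict_partitioned(batch, "store_id", toy_models)
print(result)
```

Writing the predictions back through the boolean mask keeps them aligned with the input rows, even when rows from different partitions are interleaved in the batch.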

## Model ensemble training

It's now time to train our model. Let's start by configuring the run with MLflow:

In [14]:
mlflow.set_experiment(experiment_name="m5-forecasting-mlflow")

<Experiment: artifact_location='', creation_time=1687962235375, experiment_id='a0ca7d8e-43e1-4d8f-aa9e-7eb636b58fc7', last_update_time=None, lifecycle_stage='active', name='m5-forecasting-mlflow', tags={}>

We will use the following training routine, which trains all the models, one per partition. It does the following:

1. It gets all the different values for the stores in the dataset. There are 10 different stores.
2. For each of them:

    a. It starts a new child run in MLflow. We will have one child run per store, which makes comparison and evaluation much easier.
    b. It selects the indices of the data that belong to this particular store. This is done for both the training and validation datasets.
    c. It calls the `train_and_evaluate()` function, passing as arguments the features (all of them except the partition key column), the training indices, and the validation indices for the given store.
    d. The function returns the trained model and its associated validation error. Autolog and the function itself take care of the logging, so we don't do much here.
    e. In a dictionary called `artifacts`, it records which artifact is associated with the given store. The artifact is indicated in the form `runs:/<RUN_ID>/<ARTIFACT_PATH>`. This makes it very clear where the associated model came from, which we can use for lineage purposes later.

In [15]:
from tqdm import tqdm

with mlflow.start_run() as run:
    models = {}
    model_error = {}
    artifacts = {}

    partition_key = 'store_id'
    model_features = features.columns.difference([partition_key]).values
    stores = features[partition_key].unique()

    mlflow.lightgbm.autolog(log_datasets=False, log_models=False)

    for store in tqdm(stores):
        store_id = get_encoded_value(transforms, partition_key, store)
        with mlflow.start_run(run_name=f"train_store_{store_id}", nested=True) as store_run:
            store_train_idx = (train_idxs) & (features[partition_key] == store)
            store_valid_idx = (valid_idxs) & (features[partition_key] == store)
            models[store_id], model_error[store_id] = train_and_evaluate(features=features.loc[:, features.columns != partition_key],
                                                                         target=target, 
                                                                         train_idxs=store_train_idx, 
                                                                         valid_idxs=store_valid_idx,
                                                                         params=params,
                                                                         model_name=store_id)

            artifacts[store_id] = f"runs:/{store_run.info.run_id}/{store_id}"

    mlflow.pyfunc.log_model("model", 
                            python_model=PartitionedModelEnsemble(partition_key, target.columns[0], transforms),
                            artifacts=artifacts,
                            registered_model_name="m5-forcasting-partitioned")

100%|██████████| 10/10 [56:54<00:00, 341.50s/it]
2023/07/06 00:45:42 INFO mlflow.types.utils: Unsupported type hint: <class 'pandas.core.frame.DataFrame'>, skipping schema inference
2023/07/06 00:45:42 INFO mlflow.types.utils: Unsupported type hint: <class 'pandas.core.frame.DataFrame'>, skipping schema inference
Registered model 'm5-forcasting-partitioned' already exists. Creating a new version of this model...
2023/07/06 00:46:43 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: m5-forcasting-partitioned, version 2
Created version '2' of model 'm5-forcasting-partitioned'.


Training until validation scores don't improve for 10 rounds
Early stopping, best iteration is:
[762]	training's rmse: 3.34108	valid_1's rmse: 3.28104
Training until validation scores don't improve for 10 rounds
Early stopping, best iteration is:
[1063]	training's rmse: 2.72586	valid_1's rmse: 3.22566
Training until validation scores don't improve for 10 rounds
Early stopping, best iteration is:
[1119]	training's rmse: 1.76246	valid_1's rmse: 2.21233
Training until validation scores don't improve for 10 rounds
Early stopping, best iteration is:
[762]	training's rmse: 3.49055	valid_1's rmse: 3.46766
Training until validation scores don't improve for 10 rounds
Early stopping, best iteration is:
[415]	training's rmse: 3.63284	valid_1's rmse: 2.89144
Training until validation scores don't improve for 10 rounds
Early stopping, best iteration is:
[1245]	training's rmse: 2.80333	valid_1's rmse: 3.46991
Training until validation scores don't improve for 10 rounds
Early stopping, best iteration

Notice that at the end of the routine, we log a new model in a very particular way: we log an instance of our custom model, `PartitionedModelEnsemble`. Pay close attention to how `artifacts` is specified. Artifacts indicate all the models this aggregator model ensembles together. When artifacts are given in the form `runs:/`, MLflow automatically pulls those artifacts from the MLflow server and logs them inside the model. You still end up with a single, reproducible asset that you can move around, with the added advantage that you can easily see where each model came from.
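
As a sketch of what the `artifacts` dictionary contains, the snippet below builds the URIs with a hypothetical helper, `make_runs_uri`, and placeholder run IDs. Real run IDs come from the child runs created during training:

```python
def make_runs_uri(run_id: str, artifact_path: str) -> str:
    """Build a runs:/<RUN_ID>/<ARTIFACT_PATH> URI (hypothetical helper)."""
    return f"runs:/{run_id}/{artifact_path}"

# Placeholder run IDs, used here only for illustration.
child_runs = {"CA_1": "run-id-for-ca-1", "TX_1": "run-id-for-tx-1"}

# Each store's model was logged under an artifact path equal to its store ID.
toy_artifacts = {
    store: make_runs_uri(run_id, store) for store, run_id in child_runs.items()
}

for store, uri in toy_artifacts.items():
    print(store, "->", uri)
```

The dictionary keys matter as much as the URIs: they are the keys that `load_context` later reads from `context.artifacts` to decide which model serves which store.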

## Running the model

Let's see now how this model works in practice when running inference. Let's start by pulling the model from the registry:

In [6]:
model = mlflow.pyfunc.load_model("models:/m5-forcasting-partitioned/latest")

Let's build a sample dataset we can use to test this out:

In [4]:
sales = pd.read_csv('data/sales_train_validation.csv')
calendar = pd.read_csv('data/calendar.csv')
prices = pd.read_csv('data/sell_prices.csv')
data = build_dataset(sales, calendar, prices)

Let's remove the target column:

In [12]:
data.drop(columns=["demand"], inplace=True)

Let's create the input data. We will create a copy of it:

In [13]:
input_data = data.sample(n=1000).copy().reset_index()

In [18]:
model.predict(input_data)

| | demand |
|---|---|
| 0 | -0.146778 |
| 1 | 0.381161 |
| 2 | 0.071890 |
| 3 | -0.024440 |
| 4 | 0.070622 |
| ... | ... |
| 995 | 0.186330 |
| 996 | 0.059465 |
| 997 | 0.116552 |
| 998 | 1.126394 |


Notice how the signature of the downstream models is different from the one of the aggregator. In particular, the downstream models 1) take transformed data as inputs and 2) do not require the column `store_id`.

In [25]:
model._model_impl.python_model.models["CA_1"].metadata.signature

inputs: 
  ['item_id': double, 'dept_id': double, 'cat_id': double, 'store_id': double, 'state_id': double, 'event_name_1': double, 'event_type_1': double, 'event_name_2': double, 'event_type_2': double, 'wday': double, 'month': double, 'snap_CA': double, 'snap_TX': double, 'snap_WI': double, 'sell_price': double]
outputs: 
  [Tensor('float64', (-1,))]