# Deploy an MLflow `pyfunc` model with Model Serving

This notebook shows how to serve several MLflow models behind a single endpoint by wrapping them in a custom `pyfunc` model. MLflow `pyfunc` offers more flexibility than the built-in flavors: you can run any custom model, add preprocessing or post-processing logic, or execute arbitrary Python code. The built-in flavors remain the recommended choice for optimal performance, and `pyfunc` is the tool for cases that need more customization. The example below trains one scikit-learn model per day of the week (DOW), wraps them in a single multi-model endpoint (MME) `pyfunc` that routes each request by a `dow` parameter, and serves that model locally.

## Install and import libraries 

In [1]:
%pip install --upgrade mlflow -q

Note: you may need to restart the kernel to use updated packages.


In [2]:
import pandas as pd
import numpy as np
import requests
import json
import mlflow
from mlflow.models import infer_signature
from mlflow.tracking import MlflowClient
from sklearn.ensemble import RandomForestRegressor

import warnings

warnings.filterwarnings("ignore")

In [3]:
DOW_MODEL_NAME_PREFIX = "DOW_model_"
MME_MODEL_NAME = "MME_DOW_model"

## 1 - Create Some Sample Models

#### 1.1 - Create Dummy Data

In [4]:
def create_weekly_dataset(n_dates, n_observations_per_date):
    rng = pd.date_range(start="today", periods=n_dates, freq="D")
    df = pd.DataFrame(
        np.random.randn(n_dates * n_observations_per_date, 4),
        columns=["x1", "x2", "x3", "y"],
        index=np.tile(rng, n_observations_per_date),
    )
    df["dow"] = df.index.dayofweek
    return df


DF = create_weekly_dataset(n_dates=30, n_observations_per_date=500)
print(DF.shape)
DF.head()

(15000, 5)


| | x1 | x2 | x3 | y | dow |
|---|---|---|---|---|---|
| 2024-01-25 11:52:32.719166 | 1.211154 | 0.051818 | -0.046877 | 0.245944 | 3 |
| 2024-01-26 11:52:32.719166 | -0.277421 | 0.260553 | -0.436572 | -1.23058 | 4 |
| 2024-01-27 11:52:32.719166 | 1.280508 | -0.502804 | -0.518665 | -2.165463 | 5 |
| 2024-01-28 11:52:32.719166 | 1.371376 | -0.755066 | 0.743234 | 1.78751 | 6 |
| 2024-01-29 11:52:32.719166 | 0.972065 | -0.268332 | 2.404934 | 0.156428 | 0 |
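
The `dow` column comes from pandas' `dayofweek`, which numbers days from Monday = 0 through Sunday = 6. The standard library's `date.weekday()` follows the same convention, so it can serve as a quick sanity check on the sample rows. The `DOW_NAMES` list and `dow_name` helper below are illustrative and not part of the notebook.

```python
from datetime import date

# Illustrative lookup table; the index matches date.weekday() (Monday = 0).
DOW_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]


def dow_name(d: date) -> str:
    """Return the weekday name for a date using the Monday = 0 convention."""
    return DOW_NAMES[d.weekday()]


# 2024-01-25 is the first row of the sample output (dow = 3).
print(dow_name(date(2024, 1, 25)))
# 2024-01-30 is the first date in the sample data with dow = 1.
print(dow_name(date(2024, 1, 30)))
```

Under this convention `dow = 1` is Tuesday, which is the model the later cells load and query.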


#### 1.2 - Train Models for Each Day of the Week

In [5]:
for dow in DF["dow"].unique():
    # Create dataset corresponding to a single day of the week
    X = DF.loc[DF["dow"] == dow].copy()  # copy so the pops below act on an independent frame
    X.pop("dow")  # Remove DOW as a predictor column
    y = X.pop("y")

    # Fit our DOW model
    model = RandomForestRegressor().fit(X, y)

    # Infer signature of the model
    signature = infer_signature(X, model.predict(X))

    with mlflow.start_run():
        model_path = f"model_{dow}"

        # Log our DOW model with signature
        mlflow.sklearn.log_model(model, model_path, signature=signature)
        mlflow.set_tag("dow", dow)

        # Register our DOW model
        model_uri = f"runs:/{mlflow.active_run().info.run_id}/{model_path}"
        mlflow.register_model(model_uri, f"{DOW_MODEL_NAME_PREFIX}{dow}")

Successfully registered model 'DOW_model_3'.
Created version '1' of model 'DOW_model_3'.
Successfully registered model 'DOW_model_4'.
Created version '1' of model 'DOW_model_4'.
Successfully registered model 'DOW_model_5'.
Created version '1' of model 'DOW_model_5'.
Successfully registered model 'DOW_model_6'.
Created version '1' of model 'DOW_model_6'.
Successfully registered model 'DOW_model_0'.
Created version '1' of model 'DOW_model_0'.
Successfully registered model 'DOW_model_1'.
Created version '1' of model 'DOW_model_1'.
Successfully registered model 'DOW_model_2'.
Created version '1' of model 'DOW_model_2'.


#### 1.3 - Test Inference on our DOW Models

In [6]:
client = MlflowClient()

# Load Tuesday's model
tuesday_dow = 1
model_name = f"{DOW_MODEL_NAME_PREFIX}{tuesday_dow}"
model_uri = f"models:/{model_name}/latest"
model = mlflow.sklearn.load_model(model_uri)

# Perform inference using our training data for Tuesday
predictor_columns = [column for column in DF.columns if column not in {"y", "dow"}]
head_of_training_data = DF.loc[DF["dow"] == tuesday_dow, predictor_columns].head()
tuesday_fitted_values = model.predict(head_of_training_data)
print(tuesday_fitted_values)

[ 0.64537186 -0.56991395  0.28764513 -0.16968807  0.28271733]


## 2 - Create an MME Custom PyFunc

#### 2.1 - Create our Custom PyFunc

In [7]:
class DOWModel(mlflow.pyfunc.PythonModel):
    def __init__(self, model_uris):
        self.model_uris = model_uris
        self.models = {}

    @staticmethod
    def _model_uri_to_dow(model_uri: str) -> int:
        return int(model_uri.split("/")[-2].split("_")[-1])

    def load_context(self, context):
        self.models = {
            self._model_uri_to_dow(model_uri): mlflow.sklearn.load_model(model_uri)
            for model_uri in self.model_uris
        }

    def predict(self, context, model_input, params=None):
        # Parse the dow parameter (params may be None if the caller passes nothing)
        dow = (params or {}).get("dow")
        if dow is None:
            raise ValueError("The 'dow' param was not passed.")

        # Get the model associated with the dow parameter
        model = self.models.get(dow)
        if model is None:
            raise ValueError(f"No model found for dow={dow}. Available: {sorted(self.models)}.")

        # Perform inference
        return model.predict(model_input)
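
To see the routing logic in isolation, here is a minimal sketch that mirrors `DOWModel` without MLflow or scikit-learn. The stand-in "models" are plain functions, and `StubRouter` and `model_uri_to_dow` are hypothetical names; only the URI parsing and the `dow` lookup follow the class above.

```python
from typing import Callable, Dict, List


def model_uri_to_dow(model_uri: str) -> int:
    """Extract the trailing integer from the model name in 'models:/<name>/<version>'."""
    return int(model_uri.split("/")[-2].split("_")[-1])


class StubRouter:
    """Hypothetical stand-in for DOWModel: one callable per day of the week."""

    def __init__(self, models: Dict[int, Callable[[List[float]], List[float]]]):
        self.models = models

    def predict(self, model_input: List[float], params: dict) -> List[float]:
        dow = params.get("dow")
        if dow is None:
            raise ValueError("The 'dow' param was not passed.")
        model = self.models.get(dow)
        if model is None:
            raise ValueError(f"No model for dow={dow}; available: {sorted(self.models)}")
        return model(model_input)


uris = [f"models:/DOW_model_{i}/latest" for i in range(7)]

# Each stub adds its day-of-week index, so the routed model is visible in the output.
router = StubRouter(
    {
        model_uri_to_dow(uri): (lambda xs, k=model_uri_to_dow(uri): [x + k for x in xs])
        for uri in uris
    }
)

print(router.predict([0.0, 1.0], params={"dow": 1}))  # routed to the dow=1 stub
```

Note that the parsing relies on every registered model name ending in `_<integer>`; a different naming scheme would need a different way to map URIs to days.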

#### 2.2 - Test our Custom PyFunc

In [8]:
head_of_training_data

| | x1 | x2 | x3 |
|---|---|---|---|
| 2024-01-30 11:52:32.719166 | 0.707506 | 0.566814 | 0.867072 |
| 2024-02-06 11:52:32.719166 | -0.394847 | -0.579888 | 0.497493 |
| 2024-02-13 11:52:32.719166 | 0.875665 | -0.571626 | -0.434631 |
| 2024-02-20 11:52:32.719166 | -1.451809 | -1.435772 | 1.819538 |
| 2024-01-30 11:52:32.719166 | -0.026807 | 0.58368 | 1.155405 |


In [9]:
# Instantiate our DOW MME
model_uris = [f"models:/{DOW_MODEL_NAME_PREFIX}{i}/latest" for i in DF["dow"].unique()]
dow_model = DOWModel(model_uris)
dow_model.load_context(None)
print("Model URIs:")
print(model_uris)

# Perform inference using our training data for Tuesday
params = {"dow": 1}
mme_tuesday_fitted_values = dow_model.predict(None, head_of_training_data, params=params)
assert all(tuesday_fitted_values == mme_tuesday_fitted_values)

print("\nTuesday fitted values:")
print(mme_tuesday_fitted_values)

Model URIs:
['models:/DOW_model_3/latest', 'models:/DOW_model_4/latest', 'models:/DOW_model_5/latest', 'models:/DOW_model_6/latest', 'models:/DOW_model_0/latest', 'models:/DOW_model_1/latest', 'models:/DOW_model_2/latest']

Tuesday fitted values:
[ 0.64537186 -0.56991395  0.28764513 -0.16968807  0.28271733]
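
The check in the cell above compares the two prediction arrays with exact equality, which is reasonable here because both paths call the same loaded model on the same rows. If the two paths could differ by floating-point rounding, a tolerance-based comparison is the safer habit. The sketch below uses only the standard library (`np.allclose` is the NumPy equivalent); the function name and the tolerance values are illustrative choices.

```python
import math
from typing import Sequence


def all_close(
    a: Sequence[float],
    b: Sequence[float],
    rel_tol: float = 1e-9,
    abs_tol: float = 1e-12,
) -> bool:
    """Return True when both sequences have equal length and pairwise-close values."""
    if len(a) != len(b):
        return False
    return all(math.isclose(x, y, rel_tol=rel_tol, abs_tol=abs_tol) for x, y in zip(a, b))


direct = [0.64537186, -0.56991395, 0.28764513]
# Same values with a tiny perturbation, as rounding might introduce.
wrapped = [0.64537186 + 1e-15, -0.56991395, 0.28764513]

print(all_close(direct, wrapped))
print(all_close(direct, [0.0, 0.0, 0.0]))
```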


#### 2.3 - Register our PyFunc

In [10]:
with mlflow.start_run():
    # Instantiate the custom pyfunc model
    model = DOWModel(model_uris)
    model.load_context(None)
    model_path = "MME_model_path"

    signature = infer_signature(
        model_input=head_of_training_data,
        model_output=tuesday_fitted_values,
        params=params,
    )
    print(signature)

    # Log the model to the experiment
    mlflow.pyfunc.log_model(
        model_path,
        python_model=model,
        signature=signature,
        registered_model_name=MME_MODEL_NAME,  # also register the model for easy access
    )

    # Record how many DOW models the wrapper holds
    # (`models` is the dict populated by load_context)
    mlflow.log_param("num_models", len(model.models))

inputs: 
  ['x1': double, 'x2': double, 'x3': double]
outputs: 
  [Tensor('float64', (-1,))]
params: 
  ['dow': long (default: 1)]



Successfully registered model 'MME_DOW_model'.
Created version '1' of model 'MME_DOW_model'.


## 3 - Serve our Model
To test the endpoint, serve the model on your local machine.
1. Open a new shell window in the directory that contains the `mlruns` directory, e.g. the directory you ran this notebook from.
2. Ensure MLflow is installed: `pip install --upgrade mlflow`
3. Run the shell command printed below.

In [11]:
PORT = 1234
print(
    f'Run the below command in a new window. You must be in the same repo as your mlruns directory and have mlflow installed...\n\tmlflow models serve -m "models:/{MME_MODEL_NAME}/latest" -p {PORT}'
)

Run the below command in a new window. You must be in the same repo as your mlruns directory and have mlflow installed...
	mlflow models serve -m "models:/MME_DOW_model/latest" -p 1234
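
The server takes a moment to start, so it can help to wait until it responds before sending requests. The sketch below polls a health URL with the standard library. It assumes the local scoring server answers `GET /ping` with HTTP 200 once it is ready; `http_probe` and `wait_until_ready` are hypothetical helper names, and the attempt count and delay are arbitrary.

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def http_probe(url: str, timeout: float = 2.0) -> bool:
    """Return True if a GET to `url` succeeds with status 200, False on any connection error."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        return False


def wait_until_ready(
    url: str,
    probe: Callable[[str], bool] = http_probe,
    attempts: int = 30,
    delay: float = 1.0,
) -> bool:
    """Call `probe(url)` up to `attempts` times, sleeping `delay` seconds after each failure."""
    for _ in range(attempts):
        if probe(url):
            return True
        time.sleep(delay)
    return False


# Usage against the server started above (assumes it listens on port 1234):
# wait_until_ready("http://127.0.0.1:1234/ping")
```

The `probe` argument is injectable so the retry loop can be exercised without a running server.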


## 4 - Query our Served Model

In [12]:
def score_model(pdf, params):
    headers = {"Content-Type": "application/json"}
    url = f"http://127.0.0.1:{PORT}/invocations"
    ds_dict = {"dataframe_split": pdf, "params": params}
    data_json = json.dumps(ds_dict, allow_nan=True)

    response = requests.request(method="POST", headers=headers, url=url, data=data_json)
    response.raise_for_status()

    return response.json()


print("Inference on dow model 1 (Tuesday):")
inference_df = head_of_training_data.reset_index(drop=True).to_dict(orient="split")
print(score_model(inference_df, params={"dow": 1}))

Inference on dow model 1 (Tuesday):
{'predictions': [-0.44137002976164835, -0.2676824459583504, 0.28058275226687557, 0.11478868205899269, -0.008908641826345398]}
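
For reference, the request body that `score_model` sends has two top-level keys: `dataframe_split`, which holds the `index`, `columns`, and `data` lists produced by `DataFrame.to_dict(orient="split")`, and `params`, which carries the `dow` routing value. A minimal sketch that builds the same shape by hand, without pandas; `build_payload` is an illustrative name and the single row is taken from the sample data above.

```python
import json
from typing import Any, Dict, List


def build_payload(columns: List[str], rows: List[List[float]], params: Dict[str, Any]) -> str:
    """Serialize rows and inference params into the JSON shape used by score_model."""
    body = {
        "dataframe_split": {
            "index": list(range(len(rows))),  # mirrors reset_index(drop=True)
            "columns": columns,
            "data": rows,
        },
        "params": params,
    }
    return json.dumps(body)


payload = build_payload(
    columns=["x1", "x2", "x3"],
    rows=[[0.707506, 0.566814, 0.867072]],
    params={"dow": 1},
)
print(payload)
```

Because `dow` travels in `params` rather than as a feature column, the model signature for the inputs stays limited to `x1`, `x2`, and `x3`.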
