In [1]:
!pip install "transformers" "mlflow-skinny" "ray[serve]" "torch"

Collecting mlflow-skinny
  Downloading mlflow_skinny-2.20.3-py3-none-any.whl.metadata (31 kB)
Collecting ray[serve]
  Downloading ray-2.43.0-cp311-cp311-manylinux2014_x86_64.whl.metadata (19 kB)
Collecting databricks-sdk<1,>=0.20.0 (from mlflow-skinny)
  Downloading databricks_sdk-0.44.1-py3-none-any.whl.metadata (38 kB)
Collecting py-spy>=0.2.0 (from ray[serve])
  Downloading py_spy-0.4.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl.metadata (16 kB)
Collecting uvicorn[standard] (from ray[serve])
  Downloading uvicorn-0.34.0-py3-none-any.whl.metadata (6.5 kB)
Collecting opencensus (from ray[serve])
  Downloading opencensus-0.11.4-py2.py3-none-any.whl.metadata (12 kB)
Collecting virtualenv!=20.21.1,>=20.0.24 (from ray[serve])
  Downloading virtualenv-20.29.2-py3-none-any.whl.metadata (4.5 kB)
Collecting fastapi (from ray[serve])
  Downloading fastapi-0.115.11-py3-none-any.whl.metadata (27 kB)
Collecting starlette (from ray[serve])
  Downloading starlette-0.46.0-py3-none-any.w

In [2]:
import mlflow
from transformers import pipeline


class MyTranslationModel(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        self.lang_from = context.model_config.get("lang_from", "en")
        self.lang_to = context.model_config.get("lang_to", "de")

        self.input_label: str = context.model_config.get("input_label", "prompt")

        self.model_ref: str = context.model_config.get("hfhub_name", "google-t5/t5-base")

        self.pipeline = pipeline(
            f"translation_{self.lang_from}_to_{self.lang_to}",
            self.model_ref,
        )

    def predict(self, context, model_input, params=None):
        prompt = model_input[self.input_label].tolist()

        return self.pipeline(prompt)



In [3]:
import pandas as pd

with mlflow.start_run():
    model_info = mlflow.pyfunc.log_model(
        "translation_model",
        registered_model_name="translation_model",
        python_model=MyTranslationModel(),
        pip_requirements=["transformers"],
        input_example=pd.DataFrame(
            {
                "prompt": ["Hello my name is Jonathan."],
            }
        ),
        model_config={
            "hfhub_name": "google-t5/t5-base",
            "lang_from": "en",
            "lang_to": "de",
        },
    )

2025/03/06 03:20:39 INFO mlflow.pyfunc: Inferring model signature from input example
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/892M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.39M [00:00<?, ?B/s]

Device set to use cpu
Device set to use cpu
Successfully registered model 'translation_model'.
Created version '1' of model 'translation_model'.


In [4]:
en_to_de_version: str = str(model_info.registered_model_version)

In [5]:
print(model_info.signature)

inputs: 
  ['prompt': string (required)]
outputs: 
  ['translation_text': string (required)]
params: 
  None



In [6]:
import mlflow
import pandas as pd

from ray import serve
from fastapi import FastAPI

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class ModelDeployment:
    def __init__(self, model_name: str = "translation_model", default_version: str = "1"):
        self.model_name = model_name
        self.default_version = default_version

        self.model = mlflow.pyfunc.load_model(f"models:/{self.model_name}/{self.default_version}")

    @app.post("/serve")
    async def serve(self, input_string: str):
        return self.model.predict(pd.DataFrame({"prompt": [input_string]}))


deployment = ModelDeployment.bind(default_version=en_to_de_version)

In [7]:
serve.run(deployment, blocking=False)

2025-03-06 03:21:16,181	INFO worker.py:1832 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
[36m(ProxyActor pid=1116)[0m INFO 2025-03-06 03:21:19,614 proxy 172.28.0.12 -- Proxy starting on node 338a4af4e1a7c745dda246e1c76b83fd4ee3e6a0e599df22d0adfce4 (HTTP port: 8000).
[36m(ProxyActor pid=1116)[0m INFO 2025-03-06 03:21:19,706 proxy 172.28.0.12 -- Got updated endpoints: {}.
INFO 2025-03-06 03:21:19,891 serve 134 -- Started Serve in namespace "serve".
[36m(ServeController pid=1115)[0m INFO 2025-03-06 03:21:20,132 controller 1115 -- Deploying new version of Deployment(name='ModelDeployment', app='default') (initial target replicas: 1).
[36m(ProxyActor pid=1116)[0m INFO 2025-03-06 03:21:20,144 proxy 172.28.0.12 -- Got updated endpoints: {Deployment(name='ModelDeployment', app='default'): EndpointInfo(route='/', app_is_cross_language=False)}.
[36m(ServeController pid=1115)[0m INFO 2025-03-06 03:21:20,261 controller 1115 -- Adding 1 

DeploymentHandle(deployment='ModelDeployment')

In [8]:
import requests

response = requests.post(
    "http://127.0.0.1:8000/serve/",
    params={"input_string": "The weather is lovely today"},
)

print(response.json())

[36m(ServeReplica:default:ModelDeployment pid=1272)[0m INFO 2025-03-06 03:21:35,328 default_ModelDeployment jw89ukks 674122f9-3696-414f-8534-357e966ab94d -- POST /serve/ 307 2.2ms


[{'translation_text': 'Das Wetter ist heute nett.'}]


In [9]:
from ray import serve
from fastapi import FastAPI

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class MultiplexedModelDeployment:

    @serve.multiplexed(max_num_models_per_replica=2)
    async def get_model(self, version: str):
        return mlflow.pyfunc.load_model(f"models:/{self.model_name}/{version}")

    def __init__(
        self,
        model_name: str = "translation_model",
        default_version: str = en_to_de_version,
    ):
        self.model_name = model_name
        self.default_version = default_version

    @app.post("/serve")
    async def serve(self, input_string: str):
        model = await self.get_model(serve.get_multiplexed_model_id())
        return model.predict(pd.DataFrame({"prompt": [input_string]}))

[36m(ServeReplica:default:ModelDeployment pid=1272)[0m INFO 2025-03-06 03:21:36,233 default_ModelDeployment jw89ukks b711117a-e2f7-4134-a4d0-e573f6a5be73 -- POST /serve 200 899.4ms


In [10]:
import pandas as pd

with mlflow.start_run():
    model_info = mlflow.pyfunc.log_model(
        "translation_model",
        registered_model_name="translation_model",
        python_model=MyTranslationModel(),
        pip_requirements=["transformers"],
        input_example=pd.DataFrame(
            {
                "prompt": [
                    "Hello my name is Jon.",
                ],
            }
        ),
        model_config={
            "hfhub_name": "google-t5/t5-base",
            "lang_from": "en",
            "lang_to": "fr",
        },
    )

en_to_fr_version: str = str(model_info.registered_model_version)

2025/03/06 03:21:36 INFO mlflow.pyfunc: Inferring model signature from input example
Device set to use cpu
Device set to use cpu
Registered model 'translation_model' already exists. Creating a new version of this model...
Created version '2' of model 'translation_model'.


In [11]:
import requests

response = requests.post(
    "http://127.0.0.1:8000/serve/",
    params={"input_string": "The weather is lovely today"},
    headers={"serve_multiplexed_model_id": en_to_fr_version},
)

print(response.json())

[36m(ServeReplica:default:ModelDeployment pid=1272)[0m INFO 2025-03-06 03:21:46,138 default_ModelDeployment jw89ukks 0deb8d51-0523-4f6e-9f49-a21e87b929d4 -- POST /serve/ 307 4.0ms


[{'translation_text': 'Das Wetter ist heute nett.'}]


In [12]:
print(
    requests.post(
        "http://127.0.0.1:8000/serve/",
        params={"input_string": "The weather is lovely today"},
        headers={"serve_multiplexed_model_id": en_to_de_version},
    ).json()
)

[36m(ServeReplica:default:ModelDeployment pid=1272)[0m INFO 2025-03-06 03:21:48,943 default_ModelDeployment jw89ukks 1d651c15-9d5e-4994-9f7e-32da445dd3ee -- POST /serve 200 789.8ms
[36m(ServeReplica:default:ModelDeployment pid=1272)[0m INFO 2025-03-06 03:21:48,957 default_ModelDeployment jw89ukks 82f7c621-5410-48e8-86b6-8ee0ccae40ba -- POST /serve/ 307 1.8ms


[{'translation_text': 'Das Wetter ist heute nett.'}]


In [13]:
import mlflow
import pydantic


def schema_to_pydantic(schema: mlflow.types.schema.Schema, *, name: str) -> pydantic.BaseModel:
    return pydantic.create_model(
        name, **{k: (v.type.to_python(), pydantic.Field(required=True)) for k, v in schema.input_dict().items()}
    )


def get_req_resp_signatures(
    model_signature: mlflow.models.ModelSignature,
) -> tuple[pydantic.BaseModel, pydantic.BaseModel]:
    inputs: mlflow.types.schema.Schema = model_signature.inputs
    outputs: mlflow.types.schema.Schema = model_signature.outputs

    return (schema_to_pydantic(inputs, name="InputModel"), schema_to_pydantic(outputs, name="OutputModel"))

In [14]:
import mlflow

from fastapi import FastAPI, Response, status
from ray import serve
from typing import List


def deployment_from_model_name(model_name: str, default_version: str = "1"):
    app = FastAPI()
    model_info = mlflow.models.get_model_info(f"models:/{model_name}/{default_version}")
    input_datamodel, output_datamodel = get_req_resp_signatures(model_info.signature)

    @serve.deployment
    @serve.ingress(app)
    class DynamicallyDefinedDeployment:

        MODEL_NAME: str = model_name
        DEFAULT_VERSION: str = default_version

        @serve.multiplexed(max_num_models_per_replica=2)
        async def get_model(self, model_version: str):
            model = mlflow.pyfunc.load_model(f"models:/{self.MODEL_NAME}/{model_version}")

            if model.metadata.get_model_info().signature != model_info.signature:
                raise ValueError(
                    f"Requested version {model_version} has signature incompatible with that of default version {self.DEFAULT_VERSION}"
                )
            return model

        # TODO: Extend this to support batching (lists of inputs and outputs)
        @app.post("/serve", response_model=List[output_datamodel])
        async def serve(self, model_input: input_datamodel, response: Response):
            model_id = serve.get_multiplexed_model_id()
            if model_id == "":
                model_id = self.DEFAULT_VERSION

            try:
                model = await self.get_model(model_id)
            except ValueError:
                response.status_code = status.HTTP_409_CONFLICT
                return [{"translation_text": "FAILED"}]

            return model.predict(model_input.dict())

    return DynamicallyDefinedDeployment


deployment = deployment_from_model_name("translation_model", default_version=en_to_fr_version)

serve.run(deployment.bind(), blocking=False)

INFO 2025-03-06 03:21:51,864 serve 134 -- Connecting to existing Serve app in namespace "serve". New http options will not be applied.
[36m(ServeReplica:default:ModelDeployment pid=1272)[0m INFO 2025-03-06 03:21:51,793 default_ModelDeployment jw89ukks 15eb00d5-beb9-468b-9a9e-18d514dfc889 -- POST /serve 200 820.8ms
[36m(ServeController pid=1115)[0m INFO 2025-03-06 03:21:51,933 controller 1115 -- Deploying new version of Deployment(name='DynamicallyDefinedDeployment', app='default') (initial target replicas: 1).
[36m(ProxyActor pid=1116)[0m INFO 2025-03-06 03:21:51,936 proxy 172.28.0.12 -- Got updated endpoints: {Deployment(name='DynamicallyDefinedDeployment', app='default'): EndpointInfo(route='/', app_is_cross_language=False)}.
[36m(ServeController pid=1115)[0m INFO 2025-03-06 03:21:52,037 controller 1115 -- Removing 1 replica from Deployment(name='ModelDeployment', app='default').
[36m(ServeController pid=1115)[0m INFO 2025-03-06 03:21:52,037 controller 1115 -- Adding 1 repl

DeploymentHandle(deployment='DynamicallyDefinedDeployment')

In [15]:
import requests

resp = requests.post(
    "http://127.0.0.1:8000/serve/",
    json={"prompt": "The weather is lovely today"},
)

assert resp.ok
assert resp.status_code == 200

print(resp.json())

[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1457)[0m INFO 2025-03-06 03:21:56,025 default_DynamicallyDefinedDeployment 1idie5ic cb4fd687-931a-4cc2-8397-92075136766d -- POST /serve/ 307 2.4ms
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1457)[0m INFO 2025-03-06 03:21:56,040 default_DynamicallyDefinedDeployment 1idie5ic eeb72040-abaf-419f-b46c-8600e6792b54 -- Loading model '2'.
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1457)[0m 2025-03-06 03:22:01.938353: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1457)[0m E0000 00:00:1741231321.955701    1501 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=145

[{'translation_text': "Le temps est beau aujourd'hui"}]


In [16]:
import requests

resp = requests.post(
    "http://127.0.0.1:8000/serve/",
    json={"prompt": "The weather is lovely today"},
    headers={"serve_multiplexed_model_id": str(en_to_fr_version)},
)

assert resp.ok
assert resp.status_code == 200

print(resp.json())

[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1457)[0m INFO 2025-03-06 03:22:08,737 default_DynamicallyDefinedDeployment 1idie5ic eeb72040-abaf-419f-b46c-8600e6792b54 -- POST /serve 200 12703.6ms
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1457)[0m INFO 2025-03-06 03:22:08,743 default_DynamicallyDefinedDeployment 1idie5ic ab51ab1b-ffa6-4f13-93bc-d5673f485e23 -- POST /serve/ 307 2.2ms


[{'translation_text': "Le temps est beau aujourd'hui"}]


In [17]:
import pandas as pd

with mlflow.start_run():
    incompatible_version = str(
        mlflow.pyfunc.log_model(
            "translation_model",
            registered_model_name="translation_model",
            python_model=MyTranslationModel(),
            pip_requirements=["transformers"],
            input_example=pd.DataFrame(
                {
                    "text_to_translate": [
                        "Hello my name is Jon.",
                    ],
                }
            ),
            model_config={
                "input_label": "text_to_translate",
                "hfhub_name": "google-t5/t5-base",
                "lang_from": "en",
                "lang_to": "de",
            },
        ).registered_model_version
    )

[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1457)[0m INFO 2025-03-06 03:22:09,943 default_DynamicallyDefinedDeployment 1idie5ic 6ab0dd49-4056-42dd-a173-fa4ea17d0f6c -- POST /serve 200 1041.8ms
2025/03/06 03:22:09 INFO mlflow.pyfunc: Inferring model signature from input example
Device set to use cpu
Device set to use cpu
Registered model 'translation_model' already exists. Creating a new version of this model...
Created version '3' of model 'translation_model'.


In [18]:
import requests

resp = requests.post(
    "http://127.0.0.1:8000/serve/",
    json={"prompt": "The weather is lovely today"},
    headers={"serve_multiplexed_model_id": incompatible_version},
)
assert not resp.ok
resp.status_code == 409

assert resp.json()[0]["translation_text"] == "FAILED"

[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1457)[0m INFO 2025-03-06 03:22:23,195 default_DynamicallyDefinedDeployment 1idie5ic 539772a4-da07-47e7-91cd-2117e30d74d7 -- POST /serve/ 307 1.8ms
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1457)[0m INFO 2025-03-06 03:22:25,217 default_DynamicallyDefinedDeployment 1idie5ic 2fdf3709-fa0d-4265-9cdb-6ac84fb858b2 -- Loading model '3'.
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1457)[0m Device set to use cpu
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1457)[0m ERROR 2025-03-06 03:22:26,402 default_DynamicallyDefinedDeployment 1idie5ic 2fdf3709-fa0d-4265-9cdb-6ac84fb858b2 -- Failed to load model '3'. Error: Requested version 3 has signature incompatible with that of default version 2


In [19]:
import mlflow
from transformers import pipeline


class QuestionAnswererModel(mlflow.pyfunc.PythonModel):
    def load_context(self, context):

        self.model_context = context.model_config.get(
            "model_context",
            "My name is Hans and I live in Germany.",
        )
        self.model_name = context.model_config.get(
            "model_name",
            "deepset/roberta-base-squad2",
        )

        self.tokenizer_name = context.model_config.get(
            "tokenizer_name",
            "deepset/roberta-base-squad2",
        )

        self.pipeline = pipeline(
            "question-answering",
            model=self.model_name,
            tokenizer=self.tokenizer_name,
        )

    def predict(self, context, model_input, params=None):
        resp = self.pipeline(
            question=model_input["question"].tolist(),
            context=self.model_context,
        )

        return [resp] if type(resp) is not list else resp



In [20]:
import pandas as pd

with mlflow.start_run():
    model_info = mlflow.pyfunc.log_model(
        "question_answerer",
        registered_model_name="question_answerer",
        python_model=QuestionAnswererModel(),
        pip_requirements=["transformers"],
        input_example=pd.DataFrame(
            {
                "question": [
                    "Where do you live?",
                    "What is your name?",
                ],
            }
        ),
        model_config={
            "model_context": "My name is Hans and I live in Germany.",
        },
    )

2025/03/06 03:22:26 INFO mlflow.pyfunc: Inferring model signature from input example
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1457)[0m INFO 2025-03-06 03:22:26,434 default_DynamicallyDefinedDeployment 1idie5ic 2fdf3709-fa0d-4265-9cdb-6ac84fb858b2 -- POST /serve 409 1222.0ms


config.json:   0%|          | 0.00/571 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/496M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/79.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/772 [00:00<?, ?B/s]

Device set to use cpu
Device set to use cpu
Successfully registered model 'question_answerer'.
Created version '1' of model 'question_answerer'.


In [21]:
print(model_info.signature)

inputs: 
  ['question': string (required)]
outputs: 
  ['score': double (required), 'start': long (required), 'end': long (required), 'answer': string (required)]
params: 
  None



In [22]:
from ray import serve

serve.run(
    deployment_from_model_name(
        "question_answerer",
        default_version=str(model_info.registered_model_version),
    ).bind(),
    blocking=False,
)

INFO 2025-03-06 03:22:41,189 serve 134 -- Connecting to existing Serve app in namespace "serve". New http options will not be applied.
[36m(ServeController pid=1115)[0m INFO 2025-03-06 03:22:41,214 controller 1115 -- Deploying new version of Deployment(name='DynamicallyDefinedDeployment', app='default') (initial target replicas: 1).
[36m(ServeController pid=1115)[0m INFO 2025-03-06 03:22:41,326 controller 1115 -- Stopping 1 replicas of Deployment(name='DynamicallyDefinedDeployment', app='default') with outdated versions.
[36m(ServeController pid=1115)[0m INFO 2025-03-06 03:22:41,327 controller 1115 -- Adding 1 replica to Deployment(name='DynamicallyDefinedDeployment', app='default').
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1457)[0m INFO 2025-03-06 03:22:43,334 default_DynamicallyDefinedDeployment 1idie5ic -- Unloading model '2'.
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1457)[0m INFO 2025-03-06 03:22:43,335 default_DynamicallyDefinedDeploym

DeploymentHandle(deployment='DynamicallyDefinedDeployment')

In [23]:
import requests

resp = requests.post(
    "http://127.0.0.1:8000/serve/",
    json={"question": "The weather is lovely today"},
)
print(resp.json())

[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1705)[0m INFO 2025-03-06 03:22:45,233 default_DynamicallyDefinedDeployment 00fob88b 76c7e645-f042-4e3e-bc52-343665605999 -- POST /serve/ 307 2.1ms
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1705)[0m INFO 2025-03-06 03:22:45,242 default_DynamicallyDefinedDeployment 00fob88b f2d6fd6f-e9ee-4d74-a6fd-20756f6fe817 -- Loading model '1'.
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1705)[0m 2025-03-06 03:22:50.237719: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1705)[0m E0000 00:00:1741231370.257376    1749 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=170

[{'score': 3.255747287767008e-05, 'start': 30, 'end': 38, 'answer': 'Germany.'}]


[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1705)[0m INFO 2025-03-06 03:22:54,953 default_DynamicallyDefinedDeployment 00fob88b f2d6fd6f-e9ee-4d74-a6fd-20756f6fe817 -- Successfully loaded model '1' in 9711.0ms.
[36m(ServeReplica:default:DynamicallyDefinedDeployment pid=1705)[0m <ipython-input-14-a75e6e8df5cf>:43: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
