In [1]:
import os
import torch
from botorch.models import SingleTaskGP
from botorch.fit import fit_gpytorch_mll
from gpytorch.mlls import ExactMarginalLogLikelihood
from lume_model.variables import ScalarVariable, DistributionVariable
from lume_model.models.gp_model import GPModel
from gpytorch.kernels import RBFKernel
from gpytorch.kernels import ScaleKernel

import mlflow

# For local tracking, start an MLflow server first
# (see https://mlflow.org/docs/latest/getting-started/intro-quickstart/), on any free port:
# > mlflow server --host 127.0.0.1 --port 8082 --gunicorn-opts "--timeout=60"
# The URI set here has to match the host/port used in the command above.
os.environ["MLFLOW_TRACKING_URI"] = "http://127.0.0.1:8082"

# Multi-output example

In [2]:
torch.manual_seed(0)

# Create training data directly in double precision: 1 input, 3 outputs.
# The GP is trained in double, so generating the data in double up front avoids
# mixing float32 and float64 tensors later on.
train_x = torch.rand(5, 1, dtype=torch.double)
angle = 2 * torch.pi * train_x
noise_std = 0.1

# Each output gets independent per-sample noise. `torch.randn(1)` would draw a single
# scalar and broadcast it, i.e. shift the whole curve by a constant instead of adding noise.
train_y = torch.cat(
    (
        torch.sin(angle) + noise_std * torch.randn_like(train_x),
        torch.cos(angle) + noise_std * torch.randn_like(train_x),
        torch.sin(2 * angle) + noise_std * torch.randn_like(train_x),
    ),
    dim=-1,
)  # concatenating three (5, 1) columns gives shape (5, 3)

# Initialize the GP model with a scaled RBF kernel
rbf_kernel = ScaleKernel(RBFKernel())

model = SingleTaskGP(
    train_x,
    train_y,
    covar_module=rbf_kernel,
)

# Fit the model hyperparameters by maximizing the exact marginal log likelihood
mll = ExactMarginalLogLikelihood(model.likelihood, model)
fit_gpytorch_mll(mll)

# Switch to evaluation mode before querying the posterior
model.eval()

# Test inputs (4 batches of 10 points each), in the same dtype as the training data
test_x = torch.rand(4, 10, 1, dtype=torch.double)
posterior = model.posterior(test_x)

# Derive the posterior mean and variance for each output
mean = posterior.mean
variance = posterior.variance

## LUME-Model import

In [3]:
# The GP has a single scalar input
input_variables = [ScalarVariable(name="x")]

# One distribution-valued output variable per GP output
# (currently the "distribution_type" field doesn't do anything)
output_names = ("output1", "output2", "output3")
output_variables = [
    DistributionVariable(name=output_name) for output_name in output_names
]

# Wrap the fitted GP in a lume_model GPModel instance
gp_lume_model = GPModel(
    model=model,
    input_variables=input_variables,
    output_variables=output_variables,
)

### Evaluate model and run methods

In [4]:
# Inputs are passed as a dictionary keyed by input variable name.
# test_x has shape (4, 10, 1); only the dtype needs converting. No squeeze is applied:
# squeeze(0) is a no-op on this tensor because dim 0 has size 4.
input_dict = {"x": test_x.to(dtype=torch.double)}

In [5]:
# Evaluate the LUME model on the input dictionary. The evaluate function returns a
# dictionary mapping each output name to a torch.distributions.Distribution
output_dict = gp_lume_model.evaluate(input_dict)

# Register model to MLflow

See function signature for reference:

In [6]:
# IPython-only help syntax: the trailing `?` displays the method's signature and docstring
gp_lume_model.register_to_mlflow?

Signature:
gp_lume_model.register_to_mlflow(
    input_dict: dict[str, typing.Union[float, torch.Tensor]],
    artifact_path: str,
    registered_model_name: str | None = None,
    tags: dict[str, typing.Any] | None = None,
    version_tags: dict[str, typing.Any] | None = … (output truncated)

In [7]:
# The same name is used for the logged artifact and for the registry entry
registry_name = "lume-model-multi-output-gp"

model_info = gp_lume_model.register_to_mlflow(
    input_dict=input_dict,
    artifact_path=registry_name,
    # not always necessary but required for adding tags/aliases
    registered_model_name=registry_name,
    # the remaining arguments are optional examples
    tags={"type": "test"},
    version_tags={"status": "deploy"},
    alias="latest-gp",
    # a random run name is generated if none is provided
    run_name="lume-test",
)

🏃 View run lume-test at: http://127.0.0.1:8082/#/experiments/0/runs/5eaf5a2a90434e1bad7b897205f841ad
🧪 View experiment at: http://127.0.0.1:8082/#/experiments/0


Successfully registered model 'lume-model-multi-output-gp'.
Created version '1' of model 'lume-model-multi-output-gp'.


When calling `gp_lume_model.register_to_mlflow` again with the same `registered_model_name`, the model version will be incremented.

# Predict using loaded model

Note that the loaded model currently requires a `dict[str, np.ndarray]` as input and does not support tensors.

In [8]:
# The loaded pyfunc model takes NumPy arrays, so convert the tensor inputs first
input_dict_np = {name: tensor.numpy() for name, tensor in input_dict.items()}

# Build the registry URI of the model version that was just registered.
# The URI gets its own name so that `gp_model_saved` only ever refers to the loaded model
# (it was previously bound to the URI string first and then rebound to the model).
version = model_info.registered_model_version
model_uri = f"models:/lume-model-multi-output-gp/{version}"

# Load the registered model back from MLflow and predict with it
gp_model_saved = mlflow.pyfunc.load_model(model_uri)
prediction = gp_model_saved.predict(input_dict_np)
prediction

{'output1': MultivariateNormal(loc: torch.Size([4, 10]), covariance_matrix: torch.Size([4, 10, 10])),
 'output2': MultivariateNormal(loc: torch.Size([4, 10]), covariance_matrix: torch.Size([4, 10, 10])),
 'output3': MultivariateNormal(loc: torch.Size([4, 10]), covariance_matrix: torch.Size([4, 10, 10]))}

In [9]:
# Check that the model loaded from MLflow reproduces the in-memory model's predictions.
# Every output is compared over all batches (not just the first batch of "output1"),
# using a floating-point tolerance instead of exact equality.
for output_name, expected_dist in output_dict.items():
    torch.testing.assert_close(
        prediction[output_name].mean.detach(),
        expected_dist.mean.detach(),
        check_dtype=False,  # keep the comparison dtype-agnostic, as `==` was
    )

# Logging other metrics/artifacts

Note that `register_to_mlflow` ends the run automatically. If you'd like to go back and update the run afterwards, e.g. to log a metric or an artifact, you can reopen it by its run ID as follows:

In [10]:
# Reopen the run created during registration, identified by its run ID
run_id = model_info.run_id
notebook_path = "./gp_model-mlflow.ipynb"

with mlflow.start_run(run_id=run_id) as run:
    mlflow.log_metric("random_number", 5)  # example metric
    mlflow.log_artifact(notebook_path)  # example local file

🏃 View run lume-test at: http://127.0.0.1:8082/#/experiments/0/runs/5eaf5a2a90434e1bad7b897205f841ad
🧪 View experiment at: http://127.0.0.1:8082/#/experiments/0
