In [None]:
import os
import logging
import inspect
import tempfile
from pathlib import Path

from devtools import pprint

# Route log records of level INFO and above through a stream handler.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    level=logging.INFO,
)

### API Configuration

In [None]:
import mlopus

# Build the MLflow API handle used by the MLflow-backed examples below.
# Options left as `None` fall back to the defaults described next to them.
mlflow_api = mlopus.mlflow.get_api(
    conf=dict(
        # Defaults to env var MLFLOW_TRACKING_URI or `~/.cache/mlflow`
        tracking_uri=None,
        # Defaults to ~/.cache/mlopus/mlflow-providers/mlflow/<hashed_tracking_uri>
        cache_dir=None,
        # Allow caching artifacts when artifacts repo is local (will cause duplication!) (default is False)
        cache_local_artifacts=True,
    ),
)

### 5. Artifact Schemas

In [None]:
# The module `mlopus.artschema` offers a simple framework for defining schemas for model and run artifacts.
# Schemas can be used explicitly as dumpers/loaders and also registered via tags to be used implicitly.

# Install the package `my-schemas` from this example in editable mode (`-e`).
# A Kernel restart is required after installing it for the first time.
# This package contains a minimal example of what user code can look like when working with the module `mlopus.artschema`
%pip install -e ./code/my-schemas

In [None]:
# Check the source code for the module `my_schemas.foobar`.
# This is a minimal implementation of artifact schemas with no meaningful
# use case, but the same pattern applies to any complex model or dataset.
try:
    from my_schemas import foobar
except ModuleNotFoundError:
    # Print a hint for the usual first-run cause, then let the error propagate.
    print("Hint: Restart the Kernel after installing `my-schemas` for the first time")
    raise  # Bare `raise` re-raises the active exception unchanged

print(inspect.getsource(foobar))

In [None]:
# Let's register `foobar.Schema` as the default schema for any version of the model `mlopus_example`
model = mlflow_api.get_model("mlopus_example")
model.set_tags(mlopus.artschema.Tags().using(foobar.Schema))

# Observe how the model tags now indicate `foobar.Schema` as being the `default` schema for this model.
# The tags also indicate the required Python package for using `foobar.Schema`
pprint(model.tags)

# Please note that the package requirement inference only works for packages
# installed via a package manager (pip, setuptools, poetry, etc)

In [None]:
# Now we can publish a version of the model `mlopus_example` using the default schema inferred from the model tags.
with mlflow_api.get_or_create_exp("mlopus_introduction").start_run("5_artifact_schemas") as run:

    # Because the `default` schema for this model is `foobar.Schema`, the artifact data below
    # will be parsed into an instance of `foobar.Artifact` and dumped using `foobar.Dumper`
    model_version = mlopus.artschema.log_model_version(
        run=run,
        model=model,
        artifact={
            "some_data": {"foo": "bar"},
        },
    )

    print(model_version.url)

    # Keep the version number around: the artifact specs in the catalog section refer to it
    version_number = model_version.version
    print(version_number)

In [None]:
# Inspect the logged model artifact.
# Observe that the dumper configuration is saved along with the model files.
# Note: this cell shells out to the `tree` command, so it must be available on the PATH.
!tree {model_version.get_artifact()}

In [None]:
# Likewise, we can load the model version using the default schema inferred from the tags.
# The files will be loaded using `foobar.Loader` and the returned object will be of type `foobar.Artifact`
artifact = mlopus.artschema.load_artifact(model_version)

# Check the artifact type and content.
print(type(artifact))
pprint(artifact)

# Although it's practical to register schemas at the model level, like we just did, they can also be
# registered for each specific model version, in which case the model version tags take precedence
# over the parent model tags. The same applies when loading run artifacts with an inferred schema (the
# schemas registered for the run take precedence over the ones registered for the parent experiment).

### 6. Other usage examples of `mlopus.artschema`

In [None]:
# 6.1 Passing a custom configuration to the inferred schema
with mlflow_api.get_or_create_exp("mlopus_introduction").start_run("6.1_artschema_custom_conf") as run:
    
    model_version = mlopus.artschema.log_model_version(
        run=run,
        model=model,
        artifact={"some_data": {"x": "1", "y": 2}},  # Same as: `artifact=foobar.Artifact(some_data={...})`
        dumper_conf={"encoding": "UTF-8"}            # Same as: `dumper_conf=foobar.Dumper(encoding=...)`
    )

# Load the version that was just logged, this time with a custom loader configuration
mlopus.artschema.load_artifact(
    model_version,
    loader_conf={"max_files": 1}  # Same as: `loader_conf=foobar.Loader(max_files=1)`
)

In [None]:
# 6.2 Exploring the available schemas for a model (same applies to experiment, run and model version)
# Section headings are emitted with `print` instead of a shell `echo -e`, so the
# output does not depend on which shell the Kernel spawns for `!` commands.

print("\n### Check all available schemas")
pprint(mlopus.artschema.get_schemas(model))

print("\n### Get a schema by alias (fails if the alias doesn't exist)")
try:
    pprint(mlopus.artschema.get_schema(model, alias="custom_schema"))
except Exception as exc:
    # The failure is the demonstration here: display the error and carry on with the cell
    pprint(exc)

print("\n### Get the default schema and load its class")
pprint(Schema := mlopus.artschema.get_schema(model).load())  # This imports the schema class, after validating the python package requirement

print("\n### Check the type of Artifact, Dumper and Loader for this schema")
pprint(Schema().Artifact)
pprint(Schema().Dumper)
pprint(Schema().Loader)

In [None]:
# 6.3 Using the dumper and loader standalone (no MLflow involved)
artifact = foobar.Artifact(some_data=dict(x="1", y="2"))

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp, "my-artifact")

    # Round trip: write the artifact with an explicitly configured dumper,
    # then read it back with an explicitly configured loader.
    foobar.Dumper(encoding="UTF-8").dump(path, artifact)
    loaded = foobar.Loader(max_files=1).load(path)

pprint(loaded)

In [None]:
# 6.4 Using the dumper and loader explicitly with MLflow (no inference via tags, no need to register the schema).
#     As seen in the `Part-1` of this tutorial, the `source` for logging an artifact can be a `Path` to the pre-serialized
#     artifact or a callback that performs serialization when called with a `Path`. In this case, `foobar.Schema` is used
#     explicitly to obtain such a callback. When loading the artifact, the schema is also used to obtain a loader callback.
with mlflow_api.get_or_create_exp("mlopus_introduction").start_run("6.4_artschema_explicit") as run:
    
    model_version = model.log_version(
        run=run,
        source=foobar.Schema().get_dumper(
            artifact={"some_data": {"x": "1", "y": "2"}},  # This data will be parsed into `foobar.Artifact` (an instance is also accepted)
            encoding="UTF-8",  # This setting will be passed to `foobar.Dumper` (an instance or dict is also accepted)
        ),
    )

# Load the version that was just logged, using a loader callback obtained from the schema
model_version.load_artifact(
    loader=foobar.Schema().get_loader(
        max_files=1,  # This setting will be passed to `foobar.Loader` (an instance or dict is also accepted)
    ),
)

In [None]:
# 6.5 Some models can be cumbersome to hold in memory and dump "all at once" because of size and complexity.
#     In such cases, it's fine to leave the `_dump` method of the `Dumper` unimplemented and write model files
#     using a specialized pipeline. The respective `Dumper` for the registered schema is still used at publish
#     time to verify that the files comply with the expected format, but the `_dump` method is never called.

# Let's import this slightly different version of the `foobar` module
from my_schemas import foobar_no_dump

# Observe that the `Dumper` is tweaked so that the `_dump` method won't work
print(inspect.getsource(foobar_no_dump.Dumper))

# Register `foobar_no_dump.Schema` as an alternative schema for the model `mlopus_example` under the alias `no-dump`
model.set_tags(mlopus.artschema.Tags().using(foobar_no_dump.Schema, aliased_as="no-dump"))

# Start a new experiment run
with mlflow_api.get_or_create_exp("mlopus_introduction").start_run("6.5_artschema_no_dump") as run:
    with tempfile.TemporaryDirectory() as tmp:

        # Produce the data files using a custom pipeline
        some_data_path = Path(tmp, "some_data")
        some_data_path.mkdir()
        some_data_path.joinpath("x").write_text("1")
        some_data_path.joinpath("y").write_text("2")

        # Publish the data files as a model version using the `no-dump` schema.
        # The class `foobar_no_dump.Dumper` is only used to verify the files in `Path(tmp)`
        model_version = mlopus.artschema.log_model_version(
            run=run,
            model=model,
            schema="no-dump",
            artifact=Path(tmp),
            keep_the_source=False,
        )

# Load the files using the inferred schema class
mlopus.artschema.load_artifact(model_version, schema="no-dump")

### 7. Artifacts Catalog

In [None]:
# The `ArtifactsCatalog` is a type-safe interface for downloading or loading
# all artifacts required by an application based on the provided settings.

# Have a look at the catalog implementation for `my_schemas`.
# It's a single data container for all artifacts that will be used in this example.
from my_schemas.catalog import MyCatalog

print(inspect.getsource(MyCatalog))

In [None]:
# Suppose we have an app that uses the following settings to describe its artifact requirements.
# Observe that the keys in the following specification match the fields in `MyCatalog`
# Note: this cell relies on earlier cells having been run in order:
#   - `version_number` was captured when the model version was published in section 5
#   - `run` is whatever run was bound last (the `6.5_artschema_no_dump` run if all cells were executed top to bottom)
artifact_specs = {
    "foobar": {
        # Subject identified by registered model name and version
        "subject": {
            "model_name": "mlopus_example",
            "model_version": version_number,
        },
        "schema": "default",  # Refers to a schema by alias
    },
    "foobar_no_dump": {
        # Subject identified by run ID and a path inside that run
        # (presumably where the run stored the model version files; verify against the run's artifacts)
        "subject": {
            "run_id": run.id,
            "path_in_run": "mlopus_example",
        },
        "schema": "my_schemas.foobar_no_dump:Schema",  # Refers to a schema as `<module>:<class>`
    },
}

In [None]:
# Let's clean all cache now, so the download step in the next cell can be demonstrated
mlflow_api.clean_all_cache()

In [None]:
# Download the artifacts described by `artifact_specs` (one entry per field of `MyCatalog`)
MyCatalog.download(mlflow_api, artifact_specs)

In [None]:
# Load the cached artifacts using the MLflow API in offline mode
# (this relies on the artifacts having been downloaded by the previous cell)
catalog = MyCatalog.load(mlflow_api.in_offline_mode, artifact_specs)

# The resulting catalog instance offers type-safe accessors for each of the required artifacts
pprint(catalog.foobar)
pprint(catalog.foobar_no_dump)

In [None]:
# For more flexibility, the artifact specs defined previously can also be used independently, without defining a catalog
specs = mlopus.artschema.parse_load_specs(artifact_specs)

# Pick the spec under the key `foobar` and bind it to the MLflow API
foobar_spec = specs["foobar"].using(mlflow_api)

# Download
foobar_spec.download()

# Load with configured schema (return type is dynamic)
foobar_spec.load()

# Load with explicit schema (return type is static)
# This is the last expression of the cell, so its result is the one displayed
foobar_spec.load(schema=foobar.Schema)