Skip to content

Commit

Permalink
Artifact Control Plane (#24)
Browse files Browse the repository at this point in the history
* Artifact Control Plane

* remove blocking part for now

---------

Co-authored-by: Andrei Vishniakov <31008759+avishniakov@users.noreply.github.com>
  • Loading branch information
fa9r and avishniakov committed Nov 23, 2023
1 parent 75c6c69 commit e97f7e5
Show file tree
Hide file tree
Showing 13 changed files with 40 additions and 66 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ jobs:
with:
stack-name: ${{ matrix.stack-name }}
python-version: ${{ matrix.python-version }}
ref-zenml: feature/OSS-2609-OSS-2575-model-config-is-model-version
ref-zenml: feature/OSS-2190-data-as-first-class-citizen
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,15 @@ To make the most of the Model Control Plane we additionally annotate the output
<summary>Code snippet 💻</summary>

```python
from zenml.model import ModelArtifactConfig
from zenml import ArtifactConfig

experiment_tracker = Client().active_stack.experiment_tracker
@step(experiment_tracker=experiment_tracker.name)
def model_trainer(
...
) -> Annotated[ClassifierMixin, "model", ModelArtifactConfig()]:
) -> Annotated[
ClassifierMixin, ArtifactConfig(name="model", is_model_artifact=True)
]:
...
```
</details>
Expand Down Expand Up @@ -308,7 +310,7 @@ You can follow [Data Validators docs](https://docs.zenml.io/stacks-and-component

As a last step concluding all work done so far, we will calculate predictions on the inference dataset and persist them in [Artifact Store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores) attached to the current inference model version of the Model Control Plane for reuse and observability.

We will leverage a prepared predictions service called `mlflow_deployment` linked to the inference model version of the Model Control Plane to run `.predict()` and to put predictions as an output of the predictions step, so it is automatically stored in the [Artifact Store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores) and linked to the Model Control Plane model version as a versioned artifact link with zero effort. This is achieved because we additionally annotated the `predictions` output with `DataArtifactConfig(overwrite=False)`. This is required to deliver a comprehensive history to stakeholders since Batch Inference can be executed using the same Model Control Plane version multiple times.
We will leverage a prepared predictions service called `mlflow_deployment` linked to the inference model version of the Model Control Plane to run `.predict()` and to put predictions as an output of the predictions step, so it is automatically stored in the [Artifact Store](https://docs.zenml.io/stacks-and-components/component-guide/artifact-stores) and linked to the Model Control Plane model version as a versioned artifact link with zero effort.

```
NOTE: On non-local orchestrators a `model` artifact will be loaded into memory to run predictions directly. You can adapt this part to your needs.
Expand All @@ -318,12 +320,10 @@ NOTE: On non-local orchestrators a `model` artifact will be loaded into memory t
<summary>Code snippet 💻</summary>

```python
from zenml.model import DataArtifactConfig

@step
def inference_predict(
dataset_inf: pd.DataFrame,
) -> Annotated[pd.Series, "predictions", DataArtifactConfig(overwrite=False)]:
) -> Annotated[pd.Series, "predictions"]:
model_version = get_step_context().model_version

# get predictor
Expand Down
16 changes: 4 additions & 12 deletions template/pipelines/batch_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
notify_on_failure,
notify_on_success,
)
from zenml import pipeline
from zenml.artifacts.external_artifact import ExternalArtifact
from zenml import ExternalArtifact, pipeline
from zenml.integrations.evidently.metrics import EvidentlyMetricConfig
from zenml.integrations.evidently.steps import evidently_report_step
from zenml.logger import get_logger
Expand All @@ -32,26 +31,19 @@ def {{product_name}}_batch_inference():
# of one step as the input of the next step.
########## ETL stage ##########
df_inference, target, _ = data_loader(
random_state=ExternalArtifact(
model_artifact_pipeline_name="{{product_name}}_training",
model_artifact_name="random_state",
),
random_state=ExternalArtifact(name="random_state"),
is_inference=True
)
df_inference = inference_data_preprocessor(
dataset_inf=df_inference,
preprocess_pipeline=ExternalArtifact(
model_artifact_name="preprocess_pipeline",
),
preprocess_pipeline=ExternalArtifact(name="preprocess_pipeline"),
target=target,
)

{%- if data_quality_checks %}
########## DataQuality stage ##########
report, _ = evidently_report_step(
reference_dataset=ExternalArtifact(
model_artifact_name="dataset_trn",
),
reference_dataset=ExternalArtifact(name="dataset_trn"),
comparison_dataset=df_inference,
ignored_cols=["target"],
metrics=[
Expand Down
13 changes: 1 addition & 12 deletions template/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os
from typing import Optional

from zenml.artifacts.external_artifact import ExternalArtifact
from zenml.client import Client
from zenml.logger import get_logger

from pipelines import {{product_name}}_batch_inference, {{product_name}}_training, {{product_name}}_deployment
Expand Down Expand Up @@ -192,17 +192,6 @@ def main(
] = f"{{product_name}}_batch_inference_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"
{{product_name}}_batch_inference.with_options(**pipeline_args)(**run_args_inference)

artifact = ExternalArtifact(
model_artifact_name="predictions",
model_name="{{ product_name }}",
model_version="{{ target_environment }}",
model_artifact_version=None, # can be skipped - using latest artifact link
)
logger.info(
"Batch inference pipeline finished successfully! "
"You can find predictions in Artifact Store using ID: "
f"`{str(artifact.get_artifact_id())}`."
)


if __name__ == "__main__":
Expand Down
6 changes: 2 additions & 4 deletions template/steps/deployment/deployment_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
from typing import Optional

from typing_extensions import Annotated
from zenml import get_step_context, step
from zenml import ArtifactConfig, get_step_context, step
from zenml.client import Client
from zenml.integrations.mlflow.services.mlflow_deployment import MLFlowDeploymentService
from zenml.integrations.mlflow.steps.mlflow_deployer import (
mlflow_model_registry_deployer_step,
)
from zenml.logger import get_logger
from zenml.model import EndpointArtifactConfig

logger = get_logger(__name__)

Expand All @@ -20,8 +19,7 @@
def deployment_deploy() -> (
Annotated[
Optional[MLFlowDeploymentService],
"mlflow_deployment",
EndpointArtifactConfig(),
ArtifactConfig(name="mlflow_deployment", is_endpoint_artifact=True),
]
):
"""Predictions step.
Expand Down
7 changes: 1 addition & 6 deletions template/steps/etl/inference_data_preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,14 @@
import pandas as pd
from sklearn.pipeline import Pipeline
from zenml import step
from zenml.model import DataArtifactConfig


@step
def inference_data_preprocessor(
dataset_inf: pd.DataFrame,
preprocess_pipeline: Pipeline,
target: str,
) -> Annotated[
pd.DataFrame,
"dataset_inf",
DataArtifactConfig(overwrite=False, artifact_name="inference_dataset"),
]:
) -> Annotated[pd.DataFrame, "inference_dataset"]:
"""Data preprocessor step.
This is an example of a data processor step that prepares the data so that
Expand Down
11 changes: 5 additions & 6 deletions template/steps/inference/inference_predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@
from zenml import get_step_context, step
from zenml.integrations.mlflow.services.mlflow_deployment import MLFlowDeploymentService
from zenml.logger import get_logger
from zenml.model import DataArtifactConfig

logger = get_logger(__name__)


@step
def inference_predict(
dataset_inf: pd.DataFrame,
) -> Annotated[pd.Series, "predictions", DataArtifactConfig(overwrite=False)]:
) -> Annotated[pd.Series, "predictions"]:
"""Predictions step.
This is an example of a predictions step that takes the data in and returns
Expand All @@ -39,9 +38,9 @@ def inference_predict(
model_version = get_step_context().model_version

# get predictor
predictor_service: Optional[MLFlowDeploymentService] = model_version.get_endpoint_artifact(
"mlflow_deployment"
).load()
predictor_service: Optional[
MLFlowDeploymentService
] = model_version.load_artifact("mlflow_deployment")
if predictor_service is not None:
# run prediction from service
predictions = predictor_service.predict(request=dataset_inf)
Expand All @@ -51,7 +50,7 @@ def inference_predict(
"as the orchestrator is not local."
)
# run prediction from memory
predictor = model_version.get_model_artifact("model").load()
predictor = model_version.load_artifact("model")
predictions = predictor.predict(dataset_inf)

predictions = pd.Series(predictions, name="predicted")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ from typing_extensions import Annotated
import pandas as pd
from sklearn.metrics import accuracy_score
from zenml import step, get_step_context
from zenml.model import ModelVersion
from zenml.model.model_version import ModelVersion
from zenml.logger import get_logger

logger = get_logger(__name__)
Expand Down Expand Up @@ -54,8 +54,8 @@ def compute_performance_metrics_on_current_data(
else:
# Get predictors
predictors = {
latest_version_number: latest_version.get_model_artifact("model").load(),
current_version_number: current_version.get_model_artifact("model").load(),
latest_version_number: latest_version.load_artifact("model"),
current_version_number: current_version.load_artifact("model"),
}

metrics = {}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# {% include 'template/license_header' %}

from zenml import get_step_context, step
from zenml.model import ModelVersion
from zenml.model.model_version import ModelVersion
from zenml.logger import get_logger

from utils import promote_in_model_registry
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# {% include 'template/license_header' %}

from zenml import get_step_context, step
from zenml.model import ModelVersion
from zenml.model.model_version import ModelVersion
from zenml.logger import get_logger

from utils import promote_in_model_registry
Expand Down
21 changes: 12 additions & 9 deletions template/steps/training/model_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
import mlflow
import pandas as pd
from sklearn.base import ClassifierMixin
from zenml import log_artifact_metadata, step
from zenml import ArtifactConfig, log_artifact_metadata, step
from zenml.client import Client
from zenml.integrations.mlflow.experiment_trackers import MLFlowExperimentTracker
from zenml.integrations.mlflow.steps.mlflow_registry import mlflow_register_model_step
from zenml.logger import get_logger
from zenml.model import ModelArtifactConfig

logger = get_logger(__name__)

Expand All @@ -31,7 +30,9 @@ def model_trainer(
model: ClassifierMixin,
target: str,
name: str,
) -> Annotated[ClassifierMixin, "model", ModelArtifactConfig()]:
) -> Annotated[
ClassifierMixin, ArtifactConfig(name="model", is_model_artifact=True)
]:
"""Configure and train a model on the training dataset.
This is an example of a model training step that takes in a dataset artifact
Expand Down Expand Up @@ -79,12 +80,14 @@ def model_trainer(
name=name,
)
# keep track of mlflow version for future use
log_artifact_metadata(
output_name="model",
model_registry_version=Client()
.active_stack.model_registry.list_model_versions(name=name)[-1]
.version,
)
model_registry = Client().active_stack.model_registry
if model_registry:
versions = model_registry.list_model_versions(name=name)
if versions:
log_artifact_metadata(
metadata={"model_registry_version": versions[-1].version},
artifact_name="model",
)
### YOUR CODE ENDS HERE ###

return model
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ def hp_tuning_select_best_model(
best_metric = -1
# consume artifacts attached to current model version in Model Control Plane
for step_name in step_names:
hp_output = model_version.get_data_artifact(
step_name=step_name, name="hp_result"
)
hp_output = model_version.get_data_artifact("hp_result")
model: ClassifierMixin = hp_output.load()
# fetch metadata we attached earlier
metric = float(hp_output.run_metadata["metric"].value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ def hp_tuning_single_search(
score = accuracy_score(y_tst, y_pred)
# log score along with output artifact as metadata
log_artifact_metadata(
output_name="hp_result",
metric=float(score),
metadata={"metric": float(score)},
artifact_name="hp_result",
)
### YOUR CODE ENDS HERE ###
return cv.best_estimator_

0 comments on commit e97f7e5

Please sign in to comment.