# ZenML: Open-source MLOps Framework for reproducible ML pipelines

![ZenML logo](_assets/Logo/zenml.svg)

In [1]:
from absl import logging as absl_logging
import warnings
warnings.filterwarnings('ignore')
%load_ext autoreload
%autoreload 2
absl_logging.set_verbosity(-10000)

# Initialize ZenML

Let's begin by initializing ZenML in our directory. For simplicity, we start with a local stack and transition to other stacks later. The following cell does this directly from the notebook.

In [9]:
!rm -rf .zen
!zenml init
!zenml stack set default

[?25l[2;36mZenML repository initialized at [0m[2;35m/Users/strickvl/coding/zenml/repos/[0m[2;95mzenbytes.[0m
[2;32m⠋[0m[2;36m [0m[2;36mInitializing ZenML repository at /Users/strickvl/coding/zenml/repos/zenbytes.[0m
[2K[1A[2K[32m⠋[0m Initializing ZenML repository at /Users/strickvl/coding/zenml/repos/zenbytes.

[1A[2K[1A[2K[2;36mThe local active profile was initialized to [0m[2;32m'default'[0m[2;36m and the local active stack[0m
[2;36mto [0m[2;32m'default'[0m[2;36m. This local configuration will only take effect when you're running[0m
[2;36mZenML from the initialized repository root, or from a subdirectory. For more [0m
[2;36minformation on profile and stack configuration, please visit [0m
[2;4;94mhttps://docs.zenml.io.[0m
[2;36mRunning with active profile: [0m[2;32m'default'[0m[2;36m [0m[1;2;36m([0m[2;36mlocal[0m[1;2;36m)[0m
[?25l[2;36mActive stack set to: [0m[2;32m'default'[0m
[2K[32m⠋[0m Setting the active stack to 'default

# Install integrations

ZenML manages the dependencies of its integrations itself to avoid version conflicts, so use the following command to install the integrations required for this lesson.

In [None]:
!zenml integration install sklearn dash evidently mlflow feast -f

# Create a simple training pipeline

We will start by looking at the definition of a pipeline that we want to build. This will give an overview of what we want to achieve and how we plan on getting there. We'll dive into the details on some of the interesting steps after that.

![Pipeline1](_assets/chapter_1/first_pipeline.png "Pipeline")

# Set up the Feast feature store

In [10]:
!redis-server --daemonize yes

In [11]:
!ps aux | grep redis-server

strickvl         38901   3.9  0.0 34415488   1720 s002  Ss+   2:45pm   0:00.01 /bin/zsh -c ps aux | grep redis-server
strickvl         38897   0.2  0.0 35417268   2240   ??  Ss    2:45pm   0:00.02 redis-server *:6379  
strickvl         38903   0.0  0.0 34263116    992 s002  S+    2:45pm   0:00.00 grep redis-server


In [12]:
!zenml feature-store register feast_store -t feast --feast_repo="./feature_importer_repo"

[2;36mRunning with active profile: [0m[2;32m'default'[0m[2;36m [0m[1;2;36m([0m[2;36mlocal[0m[1;2;36m)[0m
[1;35mRegistered stack component with type 'feature_store' and name 'feast_store'.[0m
[2;36mSuccessfully registered feature store `feast_store`.[0m


In [13]:
# register a stack that reuses the default metadata store (-m), orchestrator
# (-o) and artifact store (-a) and adds the Feast feature store (-f)
!zenml stack register fs_stack -m default -o default -a default -f feast_store

[2;36mRunning with active profile: [0m[2;32m'default'[0m[2;36m [0m[1;2;36m([0m[2;36mlocal[0m[1;2;36m)[0m
[?25l[32m⠋[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠙[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠹[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠸[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠼[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠦[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠇[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠏[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠋[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠙[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠹[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠸[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠼[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠴[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠧[0m Registering stack 'fs_stack'...
[2K[1A[2K[32m⠇[0m Registering stack 'fs_stack'...
[2K[1A

In [14]:
# activate the stack
!zenml stack set fs_stack

[2;36mRunning with active profile: [0m[2;32m'default'[0m[2;36m [0m[1;2;36m([0m[2;36mlocal[0m[1;2;36m)[0m
[?25l[2;36mActive stack set to: [0m[2;32m'fs_stack'[0m
[2K[32m⠋[0m Setting the active stack to 'fs_stack'...to 'fs_stack'...[0m
[1A[2K

In [15]:
# view the current active stack
!zenml stack describe

[2;36mRunning with active profile: [0m[2;32m'default'[0m[2;36m [0m[1;2;36m([0m[2;36mlocal[0m[1;2;36m)[0m
[3m        Stack Configuration        [0m
┏━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━┓
┃[1m [0m[1mCOMPONENT_TYPE[0m[1m [0m│[1m [0m[1mCOMPONENT_NAME[0m[1m [0m┃
┠────────────────┼────────────────┨
┃ ARTIFACT_STORE │ default        ┃
┠────────────────┼────────────────┨
┃ FEATURE_STORE  │ feast_store    ┃
┠────────────────┼────────────────┨
┃ METADATA_STORE │ default        ┃
┠────────────────┼────────────────┨
┃ ORCHESTRATOR   │ default        ┃
┗━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━┛
[2;3m     'fs_stack' stack (ACTIVE)     [0m


In [48]:
from typing import Tuple
import numpy as np
from sklearn.base import ClassifierMixin
from sklearn.datasets import load_digits, load_iris, load_diabetes
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from numpy.typing import NDArray

In [29]:
def get_digits() -> Tuple[
    "NDArray[np.float64]",
    "NDArray[np.float64]",
    "NDArray[np.int64]",
    "NDArray[np.int64]",
]:
    """Load scikit-learn's digits dataset and split it in half.

    Every image is flattened into a single feature row. The split is
    deterministic (``shuffle=False``): the leading samples form the training
    set and the remaining ones the test set.

    NOTE: a later cell imports ``get_digits`` from
    ``zenml.integrations.sklearn.helpers.digits``, which shadows this
    definition once that cell has run.

    Returns:
        Tuple of ``(X_train, X_test, y_train, y_test)`` numpy arrays.
    """
    bunch = load_digits()
    images = bunch.images
    # Collapse each 2-D image into one flat row of pixel intensities.
    features = images.reshape(images.shape[0], -1)
    splits = train_test_split(features, bunch.target, test_size=0.5, shuffle=False)
    return tuple(splits)


In [40]:
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

In [57]:
# Convert the iris data into a Parquet file: the four feature columns plus the
# integer class label. `pa.Table.from_arrays` expects one 1-D array per
# *column*, so the (n_samples, 4) feature matrix is transposed (and made
# contiguous) before being split into columns. Passing the 2-D matrix directly
# hands it one array per *row*, which does not match the five column names.
iris_bunch = load_iris()
feature_columns = [
    pa.array(column) for column in np.ascontiguousarray(iris_bunch["data"].T)
]
class_column = pa.array(iris_bunch["target"])

table = pa.Table.from_arrays(
    feature_columns + [class_column],
    names=['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'class'],
)

# Save it:
pq.write_table(table, 'table.pq')
table

In [60]:
# In notebook order this is the first use of pandas, so import it here.
# Otherwise "Restart & Run All" fails with a NameError on `pd`.
import pandas as pd

# Fetch the seaborn copy of the iris dataset (with string species labels) and
# store it locally as Parquet.
iris = pd.read_csv('https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv')
iris.to_parquet("iris.pq")

In [61]:
pd.read_parquet('iris.pq')

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,species
0,5.1,3.5,1.4,0.2,setosa
1,4.9,3.0,1.4,0.2,setosa
2,4.7,3.2,1.3,0.2,setosa
3,4.6,3.1,1.5,0.2,setosa
4,5.0,3.6,1.4,0.2,setosa
...,...,...,...,...,...
145,6.7,3.0,5.2,2.3,virginica
146,6.3,2.5,5.0,1.9,virginica
147,6.5,3.0,5.2,2.0,virginica
148,6.2,3.4,5.4,2.3,virginica


# Using Feast

- get the data from sklearn.datasets
- convert it all to a parquet file / format


- write the relevant code to ingest that where you define entities and featureviews
- `feast apply` from within the feature_importer_repo to load the data
- write / specify the entity_df and features that you'll pass in to the historical data store

## Define Steps
In the code that follows, we define the individual steps of our pipeline. Each step is a Python function decorated with `@step`, ZenML's main abstraction for creating pipeline steps.

In [47]:
import numpy as np
import pandas as pd
from sklearn.base import ClassifierMixin

from zenml.integrations.sklearn.helpers.digits import (
    get_digits,
)
from sklearn.base import ClassifierMixin
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from zenml.pipelines import pipeline
from zenml.steps import Output, step

The first step is an import step that loads the digits dataset and returns its train/test splits as four numpy arrays.

In [None]:
@step
def importer() -> Output(
    X_train=np.ndarray, X_test=np.ndarray, y_train=np.ndarray, y_test=np.ndarray
):
    """ZenML step that provides the train/test splits of the digits dataset.

    The four arrays are emitted as separate named outputs so downstream steps
    can consume each one individually.
    """
    train_features, test_features, train_labels, test_labels = get_digits()
    return train_features, test_features, train_labels, test_labels

We then add a Trainer step, that takes the imported data and trains a sklearn classifier on the data. Note that the model is not explicitly saved within the step. Under the hood ZenML uses Materializers to automatically persist the Artifacts that result from each step into the Artifact Store.

In [None]:
@step
def svc_trainer(
    X_train: np.ndarray,
    y_train: np.ndarray,
) -> ClassifierMixin:
    """Train a support-vector classifier on the digits training split.

    Args:
        X_train: Training feature matrix, one sample per row.
        y_train: Training labels.

    Returns:
        The fitted ``SVC`` model. It is not saved here explicitly: ZenML's
        materializers persist step outputs to the artifact store.
    """
    model = SVC(gamma=0.001)
    model.fit(X_train, y_train)
    return model

Finally, we add an Evaluator step that takes as input the test set and the trained model and evaluates some final metrics.

In [None]:
@step
def evaluator(
    X_test: np.ndarray,
    y_test: np.ndarray,
    model: ClassifierMixin,
) -> float:
    """Score a trained classifier on the held-out test split.

    Args:
        X_test: Test feature matrix.
        y_test: Test labels.
        model: Fitted classifier to evaluate.

    Returns:
        The value of ``model.score`` on the test split. It is also printed so
        it shows up in the step logs.
    """
    accuracy = model.score(X_test, y_test)
    print(f"Test accuracy: {accuracy}")
    return accuracy

## Run and visualize Pipeline
A pipeline is defined with the @pipeline decorator. This defines the various steps of the pipeline and specifies the dependencies between the steps, thereby determining the order in which they will be run.


In [None]:
@pipeline
def digits_pipeline(
    importer,
    trainer,
    evaluator,
):
    """Wire importer -> trainer -> evaluator into one training pipeline.

    The training split feeds the trainer. The test split, together with the
    trained model, feeds the evaluator.
    """
    train_x, test_x, train_y, test_y = importer()
    fitted_model = trainer(X_train=train_x, y_train=train_y)
    evaluator(X_test=test_x, y_test=test_y, model=fitted_model)

In [None]:
# Initialize the pipeline
first_pipeline = digits_pipeline(
    importer=importer(),
    trainer=svc_trainer(),
    evaluator=evaluator(),
)
first_pipeline.run()

We can use the lineage visualizer from the `dash` integration to see what just happened. It visualizes the pipeline run for us right in the browser.

In [None]:
from zenml.integrations.dash.visualizers.pipeline_run_lineage_visualizer import (
    PipelineRunLineageVisualizer,
)
from zenml.repository import Repository

# Look the pipeline up by name rather than taking the last entry of
# `get_pipelines()`, whose ordering is not guaranteed to put the
# just-executed pipeline last. Then render the lineage of its latest run.
repo = Repository()
p = repo.get_pipeline("digits_pipeline")
run = p.runs[-1]
PipelineRunLineageVisualizer().visualize(run)

# Add Drift Detection with Evidently

Evidently is an open source tool that allows you to easily compute drift on your data. [Here](https://blog.zenml.io/zenml-loves-evidently/) is a little blog post of ours that explains the evidently integration in a bit more detail. 

At its core, Evidently’s drift detection calculation functions take in a reference data set and compare it with a separate comparison dataset. These are both passed in as Pandas dataframes, though CSV inputs are also possible. ZenML implements this functionality in the form of several standardized steps along with an easy way to use the visualization tools also provided along with Evidently as ‘Dashboards’.


If you’re working on any kind of machine learning problem that has an ongoing training loop that takes in new data, you’ll want to guard against drift. Machine learning pipelines are built on top of data inputs, so it is worth checking for drift if you have a model that was trained on a certain distribution of data. The incoming data is something you have less control over and since things often change out in the real world, you should have a plan for knowing when things have shifted. Evidently offers a [growing set of features](https://github.com/evidentlyai/evidently) that help you monitor not only data drift but other key aspects like target drift and so on.

![Evidently](_assets/zenml+evidently.png "Evidently")

## Define a new pipeline with drift detector

![Pipeline2](_assets/chapter_1/second_pipeline.png "Pipeline")

In [None]:
@pipeline(enable_cache=False)
def digits_pipeline_with_drift(
    importer,
    trainer,
    evaluator,
    get_reference_data,
    drift_detector,
):
    """Training pipeline extended with an Evidently drift check.

    Besides train -> evaluate, the train and test splits are turned into
    reference/comparison DataFrames and passed to the drift detector.
    Caching is disabled for this pipeline (``enable_cache=False``).
    """
    train_x, test_x, train_y, test_y = importer()

    # Training branch.
    fitted_model = trainer(X_train=train_x, y_train=train_y)
    evaluator(X_test=test_x, y_test=test_y, model=fitted_model)

    # Drift-detection branch.
    reference_df, comparison_df = get_reference_data(train_x, test_x)
    drift_detector(reference_df, comparison_df)

## Import the standard evidently step

In [None]:
from zenml.integrations.evidently.steps import (
    EvidentlyProfileConfig,
    EvidentlyProfileStep,
)

In [None]:
@step
def get_reference_data(
    X_train: np.ndarray,
    X_test: np.ndarray,
) -> Output(reference=pd.DataFrame, comparison=pd.DataFrame):
    """Wrap the train/test splits as DataFrames for drift detection.

    The training split is the *reference* (baseline) distribution. The test
    split is the *comparison* data checked for drift against it. The original
    version returned them the other way round, mislabeling both outputs.

    Args:
        X_train: Training feature matrix; becomes the reference data.
        X_test: Test feature matrix; becomes the comparison data.

    Returns:
        ``(reference, comparison)`` DataFrames with string column names
        ``"0"`` .. ``str(n_features - 1)``.
    """
    columns = [str(index) for index in range(X_train.shape[1])]
    reference = pd.DataFrame(X_train, columns=columns)
    comparison = pd.DataFrame(X_test, columns=columns)
    return reference, comparison

## Run and visualize the pipeline

In [None]:
evidently_profile_config = EvidentlyProfileConfig(
    column_mapping=None,
    profile_sections=["datadrift"])

second_pipeline = digits_pipeline_with_drift(
    importer=importer(),
    trainer=svc_trainer(),
    evaluator=evaluator(),
    
    # EvidentlyProfileStep takes reference_dataset and comparison dataset
    get_reference_data=get_reference_data(),
    drift_detector=EvidentlyProfileStep(config=evidently_profile_config)
)
second_pipeline.run()

In [None]:
from zenml.integrations.evidently.visualizers import EvidentlyVisualizer
from zenml.repository import Repository
import json

repo = Repository()
p = repo.get_pipeline('digits_pipeline_with_drift')
last_run = p.runs[-1]

drift_detection_step = last_run.get_step(
    name="drift_detector"
)
evidently_outputs = drift_detection_step

EvidentlyVisualizer().visualize(evidently_outputs)

# Add alerts with Discord

MLOps promotes giving more visibility to your team about runs of pipelines. A good way to do that is to add a ChatOps step to your pipeline to ping some relevant results every time the pipeline is run. You can use a Discord webhook in your step for this.

![Discord](_assets/evidently+discord.png "Discord")

## Create an alerter step in your pipeline

![Pipeline3](_assets/chapter_1/third_pipeline.png "Pipeline")

In [None]:
@pipeline
def digits_pipeline_with_drift_alert(
    importer,
    trainer,
    evaluator,
    get_reference_data,
    drift_detector,
    alerter,
):
    """Drift-checking training pipeline that also reports the drift result.

    The first output of the drift detector (the drift report) is handed to
    the alerter step.
    """
    train_x, test_x, train_y, test_y = importer()

    # Training branch.
    fitted_model = trainer(X_train=train_x, y_train=train_y)
    evaluator(X_test=test_x, y_test=test_y, model=fitted_model)

    # Drift detection; the detector's second output is not needed here.
    reference_df, comparison_df = get_reference_data(train_x, test_x)
    report, _ = drift_detector(reference_df, comparison_df)

    # Alerting.
    alerter(report)

## Define a discord alerter step

In [None]:
import requests
from zenml.steps import step
from zenml.environment import Environment

import os

# Webhook the alerter posts to. By default this is a private ZenML Discord
# channel: we will get notified if you use it, but you won't be able to see
# the messages. To receive the alerts yourself, create your own Discord
# [webhook](https://support.discord.com/hc/en-us/articles/228383668-Intro-to-Webhooks)
# and export it as the DISCORD_WEBHOOK_URL environment variable. Webhook URLs
# act as credentials, so prefer the environment variable to editing the URL
# below.
DISCORD_URL = os.environ.get(
    "DISCORD_WEBHOOK_URL",
    "https://discord.com/api/webhooks/935835443826659339/Q32jTwmqc"
    "GJAUr-r_J3ouO-zkNQPchJHqTuwJ7dK4wiFzawT2Gu97f6ACt58UKFCxEO9",
)

# Seconds to wait for Discord before giving up, so an unreachable webhook
# cannot hang the pipeline run indefinitely.
DISCORD_TIMEOUT_SECONDS = 10


@step(enable_cache=False)
def discord_alert(
    drift_report: dict
) -> None:
    """Post a drift summary for the current pipeline run to Discord.

    Sending the alert is best-effort. Network and HTTP failures are printed
    rather than raised, so a Discord problem does not fail the pipeline.

    Args:
        drift_report: Evidently profile dict; the dataset-level drift flag is
            read from ``["data_drift"]["data"]["metrics"]["dataset_drift"]``.
    """
    drift = drift_report["data_drift"]["data"]["metrics"]["dataset_drift"]

    # Identify which pipeline, run and step produced this message.
    env = Environment().step_environment
    content = f"Message from pipeline: **{env.pipeline_name}**, run: **{env.pipeline_run_id}**, step: **{env.step_name}**"
    content += "\n\n"
    content += "Drift Detected!" if drift else "No Drift Detected!"

    data = {
        "content": content,
        "username": "Drift Bot",
    }

    try:
        result = requests.post(
            DISCORD_URL, json=data, timeout=DISCORD_TIMEOUT_SECONDS
        )
        result.raise_for_status()
    except requests.exceptions.RequestException as err:
        # Covers HTTP error statuses as well as connection errors/timeouts.
        print(err)
    else:
        print(
            "Posted to discord successfully, code {}.".format(
                result.status_code
            )
        )
    print("Drift detected" if drift else "No Drift detected")

In [None]:
evidently_profile_config = EvidentlyProfileConfig(
    column_mapping=None,
    profile_sections=["datadrift"])

third_pipeline = digits_pipeline_with_drift_alert(
    importer=importer(),
    trainer=svc_trainer(),
    evaluator=evaluator(),
    
    # EvidentlyProfileStep takes reference_dataset and comparison dataset
    get_reference_data=get_reference_data(),
    drift_detector=EvidentlyProfileStep(config=evidently_profile_config),
    
    # Add discord
    alerter=discord_alert()
)
third_pipeline.run()

# Track experiments and parameters with MLflow

For this pipeline, we take you a step further by showing you some more integrations. We will use MLflow Tracking to visualize and compare multiple pipeline runs.

![MLflow](_assets/evidently+discord+mlflow.png "MLflow")

## Create a trainer with mlflow logging

Now that we have mlflow enabled we need to choose what we want to log into mlflow. For now, we have chosen to use the [mlflow autolog](https://www.mlflow.org/docs/latest/tracking.html#scikit-learn) functionality to automatically log the model and training parameters within the training step.


<div class="alert alert-block alert-info">
    <b>Note:</b> The @enable_mlflow decorator above the step is all we need to get started with mlflow. This decorator sets up an mlflow experiment and an mlflow backend for all runs within this pipeline. 
</div>

![Pipeline4](_assets/chapter_1/fourth_pipeline.png "Pipeline")

In [None]:
from zenml.integrations.mlflow.mlflow_step_decorator import enable_mlflow
import mlflow


@enable_mlflow
@step(enable_cache=False)
def svc_trainer_mlflow(
    X_train: np.ndarray,
    y_train: np.ndarray,
) -> ClassifierMixin:
    """Fit an SVC on the digits training split with MLflow autologging on.

    ``mlflow.sklearn.autolog()`` is switched on before fitting, inside the
    MLflow setup provided by the ``@enable_mlflow`` decorator, so the
    training run is logged to MLflow.
    """
    mlflow.sklearn.autolog()
    classifier = SVC(gamma=0.001)
    classifier.fit(X_train, y_train)
    return classifier

In [None]:
fourth_pipeline = digits_pipeline_with_drift_alert(
    importer=importer(),
    trainer=svc_trainer_mlflow(),
    evaluator=evaluator(),
    
    # EvidentlyProfileStep takes reference_dataset and comparison dataset
    get_reference_data=get_reference_data(),
    drift_detector=EvidentlyProfileStep(config=evidently_profile_config),
    
    # Add discord
    alerter=discord_alert()
)
fourth_pipeline.run()

## Use native mlflow features

Training is done! Let's open the MLflow UI and check that the training run, including the model, was logged there.

In [None]:
# This will start a serving process for mlflow 
#  - if you want to continue in the notebook you need to manually
#  interrupt the kernel 
from zenml.environment import Environment
from zenml.integrations.mlflow.mlflow_environment import MLFLOW_ENVIRONMENT_NAME

!mlflow ui --backend-store-uri {Environment()[MLFLOW_ENVIRONMENT_NAME].tracking_uri} --port 4998

Environment()[MLFLOW_ENVIRONMENT_NAME].tracking_uri

## Create another trainer with a different model

![Pipeline1](_assets/chapter_1/fixth_pipeline.png "Pipeline")

In [None]:
@enable_mlflow
@step(enable_cache=False)
def tree_trainer_with_mlflow(
    X_train: np.ndarray,
    y_train: np.ndarray,
) -> ClassifierMixin:
    """Fit a decision tree on the digits training split with MLflow autologging.

    Args:
        X_train: Training feature matrix.
        y_train: Training labels.

    Returns:
        The fitted ``DecisionTreeClassifier``.
    """
    mlflow.sklearn.autolog()
    # Seed the tree: scikit-learn randomly permutes the features at each
    # split, so an unseeded tree can differ between otherwise identical runs.
    # That would make run-to-run comparisons in MLflow unreliable.
    model = DecisionTreeClassifier(random_state=42)
    model.fit(X_train, y_train)
    return model

In [None]:
fifth_pipeline = digits_pipeline_with_drift_alert(
    importer=importer(),
    trainer=tree_trainer_with_mlflow(),
    evaluator=evaluator(),
    
    # EvidentlyProfileStep takes reference_dataset and comparison dataset
    get_reference_data=get_reference_data(),
    drift_detector=EvidentlyProfileStep(config=evidently_profile_config),
    
    # Add discord
    alerter=discord_alert()
)
fifth_pipeline.run()

## Compare Models

In [None]:
# This will start a serving process for mlflow 
#  - if you want to continue in the notebook you need to manually
#  interrupt the kernel 
from zenml.environment import Environment
from zenml.integrations.mlflow.mlflow_environment import MLFLOW_ENVIRONMENT_NAME

!mlflow ui --backend-store-uri {Environment()[MLFLOW_ENVIRONMENT_NAME].tracking_uri} --port 4998

# Add Continuous Deployment to your ML pipeline

![Pipeline5](_assets/chapter_1/sixth_pipeline.png "Pipeline")

In [None]:
@pipeline(enable_cache=False)
def continuous_deployment_pipeline_notebook(
    importer,
    trainer,
    evaluator,
    get_reference_data,
    drift_detector,
    alerter,
    deployment_trigger,
    model_deployer,
):
    """Train, check drift, alert, and conditionally deploy the model.

    The drift report is used twice: it is sent to the alerter, and the
    deployment trigger turns it into the deploy/no-deploy decision consumed
    by the model deployer.
    """
    train_x, test_x, train_y, test_y = importer()

    # Training branch.
    fitted_model = trainer(X_train=train_x, y_train=train_y)
    evaluator(X_test=test_x, y_test=test_y, model=fitted_model)

    # Drift detection and alerting.
    reference_df, comparison_df = get_reference_data(train_x, test_x)
    report, _ = drift_detector(reference_df, comparison_df)
    alerter(report)

    # Continuous deployment gated on the drift report.
    should_deploy = deployment_trigger(report)
    model_deployer(should_deploy)

## Define deployment trigger and deployer step

In [None]:
@step
def deployment_trigger(
    drift_report: dict,
) -> bool:
    """Decide whether the freshly trained model may be deployed.

    Deployment is approved only when Evidently reports no dataset-level drift.

    Args:
        drift_report: Evidently profile dict produced by the drift detector.

    Returns:
        ``True`` to deploy (no drift detected), ``False`` otherwise.
    """
    dataset_drift = drift_report["data_drift"]["data"]["metrics"]["dataset_drift"]
    return not dataset_drift

In [None]:
from zenml.integrations.mlflow.steps import mlflow_deployer_step, MLFlowDeployerConfig
from zenml.services import load_last_service_from_step

model_deployer_ml = mlflow_deployer_step(name="model_deployer")


sixth_pipeline = continuous_deployment_pipeline_notebook(
    importer=importer(),
    trainer=tree_trainer_with_mlflow(),
    evaluator=evaluator(),
    
    # EvidentlyProfileStep takes reference_dataset and comparison dataset
    get_reference_data=get_reference_data(),
    drift_detector=EvidentlyProfileStep(config=evidently_profile_config),
    
    # Add discord
    alerter=discord_alert(),
    
    deployment_trigger=deployment_trigger(),
    model_deployer=model_deployer_ml(config=MLFlowDeployerConfig(workers=1)),
)
sixth_pipeline.run()

## Interact with deployed model service directly

In [None]:
from zenml.repository import Repository

# Fetch the test split produced by the latest continuous-deployment run, so
# we can send real samples to the deployed model. The importer step is looked
# up by name rather than by position in `steps`, which would silently pick
# the wrong step if the step order ever changed.
repo = Repository()
p = repo.get_pipeline('continuous_deployment_pipeline_notebook')
last_run = p.runs[-1]
importer_step = last_run.get_step(name="importer")
X_test = importer_step.outputs['X_test'].read()
y_test = importer_step.outputs['y_test'].read()

In [None]:
service = load_last_service_from_step(
    pipeline_name="continuous_deployment_pipeline_notebook",
    step_name="model_deployer",
    running=True,
)

In [None]:
service.check_status()

In [None]:
X_test[0], y_test[0]

In [None]:
service.predict(X_test[0:1])

In [None]:
y_test[0]

In [None]:
!pip install matplotlib

In [None]:
# Standard scientific Python imports
import matplotlib.pyplot as plt

# Show the first test sample as the 8x8 grayscale image it was flattened
# from, titled with its true label, for comparison with the prediction above.
fig, ax = plt.subplots()
ax.imshow(X_test[0].reshape(8, 8), cmap=plt.cm.gray_r, interpolation="nearest")
ax.set_title(f"True label: {y_test[0]}")
ax.set_axis_off()
plt.show()

# Create a Continuous Inference pipeline

## Fetch data in real time

![MLflow](_assets/ZenML0-6-2.gif "ZenML")

In [None]:
def get_data_from_api():
    """Mock API call that returns one new sample to run inference on.

    Returns:
        A ``(1, 64)`` float array: a single row in the same flattened layout
        as the training features.
    """
    # The sample written as the 8x8 pixel grid it represents, then flattened
    # into one row of 64 intensities.
    pixel_grid = np.array(
        [
            [0, 0, 1, 11, 14, 15, 3, 0],
            [0, 1, 13, 16, 12, 16, 8, 0],
            [0, 8, 16, 4, 6, 16, 5, 0],
            [0, 5, 15, 11, 13, 14, 0, 0],
            [0, 0, 2, 12, 16, 13, 0, 0],
            [0, 0, 0, 13, 16, 16, 6, 0],
            [0, 0, 0, 16, 16, 16, 7, 0],
            [0, 0, 0, 11, 13, 12, 1, 0],
        ],
        dtype=float,
    )
    return pixel_grid.reshape(1, -1)


@step(enable_cache=False)
def dynamic_importer() -> Output(data=np.ndarray):
    """ZenML step that fetches the newest batch of data from the mock API.

    Caching is disabled for this step (``enable_cache=False``) so the data is
    fetched again on every run.
    """
    return get_data_from_api()


## Load the latest deployed model and run inference on it

In [None]:
from zenml.steps import BaseStepConfig, StepContext
from zenml.integrations.mlflow.services import MLFlowDeploymentService

class MLFlowDeploymentLoaderStepConfig(BaseStepConfig):
    """Settings for locating an already-deployed MLflow prediction server.

    Attributes:
        pipeline_name: Name of the pipeline that deployed the MLflow
            prediction server.
        step_name: Name of the step within that pipeline that deployed the
            server.
        running: When True (the default), the loader step only returns a
            service that is currently running.
    """

    # Which pipeline/step to look in for the deployed service.
    pipeline_name: str
    step_name: str
    # Restrict the lookup to running services.
    running: bool = True


@step(enable_cache=False)
def prediction_service_loader(
    config: MLFlowDeploymentLoaderStepConfig, context: StepContext
) -> MLFlowDeploymentService:
    """Return the MLflow prediction service started by the deployment pipeline.

    Uses ``load_last_service_from_step`` to find the service deployed by
    ``config.step_name`` in ``config.pipeline_name``, honoring
    ``config.running``.

    Raises:
        RuntimeError: If no matching prediction service is found.
    """
    deployed_service = load_last_service_from_step(
        pipeline_name=config.pipeline_name,
        step_name=config.step_name,
        step_context=context,
        running=config.running,
    )
    if deployed_service:
        return deployed_service

    raise RuntimeError(
        f"No MLflow prediction service deployed by the "
        f"{config.step_name} step in the {config.pipeline_name} pipeline "
        f"is currently running."
    )



@step
def predictor(
    service: MLFlowDeploymentService,
    data: np.ndarray,
) -> Output(predictions=list):
    """Run an inference request against the deployed prediction service.

    Args:
        service: MLflow deployment service to query; it is started first if
            it is not already running.
        data: Feature rows to predict on, one sample per row.

    Returns:
        One predicted class label per input row.
    """
    service.start(timeout=10)  # should be a NOP if already started
    raw_prediction = np.asarray(service.predict(data))
    # Scikit-learn classifiers (such as the SVC / decision tree deployed by
    # this notebook) predict one class label per row, i.e. a 1-D array, and
    # those labels are the answer. `argmax` only makes sense for a 2-D array
    # of per-class scores. On a 1-D label array it would return the *position*
    # of the largest label (always 0 for a single sample) instead.
    if raw_prediction.ndim > 1:
        raw_prediction = raw_prediction.argmax(axis=-1)
    predictions = raw_prediction.tolist()
    print(f"Prediction is: {predictions}")
    return predictions

## Define the continuous inference (CI) pipeline

In [None]:
@pipeline
def inference_pipeline(
    dynamic_importer,
    prediction_service_loader,
    predictor,
):
    """Continuous inference: fetch new data and score it with the deployed model."""
    new_batch = dynamic_importer()
    deployed_service = prediction_service_loader()
    predictor(deployed_service, new_batch)

In [None]:
# Initialize an inference pipeline run
inference = inference_pipeline(
    dynamic_importer=dynamic_importer(),
    prediction_service_loader=prediction_service_loader(
        MLFlowDeploymentLoaderStepConfig(
            pipeline_name="continuous_deployment_pipeline_notebook",
            step_name="model_deployer",
        )
    ),
    predictor=predictor(),
)

inference.run()