# ZenML: Create production-ready ML pipelines

Our goal here is to help you get your first practical experience with our tool and to give you a brief overview of some basic ZenML functionality. We will start locally in this Jupyter notebook and then transition to a more robust environment with Kubeflow Pipelines.

This guide is designed to provide a practical introduction to moving from a local setup to a more production-ready MLOps stack. If you want more detail, our [full documentation](https://docs.zenml.io/) explains the concepts in depth and shows how to implement them.

![zenml](assets/zenml.svg)

## Install libraries

In [None]:
# Install ZenML, then the Kubeflow, TensorFlow and TensorBoard integrations
!pip install zenml
!zenml integration install kubeflow tensorflow tensorboard

Once the installation is complete, you can create your first ZenML repository for your project. ZenML repositories are built on top of Git repositories; create yours by running the following command in an empty directory of your choice:

In [None]:
# Initialize a ZenML repository
!zenml init

# Start with the local stack

The commands above have automatically created a local MLOps stack for you and set it as the active stack. Let's make sure:

In [None]:
# Activate the default local stack (`zenml stack list` shows the registered stacks)
!zenml stack set default

![localstack.png](assets/localstack.png)

## Create your first pipeline with the local stack

First, let's import the libraries we need.

In [None]:
import logging
import os

import numpy as np
import tensorflow as tf

from zenml.integrations.constants import TENSORFLOW
from zenml.pipelines import pipeline
from zenml.steps import BaseParameters, Output, StepContext, step

## Define ZenML Steps

In the code that follows, we define the steps of our pipeline. Each step is a Python function decorated with `@step`, which is currently the main abstraction for creating pipeline steps.

The first step is an `importer` step that downloads the MNIST dataset through Keras and returns its training and test splits as separate artifacts.

In [None]:
@step
def importer() -> Output(
    X_train=np.ndarray,
    X_test=np.ndarray,
    y_train=np.ndarray,
    y_test=np.ndarray,
):
    """Download the MNIST data and store it as artifacts."""
    (X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()
    return X_train, X_test, y_train, y_test

Then we add a `normalizer` step that standardizes the training and test images by subtracting the mean and dividing by the standard deviation of each array.

In [None]:
@step
def normalizer(
    X_train: np.ndarray, X_test: np.ndarray
) -> Output(X_train_normed=np.ndarray, X_test_normed=np.ndarray):
    """Normalize digits dataset with mean and standard deviation."""
    X_train_normed = (X_train - np.mean(X_train)) / np.std(X_train)
    X_test_normed = (X_test - np.mean(X_test)) / np.std(X_test)
    return X_train_normed, X_test_normed
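The `normalizer` step standardizes each array with its own mean and population standard deviation, so the result has mean 0 and standard deviation 1. The sketch below reproduces that arithmetic on a plain Python list using only the standard library, so it runs without NumPy; `statistics.pstdev` divides by N, matching `np.std`'s default. The helper name `z_score` and the toy pixel values are illustrative assumptions, not part of ZenML.

```python
import statistics


def z_score(values: list[float]) -> list[float]:
    """Standardize values to zero mean and unit (population) standard deviation."""
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)  # population std, like np.std with ddof=0
    return [(v - mean) / std for v in values]


# Toy "image" with MNIST-style pixel intensities in the range 0-255.
pixels = [0.0, 0.0, 128.0, 255.0]
normed = z_score(pixels)
print([round(v, 3) for v in normed])
print(statistics.fmean(normed), statistics.pstdev(normed))  # approximately 0 and 1
```

Note that the step normalizes the test set with the test set's own statistics; a common alternative is to reuse the training-set mean and standard deviation for both splits.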

We then add a `trainer` step that takes the normalized training data and trains a Keras model on it. The step has an associated `TrainerParameters` class, a subclass of `BaseParameters`, that holds its configurable parameters: the number of epochs and the learning rate. Also note how we use the `StepContext` to get the URI of the step's output artifact in the Artifact Store; the TensorBoard logs are written to a `logs` subdirectory of it.

In [None]:
class TrainerParameters(BaseParameters):
    """Trainer params"""

    epochs: int = 1
    lr: float = 0.001

@step
def trainer(
    X_train: np.ndarray,
    y_train: np.ndarray,
    context: StepContext,
    params: TrainerParameters,
) -> tf.keras.Model:
    """Train a neural net from scratch to recognize MNIST digits and return
    the trained model."""
    model = tf.keras.Sequential(
        [
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(10, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )

    log_dir = os.path.join(context.get_output_artifact_uri(), "logs")
    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=log_dir, histogram_freq=1
    )

    model.compile(
        optimizer=tf.keras.optimizers.Adam(params.lr),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=["accuracy"],
    )

    model.fit(
        X_train,
        y_train,
        epochs=params.epochs,
        callbacks=[tensorboard_callback],
    )

    return model
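The final `Dense(10)` layer has no activation, so the model outputs raw scores (logits), which is why the loss is built with `from_logits=True`. For a single example, sparse categorical cross-entropy is the negative log of the softmax probability assigned to the true class index; Keras then averages this value over the batch by default. The standard-library sketch below computes the per-example value with a numerically stable log-softmax. It illustrates the math only and is not Keras's implementation; the function names and logits are made up for illustration.

```python
import math


def log_softmax(logits: list[float]) -> list[float]:
    """Numerically stable log-softmax: subtract the max logit before exponentiating."""
    m = max(logits)
    log_sum = m + math.log(sum(math.exp(z - m) for z in logits))
    return [z - log_sum for z in logits]


def sparse_categorical_crossentropy(logits: list[float], label: int) -> float:
    """Cross-entropy for one example whose true class is given as an integer index."""
    return -log_softmax(logits)[label]


# Ten logits, one per digit class; the model favours class 3.
logits = [0.1, -1.2, 0.3, 2.5, 0.0, -0.4, 0.2, 0.1, -0.3, 0.0]
print(sparse_categorical_crossentropy(logits, label=3))  # small loss: confident and correct
print(sparse_categorical_crossentropy(logits, label=1))  # large loss: true class scored low
```

Working with logits and a fused log-softmax avoids the overflow and loss of precision that can occur when softmax probabilities are computed first and their logarithm taken afterwards.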

Finally, we add an `evaluator` step that measures the model's accuracy on the test set.

In [None]:
@step
def evaluator(
    X_test: np.ndarray,
    y_test: np.ndarray,
    model: tf.keras.Model,
) -> float:
    """Calculate the accuracy on the test set"""

    _, test_acc = model.evaluate(X_test, y_test, verbose=2)
    logging.info(f"Test accuracy: {test_acc}")
    return test_acc
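`model.evaluate` returns the loss followed by the metrics passed to `compile`, which is why the step unpacks `_, test_acc`. With `metrics=["accuracy"]` and integer labels, accuracy is the fraction of examples whose highest-scoring output matches the label. Because softmax preserves ordering, taking the argmax of the logits gives the same prediction as taking it over probabilities. A standard-library sketch of that computation, with made-up logits for three classes (the `accuracy` helper is illustrative, not the Keras metric itself):

```python
def accuracy(batch_logits: list[list[float]], labels: list[int]) -> float:
    """Fraction of examples whose argmax logit equals the integer label."""
    correct = 0
    for logits, label in zip(batch_logits, labels):
        predicted = max(range(len(logits)), key=logits.__getitem__)  # argmax
        correct += int(predicted == label)
    return correct / len(labels)


batch_logits = [
    [2.0, 0.1, -1.0],  # predicts class 0
    [0.3, 1.5, 0.2],   # predicts class 1
    [0.0, 0.4, 0.9],   # predicts class 2
    [1.1, 0.2, 0.0],   # predicts class 0
]
labels = [0, 1, 2, 2]
print(accuracy(batch_logits, labels))  # 0.75
```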

## Define ZenML Pipeline

A pipeline is defined with the `@pipeline` decorator. The pipeline function receives the steps as arguments and connects the outputs of some steps to the inputs of others. These connections define the dependencies between the steps and thereby determine the order in which they run.

In [None]:
@pipeline
def mnist_pipeline(
    importer,
    normalizer,
    trainer,
    evaluator,
):
    # Link all the steps together
    X_train, X_test, y_train, y_test = importer()
    X_train_normed, X_test_normed = normalizer(X_train=X_train, X_test=X_test)
    model = trainer(X_train=X_train_normed, y_train=y_train)
    evaluator(X_test=X_test_normed, y_test=y_test, model=model)
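The execution order follows from the data dependencies in `mnist_pipeline`: `normalizer` consumes `importer`'s arrays, `trainer` consumes the normalized training data plus `y_train`, and `evaluator` consumes the model plus the test data. The sketch below writes those edges down as a graph and orders them with the standard library's `graphlib.TopologicalSorter` (Python 3.9+). It illustrates the idea only; it is not how ZenML schedules steps internally.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes,
# mirroring the wiring inside mnist_pipeline.
dependencies = {
    "importer": set(),
    "normalizer": {"importer"},                          # X_train, X_test
    "trainer": {"normalizer", "importer"},               # X_train_normed, y_train
    "evaluator": {"normalizer", "importer", "trainer"},  # X_test_normed, y_test, model
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['importer', 'normalizer', 'trainer', 'evaluator']
```

Because each step here depends on the one before it, the order is unique; steps without a dependency path between them could, in principle, run in either order.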

## Run the pipeline

Running the pipeline is as simple as calling the `run()` method on an instance of the defined pipeline.

In [None]:
# Initialize the pipeline
first_pipeline = mnist_pipeline(
    importer=importer(),
    normalizer=normalizer(),
    trainer=trainer(),
    evaluator=evaluator(),
)

first_pipeline.run()

## Visualize the model with TensorBoard

To visualize the training logs with TensorBoard, use the built-in ZenML TensorBoard visualizer, which automatically starts a TensorBoard server in the background.

In [None]:
from zenml.integrations.tensorboard.visualizers import (
    visualize_tensorboard,
    stop_tensorboard_server,
)

visualize_tensorboard(
    pipeline_name="mnist_pipeline",
    step_name="trainer",
)

To stop the TensorBoard server, use the `stop_tensorboard_server` utility function.

In [None]:
stop_tensorboard_server(
    pipeline_name="mnist_pipeline",
    step_name="trainer",
)

# Transitioning to Kubeflow Pipelines

We got pretty good results with the digits model we trained, but at some point we want to move off this local notebook stack to a stack that looks more like production. This is where the ZenML [Kubeflow Pipelines](https://github.com/kubeflow/pipelines) integration helps!

## Pre-requisites

To run this part of the example, you need to have the following installed:

* [Docker](https://docs.docker.com/get-docker/)
* [K3D](https://k3d.io/v5.2.1/)
* [kubectl](https://kubernetes.io/docs/tasks/tools/)

## Create a Kubeflow Stack

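![localstack-with-kubeflow.png](assets/localstack-with-kubeflow-orchestrator.png)

The cells below register a local container registry at `localhost:5000` and a Kubeflow orchestrator, combine them with the `default` artifact store into a new stack called `local_kubeflow_stack`, and make that stack active.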

In [None]:
!zenml container-registry register local_registry --uri=localhost:5000

In [None]:
!zenml orchestrator register kubeflow_orchestrator --flavor=kubeflow

In [None]:
!zenml stack register local_kubeflow_stack -a default -o kubeflow_orchestrator -c local_registry

In [None]:
!zenml stack set local_kubeflow_stack
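If you want to recreate this stack outside the notebook, for example in a setup script, the same four CLI calls can be issued from Python. The sketch below only reuses the commands shown above and assumes the `zenml` CLI is on your `PATH`; the helper name `register_kubeflow_stack` and its `dry_run` flag are illustrative additions, not part of ZenML.

```python
import subprocess

# The same commands as the notebook cells above, as argument lists.
STACK_COMMANDS = [
    ["zenml", "container-registry", "register", "local_registry", "--uri=localhost:5000"],
    ["zenml", "orchestrator", "register", "kubeflow_orchestrator", "--flavor=kubeflow"],
    ["zenml", "stack", "register", "local_kubeflow_stack",
     "-a", "default", "-o", "kubeflow_orchestrator", "-c", "local_registry"],
    ["zenml", "stack", "set", "local_kubeflow_stack"],
]


def register_kubeflow_stack(dry_run: bool = True) -> list[str]:
    """Run the stack setup commands in order, or only print them if dry_run is True."""
    executed = []
    for cmd in STACK_COMMANDS:
        line = " ".join(cmd)
        print(("[dry run] " if dry_run else "$ ") + line)
        if not dry_run:
            # check=True raises CalledProcessError at the first command
            # that exits with a non-zero status, so later commands are skipped.
            subprocess.run(cmd, check=True)
        executed.append(line)
    return executed


register_kubeflow_stack(dry_run=True)
```

Keeping the commands in one list makes the stack definition easy to review and rerun; pass `dry_run=False` to actually execute them.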

## Let's spin up the stack

In [None]:
!zenml stack up

## Write the pipeline to disk

In [None]:
%%writefile run-kubeflow.py
#  Copyright (c) ZenML GmbH 2021. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at:
#
#       https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
#  or implied. See the License for the specific language governing
#  permissions and limitations under the License.

import logging
import os

import numpy as np
import tensorflow as tf

from zenml.integrations.constants import TENSORFLOW
from zenml.pipelines import pipeline
from zenml.steps import BaseParameters, Output, StepContext, step
from zenml.config import DockerSettings


@step
def importer() -> Output(
    X_train=np.ndarray,
    X_test=np.ndarray,
    y_train=np.ndarray,
    y_test=np.ndarray,
):
    """Download the MNIST data and store it as artifacts."""
    (X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()
    return X_train, X_test, y_train, y_test


@step
def normalizer(
    X_train: np.ndarray, X_test: np.ndarray
) -> Output(X_train_normed=np.ndarray, X_test_normed=np.ndarray):
    """Normalize digits dataset with mean and standard deviation."""
    X_train_normed = (X_train - np.mean(X_train)) / np.std(X_train)
    X_test_normed = (X_test - np.mean(X_test)) / np.std(X_test)
    return X_train_normed, X_test_normed


class TrainerParameters(BaseParameters):
    """Trainer params"""

    epochs: int = 1
    lr: float = 0.001


@step
def trainer(
    X_train: np.ndarray,
    y_train: np.ndarray,
    context: StepContext,
    params: TrainerParameters,
) -> tf.keras.Model:
    """Train a neural net from scratch to recognize MNIST digits and return
    the trained model."""
    model = tf.keras.Sequential(
        [
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(10, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )

    log_dir = os.path.join(context.get_output_artifact_uri(), "logs")
    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=log_dir, histogram_freq=1
    )

    model.compile(
        optimizer=tf.keras.optimizers.Adam(params.lr),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=["accuracy"],
    )

    model.fit(
        X_train,
        y_train,
        epochs=params.epochs,
        callbacks=[tensorboard_callback],
    )

    return model


@step
def evaluator(
    X_test: np.ndarray,
    y_test: np.ndarray,
    model: tf.keras.Model,
) -> float:
    """Calculate the accuracy on the test set"""

    _, test_acc = model.evaluate(X_test, y_test, verbose=2)
    logging.info(f"Test accuracy: {test_acc}")
    return test_acc


docker_settings = DockerSettings(required_integrations=[TENSORFLOW])

@pipeline(enable_cache=True, settings={"docker": docker_settings})
def mnist_pipeline(
    importer,
    normalizer,
    trainer,
    evaluator,
):
    # Link all the steps together
    X_train, X_test, y_train, y_test = importer()
    X_train_normed, X_test_normed = normalizer(X_train=X_train, X_test=X_test)
    model = trainer(X_train=X_train_normed, y_train=y_train)
    evaluator(X_test=X_test_normed, y_test=y_test, model=model)


if __name__ == "__main__":
    # Run the pipeline
    pipeline_instance = mnist_pipeline(
        importer=importer(),
        normalizer=normalizer(),
        trainer=trainer(),
        evaluator=evaluator(),
    )
    pipeline_instance.run()


In [None]:
# Run the pipeline script on the active Kubeflow stack
!python run-kubeflow.py

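# Post-execution workflow

Once both runs have finished, you can fetch the pipeline, its runs, their steps and the steps' output artifacts to inspect and compare the results.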

In [None]:
from zenml.post_execution import get_pipelines

## Get all pipelines

In [None]:
pipelines = get_pipelines()

## Retrieve the pipeline

In [None]:
mnist_pipeline = pipelines[0]

## Get the first run

In [None]:
runs = mnist_pipeline.runs  # chronologically ordered
mnist_run = runs[0]

## Get the second run

In [None]:
kubeflow_mnist_run = runs[1]

## Get the steps (note the first step name is different)

In [None]:
mnist_run.steps

In [None]:
kubeflow_mnist_run.steps

## Check the results of the evaluator and compare

In [None]:
mnist_eval_step = mnist_run.get_step(step='evaluator')
kubeflow_mnist_eval_step = kubeflow_mnist_run.get_step(step='evaluator')

In [None]:
# A step with a single output exposes it as `output`; a step with multiple outputs exposes them in the `outputs` dict.
mnist_eval_step.output.read()

In [None]:
kubeflow_mnist_eval_step.output.read()

# Congratulations!

…and that's it! If you made it here without a hiccup, you have successfully installed ZenML, set up a ZenML repository, configured a training pipeline, executed it and evaluated the results. You have also deployed the same pipeline to a production-like MLOps stack, right from within your notebook. Hurray!

However, if you ran into a hiccup or have suggestions or questions about our framework, you can always check our [docs](https://docs.zenml.io/) or our [GitHub](https://github.com/zenml-io/zenml), or, even better, join us on our [Slack channel](https://zenml.io/slack-invite).

Cheers!

For more detailed information on all the components and steps that went into this short example, please continue reading [our more detailed documentation pages](https://docs.zenml.io/).