# Get Up and Running Quickly

## 🌍 Overview

This quickstart demonstrates some of ZenML's features. We will:

- Import some data from a public dataset (Adult Census Income), then train two models (SGD and Random Forest).
- Compare and evaluate which model performs better, and deploy the best one.
- Run a prediction on the deployed model.

Along the way we will also show you how to:

- Automatically version, track, and cache data, models, and other artifacts.
- Track model hyperparameters and metrics in an experiment tracking tool.

This will give you enough to get started building your own ZenML Pipelines.
Let's dive in!


## Run on Colab

You can use Google Colab to see ZenML in action, no signup / installation
required!

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](
https://colab.research.google.com/github/zenml-io/zenml/blob/main/examples/quickstart/notebooks/quickstart.ipynb)

# 1. Install Requirements

Let's install ZenML to get started. First we'll install the latest version of
ZenML as well as the two integrations we'll need for this quickstart: `sklearn`
and `mlflow`.

In [None]:
!pip install "zenml[server]"

In [None]:
from zenml.environment import Environment

if Environment.in_google_colab():
    # Install Cloudflare Tunnel binary
    !wget -q https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64.deb && dpkg -i cloudflared-linux-amd64.deb


In [None]:
!zenml integration install sklearn mlflow -y

import IPython
IPython.Application.instance().kernel.do_shutdown(restart=True)

In [None]:
!zenml init

Wait for the installation cells above to finish before running any later cells. The integration install cell restarts the notebook kernel automatically once it completes.

# 2. Import Data

We'll start off by importing our data. In this quickstart we'll be working with
[the Adult Census Income](https://archive.ics.uci.edu/dataset/2/adult) dataset
which is publicly available on the UCI Machine Learning Repository. The task is
to predict whether a person makes over $50k a year based on a number of
features. These features are things like age, work class, education level,
marital status, occupation, relationship, race, sex, capital gain, capital loss,
hours per week, and native country.

When you're getting started with a machine learning problem you'll want to do
something similar to this: import your data and get it in the right shape for
your training. ZenML mostly gets out of your way when you're writing your Python
code, as you'll see from the following cell.

In [None]:
from typing import Tuple

import pandas as pd
from sklearn.model_selection import train_test_split

from zenml import step


@step
def training_data_loader() -> (
    Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]
):
    """Load the Census Income dataset as tuple of Pandas DataFrame / Series."""
    # Load the dataset
    url = "https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"
    column_names = [
        "age",
        "workclass",
        "fnlwgt",
        "education",
        "education-num",
        "marital-status",
        "occupation",
        "relationship",
        "race",
        "sex",
        "capital-gain",
        "capital-loss",
        "hours-per-week",
        "native-country",
        "income",
    ]
    data = pd.read_csv(
        url, names=column_names, na_values="?", skipinitialspace=True
    )

    # Drop rows with missing values
    data = data.dropna()

    # Encode categorical features and drop original columns
    categorical_cols = [
        "workclass",
        "education",
        "marital-status",
        "occupation",
        "relationship",
        "race",
        "sex",
        "native-country",
    ]
    data = pd.get_dummies(data, columns=categorical_cols, drop_first=True)

    # Encode target feature
    data["income"] = data["income"].apply(
        lambda x: 1 if x.strip() == ">50K" else 0
    )

    # Separate features and target
    X = data.drop("income", axis=1)
    y = data["income"]

    # Split the dataset into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    return (X_train, X_test, y_train, y_test)

We download the data, drop rows with missing values, one-hot encode the
categorical columns, and then split the result into train and test sets. The
whole function is decorated with the `@step` decorator, which
tells ZenML to track this function as a step in the pipeline. This means that
ZenML will automatically version, track, and cache the data that is produced by
this function. This is a very powerful feature, as it means that you can
reproduce your data at any point in the future, even if the original data source
changes or disappears.
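
To make the caching idea concrete, here is a minimal sketch using only the standard library. It is a toy illustration and not ZenML's implementation: the `cached_step` decorator, the `_CACHE` dictionary, and the way the cache key is built are all assumptions made for this example. The point is only that a step's output can be reused when neither its code nor its inputs have changed.

```python
import hashlib
import json
from functools import wraps

_CACHE = {}  # hypothetical in-memory artifact store: cache key -> output


def cached_step(func):
    """Toy stand-in for step caching: reuse the output when code and inputs match."""

    @wraps(func)
    def wrapper(**kwargs):
        # Key on the function's compiled bytecode plus its keyword arguments.
        payload = func.__code__.co_code.hex() + json.dumps(kwargs, sort_keys=True)
        key = hashlib.sha256(payload.encode()).hexdigest()
        if key not in _CACHE:
            _CACHE[key] = func(**kwargs)  # first run: execute and store
        return _CACHE[key]

    return wrapper


calls = []


@cached_step
def load_numbers(n: int) -> list:
    calls.append(n)  # record real executions so cache hits are visible
    return list(range(n))


load_numbers(n=3)
load_numbers(n=3)  # same code and inputs: served from the cache
load_numbers(n=4)  # different input: executed again
print(calls)  # [3, 4]
```

The second call with `n=3` never reaches the function body, which is the same effect you see when a cached ZenML step is skipped on a re-run.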

You'll also notice that we have included type hints for the function's
outputs. These are not only useful for anyone reading your code, but also
help ZenML process your data in a way appropriate to the specific data types.

ZenML is built in a way that allows you to experiment with your data and build
your pipelines as you work, so if you want to call this function to see how it
works, you can just call it directly. Here we take a look at the first few rows
of your training dataset.

In [None]:
X_train, X_test, y_train, y_test = training_data_loader()
X_train.head()

Everything looks as we'd expect and the values are all in the right format. We
can shift to training some models now! 🥳

# 3. Train Models

Now that we have our data it makes sense to train some models to get a sense of
how difficult the task is. The Census Income dataset is large and complex
enough that we're unlikely to train a model that predicts perfectly, but we can
get a sense of what a reasonable baseline looks like.

We'll start with two simple models, an SGD Classifier and a Random Forest
Classifier, both batteries-included from `sklearn`. We'll train them both on the
same data and then compare their performance.

Since we're starting our work properly, it makes sense to start tracking the
experimentation that we're doing. ZenML integrates with MLflow to make this
easy. This happens out of the box when using our experiment tracker integration
and stack components. We'll see how this works below, but first let's tell
ZenML to use the MLflow experiment tracker.

In [None]:
# Register the MLflow experiment tracker
!zenml experiment-tracker register mlflow --flavor=mlflow

# Register a new stack with our experiment tracker
!zenml stack register quickstart -a default\
                                 -o default\
                                 -e mlflow

!zenml stack set quickstart

We can now write the steps where we'll
train our models, making sure to specify the name of our experiment tracker in
the `@step` decorator. We could specify this manually using a string, but
instead we'll use the ZenML `Client` to access the name of our active stack's
experiment tracker.

In [None]:
import mlflow

from sklearn.base import ClassifierMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import SGDClassifier

from zenml.client import Client

experiment_tracker = Client().active_stack.experiment_tracker


@step(experiment_tracker=experiment_tracker.name)
def random_forest_trainer_mlflow(
    X_train: pd.DataFrame,
    y_train: pd.Series,
) -> ClassifierMixin:
    """Train a sklearn Random Forest classifier and log to MLflow."""
    mlflow.sklearn.autolog()  # log all model hyperparams and metrics to MLflow
    model = RandomForestClassifier()
    model.fit(X_train.to_numpy(), y_train.to_numpy())
    train_acc = model.score(X_train.to_numpy(), y_train.to_numpy())
    print(f"Train accuracy: {train_acc}")
    return model


@step(experiment_tracker=experiment_tracker.name)
def sgd_trainer_mlflow(
    X_train: pd.DataFrame,
    y_train: pd.Series,
) -> ClassifierMixin:
    """Train an SGD classifier and log to MLflow."""
    mlflow.sklearn.autolog()  # log all model hyperparams and metrics to MLflow
    model = SGDClassifier()
    model.fit(X_train.to_numpy(), y_train.to_numpy())
    train_acc = model.score(X_train.to_numpy(), y_train.to_numpy())
    print(f"Train accuracy: {train_acc}")
    return model

Our two training steps both return different kinds of `sklearn` classifier
models, so we use the generic `ClassifierMixin` type hint for the return type.

The end goal of this quick baseline evaluation is to understand which of the two
models performs better. We'll use the `best_model_selector` step to compare the
two models. This step takes in the two models we trained above, and compares them
on the test data we created earlier. It returns whichever model performs best
along with the accuracy score for that model.

In [None]:
from typing_extensions import Annotated


@step
def best_model_selector(
    X_test: pd.DataFrame,
    y_test: pd.Series,
    model1: ClassifierMixin,
    model2: ClassifierMixin,
) -> Tuple[
    Annotated[ClassifierMixin, "best_model"],
    Annotated[float, "best_model_test_acc"],
]:
    """Calculate the accuracy on the test set and return the best model and its accuracy."""
    test_acc1 = model1.score(X_test.to_numpy(), y_test.to_numpy())
    test_acc2 = model2.score(X_test.to_numpy(), y_test.to_numpy())
    print(f"Test accuracy ({model1.__class__.__name__}): {test_acc1}")
    print(f"Test accuracy ({model2.__class__.__name__}): {test_acc2}")
    if test_acc1 > test_acc2:
        best_model = model1
        best_model_test_acc = test_acc1
    else:
        best_model = model2
        best_model_test_acc = test_acc2
    return best_model, best_model_test_acc

Note the use of the `Annotated` type hint (imported here from
`typing_extensions`) in the output of the step. We're using this to give a name
to each output of the step, which will make it possible to access it via a
keyword later on.
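
As a rough illustration of how such names can be recovered from a function signature, the sketch below reads the `Annotated` metadata with the standard library. It assumes Python 3.9+ (where `Annotated` is available from `typing`), and `select_best` and `output_names` are hypothetical helpers written for this example; this is not how ZenML itself is implemented.

```python
from typing import Annotated, Tuple, get_args, get_type_hints


def select_best(acc1: float, acc2: float) -> Tuple[
    Annotated[str, "best_model"],
    Annotated[float, "best_model_test_acc"],
]:
    """Hypothetical stand-in for the step above, using plain values."""
    return ("model1", acc1) if acc1 > acc2 else ("model2", acc2)


def output_names(func) -> list:
    """Collect the string metadata attached to each element of the return type."""
    return_hint = get_type_hints(func, include_extras=True)["return"]
    names = []
    for element in get_args(return_hint):  # the two Annotated[...] entries
        _base_type, *metadata = get_args(element)
        names.append(metadata[0])
    return names


print(output_names(select_best))  # ['best_model', 'best_model_test_acc']
```

Without `include_extras=True`, `get_type_hints` strips the `Annotated` wrapper and the names are lost.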

We'll likely want to use our model in the future so instead of simply outputting
the model we'll use the MLflow model registry to store it. This allows us to
version the model for retrieval and use later on as well as to use other
functionality made possible within the MLflow dashboard. This step is a bit
different from the ones listed above in that we're using a pre-built ZenML step
instead of just writing our own. You'll often come across these pre-built steps
for common workflows.

In [None]:
from zenml.integrations.mlflow.steps.mlflow_registry import (
    mlflow_register_model_step,
)

model_name = "zenml-quickstart-model"

register_model = mlflow_register_model_step.with_options(
    parameters=dict(
        name=model_name,
        description="The first run of the Quickstart pipeline.",
    )
)

We're now at the point where we can bring all these steps together into a single
pipeline, the top-level organising entity for code in ZenML. Creating such a pipeline is
as simple as adding a `@pipeline` decorator to a function. This specific
pipeline doesn't return a value, but that option is available to you if you need it.

In [None]:
from zenml import pipeline


@pipeline(enable_cache=True)
def train_and_register_model_pipeline() -> None:
    """Train a model."""
    X_train, X_test, y_train, y_test = training_data_loader()
    model1 = random_forest_trainer_mlflow(X_train=X_train, y_train=y_train)
    model2 = sgd_trainer_mlflow(X_train=X_train, y_train=y_train)
    best_model, _ = best_model_selector(
        X_test=X_test, y_test=y_test, model1=model1, model2=model2
    )
    register_model(best_model)

We've used the built-in MLflow registry to store our model, but ZenML doesn't
yet know that we want to use the MLflow flavor of the model registry stack
component in our stack. Let's add that now and update our stack.

In [None]:
# Register the MLflow model registry
!zenml model-registry register mlflow --flavor=mlflow

# Update our stack to include the model registry
!zenml stack update quickstart -r mlflow

![](../_assets/local_stack_with_local_mlflow_tracker_and_registry.png)

We're ready to run the pipeline now. As with the step, we do this by calling the
pipeline function itself.

In [None]:
train_and_register_model_pipeline()

You can see from the logs already how our model training went: the
`RandomForestClassifier` performed considerably better than the `SGDClassifier`,
so that will have been the model that was returned from the evaluation step and
then registered with the MLflow model registry.

At this point you might be interested to view your pipeline in the ZenML
Dashboard. You can spin this up by executing the next cell. This will start a
server which you can access by clicking on the link that appears in the output
of the cell.

Log into the Dashboard using default credentials (username 'default' and
password left blank). From there you can inspect the pipeline or the specific
pipeline run. You can also examine the stack and components that we've
registered to run everything.

![](../llm_quickstart/_assets/zenml-up.gif)

In [None]:
from zenml.environment import Environment

if Environment.in_google_colab():
    # run ZenML through a cloudflare tunnel to get a public endpoint
    !zenml up --port 8237 & cloudflared tunnel --url http://localhost:8237
else:
    !zenml up

We're using MLflow for our experiment tracking. If you'd like to inspect the
MLflow dashboard to see your experiments and what's been logged so far, run the
following cell. This cell will spin up a local server that you can access via
the link mentioned after the "Listening at:" `INFO` log statement.

In [None]:
import os
from zenml.integrations.mlflow.mlflow_utils import get_tracking_uri

os.environ["MLFLOW_TRACKING_URI"] = get_tracking_uri()

if Environment.in_google_colab():
    # run mlflow through a cloudflare tunnel to get a public endpoint
    !mlflow ui --backend-store-uri $MLFLOW_TRACKING_URI & cloudflared tunnel --url http://localhost:5000
else:
    !mlflow ui --backend-store-uri $MLFLOW_TRACKING_URI

Our pipeline above registered the best model with the MLflow model registry.
Whenever you register a model it also versions the model since it's likely that
you'll be iterating and improving your model over time.
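
The versioning behaviour can be pictured with a small in-memory sketch. `ToyModelRegistry` is a hypothetical class written for this illustration; it is not the MLflow or ZenML API, and it only mirrors the idea that each registration under the same name produces a new, increasing version number.

```python
from collections import defaultdict


class ToyModelRegistry:
    """Hypothetical in-memory registry; not the MLflow or ZenML API."""

    def __init__(self) -> None:
        self._versions = defaultdict(list)  # model name -> list of stored models

    def register(self, name: str, model: object) -> int:
        """Store a model under `name` and return its new version number."""
        self._versions[name].append(model)
        return len(self._versions[name])  # versions start at 1

    def latest_version(self, name: str) -> int:
        """Return the highest version registered under `name` (0 if none)."""
        return len(self._versions[name])


registry = ToyModelRegistry()
registry.register("zenml-quickstart-model", "first model")
registry.register("zenml-quickstart-model", "retrained model")
print(registry.latest_version("zenml-quickstart-model"))  # 2
```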

We'll now turn to actually deploying our model and serving some predictions, for
which we'll need to specify the model version we want to use. You can specify
the version number manually but below we'll use the ZenML `Client` to get the
latest version number.

In [None]:
from zenml.client import Client

most_recent_model_version_number = int(
    Client()
    .active_stack.model_registry.list_model_versions(metadata={})[0]
    .version
)
most_recent_model_version_number

Now that we've trained our models and found the best one, we want to deploy it
and run some inference on the deployed model. We'll use the local MLflow model
deployer, which once again comes with some pre-built ZenML steps to save you reinventing the wheel.

In [None]:
from zenml.integrations.mlflow.steps.mlflow_deployer import (
    mlflow_model_registry_deployer_step,
)

model_deployer = mlflow_model_registry_deployer_step.with_options(
    parameters=dict(
        registry_model_name=model_name,
        registry_model_version=most_recent_model_version_number,
    )
)

When you deploy a model this is usually something you want to remain available
and running for a long time, so ZenML automatically creates a background service
for your deployed model. We load the service (already created by the
`model_deployer` step) and then use it to make some predictions.

In [None]:
from zenml.services import BaseService
from zenml.client import Client


@step(enable_cache=False)
def prediction_service_loader() -> BaseService:
    """Load the model service of our train_and_register_model_pipeline."""
    client = Client()
    model_deployer = client.active_stack.model_deployer
    services = model_deployer.find_model_server(
        pipeline_name="train_and_register_model_pipeline",
        running=True,
    )
    return services[0]


@step
def predictor(
    service: BaseService,
    data: pd.DataFrame,
) -> Annotated[list, "predictions"]:
    """Run an inference request against a prediction service."""
    service.start(timeout=10)  # should be a NOP if already started
    print(f"Running predictions on data (single individual): {data.to_numpy()[0]}")
    prediction = service.predict(data.to_numpy())
    print(f"Prediction (for single example slice) is: {bool(prediction.tolist()[0])}")
    return prediction.tolist()

In [None]:
# Register the MLflow model deployer
!zenml model-deployer register mlflow --flavor=mlflow

# Update our stack to include the model deployer
!zenml stack update quickstart -d mlflow

There is one ordering dependency between the steps, so we specify it upfront:
the model needs to be deployed before we try to load the prediction service
and make predictions with it.

In [None]:
@pipeline
def deploy_and_predict() -> None:
    """Deploy the best model and run some predictions."""
    prediction_service_loader.after(model_deployer)

    model_deployer()
    _, inference_data, _, _ = training_data_loader()
    model_deployment_service = prediction_service_loader()
    predictor(service=model_deployment_service, data=inference_data)

Notice how we specify that the `prediction_service_loader` step should run *after* the
`model_deployer` step. This is because we won't have a model ready for prediction
until the deployment has taken place. ZenML automatically tries to run steps in
parallel, so when two steps share no data but still need to run in a particular
order, you have to specify that order explicitly.
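
One way to picture this is as a dependency graph in which the explicit ordering adds an edge that no data flow would otherwise create. The sketch below uses Python's standard `graphlib` module (Python 3.9+) and a hand-written `dependencies` mapping that mirrors the pipeline above; it is an illustration of the ordering idea, not of how ZenML schedules steps internally.

```python
from graphlib import TopologicalSorter

# Map each step to the set of steps it must wait for.
dependencies = {
    "model_deployer": set(),
    "training_data_loader": set(),
    # Explicit ordering: no data is passed between these two steps.
    "prediction_service_loader": {"model_deployer"},
    # Data dependencies: the predictor consumes both outputs.
    "predictor": {"prediction_service_loader", "training_data_loader"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Any valid order places `model_deployer` before `prediction_service_loader`, while `training_data_loader` is free to run at any point before `predictor`.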

![](../_assets/local_stack_with_local_mlflow_tracker_and_registry_and_deployer.png)

Unlike in the previous case where we just ran the pipeline directly, we might
not want to deploy the model every time. Consider the case where our models are
scoring under 50% accuracy on the test data. In that case we might want
to address the issues with accuracy and not spin up a deployment at all. We can
access the artifacts associated with the previous pipeline run and check whether
the test accuracy metric is above a certain threshold (0.7 in the cell below).
Adding this to our workflow is as simple as wrapping the pipeline call in a
Python conditional.

In [None]:
best_model_test_accuracy = (
    Client().get_pipeline("train_and_register_model_pipeline")
    .last_successful_run.steps["best_model_selector"]
    .outputs["best_model_test_acc"].load()
)

if best_model_test_accuracy > 0.7:
    deploy_and_predict()

If you recall, the purpose of this model was to predict whether or not someone
earns more than \$50k per year. You can see a single example in the output above.
Given the features of a particular individual, the model predicts that they do
not earn more than \$50k per year.

If we were interested in learning more about the model's predictions, we could
separately load the predictor service and use it to pass in some other data or
try things out. To load the predictor we can run:

In [None]:
predictor_service = deploy_and_predict.model.last_successful_run.steps[
    "prediction_service_loader"
].output.load()

predictor_service

At this point, passing in some data is as simple as calling the `predict` method
on the predictor service. We can try this here:

In [None]:
print(
    f"Model predictions: {predictor_service.predict(X_test.to_numpy()[25:35])}"
)
print(f"Ground truth:      {y_test.to_numpy()[25:35]}")

We're passing some of our test data into the model and getting back the
predictions. You can already see some places where our predictions do not match
the ground truth labels. This is to be expected, and we could use these
mismatches to iterate on our models by adding more steps.
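
A locally deployed MLflow model is typically served over HTTP, so a call like `predict` ultimately amounts to a JSON request. The sketch below only builds such a request with the standard library and does not send it. The URL, the three-feature rows, the `build_prediction_request` helper, and the `inputs` payload key (one of the formats accepted by MLflow 2.x scoring servers) are assumptions for illustration; check the prediction URL reported by your own service and your MLflow version before relying on them.

```python
import json
from urllib.request import Request

# Hypothetical local endpoint; the real URL is chosen when the service starts.
PREDICTION_URL = "http://127.0.0.1:8000/invocations"


def build_prediction_request(rows: list) -> Request:
    """Build (but do not send) a JSON scoring request for a batch of feature rows."""
    body = json.dumps({"inputs": rows}).encode("utf-8")
    return Request(
        PREDICTION_URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Two made-up rows with three features each, purely for illustration.
request = build_prediction_request([[39, 77516, 13], [50, 83311, 13]])
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```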

To get an overview of the models and model versions that we have registered and
deployed so far, we can use the CLI to list these out.

In [None]:
!zenml model-registry models list

In [None]:
!zenml model-registry models list-versions zenml-quickstart-model

In [None]:
!zenml model-deployer models list

To view all this on the ZenML Dashboard, simply spin up the server again and
view the steps via the DAG visualiser and also browse the artifacts.

In [None]:
if Environment.in_google_colab():
    !zenml down # server needs restarting due to colab bug.
    # run ZenML through a cloudflare tunnel to get a public endpoint
    !zenml up --port 8237 & cloudflared tunnel --url http://localhost:8237
else:
    !zenml up

## Congratulations!

You just built two ML pipelines! You trained two models, evaluated them against
a test set, registered the best one with the MLflow model registry, deployed it
and served some predictions. You also learned how to iterate on your models and
data by using some of the ZenML utility abstractions. You saw how to view your
artifacts and stacks via the CLI as well as the ZenML Dashboard.

And that is just the tip of the iceberg of what ZenML can do; check out the [**Integrations**](https://zenml.io/integrations) page for a list of all the cool MLOps tools that ZenML supports!

## What to do now

* If you have questions or feedback... join our [**Slack Community**](https://zenml.io/slack-invite) and become part of the ZenML family!
* If you want to try ZenML in a real-world setting... check out the [ZenML Cloud](https://cloud.zenml.io/), a free trial of ZenML's managed offering that runs on your cloud platform. [**Sign up here**](https://sandbox.zenml.io/).