In [None]:
!zenml init

In [1]:
# Do the imports at the top

import random
from zenml import ExternalArtifact, pipeline 
from zenml.client import Client
from zenml.logger import get_logger
from uuid import UUID

import os
from typing import Optional, List

from zenml import pipeline

from steps import (
    data_loader,
    data_preprocessor,
    data_splitter,
    model_evaluator,
    model_trainer,
    inference_predict,
    inference_preprocessor
)

logger = get_logger(__name__)

client = Client()

In [2]:
@pipeline
def _feature_engineering(
    test_size: float = 0.2,
    drop_na: Optional[bool] = None,
    normalize: Optional[bool] = None,
    drop_columns: Optional[List[str]] = None,
    target: Optional[str] = "target",
):
    """Load, split and preprocess the dataset.

    The loader output is passed to the splitter, both resulting partitions
    are passed to the preprocessor, and the two preprocessed partitions are
    returned as the pipeline outputs.

    Args:
        test_size: Size of the holdout set, 0.0..1.0; forwarded to
            `data_splitter`.
        drop_na: If `True` NA values will be removed from the dataset;
            forwarded to `data_preprocessor`.
        normalize: If `True` the dataset will be normalized with
            MinMaxScaler; forwarded to `data_preprocessor`.
        drop_columns: List of columns to drop from the dataset; forwarded to
            `data_preprocessor`.
        target: Name of the target column; forwarded to both `data_loader`
            and `data_preprocessor`.

    Returns:
        The preprocessed train and test datasets, in that order.
    """
    # A new seed in [0, 100] is drawn each time this function body runs.
    loader_seed = random.randint(0, 100)
    raw_data = data_loader(random_state=loader_seed, target=target)

    split_trn, split_tst = data_splitter(dataset=raw_data, test_size=test_size)

    # The preprocessor yields three outputs; only the first two are returned.
    prepared_trn, prepared_tst, _unused = data_preprocessor(
        dataset_trn=split_trn,
        dataset_tst=split_tst,
        drop_na=drop_na,
        normalize=normalize,
        drop_columns=drop_columns,
        target=target,
    )

    return prepared_trn, prepared_tst

In [3]:
# Apply the YAML configuration file to the feature engineering pipeline.
pipeline_args = {
    "config_path": os.path.join("configs", "feature_engineering.yaml"),
}
fe_p_configured = _feature_engineering.with_options(**pipeline_args)

In [4]:
# Run the configured feature engineering pipeline; the value returned by the
# call is stored in `latest_run`.
latest_run = fe_p_configured()

[1;35mInitiating a new run for the pipeline: [0m[1;36m_feature_engineering[1;35m.[0m
[1;35mReusing registered version: [0m[1;36m(version: 7)[1;35m.[0m
[1;35mNew model version [0m[1;36m22[1;35m was created.[0m
[1;35mExecuting a new run.[0m
[1;35mUsing user: [0m[1;36mhamza@zenml.io[1;35m[0m
[1;35mUsing stack: [0m[1;36mdefault[1;35m[0m
[1;35m  artifact_store: [0m[1;36mdefault[1;35m[0m
[1;35m  orchestrator: [0m[1;36mdefault[1;35m[0m
[1;35mStep [0m[1;36mdata_loader[1;35m has started.[0m
[1;35mDataset with 541 records loaded![0m
[1;35mStep [0m[1;36mdata_loader[1;35m has finished in [0m[1;36m6.238s[1;35m.[0m
[1;35mStep [0m[1;36mdata_splitter[1;35m has started.[0m
[1;35mStep [0m[1;36mdata_splitter[1;35m has finished in [0m[1;36m8.293s[1;35m.[0m
[1;35mStep [0m[1;36mdata_preprocessor[1;35m has started.[0m
[1;35mStep [0m[1;36mdata_preprocessor[1;35m has finished in [0m[1;36m11.469s[1;35m.[0m
[1;35mRun [0m[1;36m_featu

In [5]:
@pipeline
def _training(
    train_dataset_id: Optional[UUID] = None,
    test_dataset_id: Optional[UUID] = None,
    min_train_accuracy: float = 0.0,
    min_test_accuracy: float = 0.0,
):
    """Train a model and evaluate it on the train and test datasets.

    When both dataset IDs are supplied, the datasets are referenced as
    existing artifacts. If either ID is missing, `_feature_engineering` is
    invoked with its default arguments to produce both datasets instead.

    Args:
        train_dataset_id: ID of an existing train dataset artifact.
        test_dataset_id: ID of an existing test dataset artifact.
        min_train_accuracy: Forwarded to `model_evaluator`.
        min_test_accuracy: Forwarded to `model_evaluator`.
    """
    both_ids_given = (
        train_dataset_id is not None and test_dataset_id is not None
    )
    if both_ids_given:
        # Reuse previously produced datasets.
        train_data = ExternalArtifact(id=train_dataset_id)
        test_data = ExternalArtifact(id=test_dataset_id)
    else:
        # Produce the datasets by running feature engineering first.
        train_data, test_data = _feature_engineering()

    trained_model = model_trainer(dataset_trn=train_data)

    model_evaluator(
        model=trained_model,
        dataset_trn=train_data,
        dataset_tst=test_data,
        min_train_accuracy=min_train_accuracy,
        min_test_accuracy=min_test_accuracy,
    )


In [6]:
# Apply the YAML configuration file to the training pipeline.
pipeline_args = {
    "config_path": os.path.join("configs", "training.yaml"),
}
fe_t_configured = _training.with_options(**pipeline_args)

[1;35m[0m[1;36mversion[1;35m [0m[1;36mlatest[1;35m matches one of the possible [0m[1;36mModelStages[1;35m and will be fetched using stage.[0m


In [7]:
# Run the configured training pipeline.
# NOTE(review): the output saved with this cell is
# `NameError: name 'target' is not defined`. The `_training` cell does not
# reference a `target` name, so the failure presumably originates elsewhere
# (a step implementation or an older version of a cell) — confirm by
# re-running on a fresh kernel.
fe_t_configured()

NameError: name 'target' is not defined

In [None]:
from typing import Optional

import pandas as pd
from typing_extensions import Annotated

from zenml import get_step_context, step
from zenml.logger import get_logger

logger = get_logger(__name__)


@step
def inference_predict(
    dataset_inf: pd.DataFrame,
) -> Annotated[pd.Series, "predictions"]:
    """Run predictions on the inference dataset.

    The predictor is the artifact named "model", loaded from the model
    version attached to the current step context. Both the model version and
    the raw predictions are printed.

    See the documentation on configuring steps for more information:

        https://docs.zenml.io/user-guide/advanced-guide/configure-steps-pipelines

    Args:
        dataset_inf: The inference dataset.

    Returns:
        The predictions as a pandas series named "predicted".
    """
    current_model_version = get_step_context().model_version
    print(current_model_version)

    # Load the model artifact and predict in memory.
    estimator = current_model_version.load_artifact("model")
    raw_predictions = estimator.predict(dataset_inf)
    print(raw_predictions)

    return pd.Series(raw_predictions, name="predicted")


In [None]:
@pipeline
def _batch_inference():
    """Batch inference pipeline.

    Loads the inference data, preprocesses it with the `preprocess_pipeline`
    artifact and runs predictions on the result. The loader seed and the
    target column name are read from run metadata stored on the `dataset`
    and `dataset_trn` artifacts.
    """
    ########## ETL stage  ##########
    # Read back the values recorded as run metadata on existing artifacts.
    dataset_metadata = client.get_artifact("dataset").run_metadata
    loader_seed = dataset_metadata["random_state"].value
    trn_metadata = client.get_artifact("dataset_trn").run_metadata
    target_column = trn_metadata['target'].value

    raw_inference = data_loader(random_state=loader_seed, is_inference=True)
    prepared_inference = inference_preprocessor(
        dataset_inf=raw_inference,
        preprocess_pipeline=ExternalArtifact(name="preprocess_pipeline"),
        target=target_column,
    )

    inference_predict(dataset_inf=prepared_inference)


In [None]:
# Apply the YAML configuration file to the batch inference pipeline.
pipeline_args = {
    "config_path": os.path.join("configs", "inference.yaml"),
}
fe_b_configured = _batch_inference.with_options(**pipeline_args)

In [None]:
# Run the configured batch inference pipeline.
fe_b_configured()

# Huggingface Model to Sagemaker Endpoint: Automating MLOps with ZenML
Deploying Huggingface models to AWS Sagemaker endpoints typically only requires a few lines of code. However, there's a growing demand to not just deploy, but to seamlessly automate the entire flow from training to production with comprehensive lineage tracking. ZenML adeptly fills this niche, providing an end-to-end MLOps solution for Huggingface users wishing to deploy to Sagemaker. Below, we’ll walk through the architecture that ZenML employs to bring a Huggingface model into production with AWS Sagemaker. Of course all of this can be adapted to not just Sagemaker, but any other model deployment service like GCP Vertex or Azure ML Platform.

This blog post showcases one way of using ZenML pipelines to achieve this:

- Create and version a dataset in a feature_engineering_pipeline.
- Train/Finetune a BERT-based Sentiment Analysis NLP model and push to Huggingface Hub in a training_pipeline.
- Promote this model to Production by comparing to previous models in a promotion_pipeline.
- Deploy the model at the Production Stage to an AWS Sagemaker endpoint with a deployment_pipeline.

<img src="assets/pipelines_overview.png" alt="Pipelines Overview">

In [None]:
# Do the imports at the top

import numpy as np
from datasets import DatasetDict, load_dataset
from typing_extensions import Annotated
from zenml import step
from zenml.logger import get_logger

import os
from typing import Optional
from datetime import datetime as dt

from zenml import pipeline
from zenml.model import ModelConfig

from steps import (
    data_loader,
    notify_on_failure,
    tokenization_step,
    tokenizer_loader,
    generate_reference_and_comparison_datasets,
)
from zenml.integrations.evidently.metrics import EvidentlyMetricConfig
from zenml.integrations.evidently.steps import (
    EvidentlyColumnMapping,
    evidently_report_step,
)

from pipelines import (
    sentinment_analysis_deploy_pipeline,
    sentinment_analysis_promote_pipeline,
    sentinment_analysis_training_pipeline,
)

logger = get_logger(__name__)

# 🍳Breaking it down





## 👶 Step 1: Start with feature engineering

Automated feature engineering forms the foundation of this MLOps workflow. That's why the first pipeline is the feature engineering pipeline. This pipeline loads some data from Huggingface and uses a base tokenizer to create a tokenized dataset. The data loader step is a simple Python function that returns a Huggingface dataset object:

In [None]:
@step
def data_loader() -> Annotated[DatasetDict, "dataset"]:
    """Load the airline sentiment dataset and tidy its columns.

    Returns:
        The loaded dataset with `airline_sentiment` renamed to `label` and
        the two confidence columns removed.
    """
    logger.info("Loading dataset airline_reviews... ")
    confidence_columns = [
        "airline_sentiment_confidence",
        "negativereason_confidence",
    ]
    return (
        load_dataset("Shayanvsf/US_Airline_Sentiment")
        .rename_column("airline_sentiment", "label")
        .remove_columns(confidence_columns)
    )

Notice that you can give each dataset a name with Python’s Annotated object. The DatasetDict is a native Huggingface dataset which ZenML knows how to persist through steps. This flow ensures reproducibility and version control for every dataset iteration.

Also notice this is a simple Python function, that can be called with the `entrypoint` wrapper:

In [None]:
# Call the step's underlying Python function through `entrypoint` and print
# the dataset it returns.
hf_dataset = data_loader.entrypoint()
print(hf_dataset)

Now we put this into a full feature engineering pipeline. Each run of the feature engineering pipeline produces a new dataset to use for the training pipeline. ZenML versions this data as it flows through the pipeline.

<img src="assets/pipelines_feature_eng.png" alt="Pipelines Feature Engineering">

### Set your stack

In [None]:
!zenml stack describe hf-sagemaker-local

In [None]:
!zenml stack set hf-sagemaker-local

In [None]:
!zenml stack get

### Run the pipeline

In [None]:
@pipeline(on_failure=notify_on_failure)
def sentinment_analysis_feature_engineering_pipeline(
    lower_case: Optional[bool] = True,
    padding: Optional[str] = "max_length",
    max_seq_length: Optional[int] = 128,
    text_column: Optional[str] = "text",
    label_column: Optional[str] = "label",
):
    """Feature engineering pipeline for the sentiment analysis model.

    Loads the dataset, builds an Evidently report over a reference and a
    comparison dataset derived from it, then tokenizes the dataset.

    Args:
        lower_case: Forwarded to `tokenizer_loader`.
        padding: Forwarded to `tokenization_step`.
        max_seq_length: Forwarded to `tokenization_step`.
        text_column: Name of the text column. Used for both the Evidently
            report and tokenization so the two stages always agree.
        label_column: Name of the label column. Used for both the Evidently
            report and tokenization so the two stages always agree.

    Returns:
        The tokenizer and the tokenized dataset, in that order.
    """
    # Link all the steps together by calling them and passing the output
    # of one step as the input of the next step.

    ########## Load Dataset stage ##########
    dataset = data_loader()

    ########## Data Quality stage ##########
    reference_dataset, comparison_dataset = generate_reference_and_comparison_datasets(
        dataset
    )
    text_data_report = evidently_report_step.with_options(
        parameters=dict(
            # Use the pipeline's own column parameters rather than hard-coded
            # names; with the default arguments these are "label" and "text".
            column_mapping=EvidentlyColumnMapping(
                target=label_column,
                text_features=[text_column],
            ),
            metrics=[
                EvidentlyMetricConfig.metric("DataQualityPreset"),
                EvidentlyMetricConfig.metric(
                    "TextOverviewPreset", column_name=text_column
                ),
            ],
            # We need to download the NLTK data for the TextOverviewPreset
            download_nltk_data=True,
        ),
    )
    text_data_report(reference_dataset, comparison_dataset)

    ########## Tokenization stage ##########
    tokenizer = tokenizer_loader(lower_case=lower_case)
    tokenized_data = tokenization_step(
        dataset=dataset,
        tokenizer=tokenizer,
        padding=padding,
        max_seq_length=max_seq_length,
        text_column=text_column,
        label_column=label_column,
    )
    return tokenizer, tokenized_data

In [None]:
# Settings for this run.
no_cache: bool = True
zenml_model_name: str = "distil_bert_sentiment_analysis"
max_seq_length = 512

# Model configuration attached to the pipeline run.
model_config = ModelConfig(
    name=zenml_model_name,
    license="Apache 2.0",
    description="Show case Model Control Plane.",
    create_new_model_version=True,
    delete_new_version_on_failure=True,
    tags=["sentiment_analysis", "huggingface"],
)

# Pipeline-level options: caching is only switched off when requested.
pipeline_args = {"enable_cache": False} if no_cache else {}
pipeline_args.update(
    model_config=model_config,
    config_path=os.path.join("configs", "feature_engineering_config.yaml"),
    run_name=f"sentinment_analysis_feature_engineering_pipeline_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}",
)

# Arguments passed to the pipeline function itself.
run_args_feature = {"max_seq_length": max_seq_length}

# Execute the feature engineering pipeline on the active ZenML stack.
p = sentinment_analysis_feature_engineering_pipeline.with_options(**pipeline_args)
p(**run_args_feature)

In [None]:
from zenml.client import Client
from IPython.display import display, HTML

client = Client()
# Look up the run started by the feature engineering cell above through the
# run name that cell stored in `pipeline_args`. A hard-coded, timestamped run
# name only exists on the server it was created on and fails on a fresh run.
# Execute this cell before later cells overwrite `pipeline_args["run_name"]`.
latest_run = client.get_pipeline_run(pipeline_args["run_name"])
# Load the HTML report produced by the Evidently step and render it inline.
html = latest_run.steps["evidently_report_step"].outputs['report_html'].load()
display(HTML(html))

## 💪 Step 2: Train the model with Huggingface Hub as the model registry
 

Once the feature engineering pipeline has run a few times, we have many datasets to choose from. We can feed our desired one into a function that trains the model on the data. Thanks to the ZenML Huggingface integration, this data is loaded directly from the ZenML artifact store.

<img src="assets/training_pipeline_overview.png" alt="Pipelines Trains">

On the left side, we see our local MLOps stack, which defines the infrastructure and tooling we are using for this particular pipeline. ZenML makes it easy to run on a local stack on your development machine, or switch out the stack to run on an AWS Kubeflow-based stack (if you want to scale up).

On the right side is the new kid on the block - the ZenML Model Control Plane. The Model Control Plane is a new feature in ZenML that allows users to have a complete overview of their machine learning models. It allows teams to consolidate all artifacts related to their ML models into one place, and manage its lifecycle easily as you can see from this view from the ZenML Cloud:

In [None]:
# Arguments passed to the training pipeline function.
run_args_train = {
    "num_epochs": 1,
    "train_batch_size": 64,
    "eval_batch_size": 64,
    "learning_rate": 2e-4,
    "weight_decay": 0.01,
    "max_seq_length": 512,
}

# Optional: uncomment to use the versioned artifacts from `latest_run`
# run_args_train["dataset_artifact_id"] = latest_run.steps['tokenization_step'].output.id
# run_args_train["tokenizer_artifact_id"] = latest_run.steps['tokenizer_loader'].output.id

# Re-point the shared pipeline options at the trainer configuration, turn
# caching on, attach the model configuration and stamp a new run name.
pipeline_args.update(
    config_path=os.path.join("configs", "trainer_config.yaml"),
    enable_cache=True,
    model_config=model_config,
    run_name=f"sentinment_analysis_training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}",
)

In [None]:
# Run the training pipeline with the options in `pipeline_args` and the
# pipeline arguments in `run_args_train`.
sentinment_analysis_training_pipeline.with_options(**pipeline_args)(
    **run_args_train
)

In [None]:
### Check out a new stack
!zenml stack describe hf-sagemaker-airflow

In [None]:
### Change the stack
!zenml stack set hf-sagemaker-airflow

In [None]:
# Pipeline run names must be unique. The name held in `pipeline_args` was
# already used by the earlier training run, so stamp a new one before
# launching the same pipeline again on the newly selected stack.
pipeline_args[
    "run_name"
] = f"sentinment_analysis_training_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"

sentinment_analysis_training_pipeline.with_options(**pipeline_args)(
    **run_args_train
)

## 🫅 Step 3: Promote the model to production


Following training, the automated promotion pipeline evaluates models against predefined metrics, identifying and marking the most performant one as 'Production ready'. This is another common use case for the Model Control Plane; we store the relevant metrics there to access them easily later.

<img src="assets/promoting_pipeline_overview.png" alt="Pipelines Trains">

In [None]:
!zenml stack set hf-sagemaker-local

In [None]:
# The promotion pipeline takes no run arguments.
run_args_promoting = {}

# Refer to the model by name only.
model_config = ModelConfig(name=zenml_model_name)

# Re-point the shared pipeline options at the promotion configuration,
# attach the model configuration and stamp a new run name.
pipeline_args.update(
    config_path=os.path.join("configs", "promoting_config.yaml"),
    model_config=model_config,
    run_name=f"sentinment_analysis_promoting_pipeline_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}",
)

In [None]:
# Run the promotion pipeline with the options in `pipeline_args` and the
# (empty) pipeline arguments in `run_args_promoting`.
sentinment_analysis_promote_pipeline.with_options(**pipeline_args)(
    **run_args_promoting
)

## 💯 Step 4: Deploy the model to AWS Sagemaker Endpoints


This is the final step to automate the deployment of the slated production model to a Sagemaker endpoint. The deployment pipeline handles the complexities of AWS interactions and ensures that the model, along with its full history and context, is transitioned into a live environment ready for use. Here again we use the Model Control Plane interface to query the Huggingface revision and use that information to push to Huggingface Hub.

<img src="assets/deploying_pipeline_overview.png" alt="Pipelines Trains">


In [None]:
!zenml stack set hf-sagemaker-local

In [None]:
# `ModelStages` is used below but is not imported anywhere else in this
# notebook; without this import the cell fails with a NameError.
from zenml.enums import ModelStages

pipeline_args["config_path"] = os.path.join("configs", "deploying_config.yaml")

# Deploying pipeline has new ZenML model config: it selects the model
# version by the PRODUCTION stage instead of creating a new version.
model_config = ModelConfig(
    name=zenml_model_name,
    version=ModelStages.PRODUCTION,
)
pipeline_args["model_config"] = model_config
pipeline_args["enable_cache"] = False
# The deployment pipeline takes no run arguments.
run_args_deploying = {}
pipeline_args[
    "run_name"
] = f"sentinment_analysis_deploy_pipeline_run_{dt.now().strftime('%Y_%m_%d_%H_%M_%S')}"

In [None]:
# Run the deployment pipeline with the options in `pipeline_args` and the
# (empty) pipeline arguments in `run_args_deploying`.
sentinment_analysis_deploy_pipeline.with_options(**pipeline_args)(
    **run_args_deploying
)

ZenML builds upon the straightforward deployment capability of Huggingface models to AWS Sagemaker, and transforms it into a sophisticated, repeatable, and transparent MLOps workflow. It takes charge of the intricate steps necessary for modern ML systems, ensuring that software engineering leads can focus on iteration and innovation rather than operational intricacies.

To delve deeper into each stage, refer to the comprehensive guide on GitHub: [zenml-io/zenml-huggingface-sagemaker](https://github.com/zenml-io/zenml-huggingface-sagemaker). Additionally, [this YouTube playlist](https://www.youtube.com/watch?v=Q1EH2H8Akgo&list=PLhNrLW_IWplw6dBbmGcL828-atJMu3CwF) provides a detailed visual walkthrough of the entire pipeline: Huggingface to Sagemaker ZenML tutorial.

Interested in standardizing your MLOps workflows? ZenML Cloud is now available to all - get a managed ZenML server with important features such as RBAC and pipeline triggers. [Book a demo](https://zenml.io/book-a-demo) with us now to learn how you can create your own MLOps pipelines today.