# SageMaker Pipelines: train a Hugging Face model, deploy it with a Lambda step  

This notebook demonstrates how to use [SageMaker Pipelines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-sdk.html) to train a [Hugging Face](https://docs.aws.amazon.com/sagemaker/latest/dg/hugging-face.html) NLP model and deploy it using a Lambda function invoked by a SageMaker Pipelines Lambda step. The SageMaker integration with Hugging Face makes it easy to train and deploy advanced NLP models.  A Lambda step in SageMaker Pipelines enables you to easily do lightweight model deployments and other serverless operations.

In this example use case, the Hugging Face model is trained on the IMDb movie reviews dataset.  The goal is to predict the sentiment of the movie review (positive or negative).  The pipeline built in this notebook covers the full end-to-end workflow, from preparing the dataset, to model training, evaluation, registration (if model quality passes a test), and deployment.

**Prerequisites**:  
- Make sure the IAM role used by your notebook environment has the managed policies `AmazonSageMakerPipelinesIntegrations` and `AmazonSageMakerFullAccess` attached.

https://aws.amazon.com/de/blogs/machine-learning/use-a-sagemaker-pipeline-lambda-step-for-lightweight-model-deployments/

# Development Environment and Permissions 

## Installation

_*Note:* This installs only the required libraries from Hugging Face and AWS. You also need PyTorch or TensorFlow if you do not already have one of them installed._

In [None]:
!pip install "sagemaker>=2.48.0" --upgrade

In [None]:
import boto3
import os
import numpy as np
import pandas as pd
import sagemaker
import sys
import time

from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString

from sagemaker.lambda_helper import Lambda

from sagemaker.sklearn.processing import SKLearnProcessor

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import CacheConfig, ProcessingStep

from sagemaker.huggingface import HuggingFace, HuggingFaceModel
import sagemaker.huggingface

from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import CreateModelStep, RegisterModel

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo,ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

from sagemaker.workflow.pipeline import Pipeline, PipelineExperimentConfig
from sagemaker.workflow.execution_variables import ExecutionVariables

## Permissions

_If you are going to use SageMaker in a local environment, you need access to an IAM role with the required permissions for SageMaker. You can find out more about it [here](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html)._

In [None]:
import sagemaker

sess = sagemaker.Session()
region = sess.boto_region_name
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()

print(f"SageMaker role: {role}")
print(f"SageMaker bucket: {bucket}")
print(f"SageMaker region: {region}")

# Pipeline Overview

![pipeline](./imgs/overview.png)

# Defining the Pipeline

## 0. Pipeline parameters

Before defining the pipeline, it is important to parameterize it.  Almost any aspect of a SageMaker Pipeline can be parameterized, including instance types and counts.

https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html

### Constants

In [None]:
# S3 prefix where assets will be stored
s3_prefix = "hugging-face-pipeline-demo"

# base name prefix for SageMaker jobs (training, processing, inference)
base_job_prefix = s3_prefix

# Cache configuration for workflow
cache_config = CacheConfig(enable_caching=False, expire_after="30d")

# package versions
transformers_version = "4.11.0"
pytorch_version = "1.9.0"
py_version = "py38"

model_id_="distilbert-base-uncased"
dataset_name_="imdb"

### Parameters

In [None]:
# Global pipeline parameters

model_id = ParameterString(name="ModelId", default_value="distilbert-base-uncased")
dataset_name = ParameterString(name="DatasetName", default_value="imdb")

## 1. Processing Step

A SKLearn Processing step is used to invoke a SageMaker Processing job with a custom Python script, `preprocessing.py`.

### Processing Parameter

In [None]:
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.c5.2xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_script = ParameterString(name="ProcessingScript", default_value="./scripts/preprocessing.py")

### Processor

In [None]:
processing_output_destination = f"s3://{bucket}/{s3_prefix}/data"


sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=base_job_prefix + "/preprocessing",
    role=role
)

step_process = ProcessingStep(
    name="ProcessDataForTraining",
    cache_config=cache_config,
    processor=sklearn_processor,
    job_arguments=["--transformers_version",transformers_version,
                   "--pytorch_version",pytorch_version,
                   "--model_id",model_id_,
                   "--dataset_name",dataset_name_],
    outputs=[
        ProcessingOutput(
            output_name="train",
            destination=f"{processing_output_destination}/train",
            source="/opt/ml/processing/train",
        ),
        ProcessingOutput(
            output_name="test",
            destination=f"{processing_output_destination}/test",
            source="/opt/ml/processing/test",
        ),
        ProcessingOutput(
            output_name="validation",
            destination=f"{processing_output_destination}/validation",
            source="/opt/ml/processing/validation",
        ),
    ],
    code=processing_script,
)
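
The `job_arguments` above reach the processing script as ordinary command-line arguments. The contents of `preprocessing.py` are not shown in this notebook, so the following is only a minimal sketch of how such a script could read them; the function name `parse_args` is an assumption made for illustration.

```python
import argparse


def parse_args(argv=None):
    """Parse the arguments that the ProcessingStep passes via job_arguments.

    Hypothetical helper: the real preprocessing.py may be organised differently.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--transformers_version", type=str, required=True)
    parser.add_argument("--pytorch_version", type=str, required=True)
    parser.add_argument("--model_id", type=str, required=True)
    parser.add_argument("--dataset_name", type=str, required=True)
    # parse_known_args ignores any extra arguments instead of exiting
    args, _unknown = parser.parse_known_args(argv)
    return args


# Example with the same values the step passes in this notebook
example_args = parse_args(
    [
        "--transformers_version", "4.11.0",
        "--pytorch_version", "1.9.0",
        "--model_id", "distilbert-base-uncased",
        "--dataset_name", "imdb",
    ]
)
print(example_args.model_id, example_args.dataset_name)
```

Inside the processing container the script would call `parse_args()` with no argument so that the values come from `sys.argv`.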

## 2. Model Training Step

Use SageMaker's [Hugging Face](https://sagemaker.readthedocs.io/en/stable/frameworks/huggingface/sagemaker.huggingface.html) Estimator class to create a model training step for the Hugging Face [DistilBERT](https://huggingface.co/distilbert-base-uncased) model.  Transformer-based models such as the original BERT can be very large and slow to train.  DistilBERT, however, is a small, fast, cheap and light Transformer model trained by distilling BERT base. It reduces the size of a BERT model by 40%, while retaining 97% of its language understanding capabilities and being 60% faster. 

Besides the framework versions defined in the constants above, which select the training container, the Hugging Face estimator also takes hyperparameters as a dictionary.  The training instance type and count are pipeline parameters that can be easily varied in future pipeline runs without changing any code.  

In [None]:
# training step parameters
training_entry_point = ParameterString(name="TrainingEntryPoint", default_value="train.py")
training_source_dir = ParameterString(name="TrainingSourceDir", default_value="./scripts")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.p3.2xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

In [None]:
# hyperparameters, which are passed into the training job
epochs=ParameterString(name="Epochs", default_value="1")
eval_batch_size=ParameterString(name="EvalBatchSize", default_value="32")         
train_batch_size=ParameterString(name="TrainBatchSize", default_value="16")             
learning_rate=ParameterString(name="LearningRate", default_value="3e-5")               
fp16=ParameterString(name="Fp16", default_value="True")

huggingface_estimator = HuggingFace(
    entry_point=training_entry_point,
    source_dir=training_source_dir,
    base_job_name=base_job_prefix + "/training",
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    role=role,
    transformers_version=transformers_version,
    pytorch_version=pytorch_version,
    py_version=py_version,
    hyperparameters={
         'epochs':epochs,  
         'eval_batch_size': eval_batch_size,   
         'train_batch_size': train_batch_size,              
         'learning_rate': learning_rate,               
         'model_id': model_id,
         'fp16': fp16
    },
)
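
In script mode, SageMaker passes each hyperparameter to the entry point as a `--name value` command-line argument and exposes input channels through `SM_CHANNEL_<NAME>` environment variables. Because the pipeline parameters above are all strings, the training script has to convert them. The notebook does not show `train.py`, so this is a sketch under that assumption; `parse_training_args` and `str_to_bool` are hypothetical names.

```python
import argparse
import os


def str_to_bool(value):
    """Convert strings such as 'True' or 'false' to a bool.

    argparse's type=bool would treat any non-empty string, including
    'False', as True, so an explicit conversion is needed.
    """
    return str(value).strip().lower() in {"true", "1", "yes"}


def parse_training_args(argv=None):
    """Hypothetical argument parsing for train.py."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--train_batch_size", type=int, default=16)
    parser.add_argument("--eval_batch_size", type=int, default=32)
    parser.add_argument("--learning_rate", type=float, default=3e-5)
    parser.add_argument("--model_id", type=str, default="distilbert-base-uncased")
    parser.add_argument("--fp16", type=str_to_bool, default=True)
    # Channel and model directories are provided by SageMaker as environment variables
    parser.add_argument("--training_dir", default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test_dir", default=os.environ.get("SM_CHANNEL_TEST"))
    parser.add_argument("--model_dir", default=os.environ.get("SM_MODEL_DIR"))
    args, _unknown = parser.parse_known_args(argv)
    return args


example = parse_training_args(["--epochs", "3", "--learning_rate", "3e-5", "--fp16", "False"])
print(example.epochs, example.learning_rate, example.fp16)
```

The `str_to_bool` converter matters for the `Fp16` parameter: without it, overriding the parameter with `"False"` would still enable mixed precision.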

In [None]:
step_train = TrainingStep(
    name="TrainHuggingFaceModel",
    estimator=huggingface_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri
        ),
    },
    cache_config=cache_config,
)

## 3. Model evaluation Step

A ProcessingStep is used to evaluate the performance of the trained model. Based on the results of the evaluation, either the model is created, registered, and deployed, or the pipeline stops.

In the training job, the model was evaluated against the test dataset, and the result of the evaluation was stored in the `model.tar.gz` file saved by the training job.  The results of that evaluation are copied into a `PropertyFile` in this ProcessingStep so that it can be used in the ConditionStep. 
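
The copy described above can be sketched as follows. The evaluation script itself is not shown in this notebook, and the name of the results file inside `model.tar.gz` (`eval_results.json` here) is an assumption, as is the function name `extract_evaluation`.

```python
import json
import tarfile
from pathlib import Path


def extract_evaluation(model_tar, output_dir, results_name="eval_results.json"):
    """Copy evaluation metrics out of model.tar.gz into evaluation.json.

    results_name is an assumed file name; adjust it to whatever the
    training script actually writes.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    with tarfile.open(str(model_tar), "r:gz") as tar:
        # Locate the results file by base name, wherever it sits in the archive
        member = next(
            (m for m in tar.getmembers() if Path(m.name).name == results_name), None
        )
        if member is None:
            raise FileNotFoundError(f"{results_name} not found in {model_tar}")
        metrics = json.load(tar.extractfile(member))
    # The PropertyFile in the pipeline points at this file name
    (output_dir / "evaluation.json").write_text(json.dumps(metrics))
    return metrics
```

In the processing job, `model_tar` would be `/opt/ml/processing/model/model.tar.gz` and `output_dir` would be `/opt/ml/processing/evaluation`, matching the input and output paths configured in the step below.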

In [None]:
evaluation_script = ParameterString(name="EvaluationScript", default_value="./scripts/evaluate.py")

In [None]:
# script_eval = ScriptProcessor(
#     image_uri=container,
#     command=["python3"],
#     instance_type=processing_instance_type,
#     instance_count=1,
#     base_job_name=base_job_prefix + "/evaluation",
#     sagemaker_session=sagemaker_session,
#     role=role,
# )
script_eval = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=base_job_prefix + "/evaluation",
    role=role,
)

evaluation_report = PropertyFile(
    name="HuggingFaceEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name="HuggingfaceEvalLoss",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=f"s3://{bucket}/{s3_prefix}/evaluation_report",
        ),
    ],
    code=evaluation_script,
    property_files=[evaluation_report],
    cache_config=cache_config,
)

## 4. Register the model

The trained model is registered in the Model Registry under a Model Package Group. Each time a new model is registered, it is given a new version number by default.  The model is registered in the "Approved" state so that it can be deployed by the Lambda function. Registration will only happen if the output of the ConditionStep is true, i.e., the metrics being checked are within the threshold defined.

In [None]:
model = HuggingFaceModel(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    transformers_version=transformers_version,
    pytorch_version=pytorch_version,
    py_version=py_version,
    sagemaker_session=sess
)
model_package_group_name = "HuggingFaceModelPackageGroup"
step_register = RegisterModel(
    name="HuggingFaceRegisterModel",
    model=model,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.g4dn.xlarge", "ml.m5.xlarge"],
    transform_instances=["ml.g4dn.xlarge", "ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status="Approved",
)

## 5. Model Deployment

The SageMaker SDK provides a Lambda helper class that can be used to create a Lambda function. This function is provided to the Lambda step for invocation via the pipeline. Alternatively, a predefined Lambda function can be provided to the Lambda step. 

The SageMaker Execution Role requires the policy `AmazonSageMakerPipelinesIntegrations` to create the Lambda function, and the Lambda function needs a role with policies allowing creation of a SageMaker endpoint. 

A helper function in `iam_helper.py` is provided to create the Lambda role. To use the script, the notebook execution role must include the required policy to create an IAM role. 
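
To make the Lambda's job concrete, here is a minimal sketch of a handler that deploys a registered model package to an endpoint. The actual function created by `utils/deploy_step.py` is not shown in this notebook, so the event keys (`model_name`, `model_package_arn`, `instance_type`, `role`) and the optional `sm_client` argument are assumptions; only the boto3 calls `create_model`, `create_endpoint_config`, and `create_endpoint` are standard SageMaker API operations.

```python
def lambda_handler(event, context, sm_client=None):
    """Deploy a registered model package to a real-time endpoint.

    The event layout is hypothetical; sm_client exists only so the
    function can be exercised with a stub outside AWS.
    """
    if sm_client is None:
        import boto3  # imported lazily so the sketch can run without AWS access

        sm_client = boto3.client("sagemaker")

    model_name = event["model_name"]

    # A model package from the Model Registry can back a SageMaker model directly
    sm_client.create_model(
        ModelName=model_name,
        ExecutionRoleArn=event["role"],
        Containers=[{"ModelPackageName": event["model_package_arn"]}],
    )
    sm_client.create_endpoint_config(
        EndpointConfigName=model_name,
        ProductionVariants=[
            {
                "VariantName": "AllTraffic",
                "ModelName": model_name,
                "InstanceType": event["instance_type"],
                "InitialInstanceCount": 1,
            }
        ],
    )
    # Endpoint creation is asynchronous; the Lambda returns before it is InService
    sm_client.create_endpoint(EndpointName=model_name, EndpointConfigName=model_name)
    return {"statusCode": 200, "endpoint_name": model_name}
```

Because `create_endpoint` returns immediately, the pipeline can finish while the endpoint is still being created, which is why the endpoint status has to be checked separately before requesting predictions.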

In [None]:
# custom Helper Step for ModelDeployment
from utils.deploy_step import ModelDeployment

step_deployment = ModelDeployment(
    model_name=f"{model_id_}-{dataset_name_}",
    registered_model=step_register.steps[0],
    endpoint_instance_type="ml.g4dn.xlarge",
    role=role,
    autoscaling_policy=None,
)

## Condition for deployment

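For the condition to be True and the steps after evaluation to run, the evaluated accuracy of the Hugging Face model (`eval_accuracy` in the evaluation report) must be greater than or equal to the `ThresholdAccuracy` parameter, which defaults to 0.8.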

In [None]:
threshold_accuracy = ParameterFloat(name="ThresholdAccuracy", default_value=0.8)

In [None]:
# True when the reported accuracy is at or above the threshold
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="eval_accuracy",
    ),
    right=threshold_accuracy,
)

step_cond = ConditionStep(
    name="CheckHuggingfaceEvalLoss",
    conditions=[cond_gte],
    if_steps=[step_register, step_deployment],
    else_steps=[],
)
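
The check performed by the condition can be reproduced locally against an evaluation report. This is a plain-Python sketch of the comparison, not the SageMaker implementation; `passes_threshold` is a hypothetical helper and the sample report values are made up for illustration.

```python
import json


def passes_threshold(report_json, threshold, key="eval_accuracy"):
    """Return True when the reported metric is at or above the threshold."""
    report = json.loads(report_json)
    return report[key] >= threshold


# Illustrative report contents, not real results
sample_report = '{"eval_accuracy": 0.91, "eval_loss": 0.25}'
print(passes_threshold(sample_report, 0.8))
print(passes_threshold(sample_report, 0.95))
```

Running the same comparison on a downloaded `evaluation.json` is a quick way to see why a pipeline execution did or did not register the model.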

# Pipeline definition and execution

SageMaker Pipelines constructs the pipeline graph from the implicit definition created by the way the pipeline steps' inputs and outputs are specified.  There's no need to specify that a step is a "parallel" or "serial" step.  Steps that follow the condition step, such as model registration, are not listed in the pipeline's `steps` argument because they are attached to the condition step and do not run unless the condition is true.  If it is, they run in order based on their specified inputs and outputs.
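
The idea of deriving execution order from data dependencies can be illustrated with the standard library. This is only a conceptual sketch using `graphlib` (Python 3.9+), not how the service is implemented; the dependency map is written by hand from the step definitions in this notebook.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes
dependencies = {
    "ProcessDataForTraining": set(),
    "TrainHuggingFaceModel": {"ProcessDataForTraining"},
    "HuggingfaceEvalLoss": {"TrainHuggingFaceModel"},
    "CheckHuggingfaceEvalLoss": {"HuggingfaceEvalLoss"},
}

# A topological sort yields an order in which every step runs after its inputs exist
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Steps with no dependency path between them would have no fixed relative order, which is what allows them to run in parallel.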

https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html

In [None]:
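# define the parameters whose default values should be overridden for this execution
# (values must be plain strings or numbers, not Parameter objects)
pipeline_parameters=dict(
        ModelId=model_id_,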
        ThresholdAccuracy=0.7,
        Epochs="3",
        TrainBatchSize="32",
        EvalBatchSize="64",
    )
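
Overrides apply on top of the defaults declared with each parameter. The sketch below is a plain-Python illustration of that merge with a local check for misspelled names. It is not part of the SageMaker SDK, and `resolve_parameters` is a hypothetical helper.

```python
def resolve_parameters(defaults, overrides):
    """Merge execution-time overrides onto parameter defaults.

    Raises KeyError for names that were never declared, which catches
    typos before an execution is started.
    """
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"Not declared as pipeline parameters: {sorted(unknown)}")
    return {**defaults, **overrides}


# Defaults taken from the parameter definitions in this notebook
defaults = {"ThresholdAccuracy": 0.8, "Epochs": "1", "TrainBatchSize": "16", "EvalBatchSize": "32"}
overrides = {"ThresholdAccuracy": 0.7, "Epochs": "3", "TrainBatchSize": "32", "EvalBatchSize": "64"}
print(resolve_parameters(defaults, overrides))
```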

In [None]:
pipeline = Pipeline(
    name="HuggingFaceDemoPipeline",
    parameters=[
        model_id,
        dataset_name,
        processing_instance_type,
        processing_instance_count,
        processing_script,
        training_entry_point,
        training_source_dir,
        training_instance_type,
        training_instance_count,
        evaluation_script,
        threshold_accuracy,
        epochs,
        eval_batch_size,
        train_batch_size,
        learning_rate,
        fp16
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

We can examine the pipeline definition in JSON format.  You can also inspect the pipeline graph in SageMaker Studio by going to the page for your pipeline.  

In [None]:
import json

json.loads(pipeline.definition())

![pipeline](./imgs/pipeline.png)

In [None]:
pipeline.upsert(role_arn=role)

In [None]:
execution = pipeline.start(parameters=pipeline_parameters)

In [None]:
execution.wait()

## Getting predictions from the endpoint

After the previous cell completes, you can check whether the endpoint has finished deploying.  In SageMaker Studio, click on the tilted triangle icon in the left toolbar, then select **Endpoints** in the drop-down menu.  In the list of endpoints, click on the endpoint created by this pipeline (the code below looks it up as `{model_id_}-{dataset_name_}`), then click on **AWS settings**.  When the Status becomes InService, you can run the following code cells.

### Setup 

We can use the endpoint name to set up a `Predictor` object that will be used to get predictions.

In [None]:
from sagemaker.huggingface import HuggingFacePredictor

# use the plain string constants here, not the pipeline Parameter objects
endpoint_name = f"{model_id_}-{dataset_name_}"

hf_predictor = HuggingFacePredictor(endpoint_name, sagemaker_session=sess)

### Test data

Here are a couple of sample reviews we would like to classify as positive (LABEL_1) or negative (LABEL_0).  Demonstrating the power of advanced Transformer-based models such as this Hugging Face model, the model should do quite well even though the reviews are mixed.  

In [None]:
sentiment_input1 = {"inputs":"Although the movie had some plot weaknesses, it was engaging. Special effects were mind boggling.  Can't wait to see what this creative team does next."}

hf_predictor.predict(sentiment_input1)

In [None]:
sentiment_input2 = {"inputs":"There was some good acting, but the story was ridiculous. The other sequels in this franchise were better.  It's time to take a break from this IP, but if they switch it up for the next one, I'll check it out."}

hf_predictor.predict(sentiment_input2)
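
The raw response uses the generic labels mentioned above. A small helper can make it easier to read. This sketch assumes the endpoint returns the usual text-classification shape, a list of dictionaries with `label` and `score` keys; `describe` and the sample response are hypothetical.

```python
# Label names as described in this notebook
LABELS = {"LABEL_0": "negative", "LABEL_1": "positive"}


def describe(prediction):
    """Turn a response such as [{'label': 'LABEL_1', 'score': 0.98}] into text."""
    # Pick the highest-scoring entry in case several labels are returned
    top = max(prediction, key=lambda p: p["score"])
    sentiment = LABELS.get(top["label"], top["label"])
    return f"{sentiment} ({top['score']:.2%})"


# Made-up response for illustration, not actual endpoint output
print(describe([{"label": "LABEL_1", "score": 0.9876}]))
```

With a live endpoint, the call would be `describe(hf_predictor.predict(sentiment_input1))`.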

## Cleanup Resources

The following cell deletes the Lambda function and the endpoint it created. 
Deleting other resources, such as the S3 bucket and the IAM role for the Lambda function, is the responsibility of the notebook user. 

In [None]:
sm_client = boto3.client("sagemaker")

# Delete the Lambda function
step_deployment.func.delete()

# Delete the endpoint
hf_predictor.delete_endpoint()