# SageMaker Pipelines: train a Hugging Face model, deploy it with a Lambda step synchronously
This notebook demonstrates how to use SageMaker Pipelines to train a Hugging Face NLP model and deploy it using a Lambda function invoked by a SageMaker Pipelines Lambda step. The SageMaker integration with Hugging Face makes it easy to train and deploy advanced NLP models. A Lambda step in SageMaker Pipelines enables you to easily do lightweight model deployments and other serverless operations.

In this example use case, the Hugging Face model is trained on the IMDb movie reviews dataset. The goal is to predict the sentiment of the movie review (positive or negative). The pipeline built in this notebook covers the full end-to-end workflow, from preparing the dataset, to model training, evaluation, registration (if model quality passes a test), and deployment.

Prerequisites:

- Make sure the execution role of your notebook environment has the IAM managed policies AmazonSageMakerPipelinesIntegrations and AmazonSageMakerFullAccess attached
- For SageMaker Studio, use the kernel Python 3 (Data Science)

We'll start by updating the SageMaker SDK, and importing some necessary packages.

In [3]:
import sys
!{sys.executable} -m pip install sagemaker --quiet --upgrade



In [4]:
import boto3 # boto3 is the AWS SDK for Python
import os # provides functions for interacting with the operating system
import numpy as np # library that offers support for large multi-dimensional arrays
import pandas as pd # for data analysis and manipulation
import sagemaker # SageMaker Python SDK for training and deploying machine learning models on Amazon SageMaker.
import sys # provides access to some variables and functions used or maintained by the interpreter
import time # provides various time-related functions

from sagemaker.workflow.parameters import ParameterInteger, ParameterString # variables that accept string and integer types for pipeline definition
from sagemaker.workflow.lambda_step import (
    LambdaStep, # Constructs a lambda step for workflow
    LambdaOutput, # List of outputs from the lambda function
    LambdaOutputTypeEnum, # Specifies type of Lambda output
)
from sagemaker.lambda_helper import Lambda # Constructs a Lambda instance. This instance represents a Lambda function and provides methods for updating, deleting and invoking the function.

from sagemaker.sklearn.processing import SKLearnProcessor # Handles Amazon SageMaker processing tasks for jobs using scikit-learn

from sagemaker.processing import ProcessingInput, ProcessingOutput # accepts parameters that specify an Amazon S3 input/output for a processing job and provides a method to turn those parameters into a dictionary.
from sagemaker.workflow.steps import CacheConfig, ProcessingStep # to enable caching in pipeline workflow / processing step for workflow

from sagemaker.huggingface import HuggingFace, HuggingFaceModel # used to create HuggingFace estimator / model to register

from sagemaker.inputs import TrainingInput # Create a definition for input data used by an SageMaker training job
from sagemaker.workflow.steps import TrainingStep # Training step for workflow

from sagemaker.processing import ScriptProcessor # handles Amazon SageMaker Processing tasks for jobs using a machine learning framework, which allows for providing a script to be run as part of the Processing Job
from sagemaker.workflow.properties import PropertyFile # Provides a property file struct
from sagemaker.workflow.step_collections import CreateModelStep, RegisterModel # CreateModel step / Register Model step collection for workflow 

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo # A condition for less than or equal to comparisons
from sagemaker.workflow.condition_step import ( 
    ConditionStep, # Conditional step for pipelines to support conditional branching in the execution of steps
    JsonGet # Get JSON properties from PropertyFiles
)

from sagemaker.workflow.pipeline import Pipeline, PipelineExperimentConfig # Pipeline for workflow
from sagemaker.workflow.execution_variables import ExecutionVariables # Pipeline execution variables for workflow

Next, we'll perform some setup for SageMaker.

In [5]:
region = sagemaker.Session().boto_region_name # Get the AWS Region we are using on Amazon SageMaker (e.g. 'eu-central-1')
sm_client = boto3.client("sagemaker") # instantiate API for creating and managing Amazon SageMaker resources
boto_session = boto3.Session(region_name=region) # Create a boto3 session with defined region in which we want to create new connections
sagemaker_session = sagemaker.session.Session( # Initialize a SageMaker Session
    boto_session=boto_session, # The underlying boto3 session which AWS service calls are delegated to
    sagemaker_client=sm_client # Client which makes Amazon SageMaker service calls other than InvokeEndpoint
)

role = sagemaker.get_execution_role() #  Access the execution role

bucket = sagemaker_session.default_bucket() # The default Amazon S3 bucket used by this session

# define S3 prefix
s3_prefix = "hugging-face-pipeline-demo"
base_job_prefix = s3_prefix

In [6]:
bucket

'sagemaker-eu-central-1-910022457801'

In [4]:
role

'arn:aws:iam::910022457801:role/service-role/AmazonSageMaker-ExecutionRole-20210409T122209'

## Parameterizing the pipeline
Before defining the pipeline, it is important to parameterize it. Almost any aspect of a SageMaker Pipeline can be parameterized, including instance types and counts.

In [5]:
# processing step parameters
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.c5.2xlarge") # define processing instance type
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1) # define processing instance count

# training step parameters
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.p3.2xlarge") # define training instance type
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1) # define training instance count

# endpoint parameters
endpoint_instance_type = ParameterString(name="EndpointInstanceType", default_value="ml.g4dn.xlarge") # define endpoint instance type

output_destination = "s3://{}/{}/data".format(bucket, s3_prefix) # data destination on S3
cache_config = CacheConfig(enable_caching=False, expire_after="30d") # disable caching
model_package_group_name = "HuggingFaceModelPackageGroup" # where the model will be registered 
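Because instance types and counts are pipeline parameters, a later execution can override their defaults without editing any step definition. The helper below is a minimal sketch of that idea: `start_with_overrides` is a hypothetical name, and it assumes a `pipeline` object (built later in this notebook) whose `start` method accepts a `parameters` dictionary, as the SageMaker SDK's `Pipeline.start` does. Only the four parameters that this notebook registers with the pipeline are accepted.

```python
# Names of the parameters registered with the pipeline later in this notebook
ALLOWED_PARAMETERS = {
    "ProcessingInstanceType",
    "ProcessingInstanceCount",
    "TrainingInstanceType",
    "TrainingInstanceCount",
}


def start_with_overrides(pipeline, **overrides):
    """Start a pipeline execution, overriding selected parameter defaults.

    `pipeline` is assumed to expose start(parameters=...), like
    sagemaker.workflow.pipeline.Pipeline.
    """
    unknown = set(overrides) - ALLOWED_PARAMETERS
    if unknown:
        # Fail early instead of sending an unknown parameter name to the service
        raise ValueError(f"Unknown pipeline parameters: {sorted(unknown)}")
    return pipeline.start(parameters=overrides)
```

For example, `start_with_overrides(pipeline, TrainingInstanceType="ml.g4dn.xlarge")` would run the same pipeline on a different training instance type.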

## Data preparation step
A SKLearn Processing step is used to invoke a SageMaker Processing job with a custom python script - preprocessing.py.

In [6]:
# Instantiate SKLearnProcessor
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1", # The version of scikit-learn
    instance_type=processing_instance_type, # Type of EC2 instance to use for processing
    instance_count=processing_instance_count, # The number of instances to run the Processing job with
    base_job_name=base_job_prefix + "/preprocessing", # Prefix for processing name
    sagemaker_session=sagemaker_session, # Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed
    role=role, # An AWS IAM role name or ARN
)

# Define processing step
step_process = ProcessingStep(
    name="ProcessDataForTraining", # The name of the processing step
    cache_config=cache_config, # to use or not caching (A sagemaker.workflow.steps.CacheConfig instance)
    processor=sklearn_processor, # use sklearn processor defined above (A sagemaker.processing.Processor instance) 
    outputs=[ # 3 outputs (train, test and validation). A list of sagemaker.processing.ProcessingOutput instances
        ProcessingOutput(
            output_name="train", # The name for the processing job output
            destination="{}/train".format(output_destination), # The destination of the output
            source="/opt/ml/processing/train", # The source for the output
        ),
        ProcessingOutput(
            output_name="test", # The name for the processing job output
            destination="{}/test".format(output_destination), # The destination of the output
            source="/opt/ml/processing/test", # The source for the output
        ),
        ProcessingOutput(
            output_name="validation", # The name for the processing job output
            destination="{}/validation".format(output_destination), # The destination of the output
            source="/opt/ml/processing/validation", # The source for the output
        ),
    ],
    code="./scripts/preprocessing.py", # preprocessing script 
)
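The contents of preprocessing.py are not shown in this notebook. As a rough illustration of what such a script has to do, the sketch below splits a list of records into the three outputs the step expects and writes each one under its own directory. The function names, the split fractions, and the JSON Lines file format are all assumptions made for illustration; only the three directory names come from the step definition above.

```python
import json
import random
from pathlib import Path


def split_rows(rows, test_frac=0.1, val_frac=0.1, seed=42):
    """Shuffle rows reproducibly and split them into train/test/validation."""
    rows = list(rows)
    random.Random(seed).shuffle(rows)
    n_test = int(len(rows) * test_frac)
    n_val = int(len(rows) * val_frac)
    return {
        "test": rows[:n_test],
        "validation": rows[n_test:n_test + n_val],
        "train": rows[n_test + n_val:],
    }


def write_splits(splits, base_dir="/opt/ml/processing"):
    """Write each split to <base_dir>/<split name>/<split name>.jsonl."""
    written = {}
    for name, rows in splits.items():
        out_dir = Path(base_dir) / name
        out_dir.mkdir(parents=True, exist_ok=True)
        out_file = out_dir / f"{name}.jsonl"
        with open(out_file, "w") as f:
            for row in rows:
                f.write(json.dumps(row) + "\n")
        written[name] = out_file
    return written
```

Whatever the script writes to `/opt/ml/processing/train`, `/opt/ml/processing/test`, and `/opt/ml/processing/validation` is uploaded to the matching S3 destination when the Processing job finishes.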

## Model training with the SageMaker Hugging Face Estimator
Use SageMaker's Hugging Face Estimator class to create a model training step for the Hugging Face DistilBERT model. Transformer-based models such as the original BERT can be very large and slow to train. DistilBERT, however, is a small, fast, cheap and light Transformer model trained by distilling BERT base. It reduces the size of a BERT model by 40%, while retaining 97% of its language understanding capabilities and being 60% faster.

As a first step to constructing the estimator for our model training job, we'll look up the Hugging Face container for our AWS Region.

In [7]:
# Instantiate a Docker container
container = sagemaker.image_uris.retrieve(framework="huggingface", # The name of the framework or algorithm
                                          region=boto3.Session().region_name, # The AWS region
                                          version="4.6.1", # The framework or algorithm version
                                          py_version="py36", # The Python version
                                          base_framework_version="pytorch1.7.1", # The base framework version
                                          instance_type="ml.p3.2xlarge", # The SageMaker instance type
                                          image_scope="training", # The image type, i.e. what it is used for. Valid values: “training”, “inference”, “eia”. If accelerator_type is set, image_scope is ignored.
                                          container_version="cu110-ubuntu18.04") # the version of docker image

print(container) 

763104351884.dkr.ecr.eu-central-1.amazonaws.com/huggingface-pytorch-training:1.7.1-transformers4.6.1-gpu-py36-cu110-ubuntu18.04


Besides the container specified above, the Hugging Face estimator also takes hyperparameters as a dictionary. The training instance type and size are pipeline parameters that can be easily varied in future pipeline runs without changing any code.

In [8]:
# Define hyperparameters for HuggingFace estimator
hyperparameters = {
    "model_name": "distilbert-base-uncased", # name of the model
    "train_batch_size": 32, # size of training batches
    "epochs": 1, # number of epochs
}

# Construct the HuggingFace estimator
estimator = HuggingFace(
    image_uri=container, # the estimator will use this image for training and hosting
    entry_point="train.py", # use train.py script for training
    source_dir="./scripts", # Path to a directory for training script
    base_job_name=base_job_prefix + "/training", # Prefix for training name 
    instance_type=training_instance_type, # Type of EC2 instance to use for training
    instance_count=training_instance_count, # The number of instances to run the Training job with
    role=role, # An AWS IAM role name or ARN
    transformers_version="4.6.1", # Transformers version you want to use for executing your model training code
    pytorch_version="1.7.1", # PyTorch version you want to use for executing your model training code
    py_version="py36", # Python version you want to use for executing your model training code
    hyperparameters=hyperparameters, # Hyperparameters that will be used for training
    sagemaker_session=sagemaker_session, # Session object which manages interactions with Amazon SageMaker APIs and any other AWS services needed
)
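SageMaker passes the entries of the `hyperparameters` dictionary to the entry point script as command-line arguments, and exposes the input channels and the model output directory through environment variables such as `SM_CHANNEL_TRAIN`, `SM_CHANNEL_TEST`, and `SM_MODEL_DIR`. The train.py used here is not shown, so the following is only a sketch of how such a script might read those values; the argument names mirror the dictionary above, while the local fallback paths are assumptions.

```python
import argparse
import os


def parse_args(argv=None):
    """Parse hyperparameters and SageMaker-provided paths for a training script."""
    parser = argparse.ArgumentParser()

    # Hyperparameters arrive as --key value pairs
    parser.add_argument("--model_name", type=str, default="distilbert-base-uncased")
    parser.add_argument("--train_batch_size", type=int, default=32)
    parser.add_argument("--epochs", type=int, default=1)

    # Channel and model directories are set by SageMaker inside the container;
    # the fallbacks are hypothetical paths for running the script locally
    parser.add_argument("--train_dir", default=os.environ.get("SM_CHANNEL_TRAIN", "data/train"))
    parser.add_argument("--test_dir", default=os.environ.get("SM_CHANNEL_TEST", "data/test"))
    parser.add_argument("--model_dir", default=os.environ.get("SM_MODEL_DIR", "model"))

    # Ignore any extra arguments the platform may add
    args, _ = parser.parse_known_args(argv)
    return args
```

The channel names `train` and `test` correspond to the keys of the `inputs` dictionary given to the training step.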

In [9]:
# Create training step
step_train = TrainingStep(
    name="TrainHuggingFaceModel",
    estimator=estimator, # pass the estimator that is constructed above
    inputs={ # we have training and test inputs
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri # Defines the location of s3 data to train on
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri # Defines the location of s3 data to evaluate on
        ),
    },
    cache_config=cache_config, # to use caching or not (A sagemaker.workflow.steps.CacheConfig instance)
)

## Model evaluation step
A ProcessingStep is used to evaluate the performance of the trained model. Based on the results of the evaluation, either the model is created, registered, and deployed, or the pipeline stops.

In the training job, the model was evaluated against the test dataset, and the result of the evaluation was stored in the model.tar.gz file saved by the training job. The results of that evaluation are copied into a PropertyFile in this ProcessingStep so that it can be used in the ConditionStep.

In [10]:
# Create processor for evaluation
script_eval = ScriptProcessor(
    image_uri=container, # The URI of the Docker image to use for the processing jobs
    command=["python3"], # The command to run, along with any command-line flags
    instance_type=processing_instance_type, # The type of EC2 instance to use for processing
    instance_count=1, # The number of instances to run a processing job with
    base_job_name=base_job_prefix + "/evaluation", # Prefix for processing name
    sagemaker_session=sagemaker_session, # Session object which manages interactions with Amazon SageMaker and any other AWS services needed
    role=role, # An AWS IAM role name or ARN
)

# Define report as json file
evaluation_report = PropertyFile(
    name="HuggingFaceEvaluationReport", # The name of the property file for reference with JsonGet functions
    output_name="evaluation", # The name of the processing job output channel
    path="evaluation.json", # The path to the file at the output channel location
)

# Construct evaluation step 
step_eval = ProcessingStep(
    name="HuggingfaceEvalLoss", # The name of the processing step
    processor=script_eval, # Use script processor defined above. A sagemaker.processing.Processor instance
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts, # The source for the input. If a local path is provided, it will automatically be uploaded to S3 under: “s3://<default-bucket-name>/<job-name>/input/<input-name>”.
            destination="/opt/ml/processing/model", # The destination of the input
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation", # The name of the output
            source="/opt/ml/processing/evaluation", # The source for the output
            destination=f"s3://{bucket}/{s3_prefix}/evaluation_report", # The destination of the output 
        ),
    ],
    code="./scripts/evaluate.py", # script for evaluation 
    property_files=[evaluation_report], # A list of property files that workflow looks for and resolves from the configured processing output list
    cache_config=cache_config, # to use caching or not (A sagemaker.workflow.steps.CacheConfig instance)
)
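The evaluation script itself is not listed in this notebook. A minimal sketch of the copy it is described as performing could look like the following: open the model archive, find the stored evaluation results, and write `evaluation.json` with an `eval_loss` key to the output directory. The name of the results file inside the archive and the function name are assumptions; the output file name and the `eval_loss` key match the PropertyFile and the condition used in this pipeline.

```python
import json
import tarfile
from pathlib import Path


def write_evaluation_report(model_tar, output_dir, results_name="evaluation.json"):
    """Copy eval_loss from a results file inside model.tar.gz to output_dir/evaluation.json."""
    with tarfile.open(model_tar, "r:gz") as tar:
        # Match on the base name so both 'evaluation.json' and './evaluation.json' are found
        member = next(
            (m for m in tar.getmembers() if m.isfile() and Path(m.name).name == results_name),
            None,
        )
        if member is None:
            raise FileNotFoundError(f"{results_name} not found in {model_tar}")
        results = json.load(tar.extractfile(member))

    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    report_path = out_dir / "evaluation.json"
    report_path.write_text(json.dumps({"eval_loss": results["eval_loss"]}))
    return report_path
```

Inside the Processing job this would be called with the archive under `/opt/ml/processing/model` and the output directory `/opt/ml/processing/evaluation`, the two paths configured in the step above.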

## Register the model
The trained model is registered in the Model Registry under a Model Package Group. Each time a new model is registered, it is given a new version number by default. The model is registered in the "Approved" state so that it can be deployed by the Lambda function. Registration only happens if the ConditionStep evaluates to true, i.e., the metric being checked is within the defined threshold.

In [11]:
# Create HuggingFaceModel 
model = HuggingFaceModel(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, # The Amazon S3 location of a SageMaker model data .tar.gz file
    role=role, # An AWS IAM role specified with either the name or full ARN
    transformers_version="4.6.1", # Transformers version for executing inference code
    pytorch_version="1.7.1", # PyTorch version for executing inference code
    py_version="py36", # Python version for executing inference code
    sagemaker_session=sagemaker_session, # Session object which manages interactions with Amazon SageMaker and any other AWS services needed
)

# Register the model 
step_register = RegisterModel(
    name="HuggingFaceRegisterModel", # The name of the register step.
    model=model, # A PipelineModel object that comprises a list of models which gets executed as a serial inference pipeline or a Model object
    content_types=["application/json"], # The supported MIME types for the input data
    response_types=["application/json"], # The supported MIME types for the output data
    inference_instances=["ml.g4dn.xlarge", "ml.m5.xlarge"], # A list of the instance types that are used to generate inferences in real-time
    transform_instances=["ml.g4dn.xlarge", "ml.m5.xlarge"], # A list of the instance types on which a transformation job can be run or on which an endpoint can be deployed
    model_package_group_name=model_package_group_name, # The Model Package Group name
    approval_status="Approved", # Model Approval Status, values can be “Approved”, “Rejected”, or “PendingManualApproval”
)

## Lambda step for model deployment
The SageMaker SDK provides a Lambda helper class that can be used to create a Lambda function. This function is provided to the Lambda step for invocation via the pipeline. Alternatively, a predefined Lambda function can be provided to the Lambda step.

The SageMaker Execution Role requires the policy AmazonSageMakerPipelinesIntegrations to create the Lambda function, and the Lambda function needs a role with policies allowing creation of a SageMaker endpoint.

A helper function in iam_helper.py is provided to create the Lambda role. To use the script, the notebook execution role must include the required policy to create an IAM role.

In [12]:
%%writefile lambda_deployer.py
# The %%writefile cell magic must be the first line of the cell; it saves everything below it to lambda_deployer.py

"""
This Lambda function creates an Endpoint Configuration and deploys a model to an Endpoint. 
The name of the model to deploy is provided via the `event` argument
"""

import json # to create request body in json format
import boto3 # to communicate with AWS (sagemaker)

# Define lambda function which creates a SageMaker endpoint
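def lambda_handler(event, context):
    """Create a SageMaker model, an endpoint configuration, and an endpoint from the values in `event`."""
    sm_client = boto3.client("sagemaker") # interface with SageMaker

    # Resource names, model package ARN, and execution role passed in by the pipeline's Lambda step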
    model_name = event["model_name"]
    model_package_arn = event["model_package_arn"]
    endpoint_config_name = event["endpoint_config_name"]
    endpoint_name = event["endpoint_name"]
    role = event["role"]

    container = {"ModelPackageName": model_package_arn}

    # Create model with provided model name, execution role and container
    create_model_response = sm_client.create_model(
        ModelName=model_name, ExecutionRoleArn=role, Containers=[container]
    )

    # Create config for endpoint
    create_endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                "InstanceType": "ml.m5.xlarge",
                "InitialVariantWeight": 1,
                "InitialInstanceCount": 1,
                "ModelName": model_name,
                "VariantName": "AllTraffic",
            }
        ],
    )

    # Create endpoint (provide name and config)
    create_endpoint_response = sm_client.create_endpoint(
        EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
    )

    return {
        "statusCode": 200,
        "body": json.dumps("Created Endpoint!"),
        "other_key": "example_value",
    }

Overwriting lambda_deployer.py


The following code defines the create_lambda_role helper, which creates an IAM role for the Lambda function and attaches the policies it needs. Alternatively, you could provide an existing IAM role to be used by the Lambda function.

In [13]:
import boto3
import json

iam = boto3.client('iam')

# Create lambda role with necessary policies
def create_lambda_role(role_name):
    try:
        response = iam.create_role(
            RoleName = role_name,
            AssumeRolePolicyDocument = json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "lambda.amazonaws.com"
                        },
                        "Action": "sts:AssumeRole"
                    }
                ]
            }),
            Description='Role for Lambda to deploy a SageMaker endpoint'
        )

        role_arn = response['Role']['Arn']

        response = iam.attach_role_policy(
            RoleName = role_name,
            PolicyArn = 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
        )

        response = iam.attach_role_policy(
            RoleName = role_name,
            PolicyArn = 'arn:aws:iam::aws:policy/AmazonSageMakerFullAccess'
        )
        
        policy_json = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "iam:CreateRole",
                        "iam:PutRolePolicy",
                        "iam:AttachRolePolicy",
                        "iam:DetachRolePolicy",
                        "iam:GetRole",
                        "lambda:CreateFunction",
                        "lambda:InvokeAsync",
                        "lambda:InvokeFunction",
                        "iam:PassRole",
                        "lambda:UpdateAlias",
                        "lambda:CreateAlias",
                        "lambda:GetFunctionConfiguration",
                        "lambda:AddPermission",
                        "lambda:UpdateFunctionCode"
                    ],
                    "Resource": [
                        "*"
                    ]
                }
            ]
        }
        
        policy_res = iam.create_policy(
            PolicyName='CreateRole',
            PolicyDocument=json.dumps(policy_json)
        )
        policy_arn = policy_res['Policy']['Arn']
        
        response = iam.attach_role_policy(
            RoleName=role_name,
            PolicyArn=policy_arn
        )

        return role_arn

    except iam.exceptions.EntityAlreadyExistsException:
        print(f'Using ARN from existing role: {role_name}')
        response = iam.get_role(RoleName=role_name)
        return response['Role']['Arn']

In [14]:
lambda_role = create_lambda_role("lambda-deployment-role")

Using ARN from existing role: lambda-deployment-role


Next, we define the Lambda step. After defining some object names, we use the Lambda helper class to create the actual Lambda function, then pass it to the Lambda step.

In [16]:
# Use the current time to define unique names for the resources created
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())

model_name = "demo-hf-model" + current_time
endpoint_config_name = "demo-hf-endpoint-config" + current_time
endpoint_name = "demo-hf-endpoint-" + current_time
function_name = "sagemaker-demo-hf-lambda-step" + current_time

# Lambda helper class can be used to create the Lambda function
func = Lambda(
    function_name=function_name, # name of lambda function
    execution_role_arn=lambda_role, # IAM lambda role 
    script="lambda_deployer.py", # Path of Lambda function script
    handler="lambda_deployer.lambda_handler", # Lambda handler specified as "lambda_script.lambda_handler"
    timeout=600, # Maximum time the Lambda function can run before the lambda step fails
    memory_size=10240, # Amount of memory in MB a lambda function can use at runtime
)

# The dictionary returned by the Lambda function is captured by LambdaOutput, 
# each key in the dictionary corresponds to a LambdaOutput

output_param_1 = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name="body", output_type=LambdaOutputTypeEnum.String)
output_param_3 = LambdaOutput(output_name="other_key", output_type=LambdaOutputTypeEnum.String)

In [None]:
# The inputs provided to the Lambda function can be retrieved via the `event` object within the `lambda_handler` function
# in the Lambda
step_deploy_lambda = LambdaStep(
    name="LambdaStepHuggingFaceDeploy", # The name of the lambda step
    lambda_func=func, # An instance of sagemaker.lambda_helper.Lambda. LambdaStep just invokes the function
    inputs={ # Input arguments that will be provided to the lambda function
        "model_name": model_name,
        "endpoint_config_name": endpoint_config_name,
        "endpoint_name": endpoint_name,
        "model_package_arn": step_register.steps[0].properties.ModelPackageArn,
        "role": role,
    },
    outputs=[output_param_1, output_param_2, output_param_3], # List of outputs from the lambda function
)

## Condition for deployment
For the condition to be True and the steps after evaluation to run, the evaluated loss of the Hugging Face model must be less than or equal to 0.3.

In [18]:
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet( # Get JSON properties from PropertyFiles
        step=step_eval, # The step name from which to get the property file
        property_file=evaluation_report, # Either a PropertyFile instance or the name of a property file
        json_path="eval_loss", # The JSON path expression to the requested value
    ), # The execution variable, parameter, property, or Python primitive value to use in the comparison
    right=0.3, # The execution variable, parameter, property, or Python primitive value to compare to
)

step_cond = ConditionStep(
    name="CheckHuggingfaceEvalLoss", # The name of the condition step
    conditions=[cond_lte], # A list of sagemaker.workflow.conditions.Condition instances
    if_steps=[step_register, step_deploy_lambda], # A list of sagemaker.workflow.steps.Step instances that are marked as ready for execution if the list of conditions evaluates to True
    else_steps=[], # no steps to run if the condition is False; the pipeline execution ends here
)

The class JsonGet has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.
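To make the comparison concrete, the plain-Python sketch below mirrors what the condition checks: it reads `eval_loss` from the evaluation report and compares it with the threshold using less-than-or-equal, so a loss of exactly 0.3 still passes. The function name is hypothetical, and the real check is performed by the Pipelines service, not by this code.

```python
import json


def passes_quality_gate(report_json, threshold=0.3, key="eval_loss"):
    """Return True when the reported loss is at or below the threshold."""
    report = json.loads(report_json)
    return report[key] <= threshold
```

Running this against a downloaded copy of evaluation.json is a quick way to predict which branch a given execution will take.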


## Pipeline definition and execution
SageMaker Pipelines constructs the pipeline graph implicitly from the way the inputs and outputs of the pipeline steps are specified. There's no need to declare a step as "parallel" or "serial". Steps that follow the condition step, such as model registration, are not listed in the pipeline's steps argument because they are already attached to the condition step and run only if the condition is true. When they do run, their order is determined by their specified inputs and outputs.

In [19]:
pipeline = Pipeline(
    name="HuggingFaceDemoPipeline", # The name of the pipeline
    parameters=[ # The list of the parameters
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        training_instance_count,
    ],
    steps=[step_process, step_train, step_eval, step_cond], # The list of the non-conditional steps associated with the pipeline
    sagemaker_session=sagemaker_session, # Session object that manages interactions with Amazon SageMaker APIs and any other AWS services needed
)

We can examine the pipeline definition in JSON format. You also can inspect the pipeline graph in SageMaker Studio by going to the page for your pipeline.

In [20]:
import json

json.loads(pipeline.definition())

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.c5.2xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.p3.2xlarge'},
  {'Name': 'TrainingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'ProcessDataForTraining',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '492215442770.dkr.ecr.eu-central-1.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
   

In [21]:
# Creates a pipeline or updates it, if it already exists
pipeline.upsert(role_arn=role) # The role arn that is assumed by workflow to create step artifacts

{'PipelineArn': 'arn:aws:sagemaker:eu-central-1:910022457801:pipeline/huggingfacedemopipeline',
 'ResponseMetadata': {'RequestId': '8fcba1df-6ab9-402c-a917-f92d3533202b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '8fcba1df-6ab9-402c-a917-f92d3533202b',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '94',
   'date': 'Fri, 29 Oct 2021 11:54:57 GMT'},
  'RetryAttempts': 0}}

In [22]:
# Starts a Pipeline execution in the Workflow service
execution = pipeline.start()

In [23]:
# Waits for the pipeline to complete execution
execution.wait()

## Getting predictions from the endpoint
After the previous cell completes, you can check whether the endpoint has finished deploying. In SageMaker Studio, click the tilted triangle icon in the left toolbar, then select Endpoints in the drop-down menu. In the list of endpoints, click the one whose name begins with demo-hf-endpoint, then click AWS settings. When the Status becomes InService, you can run the following code cells.
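The same check can be done from code. The Lambda function returns as soon as the endpoint creation request is accepted, so the endpoint may still be in the Creating state when the pipeline finishes. The sketch below polls `describe_endpoint` until the status is InService; `wait_for_endpoint` is a hypothetical helper, and the polling interval and attempt limit are arbitrary choices.

```python
import time


def wait_for_endpoint(sm_client, endpoint_name, poll_seconds=30, max_attempts=60):
    """Poll the endpoint status until it is InService, failing fast on Failed."""
    for _ in range(max_attempts):
        status = sm_client.describe_endpoint(EndpointName=endpoint_name)["EndpointStatus"]
        if status == "InService":
            return status
        if status == "Failed":
            raise RuntimeError(f"Endpoint {endpoint_name} failed to deploy")
        time.sleep(poll_seconds)
    raise TimeoutError(f"Endpoint {endpoint_name} not InService after {max_attempts} checks")
```

With the objects defined in this notebook, the call would be `wait_for_endpoint(sm_client, endpoint_name)`. boto3 also provides a built-in waiter, `sm_client.get_waiter("endpoint_in_service")`, which may be preferable when custom handling is not needed.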

## Setup
We can use the endpoint name to set up a Predictor object that will be used to get predictions.

In [24]:
from sagemaker.deserializers import JSONDeserializer
from sagemaker.serializers import JSONSerializer

hf_predictor = sagemaker.predictor.Predictor(endpoint_name, # Name of the Amazon SageMaker endpoint to which requests are sent
                                             sagemaker_session=sagemaker_session, # A SageMaker Session object, used for SageMaker interactions
                                             serializer=JSONSerializer(), # A serializer object, used to encode data for an inference endpoint
                                             deserializer=JSONDeserializer() # A deserializer object, used to decode data from an inference endpoint
                                            )

## Test data
Here are a couple of sample reviews we would like to classify as positive (LABEL_1) or negative (LABEL_0). Both reviews are deliberately mixed in tone, yet an advanced Transformer-based model such as this Hugging Face model should still classify them well.

In [25]:
sentiment_input1 = {"inputs":"Although the movie had some plot weaknesses, it was engaging. Special effects were mind boggling.  Can't wait to see what this creative team does next."}

hf_predictor.predict(sentiment_input1)

[{'label': 'LABEL_1', 'score': 0.8424215912818909}]

In [26]:
sentiment_input2 = {"inputs":"There was some good acting, but the story was ridiculous. The other sequels in this franchise were better.  It's time to take a break from this IP, but if they switch it up for the next one, I'll check it out."}

hf_predictor.predict(sentiment_input2)

[{'label': 'LABEL_0', 'score': 0.9616789817810059}]
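The raw labels are not very readable, so a small post-processing step can help when inspecting results. The sketch below maps LABEL_0 and LABEL_1 to the sentiment names used in this notebook and rounds the score; `readable` and the output keys are names chosen here for illustration.

```python
# Label meanings as stated in this notebook
LABEL_NAMES = {"LABEL_0": "negative", "LABEL_1": "positive"}


def readable(predictions):
    """Convert endpoint predictions into sentiment names with rounded scores."""
    return [
        {
            # Fall back to the raw label if it is not one we know
            "sentiment": LABEL_NAMES.get(p["label"], p["label"]),
            "score": round(p["score"], 3),
        }
        for p in predictions
    ]
```

For example, `readable(hf_predictor.predict(sentiment_input1))` would report the first review as positive.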

## Examining the endpoint
Once the endpoint is in service, there are several different ways to examine its current configuration and parameters. One way is to use SageMaker Studio to examine the configuration. In the left toolbar, click the tilted triangle icon, select Endpoints from the drop down menu at the top, then click the endpoint name (beginning demo-hf-endpoint). Click the AWS Settings tab and you can review various endpoint metadata.

Another way is to use the boto3 SDK for AWS. You can use this to check certain other parameters, such as the current number of instances behind the endpoint. This is done in the code cell below, and might be useful in circumstances such as when you need to do an ad hoc check on endpoint scaling.

In [27]:
endpoint_desc = sm_client.describe_endpoint(EndpointName=endpoint_name)
current_instance_count = endpoint_desc["ProductionVariants"][0]["CurrentInstanceCount"]
print(f"Current instance count = {current_instance_count}")

Current instance count = 1


## Cleanup Resources
The following cell deletes the resources created by the Lambda function, as well as the Lambda function itself. Deleting other resources, such as the S3 bucket and the IAM role for the Lambda function, is the responsibility of the notebook user.

In [28]:
# Delete the Lambda function
func.delete()

# Delete the endpoint
sm_client.delete_endpoint(EndpointName=endpoint_name)

# Delete the EndpointConfig
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)

# Delete the model
sm_client.delete_model(ModelName=model_name)

{'ResponseMetadata': {'RequestId': '45ebf0d0-16d5-42f1-a495-376fcc766abb',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '45ebf0d0-16d5-42f1-a495-376fcc766abb',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Fri, 29 Oct 2021 13:17:08 GMT'},
  'RetryAttempts': 0}}

 - Use a Lambda step for lightweight model deployments to SageMaker hosting services
 - "Lightweight" means deployments to development or test endpoints, or to internal endpoints that aren't customer-facing or serving high volumes of traffic
 - A Lambda step lets you add serverless compute operations to pipelines for many different kinds of tasks, such as lightweight deployments
 - Use the Lambda step to run serverless tasks or jobs on Lambda, such as splitting datasets or sending custom notifications
 - The Lambda step can perform tasks such as looking up the latest approved model registered in the SageMaker Model Registry after model building is complete, and then updating an endpoint with that model (or creating a new endpoint if one doesn't already exist)
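The last point can be sketched with two small helpers that a Lambda function might call. They assume a boto3 SageMaker client is passed in, and they are not part of the Lambda function used in this notebook; the function names are hypothetical. The first looks up the most recently created approved model package in a group, and the second updates an endpoint if one with that exact name exists, or creates it otherwise.

```python
def latest_approved_model_package_arn(sm_client, group_name):
    """Return the ARN of the newest approved model package in the group."""
    response = sm_client.list_model_packages(
        ModelPackageGroupName=group_name,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    summaries = response["ModelPackageSummaryList"]
    if not summaries:
        raise LookupError(f"No approved model packages in group {group_name}")
    return summaries[0]["ModelPackageArn"]


def create_or_update_endpoint(sm_client, endpoint_name, endpoint_config_name):
    """Update the endpoint if it already exists, otherwise create it."""
    # NameContains is a substring filter, so confirm an exact name match.
    # Only the first page of results is checked in this sketch.
    existing = sm_client.list_endpoints(NameContains=endpoint_name)["Endpoints"]
    if any(e["EndpointName"] == endpoint_name for e in existing):
        sm_client.update_endpoint(
            EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
        )
        return "updated"
    sm_client.create_endpoint(
        EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
    )
    return "created"
```

The endpoint configuration passed to the second helper would need to reference a model created from the returned model package ARN, in the same way the Lambda function in this notebook creates its model.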