# MLOps: End-to-End Hugging Face Transformers with the Hub & SageMaker Pipelines


# Development Environment and Permissions 

## Installation & Imports



In [2]:
import boto3
import os
import numpy as np
import pandas as pd
import sagemaker
import sys
import time

from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString

from sagemaker.lambda_helper import Lambda

from sagemaker.sklearn.processing import SKLearnProcessor

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import CacheConfig, ProcessingStep

from sagemaker.huggingface import HuggingFace, HuggingFaceModel
import sagemaker.huggingface

from sagemaker.inputs import TrainingInput, CreateModelInput
from sagemaker.workflow.steps import TrainingStep, TransformStep

from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import CreateModelStep, RegisterModel

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo,ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep, JsonGet

from sagemaker.workflow.pipeline import Pipeline, PipelineExperimentConfig
from sagemaker.workflow.execution_variables import ExecutionVariables

from sagemaker.huggingface.model import HuggingFaceModel
from sagemaker.s3 import S3Uploader,s3_path_join

from sagemaker.transformer import Transformer

# Defining the Pipeline

## 0. Pipeline parameters



In [3]:

import sagemaker

# A single SageMaker session is shared by every job and by the pipeline.
# (The original built a second Session whose default bucket was simply the
# first session's default bucket, which is equivalent to reusing the first.)
sagemaker_session = sagemaker.Session()

# Alias kept so existing references to `sess` keep working.
sess = sagemaker_session

# execution role handed to the SageMaker jobs and to the pipeline
role = sagemaker.get_execution_role()

# aws region used
region = sagemaker_session.boto_region_name

# s3 bucket used for storing assets and artifacts.
# `default_bucket()` creates the session's default bucket if it does not exist.
bucket = sagemaker_session.default_bucket()

# Bucket holding the raw training data. Its name is only interpolated into an
# S3 URI below, so this bucket must already exist and contain `train.csv`;
# nothing in this cell creates it.
sagemaker_traindata_bucket='sagemaker-us-east-1-734836471744-stary'

# S3 prefix under which all assets are stored
s3_prefix = "stray-hugging-face-pipeline-demo"

# base name prefix for sagemaker jobs (training, processing, inference)
base_job_prefix = s3_prefix

# Cache configuration for workflow steps (caching is currently switched off)
cache_config = CacheConfig(enable_caching=False, expire_after="30d")


# package versions
_transformers_version = "4.6"
_pytorch_version = "1.7"
_py_version = "py36"


train_bucket_path = "s3://{}/".format(sagemaker_traindata_bucket)


train_input_data_uri =train_bucket_path+"train.csv"

# Pipeline parameter carrying the location of the raw training data
input_data = ParameterString(
    name="InputData",
    default_value=train_input_data_uri,
)


print(f"sagemaker role arn: {role}")
print(f"sagemaker train data bucket: {sagemaker_traindata_bucket}")
print(f"train data location: {train_input_data_uri}")
print(f"sagemaker bucket: {sagemaker_session.default_bucket()}")
print(f"sagemaker session region: {sagemaker_session.boto_region_name}")

sagemaker role arn: arn:aws:iam::734836471744:role/service-role/AmazonSageMaker-ExecutionRole-20220512T110407
sagemaker train data bucket: sagemaker-us-east-1-734836471744-stary
train data location: s3://sagemaker-us-east-1-734836471744-stary/train.csv
sagemaker bucket: sagemaker-us-east-1-734836471744
sagemaker session region: us-east-1


## 1. Processing Step

In [4]:
# Default sizing of the preprocessing job
instance_type_process_1 = "ml.c5.4xlarge"
instance_count_process_1 = 1

# Pipeline parameters for the processing step, seeded with the defaults above
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value=instance_type_process_1,
)
processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=instance_count_process_1,
)
processing_script = ParameterString(
    name="ProcessingScript",
    default_value="./scripts/preprocessing.py",
)


In [5]:
from sagemaker.workflow.functions import Join

# NOTE(review): not referenced by any cell visible in this notebook; kept in
# case other code relies on the name.
processing_output_destination = f"s3://{bucket}/{s3_prefix}/data"


# The instance type and count are taken from the pipeline parameters rather
# than from the raw Python defaults, so that overriding
# `ProcessingInstanceType` / `ProcessingInstanceCount` when starting an
# execution actually changes the hardware the preprocessing job runs on.
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=base_job_prefix + "/preprocessing",
    sagemaker_session=sagemaker_session,
    role=role,
)

# One output channel per dataset split. Each split is read from
# /opt/ml/processing/<split> in the container and written beneath
# s3://<bucket>/<s3_prefix>/<pipeline execution id>/<split>.
split_outputs = [
    ProcessingOutput(
        output_name=split_name,
        source=f"/opt/ml/processing/{split_name}",
        destination=Join(
            on="/",
            values=[
                f"s3://{bucket}",
                s3_prefix,
                ExecutionVariables.PIPELINE_EXECUTION_ID,
                split_name,
            ],
        ),
    )
    for split_name in ("train", "validation", "test")
]

# The raw input data is mounted at /opt/ml/processing/input for the script.
raw_data_input = ProcessingInput(
    source=input_data,
    destination="/opt/ml/processing/input",
)

step_process = ProcessingStep(
    name="ProcessDataForTraining",
    cache_config=cache_config,
    processor=sklearn_processor,
    inputs=[raw_data_input],
    outputs=split_outputs,
    code="./scripts/preprocessing.py",
)


## 2. Model Training Step

We use SageMaker's [Hugging Face](https://sagemaker.readthedocs.io/en/stable/frameworks/huggingface/sagemaker.huggingface.html) Estimator class to create a model training step. The estimator configured below fine-tunes `bert-base-uncased`. Transformer-based models such as the original BERT can be very large and slow to train; if training time or cost is a concern, [DistilBERT](https://huggingface.co/distilbert-base-uncased) is a small, fast, cheap and light Transformer model trained by distilling BERT base. It reduces the size of a BERT model by 40%, while retaining 97% of its language understanding capabilities and being 60% faster.

The Hugging Face estimator also takes hyperparameters as a dictionary. The training instance type and size are pipeline parameters that can be easily varied in future pipeline runs without changing any code.  

### Training Parameter

In [6]:
# Plain-Python defaults for the training job
entry_point_train = 'run_glue.py'
source_dir_train = "./scripts"
instance_type_train = 'ml.p3.2xlarge'
instance_count_train = 1
volume_size_train = 400
train_batch_size_train = "4"
eval_batch_size_train = "4"

# Pipeline parameters describing where the training code lives
training_entry_point = ParameterString(
    name="TrainingEntryPoint",
    default_value=entry_point_train,
)
training_source_dir = ParameterString(
    name="TrainingSourceDir",
    default_value=source_dir_train,
)

# Pipeline parameters describing the training hardware
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value=instance_type_train,
)
training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=instance_count_train,
)

# Pipeline parameters for the training hyperparameters (all string-typed)
TrainBatchSize = ParameterString(
    name="per_device_train_batch_size",
    default_value=train_batch_size_train,
)
EvalBatchSize = ParameterString(
    name="per_device_eval_batch_size",
    default_value=eval_batch_size_train,
)
Epochs = ParameterString(name="num_train_epochs", default_value="5")
learning_rate = ParameterString(name="LearningRate", default_value="5e-5")
fp16 = ParameterString(name="Fp16", default_value="False")

### Hugging Face Estimator

In [7]:

# Hyperparameters forwarded to the training script.
# Batch sizes, epoch count, learning rate and fp16 come from the pipeline
# parameters (whose defaults equal the values previously hard-coded here), so
# overriding them when starting an execution changes the training job.
training_hyperparameters = {
    'per_device_train_batch_size': TrainBatchSize,
    'per_device_eval_batch_size': EvalBatchSize,
    'model_name_or_path': 'bert-base-uncased',
    # Channel files as mounted inside the training container
    'train_file':'/opt/ml/input/data/train/train.csv',
    'validation_file':'/opt/ml/input/data/validation/validation.csv',
    'test_file':'/opt/ml/input/data/test/test.csv',
    'do_train': True,
    'do_predict': True,
    'do_eval': True,
    'save_total_limit':3,
    'output_dir': '/opt/ml/model',
    'num_train_epochs': Epochs,
    'learning_rate': learning_rate,
    'seed': 7,
    'fp16': fp16,
    'eval_steps': 1000,
}

# Instance type and count are likewise bound to the pipeline parameters.
# NOTE(review): entry_point / source_dir stay plain strings because the SDK
# presumably packages the script from the local path when the pipeline
# definition is built -- confirm before wiring `TrainingEntryPoint` /
# `TrainingSourceDir` in here.
huggingface_estimator = HuggingFace(
    entry_point=entry_point_train,
    source_dir=source_dir_train,
    base_job_name=base_job_prefix + "/training",
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    volume_size=volume_size_train,
    role=role,
    transformers_version=_transformers_version,
    pytorch_version=_pytorch_version,
    py_version=_py_version,
    hyperparameters=training_hyperparameters,
    sagemaker_session=sagemaker_session,
)




# Every training channel is fed from the processing-step output of the same
# name, read as CSV.
processed_outputs = step_process.properties.ProcessingOutputConfig.Outputs

training_channels = {
    channel: TrainingInput(
        s3_data=processed_outputs[channel].S3Output.S3Uri,
        content_type="text/csv",
    )
    for channel in ("train", "validation", "test")
}

step_train = TrainingStep(
    name="TrainHuggingFaceModel",
    estimator=huggingface_estimator,
    inputs=training_channels,
    cache_config=cache_config,
)




# Pipeline definition and execution

SageMaker Pipelines constructs the pipeline graph from the implicit definition created by the way the pipeline steps' inputs and outputs are specified. There's no need to specify that a step is a "parallel" or "serial" step. Steps such as model registration after the condition step are not listed in the pipeline definition because they do not run unless the condition is true. If so, they are run in order based on their specified inputs and outputs. (The pipeline built in this notebook contains only the processing and training steps.)

Each Parameter we defined holds a default value, which can be overwritten before starting the pipeline. [Parameter Documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html)




### Overwriting Parameters

In [8]:
# Parameters to override when starting an execution, e.g.
# `pipeline.start(parameters=pipeline_parameters)`.
# Keys must be the *names* of parameters declared on the pipeline (the `name=`
# given to ParameterString / ParameterInteger), not Python variable names.
# The previous key, `ModelId`, is not declared by this pipeline.
# The values below equal the declared defaults; edit them to vary a run.
pipeline_parameters=dict(
        TrainingInstanceType="ml.p3.2xlarge",
        num_train_epochs="5",
    )

### Create Pipeline

In [9]:
# Parameters exposed by the pipeline, in the order they are declared in the
# pipeline definition.
declared_parameters = [
    # data / preprocessing
    input_data,
    processing_instance_type,
    processing_instance_count,
    processing_script,
    # training code and hardware
    training_entry_point,
    training_source_dir,
    training_instance_type,
    training_instance_count,
    # training hyperparameters
    Epochs,
    TrainBatchSize,
    EvalBatchSize,
    learning_rate,
    fp16,
]

# Preprocessing first, then training.
pipeline_steps = [step_process, step_train]

pipeline = Pipeline(
    name="HuggingFaceDemoPipeline",
    parameters=declared_parameters,
    steps=pipeline_steps,
    sagemaker_session=sagemaker_session,
)


We can examine the pipeline definition in JSON format.  You also can inspect the pipeline graph in SageMaker Studio by going to the page for your pipeline.  

In [10]:
import json

# Parse the JSON definition so the notebook renders it as a Python dict.
pipeline_definition = json.loads(pipeline.definition())
pipeline_definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-734836471744-stary/train.csv'},
  {'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.c5.4xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'ProcessingScript',
   'Type': 'String',
   'DefaultValue': './scripts/preprocessing.py'},
  {'Name': 'TrainingEntryPoint',
   'Type': 'String',
   'DefaultValue': 'run_glue.py'},
  {'Name': 'TrainingSourceDir', 'Type': 'String', 'DefaultValue': './scripts'},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.p3.2xlarge'},
  {'Name': 'TrainingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'num_train_epochs', 'Type': 'String', 'DefaultValue': '5'},
  {'Name': 'per_device_train_batch_size',
   'Type': 'String',
   'DefaultValue': '4'},
  {'Name': 'per_device_eval_batch_size',
   'Type':

![pipeline](./imgs/pipeline.png)

`upsert` creates or updates the pipeline.

In [11]:
# Create the pipeline, or update it if it already exists, and show the response.
upsert_response = pipeline.upsert(role_arn=role)
upsert_response

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:734836471744:pipeline/huggingfacedemopipeline',
 'ResponseMetadata': {'RequestId': '93f57a1a-a169-45b4-a267-3d5db11e481c',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '93f57a1a-a169-45b4-a267-3d5db11e481c',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '91',
   'date': 'Mon, 25 Jul 2022 02:53:53 GMT'},
  'RetryAttempts': 0}}

### Run the pipeline

In [12]:
# Start an execution; no parameter overrides are passed here, so every
# pipeline parameter takes its declared default value.
execution = pipeline.start()

In [13]:
# The default waiter budget ran out before the pipeline finished and raised
# "WaiterError: ... Max attempts exceeded". Poll once a minute for up to
# 4 hours (60 s x 240 attempts) instead.
execution.wait(delay=60, max_attempts=240)

WaiterError: Waiter PipelineExecutionComplete failed: Max attempts exceeded

## Cleanup Resources

The following cell will delete the resources created by the Lambda function and the Lambda itself. 
Deleting other resources such as the S3 bucket and the IAM role for the Lambda function is the responsibility of the notebook user. 