In [1]:
#!pip install sagemaker --upgrade

In [2]:
import sagemaker
from sagemaker.processing import ScriptProcessor
from sagemaker import get_execution_role
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.estimator import Estimator
from sagemaker.lambda_helper import Lambda
from sagemaker.inputs import TrainingInput

In [3]:
sess = sagemaker.Session()
role = sagemaker.get_execution_role()

# Bucket for uploaded data, model artifacts and logs. None means "use the
# session's default bucket" (SageMaker creates it when it does not exist).
sagemaker_session_bucket = None
if sess is not None and sagemaker_session_bucket is None:
    sagemaker_session_bucket = sess.default_bucket()

sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)
s3_prefix = "hf-small-tune"

# S3 locations of the tokenized splits that scripts/data-process.py is
# expected to write and the training step reads.
data_uri = f"s3://{sess.default_bucket()}/{s3_prefix}/hf_data"
training_input_path = f"{data_uri}/train"
test_input_path = f"{data_uri}/test"

for label, value in (
    ("sagemaker role arn", role),
    ("sagemaker bucket", sess.default_bucket()),
    ("sagemaker session region", sess.boto_region_name),
):
    print(f"{label}: {value}")

# Hugging Face deep learning container images (PyTorch 1.10.2, transformers 4.17.0, GPU).
# NOTE(review): region (us-east-1) is pinned in both URIs; they may not be usable
# from other regions — confirm before running elsewhere.
train_image_uri = "763104351884.dkr.ecr.us-east-1.amazonaws.com/huggingface-pytorch-training:1.10.2-transformers4.17.0-gpu-py38-cu113-ubuntu20.04"
inference_image_uri = "763104351884.dkr.ecr.us-east-1.amazonaws.com/huggingface-pytorch-inference:1.10.2-transformers4.17.0-gpu-py38-cu113-ubuntu20.04"

sagemaker role arn: arn:aws:iam::808242303800:role/sagemaker-execution-role
sagemaker bucket: sagemaker-us-east-1-808242303800
sagemaker session region: us-east-1


In [4]:
%%writefile scripts/data-process.py

#!/usr/bin/env python3
import boto3
import botocore
import sagemaker
import transformers
import datasets
from datasets import load_dataset
from datasets import load_from_disk
from datasets.filesystems import S3FileSystem
from transformers import AutoTokenizer
import datasets
import pandas as pd
import numpy as np
import argparse
import os

def answer_token_span(offsets, sequence_ids, start_char, end_char):
    """Map an answer's character span to ``(start_token, end_token)`` indices.

    ``offsets`` holds the per-token ``(char_start, char_end)`` pairs of one
    encoded question/context pair. ``sequence_ids`` holds the per-token
    sequence id, where tokens with id ``1`` belong to the context.
    ``start_char``/``end_char`` delimit the answer inside the context text.

    Returns ``(0, 0)`` unless the whole answer lies inside the context tokens
    that survived truncation. A partially truncated answer cannot be labelled
    correctly, so it is treated like an unanswerable example.
    """
    # Locate the first and last context token.
    idx = 0
    while sequence_ids[idx] != 1:
        idx += 1
    context_start = idx
    while idx < len(sequence_ids) and sequence_ids[idx] == 1:
        idx += 1
    context_end = idx - 1

    # The answer must start at or after the first context token and end at or
    # before the last one; otherwise the end index would land on [SEP].
    if offsets[context_start][0] > start_char or offsets[context_end][1] < end_char:
        return 0, 0

    idx = context_start
    while idx <= context_end and offsets[idx][0] <= start_char:
        idx += 1
    token_start = idx - 1

    idx = context_end
    while idx >= context_start and offsets[idx][1] >= end_char:
        idx -= 1
    token_end = idx + 1
    return token_start, token_end


if __name__ == "__main__":
    # Tokenizes SQuAD for extractive QA and uploads shuffled train/test subsets
    # to s3://<bucket>/<s3-prefix>/hf_data/{train,test}.
    parser = argparse.ArgumentParser()
    # Tokenizer checkpoint; should match the model fine-tuned by train.py.
    parser.add_argument("--model-name", type=str, default="distilbert-base-uncased")
    # NOTE(review): the next three arguments are accepted because the pipeline
    # passes them, but this script does not use them.
    parser.add_argument("--train-ratio", type=float, default=0.8)
    parser.add_argument("--val-ratio", type=float, default=0.1)
    parser.add_argument("--star-threshold", type=int, default=3)
    # Upload destination; defaults keep the previously hardcoded location.
    parser.add_argument("--bucket", type=str, default="sagemaker-us-east-1-808242303800")
    parser.add_argument("--s3-prefix", type=str, default="hf-small-tune")
    parser.add_argument("--max-length", type=int, default=384)
    parser.add_argument("--train-samples", type=int, default=30000)
    parser.add_argument("--test-samples", type=int, default=5000)
    # Seeded shuffling makes the sampled subsets reproducible across runs.
    parser.add_argument("--seed", type=int, default=42)
    args, _ = parser.parse_known_args()

    print("Received arguments {}".format(args))

    squad = load_dataset("squad")
    tokenizer = AutoTokenizer.from_pretrained(args.model_name)

    def preprocess_function(examples):
        """Tokenize a batch of question/context pairs and add answer labels.

        Returns the tokenizer output plus ``start_positions``/``end_positions``
        computed by ``answer_token_span``.
        """
        questions = [q.strip() for q in examples["question"]]
        inputs = tokenizer(
            questions,
            examples["context"],
            max_length=args.max_length,
            truncation="only_second",
            return_offsets_mapping=True,
            padding="max_length",
        )
        offset_mapping = inputs.pop("offset_mapping")
        start_positions = []
        end_positions = []
        for i, (offsets, answer) in enumerate(zip(offset_mapping, examples["answers"])):
            start_char = answer["answer_start"][0]
            end_char = start_char + len(answer["text"][0])
            token_start, token_end = answer_token_span(
                offsets, inputs.sequence_ids(i), start_char, end_char
            )
            start_positions.append(token_start)
            end_positions.append(token_end)
        inputs["start_positions"] = start_positions
        inputs["end_positions"] = end_positions
        return inputs

    tokenized_squad = squad.map(
        preprocess_function, batched=True, remove_columns=squad["train"].column_names
    )

    s3 = S3FileSystem()

    def sample_split(split, n):
        """Shuffle a split reproducibly and keep at most ``n`` rows."""
        shuffled = tokenized_squad[split].shuffle(seed=args.seed)
        return shuffled.select(range(min(n, len(shuffled))))

    # These locations are the training step's "train" and "test" channels.
    training_input_path = f"s3://{args.bucket}/{args.s3_prefix}/hf_data/train"
    sample_split("train", args.train_samples).save_to_disk(training_input_path, fs=s3)

    test_input_path = f"s3://{args.bucket}/{args.s3_prefix}/hf_data/test"
    sample_split("validation", args.test_samples).save_to_disk(test_input_path, fs=s3)

Overwriting scripts/data-process.py


In [5]:
%%writefile scripts/train.py

from transformers import AutoModelForQuestionAnswering, Trainer, TrainingArguments, AutoTokenizer
from datasets import load_from_disk
from transformers.data.data_collator import default_data_collator
import random
import logging
import sys
import argparse
import os
import torch

if __name__ == "__main__":
    # Fine-tunes a question-answering model on the tokenized datasets provided in
    # the "train"/"test" channels and saves the final model to SM_MODEL_DIR.

    parser = argparse.ArgumentParser()

    # Hyperparameters sent by the client are passed as command-line arguments to the script.
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--train-batch-size", type=int, default=16)
    parser.add_argument("--eval-batch-size", type=int, default=16)
    # Hub id of the base model; from_pretrained cannot work without it, so fail
    # fast with a clear argparse error instead.
    parser.add_argument("--model_name", type=str, required=True)
    parser.add_argument("--learning_rate", type=float, default=2e-5)

    # Data, model, and output directories (provided by the SageMaker container environment).
    parser.add_argument("--output-data-dir", type=str, default=os.environ["SM_OUTPUT_DATA_DIR"])
    parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"])
    parser.add_argument("--n_gpus", type=str, default=os.environ["SM_NUM_GPUS"])
    parser.add_argument("--training_dir", type=str, default=os.environ["SM_CHANNEL_TRAIN"])
    parser.add_argument("--test_dir", type=str, default=os.environ["SM_CHANNEL_TEST"])

    args, _ = parser.parse_known_args()

    # Set up logging
    logger = logging.getLogger(__name__)

    logging.basicConfig(
        level=logging.getLevelName("INFO"),
        handlers=[logging.StreamHandler(sys.stdout)],
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    # Datasets were written with Dataset.save_to_disk by the processing script.
    train_dataset = load_from_disk(args.training_dir)
    test_dataset = load_from_disk(args.test_dir)

    logger.info(f" loaded train_dataset length is: {len(train_dataset)}")
    logger.info(f" loaded test_dataset length is: {len(test_dataset)}")

    # Download model and tokenizer from the model hub.
    model = AutoModelForQuestionAnswering.from_pretrained(args.model_name)
    tokenizer = AutoTokenizer.from_pretrained(args.model_name)

    training_args = TrainingArguments(
        output_dir=args.model_dir,
        evaluation_strategy="epoch",
        # output_dir is the model directory that SageMaker packages as the model
        # artifact; the default step checkpoints (weights + optimizer state) would
        # be shipped inside it. The final model is saved explicitly below.
        save_strategy="no",
        learning_rate=float(args.learning_rate),
        per_device_train_batch_size=args.train_batch_size,
        per_device_eval_batch_size=args.eval_batch_size,
        num_train_epochs=args.epochs,
        weight_decay=0.01,
        logging_dir=f"{args.output_data_dir}/logs",
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=test_dataset,
        # Passing the tokenizer makes save_model() store it next to the weights,
        # which the inference container needs.
        tokenizer=tokenizer,
        data_collator=default_data_collator,
    )

    trainer.train()

    # Write the final model into SM_MODEL_DIR; SageMaker uploads it to S3.
    trainer.save_model(args.model_dir)

Overwriting scripts/train.py


In [6]:
%%writefile iam_helper.py

import boto3
import json

iam = boto3.client("iam")


def create_lambda_role(role_name):
    """Return the ARN of an IAM role that AWS Lambda can assume.

    When the role does not exist yet, it is created with a Lambda trust policy,
    and the AWSLambdaBasicExecutionRole and AmazonSageMakerFullAccess managed
    policies are attached. When a role named ``role_name`` already exists, its
    ARN is returned as-is; its attached policies are not checked.
    """
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "lambda.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }
    managed_policy_arns = (
        "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
        "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
    )

    try:
        created = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_policy),
            Description="Role for Lambda to call SageMaker functions",
        )
        for policy_arn in managed_policy_arns:
            iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
        return created["Role"]["Arn"]
    except iam.exceptions.EntityAlreadyExistsException:
        print(f"Using ARN from existing role: {role_name}")
        return iam.get_role(RoleName=role_name)["Role"]["Arn"]

Overwriting iam_helper.py


In [7]:
%%writefile scripts/lambda_sync.py
import json
import boto3


def lambda_handler(event, context):
    """Deploy a pipeline-created model to a real-time endpoint.

    ``event`` must contain ``model_name`` (the model created by the pipeline's
    model step), ``endpoint_config_name`` and ``endpoint_name``. One
    single-instance ml.g4dn.xlarge endpoint config is created, and endpoint
    creation is started but not awaited.
    """
    client = boto3.client("sagemaker")

    # Read all inputs before any API call so a missing key fails early.
    model_name, config_name, endpoint_name = (
        event["model_name"],
        event["endpoint_config_name"],
        event["endpoint_name"],
    )

    variant = {
        "InstanceType": "ml.g4dn.xlarge",
        "InitialVariantWeight": 1,
        "InitialInstanceCount": 1,
        "ModelName": model_name,
        "VariantName": "AllTraffic",
    }
    client.create_endpoint_config(
        EndpointConfigName=config_name,
        ProductionVariants=[variant],
    )
    client.create_endpoint(EndpointName=endpoint_name, EndpointConfigName=config_name)

    return {
        "statusCode": 200,
        "body": json.dumps("Created Endpoint!"),
        "other_key": "example_value",
    }

Overwriting scripts/lambda_sync.py


In [8]:
%%writefile scripts/lambda_async.py
import json
import boto3


def lambda_handler(event, context):
    """Create a model from a registered model package and deploy it asynchronously.

    ``event`` must contain ``model_name``, ``model_package_arn``,
    ``endpoint_config_name``, ``endpoint_name``, ``role`` (execution role ARN
    for the model), ``bucket`` and ``s3_prefix``. Async inference results are
    written under ``s3://<bucket>/<s3_prefix>/output``. Endpoint creation is
    started but not awaited.
    """
    client = boto3.client("sagemaker")

    # Read all inputs before any API call so a missing key fails early.
    model_name = event["model_name"]
    package_arn = event["model_package_arn"]
    config_name = event["endpoint_config_name"]
    endpoint_name = event["endpoint_name"]
    execution_role = event["role"]
    output_uri = f"s3://{event['bucket']}/{event['s3_prefix']}/output"

    client.create_model(
        ModelName=model_name,
        ExecutionRoleArn=execution_role,
        Containers=[{"ModelPackageName": package_arn}],
    )

    variant = {
        "InstanceType": "ml.g4dn.xlarge",
        "InitialVariantWeight": 1,
        "InitialInstanceCount": 1,
        "ModelName": model_name,
        "VariantName": "AllTraffic",
    }
    async_config = {
        "OutputConfig": {"S3OutputPath": output_uri},
        "ClientConfig": {"MaxConcurrentInvocationsPerInstance": 4},
    }
    client.create_endpoint_config(
        EndpointConfigName=config_name,
        ProductionVariants=[variant],
        AsyncInferenceConfig=async_config,
    )
    client.create_endpoint(EndpointName=endpoint_name, EndpointConfigName=config_name)

    return {
        "statusCode": 200,
        "body": json.dumps("Created Endpoint!"),
        "other_key": "example_value",
    }


Overwriting scripts/lambda_async.py


In [9]:
# Processing job that runs the data-preparation script with python3 inside the
# Hugging Face training image on a single GPU instance.
sp_processor = ScriptProcessor(
    command=["python3"],
    image_uri=train_image_uri,
    role=role,
    instance_count=1,
    instance_type="ml.g4dn.xlarge",
)

In [10]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# The processing script uploads the tokenized splits straight to S3. Tell it the
# bucket/prefix so the data lands exactly where step_train reads it
# (training_input_path / test_input_path) instead of a hardcoded account bucket.
step_process = ProcessingStep(
    name="HuggingFaceDataProcessStep",
    processor=sp_processor,
    # NOTE(review): the script writes directly to S3 and never populates these
    # local output directories.
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    job_arguments=[
        "--model-name", "distilbert-base-uncased",
        "--train-ratio", "0.7",
        "--val-ratio", "0.15",
        "--star-threshold", "4",
        "--bucket", sess.default_bucket(),
        "--s3-prefix", s3_prefix,
    ],
    code="scripts/data-process.py",
)

In [11]:
# Hyperparameters are forwarded to scripts/train.py as command-line arguments.
hyperparameters = {
    "epochs": 1,
    "model_name": "distilbert-base-uncased",
}

# Model artifacts and the uploaded source bundle share one S3 prefix.
train_output_uri = f"s3://{sess.default_bucket()}/{s3_prefix}/train"

huggingface_estimator = Estimator(
    entry_point="scripts/train.py",
    image_uri=train_image_uri,
    role=role,
    instance_type="ml.p3.2xlarge",
    instance_count=1,
    output_path=train_output_uri,
    code_location=train_output_uri,
    hyperparameters=hyperparameters,
    disable_profiler=True,
)

In [12]:
# Both channels point at fixed S3 locations rather than at step_process
# properties, so the dependency on the processing step is declared explicitly.
channel_uris = (("train", training_input_path), ("test", test_input_path))

step_train = TrainingStep(
    name="hf-train",
    estimator=huggingface_estimator,
    inputs={channel: TrainingInput(s3_data=uri) for channel, uri in channel_uris},
)
step_train.add_depends_on([step_process])

In [13]:
from sagemaker.workflow.pipeline_context import PipelineSession

# Model bound to a PipelineSession: model.create(...) below is passed as
# step_args to ModelStep, so the model is created when the pipeline runs.
pipeline_session = PipelineSession()

model = Model(
    image_uri=inference_image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    sagemaker_session=pipeline_session,
)

step_create_model = ModelStep(
    name="CreateModel",
    step_args=model.create(instance_type="ml.g4dn.xlarge"),
)



In [14]:
# Pipeline parameter; the "Approved" default registers the model without manual review.
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="Approved",
)

# Register the trained artifact, served with the inference image, in the
# "hf-model-group" model package group.
step_register = RegisterModel(
    name="hf-small-model",
    estimator=huggingface_estimator,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    image_uri=inference_image_uri,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.g4dn.xlarge"],
    model_package_group_name="hf-model-group",
    approval_status=model_approval_status,
)

In [15]:
from iam_helper import create_lambda_role

LAMBDA_ROLE_NAME = "lambda-deployment-role"
lambda_role = create_lambda_role(LAMBDA_ROLE_NAME)  # role ARN; an existing role is reused

Using ARN from existing role: lambda-deployment-role


In [16]:
import time

# A timestamp suffix keeps the names of the created resources unique per run.
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())

# model_name is not passed to this step: the sync Lambda deploys the model
# created by step_create_model.
model_name = f"sync-hf-model{current_time}"
endpoint_config_name = f"sync-hf-endpoint-config{current_time}"
endpoint_name = f"sync-hf-endpoint-{current_time}"
function_name = f"sagemaker-demo-sync-lambda-step{current_time}"

# Lambda function built from the local handler script.
func = Lambda(
    function_name=function_name,
    execution_role_arn=lambda_role,
    script="scripts/lambda_sync.py",
    handler="lambda_sync.lambda_handler",
    timeout=600,
    memory_size=10240,
)

# One LambdaOutput per key of the dictionary returned by lambda_handler.
output_param_1, output_param_2, output_param_3 = (
    LambdaOutput(output_name=key, output_type=LambdaOutputTypeEnum.String)
    for key in ("statusCode", "body", "other_key")
)

# Each entry of `inputs` arrives as a key of the `event` dict in lambda_handler.
step_sync_deploy_lambda = LambdaStep(
    name="LambdaStepHuggingFaceSyncDeploy",
    lambda_func=func,
    inputs={
        "model_name": step_create_model.properties.ModelName,
        "endpoint_config_name": endpoint_config_name,
        "endpoint_name": endpoint_name,
    },
    outputs=[output_param_1, output_param_2, output_param_3],
)

In [17]:
import time

# A fresh timestamp suffix keeps the async resources' names unique per run.
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())

model_name = f"async-hf-model{current_time}"
endpoint_config_name = f"async-hf-endpoint-config{current_time}"
endpoint_name = f"async-hf-endpoint-{current_time}"
function_name = f"sagemaker-async-hf-lambda-step{current_time}"

# Lambda function built from the local handler script.
func = Lambda(
    function_name=function_name,
    execution_role_arn=lambda_role,
    script="scripts/lambda_async.py",
    handler="lambda_async.lambda_handler",
    timeout=600,
    memory_size=10240,
)

# One LambdaOutput per key of the dictionary returned by lambda_handler.
output_param_1, output_param_2, output_param_3 = (
    LambdaOutput(output_name=key, output_type=LambdaOutputTypeEnum.String)
    for key in ("statusCode", "body", "other_key")
)

# Unlike the sync step, this Lambda creates the model itself from the
# registered model package, so it also needs the role and the output location.
step_async_deploy_lambda = LambdaStep(
    name="LambdaStepHuggingFaceAsyncDeploy",
    lambda_func=func,
    inputs={
        "model_name": model_name,
        "endpoint_config_name": endpoint_config_name,
        "endpoint_name": endpoint_name,
        # steps[0] of the RegisterModel collection exposes the package ARN.
        "model_package_arn": step_register.steps[0].properties.ModelPackageArn,
        "role": role,
        "bucket": sagemaker_session_bucket,
        "s3_prefix": s3_prefix,
    },
    outputs=[output_param_1, output_param_2, output_param_3],
)

In [18]:
# Every step that should be part of the pipeline definition.
pipeline_steps = [
    step_process,
    step_train,
    step_create_model,
    step_register,
    step_sync_deploy_lambda,
    step_async_deploy_lambda,
]

pipeline = Pipeline(
    name="hf3-sagemaker-workshop-pipeline",
    parameters=[model_approval_status],
    steps=pipeline_steps,
    sagemaker_session=sess,
)

In [19]:
# upsert() creates the pipeline on the first run and updates its definition on
# later runs. create() fails once the pipeline exists, which would also leave the
# old, already-used endpoint names in the stored definition.
pipeline.upsert(role_arn=role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:808242303800:pipeline/hf3-sagemaker-workshop-pipeline',
 'ResponseMetadata': {'RequestId': '24558466-79af-4268-a9c7-cfdd1ce8a34f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '24558466-79af-4268-a9c7-cfdd1ce8a34f',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '99',
   'date': 'Sat, 19 Nov 2022 11:32:04 GMT'},
  'RetryAttempts': 0}}

In [20]:
# Start a pipeline execution; `execution` is a handle to the running execution.
# The deployment Lambdas only start endpoint creation, so the endpoints invoked
# in the following cells may not be ready until the execution has finished and
# the endpoints are InService.
execution = pipeline.start()

In [25]:
import json
import pprint as pp

import boto3
import botocore

runtime = boto3.client("sagemaker-runtime")

# Name hardcoded from an earlier run of the sync deployment cell; its timestamp
# suffix changes every time that cell runs.
endpoint_name = "sync-hf-endpoint-11-19-11-32-02"
content_type = "application/json"
payload = {
    "inputs": {
        "question": "What's my name?",
        "context": "My name is Clara and I live in Berkeley.",
    }
}
res = json.dumps(payload)

response = runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType=content_type,
    Body=res,
)
pp.pprint(response)

# The body is a one-shot stream: read it once and decode the JSON answer.
response_json = json.loads(response["Body"].read())
print(response_json)

{'Body': <botocore.response.StreamingBody object at 0x7fbca9c40e20>,
 'ContentType': 'application/json',
 'InvokedProductionVariant': 'AllTraffic',
 'ResponseMetadata': {'HTTPHeaders': {'content-length': '65',
                                      'content-type': 'application/json',
                                      'date': 'Sat, 19 Nov 2022 13:25:25 GMT',
                                      'x-amzn-invoked-production-variant': 'AllTraffic',
                                      'x-amzn-requestid': '21b47df8-a3aa-4867-8338-00f8cb6dd361'},
                      'HTTPStatusCode': 200,
                      'RequestId': '21b47df8-a3aa-4867-8338-00f8cb6dd361',
                      'RetryAttempts': 0}}
{'score': 0.7042312622070312, 'start': 11, 'end': 16, 'answer': 'Clara'}


In [31]:
import json

import boto3

region = sagemaker.Session().boto_region_name
sm_client = boto3.client("sagemaker")
boto_session = boto3.Session(region_name=region)
sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session, sagemaker_client=sm_client
)

# Async endpoints read the request body from S3, so stage it as a JSON file first.
item = {
    "inputs": {
        "question": "What's my name?",
        "context": "My name is Clara and I live in Berkeley.",
    }
}
with open("test.json", "w") as f:
    json.dump(item, f)

s3_item = sagemaker_session.upload_data(
    path="test.json",
    bucket=sess.default_bucket(),
    key_prefix=s3_prefix,
)


In [32]:
sm_runtime = boto_session.client("sagemaker-runtime")

# Name hardcoded from an earlier run of the async deployment cell.
async_endpoint_name = "async-hf-endpoint-11-19-11-32-03"

# The request payload is read from S3 (s3_item); the response carries the
# OutputLocation where the result will be written.
response_1 = sm_runtime.invoke_endpoint_async(
    EndpointName=async_endpoint_name,
    InputLocation=s3_item,
    ContentType="application/json",
)


In [33]:
import os
from urllib.parse import urlparse

import boto3

# OutputLocation is an s3://bucket/key URI; parse it instead of assuming a prefix.
output_uri = urlparse(response_1["OutputLocation"])
out_bucket = output_uri.netloc
out_key = output_uri.path.lstrip("/")

# invoke_endpoint_async returns before inference finishes: wait until the result
# object exists (raises botocore WaiterError if it never appears).
boto3.client("s3").get_waiter("object_exists").wait(
    Bucket=out_bucket,
    Key=out_key,
    WaiterConfig={"Delay": 5, "MaxAttempts": 60},
)

local_dir = "./test_output"
sagemaker_session.download_data(path=local_dir, bucket=out_bucket, key_prefix=out_key)

# Open exactly this request's result; the directory may hold files from earlier runs.
with open(os.path.join(local_dir, os.path.basename(out_key))) as f:
    for line in f:
        print(line)

{"score":0.26507049798965454,"start":11,"end":39,"answer":"Clara and I live in Berkeley"}
