# Huggingface on SageMaker Pipeline
### Binary Classification with `Trainer` and `imdb` dataset

1. [Introduction](#Introduction)  
2. [Development Environment and Permissions](#Development-Environment-and-Permissions)
    1. [Installation](#Installation)  
    2. [Development environment](#Development-environment)  
    3. [Permissions](#Permissions)
3. [Processing](#Preprocessing)   
    1. [Tokenization](#Tokenization)  
    2. [Uploading data to sagemaker_session_bucket](#Uploading-data-to-sagemaker_session_bucket)  
4. [Fine-tuning & starting Sagemaker Training Job](#Fine-tuning-\&-starting-Sagemaker-Training-Job)  
    1. [Creating an Estimator and start a training job](#Creating-an-Estimator-and-start-a-training-job)  
    2. [Estimator Parameters](#Estimator-Parameters)   
    3. [Download fine-tuned model from s3](#Download-fine-tuned-model-from-s3)
    3. [Attach to old training job to an estimator ](#Attach-to-old-training-job-to-an-estimator)  
5. [_Coming soon_:Push model to the Hugging Face hub](#Push-model-to-the-Hugging-Face-hub)

# Introduction

Welcome to our end-to-end binary Text-Classification example. In this demo, we will use the Hugging Faces `transformers` and `datasets` library together with a custom Amazon sagemaker-sdk extension to fine-tune a pre-trained transformer on binary text classification. In particular, the pre-trained model will be fine-tuned using the `imdb` dataset. To get started, we need to set up the environment with a few prerequisite steps, for permissions, configurations, and so on. 

This is an extend for this [get start demo](https://github.com/huggingface/notebooks/blob/main/sagemaker/01_getting_started_pytorch/sagemaker-notebook.ipynb), we add SageMaker Processing, SageMaker Batch Transform and SageMaker Pipeline.

# Development Environment and Permissions 

## Installation

_*Note:* we only install the required libraries from Hugging Face and AWS. You also need PyTorch or Tensorflow, if you haven´t it installed_

In [None]:
!pip install "sagemaker>=2.48.0" "transformers==4.12.3" "datasets[s3]==1.18.3" --upgrade

## Development environment 

In [None]:
import sagemaker.huggingface

## Permissions

_If you are going to use Sagemaker in a local environment. You need access to an IAM Role with the required permissions for Sagemaker. You can find [here](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html) more about it._

In [None]:
import sagemaker

sess = sagemaker.Session()
# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it not exists
sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

role = sagemaker.get_execution_role()
sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

base_job_name = 'huggingfaces-sm-demo'

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

# Preprocessing

We are using the `datasets` library to download and preprocess the `imdb` dataset. After preprocessing, the dataset will be uploaded to our `sagemaker_session_bucket` to be used within our training job. The [imdb](http://ai.stanford.edu/~amaas/data/sentiment/) dataset consists of 25000 training and 25000 testing highly polar movie reviews.

在该部分，我们将原来的数据处理过程，改为在sagemaker processing上面跑。

In [None]:
from datasets import load_dataset

    # load dataset
dataset = load_dataset('imdb')
dataset.save_to_disk('./dataset/')

In [None]:
dataset

In [None]:
dataset.set_format("pandas")
train_dataset = dataset["train"][:]
test_dataset = dataset["test"][:]

In [None]:
type(train_dataset)

In [None]:
import pandas as pd

full_dataset = pd.concat([train_dataset, test_dataset])
full_dataset

In [None]:
!mkdir data

In [None]:
train_dataset.to_csv('./data/train.csv',index=False)
test_dataset.to_csv('./data/test.csv',index=False)

In [None]:
# upload datest_data_tfm to S3
from sagemaker.s3 import S3Uploader

input_dataset_prefix = 'hf-sm-pipeline/dataset/input'

dataset_s3_uri = f"s3://{sagemaker_session_bucket}/{input_dataset_prefix}"

S3Uploader.upload('./data/', desired_s3_uri=dataset_s3_uri)

In [None]:
%%writefile processing.py

# Tokenization
import argparse
import os

from datasets import load_dataset
from transformers import AutoTokenizer

input_data_path = "/opt/ml/processing/input_data"
output_data_path = "/opt/ml/processing/output_data"

if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument("--tokenizer_name", type=str, default="distilbert-base-uncased")
    parser.add_argument("--dataset_name", type=str, default="imdb")

    args, _ = parser.parse_known_args()

    print("Received arguments {}".format(args))

    
    # tokenizer used in preprocessing
    tokenizer_name = args.tokenizer_name # 'distilbert-base-uncased'

    # dataset used
    dataset_name = args.dataset_name # 'imdb'

    # s3 key prefix for the data
#     s3_prefix = 'samples/datasets/imdb'

    # load dataset
#     dataset = load_dataset(dataset_name)

    # download tokenizer
    tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)

    # tokenizer helper function
    def tokenize(batch):
        return tokenizer(batch['text'], padding='max_length', truncation=True)

    # load dataset
    train_dataset = load_dataset('csv', data_files={'train':os.path.join(input_data_path,'train.csv')})
    test_dataset = load_dataset('csv', data_files={'test':os.path.join(input_data_path,'test.csv')})
#     train_dataset, test_dataset = load_dataset(dataset_name, split=['train', 'test'])
#     test_dataset = test_dataset.shuffle().select(range(10000)) # smaller the size for test dataset to 10k 


    # tokenize dataset
    train_dataset = train_dataset.map(tokenize, batched=True)
    test_dataset = test_dataset.map(tokenize, batched=True)

    # set format for pytorch
    train_dataset =  train_dataset.rename_column("label", "labels")
    train_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])
    test_dataset = test_dataset.rename_column("label", "labels")
    test_dataset.set_format('torch', columns=['input_ids', 'attention_mask', 'labels'])


    # save dataset to /opt/ml/processing/
    train_dataset.save_to_disk(output_data_path)
    test_dataset.save_to_disk(output_data_path)

In [None]:
from sagemaker.processing import (ProcessingInput, ProcessingOutput,
                                  ScriptProcessor)

processing_repository_uri = '763104351884.dkr.ecr.us-west-2.amazonaws.com/huggingface-pytorch-training:1.9-transformers4.12-gpu-py38-cu111-ubuntu20.04'
script_processor = ScriptProcessor(command=['python3'],
                image_uri=processing_repository_uri,
                role=role,
                instance_count=1,
                instance_type='ml.m5.2xlarge',
                base_job_name=base_job_name + '-processing')

prefix = 'hf-sm-pipeline/dataset'

input_data = 's3://{}/{}/input'.format(sagemaker_session_bucket, prefix)
output_data = 's3://{}/{}/output'.format(sagemaker_session_bucket, prefix)

tokenizer_name = 'distilbert-base-uncased'
dataset_name = 'imdb'

script_processor.run(code='processing.py',
                      inputs=[ProcessingInput(
                        source=input_data,
                        destination='/opt/ml/processing/input_data',
                        s3_data_distribution_type='ShardedByS3Key')],
                      outputs=[ProcessingOutput(destination=output_data,
                                                source='/opt/ml/processing/output_data',
                                                s3_upload_mode = 'Continuous')],
                      arguments=['--tokenizer_name', tokenizer_name,
                                '--dataset_name', dataset_name]
                     )
script_processor_job_description = script_processor.jobs[-1].describe()
print(script_processor_job_description)

# Fine-tuning & starting Sagemaker Training Job

In order to create a sagemaker training job we need an `HuggingFace` Estimator. The Estimator handles end-to-end Amazon SageMaker training and deployment tasks. In a Estimator we define, which fine-tuning script should be used as `entry_point`, which `instance_type` should be used, which `hyperparameters` are passed in .....



```python
huggingface_estimator = HuggingFace(entry_point='train.py',
                            source_dir='./scripts',
                            base_job_name='huggingface-sdk-extension',
                            instance_type='ml.p3.2xlarge',
                            instance_count=1,
                            transformers_version='4.4',
                            pytorch_version='1.6',
                            py_version='py36',
                            role=role,
                            hyperparameters = {'epochs': 1,
                                               'train_batch_size': 32,
                                               'model_name':'distilbert-base-uncased'
                                                })
```

When we create a SageMaker training job, SageMaker takes care of starting and managing all the required ec2 instances for us with the `huggingface` container, uploads the provided fine-tuning script `train.py` and downloads the data from our `sagemaker_session_bucket` into the container at `/opt/ml/input/data`. Then, it starts the training job by running. 

```python
/opt/conda/bin/python train.py --epochs 1 --model_name distilbert-base-uncased --train_batch_size 32
```

The `hyperparameters` you define in the `HuggingFace` estimator are passed in as named arguments. 

Sagemaker is providing useful properties about the training environment through various environment variables, including the following:

* `SM_MODEL_DIR`: A string that represents the path where the training job writes the model artifacts to. After training, artifacts in this directory are uploaded to S3 for model hosting.

* `SM_NUM_GPUS`: An integer representing the number of GPUs available to the host.

* `SM_CHANNEL_XXXX:` A string that represents the path to the directory that contains the input data for the specified channel. For example, if you specify two input channels in the HuggingFace estimator’s fit call, named `train` and `test`, the environment variables `SM_CHANNEL_TRAIN` and `SM_CHANNEL_TEST` are set.


To run your training job locally you can define `instance_type='local'` or `instance_type='local_gpu'` for gpu usage. _Note: this does not working within SageMaker Studio_


## Creating an Estimator and start a training job

In [None]:
from sagemaker.huggingface import HuggingFace

# hyperparameters, which are passed into the training job
hyperparameters={'epochs': 1,
                 'train_batch_size': 32,
                 'model_name':'distilbert-base-uncased'
                 }

huggingface_estimator = HuggingFace(entry_point='train.py',
                            source_dir='./scripts',
                            instance_type='ml.p3.2xlarge',
                            instance_count=1,
                            role=role,
                            transformers_version='4.12',
                            pytorch_version='1.9',
                            py_version='py38',
                            hyperparameters = hyperparameters)

In [None]:
# starting the train job with our uploaded datasets as input

training_input_path = output_data + '/train'
test_input_path = output_data + '/test'

huggingface_estimator.fit({'train': training_input_path, 'test': test_input_path})

## Batch transform

Now let's try use batch transform to inference mount of data.

In [None]:
from  sagemaker.model import Model

image_uri = '763104351884.dkr.ecr.us-west-2.amazonaws.com/huggingface-pytorch-inference:1.9-transformers4.12-cpu-py38-ubuntu20.04'

hf_model = Model(image_uri=image_uri, 
              model_data=huggingface_estimator.model_data, 
              role=role)

### 准备推理数据并上传到S3

In [None]:
%%writefile test_data_tfm.jsonl
{"inputs":"I love using the new Inference DLC."}
{"inputs":"I love using the new Inference DLC."}
{"inputs":"I love using the new Inference DLC."}
{"inputs":"I love using the new Inference DLC."}

In [None]:
S3Uploader.upload('test_data_tfm.jsonl', f's3://{sagemaker_session_bucket}/hf-sm-pipeline/tfm/input')

In [None]:
## This is another way to create batch transform job with huggingface model

# from sagemaker.huggingface.model import HuggingFaceModel

# huggingface_model = HuggingFaceModel(
#     role=role, 
#     model_data=huggingface_estimator.model_data, 
#     transformers_version='4.12', 
#     pytorch_version='1.9', 
#     py_version='py38'
# )

# tfm_output = f's3://{sagemaker_session_bucket}/hf-sm-pipeline/tfm/output'

# # create transformer to run a batch job
# batch_job = huggingface_model.transformer(
#     instance_count=1,
#     instance_type='ml.m5.xlarge',
#     strategy='SingleRecord',
#     output_path=tfm_output, 
# )

# test_data = f's3://{sagemaker_session_bucket}/hf-sm-pipeline/tfm/input'

# # starts batch transform job and uses S3 data as input
# batch_job.transform(
#     data=test_data,
#     content_type='application/json',    
#     split_type='Line'
# )

#### 创建 SageMaker Batch Transform任务

[Hugging Face SageMaker Batch Transform](https://huggingface.co/docs/sagemaker/inference#run-batch-transform-with-transformers-and-sagemaker)

In [None]:
tfm_output = f's3://{sagemaker_session_bucket}/hf-sm-pipeline/tfm/output'

tfm = hf_model.transformer(
    instance_count=1, 
    instance_type='ml.m5.xlarge', 
    output_path=tfm_output, 
    strategy='SingleRecord',
#     max_concurrent_transforms=None, 
#     max_payload=None
    )

test_data = f's3://{sagemaker_session_bucket}/hf-sm-pipeline/tfm/input'

tfm.transform(
    data=test_data, 
    data_type='S3Prefix', 
    split_type='Line', #
    content_type='application/json',#
    wait=True, 
    logs=True)

In [None]:
import json
from sagemaker.s3 import S3Downloader,S3Uploader,s3_path_join
from ast import literal_eval
# creating s3 uri for result file -> input file + .out
output_file = f"test_data_tfm.jsonl.out"
output_path = s3_path_join(tfm_output,output_file)

# download file
S3Downloader.download(output_path,'.')

batch_transform_result = []
with open(output_file) as f:
    for line in f:
        # converts jsonline array to normal array
        line = "[" + line.replace("[","").replace("]",",") + "]"
        batch_transform_result = literal_eval(line) 
        
# print results 
print(batch_transform_result[:3])

### Deploy a realtime endpoint

In [None]:
## Get model uri

model_s3_uri = huggingface_estimator.model_data
model_s3_uri

In [None]:
from sagemaker.huggingface.model import HuggingFaceModel

role = sagemaker.get_execution_role()
endpoint_name = 'realtime-hf-text'

# create Hugging Face Model Class
huggingface_model = HuggingFaceModel(
   model_data=model_s3_uri,  # path to your trained SageMaker model
   role=role,                                            # IAM role with permissions to create an endpoint
   transformers_version="4.12",                           # Transformers version used
   pytorch_version="1.9",                                # PyTorch version used
   py_version='py38',                                    # Python version used
)

# deploy model to SageMaker Inference
predictor = huggingface_model.deploy(
   endpoint_name=endpoint_name,
   initial_instance_count=1,
   instance_type="ml.m5.xlarge"
)


In [None]:
# example request: you always need to define "inputs"
data = {
   "inputs": "Camera - You are awarded a SiPix Digital Camera! call 09061221066 fromm landline. Delivery within 28 days."
}

# request
predictor.predict(data)

In [None]:
import boto3
import json

client = boto3.client('sagemaker-runtime')
data = {
   "inputs": "Camera - You are awarded a SiPix Digital Camera! call 09061221066 fromm landline. Delivery within 28 days."
}

payload = json.dumps(data)
response = client.invoke_endpoint(
    EndpointName=endpoint_name,
    Body=payload,
    ContentType='application/json',
)

print(response['Body'].read().decode())

In [None]:
#### Delete endpoint if not need

predictor.delete_endpoint()

---

# Build SageMaker Pipeline

In [None]:
import sys
import boto3
import sagemaker


sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
sagemaker_session_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"HuggingFacesTCModelPackageGroupName"

In [None]:
dataset_prefix = 'hf-sm-pipeline/dataset'
batch_prefix = 'hf-sm-pipeline/tfm'

input_data_uri = 's3://{}/{}/input'.format(sagemaker_session_bucket, dataset_prefix)
output_data_uri = 's3://{}/{}/output'.format(sagemaker_session_bucket, dataset_prefix)

batch_data_uri = 's3://{}/{}/input'.format(sagemaker_session_bucket, batch_prefix)

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)


processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
# model_approval_status = ParameterString(
#     name="ModelApprovalStatus", default_value="PendingManualApproval"
# )
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)
# mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

### Define ProcessingStep

In [None]:
from sagemaker.processing import (ProcessingInput, ProcessingOutput,
                                  ScriptProcessor)
from sagemaker.workflow.steps import ProcessingStep

processing_repository_uri = '763104351884.dkr.ecr.us-west-2.amazonaws.com/huggingface-pytorch-training:1.9-transformers4.12-gpu-py38-cu111-ubuntu20.04'

script_processor = ScriptProcessor(command=['python3'],
                image_uri=processing_repository_uri,
                role=role,
                instance_count=1,
                instance_type='ml.m5.2xlarge',
                base_job_name=base_job_name + '-processing')


tokenizer_name = 'distilbert-base-uncased'
dataset_name = 'imdb'


step_process = ProcessingStep(
    name="TextTokenizerProcess",
    processor=script_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input_data"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/output_data/train", destination=output_data_uri+'/train/'),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/output_data/test", destination=output_data_uri+'/test/'),
    ],
    job_arguments=['--tokenizer_name', tokenizer_name, '--dataset_name', dataset_name],
    code="processing.py",
)


### Define a Training Step to Train a Model

In [None]:
from sagemaker.huggingface import HuggingFace
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

# hyperparameters, which are passed into the training job
hyperparameters={'epochs': 1,
                 'train_batch_size': 32,
                 'model_name':'distilbert-base-uncased'
                 }

huggingface_estimator = HuggingFace(entry_point='train.py',
                            source_dir='./scripts',
                            instance_type='ml.p3.2xlarge',
                            instance_count=1,
                            role=role,
                            transformers_version='4.12',
                            pytorch_version='1.9',
                            py_version='py38',
                            hyperparameters = hyperparameters)


step_train = TrainingStep(
    name="HuggingFaceTextClassificationTrain",
    estimator=huggingface_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        ),
    },
)

#### You also could add a model evaluation step here
We will dismiss this step in demo

### Define a Create Model Step to Create a Model

In [None]:
from sagemaker.huggingface.model import HuggingFaceModel
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep

huggingface_model = HuggingFaceModel(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, 
    transformers_version='4.12', 
    pytorch_version='1.9', 
    py_version='py38',
    sagemaker_session=sagemaker_session,
    role=role,
)


inputs = CreateModelInput(
    instance_type="ml.p3.2xlarge",
)
step_create_model = CreateModelStep(
    name="HuggingFaceTextClassificationCreateModel",
    model=huggingface_model,
    inputs=inputs,
)

### Define a Transform Step to Perform Batch Transformation

In [None]:
from sagemaker.transformer import Transformer

from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

batch_output = 's3://{}/{}/output'.format(sagemaker_session_bucket, dataset_prefix)

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=batch_output,
    strategy='SingleRecord',
)


step_transform = TransformStep(
    name="HuggingFaceTextClassificationTransform", 
    transformer=transformer, 
    inputs=TransformInput(data=batch_data,
                         split_type='Line',
                         content_type='application/json',)
)

### Define a Pipeline of Parameters, Steps

In [None]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"HuggingFaceTextClassificationPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        input_data,
        batch_data,
    ],
    steps=[step_process, step_train, step_create_model, step_transform],
)

#### Examining the pipeline definition (Optional)

In [None]:
import json


definition = json.loads(pipeline.definition())
definition

### Submit the pipeline to SageMaker and start execution

In [None]:
pipeline.upsert(role_arn=role)

In [None]:
execution = pipeline.start()

#### Lineage
Review the lineage of the artifacts generated by the pipeline.

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)