
### Serve large models on SageMaker with DJL DeepSpeed Container

In this notebook, we explore how to host a large language model on SageMaker using the recently launched container that combines DeepSpeed and DJL. DJL provides the serving framework, while DeepSpeed is the key sharding library we leverage to host large models. We use DJLServing as the model serving solution in this example. DJLServing is a high-performance, universal model serving solution powered by the Deep Java Library (DJL) that is programming-language agnostic. To learn more about DJL and DJLServing, you can refer to our recent blog post (https://aws.amazon.com/blogs/machine-learning/deploy-large-models-on-amazon-sagemaker-using-djlserving-and-deepspeed-model-parallel-inference/).

Model parallelism can help deploy large models that would normally be too large for a single GPU. With model parallelism, we partition and distribute a model across multiple GPUs. Each GPU holds a different part of the model, resolving the memory capacity issue for the largest deep learning models with billions of parameters. This notebook uses tensor parallelism techniques which allow GPUs to work simultaneously on the same layer of a model and achieve low latency inference relative to a pipeline parallel solution.
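As a toy illustration of the idea, and not DeepSpeed's actual implementation, the sketch below splits a linear layer's weight matrix column-wise across a number of simulated "GPUs". Each shard computes its slice of the output independently, and concatenating the slices reproduces the unsharded matrix multiplication. Plain Python lists stand in for GPU tensors.

```python
def matmul(x, w):
    """Multiply a row vector x (length n) by a matrix w (n rows x m columns)."""
    return [sum(x[i] * w[i][j] for i in range(len(x))) for j in range(len(w[0]))]


def shard_columns(w, num_shards):
    """Split w column-wise into num_shards contiguous blocks, one per simulated GPU."""
    cols = len(w[0])
    if cols % num_shards != 0:
        raise ValueError("columns must divide evenly across shards")
    step = cols // num_shards
    return [[row[k * step:(k + 1) * step] for row in w] for k in range(num_shards)]


def tensor_parallel_matmul(x, w, num_shards):
    """Each shard computes part of the output; concatenating the parts gives the full result."""
    partial_outputs = [matmul(x, shard) for shard in shard_columns(w, num_shards)]
    return [value for part in partial_outputs for value in part]


x = [1.0, 2.0, 3.0]
w = [[1, 0, 2, 1], [0, 1, 1, 3], [2, 1, 0, 1]]
print(tensor_parallel_matmul(x, w, num_shards=2))  # [7.0, 5.0, 4.0, 10.0]
```

In a real tensor-parallel transformer layer, a column-split layer is usually paired with a row-split layer whose partial sums are combined with an all-reduce across GPUs; this sketch shows only the column split.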

SageMaker has rolled out a DeepSpeed container that lets users take advantage of managed serving capabilities while SageMaker handles the undifferentiated heavy lifting.

In this notebook, we deploy the open-source Llama 7B model across the GPUs of an ml.g5.48xlarge instance. Note that the Llama 7B fp16 model can also be deployed on a single GPU, such as the one in an ml.g5.2xlarge instance (24 GB VRAM); here we simply show code that can deploy an LLM across multiple GPUs in SageMaker. DeepSpeed is used for tensor-parallel inference, while DJLServing handles inference requests and the distributed workers. For further reading on DeepSpeed, you can refer to https://arxiv.org/pdf/2207.00032.pdf
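A back-of-the-envelope check of why the 7B fp16 model fits on one 24 GB GPU: fp16 stores each parameter in 2 bytes. This sketch counts weights only (it ignores activations, the KV cache, and framework overhead) and treats "7B" as exactly 7×10^9 parameters, which is an approximation.

```python
BYTES_PER_FP16_PARAM = 2


def weight_memory_gb(num_params, num_gpus=1, bytes_per_param=BYTES_PER_FP16_PARAM):
    """Approximate weight memory per GPU in GB (1 GB = 10**9 bytes), assuming an even split."""
    return num_params * bytes_per_param / num_gpus / 1e9


params_7b = 7e9  # approximation of "7B"
print(weight_memory_gb(params_7b))              # 14.0 GB on a single GPU
print(weight_memory_gb(params_7b, num_gpus=8))  # 1.75 GB per GPU when split across 8 GPUs
```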


## Create a SageMaker-compatible model artifact and upload it to S3

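SageMaker requires the model artifact to be packaged as a tarball. In this notebook, the tarball contains only the inference code and configuration, while the model weights are referenced through `option.s3url` in serving.properties; this shortens endpoint creation time.

The tarball has the following layout: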

```
src
├── model.py
├── requirements.txt
└── serving.properties
```


- `model.py` contains the handler that serves inference requests.
- `requirements.txt` lists the additional libraries to install when the container starts up.
- `serving.properties` is a configuration file whose options configure the model server and are passed to `model.py` at run time.
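If you prefer to build the artifact from Python rather than with the `tar czvf model.tar.gz src` shell command used later in this notebook, the standard-library `tarfile` module can produce an equivalent gzip-compressed archive. This is a minimal sketch; `build_code_tarball` is a hypothetical helper, and it only checks that the three files listed above are present.

```python
import tarfile
from pathlib import Path

REQUIRED_FILES = ("model.py", "requirements.txt", "serving.properties")


def build_code_tarball(src_dir="src", output="model.tar.gz"):
    """Package src_dir into a .tar.gz, keeping the top-level directory name
    (like `tar czvf model.tar.gz src`), and return the sorted member names."""
    src = Path(src_dir)
    missing = [name for name in REQUIRED_FILES if not (src / name).is_file()]
    if missing:
        raise FileNotFoundError(f"missing from {src}: {missing}")
    with tarfile.open(output, "w:gz") as tar:
        # add() recurses into the directory; arcname keeps paths relative to "src".
        tar.add(str(src), arcname=src.name)
    with tarfile.open(output, "r:gz") as tar:
        return sorted(tar.getnames())
```

Calling `build_code_tarball()` from the notebook's working directory, after the `%%writefile` cells have run, should list `src` and the three files inside it.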


#### Options in serving.properties

serving.properties can set an `engine` parameter that tells the DJL model server which engine (for example, DeepSpeed) to use to load the model. The options discussed here are:

- `option.tensor_parallel_degree`: the number of GPUs to partition the model across. The ml.g5.48xlarge instance has 8 GPUs, so you would set it to 8 there. The cell below sets it to 1; as noted above, the Llama 7B fp16 model also fits on a single GPU.
- `option.s3url`: the S3 path of your model files. The path must end with "/".
- `batch_size`: the maximum server-side batch size, counted in requests. Set it as large as you can without causing out-of-memory (OOM) errors. The `model.py` below handles only one prompt per client request when server-side batching is used.
- `max_batch_delay`: the maximum time, in milliseconds, that the server waits to fill a batch.
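serving.properties is a plain `key=value` file. The sketch below renders the four options described above from a Python dict and enforces the trailing-"/" rule for `option.s3url`. `render_serving_properties` is a hypothetical helper, and the values (including the bucket name) are illustrative for an 8-GPU instance, not tuned recommendations.

```python
def render_serving_properties(options):
    """Render DJL Serving options as `key=value` lines for serving.properties."""
    s3url = str(options.get("option.s3url", ""))
    if s3url and not s3url.endswith("/"):
        raise ValueError("option.s3url must end with '/'")
    return "".join(f"{key}={value}\n" for key, value in options.items())


# Illustrative values for an 8-GPU ml.g5.48xlarge; the bucket name is hypothetical.
g5_48xlarge_options = {
    "option.tensor_parallel_degree": 8,
    "option.s3url": "s3://my-example-bucket/llama-7b/",
    "batch_size": 4,
    "max_batch_delay": 100,  # milliseconds
}
print(render_serving_properties(g5_48xlarge_options), end="")
```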

In [None]:
!pip install sagemaker boto3 --upgrade  --quiet
!pip install huggingface_hub

In [None]:
!rm -rf src
!mkdir src

In [None]:
%%writefile ./src/serving.properties
option.tensor_parallel_degree=1
option.s3url=s3://sagemaker-cn-north-1-394224607677/meta-llama/model_llama27b_hf/

In [None]:
%%writefile ./src/requirements.txt
transformers==4.29.2
sagemaker
nvgpu

In [None]:
%%writefile ./src/model.py
from djl_python import Input, Output
import os
import logging
import torch
import deepspeed
import transformers
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
from transformers.models.llama.tokenization_llama import LlamaTokenizer

predictor = None
model = None
tokenizer = None
# Maximum client-side batch size; keep it consistent with batch_size in serving.properties.
batch_size = 8

def load_model(properties):
    tensor_parallel = properties["tensor_parallel_degree"]
    model_location = properties['model_dir']
    if "model_id" in properties:
        model_location = properties['model_id']
    logging.info(f"Loading model in {model_location}")
    
    # Tokenizers do not take a dtype; only the model is loaded in fp16.
    tokenizer = LlamaTokenizer.from_pretrained(model_location)

    #for deepspeed inference 
#     model = AutoModelForCausalLM.from_pretrained(model_location, low_cpu_mem_usage=True, torch_dtype=torch.float16)
#     print("----------model dtype is {0}---------".format(model.dtype))
#     model = deepspeed.init_inference(
#         model,
#         mp_size=tensor_parallel,
#         dtype=torch.half,
#         replace_method="auto",
#         replace_with_kernel_inject=True,
#     )
        
#     local_rank = int(os.getenv("LOCAL_RANK", "0"))
#     generator = pipeline(task="text-generation", model=model, tokenizer=tokenizer, use_cache=True, device=local_rank)
    
    
    #for HF accelerate inference
    
    model = AutoModelForCausalLM.from_pretrained(model_location, device_map="auto", torch_dtype=torch.float16)
    print("----------model dtype is {0}---------".format(model.dtype))
    generator = pipeline(task="text-generation", model=model, tokenizer=tokenizer, use_cache=True)
    
    
    # Llama tokenizers have no pad token, so set one for the pipeline to pad batched input prompts.
    generator.tokenizer.pad_token_id = model.config.eos_token_id
    return generator, model, tokenizer


def handle(inputs: Input):
    """Entry point called by DJL Serving; returns an Output, or None for the warm-up call."""
    global predictor, model, tokenizer
    try:
        if not predictor:
            predictor,model,tokenizer = load_model(inputs.get_properties())

        #print(inputs)
        if inputs.is_empty():
            # Model server makes an empty call to warmup the model on startup
            return None
        
        if inputs.is_batch():
            # This demo expects a single prompt per client request when server-side batching is used.
            bs = inputs.get_batch_size()
            logging.info(f"Dynamic batching size: {bs}.")
            batch = inputs.get_batches()
            tmp_inputs = []
            for item in batch:
                # Each batched item is one client request carrying a single prompt.
                tmp_inputs.append(item.get_as_json().get("input"))
            
            # For server-side batching, the same fixed generation parameters are applied to every request in the batch.
            result = predictor(tmp_inputs, batch_size = bs, max_new_tokens = 128, min_new_tokens = 128, temperature = 1.0, do_sample = True)
            
            outputs = Output()
            for i in range(len(result)):
                outputs.add(result[i], key="generate_text", batch_index=i)
            return outputs
        else:
            inputs = inputs.get_as_json()
            if not inputs.get("input"):
                return Output().add_as_json({"code":-1,"msg":"input field can't be null"})

            #input data
            data = inputs.get("input")
            params = inputs.get("params",{})

            # Client-side batching: a request carries one prompt (str) or a list of prompts.
            if isinstance(data, str):
                bs = 1
            elif isinstance(data, list):
                # Cap the pipeline batch size at the module-level batch_size.
                bs = min(len(data), batch_size)
            else:
                return Output().add_as_json({"code": -1, "msg": "input has wrong type"})

            logging.info(f"Client-side batch size is {bs}")
            # Forward the client's generation parameters (e.g. max_new_tokens) to the pipeline.
            result = predictor(data, batch_size=bs, **params)

            #return
            return Output().add_as_json(result)
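    except Exception as e:
        logging.exception("Inference failed")
        # Convert the exception to a string so the error payload is JSON-serializable.
        return Output().add_as_json({"code": -1, "msg": str(e)})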
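Because `djl_python` is only available inside the serving container, it can help to check the request-handling rules locally. The sketch below is a hypothetical standalone helper that mirrors the client-side branch of `handle()`: a single prompt gives batch size 1, and a list of prompts is capped at the module-level `batch_size` (8 here). Unlike `handle()`, which returns an error payload, it raises `TypeError` for other input types.

```python
def resolve_client_batch_size(data, max_batch_size=8):
    """Mirror the client-side batching rule in model.py's handle()."""
    if isinstance(data, str):
        return 1
    if isinstance(data, list):
        return min(len(data), max_batch_size)
    raise TypeError("input has wrong type")


print(resolve_client_batch_size("The house is wonderful. I"))  # 1
print(resolve_client_batch_size(["a"] * 10))                   # 8
print(resolve_client_batch_size(["a", "b", "c"]))              # 3
```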

#### Create and initialize the variables required to create the endpoint (we use boto3 for this)

In [None]:
import sagemaker
from sagemaker import image_uris
import boto3
import os
import time
import json
from pathlib import Path

sage_session = sagemaker.Session()
model_bucket = sage_session.default_bucket()  # bucket to house artifacts
s3_code_prefix = (
    "hf-large-model-llama-7b-0604/code"  # folder within bucket where code artifact will go
)

s3_client = boto3.client("s3")
sm_client = boto3.client("sagemaker")
smr_client = boto3.client("sagemaker-runtime")

**Image URI of the DJL container used in this example**

In [None]:
# Note: change the image URI to match your Region.
#inference_image_uri = "727897471807.dkr.ecr.cn-north-1.amazonaws.com.cn/djl-inference:0.22.1-deepspeed0.9.2-cu118"

inference_image_uri = "727897471807.dkr.ecr.cn-north-1.amazonaws.com.cn/djl-inference:0.23.0-deepspeed0.9.5-cu118"
print(f"Image going to be used is ---- > {inference_image_uri}")
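Because the image URI above is hard-coded for cn-north-1, a quick check that its Region matches your session's Region (for example `sage_session.boto_region_name`) can catch a mismatch before the endpoint is created. `ecr_image_region` is a hypothetical helper that parses the standard `<account>.dkr.ecr.<region>.amazonaws.com[.cn]/<repository>:<tag>` form.

```python
def ecr_image_region(image_uri):
    """Return the AWS Region embedded in an ECR image URI."""
    registry = image_uri.split("/", 1)[0]
    parts = registry.split(".")
    # Expected registry form: <account>.dkr.ecr.<region>.amazonaws.com[.cn]
    if len(parts) < 5 or parts[1:3] != ["dkr", "ecr"]:
        raise ValueError(f"not an ECR image URI: {image_uri}")
    return parts[3]


uri = "727897471807.dkr.ecr.cn-north-1.amazonaws.com.cn/djl-inference:0.23.0-deepspeed0.9.5-cu118"
print(ecr_image_region(uri))  # cn-north-1
```

In the notebook, `assert ecr_image_region(inference_image_uri) == sage_session.boto_region_name` would flag a Region mismatch.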

**Create the Tarball and then upload to S3 location**

In [None]:
!rm -f model.tar.gz
!tar czvf model.tar.gz src

In [None]:
s3_code_artifact = sage_session.upload_data("model.tar.gz", model_bucket, s3_code_prefix)
print(f"S3 Code or Model tar ball uploaded to --- > {s3_code_artifact}")

In [None]:
print(f"S3 Model Bucket is -- > {model_bucket}")

### To create the endpoint, the steps are:

1. Create the model using the image container and the model tarball uploaded earlier.
2. Create the endpoint config using the following key parameters:

    a) Instance type is ml.g5.48xlarge.
    
    b) ContainerStartupHealthCheckTimeoutInSeconds is 15*60, which gives the container up to 15 minutes to load the model and pass its health check.
    
3. Create the endpoint using the endpoint config.
    

One of the key parameters here is **TENSOR_PARALLEL_DEGREE** (set as `option.tensor_parallel_degree` in serving.properties), which tells the DeepSpeed library how many GPUs to partition the model across, for example 8. This is a tunable, configurable parameter.

This parameter also controls the number of workers per model that DJL Serving starts. For example, on an 8-GPU machine with 8 partitions, there is 1 worker per model to serve requests. For further reading on DeepSpeed, see https://www.deepspeed.ai/tutorials/inference-tutorial/#initializing-for-inference.
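The worker arithmetic in the example above can be sketched as a one-line division, assuming that each worker occupies `tensor_parallel_degree` GPUs and that any leftover GPUs stay unused. `workers_per_model` is a hypothetical helper for reasoning about instance sizing, not a DJL API.

```python
def workers_per_model(num_gpus, tensor_parallel_degree):
    """Workers that fit when each worker holds one model copy sharded over tensor_parallel_degree GPUs."""
    if not 1 <= tensor_parallel_degree <= num_gpus:
        raise ValueError("tensor_parallel_degree must be between 1 and the GPU count")
    return num_gpus // tensor_parallel_degree


print(workers_per_model(8, 8))  # 1, as in the 8-GPU example above
print(workers_per_model(8, 2))  # 4
```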

In [None]:
from sagemaker.utils import name_from_base

model_name = name_from_base(f"llama-7b-finetuned")
print(model_name)

role = sagemaker.get_execution_role()

create_model_response = sm_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer={
        "Image": inference_image_uri,
        "ModelDataUrl": s3_code_artifact,
    },
)
model_arn = create_model_response["ModelArn"]

print(f"Created Model: {model_arn}")

VolumeSizeInGB is left commented out. Set it only for instance types that support attaching an EBS volume. The instance used here comes with preconfigured storage and does not support additional volume mounts.
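One way to keep the optional `VolumeSizeInGB` field out of configs for instances that don't support it is to build the `ProductionVariants` entry with a small helper. `production_variant` is a hypothetical function; the keys it emits are the same ones used in the `create_endpoint_config` call below.

```python
def production_variant(model_name, instance_type, volume_size_gb=None,
                       startup_timeout_seconds=15 * 60):
    """Build one ProductionVariants entry for sm_client.create_endpoint_config.

    VolumeSizeInGB is included only when volume_size_gb is given, i.e. for
    instance types that support attaching an EBS volume.
    """
    variant = {
        "VariantName": "variant1",
        "ModelName": model_name,
        "InstanceType": instance_type,
        "InitialInstanceCount": 1,
        "ContainerStartupHealthCheckTimeoutInSeconds": startup_timeout_seconds,
    }
    if volume_size_gb is not None:
        variant["VolumeSizeInGB"] = volume_size_gb
    return variant
```

For example, `ProductionVariants=[production_variant(model_name, "ml.g5.48xlarge")]` produces a variant without a volume size.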

In [None]:
endpoint_config_name = f"{model_name}-config-06041312"
endpoint_name = f"{model_name}-endpoint"

endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
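        {
            "VariantName": "variant1",
            "ModelName": model_name,
            # The steps above describe ml.g5.48xlarge (8 GPUs); this cell uses a smaller
            # single-GPU instance. Choose an instance with at least option.tensor_parallel_degree GPUs.
            "InstanceType": "ml.g4dn.2xlarge",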
            "InitialInstanceCount": 1,
            #"VolumeSizeInGB" : 300,
            #"ModelDataDownloadTimeoutInSeconds": 15*60,
            "ContainerStartupHealthCheckTimeoutInSeconds": 15*60,
        },
    ],
)
endpoint_config_response

In [None]:
create_endpoint_response = sm_client.create_endpoint(
    EndpointName=f"{endpoint_name}", EndpointConfigName=endpoint_config_name
)
print(f"Created Endpoint: {create_endpoint_response['EndpointArn']}")

#### Wait for the endpoint to be created.

### This step can take about 15 minutes or longer, so please be patient

In [None]:
import time

resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
status = resp["EndpointStatus"]
print("Status: " + status)

while status == "Creating":
    time.sleep(60)
    resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
    status = resp["EndpointStatus"]
    print("Status: " + status)

print("Arn: " + resp["EndpointArn"])
print("Status: " + status)
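The loop above keeps polling while the status is `Creating` but never times out and does not report why a creation failed. The sketch below is a hypothetical helper that adds both; it takes the describe function as a parameter so it can be tested without AWS. boto3 also provides a built-in waiter for this, `sm_client.get_waiter("endpoint_in_service").wait(EndpointName=endpoint_name)`.

```python
import time


def wait_for_endpoint(describe, endpoint_name, poll_seconds=60, timeout_seconds=3600,
                      sleep=time.sleep):
    """Poll describe(EndpointName=...) until the endpoint leaves the Creating state.

    Returns the final status; raises on Failed or when the timeout is exceeded.
    """
    waited = 0
    while True:
        resp = describe(EndpointName=endpoint_name)
        status = resp["EndpointStatus"]
        if status == "Failed":
            raise RuntimeError(f"Endpoint failed: {resp.get('FailureReason', 'unknown reason')}")
        if status != "Creating":
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"{endpoint_name} still Creating after {waited} s")
        sleep(poll_seconds)
        waited += poll_seconds
```

In the notebook this would be called as `wait_for_endpoint(sm_client.describe_endpoint, endpoint_name)`.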

#### Use boto3 to invoke the endpoint

Because this is a generative model, we pass in text as a prompt, and the model completes it and returns the result.
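The request body follows the schema that `model.py` reads: an `input` field holding one prompt or a list of prompts, plus optional `params`. The sketch below uses hypothetical helpers to build that body and flatten the response. It assumes the Hugging Face text-generation pipeline returns `[{"generated_text": ...}]` for a single prompt and `[[{"generated_text": ...}], ...]` for a list of prompts, and that `model.py` returns a dict such as `{"code": -1, "msg": ...}` on error.

```python
import json


def build_payload(prompts, params=None):
    """Serialize one prompt (str) or a list of prompts into the JSON body model.py expects."""
    body = {"input": prompts}
    if params:
        body["params"] = params
    return json.dumps(body)


def extract_generated_texts(body_bytes):
    """Flatten the pipeline output in a response body into a list of strings."""
    result = json.loads(body_bytes.decode("utf8"))
    if isinstance(result, dict):  # error payload such as {"code": -1, "msg": ...}
        raise RuntimeError(result.get("msg"))
    texts = []
    for item in result:
        candidates = item if isinstance(item, list) else [item]
        texts.extend(c["generated_text"] for c in candidates)
    return texts
```

With the cell below, `extract_generated_texts(response_model["Body"].read())` would return the completions as plain strings.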


In [None]:
%%time
import json
import boto3

smr_client = boto3.client("sagemaker-runtime")

prompt1 = "The house is wonderful. I"
# A long multi-turn dialogue prompt; adjacent string literals are concatenated.
prompt2 = (
    "##Eva:How often do you travel?## Malcolm:I like David Bowie too. I don’t travel much any more, but I used to.## Eva:That's cool! I recently took a road trip with my friend. We had so much fun and it opened up so many possibilities for us. What kind of places did you like to explore?## Malcolm:I love history and culture, so those are my favorite.## Eva: He was born in Birmingham, England and raised in Los Angeles, California.Eva: Yes, Sir. Queen is one of the most influential bands of all time.## Malcolm:It is. They are one of my favorite rock groups. What about you?## Eva:I'm more into classic rock, especially David Bowie. Who is your favorite artist?## Malcolm:Marylin Manson. You?## Eva:My favorite artist is David Bowie.## Eva:How often do you travel?## Malcolm:I like David Bowie too. I don’t travel much any more, but I used to.## Eva:That's cool! I recently took a road trip with my friend. We had so much fun and it opened up so many possibilities for us. What kind of places did you like to explore?## Malcolm:I love history and culture, so those are my favorite.## Eva: He was born in Birmingham, England and raised in Los Angeles, California.##Eva: Yes, Sir. Queen is one of the most influential bands of all time.## Malcolm:It is. They are one of my favorite rock groups. What about you?## Eva:I'm more into classic rock, especially David Bowie. Who is your favorite artist?## Malcolm:Marylin Manson. You?## Eva:My favorite artist is David Bowie.## Eva:How often do you travel?## Malcolm:I like David Bowie too. I don’t travel much any more, but I used to.## Eva:That's cool! I recently took a road trip with my friend. We had so much fun and it opened up so many possibilities for us. What kind of places did you like to explore?## Malcolm:I love history and culture, so those are my favorite.## Eva: He was born in Birmingham, England and raised in Los Angeles, California.##Eva: Yes, Sir. Queen is one of the most influential bands of all time.## Malcolm:It is. "
    "They are one of my favorite rock groups. What about you?## Eva:I'm more into classic rock, especially David Bowie. Who is your favorite artist?## Malcolm:Marylin Manson. You?## Eva:My favorite artist is David Bowie.## Eva:How often do you travel?## Malcolm:I like David Bowie too. I don’t travel much any more, but I used to.## Eva:That's cool! I recently took a road trip with my friend. We had so much fun and it opened up so many possibilities for us. What kind of places did you like to explore?## Malcolm:I love history and culture, so those are my favorite.## Eva: He was born in Birmingham, England and raised in Los Angeles, California.#### Malcolm:Oh. What are you wearing right now, pet?## Eva:"
)

parameters = {
  "early_stopping": True,
  "max_new_tokens": 128,
  "min_new_tokens": 128,
  "do_sample": True,
  "temperature": 1.0,
}

response_model = smr_client.invoke_endpoint(
            EndpointName=endpoint_name,
            Body=json.dumps(
            {
                #"input": prompt1,
                "input": prompt1,
                #"input": [prompt2,prompt2],
                #"input": [prompt2,prompt2, prompt2,prompt2],
                #"input": [prompt1,prompt1, prompt1,prompt1, prompt1,prompt1, prompt1,prompt1],
                #"input": [prompt2,prompt2, prompt2,prompt2, prompt2,prompt2, prompt2,prompt2],
                #"input": [prompt1, prompt2],
                #"input": [prompt1, prompt2, prompt1, prompt2, prompt1, prompt2,prompt1, prompt2,],
                "params": parameters
            }
            ),
            ContentType="application/json",
        )

response_model['Body'].read().decode('utf8')

In [None]:
%%time

# Reuse endpoint_name from the cells above. To query an endpoint created in an
# earlier session, set endpoint_name to that endpoint's name instead.

smr_client = boto3.client("sagemaker-runtime")
response_model = smr_client.invoke_endpoint(
    EndpointName=endpoint_name,
    Body=json.dumps({"input": "北京在哪里?"}),  # Chinese prompt: "Where is Beijing?"
    ContentType="application/json",
)

response_model["Body"].read().decode("utf8")