# Deploy a fine-tuned Mistral-7B model via DJL on SageMaker

This notebook is a step-by-step guide to deploying a LoRA fine-tuned Mistral 7B Instruct model on Amazon SageMaker using DeepSpeed and DJL. Refer to this [AWS Blog post](https://aws.amazon.com/blogs/machine-learning/deploy-large-models-on-amazon-sagemaker-using-djlserving-and-deepspeed-model-parallel-inference/) for more details. This model served as the fine-tuned head of a custom RAG architecture; see the blog post for more details.

Steps:
1. **Prepare the Deployment Package**
    * Organize the necessary files, including `requirements.txt`, `serving.properties`, and `model.py`, within a designated directory.
    * Package the directory contents into a `tar.gz` file.

2. **Upload the Deployment Package to Amazon S3**
    * Upload the packaged `tar.gz` file to an Amazon S3 bucket, which serves as the storage location for the deployment package.

3. **Deploy the Model as a SageMaker Endpoint**
    * Use SageMaker to deploy the packaged model as an endpoint for later API inference.

*Note: This notebook assumes familiarity with Amazon SageMaker, DJL, and basic concepts of deploying machine learning models. Additional documentation and resources are available for further reference and exploration.*


### 0. Initialization

In [1]:
!pip install sagemaker --upgrade --quiet

In [2]:
import sagemaker
from sagemaker.session import Session
from sagemaker import image_uris
from sagemaker import Model

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


In [3]:
role = sagemaker.get_execution_role()  # execution role for the endpoint
session = sagemaker.session.Session()  # sagemaker session for interacting with different AWS APIs
region = session.boto_region_name  # public attribute, preferred over the private `_region_name`

# DJL DeepSpeed inference container image for the current region
image_uri = image_uris.retrieve(framework="djl-deepspeed", version="0.24.0", region=region)

### 1. Preparing deployment package
Our directory should have the following structure:

faber_lora
├── model.py
├── serving.properties
├── requirements.txt
└── mistral-ft-doc-285-gess/   (fine-tuned LoRA adapter, copied in during step 2)

In [5]:
!mkdir -p faber_lora

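Prepare `serving.properties` and `requirements.txt` in `./faber_lora`. The `option.adapter_checkpoint` and `option.adapter_name` entries are custom properties that `model.py` reads to locate and name the LoRA adapter.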

In [10]:
%%writefile faber_lora/serving.properties
engine=Python
option.model_id=mistralai/Mistral-7B-Instruct-v0.1
option.dtype=fp16
option.tensor_parallel_degree=4
option.enable_streaming=true
option.entryPoint=model.py
option.adapter_checkpoint=mistral-ft-doc-285-gess
option.adapter_name=mistral-lora

Writing faber_lora/serving.properties
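
To make the link between `serving.properties` and `model.py` explicit: the handler in this notebook reads `adapter_checkpoint` and `adapter_name` from `inputs.get_properties()`, that is, without the `option.` prefix. The sketch below mimics that mapping in plain Python. `parse_serving_properties` is a hypothetical helper written for illustration only; it is not part of DJL, and it makes no claim about how DJL Serving parses the file internally.

```python
def parse_serving_properties(text: str) -> dict:
    """Parse key=value lines and drop the 'option.' prefix (hypothetical helper)."""
    props = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        key = key.strip()
        if key.startswith("option."):
            key = key[len("option."):]
        props[key] = value.strip()
    return props


# Same content as the serving.properties file written above.
SERVING_PROPERTIES = """\
engine=Python
option.model_id=mistralai/Mistral-7B-Instruct-v0.1
option.dtype=fp16
option.tensor_parallel_degree=4
option.enable_streaming=true
option.entryPoint=model.py
option.adapter_checkpoint=mistral-ft-doc-285-gess
option.adapter_name=mistral-lora
"""

props = parse_serving_properties(SERVING_PROPERTIES)
# These are the two keys that handle() in model.py looks up.
print(props["adapter_checkpoint"], props["adapter_name"])
```

Note that all values arrive as strings, so a numeric option such as `tensor_parallel_degree` would need an explicit `int(...)` conversion before use.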


In [11]:
%%writefile faber_lora/requirements.txt
git+https://github.com/huggingface/transformers
accelerate==0.23.0

Writing faber_lora/requirements.txt


Prepare model.py in ./faber_lora

In [12]:
%%writefile faber_lora/model.py
from peft import PeftModel, PeftConfig
from transformers import AutoModelForCausalLM, AutoTokenizer, GenerationConfig
from djl_python.inputs import Input
from djl_python.outputs import Output
from djl_python.encode_decode import encode, decode
import torch

import logging
import re

import numpy as np
from transformers import Pipeline, PreTrainedTokenizer


device = "cuda"
model = None
tokenizer = None

# Create Instruct Pipeline

logger = logging.getLogger(__name__)

INSTRUCTION_KEY = "### Instruction:"
RESPONSE_KEY = "### Response:"
END_KEY = "### End"
INTRO_BLURB = (
    "Below is an instruction that describes a task. Write a response that appropriately completes the request."
)


PROMPT_FOR_GENERATION_FORMAT = """{intro}
{instruction_key}
{instruction}
{response_key}
""".format(
    intro=INTRO_BLURB,
    instruction_key=INSTRUCTION_KEY,
    instruction="{instruction}",
    response_key=RESPONSE_KEY,
)


def get_special_token_id(tokenizer: PreTrainedTokenizer, key: str) -> int:
    token_ids = tokenizer.encode(key)
    if len(token_ids) > 1:
        raise ValueError(f"Expected only a single token for '{key}' but found {token_ids}")
    return token_ids[0]


class InstructionTextGenerationPipeline(Pipeline):
    def __init__(
        self, *args, do_sample: bool = True, max_new_tokens: int = 512, temperature: float = 0.3, **kwargs
    ):
        super().__init__(*args, do_sample=do_sample, max_new_tokens=max_new_tokens, temperature=temperature, **kwargs)

    def _sanitize_parameters(self, return_instruction_text=False, **generate_kwargs):
        preprocess_params = {}

        tokenizer_response_key = next(
            (token for token in self.tokenizer.additional_special_tokens if token.startswith(RESPONSE_KEY)), None
        )

        response_key_token_id = None
        end_key_token_id = None
        if tokenizer_response_key:
            try:
                response_key_token_id = get_special_token_id(self.tokenizer, tokenizer_response_key)
                end_key_token_id = get_special_token_id(self.tokenizer, END_KEY)

                generate_kwargs["eos_token_id"] = end_key_token_id
            except ValueError:
                pass

        forward_params = generate_kwargs
        postprocess_params = {
            "response_key_token_id": response_key_token_id,
            "end_key_token_id": end_key_token_id,
            "return_instruction_text": return_instruction_text,
        }

        return preprocess_params, forward_params, postprocess_params

    def preprocess(self, instruction_text, **generate_kwargs):
        prompt_text = PROMPT_FOR_GENERATION_FORMAT.format(instruction=instruction_text)
        inputs = self.tokenizer(
            prompt_text,
            return_tensors="pt",
        )
        inputs["prompt_text"] = prompt_text
        inputs["instruction_text"] = instruction_text
        return inputs

    def _forward(self, model_inputs, **generate_kwargs):
        input_ids = model_inputs["input_ids"]
        attention_mask = model_inputs.get("attention_mask", None)
        generated_sequence = self.model.generate(
            input_ids=input_ids.to(self.model.device),
            attention_mask=attention_mask,
            pad_token_id=self.tokenizer.pad_token_id,
            **generate_kwargs,
        )[0] 
        instruction_text = model_inputs.pop("instruction_text")
        return {"generated_sequence": generated_sequence, "input_ids": input_ids, "instruction_text": instruction_text}

    def postprocess(self, model_outputs, response_key_token_id, end_key_token_id, return_instruction_text):
        sequence = model_outputs["generated_sequence"]
        instruction_text = model_outputs["instruction_text"]

        decoded = None

        if response_key_token_id and end_key_token_id:
            response_pos = None
            response_positions = np.where(sequence == response_key_token_id)[0]
            if len(response_positions) == 0:
                logger.warning(f"Could not find response key {response_key_token_id} in: {sequence}")
            else:
                response_pos = response_positions[0]

            # Compare against None: a match at index 0 is falsy but still valid.
            if response_pos is not None:
                end_pos = None
                end_positions = np.where(sequence == end_key_token_id)[0]
                if len(end_positions) > 0:
                    end_pos = end_positions[0]

                decoded = self.tokenizer.decode(sequence[response_pos + 1 : end_pos]).strip()
        else:

            fully_decoded = self.tokenizer.decode(sequence)

            m = re.search(r"#+\s*Response:\s*(.+?)#+\s*End", fully_decoded, flags=re.DOTALL)

            if m:
                decoded = m.group(1).strip()
            else:
                m = re.search(r"#+\s*Response:\s*(.+)", fully_decoded, flags=re.DOTALL)
                if m:
                    decoded = m.group(1).strip()
                else:
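                    logger.warning(f"Failed to find response in:\n{fully_decoded}")

        # Trim trailing markers only when a response was actually extracted;
        # `decoded` stays None if neither branch above found one.
        if decoded is not None:
            if "### " in decoded:
                decoded = decoded.split("### ")[0]

            if "[STOP][STOP]" in decoded:
                decoded = decoded.split("[STOP][STOP]")[0]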
        
        if return_instruction_text:
            return {"instruction_text": instruction_text, "generated_text": decoded}

        return decoded


def generate_prompt(instruction, input=None):
    if input:
        return f"""Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.
                   ### Instruction: {instruction}
                   ### Input: {input}
                   ### Response:"""
    else:
        return f"""Below is an instruction that describes a task. Write a response that appropriately completes the request.
                   ### Instruction:
                   {instruction}
                   ### Response:"""

def evaluate(instruction, **generation_kwargs):
    # Wrap the globally loaded model in the instruction pipeline and forward
    # request-level generation parameters (e.g. do_sample, top_p, temperature).
    model_gen = InstructionTextGenerationPipeline(model=model, tokenizer=tokenizer)
    return model_gen(instruction, **generation_kwargs)


def load_base_model(adapter_checkpoint, adapter_name):
    model_name = "mistralai/Mistral-7B-Instruct-v0.1"
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.float16,  # mirrors option.dtype=fp16 in serving.properties
        device_map="auto",
    )
    # Attach the LoRA adapter weights on top of the base model
    model = PeftModel.from_pretrained(model, adapter_checkpoint, adapter_name)
    return model, tokenizer


def inference(inputs: Input):
    json_input = decode(inputs, "application/json")
    sequence = json_input.get("inputs")
    generation_kwargs = json_input.get("parameters", {})
    output = Output()
    # Pass the request's "parameters" through instead of silently dropping them
    outs = evaluate(sequence, **generation_kwargs)
    encode(output, outs, "application/json")
    return output


def handle(inputs: Input):
    """
    Default handler function
    """
    global model, tokenizer
    if not model:
        # stateful model
        props = inputs.get_properties()
        model, tokenizer = load_base_model(props.get("adapter_checkpoint"), props.get("adapter_name"))

    if inputs.is_empty():
        # initialization request
        return None

    return inference(inputs)


Writing faber_lora/model.py
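
For the base Mistral tokenizer, which defines no dedicated response or end tokens, `postprocess` in `model.py` falls back to regular expressions to pull the answer out of the fully decoded text. The standalone sketch below reproduces that fallback so it can be tried without a GPU or the model. `extract_response` is a hypothetical name introduced here, and the sample string is made up for illustration.

```python
import re


def extract_response(fully_decoded: str):
    """Return the text after '### Response:', cut at '### End' when present."""
    # First try the bounded form: "### Response: ... ### End"
    m = re.search(r"#+\s*Response:\s*(.+?)#+\s*End", fully_decoded, flags=re.DOTALL)
    if not m:
        # Otherwise take everything after the response key
        m = re.search(r"#+\s*Response:\s*(.+)", fully_decoded, flags=re.DOTALL)
    if not m:
        return None  # no response marker at all
    decoded = m.group(1).strip()
    # Same trailing-marker trimming as in postprocess()
    for marker in ("### ", "[STOP][STOP]"):
        decoded = decoded.split(marker)[0]
    return decoded


# Made-up decoded sequence in the prompt format used by the pipeline.
sample = (
    "Below is an instruction that describes a task.\n"
    "### Instruction:\nSay hello.\n"
    "### Response:\nHello there!\n### End"
)
print(extract_response(sample))
```

Returning `None` when no marker is found keeps the "nothing extracted" case distinguishable from an empty answer, which the caller can then handle explicitly.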


### 2. Package the model artifacts and upload them to S3

In [14]:
%%bash
cp -r models/mistral-ft-doc-285-gess faber_lora/

In [15]:
%%bash
tar -cvzf faber_model.tar.gz faber_lora/

faber_lora/
faber_lora/requirements.txt
faber_lora/mistral-ft-doc-285-gess/
faber_lora/mistral-ft-doc-285-gess/runs/
faber_lora/mistral-ft-doc-285-gess/runs/Oct27_04-58-34_ip-172-16-10-151.us-west-2.compute.internal/
faber_lora/mistral-ft-doc-285-gess/runs/Oct27_04-58-34_ip-172-16-10-151.us-west-2.compute.internal/events.out.tfevents.1698382714.ip-172-16-10-151.us-west-2.compute.internal
faber_lora/mistral-ft-doc-285-gess/runs/Oct27_04-03-26_ip-172-16-10-151.us-west-2.compute.internal/
faber_lora/mistral-ft-doc-285-gess/runs/Oct27_04-03-26_ip-172-16-10-151.us-west-2.compute.internal/events.out.tfevents.1698379409.ip-172-16-10-151.us-west-2.compute.internal
faber_lora/mistral-ft-doc-285-gess/runs/Oct27_02-09-04_ip-172-16-10-151.us-west-2.compute.internal/
faber_lora/mistral-ft-doc-285-gess/runs/Oct27_02-09-04_ip-172-16-10-151.us-west-2.compute.internal/events.out.tfevents.1698372547.ip-172-16-10-151.us-west-2.compute.internal
faber_lora/mistral-ft-doc-285-gess/runs/Oct27_03-33-29_ip-172
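
The archive listing above shows that the adapter's `runs/` directory of TensorBoard event files is packaged along with the model. Assuming those logs are not needed at inference time, a smaller archive can be built with Python's standard `tarfile` module. This is a minimal sketch, not the packaging method used in this notebook; `package_model_dir` and its `skip_dirs` parameter are hypothetical names.

```python
import tarfile
from pathlib import Path


def package_model_dir(src_dir: str, archive_path: str, skip_dirs=("runs",)) -> list:
    """Create a gzip tarball of src_dir, skipping directories named in skip_dirs."""

    def _filter(tarinfo):
        # Returning None excludes the entry (and, for a directory, its contents).
        parts = Path(tarinfo.name).parts
        if any(part in skip_dirs for part in parts):
            return None
        return tarinfo

    with tarfile.open(archive_path, "w:gz") as tar:
        # arcname keeps a single top-level folder, like `tar -cvzf ... faber_lora/`
        tar.add(src_dir, arcname=Path(src_dir).name, filter=_filter)

    # Re-open the archive to report what was actually written.
    with tarfile.open(archive_path, "r:gz") as tar:
        return tar.getnames()


# Only runs when the deployment directory from this notebook is present.
if Path("faber_lora").is_dir():
    for name in package_model_dir("faber_lora", "faber_model.tar.gz"):
        print(name)
```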

In [None]:
%%bash
aws s3 cp faber_model.tar.gz s3://ai-models/


### 3. Deploy as SageMaker Inference Endpoint

In [17]:
# instance_type = "ml.p3.8xlarge"   # 64GB GPU memory
# instance_type = "ml.p2.8xlarge"   # 96GB GPU memory
instance_type = "ml.g5.12xlarge"    # 96GB GPU memory (4 GPUs, matches tensor_parallel_degree=4)
# instance_type = "ml.g5.2xlarge"   # 24GB GPU memory (1 GPU)

model_s3_location = "s3://ai-models/faber_model.tar.gz"
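
Because `serving.properties` sets `option.tensor_parallel_degree=4`, the chosen instance needs at least four GPUs. The check below is a small sketch of that constraint for the instance types listed above. The GPU counts are taken from the public AWS instance specifications, and `check_tensor_parallel` is a hypothetical helper, not a SageMaker API.

```python
# GPUs per instance type, for the candidates listed in this notebook.
GPUS_PER_INSTANCE = {
    "ml.p3.8xlarge": 4,
    "ml.p2.8xlarge": 8,
    "ml.g5.12xlarge": 4,
    "ml.g5.2xlarge": 1,
}


def check_tensor_parallel(instance_type: str, tensor_parallel_degree: int) -> bool:
    """Return True when the instance has enough GPUs for the requested degree."""
    gpus = GPUS_PER_INSTANCE[instance_type]
    return tensor_parallel_degree <= gpus


for candidate in GPUS_PER_INSTANCE:
    print(candidate, check_tensor_parallel(candidate, tensor_parallel_degree=4))
```

Under this check, `ml.g5.2xlarge` would only be viable if `option.tensor_parallel_degree` were lowered to 1.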

In [18]:
import sagemaker.djl_inference

model = Model(
    image_uri=image_uri,
    model_data=model_s3_location,
    predictor_cls=sagemaker.djl_inference.DJLPredictor,
    role=role,
)

In [19]:
predictor = model.deploy(
    initial_instance_count=1, 
    instance_type=instance_type
)

----------------!

### 4. Testing the endpoint

In [22]:
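import boto3
import json

# Endpoint name from the run recorded in this notebook; in a fresh run,
# use `predictor.endpoint_name` instead.
endpoint = 'djl-inference-2024-04-11-10-43-22-559'
runtime = boto3.client('sagemaker-runtime')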

payload = {
    "inputs": "hey, how are you doing?",
    "parameters": {
        "do_sample": True,
        "top_p": 0.9,
        "temperature": 0.3,
    }
}

In [23]:
response = runtime.invoke_endpoint(EndpointName=endpoint,
                                   ContentType='application/json',
                                   Body=json.dumps(payload).encode("utf-8"))

In [25]:
pred = json.loads(response['Body'].read())
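
The generated text can end with the tokenizer's end-of-sequence marker `</s>`, as the prediction displayed in this section does. If the caller wants clean text, one option is to strip the marker on the client side. The helper below is a minimal sketch; `strip_eos` is a hypothetical name, and the default marker assumes the Mistral tokenizer's `</s>` token.

```python
def strip_eos(text: str, eos_token: str = "</s>") -> str:
    """Remove a single trailing end-of-sequence marker and surrounding whitespace."""
    text = text.strip()
    if text.endswith(eos_token):
        text = text[: -len(eos_token)].rstrip()
    return text


# Made-up example string; with a live endpoint this would be `strip_eos(pred)`.
print(strip_eos("Is there something specific you would like to know?</s>"))
```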

In [26]:
pred

"I'm just a computer program, so I don't have feelings or the ability to respond in a personal way.  However, I'm here to help you with any questions you have about the technical requirements for the building services design. Is there something specific you would like to know?</s>"

### 5. Clean-up resources

In [29]:
# Uncomment to delete the endpoint and the model once you are done with them.
# predictor.delete_endpoint()
# model.delete_model()