# QWen 2.5 through vLLM on Sagemaker

## Use DJL with the SageMaker Python SDK
- SageMaker Python SDK를 사용하면 Deep Java Library를 이용하여 Amazon SageMaker에서 모델을 호스팅할 수 있습니다.
- Deep Java Library (DJL) Serving은 DJL이 제공하는 고성능 범용 독립형 모델 서빙 솔루션입니다. DJL Serving은 다양한 프레임워크로 학습된 모델을 로드하는 것을 지원합니다.
- SageMaker Python SDK를 사용하면 DeepSpeed와 HuggingFace Accelerate와 같은 백엔드를 활용하여 DJL Serving으로 대규모 모델을 호스팅할 수 있습니다.
- DJL Serving의 지원 버전에 대한 정보는 AWS 문서를 참조하십시오.
- 최신 지원 버전을 사용하는 것을 권장합니다. 왜냐하면 그곳에 우리의 개발 노력이 집중되어 있기 때문입니다.
- SageMaker Python SDK 사용에 대한 일반적인 정보는 SageMaker Python SDK 사용하기를 참조하십시오.
> REF: [BLOG] Deploy LLM with vLLM on SageMaker in only 13 lines of code

In [1]:
import os
from dotenv import load_dotenv
load_dotenv()

True

### 1. Depoly model on SageMaker

In [2]:
import boto3
import sagemaker
from sagemaker import get_execution_role

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [3]:
role = get_execution_role()
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
sm_client = boto3.client("sagemaker", region_name=region)
sm_runtime_client = boto3.client("sagemaker-runtime")
sm_autoscaling_client = boto3.client("application-autoscaling")

### Setup Configuration

In [4]:
model_id = "Qwen/Qwen2.5-7B-Instruct"
instance_type = "ml.g5.24xlarge"

container_uri = sagemaker.image_uris.retrieve(
    framework="djl-lmi", version="0.30.0", region=region
)
container_uri

Defaulting to the only supported framework/algorithm version: 0.28.0. Ignoring framework/algorithm version: 0.30.0.


'763104351884.dkr.ecr.us-west-2.amazonaws.com/djl-inference:0.28.0-lmi10.0.0-cu124'

In [5]:
container_startup_health_check_timeout = 900

endpoint_name = sagemaker.utils.name_from_base("QWEN-2-5-7B-G5")

print (f'container_uri: {container_uri}')
print (f'container_startup_health_check_timeout: {container_startup_health_check_timeout}')
print (f'instance_type: {instance_type}')
print (f'endpoint_name: {endpoint_name}')

container_uri: 763104351884.dkr.ecr.us-west-2.amazonaws.com/djl-inference:0.28.0-lmi10.0.0-cu124
container_startup_health_check_timeout: 900
instance_type: ml.g5.24xlarge
endpoint_name: QWEN-2-5-7B-G5-2025-08-13-13-56-33-717


### Creat model with env variables

- Target model: DeepSeek-Coder-V2-Light-Instruct
- Backend for attention computation in vLLM
- Available options:
    - "TORCH_SDPA": use torch.nn.MultiheadAttention
    - "FLASH_ATTN": use FlashAttention
    - "XFORMERS": use XFormers
    - "ROCM_FLASH": use ROCmFlashAttention
    - "FLASHINFER": use flashinfer


- '"OPTION_DISABLE_FLASH_ATTN": "false"' is for HF Accelerate with Seq-Scheduler
- It will be ignored when using vLLM beckend

> [DOC] DJL-Container and Model Configurations (info. about properties)

> [DOC] Backend Specific Configurations

In [6]:
HF_TOKEN = os.environ.get("HF_TOKEN","")
deploy_env = {
    "HF_MODEL_ID": model_id,
    "OPTION_ROLLING_BATCH": "vllm",
    "OPTION_TENSOR_PARALLEL_DEGREE": "max",
    "OPTION_MAX_ROLLING_BATCH_SIZE": "64",
    "OPTION_DTYPE":"fp16",
    "OPTION_TRUST_REMOTE_CODE": "true",
    "OPTION_MAX_MODEL_LEN": "4096",
    "VLLM_ATTENTION_BACKEND": "XFORMERS",
    #"OPTION_DISABLE_FLASH_ATTN": "false", ## HF Accelerate with Seq-Scheduler
    "HF_TOKEN": HF_TOKEN, # "<your token>"
}

In [14]:
model = sagemaker.Model(
    image_uri=container_uri,
    role=role,
    env=deploy_env
)

### Deploy Model on Sagemaker

In [15]:
model.deploy(
    instance_type=instance_type,
    initial_instance_count=2,
    endpoint_name=endpoint_name,
    container_startup_health_check_timeout=container_startup_health_check_timeout,
    sagemaker_session=sagemaker_session
)

------------!

### Invocation

In [7]:
endpoint_name = "QWEN-2-5-7B-G5-2025-08-12-03-05-19-368"

In [8]:
predictor = sagemaker.Predictor(
    endpoint_name=endpoint_name,
    sagemaker_session=sagemaker_session,
    serializer=sagemaker.serializers.JSONSerializer(),
    deserializer=sagemaker.deserializers.JSONDeserializer(),
)

In [10]:
# 호출 예시
response = predictor.predict({
    "inputs": "AI Agent에 대해 100단어 내외로 설명해 주세요.",
    "parameters": {
        "max_new_tokens": 1024,
        "stop": ["<|endoftext|>"]  # Stop sequences 지정 가능
    }
})

print(response)

{'generated_text': ' AI Agent는 인공지능 기술을 활용해 인간의 지시에 따라 작업을 수행하거나 의사결정을 돕는 컴퓨터 프로그램입니다. 다양한 분야에서 활용되며, 자연어 처리, 학습 알고리즘 등을 통해 지속적으로 발전하고 있습니다.'}


### Streaming Output

In [18]:
import json

def generate_payload(chat):
        
    # JSON 페이로드 생성
    body = {
        "messages": chat,
        "max_tokens": 512,
        "stream": True,
        "ignore_eos": False
    }
    
    # JSON을 문자열로 변환하고 bytes로 인코딩
    return json.dumps(body).encode('utf-8')

In [19]:
chat = [
    {"role": "system", "content": "너는 질의응답 챗봇입니다. 사용자의 질문의 의도를 파악하여 답변합니다. 답변은 한국어로 합니다"},
    {"role": "user", "content": "AWS AIML Specialist 솔루션즈 아키텍트 역할에 대해 설명해줘"},
]

In [21]:
%%time
# Invoke the endpoint
resp = sm_runtime_client.invoke_endpoint_with_response_stream(
    EndpointName=endpoint_name, 
    Body=generate_payload(chat),
    ContentType="application/json"
)
print("Generated response:")
print("-" * 40)

buffer = ""
string = "" 
for event in resp['Body']:
    if 'PayloadPart' in event:
        chunk = event['PayloadPart']['Bytes'].decode()
        buffer += chunk
        try:
            # Try to parse the buffer as JSON
            data = json.loads(buffer)
            if 'choices' in data:
                print(data['choices'][0]['delta']['content'], end='', flush=True)
                string += data['choices'][0]['delta']['content'] 
            buffer = ""  # Clear the buffer after successful parsing
        except json.JSONDecodeError:
            # If parsing fails, keep the buffer for the next iteration
            pass

print("\n" + "-" * 40)

Generated response:
----------------------------------------
AWS AIML Specialist Soln Architect(AIML은 Artificial Intelligence and Machine Learning의 약자)는 AWS에서 제공하는 인공지능 및 machine learning 서비스를 사용하여 클라이언트가 데이터를 분석하고 비즈니스 문제를 해결할 수 있도록 하는 아키텍처 설계자 역할을 의미합니다. 이를 위해 다음의 역할이 요구됩니다:

1. 데이터 분석 및 모델 개발: 비교적 큰 데이터셋에 대해 분석이 가능하고, 최적의 모델을 선택하여 개발할 수 있어야 합니다.

2. 서비스 활용: AWS의 AIML 서비스를 잘 이해하고 활용할 수 있어야 합니다. AWS에서는 SageMaker, Comprehend, Forecast 등의 다양한 AIML 서비스를 제공하고 있습니다.

3. 비즈니스ضع함: 클라이언트의 비즈니스 문제를 이해하고, 이를 해결할 수 있는 기술iske 솔루션을 제안하고 설계할 수 있어야 합니다.

4. 커뮤니케이션: 클라이언트와의 소통을 원활히 하고, 기술적인 내용을 간단하게 설명하여 이해할 수 있어야 합니다.

5. 아키텍처 설계: 적절한 데이터 처리, 저장 및 모델 구성을 위한 아키텍처를 설계할 수 있어야 합니다.

6. 보안 및 품질 관리: AIML 모델의 안전性和质量都需要考虑。

这种角色需要的技能和经验包括机器学习、人工智能、云架构设计以及与客户沟通的能力。同时，还需要熟悉AWS的服务和工具，以及在实际项目中应用这些技术的能力。
----------------------------------------
CPU times: user 204 ms, sys: 29.2 ms, total: 234 ms
Wall time: 4.66 s


## AutoScaling

In [22]:
import pprint
import random

In [24]:
resp = sm_client.describe_endpoint(EndpointName=endpoint_name)

# SageMaker expects resource id to be provided with the following structure
resource_id = f"endpoint/{endpoint_name}/variant/{resp['ProductionVariants'][0]['VariantName']}"

# Scaling configuration
scaling_config_response = sm_autoscaling_client.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount", 
    MinCapacity=1,
    MaxCapacity=2
)

In [29]:
# resp['ProductionVariants'][0]

In [None]:
# Create Scaling Policy
policy_name = f"scaling-policy-{endpoint_name}"
scaling_policy_response = sm_autoscaling_client.put_scaling_policy(
    PolicyName=policy_name,
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 5.0, # Target for avg invocations per minutes
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance",
        },
        "ScaleInCooldown": 600, # Duration in seconds until scale in
        "ScaleOutCooldown": 60 # Duration in seconds between scale out
    }
)

In [None]:
response = sm_autoscaling_client.describe_scaling_policies(ServiceNamespace="sagemaker")

pp = pprint.PrettyPrinter(indent=4, depth=4)
for i in response["ScalingPolicies"]:
    pp.pprint(i["PolicyName"])
    print("")
    if("TargetTrackingScalingPolicyConfiguration" in i):
        pp.pprint(i["TargetTrackingScalingPolicyConfiguration"])

In [None]:
# 다양한 코딩 태스크를 위한 프롬프트 리스트
prompts = [
    "write a quick sort algorithm in python.",
    "Write a Python function to implement a binary search algorithm.",
    "Create a JavaScript function to flatten a nested array.",
    "Implement a simple REST API using Flask in Python.",
    "Write a SQL query to find the top 5 customers by total purchase amount.",
    "Create a React component for a todo list with basic CRUD operations.",
    "Implement a depth-first search algorithm for a graph in C++.",
    "Write a bash script to find and delete files older than 30 days.",
    "Create a Python class to represent a deck of cards with shuffle and deal methods.",
    "Write a regular expression to validate email addresses.",
    "Implement a basic CI/CD pipeline using GitHub Actions."
]

def generate_payload():
    # 랜덤하게 프롬프트 선택
    prompt = random.choice(prompts)
    
    # JSON 페이로드 생성
    body = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": 400,
            # "return_full_text": False  # This does not work with Phi3
        },
        "stream": True,
    }
    
    # JSON을 문자열로 변환하고 bytes로 인코딩
    return json.dumps(body).encode('utf-8')

In [None]:
%%time
import time

request_duration = 250
end_time = time.time() + request_duration
print(f"Endpoint will be tested for {request_duration} seconds")

while time.time() < end_time:
    payload = generate_payload()
    # Invoke the endpoint
    response = sm_runtime_client.invoke_endpoint_with_response_stream(
        EndpointName=endpoint_name, 
        # Body=json.dumps(body), 
        Body = payload,
        ContentType="application/json"
    )

In [None]:
# Check the instance counts after the endpoint gets more load
response = sm_client.describe_endpoint(EndpointName=endpoint_name)
endpoint_status = response["EndpointStatus"]
request_duration = 250
end_time = time.time() + request_duration
print(f"Waiting for Instance count increase for a max of {request_duration} seconds. Please re run this cell in case the count does not change")
while time.time() < end_time:
    response = sm_client.describe_endpoint(EndpointName=endpoint_name)
    endpoint_status = response["EndpointStatus"]
    instance_count = response["ProductionVariants"][0]["CurrentInstanceCount"]
    print(f"Status: {endpoint_status}")
    print(f"Current Instance count: {instance_count}")
    if (endpoint_status=="InService") and (instance_count>1):
        break
    else:
        time.sleep(15)

### Clean Up

In [None]:

# Delete model
sm_client.delete_model(ModelName=model_name)

# Delete endpoint configuration
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)

# Delete endpoint
sm_client.delete_endpoint(EndpointName=endpoint_name)