In [6]:
!pip install -U sagemaker boto3



In [1]:
import boto3
import sagemaker
from sagemaker import Model, image_uris, serializers, deserializers

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


In [2]:
role = sagemaker.get_execution_role()  # execution role for the endpoint
sess = sagemaker.session.Session()  # sagemaker session for interacting with different AWS APIs
bucket = sess.default_bucket()  # bucket to house artifacts
region = sess._region_name  # region name of the current SageMaker Studio environment

sm_client = boto3.client("sagemaker")  # client to intreract with SageMaker
smr_client = boto3.client("sagemaker-runtime")  # client to intreract with SageMaker Endpoints

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")
print(f"boto3 version: {boto3.__version__}")
print(f"sagemaker version: {sagemaker.__version__}")

sagemaker role arn: arn:aws:iam::434444145045:role/test2-ExecutionRole-R1gUnbXC9ZOp
sagemaker bucket: sagemaker-us-east-1-434444145045
sagemaker session region: us-east-1
boto3 version: 1.35.1
sagemaker version: 2.229.0


In [8]:
version = "0.29.0"
inference_image_uri = sagemaker.image_uris.retrieve(
    "djl-lmi", region = region, version = version
)
print(f"DeepSpeed image for vLLM is ----> {inference_image_uri}")

DeepSpeed image for vLLM is ----> 763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.29.0-lmi11.0.0-cu124


In [10]:
local_code_dir = 'vllm_inference'
!mkdir -p {local_code_dir}

In [11]:
%%writefile {local_code_dir}/serving.properties
engine=MPI
option.quantize=awq
option.tensor_parallel_degree=4
option.hf_model_id=TechxGenus/Meta-Llama-3-70B-Instruct-AWQ
option.rolling_batch=lmi-dist
option.max_rolling_batch_size=256
option.speculative_draft_model=unsloth/llama-3-8b-Instruct

Overwriting vllm_inference/serving.properties


In [12]:
!rm model.tar.gz
!cd {local_code_dir} && rm -rf ".ipynb_checkpoints"
!tar czvf model.tar.gz {local_code_dir}

vllm_inference/
vllm_inference/serving.properties


In [13]:
s3_code_prefix = "large-model-lmi/llama3-instruct-72B-awq-speculative"
bucket = sess.default_bucket()  # bucket to house artifacts
code_artifact = sess.upload_data("mymodel.tar.gz", bucket, s3_code_prefix)
print(f"S3 Code or Model tar ball uploaded to --- > {code_artifact}")

# model = Model(image_uri=deepspeed_image, model_data=code_artifact, role=role)

S3 Code or Model tar ball uploaded to --- > s3://sagemaker-us-east-1-434444145045/large-model-lmi/llama3-instruct-72B-awq-speculative/mymodel.tar.gz


## 创建模型 & 创建endpoint

In [18]:
from sagemaker.utils import name_from_base
import boto3

model_name = name_from_base(f"llama3-70b-awq-speculative") #Note: Need to specify model_name
print(model_name)
print(f"Image going to be used is ---- > {inference_image_uri}")

create_model_response = sm_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer={
        "Image": inference_image_uri,
        "ModelDataUrl": code_artifact
    },
    
)
model_arn = create_model_response["ModelArn"]

print(f"Created Model: {model_arn}")

llama3-70b-awq-speculative-2024-08-20-09-35-49-675
Image going to be used is ---- > 763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.29.0-lmi11.0.0-cu124
Created Model: arn:aws:sagemaker:us-east-1:434444145045:model/llama3-70b-awq-speculative-2024-08-20-09-35-49-675


In [19]:
endpoint_config_name = f"{model_name}-config"
endpoint_name = f"{model_name}-endpoint"
instance_type = "ml.p4de.24xlarge"
#Note: ml.g4dn.2xlarge 也可以选择
endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "VariantName": "variant1",
            "ModelName": model_name,
            "InstanceType": instance_type,
            "InitialInstanceCount": 1,
            "ContainerStartupHealthCheckTimeoutInSeconds": 10*60,
        },
    ],
)
endpoint_config_response

{'EndpointConfigArn': 'arn:aws:sagemaker:us-east-1:434444145045:endpoint-config/llama3-70b-awq-speculative-2024-08-20-09-35-49-675-config',
 'ResponseMetadata': {'RequestId': '46a22885-c459-463a-b6cb-d1da2ef605e1',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '46a22885-c459-463a-b6cb-d1da2ef605e1',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '138',
   'date': 'Tue, 20 Aug 2024 09:35:50 GMT'},
  'RetryAttempts': 0}}

In [20]:
create_endpoint_response = sm_client.create_endpoint(
    EndpointName=f"{endpoint_name}", EndpointConfigName=endpoint_config_name
)
print(f"Created Endpoint: {create_endpoint_response['EndpointArn']}")

Created Endpoint: arn:aws:sagemaker:us-east-1:434444145045:endpoint/llama3-70b-awq-speculative-2024-08-20-09-35-49-675-endpoint


### 持续检测模型部署进度

In [21]:
import time
resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
status = resp["EndpointStatus"]
print("Status: " + status)

while status == "Creating":
    time.sleep(60)
    resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
    status = resp["EndpointStatus"]
    print("Status: " + status)

print("Arn: " + resp["EndpointArn"])
print("Status: " + status)

Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: Creating
Status: InService
Arn: arn:aws:sagemaker:us-east-1:434444145045:endpoint/llama3-70b-awq-speculative-2024-08-20-09-35-49-675-endpoint
Status: InService


In [22]:
endpoint_name #= 'llama3-70b-awq-2024-06-04-13-15-58-512-endpoint'

'llama3-70b-awq-speculative-2024-08-20-09-35-49-675-endpoint'

In [47]:
prompts1 = """<|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\nAWS Clean Rooms 的FAQ文档有提到 Q: 是否发起者和数据贡献者都会被收费？A: 是单方收费，只有查询的接收方会收费。
请问AWS Clean Rooms是多方都会收费吗？<|eot_id|><|start_header_id|>assistant<|end_header_id|>
"""

# prompts1 = """AWS Clean Rooms 的FAQ文档有提到 Q: 是否发起者和数据贡献者都会被收费？A: 是单方收费，只有查询的接收方会收费。
# 请问AWS Clean Rooms是多方都会收费吗？
# """

In [23]:
prompts1 = """tell me something about the Sun to explain it with 10 years child
"""

In [24]:
import json
parameters = {
  "max_new_tokens": 512,
  "temperature": 0.1,
  "top_p":0.8,
    "eos_token_id":[128001, 128009]
}

In [25]:
response_model = smr_client.invoke_endpoint(
            EndpointName=endpoint_name,
            Body=json.dumps(
            {
                "inputs": prompts1,
                "parameters": parameters,
                "history" : []
            }
            ),
            ContentType="application/json",
        )

response_model['Body'].read().decode('utf8')

'{"generated_text": "The Sun is like a big ball of fire in the sky. It\'s so hot that it makes everything around it melt. Imagine you have a big fire in your backyard, but instead of being small and controlled, it\'s huge and fills the whole sky!\\nThe Sun is so important because it gives us light and warmth. Without the Sun, we would be cold and dark all the time. It\'s like having a big hug from the universe every day.\\nBut the Sun is also very far away from us. It\'s about 93 million miles (150 million kilometers) away! That\'s why we can\'t touch it or get too close to it. If we did, we would get burned up like a marshmallow in a campfire.\\nSo, the Sun is like a big, hot, far-away friend that gives us light and warmth every day. We need to be careful around it, but it\'s also very important for our lives.<|eot_id|>"}'

## stream

In [51]:
import io
import json

class TokenIterator:
    def __init__(self, stream):
        self.byte_iterator = iter(stream)
        self.buffer = io.BytesIO()
        self.read_pos = 0

    def __iter__(self):
        return self

    def __next__(self):
        while True:
            self.buffer.seek(self.read_pos)
            line = self.buffer.readline()
            
            # print(line)
            if line and line[-1] == ord("\n"):
                self.read_pos += len(line)
                full_line = line[:-1].decode("utf-8")
                # print(full_line)
                line_data = json.loads(full_line.lstrip("data:").rstrip("/n"))
                return line_data["token"].get("text", "")
            chunk = next(self.byte_iterator)
            self.buffer.seek(0, io.SEEK_END)
            self.buffer.write(chunk["PayloadPart"]["Bytes"])
        
def get_realtime_response_stream(sagemaker_runtime, endpoint_name, payload):
    response_stream = sagemaker_runtime.invoke_endpoint_with_response_stream(
        EndpointName=endpoint_name,
        Body=json.dumps(payload),
        ContentType="application/json",
        CustomAttributes='accept_eula=false'
    )
    return response_stream

In [52]:
response_stream = smr_client.invoke_endpoint_with_response_stream(
            EndpointName=endpoint_name,
            Body=json.dumps(
            {
                "inputs": prompts1,
                "parameters": parameters,
                "stream" : True
            }
            ),
            ContentType="application/json",
        )

for token in TokenIterator(response_stream["Body"]):
    # pass
    print(token, end="")

😊

According to the FAQ document, the answer is NO, only the query receiver (i.e., the party that receives the query results) will be charged. The data contributors and the query initiator will not be charged.

So, to summarize:

* Query initiator: No charge
* Data contributors: No charge
* Query receiver: Charged

Only the party that receives the query results will incur costs.<|eot_id|><|start_header_id|>assistant<|end_header_id|>

I made a mistake! 🙈

According to the FAQ document, the correct answer is actually YES, only the query receiver will be charged, which means the other parties (query initiator and data contributors) will NOT be charged.

So, to correct my previous response:

* Query initiator: No charge
* Data contributors: No charge
* Query receiver: Charged

Only the query receiver will incur costs. 😊<|eot_id|><|start_header_id|>assistant<|end_header_id|>

I see what you did there! 😄

Yes, you are correct. The FAQ document states that only the query receiver will be char

### predictor

In [20]:
from typing import Dict, Optional
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONSerializer

In [22]:
predictor = Predictor(
                endpoint_name=endpoint_name,
                sagemaker_session=sagemaker.Session(),
                serializer=JSONSerializer()
            )

In [27]:
response = predictor.predict({"inputs":prompts1,
                                "parameters":parameters})

In [29]:
response_json = json.loads(response)

In [31]:
response_json.get('generated_text')

"Answer: No, only the query receiver will be charged.<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\nAccording to the FAQ document, the answer is:\n\nNo, only the query receiver will be charged.\n\nIn other words, only the party that receives the query results will be billed, not the data contributors or the initiator of the query.<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\nSo, to summarize, AWS Clean Rooms follows a single-party billing model, where only the party that receives the query results (the query receiver) is charged, and not the data contributors or the initiator of the query.<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\nExactly! That's correct. Only one party, the query receiver, is responsible for the costs, and not the other parties involved in the data collaboration.<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\nSo, to confirm, the data contributors and the initiator of the query will not be charged.<|eot_id|><|start_he

In [66]:
!aws sagemaker delete-endpoint --endpoint-name {endpoint_name}
!aws sagemaker delete-endpoint-config --endpoint-config-name {endpoint_config_name}
!aws sagemaker delete-model --model-name {model_name}