In [None]:
!pip install -U awscli boto3 sagemaker rich watermark --quiet

In [None]:
# Load the `watermark` and `rich` IPython extensions installed by the cell above.
%load_ext watermark
%load_ext rich

# `-p` reports the installed versions of the listed packages, so the exact
# awscli / boto3 / sagemaker versions used for a run are recorded in the output.
%watermark -p awscli,boto3,sagemaker

### Upload model artifacts as `model.tar.gz` to S3


In [None]:
import tarfile
import os

# Local paths: the serialized preprocessor and the archive that wraps it.
model_path = os.path.join("./models", "preprocess.joblib")
model_output_path = os.path.join("./models", "model.tar.gz")

if os.path.exists(model_output_path):
    # An archive from a previous run is reused as-is.
    print(f"Model file exists: {model_output_path}")
else:
    # Check the input before opening the archive for writing: otherwise a missing
    # artifact leaves an empty model.tar.gz behind, and the next run would treat
    # that broken file as a valid archive.
    if not os.path.isfile(model_path):
        raise FileNotFoundError(f"Model artifact not found: {model_path}")

    print(f"Compressing model to {model_output_path}")
    # The context manager closes (and flushes) the archive even if add() raises.
    with tarfile.open(model_output_path, "w:gz") as tar:
        # Store the file at the archive root under its bare file name.
        tar.add(model_path, arcname="preprocess.joblib")

In [None]:
import sagemaker
import boto3
from sagemaker.s3 import S3Uploader, S3Downloader, s3_path_join
from sagemaker import session
from sagemaker import get_execution_role
from rich import print

# Shared handles used by the rest of the notebook: SageMaker session, region,
# execution role, default bucket and a low-level SageMaker client.
sm_session = session.Session()
# Public accessor for the session's region; avoids reaching into the private
# `_region_name` attribute, which is an implementation detail of the SDK.
region = sm_session.boto_region_name
role = get_execution_role()
bucket = sm_session.default_bucket()
sm_client = boto3.client("sagemaker")

# S3 layout: everything for this example lives under s3://<bucket>/<prefix>/.
prefix = "sagemaker/abalone"
# Account id is needed later to build the ECR image URI.
account_id = boto3.client("sts").get_caller_identity().get("Account")
model_s3uri = s3_path_join(f"s3://{bucket}/{prefix}", "models/byoc-serial-inference/featurizer")

print(f"Role: {role}")
print(f"Bucket: {bucket}")
print(f"Model base: {model_s3uri}")

# Upload the local model archive (model_output_path) under model_s3uri.
S3Uploader.upload(
    local_path=model_output_path, desired_s3_uri=model_s3uri, sagemaker_session=sm_session
)

### Create model object with custom inference image

In [None]:
from datetime import datetime
from uuid import uuid4

# Build the URI of the custom inference image in this account's ECR registry.
image_name = "byoc/abalone-featurizer"
registry_host = f"{account_id}.dkr.ecr.{region}.amazonaws.com"
ecr_image = f"{registry_host}/{image_name}:latest"

# Short random id plus today's date; shared by the model, endpoint config and
# endpoint names so resources from one run can be recognised together.
run_id = uuid4().hex[:5]
date_tag = f"{datetime.now():%d%b%Y}"
suffix = f"{run_id}-{date_tag}"

# Location of the archive uploaded earlier under model_s3uri.
model_data_url = s3_path_join(model_s3uri, "model.tar.gz")

print(f"model_image_uri: {ecr_image}")
model_name = f"AbaloneXGB-featurizer-{suffix}"

print(f"Creating model : {model_name} with {model_data_url}")
# Single-container model: the custom image plus the model archive in S3.
container_spec = {"Image": ecr_image, "ModelDataUrl": model_data_url}
model_response = sm_client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer=container_spec,
)

print("Response")
print(model_response["ModelArn"])

### Create Endpoint Config

In [None]:
endpoint_config_name = f"AbaloneXGB-epc-featurizer-{suffix}"
print(f"Creating endpoint config : {endpoint_config_name}")

# One production variant that receives all traffic and serves the model
# created above on a single ml.m5.2xlarge instance.
all_traffic_variant = {
    "VariantName": "AllTraffic",
    "ModelName": model_name,
    "InitialInstanceCount": 1,
    "InstanceType": "ml.m5.2xlarge",
    "InitialVariantWeight": 1,
}

# Register the endpoint configuration through the SageMaker boto3 client.
epc_response = sm_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[all_traffic_variant],
)

print(epc_response["EndpointConfigArn"])

### Create Endpoint

In [None]:
# Name the endpoint with the same run suffix as the model and endpoint config.
endpoint_name = f"AbaloneXGB-ep-featurizer-{suffix}"
print(f"Creating endpoint : {endpoint_name}")

# Request an endpoint backed by the endpoint configuration created above.
endpoint_request = {
    "EndpointName": endpoint_name,
    "EndpointConfigName": endpoint_config_name,
}
ep_response = sm_client.create_endpoint(**endpoint_request)

print(ep_response["EndpointArn"])

In [None]:
# Report the endpoint status as it is before waiting.
status = sm_client.describe_endpoint(EndpointName=endpoint_name)["EndpointStatus"]
print(f"Endpoint Status: {status}")

# Block on the client's `endpoint_in_service` waiter for this endpoint.
waiter = sm_client.get_waiter("endpoint_in_service")
waiter.wait(EndpointName=endpoint_name)

# Describe the endpoint again and report the status after the wait.
resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
print(f"Endpoint Status: {resp['EndpointStatus']}")

### Send test inference

In [None]:
import os
from time import sleep

runtime_sm_client = boto3.client("sagemaker-runtime")

LOCALDIR = "./data"

local_test_dataset = f"{LOCALDIR}/test.csv"

test_s3uri = s3_path_join(f"s3://{bucket}/{prefix}", "data/test/test.csv")

# Download the test split from S3 unless a local copy is already present.
if not os.path.exists(local_test_dataset):
    print(f"Downloading test data to {LOCALDIR} from {test_s3uri} ...")
    S3Downloader.download(test_s3uri, local_path=LOCALDIR, sagemaker_session=sm_session)
else:
    print(f"Test file: {local_test_dataset} exists!")

# Upper bound on the number of records sent to the endpoint. Failed calls count
# towards it, so a broken endpoint is not invoked once for every row in the file.
limit = 5

with open(local_test_dataset, "r") as _f:
    # The first line of the CSV is the header row: show it, never send it.
    header = next(_f, None)
    if header is not None:
        print("Headers")
        print(header)
        print("---" * 20)

    attempts = 0
    for row in _f:
        if attempts >= limit:
            # Stop reading instead of scanning the rest of the file for nothing.
            break

        row = row.rstrip("\n")
        if not row.strip():
            # A blank line would otherwise be sent as an empty payload.
            continue

        attempts += 1
        # Drop the target column (the last one); only the features are sent.
        input_cols = ",".join(row.split(",")[:-1])
        try:
            print("Invoking EP with record")
            print(input_cols)
            prediction = runtime_sm_client.invoke_endpoint(
                EndpointName=endpoint_name,
                ContentType="text/csv",
                Body=input_cols,
            )
            response = prediction["Body"].read().decode("utf-8")
            print(response)
            # Short pause between successful requests.
            sleep(0.5)
        except Exception as e:
            # Best effort: report the failure and move on to the next record.
            print(f"Prediction error: {e}")

In [None]:
from datetime import timedelta, timezone

logs_client = boto3.client("logs")
# Use a timezone-aware "now". A naive datetime.utcnow() is interpreted as LOCAL
# time by .timestamp(), which shifts the query window by the machine's UTC offset.
end_time = datetime.now(timezone.utc)
# Look back over the last 15 minutes of endpoint logs.
start_time = end_time - timedelta(minutes=15)

log_group_name = f"/aws/sagemaker/Endpoints/{endpoint_name}"
log_streams = logs_client.describe_log_streams(logGroupName=log_group_name)

if not log_streams["logStreams"]:
    # Nothing to read yet; avoid an IndexError on the empty stream list.
    print(f"No log streams found in {log_group_name}")
else:
    # Read from the first stream returned for the endpoint's log group.
    log_stream_name = log_streams["logStreams"][0]["logStreamName"]

    # Retrieve the logs; CloudWatch expects epoch milliseconds for both bounds.
    logs = logs_client.get_log_events(
        logGroupName=log_group_name,
        logStreamName=log_stream_name,
        startTime=int(start_time.timestamp() * 1000),
        endTime=int(end_time.timestamp() * 1000),
    )

    # Print each event with its timestamp (milliseconds truncated to seconds,
    # rendered in the local timezone of the machine running the notebook).
    for event in logs["events"]:
        print(f"{datetime.fromtimestamp(event['timestamp'] // 1000)}: {event['message']}")

### Cleanup

In [None]:
# Tear down what this notebook created, in order: endpoint, endpoint
# configuration, model. Each step is attempted independently, so a failure in
# one is reported and the remaining deletions still run.
print(f"EP: {endpoint_name}\nEPC: {endpoint_config_name}\nModel: {model_name}")

cleanup_steps = [
    (
        f"Deleting endpoint: {endpoint_name}",
        f"Error deleting EP: {endpoint_name}",
        lambda: sm_client.delete_endpoint(EndpointName=endpoint_name),
    ),
    (
        f"Deleting endpoint_config: {endpoint_config_name}",
        f"Error deleting EPC: {endpoint_config_name}",
        lambda: sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name),
    ),
    (
        f"Deleting Model: {model_name}",
        f"Error deleting Model: {model_name}",
        lambda: sm_client.delete_model(ModelName=model_name),
    ),
]

for start_message, failure_message, delete_resource in cleanup_steps:
    try:
        print(start_message)
        delete_resource()
    except Exception as err:
        # Report the error and continue with the next resource.
        print(f"{failure_message}\n{err}")