In [None]:
import io
import json
import random
from urllib.parse import urlparse

import boto3
from PIL import Image, ImageDraw, ImageFont

# SageMaker runtime client used to invoke the endpoints
sagemaker_runtime = boto3.client('sagemaker-runtime')

# Endpoint names (placeholder values -- set these to your own deployments)
ENDPOINT_NAME = 'MY ENDPOINT NAME'  # real-time or serverless endpoint
ASYNC_ENDPOINT_NAME = 'MY ASYNC ENDPOINT NAME'  # asynchronous endpoint

# S3 bucket and key where the async payload is uploaded
MY_BUCKET = 'mlbucket13'
KEY = 'yolov9/async/input/image.jpg'

# Local image to run detection on (alternative sample: './assets/people.jpg')
image_path = './assets/dogs.jpg'


### Original image

In [None]:
# Open the input image with PIL and render it inline for reference
image_pil = Image.open(image_path)
display(image_pil)

### Encode the image and send it to the endpoint (real-time / serverless)

In [None]:
# JPEG cannot store every Pillow mode (e.g. RGBA or palette images), so fall
# back to RGB for anything outside the modes the JPEG writer handles.
jpeg_source = image_pil if image_pil.mode in ('L', 'RGB', 'CMYK') else image_pil.convert('RGB')

# Encode into an in-memory buffer; its raw bytes are the endpoint payload
image_bytes = io.BytesIO()
jpeg_source.save(image_bytes, format='JPEG')
payload = image_bytes.getvalue()

In [None]:
%%time
# Send the JPEG bytes to the endpoint and read back the response body as text
invoke_result = sagemaker_runtime.invoke_endpoint(
    EndpointName=ENDPOINT_NAME,
    ContentType='image/jpeg',  # the request body is a JPEG image
    Body=payload,  # raw image bytes, passed as-is
)
res = invoke_result['Body'].read().decode()

# The body is parsed as JSON; detections are read from its 'bounding_boxes' key
bounding_boxes = json.loads(res)['bounding_boxes']

#### Async invoke

In [None]:
# Stage the encoded image in S3 so the async endpoint can read it from there
s3 = boto3.client('s3')
s3.put_object(Body=payload, Bucket=MY_BUCKET, Key=KEY)

# S3 URI of the uploaded payload
s3_input_uri = f's3://{MY_BUCKET}/{KEY}'

In [None]:
# Queue an asynchronous inference request; the input is referenced by S3 URI
async_response = sagemaker_runtime.invoke_endpoint_async(
    EndpointName=ASYNC_ENDPOINT_NAME,
    InputLocation=s3_input_uri,
    # Declare the payload type explicitly, as the real-time invocation does;
    # the S3 upload does not set a content type on the object.
    ContentType='image/jpeg',
    InvocationTimeoutSeconds=3600,
)

In [None]:
# Fetch and parse async inference result from S3

output_s3_uri = async_response['OutputLocation']

# Parse the S3 URI to extract bucket name and object key
parsed_uri = urlparse(output_s3_uri)
bucket_name = parsed_uri.netloc
object_key = parsed_uri.path.lstrip('/')

# The invocation is asynchronous, so the output object may not exist yet when
# this cell runs. Block until it appears instead of reading it immediately:
# polls every 5 s for up to 5 minutes and raises botocore's WaiterError if the
# object never shows up (e.g. because the inference failed).
s3.get_waiter('object_exists').wait(
    Bucket=bucket_name,
    Key=object_key,
    WaiterConfig={'Delay': 5, 'MaxAttempts': 60},
)

# Read the object directly into memory and decode the JSON document
response = s3.get_object(Bucket=bucket_name, Key=object_key)
data = json.loads(response['Body'].read().decode())
data

### Draw bounding boxes

In [None]:
# Maps the integer class id returned by the model to a human-readable label.
# The position of each name in the list is its class id (0-79).
# NOTE(review): this looks like the 80-class COCO label set -- confirm it
# matches the labels the deployed model was trained with.
class_map = dict(enumerate([
    # 0-9
    'person', 'bicycle', 'car', 'motorcycle', 'airplane',
    'bus', 'train', 'truck', 'boat', 'traffic light',
    # 10-19
    'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird',
    'cat', 'dog', 'horse', 'sheep', 'cow',
    # 20-29
    'elephant', 'bear', 'zebra', 'giraffe', 'backpack',
    'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
    # 30-39
    'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat',
    'baseball glove', 'skateboard', 'surfboard', 'tennis racket', 'bottle',
    # 40-49
    'wine glass', 'cup', 'fork', 'knife', 'spoon',
    'bowl', 'banana', 'apple', 'sandwich', 'orange',
    # 50-59
    'broccoli', 'carrot', 'hot dog', 'pizza', 'donut',
    'cake', 'chair', 'couch', 'potted plant', 'bed',
    # 60-69
    'dining table', 'toilet', 'tv', 'laptop', 'mouse',
    'remote', 'keyboard', 'cell phone', 'microwave', 'oven',
    # 70-79
    'toaster', 'sink', 'refrigerator', 'book', 'clock',
    'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush',
]))

In [None]:
def draw_bounding_boxes(image_path, bounding_boxes, min_confidence=0.4):
    """Draw labelled detection boxes on an image and display it inline.

    Args:
        image_path: Path of the image file to annotate (opened with PIL).
        bounding_boxes: Iterable of 6-element boxes, each unpacked as
            ``(left, top, right, bottom, confidence, class_id)``. The
            coordinates are used directly as drawing positions.
        min_confidence: Boxes whose confidence is below this value are skipped.

    Returns:
        None. The annotated image is rendered with ``display``.
    """
    # One random colour per class, assigned the first time the class is drawn
    color_map = {}

    # Work on an in-memory RGB(A) copy so the RGB colour tuples below are valid
    # fill values whatever the source mode (e.g. grayscale or palette images),
    # and so the source file is closed as soon as the pixels are loaded.
    with Image.open(image_path) as source:
        if source.mode in ('RGB', 'RGBA'):
            image = source.copy()
        else:
            image = source.convert('RGB')

    # Create a drawing object
    draw = ImageDraw.Draw(image)

    # Load a font
    font_size = 48
    font = ImageFont.load_default(font_size)

    # Draw bounding boxes on the image
    for box in bounding_boxes:
        left, top, right, bottom, confidence, class_id = box

        # Filter first so no colour is generated for boxes that are never drawn
        if confidence < min_confidence:
            continue

        # Reuse the colour already assigned to this class, or pick a new one
        if class_id not in color_map:
            color_map[class_id] = (
                random.randint(0, 255),
                random.randint(50, 200),
                random.randint(50, 200),
            )
        color = color_map[class_id]

        # Closed outline: the first corner is repeated at the end
        box_coords = [(left, top), (right, top), (right, bottom), (left, bottom), (left, top)]
        draw.line(box_coords, fill=color, width=4)

        # Fall back to the numeric id when the class is missing from class_map
        class_index = int(class_id)
        class_name = class_map.get(class_index, f"class {class_index}")
        text = f"{class_name}: {round(confidence,2)}"

        # Measure the label width through the public font API rather than the
        # font's internal core object
        text_left, _, text_right, _ = font.getbbox(text)
        width = text_right - text_left

        # Draw the label on a filled background at the box's top-left corner
        draw.rectangle([left, top, left + width, top + font_size+5], fill=color)
        draw.text((left, top), text, fill='white', font=font)

    # Display the image
    display(image)

# Render the detections, keeping every box with confidence of at least 0.1
draw_bounding_boxes(image_path, bounding_boxes, min_confidence=0.1)