# Creating SageMaker Ground Truth Labeling Jobs

This notebook demonstrates how to create and configure SageMaker Ground Truth labeling jobs for drone imagery object detection.

## Overview

SageMaker Ground Truth helps you build high-quality training datasets for your machine learning models. This notebook will guide you through:

1. Setting up your data in S3
2. Configuring a labeling job for object detection
3. Selecting and managing your workforce
4. Monitoring job progress
5. Accessing and using labeled data

## Prerequisites

- AWS account with appropriate permissions
- Access to SageMaker Studio
- Drone imagery dataset in S3 bucket
- `src/data/ground_truth_utils.py` importable from the notebook's working directory (it is imported below as `src.data.ground_truth_utils`)

## 1. Import Required Libraries

In [None]:
import boto3
import sagemaker
import pandas as pd
import numpy as np
import json
import os
import datetime
import time
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import io
from PIL import Image, ImageDraw

# Import our custom Ground Truth utilities
from src.data.ground_truth_utils import (
    create_labeling_job_config,
    monitor_labeling_job,
    convert_ground_truth_to_yolo,
    estimate_labeling_cost,
    create_labeling_instructions
)

# AWS session bound to the 'ab' named profile; both clients below share it
session = boto3.Session(profile_name='ab')
s3_client = session.client('s3')
sagemaker_client = session.client('sagemaker')

# SageMaker SDK session layered on the same boto3 session
sagemaker_session = sagemaker.Session(boto_session=session)

# Default bucket, and the key prefix under which this notebook writes its
# manifest, job output and converted labels
bucket, prefix = "lucaskle-ab3-project-pv", "ground-truth-jobs"

## 2. Configure Your Dataset

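First, we need to prepare our dataset for labeling. Ground Truth requires a manifest file that lists all the images to be labeled. The manifest built below contains one JSON object per line, each with a `source-ref` key holding the S3 URI of one image.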

In [None]:
# Dataset-selection controls: source bucket, key prefix, and a cap on how
# many images are sent for labeling.
bucket_input = widgets.Text(
    value=bucket, description='S3 Bucket:',
    style=dict(description_width='initial'),
)
prefix_input = widgets.Text(
    value="raw-images/", description='Image Prefix:',
    style=dict(description_width='initial'),
)
max_images = widgets.IntSlider(
    value=100, min=10, max=1000, step=10,
    description='Max Images:',
    style=dict(description_width='initial'),
)

# Render the three controls together
display(bucket_input, prefix_input, max_images)

In [None]:
# Function to list images in S3
def list_images():
    """List image keys under the S3 bucket/prefix chosen in the widgets.

    The bucket and prefix come from ``bucket_input`` / ``prefix_input`` and
    the cap from ``max_images``. Keys are matched case-insensitively against
    the .jpg, .jpeg and .png extensions.

    Returns:
        list[str]: At most ``max_images.value`` matching object keys, in the
        order S3 returned them.
    """
    from itertools import islice

    bucket_name = bucket_input.value
    key_prefix = prefix_input.value

    # Define the S3 path where your images are stored
    image_path = f"s3://{bucket_name}/{key_prefix}"

    print(f"Listing images in {image_path}...")

    image_extensions = (".jpg", ".jpeg", ".png")

    # A single list_objects_v2 call returns at most 1000 keys, so walk every
    # page instead of trusting the first response.
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket_name, Prefix=key_prefix)
    matching_keys = (
        obj['Key']
        for page in pages
        for obj in page.get('Contents', [])
        if obj['Key'].lower().endswith(image_extensions)
    )

    # islice consumes the generator lazily, so no further pages are requested
    # once the cap is reached.
    image_files = list(islice(matching_keys, max_images.value))

    print(f"Found {len(image_files)} images for labeling")

    # Display the first few images
    if image_files:
        print("\nFirst 5 images:")
        for i, image in enumerate(image_files[:5]):
            print(f"  {i+1}. {image}")

    return image_files

# "List Images" button plus the output area its results are rendered into
output = widgets.Output()
list_button = widgets.Button(description='List Images', button_style='info', icon='search')


def on_list_button_clicked(b):
    """Refresh the global ``image_files`` list, printing into ``output``."""
    global image_files
    with output:
        clear_output()
        image_files = list_images()


list_button.on_click(on_list_button_clicked)
display(list_button, output)

### Create a manifest file for Ground Truth

In [None]:
# Function to create manifest file
def create_manifest(image_files):
    """Write a manifest for *image_files* and upload it to S3.

    Every key becomes one JSON line ``{"source-ref": "s3://<bucket>/<key>"}``,
    with the bucket read from ``bucket_input``. The lines are written to a
    local ``manifest.json`` and then uploaded under ``{prefix}/manifest/``
    with a timestamped file name.

    Returns:
        str: S3 URI of the uploaded manifest.
    """
    records = []

    # Progress bar advances once per image entry
    with tqdm(total=len(image_files), desc="Creating manifest") as pbar:
        for key in image_files:
            records.append({"source-ref": f"s3://{bucket_input.value}/{key}"})
            pbar.update(1)

    # One JSON document per line
    local_path = "manifest.json"
    with open(local_path, 'w') as fh:
        fh.writelines(json.dumps(record) + '\n' for record in records)

    # Timestamp keeps each uploaded manifest under a distinct key
    stamp = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    manifest_s3_key = f"{prefix}/manifest/manifest-{stamp}.json"
    s3_client.upload_file(local_path, bucket_input.value, manifest_s3_key)

    manifest_s3_uri = f"s3://{bucket_input.value}/{manifest_s3_key}"
    print(f"Manifest file uploaded to {manifest_s3_uri}")
    return manifest_s3_uri

# Create button to create manifest (disabled until a listing finds images)
manifest_button = widgets.Button(
    description='Create Manifest',
    button_style='success',
    icon='file-upload',
    disabled=True
)

manifest_output = widgets.Output()

def on_manifest_button_clicked(b):
    """Build and upload the manifest, then unlock the job-configuration step.

    Stores the uploaded manifest's S3 URI in the global ``manifest_uri``.
    """
    with manifest_output:
        clear_output()
        global manifest_uri
        manifest_uri = create_manifest(image_files)
        # Enable the next step
        job_config_button.disabled = False

manifest_button.on_click(on_manifest_button_clicked)

# Enable manifest button after listing images
def enable_manifest_button(b):
    """Enable the manifest button only when the last listing found images.

    Registered on ``list_button`` after the listing handler, so
    ``image_files`` reflects the click that triggered this call. An empty or
    missing list keeps the button disabled, which prevents uploading an
    empty manifest.
    """
    manifest_button.disabled = not globals().get('image_files')

list_button.on_click(enable_manifest_button)

display(manifest_button, manifest_output)

## 3. Configure Labeling Job

Now we'll configure our labeling job for object detection.

In [None]:
# Labeling-job controls. The default job name embeds the time at which this
# cell ran.
job_name_input = widgets.Text(
    value="drone-detection-" + datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S'),
    description='Job Name:',
    style=dict(description_width='initial'),
)

task_type = widgets.Dropdown(
    options=['BoundingBox', 'ImageClassification'], value='BoundingBox',
    description='Task Type:',
    style=dict(description_width='initial'),
)

worker_type = widgets.Dropdown(
    options=['private', 'public'], value='private',
    description='Worker Type:',
    style=dict(description_width='initial'),
)

# Label categories offered to workers; also reused later for class indices
categories_input = widgets.TagsInput(
    value=['drone', 'vehicle', 'person', 'building'],
    description='Categories:',
    style=dict(description_width='initial'),
)

instructions_input = widgets.Textarea(
    value='Label all drones and other objects visible in the image.',
    description='Instructions:',
    layout={'width': '100%', 'height': '100px'},
    style=dict(description_width='initial'),
)

max_budget = widgets.FloatSlider(
    value=100.0, min=10.0, max=1000.0, step=10.0,
    description='Max Budget ($):',
    style=dict(description_width='initial'),
)

# Render all job controls
display(job_name_input, task_type, worker_type, categories_input, instructions_input, max_budget)

In [None]:
# Function to estimate labeling cost
def update_cost_estimate(change=None):
    """Recompute the labeling cost estimate and print it into ``cost_output``.

    Used both as a button click handler and as a widget observer, so the
    single argument is accepted and ignored.
    """
    # image_files is only defined once a listing has run; until then fall
    # back to the slider's cap.
    try:
        num_images = len(image_files)
    except NameError:
        num_images = max_images.value

    cost_estimate = estimate_labeling_cost(
        num_images=num_images,
        task_type=task_type.value,
        worker_type=worker_type.value,
        objects_per_image=3.0,  # Assuming average of 3 objects per image
    )

    # (printed label, key in the estimate dict), in display order
    line_items = (
        ("Base Cost", 'base_cost'),
        ("Adjusted Cost", 'adjusted_cost'),
        ("Service Cost", 'service_cost'),
        ("Total Estimated Cost", 'total_cost'),
    )

    with cost_output:
        clear_output()
        print(f"Cost Estimate for {num_images} images:")
        for label, key in line_items:
            print(f"{label}: ${cost_estimate[key]:.2f}")

# Output area first, so the estimate callback has somewhere to print
cost_output = widgets.Output()

cost_button = widgets.Button(description='Estimate Cost', button_style='warning', icon='dollar-sign')
cost_button.on_click(update_cost_estimate)

# Re-estimate whenever a pricing-relevant selection changes
for _selector in (task_type, worker_type):
    _selector.observe(update_cost_estimate, names='value')

display(cost_button, cost_output)

In [None]:
# Function to create job configuration
def create_job_config():
    """Assemble the labeling-job configuration from the current widget values.

    Requires the global ``manifest_uri`` set by the manifest step.

    Returns:
        tuple: ``(labeling_job_config, output_path)``, where ``output_path``
        is the S3 prefix passed to the job as its output location.
    """
    output_path = f"s3://{bucket_input.value}/{prefix}/output/"
    labels = categories_input.value

    # NOTE(review): the generated instructions are not forwarded anywhere;
    # the job below receives the free-text widget value instead. Confirm
    # which of the two is intended.
    instructions = create_labeling_instructions(
        task_type=task_type.value,
        categories=labels,
    )

    job_kwargs = dict(
        job_name=job_name_input.value,
        input_path=manifest_uri,
        output_path=output_path,
        task_type=task_type.value,
        worker_type=worker_type.value,
        labels=labels,
        instructions=instructions_input.value,
        max_budget_usd=max_budget.value,
    )
    labeling_job_config = create_labeling_job_config(**job_kwargs)

    return labeling_job_config, output_path

# Create button to configure job (disabled until a manifest has been uploaded)
job_config_button = widgets.Button(
    description='Configure Job',
    button_style='info',
    icon='cog',
    disabled=True
)

job_config_output = widgets.Output()

def on_job_config_button_clicked(b):
    """Build the job configuration, print a summary, and unlock job start.

    Stores the result in the globals ``labeling_job_config`` and
    ``output_path`` for use by later steps.
    """
    with job_config_output:
        clear_output()
        global labeling_job_config, output_path
        labeling_job_config, output_path = create_job_config()

        # Display key configuration parameters
        print(f"Job Name: {job_name_input.value}")
        print(f"Input Manifest: {manifest_uri}")
        print(f"Output Path: {output_path}")
        # Print the selected task type as-is: the dropdown also offers
        # ImageClassification, for which an "Object Detection" suffix is wrong.
        print(f"Task Type: {task_type.value}")
        print(f"Categories: {categories_input.value}")
        print(f"Worker Type: {worker_type.value}")
        print(f"Max Budget: ${max_budget.value:.2f} USD")

        # Enable the next step
        start_job_button.disabled = False

job_config_button.on_click(on_job_config_button_clicked)

display(job_config_button, job_config_output)

## 4. Create and Start the Labeling Job

In [None]:
# Function to start labeling job
def start_labeling_job(labeling_job_config):
    """Submit *labeling_job_config* to SageMaker and report the new job.

    Args:
        labeling_job_config: Keyword arguments for ``create_labeling_job``;
            must include ``LabelingJobName``.

    Returns:
        The ``LabelingJobName`` value from the submitted configuration.
    """
    response = sagemaker_client.create_labeling_job(**labeling_job_config)
    arn = response['LabelingJobArn']

    created_name = labeling_job_config['LabelingJobName']
    print(f"Created labeling job: {created_name}")
    print(f"Job ARN: {arn}")

    return created_name

# "Start Labeling Job" button; stays disabled until the job is configured
start_job_output = widgets.Output()
start_job_button = widgets.Button(
    description='Start Labeling Job', button_style='success', icon='play', disabled=True,
)


def on_start_job_button_clicked(b):
    """Create the labeling job, keep its name in ``job_name``, unlock monitoring."""
    global job_name
    with start_job_output:
        clear_output()
        job_name = start_labeling_job(labeling_job_config)
        # Enable the next step
        monitor_job_button.disabled = False


start_job_button.on_click(on_start_job_button_clicked)
display(start_job_button, start_job_output)

## 5. Monitor Labeling Job Progress

Now we can monitor the progress of our labeling job.

In [None]:
# Function to monitor job progress
def monitor_job_progress(job_name):
    """Print the current status and label counters of a labeling job.

    When the job reports at least one object, the completion percentage is
    also printed and shown as a progress bar.

    Args:
        job_name: Name of the labeling job to query.

    Returns:
        The status mapping returned by ``monitor_labeling_job``.
    """
    job_status = monitor_labeling_job(job_name, sagemaker_client)
    print(f"Job Status: {job_status['LabelingJobStatus']}")

    counters = job_status['LabelCounters']
    total = counters['TotalObjects']
    print(f"Total Objects: {total}")
    labeled = counters['LabeledObjects']
    print(f"Labeled Objects: {labeled}")
    print(f"Failed Objects: {counters['FailedObjects']}")

    # No objects yet: there is no ratio to report
    if not total > 0:
        return job_status

    completion_percentage = (labeled / total) * 100
    print(f"Completion: {completion_percentage:.2f}%")

    display(widgets.FloatProgress(
        value=completion_percentage,
        min=0,
        max=100.0,
        description='Progress:',
        bar_style='info',
        style={'bar_color': '#2196F3'},
    ))

    return job_status

# "Monitor Job" button; stays disabled until a job has been started
monitor_job_output = widgets.Output()
monitor_job_button = widgets.Button(
    description='Monitor Job', button_style='info', icon='search', disabled=True,
)


def on_monitor_job_button_clicked(b):
    """Show the status of the job named in ``job_name_input``.

    Unlocks label processing once the job reports 'Completed'.
    """
    with monitor_job_output:
        clear_output()
        job_status = monitor_job_progress(job_name_input.value)

        finished = job_status['LabelingJobStatus'] == 'Completed'
        if finished:
            process_labels_button.disabled = False


monitor_job_button.on_click(on_monitor_job_button_clicked)

# Checkbox that toggles periodic refreshing of the status display
auto_refresh = widgets.Checkbox(value=False, description='Auto-refresh (30s)', disabled=False)

def auto_refresh_monitor(change):
    """Observer for the auto-refresh checkbox.

    When the checkbox becomes checked, a countdown Output widget is created
    and displayed and a background thread starts polling the job. Unchecking
    does nothing here directly; the running loop notices the cleared value
    and exits on its own.

    Args:
        change: Change dictionary passed by ``observe``; only ``change['new']``
            is read.
    """
    if change['new']:
        # Start auto-refresh
        # A fresh countdown area is created and displayed on every activation.
        global refresh_timer
        refresh_timer = widgets.Output()
        display(refresh_timer)
        
        def refresh_loop():
            """Poll the job status until the checkbox clears or the job completes."""
            while auto_refresh.value:
                with monitor_job_output:
                    clear_output()
                    job_status = monitor_job_progress(job_name_input.value)
                    
                    # Enable the next step if job is complete
                    if job_status['LabelingJobStatus'] == 'Completed':
                        process_labels_button.disabled = False
                        # Clearing the checkbox also ends the countdown loop below
                        auto_refresh.value = False
                        break
                
                # Wait 30 seconds before refreshing
                with refresh_timer:
                    clear_output()
                    # One-second steps so unchecking the box is noticed promptly
                    for i in range(30, 0, -1):
                        if not auto_refresh.value:
                            break
                        print(f"Refreshing in {i} seconds...")
                        time.sleep(1)
                        clear_output()
        
        # NOTE(review): the loop uses `with <Output>:` capture from a background
        # thread; the ipywidgets docs describe limitations for that pattern.
        # Confirm the status and countdown actually render in the target frontend.
        import threading
        threading.Thread(target=refresh_loop).start()

# Run the observer whenever the checkbox value changes
auto_refresh.observe(auto_refresh_monitor, names='value')

display(widgets.HBox([monitor_job_button, auto_refresh]), monitor_job_output)

## 6. Process Completed Labels

Once the labeling job is complete, we can process the results and convert them to YOLOv11 format.

In [None]:
# Function to process labels
def process_labels(job_name):
    """Convert a completed job's output manifest to YOLO-format annotations.

    Relies on the global ``output_path`` set by the job-configuration step
    and on ``categories_input`` for the class ordering.

    Args:
        job_name: Name of the labeling job whose output should be converted.

    Returns:
        str | None: S3 URI prefix passed to the converter as its output
        directory, or ``None`` when the job has not completed.
    """
    job_status = monitor_labeling_job(job_name, sagemaker_client)
    status = job_status['LabelingJobStatus']

    # Nothing to convert until the job has finished
    if status != 'Completed':
        print(f"Labeling job is not yet complete. Current status: {status}")
        return None

    # Get the output manifest
    output_manifest = f"{output_path}{job_name}/manifests/output/output.manifest"

    # Define the output directory for YOLOv11 format
    yolo_output_dir = f"s3://{bucket_input.value}/{prefix}/yolo-format/{job_name}/"

    # Class index is each category's position in the widget's list
    class_mapping = {category: i for i, category in enumerate(categories_input.value)}

    print("Converting Ground Truth output to YOLOv11 format")
    print(f"Class mapping: {class_mapping}")

    # The bar tracks the one real unit of work (the conversion call) instead
    # of a simulated countdown that finished before conversion began.
    with tqdm(total=1, desc="Converting") as pbar:
        convert_ground_truth_to_yolo(
            input_manifest=output_manifest,
            output_directory=yolo_output_dir,
            class_mapping=class_mapping
        )
        pbar.update(1)

    print(f"Converted annotations saved to: {yolo_output_dir}")
    return yolo_output_dir

# "Process Labels" button; stays disabled until the job reports completion
process_labels_output = widgets.Output()
process_labels_button = widgets.Button(
    description='Process Labels', button_style='success', icon='check', disabled=True,
)


def on_process_labels_button_clicked(b):
    """Convert the job's labels and unlock visualization when that succeeds.

    The converter's output location is kept in the global ``yolo_output_dir``.
    """
    global yolo_output_dir
    with process_labels_output:
        clear_output()
        yolo_output_dir = process_labels(job_name_input.value)
        if not yolo_output_dir:
            return
        # Enable the next step
        visualize_button.disabled = False


process_labels_button.on_click(on_process_labels_button_clicked)
display(process_labels_button, process_labels_output)

## 7. Visualize Labeled Data

Let's visualize some of the labeled images to verify the quality of annotations.

In [None]:
# Function to visualize labeled data
def visualize_labeled_data(manifest_path, num_samples=5):
    """Render placeholder "labeled" images as a row of matplotlib subplots.

    This is a demonstration stub: it draws random boxes on blank images and
    does not read *manifest_path*. A real implementation would:
      1. Download the manifest file from S3
      2. Parse the JSON lines to extract annotations
      3. Download the images from S3
      4. Draw bounding boxes on the images
      5. Display the annotated images

    Args:
        manifest_path: Output manifest location (currently unused).
        num_samples: Number of sample images to draw, one subplot each.
    """
    print("Visualizing labeled data...")

    # squeeze=False always yields a 2-D array of axes, so indexing also works
    # when num_samples == 1 (the default squeeze returns a bare Axes object).
    fig, axes_grid = plt.subplots(1, num_samples, figsize=(20, 4), squeeze=False)
    axes = axes_grid[0]

    colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0)]
    labels = categories_input.value

    # For demonstration, create some sample visualizations
    for i in range(num_samples):
        # Create a blank image
        img = Image.new('RGB', (640, 480), color=(240, 240, 240))
        draw = ImageDraw.Draw(img)

        # Draw 2-3 random boxes
        num_boxes = np.random.randint(2, 4)
        for j in range(num_boxes):
            # Random box coordinates
            x1 = np.random.randint(50, 500)
            y1 = np.random.randint(50, 350)
            x2 = x1 + np.random.randint(50, 150)
            y2 = y1 + np.random.randint(50, 150)

            # Random label; with no categories configured, draw an unlabeled
            # box rather than letting randint(0, 0) raise.
            if labels:
                label_idx = np.random.randint(0, len(labels))
                label = labels[label_idx]
            else:
                label_idx = 0
                label = ""
            color = colors[label_idx % len(colors)]

            # Draw box
            draw.rectangle([x1, y1, x2, y2], outline=color, width=3)
            draw.text((x1, y1-15), label, fill=color)

        # Convert to numpy array for matplotlib
        img_array = np.array(img)

        # Display in subplot
        axes[i].imshow(img_array)
        axes[i].set_title(f"Sample {i+1}")
        axes[i].axis('off')

    plt.tight_layout()
    plt.show()

    print("In a real implementation, this would show actual labeled images from your dataset.")
    print("The visualization would include:")
    print("  - Loading images from S3")
    print("  - Drawing bounding boxes based on annotations")
    print("  - Displaying images with annotations in the notebook")

# Slider choosing how many sample images to draw
sample_size = widgets.IntSlider(
    value=5, min=1, max=20, step=1,
    description='Samples:',
    style=dict(description_width='initial'),
)

# "Visualize Labels" button; stays disabled until labels have been processed
visualize_output = widgets.Output()
visualize_button = widgets.Button(
    description='Visualize Labels', button_style='info', icon='image', disabled=True,
)


def on_visualize_button_clicked(b):
    """Draw the requested number of samples into ``visualize_output``."""
    with visualize_output:
        clear_output()
        # In a real implementation, you would use the actual manifest path
        job = job_name_input.value
        manifest_path = f"{output_path}{job}/manifests/output/output.manifest"
        visualize_labeled_data(manifest_path, sample_size.value)


visualize_button.on_click(on_visualize_button_clicked)

display(widgets.HBox([visualize_button, sample_size]), visualize_output)

## 8. Next Steps

Now that you have labeled data in YOLOv11 format, you can:

1. Use the labeled data for training a YOLOv11 model
2. Analyze the quality of annotations
3. Create additional labeling jobs for more data
4. Integrate the labeled data into your SageMaker Pipeline

For more information, see the following resources:
- [SageMaker Ground Truth Documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/sms.html)
- [YOLOv11 Training Guide](notebooks/model-development/yolov11-training.ipynb)