## Creating a Labeling Job

Using the locally generated manifest JSON-lines files, we can create a labeling job in Ground Truth. The labeling job must be assigned to a work team, and a worker UI template can be provided.

In [None]:
import sagemaker
import time
import boto3
import json

In [None]:
sm_session = sagemaker.session.Session()
region = sm_session.boto_region_name
default_bucket = sm_session.default_bucket()
role = sagemaker.get_execution_role()
s3 = boto3.client("s3")

BUCKET = default_bucket
PREFIX = "groundtruth_demo"
ACCOUNT_ID = sm_session.account_id()

### Util Functions

In [None]:
s3_resource = boto3.resource("s3")
sagemaker_client = boto3.client('sagemaker')

# Util function to upload a local file to S3 and return its S3 URI
def upload_s3(local_file, bucket, prefix):
    key = "{}/{}".format(prefix, local_file)
    s3.upload_file(local_file, bucket, key)
    s3_path = "s3://{}/{}".format(bucket, key)
    print("Uploaded {} to {}".format(local_file, s3_path))
    return s3_path

# Util function to pretty-print the first N records of a JSON-lines object in S3
def print_json(bucket, key, N=3):
    obj = s3_resource.Object(bucket, key)
    json_text = obj.get()['Body'].read().decode('utf-8')
    for line in json_text.splitlines()[0:N]:
        print(json.dumps(json.loads(line), indent=4))
        
# Launch a labeling job and poll its status until it is no longer active
def launch_job(labeling_job_config):
    job_name = labeling_job_config['LabelingJobName']
    print("Launching job {}".format(job_name))
    sagemaker_client.create_labeling_job(**labeling_job_config)

    status = "InProgress"

    # Keep monitoring the job status. A freshly created job can report
    # "Initializing" before "InProgress", so both count as still running.
    while status in ("Initializing", "InProgress"):
        job_status = sagemaker_client.describe_labeling_job(LabelingJobName=job_name)
        status = job_status['LabelingJobStatus']
        print(status)
        print(job_status['LabelCounters'])
        time.sleep(30)

### Required Templates or Files

For the labeling job, we need:

- UI template (built-in template provided by SageMaker Ground Truth, or a custom one)
- Input manifest (which images we will label, and their location on S3)
- Class labels file (which labels we will use in our labeling task)

#### Input Manifest

The input manifest describes the objects to annotate, potentially with annotations from previous labeling jobs.

Let's upload a clean input manifest that contains only references to the objects to be annotated.

In [None]:
manifest_name = "input.manifest"
input_manifest_path = upload_s3(manifest_name, BUCKET, PREFIX)
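For reference, a clean input manifest is a JSON-lines file in which each line points to one object through a `source-ref` key. The following is a minimal sketch of how such lines could be generated; the helper name `build_manifest_lines`, the bucket name, and the object keys are hypothetical and are not part of this notebook.

```python
import json

def build_manifest_lines(bucket, keys):
    """Return one JSON-lines entry per S3 object, using the `source-ref` key."""
    return [
        json.dumps({"source-ref": "s3://{}/{}".format(bucket, key)})
        for key in keys
    ]

# Hypothetical bucket and object keys, for illustration only.
example_lines = build_manifest_lines(
    "my-example-bucket",
    ["images/bird_001.jpg", "images/bird_002.jpg"],
)
print("\n".join(example_lines))
```

Writing these lines to a local file such as `input.manifest` would produce a file that the upload cell above can send to S3.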

#### UI Template for Workers

UI follows the Liquid template language. Liquid is an open-source template language created by Shopify and written in Ruby.

You can find more template UI samples to start from at:

https://github.com/aws-samples/amazon-sagemaker-ground-truth-task-uis

and

https://github.com/aws-samples/amazon-a2i-sample-task-uis

We will use a different template for each different type of labeling task.
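To give an idea of what such a template looks like, here is a rough sketch of an image classification template held in a Python string. It is modeled on the sample UIs in the repositories linked above and is not necessarily identical to the `.liquid` files used later in this notebook. The variable name `CLASS_TEMPLATE` is hypothetical, and the `task.input.taskObject` and `task.input.labels` variables assume the AWS-provided pre-annotation Lambda for image classification.

```python
# A sketch of a Liquid worker template for image classification.
# Liquid placeholders ({{ ... }}) are filled in by Ground Truth for each task.
CLASS_TEMPLATE = """<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>
<crowd-form>
  <crowd-image-classifier
    name="crowd-image-classifier"
    src="{{ task.input.taskObject | grant_read_access }}"
    header="Does the picture depict a bird that is flying?"
    categories="{{ task.input.labels | to_json | escape }}"
  >
    <full-instructions header="Classification instructions">
      <p>Choose the label that best describes the image.</p>
    </full-instructions>
    <short-instructions>
      <p>Pick the label that matches what the bird is doing.</p>
    </short-instructions>
  </crowd-image-classifier>
</crowd-form>
"""

print(CLASS_TEMPLATE)
```

The `grant_read_access` filter gives the worker's browser temporary read access to the image, and `categories` is populated from the class labels file.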

#### Class Labels File

A JSON file uploaded to S3 that describes the list of label(s) used in the labeling job.

E.g.

In [None]:
CLASS_LIST = ["Chicken", "Bird"]
json.dumps({"labels": [{"label": label} for label in CLASS_LIST]})

## Make sure Images Bucket has CORS Enabled

If you create a job through the Ground Truth console, CORS is enabled by default. Earlier in 2020, widely used browsers such as Chrome and Firefox changed their default behavior for rotating images based on image metadata, referred to as EXIF data. The web standards group W3C has decided that the ability to control rotation of images violates the web's Same-origin Policy. As a result, to ensure that human workers annotate your input images in a predictable orientation, you must add a CORS header policy to the Amazon S3 buckets that contain your input images.

In [None]:
s3.put_bucket_cors(
    Bucket=default_bucket,
    CORSConfiguration={
        'CORSRules': [
            {
                'ID': 'corsrulesgroundtruth',
                'AllowedMethods': ['GET'],
                'AllowedOrigins': ['*'],
                'ExposeHeaders': ['Access-Control-Allow-Origin']
            }
        ]
    }
)
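To confirm that a bucket carries a suitable rule, the CORS rules can be inspected with a small check like the one below. This is only a sketch: the helper name `allows_cross_origin_get` is hypothetical, and it tests only for a rule that permits `GET` from any origin, which is what the cell above configures.

```python
def allows_cross_origin_get(cors_rules):
    """Return True if any rule permits GET requests from any origin."""
    return any(
        "GET" in rule.get("AllowedMethods", [])
        and "*" in rule.get("AllowedOrigins", [])
        for rule in cors_rules
    )

# The same rule that the cell above applies to the bucket.
example_rules = [
    {
        "ID": "corsrulesgroundtruth",
        "AllowedMethods": ["GET"],
        "AllowedOrigins": ["*"],
        "ExposeHeaders": ["Access-Control-Allow-Origin"],
    }
]
print(allows_cross_origin_get(example_rules))
```

In the notebook, the live rules could be fetched with `s3.get_bucket_cors(Bucket=BUCKET)["CORSRules"]` and passed to the same function.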

### Create Labeling Job

Let's start with the simplest example: an image classification job that decides whether each image shows a bird that is flying, or something else.

We decide the list of classes and set a number of variables related to the job we're about to launch. We upload the classes to S3.

We also upload the template we are going to use in our labeling task.

In [None]:
CLASS_LIST = ["Flying", "Other"]
with open("class_labels.json", "w") as f:
    json.dump({"labels": [{"label": label} for label in CLASS_LIST]}, f)

task_description = "Does the picture depict a bird that is flying?"
task_keywords = ["Image", "Classification"]
task_title = "Please decide whether picture shows bird flying, or other"
job_name = "ground-truth-class-demo-" + str(int(time.time()))

TEMPLATE='groundtruth-class-template.liquid'

template_path = upload_s3(TEMPLATE, BUCKET, "{}/{}".format(PREFIX, job_name))
labels_path = upload_s3("class_labels.json", BUCKET, "{}/{}".format(PREFIX, job_name))

We define the Lambda functions that prepare each image for annotation and that consolidate multiple worker annotations. These are provided by AWS (but can be customized).

Refer to the AWS-provided function ARNs at:

https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_HumanTaskConfig.html

And

https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_AnnotationConsolidationConfig.html

We also select the type of workforce we want to use. For a private workforce, use the ARN of the work team in the account where it was created.

In [None]:
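# Select Private Workforce
TEAM_NAME = "<add-your-created-team-name-here>"
# The work team ARN must name the Region where the team was created;
# here we assume it is the same Region as the SageMaker session.
private_workteam_arn = "arn:aws:sagemaker:{}:{}:workteam/private-crowd/{}".format(region, ACCOUNT_ID, TEAM_NAME)
# The account ID in the AWS-provided Lambda ARNs differs per Region;
# 203001061592 is the one listed for eu-central-1 in the API reference above.
# Pre-labeling task Lambda function (AWS-provided)
prehuman_arn = "arn:aws:lambda:{}:{}:function:PRE-ImageMultiClass".format(region, "203001061592")
# Annotation consolidation Lambda (AWS-provided)
acs_arn = "arn:aws:lambda:{}:{}:function:ACS-ImageMultiClass".format(region, "203001061592")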
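Because every task type in this notebook follows the same `PRE-<TaskType>` / `ACS-<TaskType>` naming pattern, the two ARNs can be derived together. The helper below is a hypothetical convenience, not an AWS API. It assumes that the caller supplies the Lambda account ID that matches the Region, as listed in the API reference pages linked above.

```python
def builtin_lambda_arns(region, lambda_account_id, task_type):
    """Build the pre-annotation and consolidation Lambda ARNs for a task type."""
    prefix = "arn:aws:lambda:{}:{}:function".format(region, lambda_account_id)
    return {
        "pre": "{}:PRE-{}".format(prefix, task_type),
        "acs": "{}:ACS-{}".format(prefix, task_type),
    }

# Values taken from this notebook: eu-central-1 and its Lambda account ID.
arns = builtin_lambda_arns("eu-central-1", "203001061592", "ImageMultiClass")
print(arns["pre"])
print(arns["acs"])
```

The same call with `"BoundingBox"` or `"SemanticSegmentation"` as the task type yields the ARNs used in the later sections.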

Now we create the labeling job definition

In [None]:
labeling_job_config = {
    "InputConfig": {
        "DataSource": {
            "S3DataSource": {
                "ManifestS3Uri": input_manifest_path,
            }
        },
        "DataAttributes": {
            "ContentClassifiers": ["FreeOfPersonallyIdentifiableInformation", "FreeOfAdultContent"]
        },
    },
    "OutputConfig": {
        "S3OutputPath": "s3://{}/{}/output/".format(BUCKET, PREFIX),
    },
    "HumanTaskConfig": {
        "WorkteamArn": private_workteam_arn,
        "PreHumanTaskLambdaArn": prehuman_arn,
        "AnnotationConsolidationConfig": {
            "AnnotationConsolidationLambdaArn": acs_arn,
        },
        "MaxConcurrentTaskCount": 200,  # Up to 200 images are sent to the work team at a time.
        "NumberOfHumanWorkersPerDataObject": 1,  # Number of human annotations collected and consolidated per image.
        "TaskAvailabilityLifetimeInSeconds": 21600,  # Your workteam has 6 hours to complete all pending tasks.
        "TaskDescription": task_description,
        "TaskKeywords": task_keywords,
        "TaskTimeLimitInSeconds": 300,  # Each image must be labeled within 5 minutes.
        "TaskTitle": task_title,
        "UiConfig": {
            "UiTemplateS3Uri": template_path,
        },
    },
    "LabelingJobName": job_name,
    "RoleArn": role,
    "LabelAttributeName": job_name,
    "LabelCategoryConfigS3Uri": labels_path,
}

### Launch Classification Job with Private Workforce

In [None]:
launch_job(labeling_job_config)

Let's inspect the output

In [None]:
print_json(BUCKET, "{}/output/{}/manifests/output/output.manifest".format(PREFIX, job_name), N=10)

### Launch Object Detection Labeling Job with Private Workforce

Let's now launch an object detection (OD) job in which workers draw boxes around birds in images. We change some task parameters, as well as the template, and launch it.

In [None]:
CLASS_LIST = ["Bird"]
LABEL_FILE = "od_labels.json"
TEMPLATE='groundtruth-od-template.liquid'

with open(LABEL_FILE, "w") as f:
    json.dump({"labels": [{"label": label} for label in CLASS_LIST]}, f)

task_description = "Draw a bounding box around birds in the picture"
task_keywords = ["Image", "Object", "Detection"]
task_title = "Bounding box around any birds"
job_name = "ground-truth-od-demo-" + str(int(time.time()))

template_path = upload_s3(TEMPLATE, BUCKET, "{}/{}".format(PREFIX, job_name))
labels_path = upload_s3(LABEL_FILE, BUCKET, "{}/{}".format(PREFIX, job_name))

# New Lambda pre- and post-processing functions
prehuman_arn = "arn:aws:lambda:{}:{}:function:PRE-BoundingBox".format(region, "203001061592")
acs_arn = "arn:aws:lambda:{}:{}:function:ACS-BoundingBox".format(region, "203001061592")

In [None]:
labeling_job_config['LabelingJobName'] = job_name
labeling_job_config['LabelAttributeName'] = job_name
labeling_job_config['LabelCategoryConfigS3Uri'] = labels_path
labeling_job_config['HumanTaskConfig']['WorkteamArn'] = private_workteam_arn
labeling_job_config['HumanTaskConfig']['TaskDescription'] = task_description
labeling_job_config['HumanTaskConfig']['TaskTitle'] = task_title
labeling_job_config['HumanTaskConfig']['TaskKeywords'] = task_keywords
labeling_job_config['HumanTaskConfig']['NumberOfHumanWorkersPerDataObject'] = 1
labeling_job_config['HumanTaskConfig']['PreHumanTaskLambdaArn'] = prehuman_arn
labeling_job_config['HumanTaskConfig']['AnnotationConsolidationConfig']['AnnotationConsolidationLambdaArn'] = acs_arn
labeling_job_config['HumanTaskConfig']['UiConfig']['UiTemplateS3Uri'] = template_path

launch_job(labeling_job_config)

Let's inspect the output

In [None]:
print_json(BUCKET, "{}/output/{}/manifests/output/output.manifest".format(PREFIX, job_name), N=3)
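The cells in this notebook update `labeling_job_config` in place, so each launch inherits whatever the previous one left behind. An alternative is to derive each variant from an untouched base definition. The sketch below shows one way to do that; `derive_job_config` is a hypothetical helper, and the dictionary is a trimmed-down stand-in for the full job definition.

```python
import copy

def derive_job_config(base_config, job_name, human_task_overrides=None):
    """Return a modified deep copy of base_config; the original is left untouched."""
    config = copy.deepcopy(base_config)
    config["LabelingJobName"] = job_name
    config["LabelAttributeName"] = job_name
    config["HumanTaskConfig"].update(human_task_overrides or {})
    return config

# Trimmed-down stand-in for the full job definition, for illustration only.
base = {
    "LabelingJobName": "base-job",
    "LabelAttributeName": "base-job",
    "HumanTaskConfig": {"NumberOfHumanWorkersPerDataObject": 1},
}
two_workers = derive_job_config(
    base, "od-demo-2workers", {"NumberOfHumanWorkersPerDataObject": 2}
)
print(two_workers["HumanTaskConfig"], base["HumanTaskConfig"])
```

With this approach, settings such as a public workforce price never leak from one job definition into the next.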

### Launch Object Detection Labeling Job with Private Workforce - 2 Workers

Let's take a look at annotation consolidation by asking two workers to label each image.

In [None]:
job_name = "ground-truth-od-demo-2workers-" + str(int(time.time()))

labeling_job_config['LabelingJobName'] = job_name
labeling_job_config['LabelAttributeName'] = job_name
labeling_job_config['HumanTaskConfig']['NumberOfHumanWorkersPerDataObject'] = 2

launch_job(labeling_job_config)

In [None]:
# Inspect output
print_json(BUCKET, "{}/output/{}/manifests/output/output.manifest".format(PREFIX, job_name), N=10)

**Note** Since we have two workers labeling each object, we now have a calculated confidence score.
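The confidence values can be pulled out of an output manifest record with a few lines of Python. The sketch below assumes the bounding-box output layout, where boxes are stored under the label attribute name and per-box confidences under `<label attribute name>-metadata`. The sample record and all of its values are made up for illustration, and `box_confidences` is a hypothetical helper.

```python
import json

def box_confidences(manifest_line, label_attribute):
    """Return (class name, confidence) pairs for each box in one output record."""
    record = json.loads(manifest_line)
    boxes = record[label_attribute]["annotations"]
    metadata = record[label_attribute + "-metadata"]
    class_map = metadata["class-map"]
    return [
        (class_map[str(box["class_id"])], obj["confidence"])
        for box, obj in zip(boxes, metadata["objects"])
    ]

# Made-up record that mimics one line of output.manifest.
sample_line = json.dumps({
    "source-ref": "s3://my-example-bucket/images/bird_001.jpg",
    "od-demo": {
        "annotations": [
            {"class_id": 0, "left": 10, "top": 20, "width": 100, "height": 80}
        ],
        "image_size": [{"width": 640, "height": 480, "depth": 3}],
    },
    "od-demo-metadata": {
        "objects": [{"confidence": 0.92}],
        "class-map": {"0": "Bird"},
        "type": "groundtruth/object-detection",
        "human-annotated": "yes",
    },
})
print(box_confidences(sample_line, "od-demo"))
```

For a real job, `label_attribute` would be the `LabelAttributeName` set in the job definition.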

### Launch Object Detection Labeling Job with Public Workforce

This uses Amazon Mechanical Turk to label our data. In addition to changing the workforce, we need to specify an additional price for each human annotator. There is a recommended list of prices for the different labeling tasks at: https://aws.amazon.com/sagemaker/data-labeling/pricing/

For object detection, the suggested price is 3.6 cents (USD 0.036) per labeler and data object. More complex tasks such as image segmentation have a suggested price of 84 cents per labeler because of the additional manual work. Image classification is cheaper, at 1.2 cents.

In [None]:
# Public Workforce (Mechanical Turk) Arn
public_workteam_arn = "arn:aws:sagemaker:{}:394669845002:workteam/public-crowd/default".format(region)
job_name = "ground-truth-od-demo-public-workforce-" + str(int(time.time()))

labeling_job_config['HumanTaskConfig']['WorkteamArn'] = public_workteam_arn
labeling_job_config['HumanTaskConfig']['PublicWorkforceTaskPrice'] = {
    "AmountInUsd": {
        "Dollars": 0,
        "Cents": 3,
        "TenthFractionsOfACent": 6,
    }
}
# Let's set number of labelers per image to 2
labeling_job_config['HumanTaskConfig']['NumberOfHumanWorkersPerDataObject'] = 2
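labeling_job_config['LabelingJobName'] = job_name
labeling_job_config['LabelAttributeName'] = job_name  # Keep the attribute name in sync with the job name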

launch_job(labeling_job_config)

In [None]:
# Inspect output
print_json(BUCKET, "{}/output/{}/manifests/output/output.manifest".format(PREFIX, job_name), N=3)
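The `PublicWorkforceTaskPrice` structure used for the public workforce job splits a price into dollars, cents, and tenths of a cent. The sketch below converts a price given in cents into that structure. `public_task_price` is a hypothetical helper, and the service may accept only certain price points per task type, so check the pricing page before using other amounts.

```python
from decimal import Decimal

def public_task_price(cents):
    """Convert a price in cents (given as a string, e.g. "3.6") to the API structure."""
    tenths = Decimal(cents) * 10
    if tenths < 0 or tenths != tenths.to_integral_value():
        raise ValueError("Price must be non-negative with at most one decimal place")
    tenths = int(tenths)
    return {
        "AmountInUsd": {
            "Dollars": tenths // 1000,        # 1000 tenths of a cent per dollar
            "Cents": (tenths % 1000) // 10,   # whole cents left after the dollars
            "TenthFractionsOfACent": tenths % 10,
        }
    }

print(public_task_price("3.6"))   # object detection price from the text
print(public_task_price("84"))    # image segmentation price from the text
```

Passing the price as a string avoids floating-point rounding when splitting it into its parts.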

### Launch Image Segmentation Labeling Job with Private Workforce

Now let's use another type of labeling, namely pixel-by-pixel classification (semantic segmentation).

In [None]:
CLASS_LIST = ["Bird", "Background", "Tree"]
LABEL_FILE = "seg_labels.json"
TEMPLATE='groundtruth-segmentation-template.liquid'

with open(LABEL_FILE, "w") as f:
    json.dump({"labels": [{"label": label} for label in CLASS_LIST]}, f)

task_description = "Create image segments according to available labels"
task_keywords = ["Image", "Segmentation"]
task_title = "Classify pixels according to labels"
job_name = "ground-truth-seg-demo-" + str(int(time.time()))

template_path = upload_s3(TEMPLATE, BUCKET, "{}/{}".format(PREFIX, job_name))
labels_path = upload_s3(LABEL_FILE, BUCKET, "{}/{}".format(PREFIX, job_name))

# New Lambda pre- and post-processing functions
prehuman_arn = "arn:aws:lambda:{}:{}:function:PRE-SemanticSegmentation".format(region, "203001061592")
acs_arn = "arn:aws:lambda:{}:{}:function:ACS-SemanticSegmentation".format(region, "203001061592")

labeling_job_config['LabelingJobName'] = job_name
labeling_job_config['LabelAttributeName'] = job_name + '-ref'  # Semantic segmentation requires a name ending in "-ref"
labeling_job_config['LabelCategoryConfigS3Uri'] = labels_path
labeling_job_config['HumanTaskConfig']['WorkteamArn'] = private_workteam_arn
# Remove price information, if present, since we're going to use the private workforce
labeling_job_config['HumanTaskConfig'].pop('PublicWorkforceTaskPrice', None)
labeling_job_config['HumanTaskConfig']['TaskDescription'] = task_description
labeling_job_config['HumanTaskConfig']['TaskKeywords'] = task_keywords
labeling_job_config['HumanTaskConfig']['TaskTitle'] = task_title
labeling_job_config['HumanTaskConfig']['NumberOfHumanWorkersPerDataObject'] = 1
labeling_job_config['HumanTaskConfig']['PreHumanTaskLambdaArn'] = prehuman_arn
labeling_job_config['HumanTaskConfig']['AnnotationConsolidationConfig']['AnnotationConsolidationLambdaArn'] = acs_arn
labeling_job_config['HumanTaskConfig']['UiConfig']['UiTemplateS3Uri'] = template_path

launch_job(labeling_job_config)

In [None]:
# Inspect output
print_json(BUCKET, "{}/output/{}/manifests/output/output.manifest".format(PREFIX, job_name), N=1)

In a segmentation job, the output contains a reference to a generated PNG file with the same dimensions as the original image, with labels as pixel values.
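To interpret the pixel values, the output record's metadata can be used to map mask colors back to class names. The sketch below assumes the metadata exposes an `internal-color-map` with `class-name` and `hex-color` entries; verify this against your own `output.manifest`. The sample record is made up, and `color_to_class` is a hypothetical helper.

```python
import json

def color_to_class(manifest_line, label_attribute):
    """Map each hex color used in the mask to its class name."""
    record = json.loads(manifest_line)
    color_map = record[label_attribute + "-metadata"]["internal-color-map"]
    return {entry["hex-color"]: entry["class-name"] for entry in color_map.values()}

# Made-up record that mimics one line of a segmentation output.manifest.
sample_line = json.dumps({
    "source-ref": "s3://my-example-bucket/images/bird_001.jpg",
    "seg-demo-ref": "s3://my-example-bucket/output/bird_001.png",
    "seg-demo-ref-metadata": {
        "internal-color-map": {
            "0": {"class-name": "BACKGROUND", "hex-color": "#ffffff", "confidence": 0.0},
            "1": {"class-name": "Bird", "hex-color": "#2ca02c", "confidence": 0.0},
        },
        "type": "groundtruth/semantic-segmentation",
    },
})
print(color_to_class(sample_line, "seg-demo-ref"))
```

Here `label_attribute` is the `LabelAttributeName` of the job, including its `-ref` suffix.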