# Set Up a Private Workforce

Create your own private workforce for human reviews.

In [None]:
import boto3
import pandas as pd
import sagemaker

# --- SageMaker session, default S3 bucket, execution role and AWS region ---
sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

# --- Low-level boto3 clients used throughout this notebook ---
sm = boto3.client("sagemaker")               # workforce, workteam, task UI and flow definition calls
s3 = boto3.client("s3", region_name=region)  # reads the A2I output objects
cognito_idp = boto3.client("cognito-idp")    # user pool, app client, domain, group and user calls
a2i = boto3.client("sagemaker-a2i-runtime")  # starts and describes human loops

# Set Up a Cognito User Pool

In [None]:
import time

# Every resource name in this notebook is suffixed with the same Unix timestamp,
# so each run creates its own uniquely named resources.
timestamp = int(time.time())

user_pool_name = f"groundtruth-user-pool-{timestamp}"
create_user_pool_response = cognito_idp.create_user_pool(PoolName=user_pool_name)

# The pool ID is needed by every later Cognito, workforce and workteam call.
user_pool_id = create_user_pool_response["UserPool"]["Id"]
print(user_pool_id)

In [None]:
# Create an OAuth app client for the user pool; the private workforce and workteam
# are bound to this client ID.
# GenerateSecret=True makes the response include the generated client secret, so the
# raw response is deliberately NOT displayed (a mid-cell bare expression was a no-op
# anyway, and moving it to the end would leak the secret into the notebook output).
create_user_pool_client_response = cognito_idp.create_user_pool_client(
    UserPoolId=user_pool_id,
    ClientName='groundtruth-user-pool-client-{}'.format(timestamp),
    GenerateSecret=True,
    SupportedIdentityProviders=['COGNITO'],
    AllowedOAuthFlows=['code', 'implicit'],
    AllowedOAuthScopes=['email', 'openid', 'profile'],
    CallbackURLs=['https://datascienceonaws.com'],
    AllowedOAuthFlowsUserPoolClient=True,
)

client_id = create_user_pool_client_response['UserPoolClient']['ClientId']
print(client_id)

In [None]:
# Register a Cognito domain prefix for the user pool (unique per run via the timestamp).
user_pool_domain = f"groundtruth-user-pool-domain-{timestamp}"
cognito_idp.create_user_pool_domain(UserPoolId=user_pool_id, Domain=user_pool_domain)


In [None]:
# Cognito group referenced by the workteam's member definition; users added to it
# become workteam members.
user_group_name = f"sagemaker-groundtruth-user-group-{timestamp}"
cognito_idp.create_group(UserPoolId=user_pool_id, GroupName=user_group_name)

In [None]:
# Existing workteams visible to this SageMaker client (handy for spotting leftovers).
# Optional cleanup of an old run: sm.delete_workteam(WorkteamName='<old-workteam-name>')
workteams = sm.list_workteams()
workteams

In [None]:
# Existing workforces visible to this SageMaker client.
# NOTE(review): AWS limits private workforces per account and region (see SageMaker docs),
# so a leftover from an earlier run may need deleting first:
# sm.delete_workforce(WorkforceName='<old-workforce-name>')
workforces = sm.list_workforces()
workforces

In [None]:
workforce_name = f"groundtruth-workforce-name-{timestamp}"

# Back the private workforce with the Cognito user pool and app client created above.
cognito_config = {'UserPool': user_pool_id, 'ClientId': client_id}
create_workforce_response = sm.create_workforce(
    WorkforceName=workforce_name,
    CognitoConfig=cognito_config,
)

create_workforce_response

In [None]:
# Confirm the new workforce now shows up in the listing.
workforces = sm.list_workforces()
workforces

In [None]:
# Full details of the workforce created above.
describe_workforce_response = sm.describe_workforce(
    WorkforceName=workforce_name,
)
describe_workforce_response  # rendered as the cell output

In [None]:
# Optional cleanup, intentionally left commented out: deletes a workforce.
# The name below is hardcoded from an earlier run -- replace it before uncommenting.
# sm.delete_workforce(WorkforceName='groundtruth-workforce-name-1620949723')

In [None]:
# Workteam name, unique per run via the shared timestamp suffix.
workteam_name = f"groundtruth-workteam-{timestamp}"

In [None]:
# Wait until the new workforce is ready before creating a workteam on it; creating
# the workteam too early can fail. Poll DescribeWorkforce instead of sleeping blindly.
WORKFORCE_POLL_ATTEMPTS = 30
WORKFORCE_POLL_SECONDS = 10

for _ in range(WORKFORCE_POLL_ATTEMPTS):
    # NOTE(review): 'Status' is only returned by newer SageMaker API versions; a missing
    # field is treated as ready so older SDKs fall straight through.
    workforce_status = sm.describe_workforce(WorkforceName=workforce_name)['Workforce'].get('Status', 'Active')
    print(workforce_status)
    if workforce_status == 'Active':
        break
    if workforce_status == 'Failed':
        raise RuntimeError(f"Workforce {workforce_name} failed to initialize")
    time.sleep(WORKFORCE_POLL_SECONDS)
else:
    raise TimeoutError(
        f"Workforce {workforce_name} was not Active after "
        f"{WORKFORCE_POLL_ATTEMPTS * WORKFORCE_POLL_SECONDS} seconds"
    )

In [None]:
# Workteam membership comes from the Cognito user group: users in `user_group_name`
# (of `user_pool_id`, via app client `client_id`) are members of this workteam.
cognito_member_definition = {
    'CognitoMemberDefinition': {
        'UserPool': user_pool_id,
        'UserGroup': user_group_name,
        'ClientId': client_id,
    }
}

create_workteam_response = sm.create_workteam(
    WorkteamName=workteam_name,
    WorkforceName=workforce_name,
    MemberDefinitions=[cognito_member_definition],
    Description='groundtruth workteam',
)

In [None]:
# The flow definition's HumanLoopConfig references the workteam by ARN, not by name.
workteam_arn = create_workteam_response["WorkteamArn"]
workteam_arn  # rendered as the cell output

Check that the workteam was created properly — otherwise you may get confusing errors later (for example, when creating the flow definition).

Should be something like this:
```
{'Workteam': {'WorkteamName': 'groundtruth-workteam-1620927720',
  'MemberDefinitions': [{'CognitoMemberDefinition': {'UserPool': 'us-east-1_3uqqE0vui',
     'UserGroup': 'sagemaker-groundtruth-user-group-1620927720',
     'ClientId': '77k1dio8s4rbltlndgogml1hqd'}}],
  'WorkteamArn': 'arn:aws:sagemaker:us-east-1:835319576252:workteam/private-crowd/groundtruth-workteam-1620927720',
  'Description': 'groundtruth workteam',
  'SubDomain': 'bzxy1zzrn2.labeling.us-east-1.sagemaker.aws',
  'CreateDate': datetime.datetime(2021, 5, 13, 23, 31, 45, 498000, tzinfo=tzlocal()),
  'LastUpdatedDate': datetime.datetime(2021, 5, 13, 23, 31, 45, 498000, tzinfo=tzlocal()),
  'NotificationConfiguration': {}},
 'ResponseMetadata': {'RequestId': '294e612b-1396-4c2c-aa87-79c6f9783281',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '294e612b-1396-4c2c-aa87-79c6f9783281',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '549',
   'date': 'Thu, 13 May 2021 23:37:30 GMT'},
  'RetryAttempts': 0}}
```

In [None]:
describe_workteam_response = sm.describe_workteam(WorkteamName=workteam_name)

# Fail fast if the workteam came back without our Cognito member definition
# (compare with the expected output shown above); a malformed workteam otherwise only
# surfaces later as confusing errors.
member_definitions = describe_workteam_response['Workteam'].get('MemberDefinitions', [])
assert any(
    member.get('CognitoMemberDefinition', {}).get('UserGroup') == user_group_name
    for member in member_definitions
), f"Workteam {workteam_name} has no Cognito member definition for group {user_group_name}"

describe_workteam_response

In [None]:
import secrets

username = 'user-{}'.format(timestamp)

# Generate a fresh temporary password per run instead of hardcoding a shared credential.
# The fixed 'Aa1@' prefix guarantees an uppercase letter, a lowercase letter, a digit
# and a symbol -- the character classes Cognito's default password policy requires
# (NOTE(review): re-check if the pool is ever given a custom policy).
# token_urlsafe(12) appends 16 random characters from [A-Za-z0-9_-].
temporary_password = 'Aa1@' + secrets.token_urlsafe(12)

# MessageAction='SUPPRESS' means Cognito sends no invitation message; the username and
# temporary password are displayed later next to the labeling-portal link instead.
cognito_idp.admin_create_user(Username=username,
                              UserPoolId=user_pool_id,
                              TemporaryPassword=temporary_password,
                              MessageAction='SUPPRESS')

In [None]:
# Put the new user into the Cognito group that backs the workteam.
cognito_idp.admin_add_user_to_group(UserPoolId=user_pool_id, GroupName=user_group_name, Username=username)

# Amazon Augmented AI (A2I)

In [None]:
# S3 prefix where A2I writes human-loop results for this run.
output_path = f"s3://{bucket}/a2i-results-{timestamp}"
print(output_path)

In [None]:
# `IPython.display` is the public home of display/HTML; importing them from
# `IPython.core.display` is deprecated.
from IPython.display import HTML, display

# Link to this run's private workteam in the SageMaker Ground Truth console.
# target="_blank" opens a new tab ("blank" without the underscore is just a window name).
workforce_console_url = (
    'https://{region}.console.aws.amazon.com/sagemaker/groundtruth'
    '?region={region}#/labeling-workforces/private-details/{team}'
).format(region=region, team=workteam_name)
display(HTML('<b>Review <a target="_blank" href="{}">Workforce</a></b>'.format(workforce_console_url)))

In [None]:
# AWS account ID of the current credentials. boto3 is already imported in the setup
# cell at the top, so it is not re-imported here.
account_id = boto3.client("sts").get_caller_identity().get("Account")

# Create the Human Task UI using a Worker Task Template

Create a human task UI resource from a UI template.  This template will be rendered to the human workers whenever human interaction is required.

Below we've provided a simple demo template that is compatible with our use case.

For other pre-built UIs (70+), check: https://github.com/aws-samples/amazon-a2i-sample-task-uis

In [None]:
# Worker task template (Liquid placeholders + Crowd HTML Elements) rendered for each human loop.
# `task.input.initialValue` and `task.input.taskObject` are filled from the human loop's
# InputContent JSON, whose keys are "initialValue" and "taskObject".
# NOTE(review): name="sentiment" is presumably the key under which the chosen label
# appears in each human answer's answerContent -- confirm against the A2I output.
template = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<crowd-form>
    <crowd-classifier name="sentiment"
                      categories="['-1', '0', '1']"
                      initial-value="{{ task.input.initialValue }}"
                      header="Classify Reviews into Sentiment:  -1 (negative), 0 (neutral), and 1 (positive)">
      
        <classification-target>
            {{ task.input.taskObject }}
        </classification-target>
      
        <full-instructions header="Classify reviews into sentiment:  -1 (negative), 0 (neutral), and 1 (positive)">
            <p><strong>1</strong>: joy, excitement, delight</p>       
            <p><strong>0</strong>: neither positive nor negative, such as stating a fact</p>
            <p><strong>-1</strong>: anger, sarcasm, anxiety</p>
        </full-instructions>

        <short-instructions>
            Classify reviews into sentiment:  -1 (negative), 0 (neutral), and 1 (positive)
        </short-instructions>
    </crowd-classifier>
</crowd-form>
"""

In [None]:
# Task UI names must be unique per account and region; the run timestamp keeps them distinct.
# You can also provide your own value here.
task_ui_name = f"ui-{timestamp}"

# Register the worker task template as a Human Task UI resource and keep its ARN.
human_task_ui_response = sm.create_human_task_ui(
    HumanTaskUiName=task_ui_name,
    UiTemplate={"Content": template},
)
human_task_ui_arn = human_task_ui_response["HumanTaskUiArn"]
print(human_task_ui_arn)

# Create a Flow Definition

In this section, we're going to create a flow definition. Flow Definitions allow us to specify:

* The workforce that your tasks will be sent to.
* The instructions that your workforce will receive. This is called a worker task template.
* The configuration of your worker tasks, including the number of workers that receive a task and time limits to complete tasks.
* Where your output data will be stored.

This demo uses the API, but you can also create this flow definition in the console.

For more details and instructions, see: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html.

In [None]:
# Flow definition names must be unique per account and region; the run timestamp keeps
# them distinct. You can also provide your own value here.
flow_definition_name = f"fd-{timestamp}"

task_text = "Classify Reviews into sentiment:  -1 (negative), 0 (neutral), 1 (positive)"
human_loop_config = {
    "WorkteamArn": workteam_arn,          # who receives the tasks
    "HumanTaskUiArn": human_task_ui_arn,  # what the workers see
    "TaskCount": 1,                       # number of workers per task
    "TaskDescription": task_text,
    "TaskTitle": task_text,
}

create_workflow_definition_response = sm.create_flow_definition(
    FlowDefinitionName=flow_definition_name,
    RoleArn=role,
    HumanLoopConfig=human_loop_config,
    OutputConfig={"S3OutputPath": output_path},
)
augmented_ai_flow_definition_arn = create_workflow_definition_response["FlowDefinitionArn"]

# _If you see an error ^^^^ above ^^^^, you need to create your private workforce first. See the steps above. Then, re-run this cell._

In [None]:
# Poll until the flow definition is "Active" (up to 60 attempts, 2 s apart). Fail loudly
# if it reports "Failed" or never activates, instead of silently continuing and
# breaking later when human loops are started.
FLOW_DEFINITION_POLL_ATTEMPTS = 60
FLOW_DEFINITION_POLL_SECONDS = 2

for _ in range(FLOW_DEFINITION_POLL_ATTEMPTS):
    describe_flow_definition_response = sm.describe_flow_definition(FlowDefinitionName=flow_definition_name)
    flow_definition_status = describe_flow_definition_response["FlowDefinitionStatus"]
    print(flow_definition_status)
    if flow_definition_status == "Active":
        print("Flow Definition is active")
        break
    if flow_definition_status == "Failed":
        raise RuntimeError(
            f"Flow definition {flow_definition_name} failed: "
            f"{describe_flow_definition_response.get('FailureReason', 'no failure reason reported')}"
        )
    time.sleep(FLOW_DEFINITION_POLL_SECONDS)
else:
    raise TimeoutError(
        f"Flow definition {flow_definition_name} did not become Active within "
        f"{FLOW_DEFINITION_POLL_ATTEMPTS * FLOW_DEFINITION_POLL_SECONDS} seconds"
    )

# Fix Labels

# Check the Confidence Score for Each Prediction
If the confidence is below the threshold, start a human loop.  You can integrate this type of logic into your application using the SDK.  In this case, we're using the Python SDK.

# Use our SageMaker endpoint to predict the sentiment of some sample reviews

# Start a human loop when our model does not predict with confidence above a certain threshold 
The human loop will engage our workforce for human review if the confidence of the prediction is less than the configured confidence threshold.

![](img/augmented-ai-custom-predictions.png)

In [None]:
from sagemaker.deserializers import JSONLinesDeserializer
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONLinesSerializer


class SentimentPredictor(Predictor):
    """Predictor for the sentiment endpoint that speaks JSON Lines in both directions.

    Requests are encoded with ``JSONLinesSerializer`` and responses decoded with
    ``JSONLinesDeserializer``; in this notebook ``predict`` is called with a list of
    ``{"features": [...]}`` records and its result is indexed like a list.
    """

    def __init__(self, endpoint_name, sagemaker_session):
        super().__init__(
            endpoint_name,
            sagemaker_session=sagemaker_session,
            serializer=JSONLinesSerializer(),
            deserializer=JSONLinesDeserializer(),
        )

In [None]:
from sagemaker.pytorch.model import PyTorchModel

pytorch_model_name = f"model-{timestamp}"

# Model artifact from S3, served by src/inference.py on the PyTorch 1.6.0 container;
# deploy() will return a SentimentPredictor for the resulting endpoint.
model = PyTorchModel(
    name=pytorch_model_name,
    model_data='s3://dlai-practical-data-science/models/ab/variant_a/model.tar.gz',
    entry_point='inference.py',
    source_dir='src',
    framework_version='1.6.0',
    py_version='py3',
    role=role,
    predictor_cls=SentimentPredictor,
)

### _The next cell will take approximately 5-10 minutes to run._

In [None]:
%%time

pytorch_endpoint_name = 'endpoint-{}'.format(timestamp)

predictor = model.deploy(initial_instance_count=1, 
                         instance_type='ml.m5.large', 
                         endpoint_name=pytorch_endpoint_name)

In [None]:
# `IPython.display` is the public home of display/HTML; importing them from
# `IPython.core.display` is deprecated.
from IPython.display import HTML, display

# Link to the deployed endpoint in the SageMaker console; target="_blank" opens a new tab.
endpoint_console_url = 'https://console.aws.amazon.com/sagemaker/home?region={}#/endpoints/{}'.format(
    region, pytorch_endpoint_name
)
display(HTML('<b>Review <a target="_blank" href="{}">SageMaker REST Endpoint</a></b>'.format(endpoint_console_url)))


In [None]:
# Sample reviews to score; low-confidence predictions are routed to human review next.
reviews = [
    "I enjoy this product",
    "I am unhappy with this product",
    "It is okay",
    "sometimes it works",
]

In [None]:
import json
import uuid

human_loops_started = []

# Predictions whose confidence is below this threshold are sent to a human reviewer.
CONFIDENCE_SCORE_THRESHOLD = 0.90

for review in reviews:
    inputs = [
        {"features": [review]},
    ]

    response = predictor.predict(inputs)
    print(response)
    prediction = response[0]['predicted_label']
    confidence_score = response[0]['probability']

    print('Checking prediction confidence {} for sample review: "{}"'.format(confidence_score, review))

    # Our condition for when we want to engage a human for review
    if confidence_score < CONFIDENCE_SCORE_THRESHOLD:
        # Human loop names must be unique. A random UUID4 (lowercase hex + hyphens,
        # 36 chars) is a valid name and, unlike a formatted time.time() value (which
        # is seconds, not milliseconds), does not depend on clock resolution.
        human_loop_name = str(uuid.uuid4())
        # Keys match the template placeholders task.input.initialValue / task.input.taskObject.
        input_content = {"initialValue": prediction, "taskObject": review}
        start_loop_response = a2i.start_human_loop(
            HumanLoopName=human_loop_name,
            FlowDefinitionArn=augmented_ai_flow_definition_arn,
            HumanLoopInput={"InputContent": json.dumps(input_content)},
        )

        human_loops_started.append(human_loop_name)

        print(
            f"Confidence score of {confidence_score * 100}% for prediction of {prediction} is less than the threshold of {CONFIDENCE_SCORE_THRESHOLD * 100}%"
        )
        print(f"*** ==> Starting human loop with name: {human_loop_name}  \n")
    else:
        # This branch covers confidence_score >= threshold, and the model predicts
        # sentiment (not a star rating), so the message says so.
        print(
            f"Confidence score of {confidence_score * 100}% for prediction of {prediction} is at or above the threshold of {CONFIDENCE_SCORE_THRESHOLD * 100}%"
        )
        print("Human loop not needed. \n")

# Check Status of Human Loop

In [None]:
# One-off snapshot of every started human loop (does not wait for completion).
completed_human_loops = []

for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f"HumanLoop Name: {human_loop_name}")
    status = resp["HumanLoopStatus"]
    print(f"HumanLoop Status: {status}")
    print(f"HumanLoop Output Destination: {resp['HumanLoopOutput']}")
    print("")

    if status == "Completed":
        completed_human_loops.append(resp)

# _Wait For Workers to Complete Their Human Loop Tasks_

Open the link below and log in with the username and temporary password displayed next to it (both were created earlier in this notebook). On first login you will be prompted to choose a new password.

In the private worker portal, complete the human loop tasks.  The user was created with `MessageAction='SUPPRESS'`, so no signup email is sent — use the credentials displayed below.

_Note:  If you instead invited yourself to the workteam by email through the console, check your spam filter if you have not received the invitation._

In [None]:
# Hostname of this workteam's private worker (labeling) portal.
workteam_description = sm.describe_workteam(WorkteamName=workteam_name)
labeling_ui = workteam_description["Workteam"]["SubDomain"]

In [None]:
# `IPython.display` is the public home of display/HTML; importing them from
# `IPython.core.display` is deprecated.
from IPython.display import HTML, display

# Worker-portal link plus the credentials of the user created earlier in this notebook;
# target="_blank" opens a new tab.
labeling_portal_url = 'https://{}'.format(labeling_ui)
display(HTML(
    'Click <a target="_blank" href="{}">here</a> to start labeling with username <b>{}</b> '
    'and temporary password <b>{}</b>.'.format(labeling_portal_url, username, temporary_password)
))

![Data Labeling](img/label-data-job-instructions.png)

# Verify that the human loops were completed by the workforce.
This cell will not complete until you label the data above.

In [None]:
# Block until every started human loop reaches a terminal state. A loop that ends as
# Failed or Stopped is reported and skipped instead of being polled forever.
# ("Stopping" is transitional, so it keeps being polled.)
TERMINAL_HUMAN_LOOP_STATUSES = {"Completed", "Failed", "Stopped"}
HUMAN_LOOP_POLL_SECONDS = 10

completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f"HumanLoop Name: {human_loop_name}")
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp.get("HumanLoopOutput")}')
    print("")
    while resp["HumanLoopStatus"] not in TERMINAL_HUMAN_LOOP_STATUSES:
        print("Waiting for HumanLoop to complete.")
        time.sleep(HUMAN_LOOP_POLL_SECONDS)
        resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)
        print("Completed!")
    else:
        print(
            f'HumanLoop {human_loop_name} ended with status {resp["HumanLoopStatus"]}: '
            f'{resp.get("FailureReason", "no failure reason reported")}'
        )
    print("")

# _YOU MUST LABEL BEFORE CONTINUING_

# View Human Labels  

Once the work is complete, Amazon A2I stores the results in the specified S3 bucket and sends an Amazon CloudWatch Events event.  Let's check the S3 contents.

In [None]:
import json
from pprint import pprint

S3_SCHEME = "s3://"

fixed_items = []

for resp in completed_human_loops:
    # Split "s3://<bucket>/<key>" directly instead of regex-splitting on the default
    # bucket. The old approach used the bucket name as an unescaped regex ('.' matches
    # any character) and assumed the default bucket. partition() keeps the key
    # byte-for-byte, including characters such as '#' or '?'.
    output_s3_uri = resp["HumanLoopOutput"]["OutputS3Uri"]
    if not output_s3_uri.startswith(S3_SCHEME):
        raise ValueError(f"Unexpected human loop output URI: {output_s3_uri}")
    output_bucket, _, output_bucket_key = output_s3_uri[len(S3_SCHEME):].partition("/")

    response = s3.get_object(Bucket=output_bucket, Key=output_bucket_key)
    content = response["Body"].read().decode("utf-8")
    json_output = json.loads(content)
    pprint(json_output)

    # Pair the original task input with the first human answer (the flow definition
    # uses TaskCount=1).
    input_content = json_output["inputContent"]
    human_answer = json_output["humanAnswers"][0]["answerContent"]
    fixed_item = {"input_content": input_content, "human_answer": human_answer}
    fixed_items.append(fixed_item)

# Prepare the Data for Re-training

In [None]:
# One row per human-reviewed item: the original task input and the reviewer's answer.
df_fixed_items = pd.DataFrame(data=fixed_items)
df_fixed_items.head()