Author: amiller, aws  
Description: an example of using amazon rekognition video to detect moderations in a video  
Based on code examples from the Rekognition Developer's Guide:  
  
https://docs.aws.amazon.com/rekognition/latest/dg/video-analyzing-with-sqs.html  

#Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.  
#PDX-License-Identifier: MIT-0 (For details, see https://github.com/awsdocs/amazon-rekognition-developer-guide/blob/master/LICENSE-SAMPLECODE.)import boto3  


In [1]:
import boto3
import json
import sys
import time

In [2]:
class VideoDetect:
    jobId = ''
    
    roleArn = ''
    bucket = ''
    video = ''
    verbose = True
   
    rek = None
    sqs = None
    sns = None
    outfile = ''
    resfile = None

    startJobId = ''
    sqsQueueUrl = ''
    snsTopicArn = ''
    processType = ''

    def __init__(self, role, bucket, video, region, ofile, verbose):    
        self.roleArn = role
        self.bucket = bucket
        self.video = video
        self.verbose = verbose
        
        session = boto3.Session(region_name=region)
        self.rek = session.client('rekognition')
        self.sqs = session.client('sqs')
        self.sns = session.client('sns')
        
        self.outfile = ofile

    def GetSQSMessageSuccess(self):

        jobFound = False
        succeeded = False
    
        while jobFound == False:
            sqsResponse = self.sqs.receive_message(QueueUrl=self.sqsQueueUrl, MessageAttributeNames=['ALL'], MaxNumberOfMessages=10) 

            if sqsResponse:
                if 'Messages' not in sqsResponse:
                    print('.', end='')
                    sys.stdout.flush()
                    time.sleep(5)
                    continue
                print()

                for message in sqsResponse['Messages']:
                    notification = json.loads(message['Body'])
                    rekMessage = json.loads(notification['Message'])

                    print(rekMessage['Status'])

                    if rekMessage['JobId'] == self.startJobId:
                        print('Matching Job Found: ' + rekMessage['JobId'])
                        jobFound = True
                        if (rekMessage['Status']=='SUCCEEDED'):
                            succeeded=True

                        self.sqs.delete_message(QueueUrl=self.sqsQueueUrl,
                                       ReceiptHandle=message['ReceiptHandle'])
                    else:
                        print("Job didn't match:" +
                              str(rekMessage['JobId']) + ' : ' + self.startJobId)
                    # Delete the unknown message. Consider sending to dead letter queue
                    self.sqs.delete_message(QueueUrl=self.sqsQueueUrl,
                                   ReceiptHandle=message['ReceiptHandle'])


        return succeeded

    # ============== Unsafe content =============== 
    def StartUnsafeContent(self):
        response=self.rek.start_content_moderation(Video={'S3Object': {'Bucket': self.bucket, 'Name': self.video}},
            NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn})

        self.startJobId=response['JobId']
        print('Start Job Id: ' + self.startJobId)

    def GetUnsafeContentResults(self):
        maxResults = 10
        paginationToken = ''
        finished = False

        self.resfile = open(self.outfile, 'w')
        if self.verbose == False:
            self.resfile.write('%s, %s\n' % ('detected-moderation', 'timestamp'))

        while finished == False:
            response = self.rek.get_content_moderation(JobId=self.startJobId, MaxResults=maxResults, NextToken=paginationToken)

            if self.verbose:
                self.resfile.write('\nCodec: ' + response['VideoMetadata']['Codec'])
                self.resfile.write('\nDuration: ' + str(response['VideoMetadata']['DurationMillis']))
                self.resfile.write('\nFormat: ' + response['VideoMetadata']['Format'])
                self.resfile.write('\nFrame rate: ' + str(response['VideoMetadata']['FrameRate']))
                self.resfile.write('\n')

            # iterate through all of the detected moderation label entries
            for contentModerationDetection in response['ModerationLabels']:
                if self.verbose:
                    self.resfile.write('\nLabel: ' + str(contentModerationDetection['ModerationLabel']['Name']))
                    self.resfile.write('\nConfidence: ' + str(contentModerationDetection['ModerationLabel']['Confidence']))
                    self.resfile.write('\nParent category: ' + str(contentModerationDetection['ModerationLabel']['ParentName']))
                    self.resfile.write('\nTimestamp: ' + str(contentModerationDetection['Timestamp']))
                    self.resfile.write('\n')
                else:
                    # if not verbose, then print in csv format
                    self.resfile.write('%s, %f\n' % (str(contentModerationDetection['ModerationLabel']['Name']), contentModerationDetection['Timestamp']/1000.0))

            if 'NextToken' in response:
                paginationToken = response['NextToken']
            else:
                finished = True       

        self.resfile.close()
    
    def CreateTopicandQueue(self):

        # this is used to create unique names for the SNS topic and SQS queue
        millis = str(int(round(time.time() * 1000)))

        #Create SNS topic
        snsTopicName="AmazonRekognitionExample" + millis

        topicResponse=self.sns.create_topic(Name=snsTopicName)
        self.snsTopicArn = topicResponse['TopicArn']

        #create SQS queue
        sqsQueueName="AmazonRekognitionQueue" + millis
        self.sqs.create_queue(QueueName=sqsQueueName)
        self.sqsQueueUrl = self.sqs.get_queue_url(QueueName=sqsQueueName)['QueueUrl']
 
        attribs = self.sqs.get_queue_attributes(QueueUrl=self.sqsQueueUrl,
                                                    AttributeNames=['QueueArn'])['Attributes']
                                        
        sqsQueueArn = attribs['QueueArn']

        # Subscribe SQS queue to SNS topic
        self.sns.subscribe(
            TopicArn=self.snsTopicArn,
            Protocol='sqs',
            Endpoint=sqsQueueArn)

        #Authorize SNS to write SQS queue 
        policy = """{{
  "Version":"2012-10-17",
  "Statement":[
    {{
      "Sid":"MyPolicy",
      "Effect":"Allow",
      "Principal" : {{"AWS" : "*"}},
      "Action":"SQS:SendMessage",
      "Resource": "{}",
      "Condition":{{
        "ArnEquals":{{
          "aws:SourceArn": "{}"
        }}
      }}
    }}
  ]
}}""".format(sqsQueueArn, self.snsTopicArn)
 
        response = self.sqs.set_queue_attributes(
            QueueUrl = self.sqsQueueUrl,
            Attributes = {
                'Policy' : policy
            })

    def DeleteTopicandQueue(self):
        self.sqs.delete_queue(QueueUrl=self.sqsQueueUrl)
        self.sns.delete_topic(TopicArn=self.snsTopicArn)

In [3]:
''' 
Some notes on IAM Roles:
Within SageMaker Studio, each SageMaker User has an IAM Role. 
Each Notebook for this user will run with the Permissions specified in this Role (known as the Execution Role).
This Role can be seen in the Details of each User.

For the code running in this example Notebook, Amazon Rekognition needs an IAM service role which gives Rekognition access 
to the Amazon SNS service, on your behalf (meaning, on behalf of the SageMaker Execution Role). 

The Rekognition Service Role must be created in IAM outside of this example code. To do this, create an IAM Role and specify 
the Rekognition service and the default Policies are sufficient (i.e., no other Policies need to be added/modified).

Additionally, The SageMaker Execution Role must be allowed to Pass the Rekognition Service Role. To allow this, you must attach
a Policy to the SageMaker Execution Role that looks like this, where the Resource is the ARN of the Rekognition service Role which
you created. You can either create this as a new Policy and attach it or add it as an in-line Policy.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "iam:GetRole",
                    "iam:PassRole"
                ],
                "Resource": "arn:aws:iam::810190279255:role/amRekognitionServiceRole"
            }
        ]
    }

'''

# modify these variables to match your enviornment
RekServiceRole = 'arn:aws:iam::810190279255:role/amRekognitionServiceRole'
bucket = 'am-buck2'
video = 'video1.mp4'
region = 'us-east-2'
verbose = False				# if verbose is false, then the results will be written in csv format
results_filename = "detected-moderations.csv"


analyzer=VideoDetect(RekServiceRole, bucket, video, region, results_filename, verbose)
analyzer.CreateTopicandQueue()

analyzer.StartUnsafeContent()
if analyzer.GetSQSMessageSuccess()==True:
    analyzer.GetUnsafeContentResults()

analyzer.DeleteTopicandQueue()
print('results file:  %s' % (results_filename))




Start Job Id: 71886f122f7f1fb3d484ee34c989a33786d7065d66b6cad6873d197d0bb88e27
............
SUCCEEDED
Matching Job Found: 71886f122f7f1fb3d484ee34c989a33786d7065d66b6cad6873d197d0bb88e27
results file:  detected-moderations.csv
