In [1]:
import boto3
import time
import json

In [12]:
# AWS service clients shared by the rest of the notebook.
sqs = boto3.client('sqs')
lambda_client = boto3.client('lambda')
iam_client = boto3.client('iam')
s3_client = boto3.client('s3')
dynamo_client = boto3.client('dynamodb')

# Higher-level resource interfaces (S3 buckets/objects, DynamoDB tables).
s3_resource = boto3.resource('s3')
dynamodb = boto3.resource('dynamodb')

# Class-provided IAM role that Lambda uses to interact with other AWS resources.
role = iam_client.get_role(RoleName='LabRole')

In [4]:
# Package the Lambda source file process_survey.py into process_survey.zip,
# which the deployment cell below uploads. The handler code has been updated
# to handle SQS input. If the zip already exists, `zip` updates the entry in place.
! zip -r process_survey.zip process_survey.py

updating: process_survey.py (deflated 65%)


In [5]:
# Deploy the process_survey Lambda from process_survey.zip: create it on the
# first run, otherwise upload the new code. Reuses `lambda_client` and `role`
# (the class LabRole, which lets Lambda interact with other AWS resources)
# from the setup cell instead of building duplicate clients.
FUNCTION_NAME = 'process_survey'
LAMBDA_TIMEOUT = 300  # seconds; the SQS visibility timeout is raised to match below
aws_lambda = lambda_client  # alias kept so the previously used name still works

with open('process_survey.zip', 'rb') as f:
    lambda_zip = f.read()

try:
    # If function hasn't yet been created, create it
    response = aws_lambda.create_function(
        FunctionName=FUNCTION_NAME,
        # NOTE(review): python3.9 is an older Lambda runtime; check AWS's
        # runtime deprecation schedule before relying on it long-term.
        Runtime='python3.9',
        Role=role['Role']['Arn'],
        # Should be in the format of file-name.function-name
        Handler='process_survey.process_survey',
        Code=dict(ZipFile=lambda_zip),
        Timeout=LAMBDA_TIMEOUT,
    )
except aws_lambda.exceptions.ResourceConflictException:
    # If function already exists, update its code from the zip file ...
    response = aws_lambda.update_function_code(
        FunctionName=FUNCTION_NAME,
        ZipFile=lambda_zip,
        Publish=True,
    )
    # ... then re-apply the timeout so an existing function cannot drift from
    # LAMBDA_TIMEOUT. Lambda rejects a configuration change while a code
    # update is still in progress, hence the waits around it.
    aws_lambda.get_waiter('function_updated_v2').wait(FunctionName=FUNCTION_NAME)
    aws_lambda.update_function_configuration(FunctionName=FUNCTION_NAME,
                                             Timeout=LAMBDA_TIMEOUT)
    aws_lambda.get_waiter('function_updated_v2').wait(FunctionName=FUNCTION_NAME)
else:
    # A newly created function starts out Pending; wait until it is Active
    # before later cells attach a trigger to it.
    aws_lambda.get_waiter('function_active_v2').wait(FunctionName=FUNCTION_NAME)

## Q1a: Write a Python function `send_survey`

In [6]:
def send_survey(survey_path, sqs_url, sqs_client=None, verbose=True):
    '''
    Send one survey, loaded from a JSON file, to an SQS queue.

    Input: survey_path (str): path to JSON survey data
        (e.g. `./survey.json`)
        sqs_url (str): URL for SQS queue
        sqs_client (optional): boto3 SQS client to use; defaults to the
            notebook-level `sqs` client
        verbose (bool): if True (default), print the loaded survey data
    Output: StatusCode (int): 200 if SQS accepted the message, 400 if the
            request was rejected (an SQS error response or a non-200
            HTTP status)

    Raises: OSError / json.JSONDecodeError if survey_path cannot be read or
        is not valid JSON. These are caller errors, not delivery failures.
    '''
    client = sqs if sqs_client is None else sqs_client

    # Load survey data; the file is closed before any network call is made.
    with open(survey_path, 'r', encoding='utf-8') as f:
        survey_data = json.load(f)
    if verbose:
        print(survey_data)

    # boto3 raises ClientError for error responses instead of returning them,
    # so a rejected send must be caught here to honour the 400 contract.
    try:
        response = client.send_message(QueueUrl=sqs_url,
                                       MessageBody=json.dumps(survey_data))
    except client.exceptions.ClientError:
        return 400

    # Return status code based on response from SQS
    status = response.get('ResponseMetadata', {}).get('HTTPStatusCode')
    return 200 if status == 200 else 400

## Q1b: Create an SQS queue and configure it as a trigger for the `process_survey` Lambda function

In the overall structure, the mobile device passes survey data to the `send_survey` function, which serializes it to a JSON string and sends it to the SQS queue. Messages arriving in the queue trigger the `process_survey` Lambda function, which processes each survey and writes the results to storage (the S3 bucket and DynamoDB table checked below).

In [7]:
# Create the SQS queue that feeds process_survey, or reuse it if it exists.
QUEUE_NAME = 'a6'
try:
    queue_url = sqs.create_queue(QueueName=QUEUE_NAME)['QueueUrl']
except sqs.exceptions.QueueNameExists:
    # CreateQueue errors when a queue with this name exists but its attributes
    # differ (e.g. after the visibility timeout is raised below). Look that
    # queue up by its exact name rather than substring-matching every queue
    # URL in the account, which could pick up an unrelated queue.
    queue_url = sqs.get_queue_url(QueueName=QUEUE_NAME)['QueueUrl']
print(queue_url)

# The queue ARN is needed to register the queue as a Lambda event source.
sqs_info = sqs.get_queue_attributes(QueueUrl=queue_url,
                                    AttributeNames=['QueueArn'])
sqs_arn = sqs_info['Attributes']['QueueArn']
sqs_arn

https://sqs.us-east-1.amazonaws.com/364770543372/a6


'arn:aws:sqs:us-east-1:364770543372:a6'

In [8]:
# Ensure the queue's visibility timeout is at least 300 seconds, i.e. not
# shorter than the Timeout=300 configured on the process_survey function.
sqs_info = sqs.get_queue_attributes(QueueUrl=queue_url,
                                    AttributeNames=['VisibilityTimeout'])
visibility_timeout = int(sqs_info['Attributes']['VisibilityTimeout'])
print(visibility_timeout)  # value before any adjustment

if not visibility_timeout >= 300:
    sqs.set_queue_attributes(QueueUrl=queue_url,
                             Attributes={'VisibilityTimeout': '300'})

300


In [14]:
# Resolve the ARN of the existing process_survey Lambda (originally written
# for Assignment 5) and display it as the cell's output.
lambda_function_name = 'process_survey'
lambda_function = lambda_client.get_function(
    FunctionName=lambda_function_name
)
lambda_function_arn = (
    lambda_function
    ['Configuration']
    ['FunctionArn']
)
lambda_function_arn

'arn:aws:lambda:us-east-1:364770543372:function:process_survey'

In [15]:
# Register the SQS queue as the trigger (event source) for process_survey,
# delivering up to 10 messages per invocation. If a mapping between this queue
# and function already exists, update it to the same settings instead.
mapping_settings = {'FunctionName': 'process_survey', 'Enabled': True, 'BatchSize': 10}

try:
    response = lambda_client.create_event_source_mapping(
        EventSourceArn=sqs_arn, **mapping_settings
    )
except lambda_client.exceptions.ResourceConflictException:
    existing_mappings = lambda_client.list_event_source_mappings(
        EventSourceArn=sqs_arn,
        FunctionName='process_survey',
    )['EventSourceMappings']
    es_id = existing_mappings[0]['UUID']
    response = lambda_client.update_event_source_mapping(UUID=es_id, **mapping_settings)

Test the full survey submission pipeline using the example JSON files under `q1-test-json`.

In [16]:
# Check whether the S3 bucket that process_survey writes to still exists.
bucket_name = 'jyassignment5'

try:
    s3_client.head_bucket(Bucket=bucket_name)
except s3_client.exceptions.ClientError as err:
    # HeadBucket responses carry no body, so a missing bucket surfaces as a
    # generic ClientError with code '404', never as NoSuchBucket.
    if err.response.get('Error', {}).get('Code') in ('404', 'NoSuchBucket'):
        print(f'Bucket {bucket_name} does not exist')
    else:
        raise  # e.g. 403: the bucket exists but we lack access to it
else:
    print(f'Bucket {bucket_name} exists')

Bucket jyassignment5 exists


In [17]:
# (Re)create the DynamoDB table that process_survey writes to, keyed on the
# string attribute user_id with 1/1 provisioned read/write capacity. If the
# table already exists, reuse it.
table_name = 'survey_results'

try:
    table = dynamodb.create_table(
        TableName=table_name,
        KeySchema=[{'AttributeName': 'user_id', 'KeyType': 'HASH'}],
        AttributeDefinitions=[{'AttributeName': 'user_id', 'AttributeType': 'S'}],
        ProvisionedThroughput={'ReadCapacityUnits': 1, 'WriteCapacityUnits': 1},
    )
except dynamo_client.exceptions.ResourceInUseException:
    table = dynamodb.Table(table_name)

# Block until DynamoDB reports the table as existing.
table.meta.client.get_waiter('table_exists').wait(TableName=table_name)

# A freshly created table should report no items. NOTE: DynamoDB refreshes
# ItemCount only periodically, so this value can lag recent writes.
print(table.item_count)
print(table.creation_date_time)

0
2023-04-28 19:26:20.006000-05:00


## Test the structure

In [18]:
# Send each example survey to the SQS queue. The pause after every send
# (previously run only once, after the loop) spaces the messages out so they
# are not handed to the Lambda together in one batch (BatchSize=10). The final
# pause also gives the last message time to be processed before the bucket
# is inspected below.
N_TEST_SURVEYS = 10
PAUSE_SECONDS = 10

for i in range(1, N_TEST_SURVEYS + 1):
    survey_path = f'q1-test-json/test{i}.json'
    print(survey_path)
    status_code = send_survey(survey_path, queue_url)
    print(f"Invocation {i} status code: {status_code}")
    # pausing execution for 10 seconds in between Lambda invocations
    time.sleep(PAUSE_SECONDS)

q1-test-json/test1.json
{'user_id': '0001', 'timestamp': '092821120000', 'time_elapsed': 5, 'q1': 5, 'q2': 3, 'q3': 2, 'q4': 2, 'q5': 4, 'freetext': 'I had a very bad day today...'}
Invocation 1 status code: 200
q1-test-json/test2.json
{'user_id': '0001', 'timestamp': '092821120001', 'time_elapsed': 2, 'q1': 5, 'q2': 3, 'q3': 2, 'q4': 2, 'q5': 4, 'freetext': 'I had a very bad day today...'}
Invocation 2 status code: 200
q1-test-json/test3.json
{'user_id': '0001', 'timestamp': '092821120100', 'time_elapsed': 5, 'q1': 5, 'q2': 3, 'q3': 2, 'q4': 2, 'q5': 4, 'freetext': ''}
Invocation 3 status code: 200
q1-test-json/test4.json
{'user_id': '0002', 'timestamp': '092821120000', 'time_elapsed': 5, 'q1': 4, 'q2': 1, 'q3': 1, 'q4': 1, 'q5': 3, 'freetext': "I'm having a great day!"}
Invocation 4 status code: 200
q1-test-json/test5.json
{'user_id': '0003', 'timestamp': '092821120000', 'time_elapsed': 1, 'q1': 1, 'q2': 1, 'q3': 1, 'q4': 1, 'q5': 1, 'freetext': ''}
Invocation 5 status code: 200
q1-t

In [19]:
# Gather the key of every object currently in the results bucket.
bucket_name = 'jyassignment5'
bucket_resource = s3_resource.Bucket(bucket_name)
obj_lst = list(map(lambda obj: obj.key, bucket_resource.objects.all()))
obj_lst

['0001092821120000.json',
 '0001092921120000.json',
 '0001093021120300.json',
 '0002092821120000.json',
 '0003092821120001.json',
 '0004092821120002.json',
 '0005092821122000.json']

In [24]:
# Compare the bucket listing with the keys the test surveys should produce.
# S3 returns keys in sorted order, so a plain list comparison suffices.
expected_keys = [
    '0001092821120000.json',
    '0001092921120000.json',
    '0001093021120300.json',
    '0002092821120000.json',
    '0003092821120001.json',
    '0004092821120002.json',
    '0005092821122000.json',
]
obj_lst == expected_keys

True

In [21]:
# Read every item in the DynamoDB table. A single Scan call returns at most
# 1 MB of data, so keep following LastEvaluatedKey until all pages are read.
response = table.scan()
items = response['Items']
while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    items.extend(response['Items'])

print(items)
print(len(items))

[{'q1': Decimal('3'), 'q2': Decimal('3'), 'user_id': '0005', 'q3': Decimal('3'), 'q4': Decimal('3'), 'q5': Decimal('3'), 'freetext': "I'm feeling okay, but not spectacular", 'num_completed': Decimal('1')}, {'q1': Decimal('1'), 'q2': Decimal('1'), 'user_id': '0001', 'q3': Decimal('2'), 'q4': Decimal('2'), 'q5': Decimal('2'), 'freetext': "I lost my car keys this afternoon at lunch, so I'm more stressed than normal", 'num_completed': Decimal('3')}, {'q1': Decimal('1'), 'q2': Decimal('1'), 'user_id': '0004', 'q3': Decimal('1'), 'q4': Decimal('1'), 'q5': Decimal('1'), 'freetext': 'I had a very bad day today...', 'num_completed': Decimal('1')}, {'q1': Decimal('1'), 'q2': Decimal('3'), 'user_id': '0003', 'q3': Decimal('3'), 'q4': Decimal('1'), 'q5': Decimal('4'), 'freetext': 'It was a beautiful, sunny day today.', 'num_completed': Decimal('1')}, {'q1': Decimal('4'), 'q2': Decimal('1'), 'user_id': '0002', 'q3': Decimal('1'), 'q4': Decimal('1'), 'q5': Decimal('3'), 'freetext': "I'm having a gre

In [22]:
# Empty the results bucket by batch-deleting every object in it.
objects_in_bucket = bucket_resource.objects.all()
objects_in_bucket.delete()
print('All objects deleted from the S3 bucket.')

All objects deleted from the S3 bucket.


In [23]:
# Delete the DynamoDB table. DeleteTable only starts the deletion, so wait
# until the table is gone: the message below is then accurate, and a re-run of
# the notebook can recreate the table immediately.
table.delete()
table.meta.client.get_waiter('table_not_exists').wait(TableName=table_name)
print(f"Table '{table_name}' has been deleted.")

Table 'survey_results' has been deleted.
