# Assignment 6

## Question 1 (a)

### Write a Python function `send_survey` (which you can assume will be installed with the mobile app and will automatically be invoked after a survey is saved as a JSON file on the device):

In [1]:
import boto3
import json
from botocore.exceptions import BotoCoreError, ClientError

def send_survey(survey_path, sqs_url):
    '''
    Input: survey_path (str): path to JSON survey data
        (e.g. './survey.json')
        sqs_url (str): URL for SQS queue
    Output: StatusCode (int): indicating whether the survey
            was successfully sent into the SQS queue (200) or not (400)
    '''
    # Read survey data from file (the context manager closes the file)
    with open(survey_path, "r") as f:
        data = json.load(f)

    sqs = boto3.client('sqs')
    try:
        response = sqs.send_message(QueueUrl=sqs_url,
                                    MessageBody=json.dumps(data))
    except (BotoCoreError, ClientError):
        # boto3 raises on failure rather than returning an error status
        return 400

    return response['ResponseMetadata']['HTTPStatusCode']

## Question 1 (b)

### Create an SQS queue and configure it to act as a trigger for your Lambda function from Assignment 5 (which will process your data and write it to storage).

In [2]:
###########################################################################
##############          (A) LAUNCH CLOUD ARCHITECTURE         #############
###########################################################################

#---------------------(1) CREATE LAMBDA FUNCTION--------------------------

# Access IAM role to allow Lambda to interact with other AWS resources
aws_lambda = boto3.client('lambda')
iam_client = boto3.client('iam')
role = iam_client.get_role(RoleName='LabRole')

# Open zipped directory that contains lambda function
with open('a6.zip', 'rb') as f:
    lambda_zip = f.read() 

try:
    # If function hasn't yet been created, create it
    response = aws_lambda.create_function(
        FunctionName='a6',
        Runtime='python3.9',
        Role=role['Role']['Arn'], 
        Handler='lambda_function.lambda_handler', 
        Code=dict(ZipFile=lambda_zip),
        Timeout=60
    )
except aws_lambda.exceptions.ResourceConflictException:
    # If function already exists, update it based on zip file contents
    response = aws_lambda.update_function_code(
        FunctionName='a6',
        ZipFile=lambda_zip
        )

lambda_arn = response['FunctionArn']
print("Lambda function created. Lambda ARN: ", lambda_arn)

#---------------------(2) CREATE SQS QUEUE-------------------------------

# Initialize the SQS client
sqs = boto3.client('sqs')

# Create SQS Queue which will trigger lambda function
try:
    queue_url = sqs.create_queue(QueueName='a6',
                                 Attributes={'VisibilityTimeout': '70'})['QueueUrl'] 
except sqs.exceptions.QueueNameExists:
    # Queue already exists with different attributes: look it up by exact name
    queue_url = sqs.get_queue_url(QueueName='a6')['QueueUrl']
    
sqs_info = sqs.get_queue_attributes(QueueUrl=queue_url,
                                    AttributeNames=['QueueArn'])
sqs_arn = sqs_info['Attributes']['QueueArn']

print("SQS queue created. SQS ARN: ", sqs_arn)


#---------------------(3) TRIGGER LAMBDA THROUGH SQS QUEUE----------------------------

# Trigger Lambda Function when new messages enter SQS Queue
try:
    response = aws_lambda.create_event_source_mapping( 
        EventSourceArn=sqs_arn, 
        FunctionName='a6',
        Enabled=True,
        BatchSize=10
    )
except aws_lambda.exceptions.ResourceConflictException: 
    es_id = aws_lambda.list_event_source_mappings(
        EventSourceArn=sqs_arn,
        FunctionName='a6'
    )['EventSourceMappings'][0]['UUID']
    
    response = aws_lambda.update_event_source_mapping(
        UUID=es_id,
        FunctionName='a6',
        Enabled=True,
        BatchSize=10
    )

#---------------------(4) CREATE S3 BUCKET-------------------------------

# Initialize s3 client and resource
s3 = boto3.client('s3')
s3_resource = boto3.resource('s3')

# Create S3 bucket
s3.create_bucket(Bucket='mariagabrielaa-a6')

# Get the list_buckets response
response = s3.list_buckets()

# Print each bucket name (to check bucket was created)
for bucket in response['Buckets']:
    print("S3 bucket created. Bucket name: ", bucket['Name'])


#---------------------(5) CREATE DYNAMO DB TABLE-------------------------

dynamodb = boto3.resource('dynamodb')

table = dynamodb.create_table(
    TableName='a6_results',
    KeySchema=[
        {
            'AttributeName': 'user_id',
            'KeyType': 'HASH'
        }
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'user_id',
            'AttributeType': 'S'
        }
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 1,
        'WriteCapacityUnits': 1
    }
)

# Wait until AWS confirms that table exists before moving on
table.meta.client.get_waiter('table_exists').wait(TableName='a6_results')

# get data about table (should currently be no items in table)
print("Number of current tables: ", table.item_count)
print("Table creation time: ", table.creation_date_time)

print()
print("SQS -> Lambda -> S3 -> Dynamo DB Architecture has been launched")


###########################################################################
##############         (B) TEAR DOWN CLOUD ARCHITECTURE       #############
###########################################################################

#----------------(1) DELETE LAMBDA AND EVENT SOURCE MAPPINGS---------------

def delete_lambda(lambda_name):
    try:
        aws_lambda.delete_function(FunctionName=lambda_name)
        print("Lambda Function Deleted")
    except aws_lambda.exceptions.ResourceNotFoundException:
        print("AWS Lambda Function Already Deleted")

    # Use the function name passed in rather than a hard-coded value
    event_source_mappings = aws_lambda.list_event_source_mappings(
        FunctionName=lambda_name)['EventSourceMappings']
    
    for mapping in event_source_mappings:
        aws_lambda.delete_event_source_mapping(UUID=mapping['UUID'])
        print(f"Event source mapping for '{lambda_name}' function deleted")


#----------------------(2) DELETE SQS QUEUE-------------------------------

def delete_queue(queue_url):
    try:
        sqs.delete_queue(QueueUrl=queue_url)
        print("SQS Queue Deleted")
    except sqs.exceptions.QueueDoesNotExist:
        print("SQS Queue Already Deleted")

#----------------------(3) DELETE S3 OBJECTS------------------------------

def delete_s3_objects(bucket_name):

    bucket = s3_resource.Bucket(bucket_name)

    for item in bucket.objects.all():
        item.delete()
    print("S3 bucket objects deleted")

    s3_resource.Bucket(bucket_name).delete()
    print("S3 bucket deleted")


#---------------------(4) DELETE DYNAMO DB TABLE--------------------------

def delete_table(table_object):
    table_object.delete()
    print("Dynamo DB table deleted")

Lambda function created. Lambda ARN:  arn:aws:lambda:us-east-1:589049386593:function:a6
SQS queue created. SQS ARN:  arn:aws:sqs:us-east-1:589049386593:a6
S3 bucket created. Bucket name:  mariagabrielaa-a6
Number of current tables:  0
Table creation time:  2023-05-01 01:31:15.001000-05:00

SQS -> Lambda -> S3 -> Dynamo DB Architecture has been launched



### Test SQS queue with launched cloud architecture by sending test survey submissions

In [3]:
import os
import time

PATH = '/Users/mariagabrielaayala/Desktop/Spring 2023/Big Data and High Performance Computing/a6-magabrielaa/data/'

# Build the full path to every survey file in the data directory
lst_survey_paths = [os.path.join(PATH, f) for f in os.listdir(PATH)]

# Send survey responses to SQS queue with 10 second delay in between
for survey_path in lst_survey_paths:
    time.sleep(10)
    survey_response = send_survey(survey_path, queue_url)
    print(survey_response)

200
200
200
200
200
200
200
200
200
200


#### Print a list of objects in S3 bucket

In [4]:
# Print list of objects in S3 bucket
response = s3.list_objects(Bucket="mariagabrielaa-a6")
lst_objects = []
for obj in response['Contents']:
    lst_objects.append(obj['Key'])

print(lst_objects)

['0001092821120000.json', '0001092921120000.json', '0001093021120300.json', '0002092821120000.json', '0003092821120001.json', '0004092821120002.json', '0005092821122000.json']


#### Query records in Dynamo DB table

In [5]:
# Query Dynamo DB table
lst_users = ['0001', '0002', '0003', '0004', '0005']

for user in lst_users:
    response = table.get_item(Key={'user_id': user})
    print(response['Item'])

{'q1': Decimal('1'), 'q2': Decimal('1'), 'user_id': '0001', 'q3': Decimal('2'), 'q4': Decimal('2'), 'q5': Decimal('2'), 'num_submission': Decimal('3'), 'freetext': "I lost my car keys this afternoon at lunch, so I'm more stressed than normal"}
{'q1': Decimal('4'), 'q2': Decimal('1'), 'user_id': '0002', 'q3': Decimal('1'), 'q4': Decimal('1'), 'q5': Decimal('3'), 'num_submission': Decimal('1'), 'freetext': "I'm having a great day!"}
{'q1': Decimal('1'), 'q2': Decimal('3'), 'user_id': '0003', 'q3': Decimal('3'), 'q4': Decimal('1'), 'q5': Decimal('4'), 'num_submission': Decimal('1'), 'freetext': 'It was a beautiful, sunny day today.'}
{'q1': Decimal('1'), 'q2': Decimal('1'), 'user_id': '0004', 'q3': Decimal('1'), 'q4': Decimal('1'), 'q5': Decimal('1'), 'num_submission': Decimal('1'), 'freetext': 'I had a very bad day today...'}
{'q1': Decimal('3'), 'q2': Decimal('3'), 'user_id': '0005', 'q3': Decimal('3'), 'q4': Decimal('3'), 'q5': Decimal('3'), 'num_submission': Decimal('1'), 'freetext': 

## Question 1 (c)

#### Your PI, who is overseeing this project, is worried that if all of the participants in the study (potentially thousands) submit surveys at the same time in the day, this might cause the system to crash and your lab might lose data (this happened to your PI when they ran a similar digital survey via on-premise servers in the early 2000s). How would you reassure your PI that your architecture is scalable and will be able to handle such spikes in demand? Your response should be at least 200 words and discuss the scalability of each of the cloud services you used in your pipeline in detail.



The cloud architecture that sustains this pipeline is designed for scalability.

First, when a user submits a survey, that submission is posted to an AWS **SQS queue**, which is durable and can handle a nearly unlimited number of concurrent messages. SQS is designed as a distributed service that scales as demand increases. AWS has many virtual machines distributed across availability zones and regions, so when there is a spike in demand, messages are automatically spread across servers. This allows SQS to accept a large volume of messages and have them processed in parallel.

In our pipeline, we use a standard queue instead of a “First In First Out” (FIFO) queue because the surveys do not need to be processed in order or one at a time. This improves scalability because messages are processed in **parallel**, reducing processing times. Speed matters during spikes in demand, since slow processing is where a bottleneck would appear. Standard queues also support **“at least once”** message delivery: a message that is not processed successfully becomes visible in the queue again after the visibility timeout and is re-processed by Lambda. This helps prevent any loss of data and avoids the PI’s previous negative experience. Lambda deletes a message only after it has been successfully processed.
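The retry behaviour described above can be illustrated with a small in-memory model. This is only a sketch: `ToyQueue` is a hypothetical stand-in written for this example, not the SQS API, and it reuses the 70-second visibility timeout set on the queue earlier in the notebook.

```python
class ToyQueue:
    """In-memory stand-in for a standard queue (hypothetical, not the SQS API)."""

    def __init__(self, visibility_timeout):
        self.visibility_timeout = visibility_timeout
        self.messages = {}         # message id -> body
        self.invisible_until = {}  # message id -> time it becomes visible again

    def send(self, msg_id, body):
        self.messages[msg_id] = body
        self.invisible_until[msg_id] = 0

    def receive(self, now):
        """Return one visible message and hide it for the timeout period."""
        for msg_id, body in self.messages.items():
            if self.invisible_until[msg_id] <= now:
                self.invisible_until[msg_id] = now + self.visibility_timeout
                return msg_id, body
        return None

    def delete(self, msg_id):
        """Remove a message; called only after successful processing."""
        del self.messages[msg_id]
        del self.invisible_until[msg_id]


queue = ToyQueue(visibility_timeout=70)
queue.send("m1", '{"user_id": "0001"}')

first = queue.receive(now=0)     # a consumer picks up the message, then fails
hidden = queue.receive(now=30)   # still inside the timeout: nothing is visible
retry = queue.receive(now=70)    # timeout elapsed: the message is delivered again
queue.delete(retry[0])           # the second attempt succeeds and deletes it
leftover = queue.receive(now=200)
```

The message is never lost: it is either hidden while being processed, visible for a retry, or deleted after success.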

Second, the SQS queue triggers a **Lambda function**, which is **event-driven**, meaning that it is only invoked when there are one or more messages to process. This helps with both cost and scalability, because no resources sit idle between submissions. Lambda uses an **event source mapping** to read messages from the SQS queue: it **batches** the messages and invokes the mapped Lambda function with a **concurrency** of up to 1,000 invocations for a single function. Since one invocation can process multiple messages (up to 10 per batch in our configuration), this is a very high ceiling. Lambda scales out or in automatically on our behalf, depending on demand.
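To make the batching concrete, here is a minimal sketch of how a handler might walk through one SQS batch. It is not the Assignment 5 function, which is not shown in this notebook: `process_survey` is a hypothetical placeholder, and returning `batchItemFailures` only has an effect if the event source mapping is created with `FunctionResponseTypes=['ReportBatchItemFailures']`, which the mapping above does not set.

```python
import json


def process_survey(survey):
    """Hypothetical placeholder for the real per-survey work."""
    if "user_id" not in survey:
        raise ValueError("survey is missing user_id")
    return survey["user_id"]


def lambda_handler(event, context):
    """Process every SQS message in the batch and report the ones that failed."""
    failures = []
    for record in event["Records"]:
        try:
            process_survey(json.loads(record["body"]))
        except (ValueError, KeyError):
            # Only this message returns to the queue; the rest are deleted
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}


# Simulated batch with one valid and one malformed message
event = {"Records": [
    {"messageId": "a", "body": json.dumps({"user_id": "0001", "q1": 1})},
    {"messageId": "b", "body": "not json"},
]}
result = lambda_handler(event, None)
```

Reporting failures per message means one bad survey does not force the whole batch to be retried.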

For standard queues like ours, the event source mapping polls the queue and begins by processing 5 concurrent batches with 5 function instances at a time. When there is a spike in demand, Lambda scales out, adding “up to 60 functions per minute up to 1,000 functions” to process those messages. If the spike is too large, one potential issue is that Lambda may reach the function's **reserved concurrency** limit (or the account-level concurrency limit). In that case, the message is returned to the queue for re-processing and, depending on our queue settings, is retried a certain number of times before going to a Dead Letter Queue. Either way, there are several options for handling messages in this rare situation without losing any survey submissions.
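As a back-of-the-envelope check, the ramp-up figures quoted above (start at 5, add up to 60 per minute, cap at 1,000) can be turned into a simple linear model. This is an illustrative assumption only: the function names are hypothetical, and real scaling is controlled by AWS and may not follow this exact curve.

```python
def modeled_concurrency(minutes, start=5, per_minute=60, ceiling=1000):
    """Upper bound on concurrent functions after a number of minutes."""
    return min(ceiling, start + per_minute * minutes)


def minutes_to_reach(target, start=5, per_minute=60, ceiling=1000):
    """Whole minutes until the model reaches `target`, or None if it never does."""
    if target > ceiling:
        return None
    minutes = 0
    while modeled_concurrency(minutes, start, per_minute, ceiling) < target:
        minutes += 1
    return minutes


after_one_minute = modeled_concurrency(1)
minutes_to_500 = minutes_to_reach(500)
minutes_to_max = minutes_to_reach(1000)
```

Under this model the pipeline reaches its full concurrency within a few minutes of a spike starting, while the queue holds the backlog in the meantime.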

Third, we use an **S3 data lake** to store **raw** survey data. S3 is also **distributed**: objects are stored redundantly across multiple availability zones within a region, which makes the service scalable and fault-tolerant. Another benefit is that it can store a virtually **unlimited** amount of data, so it will have no trouble with a spike in demand, particularly since the survey entries are small. S3 can write to and read from storage in **parallel**, which makes it very fast and scalable. Essentially, we can have multiple invocations of Lambda writing data into S3 at the same time with no issues. In our case we only write data into S3, but if we were to read it back for additional processing, S3 partitions data by object key, which lets it locate objects quickly and serve parallel reads as needed.
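A minimal sketch of the write step follows, assuming each submission gets its own object key so that concurrent Lambda invocations never overwrite each other. The `timestamp` field name and the key layout are assumptions based on the keys listed in the bucket earlier, and `FakeS3` is a hypothetical test double; with boto3, `s3_client` would be `boto3.client('s3')`.

```python
import json


def store_raw_survey(s3_client, bucket, survey):
    """Write one raw survey to S3 under a key unique to that submission."""
    # Assumed key layout: user id followed by the submission timestamp
    key = f"{survey['user_id']}{survey['timestamp']}.json"
    s3_client.put_object(Bucket=bucket, Key=key, Body=json.dumps(survey))
    return key


class FakeS3:
    """Hypothetical stand-in that records put_object calls in a dict."""

    def __init__(self):
        self.objects = {}

    def put_object(self, Bucket, Key, Body):
        self.objects[(Bucket, Key)] = Body


fake_s3 = FakeS3()
keys = [
    store_raw_survey(fake_s3, "example-bucket",
                     {"user_id": "0001", "timestamp": "092821120000", "q1": 1}),
    store_raw_survey(fake_s3, "example-bucket",
                     {"user_id": "0001", "timestamp": "092921120000", "q1": 2}),
]
```

Two submissions from the same user land in two separate objects, so the raw history is preserved.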

Fourth, the pipeline ends by writing or updating one item per user ID in a **Dynamo DB** table that keeps track of the number of survey submissions for each user. As a **NoSQL** solution, Dynamo’s benefit is its **high throughput**, which allows for many small concurrent reads and writes. That is exactly what this study requires, since we make many small updates to the table based on real-time survey submissions. Dynamo DB also has essentially **infinite storage** and is highly flexible, which is why it scales out well. In our case, we use Dynamo DB only to keep track of the number of submissions and the latest submission per user, so we only need simple key-based reads and writes, which Dynamo handles well.
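One way to keep the per-user counter correct when many Lambda invocations update the same user at once is an atomic `ADD` update, sketched below. This is not necessarily how the Assignment 5 function updates the table: `record_submission` and `FakeTable` are hypothetical names, and the double mimics only the increment. With boto3, `table` would be the `dynamodb.Table` resource created earlier.

```python
def record_submission(table, user_id):
    """Atomically increment the submission counter for one user."""
    return table.update_item(
        Key={"user_id": user_id},
        # ADD creates the attribute if it is missing, then increments it
        UpdateExpression="ADD num_submission :one",
        ExpressionAttributeValues={":one": 1},
        ReturnValues="UPDATED_NEW",
    )


class FakeTable:
    """Hypothetical test double that mimics only the ADD behaviour used above."""

    def __init__(self):
        self.items = {}

    def update_item(self, Key, UpdateExpression, ExpressionAttributeValues,
                    ReturnValues):
        item = self.items.setdefault(Key["user_id"], {"num_submission": 0})
        item["num_submission"] += ExpressionAttributeValues[":one"]
        return {"Attributes": {"num_submission": item["num_submission"]}}


fake_table = FakeTable()
for _ in range(3):
    response = record_submission(fake_table, "0001")
```

Because the increment happens inside Dynamo DB, there is no read-then-write step in Lambda where two concurrent invocations could overwrite each other's count.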


Reference: https://aws.amazon.com/blogs/compute/introducing-maximum-concurrency-of-aws-lambda-functions-when-using-amazon-sqs-as-an-event-source/


### Tear down cloud architecture

In [6]:
delete_lambda('a6')
delete_queue(queue_url)
delete_s3_objects('mariagabrielaa-a6')
delete_table(table)

Lambda Function Deleted
Event source mapping for 'a6' function deleted
SQS Queue Deleted
S3 bucket objects deleted
S3 bucket deleted
Dynamo DB table deleted
