## Configure AWS DynamoDB Stream Triggers with AWS Lambda

-----
Many applications benefit from capturing changes to items stored in a DynamoDB table at the moment those changes occur. Some example use cases:

- A popular mobile app modifies data in a DynamoDB table at a rate of thousands of updates per second. Another application captures and stores data about these updates, providing near-real-time usage metrics for the mobile app.

- An application automatically sends notifications to the mobile devices of all friends in a group as soon as one friend uploads a new picture.


A DynamoDB stream is an ordered flow of information about changes to items in an Amazon DynamoDB table. When you enable a stream on a table, DynamoDB captures information about every modification to data items in the table. DynamoDB Streams are designed to let external applications monitor table updates and react in near real time.

Stream records are read through a dedicated DynamoDB Streams API endpoint, separate from the table's own endpoint. Whenever an application creates, updates, or deletes an item in the table, DynamoDB Streams writes a stream record with the primary key attribute(s) of the modified item. Each stream record describes a data modification to a single item. You can configure the stream so that its records capture additional information, such as the "before" and "after" images of modified items.


For each table, you choose what every stream record contains by setting the stream view type (`StreamViewType`) to one of the following options:

- Keys only (`KEYS_ONLY`): the record contains only the key attributes of the modified item.

- New image (`NEW_IMAGE`): the record contains the entire item as it appears after it was modified.

- Old image (`OLD_IMAGE`): the record contains the entire item as it appeared before it was modified.

- New and old images (`NEW_AND_OLD_IMAGES`): the record contains both the new and the old images of the item.
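To make the four options concrete, here is a small offline sketch that mimics which parts of an item end up in a stream record for each view type. The helper `build_stream_record` is hypothetical: it is not part of boto3 or DynamoDB, it uses plain Python values instead of DynamoDB's typed JSON, and it models only the `Keys`, `OldImage`, and `NewImage` fields (real records also carry fields such as `SequenceNumber` and `SizeBytes`). Only the view type names match the real API.

```python
from typing import Optional

# The four StreamViewType values accepted by the DynamoDB API.
VIEW_TYPES = {"KEYS_ONLY", "NEW_IMAGE", "OLD_IMAGE", "NEW_AND_OLD_IMAGES"}


def build_stream_record(view_type: str, key_names: list,
                        old_item: Optional[dict], new_item: Optional[dict]) -> dict:
    """Hypothetical helper: approximate the data portion of a stream record.

    old_item is None for an insert; new_item is None for a delete.
    """
    if view_type not in VIEW_TYPES:
        raise ValueError(f"unknown StreamViewType: {view_type}")
    # Keys come from whichever image exists (both share the same key values).
    source = new_item if new_item is not None else old_item
    record = {
        "Keys": {k: source[k] for k in key_names},
        "StreamViewType": view_type,
    }
    if view_type in ("NEW_IMAGE", "NEW_AND_OLD_IMAGES") and new_item is not None:
        record["NewImage"] = dict(new_item)
    if view_type in ("OLD_IMAGE", "NEW_AND_OLD_IMAGES") and old_item is not None:
        record["OldImage"] = dict(old_item)
    return record


# An update that changes the credits of one course.
old = {"courseId": 8635, "credits": 3}
new = {"courseId": 8635, "credits": 4}
for vt in sorted(VIEW_TYPES):
    print(vt, sorted(build_stream_record(vt, ["courseId"], old, new)))
```

The trade-off is record size versus information: `KEYS_ONLY` keeps records small, while `NEW_AND_OLD_IMAGES` lets a consumer compare before and after without reading the table again.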


Stream records appear in near real time. Each stream record appears exactly once in the stream, and for any given item, the records appear in the same order as the actual modifications to that item. This lets external applications take arbitrary actions, such as synchronizing tables across regions, sending mobile notifications based on new content, or computing real-time usage metrics.

In this lab, we will see how to read DynamoDB Streams in a serverless fashion with AWS Lambda. 

<img src="../images/dynabodb_trigger_flow.PNG">

Image source: AWS website.

In [135]:
import boto3
import botocore
import os
import zipfile
import json
import getpass

# Use the system username as a prefix so that each user gets unique resource names
system_user_name = getpass.getuser()

# Set the DynamoDB table name
table_name = system_user_name + "dsa_courses"

# Set the Lambda function name
lambda_name = system_user_name + "lambda"

# Low-level DynamoDB client, high-level DynamoDB resource, and IAM and Lambda clients.
# They use your default AWS region; this notebook assumes US East (N. Virginia), us-east-1.
client = boto3.client('dynamodb')
dynamodb = boto3.resource('dynamodb')
iam = boto3.client('iam')
lamb = boto3.client('lambda')

----

We will implement a simple trigger that keeps courseId and isMandatory synchronized: every time a new record is created, we add the computed field isMandatory, and every time a record is updated, we keep the two fields in sync.

The trigger has to handle the following scenarios:

1. A new record is created: initialize isMandatory with the correct value.

2. A record is modified, but courseId hasn't changed: no operation.

3. A record is modified and courseId has changed, but the computed isMandatory value is still the same: no operation.

4. A record is modified, courseId has changed, and isMandatory needs to be updated: modify the record.

Note that in cases 1 and 4 the trigger itself writes to the table, which produces a new MODIFY stream record and invokes the function again. In that second invocation courseId is unchanged (case 2), so the function does nothing and the trigger does not loop forever.
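The four scenarios can be sketched as a pure decision function that a handler could call for each stream record. This is a minimal sketch under assumptions: the notebook never says how isMandatory is derived from courseId, so `MANDATORY_COURSE_IDS` and `expected_is_mandatory` are hypothetical, and the images are assumed to use DynamoDB's typed JSON (for example `{"courseId": {"N": "8635"}}`). One caveat: in the table created later in this notebook, courseId is the partition key, and UpdateItem cannot change key attributes, so cases 3 and 4 only arise when the watched field is a non-key attribute, as with Email in the commented-out troubleshooting cell at the end.

```python
# Hypothetical rule: which course IDs are mandatory (an assumption for illustration).
MANDATORY_COURSE_IDS = {8610, 8635}


def expected_is_mandatory(course_id: int) -> str:
    """Hypothetical computed field: 'yes' if the course is mandatory, else 'no'."""
    return "yes" if course_id in MANDATORY_COURSE_IDS else "no"


def decide_update(event_name, old_image, new_image):
    """Return the isMandatory value to write, or None for 'no operation'."""
    if event_name not in ("INSERT", "MODIFY"):
        return None  # REMOVE records need no synchronization
    new_id = int(new_image["courseId"]["N"])
    expected = expected_is_mandatory(new_id)
    if event_name == "INSERT":
        return expected  # Case 1: initialize isMandatory
    old_id = int(old_image["courseId"]["N"])
    if old_id == new_id:
        # Case 2: courseId unchanged. This also matches the MODIFY record caused
        # by the trigger's own write, which is what prevents an endless loop.
        return None
    current = new_image.get("isMandatory", {}).get("S")
    if current == expected:
        return None  # Case 3: value is already correct
    return expected  # Case 4: write the corrected value
```

Inside a handler you would call `decide_update(record['eventName'], record['dynamodb'].get('OldImage'), record['dynamodb'].get('NewImage'))` and, only when it returns a value, write it back with the boto3 `Table.update_item` method.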

# Using AWS Lambda with Amazon DynamoDB

**Stream-based model**: AWS Lambda polls the stream (at a base rate of four times per second) and, when it detects new records, invokes your Lambda function, passing the batch of new stream records as the event.

In a stream-based model, you maintain the event source mapping in AWS Lambda. The event source mapping describes which stream maps to which Lambda function, and AWS Lambda provides an API (CreateEventSourceMapping) for creating it. In the walkthrough document Lambda_First_Tutorial.pdf, you created event source mappings with the AWS Lambda console; here we do it through the API with boto3.

----

* First, we create a Lambda function and test it by invoking it manually using sample event data.


* Second, we create a stream-enabled DynamoDB table and add an event source mapping in AWS Lambda to associate the stream with the Lambda function, at which point AWS Lambda starts polling the stream. Finally, we test the end-to-end setup: as you create, update, and delete items in the table, DynamoDB writes records to the stream, and AWS Lambda detects them as it polls the stream and executes your Lambda function on your behalf.



In [136]:
# Write the Lambda handler source code (a string) to a file named after lambda_name,
# i.e. system_user_name + "lambda", for example skaf48lambda.py.
# AWS Lambda calls lambda_handler each time the function is invoked.
with open(lambda_name+".py", "w") as myfile:
    myfile.write('''\
from __future__ import print_function

def lambda_handler(event, context):
    for record in event['Records']:
        print(record)
    print('Successfully processed %s records.' % str(len(event['Records'])))
                 ''')
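Before packaging and uploading, it can help to run the same handler logic locally. The sketch below copies the handler written above into the notebook and calls it with a minimal, hand-made event containing only the fields the handler touches; `sample_event` is an illustrative stand-in, not a complete DynamoDB stream event, and passing `None` as the context works only because this handler never uses it.

```python
import io
from contextlib import redirect_stdout


def lambda_handler(event, context):
    # Same logic as the handler written to the deployment file above.
    for record in event["Records"]:
        print(record)
    print("Successfully processed %s records." % str(len(event["Records"])))


# Minimal stand-in for a stream event: only the fields the handler reads.
sample_event = {"Records": [{"eventName": "INSERT"}, {"eventName": "REMOVE"}]}

# Capture what the handler prints (in AWS, this output goes to CloudWatch Logs).
captured = io.StringIO()
with redirect_stdout(captured):
    lambda_handler(sample_event, None)
print(captured.getvalue())
```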

In [137]:
import zipfile

# Package the handler file into a zip archive with the same base name as lambda_name.
# This archive is the deployment package uploaded to AWS Lambda.
with zipfile.ZipFile(lambda_name+".zip", "w") as zf:
    zf.write(lambda_name+".py")
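As an alternative to writing files to disk, the deployment package can be built in memory. This is a sketch, not the notebook's method: `build_deployment_package` and the module name `examplelambda` are hypothetical. The archive member name must match the module part of the handler string (`<module>.lambda_handler`), and we set read permissions explicitly as a precaution so that the Lambda runtime user can read the file.

```python
import io
import zipfile


def build_deployment_package(module_name: str, source: str) -> bytes:
    """Return a zip archive (as bytes) containing <module_name>.py with the given source."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as zf:
        info = zipfile.ZipInfo(module_name + ".py")
        info.compress_type = zipfile.ZIP_DEFLATED
        info.external_attr = 0o644 << 16  # rw-r--r-- file permissions
        zf.writestr(info, source)
    # Closing the ZipFile writes the central directory; the BytesIO stays open.
    return buffer.getvalue()


handler_source = (
    "def lambda_handler(event, context):\n"
    "    for record in event['Records']:\n"
    "        print(record)\n"
)
package = build_deployment_package("examplelambda", handler_source)
```

The resulting bytes can be passed as `Code={'ZipFile': package}` to `lamb.create_function` or as `ZipFile=package` to `lamb.update_function_code`.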

### Create an IAM role

The Lambda function needs an execution role with:

- an AWS service role of type AWS Lambda: this trust relationship grants AWS Lambda permission to assume the role;
- AWSLambdaDynamoDBExecutionRole: the managed access permissions policy that you attach to the role.


If you want to create an IAM role using the AWS web console, follow these steps:

* Sign in to the AWS Management Console and open the IAM console at https://console.aws.amazon.com/iam/.


* Follow the steps in Creating a Role to Delegate Permissions to an AWS Service in the IAM User Guide to create an IAM role (execution role). As you follow the steps to create a role, note the following:
    
    - In Role Name, use a name that is unique within your AWS account (for example, lambda-dynamodb-execution-role).

    - In Select Role Type, choose AWS Service Roles, and then choose AWS Lambda. This grants the AWS Lambda service permissions to assume the role.

    - In Attach Policy, choose AWSLambdaDynamoDBExecutionRole. The permissions in this policy are sufficient for the Lambda function in this tutorial.
    
The function below does the same thing programmatically.

In [138]:
# Create (or reuse) an IAM execution role that AWS Lambda can assume

def create_role(name, policies=None):
    """ Create a role that AWS Lambda can assume, attaching optional managed policies """
    # Trust policy: allows the AWS Lambda service to assume this role
    policydoc = {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Principal": {"Service": ["lambda.amazonaws.com"]}, "Action": ["sts:AssumeRole"]},
        ]
    }
    try:
        # get_role avoids list_roles, which returns only one page of roles
        role = iam.get_role(RoleName=name)['Role']
        print('IAM role %s exists' % (name))
    except iam.exceptions.NoSuchEntityException:
        print('Creating IAM role %s' % (name))
        # A newly created role can take a few seconds before Lambda is able to use it
        role = iam.create_role(RoleName=name, AssumeRolePolicyDocument=json.dumps(policydoc))['Role']

    # Attach the managed policies
    if policies is not None:
        for p in policies:
            iam.attach_role_policy(RoleName=role['RoleName'], PolicyArn=p)
    return role

In [139]:
# Create the execution role and attach the AWSLambdaDynamoDBExecutionRole managed policy
role = create_role(system_user_name + '_lambda-dynamodb-execution-role', 
                   policies=['arn:aws:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole'])

IAM role skaf48_lambda-dynamodb-execution-role exists
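The AWSLambdaDynamoDBExecutionRole managed policy covers reading the DynamoDB stream and writing CloudWatch logs; as far as we know it does not allow writing items to a table. A trigger that writes isMandatory back, like the synchronization described earlier, would additionally need `dynamodb:UpdateItem` on the table. Below is a sketch of such an inline policy; the helper name, policy name, and example ARN are hypothetical.

```python
import json


def update_item_policy(table_arn: str) -> dict:
    """Inline IAM policy allowing UpdateItem on a single table (hypothetical helper)."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["dynamodb:UpdateItem"],
                "Resource": table_arn,
            }
        ],
    }


# Hypothetical ARN; in the notebook you could read the real one from
# client.describe_table(TableName=table_name)["Table"]["TableArn"].
example_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/exampledsa_courses"
policy_json = json.dumps(update_item_policy(example_arn), indent=2)
print(policy_json)

# Attaching it requires AWS credentials, for example:
# iam.put_role_policy(RoleName=role["RoleName"],
#                     PolicyName="allow-update-courses",
#                     PolicyDocument=policy_json)
```

Scoping `Resource` to a single table ARN keeps the role from modifying any other table in the account.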


### Create the Lambda Function and Test It Manually

Create a Lambda function by uploading the deployment package, which is the zip file we created above. Then test the Lambda function by invoking it manually: instead of creating an event source, we pass in sample DynamoDB event data, a JSON document containing a list of stream records.

----

The function below checks whether a Lambda function with the specified name already exists. If it does, it updates the existing function's code (when update=True); otherwise, it creates a new Lambda function.

In [140]:
def create_function(name, zfile, lsize=512, timeout=120, update=False):
    """ Create, or update if exists, lambda function """
    print("role:",role)

    # Read the deployment package (named zf to avoid shadowing the zipfile module)
    with open(zfile, 'rb') as zf:
        code = zf.read()

    try:
        # get_function avoids list_functions, which returns only one page of functions
        lfunc = lamb.get_function(FunctionName=name)['Configuration']
    except lamb.exceptions.ResourceNotFoundException:
        lfunc = None

    if lfunc is not None:
        if update:
            print('Updating %s lambda function code' % (name))
            return lamb.update_function_code(FunctionName=name, ZipFile=code)
        print('Lambda function %s exists' % (name))
    else:
        print('Creating %s lambda function' % (name))
        lfunc = lamb.create_function(
            FunctionName=name,
            # python3.6 is deprecated and can no longer be used for new functions
            Runtime='python3.12',
            Role=role['Arn'],
            # <module>.<function>: the module is the .py file inside the zip
            Handler=lambda_name+'.lambda_handler',
            Description='Example lambda function to monitor DynamoDB streams',
            Timeout=timeout,
            MemorySize=lsize,
            Publish=True,
            Code={'ZipFile': code},
        )
    lfunc['Role'] = role
    return lfunc

In [141]:
# Call create_function() to create the lambda. The parameter update=True will ensure the existing lambda is updated with the 
# supplied code in the zip file.

lfunc = create_function(lambda_name, lambda_name+".zip", update=True)

role: {'RoleId': 'AROAISBVGSBM5RBT6VL2S', 'Path': '/', 'CreateDate': datetime.datetime(2017, 11, 8, 19, 42, 25, tzinfo=tzutc()), 'RoleName': 'skaf48_lambda-dynamodb-execution-role', 'AssumeRolePolicyDocument': {'Statement': [{'Action': 'sts:AssumeRole', 'Principal': {'Service': 'lambda.amazonaws.com'}, 'Effect': 'Allow'}], 'Version': '2012-10-17'}, 'Arn': 'arn:aws:iam::714861692883:role/skaf48_lambda-dynamodb-execution-role'}
Creating skaf48lambda lambda function


A sample DynamoDB stream event with three records (INSERT, MODIFY, and REMOVE) for testing the Lambda function manually:

In [143]:
input_data = b"""{
   "Records":[
      {
         "eventID":"1",
         "eventName":"INSERT",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"111",
            "SizeBytes":26,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"2",
         "eventName":"MODIFY",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "NewImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"New item!"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"222",
            "SizeBytes":59,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      },
      {
         "eventID":"3",
         "eventName":"REMOVE",
         "eventVersion":"1.0",
         "eventSource":"aws:dynamodb",
         "awsRegion":"us-east-1",
         "dynamodb":{
            "Keys":{
               "Id":{
                  "N":"101"
               }
            },
            "OldImage":{
               "Message":{
                  "S":"This item has changed"
               },
               "Id":{
                  "N":"101"
               }
            },
            "SequenceNumber":"333",
            "SizeBytes":38,
            "StreamViewType":"NEW_AND_OLD_IMAGES"
         },
         "eventSourceARN":"stream-ARN"
      }
   ]
}"""

In [144]:
# Invoke the lambda manually

response = lamb.invoke(
    FunctionName=lambda_name,
    InvocationType='RequestResponse',
    LogType='Tail',
    Payload=input_data
)
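Because the call uses `LogType='Tail'`, the response already contains the end of the execution log, so you can check the result without opening the console. The sketch below decodes it; `summarize_invoke_response` is a hypothetical helper, and the offline demo uses a hand-made dictionary shaped like a boto3 `invoke` response (in boto3, `Payload` is a streaming body that can be read only once).

```python
import base64
import io
import json


def summarize_invoke_response(response: dict) -> dict:
    """Summarize a lambda.invoke() response requested with LogType='Tail'.

    LogResult holds the last 4 KB of the execution log, base64-encoded.
    Payload is a stream containing the handler's JSON return value.
    """
    log_tail = base64.b64decode(response.get("LogResult", "")).decode("utf-8")
    payload = response["Payload"].read().decode("utf-8")
    return {
        "status": response["StatusCode"],
        "function_error": response.get("FunctionError"),  # present only on errors
        "result": json.loads(payload) if payload else None,
        "log_tail": log_tail,
    }


# Offline demo: the handler returns nothing, so its payload is the JSON literal null.
fake_response = {
    "StatusCode": 200,
    "LogResult": base64.b64encode(b"Successfully processed 3 records.\n").decode("ascii"),
    "Payload": io.BytesIO(b"null"),
}
summary = summarize_invoke_response(fake_response)
print(summary["status"], summary["result"])
print(summary["log_tail"])
```

In the notebook, `summarize_invoke_response(response)` would be called right after `lamb.invoke(...)`.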

Monitor the activity of your Lambda function in the AWS Lambda console.

The AWS Lambda console shows graphs of some of the function's CloudWatch metrics in the CloudWatch metrics at a glance section. For each graph, you can also click the logs link to view the CloudWatch logs directly.

Go to the Lambda service in the AWS web console. The function's dashboard has graphs for different metrics of the function. Click the invocation count graph.

<img src="../images/lambda_graphs.PNG">

Make sure to customize the time range, as highlighted in the picture. Set it to the last 30 minutes so you can see how many times the function was invoked in that period. Since we invoked it once in the last 30 minutes, the count is 1.

<img src="../images/invocation count.PNG">

In CloudWatch, click Logs and then your Lambda function's log group to open the logs.

<img src="../images/cloudwatch_logs.PNG">

You will see a log entry for every record in the input, one for each of the three events: INSERT, MODIFY, and REMOVE.

<img src="../images/logs.PNG">
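The same logs can also be fetched programmatically with the CloudWatch Logs client (`boto3.client('logs')`). This sketch assumes the default log group name `/aws/lambda/<function-name>`; the helper `recent_log_messages` is hypothetical, and it takes the client as a parameter so it can be exercised without AWS access.

```python
import time


def recent_log_messages(logs_client, function_name: str, minutes: int = 30) -> list:
    """Return log messages written by a Lambda function in the last `minutes`.

    Assumes the default log group name /aws/lambda/<function-name>.
    """
    start_ms = int((time.time() - minutes * 60) * 1000)  # startTime is in epoch milliseconds
    messages = []
    # filter_log_events results can span several pages, so use the paginator.
    paginator = logs_client.get_paginator("filter_log_events")
    for page in paginator.paginate(logGroupName="/aws/lambda/" + function_name,
                                   startTime=start_ms):
        messages.extend(event["message"] for event in page["events"])
    return messages
```

Usage in the notebook would look like `for m in recent_log_messages(boto3.client('logs'), lambda_name): print(m)`.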

Next, we add an event source: we create a DynamoDB stream and associate it with the Lambda function above.

The steps are:

* Create an Amazon DynamoDB table with a stream enabled.


* Create an event source mapping in AWS Lambda. This event source mapping associates the DynamoDB stream with your Lambda function. After you create this event source mapping, AWS Lambda starts polling the stream.


* Test the end-to-end experience. As you perform table updates, DynamoDB writes event records to the stream. As AWS Lambda polls the stream, it detects new records in the stream and executes your Lambda function on your behalf by passing events to the function.

### Important

You must create a DynamoDB table in the same region where you created the Lambda function. This notebook assumes the US East (N. Virginia) region. In addition, both the table and the Lambda functions must belong to the same AWS account.

To receive DynamoDB updates, you need to enable the stream on each table. Below, the stream is enabled with the StreamSpecification parameter:


    StreamSpecification={'StreamEnabled': True,
                         'StreamViewType': 'NEW_AND_OLD_IMAGES' }

In [147]:
def create_dynamodb_table(table_name, key_name, attr_type):
    """ Create a stream-enabled table keyed on key_name unless it exists; return the Table """
    try:
        client.describe_table(TableName=table_name)
        print("DynamoDB table '" + table_name + "' already exists.")
        return dynamodb.Table(table_name)
    except client.exceptions.ResourceNotFoundException:
        print("DynamoDB table '" + table_name + "' does not appear to exist, creating...")
    table = dynamodb.create_table(
                TableName = table_name,
                KeySchema = [ { 'AttributeName': key_name,
                                'KeyType': 'HASH' } ],  # Partition key
                AttributeDefinitions = [
                              { 'AttributeName': key_name,
                                'AttributeType': attr_type } ],  # 'S', 'N' or 'B'
                ProvisionedThroughput = { 'ReadCapacityUnits': 1,
                                          'WriteCapacityUnits': 1 },
                StreamSpecification={
                                        'StreamEnabled': True,
                                        'StreamViewType': 'NEW_AND_OLD_IMAGES'
                                    }
            )
    # Wait until the table exists.
    table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
    print("DynamoDB table '" + table_name + "' created.")
    return table

In [148]:
create_dynamodb_table(table_name,"courseId","N")

**Write down the stream ARN. You need it in the next step, when you associate the stream with your Lambda function.**

In [157]:
# Get the details of the DynamoDB table
response = client.describe_table(
    TableName=table_name
)

In [158]:
# Get the ARN of the stream enabled on the dsa_courses DynamoDB table.
response["Table"]["LatestStreamArn"]

'arn:aws:dynamodb:us-east-1:714861692883:table/skaf48dsa_courses/stream/2017-11-09T21:49:43.128'
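Since the table and the Lambda function must be in the same region and account, it can be handy to split the stream ARN into its parts and check them. The helper below is a hypothetical sketch assuming the ARN format shown in the output above (`arn:aws:dynamodb:<region>:<account>:table/<table>/stream/<label>`). The stream label is a timestamp that itself contains colons, so the split is capped.

```python
def parse_stream_arn(arn: str) -> dict:
    """Split a DynamoDB stream ARN into partition, region, account, table and label."""
    parts = arn.split(":", 5)  # cap the split: the stream label contains colons
    if len(parts) != 6 or parts[0] != "arn" or parts[2] != "dynamodb":
        raise ValueError(f"not a DynamoDB ARN: {arn}")
    resource = parts[5].split("/")
    if len(resource) != 4 or resource[0] != "table" or resource[2] != "stream":
        raise ValueError(f"not a DynamoDB stream ARN: {arn}")
    return {"partition": parts[1], "region": parts[3], "account": parts[4],
            "table": resource[1], "label": resource[3]}


example = parse_stream_arn(
    "arn:aws:dynamodb:us-east-1:714861692883:table/skaf48dsa_courses/stream/2017-11-09T21:49:43.128")
print(example)
```

In the notebook, you could compare `parse_stream_arn(response["Table"]["LatestStreamArn"])["region"]` with `lamb.meta.region_name` before creating the mapping.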

In [163]:
# Run this cell if you want to delete an event source mapping (replace the UUID with your mapping's UUID)

# response = lamb.delete_event_source_mapping(
#     UUID='050ab64f-b533-4beb-832e-cd129d938ef7'
# )

### Add an Event Source in AWS Lambda

Run the cell below to create the event source mapping. After the cell executes, note the UUID it prints: we need this UUID to refer to the event source mapping in later commands, for example when deleting the event source mapping.

The cell first lists the existing event source mappings between the Lambda function and the table's stream. If a mapping already exists, its details (including the UUID) are stored in the source variable. If not, the list is empty, so the cell creates a new event source mapping and stores its details in source.

In [160]:
stream_arn = response["Table"]["LatestStreamArn"]

mappings = lamb.list_event_source_mappings(FunctionName=lambda_name,
                                           EventSourceArn=stream_arn)['EventSourceMappings']
if mappings:
    # Reuse the existing mapping
    source = mappings[0]
else:
    # TRIM_HORIZON starts reading from the oldest record still available in the stream
    source = lamb.create_event_source_mapping(FunctionName=lambda_name,
                                              EventSourceArn=stream_arn,
                                              Enabled=True,
                                              StartingPosition='TRIM_HORIZON')
print(source['UUID'])
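A new event source mapping is not active immediately: its `State` goes through values such as `Creating` and `Enabling` before it becomes `Enabled`. The sketch below polls `get_event_source_mapping` until the mapping reaches a target state; `wait_for_mapping_state` and its timeout and poll defaults are our own assumptions, and the client is passed in so the function can be tested offline.

```python
import time


def wait_for_mapping_state(lambda_client, uuid: str, target: str = "Enabled",
                           timeout: float = 120.0, poll: float = 5.0) -> str:
    """Poll get_event_source_mapping until State == target, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while True:
        state = lambda_client.get_event_source_mapping(UUID=uuid)["State"]
        if state == target:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"mapping {uuid} still {state!r} after {timeout}s")
        time.sleep(poll)
```

In the notebook this would be `wait_for_mapping_state(lamb, source['UUID'])`, run before inserting test items.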

### Test the Setup

You're all done! 

Add, update, and delete items in your dsa_courses DynamoDB table (its name is prefixed with your username). DynamoDB writes records of these actions to the stream.

AWS Lambda polls the stream and when it detects updates to the stream, it invokes your Lambda function by passing in the event data it finds in the stream.

The lambda function executes and creates logs in Amazon CloudWatch.

Let's go ahead and put a record in the table.

### Insert a record into the database table

In [161]:
# Get a handle to the table created above
table = dynamodb.Table(table_name)

table.put_item(
   Item={
        'courseName': 'Cloud computing',
        'courseId': 8635,
        'credits': 3,
        'isMandatory':'yes'
    }
)

{'ResponseMetadata': {'HTTPHeaders': {'connection': 'keep-alive',
   'content-length': '2',
   'content-type': 'application/x-amz-json-1.0',
   'date': 'Thu, 09 Nov 2017 22:03:58 GMT',
   'server': 'Server',
   'x-amz-crc32': '2745614147',
   'x-amzn-requestid': 'A2J8N84GD09P81BVPB2RHRBD53VV4KQNSO5AEMVJF66Q9ASUAAJG'},
  'HTTPStatusCode': 200,
  'RequestId': 'A2J8N84GD09P81BVPB2RHRBD53VV4KQNSO5AEMVJF66Q9ASUAAJG',
  'RetryAttempts': 0}}

In [162]:
table.put_item(
   Item={
        'courseName': 'Statmath',
        'courseId': 8610,
        'credits': 3,
        'isMandatory':'yes'
    }
)

{'ResponseMetadata': {'HTTPHeaders': {'connection': 'keep-alive',
   'content-length': '2',
   'content-type': 'application/x-amz-json-1.0',
   'date': 'Thu, 09 Nov 2017 22:04:24 GMT',
   'server': 'Server',
   'x-amz-crc32': '2745614147',
   'x-amzn-requestid': 'QPJK7CJ3NCJ4BUNAUSEB3KMRFFVV4KQNSO5AEMVJF66Q9ASUAAJG'},
  'HTTPStatusCode': 200,
  'RequestId': 'QPJK7CJ3NCJ4BUNAUSEB3KMRFFVV4KQNSO5AEMVJF66Q9ASUAAJG',
  'RetryAttempts': 0}}
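The two `put_item` calls only generate INSERT records. To also see MODIFY and REMOVE records in the logs, you can update and then delete an item. Below is a sketch; `exercise_stream` is a hypothetical helper that takes the table object as a parameter. The attribute name `credits` is passed through `ExpressionAttributeNames` so the expression never clashes with a DynamoDB reserved word.

```python
def exercise_stream(table, course_id: int, new_credits: int = 4) -> None:
    """Update, then delete, one item so the stream receives MODIFY and REMOVE records."""
    table.update_item(
        Key={"courseId": course_id},
        UpdateExpression="SET #c = :c",
        ExpressionAttributeNames={"#c": "credits"},
        ExpressionAttributeValues={":c": new_credits},
    )
    table.delete_item(Key={"courseId": course_id})
```

For example, `exercise_stream(table, 8610)` modifies and then removes the Statmath course; the function's logs should then show one MODIFY and one REMOVE record.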

### Check the logs


For each record pushed into the table, the Lambda function logged the change that happened on the table.

<img src="../images/dynamodb_trigger.PNG">

# Delete the table

Run the cell below to delete the table.

In [7]:
response = client.delete_table(
    TableName=table_name
)
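Deleting the table does not remove the other resources created in this lab. A possible cleanup sketch follows; `cleanup` is a hypothetical helper, and the clients are passed in so it can be tested offline. Managed policies must be detached before the role can be deleted; if you also added an inline policy, remove it first with `iam.delete_role_policy`.

```python
def cleanup(lambda_client, iam_client, function_name, role_name, policy_arn,
            mapping_uuid=None):
    """Remove the event source mapping, Lambda function and IAM role from this lab."""
    if mapping_uuid:
        lambda_client.delete_event_source_mapping(UUID=mapping_uuid)
    lambda_client.delete_function(FunctionName=function_name)
    # A role cannot be deleted while managed policies are still attached.
    iam_client.detach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    iam_client.delete_role(RoleName=role_name)
```

In the notebook this would be called as `cleanup(lamb, iam, lambda_name, role['RoleName'], 'arn:aws:iam::aws:policy/service-role/AWSLambdaDynamoDBExecutionRole', source['UUID'])`.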

# Ignore the cell below; it is kept for troubleshooting

In [None]:
# import boto3

# DDB = boto3.resource('dynamodb').Table('dsa_courses')

# def lambda_handler(event, context):
#     records = event['Records']
#     print("Received %s records" % len(records))

#     for record in records:
#         print(record)

#         # if new record or update
#         if record['eventName'].upper() in {'INSERT', 'MODIFY'}:

#             # primary key
#             record_id = record['dynamodb']['Keys']['Id']['S']

#             # init local vars
#             old_email = old_is_personal = new_email = new_is_personal = None

#             # new and old images
#             old_image = record['dynamodb'].get('OldImage') or {}
#             new_image = record['dynamodb'].get('NewImage') or {}

#             # old values (optional, only on update)
#             if 'Email' in old_image:
#                 old_email = old_image['Email']['S']
#             if 'IsPersonalEmail' in old_image:
#                 old_is_personal = old_image['IsPersonalEmail']['BOOL']

#             # new values
#             if 'Email' in new_image:
#                 new_email = new_image['Email']['S']
#                 new_is_personal = is_personal_email(new_email)

#             # avoid recursion on update and write only if strictly needed
#             if old_email != new_email and old_is_personal != new_is_personal:
#                 update_record(record_id, new_is_personal)

#     print("Processed %s records" % len(records))


# def update_record(record_id, is_personal):
#     print("Updating %s: IsPersonalEmail=%s" % (record_id, is_personal))
#     DDB.update_item(
#         Key={'Id': record_id},
#         UpdateExpression='SET IsPersonalEmail = :val',
#         ExpressionAttributeValues={':val': is_personal or False},
#     )


# def is_personal_email(email):
#     domains = {"gmail.com", "outlook.com", "hotmail.com"}
#     return any(email.endswith(domain) for domain in domains)