In [None]:
from __future__ import print_function
import base64
import json
import boto3
from datetime import datetime
from boto3.dynamodb.conditions import Key, Attr

# Runs once, when this module is imported (not on every handler invocation).
print('Loading function')

"""
The following is example Python code that receives Kinesis event record data as input and processes it. For illustration, the code writes some of the incoming event data to CloudWatch Logs.
"""

# Name of the DynamoDB table that lambda_handler reads and writes.
# lambda_handler addresses items by the attributes 'user_id' and 'offer_id'
# and stores 'event_date' and 'total_impressions' alongside them.
TABLE_NAME = 'OfferImpression2'
# The resource and Table handle are created once at import time and shared
# by every lambda_handler call made through this module.
dynmodb = boto3.resource('dynamodb')
table = dynmodb.Table(TABLE_NAME)

def lambda_handler(event, context):
    """Count 'Offer Impression Made' events per (user_id, offer_id) in DynamoDB.

    Each record's base64-encoded JSON payload is decoded and its ``message``
    inspected. Messages whose ``name`` is not ``'Offer Impression Made'`` are
    ignored. For matching messages the table is queried for the
    (user_id, offer_id) item whose ``event_date`` equals the record's arrival
    date:

    * no such item -> a new item is written with ``total_impressions`` = 1
      (this also replaces an item stored under a different date);
    * item found   -> ``total_impressions`` is incremented by one.

    Args:
        event: Lambda event containing a ``'Records'`` list. Every record must
            provide ``record['kinesis']['data']`` (base64-encoded JSON with a
            ``'message'`` object) and
            ``record['kinesis']['approximateArrivalTimestamp']`` (epoch
            seconds, as accepted by ``datetime.fromtimestamp``).
        context: Lambda context object; not used.

    Returns:
        None.

    Raises:
        KeyError: if a record lacks the ``kinesis`` fields above, or the
            payload lacks ``'message'`` / ``message['name']``.
        ValueError: if the payload is not valid JSON.
    """
    records = event['Records']
    print('Number of records: ', len(records))

    for record in records:
        kinesis = record['kinesis']

        # The arrival timestamp determines the date the impression counts for.
        dt_object = datetime.fromtimestamp(kinesis['approximateArrivalTimestamp'])
        dt = str(dt_object.date())

        clock_time = datetime.today()

        # Kinesis data is base64 encoded, so decode before parsing the JSON.
        payload = base64.b64decode(kinesis['data'])
        message = json.loads(payload)['message']

        if message['name'] != 'Offer Impression Made':
            continue

        # Both identifiers are required. Without this guard a record missing
        # one of them would either raise NameError or, worse, silently reuse
        # the ids left over from the previous loop iteration.
        has_user_id = 'userId' in message
        has_offer_id = 'Offer ID' in message
        if not has_user_id:
            print('Error with userId:', payload)
        if not has_offer_id:
            print('Error with offer ID: ', payload)
        if not (has_user_id and has_offer_id):
            continue

        user_id = message['userId']
        offer_id = message['Offer ID']
        print('incoming: ', 'dt: ', dt, ' ', 'user_id: ', user_id, ' ', 'offer_id: ', offer_id)

        # NOTE(review): the query followed by put/update is a
        # read-modify-write and is not atomic; if several invocations can
        # process the same (user_id, offer_id) concurrently, increments may be
        # lost. Consider a conditional update with "ADD total_impressions".
        try:
            resp = table.query(
                KeyConditionExpression=Key('user_id').eq(user_id) & Key('offer_id').eq(offer_id),
                FilterExpression=Attr('event_date').eq(dt),
            )

            # Default to an empty list so the emptiness test below is always valid.
            items = resp.get('Items', [])
            print('Items in dynamodb: ', items)
            print('Clock time: ', clock_time, ' ', 'Stream time:', dt_object)

            if not items:
                # No item for this pair on this date: start the count at 1.
                table.put_item(
                    Item={
                        'user_id': user_id,
                        'offer_id': offer_id,
                        'event_date': dt,
                        'total_impressions': 1,
                    }
                )
            else:
                # Item for this pair and date exists: bump its counter.
                count_new = items[0]['total_impressions'] + 1
                table.update_item(
                    Key={
                        'user_id': user_id,
                        'offer_id': offer_id,
                    },
                    UpdateExpression='SET event_date = :val1, total_impressions = :val2',
                    ExpressionAttributeValues={
                        ':val1': dt,
                        ':val2': count_new,
                    },
                )
        except Exception as exc:
            # Best effort, as before: log and move on to the next record, but
            # report the real failure instead of a fixed, misleading message.
            print('Error reading/writing DynamoDB item: ', repr(exc))
            
            
            

            


In [None]:
 from __future__ import print_function
import base64
import json
import boto3
from datetime import datetime
from boto3.dynamodb.conditions import Key, Attr

# Runs once, when this module is imported (not on every handler invocation).
print('Loading function')

"""
The following is example Python code that receives Kinesis event record data as input and processes it. For illustration, the code writes some of the incoming event data to CloudWatch Logs.
"""

# Name of the DynamoDB table that lambda_handler reads and writes.
# lambda_handler addresses items by the attributes 'user_id' and 'offer_id'
# and stores 'event_date' and 'total_impressions' alongside them.
# NOTE(review): the capital 'M' in 'OfferIMpression3' looks unusual -- confirm
# it matches the actual table name.
TABLE_NAME = 'OfferIMpression3'
# The resource and Table handle are created once at import time and shared
# by every lambda_handler call made through this module.
dynmodb = boto3.resource('dynamodb')
table = dynmodb.Table(TABLE_NAME)

def lambda_handler(event, context):
    """Count 'Offer Impression Made' events per (user_id, offer_id) in DynamoDB.

    Each record's base64-encoded JSON payload is decoded and its ``message``
    inspected. Messages whose ``name`` is not ``'Offer Impression Made'`` are
    ignored. For matching messages the table is queried for the
    (user_id, offer_id) item whose ``event_date`` equals the record's arrival
    date:

    * no such item -> a new item is written with ``total_impressions`` = 1
      (this also replaces an item stored under a different date);
    * item found   -> ``total_impressions`` is incremented by one.

    Args:
        event: Lambda event containing a ``'Records'`` list. Every record must
            provide ``record['kinesis']['data']`` (base64-encoded JSON with a
            ``'message'`` object) and
            ``record['kinesis']['approximateArrivalTimestamp']`` (epoch
            seconds, as accepted by ``datetime.fromtimestamp``).
        context: Lambda context object; not used.

    Returns:
        None.

    Raises:
        KeyError: if a record lacks the ``kinesis`` fields above, or the
            payload lacks ``'message'`` / ``message['name']``.
        ValueError: if the payload is not valid JSON.
    """
    records = event['Records']
    print('Number of records: ', len(records))

    for record in records:
        kinesis = record['kinesis']

        # The arrival timestamp determines the date the impression counts for.
        dt_object = datetime.fromtimestamp(kinesis['approximateArrivalTimestamp'])
        dt = str(dt_object.date())

        clock_time = datetime.today()

        # Kinesis data is base64 encoded, so decode before parsing the JSON.
        payload = base64.b64decode(kinesis['data'])
        message = json.loads(payload)['message']

        if message['name'] != 'Offer Impression Made':
            continue

        # Both identifiers are required. Without this guard a record missing
        # one of them would either raise NameError or, worse, silently reuse
        # the ids left over from the previous loop iteration.
        has_user_id = 'userId' in message
        has_offer_id = 'Offer ID' in message
        if not has_user_id:
            print('Error with userId:', payload)
        if not has_offer_id:
            print('Error with offer ID: ', payload)
        if not (has_user_id and has_offer_id):
            continue

        user_id = message['userId']
        offer_id = message['Offer ID']
        print('incoming: ', 'dt: ', dt, ' ', 'user_id: ', user_id, ' ', 'offer_id: ', offer_id, ' ', 'Clock time: ', clock_time, ' ', 'stream date:', dt_object)

        # NOTE(review): the query followed by put/update is a
        # read-modify-write and is not atomic; if several invocations can
        # process the same (user_id, offer_id) concurrently, increments may be
        # lost. Consider a conditional update with "ADD total_impressions".
        try:
            resp = table.query(
                KeyConditionExpression=Key('user_id').eq(user_id) & Key('offer_id').eq(offer_id),
                FilterExpression=Attr('event_date').eq(dt),
            )

            # Default to an empty list so the emptiness test below is always valid.
            items = resp.get('Items', [])
            print('Items in dynamodb: ', items)

            if not items:
                # No item for this pair on this date: start the count at 1.
                table.put_item(
                    Item={
                        'user_id': user_id,
                        'offer_id': offer_id,
                        'event_date': dt,
                        'total_impressions': 1,
                    }
                )
            else:
                # Item for this pair and date exists: bump its counter.
                count_new = items[0]['total_impressions'] + 1
                table.update_item(
                    Key={
                        'user_id': user_id,
                        'offer_id': offer_id,
                    },
                    UpdateExpression='SET event_date = :val1, total_impressions = :val2',
                    ExpressionAttributeValues={
                        ':val1': dt,
                        ':val2': count_new,
                    },
                )
        except Exception as exc:
            # Best effort, as before: log and move on to the next record, but
            # report the real failure instead of a fixed, misleading message.
            print('Error reading/writing DynamoDB item: ', repr(exc))
            
            
            

            
