# Step 4 - Message Processor

We launch our stream processor to poll the queue for messages, retrying every 5 seconds when none are available.

In [None]:
import boto3
import json
import time
import pandas as pd
# SQS client pointed at a local endpoint (endpoint_url and use_ssl are for test purposes only)
sqs = boto3.client(
    'sqs',
    endpoint_url="http://localhost:4576",
    use_ssl=False,
    region_name='us-east-1',
)
# URL of the queue the processor reads from
queue_url = 'http://localhost:4576/queue/sse_queue'
# S3 resource pointed at a local endpoint (endpoint_url and use_ssl are for test purposes only)
s3 = boto3.resource(
    's3',
    endpoint_url="http://localhost:4572",
    use_ssl=False,
    region_name='us-east-1',
)
# Fields kept from each message body (the desired payload), plus the receipt handle
map_keys = [
    'id',
    'type',
    'namespace',
    'title',
    'comment',
    'timestamp',
    'user',
    'bot',
    'ReceiptHandle',
]
# Cleaned messages accumulated until a batch is exported
list_msgs = []

def read_batch():
    """Poll the SQS queue forever, handing each received batch to process_batch.

    Every iteration requests up to 10 messages from ``queue_url``. When the
    response carries no messages, a notice is printed and the loop sleeps for
    5 seconds before polling again. This function never returns on its own;
    interrupt the kernel/process to stop it.
    """
    while True:
        response = sqs.receive_message(
            QueueUrl = queue_url,
            MaxNumberOfMessages = 10 #Max Batch size
        )
        # Look the key up explicitly instead of wrapping the whole iteration in
        # `except KeyError`: that way a KeyError raised while processing or
        # exporting a batch is not misreported as "no messages available".
        # A missing key and an empty list are both treated as "nothing to do".
        messages = response.get('Messages')
        if not messages:
            print('\rNo messages available, retrying in 5 seconds...', sep=' ', end='', flush=True)
            time.sleep(5)
            continue
        process_batch( messages )

def process_batch( messages ):
    """Clean each received message, buffer it, and export once 100 are buffered.

    For every message the JSON body is decoded and reduced to the fields named
    in ``map_keys`` (fields absent from the body become None). The message's
    receipt handle is stored alongside so the message can later be deleted
    from the queue. Once the module-level ``list_msgs`` buffer holds at least
    100 entries it is passed to to_data_lake and then replaced by a new list.
    """
    global list_msgs
    for msg in messages:
        payload = json.loads(msg['Body'])
        # Keep only the desired fields, defaulting the missing ones to None
        record = {}
        for field in map_keys:
            record[field] = payload[field] if field in payload else None
        # The receipt handle comes from the SQS envelope, not from the body
        record['ReceiptHandle'] = msg['ReceiptHandle']
        list_msgs.append(record)

    # Not enough buffered messages yet: wait for the next batch
    if len(list_msgs) < 100:
        return

    print('\rBatch ready to be exported to the Data Lake', sep=' ', end='', flush=True)
    to_data_lake(list_msgs)
    list_msgs = list()

def to_data_lake( df ):
    """Export a batch of cleaned messages to S3 as CSV, then delete them from the queue.

    Args:
        df: Sequence of message dicts as built by process_batch (despite the
            name, this is indexed as a list here, not as a DataFrame). Each
            dict is expected to carry 'id' and 'ReceiptHandle' keys.

    The CSV is stored in the 'sse-bucket' bucket under the key
    ``batch-<id of the first message>.csv``. Messages are removed from the
    queue only after the upload call has returned. An empty batch is a no-op.
    """
    if not df:
        # Nothing to export; also avoids an IndexError on df[0] below
        return
    # Build the frame from the argument (not the module-level buffer) so the
    # exported rows always match the batch the filename is derived from.
    batch_df = pd.DataFrame( df )
    csv = batch_df.to_csv( index=False )
    filename = 'batch-%s.csv' % df[0]['id']
    #csv to s3 bucket
    # NOTE(review): ACL='public-read' makes every exported batch world-readable;
    # confirm this is intended outside the local test setup.
    s3.Bucket('sse-bucket').put_object( Key=filename, Body=csv, ACL='public-read' )
    print('\r%s saved into the Data Lake' % filename, sep=' ', end='', flush=True)
    remove_messages( batch_df )

def remove_messages( df ):
    """Delete from the queue every message whose receipt handle is listed in df.

    Args:
        df: Object indexable by 'ReceiptHandle' whose result exposes
            ``.values`` (called with a pandas DataFrame by to_data_lake).

    One delete_message request is issued per receipt handle.
    """
    handles = df['ReceiptHandle'].values
    for handle in handles:
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=handle)


In [None]:
# Start the polling loop. read_batch() loops forever, so this cell keeps
# running until the kernel is interrupted.
read_batch()