In [34]:
import psycopg2
import base64
import boto3
import json
import configparser
import argparse
from datetime import datetime
import collections
import sys

In [83]:
def extract(url, queue_name, waitTime, numberOfMessages):
    """Receive one batch of messages from an SQS queue.

    Parameters
    ----------
    url : str
        Endpoint URL handed to the boto3 SQS client; the queue URL is built
        as ``url + '/' + queue_name``.
    queue_name : str
        Name of the queue, appended to ``url``.
    waitTime : int
        Forwarded as ``WaitTimeSeconds`` to ``receive_message``.
    numberOfMessages : int
        Forwarded as ``MaxNumberOfMessages`` to ``receive_message``.

    Returns
    -------
    list
        The ``'Messages'`` entry of the response, or an empty list when the
        response carries no such entry (nothing was available to receive).

    Raises
    ------
    SystemExit
        With status 1, after printing the error, if ``receive_message`` fails.
    """
    #Instantiating SQS client using boto3 to receive messages from queue
    sqsClient = boto3.client("sqs", endpoint_url = url)

    try:
        response = sqsClient.receive_message(
                QueueUrl = url + '/' + queue_name,
                WaitTimeSeconds = waitTime,
                MaxNumberOfMessages = numberOfMessages)
    except Exception as exception:
        #Print the exception occurring while receiving the response
        print("Error - " +str(exception) + '. Try again')
        #Non-zero status so the failure is not reported as a success
        sys.exit(1)

    #The response has no 'Messages' key when nothing was received, so
    #fall back to an empty list instead of raising KeyError
    return response.get('Messages', [])

In [84]:
#Receive up to 10 messages from 'login-queue', waiting at most 10 seconds
messages = extract(
    url='http://localhost:4566',
    queue_name='login-queue',
    waitTime=10,
    numberOfMessages=10,
)

In [85]:
#Display the raw messages returned by extract()
messages

[{'MessageId': '545e2971-df9d-4bce-969f-7c60c21e079c',
  'ReceiptHandle': 'MDQ0NDhiNDEtMDQ1Yi00OWEwLThkZjEtYmNiNDZkYTM3YzI3IGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bG9naW4tcXVldWUgNTQ1ZTI5NzEtZGY5ZC00YmNlLTk2OWYtN2M2MGMyMWUwNzljIDE2NzM5ODA1MTIuNDkzOTgxNg==',
  'MD5OfBody': 'e4f1de8c099c0acd7cb05ba9e790ac02',
  'Body': '{"user_id": "424cdd21-063a-43a7-b91b-7ca1a833afae", "app_version": "2.3.0", "device_type": "android", "ip": "199.172.111.135", "locale": "RU", "device_id": "593-47-5928"}'},
 {'MessageId': '332cfbf9-6d58-4104-857e-21a9ca45e8c9',
  'ReceiptHandle': 'NTRiNmQ4NmItYjJlMS00NjY4LWJkNGUtMTE5YjY0NzAzNDBlIGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bG9naW4tcXVldWUgMzMyY2ZiZjktNmQ1OC00MTA0LTg1N2UtMjFhOWNhNDVlOGM5IDE2NzM5ODA1MTIuNDk0MjM4NA==',
  'MD5OfBody': '347f6ce29bd4f361b13bc54c05d0a5fc',
  'Body': '{"user_id": "c0173198-76a8-4e67-bfc2-74eaa3bbff57", "app_version": "0.2.6", "device_type": "ios", "ip": "241.6.88.151", "locale": "PH", "device_id": "104-25-0070"}'},
 {'

In [134]:
def encodeAndDecode(messageAttributes, operation):
    """Base64-encode or Base64-decode a string.

    Note that Base64 is a reversible encoding: anyone can recover the
    original value by calling this function with ``'decode'``.

    Parameters
    ----------
    messageAttributes : str
        Plain text when ``operation`` is ``'encode'``; Base64 text when
        ``operation`` is ``'decode'``.
    operation : str
        Either ``'encode'`` or ``'decode'``.

    Returns
    -------
    str
        The Base64 text (encode) or the recovered plain text (decode).

    Raises
    ------
    ValueError
        If ``operation`` is neither ``'encode'`` nor ``'decode'``.
    """
    if operation == 'encode':
        #UTF-8 yields the same bytes as ASCII for ASCII text, but does not
        #fail on characters outside the ASCII range
        bytes_encoded = messageAttributes.encode('utf-8')
        #Base64 output is always plain ASCII
        return base64.b64encode(bytes_encoded).decode('ascii')
    if operation == 'decode':
        return base64.b64decode(messageAttributes).decode('utf-8')
    #Previously an unknown operation silently returned None
    raise ValueError("operation must be 'encode' or 'decode', got " + repr(operation))

In [171]:
def transform(messages):
    """Mask sensitive fields of received messages and stamp a create date.

    For each message the JSON ``'Body'`` is parsed, ``'ip'`` and
    ``'device_id'`` are replaced by their Base64 encodings, a ``None``
    ``'locale'`` is replaced by the string ``'None'``, and ``'create_date'``
    (``YYYY-MM-DD``, local time) is added.

    Parameters
    ----------
    messages : list of dict
        Messages that each carry a JSON string under ``'Body'``.

    Returns
    -------
    list of dict
        One transformed body per input message, in input order.

    Raises
    ------
    SystemExit
        With status 1, after printing the error, if ``messages`` is empty
        or None.
    """
    #Guard clause: nothing to transform
    if not messages:
        print("Error - Message queue is empty")
        sys.exit(1)

    #Computed once so every record of the batch carries the same date,
    #even if the loop happens to run across midnight
    create_date = datetime.now().strftime("%Y-%m-%d")

    #Transformed messages result to be stored
    transformed_messages = []
    for message in messages:
        messageBody = json.loads(message['Body'])

        # Reference: https://stackabuse.com/encoding-and-decoding-base64-strings-in-python/
        # Masking IP and device_id using base64 encoding technique
        messageBody['ip'] = encodeAndDecode(messageBody['ip'], 'encode')
        messageBody['device_id'] = encodeAndDecode(messageBody['device_id'], 'encode')

        #Replacing None with 'None' to insert into postgres
        if messageBody['locale'] is None:
            messageBody['locale'] = 'None'

        #Adding create date
        messageBody['create_date'] = create_date
        transformed_messages.append(messageBody)

    return transformed_messages
        

In [172]:
#Mask and date-stamp the received messages, then show the first record's values
transformed_messages = transform(messages)
[*transformed_messages[0].values()]

['424cdd21-063a-43a7-b91b-7ca1a833afae',
 '2.3.0',
 'android',
 'MTk5LjE3Mi4xMTEuMTM1',
 'RU',
 'NTkzLTQ3LTU5Mjg=',
 '2023-01-17']

In [169]:
def loadMessages(transformed_messages, dbname = 'postgres', user = 'postgres', password = 'postgres'):
    """Insert transformed login records into the ``user_logins`` table.

    All records are written in a single transaction: either every row is
    committed or, if any insert fails, none are.

    Parameters
    ----------
    transformed_messages : list of dict
        Records carrying the keys ``user_id``, ``app_version``,
        ``device_type``, ``ip``, ``locale``, ``device_id`` and
        ``create_date``.
    dbname, user, password : str, optional
        Connection settings passed to ``psycopg2.connect``. The defaults
        match the values that used to be hard-coded here.

    Returns
    -------
    None

    Raises
    ------
    SystemExit
        With status 1, after printing the error, if there is nothing to load.
    KeyError
        If a record lacks one of the expected keys.
    """
    if not transformed_messages:
        print('Error - No transformed messages to load')
        sys.exit(1)

    #Values are picked by key, in the order of the column list below, so the
    #insert does not depend on the key order of the incoming dictionaries
    record_keys = ('user_id', 'app_version', 'device_type', 'ip',
                   'locale', 'device_id', 'create_date')
    insert_sql = (
        "INSERT INTO user_logins ("
        "user_id, app_version, device_type, masked_ip, locale, masked_device_id, create_date"
        ") VALUES (%s, %s, %s, %s, %s, %s, %s);"
    )
    rows = [[record[key] for key in record_keys] for record in transformed_messages]

    con = psycopg2.connect(dbname = dbname, user = user, password = password)
    try:
        #'with con' commits on success and rolls back on an exception;
        #it does not close the connection, hence the finally below
        with con:
            with con.cursor() as cursor:
                cursor.executemany(insert_sql, rows)
    finally:
        con.close()
    return

In [170]:
#Insert the transformed records into the user_logins table
loadMessages(transformed_messages)