In [1]:
import boto3
import json
import hashlib
import psycopg2
from hashlib import md5

In [2]:
# AWS SQS configuration
queue_url = 'http://localhost:4566/000000000000/login-queue'
sqs = boto3.client('sqs', region_name='us-east-2', endpoint_url=queue_url,
                   aws_access_key_id='dummy', aws_secret_access_key='dummy')

In [3]:
# Postgres configuration
conn = psycopg2.connect(
    dbname='postgres',
    user='postgres', 
    password='postgres',
    host='localhost', 
    port='5432'
)
cur = conn.cursor()


In [4]:
def main():
    # Receive messages from the SQS queue
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=10,
        VisibilityTimeout=0,
        WaitTimeSeconds=0
    )    

    if 'Messages' in response:
        for message in response['Messages']:
            message = response['Messages'][0]
            body = json.loads(message['Body'])
            print(message)

            # Hash the device_id and ip for masking and duplicate identification
            device_id = body.get('device_id', '19991')
            ip = body.get('ip', '0.0.0.0')
            masked_device_id = hashlib.md5(device_id.encode()).hexdigest()
            masked_ip = hashlib.md5(ip.encode()).hexdigest()
            print(masked_device_id)
            print(masked_ip)

            # Insert the masked data into user_logins table
            with conn:
                with conn.cursor() as cur:
                    insert_query = """
                        INSERT INTO user_logins (user_id, device_type, masked_ip, masked_device_id, locale, app_version, create_date)
                        VALUES (%s, %s, %s, %s, %s, %s, NOW())
                    """
                    cur.execute(insert_query, (
                        body['user_id'],
                        body['device_type'],
                        masked_ip,
                        masked_device_id,
                        body['locale'],
                        int(body['app_version'].replace('.', '')) if body['app_version'] else None
                    ))
                    conn.commit()

            # Delete the message from the queue after processing
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )

    else:
        print('No messages in the queue')

if __name__ == "__main__":
    main()


{'MessageId': '5d43931a-3628-4a8b-ad0b-4e53ec3e8041', 'ReceiptHandle': 'MjhjZWJhMWQtNjVkOS00ODZjLTlkYmUtMDMyMTRkNDczOWM4IGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bG9naW4tcXVldWUgNWQ0MzkzMWEtMzYyOC00YThiLWFkMGItNGU1M2VjM2U4MDQxIDE2OTIzMjMwNTIuMjQxMTQ4Nw==', 'MD5OfBody': '1dbb6e2abb89c160500061a651a05f6d', 'Body': '{"user_id": "60b9441c-e39d-406f-bba0-c7ff0e0ee07f", "app_version": "0.4.6", "device_type": "android", "ip": "223.31.97.46", "locale": "FR", "device_id": "149-99-5185"}'}
077c18a43906fc44531de05a0a5e521e
90953df5cd9402cd0bffec27608623bc
{'MessageId': '5d43931a-3628-4a8b-ad0b-4e53ec3e8041', 'ReceiptHandle': 'MjhjZWJhMWQtNjVkOS00ODZjLTlkYmUtMDMyMTRkNDczOWM4IGFybjphd3M6c3FzOnVzLWVhc3QtMTowMDAwMDAwMDAwMDA6bG9naW4tcXVldWUgNWQ0MzkzMWEtMzYyOC00YThiLWFkMGItNGU1M2VjM2U4MDQxIDE2OTIzMjMwNTIuMjQxMTQ4Nw==', 'MD5OfBody': '1dbb6e2abb89c160500061a651a05f6d', 'Body': '{"user_id": "60b9441c-e39d-406f-bba0-c7ff0e0ee07f", "app_version": "0.4.6", "device_type": "android", "ip": "223.31.97.46"