# In [2]:
import boto3
import psycopg2
import hashlib
import json

# SQS: the queue URL and the client both target the endpoint on localhost:4566
queue_url = 'http://localhost:4566/000000000000/login-queue'
sqs = boto3.client('sqs', endpoint_url='http://localhost:4566')

# PostgreSQL: open one connection and one cursor, shared by the rest of the script
conn = psycopg2.connect(
    host='localhost', port='5432',
    dbname='postgres', user='postgres', password='postgres',
)
cur = conn.cursor()

def mask_pii(data):
    """Return a copy of ``data`` with PII fields hashed and the version normalised.

    The ``device_id`` and ``ip`` values are replaced, under the same keys, by
    the hex SHA-256 digest of their text, so equal inputs always map to equal
    masked values. ``app_version`` has its periods removed and is converted
    to ``int`` (``'2.3.0'`` becomes ``230``).

    Args:
        data: Mapping holding one decoded message. Every key is optional; a
            key that is absent or whose value is ``None`` is left as-is.

    Returns:
        A new ``dict``; the mapping passed in is not modified.

    Raises:
        ValueError: If ``app_version`` is present but is not numeric once its
            periods are removed.
    """
    # Work on a shallow copy so the caller keeps its unmasked message intact.
    masked = dict(data)

    for key in ('device_id', 'ip'):
        value = masked.get(key)
        if value is not None:
            # str() lets non-string values (e.g. numbers) be hashed instead of
            # failing on the missing .encode(); for strings it is a no-op.
            masked[key] = hashlib.sha256(str(value).encode()).hexdigest()

    version = masked.get('app_version')
    if version is not None:
        # NOTE(review): dropping the periods is lossy ('1.10.0' and '11.0.0'
        # both give 1100) - confirm this is acceptable for the target column.
        masked['app_version'] = int(str(version).replace('.', ''))

    return masked

# Drain the queue one message at a time: mask, insert, commit, then delete.
# The try/finally guarantees the cursor and connection are closed even when
# a message body fails to parse or a database/SQS call raises.
try:
    while True:
        # Receive message from SQS queue
        # NOTE(review): VisibilityTimeout=0 makes a received message visible
        # to other consumers immediately, and WaitTimeSeconds=0 is short
        # polling, which may presumably return no messages before the queue
        # is truly drained - confirm both settings are intended.
        response = sqs.receive_message(
            QueueUrl=queue_url,
            AttributeNames=['All'],
            MaxNumberOfMessages=1,
            MessageAttributeNames=['All'],
            VisibilityTimeout=0,
            WaitTimeSeconds=0
        )

        # A response without a 'Messages' key is treated as an exhausted queue.
        if 'Messages' not in response:
            print('Queue is now empty')
            break

        for message in response['Messages']:  # 'Messages' is a list
            # Decode the JSON body and mask its PII fields
            masked_data = mask_pii(json.loads(message['Body']))

            # mask_pii keeps the hashed values under the original 'ip' and
            # 'device_id' keys, so those are the keys that feed the
            # masked_ip / masked_device_id columns.
            cur.execute(
                "INSERT INTO user_logins (user_id, device_type, masked_ip, masked_device_id, locale, app_version, create_date) VALUES (%s, %s, %s, %s, %s, %s, %s)",
                (
                    masked_data.get('user_id'),
                    masked_data.get('device_type'),
                    masked_data.get('ip'),
                    masked_data.get('device_id'),
                    masked_data.get('locale'),
                    masked_data.get('app_version'),
                    masked_data.get('create_date'),
                )
            )
            conn.commit()

            # Delete only after the commit, so a failed insert leaves the
            # message on the queue instead of losing it.
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )
finally:
    # Close database connection
    cur.close()
    conn.close()


# Output: Queue is now empty
