First, follow the instructions in [Accessing AWS](https://www.notion.so/lattice/Accessing-AWS-043c558c833044e38329ef3dc9c1ea26#4e7d707777864670814e37aef0a08a6b), which will log you in using your Okta roles. The login session lasts 4 hours.

In [1]:
"""
SCRIPT TO GET MESSAGES FROM QUEUE AND WRITE TO CSV
"""

import boto3
import csv
import json
from botocore.exceptions import NoCredentialsError, PartialCredentialsError

def flatten_json(y, parent_key='', sep='.'):
    """
    Collapse a nested dictionary into a single-level dictionary.

    Nested dictionaries are walked recursively and their keys are chained
    onto the parent key with ``sep``. List values are not descended into:
    each element is converted with ``str`` and the results are joined
    with ``'; '``. Every other value is copied through unchanged.

    :param y: The dictionary to flatten (must provide ``.items()``).
    :param parent_key: Key prefix for every entry of ``y``; empty for no prefix.
    :param sep: Separator placed between a parent key and a child key.
    :return: A new flat dictionary; ``y`` itself is not modified.
    """
    flat = {}
    for name, value in y.items():
        # Top-level keys are used as-is; nested keys are prefixed with the parent path.
        path = name if not parent_key else f"{parent_key}{sep}{name}"

        if isinstance(value, dict):
            # Recurse and merge the child's already-flattened entries.
            for child_path, child_value in flatten_json(value, path, sep=sep).items():
                flat[child_path] = child_value
            continue

        if isinstance(value, list):
            # Lists become one cell of text rather than indexed keys.
            value = '; '.join(str(element) for element in value)

        flat[path] = value
    return flat

def main():
    """
    Read messages from the ``text-analysis-clustering-dlq`` SQS queue and write them to ``sqs_messages.csv``.

    Each message body is parsed as JSON and flattened with ``flatten_json``; the
    flattened keys become CSV columns next to ``MessageId`` and ``ReceiptHandle``.
    Messages are NOT deleted from the queue, so once their visibility timeout
    expires the same message can be received again. Such repeats are detected by
    ``MessageId`` and written only once, but every received message (repeat or
    not) still counts toward ``MAX_MESSAGES`` so the loop is always bounded.

    Bodies that are not valid JSON, or that are valid JSON but not an object
    (for example a list or a bare string), are reported and exported with only
    their metadata columns.

    :return: None. Results are written to ``sqs_messages.csv`` in the working
        directory; progress and errors are printed.
    """
    try:
        # Initialize a session using the default credentials (managed by ktool)
        session = boto3.Session(
            region_name='us-west-2',  # Replace with your AWS region
            profile_name='lattice'
        )

        # Create SQS client using the session
        sqs = session.client('sqs')

        # Get the URL for the SQS queue
        # queue_url = sqs.get_queue_url(QueueName='weaver-worker-text-analysis-dlq')['QueueUrl']
        queue_url = sqs.get_queue_url(QueueName='text-analysis-clustering-dlq')['QueueUrl']

        all_messages = []
        csv_headers = {'MessageId', 'ReceiptHandle'}  # Initial headers
        seen_message_ids = set()  # MessageIds already exported in this run

        # Upper bound on received messages so the loop cannot run forever
        MAX_MESSAGES = 10000  # Adjust as needed
        fetched_messages = 0

        while fetched_messages < MAX_MESSAGES:
            # Receive messages from SQS queue
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=10,  # Maximum allowed by SQS
                VisibilityTimeout=30,    # Adjust based on processing time
                WaitTimeSeconds=10       # Long polling
            )

            messages = response.get('Messages', [])

            if not messages:
                print("No more messages available in the queue.")
                break

            for message in messages:
                message_id = message.get('MessageId', '')
                receipt_handle = message.get('ReceiptHandle', '')
                body = message.get('Body', '{}')  # Default to empty JSON if Body is missing

                # Count every receive, including repeats, so MAX_MESSAGES always terminates the loop.
                fetched_messages += 1

                # A message that was not deleted reappears after its visibility timeout;
                # export it only the first time it is seen.
                already_exported = bool(message_id) and message_id in seen_message_ids
                if not already_exported:
                    seen_message_ids.add(message_id)

                    try:
                        # Parse the JSON body
                        body_json = json.loads(body)
                    except json.JSONDecodeError as e:
                        print(f"Error decoding JSON for MessageId {message_id}: {e}")
                        body_json = {}

                    # flatten_json needs a mapping; valid JSON can also be a list, string or number.
                    if not isinstance(body_json, dict):
                        print(f"Body for MessageId {message_id} is not a JSON object; exporting metadata only.")
                        body_json = {}

                    # Flatten the JSON body
                    flattened_body = flatten_json(body_json)

                    # Update headers with keys from the flattened JSON
                    csv_headers.update(flattened_body.keys())

                    # Combine message metadata with flattened body
                    combined_message = {
                        'MessageId': message_id,
                        'ReceiptHandle': receipt_handle
                    }
                    combined_message.update(flattened_body)

                    all_messages.append(combined_message)

                    # Optional: Delete the message after processing
                    # Uncomment the following lines if you want to delete messages
                    # sqs.delete_message(
                    #     QueueUrl=queue_url,
                    #     ReceiptHandle=receipt_handle
                    # )

                # Check if maximum message limit is reached
                if fetched_messages >= MAX_MESSAGES:
                    print(f"Reached maximum limit of {MAX_MESSAGES} messages.")
                    break

            # Optional: Print progress
            print(f"Fetched {fetched_messages} messages so far.")

        if not all_messages:
            print("No messages were fetched from the queue.")
            return

        # Metadata columns first, then the body columns in alphabetical order
        metadata_headers = ['MessageId', 'ReceiptHandle']
        ordered_headers = metadata_headers + sorted(k for k in csv_headers if k not in metadata_headers)

        # Write CSV data to a file
        with open('sqs_messages.csv', 'w', newline='', encoding='utf-8') as file:
            writer = csv.DictWriter(file, fieldnames=ordered_headers)
            writer.writeheader()
            writer.writerows(all_messages)

        print(f"{len(all_messages)} messages have been written to sqs_messages.csv")

    except NoCredentialsError:
        print("Error: AWS credentials not found. Please authenticate using ktool.")
    except PartialCredentialsError:
        print("Error: Incomplete AWS credentials. Please check your ktool configuration.")
    except Exception as e:
        # Top-level boundary of the script: report anything unexpected instead of a traceback.
        print(f"An unexpected error occurred: {e}")

# Run main() only when this code is executed directly, not when it is imported.
if __name__ == "__main__":
    main()


Fetched 10 messages so far.
Fetched 20 messages so far.
Fetched 30 messages so far.
Fetched 40 messages so far.
Fetched 50 messages so far.
Fetched 60 messages so far.
Fetched 70 messages so far.
Fetched 80 messages so far.
Fetched 90 messages so far.
Fetched 100 messages so far.
Fetched 110 messages so far.
Fetched 120 messages so far.
Fetched 130 messages so far.
Fetched 140 messages so far.
Fetched 150 messages so far.
Fetched 160 messages so far.
Fetched 170 messages so far.
Fetched 180 messages so far.
Fetched 190 messages so far.
Fetched 200 messages so far.
Fetched 210 messages so far.
Fetched 220 messages so far.
Fetched 230 messages so far.
Fetched 240 messages so far.
Fetched 250 messages so far.
Fetched 260 messages so far.
Fetched 270 messages so far.
Fetched 280 messages so far.
Fetched 290 messages so far.
Fetched 300 messages so far.
Fetched 310 messages so far.
Fetched 320 messages so far.
Fetched 330 messages so far.
Fetched 340 messages so far.
Fetched 350 messages so

In [None]:
"""
Description: Script to get messages from the clustering DLQ and move them back to the beginning of the process by sending an event to EventBridge.
Why would I do this? 
Usually, just redriving the DLQ and restarting from the clustering step should be good enough, but we've had situations where the messages in the DLQ are so old
that their associated files in S3 have been deleted.
In that case, we can't just redrive the DLQ; we need to start from the beginning of the process.


🚨 WARNING: PLEASE USE THIS CAREFULLY. 
Read through the script and make necessary adjustments before using. 
We are querying the database directly, so we want to be careful with that. Additionally, this script can delete messages from SQS -
and it's a permanent delete, so I've added an input prompt to make sure that the user wants to delete the message.
If you have a lot of messages and are confident that the script is doing what you need, then you can comment out those input prompts.

How to use?
1. Replace your DB user name and password in the psycopg.connect() method.
2. Run the script.

"""

import boto3
import json
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
import psycopg
from psycopg import sql

def _extract_s3_key(body_json):
    """Return ``Records[0].s3.object.key`` from a parsed message body, or ``''`` if it is missing or malformed."""
    try:
        s3_key = body_json['Records'][0]['s3']['object']['key']
    except (KeyError, IndexError, TypeError):
        # Covers a missing key, an empty Records list, and bodies/fields that are not containers.
        return ''
    return s3_key if isinstance(s3_key, str) else ''


def _build_event_detail(entity_id, columns, row):
    """Build the EventBridge ``Detail`` payload from one ``summary_analyses`` row."""
    summary_analysis_dict = dict(zip(columns, row))

    def _str_or_none(value):
        # The id columns are stringified so that json.dumps can serialize them.
        return None if value is None else str(value)

    return {
        'summaryAnalysisEntityId': entity_id,
        'companyEntityId': _str_or_none(summary_analysis_dict.get('company_entity_id')),
        'productSurface': summary_analysis_dict.get('product_surface'),
        'targetEntityId': _str_or_none(summary_analysis_dict.get('target_entity_id')),
        'targetEntityType': summary_analysis_dict.get('target_entity_type'),
        'targetKey': summary_analysis_dict.get('target_key'),
    }


def _reprocess_message(message, conn, cur, sqs, eventbridge, queue_url):
    """Look up one DLQ message's summary analysis, optionally re-trigger it via EventBridge, and optionally delete it."""
    message_id = message.get('MessageId', '')
    receipt_handle = message.get('ReceiptHandle', '')
    body = message.get('Body', '{}')  # Default to empty JSON if Body is missing

    try:
        # Parse the JSON body
        body_json = json.loads(body)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON for MessageId {message_id}: {e}")
        body_json = {}

    # Parse out the entity id from the body
    # Example path: 'standard/performanceSummaries/hierarchical/24b0e6ef-25e6-4a24-b72a-7ff6e5783a7e.csv'
    s3_key = _extract_s3_key(body_json)
    if not s3_key:
        print(f"Skipping message {message_id} with no object key.")
        return

    # Extract the entity_id by taking the last part after '/' and removing the extension
    last_part = s3_key.rsplit('/', 1)[-1]
    entity_id = last_part.rsplit('.', 1)[0]

    print(f"Processing MessageId: {message_id}, Entity ID: {entity_id}")

    try:
        # Execute the SQL query securely (entity_id is passed as a bound parameter)
        query = sql.SQL("SELECT * FROM text_analysis.summary_analyses WHERE entity_id = %s")
        cur.execute(query, (entity_id,))
        summary_analysis = cur.fetchone()
        # Get column names from cursor.description
        columns = [desc[0] for desc in cur.description]
    except psycopg.Error as db_err:
        print(f"Database error for Entity ID {entity_id}: {db_err}")
        # A failed statement leaves the transaction aborted; roll back so that
        # the queries for the following messages can still run.
        conn.rollback()
        return

    if summary_analysis is None:
        # No matching row: there is nothing to build an event from.
        print(f"No summary analysis found for Entity ID {entity_id}; skipping message {message_id}.")
        return

    print(f"Summary Analysis for Entity ID {entity_id}: {summary_analysis}")
    event_detail = _build_event_detail(entity_id, columns, summary_analysis)
    print(event_detail)

    try:
        event_detail_json = json.dumps(event_detail)
    except (TypeError, ValueError) as json_err:
        print(f"Could not serialize event detail for Entity ID {entity_id}: {json_err}")
        return
    print(event_detail_json)

    # get user input if we should send the event to eventbridge (comment out the prompt for faster execution)
    send_event = input("Send event to EventBridge? (y/n): ")
    if send_event.lower() != 'y':
        print(f"Skipping sending event for Entity ID {entity_id}.")
        return

    # Send message to EventBridge to start the process again
    try:
        eventbridge_response = eventbridge.put_events(
            Entries=[
                {
                    'Source': 'lattice.custom.text-analysis',
                    'DetailType': 'lattice.ai-platform.summary-analysis.generation-triggered',
                    'Detail': event_detail_json,
                    'EventBusName': 'lattice-event-bus',
                }
            ]
        )
    except Exception as eb_err:
        print(f"Failed to send event for Entity ID {entity_id}: {eb_err}")
        return

    print(f"EventBridge response: {eventbridge_response}")
    print("failed entry count: ", eventbridge_response.get('FailedEntryCount'))

    # put_events can return normally while still rejecting the entry, so check the count
    # before reporting success or offering to delete the message.
    if eventbridge_response.get('FailedEntryCount') != 0:
        print("Error sending to EventBridge")
        return

    print(f"Event sent to EventBridge for Entity ID {entity_id}.")

    # Optional: Delete the message after successful processing (comment out the prompt for faster execution)
    delete_msg = input("Delete message from SQS? (y/n): ")
    if delete_msg.lower() != 'y':
        print(f"Skipping deletion of message {message_id}.")
        return

    try:
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=receipt_handle
        )
        print(f"Deleted MessageId: {message_id} from SQS queue.")
    except Exception as del_err:
        print(f"Failed to delete MessageId {message_id}: {del_err}")


def main():
    """
    Re-trigger summary-analysis generation for messages in the ``text-analysis-clustering-dlq`` queue.

    After an initial confirmation prompt, the script receives messages from the
    queue in batches. For each message it derives an entity id from the file
    name in the body's S3 object key, looks up the matching row in
    ``text_analysis.summary_analyses`` and, after a per-message prompt, sends a
    ``generation-triggered`` event to EventBridge. If EventBridge reports no
    failed entries, a second prompt offers to delete the message from SQS.

    Every received message counts toward ``MAX_MESSAGES``, including messages
    that are skipped or fail, so the loop is always bounded even when skipped
    messages become visible again and are received a second time.

    :return: None. Progress and errors are printed.
    """

    # Make sure the user wants to proceed
    are_you_sure = input('Running this script will send events to EventBridge and delete messages from the SQS queue. Are you sure you want to proceed? (y/n): ')
    if are_you_sure.lower() != 'y':
        print("Exiting script.")
        return

    try:
        # Initialize a session using the default credentials (managed by ktool)
        session = boto3.Session(
            region_name='us-west-2',  # Replace with your AWS region
            profile_name='lattice'
        )

        # Create SQS and EventBridge clients using the session
        sqs = session.client('sqs')
        eventbridge = session.client('events')

        # Get the URL for the SQS queue
        queue_url = sqs.get_queue_url(QueueName='text-analysis-clustering-dlq')['QueueUrl']

        # Upper bound on received messages so the loop cannot run forever
        MAX_MESSAGES = 100  # Adjust as needed
        fetched_messages = 0

        print("Starting to create database connection")
        # Establish a single database connection outside the loop
        with psycopg.connect('host=cyral-sidecar.lattice.com port=5432 dbname=entitystore user=user.name@lattice.com password=<cyral_password_here> sslmode=require') as conn:

            print("Database connection established")
            with conn.cursor() as cur:
                print("Database cursor created")
                while fetched_messages < MAX_MESSAGES:
                    # Receive messages from SQS queue
                    response = sqs.receive_message(
                        QueueUrl=queue_url,
                        MaxNumberOfMessages=10,  # Increase to fetch more messages per request
                        VisibilityTimeout=30,    # Adjust based on processing time
                        WaitTimeSeconds=10       # Long polling
                    )

                    messages = response.get('Messages', [])

                    if not messages:
                        print("No more messages available in the queue.")
                        break

                    for message in messages:
                        _reprocess_message(message, conn, cur, sqs, eventbridge, queue_url)

                        # Count the message whatever its outcome, so skips cannot bypass the limit.
                        fetched_messages += 1
                        print("Fetched messages: ", fetched_messages)
                        # Check if maximum message limit is reached
                        if fetched_messages >= MAX_MESSAGES:
                            print(f"Reached maximum limit of {MAX_MESSAGES} messages.")
                            break

                    # Optional: Print progress
                    print(f"Fetched {fetched_messages} messages so far.")

        if fetched_messages == 0:
            print("No messages were fetched from the queue.")

    except NoCredentialsError:
        print("Error: AWS credentials not found. Please authenticate using ktool.")
    except PartialCredentialsError:
        print("Error: Incomplete AWS credentials. Please check your ktool configuration.")
    except Exception as e:
        # Top-level boundary of the script: report anything unexpected instead of a traceback.
        print(f"An unexpected error occurred: {e}")

# Run main() only when this code is executed directly, not when it is imported.
if __name__ == "__main__":
    main()


Starting to create database connection
Database connection established
Database cursor created
Processing MessageId: fb645193-a47d-4d1c-9a5f-3b0021dbaf4a, Entity ID: a481fed7-a6d5-4cbe-8c52-706ad49fdb03
Summary Analysis for Entity ID a481fed7-a6d5-4cbe-8c52-706ad49fdb03: (UUID('a481fed7-a6d5-4cbe-8c52-706ad49fdb03'), datetime.datetime(2024, 12, 12, 15, 42, 56, 141000, tzinfo=datetime.timezone.utc), datetime.datetime(2025, 1, 2, 14, 46, 29, 68000, tzinfo=datetime.timezone.utc), UUID('1c541048-8a3e-11ef-bbb6-df7ba7312a77'), 'complete', 'performanceSummaries12Month', 'reviewee', UUID('c2ae037f-a8d5-43a9-9ce6-78d59b913da5'), None)
c2ae037f-a8d5-43a9-9ce6-78d59b913da5
performanceSummaries12Month
{'summaryAnalysisEntityId': 'a481fed7-a6d5-4cbe-8c52-706ad49fdb03', 'companyEntityId': 'c2ae037f-a8d5-43a9-9ce6-78d59b913da5', 'productSurface': 'performanceSummaries12Month', 'targetEntityId': '1c541048-8a3e-11ef-bbb6-df7ba7312a77', 'targetEntityType': 'reviewee', 'targetKey': None}
{"summaryAnalys