In [0]:
import boto3
import json
import re
from email import message_from_string

# Module-level SQS client shared by the polling code, pinned to us-east-1
# (the region that hosts the queue configured below).
sqs = boto3.client(service_name='sqs', region_name='us-east-1')

# Queue that delivers the email-file-path notifications.
# Replace with your actual SQS Queue URL.
queue_url = (
    'https://sqs.us-east-1.amazonaws.com/'
    '221082192243/email-file-paths-queue'
)

# Body lines ("From:", "Date:", "To:") that typically introduce each quoted
# message inside a reply/forward chain. Compiled once at import time rather
# than on every call.
_THREAD_HEADER_PATTERN = re.compile(r"(?i)(^From: .+?$|^Date: .+?$|^To: .+?$)", re.MULTILINE)


def _decoded_text(part):
    """Return the payload of a non-multipart *part* as text.

    base64 and quoted-printable payloads are decoded (using the part's
    declared charset, UTF-8 if none is declared) so that quoted headers inside
    them can be matched; any other payload string is returned unchanged, and a
    missing payload yields "".
    """
    encoding = str(part.get("Content-Transfer-Encoding", "")).strip().lower()
    if encoding not in ("base64", "quoted-printable"):
        payload = part.get_payload()
        return payload if isinstance(payload, str) else ""
    raw = part.get_payload(decode=True) or b""
    charset = part.get_content_charset() or "utf-8"
    try:
        return raw.decode(charset, errors="replace")
    except LookupError:  # charset name in the header is unknown to Python
        return raw.decode("utf-8", errors="replace")


def _message_body_text(email_msg):
    """Return the body text of *email_msg* that should be scanned for quoted headers.

    A single-part message yields its (decoded) payload whatever its content
    type. For a multipart message the text/plain leaves are used; other text/*
    leaves are used only when no text/plain leaf exists, so a text/plain +
    text/html alternative pair is not counted twice.
    """
    if not email_msg.is_multipart():
        return _decoded_text(email_msg)
    leaves = [part for part in email_msg.walk() if not part.is_multipart()]
    chosen = [part for part in leaves if part.get_content_type() == "text/plain"]
    if not chosen:
        chosen = [part for part in leaves if part.get_content_maintype() == "text"]
    return "\n".join(_decoded_text(part) for part in chosen)


def analyze_email(email_content):
    """Heuristically decide whether an email's body contains a quoted thread.

    Counts body lines that start with ``From: ``, ``Date: `` or ``To: ``
    (case-insensitive) and treats every three such lines as one quoted email.
    Multipart messages and base64 / quoted-printable bodies are handled.

    Args:
        email_content: the raw email as a string (header block, blank line,
            body).

    Returns:
        dict with ``looks_like_thread`` (True when at least two quoted emails
        were estimated) and ``thread_email_count`` (the estimate when it is
        two or more, otherwise 1).
    """
    email_msg = message_from_string(email_content)
    body = _message_body_text(email_msg)
    matches = _THREAD_HEADER_PATTERN.findall(body)
    # NOTE(review): when email_content starts with a header block, the parser
    # consumes the outer message's own From/Date/To lines as headers, so they
    # never reach `body`; a reply quoting one earlier email therefore scores 1
    # here. Confirm whether the outer email is meant to be counted.
    estimated_email_count = len(matches) // 3
    if estimated_email_count >= 2:
        return {"looks_like_thread": True, "thread_email_count": estimated_email_count}
    return {"looks_like_thread": False, "thread_email_count": 1}

# Function to process messages from SQS
def _extract_file_path(message):
    """Return the ``file_path`` named by one received SQS message.

    The SQS body is normally an SNS notification whose ``Message`` field is
    itself a JSON string; with SNS raw message delivery the body is that JSON
    directly. Both shapes are accepted.

    Raises:
        ValueError: the body (or the embedded ``Message``) is not valid JSON.
        KeyError: no ``file_path`` key is present.
    """
    envelope = json.loads(message['Body'])
    if isinstance(envelope, dict) and 'Message' in envelope:
        payload = json.loads(envelope['Message'])
    else:
        payload = envelope
    return payload['file_path']


def _process_message(message, max_bytes):
    """Analyse the email file referenced by *message*, print the verdict, then delete *message*."""
    file_path = _extract_file_path(message)
    print(f"Processing email file: {file_path}")

    # `dbutils` is not defined in this file -- presumably the Databricks
    # notebook global. Only the first `max_bytes` bytes of the file are read.
    email_content = dbutils.fs.head(file_path, max_bytes)
    result = analyze_email(email_content)

    if result["looks_like_thread"]:
        print("This may be an email thread.")
    else:
        print("Identified as a single email.")
    print(f"Number of emails detected: {result['thread_email_count']}")

    # Delete only after successful processing, so failures stay on the queue.
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])
    print("Message processed and deleted.")


def process_email_threads(max_retries=5, wait_time_seconds=20, max_bytes=100000):
    """Poll the SQS queue and classify each referenced email file.

    Each message must carry a JSON object with a ``file_path`` key, either
    directly or inside an SNS notification's ``Message`` field. For each one,
    the file is read, passed to ``analyze_email``, the verdict is printed,
    and the message is deleted. A message whose processing raises is reported
    and left on the queue.

    Polling stops after ``max_retries`` consecutive receives that return no
    messages.

    Args:
        max_retries: consecutive empty receives tolerated before returning.
        wait_time_seconds: long-poll wait passed to ``receive_message``
            (SQS accepts 0-20).
        max_bytes: how many leading bytes of each email file to read.
    """
    import traceback  # only needed for error reporting

    print("Polling messages from SQS...")

    no_message_retries = 0  # consecutive empty receives so far
    while no_message_retries < max_retries:
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=wait_time_seconds,
        )

        messages = response.get('Messages', [])
        if not messages:
            no_message_retries += 1
            print("No new messages. Waiting... (Retry {}/{})".format(no_message_retries, max_retries))
            continue

        no_message_retries = 0  # any delivery resets the idle countdown

        for message in messages:
            try:
                _process_message(message, max_bytes)
            except Exception as e:
                # Best-effort per message: report and move on. The message is
                # not deleted, so SQS redelivers it after its visibility
                # timeout. NOTE(review): a permanently failing message will
                # keep coming back unless the queue has a redrive policy / DLQ.
                print(f"Error processing message: {e}")
                traceback.print_exc()

    print("No new messages after multiple retries. Stopping code.")

# Start polling immediately. This call blocks until the queue has returned no
# messages on 5 consecutive receive attempts, then returns.
process_email_threads()