# Simulating Event Processing in AWS Locally

**Introduction**: In this notebook, I simulate an event processing pipeline similar to what would be implemented using AWS services like EventBridge, Lambda, S3, CloudWatch, and Dead Letter Queue (DLQ). I simulate event generation, validation, processing, anomaly detection, and failure handling with retries. The goal is to mimic the core functionalities in a local environment for better understanding and testing of the entire event-driven architecture.


In [1]:
import random
import json
import time
import uuid
from datetime import datetime

Simulating Event Generation Here, we generate simulated customer event data. 
Each event includes attributes like eventId, customerId, salesRepId, eventType, and other details. 
A random selection is made for different event types and platforms. 
Additionally, a percentage of faulty events is introduced to simulate real-world data irregularities.

In [2]:
# ------------------- Simulate Event Generation -------------------

def generate_customer_event(i):
    customer_ids = ["C123", "C456", "C789", "C101", "C112"]
    sales_rep_ids = ["SR001", "SR002", "SR003", "SR004"]
    event_types = ["app_open", "inquiry", "meeting_scheduled", "call_logged", "ticket_created", "ticket_resolved"]

    event = {
        "eventId": 100000 + i,
        "customerId": random.choice(customer_ids),
        "salesRepId": random.choice(sales_rep_ids),
        "eventType": random.choice(event_types),
        "eventDetails": "Details about the event",
        "timestamp": datetime.now().isoformat(),
        "platform": random.choice(["iOS", "Android"]),
        "region": random.choice(["US", "EU", "IN", "APAC"]),
    }

    # Ensure a percentage of faulty events
    prob = random.random()
    
    if prob < 0.1:  
        event["customerId"] = None  # 10% missing customerId (critical failure)
    elif prob < 0.2:
        event["eventType"] = None  # 10% missing eventType (critical failure)
    elif prob < 0.3:
        event["platform"] = "Windows"  # 10% invalid platform (anomaly/ incorrect business data)

    return event

CloudWatch Logging Function This cell simulates logging messages to AWS CloudWatch. Logs are stored in a list and printed to the console to give a visual representation of CloudWatch logs, which would normally be handled by the AWS SDK for Python (Boto3).



In [3]:
# ------------------- CloudWatch Logging Function -------------------
logs = []
def log_cloudwatch(message):
    """
    Simulates logging a message to CloudWatch.
    """
    # In a real scenario, you would use AWS SDK for Python (Boto3) to log to CloudWatch Logs
    logs.append(message)
    print(f"[CloudWatch Log] {message}")

Mock Dead Letter Queue (DLQ) This function simulates sending failed events to a Dead Letter Queue (DLQ). Events that fail validation or processing steps are logged with the reason for failure and stored in the dlq list for later investigation.

In [4]:
# ------------------- Mock Dead Letter Queue (DLQ) -------------------
dlq = []  # Stores events that failed

def send_to_dlq(event, reason="Unknown"):
    """
    Sends the failed event to the Dead Letter Queue (DLQ) for further analysis.
    Includes a reason for failure.
    """
    global dlq
    
    event_with_reason = event.copy()
    event_with_reason["failure_reason"] = reason  # Attach failure reason
    
    dlq.append(event_with_reason)  # Store event in DLQ
    
    log_cloudwatch(f"Event {event['eventId']} sent to DLQ due to: {reason}.")


EventBridge Rule Check Function This function simulates the validation of events against EventBridge rules. The check ensures that all required fields are present in the event. If any field is missing, the event is flagged and sent to the DLQ for further review.



In [5]:
# ------------------- EventBridge Rule Check Function -------------------

def eventbridge_rule_check(event):
    """
    Basic structure validation for event routing via EventBridge (only checks if values are there).
    Logs to CloudWatch.
    """
    global event_bus
    required_fields = ['eventId', 'customerId', 'salesRepId', 'eventType', 'timestamp', 'platform', 'region']
    
    for field in required_fields:
        if field not in event:
            log_cloudwatch(f"EventBridge validation failed: Missing required field '{field}' in event {event['eventId']}. Sent to DLQ")
            #move_to_dlq(event, "EventBridge Rule")
            return False
    
    log_cloudwatch(f"EventBridge rule validation passed for event {event['eventId']}.")
    return True


Lambda Validation Check Function This cell simulates the validation logic that would be applied to events within an AWS Lambda function. It checks for missing or invalid values in the event data (e.g., customerId, eventType). Events failing this validation are moved to the DLQ.



In [6]:
# ------------------- Lambda Validation Check Function -------------------

def lambda_validation_check(event):
    """
    Validates the event in the Lambda function based on business logic.
    Logs to CloudWatch.
    """
    if event['customerId'] is None or event['customerId'] == "":
        log_cloudwatch(f"Lambda validation failed: Missing or invalid 'customerId' in event {event['eventId']}. Sending to DLQ")
        #send_to_dlq(event, "Lambda validation Rule")
        return False

    if event['eventType'] is None or event['eventType'] == "":
        log_cloudwatch(f"Lambda validation failed: Missing or invalid 'eventType' in event {event['eventId']}. Sending to DLQ")
        #send_to_dlq(event, "Lambda validation Rule")
        return False

    valid_event_types = ["app_open", "inquiry", "meeting_scheduled", "call_logged", "ticket_created", "ticket_resolved"]
    if event['eventType'] not in valid_event_types:
        log_cloudwatch(f"Lambda validation failed: Invalid 'eventType' {event['eventType']} in event {event['eventId']}. Sending to DLQ")
        #send_to_dlq(event, "Lambda validation Rule")
        return False
    
    log_cloudwatch(f"Lambda validation passed for event {event['eventId']}.")
    return True

Mock S3 Storage Simulates the storage of events in AWS S3. Events are stored based on whether they are successful or failed. The storage is tracked to ensure deduplication, and logs are generated whenever an event is moved to S3 storage.

In [7]:
# ------------------- Mock S3 Storage -------------------

storage = {
    "success": set(),  # Track event IDs for uniqueness
    "failed": set()
}
event_logs = {
    "success": {},
    "failed": {}
}

def move_to_s3(event, status="success"):
    """
    Simulates storing the event in S3 while ensuring deduplication with overwrite.
    Stores full event details in event_logs and tracks unique event IDs in storage.
    """
    event_id = event['eventId']

    # Store event ID in tracking set
    storage[status].add(event_id)

    # Overwrite event details in event_logs
    event_logs[status][event_id] = {
        "eventId": event['eventId'],
        "customerId": event['customerId'],
        "salesRepId": event['salesRepId'],
        "eventType": event['eventType'],
        "timestamp": event['timestamp'],
        "platform": event['platform'],
        "region": event['region'],
        "eventDetails": event['eventDetails'],
    }

    log_cloudwatch(f"Event {event_id} stored in {status} storage (S3)")

SageMaker Anomaly Detection This cell simulates anomaly detection that might be done with AWS SageMaker or another anomaly detection service. It checks for invalid data points (e.g., platform set to "Windows") and logs any anomalies detected.

In [8]:

# ------------------- SageMaker Anomaly Detection -------------------

def detect_anomalies(event):
    """
    Checks for any anomalies OR incorrectly processed data in the event data after successful calculation.
    Logs the anomaly detection in CloudWatch.
    Apply Anomaly detection algorithms to find. Below is just an example.
    """
    anomalies = []
    
    if event['platform'] == "Windows":  # Invalid platform
        anomalies.append("Anomaly: Invalid platform: Windows.")
    
    if anomalies:
        log_cloudwatch(f"Anomaly detected for event {event['eventId']}: {anomalies}")
        print(f"Event {event['eventId']} has anomalies: {anomalies}")
        return True  # Anomaly detected
    return False


Correcting Anomalies Simulates the correction of detected anomalies. If anomalies like an invalid platform are found in an event, this function modifies the event to correct the issue (e.g., changing platform to "iOS").



In [9]:
# ------------------- Already Calculated and flagged events Correction Function -------------------

def correct_anomaly(event):
    """
    Simulates correcting anomalies in a successfully calculated event.
    """
    log_cloudwatch(f"Correcting anomaly in event {event['eventId']}...")
    
    # Example: Fix invalid platform
    if event['platform'] == "Windows":
        event['platform'] = "iOS"  # Correcting platform anomaly
    
    return event


CloudWatch Anomaly Flagging This function simulates the process of flagging events with anomalies in CloudWatch. It also triggers the correction process by invoking the anomaly correction function defined in the previous cell.



In [10]:
# ------------------- CloudWatch Anomaly Flagging -------------------

def cloudwatch_flag_event_and_trigger_correction(event):
    """
    Logs a flagged anomaly event in CloudWatch and trigger correction lambda.
    """
    message = f"[CloudWatch Anomaly] Event {event['eventId']} flagged for anomalies"
    log_cloudwatch(message)
    # Try to correct the anomaly
    corrected_event = correct_anomaly(event)
    return corrected_event

Correction for Failed Events Simulates the correction of events that initially failed validation or processing. After correction, events are moved to the success storage (S3) to track the corrected events.



In [11]:
# ------------------- Already Calculated and flagged events Correction Function -------------------


def correction_for_failed(event):
    """
    Correction function for events that failed initially but were retried and then corrected.
    """
    log_cloudwatch(f"Correcting failed event {event['eventId']}...")
    
    # If a failed event is corrected, move to success storage
    move_to_s3(event, "success")
    return event


Retry Logic Simulates retrying the processing of events that failed during earlier stages (e.g., missing customerId or eventType). Events are retried a limited number of times before being permanently moved to the DLQ.



In [12]:
# ------------------- Retry Logic -------------------

def retry_event(event):
    retries = 0
    max_retries = 2
    fixed = False
    
    while retries < max_retries:
        log_cloudwatch(f"Retry attempt {retries + 1} for event {event['eventId']}...")

        if event['customerId'] is None or event['eventType'] is None:
            log_cloudwatch(f"Event {event['eventId']} cannot be fixed. Moving to failed storage.")
            move_to_s3(event, "failed")
            return False  # Critical failure

        if event['platform'] == "Windows":
            event['platform'] = "iOS"  # Fixable issue
            fixed = True
        
        if fixed:
            log_cloudwatch(f"Event {event['eventId']} fixed but retained in DLQ for review.")
            return True  # Stay in DLQ

        retries += 1

    log_cloudwatch(f"Event {event['eventId']} failed after {max_retries} retries. Moving to failed storage.")
    move_to_s3(event, "failed")
    return False


Event Calculation Simulates the event calculation step. After successful validation and anomaly checks, this function represents the final event processing and stores the event in S3 if it's successfully processed.



In [13]:
# ------------------- Event Calculation -------------------

def calculate_event(event):
    """
    Calculate the event if not already processed. Otherwise, skip.
    """
    calculated = False
    log_cloudwatch(f"Calculating event {event['eventId']}...")
    time.sleep(1)  # Simulate processing time
    calculated = True
    if calculated:
        move_to_s3(event, "success")
        log_cloudwatch(f"Event {event['eventId']} successfully calculated and stored.")
    return calculated


Retry After Correction Simulates retrying the calculation of an event after it has been corrected due to an anomaly detection. Events are retried multiple times before being permanently marked as failed or successfully recalculated.

In [14]:
def retry_after_correction(corrected_event):
    # Retry calculation up to 3 times
    max_retries = 3
    success = False
    for attempt in range(max_retries):
        log_cloudwatch(f"Recalculating event {corrected_event['eventId']} (Attempt {attempt + 1})...")

        if calculate_event(corrected_event):
            success = True
            break  # Exit loop if calculation succeeds

    if success:
        # Move to success storage if recalculation is successful
        move_to_s3(corrected_event, "success")
        log_cloudwatch(f"Event {corrected_event['eventId']} successfully recalculated and stored.")
    else:
        # If all retries fail, move to DLQ
        send_to_dlq(corrected_event, "retries failed")  # Send to DLQ
        log_cloudwatch(f"Event {corrected_event['eventId']} failed recalculations after {max_retries} attempts and sent to DLQ.")


Retry from DLQ This function processes events stored in the DLQ, attempting to fix and recalculate them. Events that can be fixed are processed and stored successfully, while those that continue to fail are kept in the DLQ.

In [15]:
# ------------------- Retry Logic -------------------
def retry_from_dlq():
    """
    Retries processing events from the DLQ.
    If an event is successfully processed, it moves to success storage.
    If anomalies are found, the event is corrected and recalculated.
    If it still fails after retries, it remains in DLQ.
    """
    global dlq  # Use the global DLQ
    
    # Iterate over a copy of DLQ to avoid modifying it while iterating
    for event_with_reason in dlq[:]:  # Iterate over a shallow copy to avoid modifying the list while iterating
        event = event_with_reason.copy()  # Copy event data without the reason for retry logic
        reason = event_with_reason.get("failure_reason", "Unknown")  # Retrieve the failure reason
     
        log_cloudwatch(f"\nRetrying event {event['eventId']} from DLQ. Reason: {reason}...")

        if retry_event(event):  # Attempt to fix the event before recalculation
            # Proceed with calculation
            calculated = calculate_event(event)
            if calculated:
                if detect_anomalies(event):
                    # Flag and correct the event
                    cloudwatch_flag_event(event)
                    corrected_event = correction_for_failed(event)

                    # Recalculate after correction
                    if calculate_event(corrected_event):
                        move_to_s3(corrected_event, "success")
                        log_cloudwatch(f"Event {corrected_event['eventId']} corrected and successfully recalculated.")
                        dlq.remove(event_tuple)  # Remove the event from DLQ after success
                    else:
                        log_cloudwatch(f"Event {event['eventId']} still failed after correction.")
                else:
                    # If no anomaly, move to success storage
                    move_to_s3(event, "success")
                    dlq.remove(event_tuple)  # Remove the event from DLQ
            else:
                log_cloudwatch(f"Event {event['eventId']} failed recalculation.")
        else:
            log_cloudwatch(f"Event {event['eventId']} could not be fixed. Keeping in DLQ.")

Full Event Processing This cell integrates the entire event processing flow. It generates a batch of events and processes each event through the various steps (validation, calculation, anomaly detection, etc.). The function handles retrying failed events and correcting anomalies, ensuring a smooth end-to-end pipeline simulation.

In [16]:
# ------------------- Full Event Processing -------------------

def process_batch(batch_size=100):
    events = [generate_customer_event(i) for i in range(batch_size)]
        
    for event in events:
        print(f"\nProcessing event {event['eventId']}...")
        print(event)

        # ------------------ Step 1: EventBridge Rule Check ------------------
        log_cloudwatch(f"Processing event {event['eventId']} through EventBridge validation...")
        if not eventbridge_rule_check(event):
            # If EventBridge Rule check fails, send to DLQ and stop further checks
            log_cloudwatch(f"Event {event['eventId']} failed EventBridge rule validation. Moving to DLQ.")
            send_to_dlq(event, "EventBridge Rule")
            continue  # Skip further processing for this event
        
        # ------------------ Step 2: Lambda Validation Check ------------------
        log_cloudwatch(f"Processing event {event['eventId']} through Lambda validation...")
        if not lambda_validation_check(event):
            # If Lambda validation fails, send to DLQ
            log_cloudwatch(f"Event {event['eventId']} failed Lambda validation. Moving to DLQ.")
            send_to_dlq(event, "Lambda Validation")
            continue  # Skip further processing for this event
        
        # Event is valid, proceed to calculation step
        calculated = calculate_event(event)
        if calculated:
            # After calculation, detect anomalies
            log_cloudwatch(f"Detecting anomalies for event {event['eventId']}...")
            if detect_anomalies(event):
                # Flag event in CloudWatch and trigger correction
                corrected_event = cloudwatch_flag_event_and_trigger_correction(event)
                if corrected_event:
                    log_cloudwatch(f"Event {event['eventId']} corrected after anomaly detection.")
                    retry_after_correction(event)
            else:
                print("No anomaly detected. Moving on.")
        else:
            # Move not calculated event to DLQ for retry
            send_to_dlq(event, "Not Calculated")
    
    retry_from_dlq()

In [17]:
process_batch(100)


Processing event 100000...
{'eventId': 100000, 'customerId': 'C101', 'salesRepId': 'SR004', 'eventType': None, 'eventDetails': 'Details about the event', 'timestamp': '2025-03-23T19:37:34.167516', 'platform': 'iOS', 'region': 'US'}
[CloudWatch Log] Processing event 100000 through EventBridge validation...
[CloudWatch Log] EventBridge rule validation passed for event 100000.
[CloudWatch Log] Processing event 100000 through Lambda validation...
[CloudWatch Log] Lambda validation failed: Missing or invalid 'eventType' in event 100000. Sending to DLQ
[CloudWatch Log] Event 100000 failed Lambda validation. Moving to DLQ.
[CloudWatch Log] Event 100000 sent to DLQ due to: Lambda Validation.

Processing event 100001...
{'eventId': 100001, 'customerId': 'C789', 'salesRepId': 'SR002', 'eventType': 'inquiry', 'eventDetails': 'Details about the event', 'timestamp': '2025-03-23T19:37:34.167523', 'platform': 'Android', 'region': 'US'}
[CloudWatch Log] Processing event 100001 through EventBridge val

[CloudWatch Log] Event 100010 stored in success storage (S3)
[CloudWatch Log] Event 100010 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100010...
No anomaly detected. Moving on.

Processing event 100011...
{'eventId': 100011, 'customerId': 'C112', 'salesRepId': 'SR004', 'eventType': 'ticket_resolved', 'eventDetails': 'Details about the event', 'timestamp': '2025-03-23T19:37:34.167548', 'platform': 'iOS', 'region': 'US'}
[CloudWatch Log] Processing event 100011 through EventBridge validation...
[CloudWatch Log] EventBridge rule validation passed for event 100011.
[CloudWatch Log] Processing event 100011 through Lambda validation...
[CloudWatch Log] Lambda validation passed for event 100011.
[CloudWatch Log] Calculating event 100011...
[CloudWatch Log] Event 100011 stored in success storage (S3)
[CloudWatch Log] Event 100011 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100011...
No anomaly detected. Moving on.

P

[CloudWatch Log] Event 100019 stored in success storage (S3)
[CloudWatch Log] Event 100019 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100019...
No anomaly detected. Moving on.

Processing event 100020...
{'eventId': 100020, 'customerId': 'C101', 'salesRepId': 'SR003', 'eventType': 'app_open', 'eventDetails': 'Details about the event', 'timestamp': '2025-03-23T19:37:34.167569', 'platform': 'iOS', 'region': 'IN'}
[CloudWatch Log] Processing event 100020 through EventBridge validation...
[CloudWatch Log] EventBridge rule validation passed for event 100020.
[CloudWatch Log] Processing event 100020 through Lambda validation...
[CloudWatch Log] Lambda validation passed for event 100020.
[CloudWatch Log] Calculating event 100020...
[CloudWatch Log] Event 100020 stored in success storage (S3)
[CloudWatch Log] Event 100020 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100020...
No anomaly detected. Moving on.

Processi

[CloudWatch Log] Event 100030 stored in success storage (S3)
[CloudWatch Log] Event 100030 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100030...
No anomaly detected. Moving on.

Processing event 100031...
{'eventId': 100031, 'customerId': 'C123', 'salesRepId': 'SR001', 'eventType': 'ticket_created', 'eventDetails': 'Details about the event', 'timestamp': '2025-03-23T19:37:34.167595', 'platform': 'iOS', 'region': 'EU'}
[CloudWatch Log] Processing event 100031 through EventBridge validation...
[CloudWatch Log] EventBridge rule validation passed for event 100031.
[CloudWatch Log] Processing event 100031 through Lambda validation...
[CloudWatch Log] Lambda validation passed for event 100031.
[CloudWatch Log] Calculating event 100031...
[CloudWatch Log] Event 100031 stored in success storage (S3)
[CloudWatch Log] Event 100031 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100031...
No anomaly detected. Moving on.

Pr

[CloudWatch Log] Event 100039 stored in success storage (S3)
[CloudWatch Log] Event 100039 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100039...
[CloudWatch Log] Anomaly detected for event 100039: ['Anomaly: Invalid platform: Windows.']
Event 100039 has anomalies: ['Anomaly: Invalid platform: Windows.']
[CloudWatch Log] [CloudWatch Anomaly] Event 100039 flagged for anomalies
[CloudWatch Log] Correcting anomaly in event 100039...
[CloudWatch Log] Event 100039 corrected after anomaly detection.
[CloudWatch Log] Recalculating event 100039 (Attempt 1)...
[CloudWatch Log] Calculating event 100039...
[CloudWatch Log] Event 100039 stored in success storage (S3)
[CloudWatch Log] Event 100039 successfully calculated and stored.
[CloudWatch Log] Event 100039 stored in success storage (S3)
[CloudWatch Log] Event 100039 successfully recalculated and stored.

Processing event 100040...
{'eventId': 100040, 'customerId': None, 'salesRepId': 'SR003', 'eventType':

[CloudWatch Log] Event 100049 stored in success storage (S3)
[CloudWatch Log] Event 100049 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100049...
No anomaly detected. Moving on.

Processing event 100050...
{'eventId': 100050, 'customerId': 'C456', 'salesRepId': 'SR003', 'eventType': 'inquiry', 'eventDetails': 'Details about the event', 'timestamp': '2025-03-23T19:37:34.167642', 'platform': 'iOS', 'region': 'IN'}
[CloudWatch Log] Processing event 100050 through EventBridge validation...
[CloudWatch Log] EventBridge rule validation passed for event 100050.
[CloudWatch Log] Processing event 100050 through Lambda validation...
[CloudWatch Log] Lambda validation passed for event 100050.
[CloudWatch Log] Calculating event 100050...
[CloudWatch Log] Event 100050 stored in success storage (S3)
[CloudWatch Log] Event 100050 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100050...
No anomaly detected. Moving on.

Processin

[CloudWatch Log] Event 100058 stored in success storage (S3)
[CloudWatch Log] Event 100058 successfully calculated and stored.
[CloudWatch Log] Event 100058 stored in success storage (S3)
[CloudWatch Log] Event 100058 successfully recalculated and stored.

Processing event 100059...
{'eventId': 100059, 'customerId': 'C789', 'salesRepId': 'SR001', 'eventType': 'app_open', 'eventDetails': 'Details about the event', 'timestamp': '2025-03-23T19:37:34.167716', 'platform': 'Android', 'region': 'US'}
[CloudWatch Log] Processing event 100059 through EventBridge validation...
[CloudWatch Log] EventBridge rule validation passed for event 100059.
[CloudWatch Log] Processing event 100059 through Lambda validation...
[CloudWatch Log] Lambda validation passed for event 100059.
[CloudWatch Log] Calculating event 100059...
[CloudWatch Log] Event 100059 stored in success storage (S3)
[CloudWatch Log] Event 100059 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100059.

[CloudWatch Log] Event 100070 stored in success storage (S3)
[CloudWatch Log] Event 100070 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100070...
No anomaly detected. Moving on.

Processing event 100071...
{'eventId': 100071, 'customerId': 'C789', 'salesRepId': 'SR001', 'eventType': 'call_logged', 'eventDetails': 'Details about the event', 'timestamp': '2025-03-23T19:37:34.167743', 'platform': 'Android', 'region': 'APAC'}
[CloudWatch Log] Processing event 100071 through EventBridge validation...
[CloudWatch Log] EventBridge rule validation passed for event 100071.
[CloudWatch Log] Processing event 100071 through Lambda validation...
[CloudWatch Log] Lambda validation passed for event 100071.
[CloudWatch Log] Calculating event 100071...
[CloudWatch Log] Event 100071 stored in success storage (S3)
[CloudWatch Log] Event 100071 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100071...
No anomaly detected. Moving on.


[CloudWatch Log] Event 100078 stored in success storage (S3)
[CloudWatch Log] Event 100078 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100078...
No anomaly detected. Moving on.

Processing event 100079...
{'eventId': 100079, 'customerId': 'C789', 'salesRepId': 'SR004', 'eventType': 'meeting_scheduled', 'eventDetails': 'Details about the event', 'timestamp': '2025-03-23T19:37:34.167762', 'platform': 'Android', 'region': 'APAC'}
[CloudWatch Log] Processing event 100079 through EventBridge validation...
[CloudWatch Log] EventBridge rule validation passed for event 100079.
[CloudWatch Log] Processing event 100079 through Lambda validation...
[CloudWatch Log] Lambda validation passed for event 100079.
[CloudWatch Log] Calculating event 100079...
[CloudWatch Log] Event 100079 stored in success storage (S3)
[CloudWatch Log] Event 100079 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100079...
No anomaly detected. Movin

[CloudWatch Log] Event 100089 stored in success storage (S3)
[CloudWatch Log] Event 100089 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100089...
No anomaly detected. Moving on.

Processing event 100090...
{'eventId': 100090, 'customerId': 'C112', 'salesRepId': 'SR004', 'eventType': 'ticket_resolved', 'eventDetails': 'Details about the event', 'timestamp': '2025-03-23T19:37:34.167790', 'platform': 'iOS', 'region': 'APAC'}
[CloudWatch Log] Processing event 100090 through EventBridge validation...
[CloudWatch Log] EventBridge rule validation passed for event 100090.
[CloudWatch Log] Processing event 100090 through Lambda validation...
[CloudWatch Log] Lambda validation passed for event 100090.
[CloudWatch Log] Calculating event 100090...
[CloudWatch Log] Event 100090 stored in success storage (S3)
[CloudWatch Log] Event 100090 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100090...
No anomaly detected. Moving on.


[CloudWatch Log] Event 100099 stored in success storage (S3)
[CloudWatch Log] Event 100099 successfully calculated and stored.
[CloudWatch Log] Detecting anomalies for event 100099...
No anomaly detected. Moving on.
[CloudWatch Log] 
Retrying event 100000 from DLQ. Reason: Lambda Validation...
[CloudWatch Log] Retry attempt 1 for event 100000...
[CloudWatch Log] Event 100000 cannot be fixed. Moving to failed storage.
[CloudWatch Log] Event 100000 stored in failed storage (S3)
[CloudWatch Log] Event 100000 could not be fixed. Keeping in DLQ.
[CloudWatch Log] 
Retrying event 100002 from DLQ. Reason: Lambda Validation...
[CloudWatch Log] Retry attempt 1 for event 100002...
[CloudWatch Log] Event 100002 cannot be fixed. Moving to failed storage.
[CloudWatch Log] Event 100002 stored in failed storage (S3)
[CloudWatch Log] Event 100002 could not be fixed. Keeping in DLQ.
[CloudWatch Log] 
Retrying event 100005 from DLQ. Reason: Lambda Validation...
[CloudWatch Log] Retry attempt 1 for event 

In [18]:
# Print DLQ event IDs
for event in dlq:
    print(event['eventId'])

100000
100002
100005
100022
100040
100048
100068
100069
100085
100086
100087
100093
100095
100098


In [19]:
storage["success"]

{100001,
 100003,
 100004,
 100006,
 100007,
 100008,
 100009,
 100010,
 100011,
 100012,
 100013,
 100014,
 100015,
 100016,
 100017,
 100018,
 100019,
 100020,
 100021,
 100023,
 100024,
 100025,
 100026,
 100027,
 100028,
 100029,
 100030,
 100031,
 100032,
 100033,
 100034,
 100035,
 100036,
 100037,
 100038,
 100039,
 100041,
 100042,
 100043,
 100044,
 100045,
 100046,
 100047,
 100049,
 100050,
 100051,
 100052,
 100053,
 100054,
 100055,
 100056,
 100057,
 100058,
 100059,
 100060,
 100061,
 100062,
 100063,
 100064,
 100065,
 100066,
 100067,
 100070,
 100071,
 100072,
 100073,
 100074,
 100075,
 100076,
 100077,
 100078,
 100079,
 100080,
 100081,
 100082,
 100083,
 100084,
 100088,
 100089,
 100090,
 100091,
 100092,
 100094,
 100096,
 100097,
 100099}

In [20]:
storage["failed"]

{100000,
 100002,
 100005,
 100022,
 100040,
 100048,
 100068,
 100069,
 100085,
 100086,
 100087,
 100093,
 100095,
 100098}

In [21]:
event_logs["failed"]

{100000: {'eventId': 100000,
  'customerId': 'C101',
  'salesRepId': 'SR004',
  'eventType': None,
  'timestamp': '2025-03-23T19:37:34.167516',
  'platform': 'iOS',
  'region': 'US',
  'eventDetails': 'Details about the event'},
 100002: {'eventId': 100002,
  'customerId': 'C101',
  'salesRepId': 'SR003',
  'eventType': None,
  'timestamp': '2025-03-23T19:37:34.167526',
  'platform': 'Android',
  'region': 'IN',
  'eventDetails': 'Details about the event'},
 100005: {'eventId': 100005,
  'customerId': None,
  'salesRepId': 'SR001',
  'eventType': 'ticket_created',
  'timestamp': '2025-03-23T19:37:34.167533',
  'platform': 'Android',
  'region': 'APAC',
  'eventDetails': 'Details about the event'},
 100022: {'eventId': 100022,
  'customerId': None,
  'salesRepId': 'SR001',
  'eventType': 'meeting_scheduled',
  'timestamp': '2025-03-23T19:37:34.167574',
  'platform': 'iOS',
  'region': 'IN',
  'eventDetails': 'Details about the event'},
 100040: {'eventId': 100040,
  'customerId': None,
