# **DEMONSTRATION OF MY APPROACH**
## **Overview**  
This notebook demonstrates how **event failures (missing, duplicate, and incorrect events) can be detected and corrected** in Linq’s event-driven system.

📌 **Note:** While this is implemented in Python, in a real-world Google Cloud setup:  
- **Pub/Sub** would handle real-time event streaming.  
- **Cloud Functions** would validate and detect issues.  
- **Dataflow** would perform anomaly detection and cross-system verification.  
- **APIs (HubSpot, Stripe, Twilio)** would be used for external validation.  


For simplicity, I have simulated these processes in Python instead of an actual cloud deployment.  


*Flow of my solution:*


1. Detect failures first (event missing, duplicate, or incorrect).
2. Reprocess faulty events using external checks and reconstruction.
3. Publish corrected events back to the system for accurate downstream processing.

*Let me take you through my solution demonstration step by step:*

## **Step 1: Simulating Incoming Events**

I will simulate events arriving via Pub/Sub with a **mock event stream**.
- Some events will be *missing* (gaps in event IDs).
- Some will be *duplicates* (same event ID + timestamp).
- Some will be *incorrect* (data corruption).
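
The three fault types listed above can also be injected programmatically instead of being written by hand. The following is a minimal sketch and not part of this notebook's pipeline; `inject_faults`, its rate parameters, and the fixed seed are hypothetical names chosen for illustration.

```python
import random
from datetime import datetime, timedelta


def inject_faults(n_events=10, drop_rate=0.1, dup_rate=0.1, corrupt_rate=0.1, seed=42):
    """Build a mock stream with missing, duplicate, and incorrect events (illustrative only)."""
    rng = random.Random(seed)  # Fixed seed so the same stream is produced on every run
    start = datetime(2025, 2, 9, 12, 0, 0)
    stream = []
    for i in range(n_events):
        event = {
            "event_id": 1001 + i,
            "type": rng.choice(["LeadCreated", "CRM_Sync", "Transaction"]),
            "timestamp": (start + timedelta(minutes=i)).strftime("%Y-%m-%dT%H:%M:%S"),
            "valid": True,
        }
        roll = rng.random()
        if roll < drop_rate:
            continue  # Missing event: leaves a gap in the ID sequence
        if roll < drop_rate + corrupt_rate:
            event["valid"] = False  # Incorrect event: flagged the same way as in this demo
        stream.append(event)
        if rng.random() < dup_rate:
            stream.append(dict(event))  # Duplicate: same event_id and timestamp
    return stream


mock_stream = inject_faults()
for event in mock_stream:
    print(event)
```

Because the generator is seeded, the same faults appear on every run, which keeps the detection results in later steps reproducible.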


In [17]:
from datetime import datetime, timedelta

# Mock event stream (simulating Pub/Sub ingestion)
event_stream = [
    {"event_id":1001, "type":"LeadCreated", "timestamp":"2025-02-09T12:00:00", "valid":True},
    {"event_id":1002, "type":"CRM_Sync", "timestamp":"2025-02-09T12:01:00", "valid":True},
    {"event_id":1002, "type":"CRM_Sync", "timestamp":"2025-02-09T12:01:00", "valid":True},  # Duplicate of the previous event
    {"event_id":1004, "type":"LeadCreated", "timestamp":"2025-02-09T12:02:00", "valid":True},  # Missing 1003
    {"event_id":1005, "type":"CRM_Sync", "timestamp":"2025-02-09T12:03:00", "valid":True},
    {"event_id":1005, "type":"CRM_Sync", "timestamp":"2025-02-09T12:03:01", "valid":True},  # Same ID, different timestamp: not counted as a duplicate in this demo
    {"event_id":1006, "type":"Transaction", "timestamp":"2025-02-09T12:04:00", "valid":False},  # Incorrect data (assumed via valid=False)
    {"event_id":1006, "type":"Transaction", "timestamp":"2025-02-09T12:04:00", "valid":False},  # Incorrect data and a duplicate of the previous event
    {"type":"Transaction", "timestamp":"2025-02-09T12:04:00", "valid":True},  # No event ID in this event
    {"event_id":1007, "type":"Transaction", "timestamp":"2025-02-09T12:04:00"},  # Missing the `valid` field
]

# Print the incoming event stream
print("Incoming Events:")
for event in event_stream:
    print(event)

Incoming Events:
{'event_id': 1001, 'type': 'LeadCreated', 'timestamp': '2025-02-09T12:00:00', 'valid': True}
{'event_id': 1002, 'type': 'CRM_Sync', 'timestamp': '2025-02-09T12:01:00', 'valid': True}
{'event_id': 1002, 'type': 'CRM_Sync', 'timestamp': '2025-02-09T12:01:00', 'valid': True}
{'event_id': 1004, 'type': 'LeadCreated', 'timestamp': '2025-02-09T12:02:00', 'valid': True}
{'event_id': 1005, 'type': 'CRM_Sync', 'timestamp': '2025-02-09T12:03:00', 'valid': True}
{'event_id': 1005, 'type': 'CRM_Sync', 'timestamp': '2025-02-09T12:03:01', 'valid': True}
{'event_id': 1006, 'type': 'Transaction', 'timestamp': '2025-02-09T12:04:00', 'valid': False}
{'event_id': 1006, 'type': 'Transaction', 'timestamp': '2025-02-09T12:04:00', 'valid': False}
{'type': 'Transaction', 'timestamp': '2025-02-09T12:04:00', 'valid': True}
{'event_id': 1007, 'type': 'Transaction', 'timestamp': '2025-02-09T12:04:00'}


*Note:* In a real cloud system, incorrect events would be identified through cross-system validation (e.g., comparing Linq's data with HubSpot, Stripe, or Twilio APIs), as described in my approach. Since this is a simplified demonstration, I assume an event is incorrect if `"valid": False`.
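
To make the cross-system idea concrete, here is a minimal sketch of what such a check could look like. It is an illustration under stated assumptions: `EXTERNAL_RECORDS` is a plain dictionary standing in for an external API, and the `amount` field and the `cross_check_event` helper are hypothetical and do not appear in the events used in this notebook.

```python
# Hypothetical stand-in for an external system of record (e.g., a payments API).
# In production this lookup would be an authenticated API call; here it is a dict.
EXTERNAL_RECORDS = {
    1006: {"type": "Transaction", "amount": 49.99},
    1007: {"type": "Transaction", "amount": 120.00},
}


def cross_check_event(event, external_records, fields=("type", "amount")):
    """Return the list of fields where the event disagrees with the external record."""
    record = external_records.get(event.get("event_id"))
    if record is None:
        return ["<no external record>"]
    return [f for f in fields if event.get(f) != record.get(f)]


good = {"event_id": 1007, "type": "Transaction", "amount": 120.00}
bad = {"event_id": 1006, "type": "Transaction", "amount": 4999}

print(cross_check_event(good, EXTERNAL_RECORDS))  # []
print(cross_check_event(bad, EXTERNAL_RECORDS))   # ['amount']
```

An empty list means the event agrees with the external record; any non-empty list would play the role that `"valid": False` plays in this demonstration.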


## **Performing a Validation Step before proceeding**

In [18]:
import re

def validate_event_format(event):
    """
    Validates the format of an event.

    Checks:
    - `event_id` is an integer (if present).
    - `type` is a non-empty string.
    - `timestamp` is a string in the `YYYY-MM-DDTHH:MM:SS` shape.
    - `valid` is a boolean.
    """
    # Shape check only (YYYY-MM-DDTHH:MM:SS); it does not reject impossible dates
    timestamp_pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$"

    # `type` must be a non-empty string
    if not isinstance(event.get("type"), str) or not event["type"].strip():
        return False
    # `timestamp` must be a string before re.match is called (it raises on non-strings)
    timestamp = event.get("timestamp")
    if not isinstance(timestamp, str) or not re.match(timestamp_pattern, timestamp):
        return False
    if "valid" not in event or not isinstance(event["valid"], bool):
        return False
    if "event_id" in event and not isinstance(event["event_id"], int):
        return False  # If event_id exists, it must be an integer

    return True
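
One caveat about the regular expression used above: it checks only the shape of the timestamp, so a string such as `2025-13-45T99:99:99` passes even though it is not a real date. The sketch below shows the difference; `is_real_timestamp` is a hypothetical helper name, and it relies on `datetime.strptime`, which `correct_event_format` also uses.

```python
import re
from datetime import datetime

TIMESTAMP_PATTERN = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$"


def is_real_timestamp(value):
    """True only if value is a string that parses as an actual calendar date and time."""
    if not isinstance(value, str):
        return False
    try:
        datetime.strptime(value, "%Y-%m-%dT%H:%M:%S")
    except ValueError:
        return False
    return True


shape_only = "2025-13-45T99:99:99"  # Right shape, impossible date
print(bool(re.match(TIMESTAMP_PATTERN, shape_only)))  # True: the regex is satisfied
print(is_real_timestamp(shape_only))                  # False: strptime rejects it
print(is_real_timestamp("2025-02-09T12:00:00"))       # True
```

Since `strptime` tolerates missing zero padding, combining both checks (shape first, then parsing) is the stricter option.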

In [19]:
def correct_event_format(event):
    """
    Validates and attempts to correct the format of an event.

    - If `event_id` or `timestamp` is missing, sends the event to DLQ.
    - If `valid` is missing, assumes True.
    - If `type` is missing or empty, sends to DLQ.
    """
    dlq_events = []  # Events that need manual intervention

    # Send to DLQ if `event_id` is missing
    if "event_id" not in event:
        dlq_events.append(event)
        return None, dlq_events  # Mark for manual review

    # Send to DLQ if `timestamp` is missing or invalid
    try:
        datetime.strptime(event["timestamp"], "%Y-%m-%dT%H:%M:%S")  # Check if valid
    except (KeyError, ValueError):
        dlq_events.append(event)
        return None, dlq_events  # Mark for manual review

    # Fix missing `valid` field (this is minor and can be assumed True)
    if "valid" not in event:
        event["valid"] = True

    # Send to DLQ if `type` is missing or empty
    if "type" not in event or not isinstance(event["type"], str) or event["type"].strip() == "":
        dlq_events.append(event)
        return None, dlq_events  # Mark for manual review

    return event, dlq_events

In [20]:
validated_events = []
dlq_events = []

for event in event_stream:
    if validate_event_format(event):
        validated_events.append(event)
    else:
        corrected_event, dlq_fix = correct_event_format(event)
        if corrected_event:
            validated_events.append(corrected_event)  # Use corrected event
        dlq_events.extend(dlq_fix)  # Send unfixable events to DLQ

# Replace `event_stream` with only valid events
event_stream = validated_events

In [21]:
event_stream

[{'event_id': 1001,
  'type': 'LeadCreated',
  'timestamp': '2025-02-09T12:00:00',
  'valid': True},
 {'event_id': 1002,
  'type': 'CRM_Sync',
  'timestamp': '2025-02-09T12:01:00',
  'valid': True},
 {'event_id': 1002,
  'type': 'CRM_Sync',
  'timestamp': '2025-02-09T12:01:00',
  'valid': True},
 {'event_id': 1004,
  'type': 'LeadCreated',
  'timestamp': '2025-02-09T12:02:00',
  'valid': True},
 {'event_id': 1005,
  'type': 'CRM_Sync',
  'timestamp': '2025-02-09T12:03:00',
  'valid': True},
 {'event_id': 1005,
  'type': 'CRM_Sync',
  'timestamp': '2025-02-09T12:03:01',
  'valid': True},
 {'event_id': 1006,
  'type': 'Transaction',
  'timestamp': '2025-02-09T12:04:00',
  'valid': False},
 {'event_id': 1006,
  'type': 'Transaction',
  'timestamp': '2025-02-09T12:04:00',
  'valid': False},
 {'type': 'Transaction', 'timestamp': '2025-02-09T12:04:00', 'valid': True},
 {'event_id': 1007,
  'type': 'Transaction',
  'timestamp': '2025-02-09T12:04:00',
  'valid': True}]
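
Events placed in `dlq_events` are stored as-is, so a reviewer cannot see why an event was rejected. A possible refinement, sketched here under my own assumptions, is to wrap each rejected event with a reason and a rejection time. The `make_dlq_entry` helper and its field names are hypothetical.

```python
from datetime import datetime, timezone


def make_dlq_entry(event, reason):
    """Wrap a rejected event with the reason and the time it was rejected."""
    return {
        "event": dict(event),  # Copy so later mutation of the original does not alter the record
        "reason": reason,
        "rejected_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S"),
    }


entry = make_dlq_entry(
    {"type": "Transaction", "timestamp": "2025-02-09T12:04:00", "valid": True},
    reason="missing event_id",
)
print(entry["reason"])  # missing event_id
print(entry["event"])
```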

## **Step 2: Detecting Issues**

Now, I will implement **detection logic** for:  
1. Missing Events: Checking whether event IDs skip a number.  
2. Duplicate Events: Identifying events that repeat the same event ID and timestamp.  
3. Incorrect Events: Finding events flagged as invalid or missing an event ID.

In [22]:
# Defining a function to detect missing event IDs
def detect_missing_events(events):
    """
    This function detects missing event IDs by checking sequential order.

    Logic:
    - Events should have sequential IDs (e.g., 1001, 1002, 1003, 1004).
    - If an event ID is skipped (e.g., 1001 → 1002 → 1004), it means 1003 is missing.
    - This function identifies such gaps and returns a list of missing event IDs.
    """
    # Collect the IDs that are present; events without an ID are handled elsewhere
    event_ids = {event["event_id"] for event in events if "event_id" in event}
    if not event_ids:
        return []  # No IDs to compare against, so no gaps can be inferred

    # Walk the full ID range and record every ID that never arrived
    return [i for i in range(min(event_ids), max(event_ids)) if i not in event_ids]

def detect_duplicate_events(events):
    """
    This function identifies duplicate events by checking whether the same
    (`event_id`, `timestamp`) pair, assumed here to be unique per event, occurs more than once.

    Logic:
    - If an event has the same `event_id` and `timestamp` as an earlier event, it's a duplicate.
    - A set tracks the keys seen so far; any event whose key is already in the set is a duplicate.
    - Events without an `event_id` are skipped here and handled later as incorrect events.
    """
    seen = set() # Unique event identifiers (event_id+timestamp)
    duplicates=[] # List to store duplicate events

    for event in events:
        if "event_id" in event:  # Check if 'event_id' exists in the event
            key = (event["event_id"], event["timestamp"]) # Storing the unique event key
            if key in seen:
                duplicates.append(event)
            else:
                seen.add(key)

    return duplicates

# Defining a function to detect incorrect events
def detect_incorrect_events(events):
    """
    This function flags events that have incorrect or corrupted data.

    Logic:
    - If an event has `valid=False`, it is considered incorrect (e.g., missing fields, wrong values).
    - If an event has no `event_id`, it is also considered incorrect.
    - The function returns the list of events that meet either condition.
    """
    # A missing `valid` field is treated as True, matching `correct_event_format()`
    return [event for event in events if not event.get("valid", True) or "event_id" not in event]

# Run detection
missing=detect_missing_events(event_stream)
duplicates=detect_duplicate_events(event_stream)
incorrect=detect_incorrect_events(event_stream)

# Print results
print("\n Missing Events:", missing)
print("Duplicate Events:", duplicates)
print("Incorrect Events:", incorrect)


 Missing Events: [1003]
Duplicate Events: [{'event_id': 1002, 'type': 'CRM_Sync', 'timestamp': '2025-02-09T12:01:00', 'valid': True}, {'event_id': 1006, 'type': 'Transaction', 'timestamp': '2025-02-09T12:04:00', 'valid': False}]
Incorrect Events: [{'event_id': 1006, 'type': 'Transaction', 'timestamp': '2025-02-09T12:04:00', 'valid': False}, {'event_id': 1006, 'type': 'Transaction', 'timestamp': '2025-02-09T12:04:00', 'valid': False}, {'type': 'Transaction', 'timestamp': '2025-02-09T12:04:00', 'valid': True}]
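
`detect_missing_events()` needs the whole batch in memory before it can look for gaps. In a streaming setting the same check has to run one event at a time. The sketch below shows one way to do that; `GapTracker` is a hypothetical class of my own, and it assumes IDs lower than the first ID observed are not of interest.

```python
class GapTracker:
    """Tracks the highest ID seen and the IDs still outstanding below it."""

    def __init__(self):
        self.highest_seen = None
        self.outstanding = set()  # IDs skipped so far that have not arrived yet

    def observe(self, event_id):
        """Record one arriving ID and return the sorted list of currently missing IDs."""
        if self.highest_seen is None:
            self.highest_seen = event_id
        elif event_id > self.highest_seen:
            # Every ID between the previous high-water mark and this one is now outstanding
            self.outstanding.update(range(self.highest_seen + 1, event_id))
            self.highest_seen = event_id
        else:
            # A late or repeated arrival fills its gap (no-op if it was never outstanding)
            self.outstanding.discard(event_id)
        return sorted(self.outstanding)


tracker = GapTracker()
for event_id in [1001, 1002, 1002, 1004, 1005, 1003]:
    missing_now = tracker.observe(event_id)
    print(event_id, "->", missing_now)
```

In this run, 1003 is reported as missing once 1004 arrives and is cleared again when 1003 shows up late, which is the behavior a reprocessing trigger would need in order to avoid rebuilding events that are merely delayed.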


## **Step 3: Reconstructing & Fixing Events**

Now that I've detected the issues, I will:  
1. Recreate missing events using inference.  
2. Remove duplicate events to prevent double processing.  
3. Route incorrect events to a Dead Letter Queue (DLQ) for manual review; in production, these would be corrected using API-based checks.

In [23]:
def reconstruct_missing_events(existing_events):
    """
    This function reconstructs missing events dynamically using `detect_missing_events()`.

    Steps:
    - Detect missing event IDs using the Step 2 function.
    - Estimate each timestamp from the neighboring events (previous and next).
    - Assign the event type based on the most frequent type in the stream
      (an assumption for the sake of this demonstration).
    - Return the reconstructed events.

    In production, more business logic would be factored in, as mentioned in my approach.
    """
    missing_event_ids = detect_missing_events(existing_events)
    reconstructed_events=[]

    # Sort events for timestamp estimation
    # Filter events with 'event_id' before sorting to avoid KeyError
    events_with_id = [event for event in existing_events if "event_id" in event]
    sorted_events = sorted(events_with_id, key=lambda x: x["event_id"])

    for event_id in missing_event_ids:
        # Find closest previous and next events
        prev_event = None
        next_event = None
        for event in sorted_events:
            if event["event_id"] < event_id:
                prev_event = event  # Last event before missing one
            elif event["event_id"] > event_id and next_event is None:
                next_event = event  # First event after missing one
                break

        # Estimate the timestamp from neighboring events:
        # - Both neighbors exist: use the midpoint between them.
        # - Only a previous event exists: place it shortly after (+30 seconds).
        # - Only a next event exists: place it shortly before (-30 seconds).
        # - No reference points exist: fall back to the current time (datetime.utcnow()).
        if prev_event and next_event:
            estimated_timestamp = datetime.strptime(prev_event["timestamp"], "%Y-%m-%dT%H:%M:%S") + (
                datetime.strptime(next_event["timestamp"], "%Y-%m-%dT%H:%M:%S") - datetime.strptime(prev_event["timestamp"], "%Y-%m-%dT%H:%M:%S")
            ) / 2
        elif prev_event:
            estimated_timestamp = datetime.strptime(prev_event["timestamp"], "%Y-%m-%dT%H:%M:%S") + timedelta(seconds=30)
        elif next_event:
            estimated_timestamp = datetime.strptime(next_event["timestamp"], "%Y-%m-%dT%H:%M:%S") - timedelta(seconds=30)
        else:
            estimated_timestamp = datetime.utcnow()

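        # Assign event type (most frequent type in the stream). Ties go to the type seen
        # first, so the result is the same on every run; iterating a set does not guarantee that.
        event_types = [event["type"] for event in existing_events]
        assumed_event_type = max(dict.fromkeys(event_types), key=event_types.count)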

        # Create reconstructed event
        reconstructed_events.append({"event_id": event_id, "type": assumed_event_type, "timestamp": estimated_timestamp.isoformat(), "valid": True})

    return reconstructed_events


def remove_duplicates(events):
    """
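    This function removes duplicate events while keeping the first instance of each.

    Steps:
    - Track each (`event_id`, `timestamp`) key in a set, as in `detect_duplicate_events()`.
    - Keep the first event seen for each key and drop the repeats.
    - Pass events without an `event_id` through unchanged.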
    """
    unique_events = []
    seen = set()

    for event in events:
        if "event_id" in event:  # Ensure event_id exists
            key = (event["event_id"], event["timestamp"])
            if key not in seen:
                unique_events.append(event)
                seen.add(key)
        else:
            # If no event_id, just add as-is (handled separately in DLQ logic)
            unique_events.append(event)

    return unique_events


def fix_incorrect_events(events):
    """
    This function separates incorrect events from the stream and routes them to the DLQ
    for manual review. Nothing is repaired automatically in this demonstration.

    Steps:
    - Identify incorrect events using `detect_incorrect_events()`
      (missing `event_id` or `valid=False`).
    - Send each of them to the DLQ.
    - Keep every other event unchanged.
    """
    incorrect_events = detect_incorrect_events(events)
    corrected_events = []
    dlq_events = []  # Events needing manual review

    for event in events:
        if event in incorrect_events:
            dlq_events.append(event)  # Flagged: hold for manual review
        else:
            corrected_events.append(event)  # Otherwise, keep the event

    return corrected_events, dlq_events


reconstructed_events = reconstruct_missing_events(event_stream)  # Rebuild missing events
cleaned_event_stream = remove_duplicates(event_stream)  # Remove duplicate events
fixed_events, review_dlq = fix_incorrect_events(cleaned_event_stream)  # Route incorrect events to DLQ

final_event_stream = fixed_events + reconstructed_events  # Final list for publishing

# Send DLQ Events for Manual Review (Combine previous DLQ + New DLQ).
# A separate name is used above so the DLQ from the validation step is not overwritten.
dlq_events.extend(review_dlq)

print("\n Reconstructed Missing Events:", reconstructed_events)
print("Final Cleaned & Fixed Event Stream:", final_event_stream)
print("Events Sent to DLQ (Manual Review Required):", dlq_events)



 Reconstructed Missing Events: [{'event_id': 1003, 'type': 'CRM_Sync', 'timestamp': '2025-02-09T12:01:30', 'valid': True}]
Final Cleaned & Fixed Event Stream: [{'event_id': 1001, 'type': 'LeadCreated', 'timestamp': '2025-02-09T12:00:00', 'valid': True}, {'event_id': 1002, 'type': 'CRM_Sync', 'timestamp': '2025-02-09T12:01:00', 'valid': True}, {'event_id': 1004, 'type': 'LeadCreated', 'timestamp': '2025-02-09T12:02:00', 'valid': True}, {'event_id': 1005, 'type': 'CRM_Sync', 'timestamp': '2025-02-09T12:03:00', 'valid': True}, {'event_id': 1005, 'type': 'CRM_Sync', 'timestamp': '2025-02-09T12:03:01', 'valid': True}, {'event_id': 1007, 'type': 'Transaction', 'timestamp': '2025-02-09T12:04:00', 'valid': True}, {'event_id': 1003, 'type': 'CRM_Sync', 'timestamp': '2025-02-09T12:01:30', 'valid': True}]
Events Sent to DLQ (Manual Review Required): [{'event_id': 1006, 'type': 'Transaction', 'timestamp': '2025-02-09T12:04:00', 'valid': False}, {'type': 'Transaction', 'timestamp': '2025-02-09T12:
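
One limitation of the midpoint rule in `reconstruct_missing_events()`: when several consecutive IDs are missing, they all share the same neighbors and therefore all receive the same timestamp. A possible refinement, sketched here as an assumption rather than as part of the notebook's logic, is to spread the missing IDs evenly across the gap. `interpolate_timestamps` is a hypothetical helper.

```python
from datetime import datetime

FMT = "%Y-%m-%dT%H:%M:%S"


def interpolate_timestamps(prev_id, prev_ts, next_id, next_ts):
    """Spread the IDs strictly between prev_id and next_id evenly across the time gap."""
    start = datetime.strptime(prev_ts, FMT)
    end = datetime.strptime(next_ts, FMT)
    step = (end - start) / (next_id - prev_id)  # One equal slice of the gap per ID
    return {
        missing_id: (start + step * (missing_id - prev_id)).strftime(FMT)
        for missing_id in range(prev_id + 1, next_id)
    }


# IDs 1003 and 1004 are both missing between 1002 and 1005
print(interpolate_timestamps(1002, "2025-02-09T12:01:00", 1005, "2025-02-09T12:04:00"))
# {1003: '2025-02-09T12:02:00', 1004: '2025-02-09T12:03:00'}
```

With a single missing ID this reduces to the midpoint used above: for 1003 between 1002 (12:01:00) and 1004 (12:02:00), it yields 12:01:30.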

## **Step 4: Publishing Fixed Events**

Finally, I will send corrected events back to the system (simulated Pub/Sub).


In [24]:
def publish_to_pubsub(events, topic_name="cleaned-events"):
    """
    This function simulates publishing corrected events back to Pub/Sub.

    Steps:
    - Iterates over all cleaned events.
    - Publishes them to a mock Pub/Sub topic.
    - Logs each published event for tracking.
    """
    print(f"\n Publishing {len(events)} cleaned events to Pub/Sub topic: {topic_name}")
    for event in events:
        print(f"Published: {event}")


def send_to_dlq(events, dlq_topic_name="dead-letter-queue"):
    """
    This function simulates sending events to Dead Letter Queue (DLQ) for manual review.

    Steps:
    - Iterates over events that couldn't be fixed.
    - Publishes them to a separate DLQ for human intervention.
    """
    print(f"\n Sending {len(events)} problematic events to DLQ: {dlq_topic_name}")
    for event in events:
        print(f"DLQ Event: {event}")


# Publish Cleaned Data
publish_to_pubsub(final_event_stream)

# Send Problematic Events to DLQ
send_to_dlq(dlq_events)


 Publishing 7 cleaned events to Pub/Sub topic: cleaned-events
Published: {'event_id': 1001, 'type': 'LeadCreated', 'timestamp': '2025-02-09T12:00:00', 'valid': True}
Published: {'event_id': 1002, 'type': 'CRM_Sync', 'timestamp': '2025-02-09T12:01:00', 'valid': True}
Published: {'event_id': 1004, 'type': 'LeadCreated', 'timestamp': '2025-02-09T12:02:00', 'valid': True}
Published: {'event_id': 1005, 'type': 'CRM_Sync', 'timestamp': '2025-02-09T12:03:00', 'valid': True}
Published: {'event_id': 1005, 'type': 'CRM_Sync', 'timestamp': '2025-02-09T12:03:01', 'valid': True}
Published: {'event_id': 1007, 'type': 'Transaction', 'timestamp': '2025-02-09T12:04:00', 'valid': True}
Published: {'event_id': 1003, 'type': 'CRM_Sync', 'timestamp': '2025-02-09T12:01:30', 'valid': True}

 Sending 2 problematic events to DLQ: dead-letter-queue
DLQ Event: {'event_id': 1006, 'type': 'Transaction', 'timestamp': '2025-02-09T12:04:00', 'valid': False}
DLQ Event: {'type': 'Transaction', 'timestamp': '2025-02-09
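
The simulation above prints Python dictionaries, but a Pub/Sub message payload is a byte string, so events would need to be serialized before publishing. The sketch below shows one way to do that with JSON; `encode_event` and `decode_event` are hypothetical helper names, and the sorted, compact key layout is my own choice so that identical events always produce identical bytes.

```python
import json


def encode_event(event):
    """Serialize an event dict to UTF-8 JSON bytes with a stable key order."""
    return json.dumps(event, sort_keys=True, separators=(",", ":")).encode("utf-8")


def decode_event(payload):
    """Inverse of encode_event: bytes back to a dict."""
    return json.loads(payload.decode("utf-8"))


event = {"event_id": 1003, "type": "CRM_Sync", "timestamp": "2025-02-09T12:01:30", "valid": True}
payload = encode_event(event)
print(payload)
# b'{"event_id":1003,"timestamp":"2025-02-09T12:01:30","type":"CRM_Sync","valid":true}'
assert decode_event(payload) == event  # Round trip preserves the event
```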

# How This Would Work in Google Cloud (GCP)
While I've demonstrated the logic in Python here, in a real production setup at Linq, this solution would be deployed as follows:

### **Event Streaming with Pub/Sub**
- Events (e.g., `"Business Card Scanned"`, `"CRM Sync"`, `"Revenue Transaction"`) are **published to Google Pub/Sub** as messages.
- The system listens for new events in real time.

### **Event Validation Using Cloud Functions**
- **A Google Cloud Function** is triggered for every new event.
- It **validates event format** (checks required fields, timestamps, etc.).
- If the event is **invalid**, it is sent to a **Dead Letter Queue (DLQ)** for review.
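
As a rough illustration of the validation function described above, the sketch below decodes one incoming message and decides whether to accept it or route it to the DLQ. It assumes the trigger delivers the payload base64-encoded under a `data` key, as Pub/Sub-triggered background functions do; the exact envelope depends on the runtime, so treat that shape as an assumption. `handle_message`, `wrap`, and `REQUIRED_FIELDS` are hypothetical names, and no Google Cloud client library is used.

```python
import base64
import json

REQUIRED_FIELDS = ("event_id", "type", "timestamp")  # Assumed minimum schema for this sketch


def handle_message(message, context=None):
    """Decode one message and decide whether to accept it or route it to the DLQ."""
    try:
        event = json.loads(base64.b64decode(message["data"]).decode("utf-8"))
    except (KeyError, TypeError, ValueError):
        return "dlq", None  # Payload is absent or cannot be decoded
    if not isinstance(event, dict):
        return "dlq", event  # Valid JSON, but not an event object
    missing = [field for field in REQUIRED_FIELDS if field not in event]
    return ("dlq", event) if missing else ("accept", event)


def wrap(event):
    """Build a message the way the assumed trigger would deliver it (helper for this demo)."""
    return {"data": base64.b64encode(json.dumps(event).encode("utf-8")).decode("ascii")}


ok = wrap({"event_id": 1001, "type": "LeadCreated", "timestamp": "2025-02-09T12:00:00"})
no_id = wrap({"type": "Transaction", "timestamp": "2025-02-09T12:04:00"})
garbage = {"data": base64.b64encode(b"not json").decode("ascii")}

print(handle_message(ok)[0])       # accept
print(handle_message(no_id)[0])    # dlq
print(handle_message(garbage)[0])  # dlq
```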

### **Detecting & Handling Event Issues**
#### Missing Events
- A **Cloud Function checks for ID gaps** in sequentially processed events.
- **If an event is missing**, a reprocessing request is **triggered via Pub/Sub**.

#### Duplicate Events
- **UUID-based deduplication** ensures that duplicate events don’t get reprocessed.
- **If a duplicate is detected**, it is sent to a **DLQ** instead of being stored.
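
A deduplication step like the one described above has to remember which IDs it has already seen without growing forever. The sketch below keeps each key for a limited time window; `Deduplicator`, its `ttl_seconds` default, and the injectable `clock` are my own assumptions. It also keeps state in process memory, whereas a real deployment would likely need a shared store, since separate function instances do not share memory.

```python
import time


class Deduplicator:
    """Remembers event keys for ttl_seconds and reports repeats within that window."""

    def __init__(self, ttl_seconds=600, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock  # Injectable so the behavior can be tested without sleeping
        self._seen = {}  # key -> time first seen

    def is_duplicate(self, key):
        now = self.clock()
        # Forget keys older than the TTL so memory stays bounded
        self._seen = {k: t for k, t in self._seen.items() if now - t < self.ttl_seconds}
        if key in self._seen:
            return True
        self._seen[key] = now
        return False


fake_now = [0.0]  # Mutable holder so the lambda below can read an updated time
dedup = Deduplicator(ttl_seconds=60, clock=lambda: fake_now[0])
print(dedup.is_duplicate("evt-1002"))  # False: first delivery
print(dedup.is_duplicate("evt-1002"))  # True: repeat inside the window
fake_now[0] = 120.0
print(dedup.is_duplicate("evt-1002"))  # False: the key has expired
```

The purge here scans every stored key on each call, which is fine for a demonstration but would be replaced by per-key expiry in a store that supports it.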

#### Incorrectly Processed Events
- **Cloud Dataflow cross-checks event data** with external APIs (HubSpot, Stripe, Twilio).
- If an event **doesn’t match expected values**, it is **flagged for recalculation**.

### **Reprocessing & Correction**
- **A Cloud Function reconstructs missing events** using inferred properties.
- **A Cloud Run service** queries APIs to **fix incorrect events dynamically**.
- **Corrected events are published back to Pub/Sub** to be reprocessed normally.

### **Storing & Using the Corrected Events**
- Once an event is **validated and correctly processed**, it flows **into Linq’s database/CRM system**.
- **AI-driven workflows** (like follow-ups and CRM syncs) run **only on clean, validated events**.

By implementing this architecture, Linq can **ensure high data reliability** while processing millions of events per hour.
