<a href="https://colab.research.google.com/github/lokeshreddy82/Upload-a-handwritten-math-problem-OCR-LLM-solves-explains/blob/https%2Fgithub.com%2Flokeshreddy82%2FUpload-a-handwritten-math-problem-OCR-LLM-solves/Upload_a_handwritten_math_problem_%E2%86%92_OCR_%E2%86%92_LLM_solves_explains.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import time
import random
from collections import deque

# --- Configuration ---
# Names of the simulated services; generate_synthetic_log() picks one uniformly
# at random for every entry.
LOG_SOURCES = ["web_server_1", "app_server_2", "db_server_3", "auth_service_4"]
# Severity levels. The order matters: generate_synthetic_log() pairs this list
# positionally with its `weights` argument to random.choices().
LOG_LEVELS = ["INFO", "DEBUG", "WARNING", "ERROR", "CRITICAL"]
SIMULATION_DURATION_SECONDS = 60 # How long to run the simulation
LOG_GENERATION_INTERVAL_SECONDS = 0.1 # Sleep between two generated log entries

# Anomaly Detection Parameters
WINDOW_SIZE_SECONDS = 10 # Size of the sliding window for anomaly detection
# Number of 'ERROR' logs in a window to trigger an anomaly.
# NOTE(review): with ~10 entries/s and an 8% ERROR weight, a 10 s window holds
# about 8 ERRORs on average, so this threshold is exceeded almost permanently.
# Confirm whether that is intended for the demo.
ERROR_THRESHOLD = 3

# --- Helper Functions ---

def generate_synthetic_log():
    """Build one randomly generated log record.

    Returns:
        A dict with the keys ``"timestamp"`` (``time.time()`` at creation),
        ``"source"`` (drawn from ``LOG_SOURCES``), ``"level"`` (weighted draw
        from ``LOG_LEVELS``) and ``"message"``.
    """
    origin = random.choice(LOG_SOURCES)
    # Weighted draw: INFO and DEBUG dominate, CRITICAL is the rarest.
    (severity,) = random.choices(
        LOG_LEVELS, weights=[0.6, 0.2, 0.1, 0.08, 0.02], k=1
    )
    created_at = time.time()

    # The generic pool is sampled for every entry, whatever its level.
    last_octet = random.randint(1, 254)
    generic_messages = [
        f"User logged in from IP 192.168.1.{last_octet}",
        "Successfully processed request for /api/data",
        "Database query executed in 15ms",
        "Connection closed by remote host",
        "Disk space low: 85% used",
        "Failed to authenticate user 'admin'",
        "NullPointerException in line 42",
        "Service unavailable: upstream server down",
        "Memory usage exceeded threshold",
        "Authentication successful for user 'guest'",
    ]
    text = random.choice(generic_messages)

    if severity == "ERROR":
        # ERROR entries discard the generic text and use a dedicated pool.
        text = random.choice(
            [
                "Failed to connect to database",
                "Critical service 'X' stopped unexpectedly",
                "Unauthorized access attempt detected",
                "File not found: /var/log/app.log",
                "High CPU utilization: 98%",
            ]
        )

    return {
        "timestamp": created_at,
        "source": origin,
        "level": severity,
        "message": text,
    }

def process_log_entry(log_entry, error_log_window):
    """Update the sliding window of ERROR timestamps with one log entry.

    Only entries whose level is ``"ERROR"`` are recorded; every call, whatever
    the entry's level, also evicts timestamps that have aged out.

    Args:
        log_entry: Dict with at least ``"level"`` and ``"timestamp"`` keys.
        error_log_window: Deque of ERROR timestamps, oldest first. It is
            mutated in place.

    Returns:
        The number of ERROR timestamps left in the window.
    """
    # Expiry is judged against the wall clock at processing time, not against
    # the timestamp carried by the entry.
    now = time.time()

    if log_entry["level"] == "ERROR":
        error_log_window.append(log_entry["timestamp"])

    cutoff = now - WINDOW_SIZE_SECONDS
    while error_log_window:
        oldest = error_log_window[0]
        if not oldest < cutoff:
            # Everything from here on is recent enough to keep.
            break
        error_log_window.popleft()

    return len(error_log_window)

def detect_anomaly(error_count):
    """Report whether ``error_count`` has reached ``ERROR_THRESHOLD``.

    Returns:
        True when the count is greater than or equal to the threshold,
        False otherwise.
    """
    return error_count >= ERROR_THRESHOLD

def generate_alert(log_entry, error_count):
    """Print a human-readable anomaly alert to stdout.

    Args:
        log_entry: The entry being processed when the anomaly was detected;
            its ``"timestamp"``, ``"level"``, ``"source"`` and ``"message"``
            keys are read. It is not necessarily an ERROR entry itself.
        error_count: Number of ERROR logs currently in the sliding window.
    """
    print("\n!!! ANOMALY DETECTED !!!")

    # Render the entry's epoch timestamp in the machine's local time zone.
    local_moment = time.localtime(log_entry['timestamp'])
    stamp = time.strftime('%Y-%m-%d %H:%M:%S', local_moment)
    print(f"  Timestamp: {stamp}")

    print(
        f"  Reason: {error_count} 'ERROR' logs in the last "
        f"{WINDOW_SIZE_SECONDS} seconds (Threshold: {ERROR_THRESHOLD})"
    )
    print(
        f"  Last Log: [{log_entry['level']}] "
        f"{log_entry['source']} - {log_entry['message']}"
    )
    print("-----------------------------------\n")

# --- Main Simulation Loop ---

def run_simulation():
    """Simulate real-time log ingestion and sliding-window anomaly detection.

    Repeatedly generates a synthetic log entry, feeds it into the ERROR
    sliding window, and prints an alert when the window's ERROR count reaches
    ``ERROR_THRESHOLD``. Alerts are rate-limited to one per half window
    (``WINDOW_SIZE_SECONDS / 2``). The loop runs for
    ``SIMULATION_DURATION_SECONDS`` and sleeps
    ``LOG_GENERATION_INTERVAL_SECONDS`` between entries.

    Returns:
        None. All results are printed to stdout.
    """
    print("Starting real-time log anomaly detection simulation...")
    print(f"Monitoring for '{ERROR_THRESHOLD}' or more 'ERROR' logs in a {WINDOW_SIZE_SECONDS}-second window.")

    # deque to store timestamps of ERROR logs within the sliding window
    error_log_window = deque()

    # Elapsed time and the alert cooldown use the monotonic clock so that
    # wall-clock adjustments (NTP, manual changes) cannot shorten or stretch
    # the run or the cooldown.
    deadline = time.monotonic() + SIMULATION_DURATION_SECONDS
    alert_cooldown = WINDOW_SIZE_SECONDS / 2
    # -inf guarantees the first anomaly always alerts; the monotonic clock's
    # origin is arbitrary, so 0 would not be a safe "never alerted" sentinel.
    last_alert_time = float("-inf")

    while time.monotonic() < deadline:
        # Simulate log ingestion (in a real system, this would be consumed
        # from Kafka/Pulsar).
        log_entry = generate_synthetic_log()

        # Process log entry and update feature (error count in window)
        current_error_count = process_log_entry(log_entry, error_log_window)

        # Anomaly Detection
        if detect_anomaly(current_error_count):
            # Only alert if enough time has passed since the last alert, to
            # avoid spamming alerts for one continuous anomaly.
            now = time.monotonic()
            if now - last_alert_time > alert_cooldown:
                generate_alert(log_entry, current_error_count)
                last_alert_time = now

        # Simulate real-time delay
        time.sleep(LOG_GENERATION_INTERVAL_SECONDS)

    print("\nSimulation finished.")

# Run the simulation only when executed as a script (or notebook cell), not on import.
if __name__ == "__main__":
    run_simulation()

Starting real-time log anomaly detection simulation...
Monitoring for '3' or more 'ERROR' logs in a 10-second window.

!!! ANOMALY DETECTED !!!
  Timestamp: 2025-07-22 16:01:09
  Reason: 3 'ERROR' logs in the last 10 seconds (Threshold: 3)
  Last Log: [ERROR] db_server_3 - File not found: /var/log/app.log
-----------------------------------


!!! ANOMALY DETECTED !!!
  Timestamp: 2025-07-22 16:01:14
  Reason: 8 'ERROR' logs in the last 10 seconds (Threshold: 3)
  Last Log: [INFO] auth_service_4 - Memory usage exceeded threshold
-----------------------------------


!!! ANOMALY DETECTED !!!
  Timestamp: 2025-07-22 16:01:19
  Reason: 10 'ERROR' logs in the last 10 seconds (Threshold: 3)
-----------------------------------


!!! ANOMALY DETECTED !!!
  Timestamp: 2025-07-22 16:01:24
  Reason: 9 'ERROR' logs in the last 10 seconds (Threshold: 3)
  Last Log: [DEBUG] db_server_3 - User logged in from IP 192.168.1.129
-----------------------------------


!!! ANOMALY DETECTED !!!
  Timestamp: 