In [1]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random
import warnings

warnings.filterwarnings('ignore')

# Set random seeds for reproducibility.
np.random.seed(42)
random.seed(42)

## Configuration

In [24]:
# Log generation configuration.
NUM_NORMAL_LOGS = 95000  # Normal logs (includes expected errors).
NUM_ANOMALOUS_LOGS = 5000  # True anomalous patterns (~5% anomaly rate).
TOTAL_LOGS = NUM_NORMAL_LOGS + NUM_ANOMALOUS_LOGS

# Services in the system.
SERVICES = [
    'auth-service',
    'payment-service',
    'order-service',
    'inventory-service',
    'user-service',
    'notification-service',
    'api-gateway'
]

# Log levels.
LOG_LEVELS = ['INFO', 'DEBUG', 'WARN', 'ERROR', 'FATAL']

# Normal log level distribution (includes some expected errors).
NORMAL_LEVEL_WEIGHTS = [0.65, 0.20, 0.10, 0.045, 0.005]

# Anomaly types.
ANOMALY_TYPES = ['spike', 'cascade', 'new_pattern', 'resource_exhaustion', 'security_breach']

## Normal Log Templates

In [25]:
# Normal log message templates (including EXPECTED errors that are NOT anomalies).
NORMAL_TEMPLATES = {
    'INFO': [
        'Request processed successfully - user_id: {user_id}, duration: {duration}ms',
        'Authentication successful for user {user_id}',
        'Payment processed - transaction_id: {transaction_id}, amount: ${amount}',
        'Order created successfully - order_id: {order_id}',
        'Cache hit for key: {cache_key}',
        'Database query completed - rows: {rows}, duration: {duration}ms',
        'API request received - endpoint: {endpoint}, method: {method}',
        'User session created - session_id: {session_id}',
        'Email notification sent to {email}',
        'Inventory updated - item_id: {item_id}, quantity: {quantity}',
    ],
    'DEBUG': [
        'Entering function: {function_name}',
        'Cache miss for key: {cache_key}',
        'Query plan: {query_plan}',
        'Response payload size: {size} bytes',
        'Connection pool stats: active={active}, idle={idle}',
    ],
    'WARN': [
        'High memory usage detected: {memory}%',
        'Slow query detected - duration: {duration}ms',
        'Rate limit approaching for user {user_id}',
        'Connection retry attempt {attempt} of 3',
        'Cache eviction due to size limit',
        'Service latency above threshold: {duration}ms',
    ],
    # EXPECTED ERRORS (business logic, user errors - NOT anomalies).
    'ERROR': [
        'Authentication failed - invalid password for user {user_id}',
        'Validation error - missing required field: {field_name}',
        'Order failed - insufficient inventory for item {item_id}',
        'Payment declined - insufficient funds for user {user_id}',
        'Invalid request - malformed JSON in request body',
        'Resource not found - user_id {user_id} does not exist',
        'Session expired for user {user_id}',
        'Rate limit exceeded for user {user_id} - retry after {retry_after}s',
    ],
    # RARE but expected critical issues (NOT anomalies when isolated).
    'FATAL': [
        'Unhandled exception in request handler - {exception_type}',
    ]
}

## Anomalous Log Templates

In [26]:
# Anomalous log message templates (TRUE system anomalies).
# These represent UNUSUAL patterns, not just error-level logs.
ANOMALY_TEMPLATES = {
    # Error spike patterns.
    'spike_error': [
        'Database connection pool exhausted - max connections: {max_conn}',
        'Service unavailable - {service} not responding after {attempts} attempts',
        'Connection timeout to {host} - {error_msg}',
        'Failed to acquire database lock - deadlock detected',
    ],
    # Cascade failure patterns.
    'cascade': [
        'Circuit breaker opened for {service} - failure threshold exceeded',
        'Upstream service {service} unavailable - cascading failure',
        'Database cluster unreachable - all nodes down',
        'Message queue full - dropping messages',
    ],
    # New/unusual error patterns.
    'new_pattern': [
        'NullPointerException in {function_name} at line {line}',
        'OutOfMemoryError - heap space exceeded',
        'StackOverflowError in {function_name}',
        'Data corruption detected in table {table_name}',
        'Unexpected error code {error_code} from external API',
    ],
    # Resource exhaustion.
    'resource': [
        'Disk space critically low: {disk_space}% remaining',
        'Memory usage critical: {memory}% used',
        'CPU usage sustained above 95% for {duration} seconds',
        'File descriptor limit reached',
        'Thread pool exhausted - queue size: {queue_size}',
    ],
    # Security anomalies.
    'security': [
        'Suspicious activity detected from IP: {ip_address}',
        'Brute force attack detected - {attempts} failed login attempts',
        'Potential SQL injection attempt in query',
        'Unauthorized access attempt to admin endpoint',
        'Token validation failed - possible token forgery',
    ]
}

## Helper Functions

In [27]:
def generate_timestamp_from_datetime(dt: datetime) -> str:
    """
    Convert datetime to ISO format timestamp string.

    param dt: Datetime object.
    """
    return dt.isoformat() + 'Z'


def generate_normal_log(timestamp: datetime, service: str) -> dict:
    """
    Generate a normal log entry (including expected business errors).

    param timestamp: Log timestamp as datetime.
    param service: Service name.
    """
    level = random.choices(LOG_LEVELS, weights=NORMAL_LEVEL_WEIGHTS)[0]
    template = random.choice(NORMAL_TEMPLATES[level])
    
    # Generate realistic field values.
    message = template.format(
        user_id=random.randint(1000, 9999),
        duration=random.randint(10, 500),
        transaction_id=f'txn_{random.randint(100000, 999999)}',
        amount=round(random.uniform(10, 1000), 2),
        order_id=f'ord_{random.randint(100000, 999999)}',
        cache_key=f'cache_{random.randint(1, 100)}',
        rows=random.randint(1, 1000),
        endpoint=random.choice(['/api/users', '/api/orders', '/api/payments']),
        method=random.choice(['GET', 'POST', 'PUT']),
        session_id=f'sess_{random.randint(100000, 999999)}',
        email=f'user{random.randint(1, 1000)}@example.com',
        item_id=f'item_{random.randint(1, 500)}',
        quantity=random.randint(1, 100),
        function_name=random.choice(['processPayment', 'validateUser', 'updateInventory']),
        query_plan='index_scan',
        size=random.randint(100, 50000),
        active=random.randint(5, 20),
        idle=random.randint(10, 50),
        memory=random.randint(40, 75),
        attempt=random.randint(1, 3),
        field_name=random.choice(['email', 'password', 'amount', 'item_id']),
        retry_after=random.randint(30, 300),
        exception_type=random.choice(['ValueError', 'TypeError', 'KeyError'])
    )
    
    return {
        'timestamp': generate_timestamp_from_datetime(timestamp),
        'level': level,
        'service': service,
        'message': message,
        'is_anomaly': 0,
        'anomaly_type': None
    }


def generate_anomaly_spike(start_time: datetime, service: str, spike_size: int = 50) -> list[dict]:
    """
    Generate a spike of error logs (anomaly pattern).

    param start_time: Starting timestamp.
    param service: Service experiencing the spike.
    param spike_size: Number of errors in the spike.
    """
    logs = []
    anomaly_category = random.choice(['spike_error', 'cascade', 'resource'])
    template = random.choice(ANOMALY_TEMPLATES[anomaly_category])
    
    for i in range(spike_size):
        timestamp = start_time + timedelta(seconds=i * random.uniform(0.1, 2))
        
        message = template.format(
            max_conn=random.choice([50, 100, 200]),
            service=random.choice(SERVICES),
            attempts=random.randint(3, 10),
            host=f'db-{random.randint(1, 5)}.example.com',
            error_msg=random.choice(['Connection refused', 'Timeout', 'Network unreachable']),
            function_name=random.choice(['processPayment', 'validateUser', 'updateInventory']),
            line=random.randint(100, 500),
            table_name=random.choice(['users', 'orders', 'payments']),
            error_code=f'E{random.randint(1000, 9999)}',
            disk_space=random.randint(1, 5),
            memory=random.randint(90, 99),
            duration=random.randint(300, 600),
            queue_size=random.randint(10000, 50000),
            ip_address=f'{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}'
        )
        
        logs.append({
            'timestamp': generate_timestamp_from_datetime(timestamp),
            'level': random.choice(['ERROR', 'FATAL', 'WARN']),
            'service': service,
            'message': message,
            'is_anomaly': 1,
            'anomaly_type': f'{anomaly_category}_spike'
        })
    
    return logs


def generate_cascade_failure(start_time: datetime, affected_services: list[str], cascade_size: int = 30) -> list[dict]:
    """
    Generate cascade failure across multiple services (anomaly pattern).

    param start_time: Starting timestamp.
    param affected_services: Services affected by cascade.
    param cascade_size: Number of logs in cascade.
    """
    logs = []
    templates = ANOMALY_TEMPLATES['cascade']
    
    for i in range(cascade_size):
        timestamp = start_time + timedelta(seconds=i * random.uniform(1, 5))
        service = random.choice(affected_services)
        template = random.choice(templates)
        
        message = template.format(
            service=random.choice(SERVICES)
        )
        
        logs.append({
            'timestamp': generate_timestamp_from_datetime(timestamp),
            'level': random.choice(['ERROR', 'FATAL']),
            'service': service,
            'message': message,
            'is_anomaly': 1,
            'anomaly_type': 'cascade_failure'
        })
    
    return logs


def generate_new_pattern_anomaly(timestamp: datetime, service: str) -> dict:
    """
    Generate a new/unusual error pattern (anomaly).

    param timestamp: Log timestamp.
    param service: Service name.
    """
    template = random.choice(ANOMALY_TEMPLATES['new_pattern'])
    
    message = template.format(
        function_name=random.choice(['processPayment', 'validateUser', 'updateInventory']),
        line=random.randint(50, 500),
        table_name=random.choice(['users', 'orders', 'payments']),
        error_code=f'E{random.randint(1000, 9999)}'
    )
    
    return {
        'timestamp': generate_timestamp_from_datetime(timestamp),
        'level': random.choice(['ERROR', 'FATAL']),
        'service': service,
        'message': message,
        'is_anomaly': 1,
        'anomaly_type': 'new_pattern'
    }


def generate_security_anomaly(start_time: datetime, num_events: int = 20) -> list[dict]:
    """
    Generate security-related anomaly (multiple suspicious events).

    param start_time: Starting timestamp.
    param num_events: Number of security events.
    """
    logs = []
    ip = f'{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}'
    
    for i in range(num_events):
        timestamp = start_time + timedelta(seconds=i * random.uniform(0.5, 3))
        template = random.choice(ANOMALY_TEMPLATES['security'])
        
        message = template.format(
            ip_address=ip,
            attempts=i + 1
        )
        
        logs.append({
            'timestamp': generate_timestamp_from_datetime(timestamp),
            'level': 'WARN' if i < num_events - 5 else 'ERROR',
            'service': 'auth-service',
            'message': message,
            'is_anomaly': 1,
            'anomaly_type': 'security_breach'
        })
    
    return logs

## Generate Training Data

In [28]:
def generate_training_data(
    num_normal: int = NUM_NORMAL_LOGS,
    duration_minutes: int = 60,
    num_spikes: int = 15,
    num_cascades: int = 5,
    num_new_patterns: int = 100,
    num_security_incidents: int = 3
) -> pd.DataFrame:
    """
    Generate realistic training data with normal logs and various anomaly patterns.

    param num_normal: Number of normal log entries (baseline traffic).
    param duration_minutes: Time window in minutes.
    param num_spikes: Number of error spikes to inject.
    param num_cascades: Number of cascade failures to inject.
    param num_new_patterns: Number of new/unusual error patterns.
    param num_security_incidents: Number of security incidents.
    """
    print("="*60)
    print("REALISTIC LOG DATA GENERATION")
    print("="*60)
    print(f"\nGenerating logs over {duration_minutes} minute window...")
    print(f"Normal logs (baseline): {num_normal:,}")
    print(f"Anomaly patterns to inject:")
    print(f"  - Error spikes: {num_spikes}")
    print(f"  - Cascade failures: {num_cascades}")
    print(f"  - New error patterns: {num_new_patterns}")
    print(f"  - Security incidents: {num_security_incidents}")
    print()
    
    start_time = datetime.now() - timedelta(minutes=duration_minutes)
    logs = []
    
    # Generate normal baseline logs.
    print("Generating baseline traffic...")
    for i in range(num_normal):
        # Calculate timestamp with realistic spacing.
        avg_interval = (duration_minutes * 60) / num_normal
        offset = i * avg_interval + random.uniform(-avg_interval * 0.3, avg_interval * 0.3)
        timestamp = start_time + timedelta(seconds=offset)
        
        service = random.choice(SERVICES)
        logs.append(generate_normal_log(timestamp, service))
        
        if (i + 1) % 20000 == 0:
            print(f"  Generated {i + 1:,} baseline logs...")
    
    print(f"✓ Baseline complete: {num_normal:,} logs\n")
    
    # Inject error spikes.
    print(f"Injecting {num_spikes} error spikes...")
    for i in range(num_spikes):
        spike_time = start_time + timedelta(minutes=random.uniform(5, duration_minutes - 5))
        service = random.choice(SERVICES)
        spike_size = random.randint(30, 80)
        spike_logs = generate_anomaly_spike(spike_time, service, spike_size)
        logs.extend(spike_logs)
        print(f"  Spike {i+1}: {spike_size} errors at {spike_time.strftime('%H:%M:%S')} from {service}")
    
    print(f"✓ Spikes injected\n")
    
    # Inject cascade failures.
    print(f"Injecting {num_cascades} cascade failures...")
    for i in range(num_cascades):
        cascade_time = start_time + timedelta(minutes=random.uniform(10, duration_minutes - 10))
        affected = random.sample(SERVICES, k=random.randint(3, 5))
        cascade_size = random.randint(40, 60)
        cascade_logs = generate_cascade_failure(cascade_time, affected, cascade_size)
        logs.extend(cascade_logs)
        print(f"  Cascade {i+1}: {cascade_size} errors across {len(affected)} services at {cascade_time.strftime('%H:%M:%S')}")
    
    print(f"✓ Cascades injected\n")
    
    # Inject new/unusual patterns.
    print(f"Injecting {num_new_patterns} new error patterns...")
    for i in range(num_new_patterns):
        pattern_time = start_time + timedelta(minutes=random.uniform(0, duration_minutes))
        service = random.choice(SERVICES)
        logs.append(generate_new_pattern_anomaly(pattern_time, service))
    
    print(f"✓ New patterns injected\n")
    
    # Inject security incidents.
    print(f"Injecting {num_security_incidents} security incidents...")
    for i in range(num_security_incidents):
        incident_time = start_time + timedelta(minutes=random.uniform(5, duration_minutes - 5))
        incident_logs = generate_security_anomaly(incident_time, num_events=random.randint(15, 30))
        logs.extend(incident_logs)
        print(f"  Security incident {i+1} at {incident_time.strftime('%H:%M:%S')}")
    
    print(f"✓ Security incidents injected\n")
    
    # Create DataFrame and sort by timestamp.
    print("Creating dataset...")
    df = pd.DataFrame(logs)
    df['timestamp_dt'] = pd.to_datetime(df['timestamp'])
    df = df.sort_values('timestamp_dt').reset_index(drop=True)
    df = df.drop('timestamp_dt', axis=1)
    
    # Calculate statistics.
    total_logs = len(df)
    anomalous_logs = df['is_anomaly'].sum()
    anomaly_rate = (anomalous_logs / total_logs) * 100
    
    print(f"\n{'='*60}")
    print("DATASET GENERATED SUCCESSFULLY")
    print(f"{'='*60}")
    print(f"Total logs: {total_logs:,}")
    print(f"Normal logs: {total_logs - anomalous_logs:,} ({100 - anomaly_rate:.2f}%)")
    print(f"Anomalous logs: {anomalous_logs:,} ({anomaly_rate:.2f}%)")
    print(f"\nKey insight: Not all ERROR/FATAL logs are anomalies!")
    print(f"  - Baseline contains {df[(df['is_anomaly'] == 0) & (df['level'].isin(['ERROR', 'FATAL']))].shape[0]:,} expected errors")
    print(f"  - Anomalies are identified by PATTERNS (spikes, cascades, new errors)")
    print()
    
    return df

## Generate and Inspect Data

In [29]:
# Generate the training data with realistic anomaly patterns.
df = generate_training_data(
    num_normal=NUM_NORMAL_LOGS,
    duration_minutes=60,
    num_spikes=15,
    num_cascades=5,
    num_new_patterns=100,
    num_security_incidents=3
)

print(f"Shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")

REALISTIC LOG DATA GENERATION

Generating logs over 60 minute window...
Normal logs (baseline): 95,000
Anomaly patterns to inject:
  - Error spikes: 15
  - Cascade failures: 5
  - New error patterns: 100
  - Security incidents: 3

Generating baseline traffic...
  Generated 20,000 baseline logs...
  Generated 40,000 baseline logs...
  Generated 60,000 baseline logs...
  Generated 80,000 baseline logs...
✓ Baseline complete: 95,000 logs

Injecting 15 error spikes...
  Spike 1: 50 errors at 16:04:10 from api-gateway
  Spike 2: 56 errors at 16:02:49 from user-service
  Spike 3: 66 errors at 15:34:24 from payment-service
  Spike 4: 42 errors at 16:16:11 from api-gateway
  Spike 5: 80 errors at 15:33:04 from api-gateway
  Spike 6: 72 errors at 15:34:58 from notification-service
  Spike 7: 50 errors at 15:48:07 from order-service
  Spike 8: 66 errors at 15:54:40 from api-gateway
  Spike 9: 73 errors at 16:09:51 from auth-service
  Spike 10: 32 errors at 15:56:47 from auth-service
  Spike 11: 

In [30]:
# Display first few rows.
print("\n" + "="*60)
print("Sample Normal Logs:")
print("="*60)
df[df['is_anomaly'] == 0].head(10)


Sample Normal Logs:


Unnamed: 0,timestamp,level,service,message,is_anomaly,anomaly_type
0,2025-12-24T15:22:22.408389Z,DEBUG,auth-service,Cache miss for key: cache_70,0,
1,2025-12-24T15:22:22.445143Z,WARN,api-gateway,Slow query detected - duration: 184ms,0,
2,2025-12-24T15:22:22.476306Z,INFO,notification-service,"Database query completed - rows: 297, duration...",0,
3,2025-12-24T15:22:22.521972Z,DEBUG,user-service,Cache miss for key: cache_89,0,
4,2025-12-24T15:22:22.565355Z,INFO,order-service,User session created - session_id: sess_665158,0,
5,2025-12-24T15:22:22.602903Z,INFO,payment-service,API request received - endpoint: /api/payments...,0,
6,2025-12-24T15:22:22.621293Z,WARN,notification-service,Rate limit approaching for user 9201,0,
7,2025-12-24T15:22:22.679093Z,DEBUG,api-gateway,Cache miss for key: cache_94,0,
8,2025-12-24T15:22:22.714181Z,INFO,notification-service,Cache hit for key: cache_58,0,
9,2025-12-24T15:22:22.755490Z,INFO,api-gateway,Email notification sent to user249@example.com,0,


In [31]:
# Display anomalous logs.
print("\n" + "="*60)
print("Sample Anomalous Logs:")
print("="*60)
df[df['is_anomaly'] == 1].head(10)


Sample Anomalous Logs:


Unnamed: 0,timestamp,level,service,message,is_anomaly,anomaly_type
589,2025-12-24T15:22:44.729397Z,FATAL,order-service,OutOfMemoryError - heap space exceeded,1,new_pattern
1019,2025-12-24T15:23:00.956047Z,ERROR,user-service,Data corruption detected in table users,1,new_pattern
1237,2025-12-24T15:23:09.198228Z,ERROR,inventory-service,NullPointerException in updateInventory at lin...,1,new_pattern
1338,2025-12-24T15:23:12.983577Z,FATAL,notification-service,Unexpected error code E2501 from external API,1,new_pattern
2389,2025-12-24T15:23:52.745656Z,ERROR,payment-service,StackOverflowError in updateInventory,1,new_pattern
3344,2025-12-24T15:24:28.927695Z,FATAL,auth-service,Data corruption detected in table orders,1,new_pattern
5038,2025-12-24T15:25:33.075876Z,ERROR,auth-service,NullPointerException in validateUser at line 487,1,new_pattern
6826,2025-12-24T15:26:40.801193Z,FATAL,notification-service,Unexpected error code E3785 from external API,1,new_pattern
7285,2025-12-24T15:26:58.137832Z,FATAL,user-service,Unexpected error code E7441 from external API,1,new_pattern
7960,2025-12-24T15:27:23.697186Z,ERROR,auth-service,Unexpected error code E9127 from external API,1,new_pattern


## Data Statistics

In [32]:
# Detailed statistics.
print("\n" + "="*60)
print("DETAILED DATASET STATISTICS")
print("="*60 + "\n")

print("Class Distribution:")
print(df['is_anomaly'].value_counts())
print(f"\nAnomaly Rate: {(df['is_anomaly'].sum() / len(df)) * 100:.2f}%\n")

print("\nLog Level Distribution:")
print(df['level'].value_counts())
print()

print("\nLog Level Distribution by Anomaly Status:")
level_anomaly = pd.crosstab(df['level'], df['is_anomaly'], margins=True)
level_anomaly.columns = ['Normal', 'Anomaly', 'Total']
print(level_anomaly)

print("\n" + "-"*60)
print("KEY INSIGHT: Expected Errors in Normal Logs")
print("-"*60)
normal_errors = df[(df['is_anomaly'] == 0) & (df['level'].isin(['ERROR', 'FATAL']))]
print(f"Expected ERROR/FATAL logs (NOT anomalies): {len(normal_errors):,}")
print(f"These are business errors like:")
print("  - Invalid passwords")
print("  - Insufficient funds")
print("  - Missing fields")
print("  - Session expired")
print()

print("\nAnomaly Type Distribution:")
if 'anomaly_type' in df.columns:
    anomaly_dist = df[df['is_anomaly'] == 1]['anomaly_type'].value_counts()
    print(anomaly_dist)
    print()

print("\nService Distribution:")
print(df['service'].value_counts())


DETAILED DATASET STATISTICS

Class Distribution:
is_anomaly
0    95000
1     1237
Name: count, dtype: int64

Anomaly Rate: 1.29%


Log Level Distribution:
level
INFO     61864
DEBUG    18814
WARN      9885
ERROR     4756
FATAL      918
Name: count, dtype: int64


Log Level Distribution by Anomaly Status:
       Normal  Anomaly  Total
level                        
DEBUG   18814        0  18814
ERROR    4294      462   4756
FATAL     488      430    918
INFO    61864        0  61864
WARN     9540      345   9885
All     95000     1237  96237

------------------------------------------------------------
KEY INSIGHT: Expected Errors in Normal Logs
------------------------------------------------------------
Expected ERROR/FATAL logs (NOT anomalies): 4,782
These are business errors like:
  - Invalid passwords
  - Insufficient funds
  - Missing fields
  - Session expired


Anomaly Type Distribution:
anomaly_type
spike_error_spike    447
cascade_failure      232
resource_spike       217
casc

## Save Training Data

In [None]:
# Create data directory if it doesn't exist.
import os

data_dir = '../data'
os.makedirs(data_dir, exist_ok=True)

# # Save to CSV.
# output_file = os.path.join(data_dir, 'training_logs.csv')
# df.to_csv(output_file, index=False)
# print(f"\n✓ Training data saved to: {output_file}")
# print(f"  File size: {os.path.getsize(output_file) / (1024*1024):.2f} MB")

# Try to save as Parquet for better performance (optional).
try:
    output_parquet = os.path.join(data_dir, 'training_logs.parquet')
    df.to_parquet(output_parquet, index=False, engine='fastparquet')
    print(f"\n✓ Training data saved to: {output_parquet}")
    print(f"  File size: {os.path.getsize(output_parquet) / (1024*1024):.2f} MB")
except Exception as e:
    print(f"\n⚠ Parquet save skipped (pyarrow/fastparquet not available)")
    print(f"  CSV format is sufficient for training.")


✓ Training data saved to: ../data/training_logs.csv
  File size: 8.97 MB

✓ Training data saved to: ../data/training_logs.parquet
  File size: 2.19 MB


## Generate Streaming Test Data (Smaller Sample)

Generate a smaller dataset for testing the streaming pipeline.

In [34]:
# Generate smaller test set for streaming validation.
print("\n" + "="*60)
print("GENERATING TEST DATASET")
print("="*60 + "\n")

test_df = generate_training_data(
    num_normal=9500,
    duration_minutes=10,
    num_spikes=3,
    num_cascades=1,
    num_new_patterns=20,
    num_security_incidents=1
)

# Save test data.
test_output = os.path.join(data_dir, 'test_logs.csv')
test_df.to_csv(test_output, index=False)
print(f"\n✓ Test data saved to: {test_output}")
print(f"  Total logs: {len(test_df):,}")
print(f"  Anomaly rate: {(test_df['is_anomaly'].sum() / len(test_df)) * 100:.2f}%")


GENERATING TEST DATASET

REALISTIC LOG DATA GENERATION

Generating logs over 10 minute window...
Normal logs (baseline): 9,500
Anomaly patterns to inject:
  - Error spikes: 3
  - Cascade failures: 1
  - New error patterns: 20
  - Security incidents: 1

Generating baseline traffic...
✓ Baseline complete: 9,500 logs

Injecting 3 error spikes...
  Spike 1: 72 errors at 16:17:25 from order-service
  Spike 2: 32 errors at 16:17:25 from payment-service
  Spike 3: 43 errors at 16:17:25 from user-service
✓ Spikes injected

Injecting 1 cascade failures...
  Cascade 1: 59 errors across 3 services at 16:12:28
✓ Cascades injected

Injecting 20 new error patterns...
✓ New patterns injected

Injecting 1 security incidents...
  Security incident 1 at 16:17:25
✓ Security incidents injected

Creating dataset...

DATASET GENERATED SUCCESSFULLY
Total logs: 9,748
Normal logs: 9,500 (97.46%)
Anomalous logs: 248 (2.54%)

Key insight: Not all ERROR/FATAL logs are anomalies!
  - Baseline contains 498 expecte

## Summary

This notebook generates **realistic** synthetic log data with:

### 1. **Realistic Normal Logs (Including Expected Errors)**
- **INFO/DEBUG/WARN**: Typical operations (requests, queries, cache operations).
- **Expected ERROR logs** (NOT anomalies):
  - Invalid passwords, missing fields, validation errors.
  - Business logic errors (insufficient funds, out of stock).
  - Session expired, rate limiting.
- **Expected FATAL logs** (NOT anomalies when isolated):
  - Rare unhandled exceptions.

### 2. **True Anomaly Patterns**
Anomalies are detected by **patterns**, not just log level:

- **Error Spikes**: Sudden burst of errors (30-80 errors in seconds).
- **Cascade Failures**: Multiple services failing together.
- **New Error Patterns**: Unusual errors that rarely occur (NullPointer, OutOfMemory).
- **Resource Exhaustion**: Critical resource issues (disk full, memory critical).
- **Security Incidents**: Brute force attacks, suspicious activity patterns.

### 3. **Key Differences from Traditional Approach**
- ❌ **Wrong**: All ERROR/FATAL logs = Anomalies.
- ✅ **Correct**: Anomalies = Unusual **patterns** and **behavior**.
  
### 4. **What Makes This Realistic**
- Expected errors occur naturally in baseline traffic.
- Anomalies are temporal patterns (spikes, sequences, new behaviors).
- Mixed log levels in both normal and anomalous data.
- Time-series structure with realistic timestamps.

### 5. **Model Training Implications**
Your anomaly detection model should learn:
- **Frequency patterns**: Error rate vs time.
- **Temporal sequences**: Spike detection, cascade patterns.
- **Novelty detection**: New error messages.
- **Service correlation**: Multiple services failing together.
- **NOT just**: log_level == ERROR → anomaly.

### 6. **Output Files**
- `training_logs.csv`: Full training dataset (~96k logs).
- `training_logs.parquet`: Same data, compressed format.
- `test_logs.csv`: Smaller test set for pipeline validation.

**Next Steps:**
1. Train models with temporal features (sliding windows, error rates).
2. Use anomaly_type field for multi-class classification.
3. Test streaming pipeline with realistic patterns.
4. Tune detection for < 30 second latency requirement.