In [3]:
# Run this cell first to install necessary Python packages
# The '!' runs this as a shell command in your environment
%pip install redis pandas numpy scikit-learn -q
print("Required Python libraries installed (or already present).")

Note: you may need to restart the kernel to use updated packages.
Required Python libraries installed (or already present).


In [4]:
import redis
import time
import random
import json
import datetime
import pandas as pd
import numpy as np
from collections import deque
from sklearn.ensemble import IsolationForest

# --- Configuration ---
REDIS_HOST = 'localhost' # Connect to your locally running Redis server
REDIS_PORT = 6379
STREAM_NAME = 'vscode_sensor_stream' # Use a unique name for this notebook/project
DATA_FIELD_NAME = 'metric_value' # Key for the metric within the stream message dict

# Producer Simulation Config
SIMULATION_DELAY_SECONDS = 0.05 # Controls speed of data generation
ANOMALY_CHANCE = 0.05        # 5% chance of generating an anomaly
NORMAL_RANGE = (90, 110)
ANOMALY_RANGE_LOW = (40, 60)
ANOMALY_RANGE_HIGH = (140, 180)

# Consumer Config
CONSUMER_GROUP_NAME = 'vscode_anomaly_detectors'
CONSUMER_NAME = 'vscode_detector_instance_1' # Identifier for this consumer instance

# Anomaly Detection Parameters (SELECT ONE METHOD by uncommenting/commenting)

# === Option 1: Rolling Z-Score ===
DETECTION_METHOD = 'zscore'
WINDOW_SIZE = 20      # Number of data points for rolling stats
Z_SCORE_THRESHOLD = 2.5 # How many std deviations away to be an anomaly

# === Option 2: Isolation Forest ===
# DETECTION_METHOD = 'isolation_forest'
# TRAIN_AFTER_N_POINTS = 50 # Train model after receiving this many points
# IFOREST_CONTAMINATION = 'auto' # Or a float like 0.05 (expected anomaly rate)
# IFOREST_RANDOM_STATE = 42
# --- End Configuration ---

print("Configuration loaded.")
print(f"Using Detection Method: {DETECTION_METHOD}")

# --- Consumer State (Global variables in Notebook Scope) ---
# These hold the state for the detectors between cell executions
data_window_zscore = deque(maxlen=WINDOW_SIZE) # For rolling Z-score
# Isolation Forest specific state
points_processed_iforest = 0
training_data_iforest = []
model_iforest = None # Will hold the trained Isolation Forest model
# --- End State ---

print("Consumer state variables initialized.")

Configuration loaded.
Using Detection Method: zscore
Consumer state variables initialized.


In [7]:
try:
    # decode_responses=True automatically decodes bytes to strings from Redis
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
    r.ping() # Check the connection
    print(f"Successfully connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
except redis.exceptions.ConnectionError as e:
    print(f"ERROR: Could not connect to Redis at {REDIS_HOST}:{REDIS_PORT}")
    print("!!! Please ensure your Redis server is running locally. !!!")
    print(f"Common ways to start Redis:")
    print("  - Docker: docker run -d -p 6379:6379 redis")
    print("  - Homebrew (Mac): brew services start redis")
    print("  - Linux Service: sudo systemctl start redis-server (or redis)")
    print(f"Error details: {e}")
    # Stop execution if connection fails
    raise ConnectionError("Cannot proceed without Redis connection.")

# Optional: Clean up stream/group from previous runs (useful for repeated testing)
# Use with caution if other processes might be using the same stream name!
# try:
#     r.delete(STREAM_NAME)
#     print(f"Stream '{STREAM_NAME}' deleted (if it existed).")
# except Exception as del_err:
#     print(f"Note: Could not delete stream (may not exist or other error): {del_err}")
# try:
#     # Deleting group requires stream to exist first or handling the error
#     # r.xgroup_destroy(STREAM_NAME, CONSUMER_GROUP_NAME)
#     # print(f"Consumer group '{CONSUMER_GROUP_NAME}' deleted (if it existed).")
#      pass # Simpler to just let group persist for this example
# except Exception as del_err:
#     print(f"Note: Could not delete group (may not exist or other error): {del_err}")

Successfully connected to Redis at localhost:6379


In [8]:
def generate_and_send_data(redis_conn, count=1, inject_anomaly_now=False):
    """Generates data points and sends them to the Redis stream."""
    messages_sent = 0
    for i in range(count):
        timestamp = datetime.datetime.utcnow().isoformat()
        # Determine if this point should be an anomaly
        is_anomaly = False
        if inject_anomaly_now and i == 0: # Only inject on the first if requested
             is_anomaly = True
        elif not inject_anomaly_now:
            is_anomaly = random.random() < ANOMALY_CHANCE

        # Generate value based on whether it's an anomaly
        if is_anomaly:
            # Randomly choose between high or low anomaly range
            value = random.uniform(*ANOMALY_RANGE_HIGH) if random.choice([True, False]) else random.uniform(*ANOMALY_RANGE_LOW)
        else:
            value = random.uniform(*NORMAL_RANGE)

        # Create the data payload, including injected status for verification
        data_point = {"timestamp": timestamp, DATA_FIELD_NAME: value, "anomaly_injected": is_anomaly}
        # The stream message stores fields/values; here we store the whole data_point JSON as the value for our field
        message_data = { DATA_FIELD_NAME: json.dumps(data_point) }

        try:
            # XADD adds the message to the stream. '*' lets Redis auto-generate the unique message ID.
            message_id = redis_conn.xadd(STREAM_NAME, message_data)
            messages_sent += 1
            # Optional: print(f"  Producer -> Sent ID: {message_id}, Value: {value:.2f}, Anomaly: {is_anomaly}")
            time.sleep(SIMULATION_DELAY_SECONDS) # Simulate time gap between events
        except redis.exceptions.ConnectionError as e:
            print(f"ERROR during produce: Redis connection lost: {e}")
            raise e # Stop simulation if Redis disconnects
        except Exception as e:
            print(f"ERROR during produce: Unexpected error: {e}")
            break # Stop producing if another error occurs
    return messages_sent

print("Producer function 'generate_and_send_data' defined.")

Producer function 'generate_and_send_data' defined.


In [9]:
# Ensure the consumer group exists on the stream. Creates if not present.
try:
    # id='0' means the group starts tracking from the beginning of the stream if created now.
    # mkstream=True creates the stream itself if it doesn't exist yet.
    r.xgroup_create(STREAM_NAME, CONSUMER_GROUP_NAME, id='0', mkstream=True)
    print(f"Consumer group '{CONSUMER_GROUP_NAME}' ensured on stream '{STREAM_NAME}'.")
except redis.exceptions.ResponseError as e:
    # This specific error is expected if the group already exists, which is fine.
    if "BUSYGROUP Consumer Group name already exists" in str(e):
        print(f"Consumer group '{CONSUMER_GROUP_NAME}' already exists.")
    else:
        # Other errors during group creation should be raised.
        print(f"ERROR: Could not create/access consumer group: {e}")
        raise e

print("Consumer state variables should be initialized from Step 2.")

Consumer group 'vscode_anomaly_detectors' ensured on stream 'vscode_sensor_stream'.
Consumer state variables should be initialized from Step 2.


In [10]:
def detect_anomaly(message_data_str):
    """
    Parses message, updates relevant state, performs anomaly detection
    based on the configured DETECTION_METHOD, and returns True if anomaly detected.
    Accesses global variables for state (data_window_zscore, model_iforest, etc.).
    """
    # These lines declare that we intend to modify the global variables defined in Step 2
    global data_window_zscore
    global points_processed_iforest, training_data_iforest, model_iforest

    try:
        # Decode the JSON data stored in the stream message field
        data_point = json.loads(message_data_str)
        value = float(data_point[DATA_FIELD_NAME])
        timestamp = data_point.get("timestamp", datetime.datetime.utcnow().isoformat())
        anomaly_injected = data_point.get("anomaly_injected", False) # Check if producer marked this as anomaly
    except (json.JSONDecodeError, KeyError, ValueError, TypeError) as e:
        print(f"  Consumer WARN: Error parsing message data: {e}, Data: '{message_data_str[:100]}...'")
        return False # Cannot process this message

    anomaly_detected = False # Flag to indicate if detection logic triggers
    details = {} # Dictionary to store context about the detection

    # === Z-Score Detection Logic ===
    if DETECTION_METHOD == 'zscore':
        current_mean = np.nan
        current_std = np.nan
        z_score = np.nan

        # Calculate rolling stats only if window has enough data for meaningful std dev
        if len(data_window_zscore) >= WINDOW_SIZE // 2:
            # Calculate stats based on the window *before* adding the current point
            past_window_data = list(data_window_zscore)
            current_mean = np.mean(past_window_data)
            current_std = np.std(past_window_data)

            # Check for non-zero std dev to avoid division errors
            if current_std > 1e-6: # Use a small tolerance
                z_score = abs((value - current_mean) / current_std)
                # Check if Z-score exceeds the threshold
                if z_score > Z_SCORE_THRESHOLD:
                    anomaly_detected = True
                    details = {'z_score': z_score, 'mean': current_mean, 'std': current_std, 'threshold': Z_SCORE_THRESHOLD}
            # Optional: Handle case where std=0 (e.g., if value != mean, maybe flag it)

        # Add the current value to the window for the *next* calculation
        data_window_zscore.append(value)

    # === Isolation Forest Detection Logic ===
    elif DETECTION_METHOD == 'isolation_forest':
        points_processed_iforest += 1
        value_array = np.array([[value]]) # Scikit-learn models expect 2D array-like input

        # --- Training Phase ---
        # If the model hasn't been trained yet
        if model_iforest is None:
            # Collect data points until the threshold is reached
            if points_processed_iforest <= TRAIN_AFTER_N_POINTS:
                training_data_iforest.append([value]) # Add value as a list [value]
                # Print progress message occasionally
                if points_processed_iforest % (TRAIN_AFTER_N_POINTS // 5) == 0 or points_processed_iforest == TRAIN_AFTER_N_POINTS:
                    print(f"  Consumer: Collecting I-Forest training data {points_processed_iforest}/{TRAIN_AFTER_N_POINTS}...")

                # Once enough data is collected, train the model
                if points_processed_iforest == TRAIN_AFTER_N_POINTS:
                    print(f"\n  Consumer: Training Isolation Forest with {len(training_data_iforest)} points...")
                    try:
                        # Create and fit the Isolation Forest model
                        model_iforest = IsolationForest(n_estimators=100, # Number of trees
                                                        contamination=IFOREST_CONTAMINATION, # Expected proportion of outliers
                                                        random_state=IFOREST_RANDOM_STATE
                                                        ).fit(np.array(training_data_iforest)) # Fit on collected data
                        print("  Consumer: Isolation Forest training complete.")
                        training_data_iforest = [] # Clear training data list to save memory
                    except Exception as train_err:
                        print(f"  Consumer ERROR: Isolation Forest training failed: {train_err}")
                        # Model remains None, detection won't happen for this method yet

        # --- Prediction Phase ---
        # If the model *has* been trained successfully
        elif model_iforest is not None:
            try:
                # Predict returns 1 for inliers, -1 for outliers (anomalies)
                prediction = model_iforest.predict(value_array)
                # Score samples gives anomaly score (lower means more anomalous)
                score = model_iforest.score_samples(value_array)[0]

                # Check if the prediction indicates an anomaly
                if prediction[0] == -1:
                    anomaly_detected = True
                    details = {'iforest_score': score, 'prediction': prediction[0]}
            except Exception as predict_err:
                print(f"  Consumer ERROR: Isolation Forest prediction failed: {predict_err}")

    # --- Alerting Section ---
    # If the detection logic flagged an anomaly
    if anomaly_detected:
        status = "CORRECTLY DETECTED" if anomaly_injected else "FALSE POSITIVE"
        print(f"\n{'='*15} ANOMALY DETECTED ({status}) {'='*15}")
        print(f" Timestamp: {timestamp}")
        print(f" Value:     {value:.4f} (Producer Marked Anomaly: {anomaly_injected})") # Show if producer intended it
        print(f" Method:    {DETECTION_METHOD}")
        # Print specific details from the detection method
        for key, val in details.items():
             print(f" {key.replace('_', ' ').title()}: {val:.4f}" if isinstance(val, float) else f" {key.replace('_', ' ').title()}: {val}")
        print("="*(32 + len(status) + 2))
        return True # Indicate anomaly was found

    # If anomaly was NOT detected, but producer *intended* it to be one (a miss)
    elif anomaly_injected:
        print(f"\n{'!'*15} MISSED ANOMALY (FALSE NEGATIVE) {'!'*15}")
        print(f" Timestamp: {timestamp}")
        print(f" Value:     {value:.4f} (Producer Marked Anomaly: True)")
        print(f" Method:    {DETECTION_METHOD}")
        print(f" Details:   Did not meet detection criteria.")
        # Optional: Print Z-score/IForest score even if not exceeding threshold for debugging misses
        # if DETECTION_METHOD == 'zscore' and 'z_score' in locals(): print(f"           (Z-Score was: {z_score:.4f})")
        # elif DETECTION_METHOD == 'isolation_forest' and 'score' in locals(): print(f"           (IForest Score was: {score:.4f})")
        print("!"*(32 + len("FALSE NEGATIVE") + 2))
        return False # No anomaly detected by the logic

    # If no anomaly detected and none was injected
    return False

print("Anomaly detection function 'detect_anomaly' defined.")

Anomaly detection function 'detect_anomaly' defined.


In [11]:
def process_stream_messages(redis_conn, max_messages_per_batch=50):
    """Reads messages from the stream using consumer group and processes them."""
    anomalies_found_in_batch = 0
    messages_processed_in_batch = 0

    try:
        # XREADGROUP reads messages for a specific group/consumer.
        # '>' ID means get new messages that haven't been delivered to this consumer yet.
        # BLOCK 100: Waits up to 100ms for new messages if none are immediately available. 0 means don't block.
        # COUNT: Max messages to read in this single command.
        response = redis_conn.xreadgroup(
            groupname=CONSUMER_GROUP_NAME,
            consumername=CONSUMER_NAME,
            streams={STREAM_NAME: '>'},
            count=max_messages_per_batch,
            block=100 # Milliseconds to wait
        )

        # If the response is empty, no new messages arrived within the block time.
        if not response:
            return 0, 0 # Return zero counts for this batch

        # Response format: [['stream_name', [['message_id', {'field': 'value'}], ...]]]
        for stream_name, messages in response:
            # print(f"Consumer received {len(messages)} messages...") # Can be verbose
            for message_id, message_data in messages:
                # Check if our expected data field exists
                if DATA_FIELD_NAME in message_data:
                    # Call the detection function with the JSON payload
                    if detect_anomaly(message_data[DATA_FIELD_NAME]):
                        anomalies_found_in_batch += 1
                    messages_processed_in_batch += 1
                else:
                    # Log if message format is unexpected
                    print(f"  Consumer WARN: Field '{DATA_FIELD_NAME}' not in msg '{message_id}'. Skipping.")

                # IMPORTANT: Acknowledge the message was processed by this consumer.
                # This prevents it from being re-delivered to this or other consumers in the group later.
                redis_conn.xack(STREAM_NAME, CONSUMER_GROUP_NAME, message_id)

    except redis.exceptions.ConnectionError as e:
        print(f"ERROR during consume: Redis connection lost: {e}")
        raise e # Stop simulation if Redis disconnects
    except Exception as e:
        print(f"ERROR during consume: Unexpected error: {e}")
        # Decide how to handle other errors, maybe log and continue or raise
        # raise e

    # Return the counts for this specific processing batch
    return messages_processed_in_batch, anomalies_found_in_batch

print("Consumer processing function 'process_stream_messages' defined.")

Consumer processing function 'process_stream_messages' defined.


In [12]:
# --- Simulation Parameters ---
TOTAL_SIMULATION_STEPS = 100 # How many produce/consume cycles to run
MESSAGES_PER_STEP = 5       # How many messages the producer sends each step
MAX_CONSUME_PER_STEP = 50 # Max messages consumer tries to process each step
PRINT_INTERVAL = 10       # How often (in steps) to print summary status

# --- Initialize Counters ---
total_messages_produced = 0
total_messages_processed = 0
total_anomalies_detected = 0

print(f"Starting simulation loop for {TOTAL_SIMULATION_STEPS} steps...")
print(f"Producer generates ~{MESSAGES_PER_STEP * (1-ANOMALY_CHANCE):.1f} normal and ~{MESSAGES_PER_STEP * ANOMALY_CHANCE:.1f} anomalies per step.")
print("="*50)

# --- Main Loop ---
try:
    for step in range(1, TOTAL_SIMULATION_STEPS + 1):
        # 1. Produce Data
        # Occasionally inject a known anomaly for testing (e.g., every 20 steps)
        inject_specific_anomaly = (step % 20 == 0)
        produced_now = generate_and_send_data(r, count=MESSAGES_PER_STEP, inject_anomaly_now=inject_specific_anomaly)
        if inject_specific_anomaly and produced_now > 0:
             print(f"--- Step {step}: Injected specific anomaly ---")
        total_messages_produced += produced_now

        # 2. Consume Data
        processed_now, anomalies_now = process_stream_messages(r, max_messages_per_batch=MAX_CONSUME_PER_STEP)
        total_messages_processed += processed_now
        total_anomalies_detected += anomalies_now

        # 3. Print Status Periodically
        if step % PRINT_INTERVAL == 0 or step == TOTAL_SIMULATION_STEPS:
            print(f"--- Step {step}/{TOTAL_SIMULATION_STEPS} Status ---")
            print(f"  Total Produced:  {total_messages_produced}")
            print(f"  Total Processed: {total_messages_processed}")
            print(f"  Total Detected:  {total_anomalies_detected}")
            # Display method-specific state
            if DETECTION_METHOD == 'zscore':
                print(f"  Z-Score Window Size: {len(data_window_zscore)}")
            elif DETECTION_METHOD == 'isolation_forest':
                 model_status = "Trained & Predicting" if model_iforest else f"Training Mode ({points_processed_iforest}/{TRAIN_AFTER_N_POINTS})"
                 print(f"  Isolation Forest Status: {model_status}")
            print("-"*(len(f"--- Step {step}/{TOTAL_SIMULATION_STEPS} Status ---")))

        # 4. Optional small delay to prevent overwhelming CPU if simulation is very fast
        # time.sleep(0.01)

except KeyboardInterrupt:
    print("\n--- Simulation interrupted by user ---")
except Exception as loop_err:
    print(f"\n--- Simulation stopped due to error: {loop_err} ---")


print("="*50)
print("Simulation loop finished.")
# Final status might differ slightly if interrupted before last status print
print(f"Final Tallies: Produced={total_messages_produced}, Processed={total_messages_processed}, Detected={total_anomalies_detected}")

Starting simulation loop for 100 steps...
Producer generates ~4.8 normal and ~0.2 anomalies per step.


  timestamp = datetime.datetime.utcnow().isoformat()
  timestamp = data_point.get("timestamp", datetime.datetime.utcnow().isoformat())


--- Step 10/100 Status ---
  Total Produced:  50
  Total Processed: 50
  Total Detected:  0
  Z-Score Window Size: 20
--------------------------

 Timestamp: 2025-04-14T11:31:47.454264
 Value:     148.8446 (Producer Marked Anomaly: True)
 Method:    zscore
 Z Score: 8.0747
 Mean: 101.0787
 Std: 5.9155
 Threshold: 2.5000
--- Step 20: Injected specific anomaly ---

 Timestamp: 2025-04-14T11:31:48.082971
 Value:     52.0957 (Producer Marked Anomaly: True)
 Method:    zscore
 Z Score: 4.7419
 Mean: 105.0185
 Std: 11.1607
 Threshold: 2.5000
--- Step 20/100 Status ---
  Total Produced:  100
  Total Processed: 100
  Total Detected:  2
  Z-Score Window Size: 20
--------------------------

 Timestamp: 2025-04-14T11:31:48.866492
 Value:     45.5521 (Producer Marked Anomaly: True)
 Method:    zscore
 Z Score: 4.3322
 Mean: 98.6352
 Std: 12.2531
 Threshold: 2.5000

 Timestamp: 2025-04-14T11:31:49.126406
 Value:     51.5336 (Producer Marked Anomaly: True)
 Method:    zscore
 Z Score: 2.6652
 Mean: 