# Anomaly Detection with Neural Heatmap API

This notebook demonstrates anomaly detection and analysis capabilities.

## What You'll Learn
- Retrieve detected anomalies
- Filter anomalies by severity and type
- Visualize anomaly patterns
- Export anomaly data for further analysis

In [None]:
# Import required libraries
import asyncio
from datetime import datetime
from pathlib import Path

from neural_heatmap import NeuralHeatmapClient, connect, Anomaly
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Set style
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (12, 6)

print("Libraries loaded successfully!")

## 1. Connect to Server

In [None]:
async def connect_to_server(url="http://localhost:8080"):
    """Open a connection to the Neural Heatmap server and report the outcome.

    Args:
        url: Address handed to ``connect``. Defaults to the local server
            address this notebook has always used, so existing calls with
            no arguments behave as before.

    Returns:
        The client object returned by ``connect``. It is returned even when
        ``client.connected`` is falsy so the caller can inspect it.
    """
    client = await connect(url)
    if client.connected:
        print("✅ Connected to Neural Heatmap server")
    else:
        # A failed connection used to be silent, which made later cells fail
        # with confusing errors far from the real cause.
        print(f"❌ Could not connect to Neural Heatmap server at {url}")
    return client

client = await connect_to_server()

## 2. Get All Anomalies

Retrieve all detected anomalies.

In [None]:
async def get_all_anomalies():
    """Fetch every anomaly from the server and tally them per severity.

    Uses the notebook-level ``client``.

    Returns:
        A ``(anomalies, severity_counts)`` tuple: the value returned by
        ``client.get_anomalies()`` and a dict mapping each severity to the
        number of anomalies carrying it.
    """
    fetched = await client.get_anomalies()

    print(f"Found {len(fetched)} anomalies")

    # Tally in first-seen order; the dict is returned to the caller as-is.
    tally = {}
    for item in fetched:
        level = item.severity
        tally[level] = tally.get(level, 0) + 1

    print("\nSeverity Distribution:")
    for level in sorted(tally):
        print(f"  {level.title()}: {tally[level]}")

    return fetched, tally

anomalies, severity_counts = await get_all_anomalies()

## 3. Visualize Severity Distribution

In [None]:
def plot_severity_distribution(severity_counts):
    """Draw a bar chart of anomaly counts per severity level.

    Bars for the known levels are drawn in escalating order (low, medium,
    high, critical) regardless of the order in which they were first seen.
    Any other keys follow in their original order with a fallback colour.

    Args:
        severity_counts: Mapping of severity label to number of anomalies.

    Returns:
        None. Shows the figure, or prints a notice when there is nothing
        to plot.
    """
    if not severity_counts:
        print("No anomalies to visualize")
        return

    # Color mapping for severity; insertion order doubles as the bar order.
    color_map = {
        'low': '#2ca02c',
        'medium': '#ff7f0e',
        'high': '#d62728',
        'critical': '#9467bd'
    }
    fallback_color = '#1f77b4'

    known = [s for s in color_map if s in severity_counts]
    extra = [s for s in severity_counts if s not in color_map]
    severities = known + extra
    counts = [severity_counts[s] for s in severities]
    colors = [color_map.get(s, fallback_color) for s in severities]

    fig, ax = plt.subplots(figsize=(10, 6))

    # str() keeps non-string keys (e.g. None) usable as categorical tick labels.
    labels = [str(s) for s in severities]
    bars = ax.bar(labels, counts, color=colors, alpha=0.7, edgecolor='black')
    ax.set_xlabel('Severity', fontsize=12)
    ax.set_ylabel('Count', fontsize=12)
    ax.set_title('Anomaly Severity Distribution', fontsize=14)
    ax.grid(True, alpha=0.3, axis='y')

    # Add count labels on bars
    for bar in bars:
        height = bar.get_height()
        ax.text(bar.get_x() + bar.get_width() / 2., height,
                f'{int(height)}',
                ha='center', va='bottom')

    plt.tight_layout()
    plt.show()

plot_severity_distribution(severity_counts)

## 4. Filter by Severity

Get anomalies of specific severity levels.

In [None]:
def _print_anomaly_preview(title, anomalies, limit=5, max_chars=80):
    """Print description, score and layer for the first ``limit`` anomalies."""
    if not anomalies:
        return

    print(f"\n{title}:")
    for anomaly in anomalies[:limit]:
        description = anomaly.description or ""
        # Only mark the text as truncated when something was actually cut.
        if len(description) > max_chars:
            description = description[:max_chars] + "..."
        print(f"  - {description}")
        print(f"    Score: {anomaly.score:.3f}, Layer: {anomaly.layer}")


async def get_high_severity_anomalies():
    """Fetch the high and critical severity anomalies and print a preview.

    Uses the notebook-level ``client``. Up to five anomalies are previewed
    per severity level.

    Returns:
        A ``(high_anomalies, critical_anomalies)`` tuple holding the results
        of the two ``client.get_anomalies(severity=...)`` calls.
    """
    # Get high and critical severity anomalies
    high_anomalies = await client.get_anomalies(severity="high")
    critical_anomalies = await client.get_anomalies(severity="critical")

    print(f"High severity anomalies: {len(high_anomalies)}")
    print(f"Critical severity anomalies: {len(critical_anomalies)}")

    # Display details
    _print_anomaly_preview("High Severity Anomalies", high_anomalies)
    _print_anomaly_preview("Critical Severity Anomalies", critical_anomalies)

    return high_anomalies, critical_anomalies

high_anomalies, critical_anomalies = await get_high_severity_anomalies()

## 5. Analyze Anomaly Types

Group anomalies by type.

In [None]:
def analyze_anomaly_types(anomalies):
    """Bucket anomalies by their ``anomaly_type`` and print a frequency table.

    Args:
        anomalies: Iterable of objects exposing an ``anomaly_type`` attribute.

    Returns:
        A ``(type_counts, type_details)`` tuple: a dict of type -> count and
        a dict of type -> list of anomalies of that type, both keyed in
        first-seen order.
    """
    grouped = {}
    for item in anomalies:
        grouped.setdefault(item.anomaly_type, []).append(item)

    # The counts are just the bucket sizes.
    tally = {kind: len(members) for kind, members in grouped.items()}

    print("\nAnomaly Types:")
    # Most frequent first; ties keep first-seen order because the sort is stable.
    for kind, total in sorted(tally.items(), key=lambda pair: -pair[1]):
        print(f"  {kind}: {total}")

    return tally, grouped

type_counts, type_details = analyze_anomaly_types(anomalies)

## 6. Visualize Anomaly Timeline

Create a timeline visualization of anomalies.

In [None]:
def plot_anomaly_timeline(anomalies):
    """Scatter-plot anomaly scores against their timestamps.

    Points are coloured by severity (with a fallback colour for levels not
    in the palette) and sized by score. When ``anomalies`` is empty a notice
    is printed and nothing is drawn.

    Args:
        anomalies: Sequence of objects exposing ``timestamp``, ``score`` and
            ``severity`` attributes.
    """
    if not anomalies:
        print("No anomalies to visualize")
        return

    fig, ax = plt.subplots(figsize=(14, 6))

    # Severity palette; also drives the legend entries below.
    palette = {
        'low': '#2ca02c',
        'medium': '#ff7f0e',
        'high': '#d62728',
        'critical': '#9467bd'
    }

    # Build all plot series in one pass over the time-ordered anomalies.
    xs, ys, point_colors, point_sizes = [], [], [], []
    for item in sorted(anomalies, key=lambda entry: entry.timestamp):
        xs.append(item.timestamp)
        ys.append(item.score)
        point_colors.append(palette.get(item.severity, '#1f77b4'))
        point_sizes.append(100 + item.score * 100)

    ax.scatter(xs, ys, c=point_colors, s=point_sizes, alpha=0.6, edgecolors='black')

    ax.set_xlabel('Time', fontsize=12)
    ax.set_ylabel('Anomaly Score', fontsize=12)
    ax.set_title('Anomaly Timeline', fontsize=14)
    ax.grid(True, alpha=0.3)

    # One legend patch per palette entry, whether or not that level occurs.
    from matplotlib.patches import Patch
    handles = []
    for level, hex_color in palette.items():
        handles.append(Patch(facecolor=hex_color, label=level.title()))
    ax.legend(handles=handles, loc='upper right')

    plt.tight_layout()
    plt.show()

plot_anomaly_timeline(anomalies)

## 7. Spatial Distribution of Anomalies

Visualize anomalies in spatial context.

In [None]:
def plot_spatial_distribution(anomalies):
    """Scatter-plot anomalies at their (x, y) location, one series per severity.

    Anomalies with a falsy ``location`` are skipped. Known severities are
    drawn in escalating order with their palette colour. Anomalies with any
    other severity used to be silently left out of the plot; they are now
    drawn with a fallback colour so every located anomaly is visible.

    Args:
        anomalies: Iterable of objects exposing ``location`` (a mapping that
            may hold ``'x'`` and ``'y'``), ``severity`` and ``score``.

    Returns:
        None. Shows the figure, or prints a notice when no anomaly carries
        location data.
    """
    # Filter anomalies with location data
    located_anomalies = [a for a in anomalies if a.location]

    if not located_anomalies:
        print("No anomalies with location data")
        return

    fig, ax = plt.subplots(figsize=(12, 10))

    # Color by severity
    color_map = {
        'low': '#2ca02c',
        'medium': '#ff7f0e',
        'high': '#d62728',
        'critical': '#9467bd'
    }
    fallback_color = '#1f77b4'

    # Bucket in a single pass. Seeding with the palette keys keeps the known
    # levels first and in order; unknown levels are appended as encountered.
    groups = {severity: [] for severity in color_map}
    for anomaly in located_anomalies:
        groups.setdefault(anomaly.severity, []).append(anomaly)

    for severity, members in groups.items():
        if not members:
            continue

        x_coords = [a.location.get('x', 0) for a in members]
        y_coords = [a.location.get('y', 0) for a in members]
        sizes = [100 + a.score * 200 for a in members]

        ax.scatter(x_coords, y_coords,
                   c=[color_map.get(severity, fallback_color)],
                   s=sizes,
                   alpha=0.6,
                   edgecolors='black',
                   label=str(severity).title())

    ax.set_xlabel('X Coordinate', fontsize=12)
    ax.set_ylabel('Y Coordinate', fontsize=12)
    ax.set_title('Spatial Distribution of Anomalies', fontsize=14)
    ax.legend()
    ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

plot_spatial_distribution(anomalies)

## 8. Layer-wise Anomaly Analysis

In [None]:
def analyze_layer_anomalies(anomalies):
    """Group anomalies by neural layer and print a per-layer summary table.

    For each layer the table shows the anomaly count, the mean score and the
    most severe level present. Layers are listed by descending count.

    Args:
        anomalies: Iterable of objects exposing ``layer``, ``score`` and
            ``severity``. A falsy ``layer`` is grouped under ``"unknown"``.

    Returns:
        Dict mapping layer -> list of anomalies in that layer.
    """
    # Explicit escalation order. Comparing the labels as plain strings would
    # rank alphabetically, putting "medium" and "low" above "critical".
    severity_rank = {'low': 0, 'medium': 1, 'high': 2, 'critical': 3}

    layer_anomalies = {}
    for anomaly in anomalies:
        layer = anomaly.layer or "unknown"
        layer_anomalies.setdefault(layer, []).append(anomaly)

    print("\nAnomalies by Layer:")
    print(f"{'Layer':<30} {'Count':<10} {'Avg Score':<12} {'Max Severity':<15}")
    print("-" * 70)

    for layer, layer_list in sorted(layer_anomalies.items(),
                                    key=lambda x: len(x[1]),
                                    reverse=True):
        count = len(layer_list)
        avg_score = sum(a.score for a in layer_list) / count
        # Unrecognised labels rank below every known level.
        worst = max(layer_list, key=lambda a: severity_rank.get(a.severity, -1))
        max_severity = str(worst.severity)
        print(f"{layer:<30} {count:<10} {avg_score:<12.3f} {max_severity:<15}")

    return layer_anomalies

layer_anomalies = analyze_layer_anomalies(anomalies)

## 9. Export Anomaly Data

In [None]:
async def export_anomaly_data():
    """Export anomaly data to ``./anomaly_analysis``.

    Writes three files:
      * ``anomalies.csv`` via ``client.download_anomalies``
      * ``high_severity_anomalies.json`` with the high and critical anomalies
      * ``anomaly_summary.json`` with counts per severity, type and layer

    Relies on notebook-level results from earlier cells: ``client``,
    ``anomalies``, ``severity_counts``, ``type_counts``, ``layer_anomalies``,
    ``high_anomalies`` and ``critical_anomalies``.
    """
    import json

    export_dir = Path("./anomaly_analysis")
    export_dir.mkdir(parents=True, exist_ok=True)

    def write_json(filename, payload):
        # Serialise fully before touching the file so a failure cannot leave
        # a truncated JSON document behind. default=str renders values the
        # encoder does not know (e.g. datetime objects) instead of raising.
        text = json.dumps(payload, indent=2, default=str)
        (export_dir / filename).write_text(text, encoding="utf-8")
        print(f"✅ Exported {filename}")

    # Export to CSV
    await client.download_anomalies(
        export_dir / "anomalies.csv"
    )
    print("✅ Exported anomalies.csv")

    # Export high severity to JSON
    high_severity_data = [
        {
            "type": a.anomaly_type,
            "severity": a.severity,
            "score": a.score,
            "description": a.description,
            "layer": a.layer,
            "location": a.location,
            "timestamp": a.timestamp,
            "metadata": a.metadata
        }
        for a in high_anomalies + critical_anomalies
    ]
    write_json("high_severity_anomalies.json", high_severity_data)

    # Export summary statistics
    summary = {
        "total_anomalies": len(anomalies),
        "severity_distribution": severity_counts,
        "type_distribution": type_counts,
        "high_severity_count": len(high_anomalies),
        "critical_count": len(critical_anomalies),
        "layer_counts": {layer: len(anoms) for layer, anoms in layer_anomalies.items()}
    }
    write_json("anomaly_summary.json", summary)

await export_anomaly_data()

## 10. Anomaly Alert System

Set up real-time monitoring that alerts on high and critical severity anomalies.

In [None]:
async def monitor_critical_anomalies():
    """Subscribe to real-time anomaly updates and print high/critical alerts.

    Uses the notebook-level ``client``. Runs until the update stream ends or
    the cell is interrupted. Updates with any other severity are ignored.
    """
    # Subscribe to anomaly updates
    await client.subscribe_anomalies()

    print("Monitoring for critical anomalies...")
    print("(Press Ctrl+C to stop)")

    try:
        async for update in client.stream_updates(message_types=["anomaly_update"]):
            if update.severity not in ("high", "critical"):
                continue

            # The division by 1000 treats update.timestamp as epoch milliseconds.
            timestamp = datetime.fromtimestamp(update.timestamp / 1000)
            print(f"\n⚠️ [{timestamp}] {update.severity.upper()} ANOMALY DETECTED")
            print(f"   Description: {update.description}")
            print(f"   Score: {update.score:.3f}")
            if update.location:
                print(f"   Location: ({update.location.get('x', 0)}, {update.location.get('y', 0)})")
    except (KeyboardInterrupt, asyncio.CancelledError):
        # NOTE(review): notebook kernels appear to stop an awaiting cell by
        # cancelling its task rather than raising KeyboardInterrupt inside the
        # coroutine - confirm against the ipykernel version in use. Handling
        # both lets the monitor stop cleanly either way. The cancellation is
        # deliberately not re-raised: interrupting is the intended way to
        # stop this monitor.
        print("\nStopping anomaly monitor...")

# Uncomment to run the monitor:
# await monitor_critical_anomalies()

## 11. Cleanup

In [None]:
async def cleanup():
    """Unsubscribe from anomaly updates and close the server connection.

    Uses the notebook-level ``client``. The disconnect runs even when the
    unsubscribe call raises, so a failed unsubscribe cannot leave the
    connection open; the original error still propagates afterwards.
    """
    try:
        await client.unsubscribe_anomalies()
    finally:
        await client.disconnect()
        print("✅ Disconnected from server")

await cleanup()

## Summary

In this notebook, you learned how to:
- Retrieve and filter anomalies
- Analyze anomaly severity and types
- Visualize anomaly timelines and spatial distributions
- Analyze anomalies layer by layer
- Export anomaly data
- Set up real-time anomaly monitoring

## Next Steps
- `05_custom_workflows.ipynb` - Build custom analysis workflows
- Combine anomaly detection with temporal analysis