# 🔍 Interactive Anomaly Detection Visualization

This notebook provides an interactive tool for visualizing anomaly detection results using patch-based comparison with a sub-bank of representative embeddings.

## Features:
- 🖼️ Interactive test image selection
- 🧩 Patch-wise comparison with sub-bank
- 🎨 Anomaly map generation and visualization
- 📊 Interactive Plotly visualizations

In [1]:
# Import Required Libraries
import sys
import os
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pickle
from PIL import Image
import ipywidgets as widgets
from IPython.display import display, HTML
import matplotlib.pyplot as plt
from pathlib import Path

# Add current directory to path for imports
sys.path.insert(0, os.getcwd())

print("✅ Libraries imported successfully!")

✅ Libraries imported successfully!


## 📂 Load Test Images and Sub Bank Data

Loading the test dataset and the optimized sub-bank created with K=2 clusters.

In [2]:
# Load test images and sub bank data
try:
    # Load dataset (assuming we can use the app's load function)
    from app import load_mvtec_category
    
    print("📂 Loading MVTec wood dataset...")
    dataset = load_mvtec_category('wood')
    
    if dataset is None:
        raise ValueError("Could not load dataset")
    
    test_images = dataset.get('test_images', [])
    test_labels = dataset.get('test_labels', [])
    
    print(f"✅ Loaded {len(test_images)} test images")
    print(f"📊 Available labels: {set(test_labels)}")
    
    # Load sub bank
    print("\n🏦 Loading sub-bank...")
    sub_bank_path = "sub_bank_k2.pkl"
    
    if os.path.exists(sub_bank_path):
        with open(sub_bank_path, 'rb') as f:
            sub_bank = pickle.load(f)
        print(f"✅ Loaded sub-bank with shape: {sub_bank.shape}")
    else:
        raise FileNotFoundError(f"Sub-bank file not found: {sub_bank_path}")
        
except Exception as e:
    print(f"❌ Error loading data: {e}")
    print("💡 Make sure you're running this from the enhanced_dashboard directory")
    print("💡 And that sub_bank_k2.pkl exists")

📂 Loading MVTec wood dataset...
✅ Loaded 20 test images
📊 Available labels: {'combined', 'color', 'good'}

🏦 Loading sub-bank...
✅ Loaded sub-bank with shape: (1300, 512)


## 🎛️ Create Interactive Image Selector Widget

Select which test image you want to analyze for anomalies.

In [3]:
# Create interactive image selector widget
if 'test_images' in locals() and len(test_images) > 0:
    
    # Create dropdown options with image index and label
    dropdown_options = []
    for i, label in enumerate(test_labels):
        dropdown_options.append((f"Image {i+1}: {label}", i))
    
    # Create the dropdown widget
    image_selector = widgets.Dropdown(
        options=dropdown_options,
        value=0,
        description='Test Image:',
        style={'description_width': 'initial'},
        layout=widgets.Layout(width='300px')
    )
    
    # Create output widget for displaying selected image info
    output_widget = widgets.Output()
    
    def on_image_change(change):
        with output_widget:
            output_widget.clear_output()
            idx = change['new']
            label = test_labels[idx]
            img_shape = test_images[idx].shape
            print(f"🖼️ Selected: Image {idx+1}")
            print(f"🏷️ Label: {label}")
            print(f"📐 Shape: {img_shape}")
    
    image_selector.observe(on_image_change, names='value')
    
    print("🎛️ Image selector created!")
    display(image_selector)
    display(output_widget)
    
    # Trigger initial display
    on_image_change({'new': 0})
else:
    print("❌ No test images available")

🎛️ Image selector created!



## 🧩 Implement Patch Extraction Function

This cell defines two helpers: one splits an image into square patches, and the other turns each patch into an embedding for comparison with the sub-bank.

In [4]:
# Implement patch extraction function
def extract_image_patches(image, patch_size=5, stride=5):
    """
    Extract patches from an image for comparison.
    
    Args:
        image: numpy array of shape (H, W, C)
        patch_size: size of square patches
        stride: step size between patches
    
    Returns:
        patches: array of shape (n_patches, patch_size, patch_size, C)
        patch_positions: list of (row, col) positions for each patch
    """
    H, W, C = image.shape
    patches = []
    patch_positions = []
    
    # Extract patches
    for i in range(0, H - patch_size + 1, stride):
        for j in range(0, W - patch_size + 1, stride):
            patch = image[i:i+patch_size, j:j+patch_size, :]
            patches.append(patch)
            patch_positions.append((i, j))
    
    return np.array(patches), patch_positions

def patches_to_embeddings(patches):
    """
    Convert patches to embeddings by flattening their raw pixel values.

    This is a simplified stand-in: each embedding has
    patch_size * patch_size * C values. In practice you'd use the same
    ResNet18 features as create_bank.py so that the embeddings are
    comparable with the sub-bank.
    """
    patches = np.asarray(patches)
    # Decide the scale once for the whole batch, so that dark patches
    # (max <= 1 in 0-255 data) are scaled the same way as bright ones
    scale = 255.0 if patches.size and patches.max() > 1 else 1.0

    embeddings = []
    for patch in patches:
        # Simple feature extraction: flatten and normalize to 0-1
        flattened = patch.flatten().astype(np.float32) / scale
        embeddings.append(flattened)

    return np.array(embeddings)

# Confirm that the definitions above ran
print("🧩 Patch extraction functions defined!")
print("📝 Note: Using simplified embeddings for demonstration.")
print("🔧 In production, use the same ResNet18 features as create_bank.py")

🧩 Patch extraction functions defined!
📝 Note: Using simplified embeddings for demonstration.
🔧 In production, use the same ResNet18 features as create_bank.py
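
The nested loops in `extract_image_patches` can also be expressed with NumPy's `sliding_window_view`. The block below is a minimal sketch of that idea, not part of the original notebook; the function name `extract_patches_vectorized` is hypothetical, and it assumes the image is an `(H, W, C)` array at least `patch_size` pixels wide and tall.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def extract_patches_vectorized(image, patch_size=5, stride=5):
    """Hypothetical vectorized counterpart of extract_image_patches."""
    # All windows over the two spatial axes: shape (H-p+1, W-p+1, C, p, p)
    windows = sliding_window_view(image, (patch_size, patch_size), axis=(0, 1))
    # Keep every `stride`-th window along rows and columns
    windows = windows[::stride, ::stride]
    n_rows, n_cols = windows.shape[:2]
    # Move channels last, then flatten the (row, col) grid into one axis
    patches = np.moveaxis(windows, 2, -1).reshape(
        n_rows * n_cols, patch_size, patch_size, -1
    )
    # Top-left corner of each patch, in the same row-major order
    positions = [(i * stride, j * stride)
                 for i in range(n_rows) for j in range(n_cols)]
    return patches, positions


# Toy image: 10 x 12 pixels, 3 channels
demo_image = np.arange(10 * 12 * 3).reshape(10, 12, 3)
demo_patches, demo_positions = extract_patches_vectorized(demo_image)
print(demo_patches.shape)    # (4, 5, 5, 3)
print(demo_positions)        # [(0, 0), (0, 5), (5, 0), (5, 5)]
```

With the default 5×5 patches on a 3-channel image, flattening one patch gives 5 × 5 × 3 = 75 values, which is the embedding length that `patches_to_embeddings` produces.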


## 🔍 Implement Patch Comparison Algorithm

Each patch embedding is compared with every sub-bank embedding. The distance to its nearest sub-bank entry becomes that patch's anomaly score.

In [5]:
# Implement patch comparison algorithm
from sklearn.metrics.pairwise import cosine_distances, euclidean_distances

def compare_patches_with_subbank(image_patches, sub_bank, method='cosine'):
    """
    Compare image patches with sub-bank patches.
    
    Args:
        image_patches: array of patch embeddings
        sub_bank: sub-bank embeddings
        method: 'cosine' or 'euclidean'
    
    Returns:
        min_distances: minimum distance for each patch to sub-bank
    """
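    # Both inputs must share a feature dimension; fail early with a clear message
    n_patch_dims = np.shape(image_patches)[-1]
    n_bank_dims = np.shape(sub_bank)[-1]
    if n_patch_dims != n_bank_dims:
        raise ValueError(
            f"Embedding size mismatch: patches have {n_patch_dims} dims, "
            f"sub-bank has {n_bank_dims}"
        )

    if method == 'cosine':
        distances = cosine_distances(image_patches, sub_bank)
    else:
        distances = euclidean_distances(image_patches, sub_bank)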
    
    # Take minimum distance for each patch (most similar sub-bank patch)
    min_distances = np.min(distances, axis=1)
    
    return min_distances

def create_anomaly_map(distances, patch_positions, image_shape, patch_size=5, stride=5):
    """
    Create anomaly map from patch distances.
    
    Args:
        distances: array of distances for each patch
        patch_positions: list of (row, col) positions
        image_shape: (H, W, C) of original image
        patch_size: size of patches
        stride: stride used for extraction
    
    Returns:
        anomaly_map: 2D array representing anomaly scores
    """
    H, W = image_shape[:2]
    
    # Calculate anomaly map dimensions
    map_h = (H - patch_size) // stride + 1
    map_w = (W - patch_size) // stride + 1
    
    anomaly_map = np.zeros((map_h, map_w))
    
    # Fill anomaly map
    for idx, (i, j) in enumerate(patch_positions):
        map_i = i // stride
        map_j = j // stride
        if map_i < map_h and map_j < map_w:
            anomaly_map[map_i, map_j] = distances[idx]
    
    return anomaly_map

print("🔍 Patch comparison functions defined!")

🔍 Patch comparison functions defined!
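
To make the nearest-neighbour step concrete, here is a small NumPy-only sketch of the minimum cosine distance on toy vectors. It is an illustration under assumptions rather than the notebook's implementation: `min_cosine_distance` and the `eps` guard are hypothetical names, and the notebook itself relies on scikit-learn's `cosine_distances`. The explicit dimension check matters because patch embeddings and the sub-bank must have the same number of features.

```python
import numpy as np


def min_cosine_distance(queries, bank, eps=1e-12):
    """Smallest cosine distance from each query row to any bank row."""
    queries = np.asarray(queries, dtype=np.float64)
    bank = np.asarray(bank, dtype=np.float64)
    if queries.shape[1] != bank.shape[1]:
        raise ValueError(
            f"Feature size mismatch: queries have {queries.shape[1]} dims, "
            f"bank has {bank.shape[1]}"
        )
    # L2-normalize rows so that the dot product equals cosine similarity
    q = queries / np.maximum(np.linalg.norm(queries, axis=1, keepdims=True), eps)
    b = bank / np.maximum(np.linalg.norm(bank, axis=1, keepdims=True), eps)
    similarity = q @ b.T                    # shape (n_queries, n_bank)
    # The nearest bank entry has the highest similarity
    return 1.0 - similarity.max(axis=1)


toy_bank = np.array([[1.0, 0.0], [0.0, 1.0]])
toy_queries = np.array([[2.0, 0.0],     # same direction as a bank entry
                        [1.0, 1.0],     # 45 degrees from both entries
                        [-1.0, 0.0]])   # opposite to the first entry
toy_distances = min_cosine_distance(toy_queries, toy_bank)
print(np.round(toy_distances, 4))
```

The first query points the same way as a bank entry and scores 0, while the third has no similar entry and scores 1, which is the behaviour the anomaly map depends on.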


## 🎨 Generate Anomaly Map

Process the selected image and generate anomaly map.

In [6]:
# Generate anomaly map for selected image
def analyze_selected_image(image_idx, patch_size=5, stride=5, comparison_method='cosine'):
    """
    Analyze selected image and generate anomaly map.
    """
    if not 0 <= image_idx < len(test_images):
        print(f"❌ Invalid image index: {image_idx}")
        return None, None, None
    
    # Get selected image
    image = test_images[image_idx]
    label = test_labels[image_idx]
    
    print(f"🔍 Analyzing Image {image_idx+1}: {label}")
    print(f"📐 Image shape: {image.shape}")
    
    # Extract patches
    print("🧩 Extracting patches...")
    patches, patch_positions = extract_image_patches(image, patch_size, stride)
    print(f"📊 Extracted {len(patches)} patches")
    
    # Convert to embeddings (simplified)
    print("🧠 Converting to embeddings...")
    patch_embeddings = patches_to_embeddings(patches)
    print(f"📊 Embeddings shape: {patch_embeddings.shape}")
    
    # Compare with sub-bank
    print(f"🔍 Comparing with sub-bank using {comparison_method} distance...")
    distances = compare_patches_with_subbank(patch_embeddings, sub_bank, comparison_method)
    
    # Create anomaly map
    print("🎨 Creating anomaly map...")
    anomaly_map = create_anomaly_map(distances, patch_positions, image.shape, patch_size, stride)
    
    # Normalize anomaly map
    if anomaly_map.max() > anomaly_map.min():
        anomaly_map_norm = (anomaly_map - anomaly_map.min()) / (anomaly_map.max() - anomaly_map.min())
    else:
        anomaly_map_norm = anomaly_map
    
    print(f"✅ Anomaly map created! Shape: {anomaly_map.shape}")
    print(f"📊 Anomaly scores range: [{anomaly_map.min():.3f}, {anomaly_map.max():.3f}]")
    
    return image, anomaly_map_norm, distances

# The analysis itself runs in the interactive interface below
print("🎯 Ready to analyze images!")

🎯 Ready to analyze images!
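
One consequence of the per-image min-max normalization in `analyze_selected_image` is that every non-constant map ends up spanning exactly 0 to 1, so a nearly clean image and a badly damaged one can look equally "hot". The sketch below contrasts that with scaling by shared bounds. The helper names and the bounds `low=0.0`, `high=1.0` are assumptions for illustration; suitable bounds depend on the distance metric and the data.

```python
import numpy as np


def normalize_minmax(anomaly_map):
    """Per-image scaling, as done in analyze_selected_image."""
    span = anomaly_map.max() - anomaly_map.min()
    return (anomaly_map - anomaly_map.min()) / span if span > 0 else anomaly_map


def normalize_fixed_range(anomaly_map, low, high):
    """Scaling with shared bounds, so maps from different images are comparable."""
    if high <= low:
        raise ValueError("high must be greater than low")
    return np.clip((anomaly_map - low) / (high - low), 0.0, 1.0)


# Two toy maps: one with mild deviations, one with strong deviations
mild = np.array([[0.10, 0.20], [0.10, 0.20]])
strong = np.array([[0.10, 0.80], [0.10, 0.80]])

# Per-image min-max hides the difference between the two maps
print(normalize_minmax(mild).max(), normalize_minmax(strong).max())

# Shared bounds (assumed here to be 0 and 1) preserve it
print(normalize_fixed_range(mild, 0.0, 1.0).max(),
      normalize_fixed_range(strong, 0.0, 1.0).max())
```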


## 📊 Create Interactive Plotly Visualization

Visualize the original image and anomaly map side by side with interactive features.

In [7]:
# Create interactive Plotly visualization
def create_anomaly_visualization(image, anomaly_map, image_idx, distances=None):
    """
    Create interactive visualization of image and anomaly map.
    """
    label = test_labels[image_idx] if image_idx < len(test_labels) else 'unknown'
    
    # Create subplots
    fig = make_subplots(
        rows=1, cols=2,
        subplot_titles=(f"Original Image - {label}", "Anomaly Map"),
        specs=[[{"type": "image"}, {"type": "heatmap"}]]
    )
    
    # Add original image
    fig.add_trace(
        go.Image(z=image, name="Original"),
        row=1, col=1
    )
    
    # Add anomaly map heatmap
    fig.add_trace(
        go.Heatmap(
            z=np.flipud(anomaly_map),  # Flip to match image orientation
            colorscale='Reds',
            showscale=True,
            colorbar=dict(
                # Nested title dict; the older `titleside` shorthand is rejected by recent Plotly releases
                title=dict(text="Anomaly Score", side="right")
            ),
            hovertemplate='<b>Anomaly Score</b>: %{z:.3f}<extra></extra>',
            name="Anomaly Map"
        ),
        row=1, col=2
    )
    
    # Update layout
    fig.update_layout(
        title=f"Anomaly Detection Results - Image {image_idx+1}",
        height=500,
        showlegend=False
    )
    
    # Update axes
    fig.update_xaxes(showticklabels=False)
    fig.update_yaxes(showticklabels=False)
    
    return fig

# Create analysis and visualization function
def analyze_and_visualize(image_idx, patch_size=5, stride=5, method='cosine'):
    """
    Complete analysis and visualization pipeline.
    """
    # Analyze image
    image, anomaly_map, distances = analyze_selected_image(
        image_idx, patch_size, stride, method
    )
    
    if image is None:
        return
    
    # Create visualization
    fig = create_anomaly_visualization(image, anomaly_map, image_idx, distances)
    
    # Display
    fig.show()
    
    # Print statistics
    print(f"\n📊 Analysis Statistics:")
    print(f"   • Image: {image_idx+1} ({test_labels[image_idx]})")
    print(f"   • Patches analyzed: {len(distances) if distances is not None else 'N/A'}")
    print(f"   • Anomaly map shape: {anomaly_map.shape}")
    print(f"   • Max anomaly score (normalized): {anomaly_map.max():.3f}")
    print(f"   • Mean anomaly score (normalized): {anomaly_map.mean():.3f}")

print("📊 Visualization functions created!")

📊 Visualization functions created!


## 🎮 Interactive Analysis Interface

Use the controls below to analyze different images with various parameters.

In [9]:
# Create interactive analysis interface
if 'test_images' in locals() and len(test_images) > 0:
    
    # Create parameter widgets
    image_widget = widgets.Dropdown(
        options=[(f"Image {i+1}: {test_labels[i]}", i) for i in range(len(test_images))],
        value=0,
        description='Image:',
        style={'description_width': 'initial'}
    )
    
    patch_size_widget = widgets.IntSlider(
        value=5,
        min=3,
        max=15,
        step=1,
        description='Patch Size:',
        style={'description_width': 'initial'}
    )
    
    stride_widget = widgets.IntSlider(
        value=5,
        min=1,
        max=10,
        step=1,
        description='Stride:',
        style={'description_width': 'initial'}
    )
    
    method_widget = widgets.Dropdown(
        options=[('Cosine Distance', 'cosine'), ('Euclidean Distance', 'euclidean')],
        value='cosine',
        description='Method:',
        style={'description_width': 'initial'}
    )
    
    analyze_button = widgets.Button(
        description='🔍 Analyze Image',
        button_style='primary',
        layout=widgets.Layout(width='200px')
    )
    
    # Create output area
    analysis_output = widgets.Output()
    
    def on_analyze_click(b):
        with analysis_output:
            analysis_output.clear_output()
            try:
                analyze_and_visualize(
                    image_widget.value,
                    patch_size_widget.value,
                    stride_widget.value,
                    method_widget.value
                )
            except Exception as e:
                print(f"❌ Error during analysis: {e}")
    
    analyze_button.on_click(on_analyze_click)
    
    # Display interface
    print("🎮 Interactive Analysis Interface")
    print("=" * 40)
    
    controls = widgets.VBox([
        widgets.HBox([image_widget, method_widget]),
        widgets.HBox([patch_size_widget, stride_widget]),
        analyze_button
    ])
    
    display(controls)
    display(analysis_output)
    
    # Run initial analysis
    print("\n🚀 Running initial analysis...")
    with analysis_output:
        analyze_and_visualize(0, 5, 5, 'cosine')

else:
    print("❌ No test images available for analysis")

🎮 Interactive Analysis Interface




🚀 Running initial analysis...


## 📝 Summary

This notebook provides an interactive tool for:

1. **Image Selection**: Choose from available test images
2. **Patch Analysis**: Extract and compare patches with sub-bank
3. **Anomaly Detection**: Generate anomaly maps based on distance metrics
4. **Visualization**: Interactive Plotly visualizations

### 🔧 Technical Notes:

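- **Simplified Embeddings**: This demo uses flattened RGB pixel values instead of ResNet18 features. With the default 5×5 patches on a 3-channel image, each embedding has 75 values, which does not match the 512-dimensional sub-bank loaded above, so the distance computation only succeeds once both sides come from the same feature extractor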
- **Sub-Bank**: Uses the optimized K=2 cluster sub-bank created by `create_sub_bank.py`
- **Distance Metrics**: Supports both cosine and Euclidean distance
- **Interactive**: Real-time parameter adjustment and visualization

### 🚀 Next Steps:

- Replace simplified embeddings with actual ResNet18 features
- Add more distance metrics and comparison methods
- Implement patch-level visualization overlay
- Add statistical analysis and anomaly scoring
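
As a starting point for the patch-level overlay mentioned above, the following sketch expands a patch-resolution anomaly map back to pixel resolution so it can be drawn on top of the original image. It is one possible approach, not the notebook's method: `upsample_anomaly_map` is a hypothetical helper that paints each patch score onto the pixels the patch covers and averages where patches overlap.

```python
import numpy as np


def upsample_anomaly_map(anomaly_map, image_shape, patch_size=5, stride=5):
    """Spread patch scores over the pixels they cover; overlaps are averaged."""
    H, W = image_shape[:2]
    total = np.zeros((H, W), dtype=np.float64)
    count = np.zeros((H, W), dtype=np.float64)
    for mi in range(anomaly_map.shape[0]):
        for mj in range(anomaly_map.shape[1]):
            i, j = mi * stride, mj * stride   # top-left pixel of this patch
            total[i:i + patch_size, j:j + patch_size] += anomaly_map[mi, mj]
            count[i:i + patch_size, j:j + patch_size] += 1
    # Pixels covered by no patch (bottom/right remainder) stay at 0
    return np.divide(total, count, out=np.zeros_like(total), where=count > 0)


toy_map = np.array([[0.0, 1.0], [2.0, 3.0]])

# Non-overlapping 2x2 patches on a 5x5 image: last row and column are uncovered
pixel_map = upsample_anomaly_map(toy_map, (5, 5, 3), patch_size=2, stride=2)
print(pixel_map)

# Overlapping 3x3 patches with stride 2: the centre pixel averages all four scores
overlap_map = upsample_anomaly_map(toy_map, (5, 5, 3), patch_size=3, stride=2)
print(overlap_map[2, 2])
```

The resulting array has the same height and width as the image, so it could be passed to a semi-transparent heatmap layer placed over the original picture.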