# Burner Detection Preprocessing Engine

**Pipeline Overview:** Download model → Process images → Evaluate burner classification

**Key Features:**
1. **Simple Evaluation**: Presence/absence of burners (binary classification)
2. **Advanced Evaluation**: Spatial IoU-based matching (object detection metrics)  
3. **Preprocessing Comparison**: Test different normalization techniques for lighting variations
4. **Visualization Demo**: Visual walkthrough of pipeline on sample image

**Preprocessing Methods Available:**
- **Simple**: Standard 0-1 normalization (baseline)
- **GCN**: Global Contrast Normalization (handles overall brightness differences)
- **LCN**: Local Contrast Normalization (handles local lighting/shadow variations)

## Setup

Run the following commands in your terminal to create a Python 3.11 kernel, then select the "Python 3.11" kernel at the top of the notebook:

```
conda create -n my_env python=3.11 ipykernel
conda activate my_env
python -m ipykernel install --user --name=my_env --display-name="Python 3.11"   
```

In [36]:
%pip install -r requirements.txt

Collecting opencv-python>=4.0.0 (from -r requirements.txt (line 9))
  Downloading opencv_python-4.12.0.88-cp37-abi3-macosx_13_0_arm64.whl.metadata (19 kB)
INFO: pip is looking at multiple versions of opencv-python to determine which version is compatible with other requirements. This could take a while.
  Using cached opencv_python-4.11.0.86-cp37-abi3-macosx_13_0_arm64.whl.metadata (20 kB)
Using cached opencv_python-4.11.0.86-cp37-abi3-macosx_13_0_arm64.whl (37.3 MB)
Installing collected packages: opencv-python
Successfully installed opencv-python-4.11.0.86
Note: you may need to restart the kernel to use updated packages.


In [43]:
import json
import os
import subprocess
import glob
from typing import Dict, List, Any, Tuple, Optional
import numpy as np
import pandas as pd
from PIL import Image, ImageDraw, ImageFont
import tensorflow as tf
from dotenv import load_dotenv
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from scipy import ndimage
import cv2
from scipy import ndimage

# Load environment variables
load_dotenv()

# Configuration (each value can be overridden through the environment / .env file)
VIAM_CONFIG = {
    "model_name": os.getenv("VIAM_MODEL_NAME", "your-burner-detection-model"),
    "model_org_id": os.getenv("VIAM_MODEL_ORG_ID", "your-model-org-id"),
    "model_version": os.getenv("VIAM_MODEL_VERSION", "2024-XX-XXTXX-XX-XX"),
}

METADATA_DIR = os.getenv("METADATA_DIR", "metadata")
IMAGES_DIR = os.getenv("IMAGES_DIR", "data")
MODEL_DIR = os.getenv("MODEL_DIR", "models")

MODEL_INPUT_SIZE = None

# List the metadata files once and reuse the listing for the count and the preview
metadata_files_preview = glob.glob(os.path.join(METADATA_DIR, "*.json"))

print(f"Model: {VIAM_CONFIG['model_name']} v{VIAM_CONFIG['model_version']}")
print(f"Data: {len(metadata_files_preview)} metadata files")


def _format_coordinate(value: Any) -> str:
    """Format a coordinate with 3 decimals, or return 'N/A' when it is missing or not numeric."""
    # A float format spec applied to a placeholder string raises ValueError,
    # so the numeric check has to happen before formatting.
    if isinstance(value, (int, float)):
        return f"{value:.3f}"
    return "N/A"


# Quick preview of ground truth format
if metadata_files_preview:
    print(f"\n📋 Sample ground truth format:")
    with open(metadata_files_preview[0], 'r') as f:
        sample_metadata = json.load(f)
    if 'annotations' in sample_metadata:
        # `annotations` / `bboxes` may be present but null in the JSON
        sample_annotations = sample_metadata['annotations'] or {}
        for bbox in (sample_annotations.get('bboxes') or [])[:3]:  # Show first 3
            x_min = bbox.get('xMinNormalized')
            y_min = bbox.get('yMinNormalized')
            x_max = bbox.get('xMaxNormalized')
            y_max = bbox.get('yMaxNormalized')

            print(f"  - Label: '{bbox.get('label', 'N/A')}'")
            print(f"    Normalized coordinates:")
            print(f"      x_min: {_format_coordinate(x_min)}")
            print(f"      y_min: {_format_coordinate(y_min)}")
            print(f"      x_max: {_format_coordinate(x_max)}")
            print(f"      y_max: {_format_coordinate(y_max)}")

            print(f"    Box dimensions (normalized):")
            if all(isinstance(v, (int, float)) for v in (x_min, y_min, x_max, y_max)):
                width = x_max - x_min
                height = y_max - y_min
                print(f"      width: {width:.3f}")
                print(f"      height: {height:.3f}")
            else:
                # Dimensions cannot be derived from an incomplete box
                print("      width: N/A")
                print("      height: N/A")
    else:
        print("  No annotations found in sample metadata")

Model: pan-burner-v3 v2025-01-22T12-31-54
Data: 8432 metadata files

📋 Sample ground truth format:
  - Label: 'burner'
    Normalized coordinates:
      x_min: 0.006
      y_min: 0.320
      x_max: 0.269
      y_max: 0.842
    Box dimensions (normalized):
      width: 0.263
      height: 0.522
  - Label: 'burner'
    Normalized coordinates:
      x_min: 0.374
      y_min: 0.459
      x_max: 0.773
      y_max: 0.979
    Box dimensions (normalized):
      width: 0.398
      height: 0.520
  - Label: 'burner'
    Normalized coordinates:
      x_min: 0.467
      y_min: 0.021
      x_max: 0.794
      y_max: 0.387
    Box dimensions (normalized):
      width: 0.327
      height: 0.366


### Preprocessing and data checks: 
- Select between simple normalization (simple), local contrast normalization (LCN), and global contrast normalization (GCN).
- Check if all bounding box keys are present

In [None]:
def apply_global_contrast_normalization(image_array: np.ndarray, epsilon: float = 1e-8) -> np.ndarray:
    """
    Normalize contrast over the whole image (Global Contrast Normalization).

    The image is standardized with a single mean and standard deviation taken
    over every pixel and channel, then squashed through a tanh so that the
    result lies in the 0-1 range.

    Args:
        image_array: Input image as numpy array
        epsilon: Small constant added to the standard deviation to avoid
            dividing by zero on a perfectly flat image

    Returns:
        float32 array with the same shape as the input and values in [0, 1]
    """
    pixels = image_array.astype(np.float32)

    # One mean / std for the entire image (all pixels, all channels)
    mu = np.mean(pixels)
    sigma = np.std(pixels)

    # Zero mean, unit variance
    standardized = (pixels - mu) / (sigma + epsilon)

    # tanh yields (-1, 1); halving and shifting by 0.5 moves that to (0, 1)
    squashed = np.tanh(standardized * 0.5) * 0.5 + 0.5

    return np.clip(squashed, 0, 1)

def apply_local_contrast_normalization(image_array: np.ndarray, window_size: int = 9, epsilon: float = 1e-8) -> np.ndarray:
    """
    Apply Local Contrast Normalization (LCN)

    Each channel is standardized against the mean and standard deviation of a
    square neighbourhood around every pixel, then min-max rescaled to 0-1.

    Args:
        image_array: Input image as a 3-D numpy array (the last axis is
            iterated as channels)
        window_size: Side length of the square neighbourhood
        epsilon: Small constant that keeps the divisions finite on flat regions

    Returns:
        float32 array with the same shape as the input and values in [0, 1]
    """
    # Integer images must be promoted before filtering: uniform_filter returns
    # the input dtype by default and squaring uint8 values wraps around, which
    # would silently corrupt the local statistics.
    if not np.issubdtype(image_array.dtype, np.floating):
        image_array = image_array.astype(np.float32)

    normalized = np.zeros_like(image_array, dtype=np.float32)

    for channel in range(image_array.shape[2]):
        channel_data = image_array[:, :, channel]
        local_mean = ndimage.uniform_filter(channel_data, size=window_size, mode='reflect')
        # Var[x] = E[x^2] - E[x]^2 over the window
        local_variance = ndimage.uniform_filter(channel_data**2, size=window_size, mode='reflect') - local_mean**2
        # Rounding can push the variance slightly below zero; clamp before the sqrt
        local_std = np.sqrt(np.maximum(local_variance, 0)) + epsilon
        channel_normalized = (channel_data - local_mean) / local_std
        # Min-max rescale this channel to 0-1
        channel_normalized = (channel_normalized - channel_normalized.min()) / (channel_normalized.max() - channel_normalized.min() + epsilon)
        normalized[:, :, channel] = channel_normalized

    return normalized


In [None]:
def preprocess_image(image_array: np.ndarray, target_size: Optional[Tuple[int, int]] = None,
                    normalization_method: str = "simple") -> np.ndarray:
    """
    Turn a raw image array into a normalized float array for the model.

    Args:
        image_array: Input image as numpy array
        target_size: Size handed to PIL's resize, i.e. (width, height);
            None keeps the original size
        normalization_method: 'simple', 'gcn', or 'lcn'

    Returns:
        Preprocessed image array ready for model input

    Raises:
        ValueError: If normalization_method is not one of the supported names
    """
    # Resizing (when requested) goes through PIL; either way we end up with float32 pixels
    source = image_array if target_size is None else Image.fromarray(image_array).resize(target_size)
    pixels = np.array(source, dtype=np.float32)

    if normalization_method == "simple":
        # Plain rescale of 0-255 values to 0-1
        return pixels / 255.0
    if normalization_method == "gcn":
        return apply_global_contrast_normalization(pixels)
    if normalization_method == "lcn":
        return apply_local_contrast_normalization(pixels)

    raise ValueError(f"Unknown normalization method: {normalization_method}")

def visualize_preprocessing_effects(image_array: np.ndarray, methods: List[str] = ["simple", "gcn", "lcn"], 
                                   target_size: Optional[Tuple[int, int]] = None):
    """
    Visualize the effects of different preprocessing methods with detailed analysis

    Draws a two-row figure: the top row shows the original image followed by
    one preprocessed version per method, the bottom row shows the matching
    value histograms. A statistics table is printed afterwards.

    Args:
        image_array: Input image as numpy array (displayed as uint8)
        methods: Normalization method names passed to preprocess_image
            (the default list is only read, never mutated)
        target_size: Optional resize target forwarded to preprocess_image
    """
    # Histogram colors are cycled so that more than three methods do not raise IndexError
    palette = ['green', 'orange', 'red']

    # Create subplots: 2 rows (images + histograms), multiple columns.
    # squeeze=False keeps `axes` two-dimensional whatever the column count.
    n_cols = len(methods) + 1
    fig, axes = plt.subplots(2, n_cols, figsize=(5 * n_cols, 10), squeeze=False)
    
    # Original image
    axes[0, 0].imshow(image_array.astype(np.uint8))
    axes[0, 0].set_title("Original Image")
    axes[0, 0].axis('off')
    
    # Original histogram
    axes[1, 0].hist(image_array.flatten(), bins=50, alpha=0.7, color='blue', range=(0, 255))
    axes[1, 0].set_title("Original Histogram")
    axes[1, 0].set_xlabel("Pixel Value")
    axes[1, 0].set_ylabel("Frequency")
    axes[1, 0].set_xlim(0, 255)
    
    # Store statistics for comparison
    stats = [{
        'method': 'Original',
        'mean': np.mean(image_array),
        'std': np.std(image_array),
        'min': np.min(image_array),
        'max': np.max(image_array)
    }]
    
    # Preprocessed versions
    for i, method in enumerate(methods):
        preprocessed = preprocess_image(image_array, target_size, method)
        
        if method == "simple":
            # Simple is already 0-1; show it and its histogram on the 0-255 scale
            display_image = (preprocessed * 255).astype(np.uint8)
            hist_data = preprocessed.flatten() * 255
            hist_range = (0, 255)
        else:
            # Other methods: clip for display only, histogram uses the actual values
            display_preprocessed = np.clip(preprocessed, 0, 1)
            display_image = (display_preprocessed * 255).astype(np.uint8)
            hist_data = preprocessed.flatten()
            hist_range = (np.min(hist_data), np.max(hist_data))
        
        # Display preprocessed image
        axes[0, i + 1].imshow(display_image)
        axes[0, i + 1].set_title(f"{method.upper()} Preprocessing")
        axes[0, i + 1].axis('off')
        
        # Display histogram
        axes[1, i + 1].hist(hist_data, bins=50, alpha=0.7, 
                           color=palette[i % len(palette)], range=hist_range)
        axes[1, i + 1].set_title(f"{method.upper()} Histogram")
        axes[1, i + 1].set_xlabel("Normalized Value")
        axes[1, i + 1].set_ylabel("Frequency")
        
        # Store statistics
        stats.append({
            'method': method.upper(),
            'mean': np.mean(preprocessed),
            'std': np.std(preprocessed),
            'min': np.min(preprocessed),
            'max': np.max(preprocessed)
        })
    
    plt.tight_layout()
    plt.show()
    
    # Print detailed statistics
    print("\n📊 PREPROCESSING STATISTICS COMPARISON")
    print("=" * 80)
    print(f"{'Method':<10} {'Mean':<10} {'Std':<10} {'Min':<10} {'Max':<10}")
    print("-" * 50)
    for stat in stats:
        print(f"{stat['method']:<10} {stat['mean']:<10.3f} {stat['std']:<10.3f} "
              f"{stat['min']:<10.3f} {stat['max']:<10.3f}")
    
    print("\n💡 What to look for:")
    print("   - SIMPLE: Should have mean ≈ original/255, range [0,1]")
    print("   - GCN: Should have different mean/std, enhanced global contrast")  
    print("   - LCN: Should show enhanced local features, may have wider range")
    print("   - Histograms show the distribution of pixel values after normalization")

def visualize_preprocessing_detail_comparison(image_array: np.ndarray, 
                                            methods: List[str] = ["simple", "gcn", "lcn"],
                                            crop_size: int = 150):
    """
    Show detailed side-by-side comparison of preprocessing effects on a cropped region
    This makes the differences more visible by focusing on a smaller area

    The top row shows the center crop of the original and of each preprocessed
    image; the bottom row plots the intensity of channel 0 along the crop's
    middle row, overlaid on the original profile.

    Args:
        image_array: Input image as numpy array (height, width, channels)
        methods: Normalization method names passed to preprocess_image
            (the default list is only read, never mutated)
        crop_size: Nominal side length of the center crop in pixels; the crop
            is clamped to the image bounds
    """
    print(f"\n🔍 DETAILED PREPROCESSING COMPARISON")
    print("=" * 60)
    
    # Select a center crop to focus on details
    h, w = image_array.shape[:2]
    center_y, center_x = h // 2, w // 2
    y1 = max(0, center_y - crop_size // 2)
    y2 = min(h, center_y + crop_size // 2)
    x1 = max(0, center_x - crop_size // 2)
    x2 = min(w, center_x + crop_size // 2)
    
    cropped_original = image_array[y1:y2, x1:x2]
    
    # Create comparison figure; squeeze=False keeps `axes` two-dimensional
    n_cols = len(methods) + 1
    fig, axes = plt.subplots(2, n_cols, figsize=(4 * n_cols, 8), squeeze=False)
    
    # Original cropped
    axes[0, 0].imshow(cropped_original.astype(np.uint8))
    axes[0, 0].set_title("Original (Cropped)")
    axes[0, 0].axis('off')
    
    # Line profile across the middle row, channel 0 (red for RGB input)
    middle_line = cropped_original[cropped_original.shape[0]//2, :, 0]
    axes[1, 0].plot(middle_line, 'b-', linewidth=2, label='Original')
    axes[1, 0].set_title("Pixel Intensity Profile")
    axes[1, 0].set_xlabel("Pixel Position")
    axes[1, 0].set_ylabel("Intensity")
    axes[1, 0].legend()
    axes[1, 0].grid(True, alpha=0.3)
    
    # Process each method; colors are cycled so extra methods do not raise IndexError
    colors = ['green', 'orange', 'red']
    for i, method in enumerate(methods):
        # Apply preprocessing to the full image, then crop, so that image-wide
        # and neighbourhood statistics are computed on the full image
        preprocessed_full = preprocess_image(image_array, None, method)
        preprocessed_crop = preprocessed_full[y1:y2, x1:x2]
        
        # Display cropped preprocessed image
        display_image = (preprocessed_crop * 255).astype(np.uint8)
        axes[0, i + 1].imshow(display_image)
        axes[0, i + 1].set_title(f"{method.upper()} (Cropped)")
        axes[0, i + 1].axis('off')
        
        # Show line profile for comparison (scaled back to 0-255)
        processed_line = preprocessed_crop[preprocessed_crop.shape[0]//2, :, 0] * 255
        axes[1, i + 1].plot(middle_line, 'b-', linewidth=2, alpha=0.7, label='Original')
        axes[1, i + 1].plot(processed_line, color=colors[i % len(colors)], linewidth=2, label=f'{method.upper()}')
        axes[1, i + 1].set_title(f"{method.upper()} vs Original")
        axes[1, i + 1].set_xlabel("Pixel Position")
        axes[1, i + 1].set_ylabel("Intensity")
        axes[1, i + 1].legend()
        axes[1, i + 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # Print observations
    print(f"\n📋 Observations from cropped region ({crop_size}x{crop_size} pixels):")
    print("   Line profiles show pixel intensity changes across the middle row")
    print("   - SIMPLE: Should closely follow original (just scaled)")
    print("   - GCN: May show enhanced contrast globally") 
    print("   - LCN: Should show enhanced local details and edge contrast")

print("✅ Preprocessing functions defined")


## Data validation and cleanup

In [59]:
def find_image_for_metadata(metadata_file: str) -> Optional[str]:
    """
    Find corresponding image file for metadata

    Lookup order:
      1. IMAGES_DIR/<fileName> if that path is a regular file
      2. the first image (alphabetical order) in IMAGES_DIR whose name
         contains the metadata 'id'

    Args:
        metadata_file: Path to a metadata JSON file

    Returns:
        Path to the matching image, or None when nothing matches
    """
    with open(metadata_file, 'r') as f:
        metadata = json.load(f)
    
    filename = metadata.get('fileName') or ''
    binary_id = metadata.get('id') or ''
    
    # Try direct filename match. An empty filename must be skipped: joining it
    # onto IMAGES_DIR yields the directory itself, which "exists" but is not an image.
    if filename:
        image_path = os.path.join(IMAGES_DIR, filename)
        if os.path.isfile(image_path):
            return image_path
    
    # Try binary ID match. An empty id is a substring of every name, so it
    # would otherwise match an arbitrary file.
    if binary_id:
        # sorted() makes the choice deterministic when several files match
        for file in sorted(os.listdir(IMAGES_DIR)):
            if binary_id in file and file.lower().endswith(('.jpg', '.jpeg', '.png')):
                return os.path.join(IMAGES_DIR, file)
    
    return None

def extract_burner_bounding_boxes(metadata: Dict) -> List[Tuple[float, float, float, float]]:
    """
    Extract burner bounding boxes from metadata

    A box is kept when its label contains 'burner' (case-insensitive) and all
    four normalized coordinate keys are present. Missing or null
    'annotations', 'bboxes' and 'label' entries are treated as empty.

    Returns:
        List of tuples (xmin, ymin, xmax, ymax) in normalized coordinates
    """
    required_keys = ("xMinNormalized", "yMinNormalized", "xMaxNormalized", "yMaxNormalized")

    # `or` fallbacks cover keys that are present in the JSON but set to null
    annotations = metadata.get('annotations') or {}
    burner_boxes = []

    for bbox in annotations.get('bboxes') or []:
        label = str(bbox.get('label') or '')
        if 'burner' not in label.lower():
            continue
        # Skip boxes with incomplete coordinates rather than failing on them
        if all(key in bbox for key in required_keys):
            burner_boxes.append(tuple(bbox[key] for key in required_keys))

    return burner_boxes

def is_valid_bbox(bbox: Dict) -> bool:
    """Return True when the box carries all four normalized corner coordinates."""
    for coordinate_key in ("xMinNormalized", "yMinNormalized", "xMaxNormalized", "yMaxNormalized"):
        if coordinate_key not in bbox:
            return False
    return True

# Dataset Creation

In [None]:
def create_dataset_dataframe(max_images: Optional[int] = None) -> pd.DataFrame:
    """
    Create dataset DataFrame with images and ground truth

    Every metadata JSON in METADATA_DIR is paired with its image (via
    find_image_for_metadata); entries whose image is missing or that fail to
    load are reported and skipped.

    Args:
        max_images: If set, only the first `max_images` metadata files
            (in sorted order) are processed

    Returns:
        DataFrame with columns: image_name, image_path, image,
        burner_bounding_boxes, num_burners, has_burners
        (an empty DataFrame when no metadata files exist)
    """
    print("\n🗂️ CREATING DATASET DATAFRAME")
    print("=" * 60)
    
    # glob returns files in arbitrary order; sorting makes the row order and
    # the `max_images` subset reproducible between runs
    metadata_files = sorted(glob.glob(os.path.join(METADATA_DIR, "*.json")))
    
    if max_images:
        metadata_files = metadata_files[:max_images]
        print(f"Processing first {max_images} images for testing")
    
    if not metadata_files:
        print("❌ No metadata files found!")
        return pd.DataFrame()
    
    # Initialize lists
    image_names = []
    image_paths = []
    images = []
    burner_bboxes = []
    
    print(f"📋 Processing {len(metadata_files)} metadata files...")
    
    for i, metadata_file in enumerate(metadata_files):
        try:
            # Load metadata
            with open(metadata_file, 'r') as f:
                metadata = json.load(f)
            
            # Find corresponding image
            image_path = find_image_for_metadata(metadata_file)
            if not image_path:
                print(f"⚠️  No image found for {os.path.basename(metadata_file)}")
                continue
            
            # Load image; the context manager releases the file handle promptly
            with Image.open(image_path) as image:
                image_array = np.array(image.convert('RGB'))
            
            # Extract burner bounding boxes
            bboxes = extract_burner_bounding_boxes(metadata)
            
            # Add to lists
            image_names.append(os.path.basename(image_path))
            image_paths.append(image_path)
            images.append(image_array)
            burner_bboxes.append(bboxes)
            
            # Progress update
            if (i + 1) % 500 == 0:
                print(f"   Progress: {i + 1}/{len(metadata_files)} images processed")
                
        except Exception as e:
            # Best effort: one unreadable file should not abort the whole dataset
            print(f"❌ Error processing {os.path.basename(metadata_file)}: {e}")
            continue
    
    # Create DataFrame
    df = pd.DataFrame({
        'image_name': image_names,
        'image_path': image_paths,
        'image': images,
        'burner_bounding_boxes': burner_bboxes
    })
    
    # Add derived columns
    df['num_burners'] = df['burner_bounding_boxes'].apply(len)
    df['has_burners'] = df['num_burners'] > 0
    
    print(f"\n✅ Dataset DataFrame created!")
    print(f"   Total images: {len(df)}")
    print(f"   Images with burners: {len(df[df['has_burners']])}")
    print(f"   Images without burners: {len(df[~df['has_burners']])}")
    print(f"   Total burner objects: {df['num_burners'].sum()}")
    print(f"   Average burners per image: {df['num_burners'].mean():.2f}")
    
    return df

def visualize_dataset_samples(df: pd.DataFrame, num_samples: int = 6):
    """
    Visualize sample images from the dataset

    Shows up to `num_samples // 2` images with burners followed by the same
    number without, in a two-row grid, with ground-truth boxes drawn in red.

    Args:
        df: Dataset DataFrame with 'image', 'image_name', 'num_burners',
            'has_burners' and 'burner_bounding_boxes' columns
        num_samples: Total number of grid cells; values below 2 are treated
            as one image per group
    """
    print(f"\n📸 VISUALIZING {num_samples} DATASET SAMPLES")
    print("=" * 60)
    
    # At least one column is needed: plt.subplots rejects a zero-column grid
    per_group = max(1, num_samples // 2)
    
    # Select diverse samples
    with_burners = df[df['has_burners']].head(per_group)
    without_burners = df[~df['has_burners']].head(per_group)
    samples = pd.concat([with_burners, without_burners])
    
    # squeeze=False keeps a 2-D array even for a single column
    fig, axes = plt.subplots(2, per_group, figsize=(15, 10), squeeze=False)
    axes = axes.flatten()
    
    for i, (_, row) in enumerate(samples.iterrows()):
        if i >= len(axes):
            break
            
        # Display image
        axes[i].imshow(row['image'])
        axes[i].set_title(f"{row['image_name']}\n{row['num_burners']} burners")
        axes[i].axis('off')
        
        # Image size is the same for every box of this image
        h, w = row['image'].shape[:2]
        
        # Draw bounding boxes
        for bbox in row['burner_bounding_boxes']:
            xmin, ymin, xmax, ymax = bbox
            
            # Convert normalized coordinates to pixel coordinates
            x1, y1 = int(xmin * w), int(ymin * h)
            x2, y2 = int(xmax * w), int(ymax * h)
            
            # Draw rectangle
            rect = patches.Rectangle((x1, y1), x2 - x1, y2 - y1, 
                                   linewidth=2, edgecolor='red', facecolor='none')
            axes[i].add_patch(rect)
    
    # Hide grid cells that received no image so empty frames are not drawn
    for unused_ax in axes[len(samples):]:
        unused_ax.axis('off')
    
    plt.tight_layout()
    plt.show()

In [None]:
# Build the dataset
print("\n🚀 STEP 2: DATASET CREATION")
print("=" * 60)

# Pass max_images=100 for a quick test run; without a limit the full dataset is loaded
dataset_df = create_dataset_dataframe()

if not dataset_df.empty:
    # Sample grid with ground-truth boxes
    visualize_dataset_samples(dataset_df)

    # The first image serves as the example for the normalization comparison
    # (a non-empty DataFrame always has a first row)
    sample_image = dataset_df.iloc[0]['image']
    print(f"\n🔍 PREPROCESSING EFFECTS ON SAMPLE IMAGE")
    print(f"Original image size: {sample_image.shape[:2]} (height x width)")
    visualize_preprocessing_effects(sample_image)
    visualize_preprocessing_detail_comparison(sample_image)


## Setup Model

In [69]:
def download_model():
    """
    Download TFLite model from Viam

    Runs `viam packages export` into MODEL_DIR, extracts any downloaded
    .tar.gz archive if no .tflite file is present yet, and returns the path of
    a .tflite file.

    Returns:
        Path to a .tflite model file, or None when the download fails or no
        model file can be found
    """
    import tarfile
    
    print("\n🤖 DOWNLOADING MODEL")
    print("=" * 60)
    
    os.makedirs(MODEL_DIR, exist_ok=True)
    
    cmd = [
        "viam", "packages", "export",
        "--org-id", VIAM_CONFIG["model_org_id"],
        "--name", VIAM_CONFIG["model_name"],
        "--version", VIAM_CONFIG["model_version"],
        "--type", "ml_model",
        "--destination", MODEL_DIR
    ]
    
    def pick_tflite():
        """Return one .tflite path under MODEL_DIR (or None), chosen deterministically."""
        candidates = sorted(glob.glob(os.path.join(MODEL_DIR, "**/*.tflite"), recursive=True))
        # Prefer a path that mentions the configured version so a stale model
        # from an earlier download is not picked up by accident.
        # NOTE(review): assumes the export directory is named after the version — confirm.
        matching = [path for path in candidates if VIAM_CONFIG["model_version"] in path]
        pool = matching or candidates
        return pool[0] if pool else None
    
    print("📥 Downloading model...")
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError as e:
        # Raised when the `viam` executable is not installed / not on PATH
        print(f"❌ Download failed: {e}")
        return None
    
    if result.returncode != 0:
        print(f"❌ Download failed: {result.stderr}")
        return None
    
    print("✅ Model downloaded successfully")
    
    # Look for existing .tflite files
    model_path = pick_tflite()
    if model_path:
        print(f"✅ Model found: {model_path}")
        return model_path
    
    # Extract .tar.gz files if needed
    tar_files = glob.glob(os.path.join(MODEL_DIR, "**/*.tar.gz"), recursive=True)
    for tar_file in tar_files:
        print(f"📦 Extracting {tar_file}...")
        try:
            with tarfile.open(tar_file, 'r:gz') as tar:
                destination = os.path.dirname(tar_file)
                if hasattr(tarfile, "data_filter"):
                    # The 'data' filter rejects members that would escape the destination
                    tar.extractall(destination, filter="data")
                else:
                    # Older Python patch releases have no extraction filters
                    tar.extractall(destination)
            print(f"✅ Extracted {tar_file}")
        except Exception as e:
            # Best effort: a broken archive should not stop the remaining ones
            print(f"❌ Error extracting {tar_file}: {e}")
            continue
    
    if tar_files:
        # Look for .tflite files after extraction
        model_path = pick_tflite()
        if model_path:
            print(f"✅ Model ready: {model_path}")
            return model_path
    
    print("❌ No .tflite file found")
    return None


🤖 DOWNLOADING MODEL
📥 Downloading model...
✅ Model downloaded successfully
✅ Model found: models/2025-05-14T11-12-00/pan-burner-v4.tflite


'models/2025-05-14T11-12-00/pan-burner-v4.tflite'

### Load in model

In [72]:
def load_model(model_path: str):
    """
    Create a TFLite interpreter for the model file and print a summary of it.

    Args:
        model_path: Path to the .tflite file

    Returns:
        The interpreter, with tensors already allocated
    """
    print(f"\n🔄 LOADING MODEL: {model_path}")
    
    interpreter = tf.lite.Interpreter(model_path=model_path)
    interpreter.allocate_tensors()
    
    # Only the first input tensor is inspected
    primary_input = interpreter.get_input_details()[0]
    output_count = len(interpreter.get_output_details())
    shape = primary_input['shape']
    
    print("✅ Model loaded successfully")
    print(f"   Input shape: {shape}")
    print(f"   Input dtype: {primary_input['dtype']}")
    print(f"   Output tensors: {output_count}")
    
    # Report which input size inference will use
    if len(shape) != 4:
        print(f"   Model input shape is not standard image format: {shape}")
    elif shape[1] > 1 and shape[2] > 1:
        # [batch, height, width, channels] with fixed spatial dimensions
        print(f"   Model expects images resized to: {shape[2]}x{shape[1]} (width x height)")
    else:
        print(f"   Model accepts variable input sizes (no resizing needed)")
        print(f"   Input shape: {shape} (where -1 means dynamic)")
    
    return interpreter

In [71]:
def get_model_input_size(interpreter) -> Optional[Tuple[int, int]]:
    """
    Read the fixed spatial input size of the model, if it has one.

    The first input tensor is interpreted as [batch, height, width, channels].

    Returns:
        Tuple of (width, height) if model has fixed input size, None if the
        size is variable or the shape is not 4-dimensional
    """
    shape = interpreter.get_input_details()[0]['shape']
    
    # Anything other than a 4-D tensor is not a standard image input
    if len(shape) != 4:
        return None
    
    # Both spatial dimensions must be greater than 1 to count as fixed
    if shape[1] > 1 and shape[2] > 1:
        return (shape[2], shape[1])  # (width, height)
    
    return None

In [74]:
print("\n🚀 MODEL LOADING")
print("=" * 60)

model_path = download_model()
if not model_path:
    # Without a model file the inference steps cannot run
    print("❌ No model available - skipping inference steps")
    model_interpreter = None
else:
    model_interpreter = load_model(model_path)
    
    # Report the input size that inference will use
    expected_size = get_model_input_size(model_interpreter)
    if expected_size:
        print(f"🔧 Images will be resized to {expected_size[0]}x{expected_size[1]} for inference")
    else:
        print(f"🔧 Images will keep their original size for inference")


🚀 MODEL LOADING

🤖 DOWNLOADING MODEL
📥 Downloading model...
✅ Model downloaded successfully
✅ Model found: models/2025-05-14T11-12-00/pan-burner-v4.tflite

🔄 LOADING MODEL: models/2025-05-14T11-12-00/pan-burner-v4.tflite
✅ Model loaded successfully
   Input shape: [  1 384 384   3]
   Input dtype: <class 'numpy.uint8'>
   Output tensors: 4
   Model expects images resized to: 384x384 (width x height)
🔧 Images will be resized to 384x384 for inference


### Inference: Call the model!

In [None]:
def run_single_inference(image_array: np.ndarray, interpreter, 
                        normalization_method: str = "simple",
                        confidence_threshold: float = 0.5,
                        burner_class_id: int = 0) -> List[Tuple[float, float, float, float, float]]:
    """
    Run inference on a single image
    
    The function automatically determines the required input size from the model:
    - If model has fixed input size (e.g., 640x640), image will be resized
    - If get_model_input_size reports no fixed size, original size is kept
    
    Args:
        image_array: Input image as numpy array
        interpreter: TFLite interpreter
        normalization_method: Preprocessing method ('simple', 'gcn', 'lcn')
        confidence_threshold: Detections must score strictly above this value
        burner_class_id: Class id that counts as a burner
    
    Returns:
        List of tuples (xmin, ymin, xmax, ymax, confidence) for detected burners.
        Empty (with a RuntimeWarning) when the expected output tensors are
        not found by name.
    """
    # Get model details
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    
    # Get expected input size from model (or use None for no resizing)
    target_size = get_model_input_size(interpreter)
    
    # Preprocess image (values in 0-1)
    preprocessed = preprocess_image(image_array, target_size, normalization_method)
    
    # Convert to model's expected dtype
    input_dtype = input_details[0]['dtype']
    if input_dtype == np.uint8:
        # Round before casting: astype() truncates, and x / 255 * 255 in
        # float32 can land just below the original integer value
        preprocessed = np.clip(np.rint(preprocessed * 255.0), 0, 255).astype(np.uint8)
    else:
        preprocessed = preprocessed.astype(np.float32)
    
    # Add batch dimension
    input_data = np.expand_dims(preprocessed, axis=0)
    
    # Run inference
    interpreter.set_tensor(input_details[0]['index'], input_data)
    interpreter.invoke()
    
    # Collect outputs by tensor name
    outputs = {detail['name']: interpreter.get_tensor(detail['index'])
               for detail in output_details}
    
    # Parse detections (adjust tensor names based on your model)
    boxes = outputs.get('detection_boxes', outputs.get('boxes', None))
    classes = outputs.get('detection_classes', outputs.get('classes', None))
    scores = outputs.get('detection_scores', outputs.get('scores', None))
    
    if boxes is None or classes is None or scores is None:
        # Make a tensor-name mismatch visible instead of silently reporting "no burners"
        import warnings
        warnings.warn(
            f"Detection tensors not found among model outputs {sorted(outputs)}; "
            "returning no detections",
            RuntimeWarning,
            stacklevel=2,
        )
        return []
    
    # Remove batch dimension
    if boxes.ndim == 3:
        boxes = boxes[0]
    if classes.ndim == 2:
        classes = classes[0]
    if scores.ndim == 2:
        scores = scores[0]
    
    detections = []
    for box, class_id, score in zip(boxes, classes, scores):
        if score > confidence_threshold and int(class_id) == burner_class_id:
            # boxes are assumed to be [ymin, xmin, ymax, xmax]; reorder to x-first
            ymin, xmin, ymax, xmax = box
            detections.append((float(xmin), float(ymin), float(xmax), float(ymax), float(score)))
    
    return detections

def run_inference_on_dataframe(df: pd.DataFrame, interpreter, 
                              normalization_method: str = "simple") -> pd.DataFrame:
    """
    Run inference on all images in the DataFrame

    Args:
        df: DataFrame with 'image' and 'image_name' columns
        interpreter: TFLite interpreter passed through to run_single_inference
        normalization_method: Preprocessing method ('simple', 'gcn', 'lcn')

    Returns:
        Copy of `df` with three added columns: inferred_burner_bboxes,
        num_inferred_burners and has_inferred_burners. Images whose inference
        raised get an empty detection list.
    """
    
    print(f"\n🎯 RUNNING INFERENCE ON {len(df)} IMAGES")
    print("=" * 60)
    print(f"Using {normalization_method} normalization")
    
    total = len(df)
    inferred_bboxes = []
    
    # Count positions with enumerate: the iterrows() key is the index *label*,
    # which need not be an integer or contiguous (e.g. after filtering)
    for position, (_, row) in enumerate(df.iterrows(), start=1):
        try:
            detections = run_single_inference(row['image'], interpreter, normalization_method)
        except Exception as e:
            # Best effort: a failing image yields no detections instead of aborting the run
            print(f"❌ Error processing {row['image_name']}: {e}")
            detections = []
        
        # Exactly one entry per row, so the list always lines up with the DataFrame
        inferred_bboxes.append(detections)
        
        # Progress update
        if position % 25 == 0:
            print(f"   Progress: {position}/{total} images processed")
    
    # Add results to DataFrame
    df_with_inference = df.copy()
    df_with_inference['inferred_burner_bboxes'] = inferred_bboxes
    df_with_inference['num_inferred_burners'] = df_with_inference['inferred_burner_bboxes'].apply(len)
    df_with_inference['has_inferred_burners'] = df_with_inference['num_inferred_burners'] > 0
    
    print(f"\n✅ Inference completed!")
    print(f"   Images with predicted burners: {len(df_with_inference[df_with_inference['has_inferred_burners']])}")
    print(f"   Total predicted burners: {df_with_inference['num_inferred_burners'].sum()}")
    print(f"   Average predicted burners per image: {df_with_inference['num_inferred_burners'].mean():.2f}")
    
    return df_with_inference

def visualize_inference_results(df: pd.DataFrame, num_samples: int = 6):
    """Plot ground-truth and predicted burner boxes for a few sample images.

    Only rows whose ``has_burners`` value is true are considered; the first
    ``num_samples`` of them are shown in a grid three panels wide. Ground
    truth boxes are drawn in solid green, predictions in dashed red with the
    confidence next to each box. Box coordinates are scaled by the image
    width/height before drawing, i.e. they are treated as normalized.

    Args:
        df: DataFrame providing the columns ``has_burners``, ``image``,
            ``image_name``, ``num_burners``, ``num_inferred_burners``,
            ``burner_bounding_boxes`` (``xmin, ymin, xmax, ymax``) and
            ``inferred_burner_bboxes`` (``xmin, ymin, xmax, ymax, confidence``).
        num_samples: Maximum number of images to display.

    Returns:
        None. The figure is shown with ``plt.show()``.
    """
    # Function-scope imports keep the cell self-contained.
    import matplotlib.lines as mlines
    import matplotlib.patches as patches

    print(f"\n📊 VISUALIZING INFERENCE RESULTS")
    print("=" * 60)

    # Select samples with ground truth burners
    samples = df[df['has_burners']].head(num_samples)
    if samples.empty:
        print("⚠️  No images with ground truth burners to visualize")
        return

    # Size the grid from the number of samples instead of a fixed 2x3 layout,
    # so num_samples > 6 is honoured; 6 samples still give a 2x3 / 15x10 figure.
    n_cols = 3
    n_rows = -(-len(samples) // n_cols)  # ceiling division
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(5 * n_cols, 5 * n_rows), squeeze=False)
    axes = axes.flatten()

    for ax, (_, row) in zip(axes, samples.iterrows()):
        # Display image
        ax.imshow(row['image'])
        ax.set_title(f"{row['image_name']}\nGT: {row['num_burners']}, Pred: {row['num_inferred_burners']}")
        ax.axis('off')

        h, w = row['image'].shape[:2]

        # Draw ground truth bounding boxes (green)
        for bbox in row['burner_bounding_boxes']:
            xmin, ymin, xmax, ymax = bbox
            x1, y1 = int(xmin * w), int(ymin * h)
            x2, y2 = int(xmax * w), int(ymax * h)

            rect = patches.Rectangle((x1, y1), x2 - x1, y2 - y1,
                                     linewidth=2, edgecolor='green', facecolor='none')
            ax.add_patch(rect)

        # Draw inferred bounding boxes (red, dashed)
        for bbox in row['inferred_burner_bboxes']:
            xmin, ymin, xmax, ymax, confidence = bbox
            x1, y1 = int(xmin * w), int(ymin * h)
            x2, y2 = int(xmax * w), int(ymax * h)

            rect = patches.Rectangle((x1, y1), x2 - x1, y2 - y1,
                                     linewidth=2, edgecolor='red', facecolor='none', linestyle='--')
            ax.add_patch(rect)

            # Add confidence score just above the box
            ax.text(x1, y1 - 5, f'{confidence:.2f}',
                    color='red', fontsize=8, weight='bold')

    # Hide the panels that received no image so they do not show empty frames
    for unused_ax in axes[len(samples):]:
        unused_ax.axis('off')

    # One figure-level legend, independent of which panel happens to be last
    green_line = mlines.Line2D([], [], color='green', linewidth=2, label='Ground Truth')
    red_line = mlines.Line2D([], [], color='red', linewidth=2, linestyle='--', label='Predictions')
    fig.legend(handles=[green_line, red_line], loc='upper right')

    plt.tight_layout()
    plt.show()

# Step 4 driver: without a loaded interpreter the step is skipped; otherwise
# the dataset is enriched with predictions and a few samples are plotted.
if model_interpreter is None:
    print("\n⚠️  SKIPPING STEP 4: No model available")
else:
    print("\n🚀 STEP 4: INFERENCE")
    print("=" * 60)

    # Rebinds dataset_df to the copy that carries the prediction columns
    dataset_df = run_inference_on_dataframe(dataset_df, model_interpreter, "simple")

    # Nothing to draw for an empty dataset
    if not dataset_df.empty:
        visualize_inference_results(dataset_df)


🚀 STEP 4: INFERENCE

🎯 RUNNING INFERENCE ON 8126 IMAGES
Using simple normalization
   Progress: 25/8126 images processed
   Progress: 50/8126 images processed
   Progress: 75/8126 images processed
   Progress: 100/8126 images processed
   Progress: 125/8126 images processed
   Progress: 150/8126 images processed
   Progress: 175/8126 images processed
   Progress: 200/8126 images processed
   Progress: 225/8126 images processed
   Progress: 250/8126 images processed
   Progress: 275/8126 images processed
   Progress: 300/8126 images processed
   Progress: 325/8126 images processed
   Progress: 350/8126 images processed
   Progress: 375/8126 images processed
   Progress: 400/8126 images processed
   Progress: 425/8126 images processed
   Progress: 450/8126 images processed
   Progress: 475/8126 images processed
   Progress: 500/8126 images processed
   Progress: 525/8126 images processed
   Progress: 550/8126 images processed
   Progress: 575/8126 images processed
   Progress: 600/8126 

## Advanced Evaluation with IoU Matching

In [None]:
def convert_bbox_format(bbox, format_type: str) -> "List[float] | Dict[str, float]":
    """Convert a bounding box between the ground-truth and the model layout.

    Args:
        bbox: For ``"gt_to_pred"`` a mapping with the keys ``xMinNormalized``,
            ``yMinNormalized``, ``xMaxNormalized`` and ``yMaxNormalized``.
            For ``"pred_to_gt"`` a sequence whose first four items are
            ``[ymin, xmin, ymax, xmax]``.
        format_type: ``"gt_to_pred"`` or ``"pred_to_gt"``.

    Returns:
        ``[ymin, xmin, ymax, xmax]`` for ``"gt_to_pred"``; a dict with the four
        ``*Normalized`` keys for ``"pred_to_gt"``.

    Raises:
        KeyError: ``"gt_to_pred"`` was requested and a coordinate key is missing.
        ValueError: ``format_type`` is not one of the two supported values.
    """
    if format_type == "gt_to_pred":
        # Ground truth: {xMinNormalized, yMinNormalized, xMaxNormalized, yMaxNormalized}
        # to Model: [ymin, xmin, ymax, xmax]
        required_keys = ["xMinNormalized", "yMinNormalized", "xMaxNormalized", "yMaxNormalized"]

        # Report every missing key at once instead of failing on the first lookup
        missing_keys = [key for key in required_keys if key not in bbox]
        if missing_keys:
            raise KeyError(f"Missing required keys in bbox: {missing_keys}. Available keys: {list(bbox.keys())}")

        # All keys are known to be present, so plain lookups cannot fail here
        return [bbox["yMinNormalized"], bbox["xMinNormalized"], bbox["yMaxNormalized"], bbox["xMaxNormalized"]]

    if format_type == "pred_to_gt":
        # Model: [ymin, xmin, ymax, xmax]
        # to Ground truth format: {xMinNormalized, yMinNormalized, xMaxNormalized, yMaxNormalized}
        return {
            "xMinNormalized": bbox[1],
            "yMinNormalized": bbox[0],
            "xMaxNormalized": bbox[3],
            "yMaxNormalized": bbox[2],
        }

    raise ValueError(f"Unknown format_type: {format_type}")

def is_valid_bbox(bbox: Dict) -> bool:
    """Return True when ``bbox`` contains all four normalized coordinate keys.

    Only the presence of the keys is checked, not their values.
    """
    for coordinate_key in ("xMinNormalized", "yMinNormalized", "xMaxNormalized", "yMaxNormalized"):
        if coordinate_key not in bbox:
            return False
    return True

def calculate_iou(box1: List[float], box2: List[float]) -> float:
    """Return the intersection-over-union of two boxes.

    Both boxes are unpacked as ``[ymin, xmin, ymax, xmax]``. The result is
    ``0.0`` when the boxes do not overlap (touching edges count as no
    overlap) or when the union area is not positive.
    """
    top_a, left_a, bottom_a, right_a = box1
    top_b, left_b, bottom_b, right_b = box2

    # Edges of the overlap rectangle
    overlap_left = max(left_a, left_b)
    overlap_top = max(top_a, top_b)
    overlap_right = min(right_a, right_b)
    overlap_bottom = min(bottom_a, bottom_b)

    # Guard clause: an empty or degenerate overlap means no intersection
    if overlap_right <= overlap_left or overlap_bottom <= overlap_top:
        return 0.0

    overlap_area = (overlap_right - overlap_left) * (overlap_bottom - overlap_top)

    # Union = sum of both areas minus the part counted twice
    area_a = (right_a - left_a) * (bottom_a - top_a)
    area_b = (right_b - left_b) * (bottom_b - top_b)
    union_area = area_a + area_b - overlap_area

    if union_area > 0:
        return overlap_area / union_area
    return 0.0

def evaluate_with_iou(results: List[Dict], iou_threshold: float = 0.5) -> Dict:
    """Evaluate burner detections against ground truth using IoU matching.

    For every successful result, predictions of class 0 are matched greedily
    (in detection order) to the not-yet-matched ground-truth burner box with
    the highest IoU. A pair counts as a true positive when that IoU is at
    least ``iou_threshold``; unmatched predictions are false positives and
    unmatched ground-truth boxes are false negatives.

    Args:
        results: One dict per image with the keys ``inference_success``,
            ``file``, optional ``metadata`` (whose
            ``annotations['bboxes']`` entries carry a ``label`` and the four
            ``*Normalized`` coordinates) and optional ``detections`` (dicts
            with ``class_id``, ``bbox`` as ``[ymin, xmin, ymax, xmax]`` and
            ``confidence``).
        iou_threshold: Minimum IoU for a prediction to match a ground-truth box.

    Returns:
        Dict with ``precision``, ``recall``, ``f1``, the three global counts,
        ``detailed_results`` (per-image counts and matches) and
        ``bbox_errors`` (number of ground-truth burner boxes skipped because
        coordinate keys were missing).
    """
    coordinate_keys = ["xMinNormalized", "yMinNormalized", "xMaxNormalized", "yMaxNormalized"]

    true_positives = 0
    false_positives = 0
    false_negatives = 0

    detailed_results = []
    bbox_errors = []

    for result in results:
        if not result["inference_success"]:
            continue

        # Get ground truth burner boxes
        gt_boxes = []
        metadata = result.get('metadata', {})
        if 'annotations' in metadata:
            for bbox in metadata['annotations'].get('bboxes', []):
                # A label that is present but None must not crash .lower()
                label = str(bbox.get('label') or '')
                if 'burner' not in label.lower():
                    continue
                if is_valid_bbox(bbox):
                    gt_boxes.append(convert_bbox_format(bbox, "gt_to_pred"))
                else:
                    # Log problematic bbox for debugging
                    bbox_errors.append({
                        'file': result['file'],
                        'bbox': bbox,
                        'missing_keys': [key for key in coordinate_keys if key not in bbox]
                    })

        # Keep the filtered detections themselves so that indexes into
        # pred_boxes and into the detection records always line up.
        burner_detections = [det for det in result.get('detections', []) if det['class_id'] == 0]
        pred_boxes = [det['bbox'] for det in burner_detections]

        # Match predictions to ground truth
        matched_gt = set()
        matches = []

        for pred_idx, pred_box in enumerate(pred_boxes):
            best_iou = 0
            best_gt_idx = -1

            for gt_idx, gt_box in enumerate(gt_boxes):
                if gt_idx in matched_gt:
                    continue

                iou = calculate_iou(pred_box, gt_box)
                if iou > best_iou:
                    best_iou = iou
                    best_gt_idx = gt_idx

            # best_gt_idx stays -1 when nothing overlapped; without this guard
            # a threshold <= 0 would record a match against a non-existent box.
            if best_gt_idx >= 0 and best_iou >= iou_threshold:
                matched_gt.add(best_gt_idx)
                matches.append({
                    'gt_idx': best_gt_idx,
                    'pred_idx': pred_idx,
                    'iou': best_iou,
                    # Confidence of the matched burner detection (not of the
                    # detection at the same position in the unfiltered list)
                    'confidence': burner_detections[pred_idx]['confidence']
                })

        # Every match consumes exactly one prediction and one ground-truth box
        image_tp = len(matches)
        image_fp = len(pred_boxes) - image_tp
        image_fn = len(gt_boxes) - len(matched_gt)

        true_positives += image_tp
        false_positives += image_fp
        false_negatives += image_fn

        detailed_results.append({
            'file': result['file'],
            'gt_boxes': len(gt_boxes),
            'pred_boxes': len(pred_boxes),
            'matches': matches,
            'tp': image_tp,
            'fp': image_fp,
            'fn': image_fn
        })

    # Calculate metrics (0 when the denominator would be empty)
    precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
    recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    # Log bbox errors if any
    if bbox_errors:
        print(f"\n⚠️  Found {len(bbox_errors)} problematic bounding boxes:")
        for error in bbox_errors[:5]:  # Show first 5
            print(f"  File: {error['file']}")
            print(f"  Missing keys: {error['missing_keys']}")
            print(f"  Available keys: {list(error['bbox'].keys())}")
            print(f"  Bbox content: {error['bbox']}")
        if len(bbox_errors) > 5:
            print(f"  ... and {len(bbox_errors) - 5} more")

    return {
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'true_positives': true_positives,
        'false_positives': false_positives,
        'false_negatives': false_negatives,
        'detailed_results': detailed_results,
        'bbox_errors': len(bbox_errors)
    }

## Visualization Pipeline

In [None]:
def visualize_single_image_pipeline(metadata_file: str, model_path: str):
    """Walk one sample image through the pipeline and plot GT vs. predictions.

    Prints a four-step report (ground truth, preprocessing, inference, visual
    comparison), shows a three-panel figure (ground-truth boxes, preprocessed
    model input, predicted boxes) and saves it as ``pipeline_demo.png`` in the
    current working directory.

    Args:
        metadata_file: Path to the JSON metadata file of the sample image.
        model_path: Path handed to ``load_model``.

    Returns:
        None. Returns early, after printing a message, when
        ``find_image_for_metadata`` yields no image for ``metadata_file``.
    """

    print("🔍 Loading sample image and metadata...")

    # Load metadata and find image (JSON is read as UTF-8 regardless of locale)
    with open(metadata_file, 'r', encoding='utf-8') as f:
        metadata = json.load(f)

    image_path = find_image_for_metadata(metadata_file)
    if not image_path:
        print(f"❌ No image found for {metadata_file}")
        return

    # Load model
    interpreter = load_model(model_path)
    original_image = Image.open(image_path).convert('RGB')
    # PIL reports size as (width, height); needed to scale normalized boxes
    img_w, img_h = original_image.size

    # Step 1: Show ground truth
    print(f"\n📋 Step 1: Ground Truth Analysis")
    print(f"   Image: {os.path.basename(image_path)}")
    print(f"   Original size: {original_image.size}")

    all_gt_labels = []
    gt_boxes = []  # only burner boxes that carry all four coordinate keys
    invalid_gt_boxes = 0
    if 'annotations' in metadata:
        for bbox in metadata['annotations'].get('bboxes', []):
            label = bbox.get('label', '')
            all_gt_labels.append(label)
            if 'burner' in label.lower():
                if is_valid_bbox(bbox):
                    gt_boxes.append(bbox)
                else:
                    invalid_gt_boxes += 1

    print(f"   All ground truth labels: {all_gt_labels}")
    print(f"   Burner labels found: {len(gt_boxes)} burners")
    if invalid_gt_boxes > 0:
        print(f"   ⚠️  Skipped {invalid_gt_boxes} burner boxes with invalid coordinates")

    # Step 2: Show preprocessing
    print(f"\n🔄 Step 2: Preprocessing")
    input_details = interpreter.get_input_details()
    input_shape = input_details[0]['shape']
    input_dtype = input_details[0]['dtype']
    target_size = (input_shape[1], input_shape[2])

    print(f"   Model expects: {target_size} pixels, {input_dtype} data type")
    preprocessed = preprocess_image(image_path, target_size, input_dtype)
    print(f"   Preprocessed shape: {preprocessed.shape}")

    # Step 3: Show inference results
    print(f"\n🎯 Step 3: Inference Results")
    detections = run_inference(image_path, interpreter, "simple")
    pred_boxes = [det for det in detections if det['class_id'] == 0]

    print(f"   Total detections: {len(detections)}")
    print(f"   Burner detections: {len(pred_boxes)}")

    for i, det in enumerate(pred_boxes):
        print(f"     - Burner {i+1}: confidence={det['confidence']:.3f}, bbox={det['bbox']}")

    # Step 4: Visual comparison
    print(f"\n📸 Step 4: Creating Visual Comparison")
    fig, axes = plt.subplots(1, 3, figsize=(18, 6))

    # Original + Ground Truth
    axes[0].imshow(original_image)
    axes[0].set_title(f"Ground Truth\n({len(gt_boxes)} burners)")
    axes[0].axis('off')

    # gt_boxes was filtered with is_valid_bbox above, so every box can be drawn
    for bbox in gt_boxes:
        x_min = int(bbox["xMinNormalized"] * img_w)
        y_min = int(bbox["yMinNormalized"] * img_h)
        x_max = int(bbox["xMaxNormalized"] * img_w)
        y_max = int(bbox["yMaxNormalized"] * img_h)

        rect = plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min,
                             fill=False, edgecolor='green', linewidth=3)
        axes[0].add_patch(rect)
        axes[0].text(x_min, y_min - 10, "GT", color='green', fontsize=12, weight='bold')

    # Preprocessed image: uint8 input is shown as is, anything else is
    # scaled by 255 before the cast
    if input_dtype == np.uint8:
        display_image = preprocessed[0].astype(np.uint8)
    else:
        display_image = (preprocessed[0] * 255).astype(np.uint8)

    axes[1].imshow(display_image)
    axes[1].set_title(f"Preprocessed\n{target_size}, {input_dtype}")
    axes[1].axis('off')

    # Original + Predictions
    axes[2].imshow(original_image)
    axes[2].set_title(f"Predictions\n({len(pred_boxes)} burners)")
    axes[2].axis('off')

    for det in pred_boxes:
        # Detection boxes are read as [ymin, xmin, ymax, xmax], normalized
        y_min = int(det['bbox'][0] * img_h)
        x_min = int(det['bbox'][1] * img_w)
        y_max = int(det['bbox'][2] * img_h)
        x_max = int(det['bbox'][3] * img_w)

        rect = plt.Rectangle((x_min, y_min), x_max - x_min, y_max - y_min,
                             fill=False, edgecolor='red', linewidth=3)
        axes[2].add_patch(rect)
        axes[2].text(x_min, y_min - 10, f"{det['confidence']:.2f}",
                     color='red', fontsize=12, weight='bold')

    plt.tight_layout()
    plt.savefig("pipeline_demo.png", dpi=150, bbox_inches='tight')
    plt.show()

    # Final summary
    print(f"\n✅ Pipeline Demo Complete!")
    print(f"   📊 Ground Truth: {len(gt_boxes)} burners")
    print(f"   🎯 Predictions: {len(pred_boxes)} burners")
    print(f"   📸 Visualization saved: pipeline_demo.png")
    print(f"   🎪 Demo shows: GT (green) vs Predictions (red)")

    # Image-level verdict (presence/absence only, no box matching)
    if len(gt_boxes) > 0 and len(pred_boxes) > 0:
        print(f"   ✅ Model found burners in image with burners!")
    elif len(gt_boxes) > 0 and len(pred_boxes) == 0:
        print(f"   ❌ Model missed burners that should be there")
    elif len(gt_boxes) == 0 and len(pred_boxes) > 0:
        print(f"   ❌ Model detected burners where there are none")
    else:
        print(f"   ✅ Model correctly found no burners in image with no burners")

def process_all_images_with_iou(model_path: str):
    """Run inference on every annotated image and keep the full metadata.

    The returned records are the input expected by ``evaluate_with_iou``.

    Args:
        model_path: Path to the model file; handed to ``load_model``.

    Returns:
        A list with one dict per image that has a metadata file and a matching
        image: ``file``, ``image_path``, ``metadata``, ``detections`` and
        ``inference_success``. When inference raises for an image, the error is
        printed, ``detections`` is empty and ``inference_success`` is False, so
        one bad image does not abort the whole run. Returns ``[]`` when
        ``model_path`` is empty or does not exist.
    """
    if not model_path or not os.path.exists(model_path):
        print("❌ Model not found")
        return []

    interpreter = load_model(model_path)
    # glob returns files in arbitrary order; sort for reproducible runs
    metadata_files = sorted(glob.glob(os.path.join(METADATA_DIR, "*.json")))
    results = []

    print(f"Processing {len(metadata_files)} images for IoU evaluation...")

    for i, metadata_file in enumerate(metadata_files):
        # Load metadata (JSON is read as UTF-8 regardless of locale)
        with open(metadata_file, 'r', encoding='utf-8') as f:
            metadata = json.load(f)

        # Find corresponding image; metadata without an image is skipped
        image_path = find_image_for_metadata(metadata_file)
        if not image_path:
            continue

        # Run inference; a failure is recorded instead of propagated so the
        # inference_success flag checked by evaluate_with_iou is meaningful
        try:
            detections = run_inference(image_path, interpreter, "simple")
            inference_success = True
        except Exception as e:
            print(f"❌ Error processing {os.path.basename(image_path)}: {e}")
            detections = []
            inference_success = False

        results.append({
            "file": os.path.basename(metadata_file),
            "image_path": image_path,
            "metadata": metadata,
            "detections": detections,
            "inference_success": inference_success
        })

        # Progress update every 500 images
        if (i + 1) % 500 == 0:
            print(f"  Progress: {i + 1}/{len(metadata_files)} images processed")

    print(f"\n✅ IoU evaluation complete: {len(results)} images processed")
    return results

## Working with the Saved DataFrame

In [None]:
# Usage notes for the saved dataset: this cell only prints example snippets,
# it does not load or modify anything.
#
# In a new session the dataset would be loaded with:
# df = load_dataframe("burner_dataset.pkl")
#
# NOTE(review): the printed example reads image_row['burner_bboxes'], while
# visualize_inference_results reads the column 'burner_bounding_boxes' -
# confirm which column name the saved dataset actually has.
print("\n".join([
    "\n" + "=" * 60,
    "📖 HOW TO USE YOUR SAVED DATASET",
    "=" * 60,
    # Example queries you can perform:
    "\n💡 Example DataFrame Operations:",
    "   # Load your dataset",
    "   df = load_dataframe('burner_dataset.pkl')",
    "",
    "   # Get all images with burners",
    "   images_with_burners = df[df['num_burners'] > 0]",
    "",
    "   # Get all burner bounding boxes from a specific image",
    "   image_name = 'your_image.jpg'",
    "   image_row = df[df['image_name'] == image_name].iloc[0]",
    "   burner_boxes = image_row['burner_bboxes']",
    "",
    "   # Filter images with multiple burners",
    "   multi_burner_images = df[df['num_burners'] > 1]",
    "",
    "   # Get statistics",
    "   total_images = len(df)",
    "   total_burners = df['num_burners'].sum()",
    "   avg_burners_per_image = df['num_burners'].mean()",
    "",
    "   # Export to other formats",
    "   df.to_csv('dataset_summary.csv', index=False)",
    "   df.to_json('dataset.json', orient='records')",
    "\n✅ Dataset is ready for use!",
    "   📁 Files created:",
    "     - burner_dataset.pkl (full dataset with bounding boxes)",
    "     - burner_dataset_summary.csv (summary statistics)",
]))

# The prediction files are only listed when a model path is set
if model_path:
    print("\n".join([
        "     - burner_dataset_with_predictions.pkl (dataset + model predictions)",
        "     - burner_dataset_with_predictions_summary.csv (prediction summary)",
    ]))

## Evaluation Methods

In [None]:
# Evaluation driver: runs the simple presence/absence pass and the IoU-based
# pass when a model path is set; otherwise leaves `results` as an empty list
# so the following cells can still run.
if not model_path:
    print("⚠️  Skipping processing - no model available")
    results = []
else:
    # ===== EVALUATION METHOD 1: Simple Presence/Absence =====
    print("\n".join([
        "\n" + "=" * 60,
        "📊 SIMPLE EVALUATION (Presence/Absence)",
        "=" * 60,
    ]))

    results = process_all_images(model_path)

    # ===== EVALUATION METHOD 2: Advanced IoU-Based =====
    print("\n".join([
        "\n" + "=" * 60,
        "🎯 ADVANCED EVALUATION (IoU-Based)",
        "=" * 60,
    ]))

    iou_results = process_all_images_with_iou(model_path)
    if iou_results:
        evaluation = evaluate_with_iou(iou_results, iou_threshold=0.5)

        # Aggregate metrics
        print("\n".join([
            f"\n📊 IoU Evaluation Results (threshold=0.5):",
            f"   Precision: {evaluation['precision']:.3f}",
            f"   Recall: {evaluation['recall']:.3f}",
            f"   F1 Score: {evaluation['f1']:.3f}",
            f"   True Positives: {evaluation['true_positives']}",
            f"   False Positives: {evaluation['false_positives']}",
            f"   False Negatives: {evaluation['false_negatives']}",
        ]))

        # Per-image breakdown, limited to the first five images
        print(f"\n📋 Per-Image Results:")
        for detail in evaluation['detailed_results'][:5]:
            print(
                f"   {detail['file']}: GT={detail['gt_boxes']}, Pred={detail['pred_boxes']}, "
                f"TP={detail['tp']}, FP={detail['fp']}, FN={detail['fn']}"
            )

## Step 3: Check Burner Classification Accuracy

In [None]:
def analyze_burner_performance(results: List[Dict]):
    """Print image-level (presence/absence) burner classification metrics.

    Each record must provide ``has_burner_gt``, ``has_burner_pred`` and
    ``file``. Prints accuracy, precision, recall, the confusion matrix and
    one line per misclassified image. Returns None; with an empty ``results``
    only a short notice is printed.
    """
    if not results:
        print("No results to analyze")
        return

    total = len(results)

    # Tally the four confusion-matrix cells in a single pass
    true_positives = false_positives = false_negatives = true_negatives = 0
    for record in results:
        actual = record['has_burner_gt']
        predicted = record['has_burner_pred']
        if actual and predicted:
            true_positives += 1
        elif predicted:
            false_positives += 1
        elif actual:
            false_negatives += 1
        else:
            true_negatives += 1

    # total is at least 1 here because empty input returned above
    accuracy = (true_positives + true_negatives) / total

    predicted_positive = true_positives + false_positives
    actual_positive = true_positives + false_negatives
    precision = true_positives / predicted_positive if predicted_positive > 0 else 0
    recall = true_positives / actual_positive if actual_positive > 0 else 0

    print("\n=== Simple Burner Classification Results ===")
    print(f"Total images: {total}")
    print(f"Accuracy: {accuracy:.2%}")
    print(f"Precision: {precision:.2%}")
    print(f"Recall: {recall:.2%}")
    print(f"\nConfusion Matrix:")
    print(f"  True Positives: {true_positives}")
    print(f"  False Positives: {false_positives}")
    print(f"  False Negatives: {false_negatives}")
    print(f"  True Negatives: {true_negatives}")

    # List every image where ground truth and prediction disagree
    print(f"\nMisclassifications:")
    for record in results:
        if record['has_burner_gt'] == record['has_burner_pred']:
            continue
        if record['has_burner_gt']:
            status = "Missing burner"
        else:
            status = "False detection"
        print(f"  {record['file']}: {status}")

# Analyze results: print the presence/absence metrics for `results`.
# With an empty `results` the function only prints "No results to analyze".
analyze_burner_performance(results)

## Export Results

In [None]:
# Persist the simple-evaluation records as indented JSON in the current
# working directory; nothing is written when `results` is empty.
# NOTE(review): json.dump needs every value in `results` to be
# JSON-serializable - confirm the records hold no NumPy scalars/arrays.
if results:
    output_file = "burner_classification_results.json"
    with open(output_file, 'w') as f:
        json.dump(results, f, indent=2)
    print(f"\n📄 Results saved to {output_file}")

## Preprocessing Method Comparison

Compare different normalization techniques to handle lighting variations

In [None]:
def process_images_with_preprocessing_method(model_path: str, normalization_method: str, 
                                           max_images: int = 100) -> List[Dict]:
    """Run inference on a subset of images with one normalization method.

    Args:
        model_path: Path to the model file; handed to ``load_model``.
        normalization_method: Normalization name forwarded to ``run_inference``.
        max_images: Maximum number of metadata files to consider. Files
            without a matching image are skipped, so fewer records may be
            returned.

    Returns:
        One dict per processed image with ``file``, ``image_path``,
        ``ground_truth_burners``, ``predicted_burners``, ``detections``,
        ``has_burner_gt``, ``has_burner_pred`` and ``normalization_method``.
        Returns ``[]`` when ``model_path`` is empty or does not exist.
    """
    if not model_path or not os.path.exists(model_path):
        print(f"❌ Model not found")
        return []

    interpreter = load_model(model_path)
    # glob returns files in arbitrary order. Sorting before slicing makes the
    # subset the same for every method, run and machine, so the methods are
    # compared on identical images.
    metadata_files = sorted(glob.glob(os.path.join(METADATA_DIR, "*.json")))

    # Limit to subset for comparison (processing all 8000+ would take too long)
    test_files = metadata_files[:max_images]
    results = []

    print(f"Testing {normalization_method} normalization on {len(test_files)} images...")

    for i, metadata_file in enumerate(test_files):
        # Load metadata (JSON is read as UTF-8 regardless of locale)
        with open(metadata_file, 'r', encoding='utf-8') as f:
            metadata = json.load(f)

        # Find corresponding image
        image_path = find_image_for_metadata(metadata_file)
        if not image_path:
            continue

        # Run inference with specific normalization method
        detections = run_inference(image_path, interpreter, normalization_method)

        # Extract ground truth burner labels (any label containing "burner")
        gt_burners = []
        if 'annotations' in metadata:
            for bbox in metadata['annotations'].get('bboxes', []):
                if 'burner' in bbox.get('label', '').lower():
                    gt_burners.append(bbox['label'])

        # Class 0 is the burner class
        pred_burners = [det for det in detections if det['class_id'] == 0]

        results.append({
            "file": os.path.basename(metadata_file),
            "image_path": image_path,
            "ground_truth_burners": len(gt_burners),
            "predicted_burners": len(pred_burners),
            "detections": detections,
            "has_burner_gt": len(gt_burners) > 0,
            "has_burner_pred": len(pred_burners) > 0,
            "normalization_method": normalization_method
        })

        # Progress update every 25 images for smaller batches
        if (i + 1) % 25 == 0:
            print(f"  Progress: {i + 1}/{len(test_files)} images processed")

    print(f"✅ {normalization_method} processing complete: {len(results)} images")
    return results

def _binary_detection_metrics(results: List[Dict]) -> Dict[str, float]:
    """Compute image-level presence/absence metrics for one method's records."""
    total = len(results)
    true_positives = sum(1 for r in results if r['has_burner_gt'] and r['has_burner_pred'])
    false_positives = sum(1 for r in results if not r['has_burner_gt'] and r['has_burner_pred'])
    false_negatives = sum(1 for r in results if r['has_burner_gt'] and not r['has_burner_pred'])
    true_negatives = sum(1 for r in results if not r['has_burner_gt'] and not r['has_burner_pred'])

    accuracy = (true_positives + true_negatives) / total if total > 0 else 0
    precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
    recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    return {
        'total': total,
        'tp': true_positives,
        'fp': false_positives,
        'fn': false_negatives,
        'tn': true_negatives,
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
    }


def compare_preprocessing_methods(model_path: str, max_images: int = 100):
    """Compare the "simple", "gcn" and "lcn" normalization methods.

    Runs ``process_images_with_preprocessing_method`` once per method, prints
    image-level accuracy/precision/recall/F1 for each, names the method with
    the highest F1 (the first one wins a tie; none is named when every F1 is
    0) and writes the per-image records, without the raw detections, to
    ``preprocessing_comparison.json``.

    Args:
        model_path: Path to the model file.
        max_images: Maximum number of images tested per method.

    Returns:
        Dict mapping each method name to its list of result records.
    """
    methods = ["simple", "gcn", "lcn"]
    all_results = {}

    print("🧪 PREPROCESSING METHOD COMPARISON")
    print("=" * 60)
    print(f"Testing {len(methods)} normalization methods on {max_images} images each")
    print("Methods: Simple (0-1), GCN (Global Contrast), LCN (Local Contrast)")
    print("=" * 60)

    # Test each method
    for method in methods:
        print(f"\n🔬 Testing {method.upper()} normalization...")
        all_results[method] = process_images_with_preprocessing_method(model_path, method, max_images)

    # Compare results
    print(f"\n📊 PREPROCESSING COMPARISON RESULTS")
    print("=" * 60)

    # Metrics are computed once per method and reused for the ranking below;
    # methods that produced no records are left out of both.
    metrics_by_method = {}
    for method, results in all_results.items():
        if not results:
            continue

        metrics = _binary_detection_metrics(results)
        metrics_by_method[method] = metrics

        # Count total detections (regardless of correctness)
        total_detections = sum(r['predicted_burners'] for r in results)
        avg_detections = total_detections / metrics['total'] if metrics['total'] > 0 else 0

        print(f"\n🔸 {method.upper()} Normalization:")
        print(f"   Accuracy:  {metrics['accuracy']:.3f}")
        print(f"   Precision: {metrics['precision']:.3f}")
        print(f"   Recall:    {metrics['recall']:.3f}")
        print(f"   F1 Score:  {metrics['f1']:.3f}")
        print(f"   Avg Detections/Image: {avg_detections:.2f}")
        print(f"   TP: {metrics['tp']}, FP: {metrics['fp']}, FN: {metrics['fn']}, TN: {metrics['tn']}")

    # Find best method: strictly greater F1 wins, so ties keep the earlier method
    best_method = None
    best_f1 = 0
    for method, metrics in metrics_by_method.items():
        if metrics['f1'] > best_f1:
            best_f1 = metrics['f1']
            best_method = method

    if best_method:
        print(f"\n🏆 BEST PREPROCESSING METHOD: {best_method.upper()}")
        print(f"   Best F1 Score: {best_f1:.3f}")
        print(f"   💡 Recommended for production use!")

    # Build the JSON payload before opening the file, so an error while
    # preparing it cannot leave a truncated file behind.
    serializable_results = {}
    for method, results in all_results.items():
        serializable_results[method] = []
        for result in results:
            # Drop the raw detections and keep only their count
            clean_result = {k: v for k, v in result.items() if k != 'detections'}
            clean_result['detection_count'] = len(result.get('detections', []))
            serializable_results[method].append(clean_result)

    # Save comparison results
    comparison_file = "preprocessing_comparison.json"
    with open(comparison_file, 'w', encoding='utf-8') as f:
        json.dump(serializable_results, f, indent=2)

    print(f"\n📄 Comparison results saved to {comparison_file}")
    return all_results

def visualize_preprocessing_effects(metadata_file: str, model_path: str):
    """Show one image next to its "simple", "gcn" and "lcn" preprocessed forms.

    Displays a four-panel figure (original plus one panel per method) and
    saves it as ``preprocessing_comparison.png`` in the current working
    directory.

    Args:
        metadata_file: Path to the JSON metadata file of the sample image.
        model_path: Path handed to ``load_model``; the model is used only to
            read the expected input size and dtype.

    Returns:
        None. Returns early, after printing a message, when
        ``find_image_for_metadata`` yields no image for ``metadata_file``.
    """

    print("🔍 Visualizing preprocessing effects on sample image...")

    # The parsed content is not used below; loading it still surfaces a
    # missing or malformed metadata file before any plotting starts.
    with open(metadata_file, 'r', encoding='utf-8') as f:
        metadata = json.load(f)

    image_path = find_image_for_metadata(metadata_file)
    if not image_path:
        print(f"❌ No image found for {metadata_file}")
        return

    # Load original image
    original_image = Image.open(image_path).convert('RGB')

    # Test all three preprocessing methods
    methods = ["simple", "gcn", "lcn"]
    method_names = ["Simple (0-1)", "Global Contrast Norm", "Local Contrast Norm"]

    # Load model to get expected input format
    interpreter = load_model(model_path)
    input_details = interpreter.get_input_details()
    input_shape = input_details[0]['shape']
    input_dtype = input_details[0]['dtype']
    target_size = (input_shape[1], input_shape[2])

    # Create visualization
    fig, axes = plt.subplots(1, 4, figsize=(20, 5))

    # Original image
    axes[0].imshow(original_image)
    axes[0].set_title("Original Image")
    axes[0].axis('off')

    # Apply each preprocessing method
    for i, (method, name) in enumerate(zip(methods, method_names)):
        preprocessed = preprocess_image(image_path, target_size, input_dtype, method)

        # Convert back to display format
        if input_dtype == np.uint8:
            display_image = preprocessed[0].astype(np.uint8)
        else:
            frame = np.asarray(preprocessed[0], dtype=np.float32)
            lowest, highest = float(frame.min()), float(frame.max())
            # Values outside [0, 1] would wrap around in the uint8 cast and
            # show as noise, so stretch them to [0, 1] first. Data already in
            # [0, 1] is displayed unchanged.
            # NOTE(review): presumably the gcn/lcn outputs are zero-centred;
            # preprocess_image is not visible here - confirm.
            if lowest < 0.0 or highest > 1.0:
                value_range = highest - lowest
                if value_range > 0:
                    frame = (frame - lowest) / value_range
                else:
                    frame = np.zeros_like(frame)
            display_image = (frame * 255).astype(np.uint8)

        axes[i + 1].imshow(display_image)
        axes[i + 1].set_title(f"{name}")
        axes[i + 1].axis('off')

    plt.tight_layout()
    plt.savefig("preprocessing_comparison.png", dpi=150, bbox_inches='tight')
    plt.show()

    print(f"📸 Preprocessing comparison saved: preprocessing_comparison.png")

## Pipeline Visualization Demo

In [None]:
# ===== PIPELINE DEMO: Visual Walkthrough =====
# Runs the single-image walkthrough on the first metadata file returned by
# glob; skipped when no model path is set or no metadata file exists.
if not model_path:
    print("⚠️  Skipping visualization demo - no model available")
else:
    print("\n".join([
        "\n" + "=" * 60,
        "📸 PIPELINE VISUALIZATION DEMO",
        "=" * 60,
    ]))

    metadata_files = glob.glob(os.path.join(METADATA_DIR, "*.json"))
    if not metadata_files:
        print("⚠️  No metadata files found for visualization demo")
    else:
        # Any sample will do for the demo, so the first match is used
        sample_metadata = metadata_files[0]
        print(f"Demonstrating pipeline on sample image: {os.path.basename(sample_metadata)}")
        visualize_single_image_pipeline(sample_metadata, model_path)

## Debugging Simple Evaluation

In [None]:
def debug_simple_evaluation(results: List[Dict], sample_limit: int = 10) -> Optional[Dict[str, Any]]:
    """
    Print a diagnostic report for the simple (presence/absence) evaluation.

    Intended for investigating why the simple evaluation reports 0 true
    positives. Prints overall counts over all records, a detailed breakdown of
    the first `sample_limit` records, and a diagnosis based on the counts.

    Args:
        results: Per-image evaluation records. Every record must have
            'has_burner_gt' and 'has_burner_pred'; records that fall inside the
            detailed sample must also have 'file'. The keys 'all_gt_labels',
            'ground_truth_burners', 'detections' and 'predicted_burners' are
            optional. Each entry of 'detections' must have 'class_id' and
            'confidence'.
        sample_limit: Number of leading records to print in detail.

    Returns:
        None when `results` is empty. Otherwise a dict with:
            'total_images', 'has_gt_burners', 'has_pred_burners',
            'true_positives': counts computed over all records;
            'gt_labels', 'pred_classes': sets of the labels / class ids seen
                in the detailed sample only;
            'inference_issues': number of sampled records with no detections.
    """
    print("🔍 DEBUGGING SIMPLE EVALUATION")
    print("=" * 60)

    if not results:
        print("❌ No results to debug!")
        return None

    # Overall statistics (computed over every record, not just the sample)
    total_images = len(results)
    has_gt_burners = sum(1 for r in results if r['has_burner_gt'])
    has_pred_burners = sum(1 for r in results if r['has_burner_pred'])
    true_positives = sum(1 for r in results if r['has_burner_gt'] and r['has_burner_pred'])

    print("📊 OVERALL STATISTICS:")
    print(f"   Total images: {total_images}")
    print(f"   Images with GT burners: {has_gt_burners}")
    print(f"   Images with predicted burners: {has_pred_burners}")
    print(f"   True positives: {true_positives}")

    # Sample detailed analysis
    print(f"\n🔍 DETAILED SAMPLE ANALYSIS (first {sample_limit} images):")

    gt_label_examples = set()
    pred_class_examples = set()
    inference_issues = 0  # sampled records that carry no detections at all

    for i, result in enumerate(results[:sample_limit]):
        print(f"\n📸 Sample {i+1}: {result['file']}")

        # Check ground truth
        print("   Ground Truth Analysis:")
        all_gt_labels = result.get('all_gt_labels', [])
        gt_burner_count = result.get('ground_truth_burners', 0)

        if all_gt_labels:
            print(f"     All labels: {all_gt_labels}")
            gt_label_examples.update(all_gt_labels)
        else:
            print("     No labels found in annotations")

        print(f"     Burner count: {gt_burner_count}")
        print(f"     has_burner_gt: {result['has_burner_gt']}")

        # Check predictions
        print("   Prediction Analysis:")
        detections = result.get('detections', [])
        pred_burner_count = result.get('predicted_burners', 0)

        if detections:
            print(f"     Total detections: {len(detections)}")
            for j, det in enumerate(detections):
                class_id = det['class_id']
                confidence = det['confidence']
                print(f"       Detection {j+1}: class_id={class_id}, confidence={confidence:.3f}")
                pred_class_examples.add(class_id)
        else:
            print("     No detections found")
            inference_issues += 1

        print(f"     Burner detections (class_id=0): {pred_burner_count}")
        print(f"     has_burner_pred: {result['has_burner_pred']}")

        # Classify this record as TP / FN / FP / TN
        if result['has_burner_gt'] and result['has_burner_pred']:
            print("     ✅ This is a TRUE POSITIVE")
        elif result['has_burner_gt'] and not result['has_burner_pred']:
            print("     ❌ This is a FALSE NEGATIVE (GT has burners, model doesn't)")
        elif not result['has_burner_gt'] and result['has_burner_pred']:
            print("     ❌ This is a FALSE POSITIVE (model detects burners, GT doesn't)")
        else:
            print("     ✅ This is a TRUE NEGATIVE (no burners in GT or predictions)")

    print("\n📋 SUMMARY OF FINDINGS:")
    print(f"   Unique GT labels seen: {sorted(gt_label_examples)}")
    print(f"   Unique prediction class_ids seen: {sorted(pred_class_examples)}")
    print(f"   Images with no detections: {inference_issues}")

    # Diagnose the issue
    print("\n🔧 DIAGNOSIS:")
    if has_gt_burners == 0:
        print("   ❌ ISSUE: No ground truth burners found!")
        print("      - Check if GT labels contain 'burner' (case-insensitive)")
        print("      - Check if metadata files have 'annotations' field")
    elif has_pred_burners == 0:
        print("   ❌ ISSUE: No predicted burners found!")
        print("      - Check if model is producing any detections")
        print("      - Check if burner class_id should be 0")
        print("      - Check confidence thresholds")
    elif true_positives == 0:
        # Both sides report burners, but never on the same image.
        print("   ❌ ISSUE: GT and predictions don't align!")
        print("      - Check if burner labels in GT match detection logic")
        print("      - Check if there are images with both GT and pred burners")
    else:
        # Only report a misalignment when there really are no true positives;
        # with at least one TP the zero-true-positive symptom is absent.
        print(f"   ✅ No issue found: {true_positives} image(s) have both GT and predicted burners")

    return {
        'total_images': total_images,
        'has_gt_burners': has_gt_burners,
        'has_pred_burners': has_pred_burners,
        'true_positives': true_positives,
        'gt_labels': gt_label_examples,
        'pred_classes': pred_class_examples,
        'inference_issues': inference_issues
    }

def inspect_raw_data(sample_limit: int = 3) -> None:
    """
    Print the raw ground-truth and model-output formats for a few samples.

    Reads up to `sample_limit` metadata JSON files from METADATA_DIR and prints
    their top-level keys plus the first three entries of
    annotations['bboxes']. If the module-level `model_path` is set and exists
    on disk, it also calls `load_model`, prints the interpreter's input/output
    details, and prints the first three results of `run_inference` (with the
    "simple" method) on the image that `find_image_for_metadata` returns for
    the first metadata file.

    Args:
        sample_limit: Maximum number of metadata files to inspect.
    """
    print("🔍 INSPECTING RAW DATA FORMATS")
    print("=" * 60)

    # Check ground truth format.
    # glob returns matches in arbitrary order; sort so the inspected samples
    # are the same on every run.
    metadata_files = sorted(glob.glob(os.path.join(METADATA_DIR, "*.json")))
    if not metadata_files:
        print("❌ No metadata files found!")
        return

    print("📋 GROUND TRUTH INSPECTION:")
    for i, metadata_file in enumerate(metadata_files[:sample_limit]):
        print(f"\n📸 Sample {i+1}: {os.path.basename(metadata_file)}")

        # Read as UTF-8 explicitly rather than relying on the platform's
        # default text encoding.
        with open(metadata_file, 'r', encoding='utf-8') as f:
            metadata = json.load(f)

        print(f"   Available keys: {list(metadata.keys())}")

        if 'annotations' not in metadata:
            print("   No 'annotations' in metadata")
            continue

        annotations = metadata['annotations']
        if not isinstance(annotations, dict):
            # This function exists to reveal format surprises, so report the
            # unexpected shape instead of crashing on `.keys()`.
            print(f"   Unexpected 'annotations' type: {type(annotations).__name__}")
            continue

        print(f"   Annotations keys: {list(annotations.keys())}")

        if 'bboxes' in annotations:
            bboxes = annotations['bboxes']
            print(f"   Number of bboxes: {len(bboxes)}")

            for j, bbox in enumerate(bboxes[:3]):  # Show first 3 bboxes
                print(f"     Bbox {j+1}: {bbox}")
        else:
            print("   No 'bboxes' in annotations")

    # Check model output format if we have a model
    if model_path and os.path.exists(model_path):
        print("\n🎯 MODEL OUTPUT INSPECTION:")

        # Find a sample image
        sample_metadata = metadata_files[0]
        image_path = find_image_for_metadata(sample_metadata)

        if image_path:
            print(f"   Testing with image: {os.path.basename(image_path)}")

            interpreter = load_model(model_path)

            # Check model input/output details
            input_details = interpreter.get_input_details()
            output_details = interpreter.get_output_details()

            print("   Model input details:")
            for detail in input_details:
                print(f"     {detail['name']}: shape={detail['shape']}, dtype={detail['dtype']}")

            print("   Model output details:")
            for detail in output_details:
                print(f"     {detail['name']}: shape={detail['shape']}, dtype={detail['dtype']}")

            # Run inference and check raw outputs
            detections = run_inference(image_path, interpreter, "simple")
            print(f"   Raw detections returned: {len(detections)}")

            for j, det in enumerate(detections[:3]):  # Show first 3 detections
                print(f"     Detection {j+1}: {det}")
        else:
            print("   No corresponding image found for sample metadata")
    else:
        print("   No model available for inspection")