# Ultra-Advanced Spot the Difference Pipeline

**Focus: Fix Object Detection Bottleneck**

This notebook addresses the core accuracy issues:
1. **🔍 Enhanced Object Detection**: Grounding DINO + OWL-ViT ensemble
2. **📈 Image Upscaling**: Super-resolution for low-resolution images
3. **🎯 Threshold Calibration**: Cross-validation F1 optimization
4. **📚 Expanded Vocabulary**: Rich prompts with synonyms
5. **🧠 Pre-trained ChangeFormer**: Load existing model
6. **🔧 Strong Augmentations**: Data augmentation pipeline

**Key Insight**: Poor object detection (due to low resolution) is the main bottleneck → Fix detection first!

In [None]:
# Import Required Libraries
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image, ImageFilter, ImageEnhance
import torch
import torch.nn as nn
import torchvision.transforms as T
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import f1_score, precision_recall_fscore_support
import cv2
from tqdm import tqdm
import warnings
import albumentations as A
from albumentations.pytorch import ToTensorV2
warnings.filterwarnings('ignore')

# Report the runtime environment so results can be tied to a specific setup.
print("PyTorch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("Device name:", torch.cuda.get_device_name(0))
    # cuDNN autotuning is only switched on when a GPU is present.
    torch.backends.cudnn.benchmark = True

# Single device handle shared by the model-loading and inference cells below.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

## 1. Enhanced Data Loading & Image Preprocessing

- **Problem**: Low-resolution images cause poor object detection
- **Solution**: Super-resolution upscaling + strong augmentations

In [None]:
# Load datasets
# Both CSVs are read relative to data_dir. Later cells read the 'img_id',
# 'added_objs', 'removed_objs' and 'changed_objs' columns of train_df.
data_dir = './'
train_df = pd.read_csv(os.path.join(data_dir, 'train.csv'))
test_df = pd.read_csv(os.path.join(data_dir, 'test.csv'))

print('Train Data Sample:')
# display() is the IPython rich-display helper available inside notebooks.
display(train_df.head())
print(f'\nTrain samples: {len(train_df)}')
print(f'Test samples: {len(test_df)}')

In [None]:
# Super-resolution image enhancement
class ImageEnhancer:
    """
    Upscale, sharpen and resize images before object detection.

    Small images are first enlarged with bicubic interpolation (plain
    interpolation, not a learned super-resolution model), then sharpened,
    given a mild contrast/brightness boost and finally resized to a fixed
    ``target_size``.

    Args:
        target_size: ``(width, height)`` of the returned PIL image. The final
            resize does not preserve the aspect ratio.
        min_side: Images whose height or width is below this many pixels are
            upscaled so that both sides reach at least ``min_side`` before the
            filters run. Defaults to 512, the previously hard-coded value.
    """
    def __init__(self, target_size=(1024, 1024), min_side=512):
        self.target_size = target_size
        self.min_side = min_side

    def _upscaled_size(self, w, h):
        """Return the ``(width, height)`` to upscale to, or ``(w, h)`` if already large enough."""
        if h >= self.min_side and w >= self.min_side:
            return w, h
        scale_factor = max(self.min_side / h, self.min_side / w, 1.0)
        # round() rather than int(): floating-point error in
        # side * (min_side / side) could otherwise truncate to min_side - 1.
        return int(round(w * scale_factor)), int(round(h * scale_factor))

    def enhance_image(self, image_path):
        """
        Load an image from disk and return an enhanced RGB PIL image.

        Args:
            image_path: Path of the image file to read.

        Returns:
            PIL image of size ``target_size``.

        Raises:
            ValueError: If OpenCV cannot read ``image_path``.
        """
        img = cv2.imread(image_path)
        if img is None:
            raise ValueError(f"Could not load image: {image_path}")

        # OpenCV loads BGR; PIL and the detectors expect RGB.
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        h, w = img_rgb.shape[:2]

        # Bicubic upscaling of small inputs (aspect ratio preserved here).
        new_w, new_h = self._upscaled_size(w, h)
        if (new_w, new_h) != (w, h):
            img_rgb = cv2.resize(img_rgb, (new_w, new_h), interpolation=cv2.INTER_CUBIC)

        pil_img = Image.fromarray(img_rgb)

        # 1. Sharpening
        pil_img = pil_img.filter(ImageFilter.UnsharpMask(radius=1, percent=150, threshold=3))
        # 2. Contrast enhancement (+10%)
        pil_img = ImageEnhance.Contrast(pil_img).enhance(1.1)
        # 3. Brightness adjustment (+5%)
        pil_img = ImageEnhance.Brightness(pil_img).enhance(1.05)

        # Final resize to the fixed output size. tuple() so a list-valued
        # target_size compares correctly against PIL's tuple.
        target = tuple(self.target_size)
        if pil_img.size != target:
            pil_img = pil_img.resize(target, Image.LANCZOS)

        return pil_img

    def get_image_stats(self, image_path):
        """
        Return basic size statistics for an image file.

        Returns:
            Dict with 'width', 'height', 'aspect_ratio' (width / height) and
            'area', or None if the image cannot be read.
        """
        img = cv2.imread(image_path)
        if img is None:
            return None
        h, w = img.shape[:2]
        return {'width': w, 'height': h, 'aspect_ratio': w/h, 'area': w*h}

# Initialize enhancer
# Module-level instance shared by the statistics, demo and detection cells below.
enhancer = ImageEnhancer(target_size=(1024, 1024))

print("✅ Image enhancer initialized")

In [None]:
# Analyze image statistics
print("📊 Analyzing image statistics...")

# Only the first 100 training pairs are sampled to keep this cell fast.
image_stats = []
for img_id in tqdm(train_df['img_id'].head(100), desc="Analyzing images"):
    for suffix in ['1', '2']:
        img_path = os.path.join(data_dir, 'data/data', f'{img_id}_{suffix}.png')
        stats = enhancer.get_image_stats(img_path)
        if stats:
            stats['img_id'] = img_id
            stats['suffix'] = suffix
            image_stats.append(stats)

stats_df = pd.DataFrame(image_stats)

if stats_df.empty:
    # Without this guard the column lookups below raise KeyError and the
    # percentage divides by zero when no image could be read.
    print("\n⚠️ No images could be read - check data_dir and the image file naming.")
else:
    # "Small" is measured by pixel area, as in the summary line below.
    small_mask = stats_df['area'] < 512 * 512
    n_small = int(small_mask.sum())

    print("\n📈 Image Statistics Summary:")
    print(f"Total images analyzed: {len(stats_df)}")
    print(f"Average resolution: {stats_df['width'].mean():.0f}x{stats_df['height'].mean():.0f}")
    print(f"Min resolution: {stats_df['width'].min():.0f}x{stats_df['height'].min():.0f}")
    print(f"Max resolution: {stats_df['width'].max():.0f}x{stats_df['height'].max():.0f}")
    print(f"Images smaller than 512x512: {n_small} ({100*n_small/len(stats_df):.1f}%)")

    fig, axes = plt.subplots(1, 3, figsize=(12, 4))
    panels = [
        ('width', 'Width Distribution', 'Width (pixels)'),
        ('height', 'Height Distribution', 'Height (pixels)'),
        ('area', 'Area Distribution', 'Area (pixels²)'),
    ]
    for ax, (column, title, xlabel) in zip(axes, panels):
        ax.hist(stats_df[column], bins=30, alpha=0.7)
        ax.set_title(title)
        ax.set_xlabel(xlabel)
    fig.tight_layout()
    plt.show()

    # Only state the conclusion when the sampled data actually supports it.
    if n_small > 0:
        print("\n🎯 Key Insight: Many images are low resolution → Super-resolution enhancement needed!")

In [None]:
# Strong data augmentation pipeline
def create_augmentation_pipeline():
    """
    Build the albumentations transform chain for training-time augmentation.

    The stages run in this order: geometric jitter, colour jitter,
    noise/blur, random rectangular dropout, then ImageNet normalisation and
    conversion to a tensor.

    Returns:
        An ``A.Compose`` instance.
    """
    geometric = [
        A.Rotate(limit=15, p=0.3),
        A.Affine(scale=(0.8, 1.2), translate_percent=0.1, rotate=(-10, 10), p=0.4),
    ]

    colour = [
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
        A.HueSaturationValue(hue_shift_limit=10, sat_shift_limit=20, val_shift_limit=10, p=0.3),
        A.RGBShift(r_shift_limit=10, g_shift_limit=10, b_shift_limit=10, p=0.3),
    ]

    degradation = [
        A.GaussNoise(var_limit=(10, 50), p=0.2),
        A.GaussianBlur(blur_limit=3, p=0.1),
    ]

    # Cutout-style occlusion only; no mixup is applied.
    occlusion = [
        A.CoarseDropout(max_holes=8, max_height=32, max_width=32, p=0.3),
    ]

    to_model_input = [
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ]

    return A.Compose(geometric + colour + degradation + occlusion + to_model_input)

# Test enhancement on sample image
# Uses the first image of the first training pair as a visual sanity check.
sample_img_id = train_df['img_id'].iloc[0]
sample_img_path = os.path.join(data_dir, 'data/data', f'{sample_img_id}_1.png')

print(f"🧪 Testing image enhancement on: {sample_img_id}")

# Original image
original_img = Image.open(sample_img_path).convert('RGB')

# Enhanced image
# enhance_image re-reads the file itself and returns a PIL image of the
# enhancer's target_size.
enhanced_img = enhancer.enhance_image(sample_img_path)

# Side-by-side comparison; each title shows the PIL size tuple.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))
ax1.imshow(original_img)
ax1.set_title(f'Original\n{original_img.size}')
ax1.axis('off')

ax2.imshow(enhanced_img)
ax2.set_title(f'Enhanced\n{enhanced_img.size}')
ax2.axis('off')

plt.tight_layout()
plt.show()

print(f"✅ Image enhancement working! Resolution improved from {original_img.size} to {enhanced_img.size}")

## 2. Expanded Vocabulary with Rich Prompts

- **Problem**: Limited vocabulary hurts detection accuracy
- **Solution**: Prompts built from the training-label vocabulary, with simple article variants ("a X", "the X"); no synonym expansion is applied in the cells below

In [None]:
# Extract vocabulary STRICTLY from training data labels only
import re
from collections import defaultdict

print("📚 Extracting vocabulary from training dataset labels...")

LABEL_COLUMNS = ['added_objs', 'removed_objs', 'changed_objs']
# Cell values that mean "no object"; compared after strip() + lower().
EMPTY_LABEL_MARKERS = {'', 'none', 'null', 'nan'}


def _has_label(value):
    """Return True if a raw label cell is a string other than an empty/placeholder marker."""
    return isinstance(value, str) and value.strip().lower() not in EMPTY_LABEL_MARKERS


# Extract base vocabulary from actual labels
term_frequencies = defaultdict(int)

for col in LABEL_COLUMNS:
    for label_str in train_df[col].dropna():
        if not _has_label(label_str):
            continue
        # Split by common separators (comma, ampersand, whitespace). Multi-word
        # labels are therefore split into individual words.
        for token in re.split(r'[,&\s]+', label_str.strip().lower()):
            token = token.strip()
            if token and token != 'none':
                term_frequencies[token] += 1

# Print statistics
print(f"\nTotal unique terms found: {len(term_frequencies)}")
print(f"\nTerm frequency distribution:")
for freq_threshold in [1, 2, 5, 10]:
    count = sum(1 for f in term_frequencies.values() if f >= freq_threshold)
    print(f"  Terms appearing >= {freq_threshold} times: {count}")

# Keep ALL terms that appear at least once in training data (every counted
# term has frequency >= 1, so nothing is dropped here).
filtered_terms = dict(term_frequencies)

# Sort by frequency (most common first); ties keep first-seen order.
sorted_terms = sorted(filtered_terms.items(), key=lambda x: x[1], reverse=True)

# Create base vocabulary - ONLY terms from training data
base_vocabulary = [term for term, freq in sorted_terms]

print(f"\n✅ Base vocabulary created: {len(base_vocabulary)} terms")
print(f"\n📊 Top 30 most frequent terms in training data:")
for i, (term, freq) in enumerate(sorted_terms[:30], 1):
    print(f"  {i:2d}. {term:20s} (appears {freq:3d} times)")

# Analyze label patterns. The same _has_label test as the extraction loop is
# used so that 'None', ' none ', 'null' or 'nan' cells are not counted as labels.
print(f"\n🔍 Label pattern analysis:")
added_count = int(train_df['added_objs'].apply(_has_label).sum())
removed_count = int(train_df['removed_objs'].apply(_has_label).sum())
changed_count = int(train_df['changed_objs'].apply(_has_label).sum())

n_rows = max(len(train_df), 1)  # avoid dividing by zero on an empty frame
print(f"  Samples with added objects: {added_count} ({100*added_count/n_rows:.1f}%)")
print(f"  Samples with removed objects: {removed_count} ({100*removed_count/n_rows:.1f}%)")
print(f"  Samples with changed objects: {changed_count} ({100*changed_count/n_rows:.1f}%)")

In [None]:
# Simplified vocabulary - NO EXPANSION, use exact terms only
class StrictVocabulary:
    """
    Vocabulary restricted to terms seen in the training labels.

    No synonyms or expansions are generated. Detector output is mapped back
    onto the vocabulary by ``normalize_term``: exact match first, then
    whole-word match, then substring containment as a last resort.
    """
    _ARTICLES = ('a ', 'an ', 'the ')

    def __init__(self, base_terms):
        # Materialise once: base_terms may be a one-shot iterator, and the set
        # must be built from the same items as the ordered list.
        self.base_terms = list(base_terms)  # Keep exact order
        self.term_set = set(self.base_terms)

    def get_detection_prompts(self, add_simple_articles=False):
        """
        Get detection prompts

        Args:
            add_simple_articles: If True, add "a X" and "the X" variants after
                                each term. Only adds simple variants, no synonyms.

        Returns:
            List of prompts for object detection, in vocabulary order
        """
        prompts = []

        for term in self.base_terms:
            prompts.append(term)

            # Optionally add simple article variants for better detection
            if add_simple_articles:
                prompts.append(f"a {term}")
                prompts.append(f"the {term}")

        return prompts

    def normalize_term(self, detected_term):
        """
        Normalize detected term to match training vocabulary

        Args:
            detected_term: Term detected by model

        Returns:
            The matching vocabulary term if one is found. Otherwise the
            cleaned (lower-cased, article-stripped) input, which is an empty
            string for empty or non-string input.
        """
        if not isinstance(detected_term, str):
            return ''

        # Clean the term
        cleaned = detected_term.lower().strip()

        # Remove common articles
        for article in self._ARTICLES:
            if cleaned.startswith(article):
                cleaned = cleaned[len(article):].strip()

        # An empty string is a substring of every term, so without this guard
        # it would be "matched" to the first vocabulary entry below.
        if not cleaned:
            return ''

        # 1. Exact match
        if cleaned in self.term_set:
            return cleaned

        # 2. Whole-word match, in vocabulary order, so "red carpet" maps to
        #    "carpet" and not to "car" merely because "car" is a substring.
        words = {word.strip('.,;:!?') for word in cleaned.split()}
        for vocab_term in self.base_terms:
            if vocab_term and vocab_term in words:
                return vocab_term

        # 3. Substring containment either way (e.g. plural "dogs" -> "dog")
        for vocab_term in self.base_terms:
            if vocab_term and (vocab_term in cleaned or cleaned in vocab_term):
                return vocab_term

        # Return cleaned input if no match
        return cleaned

    def filter_predictions(self, predicted_terms):
        """
        Filter predictions to only include terms in training vocabulary

        Args:
            predicted_terms: List of predicted terms

        Returns:
            De-duplicated list of valid vocabulary terms, in first-seen order
        """
        # A dict keeps insertion order, so the result is reproducible across
        # runs (a set of strings is not).
        kept = {}
        for term in predicted_terms:
            normalized = self.normalize_term(term)
            if normalized in self.term_set:
                kept.setdefault(normalized, None)

        return list(kept)

# Create strict vocabulary - NO EXPANSION
# base_vocabulary is ordered by descending training-label frequency.
strict_vocab = StrictVocabulary(base_vocabulary)

# Get detection prompts (with minimal article variants for better detection)
# Each base term contributes three prompts: "X", "a X", "the X".
detection_prompts = strict_vocab.get_detection_prompts(add_simple_articles=True)

print(f"\n✅ Strict vocabulary created!")
print(f"   Base terms: {len(strict_vocab.base_terms)}")
print(f"   Detection prompts: {len(detection_prompts)} (includes article variants)")
print(f"\n   Sample base terms: {strict_vocab.base_terms[:15]}")
print(f"   Sample prompts: {detection_prompts[:20]}")

## 3. Advanced Object Detection: Grounding DINO + OWL-ViT Ensemble

- **Problem**: Single detector misses objects
- **Solution**: Ensemble of Grounding DINO (text-grounded) + OWL-ViT (open-vocab)

In [None]:
#!pip install -U transformers

In [None]:
# Load OWL-ViT (already familiar)
from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection

print("Loading OWL-ViT model...")
# The processor and model are module-level globals read by detect_with_owlvit.
owlvit_processor = AutoProcessor.from_pretrained("google/owlvit-base-patch32")
owlvit_model = AutoModelForZeroShotObjectDetection.from_pretrained("google/owlvit-base-patch32")
owlvit_model = owlvit_model.to(device)
# Inference only: no training is done on this model in this notebook section.
owlvit_model.eval()
print("✅ OWL-ViT loaded")

In [None]:
# Load Grounding DINO
# Defaults first, so later cells can always reference these names even when
# loading fails and the pipeline falls back to OWL-ViT only.
grounding_dino_processor = None
grounding_dino_model = None
grounding_dino_available = False

try:
    from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection
except ImportError:
    print("⚠️ transformers library issue, will use OWL-ViT only")
else:
    print("Loading Grounding DINO model...")

    # Model from Hugging Face transformers
    model_id = "IDEA-Research/grounding-dino-base"

    try:
        _gd_processor = AutoProcessor.from_pretrained(model_id)
        _gd_model = AutoModelForZeroShotObjectDetection.from_pretrained(model_id).to(device)
        # Inference only, matching how the OWL-ViT model is prepared.
        _gd_model.eval()
    except Exception as e:
        # Deliberate best-effort: any download/load failure degrades to OWL-ViT only.
        print(f"⚠️ Grounding DINO loading failed: {e}")
        print("⚠️ Will use OWL-ViT only")
    else:
        # Publish the globals only once both pieces loaded successfully.
        grounding_dino_processor = _gd_processor
        grounding_dino_model = _gd_model
        grounding_dino_available = True
        print("✅ Grounding DINO loaded")

In [None]:
# OWL-ViT detection function
def detect_with_owlvit(image_or_path, vocab_terms, threshold=0.08):
    """
    Run OWL-ViT on one image using ``vocab_terms`` as text queries.

    Args:
        image_or_path: A PIL image, or a path that is opened and converted to RGB.
        vocab_terms: List of text prompts; returned labels index into this list.
        threshold: Minimum score passed to the processor's post-processing step.

    Returns:
        Tuple ``(boxes, scores, labels, detected_terms)``: three numpy arrays
        plus the prompt string for each detection.
    """
    if isinstance(image_or_path, Image.Image):
        image = image_or_path
    else:
        image = Image.open(image_or_path).convert('RGB')

    encoded = owlvit_processor(text=vocab_terms, images=image, return_tensors="pt")
    encoded = {name: tensor.to(device) for name, tensor in encoded.items()}

    with torch.no_grad():
        outputs = owlvit_model(**encoded)

    # PIL reports (width, height); the reversed pair is handed to post-processing.
    height_width = torch.tensor([image.size[::-1]]).to(device)
    detections = owlvit_processor.post_process_object_detection(
        outputs,
        target_sizes=height_width,
        threshold=threshold
    )[0]

    boxes, scores, labels = (
        detections[key].cpu().numpy() for key in ('boxes', 'scores', 'labels')
    )
    detected_terms = [vocab_terms[int(idx)] for idx in labels]

    return boxes, scores, labels, detected_terms

print("✅ OWL-ViT detection function ready")

In [None]:
# Grounding DINO detection function
def detect_with_grounding_dino(image_or_path, text_prompts, box_threshold=0.3, text_threshold=0.25):
    """
    Detect objects using Grounding DINO via transformers

    Args:
        image_or_path: A PIL image, or a path that is opened and converted to RGB.
        text_prompts: Candidate phrases; only the first 50 are used.
        box_threshold: Box confidence threshold passed to post-processing.
        text_threshold: Text-matching threshold passed to post-processing.

    Returns:
        Tuple ``(boxes, scores, labels, detected_terms)``. ``detected_terms``
        is the text phrase grounded to each box and ``labels`` is just
        ``arange(len(detected_terms))``. All four are empty when the model is
        unavailable, nothing is detected, or inference fails (the error is
        printed, not raised).
    """
    def _no_detections():
        return np.array([]), np.array([]), np.array([]), []

    if not grounding_dino_available:
        return _no_detections()

    try:
        # Load image
        image = image_or_path if isinstance(image_or_path, Image.Image) else Image.open(image_or_path).convert('RGB')

        # Create text prompt (IMPORTANT: lowercase + end with dots)
        # Limit to 50 phrases to avoid the model's text token limit
        text_list = [str(t).lower().strip() for t in text_prompts[:50]]
        text_list = [t for t in text_list if t]
        if not text_list:
            return _no_detections()
        text = '. '.join(text_list) + '.'

        # Process inputs
        inputs = grounding_dino_processor(images=image, text=text, return_tensors="pt").to(device)

        # Run prediction
        with torch.no_grad():
            outputs = grounding_dino_model(**inputs)

        # Post-process results
        # NOTE(review): newer transformers releases appear to rename
        # `box_threshold` to `threshold` - confirm against the installed version.
        results = grounding_dino_processor.post_process_grounded_object_detection(
            outputs,
            inputs.input_ids,
            box_threshold=box_threshold,
            text_threshold=text_threshold,
            target_sizes=[image.size[::-1]]
        )[0]

        boxes = results['boxes'].cpu().numpy()
        scores = results['scores'].cpu().numpy()

        # The Hugging Face post-processor reports each box's label as the
        # matched text phrase (a string), not as an index into the prompt
        # list. Treating it as an index raised TypeError, which the handler
        # below swallowed, so this detector never contributed any boxes.
        raw_labels = results['text_labels'] if 'text_labels' in results else results['labels']

        def _label_to_phrase(label):
            if isinstance(label, str):
                return label.strip()
            # Integer-like label: map it back to the prompt list if in range.
            idx = int(label)
            return text_list[idx] if 0 <= idx < len(text_list) else ''

        phrases = [_label_to_phrase(label) for label in raw_labels]

        # Drop boxes with no usable phrase rather than mislabelling them.
        keep = np.array([bool(p) for p in phrases], dtype=bool)
        boxes = boxes[keep]
        scores = scores[keep]
        detected_terms = [p for p in phrases if p]

        # Create numeric labels for consistency with the OWL-ViT return shape
        labels = np.arange(len(detected_terms))

        return boxes, scores, labels, detected_terms

    except Exception as e:
        # Best-effort detector: report and fall back to "no detections".
        print(f"Grounding DINO error: {e}")
        return _no_detections()

print("✅ Grounding DINO detection function ready")

In [None]:
# Enhanced ensemble detection
from torchvision.ops import nms

def detect_ensemble_advanced(image_path, vocab_terms, use_enhancement=True):
    """
    Advanced ensemble detection: Grounding DINO + OWL-ViT

    Detections from both models are pooled, de-duplicated with class-agnostic
    NMS (IoU 0.5) and then restricted to the training vocabulary. Scores from
    the two detectors are compared directly, without rescaling.

    Args:
        image_path: Path to image
        vocab_terms: Kept for interface compatibility; not read here. Prompts
            and label indices come from the module-level ``strict_vocab`` and
            ``base_vocabulary``.
        use_enhancement: Whether to enhance image first

    Returns:
        boxes, scores, labels, detected_terms (mapped to base vocabulary).
        ``labels`` are indices into ``base_vocabulary``. With no detections,
        ``boxes`` has shape (0, 4) and the other three are empty.
    """
    def _empty_result():
        # Shape (0, 4) keeps len() == 0 even after np.atleast_2d in callers.
        return np.empty((0, 4), dtype=np.float32), np.array([]), np.array([], dtype=int), []

    # Prepare in-memory image
    if use_enhancement:
        work_image = enhancer.enhance_image(image_path)  # PIL image
    else:
        work_image = Image.open(image_path).convert('RGB')

    # OWL-ViT gets every term with its article variants.
    prompts = strict_vocab.get_detection_prompts(add_simple_articles=True)
    boxes_owl, scores_owl, labels_owl, terms_owl = detect_with_owlvit(
        work_image, prompts, threshold=0.05
    )

    # Grounding DINO has a limited text budget, so it gets the 50 most
    # frequent base terms. The first 50 prompts would spend two thirds of
    # that budget on "a X" / "the X" duplicates of about 17 terms.
    if grounding_dino_available:
        boxes_gdino, scores_gdino, labels_gdino, terms_gdino = detect_with_grounding_dino(
            work_image, strict_vocab.base_terms[:50]
        )
    else:
        boxes_gdino, scores_gdino, terms_gdino = np.array([]), np.array([]), []

    # Combine detections from whichever detectors produced any
    all_boxes = []
    all_scores = []
    all_terms = []
    for boxes, scores, terms in (
        (boxes_owl, scores_owl, terms_owl),
        (boxes_gdino, scores_gdino, terms_gdino),
    ):
        if len(boxes) > 0:
            all_boxes.append(boxes)
            all_scores.append(scores)
            all_terms.extend(terms)

    # Nothing detected by either model: indexing all_boxes[0] here used to
    # raise IndexError.
    if not all_boxes:
        return _empty_result()

    merged_boxes = np.vstack(all_boxes)
    merged_scores = np.concatenate(all_scores)

    # Apply NMS (uses the `nms` imported at the top of this cell)
    keep_indices = nms(
        torch.tensor(merged_boxes, dtype=torch.float32),
        torch.tensor(merged_scores, dtype=torch.float32),
        iou_threshold=0.5,
    ).numpy()

    # STRICT FILTERING: Normalize and keep only training-vocabulary terms
    kept_rows = []
    final_terms_normalized = []
    for src in keep_indices:
        normalized = strict_vocab.normalize_term(all_terms[src])
        if normalized in strict_vocab.term_set:
            kept_rows.append(src)
            final_terms_normalized.append(normalized)

    if not kept_rows:
        return _empty_result()

    final_boxes = merged_boxes[kept_rows]
    final_scores = merged_scores[kept_rows]
    final_labels = np.array([base_vocabulary.index(term) for term in final_terms_normalized])

    return final_boxes, final_scores, final_labels, final_terms_normalized

print("✅ Strict ensemble detection ready")

## 4. Load Pre-trained ChangeFormer Model

- **Problem**: Training from scratch takes time and may not be optimal
- **Solution**: Load pre-trained ChangeFormer model

In [None]:
# Load pre-trained ChangeFormer
import timm
from torch.nn import MultiheadAttention

class ChangeFormer(nn.Module):
    """
    Pairwise change classifier: one shared image encoder, bidirectional
    cross-attention between the two images' features, and an MLP head that
    emits a single logit per image pair.

    Args:
        backbone: timm model name passed to ``timm.create_model``.
        num_heads: Number of heads in each cross-attention layer.
        hidden_dim: Width of the first fusion layer (the second is half of it).
    """
    def __init__(self, backbone='vit_base_patch16_224', num_heads=8, hidden_dim=256):
        super().__init__()
        # The same encoder instance processes both images (shared weights).
        # pretrained=True is requested on every construction.
        self.encoder = timm.create_model(backbone, pretrained=True, num_classes=0)
        embed_dim = self.encoder.num_features
        
        # One attention layer per direction: image 1 attending to image 2 and
        # the reverse.
        self.cross_attn_1to2 = MultiheadAttention(
            embed_dim=embed_dim, num_heads=num_heads, batch_first=True
        )
        self.cross_attn_2to1 = MultiheadAttention(
            embed_dim=embed_dim, num_heads=num_heads, batch_first=True
        )
        
        # Input is the concatenation of four pooled vectors, hence embed_dim * 4.
        self.fusion = nn.Sequential(
            nn.Linear(embed_dim * 4, hidden_dim),
            nn.LayerNorm(hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(0.1)
        )
        
        self.change_head = nn.Linear(hidden_dim // 2, 1)
    
    def forward(self, img1, img2):
        """
        Return one raw change logit per image pair (no sigmoid is applied here).

        NOTE(review): assumes ``forward_features`` returns a token sequence of
        shape (batch, tokens, embed_dim), as the batch_first attention and the
        ``mean(dim=1)`` pooling below require - confirm for the timm version in use.
        """
        feat1 = self.encoder.forward_features(img1)
        feat2 = self.encoder.forward_features(img2)
        
        # Query comes from one image, key/value from the other.
        attn_1to2, _ = self.cross_attn_1to2(feat1, feat2, feat2)
        attn_2to1, _ = self.cross_attn_2to1(feat2, feat1, feat1)
        
        # Average over dim 1 to get one vector per image / attention output.
        feat1_pool = feat1.mean(dim=1)
        feat2_pool = feat2.mean(dim=1)
        attn_1to2_pool = attn_1to2.mean(dim=1)
        attn_2to1_pool = attn_2to1.mean(dim=1)
        
        combined = torch.cat([feat1_pool, feat2_pool, attn_1to2_pool, attn_2to1_pool], dim=1)
        fused = self.fusion(combined)
        change_logits = self.change_head(fused)
        
        # Drop the trailing size-1 dimension from the linear head.
        return change_logits.squeeze(-1)

# Try to load pre-trained ChangeFormer
# The checkpoint path is relative to the notebook's working directory.
changeformer_path = 'changeformer_model.pth'
if os.path.exists(changeformer_path):
    print(f"Loading pre-trained ChangeFormer from {changeformer_path}...")
    # ChangeFormer() requests pretrained backbone weights in its constructor
    # before the checkpoint below overwrites the parameters.
    changeformer_model = ChangeFormer()
    # NOTE(review): torch.load unpickles the file; only load checkpoints from
    # a trusted source (consider weights_only=True if the installed torch
    # supports it).
    changeformer_model.load_state_dict(torch.load(changeformer_path, map_location='cpu'))
    changeformer_model = changeformer_model.to(device)
    changeformer_model.eval()
    print("✅ Pre-trained ChangeFormer loaded")
else:
    print(f"⚠️ Pre-trained ChangeFormer not found at {changeformer_path}")
    print("Creating new ChangeFormer model...")
    # Fusion and head layers are randomly initialised in this branch, and the
    # model is left in training mode (the calibrator calls .eval() before use).
    changeformer_model = ChangeFormer().to(device)
    print("✅ New ChangeFormer created (will need training)")

print(f"Model parameters: {sum(p.numel() for p in changeformer_model.parameters()):,}")

## 5. Threshold Calibration with Cross-Validation

- **Problem**: Fixed thresholds don't work well across different scenarios
- **Solution**: Cross-validation to find optimal thresholds for F1 scores

In [None]:
# Threshold calibration with cross-validation
class ThresholdCalibrator:
    """
    Grid-search the detection-confidence, IoU-matching and change-score
    thresholds that maximise mean per-sample F1 on a validation subset.

    Model inference does not depend on the thresholds, so it is run once per
    validation sample and the cached raw outputs are re-thresholded for every
    grid point.

    Note: ``n_splits`` is stored but no k-fold split is performed here.
    """
    def __init__(self, n_splits=5):
        self.n_splits = n_splits
        self.best_thresholds = {
            'iou_match': 0.5,
            'change_score': 0.3,
            'detection_conf': 0.1
        }

    def calibrate_on_validation_set(self, val_df, vocab_terms, max_samples=50):
        """
        Calibrate thresholds using validation data

        Args:
            val_df: DataFrame with 'img_id', 'added_objs', 'removed_objs' and
                'changed_objs' columns.
            vocab_terms: Forwarded to ``detect_ensemble_advanced``.
            max_samples: Upper bound on the number of rows sampled from ``val_df``.

        Returns:
            The best thresholds dict (also stored on ``self.best_thresholds``).
            With no samples the current thresholds are returned unchanged.
        """
        n_samples = min(len(val_df), max_samples)
        print(f"🎯 Calibrating thresholds using {n_samples} validation samples...")

        if n_samples <= 0:
            # Averaging an empty score list would give NaN for every grid point.
            print("⚠️ No validation samples available; keeping current thresholds")
            return self.best_thresholds

        # Use subset for speed
        val_subset = val_df.sample(n_samples, random_state=42)

        # Run the detectors and ChangeFormer ONCE per sample.
        samples = []
        for _, row in val_subset.iterrows():
            raw = self._compute_raw_predictions(row['img_id'], vocab_terms)
            truth = {
                'added': set(self._normalize_labels(row['added_objs'])),
                'removed': set(self._normalize_labels(row['removed_objs'])),
                'changed': set(self._normalize_labels(row['changed_objs'])),
            }
            samples.append((raw, truth))

        # Threshold ranges to test
        iou_thresholds = [0.3, 0.4, 0.5, 0.6, 0.7]
        change_thresholds = [0.1, 0.2, 0.3, 0.4, 0.5]
        conf_thresholds = [0.05, 0.1, 0.15, 0.2, 0.25]

        best_f1 = 0
        best_thresholds = self.best_thresholds.copy()

        # Grid search over cached raw predictions
        for iou_thresh in iou_thresholds:
            for change_thresh in change_thresholds:
                for conf_thresh in conf_thresholds:
                    f1_scores = []

                    for raw, truth in samples:
                        pred = self._apply_thresholds(raw, iou_thresh, change_thresh, conf_thresh)

                        # Unweighted mean of the three per-category F1 scores
                        f1_added = self._calculate_f1(truth['added'], set(pred['added']))
                        f1_removed = self._calculate_f1(truth['removed'], set(pred['removed']))
                        f1_changed = self._calculate_f1(truth['changed'], set(pred['changed']))
                        f1_scores.append((f1_added + f1_removed + f1_changed) / 3)

                    # Average F1 across samples
                    mean_f1 = np.mean(f1_scores)

                    if mean_f1 > best_f1:
                        best_f1 = mean_f1
                        best_thresholds = {
                            'iou_match': iou_thresh,
                            'change_score': change_thresh,
                            'detection_conf': conf_thresh
                        }
                        print(f"🆕 New best F1: {best_f1:.4f} with thresholds: {best_thresholds}")

        self.best_thresholds = best_thresholds
        print(f"\n✅ Calibration complete! Best F1: {best_f1:.4f}")
        print(f"Optimal thresholds: {self.best_thresholds}")

        return self.best_thresholds

    def _compute_raw_predictions(self, img_id, vocab_terms):
        """Run detection on both images and the change model once; no thresholds applied."""
        img1_path = os.path.join(data_dir, 'data/data', f'{img_id}_1.png')
        img2_path = os.path.join(data_dir, 'data/data', f'{img_id}_2.png')

        # Detect objects
        detections1 = detect_ensemble_advanced(img1_path, vocab_terms)
        detections2 = detect_ensemble_advanced(img2_path, vocab_terms)

        # Change detection on the un-enhanced images at the ViT input size
        transform = T.Compose([
            T.Resize((224, 224)),
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        with Image.open(img1_path) as im1, Image.open(img2_path) as im2:
            img1_tensor = transform(im1.convert('RGB')).unsqueeze(0).to(device)
            img2_tensor = transform(im2.convert('RGB')).unsqueeze(0).to(device)

        changeformer_model.eval()
        with torch.no_grad():
            change_score = torch.sigmoid(changeformer_model(img1_tensor, img2_tensor)).item()

        return {'img1': detections1, 'img2': detections2, 'change_score': change_score}

    def _filter_by_confidence(self, boxes, scores, labels, terms, conf_thresh):
        """Keep detections with score >= conf_thresh; inputs are not modified."""
        if len(scores) == 0:
            return boxes, labels, list(terms)
        keep = np.asarray(scores) >= conf_thresh
        return boxes[keep], labels[keep], [t for t, k in zip(terms, keep) if k]

    def _apply_thresholds(self, raw, iou_thresh, change_thresh, conf_thresh):
        """
        Turn cached raw predictions into added/removed/changed term lists.

        Args:
            raw: Dict from ``_compute_raw_predictions`` with 'img1' and 'img2'
                tuples ``(boxes, scores, labels, terms)`` and a 'change_score'.

        Returns:
            Dict with sorted, de-duplicated 'added', 'removed' and 'changed'
            lists plus 'change_score'.
        """
        change_score = raw['change_score']

        # If no significant change, return empty
        if change_score < change_thresh:
            return {'added': [], 'removed': [], 'changed': [], 'change_score': change_score}

        boxes1, labels1, terms1 = self._filter_by_confidence(*raw['img1'], conf_thresh)
        boxes2, labels2, terms2 = self._filter_by_confidence(*raw['img2'], conf_thresh)

        # Object matching with calibrated IoU threshold
        matched_pairs = self._match_objects(boxes1, labels1, boxes2, labels2, iou_thresh)
        matched1 = {i for i, _ in matched_pairs}
        matched2 = {j for _, j in matched_pairs}

        # The detector's own term strings are used here. They are the
        # vocabulary entries the numeric labels point to, so this does not
        # depend on the caller passing the matching vocabulary list.
        added = [terms2[j] for j in range(len(terms2)) if j not in matched2]
        removed = [terms1[i] for i in range(len(terms1)) if i not in matched1]

        # NOTE(review): _match_objects only returns pairs whose IoU exceeds
        # iou_thresh, so this condition never holds and 'changed' is always
        # empty. Left as-is pending a decision on what "changed" should mean.
        changed = [terms1[i] for i, j in matched_pairs
                   if self._compute_iou(boxes1[i], boxes2[j]) < iou_thresh]

        return {
            'added': sorted(set(added)),
            'removed': sorted(set(removed)),
            'changed': sorted(set(changed)),
            'change_score': change_score
        }

    def _predict_with_thresholds(self, img_id, vocab_terms, iou_thresh, change_thresh, conf_thresh):
        """Make prediction with specific thresholds (runs inference, then thresholds)."""
        raw = self._compute_raw_predictions(img_id, vocab_terms)
        return self._apply_thresholds(raw, iou_thresh, change_thresh, conf_thresh)

    def _match_objects(self, boxes1, labels1, boxes2, labels2, iou_thresh):
        """
        Match same-label boxes across images using the Hungarian algorithm.

        Returns:
            List of ``(i, j)`` index pairs whose IoU is strictly above ``iou_thresh``.
        """
        if len(boxes1) == 0 or len(boxes2) == 0:
            return []

        from scipy.optimize import linear_sum_assignment

        # Cost is 1 - IoU for same-label pairs and 1 (worst) otherwise.
        cost_matrix = np.ones((len(boxes1), len(boxes2)))
        for i in range(len(boxes1)):
            for j in range(len(boxes2)):
                if labels1[i] == labels2[j]:
                    cost_matrix[i, j] = 1 - self._compute_iou(boxes1[i], boxes2[j])

        row_ind, col_ind = linear_sum_assignment(cost_matrix)
        return [(int(i), int(j)) for i, j in zip(row_ind, col_ind)
                if cost_matrix[i, j] < (1 - iou_thresh)]

    def _compute_iou(self, boxA, boxB):
        """Compute IoU of two ``[x1, y1, x2, y2]`` boxes; 0.0 when the union is empty."""
        xA = max(boxA[0], boxB[0])
        yA = max(boxA[1], boxB[1])
        xB = min(boxA[2], boxB[2])
        yB = min(boxA[3], boxB[3])
        interArea = max(0, xB - xA) * max(0, yB - yA)
        boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
        boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
        union = boxAArea + boxBArea - interArea
        if union == 0:
            return 0.0
        return interArea / float(union)

    def _normalize_labels(self, label_str):
        """
        Split a raw label cell into lower-cased tokens.

        Non-string values (NaN, None, numbers) and the placeholders
        '', 'none', 'null', 'nan' yield an empty list, matching the
        vocabulary-extraction cell.
        """
        if not isinstance(label_str, str):
            return []
        cleaned = label_str.strip().lower()
        if cleaned in ('', 'none', 'null', 'nan'):
            return []
        tokens = re.split(r'[,&\s]+', cleaned)
        return [token for token in tokens if token and token != 'none']

    def _calculate_f1(self, true_set, pred_set):
        """
        Calculate F1 score for a set comparison.

        Two empty sets count as a perfect match (1.0); exactly one empty set
        scores 0.0.
        """
        if len(true_set) == 0 and len(pred_set) == 0:
            return 1.0
        if len(true_set) == 0 or len(pred_set) == 0:
            return 0.0

        true_pos = len(true_set & pred_set)
        if true_pos == 0:
            return 0.0

        precision = true_pos / len(pred_set)
        recall = true_pos / len(true_set)
        return 2 * (precision * recall) / (precision + recall)

# Initialize calibrator
# Starts with the default thresholds set in __init__; the pipeline below reads
# calibrator.best_thresholds.
calibrator = ThresholdCalibrator(n_splits=5)

print("✅ Threshold calibrator ready")

## 6. Complete Ultra-Advanced Pipeline

**Integration**: Enhanced detection + calibrated thresholds + ChangeFormer

In [None]:
# Complete ultra-advanced pipeline
def _filter_detections(boxes, scores, labels, terms, conf_thresh):
    """Drop detections whose confidence is below ``conf_thresh``.

    Returns the filtered ``(boxes, scores, labels, terms)``. The first three are
    NumPy arrays; ``terms`` is a plain list kept aligned with them.
    """
    # A single detection may arrive as a scalar / 1-D value; normalise the shapes.
    boxes = np.atleast_2d(boxes)
    scores = np.atleast_1d(scores)
    labels = np.atleast_1d(labels)
    if boxes.size == 0:
        # np.atleast_2d([]) has shape (1, 0), i.e. one phantom row. Use (0, 4)
        # so "no detections" really means zero boxes for the matching step.
        boxes = boxes.reshape(0, 4)
    if len(scores) == 0:
        return boxes, scores, labels, list(terms)

    keep = scores >= conf_thresh
    kept_terms = [terms[i] for i in np.flatnonzero(keep)]
    return boxes[keep], scores[keep], labels[keep], kept_terms


def _load_image_tensor(image_path, transform):
    """Open an image, convert it to RGB and return a batched tensor on ``device``."""
    # The context manager closes the file handle as soon as the pixels are read.
    with Image.open(image_path) as image:
        return transform(image.convert('RGB')).unsqueeze(0).to(device)


def ultra_advanced_pipeline(img_id, vocab_terms):
    """
    Complete ultra-advanced pipeline with all improvements:
    1. Enhanced image preprocessing
    2. Ensemble detection (Grounding DINO + OWL-ViT)
    3. Rich vocabulary with synonyms
    4. Calibrated thresholds
    5. ChangeFormer for change localization

    Args:
        img_id: Image pair identifier; the files ``{img_id}_1.png`` and
            ``{img_id}_2.png`` are read from ``data_dir/data/data``.
        vocab_terms: Sequence of vocabulary terms. Detection labels are used as
            integer indices into this sequence.

    Returns:
        Dictionary with predictions. Keys: ``added``, ``removed``, ``changed``
        (sorted lists of unique vocabulary terms), ``change_score`` (float),
        ``objects_img1`` / ``objects_img2`` (detected terms after confidence
        filtering) and ``matched_pairs`` (int, ``0`` when the change score is
        below the calibrated threshold and matching is skipped).
    """

    img1_path = os.path.join(data_dir, 'data/data', f'{img_id}_1.png')
    img2_path = os.path.join(data_dir, 'data/data', f'{img_id}_2.png')

    # Step 1: Enhanced object detection
    print("  1️⃣ Enhanced ensemble detection...")
    boxes1, scores1, labels1, terms1 = detect_ensemble_advanced(img1_path, vocab_terms, use_enhancement=True)
    boxes2, scores2, labels2, terms2 = detect_ensemble_advanced(img2_path, vocab_terms, use_enhancement=True)

    print(f"     Image 1: {len(terms1)} objects detected")
    print(f"     Image 2: {len(terms2)} objects detected")

    # Step 2: Filter by calibrated confidence threshold
    conf_thresh = calibrator.best_thresholds['detection_conf']
    boxes1, scores1, labels1, terms1 = _filter_detections(boxes1, scores1, labels1, terms1, conf_thresh)
    boxes2, scores2, labels2, terms2 = _filter_detections(boxes2, scores2, labels2, terms2, conf_thresh)

    print(f"     After confidence filtering: {len(terms1)} / {len(terms2)} objects")

    # Step 3: ChangeFormer change detection
    print("  2️⃣ ChangeFormer analysis...")
    transform = T.Compose([
        T.Resize((224, 224)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    img1_tensor = _load_image_tensor(img1_path, transform)
    img2_tensor = _load_image_tensor(img2_path, transform)

    changeformer_model.eval()
    with torch.no_grad():
        change_score = torch.sigmoid(changeformer_model(img1_tensor, img2_tensor)).item()

    print(f"     Change score: {change_score:.4f}")

    # Step 4: Check if significant change detected
    change_thresh = calibrator.best_thresholds['change_score']
    if change_score < change_thresh:
        print(f"     No significant change detected (threshold: {change_thresh:.2f})")
        # Same keys as the full result so callers can read 'matched_pairs'
        # without special-casing the "no change" path.
        return {
            'added': [], 'removed': [], 'changed': [],
            'change_score': change_score,
            'objects_img1': terms1, 'objects_img2': terms2,
            'matched_pairs': 0
        }

    # Step 5: Object matching with calibrated IoU threshold
    print("  3️⃣ Object matching...")
    iou_thresh = calibrator.best_thresholds['iou_match']

    matched_pairs = calibrator._match_objects(boxes1, labels1, boxes2, labels2, iou_thresh)
    # Index sets give O(1) "was this detection matched?" lookups.
    matched_in_img1 = {i for i, _ in matched_pairs}
    matched_in_img2 = {j for _, j in matched_pairs}

    print(f"     Matched pairs: {len(matched_pairs)}")

    # Step 6: Classify changes
    # Unmatched detections in image 2 are additions; unmatched in image 1 are removals.
    added = [vocab_terms[int(labels2[j])] for j in range(len(labels2))
             if j not in matched_in_img2]

    removed = [vocab_terms[int(labels1[i])] for i in range(len(labels1))
               if i not in matched_in_img1]

    # NOTE(review): matching above uses the same iou_thresh; if _match_objects only
    # returns pairs with IoU >= iou_thresh this list is always empty — confirm the
    # intended definition of "changed".
    changed = [vocab_terms[int(labels1[i])] for i, j in matched_pairs
               if calibrator._compute_iou(boxes1[i], boxes2[j]) < iou_thresh]

    # sorted(set(...)) de-duplicates and keeps the output order reproducible.
    result = {
        'added': sorted(set(added)),
        'removed': sorted(set(removed)),
        'changed': sorted(set(changed)),
        'change_score': change_score,
        'objects_img1': terms1,
        'objects_img2': terms2,
        'matched_pairs': len(matched_pairs)
    }

    print(f"     Added: {result['added']}")
    print(f"     Removed: {result['removed']}")
    print(f"     Changed: {result['changed']}")

    return result

print("✅ Ultra-advanced pipeline ready")

## 7. Calibration & Testing

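**First**: Calibrate thresholds on a small validation subset sampled from the training data.

**Then**: Run the pipeline on a few labelled training samples and compare the predictions with the ground truth.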

In [None]:
# Calibrate thresholds using validation data
print("🎯 Starting threshold calibration...")
print("This may take several minutes...")

# Use a subset of training data as validation for calibration.
# The sample size is capped at the dataset size so that a small train.csv
# does not make DataFrame.sample raise ValueError.
val_df = train_df.sample(min(30, len(train_df)), random_state=42)  # Small subset for speed

optimal_thresholds = calibrator.calibrate_on_validation_set(
    val_df, base_vocabulary, max_samples=20
)

print("\n✅ Calibration complete!")
print(f"Optimal thresholds: {optimal_thresholds}")

In [None]:
# Test ultra-advanced pipeline on validation samples
print("\n🧪 Testing ultra-advanced pipeline on validation samples...")
print("="*80)

# Cap the sample size so a small training set does not raise in DataFrame.sample.
test_samples = train_df.sample(min(5, len(train_df)), random_state=123)

for _, row in test_samples.iterrows():
    img_id = row['img_id']

    print(f"\n📷 Image: {img_id}")
    print("-"*50)

    # Ground truth
    print("Ground Truth:")
    print(f"  Added: {row['added_objs']}")
    print(f"  Removed: {row['removed_objs']}")
    print(f"  Changed: {row['changed_objs']}")

    # Predictions
    # The progress message lives inside the loop: `img_id` does not exist before
    # the first iteration, so printing it earlier fails on a fresh kernel.
    print(f"\n🚀 Processing image {img_id} with ultra-advanced pipeline...")
    result = ultra_advanced_pipeline(img_id, base_vocabulary)
    print("\nPredictions (Ultra-Advanced Pipeline):")
    print(f"  Added: {result['added']}")
    print(f"  Removed: {result['removed']}")
    print(f"  Changed: {result['changed']}")
    print(f"  Change score: {result['change_score']:.4f}")
    print(f"  Objects detected: {len(result['objects_img1'])} / {len(result['objects_img2'])}")

print("\n" + "="*80)

## 8. Generate Final Submission

**Using calibrated thresholds and ultra-advanced pipeline**

In [None]:
# Generate predictions for test set
print("🚀 Generating final predictions for test set...")
print("Using ultra-advanced pipeline with calibrated thresholds")

submission = []
for img_id in tqdm(test_df['img_id'], desc='Processing test images'):
    result = ultra_advanced_pipeline(img_id, base_vocabulary)

    # Each category is serialised as space-separated terms, or 'none' when empty.
    added, removed, changed = (
        ' '.join(result[key]) if result[key] else 'none'
        for key in ('added', 'removed', 'changed')
    )

    submission.append(dict(
        img_id=img_id,
        added_objs=added,
        removed_objs=removed,
        changed_objs=changed,
    ))

submission_path = 'submission_ultra_advanced.csv'
submission_df = pd.DataFrame(submission)
submission_df.to_csv(submission_path, index=False)

print(f"\n✅ Final submission saved to {submission_path}")
print(f"Total predictions: {len(submission_df)}")
display(submission_df.head(10))

## 9. Comprehensive Error Analysis

**Goal**: Understand pipeline performance, identify failure modes, and find improvement opportunities

In [None]:
# Error Analysis on Validation Set
print("🔍 COMPREHENSIVE ERROR ANALYSIS")
print("="*80)

ERROR_CATEGORIES = ('added', 'removed', 'changed')


def calculate_metrics(true_set, pred_set):
    """Return TP/FP/FN counts plus precision, recall and F1 for one category.

    F1 is delegated to ``calibrator._calculate_f1`` so this analysis scores
    samples exactly like threshold calibration does (in particular, an empty
    ground truth matched by an empty prediction counts as F1 = 1.0).
    """
    tp = len(true_set & pred_set)
    fp = len(pred_set - true_set)
    fn = len(true_set - pred_set)

    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = calibrator._calculate_f1(true_set, pred_set)

    return {
        'tp': tp, 'fp': fp, 'fn': fn,
        'precision': precision, 'recall': recall, 'f1': f1
    }


# Select a larger validation set for thorough analysis.
# Rows used for threshold calibration (val_df) are excluded so the reported
# metrics are not optimistically biased; fall back to the full training set
# only when excluding them would leave nothing to analyze.
holdout_df = train_df.drop(index=val_df.index, errors='ignore')
if holdout_df.empty:
    holdout_df = train_df
error_analysis_df = holdout_df.sample(min(50, len(holdout_df)), random_state=42)

# Storage for analysis
error_results = []
failed_img_ids = []
detection_stats = {
    'total_samples': 0,
    'perfect_matches': 0,
    'partial_matches': 0,
    'complete_misses': 0,
    'false_positives': 0,
    'false_negatives': 0,
    'category_errors': {'added': [], 'removed': [], 'changed': []}
}
RESULT_TYPE_COUNTERS = {
    'perfect': 'perfect_matches',
    'partial': 'partial_matches',
    'miss': 'complete_misses',
}

print(f"\n📊 Analyzing {len(error_analysis_df)} validation samples...")
print("This may take several minutes...\n")

for _, row in tqdm(error_analysis_df.iterrows(), total=len(error_analysis_df), desc="Error analysis"):
    img_id = row['img_id']

    try:
        # Get predictions
        result = ultra_advanced_pipeline(img_id, base_vocabulary)

        # Ground truth and predictions, normalized the same way. Predictions are
        # space-joined first (as in the submission file) so multi-word or
        # mixed-case vocabulary terms are tokenized like the labels.
        truth = {cat: set(calibrator._normalize_labels(row[f'{cat}_objs']))
                 for cat in ERROR_CATEGORIES}
        preds = {cat: set(calibrator._normalize_labels(' '.join(result[cat])))
                 for cat in ERROR_CATEGORIES}

        # Calculate metrics per category
        metrics = {cat: calculate_metrics(truth[cat], preds[cat]) for cat in ERROR_CATEGORIES}

        # Overall F1
        avg_f1 = (metrics['added']['f1'] + metrics['removed']['f1'] + metrics['changed']['f1']) / 3

        # Classify result
        if avg_f1 == 1.0:
            result_type = 'perfect'
        elif avg_f1 >= 0.5:
            result_type = 'partial'
        else:
            result_type = 'miss'

        # Build the detailed record (column order is preserved in the CSV export)
        record = {
            'img_id': img_id,
            'result_type': result_type,
            'avg_f1': avg_f1,
            'change_score': result['change_score'],
        }
        for cat in ERROR_CATEGORIES:
            record[f'true_{cat}'] = truth[cat]
        for cat in ERROR_CATEGORIES:
            record[f'pred_{cat}'] = preds[cat]
        for cat in ERROR_CATEGORIES:
            record[f'f1_{cat}'] = metrics[cat]['f1']
        for cat in ERROR_CATEGORIES:
            for count_key in ('tp', 'fp', 'fn'):
                record[f'{count_key}_{cat}'] = metrics[cat][count_key]
        record['num_objects_img1'] = len(result['objects_img1'])
        record['num_objects_img2'] = len(result['objects_img2'])
        # The pipeline's "no significant change" result may not carry this key.
        record['matched_pairs'] = result.get('matched_pairs', 0)

    except Exception as e:
        # Best effort: report the failure and keep analyzing the other samples.
        print(f"Error processing {img_id}: {e}")
        failed_img_ids.append(img_id)
        continue

    # Update the counters only after the whole sample was processed, so a
    # failure part-way through cannot leave the statistics inconsistent.
    error_results.append(record)
    detection_stats[RESULT_TYPE_COUNTERS[result_type]] += 1
    for cat in ERROR_CATEGORIES:
        detection_stats['category_errors'][cat].append(metrics[cat]['f1'])
        detection_stats['false_positives'] += metrics[cat]['fp']
        detection_stats['false_negatives'] += metrics[cat]['fn']
    detection_stats['total_samples'] += 1

if failed_img_ids:
    print(f"\n⚠️ {len(failed_img_ids)} samples failed and were excluded from the analysis")

# Convert to DataFrame for analysis
error_df = pd.DataFrame(error_results)

print("\n" + "="*80)
print("📈 ERROR ANALYSIS RESULTS")
print("="*80)

In [None]:
# 1. Overall Performance Summary
print("\n1️⃣ OVERALL PERFORMANCE SUMMARY")
print("-"*50)

total = detection_stats['total_samples']
if total > 0:
    print(f"Total samples analyzed: {total}")
    print("\nPerformance breakdown:")
    # (label, counter key) pairs rendered as "<label>: <count> (<share>%)".
    breakdown_rows = (
        ("  ✅ Perfect matches (F1=1.0)", 'perfect_matches'),
        ("  ⚠️ Partial matches (F1≥0.5)", 'partial_matches'),
        ("  ❌ Complete misses (F1<0.5)", 'complete_misses'),
    )
    for row_label, counter_key in breakdown_rows:
        count = detection_stats[counter_key]
        print(f"{row_label}: {count} ({100*count/total:.1f}%)")

    print("\nAverage F1 scores by category:")
    for category in ('added', 'removed', 'changed'):
        scores = detection_stats['category_errors'][category]
        if not scores:
            continue
        print(f"  {category.capitalize()}: {np.mean(scores):.3f}")

    # Per-sample F1 (mean over the three categories) across all analyzed samples.
    sample_f1 = error_df['avg_f1']
    print(f"\nOverall average F1: {sample_f1.mean():.3f}")
    print(f"Median F1: {sample_f1.median():.3f}")
    print(f"Std deviation: {sample_f1.std():.3f}")

In [None]:
# 2. Category-wise Error Analysis
print("\n2️⃣ CATEGORY-WISE ERROR BREAKDOWN")
print("-"*50)

# Counts are pooled over all analyzed samples (micro-averaged) per category.
categories = ['added', 'removed', 'changed']
for cat in categories:
    print(f"\n{cat.upper()} Objects:")
    tp = error_df[f'tp_{cat}'].sum()
    fp = error_df[f'fp_{cat}'].sum()
    fn = error_df[f'fn_{cat}'].sum()

    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    print(f"  True Positives: {tp}")
    print(f"  False Positives: {fp}")
    print(f"  False Negatives: {fn}")
    print(f"  Precision: {precision:.3f}")
    print(f"  Recall: {recall:.3f}")
    print(f"  F1 Score: {f1:.3f}")

    if fp > 0:
        # FP / (TP + FP) is the false discovery rate (1 - precision); a true
        # false positive rate would need true negatives, which are not tracked.
        print(f"  ⚠️ False discovery rate: {fp/(tp+fp):.1%}")
    if fn > 0:
        # FN / (TP + FN) is the miss rate (1 - recall).
        print(f"  ⚠️ False negative rate: {fn/(tp+fn):.1%}")

In [None]:
# 3. Detection Quality Analysis
print("\n3️⃣ OBJECT DETECTION QUALITY")
print("-"*50)

objects_img1 = error_df['num_objects_img1']
objects_img2 = error_df['num_objects_img2']

print("Average objects detected per image:")
print(f"  Image 1: {objects_img1.mean():.1f} objects")
print(f"  Image 2: {objects_img2.mean():.1f} objects")
print(f"  Average matched pairs: {error_df['matched_pairs'].mean():.1f}")

# A sample falls in a bucket when EITHER image of the pair does, so the
# buckets can overlap.
none_detected = (objects_img1 == 0) | (objects_img2 == 0)
few_detected = objects_img1.between(1, 5) | objects_img2.between(1, 5)
many_detected = (objects_img1 > 5) | (objects_img2 > 5)

print("\nDetection distribution:")
print(f"  Images with 0 objects: {none_detected.sum()}")
print(f"  Images with 1-5 objects: {few_detected.sum()}")
print(f"  Images with 6+ objects: {many_detected.sum()}")

# Change score analysis
change_scores = error_df['change_score']
print("\nChange score statistics:")
for stat_label, stat_value in (
    ('Mean', change_scores.mean()),
    ('Median', change_scores.median()),
    ('Min', change_scores.min()),
    ('Max', change_scores.max()),
):
    print(f"  {stat_label}: {stat_value:.3f}")

# Correlation between change score and F1
correlation = error_df[['change_score', 'avg_f1']].corr().loc['change_score', 'avg_f1']
print(f"  Correlation with F1 score: {correlation:.3f}")

In [None]:
# 4. Failure Mode Analysis
print("\n4️⃣ FAILURE MODE ANALYSIS")
print("-"*50)

# Identify worst performing samples
worst_samples = error_df.nsmallest(10, 'avg_f1')
print("\nTop 10 worst performing samples:")
print(f"{'Img ID':<12} {'F1':<8} {'Change':<8} {'Added':<8} {'Removed':<8} {'Changed':<8}")
print("-"*60)
for _, sample in worst_samples.iterrows():
    table_line = (
        f"{sample['img_id']:<12} {sample['avg_f1']:<8.3f} {sample['change_score']:<8.3f} "
        f"{sample['f1_added']:<8.3f} {sample['f1_removed']:<8.3f} {sample['f1_changed']:<8.3f}"
    )
    print(table_line)

# Identify common failure patterns
print("\n⚠️ Common failure patterns:")

# Pattern 1: High false positives (two or more in a single sample)
high_fp_added = error_df.loc[error_df['fp_added'].ge(2)]
high_fp_removed = error_df.loc[error_df['fp_removed'].ge(2)]
high_fp_changed = error_df.loc[error_df['fp_changed'].ge(2)]
for cat_name, subset in (('added', high_fp_added),
                         ('removed', high_fp_removed),
                         ('changed', high_fp_changed)):
    print(f"  • High false positives in '{cat_name}': {len(subset)} samples")

# Pattern 2: High false negatives (two or more in a single sample)
high_fn_added = error_df.loc[error_df['fn_added'].ge(2)]
high_fn_removed = error_df.loc[error_df['fn_removed'].ge(2)]
high_fn_changed = error_df.loc[error_df['fn_changed'].ge(2)]
for cat_name, subset in (('added', high_fn_added),
                         ('removed', high_fn_removed),
                         ('changed', high_fn_changed)):
    print(f"  • High false negatives in '{cat_name}': {len(subset)} samples")

# Pattern 3: Low change score but has changes
below_change_threshold = error_df['change_score'] < calibrator.best_thresholds['change_score']
has_true_change = (
    (error_df['true_added'].map(len) > 0)
    | (error_df['true_removed'].map(len) > 0)
    | (error_df['true_changed'].map(len) > 0)
)
low_change_but_has_changes = error_df[below_change_threshold & has_true_change]
print(f"  • Low change score but has actual changes: {len(low_change_but_has_changes)} samples")

# Pattern 4: Few objects detected (at most two in both images)
sparse_img1 = error_df['num_objects_img1'].le(2)
sparse_img2 = error_df['num_objects_img2'].le(2)
few_objects = error_df[sparse_img1 & sparse_img2]
print(f"  • Very few objects detected (≤2): {len(few_objects)} samples")

In [None]:
# 5. Detailed Case Studies
print("\n5️⃣ DETAILED CASE STUDIES")
print("-"*50)


def _print_true_vs_pred(sample):
    """Print the true and predicted label sets of one sample, category by category."""
    for cat in ('added', 'removed', 'changed'):
        print(f"   True {cat}: {sample[f'true_{cat}']}")
        print(f"   Pred {cat}: {sample[f'pred_{cat}']}")


# Case Study 1: Perfect Match
perfect = error_df.loc[error_df['avg_f1'].eq(1.0)].head(1)
if not perfect.empty:
    row = perfect.iloc[0]
    print(f"\n✅ PERFECT MATCH EXAMPLE: {row['img_id']}")
    print(f"   F1 Score: {row['avg_f1']:.3f}")
    print(f"   Change score: {row['change_score']:.3f}")
    _print_true_vs_pred(row)

# Case Study 2: Complete Miss
miss = error_df.loc[error_df['result_type'].eq('miss')].head(1)
if not miss.empty:
    row = miss.iloc[0]
    print(f"\n❌ COMPLETE MISS EXAMPLE: {row['img_id']}")
    print(f"   F1 Score: {row['avg_f1']:.3f}")
    print(f"   Change score: {row['change_score']:.3f}")
    _print_true_vs_pred(row)
    print(f"   Objects detected: img1={row['num_objects_img1']}, img2={row['num_objects_img2']}")
    print(f"   Matched pairs: {row['matched_pairs']}")

# Case Study 3: High False Positives (three or more across all categories)
fp_per_sample = error_df['fp_added'] + error_df['fp_removed'] + error_df['fp_changed']
high_fp = error_df[fp_per_sample >= 3].head(1)
if not high_fp.empty:
    row = high_fp.iloc[0]
    print(f"\n⚠️ HIGH FALSE POSITIVES EXAMPLE: {row['img_id']}")
    print(f"   F1 Score: {row['avg_f1']:.3f}")
    print(f"   Total FP: {row['fp_added'] + row['fp_removed'] + row['fp_changed']}")
    for cat in ('added', 'removed', 'changed'):
        # Set difference: terms that were predicted but are not in the ground truth.
        spurious = row[f'pred_{cat}'] - row[f'true_{cat}']
        print(f"   FP in {cat}: {row[f'fp_{cat}']} - Predicted: {spurious}")

In [None]:
# 6. Visualization of Error Distribution
print("\n6️⃣ ERROR DISTRIBUTION VISUALIZATIONS")
print("-"*50)

fig, axes = plt.subplots(2, 3, figsize=(18, 10))
(ax_f1, ax_category, ax_scatter), (ax_objects, ax_errors, ax_pr) = axes

plot_categories = ['added', 'removed', 'changed']
plot_category_titles = ['Added', 'Removed', 'Changed']

# Plot 1: F1 Score Distribution
mean_f1 = error_df['avg_f1'].mean()
ax_f1.hist(error_df['avg_f1'], bins=20, edgecolor='black', alpha=0.7)
ax_f1.axvline(mean_f1, color='red', linestyle='--', label=f'Mean: {mean_f1:.3f}')
ax_f1.set(title='F1 Score Distribution', xlabel='F1 Score', ylabel='Frequency')
ax_f1.legend()

# Plot 2: Category F1 Comparison
category_f1s = [error_df[f'f1_{cat}'].mean() for cat in plot_categories]
ax_category.bar(plot_category_titles, category_f1s, alpha=0.7, color=['green', 'red', 'blue'])
ax_category.set(title='Average F1 by Category', ylabel='F1 Score', ylim=(0, 1))
for bar_pos, bar_height in enumerate(category_f1s):
    # Value label just above each bar.
    ax_category.text(bar_pos, bar_height + 0.02, f'{bar_height:.3f}', ha='center')

# Plot 3: Change Score vs F1
ax_scatter.scatter(error_df['change_score'], error_df['avg_f1'], alpha=0.5)
ax_scatter.axhline(y=0.5, color='r', linestyle='--', alpha=0.3, label='F1=0.5')
ax_scatter.axvline(x=calibrator.best_thresholds['change_score'], color='g', linestyle='--', alpha=0.3, label='Threshold')
ax_scatter.set(title='Change Score vs F1', xlabel='Change Score', ylabel='F1 Score')
ax_scatter.legend()

# Plot 4: Objects Detected Distribution
for column, series_label in (('num_objects_img1', 'Image 1'), ('num_objects_img2', 'Image 2')):
    ax_objects.hist(error_df[column], bins=15, alpha=0.5, label=series_label, edgecolor='black')
ax_objects.set(title='Objects Detected Distribution', xlabel='Number of Objects', ylabel='Frequency')
ax_objects.legend()

# Plot 5: Error Type Breakdown (one bar group per category)
error_types = ['TP', 'FP', 'FN']
for cat in plot_categories:
    counts = [error_df[f'{prefix}_{cat}'].sum() for prefix in ('tp', 'fp', 'fn')]
    ax_errors.bar([f'{cat}\n{et}' for et in error_types], counts, alpha=0.7)
ax_errors.set(title='Error Type Breakdown by Category', ylabel='Count')
ax_errors.tick_params(axis='x', rotation=45)

# Plot 6: Precision-Recall by Category (from counts pooled over all samples)
precisions = []
recalls = []
for cat in plot_categories:
    tp = error_df[f'tp_{cat}'].sum()
    fp = error_df[f'fp_{cat}'].sum()
    fn = error_df[f'fn_{cat}'].sum()
    precisions.append(tp / (tp + fp) if (tp + fp) > 0 else 0)
    recalls.append(tp / (tp + fn) if (tp + fn) > 0 else 0)

x = np.arange(len(plot_category_titles))
width = 0.35
ax_pr.bar(x - width/2, precisions, width, label='Precision', alpha=0.7)
ax_pr.bar(x + width/2, recalls, width, label='Recall', alpha=0.7)
ax_pr.set(title='Precision vs Recall by Category', ylabel='Score', ylim=(0, 1))
ax_pr.set_xticks(x)
ax_pr.set_xticklabels(plot_category_titles)
ax_pr.legend()

plt.tight_layout()
plt.show()

print("✅ Visualizations complete!")

In [None]:
# 7. Key Insights and Recommendations
# Relies on `correlation` and `low_change_but_has_changes` computed by the
# earlier analysis cells, in addition to `error_df`.
print("\n7️⃣ KEY INSIGHTS & RECOMMENDATIONS")
print("="*80)

insight_categories = ['added', 'removed', 'changed']

# Calculate insights
avg_f1 = error_df['avg_f1'].mean()
category_mean_f1 = {cat: error_df[f'f1_{cat}'].mean() for cat in insight_categories}
best_category = max(insight_categories, key=category_mean_f1.get)
worst_category = min(insight_categories, key=category_mean_f1.get)

total_fp = error_df['fp_added'].sum() + error_df['fp_removed'].sum() + error_df['fp_changed'].sum()
total_fn = error_df['fn_added'].sum() + error_df['fn_removed'].sum() + error_df['fn_changed'].sum()

print("\n📊 PERFORMANCE SUMMARY:")
print(f"  • Overall F1 Score: {avg_f1:.3f}")
print(f"  • Best performing category: {best_category} (F1={category_mean_f1[best_category]:.3f})")
print(f"  • Worst performing category: {worst_category} (F1={category_mean_f1[worst_category]:.3f})")
print(f"  • Total False Positives: {total_fp}")
print(f"  • Total False Negatives: {total_fn}")

if total_fp > total_fn:
    print("  ⚠️ Model is over-predicting (more FP than FN)")
elif total_fn > total_fp:
    print("  ⚠️ Model is under-predicting (more FN than FP)")
else:
    print("  ✅ Model is well-balanced (FP ≈ FN)")

print("\n🔍 KEY INSIGHTS:")

# Insight 1: Detection quality
avg_objects = (error_df['num_objects_img1'].mean() + error_df['num_objects_img2'].mean()) / 2
if avg_objects < 3:
    print(f"  1. Low object detection rate (avg {avg_objects:.1f} objects/image)")
    print("     → May need to lower detection threshold or use more aggressive prompts")
elif avg_objects > 10:
    print(f"  1. High object detection rate (avg {avg_objects:.1f} objects/image)")
    print("     → May need to raise detection threshold to reduce false positives")
else:
    print(f"  1. Reasonable object detection rate (avg {avg_objects:.1f} objects/image)")

# Insight 2: Change detection
if pd.isna(correlation):
    # NaN compares False against every threshold and would otherwise be
    # reported as a "moderate" correlation.
    print("  2. Correlation between change score and F1 is undefined (constant values or too few samples)")
    print("     → Analyze more samples before judging ChangeFormer")
elif correlation < 0.3:
    print(f"  2. Weak correlation between change score and F1 ({correlation:.3f})")
    print("     → ChangeFormer may need fine-tuning or threshold adjustment")
elif correlation > 0.6:
    print(f"  2. Strong correlation between change score and F1 ({correlation:.3f})")
    print("     → ChangeFormer is working well")
else:
    print(f"  2. Moderate correlation between change score and F1 ({correlation:.3f})")

# Insight 3: Category imbalance
category_std = np.std([category_mean_f1[cat] for cat in insight_categories])
if category_std > 0.2:
    print(f"  3. High variance across categories (std={category_std:.3f})")
    print("     → Consider category-specific thresholds or separate models")
else:
    print(f"  3. Consistent performance across categories (std={category_std:.3f})")

print("\n💡 RECOMMENDATIONS FOR IMPROVEMENT:")

# Each entry is (title, bullet points); numbering is assigned when rendering so
# the list is always numbered 1..N without gaps.
recommendation_groups = []

# Recommendation based on FP/FN ratio
if total_fp > total_fn * 1.5:
    recommendation_groups.append(("REDUCE FALSE POSITIVES:", [
        "Increase detection confidence threshold",
        "Increase IoU matching threshold",
        "Use stricter NMS (lower IoU threshold)",
    ]))
elif total_fn > total_fp * 1.5:
    recommendation_groups.append(("REDUCE FALSE NEGATIVES:", [
        "Lower detection confidence threshold",
        "Add more vocabulary terms/synonyms",
        "Try different detection models",
    ]))

# Recommendation based on change score.
# The share is taken over the samples that were actually analyzed (error_df);
# samples that failed during analysis are not part of either count.
analyzed_count = len(error_df)
if len(low_change_but_has_changes) > analyzed_count * 0.2:
    recommendation_groups.append(("IMPROVE CHANGE DETECTION:", [
        "Lower change score threshold",
        "Fine-tune ChangeFormer on your specific data",
        "Use ensemble of change detection models",
    ]))

# Recommendation based on detection quality
if avg_objects < 3:
    recommendation_groups.append(("IMPROVE OBJECT DETECTION:", [
        "Use larger detection models (e.g., owlvit-large)",
        "Increase image enhancement strength",
        "Add more detection models to ensemble",
    ]))

# Recommendation based on worst category
if category_mean_f1[worst_category] < 0.5:
    recommendation_groups.append((f"FOCUS ON '{worst_category.upper()}' CATEGORY:", [
        "Category-specific threshold tuning",
        f"Augment training data for {worst_category} scenarios",
        f"Review vocabulary coverage for {worst_category}",
    ]))

recommendations = []
for number, (title, bullets) in enumerate(recommendation_groups, start=1):
    recommendations.append(f"  {number}. {title}")
    recommendations.extend(f"     • {bullet}" for bullet in bullets)
if not recommendations:
    recommendations.append("  (no specific recommendations triggered by these metrics)")

for rec in recommendations:
    print(rec)

print("\n✅ Error analysis complete!")
print("="*80)

In [None]:
# Save error analysis results
error_analysis_path = 'error_analysis_results.csv'
error_df.to_csv(error_analysis_path, index=False)
print(f"\n💾 Error analysis saved to: {error_analysis_path}")

# Save detailed report
# Uses `total`, `avg_f1`, `total_fp` and `total_fn` computed by earlier cells.
report_path = 'error_analysis_report.txt'
banner = "="*80

report_lines = [banner, "ERROR ANALYSIS REPORT", banner, ""]
report_lines.append(f"Total samples: {total}")
report_lines.append(f"Overall F1: {avg_f1:.3f}")
for outcome_label, counter_key in (
    ("Perfect matches", 'perfect_matches'),
    ("Partial matches", 'partial_matches'),
    ("Complete misses", 'complete_misses'),
):
    outcome_count = detection_stats[counter_key]
    report_lines.append(f"{outcome_label}: {outcome_count} ({100*outcome_count/total:.1f}%)")
report_lines.append("")

report_lines.append("Category Performance:")
for cat in ['added', 'removed', 'changed']:
    report_lines.append(f"  {cat}: F1={error_df[f'f1_{cat}'].mean():.3f}")
report_lines.append("")

report_lines.append(f"Total False Positives: {total_fp}")
report_lines.append(f"Total False Negatives: {total_fn}")

# Every line, including the last, is newline-terminated.
with open(report_path, 'w') as f:
    f.write("\n".join(report_lines) + "\n")

print(f"📄 Detailed report saved to: {report_path}")