# Drone Detection - Results Analysis

Evaluate trained models on the test set and generate comparison plots.

## Configuration

In [None]:
import sys
import tempfile
from pathlib import Path

import cv2
import yaml
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
from ultralytics import YOLO, RTDETR

# Project layout: every location is derived from the parent of the current
# working directory, resolved to an absolute path.
PROJECT_ROOT = Path('..').resolve()
ARTIFACTS_DIR = PROJECT_ROOT.joinpath('artifacts')
DATA_DIR = PROJECT_ROOT.joinpath('data', 'dataset')
RESULTS_DIR = PROJECT_ROOT.joinpath('results')

# Output folders are created up front so later cells can write into them.
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
RESULTS_DIR.joinpath('error_analysis').mkdir(exist_ok=True)

# Error analysis: examples shown per category and the IoU needed for a match.
N_ERROR_EXAMPLES = 5
IOU_THRESHOLD = 0.5

# Shared matplotlib look for every figure in this notebook.
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams.update({
    'figure.figsize': (12, 6),
    'font.size': 11,
    'axes.titlesize': 14,
    'axes.labelsize': 12,
})

# One row per model: (line/bar colour, legend label, line marker).
_MODEL_STYLES = {
    'yolo11l': ('#2ecc71', 'YOLO11-L (CNN)', 'o'),
    'rtdetr-l': ('#e74c3c', 'RT-DETR-L (Attention)', 's'),
}

# Per-attribute lookups used by the plotting functions.
MODEL_COLORS = {name: style[0] for name, style in _MODEL_STYLES.items()}
MODEL_LABELS = {name: style[1] for name, style in _MODEL_STYLES.items()}
MODEL_MARKERS = {name: style[2] for name, style in _MODEL_STYLES.items()}

print(f"Artifacts: {ARTIFACTS_DIR}")
print(f"Results:   {RESULTS_DIR}")
print(f"PyTorch:   {torch.__version__}")

## Discover Artifacts

In [None]:
def _collect_checkpoints(weights_dir):
    """Return checkpoint records found in ``weights_dir`` in a stable order.

    Recognised files are ``best.pt`` (epoch ``'best'``) and ``epoch<N>.pt``
    (epoch ``N`` as an int); any other ``*.pt`` file is ignored. Numbered
    epochs come first in ascending order, followed by ``'best'``.
    """
    checkpoints = []
    if not weights_dir.exists():
        return checkpoints

    for pt_file in weights_dir.glob('*.pt'):
        stem = pt_file.stem
        if stem.startswith('epoch'):
            try:
                epoch = int(stem.replace('epoch', ''))
            except ValueError:
                # Name starts with "epoch" but carries no usable number.
                continue
            checkpoints.append({'epoch': epoch, 'path': pt_file})
        elif stem == 'best':
            checkpoints.append({'epoch': 'best', 'path': pt_file})

    def _order(ckpt):
        # glob() order is filesystem dependent; ints and 'best' cannot be
        # compared directly, so rank them with a tuple key.
        epoch = ckpt['epoch']
        return (1, 0) if epoch == 'best' else (0, epoch)

    checkpoints.sort(key=_order)
    return checkpoints


def discover_artifacts(artifacts_dir):
    """Scan ``artifacts_dir`` for training runs and describe them in a DataFrame.

    A run is a non-hidden sub-directory named ``<model>_<dataset_size>`` (split
    on the last underscore, ``dataset_size`` must parse as an int) that
    contains a ``train`` folder. Directories that do not match are skipped.

    Args:
        artifacts_dir: Directory holding one sub-directory per training run
            (``str`` or ``Path``).

    Returns:
        A DataFrame sorted by ``model`` and ``dataset_size`` with the columns
        ``model``, ``dataset_size``, ``run_dir`` (the ``train`` folder),
        ``results_csv`` (path to ``train/results.csv`` or ``None`` when the
        file is absent) and ``checkpoints`` (list of ``{'epoch', 'path'}``
        dicts). The frame is empty when ``artifacts_dir`` does not exist or
        holds no runs.
    """
    artifacts_dir = Path(artifacts_dir)
    if not artifacts_dir.is_dir():
        # Let the caller report "no artifacts" instead of failing in iterdir().
        return pd.DataFrame()

    runs = []
    for run_dir in artifacts_dir.iterdir():
        if not run_dir.is_dir() or run_dir.name.startswith('.'):
            continue

        parts = run_dir.name.rsplit('_', 1)
        if len(parts) != 2:
            continue

        model_name = parts[0]
        try:
            dataset_size = int(parts[1])
        except ValueError:
            continue

        train_dir = run_dir / 'train'
        if not train_dir.exists():
            continue

        results_csv = train_dir / 'results.csv'

        runs.append({
            'model': model_name,
            'dataset_size': dataset_size,
            'run_dir': train_dir,
            'results_csv': results_csv if results_csv.exists() else None,
            'checkpoints': _collect_checkpoints(train_dir / 'weights'),
        })

    df = pd.DataFrame(runs)
    if len(df) > 0:
        df = df.sort_values(['model', 'dataset_size']).reset_index(drop=True)
    return df


artifacts_df = discover_artifacts(ARTIFACTS_DIR)

# Nothing to analyse without at least one training run.
if not len(artifacts_df):
    raise FileNotFoundError(f"No artifacts found in {ARTIFACTS_DIR}. Run notebook 01 first.")

# One summary line per discovered run.
print(f"Found {len(artifacts_df)} training runs:\n")
for _, row in artifacts_df.iterrows():
    n_ckpts = len(row['checkpoints'])
    if row['results_csv']:
        has_csv = "yes"
    else:
        has_csv = "no"
    print(f"  {row['model']:12} | {row['dataset_size']:5} images | {n_ckpts} checkpoints | CSV: {has_csv}")

## Training Metrics

In [None]:
def load_training_metrics(artifacts_df):
    """Read every run's ``results.csv`` and stack them into one DataFrame.

    Runs whose ``results_csv`` entry is ``None`` are skipped. Column names are
    stripped of surrounding whitespace, and each frame is tagged with the
    run's ``model`` and ``dataset_size`` before concatenation.

    Returns:
        The concatenated metrics with a fresh index, or an empty DataFrame
        when no run has a results file.
    """
    frames = []

    for _, run in artifacts_df.iterrows():
        csv_path = run['results_csv']
        if csv_path is None:
            continue
        tagged = (
            pd.read_csv(csv_path)
            .rename(columns=str.strip)  # ultralytics pads header names with spaces
            .assign(model=run['model'], dataset_size=run['dataset_size'])
        )
        frames.append(tagged)

    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()


training_metrics = load_training_metrics(artifacts_df)

if not len(training_metrics):
    raise ValueError("No training metrics found")

# Metric column names vary across ultralytics versions, so they are located
# by substring instead of being hard-coded.
metric_columns = list(training_metrics.columns)
map50_95_cols = [col for col in metric_columns if 'mAP50-95' in col]
map50_cols = [col for col in metric_columns if 'mAP50' in col and not '95' in col]

# First match wins; None when the metric is absent from the CSVs.
MAP50_COL = next(iter(map50_cols), None)
MAP50_95_COL = next(iter(map50_95_cols), None)

print(f"Loaded {len(training_metrics)} rows")
print(f"mAP50 column:    {MAP50_COL}")
print(f"mAP50-95 column: {MAP50_95_COL}")

# Keep a merged copy of all per-epoch metrics next to the plots.
training_metrics.to_csv(RESULTS_DIR / 'training_metrics.csv', index=False)

## Convergence Curves

In [None]:
def plot_convergence_curves(metrics_df, map_col, output_path):
    """Plot ``map_col`` against epoch, one panel per training-set size.

    Each panel holds one curve per model, styled through ``MODEL_COLORS``,
    ``MODEL_MARKERS`` and ``MODEL_LABELS`` (unknown models fall back to gray,
    ``'o'`` and the raw model name). The figure is written to ``output_path``
    and then shown.
    """
    sizes = sorted(metrics_df['dataset_size'].unique())
    panel_count = len(sizes)

    # Up to three panels share one row; more are spread over two rows.
    if panel_count > 3:
        n_rows, n_cols = 2, (panel_count + 1) // 2
    else:
        n_rows, n_cols = 1, panel_count

    fig, grid = plt.subplots(n_rows, n_cols, figsize=(5 * n_cols, 5 * n_rows), sharey=True)
    # subplots() may return a single Axes or a 2-D array; normalise to 1-D.
    panels = np.array(grid).flatten()

    for panel, size in zip(panels, sizes):
        rows_for_size = metrics_df[metrics_df['dataset_size'] == size]

        for model_name in sorted(rows_for_size['model'].unique()):
            curve = rows_for_size[rows_for_size['model'] == model_name].sort_values('epoch')
            panel.plot(
                curve['epoch'],
                curve[map_col],
                color=MODEL_COLORS.get(model_name, 'gray'),
                marker=MODEL_MARKERS.get(model_name, 'o'),
                markersize=4,
                linewidth=2,
                label=MODEL_LABELS.get(model_name, model_name),
            )

        panel.set_xlabel('Epoch')
        panel.set_title(f'{size} Training Images')
        panel.legend(loc='upper left', fontsize=9)
        panel.set_ylim(0, 1)

    # Grid cells beyond the last dataset size stay empty and are hidden.
    for spare in panels[panel_count:]:
        spare.set_visible(False)

    # Only the first panel of each row carries the y-axis label.
    for row_start in range(0, len(panels), n_cols):
        if row_start < panel_count:
            panels[row_start].set_ylabel('Validation mAP50-95')

    plt.suptitle('Convergence Curves: CNN vs Attention', fontsize=16, y=1.02)
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.show()
    print(f"Saved to {output_path}")


# The convergence plot needs the mAP50-95 column; skip it when none was found.
if MAP50_95_COL:
    plot_convergence_curves(
        metrics_df=training_metrics,
        map_col=MAP50_95_COL,
        output_path=RESULTS_DIR / 'convergence_curves.png',
    )

## Test Set Evaluation

In [None]:
def evaluate_checkpoint(checkpoint_path, data_dir, model_type):
    """Validate one checkpoint on the test split and return its headline metrics.

    Args:
        checkpoint_path: Weights file to load.
        data_dir: Dataset root containing ``train/images``, ``valid/images``
            and ``test/images``.
        model_type: Model name; anything containing ``rtdetr`` (case
            insensitive) is loaded with ``RTDETR``, everything else with
            ``YOLO``.

    Returns:
        Dict with ``mAP50``, ``mAP50-95``, ``precision`` and ``recall``; a
        metric missing from the validator's ``results_dict`` is reported as 0.
    """
    model_cls = RTDETR if 'rtdetr' in model_type.lower() else YOLO
    model = model_cls(checkpoint_path)

    # need a temp yaml because ultralytics wants a file path
    data_yaml = {
        'train': str(data_dir / 'train' / 'images'),
        'val': str(data_dir / 'valid' / 'images'),
        'test': str(data_dir / 'test' / 'images'),
        'nc': 1,
        'names': ['drone']
    }

    # delete=False keeps the file on disk after it is closed so it can be
    # handed over by path; cleanup is therefore done by hand below.
    tmp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False)
    temp_yaml = tmp_file.name
    try:
        with tmp_file:
            yaml.dump(data_yaml, tmp_file)
        results = model.val(data=temp_yaml, split='test', verbose=False)
    finally:
        # Remove the temp file even when writing or validation raises.
        Path(temp_yaml).unlink()

    metrics = results.results_dict
    return {
        'mAP50': metrics.get('metrics/mAP50(B)', 0),
        'mAP50-95': metrics.get('metrics/mAP50-95(B)', 0),
        'precision': metrics.get('metrics/precision(B)', 0),
        'recall': metrics.get('metrics/recall(B)', 0),
    }

In [None]:
# Only the 'best' checkpoint of each run is evaluated on the test set.
best_ckpts = [
    {'model': row['model'], 'dataset_size': row['dataset_size'], 'path': ckpt['path']}
    for _, row in artifacts_df.iterrows()
    for ckpt in row['checkpoints']
    if ckpt['epoch'] == 'best'
]

print(f"Evaluating {len(best_ckpts)} checkpoints...\n")

test_results = []
for ckpt in tqdm(best_ckpts):
    metrics = evaluate_checkpoint(ckpt['path'], DATA_DIR, ckpt['model'])
    # Run identity first, then the metric columns returned by the evaluation.
    record = {
        'model': ckpt['model'],
        'dataset_size': ckpt['dataset_size'],
        'epoch': 'best',
        'checkpoint': str(ckpt['path']),
    }
    record.update(metrics)
    test_results.append(record)

test_results_df = pd.DataFrame(test_results)
test_results_df.to_csv(RESULTS_DIR / 'test_evaluation.csv', index=False)
print(f"\nEvaluated {len(test_results_df)} checkpoints")

## Data Efficiency

In [None]:
def plot_data_efficiency(results_df, output_path):
    """Draw grouped bars of test-set mAP50 and mAP50-95 per training-set size.

    Rows with ``epoch == 'best'`` are plotted; if there are none, the rows of
    the maximum ``epoch`` value are used instead. Each dataset size gets one
    group of bars with one bar per model; a model without a result for a size
    is drawn (and annotated) as 0. The figure is written to ``output_path``
    and then shown.

    Args:
        results_df: Evaluation results with the columns ``model``,
            ``dataset_size``, ``epoch``, ``mAP50`` and ``mAP50-95``.
        output_path: Destination of the saved figure.
    """
    final_df = results_df[results_df['epoch'] == 'best'].copy()
    if len(final_df) == 0:
        max_epoch = results_df['epoch'].max()
        final_df = results_df[results_df['epoch'] == max_epoch].copy()

    dataset_sizes = sorted(final_df['dataset_size'].unique())
    models = sorted(final_df['model'].unique())

    x = np.arange(len(dataset_sizes))
    # 0.35 is the width used for one or two models. With more models the bars
    # shrink so a whole group spans at most 0.8 of the slot between ticks and
    # neighbouring groups do not overlap.
    width = min(0.35, 0.8 / max(len(models), 1))

    fig, axes = plt.subplots(1, 2, figsize=(16, 6))

    for ax, metric in zip(axes, ['mAP50', 'mAP50-95']):
        for i, model in enumerate(models):
            model_data = final_df[final_df['model'] == model].set_index('dataset_size')
            values = [model_data.loc[ds, metric] if ds in model_data.index else 0 for ds in dataset_sizes]

            # Centre the group of bars on the tick of its dataset size.
            offset = width * (i - len(models) / 2 + 0.5)
            bars = ax.bar(
                x + offset, values, width,
                label=MODEL_LABELS.get(model, model),
                color=MODEL_COLORS.get(model, 'gray'),
            )

            # Print each value just above its bar.
            for bar, val in zip(bars, values):
                ax.annotate(f'{val:.2f}',
                    xy=(bar.get_x() + bar.get_width() / 2, bar.get_height()),
                    ha='center', va='bottom', fontsize=8, rotation=45)

        ax.set_xlabel('Training Dataset Size')
        ax.set_ylabel(metric)
        ax.set_title(f'Test Set {metric}')
        ax.set_xticks(x)
        ax.set_xticklabels(dataset_sizes)
        ax.legend()
        # Head room above 1.0 for the rotated value annotations.
        ax.set_ylim(0, 1.15)

    plt.suptitle('Data Efficiency: CNN vs Attention', fontsize=16, y=1.02)
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.show()
    print(f"Saved to {output_path}")


# Skip the chart when no checkpoint was evaluated.
if len(test_results_df):
    plot_data_efficiency(
        results_df=test_results_df,
        output_path=RESULTS_DIR / 'data_efficiency.png',
    )

## Error Analysis

In [None]:
def compute_iou(box1, box2):
    """Return the intersection-over-union of two ``[x1, y1, x2, y2]`` boxes.

    Boxes that do not overlap give an intersection of 0; when the union area
    is not positive the result is 0.
    """
    ax1, ay1, ax2, ay2 = box1[0], box1[1], box1[2], box1[3]
    bx1, by1, bx2, by2 = box2[0], box2[1], box2[2], box2[3]

    # Extent of the overlap rectangle, clamped at zero for disjoint boxes.
    overlap_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    overlap_h = max(0, min(ay2, by2) - max(ay1, by1))
    intersection = overlap_w * overlap_h

    union = (ax2 - ax1) * (ay2 - ay1) + (bx2 - bx1) * (by2 - by1) - intersection
    if union > 0:
        return intersection / union
    return 0


def load_ground_truth(label_path, img_width, img_height):
    """Read a YOLO label file and return its boxes in pixel coordinates.

    Each usable line is ``class x_center y_center width height`` with values
    normalised to the image size; lines with fewer than five fields are
    ignored and fields beyond the fifth are not read.

    Returns:
        A list of ``[x1, y1, x2, y2]`` boxes, empty when ``label_path`` does
        not exist.
    """
    if not label_path.exists():
        return []

    pixel_boxes = []
    with open(label_path) as label_file:
        for raw_line in label_file:
            fields = raw_line.strip().split()
            if len(fields) < 5:
                continue
            # The class id is parsed but not needed for the box geometry.
            _, xc, yc, w, h = (float(value) for value in fields[:5])
            half_w, half_h = w / 2, h / 2
            # Centre/size (normalised) -> corner coordinates in pixels.
            pixel_boxes.append([
                (xc - half_w) * img_width,
                (yc - half_h) * img_height,
                (xc + half_w) * img_width,
                (yc + half_h) * img_height,
            ])

    return pixel_boxes


def check_detection_correct(pred_boxes, gt_boxes, iou_thresh=0.5):
    """Tell whether the predictions cover every ground-truth box.

    With no ground truth the image is correct only if nothing was predicted.
    Otherwise every ground-truth box needs at least one prediction whose IoU
    with it reaches ``iou_thresh``.
    """
    gt_count, pred_count = len(gt_boxes), len(pred_boxes)

    if gt_count == 0:
        return pred_count == 0
    if pred_count == 0:
        return False

    return all(
        any(compute_iou(pred, gt) >= iou_thresh for pred in pred_boxes)
        for gt in gt_boxes
    )

In [None]:
def visualize_error_examples(examples, title, output_path):
    """Show ground-truth and predicted boxes for a row of example images.

    Each example is a dict with ``image_path``, ``gt_boxes``, ``yolo_boxes``,
    ``yolo_conf``, ``rtdetr_boxes`` and ``rtdetr_conf``. Ground truth is drawn
    as solid blue boxes, YOLO as dashed green, RT-DETR as dotted red, with each
    prediction labelled by its confidence. The figure is written to
    ``output_path`` and then shown. With no examples only a notice is printed.
    """
    if not examples:
        print(f"No examples for: {title}")
        return

    # Per model: boxes key, confidences key, colour, line style, text prefix,
    # and which box coordinate (left = 0, right = 2) anchors the text.
    prediction_layers = (
        ('yolo_boxes', 'yolo_conf', 'green', '--', 'YOLO', 0),
        ('rtdetr_boxes', 'rtdetr_conf', 'red', ':', 'DETR', 2),
    )

    n = len(examples)
    fig, axes = plt.subplots(1, n, figsize=(5 * n, 5))
    if n == 1:
        # A single subplot comes back as a bare Axes rather than a sequence.
        axes = [axes]

    for ax, ex in zip(axes, examples):
        # OpenCV loads BGR; matplotlib expects RGB.
        bgr = cv2.imread(str(ex['image_path']))
        ax.imshow(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB))

        for box in ex['gt_boxes']:
            ax.add_patch(plt.Rectangle(
                (box[0], box[1]), box[2]-box[0], box[3]-box[1],
                fill=False, edgecolor='blue', linewidth=2))

        for boxes_key, conf_key, colour, dash, tag, anchor in prediction_layers:
            for i, box in enumerate(ex[boxes_key]):
                # Confidence falls back to 0 when the list is shorter than the boxes.
                conf = ex[conf_key][i] if i < len(ex[conf_key]) else 0
                ax.add_patch(plt.Rectangle(
                    (box[0], box[1]), box[2]-box[0], box[3]-box[1],
                    fill=False, edgecolor=colour, linewidth=2, linestyle=dash))
                ax.text(box[anchor], box[1]-5, f'{tag}:{conf:.2f}', color=colour, fontsize=8)

        ax.set_title(ex['image_path'].name, fontsize=9)
        ax.axis('off')

    from matplotlib.patches import Patch
    legend_elements = [
        Patch(facecolor='none', edgecolor='blue', label='Ground Truth'),
        Patch(facecolor='none', edgecolor='green', linestyle='--', label='YOLO'),
        Patch(facecolor='none', edgecolor='red', linestyle=':', label='RT-DETR'),
    ]
    fig.legend(handles=legend_elements, loc='upper center', ncol=3, bbox_to_anchor=(0.5, 1.05))

    plt.suptitle(title, fontsize=14, y=1.1)
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.show()
    print(f"Saved to {output_path}")

In [None]:
def _boxes_and_scores(result):
    """Return one prediction result's xyxy boxes and confidences as plain lists."""
    boxes = result.boxes
    if len(boxes) == 0:
        return [], []
    return boxes.xyxy.cpu().numpy().tolist(), boxes.conf.cpu().numpy().tolist()


def run_error_analysis(yolo_ckpt, rtdetr_ckpt, data_dir, output_dir, n_examples=5, iou_thresh=0.5):
    """Compare YOLO and RT-DETR image by image on the test split.

    Both models predict on every readable image under ``data_dir/test/images``;
    ground truth comes from the matching ``.txt`` file in
    ``data_dir/test/labels``. An image counts as correct for a model when
    ``check_detection_correct`` accepts its boxes at ``iou_thresh``. Images are
    visited in sorted path order so the returned examples are reproducible.

    Args:
        yolo_ckpt: Weights file for the YOLO model.
        rtdetr_ckpt: Weights file for the RT-DETR model.
        data_dir: Dataset root containing ``test/images`` and ``test/labels``.
        output_dir: Accepted for interface compatibility; not used here.
        n_examples: Maximum number of examples kept per category.
        iou_thresh: IoU a prediction must reach to match a ground-truth box.

    Returns:
        Dict with the keys ``yolo_wins``, ``rtdetr_wins`` and ``both_fail``,
        each holding up to ``n_examples`` records with ``image_path``,
        ``gt_boxes``, ``yolo_boxes``, ``rtdetr_boxes``, ``yolo_conf`` and
        ``rtdetr_conf``.
    """
    yolo_model = YOLO(yolo_ckpt)
    rtdetr_model = RTDETR(rtdetr_ckpt)

    test_images_dir = data_dir / 'test' / 'images'
    test_labels_dir = data_dir / 'test' / 'labels'

    # Match common image extensions case-insensitively, and sort because
    # directory listing order is filesystem dependent.
    image_suffixes = {'.jpg', '.jpeg', '.png', '.bmp'}
    test_images = sorted(
        path for path in test_images_dir.iterdir()
        if path.is_file() and path.suffix.lower() in image_suffixes
    )

    print(f"Running inference on {len(test_images)} test images...")

    yolo_wins = []    # yolo correct, rtdetr wrong
    rtdetr_wins = []  # rtdetr correct, yolo wrong
    both_fail = []
    both_succeed = []

    for img_path in tqdm(test_images):
        # The image is decoded here only to get its size and to skip
        # unreadable files; the models read the file themselves.
        img = cv2.imread(str(img_path))
        if img is None:
            continue
        h, w = img.shape[:2]

        label_path = test_labels_dir / (img_path.stem + '.txt')
        gt_boxes = load_ground_truth(label_path, w, h)

        yolo_res = yolo_model.predict(img_path, verbose=False)[0]
        rtdetr_res = rtdetr_model.predict(img_path, verbose=False)[0]

        yolo_boxes, yolo_conf = _boxes_and_scores(yolo_res)
        rtdetr_boxes, rtdetr_conf = _boxes_and_scores(rtdetr_res)

        yolo_ok = check_detection_correct(yolo_boxes, gt_boxes, iou_thresh)
        rtdetr_ok = check_detection_correct(rtdetr_boxes, gt_boxes, iou_thresh)

        result = {
            'image_path': img_path,
            'gt_boxes': gt_boxes,
            'yolo_boxes': yolo_boxes,
            'rtdetr_boxes': rtdetr_boxes,
            'yolo_conf': yolo_conf,
            'rtdetr_conf': rtdetr_conf,
        }

        if yolo_ok and not rtdetr_ok:
            yolo_wins.append(result)
        elif rtdetr_ok and not yolo_ok:
            rtdetr_wins.append(result)
        elif not yolo_ok and not rtdetr_ok:
            both_fail.append(result)
        else:
            both_succeed.append(result)

    print(f"\nYOLO wins: {len(yolo_wins)} | RT-DETR wins: {len(rtdetr_wins)} | Both fail: {len(both_fail)} | Both succeed: {len(both_succeed)}")

    # Images both models got right are only counted, not returned.
    return {
        'yolo_wins': yolo_wins[:n_examples],
        'rtdetr_wins': rtdetr_wins[:n_examples],
        'both_fail': both_fail[:n_examples],
    }

In [None]:
# The two models are compared only on the runs trained with the largest dataset.
print("Error Analysis\n")

max_size = artifacts_df['dataset_size'].max()

# Map each model to the 'best' checkpoint of its largest-dataset run.
best_by_model = {}
largest_runs = artifacts_df[artifacts_df['dataset_size'] == max_size]
for _, row in largest_runs.iterrows():
    for ckpt in row['checkpoints']:
        if ckpt['epoch'] == 'best':
            best_by_model[row['model']] = ckpt['path']

yolo_best = best_by_model.get('yolo11l')
rtdetr_best = best_by_model.get('rtdetr-l')

if not (yolo_best and rtdetr_best):
    print("Missing best.pt checkpoints for error analysis")
    error_results = None
else:
    print(f"YOLO:    {yolo_best}")
    print(f"RT-DETR: {rtdetr_best}\n")

    error_results = run_error_analysis(
        yolo_ckpt=yolo_best,
        rtdetr_ckpt=rtdetr_best,
        data_dir=DATA_DIR,
        output_dir=RESULTS_DIR / 'error_analysis',
        n_examples=N_ERROR_EXAMPLES,
        iou_thresh=IOU_THRESHOLD,
    )

In [None]:
# One figure per error category; a category without examples is skipped.
if error_results:
    error_figures = (
        ('yolo_wins', 'YOLO Correct, RT-DETR Wrong', 'yolo_wins.png'),
        ('rtdetr_wins', 'RT-DETR Correct, YOLO Wrong', 'rtdetr_wins.png'),
        ('both_fail', 'Both Models Failed', 'both_fail.png'),
    )
    for category, figure_title, file_name in error_figures:
        category_examples = error_results[category]
        if category_examples:
            visualize_error_examples(
                category_examples,
                figure_title,
                RESULTS_DIR / 'error_analysis' / file_name
            )

## Output

In [None]:
# Print every file under the results folder with its size, in sorted order.
for f in filter(Path.is_file, sorted(RESULTS_DIR.rglob('*'))):
    size_kb = f.stat().st_size / 1024
    print(f"  {f.relative_to(RESULTS_DIR)} ({size_kb:.1f} KB)")