In [None]:
# --- Import Required Libraries ---

# File and data handling
import os
import json
import random
import shutil
from collections import defaultdict
from pathlib import Path

# Data processing and visualization
from sklearn.model_selection import train_test_split
import yaml

# Set before `import torch` below so the flag is already in the environment when
# PyTorch loads. NOTE(review): presumably this lets operators without an MPS
# implementation fall back to the CPU on Apple Silicon — confirm against the
# PyTorch MPS notes.
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"

# Deep learning
import torch
from ultralytics import YOLO
from tqdm.notebook import tqdm  

# Backend preference order: CUDA GPU, then Apple MPS, then plain CPU.
# The MPS probe only runs when CUDA is unavailable.
device = torch.device(
    "cuda" if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using device: {device}")


In [None]:
# All project paths are derived from the kernel's current working directory,
# so the notebook refuses to continue unless it is run from its own folder.
NOTEBOOK_DIR = os.getcwd()

if NOTEBOOK_DIR.endswith("YOLO_Ultralytics"):
    # Base directory: three levels above the notebook folder.
    BASE_DIR = os.path.join(NOTEBOOK_DIR, *([".."] * 3))
else:
    raise ValueError(
        f"Please set the working directory to 'YOLO_Ultralytics' folder. Currently it is set to: {NOTEBOOK_DIR}"
    )

In [None]:
# --- 1. Configuration Section ---

# Input locations (annotation JSONs and source images) and the output dataset folder
JSON_DIR = os.path.join(BASE_DIR, 'data', 'MangaSegmentation/jsons_processed')
IMAGE_ROOT_DIR = os.path.join(BASE_DIR, 'data', 'Manga109_released_2023_12_07', 'images')
DATASET_DIR = os.path.join(BASE_DIR, 'data', 'YOLO_data')
yaml_path = Path(DATASET_DIR) / 'dataset.yaml'

# Training hyperparameters (reduced values for quick smoke tests)
EPOCHS = 1        # Used 20 for the real run
IMAGE_SIZE = 160  # Used 640 for the real run
BATCH_SIZE = 2    # Used 8 for the real run

# Fail early if an input directory is missing; otherwise show a few entries
print("\nValidating directories...")
for path in (JSON_DIR, IMAGE_ROOT_DIR):
    if not os.path.exists(path):
        raise FileNotFoundError(f"Directory not found: {path}")
    print(f"Found directory: {path}")
    contents = os.listdir(path)[:5]
    print(f"Sample contents: {contents}")

# Build the <DATASET_DIR>/{images,labels}/{train,val} folder layout
print("\nCreating dataset directories...")
for split in ('train', 'val'):
    for subdir in ('images', 'labels'):
        dir_path = os.path.join(DATASET_DIR, subdir + '/' + split)
        os.makedirs(dir_path, exist_ok=True)
        print(f"Created: {dir_path}")

# The single category this notebook works with
TARGET_CATEGORY_ID = 5            # Fixed category ID for balloon
TARGET_CATEGORY_NAME = "balloon"  # Fixed category name

print(
    "Target Category Configuration:",
    f"Category ID: {TARGET_CATEGORY_ID}",
    f"Category Name: {TARGET_CATEGORY_NAME}",
    sep="\n",
)

In [None]:
# ===================================================================
#  Model Training
# ===================================================================
print("\n--- 5. Initializing and Training YOLOv11 Model ---")

# All training / validation / prediction artefacts are written below this folder.
PROJECT_DIR = os.path.join(BASE_DIR, 'models', 'bubble-detection', 'YOLOv11s_Training_Results')

# 1. Load a pretrained YOLOv11 segmentation model
model = YOLO('yolo11s-seg.pt')

# 2. Pick the training device.
# The `device` chosen in the first cell was previously never handed to Ultralytics.
# Per the Ultralytics train settings, Apple's MPS backend has to be requested
# explicitly (device="mps"); without it training runs on CUDA if present, else CPU.
# For every other case keep None so Ultralytics' own default selection is unchanged.
train_device = "mps" if device.type == "mps" else None

# 3. Train the model with essential parameters
print("\nStarting model training...")
results = model.train(
    data=str(yaml_path),
    epochs=EPOCHS,
    imgsz=IMAGE_SIZE,
    batch=BATCH_SIZE,
    project=PROJECT_DIR,
    name='balloon_segmentation_run1',
    exist_ok=True,  # Reuse the same run folder on re-runs instead of creating run2, run3, ...
    device=train_device,
)

print("\n--- Training Finished ---")
print(f"All results, logs, and plots have been saved to: {model.trainer.save_dir}")
print(f"The best performing model is saved as: {model.trainer.best}")

In [None]:
# ===================================================================
# Automatic and Comprehensive Evaluation
# ===================================================================
print("\n--- 6. Evaluating Final Model Performance ---")

# 1. Reload the checkpoint that the trainer recorded as its best one
path_to_best_model = model.trainer.best
if os.path.exists(path_to_best_model):
    print(f"Loading best model from: {path_to_best_model}")
else:
    raise FileNotFoundError(f"Could not find the best model at: {path_to_best_model}")
best_model = YOLO(path_to_best_model)

# 2. Validate on the 'val' split; the returned object carries every metric
print("\nRunning final validation on the test set...")
metrics = best_model.val(
    split='val',
    project=PROJECT_DIR,
    name='balloon_segmentation_run1',
    exist_ok=True,
)

# 3. Report header
print("\n" + "#"*60)
print("--- Final Comprehensive Evaluation Report (All Metrics) ---")
print("#"*60)
print(f"\nValidation results saved to: {metrics.save_dir}\n")

# Buckets for the grouped metrics: box '(B)', mask '(M)', everything else
box_metrics, mask_metrics, other_metrics = {}, {}, {}

for raw_name, score in metrics.results_dict.items():
    # Drop the 'metrics/' prefix before inspecting the tag
    name = raw_name.replace('metrics/', '').strip()

    # The first tag found decides the bucket; '(B)' is checked before '(M)'
    for tag, bucket in (('(B)', box_metrics), ('(M)', mask_metrics)):
        if tag in name:
            bucket[name.replace(tag, '').strip()] = score
            break
    else:
        # No tag matched: keep the cleaned name as-is
        other_metrics[name] = score

# --- Pretty-printer for one group of metrics ---
def print_metric_group(title, metric_dict):
    """Print a titled, alphabetically ordered listing of metric values.

    Args:
        title: Heading shown above the group.
        metric_dict: Mapping of metric name to a numeric value. Values are
            formatted with four decimal places.

    Returns:
        None. Output goes to stdout; an empty mapping prints a placeholder line.
    """
    print(f"\n--- {title} ---")
    if metric_dict:
        # Keys are unique, so sorting the items orders them by key alone.
        for name, score in sorted(metric_dict.items()):
            # Left-align the name in a 15-character column.
            print(f"     - {name:<15}: {score:.4f}")
    else:
        print("     (No metrics found for this group)")

# --- Emit the three groups in a fixed order, then the closing rule ---
for group_title, group_values in (
    ("Bounding Box Detection Performance", box_metrics),
    ("Instance Segmentation Performance", mask_metrics),
    ("Other Metrics (e.g., Losses)", other_metrics),
):
    print_metric_group(group_title, group_values)

print("\n" + "#"*60)

In [None]:
# ===================================================================
# Export All Validation Predictions
# ===================================================================
print("\n--- 7. Exporting All Predictions from the Validation Set ---")

# 1. Reload the best checkpoint recorded by the trainer (model.trainer.best)
path_to_best_model = model.trainer.best
if os.path.exists(path_to_best_model):
    print(f"Loading best model from: {path_to_best_model}")
else:
    raise FileNotFoundError(f"Could not find the best model at: {path_to_best_model}")
best_model = YOLO(path_to_best_model)

# 2. Collect the validation images (png / jpg / jpeg, case-insensitive)
validation_images_path = os.path.join(DATASET_DIR, 'images/val')
image_files = []
for file_name in os.listdir(validation_images_path):
    if file_name.lower().endswith(('.png', '.jpg', '.jpeg')):
        image_files.append(os.path.join(validation_images_path, file_name))
print(f"Found {len(image_files)} images to predict in: {validation_images_path}")

# 3. Predict image by image (one call per file to avoid OOM)
output_project = PROJECT_DIR
output_name = 'all_validation_predictions'

print(f"Starting prediction... Results will be saved in '{output_project}/{output_name}'")
for image_path in tqdm(image_files, desc="Processing images"):
    best_model.predict(
        source=image_path,        # a single image per call
        project=output_project,
        name=output_name,
        exist_ok=True,            # keep writing into the SAME output folder on every call
        save=True,                # write the annotated image to disk
    )

print("\n--- Prediction Export Finished ---")
print(f"All prediction images have been saved to the '{output_project}/{output_name}' directory.")

In [None]:
# ===================================================================
# Advanced Evaluation: Calculating Boundary-AP
# ===================================================================
import cv2
import numpy as np
from sklearn.metrics import average_precision_score

# Section banner marking the start of the Boundary-AP evaluation in the cell output.
print("\n--- 8. Advanced: Calculating Boundary-AP ---")

# --- Helper Functions ---

def load_gt_polygons(label_path, img_height, img_width):
    """Load ground-truth polygons from a YOLO segmentation ``.txt`` label file.

    Every usable line has the form ``<class_id> x1 y1 x2 y2 ...`` where the
    coordinates are normalized; they are scaled to pixel units here.

    Args:
        label_path: Path to the label file.
        img_height: Image height in pixels (scales the y coordinates).
        img_width: Image width in pixels (scales the x coordinates).

    Returns:
        A list of ``(N, 2)`` ``np.int32`` arrays (x, y pixel coordinates), one
        per polygon. Returns an empty list when the file does not exist.
        Blank lines and entries with fewer than three points are skipped, so
        they are neither rasterised nor counted as ground-truth instances.

    Raises:
        ValueError: If a line holds a non-numeric value or an odd number of
            coordinates (raised by ``float`` / ``reshape``).
    """
    polygons = []
    if not os.path.exists(label_path):
        return polygons
    with open(label_path, 'r') as f:
        for line in f:
            # First token is the class id; the rest are x/y pairs.
            coords = line.split()[1:]
            # A polygon needs at least 3 points (6 values). This also drops blank
            # lines, which previously produced empty "polygons".
            if len(coords) < 6:
                continue
            # Denormalize polygon
            poly = np.array([float(c) for c in coords]).reshape(-1, 2)
            poly[:, 0] *= img_width
            poly[:, 1] *= img_height
            polygons.append(poly.astype(np.int32))
    return polygons

def polygons_to_mask(polygons, height, width):
    """Rasterise polygons onto one ``(height, width)`` uint8 canvas.

    Pixels covered by any polygon are set to 255; the rest stay 0.
    """
    canvas = np.zeros(shape=(height, width), dtype=np.uint8)
    # fillPoly paints directly into `canvas`.
    cv2.fillPoly(canvas, pts=polygons, color=255)
    return canvas

def calculate_area_iou(gt_polygons, pred_polygon, height, width):
    """Area IoU of one predicted polygon against the union of the given GT polygons.

    Both sides are rasterised to ``(height, width)`` masks first. Returns 0
    when neither mask covers any pixel.
    """
    rasterised_pred = polygons_to_mask([pred_polygon], height, width)
    rasterised_gt = polygons_to_mask(gt_polygons, height, width)

    overlap_px = np.logical_and(rasterised_pred, rasterised_gt).sum()
    combined_px = np.logical_or(rasterised_pred, rasterised_gt).sum()

    if combined_px > 0:
        return overlap_px / combined_px
    return 0

def calculate_boundary_iou(gt_poly, pred_poly, height, width, thickness=2):
    """IoU of the outlines of two polygons.

    Each polygon's closed outline is drawn with the given line ``thickness``
    on its own ``(height, width)`` canvas, and the two drawings are compared.
    Returns 0 when no outline pixel was drawn at all.
    """
    outlines = []
    for contour in (gt_poly, pred_poly):
        canvas = np.zeros((height, width), dtype=np.uint8)
        cv2.polylines(canvas, [contour], isClosed=True, color=255, thickness=thickness)
        outlines.append(canvas)
    gt_outline, pred_outline = outlines

    shared_px = np.logical_and(gt_outline, pred_outline).sum()
    total_px = np.logical_or(gt_outline, pred_outline).sum()

    if total_px > 0:
        return shared_px / total_px
    return 0

# --- Main Evaluation Logic ---

# 1. Load the best model
path_to_best_model = model.trainer.best
best_model = YOLO(path_to_best_model)

# 2. Get paths for validation images and labels
val_img_dir = os.path.join(DATASET_DIR, 'images/val')
val_label_dir = os.path.join(DATASET_DIR, 'labels/val')
# Keep image files only (same extension filter as the prediction-export cell) so
# stray files in the folder do not inflate the reported count or the progress total.
val_image_files = [
    os.path.join(val_img_dir, f)
    for f in os.listdir(val_img_dir)
    if f.lower().endswith(('.png', '.jpg', '.jpeg'))
]

# 3. Run prediction to get results object
print(f"Running predictions on {len(val_image_files)} validation images to get polygon data...")
# stream=True yields one result at a time (memory efficient). A dedicated name is
# used so the training `results` from the earlier cell is not overwritten.
prediction_stream = best_model.predict(source=val_img_dir, stream=True)

# One dict per prediction: {'confidence': float, 'is_tp': bool}
all_predictions = []

for result in tqdm(prediction_stream, total=len(val_image_files), desc="Evaluating Boundary Quality"):
    img_path = Path(result.path)
    h, w = result.orig_shape

    # Load corresponding ground truth (label file shares the image's stem)
    label_path = os.path.join(val_label_dir, img_path.stem + '.txt')
    gt_polygons = load_gt_polygons(label_path, h, w)

    # Get predictions for this image
    pred_polygons_normalized = result.masks.xyn if result.masks else []
    # Plain Python floats keep the sort below independent of tensor device/dtype.
    pred_confs = result.boxes.conf.tolist() if result.boxes else []

    # Keep track of which GT polygons have been "matched" (each GT matches at most once)
    gt_matched = [False] * len(gt_polygons)

    # Visit predictions from most to least confident
    sorted_preds = sorted(zip(pred_confs, pred_polygons_normalized), key=lambda x: x[0], reverse=True)

    for conf, poly_norm in sorted_preds:
        # Denormalize predicted polygon to pixel coordinates
        pred_poly = (poly_norm * np.array([w, h])).astype(np.int32)

        best_match_idx = -1
        # A segment with fewer than 3 points encloses no area and cannot match
        # anything; it is recorded as a false positive without rasterising it.
        if len(pred_poly) >= 3:
            best_area_iou = 0.5  # Matching threshold (must be exceeded)

            # Find the best still-unmatched GT for this prediction using Area IoU
            for i, gt_poly in enumerate(gt_polygons):
                if not gt_matched[i]:
                    area_iou = calculate_area_iou([gt_poly], pred_poly, h, w)
                    if area_iou > best_area_iou:
                        best_area_iou = area_iou
                        best_match_idx = i

        is_tp = False
        if best_match_idx != -1:
            # We found a match, now score it with Boundary IoU.
            # The GT is consumed even if the boundary test below fails.
            gt_matched[best_match_idx] = True
            matched_gt_poly = gt_polygons[best_match_idx]

            b_iou = calculate_boundary_iou(matched_gt_poly, pred_poly, h, w)

            # If Boundary IoU is high enough, it's a True Positive
            if b_iou > 0.75:  # Boundary IoU threshold
                is_tp = True

        all_predictions.append({'confidence': float(conf), 'is_tp': is_tp})

# 4. Calculate Boundary-AP
if not all_predictions:
    print("\nNo predictions were made. Boundary-AP is 0.")
else:
    # Rank all predictions of the whole validation set by confidence (stable sort).
    all_predictions.sort(key=lambda x: x['confidence'], reverse=True)

    y_true = [int(p['is_tp']) for p in all_predictions]
    y_scores = [p['confidence'] for p in all_predictions]

    # Total number of ground-truth polygons. Image size is irrelevant for counting,
    # so the labels are loaded with a dummy 1x1 size.
    num_gt_total = sum(len(load_gt_polygons(os.path.join(val_label_dir, Path(f).stem + '.txt'), 1, 1)) for f in val_image_files)
    num_tp = sum(y_true)
    num_fn = num_gt_total - num_tp  # Ground truths never credited with a true positive

    # Average precision with recall measured against ALL ground truths:
    #     AP = (1 / num_gt_total) * sum(precision@k for every rank k that is a TP)
    # Missed ground truths therefore contribute nothing. The previous approach
    # appended them as positives with score 0 and called average_precision_score,
    # which counts them as correctly retrieved at the lowest threshold and inflates
    # the result (e.g. 1 TP + 1 missed GT gave AP = 1.0 instead of 0.5).
    # Tied confidences are ranked in their sorted order rather than merged.
    if num_gt_total > 0:
        tp_flags = np.asarray(y_true, dtype=np.float64)
        precision_at_rank = np.cumsum(tp_flags) / np.arange(1, len(tp_flags) + 1)
        boundary_ap = float((precision_at_rank * tp_flags).sum() / num_gt_total)
    else:
        # No ground truth at all: every prediction is a false positive.
        boundary_ap = 0.0

    print("\n" + "#"*60)
    print("--- Custom Boundary-AP Evaluation Report ---")
    print("#"*60)
    print(f"\n     - Total Ground-Truth Balloons : {num_gt_total}")
    print(f"     - Total Predictions Evaluated : {len(all_predictions)}")
    print(f"     - True Positives (Boundary IoU > 0.75): {num_tp}")
    print(f"     - Missed Ground-Truth Balloons : {num_fn}")
    print(f"     - Boundary-AP                  : {boundary_ap:.4f}")
    print("\n" + "#"*60)