### Data Splitting

In [None]:
import json, os, shutil
from pathlib import Path
import random

# ---------- Paths ----------
# Source COCO annotation file and the folder holding the original images.
coco_ann_path = os.path.join('data', 'coco', 'coco.json')

images_dir = Path('data', 'images')

# Root folder of the generated splits; one sub-folder per split.
output_dir = Path('data', 'coco', 'split')
output_dir.mkdir(parents=True, exist_ok=True)

# Output image folders
train_img_dir = output_dir / "train"
val_img_dir = output_dir / "val"
test_img_dir = output_dir / "test"
for split_dir in (train_img_dir, val_img_dir, test_img_dir):
    split_dir.mkdir(exist_ok=True)

# Each split's annotation file is stored next to the images of that split.
train_path = train_img_dir / "train.json"
val_path = val_img_dir / "val.json"
test_path = test_img_dir / "test.json"


# Split ratios
train_ratio = 0.7
val_ratio = 0.2
test_ratio = 0.1
# The test split receives whatever is left after train and val, so the three
# ratios must describe the whole dataset.
if abs(train_ratio + val_ratio + test_ratio - 1.0) > 1e-9:
    raise ValueError("train_ratio + val_ratio + test_ratio must sum to 1.0")

# Fixed seed so that re-running the cell reproduces the same split.
split_seed = 42

# JSON is UTF-8; do not depend on the platform's default encoding.
with open(coco_ann_path, 'r', encoding='utf-8') as f:
    coco = json.load(f)

# Shuffle a copy so the loaded annotation dict keeps its original image order.
images = list(coco['images'])
random.Random(split_seed).shuffle(images)
annotations = coco['annotations']
categories = coco['categories']

# Train and val sizes are rounded down; the remainder goes to test.
n = len(images)
n_train = int(n * train_ratio)
n_val = int(n * val_ratio)

train_images = images[:n_train]
val_images = images[n_train:n_train + n_val]
test_images = images[n_train + n_val:]

def filter_annotations(images_subset):
    """Return the annotations that belong to the images in *images_subset*.

    Reads the module-level ``annotations`` list and keeps, in their original
    order, the entries whose ``image_id`` matches the ``id`` of one of the
    given image records.
    """
    wanted_ids = {image['id'] for image in images_subset}
    selected = []
    for annotation in annotations:
        if annotation['image_id'] in wanted_ids:
            selected.append(annotation)
    return selected

# One entry per split: (annotation file to write, image records, image folder).
splits = [
    (train_path, train_images, train_img_dir),
    (val_path, val_images, val_img_dir),
    (test_path, test_images, test_img_dir)
]

for path, imgs, img_dir in splits:
    # Write a COCO file restricted to this split's images and annotations;
    # the category list is shared by all splits.
    anns = filter_annotations(imgs)
    split_dict = {
        "images": imgs,
        "annotations": anns,
        "categories": categories
    }
    with open(path, 'w') as f:
        json.dump(split_dict, f)
    # Copy images
    for img in imgs:
        src = images_dir / img['file_name']
        dst = img_dir / img['file_name']
        if src.exists():
            # file_name may contain sub-folders; copy2 does not create them.
            dst.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src, dst)
        else:
            print(f"Warning: {src} does not exist.")

### Classes Used

In [None]:
import json
from collections import Counter

# Load the full (unsplit) COCO annotation file.
with open('data/coco/coco.json', 'r') as f:
    coco = json.load(f)

# Names of all declared categories, in file order.
categories = coco['categories']
class_names = []
for cat in categories:
    class_names.append(cat['name'])
print(class_names)

# Count annotations per category id and print one "name: count" line per
# category; a Counter yields 0 for ids that never occur.
annotations = coco['annotations']
cat_counter = Counter()
cat_counter.update(ann['category_id'] for ann in annotations)
for cat in categories:
    print(f"{cat['name']}: {cat_counter[cat['id']]}")

### Training

In [None]:
import os
import torch
import torchvision
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from pycocotools.coco import COCO
from tqdm import tqdm  # for Progress bar


# ---------- Dataset Class ----------
class CocoDataset(Dataset):
    """COCO-format instance segmentation dataset yielding ``(image, target)`` pairs.

    Args:
        root: Folder that image ``file_name`` entries are resolved against.
        annFile: Path to the COCO annotation JSON file.
        transforms: Optional callable applied to the PIL image only; the
            target is returned unchanged.

    Each target is a dict with ``boxes`` (N, 4) in ``[xmin, ymin, xmax, ymax]``
    form, ``labels`` (N,), ``masks`` (N, H, W), ``image_id`` (1,), ``area`` (N,)
    and ``iscrowd`` (N,). Images without annotations produce the same layout
    with N == 0.
    """

    def __init__(self, root, annFile, transforms=None):
        self.root = root
        self.coco = COCO(annFile)
        # Sorted so that an index always maps to the same image.
        self.ids = list(sorted(self.coco.imgs.keys()))
        self.transforms = transforms
        self.category_id_to_name = {
            cat['id']: cat['name']
            for cat in self.coco.loadCats(self.coco.getCatIds())
        }

    def __getitem__(self, index):
        """Load the image at *index* and build its detection target."""
        coco = self.coco
        img_id = self.ids[index]
        anns = coco.loadAnns(coco.getAnnIds(imgIds=img_id))

        file_name = coco.loadImgs(img_id)[0]['file_name']
        img = Image.open(os.path.join(self.root, file_name)).convert("RGB")

        boxes, labels, masks = [], [], []
        for ann in anns:
            # COCO stores boxes as [x, y, width, height]; convert to corners.
            xmin, ymin, width, height = ann['bbox']
            boxes.append([xmin, ymin, xmin + width, ymin + height])
            labels.append(ann['category_id'])
            masks.append(torch.from_numpy(coco.annToMask(ann)))

        # reshape(-1, 4) keeps a well-formed (0, 4) tensor when there are no boxes.
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        if masks:
            # Stacking tensors avoids the slow list-of-ndarray conversion path.
            masks = torch.stack(masks).to(torch.uint8)
        else:
            img_width, img_height = img.size  # PIL reports (width, height)
            masks = torch.zeros((0, img_height, img_width), dtype=torch.uint8)
        image_id = torch.tensor([img_id])
        area = torch.as_tensor([ann['area'] for ann in anns], dtype=torch.float32)
        iscrowd = torch.as_tensor([ann.get('iscrowd', 0) for ann in anns], dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
            "masks": masks,
            "image_id": image_id,
            "area": area,
            "iscrowd": iscrowd
        }

        if self.transforms:
            img = self.transforms(img)

        return img, target

    def __len__(self):
        """Return the number of images in the annotation file."""
        return len(self.ids)


# ---------- Transforms ----------
def get_transform(train):
    """Build the image transform pipeline.

    The pipeline only converts the image with ``ToTensor``. The *train* flag
    is accepted but not used, so training and evaluation get the same
    transform.
    """
    pipeline = [transforms.ToTensor()]
    return transforms.Compose(pipeline)


# ---------- Model ----------
def get_model(num_classes):
    """Build a pre-trained Mask R-CNN (ResNet-50 FPN) with heads sized for *num_classes*.

    Args:
        num_classes: Number of output classes of the replaced box and mask
            predictors (callers in this file pass categories + 1 for background).

    Returns:
        The model with new ``FastRCNNPredictor`` and ``MaskRCNNPredictor`` heads.
    """
    detection = torchvision.models.detection
    try:
        # `pretrained=True` is deprecated in recent torchvision releases;
        # `weights="DEFAULT"` is its replacement.
        model = detection.maskrcnn_resnet50_fpn(weights="DEFAULT")
    except TypeError:
        # Older torchvision releases do not know the `weights` keyword.
        model = detection.maskrcnn_resnet50_fpn(pretrained=True)

    # Replace the box classification/regression head.
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = detection.faster_rcnn.FastRCNNPredictor(
        in_features, num_classes
    )

    # Replace the mask prediction head.
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256
    model.roi_heads.mask_predictor = detection.mask_rcnn.MaskRCNNPredictor(
        in_features_mask, hidden_layer, num_classes
    )

    return model


# ---------- Training ----------
def _collate_detection_batch(batch):
    """Regroup a list of (image, target) samples into (images, targets) tuples."""
    return tuple(zip(*batch))


def _drop_empty_samples(images, targets):
    """Return the images/targets whose target contains at least one box."""
    kept_images, kept_targets = [], []
    for img, tgt in zip(images, targets):
        if tgt["boxes"].numel() == 0:
            continue
        kept_images.append(img)
        kept_targets.append(tgt)
    return kept_images, kept_targets


def _batch_loss(model, images, targets, device):
    """Move one batch to *device* and return the summed loss of the model."""
    images = [img.to(device) for img in images]
    targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
    loss_dict = model(images, targets)
    return sum(loss for loss in loss_dict.values())


def _validation_loss(model, val_loader, device):
    """Return the mean per-batch loss over *val_loader*, or None if no batch was usable."""
    # The model is called with targets to obtain a loss dict, exactly as in the
    # training loop, so it stays in training mode; no_grad prevents any update.
    model.train()
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for images, targets in tqdm(val_loader, desc="Validation", unit="batch"):
            images, targets = _drop_empty_samples(images, targets)
            if not images:
                continue
            total_loss += _batch_loss(model, images, targets, device).item()
            num_batches += 1
    return total_loss / num_batches if num_batches else None


def train_model(
    train_img_dir,
    train_ann_file,
    val_img_dir,
    val_ann_file,
    num_epochs=10,
    batch_size=2,
    model_output_path="maskrcnn_trained.pth"
):
    """Fine-tune Mask R-CNN on a COCO-format dataset and save its weights.

    Args:
        train_img_dir: Folder with the training images.
        train_ann_file: COCO annotation file of the training split.
        val_img_dir: Folder with the validation images.
        val_ann_file: COCO annotation file of the validation split.
        num_epochs: Number of passes over the training data.
        batch_size: Training batch size (validation uses batch size 1).
        model_output_path: File the final ``state_dict`` is written to.

    Samples without any bounding box are skipped. After every epoch the summed
    and mean training loss and the mean validation loss are printed.
    """
    train_dataset = CocoDataset(
        root=train_img_dir,
        annFile=train_ann_file,
        transforms=get_transform(train=True)
    )

    val_dataset = CocoDataset(
        root=val_img_dir,
        annFile=val_ann_file,
        transforms=get_transform(train=False)
    )

    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=True,
        collate_fn=_collate_detection_batch
    )
    val_loader = DataLoader(
        val_dataset, batch_size=1, shuffle=False,
        collate_fn=_collate_detection_batch
    )

    num_classes = len(train_dataset.category_id_to_name) + 1  # +1 for background
    model = get_model(num_classes)

    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    model.to(device)

    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
    # Divide the learning rate by 10 every 3 epochs.
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

    for epoch in range(num_epochs):
        model.train()
        epoch_loss = 0.0
        num_batches = 0
        print(f"\n🌀 Epoch [{epoch + 1}/{num_epochs}]")

        # Add progress bar using tqdm
        for images, targets in tqdm(train_loader, desc="Training", unit="batch"):
            images, targets = _drop_empty_samples(images, targets)
            if not images:
                continue

            losses = _batch_loss(model, images, targets, device)
            epoch_loss += losses.item()
            num_batches += 1

            optimizer.zero_grad()
            losses.backward()
            optimizer.step()

        lr_scheduler.step()
        print(f" Epoch Loss: {epoch_loss:.4f}")
        if num_batches:
            print(f" Mean Batch Loss: {epoch_loss / num_batches:.4f}")

        val_loss = _validation_loss(model, val_loader, device)
        if val_loss is not None:
            print(f" Validation Loss (mean per batch): {val_loss:.4f}")

    torch.save(model.state_dict(), model_output_path)
    print(f"\n Training complete. Model saved to: {model_output_path}")

import os
# ---------- Run Training ----------
model_output_path = "maskrcnn_resnet50_trained.pth"
# Built with os.path.join so the path also resolves on non-Windows systems and
# matches the Path('data', 'coco', 'split') folder written by the split step.
path_to_dataset_main = os.path.join("data", "coco", "split")

if __name__ == "__main__":

    # Each split folder holds its images together with its annotation file.
    path_to_dataset = path_to_dataset_main
    train_img_dir = os.path.join(path_to_dataset, "train")
    val_img_dir = os.path.join(path_to_dataset, "val")
    train_ann_file = os.path.join(train_img_dir, "train.json")
    val_ann_file = os.path.join(val_img_dir, "val.json")
    train_model(
        train_img_dir=train_img_dir,
        train_ann_file=train_ann_file,
        val_img_dir=val_img_dir,
        val_ann_file=val_ann_file,
        num_epochs=5,
        batch_size=2,
        model_output_path=model_output_path
    )


### Evaluation of the Trained Model

In [None]:
# ---------- Evaluation Function ----------
def _print_table(rows):
    """Print rows of strings as left-aligned columns of equal width."""
    col_width = max(len(word) for row in rows for word in row) + 2  # padding
    for row in rows:
        print("".join(word.ljust(col_width) for word in row))


def _ylim_top(values, fallback=0.1):
    """Return an axis top 20% above the largest positive value, or *fallback* if none."""
    positive = [val for val in values if val > 0]
    return max(positive) * 1.2 if positive else fallback


def _label_bars(plt, bars, fontsize=10):
    """Write each bar's height just above the bar on the current axes."""
    for bar in bars:
        height = bar.get_height()
        plt.text(bar.get_x() + bar.get_width()/2., height + 0.01,
                 f'{height:.3f}', ha='center', fontsize=fontsize)


def _collect_detections(model, data_loader, device, score_threshold):
    """Run the model over *data_loader* and return COCO-format box detections."""
    detections = []
    model.eval()
    with torch.no_grad():
        for images, targets in tqdm(data_loader, desc="Evaluating", unit="batch"):
            images = [img.to(device) for img in images]
            outputs = model(images)

            for output, target in zip(outputs, targets):
                image_id = target['image_id'].item()
                boxes = output['boxes'].cpu()
                scores = output['scores'].cpu()
                labels = output['labels'].cpu()

                keep_idxs = torch.where(scores > score_threshold)[0]

                # Convert to COCO format (xmin, ymin, width, height)
                for idx in keep_idxs:
                    x1, y1, x2, y2 = boxes[idx].tolist()
                    detections.append({
                        'image_id': image_id,
                        'category_id': labels[idx].item(),
                        'bbox': [x1, y1, x2 - x1, y2 - y1],
                        'score': scores[idx].item()
                    })
    return detections


def evaluate_model(model, test_img_dir, test_ann_file, device, score_threshold=0.5):
    """Evaluate box detections of *model* on a COCO-format test split.

    Prints the COCO bbox metrics and a per-category AP@0.5 table, saves a
    2x2 summary figure to ``evaluation_metrics.png`` and finally calls
    ``visualize_predictions`` on a few samples.

    Args:
        model: Detection model returning ``boxes``, ``scores`` and ``labels``.
        test_img_dir: Folder with the test images.
        test_ann_file: COCO annotation file of the test split.
        device: Device the images are moved to before inference.
        score_threshold: Detections with a score at or below this value are
            discarded before the COCO evaluation.
    """
    # Imported here because the notebook's import cell only brings in COCO.
    from pycocotools.cocoeval import COCOeval

    test_dataset = CocoDataset(
        root=test_img_dir,
        annFile=test_ann_file,
        transforms=get_transform(train=False)
    )
    test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=lambda x: tuple(zip(*x)))

    # Load COCO API for evaluation metrics
    coco_gt = COCO(test_ann_file)

    coco_dt = _collect_detections(model, test_loader, device, score_threshold)

    if not coco_dt:
        print("No detections above threshold. Cannot evaluate.")
        return

    # Convert detections to COCO format
    coco_pred = coco_gt.loadRes(coco_dt)

    # Run COCO evaluation
    coco_eval = COCOeval(coco_gt, coco_pred, 'bbox')
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()

    # The first twelve entries of stats, in the order summarize() reports them.
    (ap_mean, ap50, ap75, ap_small, ap_medium, ap_large,
     ar_max1, ar_max10, ar_max100, ar_small, ar_medium, ar_large) = coco_eval.stats[:12]

    print(f"AP@0.5: {ap50:.4f}")

    # Create a metrics table
    metrics_table = [
        ["Metric", "Value"],
        ["AP@0.50:0.95", f"{ap_mean:.4f}"],
        ["AP@0.50", f"{ap50:.4f}"],
        ["AP@0.75", f"{ap75:.4f}"],
        ["AP (small)", f"{ap_small:.4f}"],
        ["AP (medium)", f"{ap_medium:.4f}"],
        ["AP (large)", f"{ap_large:.4f}"],
        ["AR@maxDets=1", f"{ar_max1:.4f}"],
        ["AR@maxDets=10", f"{ar_max10:.4f}"],
        ["AR@maxDets=100", f"{ar_max100:.4f}"],
        ["AR (small)", f"{ar_small:.4f}"],
        ["AR (medium)", f"{ar_medium:.4f}"],
        ["AR (large)", f"{ar_large:.4f}"]
    ]

    # Print metrics table
    print("\n--- Evaluation Metrics Table ---")
    _print_table(metrics_table)

    # Get category-wise AP
    category_names = test_dataset.category_id_to_name
    category_ap = []
    print("\n--- Category-wise AP@0.5 ---")
    cat_ap_table = [["Category", "AP@0.5"]]
    for idx, cat_id in enumerate(coco_eval.params.catIds):
        if cat_id in category_names:
            cat_name = category_names[cat_id]
            # precision is indexed [IoU thr, recall thr, category, area range, maxDets];
            # index 0 is the first IoU threshold, area 0 and maxDets 2 match the
            # "all" / 100-detection rows of the summary.
            cat_ap = coco_eval.eval['precision'][0, :, idx, 0, 2].mean()
            print(f"{cat_name}: {cat_ap:.4f}")
            category_ap.append((cat_name, cat_ap))
            cat_ap_table.append([cat_name, f"{cat_ap:.4f}"])

    # Print category AP table
    print("\n--- Category-wise AP@0.5 Table ---")
    _print_table(cat_ap_table)

    # Imported only once plotting starts so the text report above does not
    # depend on matplotlib.
    import matplotlib.pyplot as plt

    # Create visualizations
    plt.figure(figsize=(15, 15))

    # 1. IoU Threshold Metrics
    plt.subplot(2, 2, 1)
    iou_thresholds = ['IoU=0.50:0.95', 'IoU=0.50', 'IoU=0.75']
    iou_values = [ap_mean, ap50, ap75]
    bars1 = plt.bar(iou_thresholds, iou_values, color=['#3498db', '#2ecc71', '#e74c3c'])
    plt.title('Performance by IoU Threshold', fontsize=14)
    plt.ylabel('AP Value', fontsize=12)
    plt.ylim(0, _ylim_top(iou_values))
    _label_bars(plt, bars1, fontsize=10)

    # 2. Object Size AP
    plt.subplot(2, 2, 2)
    sizes = ['All', 'Small', 'Medium', 'Large']
    # Replace -1 with 0 for visualization
    size_values = [max(0, val) for val in (ap_mean, ap_small, ap_medium, ap_large)]
    bars2 = plt.bar(sizes, size_values, color=['#3498db', '#95a5a6', '#2ecc71', '#e74c3c'])
    plt.title('AP by Object Size', fontsize=14)
    plt.ylabel('AP Value', fontsize=12)
    # _ylim_top falls back to a small positive limit when every value is 0.
    plt.ylim(0, _ylim_top(size_values))
    for i, bar in enumerate(bars2):
        height = bar.get_height()
        if size_values[i] <= 0.001:
            text = "No objects"
            plt.text(bar.get_x() + bar.get_width()/2., 0.01, text, ha='center', fontsize=8)
        else:
            plt.text(bar.get_x() + bar.get_width()/2., height + 0.01,
                     f'{size_values[i]:.3f}', ha='center', fontsize=10)

    # 3. AR by Max Detections
    plt.subplot(2, 2, 3)
    max_dets = ['maxDets=1', 'maxDets=10', 'maxDets=100']
    ar_values = [ar_max1, ar_max10, ar_max100]
    bars3 = plt.bar(max_dets, ar_values, color=['#3498db', '#2ecc71', '#e74c3c'])
    plt.title('Average Recall (AR) by Max Detections', fontsize=14)
    plt.ylabel('AR Value', fontsize=12)
    plt.ylim(0, _ylim_top(ar_values))
    _label_bars(plt, bars3, fontsize=10)

    # 4. Category-wise AP
    plt.subplot(2, 2, 4)
    cat_names = [name for name, _ in category_ap]
    # Negative placeholders are drawn as 0, like the size chart above.
    cat_values = [max(0, value) for _, value in category_ap]
    colors = ['#27ae60' if val > 0.3 else '#e74c3c' for val in cat_values]
    bars4 = plt.bar(cat_names, cat_values, color=colors)
    plt.title('Category-wise AP@0.5', fontsize=14)
    plt.ylabel('AP Value', fontsize=12)
    plt.ylim(0, _ylim_top(cat_values))
    plt.xticks(rotation=45, ha='right')
    _label_bars(plt, bars4, fontsize=9)

    plt.tight_layout()
    plt.savefig('evaluation_metrics.png')
    plt.show()

    # Finally, visualize predictions on sample images
    # NOTE(review): visualize_predictions is not defined in this section;
    # confirm it is defined in another cell before this function is called.
    visualize_predictions(test_dataset, model, device, num_samples=5)

In [None]:
# Locate the train and test splits below the dataset root.
dataset_path = path_to_dataset_main
train_img_dir = os.path.join(dataset_path, "train")
test_img_dir = os.path.join(dataset_path, "test")
train_ann_file = os.path.join(train_img_dir, "train.json")
test_ann_file = os.path.join(test_img_dir, "test.json")

# Get number of classes from training dataset
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
num_classes = len(CocoDataset(
    root=train_img_dir,
    annFile=train_ann_file
).category_id_to_name) + 1  # +1 for background

# Create model with same architecture and load the saved weights.
# model_output_path is the file the training step writes its state_dict to.
model = get_model(num_classes)
model.load_state_dict(torch.load(model_output_path, map_location=device))
model.to(device)

# Now evaluate (pass the model object, not the checkpoint path)
evaluate_model(model, test_img_dir, test_ann_file, device)