- Dataset: **Penn-Fudan**
- Metrics: **Precision**, **Recall**, **mean Average Precision**
- Models: **Two-stage detector** Mask R-CNN, **Single-stage detector** Panoptic FPN
- Optimizers: **Adam**, **AdaGrad**, **RMSProp**

# **Instance Segmentation with Optimizer Comparison**

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision.models.detection import maskrcnn_resnet50_fpn, MaskRCNN_ResNet50_FPN_Weights
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
from torchvision.transforms import functional as F
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone
from torchvision.models.detection import MaskRCNN
from torchvision.models.detection.rpn import AnchorGenerator
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
import os
import cv2
from PIL import Image
import time
from tqdm import tqdm
import pandas as pd
from sklearn.model_selection import train_test_split
import urllib.request
import zipfile
import shutil
import seaborn as sns

## **Data preprocessing and exploration**

In [None]:
# Directory that receives the figures produced by this notebook. It is created
# up front so that later saves cannot fail on a missing folder.
# NOTE(review): the /content/drive prefix looks like a Google Drive mount in
# Colab -- confirm the drive is mounted before this cell runs.
OUTPUT_DIR = '/content/drive/MyDrive/DL-projet/results/instance_segmentation_results'
os.makedirs(OUTPUT_DIR, exist_ok=True)


In [None]:
def download_pennfudan_dataset():
    """Download and unpack the Penn-Fudan pedestrian dataset under ``data/``.

    The archive is fetched only when ``data/PennFudanPed.zip`` is missing and
    extracted only when ``data/PennFudanPed`` is missing, so repeated calls
    are cheap.

    Returns:
        The dataset directory (``"data/PennFudanPed"``) when both the
        ``PNGImages`` and ``PedMasks`` sub-folders exist, otherwise ``None``.

    Raises:
        Whatever ``urllib.request.urlretrieve`` or ``zipfile`` raise when the
        download or the extraction fails.
    """
    # Create a directory for the dataset
    os.makedirs("data", exist_ok=True)

    # URL for the Penn-Fudan Pedestrian dataset
    dataset_url = "https://www.cis.upenn.edu/~jshi/ped_html/PennFudanPed.zip"
    zip_path = "data/PennFudanPed.zip"

    # Download the dataset
    if not os.path.exists(zip_path):
        print(f"Downloading Penn-Fudan Pedestrian dataset from {dataset_url}...")
        # Download to a temporary name and rename only on success: an
        # interrupted transfer must not leave a truncated file at zip_path,
        # because the existence check above would then skip the download on
        # every later run.
        partial_path = zip_path + ".part"
        try:
            urllib.request.urlretrieve(dataset_url, partial_path)
            os.replace(partial_path, zip_path)
        finally:
            if os.path.exists(partial_path):
                os.remove(partial_path)
        print("Download complete!")
    else:
        print("Dataset zip already exists.")

    # Extract the dataset
    dataset_path = "data/PennFudanPed"
    if not os.path.exists(dataset_path):
        print("Extracting dataset...")
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall("data")
        print("Extraction complete!")
    else:
        print("Dataset already extracted.")

    # Verify the dataset structure
    images_dir = os.path.join(dataset_path, "PNGImages")
    masks_dir = os.path.join(dataset_path, "PedMasks")
    if os.path.exists(images_dir) and os.path.exists(masks_dir):
        print("Dataset is ready to use!")
        print(f"Number of images: {len(os.listdir(images_dir))}")
        print(f"Number of masks: {len(os.listdir(masks_dir))}")
        return dataset_path

    print("Dataset structure is not as expected. Please check the download.")
    return None

# Fetch the dataset (or reuse the local copy). dataset_path is None when the
# expected PNGImages / PedMasks folders are missing after extraction.
dataset_path = download_pennfudan_dataset()
print(f"Dataset path: {dataset_path}")

Downloading Penn-Fudan Pedestrian dataset from https://www.cis.upenn.edu/~jshi/ped_html/PennFudanPed.zip...
Download complete!
Extracting dataset...
Extraction complete!
Dataset is ready to use!
Number of images: 170
Number of masks: 170
Dataset path: data/PennFudanPed


In [None]:
# Custom dataset for Penn-Fudan
class PennFudanDataset(Dataset):
    """Penn-Fudan pedestrian images paired with per-instance masks.

    ``root`` must contain a ``PNGImages`` and a ``PedMasks`` folder. Files are
    paired by their position in the sorted directory listings.
    """

    def __init__(self, root, transforms=None):
        """Index the image and mask files found under ``root``.

        ``transforms``, when given, is called as ``transforms(img, target)``
        and must return the transformed ``(img, target)`` pair.
        """
        self.root = root
        self.transforms = transforms
        # Sorting both listings keeps image i aligned with mask i.
        image_dir = os.path.join(root, "PNGImages")
        mask_dir = os.path.join(root, "PedMasks")
        self.imgs = sorted(os.listdir(image_dir))
        self.masks = sorted(os.listdir(mask_dir))

    def __getitem__(self, idx):
        """Return ``(img, target)`` for sample ``idx``.

        ``target`` is a dict with the keys ``boxes``, ``labels``, ``masks``,
        ``image_id``, ``area`` and ``iscrowd``.
        """
        image_file = os.path.join(self.root, "PNGImages", self.imgs[idx])
        mask_file = os.path.join(self.root, "PedMasks", self.masks[idx])

        img = Image.open(image_file).convert("RGB")
        # Each instance is stored as a distinct integer value in the mask.
        label_map = np.array(Image.open(mask_file))

        # The smallest value is treated as background and dropped.
        instance_ids = np.unique(label_map)[1:]
        num_objs = len(instance_ids)

        # One boolean mask per instance, stacked along the first axis.
        instance_masks = label_map == instance_ids[:, None, None]

        # Tight bounding box [xmin, ymin, xmax, ymax] around each mask.
        boxes = []
        for instance_mask in instance_masks:
            rows, cols = np.where(instance_mask)
            boxes.append([np.min(cols), np.min(rows), np.max(cols), np.max(rows)])
        boxes = torch.as_tensor(boxes, dtype=torch.float32)

        box_heights = boxes[:, 3] - boxes[:, 1]
        box_widths = boxes[:, 2] - boxes[:, 0]

        target = {
            "boxes": boxes,
            # Single foreground class (pedestrian), labelled 1.
            "labels": torch.ones((num_objs,), dtype=torch.int64),
            "masks": torch.as_tensor(instance_masks, dtype=torch.uint8),
            "image_id": torch.tensor([idx]),
            "area": box_heights * box_widths,
            # No instance is marked as a crowd region.
            "iscrowd": torch.zeros((num_objs,), dtype=torch.int64),
        }

        if self.transforms is not None:
            img, target = self.transforms(img, target)

        return img, target

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.imgs)


In [None]:
# Define transforms
def get_transform(train):
    """Build the ``(img, target)`` transform pipeline.

    The image is always converted to a tensor; when ``train`` is truthy a
    random horizontal flip is applied afterwards.
    """
    def _to_tensor(img, target):
        # Convert the PIL image to a tensor; the target passes through.
        return F.to_tensor(img), target

    def _flip(img, target):
        # Looked up at call time, so the helper may be defined later on.
        return random_horizontal_flip(img, target)

    steps = [_to_tensor, _flip] if train else [_to_tensor]
    return Compose(steps)

In [None]:
# Helper class for chaining transforms
class Compose:
    """Apply a sequence of ``(image, target)`` transforms one after another."""

    def __init__(self, transforms):
        # Each entry is a callable taking and returning (image, target).
        self.transforms = transforms

    def __call__(self, image, target):
        """Run every transform in order and return the final pair."""
        current_image, current_target = image, target
        for step in self.transforms:
            current_image, current_target = step(current_image, current_target)
        return current_image, current_target

In [None]:
# Random horizontal flip transform
def random_horizontal_flip(image, target, prob=0.5):
    """Mirror ``image`` and its annotations left-right with probability ``prob``.

    ``target["boxes"]`` is updated in place (and re-assigned);
    ``target["masks"]``, when present, is replaced by its flipped copy.
    Returns the ``(image, target)`` pair.
    """
    if not (torch.rand(1) < prob):
        return image, target

    _, width = image.shape[-2:]
    flipped_image = image.flip(-1)

    boxes = target["boxes"]
    # Mirroring swaps the roles of xmin and xmax:
    # new_xmin = width - old_xmax and new_xmax = width - old_xmin.
    boxes[:, [0, 2]] = width - boxes[:, [2, 0]]
    target["boxes"] = boxes

    if "masks" in target:
        target["masks"] = target["masks"].flip(-1)

    return flipped_image, target

In [None]:
def get_maskrcnn_resnet18(num_classes):
    """Return a pre-trained Mask R-CNN with fresh box and mask predictors.

    NOTE: despite the function name, the model uses the ResNet-50 FPN backbone
    that ships with ``maskrcnn_resnet50_fpn``. Earlier code also built a
    ResNet-18 here, but that network was never attached to the model; it only
    triggered an extra weight download, so it has been removed. The name is
    kept so existing callers continue to work.

    Args:
        num_classes: Number of output classes, background included.

    Returns:
        The model with only the replaced box and mask predictors left
        trainable; every other parameter has ``requires_grad`` set to False.
    """
    # Load pre-trained Mask R-CNN with ResNet50 backbone
    model = maskrcnn_resnet50_fpn(weights=MaskRCNN_ResNet50_FPN_Weights.DEFAULT)

    # Replace the box predictor so it outputs `num_classes` classes
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    # Replace the mask predictor in the same way
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256
    model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask, hidden_layer, num_classes)

    # Fine-tune only the two new prediction layers; the pre-trained rest of
    # the network stays frozen.
    for name, param in model.named_parameters():
        param.requires_grad = "box_predictor" in name or "mask_predictor" in name

    # Report how many parameters will actually be updated
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Number of trainable parameters: {trainable_params}")

    return model

In [None]:
def get_panoptic_fpn(num_classes, num_stuff_classes=0, pretrained_backbone=True):
    """Build a Mask R-CNN on a ResNet-50 FPN backbone ("Panoptic FPN" base).

    Only the ResNet body of the backbone is frozen. The FPN layers, the RPN
    and the RoI heads are created by this function rather than loaded from a
    detection checkpoint, so they start from random weights and must be
    trained. Previously everything except the final predictors was frozen,
    which left the region proposals permanently random.

    Args:
        num_classes: Number of instance classes, background included.
        num_stuff_classes: When > 0, a semantic-segmentation head with this
            many output channels is attached as ``model.semantic_seg_head``.
            NOTE(review): nothing in this function wires that head into the
            model's forward pass -- confirm how it is meant to be used.
        pretrained_backbone: Passed through to ``resnet_fpn_backbone``.

    Returns:
        The assembled ``MaskRCNN`` model.
    """
    # Load a pre-trained backbone with FPN
    backbone = resnet_fpn_backbone('resnet50', pretrained=pretrained_backbone)

    # RPN parameters
    anchor_generator = AnchorGenerator(
        sizes=((32,), (64,), (128,), (256,), (512,)),  # One size per feature map
        aspect_ratios=((0.5, 1.0, 2.0),) * 5  # Same aspect ratios on every map
    )

    roi_pooler = torchvision.ops.MultiScaleRoIAlign(
        featmap_names=['0', '1', '2', '3'],
        output_size=7,
        sampling_ratio=2
    )

    mask_roi_pooler = torchvision.ops.MultiScaleRoIAlign(
        featmap_names=['0', '1', '2', '3'],
        output_size=14,
        sampling_ratio=2
    )

    # Create Mask R-CNN model with FPN backbone (base for Panoptic FPN)
    model = MaskRCNN(
        backbone,
        num_classes=num_classes,
        rpn_anchor_generator=anchor_generator,
        box_roi_pool=roi_pooler,
        mask_roi_pool=mask_roi_pooler
    )

    # Modify the box predictor
    in_features_box = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features_box, num_classes)

    # Modify the mask predictor
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256
    model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask, hidden_layer, num_classes)

    # For Panoptic FPN, we need to add semantic segmentation head for "stuff" classes
    if num_stuff_classes > 0:
        model.semantic_seg_head = create_semantic_seg_head(backbone.out_channels, num_stuff_classes)

    # Freeze only the ResNet body ("backbone.body.*"); every other parameter
    # (FPN, RPN, RoI heads, predictors, optional semantic head) is trainable.
    for name, param in model.named_parameters():
        param.requires_grad = not name.startswith("backbone.body.")

    # Print trainable parameters
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Number of trainable parameters: {trainable_params}")

    return model

def create_semantic_seg_head(in_channels, num_classes):
    """Return a small convolutional head producing per-pixel class scores.

    The head is two 3x3 conv + batch-norm + ReLU stages with 128 channels,
    followed by a 1x1 convolution that outputs ``num_classes`` channels.
    """
    hidden_channels = 128
    layers = []
    for stage_inputs in (in_channels, hidden_channels):
        layers += [
            nn.Conv2d(stage_inputs, hidden_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(hidden_channels),
            nn.ReLU(inplace=True),
        ]
    # 1x1 projection onto the class scores
    layers.append(nn.Conv2d(hidden_channels, num_classes, kernel_size=1))
    return nn.Sequential(*layers)

In [None]:
# Training function
def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10):
    """Train ``model`` for one pass over ``data_loader``.

    Returns:
        A ``(metric_logger, gradient_magnitudes)`` pair: the logger holding
        the loss and learning-rate meters, and a list with the global L2
        gradient norm of every batch.
    """
    model.train()
    metric_logger = MetricLogger(delimiter="  ")
    metric_logger.add_meter('lr', SmoothedValue(window_size=1, fmt='{value:.6f}'))
    header = f'Epoch: [{epoch}]'

    # Per-batch gradient norms, kept for later visualisation
    gradient_magnitudes = []

    for images, targets in metric_logger.log_every(data_loader, print_freq, header):
        images = [image.to(device) for image in images]
        targets = [
            {key: value.to(device) for key, value in target.items()}
            for target in targets
        ]

        optimizer.zero_grad()

        # In training mode the model returns a dict of individual losses
        loss_dict = model(images, targets)
        losses = sum(loss_dict.values())
        losses.backward()

        # Global L2 norm over every parameter that received a gradient
        squared_norm = sum(
            p.grad.data.norm(2).item() ** 2
            for p in model.parameters()
            if p.grad is not None
        )
        gradient_magnitudes.append(squared_norm ** 0.5)

        optimizer.step()

        component_losses = {name: value.item() for name, value in loss_dict.items()}
        metric_logger.update(loss=losses.item(), **component_losses)
        metric_logger.update(lr=optimizer.param_groups[0]['lr'])

    return metric_logger, gradient_magnitudes

In [None]:
# Utility class for averaging values
class SmoothedValue:
    """Track a stream of values with windowed and global statistics.

    Args:
        window_size: Number of most recent values kept for ``median``/``avg``.
        fmt: Optional format string used by ``str()``. It may reference the
            fields ``median``, ``avg``, ``global_avg`` and ``value``.
    """

    def __init__(self, window_size=20, fmt=None):
        self.deque = []  # the most recent `window_size` values, oldest first
        self.total = 0.0  # weighted sum of every value seen so far
        self.count = 0  # total weight seen so far
        self.window_size = window_size
        self.fmt = fmt

    def update(self, value, n=1):
        """Record ``value``, counting it ``n`` times in the global average."""
        self.deque.append(value)
        if len(self.deque) > self.window_size:
            self.deque.pop(0)
        self.count += n
        self.total += value * n

    @property
    def median(self):
        """Median of the values currently in the window."""
        d = torch.tensor(self.deque)
        return d.median().item()

    @property
    def avg(self):
        """Mean of the values currently in the window."""
        d = torch.tensor(self.deque)
        return d.mean().item()

    @property
    def global_avg(self):
        """Weighted mean of every value seen; NaN before the first update."""
        if self.count == 0:
            # Avoid a ZeroDivisionError when an untouched meter is printed
            # (e.g. after iterating an empty data loader).
            return float("nan")
        return self.total / self.count

    @property
    def value(self):
        """Most recently recorded value; NaN when the window is empty."""
        return self.deque[-1] if self.deque else float("nan")

    def __str__(self):
        # __str__ must return a string; returning the raw float raises TypeError.
        if self.fmt is None:
            return str(self.global_avg)
        return self.fmt.format(
            median=self.median,
            avg=self.avg,
            global_avg=self.global_avg,
            value=self.value)

In [None]:
# Utility class for logging metrics
class MetricLogger:
    """Collect named meters and print them periodically while iterating."""

    def __init__(self, delimiter="\t"):
        self.meters = {}  # meter name -> meter object
        self.delimiter = delimiter  # separator used when printing meters

    def add_meter(self, name, meter):
        """Register ``meter`` under ``name``, replacing any previous one."""
        self.meters[name] = meter

    def update(self, **kwargs):
        """Feed each keyword value to the meter of the same name.

        Unknown names get a default ``SmoothedValue`` meter on first use.
        """
        for k, v in kwargs.items():
            if k not in self.meters:
                self.meters[k] = SmoothedValue()
            self.meters[k].update(v)

    def __str__(self):
        loss_str = []
        for name, meter in self.meters.items():
            loss_str.append(f"{name}: {meter.global_avg:.4f}")
        return self.delimiter.join(loss_str)

    def log_every(self, iterable, print_freq, header=None):
        """Yield the items of ``iterable``, printing the meters as it goes.

        The meters are printed after every ``print_freq`` items (never when
        ``print_freq`` is 0) and once more at the end, followed by the total
        and per-item wall-clock time.
        """
        i = 0
        if header is not None:
            print(header)
        # Generators and other un-sized iterables have no len(); show "?".
        total = len(iterable) if hasattr(iterable, "__len__") else "?"
        start_time = time.time()
        for obj in iterable:
            yield obj
            i += 1
            if print_freq and i % print_freq == 0:
                print(f"{i}/{total}: {str(self)}")
        print(f"{i}/{total}: {str(self)}")
        total_time = time.time() - start_time
        # Divide by the number of items actually processed; an empty
        # iterable must not raise ZeroDivisionError.
        time_per_item = total_time / i if i else 0.0
        print(f"Total time: {total_time:.3f}, time per item: {time_per_item:.3f}")

In [None]:
def _mask_iou_matrix(pred_masks, pred_labels, gt_masks, gt_labels):
    """Return the (num_pred, num_gt) IoU matrix between binary masks.

    Pairs whose labels differ get an IoU of 0 so that they can never match.
    """
    pred_flat = pred_masks.flatten(1).float()
    gt_flat = (gt_masks > 0).flatten(1).float()
    # With 0/1 entries the dot product counts the overlapping pixels.
    intersection = pred_flat @ gt_flat.t()
    union = pred_flat.sum(dim=1, keepdim=True) + gt_flat.sum(dim=1) - intersection
    ious = torch.where(
        union > 0, intersection / union.clamp(min=1.0), torch.zeros_like(union))
    same_label = pred_labels[:, None] == gt_labels[None, :]
    return ious * same_label.to(ious.dtype)


def _match_ground_truths(ious, num_gt, iou_threshold):
    """Match each ground truth to its best prediction, one use per prediction.

    ``ious`` is a list of rows (one per prediction). Returns the set of
    matched prediction indices and the list of matched IoU values.
    """
    matched_preds = set()
    matched_ious = []
    for gt_idx in range(num_gt):
        column = [row[gt_idx] for row in ious]
        best_pred = max(range(len(column)), key=column.__getitem__)
        best_iou = column[best_pred]
        if best_iou >= iou_threshold and best_pred not in matched_preds:
            matched_preds.add(best_pred)
            matched_ious.append(best_iou)
    return matched_preds, matched_ious


def _true_positive_flags(ious, ranked_preds, num_gt, iou_threshold):
    """Label predictions, visited in ``ranked_preds`` order, as TP (1) or FP (0).

    A prediction is a true positive when its best still-unclaimed ground
    truth overlaps it with at least ``iou_threshold``.
    """
    claimed = set()
    flags = []
    for pred_idx in ranked_preds:
        best_gt, best_iou = -1, 0.0
        for gt_idx in range(num_gt):
            if gt_idx in claimed:
                continue
            if ious[pred_idx][gt_idx] > best_iou:
                best_gt, best_iou = gt_idx, ious[pred_idx][gt_idx]
        if best_iou >= iou_threshold:
            claimed.add(best_gt)
            flags.append(1)
        else:
            flags.append(0)
    return flags


def _eleven_point_ap(tp_flags, num_gt):
    """Return the 11-point interpolated average precision for ranked flags."""
    precisions, recalls = [], []
    true_positives = 0
    for rank, is_tp in enumerate(tp_flags, start=1):
        true_positives += is_tp
        precisions.append(true_positives / rank)
        recalls.append(true_positives / num_gt)

    ap = 0.0
    for step in range(11):
        recall_level = step / 10
        reachable = [p for p, r in zip(precisions, recalls) if r >= recall_level]
        ap += max(reachable, default=0.0) / 11
    return ap


def evaluate(model, data_loader, device):
    """Evaluate instance-segmentation quality of ``model`` on ``data_loader``.

    Predictions with a score of at most 0.5 are discarded and the remaining
    soft masks are binarised at 0.5. A prediction matches a ground truth when
    their mask IoU is at least 0.5 and their labels agree. Images without
    ground-truth instances are skipped; images with ground truth but no
    surviving prediction count as 0 for precision, recall, F1 and AP.

    Returns:
        A dict with the keys ``mIoU`` (mean IoU of matched pairs),
        ``Precision``, ``Recall``, ``F1`` and ``mAP`` (per-image values
        averaged over the evaluated images). Every entry is 0 when nothing
        was evaluated.
    """
    model.eval()
    metric_logger = MetricLogger(delimiter="  ")  # used for progress output only
    header = 'Test:'

    iou_threshold = 0.5    # minimum mask IoU for a positive detection
    score_threshold = 0.5  # minimum confidence for a prediction to be kept

    iou_list = []
    precision_list = []
    recall_list = []
    f1_list = []
    ap_scores = []

    for images, targets in metric_logger.log_every(data_loader, 10, header):
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        with torch.no_grad():
            outputs = model(images)

        for output, target in zip(outputs, targets):
            gt_masks = target['masks']
            gt_labels = target['labels']
            num_gt = len(gt_masks)

            # Skip images with no ground truth instances
            if num_gt == 0:
                continue

            keep = output['scores'] > score_threshold
            pred_scores = output['scores'][keep]
            pred_labels = output['labels'][keep]
            # Binarise once so that intersection and union are both computed
            # on the same hard masks.
            pred_masks = output['masks'][keep] > 0.5
            num_pred = len(pred_masks)

            if num_pred == 0:
                # Every ground truth was missed: all metrics, AP included, are 0.
                precision_list.append(0)
                recall_list.append(0)
                f1_list.append(0)
                ap_scores.append(0.0)
                continue

            # Plain Python floats from here on, so that set membership tests
            # compare values rather than tensor object identity.
            ious = _mask_iou_matrix(pred_masks, pred_labels, gt_masks, gt_labels).cpu().tolist()

            matched_preds, matched_ious = _match_ground_truths(ious, num_gt, iou_threshold)
            precision = len(matched_preds) / num_pred
            recall = len(matched_preds) / num_gt
            precision_list.append(precision)
            recall_list.append(recall)
            if precision + recall > 0:
                f1_list.append(2 * precision * recall / (precision + recall))
            else:
                f1_list.append(0)
            iou_list.extend(matched_ious)

            # Per-image AP: rank predictions by confidence, highest first
            ranked_preds = torch.argsort(pred_scores, descending=True).tolist()
            tp_flags = _true_positive_flags(ious, ranked_preds, num_gt, iou_threshold)
            ap_scores.append(_eleven_point_ap(tp_flags, num_gt))

    def _mean(values):
        return sum(values) / len(values) if values else 0

    results = {
        'mIoU': _mean(iou_list),
        'Precision': _mean(precision_list),
        'Recall': _mean(recall_list),
        'F1': _mean(f1_list),
        'mAP': _mean(ap_scores),
    }

    print(f"\nEvaluation Results:")
    for metric, value in results.items():
        print(f"{metric}: {value:.4f}")

    return results

In [None]:
def compare_optimizers(model_name, train_loader, val_loader, device, num_epochs=5):
    """Train a fresh model with Adam, AdaGrad and RMSProp and compare them.

    Args:
        model_name: ``'maskrcnn'`` or ``'panoptic'``.
        train_loader: Loader passed to ``train_one_epoch``.
        val_loader: Loader passed to ``evaluate``.
        device: Device the models are moved to.
        num_epochs: Number of training epochs per optimizer.

    Returns:
        ``(results, gradient_data, model)``. ``results[opt]`` holds
        ``'losses'``, ``'final_metrics'`` and ``'learning_rates'``;
        ``gradient_data[opt]`` holds ``'gradients'`` (mean gradient norm per
        epoch) and ``'learning_rates'``. ``model`` is the model trained with
        the last optimizer.

    Raises:
        ValueError: If ``model_name`` is not a supported model.
    """
    # Fail early instead of hitting an unbound `model` variable further down.
    if model_name not in ('maskrcnn', 'panoptic'):
        raise ValueError(
            f"Unknown model_name {model_name!r}; expected 'maskrcnn' or 'panoptic'")

    optimizer_classes = {
        'Adam': optim.Adam,
        'AdaGrad': optim.Adagrad,
        'RMSProp': optim.RMSprop
    }
    learning_rates = {
        'Adam': 0.001,
        'AdaGrad': 0.01,
        'RMSProp': 0.001
    }
    results = {}
    gradient_data = {}
    model = None

    for opt_name, optimizer_class in optimizer_classes.items():
        print(f"\nTraining with {opt_name} optimizer")
        # Every optimizer starts from a freshly built model.
        if model_name == 'maskrcnn':
            model = get_maskrcnn_resnet18(num_classes=2)  # Background + pedestrian
        else:
            model = get_panoptic_fpn(num_classes=2)  # Background + pedestrian
        model.to(device)

        # Hand only trainable parameters to the optimizer so that no
        # optimizer state is allocated for frozen weights.
        trainable_params = [p for p in model.parameters() if p.requires_grad]
        optimizer = optimizer_class(trainable_params, lr=learning_rates[opt_name])

        # Training loop
        epoch_losses = []
        epoch_gradients = []
        epoch_lr = []
        eval_results = None

        for epoch in range(num_epochs):
            metric_logger, gradients = train_one_epoch(
                model, optimizer, train_loader, device, epoch)

            epoch_losses.append(metric_logger.meters['loss'].global_avg)
            epoch_gradients.append(sum(gradients) / len(gradients))
            epoch_lr.append(optimizer.param_groups[0]['lr'])

            # Evaluate model
            eval_results = evaluate(model, val_loader, device)
            print(f"Epoch {epoch}: Loss = {epoch_losses[-1]:.4f}")

        # The model does not change after the last epoch, so that epoch's
        # evaluation already is the final one; only evaluate here when no
        # epoch ran at all.
        if eval_results is None:
            eval_results = evaluate(model, val_loader, device)

        results[opt_name] = {
            'losses': epoch_losses,
            'final_metrics': eval_results,
            'learning_rates': epoch_lr
        }

        gradient_data[opt_name] = {
            'gradients': epoch_gradients,
            'learning_rates': epoch_lr
        }

    return results, gradient_data, model

def visualize_comprehensive_plots(results, gradient_data, output_dir, model_name, optim_name):
    """Save a 2x2 overview figure plus one bar chart per metric.

    ``results`` maps optimizer name to a dict with ``'losses'``,
    ``'learning_rates'`` and ``'final_metrics'``; ``gradient_data`` maps
    optimizer name to a dict with ``'gradients'``. All PNG files are written
    into ``output_dir``.
    """
    metrics = ['mIoU', 'Precision', 'Recall', 'F1', 'mAP']
    optimizer_names = list(results.keys())
    file_prefix = f'{model_name}_{optim_name}_optimizer'

    def _finish_panel(title, x_label, y_label):
        # Decoration shared by the three line-plot panels
        plt.title(title, fontsize=12)
        plt.xlabel(x_label)
        plt.ylabel(y_label)
        plt.legend()
        plt.grid(True)

    plt.figure(figsize=(15, 10))

    # Panel 1: training loss per epoch
    plt.subplot(2, 2, 1)
    for opt_name, res in results.items():
        plt.plot(res['losses'], label=f"{opt_name}")
    _finish_panel('Training Loss by Optimizer', 'Epoch', 'Loss')

    # Panel 2: mean gradient magnitude per epoch
    plt.subplot(2, 2, 2)
    for opt_name, data in gradient_data.items():
        plt.plot(data['gradients'], label=f"{opt_name}")
    _finish_panel('Gradient Magnitudes by Optimizer', 'Epoch', 'Average Gradient Magnitude')

    # Panel 3: loss plotted against the learning rate
    plt.subplot(2, 2, 3)
    for opt_name, res in results.items():
        plt.plot(res['learning_rates'], res['losses'], label=f"{opt_name}")
    _finish_panel('Learning Rate vs Loss', 'Learning Rate', 'Loss')

    # Panel 4: heat map of final metrics (rows: optimizers, columns: metrics)
    plt.subplot(2, 2, 4)
    metric_values = np.array([
        [results[opt]['final_metrics'][m] for m in metrics]
        for opt in optimizer_names
    ])
    sns.heatmap(metric_values, annot=True, cmap='YlGnBu',
                xticklabels=metrics, yticklabels=optimizer_names)
    plt.title('Optimizer Performance Metrics', fontsize=12)

    plt.tight_layout()
    overview_path = os.path.join(output_dir, f'{file_prefix}_comprehensive_analysis.png')
    plt.savefig(overview_path, dpi=300)
    plt.close()

    # One bar chart per metric, annotated with the exact values
    for metric in metrics:
        metric_data = [results[opt]['final_metrics'][metric] for opt in optimizer_names]

        plt.figure(figsize=(10, 6))
        plt.bar(optimizer_names, metric_data, color=['blue', 'green', 'red'])
        plt.title(f'{metric} Comparison Across Optimizers', fontsize=12)
        plt.ylabel(metric)
        plt.ylim(0, max(metric_data) * 1.2)

        for position, height in enumerate(metric_data):
            plt.text(position, height, f'{height:.4f}', ha='center', va='bottom')

        plt.tight_layout()
        chart_path = os.path.join(output_dir, f'{file_prefix}_{metric.lower()}_comparison.png')
        plt.savefig(chart_path, dpi=300)
        plt.close()

In [None]:
# Visualize results
def visualize_optimizer_comparison(results, gradient_data, model_name, optim_name):
    """Save the loss and gradient-magnitude curves of every optimizer.

    ``results`` maps optimizer name to a dict with a ``'losses'`` series.
    ``gradient_data`` maps optimizer name either to a dict with a
    ``'gradients'`` series or directly to such a series. Two PNG files are
    written to the current working directory.
    """
    # Plot loss curves
    plt.figure(figsize=(12, 6))
    for opt_name, res in results.items():
        plt.plot(res['losses'], label=f"{opt_name}")
    plt.title('Training Loss by Optimizer')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    plt.savefig(f'{model_name}_{optim_name}_optimizer_loss_comparison.png')
    plt.close()

    # Plot gradient magnitudes
    plt.figure(figsize=(12, 6))
    for opt_name, grads in gradient_data.items():
        # An entry may be a dict carrying the series under 'gradients';
        # plotting such a dict directly fails, so unwrap it first.
        series = grads['gradients'] if isinstance(grads, dict) else grads
        plt.plot(series, label=f"{opt_name}")
    plt.title('Gradient Magnitudes by Optimizer')
    plt.xlabel('Epoch')
    plt.ylabel('Average Gradient Magnitude')
    plt.legend()
    plt.grid(True)
    plt.savefig(f'{model_name}_{optim_name}_optimizer_gradient_comparison.png')
    plt.close()

# Visualize segmentation results
def visualize_predictions(model, dataset, model_name, optim_name, idx=0, device='cpu'):
    """Save a side-by-side figure of ground-truth and predicted mask outlines.

    Sample ``idx`` of ``dataset`` is run through ``model`` on ``device``; the
    figure is written to the current working directory.
    """
    def _draw_outlines(ax, binary_mask):
        # Trace the contours of one mask on the given axes
        contours, _ = cv2.findContours(
            binary_mask.astype(np.uint8), cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
        for contour in contours:
            points = contour.reshape(-1, 2)
            ax.plot(points[:, 0], points[:, 1], linewidth=2)

    img, target = dataset[idx]
    model.eval()
    with torch.no_grad():
        prediction = model([img.to(device)])[0]

    # Tensor scaled by 255 -> HxWxC uint8 array for imshow
    img = img.mul(255).permute(1, 2, 0).byte().numpy()

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))

    # Left panel: ground truth
    ax1.imshow(img)
    for gt_mask in target['masks']:
        _draw_outlines(ax1, gt_mask.numpy())
    ax1.set_title('Ground Truth')
    ax1.axis('off')

    # Right panel: predictions, keeping masks whose peak value exceeds 0.5
    ax2.imshow(img)
    for soft_mask in prediction['masks']:
        if soft_mask.max() > 0.5:
            _draw_outlines(ax2, soft_mask.squeeze().cpu().numpy() > 0.5)
    ax2.set_title('Predictions')
    ax2.axis('off')

    plt.tight_layout()
    plt.savefig(f'{model_name}_{optim_name}_segmentation_comparison.png')
    plt.close()

In [None]:
# Calculate metrics
def calculate_metrics(model, data_loader, device):
    """Return mean pixel-level IoU, precision, recall and F1 per predicted mask.

    Each predicted mask (binarised at 0.5) is scored against the ground-truth
    mask it overlaps best. The result is a dict with the keys ``'iou'``,
    ``'precision'``, ``'recall'`` and ``'f1'``; each is 0 when no predicted
    mask was scored.
    """
    def _as_number(value):
        # Tensors are unwrapped; plain Python numbers pass through unchanged
        return value.item() if isinstance(value, torch.Tensor) else value

    model.eval()
    collected = {'iou': [], 'precision': [], 'recall': [], 'f1': []}

    for images, targets in data_loader:
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        with torch.no_grad():
            outputs = model(images)

        for output, target in zip(outputs, targets):
            if 'masks' not in output:
                print("Warning: 'masks' not found in output. Skipping metric calculation for this image.")
                continue

            pred_masks = output['masks'] > 0.5
            gt_masks = target['masks']

            for pred_mask in pred_masks:
                best_iou = best_precision = best_recall = 0

                for gt_mask in gt_masks:
                    intersection = torch.logical_and(pred_mask, gt_mask).sum().float()
                    union = torch.logical_or(pred_mask, gt_mask).sum().float()
                    iou = intersection / union if union > 0 else 0.0

                    # Pixel-level confusion counts for this pair
                    tp = intersection
                    fp = pred_mask.sum() - tp
                    fn = gt_mask.sum() - tp

                    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
                    recall = tp / (tp + fn) if (tp + fn) > 0 else 0

                    # Keep the statistics of the best-overlapping ground truth
                    if iou > best_iou:
                        best_iou, best_precision, best_recall = iou, precision, recall

                denominator = best_precision + best_recall
                if denominator > 0:
                    f1 = 2 * best_precision * best_recall / denominator
                else:
                    f1 = 0

                collected['iou'].append(_as_number(best_iou))
                collected['precision'].append(_as_number(best_precision))
                collected['recall'].append(_as_number(best_recall))
                collected['f1'].append(_as_number(f1))

    # Average each metric over all scored masks
    return {
        name: (sum(values) / len(values) if values else 0)
        for name, values in collected.items()
    }

In [None]:
def main(data_path="data/PennFudanPed", num_epochs=5, train_fraction=0.8):
    """Train both detectors with every optimizer, then plot and export results.

    The Penn-Fudan data under ``data_path`` is split randomly into a training
    subset (with training transforms) and a validation subset (with evaluation
    transforms).  ``compare_optimizers`` is run once for ``'maskrcnn'`` and once
    for ``'panoptic'``; the collected results are passed to
    ``visualize_comprehensive_plots`` and written to
    ``model_optimizer_comparison.csv`` inside ``OUTPUT_DIR``.

    Args:
        data_path: Root directory of the extracted Penn-Fudan dataset.
        num_epochs: Number of epochs forwarded to ``compare_optimizers``.
        train_fraction: Share of the samples used for training; the remainder
            is used for validation.  Must lie strictly between 0 and 1.

    Raises:
        ValueError: If ``train_fraction`` is not strictly between 0 and 1.
    """
    if not 0 < train_fraction < 1:
        raise ValueError(
            f"train_fraction must be strictly between 0 and 1, got {train_fraction!r}"
        )

    # Set device
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    print(f"Using device: {device}")

    # Two views of the same files: one with training transforms, one with
    # evaluation transforms.
    dataset = PennFudanDataset(data_path, get_transform(train=True))
    dataset_test = PennFudanDataset(data_path, get_transform(train=False))

    # One shared permutation keeps the train and validation index sets disjoint
    # even though they index two different dataset objects.
    indices = torch.randperm(len(dataset)).tolist()
    train_size = int(len(dataset) * train_fraction)
    dataset_train = torch.utils.data.Subset(dataset, indices[:train_size])
    dataset_val = torch.utils.data.Subset(dataset_test, indices[train_size:])

    def collate_batch(batch):
        # Transpose [(image, target), ...] into (images, targets) without
        # stacking, so every sample keeps its own size.
        return tuple(zip(*batch))

    # Create data loaders
    train_loader = DataLoader(
        dataset_train, batch_size=2, shuffle=True, collate_fn=collate_batch
    )
    val_loader = DataLoader(
        dataset_val, batch_size=1, shuffle=False, collate_fn=collate_batch
    )

    # Compare optimizers for Mask R-CNN
    print("Training Mask R-CNN with ResNet18 backbone")
    maskrcnn_results, maskrcnn_gradients, model_maskrcnn = compare_optimizers(
        'maskrcnn', train_loader, val_loader, device, num_epochs=num_epochs
    )
    # The returned model is not used below; drop the reference right away so it
    # is not kept alive (presumably on `device`) while the next model trains.
    del model_maskrcnn

    # Compare optimizers for Panoptic FPN
    print("\nTraining Panoptic FPN")
    panoptic_results, panoptic_gradients, model_panoptic = compare_optimizers(
        'panoptic', train_loader, val_loader, device, num_epochs=num_epochs
    )
    del model_panoptic

    # Comprehensive Visualization
    visualize_comprehensive_plots(maskrcnn_results, maskrcnn_gradients, OUTPUT_DIR, 'maskrcnn', '')
    visualize_comprehensive_plots(panoptic_results, panoptic_gradients, OUTPUT_DIR, 'panoptic', '')

    # Save detailed results to CSV: one row per (model, optimizer) pair, with
    # the per-optimizer metrics spread into columns.
    results_df_maskrcnn = pd.DataFrame([
        {'Model': 'Mask R-CNN', 'Optimizer': opt, **metrics}
        for opt, metrics in maskrcnn_results.items()
    ])
    results_df_panoptic = pd.DataFrame([
        {'Model': 'Panoptic FPN', 'Optimizer': opt, **metrics}
        for opt, metrics in panoptic_results.items()
    ])

    results_df = pd.concat([results_df_maskrcnn, results_df_panoptic])
    results_df.to_csv(os.path.join(OUTPUT_DIR, 'model_optimizer_comparison.csv'), index=False)

    print(f"\nResults and visualizations saved to {OUTPUT_DIR}")


In [None]:
# Run the whole experiment: train both models with each optimizer, then write
# the plots and the comparison CSV to OUTPUT_DIR.
main()

Using device: cuda
Training Mask R-CNN with ResNet18 backbone

Training with Adam optimizer




Number of trainable parameters: 273164
Epoch: [0]
10/68: lr: 0.0010  loss: 1.4644  loss_classifier: 0.3404  loss_box_reg: 0.2015  loss_mask: 0.8959  loss_objectness: 0.0227  loss_rpn_box_reg: 0.0038
20/68: lr: 0.0010  loss: 1.1729  loss_classifier: 0.2907  loss_box_reg: 0.2024  loss_mask: 0.6574  loss_objectness: 0.0183  loss_rpn_box_reg: 0.0041
30/68: lr: 0.0010  loss: 0.9884  loss_classifier: 0.2300  loss_box_reg: 0.1703  loss_mask: 0.5661  loss_objectness: 0.0180  loss_rpn_box_reg: 0.0040
40/68: lr: 0.0010  loss: 0.8890  loss_classifier: 0.1931  loss_box_reg: 0.1541  loss_mask: 0.5164  loss_objectness: 0.0203  loss_rpn_box_reg: 0.0052
50/68: lr: 0.0010  loss: 0.8045  loss_classifier: 0.1684  loss_box_reg: 0.1431  loss_mask: 0.4669  loss_objectness: 0.0211  loss_rpn_box_reg: 0.0050
60/68: lr: 0.0010  loss: 0.7298  loss_classifier: 0.1497  loss_box_reg: 0.1325  loss_mask: 0.4227  loss_objectness: 0.0198  loss_rpn_box_reg: 0.0050
68/68: lr: 0.0010  loss: 0.6760  loss_classifier: 0.1370



Number of trainable parameters: 273164
Epoch: [0]
10/68: lr: 0.0010  loss: 2.7607  loss_classifier: 0.4383  loss_box_reg: 0.0154  loss_mask: 1.6039  loss_objectness: 0.6857  loss_rpn_box_reg: 0.0174
20/68: lr: 0.0010  loss: 2.3193  loss_classifier: 0.3076  loss_box_reg: 0.0136  loss_mask: 1.2926  loss_objectness: 0.6858  loss_rpn_box_reg: 0.0197
30/68: lr: 0.0010  loss: 2.0927  loss_classifier: 0.2360  loss_box_reg: 0.0140  loss_mask: 1.1358  loss_objectness: 0.6858  loss_rpn_box_reg: 0.0212
40/68: lr: 0.0010  loss: 1.9471  loss_classifier: 0.1941  loss_box_reg: 0.0133  loss_mask: 1.0322  loss_objectness: 0.6858  loss_rpn_box_reg: 0.0217
50/68: lr: 0.0010  loss: 1.8442  loss_classifier: 0.1671  loss_box_reg: 0.0137  loss_mask: 0.9560  loss_objectness: 0.6858  loss_rpn_box_reg: 0.0216
60/68: lr: 0.0010  loss: 1.7726  loss_classifier: 0.1473  loss_box_reg: 0.0126  loss_mask: 0.9042  loss_objectness: 0.6858  loss_rpn_box_reg: 0.0227
68/68: lr: 0.0010  loss: 1.7366  loss_classifier: 0.1371

  plt.ylim(0, max(metric_data) * 1.2)



Results and visualizations saved to /content/drive/MyDrive/DL-projet/results/instance_segmentation_results


https://arxiv.org/pdf/2104.11892