# Setup Environment and Dependencies
Import the required libraries (PyTorch, torchvision, and other dependencies) and select the CUDA device if one is available, falling back to the CPU otherwise.

In [None]:
# Import required libraries
import torch
import torchvision
from torchvision.models.detection import maskrcnn_resnet50_fpn
from torchvision.transforms import functional as F

# Pick the compute device: CUDA when it is available, the CPU otherwise
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Report which device was selected
print(f'Using device: {device}')

# Load and Preprocess Dataset
Load the dataset, create data loaders, and implement data augmentation techniques. Convert annotations to the required format.

In [None]:
# Import additional required libraries
import os
import numpy as np
from PIL import Image
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as T

# Define a custom dataset class
class CustomDataset(Dataset):
    """Instance-segmentation dataset read from ``<root>/images`` and ``<root>/masks``.

    Images and masks are paired by their position in the sorted directory
    listings. Each mask is an integer-encoded image: the first (smallest)
    value returned by ``np.unique`` is dropped as background and every other
    distinct value is treated as one object instance.

    Args:
        root: Directory containing the ``images`` and ``masks`` sub-directories.
        transforms: Optional callable invoked as ``transforms(img, target)``
            that returns the transformed ``(img, target)`` pair.
    """

    def __init__(self, root, transforms=None):
        self.root = root
        self.transforms = transforms
        self.imgs = sorted(os.listdir(os.path.join(root, "images")))
        self.masks = sorted(os.listdir(os.path.join(root, "masks")))

    @staticmethod
    def _target_from_mask(mask, idx):
        """Build the detection target dict for one integer-encoded mask array.

        Args:
            mask: Array in which each distinct value marks one instance.
            idx: Sample index, stored as ``image_id``.

        Returns:
            Dict with ``boxes`` (N, 4) as ``[xmin, ymin, xmax, ymax]`` pixel
            indices, ``labels``, ``masks``, ``image_id``, ``area`` and
            ``iscrowd``. ``N`` is 0 when the mask holds only background.
        """
        obj_ids = np.unique(mask)
        obj_ids = obj_ids[1:]  # Remove background

        # Split the color-encoded mask into a set of binary masks
        masks = mask == obj_ids[:, None, None]

        # Get bounding box coordinates for each mask
        num_objs = len(obj_ids)
        boxes = []
        for i in range(num_objs):
            pos = np.where(masks[i])
            boxes.append([np.min(pos[1]), np.min(pos[0]), np.max(pos[1]), np.max(pos[0])])

        # reshape(-1, 4) keeps the (N, 4) layout even when there are no
        # objects; a bare empty tensor has shape (0,) and would break the
        # column indexing used for the area below.
        boxes = torch.as_tensor(np.array(boxes), dtype=torch.float32).reshape(-1, 4)
        labels = torch.ones((num_objs,), dtype=torch.int64)  # All objects are labeled as 1
        masks = torch.as_tensor(masks, dtype=torch.uint8)

        return {
            "boxes": boxes,
            "labels": labels,
            "masks": masks,
            "image_id": torch.tensor([idx]),
            "area": (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0]),
            "iscrowd": torch.zeros((num_objs,), dtype=torch.int64),
        }

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(img, target)``."""
        img_path = os.path.join(self.root, "images", self.imgs[idx])
        mask_path = os.path.join(self.root, "masks", self.masks[idx])
        img = Image.open(img_path).convert("RGB")
        # The mask is deliberately not converted to RGB: its raw values are
        # the instance ids.
        mask = np.array(Image.open(mask_path))

        target = self._target_from_mask(mask, idx)

        if self.transforms is not None:
            img, target = self.transforms(img, target)

        return img, target

    def __len__(self):
        return len(self.imgs)

# Define transformations for data augmentation
class _PairedCompose:
    """Apply a sequence of ``(image, target)`` transforms in order."""

    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, image, target):
        for transform in self.transforms:
            image, target = transform(image, target)
        return image, target


class _PairedToTensor:
    """Convert the image to a tensor and pass the target through unchanged."""

    def __call__(self, image, target):
        return F.to_tensor(image), target


class _PairedRandomHorizontalFlip:
    """Flip a tensor image and its boxes/masks together with probability ``prob``."""

    def __init__(self, prob=0.5):
        self.prob = prob

    def __call__(self, image, target):
        if torch.rand(1).item() < self.prob:
            width = image.shape[-1]
            image = image.flip(-1)
            # Copy so the caller's target dict and tensors are not mutated.
            target = dict(target)
            boxes = target.get("boxes")
            if boxes is not None and boxes.numel():
                boxes = boxes.clone()
                # Boxes hold inclusive pixel indices, so column c maps to
                # (width - 1 - c); xmin and xmax swap roles after the flip.
                boxes[:, [0, 2]] = (width - 1) - boxes[:, [2, 0]]
                target["boxes"] = boxes
            if "masks" in target:
                target["masks"] = target["masks"].flip(-1)
        return image, target


def get_transform(train):
    """Build the ``(image, target)`` transform pipeline for the dataset.

    The returned callable takes two arguments, matching how the dataset
    invokes ``self.transforms(img, target)``; a plain
    ``torchvision.transforms.Compose`` accepts only the image.

    Args:
        train: When true, a random horizontal flip (p=0.5) is appended after
            the tensor conversion.

    Returns:
        A callable mapping ``(image, target)`` to ``(image, target)``.
    """
    transforms = [_PairedToTensor()]
    if train:
        transforms.append(_PairedRandomHorizontalFlip(0.5))
    return _PairedCompose(transforms)

# Load the dataset twice so the training and validation views can use
# different transforms over the same files
dataset = CustomDataset('path/to/dataset', get_transform(train=True))
dataset_test = CustomDataset('path/to/dataset', get_transform(train=False))

# Split the dataset into training and validation sets: one shared random
# permutation, with the last 50 indices held out for validation
indices = torch.randperm(len(dataset)).tolist()
dataset = torch.utils.data.Subset(dataset, indices[:-50])
dataset_test = torch.utils.data.Subset(dataset_test, indices[-50:])


def _collate_batch(batch):
    """Regroup a list of ``(image, target)`` samples into ``(images, targets)``.

    Images are kept as a tuple instead of being stacked into one tensor.
    This is a named function rather than a lambda because worker processes
    started with the "spawn" method must pickle the collate function, and
    lambdas cannot be pickled.
    """
    return tuple(zip(*batch))


# Define data loaders
data_loader = DataLoader(
    dataset, batch_size=2, shuffle=True, num_workers=4,
    collate_fn=_collate_batch)

data_loader_test = DataLoader(
    dataset_test, batch_size=1, shuffle=False, num_workers=4,
    collate_fn=_collate_batch)

# Configure Model Parameters
Set up configuration for Cascade Mask R-CNN including backbone architecture, learning rates, anchor sizes, and other hyperparameters.

In [None]:
# Configure Model Parameters

# Hyperparameters of the replacement prediction heads
num_classes = 2  # 1 class (person) + background
hidden_layer = 256

# Start from the pretrained Mask R-CNN (ResNet-50 FPN)
model = maskrcnn_resnet50_fpn(pretrained=True)

# Swap the box predictor for one sized for our number of classes
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(
    in_features, num_classes
)

# Swap the mask predictor the same way
in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
model.roi_heads.mask_predictor = torchvision.models.detection.mask_rcnn.MaskRCNNPredictor(
    in_features_mask, hidden_layer, num_classes
)

# Place the model on the selected device
model.to(device)

# Optimise only the parameters that require gradients; the learning rate is
# multiplied by 0.1 every 3 scheduler steps
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Show the resulting architecture
print(model)

# Build Cascade Mask R-CNN Model
Initialize the Cascade Mask R-CNN model with the configured parameters. Set up multiple detection heads for cascade architecture.

In [None]:
# Build Cascade Mask R-CNN Model

# NOTE: torchvision does not provide a Cascade R-CNN implementation.
# `torchvision.models.detection.CascadeRCNN` and
# `torchvision.models.detection.cascade_rcnn.CascadeRCNNPredictor` do not
# exist, so importing them raises ImportError. A real cascade head would need
# a custom RoIHeads implementation or another detection library.
#
# Until then, keep the Mask R-CNN configured in the previous cell. Leaving
# `model` bound to that object also keeps `optimizer` and `lr_scheduler`
# attached to the parameters that are actually trained; rebinding `model`
# here would leave the optimizer updating a discarded network.

# Move the model to the appropriate device
model.to(device)

# Print the model to verify the architecture
print(model)

# Training Pipeline
Implement training loop with learning rate scheduling, loss computation, and model checkpointing. Monitor training metrics.

In [None]:
# Training Pipeline

# Define the training loop
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    for i, (images, targets) in enumerate(data_loader):
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # Forward pass: in training mode the model returns a dict of losses
        loss_dict = model(images, targets)

        # Compute total loss
        losses = sum(loss for loss in loss_dict.values())

        # Backward pass and optimization
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        # Print training metrics
        if i % 10 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Step [{i}/{len(data_loader)}], Loss: {losses.item():.4f}")

    # Update the learning rate
    lr_scheduler.step()

    # Save the model checkpoint
    torch.save(model.state_dict(), f"model_epoch_{epoch+1}.pth")

    # Compute the validation loss. torchvision detection models only return
    # the loss dict in training mode (in eval mode they return detections),
    # so the model stays in train mode here and gradients are disabled.
    # NOTE(review): any dropout or non-frozen batch-norm layers would run in
    # training mode during this pass; confirm that is acceptable for the model.
    val_loss_total = 0.0
    val_batches = 0
    with torch.no_grad():
        for images, targets in data_loader_test:
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)
            val_loss_total += sum(loss for loss in loss_dict.values()).item()
            val_batches += 1

    # Print validation metrics (mean over the validation batches)
    if val_batches:
        print(f"Validation Loss: {val_loss_total / val_batches:.4f}")

    # Leave the model in eval mode after each epoch
    model.eval()

# Model Evaluation
Evaluate model performance on the validation set using metrics such as mAP, precision, and recall. Generate performance reports.

In [None]:
# Model Evaluation

# Import necessary libraries for evaluation
from torchvision.ops import box_iou
from sklearn.metrics import precision_recall_fscore_support

# Function to calculate mean Average Precision (mAP)
def calculate_map(pred_boxes, pred_labels, pred_scores, true_boxes, true_labels, iou_threshold=0.5):
    """Return the mean Average Precision over the classes in ``true_labels``.

    For each class, predictions are ranked by descending score and greedily
    matched one-to-one to ground-truth boxes of that class. A prediction is a
    true positive when its best unmatched ground-truth box has
    IoU >= ``iou_threshold``. AP is the area under the precision/recall curve
    after making precision monotonically non-increasing.

    All boxes are treated as one pool: this function has no notion of which
    image a box belongs to, so callers must make sure boxes from different
    images cannot overlap.

    Args:
        pred_boxes: (P, 4) predicted boxes as ``[xmin, ymin, xmax, ymax]``.
        pred_labels: (P,) predicted class labels.
        pred_scores: (P,) prediction confidences.
        true_boxes: (G, 4) ground-truth boxes.
        true_labels: (G,) ground-truth class labels.
        iou_threshold: Minimum IoU for a prediction to count as a match.

    Returns:
        Mean of the per-class APs as a float; 0 when there is no ground truth.
    """
    all_ap = []
    # unique() deduplicates by value; set() over a tensor would keep one
    # entry per element because tensors hash by identity.
    for label in true_labels.unique().tolist():
        gt_boxes = true_boxes[true_labels == label]
        keep = pred_labels == label
        boxes = pred_boxes[keep]
        scores = pred_scores[keep]

        if boxes.shape[0] == 0:
            # A class that is never predicted has zero recall, hence AP 0.
            all_ap.append(0.0)
            continue

        order = torch.argsort(scores, descending=True)
        ious = box_iou(boxes[order], gt_boxes)

        matched = torch.zeros(gt_boxes.shape[0], dtype=torch.bool, device=ious.device)
        is_tp = torch.zeros(order.shape[0], device=ious.device)
        for rank in range(order.shape[0]):
            candidates = ious[rank].clone()
            candidates[matched] = -1.0  # each ground-truth box matches at most once
            best_iou, best_gt = candidates.max(dim=0)
            if best_iou.item() >= iou_threshold:
                matched[best_gt] = True
                is_tp[rank] = 1.0

        cum_tp = torch.cumsum(is_tp, dim=0)
        cum_fp = torch.cumsum(1.0 - is_tp, dim=0)
        recall = cum_tp / gt_boxes.shape[0]
        precision = cum_tp / (cum_tp + cum_fp)

        # Precision envelope: best precision achievable at this recall or higher.
        envelope = torch.flip(torch.cummax(torch.flip(precision, [0]), dim=0).values, [0])
        recall_steps = recall - torch.cat([recall.new_zeros(1), recall[:-1]])
        all_ap.append((recall_steps * envelope).sum().item())

    return sum(all_ap) / len(all_ap) if all_ap else 0.0

# Evaluate the model on the validation set
def _count_matches(pred_boxes, pred_labels, pred_scores, true_boxes, true_labels,
                   iou_threshold=0.5, score_threshold=0.5):
    """Return ``(tp, fp, fn)`` from greedy one-to-one IoU matching.

    Predictions scoring below ``score_threshold`` are discarded. The rest are
    visited by descending score, and each may claim one still-unmatched
    ground-truth box of the same label with IoU >= ``iou_threshold``.
    """
    keep = pred_scores >= score_threshold
    pred_boxes, pred_labels, pred_scores = pred_boxes[keep], pred_labels[keep], pred_scores[keep]
    num_pred, num_true = pred_boxes.shape[0], true_boxes.shape[0]
    if num_pred == 0 or num_true == 0:
        return 0, num_pred, num_true

    ious = box_iou(pred_boxes, true_boxes)
    # A detection may only match a ground-truth box carrying the same label.
    ious[pred_labels[:, None] != true_labels[None, :]] = -1.0

    matched = torch.zeros(num_true, dtype=torch.bool)
    tp = 0
    for rank in torch.argsort(pred_scores, descending=True).tolist():
        candidates = ious[rank].clone()
        candidates[matched] = -1.0
        best_iou, best_gt = candidates.max(dim=0)
        if best_iou.item() >= iou_threshold:
            matched[best_gt] = True
            tp += 1
    return tp, num_pred - tp, num_true - tp


model.eval()
all_pred_boxes = []
all_pred_labels = []
all_pred_scores = []
all_true_boxes = []
all_true_labels = []

# Boxes from all images are pooled below. Each image's boxes are shifted to
# their own disjoint x-range so a prediction can never overlap (and so never
# match) a ground-truth box from another image. float64 keeps the shifted
# coordinates exact.
x_offset = 0.0

with torch.no_grad():
    for images, targets in data_loader_test:
        images = [image.to(device) for image in images]
        outputs = model(images)

        for image, output, target in zip(images, outputs, targets):
            shift = torch.tensor([x_offset, 0.0, x_offset, 0.0], dtype=torch.float64)
            all_pred_boxes.append(output['boxes'].cpu().double() + shift)
            all_pred_labels.append(output['labels'].cpu())
            all_pred_scores.append(output['scores'].cpu())
            all_true_boxes.append(target['boxes'].cpu().double() + shift)
            all_true_labels.append(target['labels'].cpu())
            x_offset += image.shape[-1] + 1

# Concatenate all predictions and ground truths
all_pred_boxes = torch.cat(all_pred_boxes)
all_pred_labels = torch.cat(all_pred_labels)
all_pred_scores = torch.cat(all_pred_scores)
all_true_boxes = torch.cat(all_true_boxes)
all_true_labels = torch.cat(all_true_labels)

# Calculate mAP
map_score = calculate_map(all_pred_boxes, all_pred_labels, all_pred_scores, all_true_boxes, all_true_labels)
print(f"mAP: {map_score:.4f}")

# Calculate precision, recall, and F1-score from matched detections.
# Predicted and ground-truth label arrays differ in length and are not
# aligned element-wise, so they cannot be compared directly as in a
# classification metric.
tp, fp, fn = _count_matches(all_pred_boxes, all_pred_labels, all_pred_scores, all_true_boxes, all_true_labels)
precision = tp / (tp + fp) if tp + fp > 0 else 0.0
recall = tp / (tp + fn) if tp + fn > 0 else 0.0
f1 = 2 * precision * recall / (precision + recall) if precision + recall > 0 else 0.0
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1:.4f}")

# Generate performance report
performance_report = {
    "mAP": map_score,
    "Precision": precision,
    "Recall": recall,
    "F1-score": f1
}

print("Performance Report:", performance_report)

# Inference and Visualization
Run inference on test images, visualize detection results, and create visualization utilities for masks and bounding boxes.

In [None]:
# Inference and Visualization

# Import necessary libraries for visualization
import matplotlib.pyplot as plt
import random

# Function to visualize bounding boxes and masks
def visualize_predictions(image, boxes, masks, labels, scores, threshold=0.5):
    """Show ``image`` with the masks and boxes of confident detections drawn on it.

    The masks are blended into a CPU copy of the image before it is displayed,
    so the caller's tensor is left untouched and the function works on any
    device.

    Args:
        image: (3, H, W) image tensor.
        boxes: Iterable of ``[xmin, ymin, xmax, ymax]`` boxes.
        masks: Iterable of per-detection soft masks; assumes each is (1, H, W)
            or (H, W) with values in [0, 1] — TODO confirm against the model output.
        labels: Per-detection labels (currently not drawn).
        scores: Per-detection confidences.
        threshold: Detections scoring below this value are skipped.
    """
    # HWC float copy on the CPU: the layout matplotlib expects.
    canvas = image.detach().cpu().float().permute(1, 2, 0).clone()
    overlay_color = torch.tensor([1.0, 0.0, 0.0])

    kept_boxes = []
    for box, mask, label, score in zip(boxes, masks, labels, scores):
        if score < threshold:
            continue

        # Drop the channel axis, binarise, then tint the covered pixels red.
        region = mask.detach().cpu().reshape(mask.shape[-2:]) > 0.5
        canvas[region] = canvas[region] * 0.5 + overlay_color * 0.5
        kept_boxes.append(box.detach().cpu().tolist())

    plt.figure(figsize=(10, 10))
    plt.imshow(canvas.numpy())
    ax = plt.gca()

    # Draw bounding boxes on top of the blended image
    for xmin, ymin, xmax, ymax in kept_boxes:
        rect = plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin, fill=False, color='red', linewidth=2)
        ax.add_patch(rect)

    plt.axis('off')
    plt.show()

# Run inference on test images and visualize results
model.eval()

# Sample indices from the dataset instead of materialising the whole loader,
# and cap the count so test sets with fewer than 5 images still work.
test_samples = data_loader_test.dataset
num_to_show = min(5, len(test_samples))

with torch.no_grad():
    for sample_idx in random.sample(range(len(test_samples)), num_to_show):  # Visualize 5 random test images
        image, _ = test_samples[sample_idx]
        image = image.to(device)
        output = model([image])[0]

        visualize_predictions(image, output['boxes'], output['masks'], output['labels'], output['scores'])