In [None]:
!pip install albumentations
!pip install torch torchvision
!pip install albumentations torch torchvision





In [None]:
# Mount Google Drive at /content/drive (Colab only); the dataset paths used
# later in this notebook live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import json
import os

import albumentations as A
import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch
from albumentations.pytorch import ToTensorV2
from torch.utils.data import Dataset, DataLoader

In [None]:
# Path to the COCO instance-annotation JSON on the mounted Drive.
# NOTE(review): the file sits inside a folder that is itself named
# 'instances_val2017.json' -- presumably an artefact of how it was uploaded.
json_path = '/content/drive/MyDrive/Object Recognition Dataset/instances_val2017.json/instances_val2017.json'
# Directory holding the image files (presumably the val2017 images that the
# annotation file refers to by 'file_name' -- confirm).
images_dir = '/content/drive/MyDrive/Object Recognition Dataset/val2017'

In [None]:
from pycocotools import mask as coco_mask  # Required for RLE decoding
# Dataset class
class CocoDataset(Dataset):
    """COCO instance-segmentation dataset producing Mask R-CNN style targets.

    Each item is ``(image, target)`` where ``target`` is a dict with

    * ``"boxes"``  -- ``float32`` tensor ``(N, 4)`` in ``[x1, y1, x2, y2]`` pixel
      coordinates of the returned image,
    * ``"labels"`` -- ``int64`` tensor ``(N,)`` of COCO ``category_id`` values,
    * ``"masks"``  -- ``uint8`` tensor ``(N, H, W)`` of binary instance masks.

    The three entries are always aligned: instance ``i`` has ``boxes[i]``,
    ``labels[i]`` and ``masks[i]``.

    Args:
        images: list of COCO image records (dicts with ``'id'`` and ``'file_name'``).
        annotations: list of COCO annotation records.
        category_mapping: mapping ``category_id -> name`` (stored, not used here).
        img_dir: directory that contains the image files.
        transform: optional Albumentations ``Compose`` created with
            ``bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels'])``.
        image_size: ``(width, height)`` every image and mask is resized to before
            the transform runs.
    """

    def __init__(self, images, annotations, category_mapping, img_dir, transform=None, image_size=(416, 416)):
        self.images = images
        self.annotations = annotations
        self.category_mapping = category_mapping
        self.img_dir = img_dir
        self.transform = transform
        self.image_size = image_size
        self.image_id_to_annotations = self._group_annotations_by_image()

    def _group_annotations_by_image(self):
        """Return a dict mapping ``image_id`` to the list of its annotations."""
        grouped = {}
        for ann in self.annotations:
            grouped.setdefault(ann['image_id'], []).append(ann)
        return grouped

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.images)

    @staticmethod
    def _decode_mask(segmentation, height, width):
        """Decode one COCO segmentation (RLE dict or polygon list) to a uint8 mask.

        Raises:
            ValueError: if the segmentation is neither an RLE dict nor a list of
                polygon coordinate lists.
        """
        if isinstance(segmentation, dict):  # RLE format
            if isinstance(segmentation['counts'], list):
                # Uncompressed RLE must be converted before pycocotools can decode it.
                segmentation = coco_mask.frPyObjects(segmentation, *segmentation['size'])
            return coco_mask.decode(segmentation)
        if isinstance(segmentation, list):  # Polygon format
            if not all(isinstance(poly, list) for poly in segmentation):
                raise ValueError(f"Invalid polygon format: {segmentation}")
            mask = np.zeros((height, width), dtype=np.uint8)
            for poly in segmentation:
                poly_array = np.array(poly).reshape(-1, 2).astype(np.int32)
                cv2.fillPoly(mask, [poly_array], 1)
            return mask
        raise ValueError(f"Unknown segmentation format: {type(segmentation)}")

    @staticmethod
    def _coco_box_to_xyxy(bbox, scale_x, scale_y, width, height):
        """Convert a COCO ``[x, y, w, h]`` box to a scaled, clipped ``[x1, y1, x2, y2]``.

        Returns ``None`` when the resulting box has no area, because zero-area
        boxes are rejected by both Albumentations and torchvision detection models.
        """
        x, y, w, h = bbox[:4]
        x1 = min(max(x * scale_x, 0.0), width)
        y1 = min(max(y * scale_y, 0.0), height)
        x2 = min(max((x + w) * scale_x, 0.0), width)
        y2 = min(max((y + h) * scale_y, 0.0), height)
        if x2 <= x1 or y2 <= y1:
            return None
        return [float(x1), float(y1), float(x2), float(y2)]

    @staticmethod
    def _to_mask_tensor(mask):
        """Return ``mask`` (ndarray or tensor) as a ``uint8`` torch tensor."""
        if not torch.is_tensor(mask):
            # Augmentations such as flips may hand back non-contiguous views.
            mask = torch.from_numpy(np.ascontiguousarray(mask))
        return mask.to(torch.uint8)

    def __getitem__(self, idx):
        """Load image ``idx`` and build its aligned boxes / labels / masks target.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        image_info = self.images[idx]
        img_path = os.path.join(self.img_dir, image_info['file_name'])
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        orig_h, orig_w = image.shape[:2]
        target_w, target_h = self.image_size
        scale_x = target_w / orig_w
        scale_y = target_h / orig_h

        boxes = []
        labels = []
        masks = []
        for ann in self.image_id_to_annotations.get(image_info['id'], []):
            # An instance is only usable when it has both a mask and a box;
            # keeping one without the other would misalign the target lists.
            if 'segmentation' not in ann or 'bbox' not in ann:
                continue
            box = self._coco_box_to_xyxy(ann['bbox'], scale_x, scale_y, target_w, target_h)
            if box is None:
                continue
            try:
                mask = self._decode_mask(ann['segmentation'], orig_h, orig_w)
            except Exception as e:
                print(f"Skipping invalid mask for annotation ID {ann.get('id')}: {e}")
                continue
            mask = cv2.resize(
                np.ascontiguousarray(mask), (target_w, target_h), interpolation=cv2.INTER_NEAREST
            )
            masks.append(mask)
            boxes.append(box)
            labels.append(int(ann['category_id']))

        # Boxes were scaled above, so image, masks and boxes share one coordinate frame.
        image = cv2.resize(image, (target_w, target_h))

        if self.transform:
            # The label field carries instance indices instead of category ids:
            # Albumentations silently drops boxes that leave the frame, and the
            # surviving indices tell us which masks and labels to keep.
            transform_inputs = {
                'image': image,
                'bboxes': boxes,
                'labels': list(range(len(boxes))),
            }
            if masks:
                transform_inputs['masks'] = masks
            transformed = self.transform(**transform_inputs)
            image = transformed['image']
            transformed_masks = transformed.get('masks', [])

            kept_boxes = []
            kept_indices = []
            for box, instance_idx in zip(transformed['bboxes'], transformed['labels']):
                x1, y1, x2, y2 = (float(v) for v in box[:4])
                if x2 > x1 and y2 > y1:
                    kept_boxes.append([x1, y1, x2, y2])
                    kept_indices.append(int(instance_idx))
            boxes = kept_boxes
            labels = [labels[i] for i in kept_indices]
            masks = [transformed_masks[i] for i in kept_indices]

        if boxes:
            boxes = torch.tensor(boxes, dtype=torch.float32)
            labels = torch.tensor(labels, dtype=torch.int64)
            masks = torch.stack([self._to_mask_tensor(m) for m in masks], dim=0)
        else:
            # Tensors are CHW after ToTensorV2, NumPy images are HWC.
            if torch.is_tensor(image):
                out_h, out_w = image.shape[-2:]
            else:
                out_h, out_w = image.shape[:2]
            boxes = torch.zeros((0, 4), dtype=torch.float32)
            labels = torch.zeros((0,), dtype=torch.int64)
            masks = torch.zeros((0, out_h, out_w), dtype=torch.uint8)

        target = {"boxes": boxes, "labels": labels, "masks": masks}

        return image, target








In [None]:
import torchvision


In [None]:
# Augmentation pipeline shared by image, instance masks and bounding boxes.
# 'masks' is a built-in Albumentations target, so it needs no
# `additional_targets` registration; boxes are declared as pascal_voc
# ([x_min, y_min, x_max, y_max] in pixels) with their labels in 'labels'.
transform = A.Compose(
    [
        A.Resize(416, 416),  # Ensures all images and masks are resized consistently
        A.RandomBrightnessContrast(p=0.2),
        A.GaussianBlur(p=0.2),
        A.HorizontalFlip(p=0.5),
        A.Rotate(limit=20, p=0.5),
        # ImageNet statistics, matching the pretrained backbone's preprocessing.
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
        ToTensorV2(),
    ],
    bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels']),
)



def collate_detection_batch(batch):
    """Turn a list of ``(image, target)`` pairs into ``(images, targets)`` tuples.

    Detection targets differ in size per image, so they cannot be stacked by
    the default collate function. A named function (unlike a lambda) also stays
    picklable if the DataLoader is later given worker processes.
    """
    return tuple(zip(*batch))


# Load COCO annotations. The locations are taken from the path cell above so
# they are defined in exactly one place.
annotations_file = json_path
img_dir = images_dir

with open(annotations_file, 'r', encoding='utf-8') as f:
    coco_data = json.load(f)

images = coco_data['images']
annotations = coco_data['annotations']
categories = coco_data['categories']
# category id -> human-readable name, used later to label predictions
category_mapping = {category['id']: category['name'] for category in categories}

# Initialize dataset
dataset = CocoDataset(
    images=images,
    annotations=annotations,
    category_mapping=category_mapping,
    img_dir=img_dir,
    transform=transform,
)

# Define DataLoader
data_loader = DataLoader(dataset, batch_size=4, shuffle=True, collate_fn=collate_detection_batch)


In [None]:
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
# Mask R-CNN Model
def create_custom_mask_rcnn(num_classes):
    """Build a COCO-pretrained Mask R-CNN whose heads predict ``num_classes`` classes.

    Args:
        num_classes: number of output classes, background included.

    Returns:
        A ``maskrcnn_resnet50_fpn`` model with freshly initialised box and mask
        predictors; the backbone keeps its pretrained weights.
    """
    weights = torchvision.models.detection.MaskRCNN_ResNet50_FPN_Weights.DEFAULT
    model = torchvision.models.detection.maskrcnn_resnet50_fpn(weights=weights)

    # Box head: operates on flattened per-ROI feature vectors, so it must be a
    # FastRCNNPredictor (two linear layers producing class scores and box
    # deltas). A MaskRCNNPredictor here would be a conv head fed 2-D input.
    in_features_box = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(
        in_features_box, num_classes
    )

    # Mask head: convolutional predictor producing one mask per class.
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256
    model.roi_heads.mask_predictor = MaskRCNNPredictor(
        in_features_mask, hidden_layer, num_classes
    )

    return model


# Build the model and place it on the first GPU when CUDA is usable, else the CPU
num_classes = 91  # Number of classes (including background)
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
model = create_custom_mask_rcnn(num_classes).to(device)

In [None]:
# Optimizer and Scheduler
# Only parameters that require gradients are optimised; the learning rate is
# multiplied by 0.1 every 3 epochs.
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(params, lr=1e-4)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Training Loop
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    # Batch variables get their own names so the notebook-level `images` list
    # (COCO image records used to build the dataset) is not overwritten.
    for batch_images, batch_targets in data_loader:
        batch_images = [image.to(device) for image in batch_images]
        batch_targets = [{k: v.to(device) for k, v in t.items()} for t in batch_targets]

        optimizer.zero_grad()
        # In train mode the detection model returns a dict of named losses.
        loss_dict = model(batch_images, batch_targets)
        losses = sum(loss_dict.values())
        losses.backward()
        optimizer.step()

        running_loss += losses.item()

    lr_scheduler.step()
    # max(..., 1) avoids a ZeroDivisionError when the loader yields no batches.
    epoch_loss = running_loss / max(len(data_loader), 1)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}")

    # Save model checkpoint
    torch.save(model.state_dict(), f"mask_rcnn_epoch_{epoch+1}.pth")

print("Training completed!")

In [None]:
# Detections scoring below this confidence are not drawn.
SCORE_THRESHOLD = 0.5
# Statistics used by A.Normalize in the transform; needed to undo it for display.
IMAGENET_MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
IMAGENET_STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)

# Evaluation Mode
model.eval()
test_image, test_target = dataset[0]  # Get one sample
test_image = test_image.to(device).unsqueeze(0)

# Prediction
with torch.no_grad():
    predictions = model(test_image)

# Visualization
# Undo the mean/std normalisation before scaling to 0-255, otherwise the
# colours are wrong; OpenCV drawing calls also need a C-contiguous array,
# which a permuted tensor view is not.
pred_image = test_image[0].permute(1, 2, 0).cpu().numpy()
pred_image = np.clip(pred_image * IMAGENET_STD + IMAGENET_MEAN, 0.0, 1.0)
pred_image = np.ascontiguousarray((pred_image * 255).astype(np.uint8))

prediction = predictions[0]
color = (0, 255, 0)
overlay_color = np.array(color, dtype=np.float32)
for box, label, score, mask in zip(
    prediction['boxes'], prediction['labels'], prediction['scores'], prediction['masks']
):
    if score.item() < SCORE_THRESHOLD:
        continue
    x1, y1, x2, y2 = map(int, box.tolist())
    cv2.rectangle(pred_image, (x1, y1), (x2, y2), color, 2)
    text = category_mapping.get(label.item(), "Unknown")
    cv2.putText(pred_image, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
    # Predicted masks are per-pixel probabilities with a leading channel axis.
    mask = mask[0].cpu().numpy() > 0.5
    blended = pred_image[mask] * 0.5 + overlay_color * 0.5
    pred_image[mask] = blended.astype(np.uint8)

plt.figure(figsize=(12, 8))
plt.imshow(pred_image)
plt.axis("off")
plt.show()
