In [None]:
import os
import random
import numpy as np
import torch
import torchvision
import torchvision.transforms as T
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision.models.detection import maskrcnn_resnet50_fpn_v2
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
import cv2
import matplotlib.pyplot as plt
import torchvision.transforms.functional as F
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Colab-only: mount the user's Google Drive at /content/drive so the dataset
# and checkpoints stored there can be read through ordinary file paths.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


#### Copying the dataset from Google Drive to the Colab VM

In [None]:
# Copy the dataset from the mounted Drive folder into /content and point
# `data_dir` at that copy.
!rsync -av --progress /content/drive/MyDrive/TACO/data /content/
data_dir = '/content/data'

In [None]:
# Prefer the local copy produced by the rsync cell and fall back to reading
# straight from the mounted Drive folder only when that copy does not exist.
# (Previously this cell always overwrote `data_dir` with the Drive path, so the
# local copy was never used after "Run All".)
LOCAL_DATA_DIR = '/content/data'
DRIVE_DATA_DIR = '/content/drive/MyDrive/TACO/data'
data_dir = LOCAL_DATA_DIR if os.path.isdir(LOCAL_DATA_DIR) else DRIVE_DATA_DIR

In [None]:
def resize_mask(mask, H, W):
    """Return ``mask`` resized to ``H`` rows by ``W`` columns.

    Nearest-neighbour interpolation is used so that no intermediate values
    are introduced into the mask.
    """
    # cv2.resize expects the target size as (width, height).
    target_size = (W, H)
    resized = cv2.resize(mask, target_size, interpolation=cv2.INTER_NEAREST)
    return resized

# Load the training annotations and build one lightweight record per image:
# its id, file name, annotation ids and the image size stored in the index.
ann_file = os.path.join(data_dir, 'annotations_0_train.json')
coco = COCO(ann_file)

train_img_ids = coco.getImgIds()
annotations_list = [
    {
        'image_id': img_id,
        'file_name': img_info['file_name'],
        'ann_ids': coco.getAnnIds(imgIds=img_id),
        'height': img_info['height'],
        'width': img_info['width']
    }
    for img_id, img_info in zip(train_img_ids, coco.loadImgs(train_img_ids))
]

loading annotations into memory...
Done (t=3.05s)
creating index...
index created!


In [None]:
class TACOInstanceSegmentationDataset(Dataset):
    """Instance-segmentation dataset over images described by a COCO index.

    Every item is ``(image, target)``:

    * ``image``  - float32 tensor of shape ``(C, H, W)`` with values in ``[0, 1]``.
    * ``target`` - dict with ``boxes`` (``(N, 4)`` float32, ``[x1, y1, x2, y2]``),
      ``labels`` (``(N,)`` int64, the raw ``category_id``), ``masks``
      (``(N, H, W)`` uint8) and ``image_id`` (``(1,)`` tensor).

    Args:
        image_dir: directory that the records' ``file_name`` is relative to.
        annotation_list: list of dicts with the keys ``image_id``,
            ``file_name``, ``ann_ids``, ``height`` and ``width``.
        coco: ``pycocotools.coco.COCO`` object the annotation ids refer to.
        transforms: optional transform. Two kinds are supported:

            * an albumentations pipeline, which is called with named
              arguments. It is expected to be configured like the one in this
              notebook, i.e. with
              ``BboxParams(format='coco', label_fields=['category_ids'])``.
            * any other callable, which is treated as an image-only transform
              and called as ``transforms(pil_image)``.

    NOTE(review): labels are the raw ``category_id`` values. torchvision
    detection models reserve label 0 for background - confirm the annotation
    files do not use category id 0 (or remap the ids) before training.
    """

    def __init__(self, image_dir, annotation_list, coco, transforms=None):
        self.image_dir = image_dir
        self.annotations = annotation_list
        self.transforms = transforms
        self.coco = coco

    @staticmethod
    def _is_albumentations(transform):
        """Return True when ``transform`` is defined inside the albumentations package."""
        return type(transform).__module__.partition('.')[0] == 'albumentations'

    @staticmethod
    def _image_to_tensor(image):
        """Convert a PIL image, HWC array or CHW tensor to a float32 CHW tensor.

        Integer pixel data is rescaled from ``[0, 255]`` to ``[0, 1]``;
        floating-point data is assumed to be scaled already and is only cast.
        """
        if not isinstance(image, torch.Tensor):
            # np.array copies, so the tensor never aliases read-only PIL memory.
            image = torch.from_numpy(np.array(image)).permute(2, 0, 1)
        if not image.is_floating_point():
            image = image.to(torch.float32) / 255.0
        return image.to(torch.float32)

    @staticmethod
    def _rebuild_instances(masks, labels):
        """Derive boxes from (transformed) masks and drop instances that vanished.

        A geometric augmentation such as a crop can remove an object
        completely. Re-deriving the boxes from the masks keeps ``boxes``,
        ``labels`` and ``masks`` index-aligned in that case.

        Returns:
            ``(boxes, labels, masks)`` containing only instances whose mask
            still has at least one foreground pixel.
        """
        boxes, kept_labels, kept_masks = [], [], []
        for mask, label in zip(masks, labels):
            mask = np.asarray(mask)
            rows, cols = np.nonzero(mask)
            if rows.size == 0:
                continue
            # +1 on the far edge so single-pixel-wide objects keep a positive extent.
            boxes.append([float(cols.min()), float(rows.min()),
                          float(cols.max()) + 1.0, float(rows.max()) + 1.0])
            kept_labels.append(label)
            kept_masks.append(mask)
        return boxes, kept_labels, kept_masks

    def __getitem__(self, idx):
        ann = self.annotations[idx]
        img_path = os.path.join(self.image_dir, ann['file_name'])
        img = Image.open(img_path).convert("RGB")
        H, W = ann['height'], ann['width']

        boxes, labels, masks = [], [], []
        for a in self.coco.loadAnns(ann['ann_ids']):
            x, y, w, h = a['bbox']
            if w < 1 or h < 1:
                # Degenerate boxes are rejected by the detection model.
                continue
            boxes.append([x, y, x + w, y + h])
            labels.append(a['category_id'])
            masks.append(resize_mask(self.coco.annToMask(a).astype(np.uint8), H, W))

        if self.transforms is None:
            pass
        elif self._is_albumentations(self.transforms):
            # albumentations only accepts named arguments. The masks carry the
            # instance geometry; empty bboxes/category_ids are passed solely to
            # satisfy the pipeline's bbox_params, and the final boxes are
            # recomputed from the transformed masks below.
            inputs = {'image': np.array(img), 'bboxes': [], 'category_ids': []}
            if masks:
                inputs['masks'] = masks
            augmented = self.transforms(**inputs)
            img = augmented['image']
            boxes, labels, masks = self._rebuild_instances(augmented.get('masks', []), labels)
        else:
            # Image-only callable (e.g. torchvision's ToTensor); targets are unchanged.
            img = self.transforms(img)

        img = self._image_to_tensor(img)
        out_h, out_w = img.shape[-2:]

        if masks:
            mask_tensor = torch.stack(
                [torch.as_tensor(np.asarray(m), dtype=torch.uint8) for m in masks]
            )
        else:
            mask_tensor = torch.zeros((0, out_h, out_w), dtype=torch.uint8)

        target = {
            # reshape keeps the (0, 4) shape for images without any instance.
            'boxes': torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4),
            'labels': torch.as_tensor(labels, dtype=torch.int64),
            'masks': mask_tensor,
            'image_id': torch.tensor([ann['image_id']])
        }
        return img, target

    def __len__(self):
        return len(self.annotations)

In [None]:
# NOTE(review): this upgrade runs after albumentations was already imported in
# the first cell, so the upgraded version is only picked up after a runtime
# restart. Consider moving it to the top of the notebook and pinning a version.
!pip install -U albumentations

In [None]:
import albumentations as A

# Training-time augmentation.
# * The pipeline must be called with named arguments
#   (image=..., masks=..., bboxes=..., category_ids=...).
# * No A.Normalize(): the Mask R-CNN model already normalises its inputs with
#   the ImageNet mean/std inside its own GeneralizedRCNNTransform, and expects
#   float images in [0, 1].
# * A.ToFloat + ToTensorV2 turn the uint8 HWC array into a float32 CHW tensor
#   in [0, 1].
# * Compose takes no `mask_params` argument; masks are handled through the
#   `mask` / `masks` targets without extra configuration.
transform = A.Compose([
    A.RandomCrop(width=512, height=512),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.5),
    A.ToFloat(max_value=255.0),
    ToTensorV2(),
], bbox_params=A.BboxParams(format='coco', label_fields=['category_ids']))

# transform = T.ToTensor()
dataset = TACOInstanceSegmentationDataset(data_dir, annotations_list, coco, transforms=transform)
# Detection targets differ in size per image, so batches are kept as tuples
# instead of being stacked into a single tensor.
data_loader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))


def get_model_instance_segmentation(num_classes):
    """Build a Mask R-CNN (ResNet-50 FPN v2) whose heads predict ``num_classes`` classes.

    The network is created with its default pretrained weights; the box
    classifier and the mask predictor are then replaced by freshly
    initialised heads sized for ``num_classes``.

    Args:
        num_classes: number of output classes of both new heads
            (presumably including the background class - TODO confirm).

    Returns:
        The modified model.
    """
    model = maskrcnn_resnet50_fpn_v2(weights="DEFAULT")
    heads = model.roi_heads

    # Box head: keep the input width of the existing classifier.
    heads.box_predictor = FastRCNNPredictor(
        heads.box_predictor.cls_score.in_features, num_classes
    )

    # Mask head: same input channels as before, 256 hidden channels.
    mask_in_channels = heads.mask_predictor.conv5_mask.in_channels
    heads.mask_predictor = MaskRCNNPredictor(mask_in_channels, 256, num_classes)

    return model

from torch.cuda.amp import GradScaler, autocast

def train(model, optimizer, scaler, data_loader, device):
    """Run one training epoch with (optional) mixed precision.

    Args:
        model: detection model that returns a dict of losses when called as
            ``model(images, targets)`` in training mode.
        optimizer: optimizer over the parameters to update.
        scaler: gradient scaler providing ``scale``, ``step`` and ``update``.
        data_loader: iterable yielding ``(images, targets)`` batches, where
            ``targets`` is a sequence of dicts of tensors.
        device: device (``torch.device`` or string) the batch is moved to.

    Returns:
        The mean total loss over the epoch as a float, or ``None`` when the
        loader produced no batches.
    """
    model.train()

    # Mixed precision is only enabled when actually training on CUDA.
    # torch.autocast replaces the deprecated torch.cuda.amp.autocast.
    use_amp = torch.device(device).type == 'cuda'
    total_loss = 0.0
    num_batches = 0

    for images, targets in data_loader:
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        optimizer.zero_grad()
        with torch.autocast(device_type='cuda', enabled=use_amp):
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())

        scaler.scale(losses).backward()
        scaler.step(optimizer)
        scaler.update()

        total_loss += float(losses.item())
        num_batches += 1

    return total_loss / num_batches if num_batches else None

def _encode_binary_mask(mask):
    """Run-length encode a 2-D binary mask into a COCO RLE dict."""
    # Imported lazily so this helper does not rely on the notebook's import cell.
    from pycocotools import mask as mask_util

    rle = mask_util.encode(np.asfortranarray(mask.astype(np.uint8)))
    # `counts` is returned as bytes; store it as text so the results stay
    # JSON-serialisable.
    rle['counts'] = rle['counts'].decode('utf-8')
    return rle


def evaluate(model, dataset, coco_gt, device, num_images=100):
    """Compute COCO mask (``segm``) metrics on the first ``num_images`` items.

    Args:
        model: detection model returning ``boxes``, ``scores``, ``labels`` and
            ``masks`` for each input image in eval mode.
        dataset: dataset whose items are ``(image_tensor, target)`` and which
            exposes ``annotations[idx]['image_id']``.
        coco_gt: ground-truth ``COCO`` object the image ids belong to.
        device: device the images are moved to for inference.
        num_images: maximum number of dataset items to evaluate.

    Returns:
        ``coco_eval.stats`` after summarising, or ``None`` when the model
        produced no detections at all.

    NOTE(review): predictions are compared with the ground truth in original
    image coordinates, so ``dataset`` should not apply geometric augmentation
    (crop/flip/resize) - confirm the dataset passed in is un-augmented.
    """
    model.eval()
    results = []
    image_ids = []

    for idx in range(min(num_images, len(dataset))):
        img, _ = dataset[idx]
        with torch.no_grad():
            prediction = model([img.to(device)])[0]

        image_id = int(dataset.annotations[idx]['image_id'])
        image_ids.append(image_id)

        scores = prediction['scores'].cpu().tolist()
        labels = prediction['labels'].cpu().tolist()
        # Soft masks of shape (N, 1, H, W) are binarised at 0.5.
        binary_masks = (prediction['masks'].cpu() > 0.5).numpy()

        for score, label, mask in zip(scores, labels, binary_masks):
            # Only the segmentation is submitted: with iouType='segm' the
            # masks are what gets scored, and leaving out 'bbox' lets loadRes
            # derive area and box from the mask itself.
            results.append({
                'image_id': image_id,
                'category_id': int(label),
                'segmentation': _encode_binary_mask(mask[0]),
                'score': float(score)
            })

    if not results:
        print("No predictions to evaluate.")
        return

    coco_dt = coco_gt.loadRes(results)
    coco_eval = COCOeval(coco_gt, coco_dt, iouType='segm')
    # Restrict scoring to the images that were actually run through the model;
    # otherwise every other image in the ground truth counts as all-missed.
    coco_eval.params.imgIds = image_ids
    coco_eval.evaluate()
    coco_eval.accumulate()
    coco_eval.summarize()
    return coco_eval.stats


def show_prediction(image_tensor, pred, category_id_to_name, score_threshold=0.4):
    """Plot an image next to its predicted boxes, labels and masks.

    Args:
        image_tensor: image tensor accepted by ``to_pil_image``
            (assumed ``(C, H, W)`` float in ``[0, 1]`` - TODO confirm at call sites).
        pred: prediction dict with ``boxes``, ``scores``, ``labels`` and
            ``masks`` (masks indexed as ``[i, 0]``).
        category_id_to_name: mapping from label id to display name; unknown
            ids are shown as ``class <id>``.
        score_threshold: detections scoring below this value are not drawn.
    """
    image_pil = F.to_pil_image(image_tensor.cpu()).convert("RGB")

    boxes = pred['boxes'].cpu().numpy()
    scores = pred['scores'].cpu().numpy()
    labels = pred['labels'].cpu().numpy()
    masks = pred['masks'].cpu().numpy()

    fig, axs = plt.subplots(1, 2, figsize=(20, 10))

    axs[0].imshow(image_pil)
    axs[0].set_title("Original Image")
    axs[0].axis("off")

    ax = axs[1]
    ax.imshow(image_pil)
    ax.set_title("Predictions")
    ax.axis("off")

    for i, score in enumerate(scores):
        if score < score_threshold:
            continue

        x1, y1, x2, y2 = boxes[i]
        # Plain int so the lookup does not depend on numpy scalar hashing.
        label_id = int(labels[i])
        class_name = category_id_to_name.get(label_id, f"class {label_id}")
        ax.add_patch(plt.Rectangle((x1, y1), x2 - x1, y2 - y1,
                                   fill=False, edgecolor='red', linewidth=2))
        ax.text(x1, y1 - 5, f"{class_name}: {score:.2f}",
                fontsize=12, color='white',
                bbox=dict(facecolor='red', alpha=0.5, edgecolor='none'))

        # Hide everything below 0.5 so only the predicted object is tinted.
        mask = masks[i, 0]
        masked = np.ma.masked_where(mask < 0.5, mask)
        ax.imshow(masked, alpha=0.4, cmap='jet')

    plt.tight_layout()
    plt.show()
    # Release the figure; this function is typically called in a loop.
    plt.close(fig)

In [None]:
# Training setup
num_classes = 60
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = get_model_instance_segmentation(num_classes).to(device)
scaler = GradScaler()

# Phase 1 (epochs 1-10): backbone weights frozen, only the remaining
# parameters are optimised, with a higher learning rate.
# NOTE(review): train() puts the whole model in train mode, so BatchNorm layers
# inside the frozen backbone may still update their running statistics -
# confirm this is intended.
model.backbone.requires_grad_(False)
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.AdamW(params, lr=1e-3, weight_decay=1e-4)

for epoch in range(1, 11):
    print(f"Epoch {epoch} (frozen backbone)")
    train(model, optimizer, scaler, data_loader, device)
    save_now = epoch % 5 == 0
    if save_now:
        torch.save(model.state_dict(), f"model_epoch_{epoch}_frozen.pth")

# Phase 2 (epochs 11-30): unfreeze the backbone and fine-tune every parameter
# with a fresh optimiser and a 10x smaller learning rate.
model.backbone.requires_grad_(True)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)

for epoch in range(11, 31):
    print(f"Epoch {epoch} (fine-tuning)")
    train(model, optimizer, scaler, data_loader, device)
    save_now = epoch % 5 == 0
    if save_now:
        torch.save(model.state_dict(), f"model_epoch_{epoch}_finetuned.pth")



  scaler = GradScaler()


Epoch 1 (frozen backbone)


  with autocast():


Epoch 2 (frozen backbone)
Epoch 3 (frozen backbone)
Epoch 4 (frozen backbone)
Epoch 5 (frozen backbone)
Epoch 6 (frozen backbone)
Epoch 7 (frozen backbone)
Epoch 8 (frozen backbone)
Epoch 9 (frozen backbone)
Epoch 10 (frozen backbone)
Epoch 11 (fine-tuning)
Epoch 12 (fine-tuning)
Epoch 13 (fine-tuning)
Epoch 14 (fine-tuning)
Epoch 15 (fine-tuning)
Epoch 16 (fine-tuning)
Epoch 17 (fine-tuning)
Epoch 18 (fine-tuning)
Epoch 19 (fine-tuning)
Epoch 20 (fine-tuning)
Epoch 21 (fine-tuning)
Epoch 22 (fine-tuning)
Epoch 23 (fine-tuning)
Epoch 24 (fine-tuning)
Epoch 25 (fine-tuning)
Epoch 26 (fine-tuning)
Epoch 27 (fine-tuning)
Epoch 28 (fine-tuning)
Epoch 29 (fine-tuning)
Epoch 30 (fine-tuning)


In [None]:
# Save the current weights once more; `epoch` still holds the value left by
# the last training loop that ran.
final_weights_path = f"model_epoch_{epoch}_finetuned.pth"
torch.save(model.state_dict(), final_weights_path)

In [None]:
import gc
# Collect unreachable Python objects first so tensors they still hold are
# freed, then ask PyTorch to release its cached, unused CUDA memory.
gc.collect()
torch.cuda.empty_cache()

In [None]:
# Rebuild the architecture and restore the fine-tuned weights for inference.
num_classes = 60
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = get_model_instance_segmentation(num_classes)
# NOTE(review): the training cell writes its checkpoints to the working
# directory, while this path points into Drive - the file has to be copied
# there manually. Confirm the path before running.
checkpoint_path = "/content/drive/MyDrive/taco_models/30 epoch/model_epoch_30_finetuned.pth"
# weights_only=True restricts unpickling to tensors and plain containers, which
# is all a state_dict contains, so a tampered checkpoint cannot run code.
state_dict = torch.load(checkpoint_path, map_location=device, weights_only=True)
model.load_state_dict(state_dict)
model.to(device)
# Trailing semicolon keeps the notebook from printing the whole module tree.
model.eval();

Downloading: "https://download.pytorch.org/models/maskrcnn_resnet50_fpn_v2_coco-73cbd019.pth" to /root/.cache/torch/hub/checkpoints/maskrcnn_resnet50_fpn_v2_coco-73cbd019.pth
100%|██████████| 177M/177M [00:00<00:00, 204MB/s]


MaskRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
         

In [None]:
from pycocotools.coco import COCO

# Load the validation annotations and build the same per-image records as for
# the training split.
val_ann_file = os.path.join(data_dir, 'annotations_0_val.json')
coco_val = COCO(val_ann_file)

val_img_ids = coco_val.getImgIds()
val_annotations_list = [
    {
        'image_id': img_id,
        'file_name': img_info['file_name'],
        'ann_ids': coco_val.getAnnIds(imgIds=img_id),
        'height': img_info['height'],
        'width': img_info['width']
    }
    for img_id, img_info in zip(val_img_ids, coco_val.loadImgs(val_img_ids))
]

# Validation must be deterministic and see the whole image, so the random
# crop/flip/brightness training pipeline is not reused here. The image is only
# converted to a [0, 1] float tensor; the model normalises its input itself.
val_transform = T.ToTensor()

val_dataset = TACOInstanceSegmentationDataset(
    image_dir=data_dir,
    annotation_list=val_annotations_list,
    coco=coco_val,
    transforms=val_transform
)
category_id_to_name = {cat['id']: cat['name'] for cat in coco_val.loadCats(coco_val.getCatIds())}


loading annotations into memory...
Done (t=0.01s)
creating index...
index created!


In [None]:
import random
# Visualise predictions for up to 20 randomly chosen validation images on CPU.
model.to('cpu')
# Inference mode is required for the model to return predictions rather than
# losses; set it here so the cell does not depend on an earlier cell.
model.eval()

# random.sample raises ValueError when asked for more items than exist.
num_to_show = min(20, len(val_dataset))
for i in random.sample(range(len(val_dataset)), num_to_show):
    img, _ = val_dataset[i]
    with torch.no_grad():
        pred = model([img])[0]
    show_prediction(img, pred, category_id_to_name)


In [None]:
import os
import torch
import numpy as np
import cv2
from PIL import Image
import torchvision.transforms.functional as F
import matplotlib.pyplot as plt

def save_image_and_masks_with_overlay(model, image_path, output_dir, name, device='cuda', score_threshold=0.5):
    """Run the model on one image and save the image, its masks and an overlay.

    Files are written to ``<output_dir>/<name>/``:

    * ``<name>.png``           - the input image,
    * ``<name>_mask_<k>.png``  - one binary (0/255) mask per kept detection,
    * ``<name>_overlay.png``   - the image with every kept mask blended in a
      random colour at 50 % opacity.

    Args:
        model: detection model returning ``masks`` and ``scores``.
        image_path: path of the image to process.
        output_dir: parent directory of the per-image output folder.
        name: name of the output folder and prefix of the written files.
        device: device to run inference on. Falls back to CPU when CUDA is
            requested but not available; the model is moved to this device.
        score_threshold: detections scoring below this value are skipped.

    Returns:
        The number of masks written.

    Raises:
        OSError: if a mask image could not be written.
    """
    device = torch.device(device)
    if device.type == 'cuda' and not torch.cuda.is_available():
        device = torch.device('cpu')

    image_output_dir = os.path.join(output_dir, name)
    os.makedirs(image_output_dir, exist_ok=True)

    img = Image.open(image_path).convert("RGB")
    img_tensor = F.to_tensor(img).to(device)

    # Model and input must live on the same device; this is a no-op when the
    # model is already there.
    model.to(device)
    model.eval()
    with torch.no_grad():
        pred = model([img_tensor])[0]

    img.save(os.path.join(image_output_dir, f"{name}.png"))

    # Blend in floating point and convert back to uint8 once at the end.
    overlay = np.array(img, dtype=np.float64)

    masks = pred['masks'].cpu().numpy()
    scores = pred['scores'].cpu().numpy()

    mask_count = 0
    for i, score in enumerate(scores):
        if score < score_threshold:
            continue

        mask = masks[i, 0] > 0.5
        mask_filename = os.path.join(image_output_dir, f"{name}_mask_{mask_count}.png")
        # cv2.imwrite reports failure through its return value, not an exception.
        if not cv2.imwrite(mask_filename, mask.astype(np.uint8) * 255):
            raise OSError(f"Could not write mask image: {mask_filename}")

        color = np.random.randint(0, 255, size=3, dtype=np.uint8)
        overlay[mask] = 0.5 * overlay[mask] + 0.5 * color

        mask_count += 1

    overlay_path = os.path.join(image_output_dir, f"{name}_overlay.png")
    Image.fromarray(overlay.astype(np.uint8)).save(overlay_path)

    print(f"Saved {mask_count} masks, original image, and overlay to: {image_output_dir}")
    return mask_count
