In [2]:
pip install -U albumentations



In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision.ops import MultiScaleRoIAlign, RoIPool
from torch.utils.data import DataLoader
import numpy as np
import os
import subprocess
import albumentations as A
from albumentations.pytorch import ToTensorV2

In [3]:
# Download training images
!wget http://images.cocodataset.org/zips/train2017.zip

# Download validation images
!wget http://images.cocodataset.org/zips/val2017.zip

# Download COCO annotations
!wget http://images.cocodataset.org/annotations/annotations_trainval2017.zip

--2025-03-16 04:18:30--  http://images.cocodataset.org/zips/train2017.zip
Resolving images.cocodataset.org (images.cocodataset.org)... 16.15.177.240, 16.182.42.97, 3.5.13.54, ...
Connecting to images.cocodataset.org (images.cocodataset.org)|16.15.177.240|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 19336861798 (18G) [application/zip]
Saving to: ‘train2017.zip’


2025-03-16 04:41:44 (13.2 MB/s) - ‘train2017.zip’ saved [19336861798/19336861798]

--2025-03-16 04:41:44--  http://images.cocodataset.org/zips/val2017.zip
Resolving images.cocodataset.org (images.cocodataset.org)... 3.5.29.58, 16.182.38.121, 54.231.131.241, ...
Connecting to images.cocodataset.org (images.cocodataset.org)|3.5.29.58|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 815585330 (778M) [application/zip]
Saving to: ‘val2017.zip’


2025-03-16 04:42:31 (16.6 MB/s) - ‘val2017.zip’ saved [815585330/815585330]

--2025-03-16 04:42:31--  http://images.cocodataset.org/annota

In [4]:
!unzip -o train2017.zip
!unzip -o val2017.zip
!unzip -o annotations_trainval2017.zip

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
 extracting: val2017/000000365745.jpg  
 extracting: val2017/000000320425.jpg  
 extracting: val2017/000000481404.jpg  
 extracting: val2017/000000314294.jpg  
 extracting: val2017/000000335328.jpg  
 extracting: val2017/000000513688.jpg  
 extracting: val2017/000000158548.jpg  
 extracting: val2017/000000132116.jpg  
 extracting: val2017/000000415238.jpg  
 extracting: val2017/000000321333.jpg  
 extracting: val2017/000000081738.jpg  
 extracting: val2017/000000577584.jpg  
 extracting: val2017/000000346905.jpg  
 extracting: val2017/000000433980.jpg  
 extracting: val2017/000000228144.jpg  
 extracting: val2017/000000041872.jpg  
 extracting: val2017/000000117492.jpg  
 extracting: val2017/000000368900.jpg  
 extracting: val2017/000000376900.jpg  
 extracting: val2017/000000352491.jpg  
 extracting: val2017/000000330790.jpg  
 extracting: val2017/000000384850.jpg  
 extracting: val2017/000000032735.jpg  
 extracting: va

In [5]:
# Custom Region Proposal Network (RPN)
class RPN(nn.Module):
    def __init__(self, in_channels, num_anchors):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, 512, kernel_size=3, stride=1, padding=1)
        self.cls_logits = nn.Conv2d(512, num_anchors, kernel_size=1)
        self.bbox_pred = nn.Conv2d(512, num_anchors * 4, kernel_size=1)

    def forward(self, x):
        x = F.relu(self.conv(x))
        logits = self.cls_logits(x)
        bbox_pred = self.bbox_pred(x)
        return logits, bbox_pred

In [6]:
class BoxHead(nn.Module):
    def __init__(self, in_channels, num_classes):
        super().__init__()
        self.fc1 = nn.Linear(in_channels, 1024)  # ✅ Input should be 12544
        self.fc2 = nn.Linear(1024, 1024)
        self.cls_score = nn.Linear(1024, num_classes)  # ✅ Class logits
        self.bbox_pred = nn.Linear(1024, num_classes * 4)  # ✅ Bounding box regression

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        cls_logits = self.cls_score(x)  # ✅ Class Scores
        bbox_preds = self.bbox_pred(x)  # ✅ Box Predictions
        return cls_logits, bbox_preds


In [7]:
# Custom Mask Head (Fully Convolutional Layers)
class MaskHead(nn.Module):
    def __init__(self, in_channels, hidden_dim=256, num_classes=91):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, hidden_dim, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(hidden_dim, hidden_dim, kernel_size=3, padding=1)
        self.conv5 = nn.Conv2d(hidden_dim, num_classes, kernel_size=1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = self.conv5(x)
        return x


In [8]:
import torchvision.models.detection.backbone_utils as backbone_utils
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone

def get_resnet_backbone():
    backbone = resnet_fpn_backbone("resnet50", weights="DEFAULT")  # ✅ Use ResNet50 with FPN
    return backbone


In [9]:
class MaskRCNN(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.num_classes = num_classes
        self.backbone = get_resnet_backbone()  # ✅ Use Pretrained ResNet
        self.rpn = RPN(2048, num_anchors=9)
        self.roi_pool = MultiScaleRoIAlign(featmap_names=["0", "1", "2", "3"], output_size=7, sampling_ratio=2)
        self.box_head = BoxHead(7 * 7 * 256, num_classes)  # ✅ Bounding Box Head
        self.mask_head = MaskHead(256, num_classes=num_classes)  # ✅ Mask Head

    def forward(self, images, proposals=None):
        features = self.backbone(images)  # ✅ Extract features

        # ✅ Fix `feature_dict` to correctly handle different backbone outputs
        if isinstance(features, dict):
            feature_dict = features  # ✅ Already a dictionary (FPN case)
        elif isinstance(features, list):
            feature_dict = {str(i): feat for i, feat in enumerate(features)}  # ✅ Convert list to dict
        else:
            feature_dict = {"0": features}  # ✅ Single tensor case (No FPN)

        # ✅ Generate proposals if not provided
        if proposals is None:
            rpn_logits, rpn_bbox = self.rpn(features)
            proposals = rpn_bbox

        # ✅ Fix `image_shapes` format
        image_shapes = [(images.shape[-2], images.shape[-1])] * images.shape[0]

        # ✅ Convert `proposals` into List[Tensor]
        proposals_per_image = []
        batch_size = images.shape[0]

        for i in range(batch_size):
            mask = proposals[:, 0] == i  # ✅ Select proposals for image `i`
            if mask.sum() == 0:  # ✅ Handle empty proposals
                proposals_per_image.append(torch.empty((0, 4), dtype=proposals.dtype, device=proposals.device))
            else:
                proposals_per_image.append(proposals[mask][:, 1:])  # ✅ Remove batch index column

        # ✅ Apply RoI Align
        pooled_regions = self.roi_pool(feature_dict, proposals_per_image, image_shapes)

        # ✅ Ensure `pooled_regions` is not empty
        if pooled_regions.numel() == 0:
            raise ValueError("❌ RoIAlign produced empty regions, check proposals input.")

        # ✅ Ensure correct shape before passing to BoxHead
        if pooled_regions.dim() != 4:
            raise ValueError(f"❌ Expected 4D tensor (N, C, 7, 7), got {pooled_regions.shape}")

        # ✅ Get feature map size
        C, H, W = pooled_regions.shape[1], pooled_regions.shape[2], pooled_regions.shape[3]
        expected_size = C * H * W  # Should be 256 * 7 * 7 = 12544

        # ✅ Ensure correct reshaping
        if expected_size != 12544:
            raise ValueError(f"❌ Feature map has incorrect channels: {C}. Expected 256.")

        flattened_pooled = pooled_regions.view(pooled_regions.size(0), expected_size)

        # ✅ Debugging Print (Optional)
        if batch_idx == 0:  # ✅ Print only for the first batch of each epoch
           print(f"✅ Reshaped pooled_regions: Expected {expected_size}, Got {flattened_pooled.shape[1]}")


        # ✅ Compute Class & Box Predictions
        class_logits, bbox_deltas = self.box_head(flattened_pooled)

        # ✅ Compute Mask Predictions
        masks = self.mask_head(pooled_regions)

        return class_logits, bbox_deltas, masks


In [10]:
import os

train_images_path = "train2017"
annotations_path = "annotations/instances_train2017.json"

# Check if the image directory exists
print("Train Images Exist:", os.path.exists(train_images_path))

# Check if the annotations file exists
print("Annotations Exist:", os.path.exists(annotations_path))


Train Images Exist: True
Annotations Exist: True


In [11]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
from torchvision.ops import RoIAlign
from torch.utils.data import DataLoader, Dataset
import numpy as np
import cv2
import os
import json
import time
from PIL import Image
from pycocotools.coco import COCO

# ✅ Set Device (GPU or CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using Device: {device}")

# ✅ COCO Dataset Class (Optimized)
class COCODataset(Dataset):
    def __init__(self, image_dir, annotation_file, transform=None):
        self.image_dir = image_dir
        self.coco = COCO(annotation_file)
        self.img_ids = list(self.coco.imgs.keys())[:50000]  # ✅ Use only 20,000 images
        self.transform = transform
        self.img_size = (128, 128)  # ✅ Resize to 128x128 for faster training

    def __len__(self):
        return len(self.img_ids)

    def __getitem__(self, idx):
        img_id = self.img_ids[idx]
        ann_ids = self.coco.getAnnIds(imgIds=img_id)
        anns = self.coco.loadAnns(ann_ids)

        img_info = self.coco.loadImgs(img_id)[0]
        img_path = os.path.join(self.image_dir, img_info['file_name'])
        image = Image.open(img_path).convert("RGB")

        # ✅ Resize image
        image = image.resize(self.img_size, Image.BILINEAR)

        # Get bounding boxes & masks
        boxes, masks, labels = [], [], []

        for ann in anns:
            x, y, w, h = ann['bbox']
            x1, y1, x2, y2 = x, y, x + w, y + h
            boxes.append([x1, y1, x2, y2])

            mask = self.coco.annToMask(ann)
            mask = cv2.resize(mask, self.img_size)  # Resize mask
            masks.append(mask)

            labels.append(ann['category_id'])

        # ✅ Handle Empty Annotations
        if len(boxes) == 0 or len(labels) == 0:
            return self.__getitem__((idx + 1) % len(self.img_ids))  # Skip empty images

        # ✅ Convert to tensors
        boxes = torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)  # Shape (N, 4)
        masks = torch.tensor(np.array(masks), dtype=torch.uint8).unsqueeze(1)  # Shape (N, 1, H, W)
        labels = torch.tensor(labels, dtype=torch.int64).flatten()  # Shape (N,)

        if self.transform:
            image = self.transform(image)

        return image, boxes, masks, labels

# ✅ Transforms
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # Resize images
    transforms.RandomHorizontalFlip(p=0.5),  # ✅ Flip images randomly
    transforms.RandomRotation(degrees=10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),  # ✅ Improve feature learning
    transforms.GaussianBlur(kernel_size=(3, 3), sigma=(0.1, 2.0)),  # ✅ Reduce noise
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # ✅ Normalize using ImageNet stats
])

# ✅ Collate Function to Handle Variable-Sized Batches
def collate_fn(batch):
    images, boxes, masks, labels = zip(*batch)
    images = torch.stack(images, dim=0)  # Stack images into a single tensor
    return images, boxes, masks, labels  # Keep boxes/masks/lables as lists

# ✅ Create Dataset & DataLoader (Optimized)
train_dataset = COCODataset("train2017", "annotations/instances_train2017.json", transform)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn, num_workers=4)

# ✅ Check Dataset
print(f"Total images in dataset: {len(train_dataset)}")
for images, boxes, masks, labels in train_loader:
    print(f"Batch size: {len(images)}")
    print(f"Image shape: {images[0].shape}")
    print(f"Boxes shape: {boxes[0].shape}")
    print(f"Masks shape: {masks[0].shape}")
    print(f"Labels: {labels[0]}")
    break  # ✅ Stop after first batch check


Using Device: cuda
loading annotations into memory...
Done (t=15.30s)
creating index...
index created!
Total images in dataset: 50000
Batch size: 16
Image shape: torch.Size([3, 128, 128])
Boxes shape: torch.Size([7, 4])
Masks shape: torch.Size([7, 1, 128, 128])
Labels: tensor([77, 77, 77, 77, 62,  1,  1])


In [12]:
import torch
print(f"CUDA Available: {torch.cuda.is_available()}")
print(f"Device: {torch.device('cuda' if torch.cuda.is_available() else 'cpu')}")


CUDA Available: True
Device: cuda


In [13]:
import time

start = time.time()
data = next(iter(train_loader))  # Load one batch
end = time.time()

print(f"⏳ Time to load one batch: {end - start:.4f} seconds")


⏳ Time to load one batch: 0.8592 seconds


In [None]:


import time
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.cuda.amp import autocast, GradScaler
from torchvision import transforms

# ✅ Data Augmentation
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.05),
    transforms.RandomPerspective(distortion_scale=0.2, p=0.5),  # Improves robustness
    transforms.RandomAffine(degrees=15, scale=(0.8, 1.2), shear=10),
    transforms.RandomErasing(p=0.3, scale=(0.02, 0.2)),  # Simulates occlusion
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


# ✅ Load Model & Optimizer
device = "cuda" if torch.cuda.is_available() else "cpu"
model = MaskRCNN(num_classes=91).to(device)
optimizer = optim.AdamW(model.parameters(), lr=0.0005, weight_decay=0.0001)  # AdamW is a good optimizer
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)

# ✅ Loss Functions
class FocalLoss(nn.Module):
    def __init__(self, alpha=0.25, gamma=2.0):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.ce_loss = nn.CrossEntropyLoss(reduction="none")

    def forward(self, logits, targets):
        ce_loss = self.ce_loss(logits, targets)
        pt = torch.exp(-ce_loss)  # Probability of the correct class
        focal_loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
        return focal_loss.mean()

loss_fn_cls = FocalLoss(alpha=0.25, gamma=2.0)

loss_fn_bbox = nn.SmoothL1Loss()
loss_fn_mask = nn.BCEWithLogitsLoss()

scaler = GradScaler()

# ✅ Training Loop
epochs = 85
accumulate_steps = 8

for epoch in range(epochs):
    model.train()
    start_time = time.time()
    total_loss, correct_preds, total_preds = 0, 0, 0

    for batch_idx, (images, boxes, masks, labels) in enumerate(train_loader):
        images = torch.stack([img.to(device) for img in images])
        boxes = [b.to(device) for b in boxes]
        masks = [m.to(device) for m in masks]
        labels = [l.to(device) for l in labels]

        # ✅ Skip Empty Labels to Prevent Errors
        if len(labels) == 0 or len(boxes) == 0:
            print(f"⚠️ Skipping batch {batch_idx} due to empty labels/boxes")
            continue

        # ✅ Normalize & Clamp Bounding Boxes
        for i in range(len(boxes)):
            image_w, image_h = images.shape[3], images.shape[2]
            boxes[i] /= torch.tensor([image_w, image_h, image_w, image_h], device=device)
            boxes[i] = torch.clamp(boxes[i], min=0, max=1)

        num_proposals = min(32, max(len(b) for b in boxes))
        proposals, proposal_labels, proposal_boxes = [], [], []

        for i, (b, l) in enumerate(zip(boxes, labels)):
            if len(b) == 0:
                continue
            b_idx = torch.full((b.shape[0], 1), i, dtype=torch.float32, device=device)
            formatted_boxes = torch.cat((b_idx, b[:, :4]), dim=1)
            proposals.append(formatted_boxes[:num_proposals])
            proposal_labels.append(l[:num_proposals])
            proposal_boxes.append(b[:num_proposals])

        if len(proposals) > 0:
            proposals = torch.cat(proposals, dim=0).to(device)
            proposal_labels = torch.cat(proposal_labels, dim=0).to(device)
            proposal_boxes = torch.cat(proposal_boxes, dim=0).to(device)
        else:
            proposals = torch.zeros((1, 5), dtype=torch.float32, device=device)
            proposal_labels = torch.zeros((1,), dtype=torch.int64, device=device)
            proposal_boxes = torch.zeros((1, 4), dtype=torch.float32, device=device)

        # ✅ Forward Pass with Mixed Precision
        with autocast():
            class_logits, bbox_deltas, mask_logits = model(images, proposals)

        # ✅ Compute Losses
        loss_cls = loss_fn_cls(class_logits, proposal_labels)
        selected_bbox_deltas = bbox_deltas.view(-1, 91, 4)
        predicted_classes = torch.argmax(class_logits, dim=1)
        selected_bbox_deltas = selected_bbox_deltas[torch.arange(selected_bbox_deltas.size(0)), predicted_classes]
        loss_bbox = loss_fn_bbox(selected_bbox_deltas, proposal_boxes)

        resized_masks = F.interpolate(masks[0].float(), size=mask_logits.shape[-2:], mode="bilinear", align_corners=False)

        # ✅ Ensure masks match mask_logits channels
        if resized_masks.shape[1] == 1 and mask_logits.shape[1] > 1:
            resized_masks = resized_masks.repeat(1, mask_logits.shape[1], 1, 1)
        elif resized_masks.shape[1] > 1 and mask_logits.shape[1] == 1:
            resized_masks = resized_masks[:, :1, :, :]

        # ✅ Ensure batch sizes match
        if resized_masks.shape[0] < mask_logits.shape[0]:
            pad_size = mask_logits.shape[0] - resized_masks.shape[0]
            padding = torch.zeros((pad_size, mask_logits.shape[1], *mask_logits.shape[-2:]), dtype=resized_masks.dtype, device=resized_masks.device)
            resized_masks = torch.cat([resized_masks, padding], dim=0)
        elif resized_masks.shape[0] > mask_logits.shape[0]:
            resized_masks = resized_masks[:mask_logits.shape[0]]

        loss_mask = loss_fn_mask(mask_logits, resized_masks)

        # ✅ Total Loss (Normalized)
        loss = (loss_cls + loss_bbox + loss_mask) / len(images)

        # ✅ Convert NaN Losses to Zero & Skip Unstable Batches
        loss = torch.nan_to_num(loss, nan=0.0, posinf=0.0, neginf=0.0)
        if loss == 0 or torch.isnan(loss) or torch.isinf(loss):
            print(f"⚠️ Skipping batch {batch_idx} due to unstable loss")
            optimizer.zero_grad()
            continue

        # ✅ Gradient Accumulation with Clipping
        scaler.scale(loss).backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # Clip gradients
        if (batch_idx + 1) % accumulate_steps == 0:
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()

        total_loss += loss.item()

        preds = torch.argmax(class_logits, dim=1)
        labels_flat = torch.cat(proposal_labels, dim=0) if isinstance(proposal_labels, list) else proposal_labels

        # ✅ Track Accuracy
        min_size = min(labels_flat.shape[0], preds.shape[0])
        correct_preds += (preds[:min_size] == labels_flat[:min_size]).sum().item()
        total_preds += min_size

        if batch_idx % 100 == 0:
            print(f"🟢 Epoch {epoch+1}, Batch {batch_idx}/{len(train_loader)}: Loss {loss.item():.4f}")

    # ✅ Compute Training Accuracy
    scheduler.step()
    train_accuracy = correct_preds / total_preds if total_preds > 0 else 0.0
    epoch_time = time.time() - start_time

    print(f"✅ Epoch {epoch+1} Done: Loss {total_loss:.4f}, Train Acc: {train_accuracy:.4f}, Time: {epoch_time:.2f}s")

    # ✅ Print Learning Rate
    current_lr = optimizer.param_groups[0]['lr']
    print(f"📉 Learning Rate after Epoch {epoch+1}: {current_lr:.6f}")


  scaler = GradScaler()
  with autocast():


✅ Reshaped pooled_regions: Expected 12544, Got 12544
🟢 Epoch 1, Batch 0/3125: Loss 0.1416
🟢 Epoch 1, Batch 100/3125: Loss 0.1059
🟢 Epoch 1, Batch 200/3125: Loss 0.0691
🟢 Epoch 1, Batch 300/3125: Loss 0.0522
🟢 Epoch 1, Batch 400/3125: Loss 0.0574
🟢 Epoch 1, Batch 500/3125: Loss 0.0589
🟢 Epoch 1, Batch 600/3125: Loss 0.0641
🟢 Epoch 1, Batch 700/3125: Loss 0.0542
🟢 Epoch 1, Batch 800/3125: Loss 0.0555
🟢 Epoch 1, Batch 900/3125: Loss 0.0500
🟢 Epoch 1, Batch 1000/3125: Loss 0.0606
🟢 Epoch 1, Batch 1100/3125: Loss 0.0464
🟢 Epoch 1, Batch 1200/3125: Loss 0.0497
🟢 Epoch 1, Batch 1300/3125: Loss 0.0481
🟢 Epoch 1, Batch 1400/3125: Loss 0.0527
🟢 Epoch 1, Batch 1500/3125: Loss 0.0531
🟢 Epoch 1, Batch 1600/3125: Loss 0.0577
🟢 Epoch 1, Batch 1700/3125: Loss 0.0455
🟢 Epoch 1, Batch 1800/3125: Loss 0.0562
🟢 Epoch 1, Batch 1900/3125: Loss 0.0537
🟢 Epoch 1, Batch 2000/3125: Loss 0.0545
🟢 Epoch 1, Batch 2100/3125: Loss 0.0538
🟢 Epoch 1, Batch 2200/3125: Loss 0.0470
🟢 Epoch 1, Batch 2300/3125: Loss 0.0465

In [15]:
# ✅ Save trained model
model_save_path = "mask_rcnn_trained.pth"
torch.save(model.state_dict(), model_save_path)
print(f"✅ Model saved as {model_save_path}")

NameError: name 'model' is not defined

In [14]:
from google.colab import drive
drive.mount('/content/drive')

save_path = "/content/drive/MyDrive/mask_rcnn_trained.pth"  # Change path if needed
torch.save(model.state_dict(), save_path)

print(f"✅ Model saved to Google Drive at {save_path}")


Mounted at /content/drive


NameError: name 'model' is not defined

In [None]:
import torch

# ✅ Define the new save path
save_path = "/content/drive/MyDrive/mask_rcnn_trained_v2.pth"  # Different name

# ✅ Save the entire model (architecture + weights)
torch.save(model, save_path)

print(f"✅ Full model saved successfully at: {save_path}")


PicklingError: Can't pickle <class '__main__.RPN'>: it's not the same object as __main__.RPN

  scaler = GradScaler()
  with autocast():


✅ Reshaped pooled_regions: Expected 12544, Got 12544
🟢 Epoch 1, Batch 0/3125: Loss 0.3478
🟢 Epoch 1, Batch 100/3125: Loss 0.2745
⚠️ Skipping batch 144 due to unstable loss
⚠️ Skipping batch 171 due to unstable loss
🟢 Epoch 1, Batch 200/3125: Loss 0.2453
⚠️ Skipping batch 217 due to unstable loss
⚠️ Skipping batch 247 due to unstable loss
⚠️ Skipping batch 252 due to unstable loss
⚠️ Skipping batch 288 due to unstable loss
🟢 Epoch 1, Batch 300/3125: Loss 0.2288
⚠️ Skipping batch 319 due to unstable loss
⚠️ Skipping batch 364 due to unstable loss
⚠️ Skipping batch 375 due to unstable loss
🟢 Epoch 1, Batch 400/3125: Loss 0.2303
🟢 Epoch 1, Batch 500/3125: Loss 0.2507
⚠️ Skipping batch 597 due to unstable loss
🟢 Epoch 1, Batch 600/3125: Loss 0.2138
⚠️ Skipping batch 672 due to unstable loss
🟢 Epoch 1, Batch 700/3125: Loss 0.2145
⚠️ Skipping batch 723 due to unstable loss
⚠️ Skipping batch 760 due to unstable loss
🟢 Epoch 1, Batch 800/3125: Loss 0.2520
⚠️ Skipping batch 821 due to unstable l

KeyboardInterrupt: 

In [22]:
import torch
import torchvision
from torch.utils.data import DataLoader
from torch.nn.functional import interpolate
from collections import Counter
import os

# ✅ Define Google Drive model path
model_path = "/content/drive/MyDrive/mask_rcnn_trained_approach2_full_model.pth"

# ✅ Verify that the model file exists
if os.path.exists(model_path):
    print("✅ Model file found!")
else:
    raise FileNotFoundError("❌ Model file NOT found. Check your Google Drive path.")

# ✅ Load the trained model
# ✅ Load Full Model Instead of Just State Dictionary
def load_trained_model(model_path, device):
    """Load a trained Mask R-CNN model from a .pth file"""
    checkpoint = torch.load(model_path, map_location=device)

    if isinstance(checkpoint, dict):
        # ✅ Case 1: Model was saved using state_dict() only
        model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=False, num_classes=91)
        model.load_state_dict(checkpoint)  # Load weights
    else:
        # ✅ Case 2: Model was saved as a full model object
        model = checkpoint  # Directly assign the model

    model.to(device)
    model.eval()  # Set to evaluation mode
    print("✅ Model Loaded Successfully!")
    return model

# ✅ Dataset & DataLoader
class COCODataset(torch.utils.data.Dataset):
    def __init__(self, root, annotation_file, transform=None):
        from pycocotools.coco import COCO
        self.coco = COCO(annotation_file)
        self.root = root
        self.transform = transform
        self.image_ids = list(self.coco.imgs.keys())

    def __getitem__(self, index):
        from PIL import Image
        image_id = self.image_ids[index]
        image_info = self.coco.loadImgs(image_id)[0]
        path = os.path.join(self.root, image_info["file_name"])
        image = Image.open(path).convert("RGB")

        ann_ids = self.coco.getAnnIds(imgIds=image_id)
        anns = self.coco.loadAnns(ann_ids)

        boxes = []
        labels = []
        masks = []

        for ann in anns:
            x, y, w, h = ann["bbox"]
            boxes.append([x, y, x + w, y + h])
            labels.append(ann["category_id"])
            mask = self.coco.annToMask(ann)
            masks.append(mask)

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(boxes), torch.tensor(masks), torch.tensor(labels)

    def __len__(self):
        return len(self.image_ids)


# ✅ Data Transformation (Modify as needed)
from torchvision import transforms
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


# ✅ DataLoader
def collate_fn(batch):
    images, boxes, masks, labels = zip(*batch)
    return images, boxes, masks, labels

test_dataset = COCODataset("val2017", "annotations/instances_val2017.json", transform)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, collate_fn=collate_fn, num_workers=4)

# ✅ COCO Labels Dictionary
COCO_CLASSES = {
    1: "person", 2: "bicycle", 3: "car", 4: "motorcycle", 5: "airplane",
    6: "bus", 7: "train", 8: "truck", 9: "boat", 10: "traffic light",
    11: "fire hydrant", 13: "stop sign", 14: "parking meter", 15: "bench",
    16: "bird", 17: "cat", 18: "dog", 19: "horse", 20: "sheep",
    21: "cow", 22: "elephant", 23: "bear", 24: "zebra", 25: "giraffe",
    27: "backpack", 28: "umbrella", 31: "handbag", 32: "tie", 33: "suitcase",
    34: "frisbee", 35: "skis", 36: "snowboard", 37: "sports ball", 38: "kite",
    39: "baseball bat", 40: "baseball glove", 41: "skateboard", 42: "surfboard",
    43: "tennis racket", 44: "bottle", 46: "wine glass", 47: "cup", 48: "fork",
    49: "knife", 50: "spoon", 51: "bowl", 52: "banana", 53: "apple",
    54: "sandwich", 55: "orange", 56: "broccoli", 57: "carrot", 58: "hot dog",
    59: "pizza", 60: "donut", 61: "cake", 62: "chair", 63: "couch",
    64: "potted plant", 65: "bed", 67: "dining table", 70: "toilet",
    72: "tv", 73: "laptop", 74: "mouse", 75: "remote", 76: "keyboard",
    77: "cell phone", 78: "microwave", 79: "oven", 80: "toaster", 81: "sink",
    82: "refrigerator", 84: "book", 85: "clock", 86: "vase", 87: "scissors",
    88: "teddy bear", 89: "hair drier", 90: "toothbrush"
}


# ✅ Evaluation Function
def evaluate_model(model, test_loader, device):
    model.eval()
    total_correct, total_samples = 0, 0

    with torch.no_grad():
        for batch_idx, (images, boxes, masks, labels) in enumerate(test_loader):
            images = torch.stack([img.to(device) for img in images])
            labels = [l.to(device) for l in labels]

            if len(labels) == 0:
                print(f"⚠️ Skipping batch {batch_idx} due to empty labels")
                continue

            outputs = model(images)
            pred_labels = [output["labels"] for output in outputs]

            # Flatten & Convert Labels
            gt_labels = torch.cat(labels, dim=0) if len(labels) > 0 else torch.tensor([], device=device)
            pred_labels = torch.cat(pred_labels, dim=0) if len(pred_labels) > 0 else torch.tensor([], device=device)

            # ✅ Ensure Labels Are in COCO Range
            if len(gt_labels) > 0 and len(pred_labels) > 0:
                assert gt_labels.min() >= 1 and gt_labels.max() <= 90, "GT labels out of range!"
                assert pred_labels.min() >= 1 and pred_labels.max() <= 90, "Pred labels out of range!"

            # ✅ Match Predictions to Ground Truth
            matches = torch.isin(pred_labels, gt_labels)
            total_correct += matches.sum().item()
            total_samples += len(gt_labels)

            if batch_idx % 10 == 0:
                print("Predicted Label Distribution:", Counter(pred_labels.tolist()).most_common(5))

    accuracy = total_correct / total_samples if total_samples > 0 else 0.0
    print(f"✅ Evaluation Done: Accuracy {accuracy:.4f}")
    return accuracy


# ✅ Load Model and Run Evaluation
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = load_trained_model(model_path, device)
accuracy = evaluate_model(model, test_loader, device)


✅ Model file found!
loading annotations into memory...
Done (t=0.45s)
creating index...
index created!


AttributeError: 'str' object has no attribute '__module__'

In [None]:
from torchvision import transforms

# ✅ Define the transformation (ToTensor)
to_tensor_transform = transforms.ToTensor()

def evaluate_model(model, test_loader, device):
    model.eval()
    total_correct, total_samples = 0, 0

    with torch.no_grad():
        for batch_idx, (images, boxes, masks, labels) in enumerate(test_loader):
            # ✅ Convert images to tensors if they are not already
            images = [to_tensor_transform(img) if not isinstance(img, torch.Tensor) else img for img in images]

            # ✅ Move Data to the Same Device as Model
            images = torch.stack([img.to(device) for img in images])
            boxes = [b.to(device) for b in boxes]
            labels = [l.to(device) for l in labels]

            if len(labels) == 0 or len(boxes) == 0:
                print(f"⚠️ Skipping batch {batch_idx} due to empty labels/boxes")
                continue

            # ✅ Normalize Bounding Boxes
            for i in range(len(boxes)):
                image_w, image_h = images.shape[3], images.shape[2]
                boxes[i] /= torch.tensor([image_w, image_h, image_w, image_h], device=device)
                boxes[i] = torch.clamp(boxes[i], min=0, max=1)

            # ✅ Create Proper Targets List
            targets = [{"boxes": b, "labels": l} for b, l in zip(boxes, labels)]

            # ✅ Run Model Inference (Ensure Inputs Are on Same Device)
            outputs = model(images.to(device))

            # ✅ Compute Accuracy
            for output, target in zip(outputs, targets):
                pred_labels = output["labels"]
                gt_labels = target["labels"]

                # ✅ Match Predicted Labels with Ground Truth
                min_size = min(len(pred_labels), len(gt_labels))
                total_correct += (pred_labels[:min_size] == gt_labels[:min_size]).sum().item()
                total_samples += min_size

            if batch_idx % 50 == 0:
                print(f"🔵 Batch {batch_idx}/{len(test_loader)} processed")

    accuracy = total_correct / total_samples if total_samples > 0 else 0.0
    print(f"✅ Evaluation Done: Accuracy {accuracy:.4f}")
    return accuracy

# ✅ Run Evaluation
accuracy = evaluate_model(model, test_loader, device)


TypeError: Caught TypeError in DataLoader worker process 0.
Original Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/torch/utils/data/_utils/worker.py", line 349, in _worker_loop
    data = fetcher.fetch(index)  # type: ignore[possibly-undefined]
           ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/torch/utils/data/_utils/fetch.py", line 55, in fetch
    return self.collate_fn(data)
           ^^^^^^^^^^^^^^^^^^^^^
  File "<ipython-input-21-464b1fa6790b>", line 86, in collate_fn
    images = torch.stack(images, dim=0)  # Stack images into a single tensor
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: expected Tensor as element 0 in argument 0, but got Image


In [None]:
import torch
from torch.utils.data import DataLoader
from torch.nn.functional import interpolate
from collections import Counter

# ✅ Create Dataset & DataLoader (Optimized)
test_dataset = COCODataset("val2017", "annotations/instances_val2017.json", transform)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn, num_workers=4)

# ✅ COCO Labels
COCO_CLASSES = {
    1: "person", 2: "bicycle", 3: "car", 4: "motorcycle", 5: "airplane",
    6: "bus", 7: "train", 8: "truck", 9: "boat", 10: "traffic light",
    11: "fire hydrant", 13: "stop sign", 14: "parking meter", 15: "bench",
    16: "bird", 17: "cat", 18: "dog", 19: "horse", 20: "sheep",
    21: "cow", 22: "elephant", 23: "bear", 24: "zebra", 25: "giraffe",
    27: "backpack", 28: "umbrella", 31: "handbag", 32: "tie", 33: "suitcase",
    34: "frisbee", 35: "skis", 36: "snowboard", 37: "sports ball", 38: "kite",
    39: "baseball bat", 40: "baseball glove", 41: "skateboard", 42: "surfboard",
    43: "tennis racket", 44: "bottle", 46: "wine glass", 47: "cup", 48: "fork",
    49: "knife", 50: "spoon", 51: "bowl", 52: "banana", 53: "apple",
    54: "sandwich", 55: "orange", 56: "broccoli", 57: "carrot", 58: "hot dog",
    59: "pizza", 60: "donut", 61: "cake", 62: "chair", 63: "couch",
    64: "potted plant", 65: "bed", 67: "dining table", 70: "toilet",
    72: "tv", 73: "laptop", 74: "mouse", 75: "remote", 76: "keyboard",
    77: "cell phone", 78: "microwave", 79: "oven", 80: "toaster", 81: "sink",
    82: "refrigerator", 84: "book", 85: "clock", 86: "vase", 87: "scissors",
    88: "teddy bear", 89: "hair drier", 90: "toothbrush"
}

# ✅ Fixed Evaluation Function
# def evaluate_model(model, test_loader, device):
#     model.eval()
#     total_correct, total_samples = 0, 0
#     total_loss = 0.0

#     with torch.no_grad():
#         for batch_idx, (images, boxes, masks, labels) in enumerate(test_loader):
#             images = torch.stack([img.to(device) for img in images])
#             boxes = [b.to(device) for b in boxes]
#             masks = [m.to(device) for m in masks]
#             labels = [l.to(device) for l in labels]

#             if len(labels) == 0 or len(boxes) == 0:
#                 print(f"⚠️ Skipping batch {batch_idx} due to empty labels/boxes")
#                 continue

#             # ✅ Normalize Bounding Boxes
#             for i in range(len(boxes)):
#                 image_w, image_h = images.shape[3], images.shape[2]
#                 boxes[i] /= torch.tensor([image_w, image_h, image_w, image_h], device=device)
#                 boxes[i] = torch.clamp(boxes[i], min=0, max=1)


#             # ✅ Create Proper Targets List
#             targets = []
#             for b, l in zip(boxes, labels):
#                 targets.append({"boxes": b, "labels": l})

#             # ✅ Run Model Inference


#             # class_logits = model(images)
#             # print(f"Model Output Type: {type(class_logits)}")
#             # print(f"Model Output Length: {len(class_logits) if isinstance(class_logits, (tuple, list)) else 'N/A'}")
#             # print("output keys: ", class_logits)
#             # print(f"First Output Type: {type(class_logits[0])}")
#             # print(f"Keys in First Output: {class_logits[0].keys() if isinstance(class_logits[0], dict) else 'Not a dict'}")

#             outputs = model(images)  # ✅ Get list of dictionaries
#             pred_labels = [output["labels"] for output in outputs]  # ✅ Extract 'labels'


#             # pred_labels = torch.argmax(class_logits, dim=1)
#             # gt_labels = torch.cat(labels, dim=0)

#             # # ✅ Ensure Matching Sizes for Accuracy Calculation
#             # min_size = min(pred_labels.shape[0], gt_labels.shape[0])
#             # total_correct += (pred_labels[:min_size] == gt_labels[:min_size]).sum().item()
#             # total_samples += min_size

#             gt_labels = torch.cat(labels, dim=0) if len(labels) > 0 else torch.tensor([], device=device)
#             pred_labels = torch.cat(pred_labels, dim=0) if len(pred_labels) > 0 else torch.tensor([], device=device)

#             # Ensure both have the same shape before comparison
#             min_size = min(pred_labels.shape[0], gt_labels.shape[0])
#             total_correct += (pred_labels[:min_size] == gt_labels[:min_size]).sum().item()
#             total_samples += min_size


#             # # ✅ Compute Losses
#             # loss_cls, loss_bbox, loss_mask = 0.0, 0.0, 0.0
#             # for output, target in zip(outputs, targets):
#             #     pred_boxes = output["boxes"]
#             #     pred_labels = output["labels"]
#             #     pred_scores = output["scores"]

#             #     # ✅ Match Predicted Labels with Ground Truth
#             #     gt_labels = target["labels"]
#             #     min_size = min(len(pred_labels), len(gt_labels))
#             #     total_correct += (pred_labels[:min_size] == gt_labels[:min_size]).sum().item()
#             #     total_samples += min_size

#             # # ✅ Calculate Loss (For Debugging, Adjust as Needed)
#             # loss = loss_cls + loss_bbox + loss_mask
#             # total_loss += loss

#             if batch_idx % 50 == 0:
#                 print(f"🔵 Batch {batch_idx}/{len(test_loader)}: Loss {loss:.4f}")

#     accuracy = total_correct / total_samples if total_samples > 0 else 0.0
#     # avg_loss = total_loss / len(test_loader)
#     print(f"✅ Evaluation Done: Avg Loss {avg_loss:.4f}, Accuracy {accuracy:.4f}")
#     return avg_loss, accuracy




def evaluate_model(model, test_loader, device):
    model.eval()
    total_correct, total_samples = 0, 0

    with torch.no_grad():
        for batch_idx, (images, boxes, masks, labels) in enumerate(test_loader):
            images = torch.stack([img.to(device) for img in images])
            labels = [l.to(device) for l in labels]

            if len(labels) == 0:
                print(f"⚠️ Skipping batch {batch_idx} due to empty labels")
                continue

            outputs = model(images)
            pred_labels = [output["labels"] for output in outputs]

            # Flatten & Convert Labels
            gt_labels = torch.cat(labels, dim=0) if len(labels) > 0 else torch.tensor([], device=device)
            pred_labels = torch.cat(pred_labels, dim=0) if len(pred_labels) > 0 else torch.tensor([], device=device)

            # Debugging: Print Sample Labels
            if batch_idx % 10 == 0:
                print(f"GT Labels: {gt_labels.tolist()}")
                print(f"Pred Labels: {pred_labels.tolist()}")

            # ✅ Ensure Labels Are in COCO Range
            if len(gt_labels) > 0 and len(pred_labels) > 0:
                assert gt_labels.min() >= 1 and gt_labels.max() <= 90, "GT labels out of range!"
                assert pred_labels.min() >= 1 and pred_labels.max() <= 90, "Pred labels out of range!"

            # ✅ Match Predictions to Ground Truth
            matches = torch.isin(pred_labels, gt_labels)
            total_correct += matches.sum().item()
            total_samples += len(gt_labels)

            if batch_idx % 10 == 0:
                print("Predicted Label Distribution:", Counter(pred_labels.tolist()).most_common(5))

    accuracy = total_correct / total_samples if total_samples > 0 else 0.0
    print(f"✅ Evaluation Done: Accuracy {accuracy:.4f}")
    return accuracy

# ✅ Run Evaluation
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
avg_loss, accuracy = evaluate_model(model, test_loader, device)



In [None]:
#testing code before changes

import torch
from torch.utils.data import DataLoader
from torch.nn.functional import interpolate

# ✅ Create Dataset & DataLoader (Optimized)
test_dataset = COCODataset("val2017", "annotations/instances_val2017.json", transform)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn, num_workers=4)

# ✅ COCO Labels
COCO_CLASSES = {
    1: "person", 2: "bicycle", 3: "car", 4: "motorcycle", 5: "airplane",
    6: "bus", 7: "train", 8: "truck", 9: "boat", 10: "traffic light",
    11: "fire hydrant", 13: "stop sign", 14: "parking meter", 15: "bench",
    16: "bird", 17: "cat", 18: "dog", 19: "horse", 20: "sheep",
    21: "cow", 22: "elephant", 23: "bear", 24: "zebra", 25: "giraffe",
    27: "backpack", 28: "umbrella", 31: "handbag", 32: "tie", 33: "suitcase",
    34: "frisbee", 35: "skis", 36: "snowboard", 37: "sports ball", 38: "kite",
    39: "baseball bat", 40: "baseball glove", 41: "skateboard", 42: "surfboard",
    43: "tennis racket", 44: "bottle", 46: "wine glass", 47: "cup", 48: "fork",
    49: "knife", 50: "spoon", 51: "bowl", 52: "banana", 53: "apple",
    54: "sandwich", 55: "orange", 56: "broccoli", 57: "carrot", 58: "hot dog",
    59: "pizza", 60: "donut", 61: "cake", 62: "chair", 63: "couch",
    64: "potted plant", 65: "bed", 67: "dining table", 70: "toilet",
    72: "tv", 73: "laptop", 74: "mouse", 75: "remote", 76: "keyboard",
    77: "cell phone", 78: "microwave", 79: "oven", 80: "toaster", 81: "sink",
    82: "refrigerator", 84: "book", 85: "clock", 86: "vase", 87: "scissors",
    88: "teddy bear", 89: "hair drier", 90: "toothbrush"
}

# ✅ Fixed Evaluation Function
def evaluate_model(model, test_loader, device):
    model.eval()
    total_correct, total_samples = 0, 0
    total_loss = 0.0

    with torch.no_grad():
        for batch_idx, (images, boxes, masks, labels) in enumerate(test_loader):
            images = torch.stack([img.to(device) for img in images])
            boxes = [b.to(device) for b in boxes]
            masks = [m.to(device) for m in masks]
            labels = [l.to(device) for l in labels]

            if len(labels) == 0 or len(boxes) == 0:
                print(f"⚠️ Skipping batch {batch_idx} due to empty labels/boxes")
                continue

            # ✅ Normalize Bounding Boxes
            for i in range(len(boxes)):
                image_w, image_h = images.shape[3], images.shape[2]
                boxes[i] /= torch.tensor([image_w, image_h, image_w, image_h], device=device)
                boxes[i] = torch.clamp(boxes[i], min=0, max=1)

            # ✅ Create Proper Targets List
            targets = []
            for b, l in zip(boxes, labels):
                targets.append({"boxes": b, "labels": l})

            # ✅ Run Model Inference
            outputs = model(images)

            # ✅ Compute Losses
            loss_cls, loss_bbox, loss_mask = 0.0, 0.0, 0.0
            for output, target in zip(outputs, targets):
                pred_boxes = output["boxes"]
                pred_labels = output["labels"]
                pred_scores = output["scores"]

                # ✅ Match Predicted Labels with Ground Truth
                gt_labels = target["labels"]
                min_size = min(len(pred_labels), len(gt_labels))
                total_correct += (pred_labels[:min_size] == gt_labels[:min_size]).sum().item()
                total_samples += min_size

            # ✅ Calculate Loss (For Debugging, Adjust as Needed)
            loss = loss_cls + loss_bbox + loss_mask
            total_loss += loss

            if batch_idx % 50 == 0:
                print(f"🔵 Batch {batch_idx}/{len(test_loader)}: Loss {loss:.4f}")

    accuracy = total_correct / total_samples if total_samples > 0 else 0.0
    avg_loss = total_loss / len(test_loader)
    print(f"✅ Evaluation Done: Avg Loss {avg_loss:.4f}, Accuracy {accuracy:.4f}")
    return avg_loss, accuracy

# ✅ Run Evaluation
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
avg_loss, accuracy = evaluate_model(model, test_loader, device)



In [None]:
# import torch
# from torch.utils.data import DataLoader
# from torch.nn.functional import interpolate

# # ✅ Ensure Model is on the Correct Device
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model.to(device)

# # ✅ Create Dataset & DataLoader (Optimized)
# test_dataset = COCODataset("val2017", "annotations/instances_val2017.json", transform)
# test_loader = DataLoader(test_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn, num_workers=4)

# # ✅ Evaluation Function
# def evaluate_model(model, test_loader, device):
#     model.eval()
#     total_correct, total_samples = 0, 0
#     total_loss = 0.0

#     with torch.no_grad():
#         for batch_idx, (images, boxes, masks, labels) in enumerate(test_loader):
#             # ✅ Move Data to the Same Device as Model
#             images = torch.stack([img.to(device) for img in images])
#             boxes = [b.to(device) for b in boxes]
#             masks = [m.to(device) for m in masks]
#             labels = [l.to(device) for l in labels]

#             if len(labels) == 0 or len(boxes) == 0:
#                 print(f"⚠️ Skipping batch {batch_idx} due to empty labels/boxes")
#                 continue

#             # ✅ Normalize Bounding Boxes
#             for i in range(len(boxes)):
#                 image_w, image_h = images.shape[3], images.shape[2]
#                 boxes[i] /= torch.tensor([image_w, image_h, image_w, image_h], device=device)
#                 boxes[i] = torch.clamp(boxes[i], min=0, max=1)

#             # ✅ Create Proper Targets List
#             targets = []
#             for b, l in zip(boxes, labels):
#                 targets.append({"boxes": b, "labels": l})

#             # ✅ Run Model Inference (Ensure Inputs Are on Same Device)
#             outputs = model(images.to(device))

#             # ✅ Compute Accuracy
#             for output, target in zip(outputs, targets):
#                 pred_labels = output["labels"]
#                 gt_labels = target["labels"]

#                 # ✅ Match Predicted Labels with Ground Truth
#                 min_size = min(len(pred_labels), len(gt_labels))
#                 total_correct += (pred_labels[:min_size] == gt_labels[:min_size]).sum().item()
#                 total_samples += min_size

#             if batch_idx % 50 == 0:
#                 print(f"🔵 Batch {batch_idx}/{len(test_loader)} processed")

#     accuracy = total_correct / total_samples if total_samples > 0 else 0.0
#     print(f"✅ Evaluation Done: Accuracy {accuracy:.4f}")
#     return accuracy

# # ✅ Run Evaluation
# accuracy = evaluate_model(model, test_loader, device)


In [None]:
# import torch
# import cv2
# import numpy as np
# import matplotlib.pyplot as plt
# from torchvision.transforms import functional as F
# from torch.utils.data import DataLoader
# from torch.nn.functional import interpolate

# # ✅ Load Model & Move to Correct Device
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model.to(device)

# # ✅ COCO Dataset Class Names (91 Classes)
# class_names = ["background"] + [f"Class_{i}" for i in range(1, 91)]  # Replace with actual COCO class names if available

# # ✅ Function to Detect Objects & Masks
# def detect_masked_regions(model, image, device, threshold=0.5):
#     model.eval()
#     image_tensor = F.to_tensor(image).unsqueeze(0).to(device)

#     with torch.no_grad():
#         outputs = model(image_tensor)

#     pred_scores = outputs[0]['scores'].cpu().numpy()
#     pred_labels = outputs[0]['labels'].cpu().numpy()
#     pred_masks = outputs[0]['masks'].cpu().numpy()
#     pred_boxes = outputs[0]['boxes'].cpu().numpy()

#     valid_indices = pred_scores > threshold  # Apply confidence threshold
#     return pred_boxes[valid_indices], pred_masks[valid_indices], pred_labels[valid_indices]

# # ✅ Function to Visualize Predictions
# def visualize_predictions(image, boxes, masks, labels, class_names):
#     image = np.array(image, dtype=np.uint8)
#     overlay = image.copy()

#     for box, mask, label in zip(boxes, masks, labels):
#         mask = mask[0] > 0.5  # Convert mask to binary
#         color = np.random.randint(0, 255, (3,), dtype=np.uint8)
#         overlay[mask] = 0.5 * overlay[mask] + 0.5 * color  # Apply mask overlay

#         x1, y1, x2, y2 = map(int, box)
#         cv2.rectangle(image, (x1, y1), (x2, y2), color.tolist(), 2)
#         text = class_names[label] if label < len(class_names) else "Unknown"
#         cv2.putText(image, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color.tolist(), 2)

#     combined = cv2.addWeighted(image, 0.6, overlay, 0.4, 0)
#     plt.figure(figsize=(10, 10))
#     plt.imshow(cv2.cvtColor(combined, cv2.COLOR_BGR2RGB))
#     plt.axis("off")
#     plt.show()

# # ✅ Function to Evaluate Model Accuracy
# def evaluate_model(model, test_loader, device):
#     model.eval()
#     total_correct, total_samples = 0, 0

#     with torch.no_grad():
#         for batch_idx, (images, boxes, masks, labels) in enumerate(test_loader):
#             # ✅ Move Data to GPU/CPU
#             images = torch.stack([img.to(device) for img in images])
#             boxes = [b.to(device) for b in boxes]
#             masks = [m.to(device) for m in masks]
#             labels = [l.to(device) for l in labels]

#             if len(labels) == 0 or len(boxes) == 0:
#                 print(f"⚠️ Skipping batch {batch_idx} due to empty labels/boxes")
#                 continue

#             # ✅ Normalize Bounding Boxes
#             for i in range(len(boxes)):
#                 image_w, image_h = images.shape[3], images.shape[2]
#                 boxes[i] /= torch.tensor([image_w, image_h, image_w, image_h], device=device)
#                 boxes[i] = torch.clamp(boxes[i], min=0, max=1)

#             # ✅ Create Proper Targets List
#             targets = [{"boxes": b, "labels": l} for b, l in zip(boxes, labels)]

#             # ✅ Run Model Inference
#             outputs = model(images.to(device))

#             # ✅ Compute Accuracy
#             for output, target in zip(outputs, targets):
#                 pred_labels = output["labels"]
#                 gt_labels = target["labels"]

#                 min_size = min(len(pred_labels), len(gt_labels))
#                 total_correct += (pred_labels[:min_size] == gt_labels[:min_size]).sum().item()
#                 total_samples += min_size

#             if batch_idx % 50 == 0:
#                 print(f"🔵 Batch {batch_idx}/{len(test_loader)} processed")

#     accuracy = total_correct / total_samples if total_samples > 0 else 0.0
#     print(f"✅ Evaluation Done: Accuracy {accuracy:.4f}")
#     return accuracy

# # ✅ Create Dataset & DataLoader (Optimized)
# test_dataset = COCODataset("val2017", "annotations/instances_val2017.json", transform)
# test_loader = DataLoader(test_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn, num_workers=4)

# # ✅ Run Evaluation
# accuracy = evaluate_model(model, test_loader, device)

# # ✅ Example Usage
# image_path = "sample.jpg"
# image = cv2.imread(image_path)
# image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

# # ✅ Detect Masked Regions
# boxes, masks, labels = detect_masked_regions(model, image, device)

# # ✅ Visualize Results
# visualize_predictions(image, boxes, masks, labels, class_names)


In [None]:
import torch
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import cv2
from torchvision.models.detection import maskrcnn_resnet50_fpn
from PIL import Image

# ✅ Set Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# ✅ Load the Model
#model = maskrcnn_resnet50_fpn(pretrained=False, num_classes=91).to(device)

# ✅ Load the trained weights
model.load_state_dict(torch.load("mask_rcnn_trained.pth", map_location=device), strict=False)

# ✅ Set model to evaluation mode
model.eval()
print("✅ Model loaded successfully for testing!")

# ✅ Enhanced Preprocessing Function
def preprocess_image(image_path):
    image = cv2.imread(image_path)
    original_size = image.shape[:2][::-1]  # (width, height)

    # ✅ Convert to Grayscale
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # ✅ Apply CLAHE (Adaptive Histogram Equalization)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
    gray = clahe.apply(gray)

    # ✅ Apply Bilateral Filter (Preserves Edges)
    filtered = cv2.bilateralFilter(gray, 9, 75, 75)

    # ✅ Convert back to 3-channel image
    processed_image = cv2.cvtColor(filtered, cv2.COLOR_GRAY2RGB)

    # ✅ Convert to PIL for Transformations
    pil_image = Image.fromarray(processed_image)

    # ✅ Apply Torchvision Transformations
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5])  # Normalize grayscale values
    ])
    image_tensor = transform(pil_image).unsqueeze(0)  # Add batch dimension

    return image_tensor.to(device), image, original_size

# ✅ Function to Detect Masked Regions & Draw Bounding Boxes
def detect_masked_regions(image_np):
    # Convert to HSV for color-based segmentation
    hsv = cv2.cvtColor(image_np, cv2.COLOR_BGR2HSV)

    # ✅ Define a range for the masked color (e.g., blue tint in this case)
    lower_bound = np.array([90, 50, 50])   # Lower HSV range
    upper_bound = np.array([140, 255, 255])  # Upper HSV range

    # ✅ Create a Mask
    mask = cv2.inRange(hsv, lower_bound, upper_bound)

    # ✅ Find Contours of Masked Areas
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # ✅ Draw Bounding Boxes
    for cnt in contours:
        x, y, w, h = cv2.boundingRect(cnt)
        cv2.rectangle(image_np, (x, y), (x + w, y + h), (255, 0, 0), 3)  # Blue Box

    return image_np

# ✅ Updated Visualization Function (Now with Bounding Boxes on Masked Areas)
def visualize_predictions(original_image, original_size, outputs, threshold=0.8):
    image_np = np.array(original_image)  # Convert PIL image to NumPy
    image_np = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)  # Convert to OpenCV format

    # ✅ Detect & Draw Bounding Boxes on Masked Regions
    image_np = detect_masked_regions(image_np)

    # ✅ Display the Image
    plt.figure(figsize=(10, 10))
    plt.imshow(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB))
    plt.axis("off")
    plt.title("Detected Masked Areas with Bounding Boxes")
    plt.show()

# ✅ Specify Image Path
image_path = "val2017/000000000785.jpg"  # Change this to your test image

# ✅ Load & Predict on the Selected Image
image_tensor, original_image, original_size = preprocess_image(image_path)

# ✅ Run Inference
with torch.no_grad():
    outputs = model(image_tensor)[0]  # Get first image output

# ✅ Visualize Results (Bounding Boxes Around Masked Regions)
visualize_predictions(original_image, original_size, outputs)
