In [6]:
import os
import warnings

import numpy as np
import torch
from PIL import Image
from torch.utils.data import Dataset
from torchvision.transforms import functional as F

try:
    # much faster than PIL on many systems (libjpeg-turbo)
    from torchvision.io import read_image
    from torchvision.io import ImageReadMode
    _HAS_TV_READ = True
except Exception:
    _HAS_TV_READ = False


class RCNN_Warp_Data(Dataset):
    """
    Detection dataset for a YOLO-style directory layout, focused on speed:
      - pre-parses label files once in __init__
      - per-sample: decode image + scale cached boxes
      - works with (image, target) transforms

    Expected layout:
      <root_path>/<class_path>                 one class name per line
      <root_path>/<split>/images/*.jpg|jpeg|png
      <root_path>/<split>/labels/<stem>.txt    rows of "cls cx cy w h" (normalized)

    Args:
        root_path: dataset root directory (str or path-like).
        split: sub-directory of ``root_path`` containing ``images`` and ``labels``.
        transforms: optional callable, either ``(img, target) -> (img, target)``
            or ``img -> img``.
        class_path: class-name file, relative to ``root_path``.
        use_tv_read: decode with ``torchvision.io.read_image`` when it could be
            imported; otherwise PIL is used.
        label_offset: integer added to every class id read from the label
            files. torchvision detection models treat label 0 as background,
            so the 0-based YOLO ids are shifted to 1..N by default. Pass 0 to
            keep the raw ids. ``class_names[label - label_offset]`` recovers
            the class name.

    Each item is ``(img, target)`` where ``img`` is a float32 CHW tensor in
    [0, 1] (when no transform changes that) and ``target`` is a dict with keys
    ``boxes`` (float32 Nx4, xyxy pixels), ``labels`` (int64 N), ``image_id``,
    ``area`` and ``iscrowd``.
    """
    def __init__(self, root_path, split, transforms=None, class_path="classes.txt",
                 use_tv_read=True, label_offset=1):
        self.root_path = root_path
        self.split_path = split
        self.class_path = class_path
        self.transforms = transforms
        self.use_tv_read = (use_tv_read and _HAS_TV_READ)
        self.label_offset = int(label_offset)

        # ---- resolve paths once ----
        self.class_names = self.__get_classes()
        self.image_paths, self.label_paths = self.__get_paths()

        # ---- pre-parse all label files (YOLO format) once ----
        # cache normalized (cx,cy,w,h) + labels per index to avoid file I/O in __getitem__
        # list of (labels_np[int64], boxes_np[float32 Nx4 in cxcywh]) or (None, None)
        self._norm_targets = [self._parse_label_file(lp) for lp in self.label_paths]

    # ------------- label parsing -------------
    def _parse_label_file(self, label_path):
        """
        Parse one YOLO label file.

        Returns:
            ``(labels, cxcywh)`` as numpy arrays (int64 N, float32 Nx4), with
            ``label_offset`` already applied, or ``(None, None)`` when the file
            is missing, empty or unreadable. Unreadable files emit a
            ``RuntimeWarning`` instead of failing, so one bad file cannot stop
            the dataset from loading.
        """
        try:
            if not os.path.exists(label_path) or os.path.getsize(label_path) == 0:
                return None, None
            # robust, vectorized load; enforce 2D
            arr = np.loadtxt(label_path, ndmin=2, dtype=np.float32)
        except Exception as exc:
            # keep dataset robust, but make the problem visible
            warnings.warn(
                f"Could not parse label file {label_path!r} ({exc}); "
                "treating the image as unlabeled.",
                RuntimeWarning,
                stacklevel=3,
            )
            return None, None

        # rows need at least "cls cx cy w h"
        if arr.ndim != 2 or arr.shape[1] < 5:
            warnings.warn(
                f"Label file {label_path!r} has fewer than 5 columns; "
                "treating the image as unlabeled.",
                RuntimeWarning,
                stacklevel=3,
            )
            return None, None

        labels = arr[:, 0].astype(np.int64) + self.label_offset
        cxcywh = arr[:, 1:5].astype(np.float32)
        return labels, cxcywh

    @staticmethod
    def _to_pixel_xyxy(cxcywh, width, height, eps=1e-3):
        """
        Convert normalized (cx, cy, w, h) rows to clamped pixel (x1, y1, x2, y2).

        Boxes are clamped to [0, width-1] x [0, height-1] and stretched to at
        least ``eps`` pixels per side. The lower corner is capped at
        ``max - eps`` so that the upper corner always has room above it; without
        that cap a box touching the right/bottom edge collapses to zero area.

        Returns:
            ``(boxes, keep)``: float32 Nx4 array and a boolean mask of rows
            that ended up with strictly positive width and height (NaN rows and
            boxes in one-pixel images fail this test).
        """
        half_w = cxcywh[:, 2] * width / 2.0
        half_h = cxcywh[:, 3] * height / 2.0
        cx = cxcywh[:, 0] * width
        cy = cxcywh[:, 1] * height

        max_x = float(width - 1)
        max_y = float(height - 1)

        x1 = np.minimum(np.maximum(cx - half_w, 0.0), max(max_x - eps, 0.0))
        y1 = np.minimum(np.maximum(cy - half_h, 0.0), max(max_y - eps, 0.0))
        x2 = np.minimum(np.maximum(cx + half_w, x1 + eps), max_x)
        y2 = np.minimum(np.maximum(cy + half_h, y1 + eps), max_y)

        boxes = np.stack([x1, y1, x2, y2], axis=1).astype(np.float32)
        keep = (boxes[:, 2] > boxes[:, 0]) & (boxes[:, 3] > boxes[:, 1])
        return boxes, keep

    # ------------- original helpers -------------
    def __get_classes(self):
        """Read the non-blank lines of the class-name file, stripped."""
        with open(os.path.join(self.root_path, self.class_path), "r", encoding="utf-8") as f:
            names = [ln.strip() for ln in f if ln.strip()]
        return names

    def __get_paths(self):
        """
        List image files (sorted by name) and the label path paired with each.

        The label path is derived from the image stem; the file may not exist.
        """
        # expects standard YOLO-like layout: <root>/<split>/images & <root>/<split>/labels
        img_dir = os.path.join(self.root_path, self.split_path, "images")
        lbl_dir = os.path.join(self.root_path, self.split_path, "labels")

        imgs, lbls = [], []
        for fn in sorted(os.listdir(img_dir)):
            if not fn.lower().endswith((".jpg", ".jpeg", ".png")):
                continue
            img_p = os.path.join(img_dir, fn)
            lbl_p = os.path.join(lbl_dir, os.path.splitext(fn)[0] + ".txt")
            imgs.append(img_p)
            lbls.append(lbl_p)
        return imgs, lbls

    # ------------- dataset API -------------
    def __len__(self):
        """Number of images found in the split."""
        return len(self.image_paths)

    def __getitem__(self, index):
        """Decode image ``index`` and build its detection target dict."""
        # ---- fast decode image ----
        img_path = self.image_paths[index]
        if self.use_tv_read:
            # force 3 channels (grayscale / RGBA files would otherwise differ
            # from the PIL branch); CHW uint8 -> float 0..1
            img = read_image(img_path, mode=ImageReadMode.RGB).float() / 255.0
            # If your transforms expect PIL, uncomment the next line:
            # img = F.to_pil_image(img)
        else:
            # convert() returns a detached copy, so the file can be closed here
            with Image.open(img_path) as fh:
                img = fh.convert("RGB")

        # get width/height (no extra open if using tensor)
        if isinstance(img, torch.Tensor):
            # tensor: [C,H,W]
            H, W = int(img.shape[1]), int(img.shape[2])
        else:
            W, H = img.size

        # ---- build target using cached normalized boxes ----
        labels_np, cxcywh_np = self._norm_targets[index]
        if labels_np is None or cxcywh_np is None or len(labels_np) == 0:
            boxes_t  = torch.zeros((0, 4), dtype=torch.float32)
            labels_t = torch.zeros((0,), dtype=torch.int64)
        else:
            boxes_np, keep = self._to_pixel_xyxy(cxcywh_np, W, H)
            # drop anything that is still degenerate so boxes and labels stay aligned
            boxes_t  = torch.from_numpy(boxes_np[keep]).to(torch.float32)
            labels_t = torch.from_numpy(labels_np[keep]).to(torch.int64)

        area_t = (boxes_t[:, 2] - boxes_t[:, 0]) * (boxes_t[:, 3] - boxes_t[:, 1])
        target = {
            "boxes":   boxes_t,
            "labels":  labels_t,
            "image_id": torch.tensor([index], dtype=torch.int64),
            "area":    area_t,
            "iscrowd": torch.zeros((boxes_t.shape[0],), dtype=torch.int64),
        }

        # ---- transforms (support (img, target) or img-only) ----
        # NOTE: a TypeError raised *inside* a two-argument transform also lands
        # in the fallback below and will resurface from the img-only call.
        if self.transforms is not None:
            try:
                img, target = self.transforms(img, target)
            except TypeError:
                img = self.transforms(img)

        # ensure tensor image for FasterRCNN
        if isinstance(img, Image.Image):
            img = F.to_tensor(img)
        elif isinstance(img, torch.Tensor) and img.dtype != torch.float32:
            img = img.float()

        target["labels"] = target["labels"].to(torch.int64).reshape(-1)
        target["boxes"]  = target["boxes"].to(torch.float32).reshape(-1, 4)
        return img, target


In [8]:
import torch
import torch.nn as nn
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
import numpy as np
import matplotlib.pyplot as plt
import torch.optim as optim
from torchvision.ops import nms
import itertools
from torch.utils.data import random_split
import json
from pathlib import Path
import cv2
from torchmetrics.detection import MeanAveragePrecision
import os
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch.utils.data import Dataset, DataLoader
from pathlib import Path
from PIL import Image
import torchvision.transforms.functional as F
import traceback
from pycocotools.coco import COCO
import os

import os
# Local-machine workaround (author's note: installation issue on their
# computer; to be removed from the final version).
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

# Pick the compute device, checked in this order: Apple MPS, CUDA, CPU.
device = torch.device(
    "mps" if torch.backends.mps.is_available()
    else "cuda" if torch.cuda.is_available()
    else "cpu"
)
print("Using device:", device)

def collate_fn(batch):
    """
    Transpose a list of samples into per-field tuples.

    ``[(img0, tgt0), (img1, tgt1)]`` becomes ``((img0, img1), (tgt0, tgt1))``,
    leaving every element untouched (nothing is stacked).
    """
    per_field = zip(*batch)
    return tuple(per_field)

root = Path.cwd() / "Warp-D"

##temporary replacement for above cell
train_ds = RCNN_Warp_Data(root, split="train")
test_ds  = RCNN_Warp_Data(root, split="test")

# Fraction of each split that is kept, to shorten experiments.
sub_size = 0.25
train_size = int(sub_size * len(train_ds))
test_size = int(sub_size * len(test_ds))

# Seeded generators so the same subsets (and the same shuffle order) are drawn
# on every Restart & Run All; otherwise results from different runs are not
# comparable.
SPLIT_SEED = 0
split_generator = torch.Generator().manual_seed(SPLIT_SEED)
shuffle_generator = torch.Generator().manual_seed(SPLIT_SEED)

train_ds, _ = random_split(
    train_ds, [train_size, len(train_ds) - train_size], generator=split_generator
)
test_ds, _ = random_split(
    test_ds, [test_size, len(test_ds) - test_size], generator=split_generator
)


train_loader = DataLoader(
    train_ds,
    batch_size=2,
    shuffle=True,
    num_workers=0,
    collate_fn=collate_fn,
    generator=shuffle_generator,
)

test_loader = DataLoader(
    test_ds,
    batch_size=2,
    shuffle=False,
    num_workers=0,
    collate_fn=collate_fn
)

##basic starter to prove it works, will improve later
class Custom_Backbone(nn.Module):
    """
    Small stack of stride-2 conv stages usable as a FasterRCNN backbone.

    ``depth`` hidden stages are followed by one output stage, so the input is
    downsampled by a stride-2 convolution ``depth + 1`` times. Hidden widths
    start at ``hidden_size`` and double per stage, capped at 1024; the last
    stage maps to ``out_channels``.
    """
    def __init__ (self, in_channels = 3, out_channels = 256, hidden_size = 32, depth = 3):
        super().__init__()

        # Width of every hidden stage, in order.
        stage_widths = []
        width = hidden_size
        for _ in range(depth):
            stage_widths.append(width)
            width = min(width * 2, 1024)

        # Consecutive pairs of this list are the (in, out) channels of each stage.
        channel_plan = [in_channels] + stage_widths + [out_channels]

        modules = []
        for c_in, c_out in zip(channel_plan, channel_plan[1:]):
            modules.extend(self._downsample_stage(c_in, c_out))

        self.body = nn.Sequential(*modules)
        # required by FasterRCNN
        self.out_channels = out_channels

    @staticmethod
    def _downsample_stage(c_in, c_out):
        """One stage: 3x3 stride-2 conv, batch norm, in-place ReLU."""
        return (
            nn.Conv2d(c_in, c_out, 3, stride=2, padding=1),
            nn.BatchNorm2d(c_out),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        """Return the single feature-map tensor produced by the conv stack."""
        return self.body(x)


def training_loop(model, optimizer, NUM_EPOCHS, train_loader, test_loader, printing=True): 
    """
    Train a detection model and record loss / mAP after every epoch.

    The model is called as ``model(images, targets)`` in train mode (expected to
    return a dict of losses) and as ``model(images)`` in eval mode (expected to
    return predictions accepted by ``MeanAveragePrecision.update``).

    Args:
        model: detection model, already placed on its target device.
        optimizer: optimizer over ``model``'s parameters.
        NUM_EPOCHS: number of passes over ``train_loader``.
        train_loader: iterable of ``(images, targets)`` batches for training.
        test_loader: iterable of ``(images, targets)`` batches for evaluation.
        printing: print one summary line per epoch.

    Returns:
        ``(train_losses, test_maps, train_maps)`` - three lists with one entry
        per epoch: mean training loss (float), test ``map`` and train ``map``
        (as returned by the metric's ``compute()``).
    """
    # Use the device the model lives on rather than a notebook global, so
    # inputs can never end up on a different device than the weights.
    try:
        run_device = next(model.parameters()).device
    except StopIteration:
        run_device = torch.device("cpu")

    def _to_device(images, targets):
        """Move one batch (images and target dicts) to ``run_device``."""
        images = [image.to(run_device) for image in images]
        targets = [{k: v.to(run_device) for k, v in t.items()} for t in targets]
        return images, targets

    test_metric = MeanAveragePrecision(box_format="xyxy", class_metrics=True)
    train_metric = MeanAveragePrecision(box_format="xyxy", class_metrics=True)
    test_maps = []
    train_losses = []
    train_maps = []

    for epoch in range(NUM_EPOCHS):

        # ---- training pass ----
        model.train()
        total_train_loss = 0.0

        for train_inputs, train_labels in train_loader:
            train_inputs, train_labels = _to_device(train_inputs, train_labels)

            optimizer.zero_grad()
            train_loss_dict = model(train_inputs, train_labels)
            loss = sum(train_loss_dict.values())

            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

            # Train mode only yields losses, so score the same batch again in
            # eval mode to get predictions for the train mAP.
            model.eval()
            with torch.no_grad():
                preds = model(train_inputs)
                train_metric.update(preds, train_labels)
            model.train()

        # max(..., 1) keeps an empty loader from raising ZeroDivisionError
        avg_train_loss = total_train_loss / max(len(train_loader), 1)
        train_losses.append(avg_train_loss)
        train_map_results = train_metric.compute()
        train_maps.append(train_map_results['map'])
        # Without this reset, each epoch's train mAP would also include the
        # predictions accumulated during all previous epochs.
        train_metric.reset()

        # ---- evaluation pass (no optimizer, no gradients) ----
        model.eval()
        with torch.no_grad():
            for test_inputs, test_labels in test_loader:
                test_inputs, test_labels = _to_device(test_inputs, test_labels)

                predictions = model(test_inputs)
                test_metric.update(predictions, test_labels)

            test_map_results = test_metric.compute()
            test_maps.append(test_map_results['map'])
            test_metric.reset()

        if printing:
            print(f"Epoch {epoch+1}/{NUM_EPOCHS}"+\
                  f" - Train Loss: {avg_train_loss:.4f} ", end = " ")
            print(f"- Train Map {train_map_results['map'].item():.4f}", end = " ")
            print(f"- Test  Map {test_map_results['map'].item():.4f}")

    return train_losses,test_maps, train_maps


def testParams(out_channels = [64, 128, 256, 512], hidden_sizes = [32, 64, 128, 256, 512], depth = [1, 2, 3, 4, 5], NUM_EPOCHS = 5, num_classes = 29):
    combinations = list(itertools.product(out_channels, hidden_sizes, depth))
    models = []
    res = []

    anchor_sizes = ((32, 64, 128, 256, 512),) 
    aspect_ratios = ((0.5, 1.0, 2.0),) * len(anchor_sizes)
    rpn_anchor_generator = AnchorGenerator(anchor_sizes, aspect_ratios)

    for t in combinations:
        custom_backbone = Custom_Backbone(3, t[0], t[1], t[2])
        model = FasterRCNN(
            backbone=custom_backbone,
            num_classes=num_classes,
            rpn_anchor_generator=rpn_anchor_generator,
            )
        model = model.to(device)
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        print(f"out: {t[0]}  hidden: {t[1]} depth: {t[2]}")
        train_losses, test_map, train_map = training_loop(model, optimizer, NUM_EPOCHS, train_loader, test_loader, printing=True)
        res += (train_losses, test_map, train_map)
        print("\n-----------\n")

    return res


# Keep the sweep output: the run is expensive and a bare call would discard it.
sweep_results = testParams()

Using device: cpu
out: 64  hidden: 32 depth: 1


KeyboardInterrupt: 