In [None]:
import os
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torchvision.models as models
import ijson

In [None]:
# Class-name -> integer-id lookup, shared by training and inference.
# Ids are assigned from the position in the tuple, so the order is significant.
name_to_id = {
    label: index
    for index, label in enumerate((
        "traffic light",
        "traffic sign",
        "car",
        "person",
        "bus",
        "truck",
        "rider",
        "bike",
        "motor",
        "train",
    ))
}

def custom_collate_fn(batch):
    """Collate ``(patches, labels)`` samples without stacking them.

    Every sample may carry a different number of patches, so the per-sample
    objects are returned untouched inside two parallel lists.

    Args:
        batch: iterable of 2-item ``(patches, labels)`` samples.

    Returns:
        ``(all_patches, all_labels)``: two lists, one entry per sample, in
        the order the samples appeared in ``batch``.
    """
    # Materialise once so that one-shot iterables are only consumed a single time.
    samples = [(patches, labels) for patches, labels in batch]
    patch_groups = [patches for patches, _ in samples]
    label_groups = [labels for _, labels in samples]
    return patch_groups, label_groups

In [None]:
# RPN+ROI Model
from sklearn.cluster import DBSCAN
import torch
import torch.nn as nn
import torchvision.ops as ops
import torchvision
from torchvision import transforms

class CBAM(nn.Module):
    """Channel gate followed by a spatial gate, applied to a 4-D feature map.

    Args:
        channels: number of channels of the incoming feature map.
        reduction: divisor for the hidden width of the shared channel MLP.
        kernel_size: kernel size of the 2->1 channel spatial convolution.
    """

    def __init__(self, channels, reduction=4, kernel_size=3):
        super().__init__()
        hidden = channels // reduction
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)
        # One bias-free MLP shared by the average- and max-pooled descriptors.
        self.fc = nn.Sequential(
            nn.Linear(channels, hidden, bias=False),
            nn.ReLU(),
            nn.Linear(hidden, channels, bias=False),
        )
        self.sigmoid = nn.Sigmoid()
        self.conv = nn.Conv2d(
            2, 1, kernel_size, padding=kernel_size // 2, bias=False
        )

    def forward(self, x):
        """Return ``x`` re-weighted by the channel gate, then the spatial gate."""
        batch, chans, _, _ = x.shape

        # Channel gate: sigmoid(MLP(avg-pooled) + MLP(max-pooled)), one value per channel.
        pooled_avg = self.avg_pool(x).view(batch, chans)
        pooled_max = self.max_pool(x).view(batch, chans)
        channel_gate = self.sigmoid(self.fc(pooled_avg) + self.fc(pooled_max))
        x = x * channel_gate.view(batch, chans, 1, 1)

        # Spatial gate: per-pixel mean and max over channels -> conv -> sigmoid.
        spatial_stats = torch.cat(
            [
                x.mean(dim=1, keepdim=True),
                x.max(dim=1, keepdim=True).values,
            ],
            dim=1,
        )
        return x * self.sigmoid(self.conv(spatial_stats))


class EnhancedRPNWithROI(nn.Module):
    """Region-proposal head with CBAM attention and optional ROI pooling.

    The body is three 3x3 convolutions (the first two followed by CBAM) plus a
    1x1 skip projection added before the last ReLU.  Two 1x1 heads predict,
    per anchor, four box offsets and two class logits.  When asked to, the
    module also decodes the offsets into boxes, filters them by score and
    NMS, and ROI-pools the head's own feature map for the surviving boxes.

    Args:
        in_channels: channels of the backbone feature map.
        mid_channels: channels used inside the head.
        n_anchor: anchors per feature-map cell; must equal
            ``len(ratios) * len(scales)`` of the anchor generator.
        pool_size: ROI pooling output size, an int or ``(height, width)``.
        nms_thresh: IoU threshold passed to ``ops.nms``.
        conf_thresh: minimum objectness score for a proposal to be kept.
        top_n: maximum number of proposals kept per image after NMS.
        spatial_scale: factor ``ops.RoIPool`` multiplies box coordinates by to
            map them onto the pooled feature map.  The default ``1.0`` keeps
            the previous behaviour.
            NOTE(review): if anchors are expressed in input-image pixels and
            the feature map is downsampled, this presumably should be
            ``1 / stride`` -- confirm against how the checkpoint was trained.
    """

    def __init__(self,
                 in_channels=512,
                 mid_channels=256,
                 n_anchor=15,  # must match len(ratios)*len(scales)
                 pool_size=(7, 7),
                 nms_thresh=0.5,
                 conf_thresh=0.5,
                 top_n=400,
                 spatial_scale=1.0):
        super().__init__()

        # --- RPN body ---
        self.conv1 = nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(mid_channels, mid_channels, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(mid_channels, mid_channels, kernel_size=3, padding=1)

        self.cbam1 = CBAM(mid_channels, reduction=4, kernel_size=3)
        self.cbam2 = CBAM(mid_channels, reduction=4, kernel_size=3)

        self.reg_layer = nn.Conv2d(mid_channels, n_anchor * 4, kernel_size=1)
        self.cls_layer = nn.Conv2d(mid_channels, n_anchor * 2, kernel_size=1)

        # 1x1 projection so the residual matches mid_channels.  When the
        # channel counts already agree it is frozen to an exact identity.
        self.skip_conv = nn.Conv2d(in_channels, mid_channels, kernel_size=1)
        if in_channels == mid_channels:
            self._init_identity_skip()

        self._init_weights()

        # --- ROI pooling layer ---
        # ops.RoIAlign is a drop-in alternative if bilinear sampling is wanted.
        self.roi_pool = ops.RoIPool(output_size=pool_size, spatial_scale=spatial_scale)
        # Kept as a (height, width) pair so an empty result can be shaped correctly.
        self.pool_size = (
            (pool_size, pool_size) if isinstance(pool_size, int) else tuple(pool_size)
        )

        # Thresholds used by _process_proposals.
        self.nms_thresh = nms_thresh
        self.conf_thresh = conf_thresh
        self.top_n = top_n

    def _init_identity_skip(self):
        """Freeze ``skip_conv`` as an identity mapping.

        ``nn.init.eye_`` only accepts 2-D tensors and therefore raises on a
        4-D convolution weight; ``nn.init.dirac_`` is the convolution
        equivalent.
        """
        nn.init.dirac_(self.skip_conv.weight)
        nn.init.zeros_(self.skip_conv.bias)
        self.skip_conv.weight.requires_grad = False
        self.skip_conv.bias.requires_grad = False

    def _init_weights(self):
        """Initialise body and head convolutions with N(0, 0.01) weights and zero bias."""
        for layer in (self.conv1, self.conv2, self.conv3, self.reg_layer, self.cls_layer):
            nn.init.normal_(layer.weight, std=0.01)
            if layer.bias is not None:
                nn.init.constant_(layer.bias, 0)

    def forward(self, x, anchors=None, do_roi=False):
        """Run the RPN head and, optionally, proposal filtering plus ROI pooling.

        Args:
            x: backbone feature map of shape (B, in_channels, H, W).
            anchors: (H*W*n_anchor, 4) anchor boxes as [x1, y1, x2, y2];
                only needed when ``do_roi`` is True.
            do_roi: if True and ``anchors`` is given, also return pooled features.

        Returns:
            ``(pred_locs, pred_scores, objectness_score)`` where
            pred_locs is (B, H*W*n_anchor, 4) with every offset in (-2, 2),
            pred_scores is (B, H*W*n_anchor, 2) raw logits and
            objectness_score is (B, H*W*n_anchor), the softmax probability of
            class index 1.  With ``do_roi`` and ``anchors`` a fourth element,
            the pooled features from ``_process_proposals``, is appended.
        """
        residual = self.skip_conv(x)

        # torch.relu / torch.softmax are used so this class does not depend on
        # an ``Fn`` alias that is only imported elsewhere in the file.
        x = self.cbam1(torch.relu(self.conv1(x)))
        x = self.cbam2(torch.relu(self.conv2(x)))
        x = torch.relu(self.conv3(x) + residual)

        # RPN heads: (B, n_anchor*k, H, W) -> (B, H*W*n_anchor, k)
        B = x.size(0)
        pred_anchor_locs = self.reg_layer(x).permute(0, 2, 3, 1).contiguous().view(B, -1, 4)
        pred_cls_scores = self.cls_layer(x).permute(0, 2, 3, 1).contiguous().view(B, -1, 2)

        # Bound the offsets to (-2, 2); this also keeps exp() in the decoder tame.
        pred_anchor_locs = torch.tanh(pred_anchor_locs) * 2
        objectness_score = torch.softmax(pred_cls_scores, dim=-1)[..., 1]

        if anchors is None or not do_roi:
            return pred_anchor_locs, pred_cls_scores, objectness_score

        proposals = self._generate_proposals(pred_anchor_locs, anchors)  # (B, N, 4)
        pooled_feats = self._process_proposals(x, proposals, objectness_score)
        return pred_anchor_locs, pred_cls_scores, objectness_score, pooled_feats

    def _generate_proposals(self, pred_locs, anchors):
        """Decode anchor-relative offsets into [x1, y1, x2, y2] boxes.

        Args:
            pred_locs: (B, N, 4) offsets ordered [dy, dx, dh, dw].
            anchors: (N, 4) anchors as [x1, y1, x2, y2].

        Returns:
            (B, N, 4) tensor of decoded boxes with the dtype/device of ``pred_locs``.
        """
        anchors = anchors.to(device=pred_locs.device, dtype=pred_locs.dtype)

        # Anchor geometry, each of shape (N,).
        anc_w = anchors[:, 2] - anchors[:, 0]
        anc_h = anchors[:, 3] - anchors[:, 1]
        anc_ctr_x = anchors[:, 0] + 0.5 * anc_w
        anc_ctr_y = anchors[:, 1] + 0.5 * anc_h

        dy, dx, dh, dw = pred_locs.unbind(dim=-1)

        # Centre shifts are relative to the anchor size; sizes are log-scaled.
        ctr_y = dy * anc_h + anc_ctr_y
        ctr_x = dx * anc_w + anc_ctr_x
        half_h = 0.5 * torch.exp(dh) * anc_h
        half_w = 0.5 * torch.exp(dw) * anc_w

        return torch.stack(
            [ctr_x - half_w, ctr_y - half_h, ctr_x + half_w, ctr_y + half_h],
            dim=-1,
        )

    def _process_proposals(self, conv_features, proposals, scores):
        """Filter proposals per image and ROI-pool ``conv_features`` for the survivors.

        For every image: keep proposals scoring above ``conf_thresh``, apply
        NMS, keep at most ``top_n`` (``ops.nms`` returns indices sorted by
        decreasing score) and pool.

        Returns:
            (K, C, pool_h, pool_w) pooled features for all images concatenated
            along dim 0; the image each row came from is not recorded.  K is 0
            when no proposal in the batch passes the score threshold.
        """
        pooled_list = []

        for b_idx in range(conv_features.size(0)):
            cur_scores = scores[b_idx]    # (N,)
            cur_props = proposals[b_idx]  # (N, 4)

            # 1) Confidence threshold
            conf_mask = cur_scores > self.conf_thresh
            if not conf_mask.any():
                continue
            filtered_boxes = cur_props[conf_mask]
            filtered_scores = cur_scores[conf_mask]

            # 2) NMS, then cap the number of proposals
            keep_idx = ops.nms(filtered_boxes, filtered_scores, self.nms_thresh)[:self.top_n]
            final_boxes = filtered_boxes[keep_idx]

            # 3) ROI pool; rows are [batch_index, x1, y1, x2, y2]
            batch_col = final_boxes.new_full((final_boxes.size(0), 1), b_idx)
            roi_input = torch.cat([batch_col, final_boxes], dim=1)
            pooled_list.append(self.roi_pool(conv_features, roi_input))

        if not pooled_list:
            # Keep the rank and trailing dims of a normal result so callers
            # can treat "no proposals" uniformly.
            return conv_features.new_empty(
                (0, conv_features.size(1), *self.pool_size)
            )
        return torch.cat(pooled_list, dim=0)

In [None]:
# Classification model
# Transfer learning with ResNet-18; the unfrozen layers are fine-tuned during training
class ObjectClassifier(nn.Module):
    """ResNet-18 patch classifier with a dropout + linear head.

    Args:
        num_classes: number of output logits.
        dropout_p: dropout probability applied before the final linear layer.
        freeze_backbone: when True, only ``layer4`` and the new head stay
            trainable; every other ResNet parameter has ``requires_grad`` off.
    """

    def __init__(self, num_classes=10, dropout_p=0.5, freeze_backbone=True):
        super().__init__()
        backbone = models.resnet18(pretrained=True)

        if freeze_backbone:
            # Freeze everything, then re-enable the last residual stage.
            backbone.requires_grad_(False)
            backbone.layer4.requires_grad_(True)

        # Swap the stock classification layer for dropout + linear.  The new
        # layers are created trainable, so the head needs no explicit unfreezing.
        backbone.fc = nn.Sequential(
            nn.Dropout(p=dropout_p),
            nn.Linear(backbone.fc.in_features, num_classes),
        )
        self.model = backbone

    def forward(self, x):
        """Return the class logits produced by the wrapped ResNet for ``x``."""
        return self.model(x)

In [None]:
# load image and image processing
import torch
import torch.nn.functional as Fn
import torchvision.transforms.functional as F
import cv2
import numpy as np

# Load one sample image from disk and turn it into a batched float tensor.
image_dir = "trainA_original_700"
image_filename = "00fc910e-bce87172.jpg"  # Replace with an actual image name
image_path = os.path.join(image_dir, image_filename)
image = Image.open(image_path).convert("RGB")
# unsqueeze(0) adds the leading batch axis, so the tensor is 4-D: (1, C, H, W).
image_tensor = transforms.ToTensor()(image).unsqueeze(0)  # 4-D batch of one image; ISIZE further down assumes H=720, W=1280


def contrast_stretch(image, low_percentile=10, high_percentile=90):
    """Linearly rescale an image between two global percentiles.

    Values at or below the low percentile become 0, values at or above the
    high percentile become 1.  The two percentile values are printed for
    debugging.

    :param image: PyTorch tensor, e.g. of shape (C, H, W); percentiles are
        taken over all elements together
    :param low_percentile: percentile mapped to 0
    :param high_percentile: percentile mapped to 1
    :return: new tensor with the dtype and device of ``image``, or ``image``
        itself when the two percentiles are (almost) equal
    """
    pixels = image.cpu().numpy()

    lower = np.percentile(pixels, low_percentile)
    upper = np.percentile(pixels, high_percentile)
    print(f"Debug: Min percentile value = {lower}, Max percentile value = {upper}")

    span = upper - lower
    if span < 1e-6:
        # Stretching a flat image would collapse it to a single value.
        print("Warning: Min and max values are too close! Returning original image.")
        return image

    # Rescale, then clamp whatever fell outside the percentile window.
    rescaled = np.clip((pixels - lower) / (span + 1e-8), 0, 1)
    return torch.tensor(rescaled, dtype=image.dtype, device=image.device)

# image_tensor = contrast_stretch(image_tensor).unsqueeze(0)

In [None]:
# Helper functions
import torch
import matplotlib.pyplot as plt
from PIL import Image, ImageDraw, ImageFont
import torchvision.transforms as transforms
import matplotlib.patches as patches
import numpy as np


ISIZE = (720, 1280)  # (height, width) read by generate_anchor_grid_np below
# Anchor generation
ratios = [0.5, 1, 2]
anchor_scales = [4, 8, 16, 32, 64]

# NOTE(review): feature_maps is only assigned in the "Load model" cell further
# down, so that cell has to be executed before this one.
_, _, H_IMG, W_IMG = image_tensor.shape
_, C, Y_FM, X_FM = feature_maps.shape

# 5) Show boxes in [x1,y1,x2,y2]
def create_corner_rect(bb, color='red'):
    """Build an unfilled matplotlib rectangle from a corner-format box.

    ``bb`` is ``[x1, y1, x2, y2]``; the rectangle is anchored at ``(x1, y1)``.
    """
    left, top, right, bottom = bb
    width = right - left
    height = bottom - top
    return plt.Rectangle((left, top), width, height,
                         color=color, fill=False, lw=2)

def show_corner_bbs(img, bbs):
    """Display an image tensor with corner-format boxes drawn on top.

    ``img`` is a (C, H, W) tensor (presumably scaled to [0, 1], since it is
    multiplied by 255 here); ``bbs`` is an iterable of ``[x1, y1, x2, y2]``.
    """
    # Scale to 8-bit and reorder to (H, W, C), the layout imshow expects.
    pixels = (img * 255.0).clamp(0, 255).byte().cpu().numpy()
    plt.imshow(pixels.transpose(1, 2, 0))
    axes = plt.gca()
    for box in bbs:
        axes.add_patch(create_corner_rect(box))
    plt.show()

def generate_anchor_grid_np(X_FM, Y_FM, ratios, scales, image_size=None):
    """Generate anchor boxes, in image coordinates, for every feature-map cell.

    Each cell of the ``Y_FM`` x ``X_FM`` feature map gets
    ``len(ratios) * len(scales)`` anchors centred on the cell.  For a given
    ratio and scale the anchor is ``stride_y * scale * sqrt(ratio)`` high and
    ``stride_x * scale * sqrt(1 / ratio)`` wide, where the strides are the
    image size divided by the feature-map size.

    Args:
        X_FM: feature-map width (number of columns).
        Y_FM: feature-map height (number of rows).
        ratios: height/width aspect ratios.
        scales: anchor sizes in units of one feature-map cell.
        image_size: ``(height, width)`` of the input image.  Defaults to the
            module-level ``ISIZE``.

    Returns:
        float32 array of shape ``(X_FM * Y_FM * len(ratios) * len(scales), 4)``
        whose rows are ``[x1, y1, x2, y2]``.  Rows are ordered by cell
        (row-major: x varies fastest), then ratio, then scale.
    """
    import numpy as np

    if image_size is None:
        image_size = ISIZE
    img_h, img_w = image_size[0], image_size[1]

    stride_x = img_w / float(X_FM)
    stride_y = img_h / float(Y_FM)

    # Cell centres from integer indices; np.arange with a float step can
    # yield one element too many or too few because of rounding.
    ctr_x = (np.arange(X_FM) + 0.5) * stride_x
    ctr_y = (np.arange(Y_FM) + 0.5) * stride_y
    grid_x, grid_y = np.meshgrid(ctr_x, ctr_y)  # both (Y_FM, X_FM)
    ctr_x_flat = grid_x.reshape(-1, 1)          # (P, 1)
    ctr_y_flat = grid_y.reshape(-1, 1)

    # Half extents for every (ratio, scale) pair, flattened ratio-major: (1, A)
    ratio_arr = np.asarray(ratios, dtype=np.float64).reshape(-1, 1)
    scale_arr = np.asarray(scales, dtype=np.float64).reshape(1, -1)
    half_h = (0.5 * stride_y * scale_arr * np.sqrt(ratio_arr)).reshape(1, -1)
    half_w = (0.5 * stride_x * scale_arr * np.sqrt(1.0 / ratio_arr)).reshape(1, -1)

    # Broadcast centres (P, 1) against half extents (1, A) -> (P, A, 4)
    anchors = np.stack(
        [
            ctr_x_flat - half_w,
            ctr_y_flat - half_h,
            ctr_x_flat + half_w,
            ctr_y_flat + half_h,
        ],
        axis=-1,
    )
    return anchors.reshape(-1, 4).astype(np.float32)


# Build the anchor grid once for the feature-map size read above and move it
# to the compute device.
# NOTE(review): `device` is only assigned in the "Load model" cell further
# down, so that cell has to be executed before this one.
np_anchors = generate_anchor_grid_np(X_FM, Y_FM, ratios, anchor_scales)
anchors = torch.from_numpy(np_anchors).float().to(device)

In [None]:
# Load model
# extracting feature map with vgg backbone
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# First 30 layers of VGG16's feature extractor (same cut as Faster R-CNN).
vgg_model = torchvision.models.vgg16(pretrained=True).features[:30].to(device)
vgg_model.eval()
with torch.no_grad():
    # The input has to live on the same device as the model; image_tensor is
    # created on the CPU, so without this move the call fails under CUDA.
    feature_maps = vgg_model(image_tensor.to(device))  # Shape: [1, C, Hf, Wf]

# Classification model
model_classifier = ObjectClassifier(num_classes=len(name_to_id)).to(device)
model_classifier.load_state_dict(torch.load("classification_3_2000_60.pth", map_location=device))
model_classifier.eval()

# RPN+ROI model
model_RPNROI = EnhancedRPNWithROI().to(device)
model_RPNROI.load_state_dict(torch.load("final_model.pth", map_location=device))
model_RPNROI.eval()

In [None]:
# mode helper functions
from typing_extensions import final
from PIL import ImageFont
from torchvision.ops import box_iou

# Preprocessing applied to every cropped proposal before classification.
IMAGE_SIZE = (128, 128)  # Resize patches for CNN input

# Resize -> tensor -> per-channel (x - 0.5) / 0.5.
# NOTE(review): presumably this has to match the preprocessing used when the
# classifier checkpoint was trained -- confirm.
transform = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])


# 3) Utility for decoding predicted offsets -> [x1,y1,x2,y2]
def pred_bbox_to_xywh(bbox_offsets, anchors):
    """Decode anchor-relative offsets into corner-format boxes (NumPy).

    Despite the name, the result is in corner format, not xywh.

    bbox_offsets: (N,4) tensor of predicted offsets [dy, dx, dh, dw]
    anchors: (N,4) tensor of anchors in [x1,y1,x2,y2]
    return: (N,4) float32 array of boxes in [x1,y1,x2,y2]
    """
    anc = anchors.detach().cpu().numpy()
    off = bbox_offsets.detach().cpu().numpy()

    # Anchor size and centre
    anc_w = anc[:, 2] - anc[:, 0]
    anc_h = anc[:, 3] - anc[:, 1]
    anc_ctr_x = anc[:, 0] + 0.5 * anc_w
    anc_ctr_y = anc[:, 1] + 0.5 * anc_h

    # Centre shifts scale with the anchor size; sizes are predicted in log space.
    ctr_y = off[:, 0] * anc_h + anc_ctr_y
    ctr_x = off[:, 1] * anc_w + anc_ctr_x
    box_h = np.exp(off[:, 2]) * anc_h
    box_w = np.exp(off[:, 3]) * anc_w

    corners = np.stack(
        [
            ctr_x - 0.5 * box_w,  # x1
            ctr_y - 0.5 * box_h,  # y1
            ctr_x + 0.5 * box_w,  # x2
            ctr_y + 0.5 * box_h,  # y2
        ],
        axis=1,
    )
    return corners.astype(np.float32)

# 5) Show boxes in [x1,y1,x2,y2]
def create_corner_rect(bb, color='red'):
    """Build an unfilled matplotlib rectangle from a corner-format box.

    ``bb`` is ``[x1, y1, x2, y2]``; the rectangle is anchored at ``(x1, y1)``.
    """
    left, top, right, bottom = bb
    width = right - left
    height = bottom - top
    return plt.Rectangle((left, top), width, height,
                         color=color, fill=False, lw=2)

def show_corner_bbs(img, bbs):
    """Display an image tensor with corner-format boxes drawn on top.

    ``img`` is a (C, H, W) tensor (presumably scaled to [0, 1], since it is
    multiplied by 255 here); ``bbs`` is an iterable of ``[x1, y1, x2, y2]``.
    """
    # Scale to 8-bit and reorder to (H, W, C), the layout imshow expects.
    pixels = (img * 255.0).clamp(0, 255).byte().cpu().numpy()
    plt.imshow(pixels.transpose(1, 2, 0))
    axes = plt.gca()
    for box in bbs:
        axes.add_patch(create_corner_rect(box))
    plt.show()

def combine_box_group(boxes, scores):
    """Collapse a group of boxes into their score-weighted mean box.

    boxes: (N, 4) tensor; scores: (N,) tensor.  The scores are normalised to
    sum to one and used as per-box weights.  Returns a (4,) tensor.
    """
    normalised = scores / scores.sum()
    weighted = boxes * normalised.view(-1, 1)
    return weighted.sum(dim=0)

def recursive_nms(boxes, scores, iou_threshold=0.5, recursion_limit=10):
    """Merge overlapping boxes instead of discarding them.

    In every pass the pairwise IoU matrix is computed once.  Boxes are then
    visited in index order: a box that is still alive absorbs every other
    still-alive box whose IoU with it exceeds ``iou_threshold`` and is
    replaced by the score-weighted mean of the group (see
    ``combine_box_group``).  Passes repeat until nothing merges or
    ``recursion_limit`` passes have run.  A surviving box keeps its own
    original score for later passes.

    Args:
        boxes: Tensor of shape [N, 4] (x1, y1, x2, y2 format).  Not modified.
        scores: Tensor of shape [N] containing confidence scores.
        iou_threshold: IoU above which two boxes are merged.
        recursion_limit: Maximum number of merge passes.

    Returns:
        combined_boxes: float32 tensor [M, 4] of merged boxes.
        keep_indices: long tensor [M]; for every returned box, the index in
            the original input of the box it grew from.
    """
    if len(boxes) == 0:
        return boxes, torch.empty(0, dtype=torch.long, device=boxes.device)

    # Always work on a private copy: ``boxes.float()`` hands back the very
    # same tensor for float32 input, so the in-place merges below would
    # otherwise overwrite the caller's data.
    boxes = boxes.to(torch.float32, copy=True)
    # Original index of every box that is still alive.
    origin = torch.arange(len(boxes), device=boxes.device)

    changed = True
    passes = 0

    while changed and passes < recursion_limit:
        changed = False

        iou_matrix = box_iou(boxes, boxes)  # [n, n]
        iou_matrix.fill_diagonal_(0)        # ignore self-overlap
        overlaps = iou_matrix > iou_threshold
        keep = torch.ones(len(boxes), dtype=torch.bool, device=boxes.device)

        for i in range(len(boxes)):
            if not keep[i]:
                continue

            # Only boxes still alive may be absorbed; a box already merged
            # into an earlier one must not be counted a second time.
            partners = torch.where(overlaps[i] & keep)[0]
            if len(partners) == 0:
                continue

            boxes[i] = combine_box_group(
                torch.cat([boxes[i].unsqueeze(0), boxes[partners]]),
                torch.cat([scores[i].unsqueeze(0), scores[partners]]),
            )
            keep[partners] = False
            changed = True

        # Drop absorbed boxes before the next pass.
        boxes = boxes[keep]
        scores = scores[keep]
        origin = origin[keep]
        passes += 1

    return boxes, origin

In [None]:
# final integration function
def _load_label_font(size=32):
    """Return a TrueType font for box labels, or Pillow's default if none can be opened."""
    candidates = (
        "/usr/share/fonts/truetype/liberation/LiberationSans-Bold.ttf",
        "arial.ttf",
    )
    for candidate in candidates:
        try:
            return ImageFont.truetype(candidate, size)
        except OSError:
            # Font file missing or unreadable on this machine; try the next one.
            continue
    return ImageFont.load_default()


def detect_objects(feature_maps, anchors, image, image_tensor, RPNROI, classifier):
    """Run proposal generation and patch classification, then plot the result.

    The 40 highest-scoring anchors of the first image are decoded to boxes,
    merged with ``recursive_nms``, clipped to the image, cropped from
    ``image``, classified, and drawn with their labels on a copy of
    ``image`` that is shown with matplotlib.

    Args:
        feature_maps: backbone features passed to ``RPNROI``.
        anchors: (N, 4) anchor tensor in [x1, y1, x2, y2].
        image: PIL image the boxes refer to; it is not modified.
        image_tensor: unused; kept so existing calls keep working.
        RPNROI: proposal model returning (locs, class scores, objectness).
        classifier: patch classifier returning one row of logits per crop.

    Returns:
        None.  Prints a message and returns early when no valid box remains.
    """
    top_k = 40

    # Only the RPN outputs are used below, so ROI pooling is skipped.
    with torch.no_grad():
        pred_locs, _, objectness_score = RPNROI(feature_maps, anchors, do_roi=False)[:3]

    # Decode from anchors + predicted offsets => actual boxes (NumPy, [x1,y1,x2,y2])
    rois = pred_bbox_to_xywh(pred_locs[0], anchors)

    image_scores = objectness_score[0]
    k = min(top_k, image_scores.shape[0])
    topk = torch.topk(image_scores, k=k)

    # Merge overlapping proposals among the top-k
    proposals_tensor = (
        torch.from_numpy(rois[topk.indices.cpu().numpy()]).float().to(image_scores.device)
    )
    scores_tensor = topk.values.float()
    proposals, _ = recursive_nms(
        proposals_tensor, scores_tensor, iou_threshold=0.5, recursion_limit=top_k
    )
    proposals = proposals.cpu().numpy()

    image_width, image_height = image.size
    crops = []
    box_list = []

    for box in proposals:
        x1, y1, x2, y2 = [int(coord) for coord in box]
        # Clip to the image and drop boxes that end up empty.
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(image_width, x2), min(image_height, y2)
        if x1 >= x2 or y1 >= y2:
            continue
        crops.append(transform(image.crop((x1, y1, x2, y2))))
        box_list.append((x1, y1, x2, y2))  # Store box for later drawing

    # If no objects are detected
    if not crops:
        print("No objects detected after ROI filtering.")
        return

    crops = torch.stack(crops)
    # Put the crops wherever the classifier's weights live.
    first_param = next(classifier.parameters(), None)
    if first_param is not None:
        crops = crops.to(first_param.device)

    # Classifier
    classifier.eval()
    with torch.no_grad():
        predicted = classifier(crops).argmax(dim=1)

    # Convert predictions to class names
    id_to_name = {class_id: label for label, class_id in name_to_id.items()}
    predicted_labels = [id_to_name[class_id] for class_id in predicted.cpu().tolist()]

    # Draw on a copy so the caller's image stays clean.
    image_draw = image.copy()
    draw = ImageDraw.Draw(image_draw)
    label_font = _load_label_font(32)

    for (x1, y1, x2, y2), label in zip(box_list, predicted_labels):
        draw.rectangle([x1, y1, x2, y2], outline="red", width=3)
        draw.text((x1, y1 - 10), label, fill="red", font=label_font)

    plt.figure(figsize=(10, 6))
    plt.imshow(image_draw)
    plt.axis("off")
    plt.title("Prediction Result")
    plt.show()

# Run the full pipeline on the sample image loaded above and plot the detections.
detect_objects(feature_maps, anchors, image, image_tensor, model_RPNROI, model_classifier)