In [1]:
%env CUDA_LAUNCH_BLOCKING=1

env: CUDA_LAUNCH_BLOCKING=1


In [2]:
import cv2
import os
import torch
import torchvision
import numpy as np
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

from tqdm import tqdm
from torchvision import transforms
from torchvision.models import ResNeXt50_32X4D_Weights
from torch.utils.data import DataLoader
from typing import Iterable

from helpers.datasets import CrackDataset, custom_collate_fn
from helpers.early_stopping import EarlyStopping

In [3]:
class Resnext50RCNN(nn.Module):
    def __init__(self, input_shape=(3, 224, 224), linear_layers_features=512):
        super(Resnext50RCNN, self).__init__()

        self.feature_extractor = models.resnext50_32x4d(weights=ResNeXt50_32X4D_Weights.IMAGENET1K_V1)
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(self._get_feature_size(input_shape), linear_layers_features),
            nn.ReLU(inplace=True),
            nn.Linear(linear_layers_features, 1)
        )
        self.bbox_regressor = nn.Sequential(
            nn.Flatten(),
            nn.Linear(self._get_feature_size(input_shape), linear_layers_features),
            nn.ReLU(inplace=True),
            nn.Linear(linear_layers_features, 4)
        )
        self.roi_size = (input_shape[1], input_shape[2])

    def _get_feature_size(self, shape: tuple):
        with torch.no_grad():
            dummy_input = torch.zeros(1, *shape)
            features = self.feature_extractor(dummy_input)
            return features.numel()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.feature_extractor(x)
        
        return x

    def extract_region_features(self, image: torch.Tensor, boxes: Iterable[torch.Tensor]) -> torch.Tensor:
        """
        For each bounding box, crop the region, resize to the input shape, and pass through the feature extractor.
        """
        regions = []
        image = image.float() / 255.0

        for box in boxes:
            x1, y1, x2, y2 = box.int()
            region = image[:, y1:y2, x1:x2]
            region_resized = nn.functional.interpolate(region.unsqueeze(0), size=self.roi_size, mode="bilinear", align_corners=False)
            
            regions.append(region_resized)

        regions = torch.cat(regions, dim=0)
        features = self.feature_extractor(regions)
        
        return features

    def refine_bboxes(self, proposals: torch.Tensor, deltas: torch.Tensor) -> torch.Tensor:
        """
        Refines the original bounding boxes (proposals) using the predicted bbox deltas.
        Args:
        - proposals: The original bounding boxes from selective search (x1, y1, x2, y2)
        - deltas: Predicted bounding box adjustments (dx, dy, dw, dh)

        Returns:
        - Refined bounding boxes (x1', y1', x2', y2')
        """
        widths = proposals[:, 2] - proposals[:, 0]
        heights = proposals[:, 3] - proposals[:, 1]
        ctr_x = proposals[:, 0] + 0.5 * widths
        ctr_y = proposals[:, 1] + 0.5 * heights
        dx = deltas[:, 0]
        dy = deltas[:, 1]
        dw = deltas[:, 2]
        dh = deltas[:, 3]
        refined_ctr_x = ctr_x + dx * widths
        refined_ctr_y = ctr_y + dy * heights
        refined_widths = widths * torch.exp(dw)
        refined_heights = heights * torch.exp(dh)
        refined_x1 = refined_ctr_x - 0.5 * refined_widths
        refined_y1 = refined_ctr_y - 0.5 * refined_heights
        refined_x2 = refined_ctr_x + 0.5 * refined_widths
        refined_y2 = refined_ctr_y + 0.5 * refined_heights
        refined_boxes = torch.stack([refined_x1, refined_y1, refined_x2, refined_y2], dim=1)
        
        return refined_boxes

    def predict(self, features: torch.Tensor):
        class_scores = self.classifier(features)
        bbox_deltas = self.bbox_regressor(features)
        
        return class_scores, bbox_deltas

In [4]:
def list_image_paths(directory: str) -> list[str]:
    return [os.path.join(directory, file) for file in os.listdir(directory)]


transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Resnext50RCNN().to(device)
criterion_class = nn.BCEWithLogitsLoss()
criterion_bbox = nn.SmoothL1Loss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
train_coco_path = os.path.join("data", "train", "coco_annotations.json")
valid_coco_path = os.path.join("data", "valid", "coco_annotations.json")
train_images_dir = os.path.join("data", "train", "images")
valid_images_dir = os.path.join("data", "valid", "images")
train_dataset = CrackDataset(
    train_coco_path,
    train_images_dir
)
valid_dataset = CrackDataset(
    valid_coco_path,
    valid_images_dir
)
train_dataloader = DataLoader(
    train_dataset, 
    batch_size=32, 
    shuffle=True, 
    num_workers=4, 
    collate_fn=custom_collate_fn
)
validation_dataloader = DataLoader(
    valid_dataset, 
    batch_size=32, 
    shuffle=True, 
    num_workers=4, 
    collate_fn=custom_collate_fn
)
early_stopping = EarlyStopping(patience=7, verbose=True, delta=0)
num_epochs = 30

In [None]:
def assign_labels_to_proposals(proposals: torch.Tensor, bboxes: np.ndarray, iou_threshold: float = 0.5) -> torch.Tensor:
    """
    Assigns binary labels (1 for foreground, 0 for background) to proposals based on IoU with ground truth boxes.
    
    Arguments:
    - proposals: tensor of proposals, shape [N, 4]
    - bboxes: tensor of ground truth boxes, shape [M, 4]
    - iou_threshold: IoU threshold for considering a proposal as positive
    
    Returns:
    - assigned_labels: tensor of binary labels for each proposal, shape [N]
    """
    assigned_labels = torch.zeros(proposals.shape[0], dtype=torch.long, device=proposals.device)

    for _, proposal in enumerate(proposals):
        max_iou = 0

        for bbox in bboxes:
            iou = torchvision.ops.box_iou(proposal, bbox)

            if iou > max_iou:
                max_iou = iou

        if max_iou >= iou_threshold:
            assigned_labels[i] = 1

    return assigned_labels


def perform_selective_search(image: np.ndarray, max_proposals=5) -> torch.Tensor:
    ss = cv2.ximgproc.segmentation.createSelectiveSearchSegmentation()

    ss.setBaseImage(image)
    ss.switchToSelectiveSearchFast()

    rects = ss.process()
    boxes = []

    for (x, y, w, h) in rects[:max_proposals]:
        boxes.append([x, y, x + w, y + h])

    return torch.tensor(boxes, dtype=torch.float32).to(device)

In [5]:
for epoch in range(num_epochs):
    total_cls_loss = 0.0
    total_bbox_loss = 0.0
    
    model.train()
    
    for image_paths, images, bboxes in train_dataloader:
        images = images.to(device)
        bboxes = bboxes.to(device)
    
        optimizer.zero_grad()
    
        for i, image in enumerate(images):
            proposals = perform_selective_search(image.cpu().numpy())
            features = model.extract_region_features(image.permute(2, 0, 1), proposals)
            class_scores, bbox_deltas = model.predict(features)
            refined_bboxes = model.refine_bboxes(proposals, bbox_deltas)

            torch.cuda.empty_cache()
            print(class_scores)
            print(labels[i].long())
            cls_loss = criterion_class(class_scores, labels[i].long())
            bbox_loss = criterion_bbox(refined_bboxes, bboxes[i])
            loss = cls_loss + bbox_loss
    
            loss.backward()
            optimizer.step()
    
            total_cls_loss += cls_loss.item()
            total_bbox_loss += bbox_loss.item()
    
    avg_cls_loss = total_cls_loss / len(train_dataloader)
    avg_bbox_loss = total_bbox_loss / len(train_dataloader)
    print(f"Epoch {epoch + 1}, Classification Loss: {avg_cls_loss:.4f}, BBox Regression Loss: {avg_bbox_loss:.4f}")


tensor([[-0.4875],
        [ 0.1486],
        [ 0.2978],
        [ 0.0817],
        [ 0.2505]], device='cuda:0', grad_fn=<AddmmBackward0>)
tensor([1, 0], device='cuda:0')


ValueError: Target size (torch.Size([2])) must be the same as input size (torch.Size([5, 1]))