# Team Name

Sergio Sanz Rodriguez

# Introduction

This code implements a PyTorch-based testing pipeline for the Kaggle competition "Synthetic to Real Object Detection Challenge."

To run the code, please contact me via email to request access to the trained CNN models used in this project.

Email: sergio.sanz.rodriguez@gmail.com


# Importing Libraries

In [None]:
# Generic libraries
import os
import torch
import glob
import random
import torchvision.ops as ops
from pathlib import Path
import matplotlib.pyplot as plt

# Torchvision libraries
from torchvision.transforms import v2 as T
from torchvision.io import decode_image
from torchvision.utils import draw_bounding_boxes
from torchvision.transforms.functional import to_pil_image

# Import custom libraries
from modules.obj_detection_utils import set_seeds, prune_predictions
from modules.common import Common
from modules.faster_rcnn import StandardFasterRCNN

# Warnings
import warnings

# NOTE(review): presumably requests CUDA device-side assertions; it is set
# after `import torch`, so confirm that it still takes effect.
os.environ.update(TORCH_USE_CUDA_DSA="1")

# Silence two warning categories, each restricted to one emitting module
for _category, _module in (
    (UserWarning, "torch.autograd.graph"),
    (FutureWarning, "onnxscript.converter"),
):
    warnings.filterwarnings("ignore", category=_category, module=_module)

# Directory used for the model files; create it (and parents) if missing
MODEL_DIR = Path("outputs")
MODEL_DIR.mkdir(parents=True, exist_ok=True)

# Fix the random seeds (seed value 42)
set_seeds(42)

# Specifying the Target Device

In [None]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(f"Device: {device}")

if device == "cuda":
    !nvidia-smi

# Data Preprocessing

In [None]:
def get_transform_test():
    """
    Build the preprocessing pipeline applied to every test image.

    Returns:
        T.Compose: Composition of, in order,
            - T.ToDtype(torch.float, scale=True): cast to float with value scaling enabled
            - T.ToPureTensor(): presumably strips torchvision tensor wrappers so the
              model receives a plain tensor (confirm against the torchvision v2 docs)
    """
    return T.Compose([
        T.ToDtype(torch.float, scale=True),
        T.ToPureTensor(),
    ])


# Inference on Real Data

In [None]:
def merge_and_select(pred1, pred2, pred3, iou_threshold=0.0):
    """
    Merge the predictions of three models and keep a single consensus box.

    All boxes are pooled and greedily clustered: each not-yet-assigned box
    seeds a cluster and absorbs every later unassigned box whose IoU with
    the seed is strictly greater than ``iou_threshold``. The largest cluster
    wins (ties are broken by the highest score found in the cluster), and
    the highest-scoring box of that cluster is returned.

    Args:
        pred1, pred2, pred3 (dict): Prediction dictionaries, each containing
            'boxes' (Tensor[N, 4]), 'scores' (Tensor[N]) and 'labels' (Tensor[N]).
        iou_threshold (float): Two boxes belong to the same cluster when their
            IoU is strictly greater than this value.

    Returns:
        dict: A dictionary with a single selected prediction:
            - 'boxes': Tensor[1, 4]
            - 'scores': Tensor[1]
            - 'labels': Tensor[1]
        When none of the inputs contains a box, a zero-filled placeholder
        (zero box, score 0, label 0) with the same shapes is returned.
    """

    # Pool the predictions of all models
    preds = (pred1, pred2, pred3)
    all_boxes = torch.cat([p["boxes"] for p in preds], dim=0)
    all_scores = torch.cat([p["scores"] for p in preds], dim=0)
    all_labels = torch.cat([p["labels"] for p in preds], dim=0)

    # Return a deterministic placeholder if no boxes are found. new_zeros keeps
    # the dtype and device of the inputs and avoids uninitialised memory.
    num_boxes = all_boxes.size(0)
    if num_boxes == 0:
        return {
            "boxes": all_boxes.new_zeros((1, 4)),
            "scores": all_scores.new_zeros(1),
            "labels": all_labels.new_zeros(1),
        }

    # Pairwise overlap test, evaluated once and moved to plain Python lists so
    # the clustering loop does not index the tensor element by element
    overlaps = (ops.box_iou(all_boxes, all_boxes) > iou_threshold).tolist()

    # Greedy clustering around seed boxes
    clusters = []
    visited = set()
    for i in range(num_boxes):
        if i in visited:
            continue
        cluster = [i]
        visited.add(i)
        for j in range(i + 1, num_boxes):
            if j not in visited and overlaps[i][j]:
                cluster.append(j)
                visited.add(j)
        clusters.append(cluster)

    # Largest cluster(s) = most frequently detected object; among equally
    # large clusters prefer the one containing the highest score
    max_len = max(len(c) for c in clusters)
    best_cluster = max(
        (c for c in clusters if len(c) == max_len),
        key=lambda c: all_scores[c].max().item(),
    )

    # Within the winning cluster, take the highest-scoring box
    best_idx = best_cluster[torch.argmax(all_scores[best_cluster]).item()]

    # unsqueeze(0) restores the leading "number of boxes" dimension
    return {
        "boxes": all_boxes[best_idx].unsqueeze(0),
        "scores": all_scores[best_idx].unsqueeze(0),
        "labels": all_labels[best_idx].unsqueeze(0),
    }

In [None]:
def same_object(pred1, pred2, iou_threshold=0.5):
    """
    Compare two single-box predictions and return whether they refer to the same object.

    Only the first box and the first label of each prediction are compared.

    Args:
        pred1, pred2 (dict or None): Each with "boxes" (Tensor [1, 4]), "scores", "labels".
            A missing prediction (None) or one without boxes never matches.
        iou_threshold (float): IoU threshold to consider them the same.

    Returns:
        bool: True if IoU > threshold and the labels are equal, else False.
    """
    # A missing prediction cannot be the same object as anything
    if pred1 is None or pred2 is None:
        return False

    # box_iou on an empty set yields an empty matrix, so [0, 0] would fail
    if pred1["boxes"].numel() == 0 or pred2["boxes"].numel() == 0:
        return False

    iou = ops.box_iou(pred1["boxes"], pred2["boxes"])[0, 0]

    # Convert the tensor comparison to a plain Python bool for the caller
    return bool(iou > iou_threshold and pred1["labels"][0] == pred2["labels"][0])

# Area helper for boxes in corner format
def box_area(box):
    """Return the area of a box given as [x1, y1, x2, y2]."""
    width = box[2] - box[0]
    height = box[3] - box[1]
    return width * height

In [None]:
# Glob pattern of the test images, and drawing settings for the boxes
IMAGE_DIR = r"Synthetic_to_Real_Object_Detection_Full_2/data/test/images/*.jpg"
BOX_COLOR = "blue"
FONT_TYPE = r"C:\Windows\Fonts\arial.ttf"

NUM_CLASSES = 2

# Four detectors built with the same configuration; each iteration creates
# a separate model instance with its own `nms` list
model1, model2, model3, model4 = (
    StandardFasterRCNN(
        backbone="resnet50_v2",
        num_classes=NUM_CLASSES,
        device=device,
        nms=[20, 5, 50, 2],
    )
    for _ in range(4)
)

# Base names of the checkpoint files (".pth" is appended when loading)
name1, name2, name3, name4 = "modelA1", "modelA2", "modelA3", "modelB"

# Load the stored parameters of each model from the "outputs" directory
model1 = Common().load_model(model1, "outputs", f"{name1}.pth")
model2 = Common().load_model(model2, "outputs", f"{name2}.pth")
model3 = Common().load_model(model3, "outputs", f"{name3}.pth")
model4 = Common().load_model(model4, "outputs", f"{name4}.pth")

# Collect all test images. glob order depends on the filesystem, so sort
# first; otherwise the seeded shuffle below would not be reproducible.
image_paths = sorted(glob.glob(IMAGE_DIR))
num_images = len(image_paths)
if num_images == 0:
    raise FileNotFoundError(f"No images found matching {IMAGE_DIR}")

random.seed(42)
random.shuffle(image_paths)

# Create a grid of subplots with enough rows for all images
cols = 5
rows = (num_images + cols - 1) // cols  # ceiling division

# Dynamically adjust figsize based on the number of images
fig_width = 15  # Width of the figure (you can experiment with this)
fig_height = rows * 4  # Adjust height for better fitting
# squeeze=False keeps `axes` two-dimensional even for a single row, so the
# [row, col] indexing below is always valid
fig, axes = plt.subplots(rows, cols, figsize=(fig_width, fig_height), squeeze=False)

# Define transformation
transform = get_transform_test()

# Switch the models to evaluation mode and move them to the device
models = (model1, model2, model3, model4)
for model in models:
    model.eval().to(device)


# Utility function
def has_scores(prediction):
    """Return True if `prediction` holds a non-empty "scores" tensor."""
    return (
        "scores" in prediction
        and isinstance(prediction["scores"], torch.Tensor)
        and prediction["scores"].nelement() > 0
    )


# For loop over the images
for i, image_path in enumerate(image_paths):

    # Load image
    image = decode_image(image_path)

    # Image dimensions; the box line width and font size scale with them
    img_height, img_width = image.shape[1], image.shape[2]
    image_area = img_height * img_width
    WIDTH = round(max(img_height, img_width) / 175)
    FONT_SIZE = img_width // 15

    with torch.no_grad():

        x = transform(image)
        x = x[:3, ...].to(device)  # keep at most the first three channels

        # Run every model and keep only its best prediction(s)
        preds = []
        for model_idx, model in enumerate(models, start=1):
            model_pred = model([x, ])[0]
            if model_pred["boxes"].nelement() > 0:
                model_pred = prune_predictions(
                    model_pred,
                    score_threshold=0.8,
                    iou_threshold=0.01,
                    best_candidate="score",
                    remove_large_boxes=image_area * 0.5,
                )
            else:
                Common().info(f"No prediction from model{model_idx} for {image_path}")
            preds.append(model_pred)

    pred1, pred2, pred3, pred4 = preds

    # First ensemble path: merge models 1-3 when all of them produced a
    # prediction, otherwise fall back to the first model that did
    if has_scores(pred1) and has_scores(pred2) and has_scores(pred3):
        # 0.01 is the overlap threshold used throughout this pipeline
        # (pruning above and the model-4 comparison below)
        pred = merge_and_select(pred1, pred2, pred3, iou_threshold=0.01)
    elif has_scores(pred1):
        pred = pred1
    elif has_scores(pred2):
        pred = pred2
    elif has_scores(pred3):
        pred = pred3
    else:
        pred = None
        Common().info(f"No prediction from any model for {image_path}")

    # Second ensemble path: prefer the model-4 box when it marks the same
    # object with a box that is not larger. Skipped when the first path
    # produced nothing to compare against.
    if pred is not None and has_scores(pred4):
        if same_object(pred, pred4, iou_threshold=0.01) and box_area(pred4['boxes'][0]) <= box_area(pred['boxes'][0]):
            pred = pred4

    # Ensure predictions exist to plot the images with bounding boxes
    if pred is not None and "boxes" in pred and pred["boxes"].nelement() > 0:

        pred_boxes = pred["boxes"].long()
        output_image = draw_bounding_boxes(
            image,
            pred_boxes,
            colors=BOX_COLOR,
            fill=BOX_COLOR,
            width=WIDTH,
            font=FONT_TYPE,
            font_size=FONT_SIZE)
        image_boxes = to_pil_image(output_image)

    else:
        image_boxes = to_pil_image(image)

    # Plot image
    ax = axes[i // cols, i % cols]
    ax.imshow(image_boxes)
    ax.axis("off")

# Remove empty subplots if any
total_plots = rows * cols
for j in range(num_images, total_plots):
    fig.delaxes(axes[j // cols, j % cols])

# Apply the layout before saving so the saved file matches the shown figure
plt.tight_layout()
plt.savefig("inference.png", bbox_inches='tight')
plt.show()