# Introduction

This code implements the training, validation, and testing pipelines for the Kaggle competition titled "Synthetic to Real Object Detection Challenge."

The proposed model is based on PyTorch's Region-based CNN (R-CNN), specifically the [Faster R-CNN](https://pytorch.org/vision/master/models/faster_rcnn.html) implementation. In this competition, the ``ResNet50_FPN_v2`` backbone has been used.

A key aspect of the proposed method is an augmentation-based regularization technique to improve generalization. Strong data augmentation techniques, such as rotation, zooming out, occlusions, color jitter, and resolution scaling, are applied.

The model relies on two R-CNNs. The first model makes predictions on all images, while the second is used only when the first fails to detect any regions of interest. In other words, the second model acts as a fallback, increasing the likelihood of correctly generating bounding boxes.

# Importing Libraries

In [None]:
# Generic libraries
import os
import torch
import cv2
import matplotlib.pyplot as plt
import glob
import numpy as np
from pathlib import Path
from torchinfo import summary
from torch.optim.lr_scheduler import CosineAnnealingLR
from PIL import Image

# Torchvision libraries
from torchvision import tv_tensors
from torchvision.io import decode_image
from torchvision.transforms import v2 as T
from torchvision.utils import draw_bounding_boxes
from torchvision.transforms import InterpolationMode
from torchvision.transforms.functional import to_pil_image

# Import custom libraries
from modules.obj_detection_utils import collate_fn, prune_predictions, display_and_save_predictions, visualize_transformed_data, set_seeds, RandomTextureOcclusion, RandomCircleOcclusion
from modules.obj_detection import ObjectDetectionEngine
from modules.schedulers import FixedLRSchedulerWrapper
from modules.common import Common
from modules.obj_dect_dataloaders import ProcessDatasetCheerios
from modules.faster_rcnn import StandardFasterRCNN

# Warnings
import warnings
# Environment flag for PyTorch's CUDA error reporting.
# NOTE(review): torch is already imported at this point - confirm the flag still takes effect.
os.environ['TORCH_USE_CUDA_DSA'] = "1"

# Silence two noisy warning sources (same filters, same registration order)
for _warn_category, _warn_module in (
    (UserWarning, "torch.autograd.graph"),
    (FutureWarning, "onnxscript.converter"),
):
    warnings.filterwarnings("ignore", category=_warn_category, module=_warn_module)

# Create target model directory (no error if it already exists)
MODEL_DIR = Path("outputs")
MODEL_DIR.mkdir(parents=True, exist_ok=True)

# Set seeds
set_seeds(42)

# Specifying the Target Device

In [None]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(f"Device: {device}")

if device == "cuda":
    !nvidia-smi

# Data Preprocessing (Augmentation)

Two data augmentation pipelines are used in this project:

1. **First Augmentation Pipeline:** This pipeline applies several transformations, including occlusions, flips, and zooming out, among others. Notably, a bush texture, ``T_Bush_Falcon.png``, was created using the ``Falcon editing software`` to simulate occlusions with plants. This augmentation technique is implemented in the ``RandomTextureOcclusion`` class.

2. **Second Augmentation Pipeline:** This pipeline is similar to the first one, but with a key difference in how occlusions are applied. Instead of using plant textures, occlusions are simulated by randomly adding circles of different colors. The ``RandomCircleOcclusion`` class is used for this purpose.

In [None]:
# First augmentation pipeline
def get_transform_1(train, mean_std_norm):
    """Build the first augmentation pipeline (texture-based occlusions).

    Args:
        train: When truthy, the random training augmentations (resize,
            flips, colour jitter, blur, perspective, zoom-out, erasing and
            texture occlusion) are placed before the conversion steps.
            When falsy, only the conversion steps are used.
        mean_std_norm: When truthy, a ``T.Normalize`` step with the
            mean/std values hard-coded below is added.

    Returns:
        A ``T.Compose`` object applying the selected transforms in order.
    """
    gray_fill = (0.5, 0.5, 0.5)
    pipeline = []

    if train:
        # Candidate resize targets produced by the range: 580 and 830
        resize_candidates = [
            T.Resize(size, interpolation=InterpolationMode.BILINEAR)
            for size in range(580, 1080, 250)
        ]
        pipeline += [
            T.RandomChoice(resize_candidates),
            T.RandomHorizontalFlip(p=0.5),
            T.RandomVerticalFlip(p=0.5),
            T.ColorJitter(contrast=0.2, saturation=0.2),
            T.GaussianBlur(kernel_size=(3, 3), sigma=(0.1, 0.5)),
            T.RandomPerspective(distortion_scale=0.1, p=0.5),
            T.RandomZoomOut(fill={tv_tensors.Image: gray_fill, "others": 0}, side_range=(1.0, 1.5), p=0.5),
            T.RandomErasing(p=0.5, scale=(0.02, 0.1), ratio=(0.3, 3.3), value=gray_fill),
            # Synthetic texture
            RandomTextureOcclusion(plant_path=["T_Bush_Falcon.png"], transparency=1.0, p=0.5),
        ]

    # Conversion steps shared by training and evaluation
    pipeline.append(T.ToDtype(torch.float, scale=True))
    if mean_std_norm:
        pipeline.append(T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]))
    pipeline.append(T.ToPureTensor())

    return T.Compose(pipeline)

# Second augmentation pipeline
def get_transform_2(train, mean_std_norm):
    """Build the second augmentation pipeline (circle-based occlusions).

    Args:
        train: When truthy, the random training augmentations (resize,
            flips, colour jitter, blur, perspective, circle occlusion,
            zoom-out and erasing) are placed before the conversion steps.
            When falsy, only the conversion steps are used.
        mean_std_norm: When truthy, a ``T.Normalize`` step with the
            mean/std values hard-coded below is added.

    Returns:
        A ``T.Compose`` object applying the selected transforms in order.
    """
    gray_fill = (0.5, 0.5, 0.5)
    pipeline = []

    if train:
        # Candidate resize targets produced by the range: 580 and 830
        resize_candidates = [
            T.Resize(size, interpolation=InterpolationMode.BILINEAR)
            for size in range(580, 1080, 250)
        ]
        pipeline += [
            T.RandomChoice(resize_candidates),
            T.RandomHorizontalFlip(p=0.5),
            T.RandomVerticalFlip(p=0.5),
            T.ColorJitter(contrast=0.2, saturation=0.2),
            T.GaussianBlur(kernel_size=(3, 3), sigma=(0.1, 0.5)),
            T.RandomPerspective(distortion_scale=0.1, p=0.5),
            # Circle occlusion runs before zoom-out and erasing in this pipeline
            RandomCircleOcclusion(p=0.5, scale=(0.02, 0.2), ratio=(0.3, 3.3)),
            T.RandomZoomOut(fill={tv_tensors.Image: gray_fill, "others": 0}, side_range=(1.0, 1.5), p=0.5),
            T.RandomErasing(p=0.5, scale=(0.02, 0.1), ratio=(0.3, 3.3), value=gray_fill),
        ]

    # Conversion steps shared by training and evaluation
    pipeline.append(T.ToDtype(torch.float, scale=True))
    if mean_std_norm:
        pipeline.append(T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]))
    pipeline.append(T.ToPureTensor())

    return T.Compose(pipeline)

# Preparing Dataloaders

In [None]:
# The dataset contains two classes (ROI + background)
NUM_CLASSES = 2
# Number of samples per batch (passed as batch_size to the data loaders)
BATCHES = 2

# Read one training image to obtain the image dimensions
IMAGE_DIR = r"Synthetic_to_Real_Object_Detection_Full/data/train/images"
sample_image_path = os.path.join(IMAGE_DIR, "000000000.png")
image = cv2.imread(sample_image_path)

# cv2.imread returns None instead of raising when the file is missing or
# unreadable; fail here with a clear message rather than inside cvtColor.
if image is None:
    raise FileNotFoundError(f"Could not read sample image: {sample_image_path}")

image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

# Image dimensions
img_height, img_width = image.shape[:2]
img_size = (img_height, img_width)

# Use the dataset and defined transformations for model_1.pth
train_dataset_1 = ProcessDatasetCheerios(
    root=r"Synthetic_to_Real_Object_Detection_Full/data/train",
    image_path="images",
    label_path="labels",
    transforms=get_transform_1(train=True, mean_std_norm=False),
    num_classes=NUM_CLASSES-1) # Background to be removed

val_dataset_1 = ProcessDatasetCheerios(
    root=r"Synthetic_to_Real_Object_Detection_Full/data/val",
    image_path="images",
    label_path="labels",
    transforms=get_transform_1(train=False, mean_std_norm=False),
    num_classes=NUM_CLASSES-1) # Background to be removed

# Define training and validation data loaders
train_dataloader_1 = torch.utils.data.DataLoader(
    train_dataset_1,
    batch_size=BATCHES,
    shuffle=True,   # Reshuffle every epoch so batches are not always the same consecutive samples
    collate_fn=collate_fn
)

val_dataloader_1 = torch.utils.data.DataLoader(
    val_dataset_1,
    batch_size=BATCHES,
    shuffle=False,  # Fixed order: predictions are later displayed against val_dataset_1
    collate_fn=collate_fn
)

# Use the dataset and defined transformations for model_2.pth
train_dataset_2 = ProcessDatasetCheerios(
    root=r"Synthetic_to_Real_Object_Detection_Full/data/train",
    image_path="images",
    label_path="labels",
    transforms=get_transform_2(train=True, mean_std_norm=False),
    num_classes=NUM_CLASSES-1) # Background to be removed

val_dataset_2 = ProcessDatasetCheerios(
    root=r"Synthetic_to_Real_Object_Detection_Full/data/val",
    image_path="images",
    label_path="labels",
    transforms=get_transform_2(train=False, mean_std_norm=False),
    num_classes=NUM_CLASSES-1) # Background to be removed

# Define training and validation data loaders
train_dataloader_2 = torch.utils.data.DataLoader(
    train_dataset_2,
    batch_size=BATCHES,
    shuffle=True,   # Reshuffle every epoch so batches are not always the same consecutive samples
    collate_fn=collate_fn
)

val_dataloader_2 = torch.utils.data.DataLoader(
    val_dataset_2,
    batch_size=BATCHES,
    shuffle=False,  # Fixed order: predictions are later displayed against val_dataset_2
    collate_fn=collate_fn
)

# Visualizing Dataloaders: Original and Transformed

In [None]:
# Visualize transformations
def _preview_loader(augment):
    """Return an unshuffled loader over the training split.

    `augment` is forwarded as the `train` flag of get_transform_1, so it
    selects between the augmented and the plain version of the pipeline.
    """
    preview_dataset = ProcessDatasetCheerios(
        root=r"Synthetic_to_Real_Object_Detection_Full/data/train",
        image_path="images",
        label_path="labels",
        transforms=get_transform_1(train=augment, mean_std_norm=False),
        num_classes=NUM_CLASSES-1) # Background to be removed
    return torch.utils.data.DataLoader(
        preview_dataset,
        batch_size=BATCHES,
        shuffle=False,  # Both loaders are zipped below, so they must iterate in the same order
        collate_fn=collate_fn)

dataloader_tr = _preview_loader(True)     # with training augmentations
dataloader_ntr = _preview_loader(False)   # without training augmentations

# Walk both loaders in lockstep and show each plain/augmented pair side by side
for idx, ((img_tr, target_tr), (img_ntr, target_ntr)) in enumerate(zip(dataloader_tr, dataloader_ntr)):
    for i in range(BATCHES):
        visualize_transformed_data(img_ntr[i], target_ntr[i], img_tr[i], target_tr[i])
    # Stop once six batches (idx 0..5) have been displayed
    if idx >= 5:
        break

# Training Model 1

In [None]:
# Instantiate model_1
model_1 = StandardFasterRCNN(
    backbone="resnet50_v2",
    num_classes=NUM_CLASSES,
    device=device
    )

# Define model name
model_name_1 = "model_1.pth"

# Collect only the trainable parameters; these are what the optimizer updates
params = [p for p in model_1.parameters() if p.requires_grad]

# Create AdamW optimizer
LR = 1e-5
optimizer = torch.optim.AdamW(
    params=params,
    lr=LR,
    betas=(0.9, 0.999),
    weight_decay=0.01
)

# Create scheduler: cosine annealing from LR down to eta_min over EPOCHS epochs
EPOCHS = 30
scheduler = FixedLRSchedulerWrapper(
    scheduler=CosineAnnealingLR(optimizer, T_max=EPOCHS, eta_min=1e-7),
    fixed_lr=1e-7,
    fixed_epoch=EPOCHS)

# Instantiate the engine with the created model and the target device
engine = ObjectDetectionEngine(
    model=model_1,
    log_verbose=True,
    device=device)

# Configure the training method
results = engine.train(
    target_dir=MODEL_DIR,                       # Directory where the model will be saved
    model_name=model_name_1,                    # Name of the model
    save_best_model=["last", "loss"],           # Save the best models based on different criteria
    keep_best_models_in_memory=True,            # NOTE(review): flag is True, but the earlier note said "do not keep the models in memory" - confirm intent
    train_dataloader=train_dataloader_1,        # Train dataloader
    test_dataloader=val_dataloader_1,           # Val dataloader
    optimizer=optimizer,                        # Optimizer
    scheduler=scheduler,                        # Scheduler
    epochs=EPOCHS,                              # Total number of epochs
    amp=True,                                   # Enable Automatic Mixed Precision (AMP)
    enable_clipping=False,                      # Disable clipping on gradients, only useful if training becomes unstable
    debug_mode=False,                           # Disable debug mode
    accumulation_steps=2,                       # Accumulation steps: effective batch size = batch_size x accumulation steps
    apply_validation=True                       # Enable validation step
    )

# Training Model 2

In [None]:
# Instantiate model_2
model_2 = StandardFasterRCNN(
    backbone="resnet50_v2",
    num_classes=NUM_CLASSES,
    device=device
    )

# Define model name
model_name_2 = "model_2.pth"

# Collect only the trainable parameters; these are what the optimizer updates
params = [p for p in model_2.parameters() if p.requires_grad]

# Create AdamW optimizer
LR = 1e-5
optimizer = torch.optim.AdamW(
    params=params,
    lr=LR,
    betas=(0.9, 0.999),
    weight_decay=0.01
)

# Create scheduler: cosine annealing from LR down to eta_min over EPOCHS epochs
EPOCHS = 30
scheduler = FixedLRSchedulerWrapper(
    scheduler=CosineAnnealingLR(optimizer, T_max=EPOCHS, eta_min=1e-7),
    fixed_lr=1e-7,
    fixed_epoch=EPOCHS)

# Instantiate the engine with the created model and the target device
engine_2 = ObjectDetectionEngine(
    model=model_2,
    log_verbose=True,
    device=device)

# Configure the training method
results = engine_2.train(
    target_dir=MODEL_DIR,                       # Directory where the model will be saved
    model_name=model_name_2,                    # Name of the model
    save_best_model=["last", "loss"],           # Save the best models based on different criteria
    keep_best_models_in_memory=True,            # NOTE(review): flag is True, but the earlier note said "do not keep the models in memory" - confirm intent
    train_dataloader=train_dataloader_2,        # Train dataloader
    test_dataloader=val_dataloader_2,           # Val dataloader
    optimizer=optimizer,                        # Optimizer
    scheduler=scheduler,                        # Scheduler
    epochs=EPOCHS,                              # Total number of epochs
    amp=True,                                   # Enable Automatic Mixed Precision (AMP)
    enable_clipping=False,                      # Disable clipping on gradients, only useful if training becomes unstable
    debug_mode=False,                           # Disable debug mode
    accumulation_steps=2,                       # Accumulation steps: effective batch size = batch_size x accumulation steps
    apply_validation=True                       # Enable validation step
    )

# Making Predictions on Validation

In [None]:
def rename_model(model_name: "str | list[str]", new_name: str) -> None:
    """Rename a saved model file to ``new_name`` and print what was done.

    Args:
        model_name: Path of the file to rename, or a sequence of candidate
            paths (for example the list returned by ``glob.glob``). When a
            sequence is given, its first entry is the file that is renamed.
        new_name: Destination path.

    Raises:
        FileNotFoundError: If ``model_name`` is an empty sequence, i.e. no
            candidate file was found.
    """
    if isinstance(model_name, (str, os.PathLike)):
        # A single path: use it as is (indexing it would yield its first character)
        old_name = model_name
    else:
        if not model_name:
            raise FileNotFoundError(
                f"No model file found to rename to {new_name}: the candidate list is empty"
            )
        old_name = model_name[0]

    os.rename(old_name, new_name)
    print(f"Renamed {old_name} to {new_name}")
    
# For each model, find the checkpoint saved with the "<model>_loss_epoch" prefix
# and rename it to the plain "<model>.pth" name
_checkpoint_renames = (
    ("model_1_loss_epoch*.pth", "model_1.pth"),
    ("model_2_loss_epoch*.pth", "model_2.pth"),
)

for _pattern, _final_name in _checkpoint_renames:
    model_name = glob.glob(str(MODEL_DIR / _pattern))
    new_model_name = str(MODEL_DIR / _final_name)
    rename_model(model_name, new_model_name)

In [None]:
# Instantiate the trained model_1
model_1 = StandardFasterRCNN(
    backbone="resnet50_v2",
    num_classes=NUM_CLASSES,
    device=device
    )

# Load the parameters of the best model.
# The checkpoint is named model_1.pth by the renaming step in the previous cell.
model_1 = Common().load_model(model_1, "outputs", "model_1.pth")

# Make predictions with model_1.pth
preds_1 = ObjectDetectionEngine(
    model=model_1,
    device=device).predict(
        dataloader=val_dataloader_1,
        prune_predictions = True,
        score_threshold = 0.9,
        mask_threshold = 0.9,    
        iou_threshold = 0.01
        )

# Configuration parameters for visualization
BOX_COLOR = "blue"
WIDTH = round(max(img_height, img_width)/175)   # Box line width scaled with the image size
FONT_TYPE = r"C:\Windows\Fonts\arial.ttf"       # NOTE(review): Windows-only font path
FONT_SIZE = 48
PRINT_LABELS = True

# Display predictions
# NOTE(review): the `dataloader` argument receives the Dataset (val_dataset_1),
# not the DataLoader - confirm this is what display_and_save_predictions expects.
display_and_save_predictions(
    preds=preds_1,
    dataloader=val_dataset_1,
    box_color=BOX_COLOR,
    width=WIDTH,
    font_type=FONT_TYPE,
    font_size=FONT_SIZE,
    print_classes=True,
    print_scores=True,
    label_to_class_dict={1: 'cheerios'}
    )

In [None]:
# Instantiate the trained model_2
model_2 = StandardFasterRCNN(backbone="resnet50_v2", num_classes=NUM_CLASSES, device=device)

# Load the parameters of the best model
model_2 = Common().load_model(model_2, "outputs", "model_2.pth")

# Make predictions with model_2.pth
_predictor_2 = ObjectDetectionEngine(model=model_2, device=device)
preds_2 = _predictor_2.predict(
    dataloader=val_dataloader_2,
    prune_predictions=True,
    score_threshold=0.9,
    mask_threshold=0.9,
    iou_threshold=0.01,
)

# Configuration parameters for visualization
BOX_COLOR = "red"
WIDTH = round(max(img_height, img_width)/175)
FONT_TYPE = r"C:\Windows\Fonts\arial.ttf"
FONT_SIZE = 48
PRINT_LABELS = True

# Display predictions
_display_options_2 = dict(
    box_color=BOX_COLOR,
    width=WIDTH,
    font_type=FONT_TYPE,
    font_size=FONT_SIZE,
    print_classes=True,
    print_scores=True,
    label_to_class_dict={1: 'cheerios'},
)
display_and_save_predictions(preds=preds_2, dataloader=val_dataset_2, **_display_options_2)

# Inference on Real Data
1. Open an Anaconda terminal
2. Activate the EDU environment
3. Execute ``python predict_v3.py --model1 "outputs/model_1.pth" --model2 "outputs/model_2.pth"``
4. Execute ``python convert_preds_to_csv_v2.py``