<a href="https://www.kaggle.com/code/akaiinu/soict-hackathon-1617b9?scriptVersionId=209037037" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

Install dependencies.

In [None]:
from __future__ import annotations


!python --version
!pip install -U ipywidgets pyyaml sahi shapely tqdm ultralytics

Monkey-patch ultralytics weighted fitness function (prioritize mAP50). See [the original source file](https://github.com/ultralytics/ultralytics/blob/main/ultralytics/utils/metrics.py).

Note that subprocesses are unaffected by this patch.

In [None]:
import numpy
import ultralytics.utils.metrics


class Metric(ultralytics.utils.metrics.Metric):
    """Drop-in ``Metric`` subclass whose fitness score prioritizes mAP@0.5."""

    def fitness(self):
        """Return the model fitness as a weighted sum of the mean results."""
        # Weights line up with [P, R, mAP@0.5, mAP@0.5:0.95].
        weights = numpy.array([0.0, 0.0, 0.9, 0.1])
        mean_results = numpy.array(self.mean_results())
        return (mean_results * weights).sum()


# Rebind the module attribute so that later lookups of
# ``ultralytics.utils.metrics.Metric`` resolve to the subclass above.
ultralytics.utils.metrics.Metric = Metric

Setup global configurations.

In [None]:
import io
import itertools
import random
import re
import shutil
from pathlib import Path
from typing import Any, Iterable, List, Tuple, TypeVar

import torch
import yaml
from sahi import AutoDetectionModel
from tqdm import tqdm
from ultralytics import YOLO
from sahi.predict import PredictionResult, get_prediction
from ultralytics.nn.tasks import DetectionModel
from ultralytics.utils.loss import BboxLoss, v8DetectionLoss
from ultralytics.models.yolo.detect import DetectionTrainer


# Root directory of the read-only Kaggle inputs.
KAGGLE_INPUT = Path("/kaggle/input")
# /kaggle/input/soict-hackathon-2024
KAGGLE_DATASET = KAGGLE_INPUT / "soict-hackathon-2024"
# /kaggle/input/sh2024-models/pytorch/default/3
KAGGLE_MODEL = KAGGLE_INPUT / "sh2024-models" / "pytorch" / "default" / "3"
# Directory that receives the generated dataset layout and prediction files.
KAGGLE_WORKSPACE = Path("/kaggle/working")
# Image size passed as ``imgsz`` to both training and prediction.
IMAGE_SIZE = 640
# Confidence threshold shared by the SAHI wrapper and ``detect.predict``.
CONFIDENCE_THRESHOLD = 0.001

In [None]:
!rm -rf {KAGGLE_WORKSPACE / "*"}

See available [train settings](https://docs.ultralytics.com/modes/train/#train-settings) and [augmentation arguments](https://docs.ultralytics.com/modes/train/#augmentation-settings-and-hyperparameters).

In [None]:
# Keyword arguments expanded into ``model.train(...)`` for the detection run.
DETECT_ARGS = {
    "epochs": 150,  # number of training epochs
    "imgsz": IMAGE_SIZE,  # image size for training
    "plots": True,  # save training plots
    "save_json": True,  # save results as JSON
    "save_conf": True,  # save confidence scores
    "batch": 32,  # size of training batches
    "workers": 16,  # number of workers for data loading
    "weight_decay": 0.00005,  # weight decay for regularization
    "augment": True,  # enable data augmentation
    "mixup": 0.5,  # mixup augmentation factor
    "optimizer": "Adam",  # optimizer to use
}

Transform dataset format for object detection. See [supported dataset formats](https://docs.ultralytics.com/datasets/detect/).

In [None]:
def make_object_detection_dataset() -> Path:
    """Build a train/val layout of symlinks and write the matching ``data.yaml``.

    Files under ``KAGGLE_DATASET / "train_20241023" / {"daytime", "nighttime"}``
    are split by the camera number in their name (``cam_<id>_<5 digits>...``):
    camera ids below 10 go to the training split, all others to validation.
    ``.jpg`` files are linked under ``images/<split>`` and ``.txt`` files under
    ``labels/<split>``; other suffixes are ignored. Files whose name does not
    follow the camera pattern are skipped and counted instead of aborting the
    whole run.

    Any previous ``KAGGLE_WORKSPACE / "detect"`` directory is removed first.

    Returns:
        Path to the generated ``data.yaml``.
    """
    target = KAGGLE_WORKSPACE / "detect"
    shutil.rmtree(target, ignore_errors=True)

    images_root = target / "images"
    images_train = images_root / "train"
    images_val = images_root / "val"

    labels_root = target / "labels"
    labels_train = labels_root / "train"
    labels_val = labels_root / "val"

    split_dirs = (images_train, images_val, labels_train, labels_val)
    for subdir in split_dirs:
        subdir.mkdir(parents=True)

    # Compiled once: the pattern is applied to every file of both folders.
    camera_pattern = re.compile(r"^cam_(\d+)_\d{5}(?!\d)")
    skipped = 0

    for dirname in ("daytime", "nighttime"):
        source = KAGGLE_DATASET / "train_20241023" / dirname
        for file in source.iterdir():
            match = camera_pattern.search(file.stem)
            if match is None:
                # Unexpected file name: there is no camera id to split on.
                skipped += 1
                continue

            if int(match.group(1)) < 10:
                image_dir, label_dir = images_train, labels_train
            else:
                image_dir, label_dir = images_val, labels_val

            if file.suffix == ".jpg":
                image_dir.joinpath(file.name).symlink_to(file)
            elif file.suffix == ".txt":
                label_dir.joinpath(file.name).symlink_to(file)

    if skipped:
        print(f"Skipped {skipped} file(s) not matching the camera naming pattern")

    def count_files(path: Path) -> int:
        return sum(1 for _ in path.iterdir())

    for subdir in split_dirs:
        print(f"Size of {subdir}: {count_files(subdir)}")

    dataset_config = {
        "path": str(target),
        "train": str(images_train),
        "val": str(images_val),
        "names": {
            0: "motorcycle",
            1: "car",
            2: "coach",
            3: "container",
        },
    }

    data_yaml = target / "data.yaml"
    with data_yaml.open("w", encoding="utf-8") as config:
        print(f"Writing to {data_yaml}")
        config.write(yaml.dump(dataset_config))

    return data_yaml


data_yaml = make_object_detection_dataset()

In [None]:
!cat {data_yaml}

Customize loss function. See [the original source code](https://github.com/ultralytics/ultralytics/blob/main/ultralytics/utils/loss.py).

Attributes of [`v8DetectionLoss`](https://docs.ultralytics.com/reference/utils/loss/#ultralytics.utils.loss.v8DetectionLoss):
- `bce`: An instance of [`torch.nn.BCEWithLogitsLoss`](https://pytorch.org/docs/stable/generated/torch.nn.BCEWithLogitsLoss.html).
- `bbox_loss`: An instance of [`ultralytics.utils.loss.BboxLoss`](https://docs.ultralytics.com/reference/utils/loss/#ultralytics.utils.loss.BboxLoss).

In [None]:
class CustomBboxLoss(BboxLoss):
    """Extension point for bounding-box loss changes; adds nothing to ``BboxLoss`` yet."""


class CustomBCEWithLogitsLoss(torch.nn.BCEWithLogitsLoss):
    """Extension point for classification loss changes; adds nothing to the base class yet."""


class CustomDetectionLoss(v8DetectionLoss):
    """Detection loss that swaps in the custom BCE and bounding-box loss classes."""

    def __init__(self, model: CustomModel, tal_topk: int = 10) -> None:
        """Initialize the stock loss, then replace ``bce`` and ``bbox_loss``."""
        super().__init__(model, tal_topk)

        # The last entry of ``model.model`` supplies ``reg_max`` for the bbox loss.
        head = model.model[-1]
        target_device = next(model.parameters()).device

        self.bbox_loss = CustomBboxLoss(head.reg_max).to(target_device)
        self.bce = CustomBCEWithLogitsLoss(reduction="none")


class CustomModel(DetectionModel):
    """``DetectionModel`` whose training criterion is ``CustomDetectionLoss``."""

    def init_criterion(self) -> CustomDetectionLoss:
        """Build the loss object for this model."""
        criterion = CustomDetectionLoss(self)
        return criterion


class CustomTrainer(DetectionTrainer):
    """``DetectionTrainer`` that builds a four-class ``CustomModel``."""

    def get_model(self, cfg: Any = None, weights: Any = None, verbose: bool = True) -> CustomModel:
        """Create the model from ``cfg`` and load ``weights`` into it when given."""
        custom_model = CustomModel(cfg, nc=4, verbose=verbose)
        if weights is None:
            return custom_model

        custom_model.load(weights)
        return custom_model

Code [copied](https://chatgpt.com/share/672f8b5b-2b48-8009-a1ce-c07adb3c41d8) from ChatGPT lol.

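The idea is to use OpenCV's SIFT keypoint detector to build an extra input channel (a map of keypoint responses) that is fed to the first convolution together with the image channels.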

In [None]:
import torch
import torch.nn as nn
import cv2
import numpy as np

class SIFTFeatureLayer(nn.Module):
    """Convolution layer that appends a SIFT keypoint map to its input.

    The layer copies the geometry (output channels, kernel size, stride,
    padding) of ``original_layer.conv`` but accepts one extra input channel.
    For every image that channel holds the responses of the SIFT keypoints
    found by OpenCV, divided by the strongest response.

    The convolution weights are freshly initialized; the weights of
    ``original_layer`` are not copied. Keypoint extraction goes through
    NumPy/OpenCV on the CPU, so no gradient flows through the extra channel.
    """

    def __init__(self, original_layer):
        """Create the layer from a module exposing a ``conv`` convolution.

        Args:
            original_layer: Module whose ``conv`` attribute provides
                ``in_channels``, ``out_channels``, ``kernel_size``, ``stride``
                and ``padding``.
        """
        super().__init__()
        source_conv = original_layer.conv
        self.in_channels = source_conv.in_channels  # Should be 3 (RGB)
        self.out_channels = source_conv.out_channels
        self.kernel_size = source_conv.kernel_size
        self.stride = source_conv.stride
        self.padding = source_conv.padding

        # Same geometry as the original convolution, plus one input channel
        # for the SIFT feature map.
        self.conv = nn.Conv2d(
            in_channels=self.in_channels + 1,
            out_channels=self.out_channels,
            kernel_size=self.kernel_size,
            stride=self.stride,
            padding=self.padding,
            bias=False,
        )
        nn.init.kaiming_normal_(self.conv.weight)

        # Initialize SIFT detector
        self.sift = cv2.SIFT_create()

    @staticmethod
    def _rasterize_keypoints(locations, strengths, height, width):
        """Return a ``(1, height, width)`` float32 map of keypoint strengths.

        Args:
            locations: Sequence of ``(x, y)`` keypoint coordinates; fractional
                coordinates are truncated to integers.
            strengths: Sequence with one strength per keypoint.
            height: Number of rows of the map.
            width: Number of columns of the map.

        Returns:
            A map that is zero everywhere except at in-bounds keypoint
            locations. It is divided by its maximum when that maximum is
            positive; with no (positive) keypoint it stays all zeros.
        """
        sift_map = np.zeros((height, width), dtype=np.float32)

        locations = np.asarray(locations, dtype=np.int32).reshape(-1, 2)
        strengths = np.asarray(strengths, dtype=np.float32).reshape(-1)

        if locations.shape[0]:
            cols = locations[:, 0]
            rows = locations[:, 1]
            # Drop keypoints that fall outside the image.
            valid = (cols >= 0) & (cols < width) & (rows >= 0) & (rows < height)
            sift_map[rows[valid], cols[valid]] = strengths[valid]

        # Normalize only when there is something to normalize. The previous
        # one-line conditional replaced the whole map by the scalar 1 when no
        # keypoint was found, which broadcast to an all-ones channel.
        peak = sift_map.max() if sift_map.size else 0.0
        if peak > 0:
            sift_map /= peak

        return sift_map[np.newaxis]

    def sift_feature_map(self, image_np):
        """Compute the normalized SIFT keypoint map of one image.

        Args:
            image_np: ``uint8`` array of shape ``(H, W, 3)``.

        Returns:
            ``float32`` array of shape ``(1, H, W)``.
        """
        # Convert to grayscale for SIFT.
        # NOTE(review): COLOR_BGR2GRAY treats channel 0 as blue — confirm the
        # channel order of the tensors passed to ``forward``.
        gray = cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY)

        # Only keypoint positions and responses are used, so skip descriptors.
        keypoints = self.sift.detect(gray, None)

        height, width = gray.shape
        return self._rasterize_keypoints(
            [kp.pt for kp in keypoints],
            [kp.response for kp in keypoints],
            height,
            width,
        )

    def forward(self, x):
        """Concatenate the SIFT channel to ``x`` and apply the convolution.

        Args:
            x: Tensor of shape ``(B, C, H, W)``; assumes values in ``[0, 1]``
                (they are scaled by 255 before keypoint detection) — TODO
                confirm against the data loader.

        Returns:
            Output of the convolution applied to the ``C + 1`` channel input.
        """
        batch_size, _, height, width = x.shape

        # detach(): NumPy cannot export tensors that require grad.
        # float(): NumPy has no bfloat16, and float32 keeps the scaling exact.
        images = x.detach().permute(0, 2, 3, 1).float().cpu().numpy() * 255.0  # (B, H, W, C)
        # Clip before the cast so out-of-range values saturate instead of
        # wrapping around; make the buffer contiguous for OpenCV.
        images = np.ascontiguousarray(np.clip(images, 0.0, 255.0).astype(np.uint8))

        # Extract one SIFT feature map per image in the batch.
        sift_maps = np.zeros((batch_size, 1, height, width), dtype=np.float32)
        for index, image in enumerate(images):
            sift_maps[index] = self.sift_feature_map(image)

        # Match the input's device and dtype so the concatenation does not
        # silently change the dtype seen by the convolution.
        sift_tensor = torch.from_numpy(sift_maps).to(device=x.device, dtype=x.dtype)

        return self.conv(torch.cat((x, sift_tensor), dim=1))

Train object detection model.

In [None]:
def train() -> None:
    """Train the detector, starting from ``detect.pt`` when it is available.

    Uses ``KAGGLE_MODEL / "detect.pt"`` if that file exists and the
    ``"yolo11s.pt"`` weights name otherwise.
    """
    checkpoint = KAGGLE_MODEL / "detect.pt"
    pretrained = checkpoint if checkpoint.is_file() else "yolo11s.pt"

    model = YOLO(pretrained)
    # Replace the first layer of the loaded network with the custom SIFT layer.
    # NOTE(review): CustomTrainer.get_model builds a fresh CustomModel, so this
    # replacement may not carry over into the network that is actually
    # trained — TODO confirm against the ultralytics training flow.
    network_layers = model.model.model
    network_layers[0] = SIFTFeatureLayer(network_layers[0])
    model.train(trainer=CustomTrainer, data=data_yaml, **DETECT_ARGS)

train()

Remove downloaded models, if any.

In [None]:
!rm -f yolo11n.pt yolo11m.pt yolo11s.pt

There are two prediction methods: with and without [SAHI sliced inference](https://docs.ultralytics.com/guides/sahi-tiled-inference).

In [None]:
# Load the detector from the ``best.pt`` weights under the training run
# directory (the path is relative to the current working directory).
detect = YOLO("runs/detect/train/weights/best.pt", task="detect")
# Wrap the same model object for SAHI, using the shared confidence threshold.
detect_sahi = AutoDetectionModel.from_pretrained(
    model_type="yolov8",
    model=detect,
    confidence_threshold=CONFIDENCE_THRESHOLD,
)

# Directory whose files are fed to both prediction methods below.
public_test = KAGGLE_DATASET / "public test"

Predict with [SAHI sliced inference](https://docs.ultralytics.com/guides/sahi-tiled-inference).

In [None]:
def write_sahi(writer: io.TextIOWrapper, file: Path, result: PredictionResult) -> None:
    """Write one line per predicted object of ``result`` to ``writer``.

    Each line reads ``<file name> <category id> <cx> <cy> <w> <h> <score>``,
    where the box center and size are divided by the image width and height.
    """
    image_w = result.image_width
    image_h = result.image_height

    for prediction in result.object_prediction_list:
        box = prediction.bbox
        cx = (box.minx + box.maxx) / (2 * image_w)
        cy = (box.miny + box.maxy) / (2 * image_h)
        box_w = (box.maxx - box.minx) / image_w
        box_h = (box.maxy - box.miny) / image_h
        writer.write(f"{file.name} {prediction.category.id} {cx} {cy} {box_w} {box_h} {prediction.score.value}\n")


# Run the SAHI-wrapped detector on every file of the public test directory and
# write all predictions to predict-sahi.txt in the workspace.
# NOTE(review): this calls ``get_prediction``; confirm whether
# ``get_sliced_prediction`` was intended for sliced inference.
with KAGGLE_WORKSPACE.joinpath("predict-sahi.txt").open("w", encoding="utf-8") as writer:
    for file in tqdm(public_test.iterdir()):
        write_sahi(writer, file, get_prediction(str(file), detect_sahi))

Predict without [SAHI sliced inference](https://docs.ultralytics.com/guides/sahi-tiled-inference).

In [None]:
T = TypeVar("T")


def write(file: Path, writer: io.TextIOWrapper) -> None:
    """Copy every line of ``file`` to ``writer``, prefixed with ``<stem>.jpg``."""
    image_name = f"{file.stem}.jpg"
    with file.open("r") as label_file:
        for record in label_file:
            writer.write(f"{image_name} {record}")


def batched(iterable: Iterable[T], n: int) -> Iterable[Tuple[T, ...]]:
    """Yield consecutive tuples of up to ``n`` items taken from ``iterable``.

    The last tuple may be shorter than ``n``. Because this is a generator,
    the ``ValueError`` for ``n < 1`` is raised on first iteration.
    """
    if n < 1:
        raise ValueError("n < 1")

    source = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(source, n))
        if not chunk:
            return
        yield chunk


# Run the detector over the public test files in batches of 32. With
# stream=True the call returns a lazy iterable, so it is drained here; the
# yielded results are discarded and only the files requested through
# save / save_txt / save_conf are relied on.
for files in batched(public_test.iterdir(), 32):
    for _ in detect.predict(
        files,
        conf=CONFIDENCE_THRESHOLD,
        imgsz=IMAGE_SIZE,
        stream=True,
        save=True,
        save_conf=True,
        save_txt=True,
        verbose=False,
    ):
        pass


# Merge the per-image label files into a single predict.txt in the workspace.
# NOTE(review): assumes the predictions above were saved under
# <workspace>/runs/detect/predict/labels — confirm the run directory name.
with KAGGLE_WORKSPACE.joinpath("predict.txt").open("w", encoding="utf-8") as writer:
    for file in KAGGLE_WORKSPACE.joinpath("runs", "detect", "predict", "labels").iterdir():
        write(file, writer)