In [None]:
!pip install wandb

In [None]:
import sys

def wandb_colab_login():
  """Log in to Weights & Biases while ``google.colab`` is hidden from ``sys.modules``.

  Temporary hack to prevent Colab from hanging: the ``google.colab`` entry is
  taken out of ``sys.modules`` for the duration of ``wandb.login()`` and put
  back afterwards, even when the login raises. When there is no such entry
  (i.e. the code is not running in Colab) this is a plain ``wandb.login()``.

  Returns:
    None.
  """
  # Imported locally because none of the visible cells import wandb.
  import wandb

  colab_module = sys.modules.pop("google.colab", None)
  try:
    wandb.login()
  finally:
    # Always restore the real module so that later
    # ``from google.colab import ...`` statements keep working.
    if colab_module is not None:
      sys.modules["google.colab"] = colab_module
# Run the W&B login immediately, using the workaround defined above.
wandb_colab_login()

In [None]:
# Never commit an API key to a notebook: the key that used to be hard-coded
# here must be treated as compromised and rotated. Read it from the
# environment instead; when WANDB_API_KEY is unset, ``key=None`` leaves wandb
# to its default credential lookup / prompt.
import os

import wandb

wandb.login(key=os.environ.get("WANDB_API_KEY"))

### Install dependencies

In [None]:
!pip install -q git+https://github.com/huggingface/transformers.git
!pip install -q git+https://github.com/roboflow/supervision.git
!pip install -q accelerate
!pip install -q roboflow
!pip install -q torchmetrics
!pip install -q "albumentations>=1.4.5"

### Imports

In [None]:
import torch
import requests

import numpy as np
import supervision as sv
import albumentations as A

from PIL import Image
from pprint import pprint
from roboflow import Roboflow
from dataclasses import dataclass, replace
from google.colab import userdata
from torch.utils.data import Dataset
from transformers import (
    AutoImageProcessor,
    AutoModelForObjectDetection,
    TrainingArguments,
    Trainer
)
from torchmetrics.detection.mean_ap import MeanAveragePrecision
from roboflow import Roboflow

In [None]:
# Single source of truth for the pretrained weights. The processor and model
# cells further down reference CHECKPOINT, so it has to be defined here.
CHECKPOINT = "PekingU/rtdetr_v2_r101vd"

# Prefer the GPU when one is available.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AutoModelForObjectDetection.from_pretrained(CHECKPOINT).to(DEVICE)
processor = AutoImageProcessor.from_pretrained(CHECKPOINT)

In [None]:
# Read the API key from Colab's secret store instead of embedding a key (or an
# empty placeholder) in the notebook. Add a secret named ROBOFLOW_API_KEY in
# the Colab "Secrets" panel and grant this notebook access to it.
rf = Roboflow(api_key=userdata.get("ROBOFLOW_API_KEY"))
project = rf.workspace("kuivashev").project("my-normal-dataset")
version = project.version(1)
# Download version 1 of the project in COCO format; `dataset.location` is the
# folder the following cell reads the splits from.
dataset = version.download("coco")

In [None]:
def load_coco_split(split):
    """Load one split of the downloaded COCO export as an ``sv.DetectionDataset``.

    Args:
        split: Sub-folder name under ``dataset.location`` ("train", "valid" or
            "test" in this notebook).

    Returns:
        The dataset built from the images in ``<dataset.location>/<split>`` and
        the ``_annotations.coco.json`` file inside that same folder.
    """
    split_dir = f"{dataset.location}/{split}"
    return sv.DetectionDataset.from_coco(
        images_directory_path=split_dir,
        annotations_path=f"{split_dir}/_annotations.coco.json",
    )


ds_train = load_coco_split("train")
ds_valid = load_coco_split("valid")
ds_test = load_coco_split("test")

In [None]:
GRID_DIM = 5


def _get_labels(anns, class_list):
    """Look up the entry of ``class_list`` for every id in ``anns.class_id``."""
    names = []
    for class_index in anns.class_id:
        names.append(class_list[class_index])
    return names


def draw_annotations(frame, anns, class_list,
                     box_annotator=None,
                     label_annotator=None):
    """Return a copy of ``frame`` with boxes and class-name labels drawn on it.

    Args:
        frame: Image to draw on; only a copy (``frame.copy()``) is modified.
        anns: Detections to draw; must expose ``class_id``.
        class_list: Sequence mapping a class id to its display name.
        box_annotator: Annotator used for the boxes. A fresh
            ``sv.BoxAnnotator()`` is created when omitted.
        label_annotator: Annotator used for the text. A fresh
            ``sv.LabelAnnotator(text_scale=1, text_thickness=3)`` is created
            when omitted.

    Returns:
        Whatever the label annotator returns for the annotated copy.
    """
    boxes = sv.BoxAnnotator() if box_annotator is None else box_annotator
    texts = (
        sv.LabelAnnotator(text_scale=1, text_thickness=3)
        if label_annotator is None
        else label_annotator
    )
    class_names = _get_labels(anns, class_list)
    with_boxes = boxes.annotate(frame.copy(), anns)
    return texts.annotate(with_boxes, anns, labels=class_names)
def build_grid(dataset, grid_dim=GRID_DIM,
               tile_size=(400, 400),
               padding_color=sv.Color.WHITE,
               margin_color=sv.Color.WHITE):
    """Tile the first annotated samples of ``dataset`` into one grid image.

    Args:
        dataset: Indexable dataset whose items unpack as
            ``(path, image, annotations)`` and which exposes ``classes``.
        grid_dim: Number of rows and of columns in the grid.
        tile_size: Size passed to ``sv.create_tiles`` as ``single_tile_size``.
        padding_color: Passed as ``tile_padding_color``.
        margin_color: Passed as ``tile_margin_color``.

    Returns:
        The image produced by ``sv.create_tiles``.
    """
    # Never index past the end: a dataset with fewer than grid_dim**2 samples
    # used to raise IndexError here. Unused cells are left to sv.create_tiles
    # (NOTE(review): expected to be filled with the padding color - confirm).
    sample_count = min(grid_dim * grid_dim, len(dataset))
    annotated = []
    for idx in range(sample_count):
        _, img, anns = dataset[idx]
        annotated.append(draw_annotations(img, anns, dataset.classes))
    return sv.create_tiles(
        annotated,
        grid_size=(grid_dim, grid_dim),
        single_tile_size=tile_size,
        tile_padding_color=padding_color,
        tile_margin_color=margin_color
    )

In [None]:
IMAGE_SIZE = 480

# Square target resolution: width and height are both IMAGE_SIZE pixels.
_target_size = {"width": IMAGE_SIZE, "height": IMAGE_SIZE}

# Re-create the processor so that it resizes every image to that resolution.
processor = AutoImageProcessor.from_pretrained(
    CHECKPOINT, do_resize=True, size=_target_size
)

In [None]:
from torch.utils.data import Dataset

def format_to_coco(img_id, class_ids, xyxy_boxes):
    """Build a COCO-style annotation record for a single image.

    Args:
        img_id: Identifier stored on the record and on every annotation.
        class_ids: Iterable of category ids, one per box.
        xyxy_boxes: Iterable of ``(x1, y1, x2, y2)`` boxes, paired with
            ``class_ids`` by position.

    Returns:
        ``{"image_id": img_id, "annotations": [...]}`` where each annotation
        holds ``image_id``, ``category_id``, ``bbox`` as ``[x, y, width,
        height]``, ``area`` (width * height) and ``iscrowd`` fixed to 0.
    """
    records = []
    for category, box in zip(class_ids, xyxy_boxes):
        left, top, right, bottom = box
        records.append({
            "image_id": img_id,
            "category_id": category,
            # COCO stores the top-left corner plus width and height.
            "bbox": [left, top, right - left, bottom - top],
            "area": (right - left) * (bottom - top),
            "iscrowd": 0,
        })
    return {"image_id": img_id, "annotations": records}

class CustomDetectionDataset(Dataset):
    """Torch ``Dataset`` that feeds samples of a base detection dataset through a processor.

    Args:
        base_dataset: Indexable dataset whose items unpack as
            ``(path, image, annotations)``; ``annotations`` must expose
            ``xyxy`` and ``class_id``.
        processor: Callable invoked as
            ``processor(images=..., annotations=..., return_tensors="pt")``.
        augmentations: Optional callable invoked as
            ``augmentations(image=..., bboxes=..., category=...)`` that returns
            a mapping with the same three keys (for instance an albumentations
            ``Compose`` whose bbox params declare ``label_fields=["category"]``).
            ``None`` disables augmentation.
    """

    def __init__(self, base_dataset, processor, augmentations=None):
        self.base_dataset = base_dataset
        self.processor = processor
        self.augmentations = augmentations

    def __len__(self):
        """Number of samples in the wrapped dataset."""
        return len(self.base_dataset)

    def __getitem__(self, idx):
        """Return the processor output for sample ``idx`` without the batch axis."""
        _, raw_img, raw_ann = self.base_dataset[idx]
        # Reverse the channel axis (presumably BGR -> RGB; confirm against the
        # channel order the base dataset returns).
        img = raw_img[..., ::-1]
        boxes = raw_ann.xyxy
        labels = raw_ann.class_id

        # The augmentations used to be stored but never applied.
        if self.augmentations is not None:
            augmented = self.augmentations(image=img, bboxes=boxes, category=labels)
            img = augmented["image"]
            boxes = augmented["bboxes"]
            labels = augmented["category"]

        coco_formatted = format_to_coco(idx, labels, boxes)
        processed = self.processor(
            images=img,
            annotations=coco_formatted,
            return_tensors="pt"
        )
        # Take element 0 rather than calling .squeeze(0): indexing drops the
        # batch axis of tensors and also works for entries that are plain
        # lists (one item per image), which have no .squeeze method.
        return {key: value[0] for key, value in processed.items()}

In [None]:
# NOTE(review): `train_augmentation_and_transform` and `valid_transform` were
# referenced here but not defined in any visible cell. The pipelines below are
# a conservative starting point - tune them for the dataset at hand.
# Boxes are absolute (x1, y1, x2, y2), i.e. albumentations' "pascal_voc"
# format, and class ids travel in the "category" label field.
train_augmentation_and_transform = A.Compose(
    [
        A.Perspective(p=0.1),
        A.HorizontalFlip(p=0.5),
        A.RandomBrightnessContrast(p=0.5),
        A.HueSaturationValue(p=0.1),
    ],
    bbox_params=A.BboxParams(
        format="pascal_voc",
        label_fields=["category"],
        clip=True,
        min_area=25,
    ),
)
# Validation and test data are not augmented; the no-op pipeline only applies
# the bounding-box handling configured in `bbox_params`.
valid_transform = A.Compose(
    [A.NoOp()],
    bbox_params=A.BboxParams(
        format="pascal_voc",
        label_fields=["category"],
        clip=True,
        min_area=1,
    ),
)

# The wrapper class defined in this notebook is `CustomDetectionDataset`, and
# its keyword for the pipeline is `augmentations`.
pytorch_dataset_train = CustomDetectionDataset(
    ds_train, processor, augmentations=train_augmentation_and_transform)
pytorch_dataset_valid = CustomDetectionDataset(
    ds_valid, processor, augmentations=valid_transform)
pytorch_dataset_test = CustomDetectionDataset(
    ds_test, processor, augmentations=valid_transform)

In [None]:
def collate_fn(batch):
    """Merge a list of samples into one batch dictionary.

    Args:
        batch: Sequence of mappings that each provide ``"pixel_values"`` (a
            tensor) and ``"labels"``.

    Returns:
        A dict with ``"pixel_values"`` stacked along a new leading axis and
        ``"labels"`` kept as a list, one entry per sample.
    """
    images = []
    for sample in batch:
        images.append(sample["pixel_values"])
    stacked = torch.stack(images)

    # Labels stay a plain list: they are passed through untouched.
    targets = []
    for sample in batch:
        targets.append(sample["labels"])

    return {"pixel_values": stacked, "labels": targets}

In [None]:
# Lookup tables between a class's position in `ds_train.classes` and its name.
id2label = {index: name for index, name in enumerate(ds_train.classes)}
label2id = {name: index for index, name in id2label.items()}
@dataclass
class ModelOutput:
    """Minimal container handed to ``post_process_object_detection``."""

    logits: torch.Tensor
    pred_boxes: torch.Tensor


class DetectionMAP:
    """Callable that turns evaluation predictions into mAP / mAR numbers.

    Args:
        processor: Object providing ``post_process_object_detection``.
        threshold: Score threshold forwarded to the post-processing step.
        id2label: Optional mapping from class id to name, used to name the
            per-class metrics. Raw ids are used when omitted.
    """

    def __init__(self, processor, threshold=0.0, id2label=None):
        self.processor = processor
        self.threshold = threshold
        self.id2label = id2label

    def _get_sizes(self, batches):
        """Return one tensor per batch holding the ``"size"`` entry of each label."""
        return [torch.tensor([item["size"] for item in batch]) for batch in batches]

    def _extract_targets(self, batches, sizes):
        """Convert label boxes from centre format to xyxy scaled by each image's size."""
        out = []
        for batch, size in zip(batches, sizes):
            for entry, (h, w) in zip(batch, size):
                # Work in torch throughout instead of multiplying a NumPy
                # array by a torch tensor.
                xyxy = torch.as_tensor(sv.xcycwh_to_xyxy(entry["boxes"]), dtype=torch.float32)
                scale = torch.tensor([float(w), float(h), float(w), float(h)])
                out.append({
                    "boxes": xyxy * scale,
                    "labels": torch.as_tensor(entry["class_labels"]),
                })
        return out

    def _extract_preds(self, raw_preds, sizes):
        """Post-process every prediction batch with the sizes of that same batch."""
        out = []
        # Pair each batch with its own sizes: passing the list of all batches'
        # sizes to every call does not match the batch dimension.
        for batch, batch_sizes in zip(raw_preds, sizes):
            # Positions 1 and 2 hold the logits and the boxes. Indexing (rather
            # than unpacking exactly three items) tolerates extra trailing
            # outputs.
            mo = ModelOutput(
                logits=torch.tensor(batch[1]),
                pred_boxes=torch.tensor(batch[2]),
            )
            processed = self.processor.post_process_object_detection(
                mo, threshold=self.threshold, target_sizes=batch_sizes
            )
            out.extend(processed)
        return out

    @torch.no_grad()
    def __call__(self, eval_res):
        """Compute the metrics for one evaluation run.

        Args:
            eval_res: Object exposing ``predictions`` and ``label_ids``, each a
                sequence with one entry per batch.

        Returns:
            Dict of metric name to value rounded to 4 decimals, including
            ``map_<class>`` and ``mar_100_<class>`` for every reported class.
        """
        raw_preds, raw_targs = eval_res.predictions, eval_res.label_ids
        sizes = self._get_sizes(raw_targs)
        targets = self._extract_targets(raw_targs, sizes)
        preds = self._extract_preds(raw_preds, sizes)

        m = MeanAveragePrecision(box_format="xyxy", class_metrics=True)
        m.warn_on_many_detections = False
        m.update(preds, targets)
        stats = m.compute()

        # Single-element results may come back as 0-d tensors, which cannot be
        # iterated; promote them to 1-d first.
        classes = torch.atleast_1d(stats.pop("classes"))
        maps = torch.atleast_1d(stats.pop("map_per_class"))
        mars = torch.atleast_1d(stats.pop("mar_100_per_class"))
        for cid, mp, mr in zip(classes, maps, mars):
            name = self.id2label[cid.item()] if self.id2label else cid.item()
            stats[f"map_{name}"] = mp
            stats[f"mar_100_{name}"] = mr
        return {k: round(v.item(), 4) for k, v in stats.items()}

# Metric callable: predictions are post-processed with a 0.01 score threshold
# and per-class metrics are named through `id2label`.
compute_metrics = DetectionMAP(processor, threshold=0.01, id2label=id2label)

In [None]:
# Reload the checkpoint, this time passing the dataset's own label tables.
_label_maps = {"id2label": id2label, "label2id": label2id}

model = AutoModelForObjectDetection.from_pretrained(
    CHECKPOINT,
    anchor_image_size=None,
    # NOTE(review): presumably needed because the number of classes differs
    # from the checkpoint's - confirm.
    ignore_mismatched_sizes=True,
    **_label_maps,
)

In [None]:
training_args = TrainingArguments(
    # --- Checkpointing -----------------------------------------------------
    output_dir=f"first_tracked_finetune",
    save_strategy="epoch",
    save_total_limit=2,
    # --- Optimisation ------------------------------------------------------
    num_train_epochs=30,
    learning_rate=5e-5 * 1.5,
    warmup_steps=300,
    max_grad_norm=0.1,
    # --- Data loading ------------------------------------------------------
    per_device_train_batch_size=45,
    dataloader_num_workers=2,
    # Keep every key produced by the dataset; none may be dropped as "unused".
    remove_unused_columns=False,
    # --- Evaluation and best-checkpoint selection --------------------------
    eval_strategy="epoch",
    # The metric code iterates over per-batch predictions, so batches must
    # not be concatenated.
    eval_do_concat_batches=False,
    metric_for_best_model="eval_map",
    greater_is_better=True,
    load_best_model_at_end=True,
    # --- Logging -----------------------------------------------------------
    # NOTE(review): the notebook logs in to wandb earlier, yet reporting is
    # disabled here - confirm whether experiment tracking is intended.
    report_to="none",
)

In [None]:
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=pytorch_dataset_train,
    eval_dataset=pytorch_dataset_valid,
    # `tokenizer=` is deprecated in favour of `processing_class=`; the
    # notebook installs transformers from the main branch, where the old
    # keyword may no longer be accepted.
    processing_class=processor,
    data_collator=collate_fn,
    # The metric callable defined in this notebook is named `compute_metrics`;
    # `eval_compute_metrics_fn` is not defined in any visible cell.
    compute_metrics=compute_metrics,
)

trainer.train()

In [None]:
# Colab only: mount Google Drive under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
targets = []
predictions = []

# Switch to inference mode explicitly rather than relying on the state the
# training loop happened to leave the model in.
model.eval()

for file_path, _, annots in ds_test:
    # Close the file handle deterministically and normalise to 3-channel RGB
    # so grayscale / RGBA files are handled the same way as ordinary ones.
    with Image.open(file_path) as source:
        img = source.convert("RGB")
    tensor_in = processor(img, return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        model_out = model(**tensor_in)
    # PIL reports (width, height); the post-processor takes (height, width).
    w, h = img.size
    post = processor.post_process_object_detection(
        model_out,
        target_sizes=[(h, w)],
        threshold=0.3
    )[0]
    dets = sv.Detections.from_transformers(post)
    targets.append(annots)
    predictions.append(dets)

In [None]:
# Score the collected test-set detections against the ground truth.
mean_average_precision = sv.MeanAveragePrecision.from_detections(
    predictions=predictions, targets=targets
)

# One metric per line, two decimals each.
print(
    f"map50_95: {mean_average_precision.map50_95:.2f}",
    f"map50: {mean_average_precision.map50:.2f}",
    f"map75: {mean_average_precision.map75:.2f}",
    sep="\n",
)

In [None]:
# Write the model and the processor into the same folder.
_export_dir = "/content/rt-detr/"
for _artifact in (model, processor):
    _artifact.save_pretrained(_export_dir)

In [None]:
NUM_SAMPLES = 35


def display_samples(count):
    """Plot ground truth next to model predictions for the first test images.

    Args:
        count: Maximum number of test samples to show; capped at the size of
            ``ds_test``.
    """
    model.eval()
    # Index one sample at a time (the access pattern used elsewhere in this
    # notebook) instead of slicing the dataset, and never run past its end.
    shown = min(count, len(ds_test))
    for index in range(shown):
        img_path, original, truth = ds_test[index]
        with Image.open(img_path) as source:
            pil_img = source.convert("RGB")
        batch = processor(pil_img, return_tensors="pt").to(DEVICE)
        with torch.no_grad():
            out = model(**batch)
        # PIL reports (width, height); the post-processor takes (height, width).
        w, h = pil_img.size
        det_batch = processor.post_process_object_detection(
            out, target_sizes=[(h, w)], threshold=0.3
        )[0]
        preds = sv.Detections.from_transformers(det_batch).with_nms(threshold=0.1)
        # The drawing helper defined in this notebook is `draw_annotations`;
        # `annotate` is not defined in any visible cell.
        side_by_side = [
            draw_annotations(original, truth, ds_train.classes),
            draw_annotations(original, preds, ds_train.classes)
        ]
        tile_grid = sv.create_tiles(
            side_by_side,
            titles=['ground truth', 'prediction'],
            titles_scale=0.5,
            single_tile_size=(400, 400),
            tile_padding_color=sv.Color.WHITE,
            tile_margin_color=sv.Color.WHITE
        )
        sv.plot_image(tile_grid, size=(6, 6))


display_samples(NUM_SAMPLES)