# DETR football training

## Prep

In [None]:
# Install/upgrade the libraries used by this notebook into the kernel's environment.
%pip install -U -q datasets transformers[torch] evaluate timm albumentations accelerate roboflow wandb torchmetrics


In [None]:
%pip uninstall -y supervision && pip install -q supervision>=0.23.0

In [None]:
# Show the GPU(s) visible to this runtime.
!nvidia-smi

In [29]:
# Base checkpoint to fine-tune; alternative checkpoints are kept commented out.
#MODEL_NAME = "facebook/detr-resnet-50-dc5" # DETR
#MODEL_NAME = "jozhang97/deta-swin-large" # Object detection leader on HF - based on paper https://arxiv.org/pdf/2212.06137
MODEL_NAME = "SenseTime/deformable-detr" # Deformable DETR
MODEL_CHECKPOINT = "deformable-detr-football-finetuned" # Trainer output_dir; also the model name in the Hub path used for inference
PROJECT_NAME = "football-detection" # Weights & Biases project
RUN_NAME = "eval test" # Weights & Biases run name
EPOCHS = 5
BATCH_SIZE = 4 # per-device training batch size
#IMAGE_SIZE = 800 # currently unused


Get API keys from Colab secrets or, when not running in Colab, from a local `.env` file.

In [3]:
import os

# Secrets are read from Colab's secret store when the notebook runs in Colab; anywhere else
# the google.colab import fails and the values are taken from a local .env file instead.
try:
    from google.colab import userdata

    rf_api_key, wandb_api_key, hf_token = (
        userdata.get(secret_name)
        for secret_name in ("ROBOFLOW_API_KEY", "WANDB_API_KEY", "HF_TOKEN")
    )
except ImportError:
    from dotenv import load_dotenv

    load_dotenv(dotenv_path='../config/.env')
    rf_api_key, wandb_api_key, hf_token = (
        os.getenv(secret_name)
        for secret_name in ("ROBOFLOW_API_KEY", "WANDB_API_KEY", "HF_TOKEN")
    )

        

## Dataset

### Download dataset

In [None]:
from roboflow import Roboflow

# Roboflow dataset version to download and the local directory it is stored in.
# The version that is actually downloaded is 2; it is now driven by this single variable
# instead of an unused `dataset_version` next to a separately hard-coded number.
dataset_version = 2
dataset_location = "../data/training"

rf = Roboflow(api_key=rf_api_key)
project = rf.workspace("sport-cv").project("football-players-detection-3zvbc-fynld")
version = project.version(dataset_version)
# Export in COCO format; the returned object exposes the download location used below.
robo_dataset = version.download("coco", location=dataset_location)
print("loaded dataset to", dataset_location)



Transform dataset into a format that can be used by the DETR models

In [None]:
import json
from datasets import Dataset, DatasetDict
from PIL import Image
import numpy as np
import os

def load_coco_dataset(json_file, dataset_base_path, split):
    """Build a `datasets.Dataset` from one split of a COCO-format export.

    Args:
        json_file: Path to the split's COCO annotation JSON file.
        dataset_base_path: Root directory of the downloaded dataset.
        split: Name of the split sub-directory that contains the image files.

    Returns:
        A Dataset with the columns `image_id`, `image`, `objects`, `width` and `height`.
        `objects` is a dict of parallel lists: `id` (the COCO category id), `area`,
        `bbox` and `category` (the category name).

    Raises:
        KeyError: If an annotation references a category id missing from `categories`.
    """
    with open(json_file, 'r', encoding='utf-8') as f:
        coco_data = json.load(f)

    # Build the lookups once instead of rescanning every category / annotation per image.
    # setdefault keeps the first entry for a duplicated id, like a first-match search would.
    category_names = {}
    for cat in coco_data['categories']:
        category_names.setdefault(cat['id'], cat['name'])

    annotations_by_image = {}
    for ann in coco_data['annotations']:
        annotations_by_image.setdefault(ann['image_id'], []).append(ann)

    dataset_dict = {
        'image_id': [],
        'image': [],
        'objects': [],
        'width': [],
        'height': []
    }

    for img in coco_data['images']:
        img_id = img['id']
        img_path = os.path.join(dataset_base_path, split, img['file_name'])

        image = Image.open(img_path)

        objects = {
            'id': [],
            'area': [],
            'bbox': [],
            'category': []
        }

        # Images without annotations simply get empty lists.
        for ann in annotations_by_image.get(img_id, []):
            objects['id'].append(ann['category_id'])
            objects['area'].append(ann['area'])
            objects['bbox'].append(ann['bbox'])
            objects['category'].append(category_names[ann['category_id']])

        # Add to dataset dictionary
        dataset_dict['image_id'].append(img_id)
        dataset_dict['image'].append(image)
        dataset_dict['objects'].append(objects)
        dataset_dict['width'].append(img['width'])
        dataset_dict['height'].append(img['height'])

    return Dataset.from_dict(dataset_dict)

# Every split directory of the export holds its own COCO annotation file.
dataset_base_path = robo_dataset.location
train_json, val_json, test_json = (
    os.path.join(dataset_base_path, split_dir, "_annotations.coco.json")
    for split_dir in ("train", "valid", "test")
)

# Load one Dataset per split (the validation split lives in the "valid" directory).
train_dataset = load_coco_dataset(train_json, dataset_base_path, 'train')
val_dataset = load_coco_dataset(val_json, dataset_base_path, 'valid')
test_dataset = load_coco_dataset(test_json, dataset_base_path, 'test')

# Bundle the splits under the names used in the rest of the notebook.
splits = {
    'train': train_dataset,
    'validation': val_dataset,
    'test': test_dataset
}
dataset = DatasetDict(splits)

dataset

In [9]:
# Use the full training split, but only fixed-size subsets for validation and test.
train_dataset = dataset["train"]
validation_dataset = dataset["validation"].select(range(48)) # the validation set length has to be a multiple of 8 because of a bug in collect_image_sizes
test_dataset = dataset["test"].select(range(24))

In [None]:
from pprint import pprint

# Inspect the structure of a single training example.
pprint(train_dataset[0])

In [None]:
import numpy as np
from PIL import Image, ImageDraw


def draw_image_from_idx(dataset, idx):
    """Return a copy of one dataset image with its ground-truth boxes drawn on it.

    Args:
        dataset: Indexable collection whose items have `image`, `objects`, `width`
            and `height` entries.
        idx: Index of the sample to draw.

    Returns:
        An image with a red rectangle and the category name drawn for every annotation.
    """
    sample = dataset[idx]
    # Draw on a copy so the image object held by the sample is never modified.
    image = sample["image"].copy()
    annotations = sample["objects"]
    draw = ImageDraw.Draw(image)
    width, height = sample["width"], sample["height"]

    for i in range(len(annotations["id"])):
        box = annotations["bbox"][i]
        x, y, w, h = tuple(box)
        if max(box) > 1.0:
            # Any value above 1 means the box is already in absolute pixel coordinates.
            x1, y1 = int(x), int(y)
            x2, y2 = int(x + w), int(y + h)
        else:
            # Otherwise the box is treated as normalised and scaled by the image size.
            x1 = int(x * width)
            y1 = int(y * height)
            x2 = int((x + w) * width)
            y2 = int((y + h) * height)
        draw.rectangle((x1, y1, x2, y2), outline="red", width=1)
        draw.text((x1, y1), annotations["category"][i], fill="white")
    return image


import random
# Preview one randomly chosen training image together with its ground-truth boxes.
random_idx = random.randrange(len(train_dataset))
draw_image_from_idx(dataset=train_dataset, idx=random_idx)

In [None]:
import matplotlib.pyplot as plt


import random

def plot_images(dataset, num_images=9):
    """Show randomly sampled, annotated images from `dataset` in a three-column grid.

    Args:
        dataset: Dataset accepted by `draw_image_from_idx`.
        num_images: Number of images to show. It is capped at the dataset size and does
            not have to be a multiple of three; unused grid cells are left blank.
    """
    num_cols = 3
    num_images = min(num_images, len(dataset))
    if num_images <= 0:
        return

    indices = random.sample(range(len(dataset)), num_images)
    # Ceiling division so a partially filled last row still gets its own row of axes.
    num_rows = -(-num_images // num_cols)
    # squeeze=False keeps `axes` two-dimensional even when there is a single row.
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(15, 10), squeeze=False)

    # Hide the axes of every cell up front so empty cells stay blank.
    for ax in axes.flat:
        ax.axis("off")

    # axes.flat walks the grid row by row, so images fill it left-to-right, top-to-bottom.
    for ax, idx in zip(axes.flat, indices):
        ax.imshow(draw_image_from_idx(dataset, idx))

    plt.tight_layout()
    plt.show()


# Show a grid of randomly sampled, annotated training images.
plot_images(train_dataset)


### Preprocessing the Dataset

In [15]:
from transformers import AutoImageProcessor

# Load the image processor that belongs to the chosen base checkpoint.
image_processor = AutoImageProcessor.from_pretrained(MODEL_NAME)

In [16]:
import albumentations
import numpy as np

# Augmentation pipeline; bounding boxes are passed and returned in COCO format together
# with their labels in the "category" field.
transform = albumentations.Compose(
    [
        #albumentations.Resize(IMAGE_SIZE, IMAGE_SIZE),
        albumentations.Perspective(p=0.1),
        albumentations.RandomBrightnessContrast(p=0.5),
        albumentations.HueSaturationValue(p=0.1),
    ],
        bbox_params=albumentations.BboxParams(
        format="coco",
        label_fields=["category"],
        clip=True,
        check_each_transform=True #TODO: check how this behaves when set to False
    ),
)

Once all the transformations are initialized, we need a function that formats the annotations and returns them as a list of annotations in a very specific format.

This is because the image_processor expects the annotations to be in the following format: {'image_id': int, 'annotations': List[Dict]}, where each dictionary is a COCO object annotation.

In [17]:
def formatted_anns(image_id, category, area, bbox):
    """Build the list of COCO-style annotation dicts for one image.

    Args:
        image_id: Identifier of the image the annotations belong to.
        category: Category id of every object.
        area: Area of every object, parallel to `category`.
        bbox: Bounding box of every object, parallel to `category`.

    Returns:
        One dict per object with the keys `image_id`, `category_id`, `iscrowd`,
        `area` and `bbox` (the box converted to a list).
    """
    return [
        {
            "image_id": image_id,
            "category_id": category_id,
            # COCO spells this key in lower case; "isCrowd" is not a COCO annotation key.
            "iscrowd": 0,
            "area": object_area,
            "bbox": list(box),
        }
        for category_id, object_area, box in zip(category, area, bbox)
    ]

In [18]:
# transforming a batch

def transform_aug_ann(examples):
    """Augment a batch of examples and encode it with the image processor.

    Args:
        examples: Batch dict with the parallel lists `image_id`, `image` and `objects`
            (each `objects` entry holding `bbox` and `id` lists).

    Returns:
        The output of `image_processor` for the augmented images and their annotations.
    """
    image_ids = examples["image_id"]
    images, bboxes, area, categories = [], [], [], []
    for image, objects in zip(examples["image"], examples["objects"]):
        # Keep the RGB channel order: the image processor works on RGB input and
        # inference later feeds RGB images, so the channels must not be flipped to BGR.
        image = np.array(image.convert("RGB"))
        out = transform(image=image, bboxes=objects["bbox"], category=objects["id"])

        images.append(out["image"])
        bboxes.append(out["bboxes"])
        categories.append(out["category"])
        # Recompute areas from the boxes that come out of the augmentation (COCO boxes are
        # x, y, w, h), so they stay aligned with the boxes when some are changed or removed.
        area.append([float(box[2]) * float(box[3]) for box in out["bboxes"]])

    targets = [
        {"image_id": id_, "annotations": formatted_anns(id_, cat_, ar_, box_)}
        for id_, cat_, ar_, box_ in zip(image_ids, categories, area, bboxes)
    ]

    return image_processor(images=images, annotations=targets, return_tensors="pt")

In [None]:
# Attach the on-the-fly transform to the train, validation and test splits.
# NOTE(review): transform_aug_ann runs the random augmentations defined in `transform`, so the
# validation and test images are augmented as well — consider an augmentation-free variant for evaluation.

train_dataset_transformed = train_dataset.with_transform(transform_aug_ann)
validation_dataset_transformed = validation_dataset.with_transform(transform_aug_ann)
test_dataset_transformed = test_dataset.with_transform(transform_aug_ann)

A collate_fn is responsible for taking a list of samples from a dataset and converting them into a batch suitable for model’s input format.

In general a DataCollator typically performs tasks such as padding, truncating etc. In a custom collate function, we often define what and how we want to group the data into batches or simply, how to represent each batch.

Data collator mainly puts the data together and then preprocesses them. Let’s make our collate function.

In [34]:
def collate_fn(batch):
    """Combine individual samples into one model-ready batch.

    The images are padded together by the image processor, while the per-image label
    entries are passed through unchanged as a list.

    Args:
        batch: List of samples, each with `pixel_values` and `labels` entries.

    Returns:
        Dict with the keys `pixel_values`, `pixel_mask` and `labels`.
    """
    padded = image_processor.pad(
        [sample["pixel_values"] for sample in batch],
        return_tensors="pt",
    )
    return {
        "pixel_values": padded["pixel_values"],
        "pixel_mask": padded["pixel_mask"],
        "labels": [sample["labels"] for sample in batch],
    }

## Training

### Prepare evaluation

#### Login to Hugging Face

In [None]:
from huggingface_hub import login

# Authenticate against the Hugging Face Hub with the token loaded earlier.
login(token=hf_token)

In [None]:
from transformers import AutoModelForObjectDetection

# The annotations use category ids 1-4. The size of the classification head is derived from
# the number of entries in id2label, and the model predicts class indices starting at 0, so
# the mapping has to cover 0-4; with only the four entries 1-4, id 4 ("referee") would fall
# outside the head. Index 0 is a placeholder that no annotation uses.
# NOTE(review): the name of category 0 in the export is not visible here — rename if needed.
id2label = {0: "unused", 1: "ball", 2: "goalkeeper", 3: "player", 4: "referee"}
label2id = {v: k for k, v in id2label.items()}


model = AutoModelForObjectDetection.from_pretrained(
    MODEL_NAME,
    id2label=id2label,
    label2id=label2id,
    # The new head has a different number of classes than the pretrained checkpoint.
    ignore_mismatched_sizes=True,
)

In [None]:
from transformers.image_transforms import center_to_corners_format
import torch

def convert_bbox_yolo_to_pascal(boxes, image_size):
    """
    Convert bounding boxes from YOLO format (x_center, y_center, width, height) in range [0, 1]
    to Pascal VOC format (x_min, y_min, x_max, y_max) in absolute coordinates.

    Args:
        boxes (torch.Tensor): Bounding boxes in YOLO format
        image_size (Tuple[int, int]): Image size in format (height, width)

    Returns:
        torch.Tensor: Bounding boxes in Pascal VOC format (x_min, y_min, x_max, y_max)
    """
    # convert center to corners format
    boxes = center_to_corners_format(boxes)

    # convert to absolute coordinates; the scale factors are created on the same device
    # as the boxes so the multiplication also works for boxes that are not on the CPU
    height, width = image_size
    scale = torch.tensor([[width, height, width, height]], device=boxes.device)

    return boxes * scale

In [None]:
from torchmetrics.detection.mean_ap import MeanAveragePrecision
import numpy as np
from dataclasses import dataclass
from functools import partial
import wandb
from torchmetrics.detection.mean_ap import MeanAveragePrecision
import supervision as sv

@dataclass
class ModelOutput:
    """Minimal container for the model outputs handed to the image processor's post-processing."""
    # Classification logits of one batch.
    logits: torch.Tensor
    # Predicted bounding boxes of the same batch.
    pred_boxes: torch.Tensor


class MAPEvaluator:
    """Metric callback that computes mean average precision / recall for object detection.

    An instance is called with the evaluation results (predictions and targets kept as
    per-batch lists), converts both to absolute xyxy boxes, scores them with torchmetrics'
    `MeanAveragePrecision`, logs a summary to Weights & Biases when a run is active and
    returns the rounded metrics as a dict.
    """

    def __init__(self, image_processor, threshold=0.00, id2label=None):
        """Store the post-processing configuration.

        Args:
            image_processor: Processor providing `post_process_object_detection`.
            threshold: Score threshold forwarded to the post-processing step.
            id2label: Optional mapping from class id to class name for per-class metrics.
        """
        self.image_processor = image_processor
        self.threshold = threshold
        self.id2label = id2label

    def collect_image_sizes(self, targets):
        """Collect image sizes across the dataset as list of tensors with shape [batch_size, 2]."""
        return [
            torch.tensor(np.array([x["size"] for x in batch]))
            for batch in targets
        ]

    def collect_targets(self, targets, image_sizes):
        """Convert the target boxes of every image to absolute xyxy boxes.

        Returns:
            Flat list with one `{"boxes", "labels"}` dict per image.
        """
        post_processed_targets = []
        for target_batch, image_size_batch in zip(targets, image_sizes):
            for target, (height, width) in zip(target_batch, image_size_batch):
                boxes = target["boxes"]
                # Targets are stored as (cx, cy, w, h); convert to corners, then scale by
                # the image size to get absolute coordinates.
                boxes = sv.xcycwh_to_xyxy(boxes)
                boxes = boxes * np.array([width, height, width, height])
                boxes = torch.tensor(boxes)
                labels = torch.tensor(target["class_labels"])
                post_processed_targets.append({"boxes": boxes, "labels": labels})
        return post_processed_targets

    def collect_predictions(self, predictions, image_sizes):
        """Post-process the raw model outputs of every batch into per-image detections."""
        post_processed_predictions = []
        for batch, target_sizes in zip(predictions, image_sizes):
            # Positions 1 and 2 of each prediction tuple hold the logits and the boxes.
            batch_logits, batch_boxes = batch[1], batch[2]
            output = ModelOutput(logits=torch.tensor(batch_logits), pred_boxes=torch.tensor(batch_boxes))
            post_processed_output = self.image_processor.post_process_object_detection(
                output, threshold=self.threshold, target_sizes=target_sizes
            )
            post_processed_predictions.extend(post_processed_output)
        return post_processed_predictions

    def _class_name(self, class_id):
        """Return the configured name for `class_id`, or the id itself when none is known."""
        if self.id2label is None:
            return class_id
        return self.id2label.get(class_id, class_id)

    @torch.no_grad()
    def __call__(self, evaluation_results):
        """Compute the metrics for one evaluation run.

        Args:
            evaluation_results: Object exposing `predictions` and `label_ids`.

        Returns:
            Dict mapping metric names to floats rounded to four decimals.
        """
        predictions, targets = evaluation_results.predictions, evaluation_results.label_ids

        image_sizes = self.collect_image_sizes(targets)
        post_processed_targets = self.collect_targets(targets, image_sizes)
        post_processed_predictions = self.collect_predictions(predictions, image_sizes)

        evaluator = MeanAveragePrecision(box_format="xyxy", class_metrics=True)
        evaluator.warn_on_many_detections = False
        evaluator.update(post_processed_predictions, post_processed_targets)

        metrics = evaluator.compute()

        # Summary metrics for wandb
        wandb_metrics = {
            'mAP': metrics['map'].item(),
            'mAP_50': metrics['map_50'].item(),
            'mAP_75': metrics['map_75'].item(),
            'mAR_1': metrics['mar_1'].item(),
            'mAR_10': metrics['mar_10'].item(),
            'mAR_100': metrics['mar_100'].item(),
        }

        # Replace list of per class metrics with separate metric for each class
        classes = metrics.pop("classes")
        map_per_class = metrics.pop("map_per_class")
        mar_100_per_class = metrics.pop("mar_100_per_class")
        for class_id, class_map, class_mar in zip(classes, map_per_class, mar_100_per_class):
            class_id = class_id.item()
            if class_id == 0:  # we don't want to use class with index 0
                continue
            # Use the mapping given to this evaluator rather than a notebook-level global.
            class_name = self._class_name(class_id)
            metrics[f"map_{class_name}"] = class_map
            metrics[f"mar_100_{class_name}"] = class_mar
            wandb_metrics[f'{class_name}/mAP'] = class_map.item()
            wandb_metrics[f'{class_name}/mAR'] = class_mar.item()

        metrics = {k: round(v.item(), 4) for k, v in metrics.items()}

        # Only log when a wandb run is active, so evaluation still works without one.
        if wandb.run is not None:
            wandb.log(wandb_metrics)

        return metrics

# Metric callback passed to the Trainer; 0.01 is the score threshold forwarded to post-processing.
eval_compute_metrics_fn = MAPEvaluator(image_processor=image_processor, threshold=0.01, id2label=id2label)

In [None]:
from transformers import TrainingArguments
from transformers import Trainer

# Training configuration: evaluate and checkpoint once per epoch, keep the two most recent
# checkpoints and reload the one with the best "eval_map" when training ends.
training_args = TrainingArguments(
    output_dir=MODEL_CHECKPOINT,
    push_to_hub=True,
    # optimisation
    per_device_train_batch_size=BATCH_SIZE,
    num_train_epochs=EPOCHS,
    learning_rate=1e-4,
    weight_decay=1e-4,
    fp16=False,
    # data handling
    remove_unused_columns=False,
    eval_do_concat_batches=False,
    # evaluation and checkpointing
    eval_strategy="epoch",
    save_strategy="epoch",
    save_total_limit=2,
    metric_for_best_model="eval_map",
    greater_is_better=True,
    load_best_model_at_end=True,
    # logging
    report_to="wandb",
    logging_dir="./logs",
    logging_strategy="steps",
    logging_steps=100,
)

# Start the Weights & Biases run before training so all logs end up in it.
wandb.login(key=wandb_api_key)
wandb.init(
    project=PROJECT_NAME,
    name=RUN_NAME,
    config=dict(
        learning_rate=training_args.learning_rate,
        epochs=training_args.num_train_epochs,
        batch_size=training_args.per_device_train_batch_size,
    ),
)

# Wire the model, the data and the metric callback together, then train.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset_transformed,
    eval_dataset=validation_dataset_transformed,
    data_collator=collate_fn,
    compute_metrics=eval_compute_metrics_fn,
    processing_class=image_processor,
)

trainer.train()



## Evaluation







In [None]:

# Evaluate on the held-out test subset; the metric keys are prefixed with "test".
metrics = trainer.evaluate(eval_dataset=test_dataset_transformed, metric_key_prefix="test")
pprint(metrics)

# Close the Weights & Biases run once the final metrics are in.
wandb.finish()


## Testing


If you pushed the model to the Hub, replace the username (and the model name, if you changed it) in the checkpoint path below with your own.

In [None]:
import requests
from transformers import pipeline

# Build an object-detection pipeline from the fine-tuned checkpoint on the Hub and try it
# on the first training image.
model_checkpoint = f"theButcher22/{MODEL_CHECKPOINT}"
obj_detector = pipeline("object-detection", model=model_checkpoint, threshold=0.3)
results = obj_detector(train_dataset[0]["image"])

print(results)

In [12]:
from PIL import ImageFont

def plot_results(image, results, threshold=0.7):
    """Draw detections scoring above `threshold` onto a copy of `image`.

    Args:
        image: Image (or array-like) to annotate; it is converted to a new uint8 image.
        results: Iterable of detections, each with `score`, `label` and a `box` mapping
            whose values are the corner coordinates in x, y, x2, y2 order.
        threshold: Detections with a score not above this value are skipped.

    Returns:
        The annotated image. Labels of detections scoring above 0.7 are drawn in green,
        all others in red.
    """
    image = Image.fromarray(np.uint8(image))
    draw = ImageDraw.Draw(image)
    # Load the label font once instead of once per detection.
    font = ImageFont.load_default(size=16)
    for result in results:
        score = result["score"]
        if not score > threshold:
            continue
        x, y, x2, y2 = result["box"].values()
        draw.rectangle((x, y, x2, y2), outline="red", width=1)
        draw.text(
            (x + 0.5, y - 0.5),
            text=str(result["label"]),
            fill="green" if score > 0.7 else "red",
            font=font,
        )
    return image

In [None]:
# Visualise the detections on the image they were computed for. There is no notebook-level
# `image` variable, so the image is taken from the dataset explicitly.
plot_results(train_dataset[0]["image"], results, threshold=0.5)


In [None]:
import random

def predict(image, pipeline, threshold=0.1):
    """Run `pipeline` on `image` and return the image with its detections drawn.

    Args:
        image: Image to run the detector on.
        pipeline: Callable that returns the detections for an image.
        threshold: Minimum score passed on to `plot_results`.
    """
    detections = pipeline(image)
    annotated = plot_results(image, detections, threshold)
    return annotated

# Run the detector on a randomly chosen test image and display the annotated result.
img = random.choice(test_dataset)["image"]
predict(img, obj_detector)