## Download Data from Kaggle

In [None]:
# Download the competition archive with the `kaggle` command-line tool.
# NOTE(review): presumably requires Kaggle API credentials to be configured
# beforehand — confirm in the target environment.
!kaggle competitions download -c nfl-health-and-safety-helmet-assignment

## Extract Data

In [None]:
# Create the ./data folder, unpack the downloaded archive into it
# (-o overwrites existing files without prompting), then delete the zip.
!mkdir data
!unzip -o nfl-health-and-safety-helmet-assignment.zip -d "./data/"
!rm nfl-health-and-safety-helmet-assignment.zip

## Import required packages

In [None]:
import shutil as sh
from itertools import accumulate
from math import ceil
from pathlib import Path
from pprint import pprint as pp

import numpy as np
import pandas as pd
import supervision as sv
import torch
import torchvision as tv
from PIL import Image

In [None]:
# Report whether PyTorch can see a CUDA device.
cuda_ready = torch.cuda.is_available()
print(cuda_ready)

In [None]:
# Root folder of the extracted competition data. A plain relative name is used
# instead of a backslash literal so the path resolves on Windows and POSIX
# alike (on POSIX, r".\data" is a single file name, not a directory).
BASE_DIR = Path("data")
# Folder holding the still images.
images_dir = BASE_DIR / "images"
# CSV with the bounding-box annotations for those images.
images_lables_csv = BASE_DIR / "image_labels.csv"

In [None]:
# Collect every JPEG in the images folder (as Path objects) and load the
# annotation table from the labels CSV.
images = [*images_dir.glob("*.jpg")]
img_labels = pd.read_csv(images_lables_csv)

## Check sample Image

In [None]:
def get_annotated_img(
    image_path: Path, annotation_df: pd.DataFrame
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Load an image and draw its annotated bounding boxes.

    Args:
        image_path: Path of the image file; its file name is matched against
            the ``image`` column of ``annotation_df``.
        annotation_df: Annotation table with ``image``, ``left``, ``top``,
            ``width`` and ``height`` columns.

    Returns:
        A tuple ``(image, boxes_xyxy, annotated)``: the untouched image pixels,
        the boxes of that image converted from xywh to xyxy, and a separate
        copy of the image with the boxes drawn on it.
    """
    # Read the pixels inside a context manager so the file handle is released
    # as soon as the data is in memory.
    with Image.open(image_path) as curr_image:
        image_array = np.array(curr_image)

    img_name = image_path.name
    curr_img_lbl = annotation_df.query("image == @img_name")
    curr_lbl_xywh = curr_img_lbl[["left", "top", "width", "height"]].values
    curr_lbl_xyxy = tv.ops.box_convert(
        torch.as_tensor(curr_lbl_xywh, dtype=torch.float32),
        in_fmt="xywh",
        out_fmt="xyxy",
    ).numpy()

    labels = sv.Detections(xyxy=curr_lbl_xyxy)
    # Annotate a copy so the first returned array stays free of drawings.
    annotations = sv.BoxAnnotator(
        color=sv.Color(0, 0, 255),
    ).annotate(image_array.copy(), labels, skip_label=True)
    return image_array, curr_lbl_xyxy, annotations

In [None]:
# Pick one image at random, print its index, and render it with its
# annotation boxes as the cell output.
curr_img_idx = np.random.randint(len(images))
print(curr_img_idx)
img = images[curr_img_idx]
curr_image, curr_lbl_xyxy, _annotated_pixels = get_annotated_img(img, img_labels)
annotated_img = Image.fromarray(_annotated_pixels)

annotated_img

## Train test split

In [None]:
# Show how many annotations exist per label. The result must be printed
# explicitly: a notebook cell only auto-displays its final expression, and
# this statement is followed by the mapping definition.
print(img_labels["label"].value_counts())

# Integer class id assigned to each annotation label when exporting the
# dataset; the ids must agree with the `names` section of the training YAML.
label_mapping = {
    "Helmet": 0,
    "Helmet-Blurred": 1,
    "Helmet-Sideline": 2,
    "Helmet-Partial": 3,
    "Helmet-Difficult": 4,
}

In [None]:
# Split the annotations into train/valid/test by image, so that all boxes of
# one image end up in the same subset.
temp_df = img_labels.copy()

# A dedicated name is used for the image-name array: rebinding `images` here
# would replace the list of Path objects built earlier and break re-running
# the sample-image cell.
image_names = temp_df["image"].unique()
num_images = len(image_names)
print(num_images)

# Fixed seed so the shuffle (and therefore the split) is reproducible.
np.random.seed(47)

pp(image_names[:5])
np.random.shuffle(image_names)
pp(image_names[:5])

split_index = [0.7, 0.2, 0.1]
# Cumulative subset boundaries. np.split only needs the first two; the test
# subset receives whatever remains after train and valid.
train_idx, valid_idx, test_idx = accumulate(
    ceil(num_images * fraction) for fraction in split_index
)

train_images, valid_images, test_images = np.split(
    image_names, [train_idx, valid_idx]
)
# .copy() makes each subset an independent frame, so adding columns to it
# later does not raise SettingWithCopyWarning.
train_df, valid_df, test_df = (
    temp_df[temp_df["image"].isin(subset)].copy()
    for subset in (train_images, valid_images, test_images)
)

In [None]:
# Summarise the three subsets; the report order is train, test, valid.
_report_frames = (train_df, test_df, valid_df)

# Row/column counts of each subset on one line.
print(*(frame.shape for frame in _report_frames))

# Per-label annotation counts of each subset.
for frame in _report_frames:
    print(frame["label"].value_counts())

# Number of distinct images in each subset on one line.
print(*(len(frame["image"].unique()) for frame in _report_frames))

## Move Images and annotations to different directories
- Move images and annotations to directories
- Convert dataset to YOLOv8 required format (scaling annotations by image dimensions)
- [Dataset Format Reference](https://docs.ultralytics.com/datasets/detect/)


In [None]:
# Root folder for the exported train/valid/test dataset. Written without a
# backslash literal so it resolves on Windows and POSIX alike.
y8_dataset_ouput_path = Path("y8_image_dataset_split")

In [None]:
# (output folder, annotation rows) for each dataset subset.
directories = (
    (y8_dataset_ouput_path / "train", train_df),
    (y8_dataset_ouput_path / "valid", valid_df),
    (y8_dataset_ouput_path / "test", test_df),
)

for out_dir, curr_df in directories:
    out_img_dir = out_dir / "images"
    out_img_dir.mkdir(exist_ok=True, parents=True)

    out_lbl_dir = out_dir / "labels"
    out_lbl_dir.mkdir(exist_ok=True, parents=True)

    # assign() builds a new frame instead of writing a column into a frame
    # that may be a slice of another one (avoids SettingWithCopyWarning).
    curr_df = curr_df.assign(class_id=curr_df["label"].map(label_mapping))

    # Labels missing from label_mapping map to NaN; fail loudly rather than
    # writing malformed label lines.
    unknown_labels = curr_df.loc[curr_df["class_id"].isna(), "label"].unique()
    if len(unknown_labels) > 0:
        raise ValueError(
            f"Labels missing from label_mapping: {list(unknown_labels)}"
        )

    # Group by a scalar key so group names are plain image names, and iterate
    # the groups directly instead of looking each one up again.
    for img_name, curr_group in curr_df.groupby("image", sort=False):
        # copy image
        img_path = images_dir / img_name
        out_img_path = out_img_dir / img_name
        # Context manager releases the file handle on every iteration.
        with Image.open(img_path) as src_img:
            img_width, img_height = src_img.size
        sh.copyfile(img_path, out_img_path)

        # convert label and write file
        out_label_path = out_lbl_dir / f"{img_path.stem}.txt"
        bboxes_xywh = curr_group[["left", "top", "width", "height"]].to_numpy(
            dtype=float
        )
        # Convert as floats so the box centres are not subject to integer
        # arithmetic.
        bboxes_cxcywh_np = tv.ops.box_convert(
            boxes=torch.tensor(bboxes_xywh),
            in_fmt="xywh",
            out_fmt="cxcywh",
        ).numpy()
        # Normalise x_center/width by image width and y_center/height by
        # image height.
        bboxes_cxcywh_np /= np.array(
            [img_width, img_height, img_width, img_height], dtype=float
        )

        # Keep the class id as an integer column so it is written as "0",
        # not "0.0".
        temp_lbl_df = pd.DataFrame(bboxes_cxcywh_np)
        temp_lbl_df.insert(0, "class_id", curr_group["class_id"].to_numpy(dtype=int))
        temp_lbl_df.to_csv(out_label_path, header=False, index=False, sep=" ")

## Create Model Training Config YAML

References
- [YOLOv8 Train](https://docs.ultralytics.com/tasks/detect/#train)
- [Training Config](https://docs.ultralytics.com/usage/cfg/#train)


In [5]:
from ultralytics import YOLO
import torch

In [None]:
# minor hack to use python variables with %%writefile
# - https://github.com/ipython/ipython/issues/6701
from IPython.core.magic import register_line_cell_magic


@register_line_cell_magic
def writetemplate(line, cell):
    """Write the cell body to a file after filling in ``{name}`` placeholders.

    Args:
        line: Text following the magic name; used as the output file path.
        cell: Cell body, treated as a ``str.format`` template whose
            placeholders are resolved against this module's global variables.

    Raises:
        KeyError: If the template references a name that is not a global.
    """
    # Render before opening the file: a failing placeholder then leaves an
    # existing file untouched instead of truncating it to zero bytes.
    rendered = cell.format(**globals())
    # Explicit encoding so the output does not depend on the platform default.
    with open(line, "w", encoding="utf-8") as f:
        f.write(rendered)


# Absolute path of the current working directory as text, with its first
# character upper-cased (e.g. a lower-case Windows drive letter).
_cwd_text = str(Path(".").absolute())
pwd = _cwd_text[:1].upper() + _cwd_text[1:]

In [None]:
%%writetemplate y8_train_config.yaml

# Dataset split locations; forward slashes are accepted on Windows and POSIX.
train: {pwd}/y8_image_dataset_split/train
val: {pwd}/y8_image_dataset_split/valid
test: {pwd}/y8_image_dataset_split/test

nc: 5

names:
  0: 'Helmet'
  1: 'Helmet-Blurred'
  2: 'Helmet-Sideline'
  3: 'Helmet-Partial'
  4: 'Helmet-Difficult'


## Train Model

In [None]:
# Dataset description file produced by the %%writetemplate cell.
train_cfg_path = "./y8_train_config.yaml"
_train_options = {"data": train_cfg_path, "epochs": 350}

# Release memory cached by PyTorch's CUDA allocator before training starts.
torch.cuda.empty_cache()

# Start from the yolov8n.pt checkpoint and train on the exported dataset.
model = YOLO("yolov8n.pt")
results = model.train(**_train_options)

## Prediction 

In [17]:
from ultralytics import YOLO
import supervision as sv
from PIL import Image

In [None]:
# Image to run inference on. Forward slashes keep the path valid on both
# Windows and POSIX (the backslash form only resolves on Windows).
img_path = "./data/images/57502_000480_Endzone_frame0495.jpg"

# Load the trained weights and run prediction on the image.
loaded_model = YOLO("./yolo8_model.pt")
results = loaded_model(img_path)

In [None]:
# show results
box_annotator = sv.BoxAnnotator(thickness=2, text_thickness=1, text_scale=0.5)


r = results[0]
detections = sv.Detections.from_ultralytics(r)
img = r.orig_img

frame = box_annotator.annotate(scene=img, detections=detections)
Image.fromarray(frame)