# Transform the dataset to YOLO dataset format
This notebook converts a dataset of 20 animal classes (a subset I picked from the original dataset: https://www.kaggle.com/datasets/antoreepjana/animals-detection-images-dataset) into the YOLO dataset format.

Table of contents:
- Data processing (Custom dataset to YOLO format)
- YOLO dataset configuration

The dataset before transforming: https://kaggle.com/datasets/5228c8f6b89b26721a7ce9493a8cc751b0ee07f1f201bddad1742ab15147de2e

The dataset after transforming: https://kaggle.com/datasets/f9b375eef26f759b115050a2399e12816bcd232322eb6fc2c1fccc1a6b53845e

YOLOv8 can be set up just by installing the `ultralytics` package.

In [1]:
!pip install ultralytics
!yolo checks

Collecting ultralytics
  Downloading ultralytics-8.2.79-py3-none-any.whl.metadata (41 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.5-py3-none-any.whl.metadata (8.9 kB)
Downloading ultralytics-8.2.79-py3-none-any.whl (869 kB)
Downloading ultralytics_thop-2.0.5-py3-none-any.whl (25 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.2.79 ultralytics-thop-2.0.5
Ultralytics YOLOv8.2.79 🚀 Python-3.10.13 torch-2.1.2 CUDA:0 (Tesla P100-PCIE-16GB, 16269MiB)
Setup complete ✅ (4 CPUs, 31.4 GB RAM, 5771.7/8062.4 GB disk)

OS                  Linux-5.15.154+-x86_64-with-glibc2.31
Environment         Kaggle
Python              3.10.13
Install  

In [2]:
import os
import random
import shutil
from collections import defaultdict, Counter
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Iterable
import yaml

import cv2
import plotly.express as px
from plotly import subplots
from tqdm.notebook import tqdm
import torch
from ultralytics import YOLO

## Data processing

### Original Dataset Structure

The source dataset has the following structure:

```yaml
Chosen_animal/
    train/
        {class1}/
            Label/
                {id_1}.txt
                {id_2}.txt
                ...
            {id_1}.jpg
            {id_2}.jpg
            ...
        {class2}/
            ...
        ...
    test/
    ...
```

In summary, the dataset is already split into `train` and `test` subsets. Each subset contains N different classes such as "Bear", "Brown bear", "Bull", etc. Each class has its own folder in the subset (`train` or `test`) that contains the images and the label text files. Labels are inside the `Label` directory, while the images are in the root of the class directory.

Annotations are in the format "{label} {x_min} {y_min} {x_max} {y_max}", where the coordinates are not normalized.
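
Because a label such as "Brown bear" contains a space, a line cannot simply be split into five fields. The sketch below shows one way to handle this, by treating the last four fields as coordinates and everything before them as the label. The function name `parse_annotation_line` and the sample coordinates are hypothetical and only serve as an illustration.

```python
from typing import Tuple


def parse_annotation_line(line: str) -> Tuple[str, float, float, float, float]:
    """Split one source annotation line into (label, x_min, y_min, x_max, y_max)."""
    items = line.split()
    if len(items) < 5:
        raise ValueError(f'Expected a label and four coordinates, got: {line!r}')
    # The last four fields are coordinates; everything before them is the label,
    # which may itself contain spaces.
    label = ' '.join(items[:-4])
    x_min, y_min, x_max, y_max = (float(v) for v in items[-4:])
    return label, x_min, y_min, x_max, y_max


# Hypothetical annotation line (values are made up for illustration)
print(parse_annotation_line('Brown bear 12.5 40.0 310.0 255.75'))
```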

### YOLO Dataset Structure

The dataset should be transformed into the following structure:

```yaml
yolo_dataset/
    train/
        images/
            {id_1}.jpg
            {id_2}.jpg
            ...
        labels/
            {id_1}.txt
            {id_2}.txt
            ...
        ...
    test/
    ...
```

Annotations should be in the format "{label_index} {x_center} {y_center} {width} {height}", where the coordinates are normalized by the image width and height.
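
The coordinate change from corner format to normalized center format is plain arithmetic. The following is a minimal sketch of that step, assuming pixel coordinates with the origin in the top-left corner; the function name `to_yolo_bbox` and the example box are hypothetical.

```python
from typing import Tuple


def to_yolo_bbox(
    x_min: float, y_min: float, x_max: float, y_max: float,
    image_width: int, image_height: int
) -> Tuple[float, float, float, float]:
    """Convert a pixel-space corner box to a normalized (x_center, y_center, width, height) box."""
    x_center = (x_min + x_max) / (2 * image_width)
    y_center = (y_min + y_max) / (2 * image_height)
    width = (x_max - x_min) / image_width
    height = (y_max - y_min) / image_height
    return x_center, y_center, width, height


# Hypothetical box covering the middle of a 640x480 image
print(to_yolo_bbox(160, 120, 480, 360, image_width=640, image_height=480))
# (0.5, 0.5, 0.5, 0.5)
```

All four values fall in the range [0, 1] as long as the source box lies inside the image.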

### Environment Configuration

In [3]:
DATASET_PATH = '/kaggle/input/chosen-animal-20-class/Chosen'  # Path to source dataset
MASTER_PATH = '/kaggle/working'  # Path where all outputs are stored (intermediate and final)
DEBUG = False  # Activate to run the notebook faster
CPU = False

if not CPU:
    assert torch.cuda.is_available(), 'CUDA not found!'

### Dataset indexing and analysis

Most of the work here is converting the custom Animal dataset to YOLO format. Two helper classes are used:
- LookupTable: Creates the vocabulary for class labels during indexing;
- AnimalToYOLODatasetAdapter: Parses the Animal dataset and converts it to YOLO dataset format.

In [4]:
class LookupTable:
    """Vocabulary - Label lookup table (token <-> index)."""
    def __init__(
        self,
        token_to_index: Optional[Dict[str, int]] = None,
        unknown_token: str = '<unk>',
        add_unknown_token: bool = True
    ):
        """
        Args:
            token_to_index: Predefine token to index mappings.
            unknown_token: Unknown token value.
            add_unknown_token: Use unknown token.
        """
        self._token_to_index = token_to_index
        self._unknown_token = unknown_token
        self._add_unknown_token = add_unknown_token

        if self._token_to_index is None:
            self._token_to_index = {}

        if unknown_token not in self._token_to_index and add_unknown_token:
            self._token_to_index[unknown_token] = len(self._token_to_index)

        self._index_to_token = {v: k for k, v in self._token_to_index.items()}
        self._next_index = len(self._token_to_index)

    def add(self, token: str) -> int:
        """
        Adds token to the lookup table if it does not already exist.
        
        Args:
            token: Label (token)
            
        Returns:
            label (token) index
        """
        if token in self._token_to_index:
            return self._token_to_index[token]

        new_index = self._next_index
        self._next_index += 1
        self._token_to_index[token] = new_index
        self._index_to_token[new_index] = token
        return new_index

    def lookup(self, token: str) -> int:
        """
        Acquires token index if it exists in the table.
        In case the token does not exist in the lookup table:
            - and unknown token is used then unknown token index is returned;
            - otherwise KeyError is raised
            
        Raises:
            KeyError: Unknown token
            
        Returns:
            label (token) index
        """
        if token not in self._token_to_index and self._add_unknown_token:
            return self._token_to_index[self._unknown_token]

        return self._token_to_index[token]

    def inverse_lookup(self, index: int) -> str:
        """
        Inverse to `lookup`. Acquire token by index.
        
        Raises:
            KeyError: Unknown index
            
        Returns:
            label (token)
        """
        return self._index_to_token[index]
    
    def __iter__(self) -> Iterable[Tuple[str, int]]:
        for token, index in self._token_to_index.items():
            yield token, index

    def __getitem__(self, token: str) -> int:
        return self.lookup(token)  # Alias for `lookup`

    def __len__(self):
        return self._next_index
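
The core idea of the lookup table is that each label receives the next free index the first time it is seen. The sketch below illustrates this with a plain dict; it is a simplified stand-in rather than the class above, and the name `build_vocabulary` is hypothetical.

```python
from typing import Dict, Iterable, Optional


def build_vocabulary(labels: Iterable[str], unknown_token: Optional[str] = None) -> Dict[str, int]:
    """Assign consecutive indices to labels in first-seen order."""
    vocabulary: Dict[str, int] = {}
    if unknown_token is not None:
        vocabulary[unknown_token] = 0  # Reserve index 0 for unknown labels
    for label in labels:
        # Existing labels keep their index; new labels get the next free one
        vocabulary.setdefault(label, len(vocabulary))
    return vocabulary


vocabulary = build_vocabulary(['Horse', 'Lion', 'Horse', 'Crab'])
index_to_label = {index: label for label, index in vocabulary.items()}
print(vocabulary)         # {'Horse': 0, 'Lion': 1, 'Crab': 2}
print(index_to_label[1])  # Lion
```

Since indices follow insertion order, the class index written into the YOLO label files depends on the order in which labels are added, so the class names in the dataset configuration have to come from the same lookup table.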

In [5]:
DatasetIndex = Dict[str, Dict[str, List[str]]]
DatasetStats = Dict[str, int]


class AnimalToYOLODatasetAdapter:
    """Adapts custom animal dataset to YOLO format."""
    def __init__(self, path: str, label_filter: Optional[List[str]] = None):
        """
        Args:
            path: Path where dataset is stored
            label_filter: Use specific set of labels (remove others from dataset)
        """
        self._path = path
        
        self._index, self.label_stats, self.split_stats, self.label_lookup, self._size = \
            self._index_dataset(path, label_filter)
        
    @staticmethod
    def _index_dataset(path: str, label_filter: Optional[List[str]] = None) \
        -> Tuple[DatasetIndex, DatasetStats, DatasetStats, LookupTable, int]:
        """
        Creates dataset index. Index is mapping (split -> label -> sample_ids). 
        Input dataset format is given in the previously defined structure.

        Args:
            path: Dataset path
            label_filter: Filter used labels

        Returns:
            Dataset index, Label stats, Split stats, Label lookup table, Dataset size
        """
        index: DatasetIndex = defaultdict(dict)
        label_stats: DatasetStats = Counter()
        split_stats: DatasetStats = Counter()
        lookup = LookupTable(add_unknown_token=False)
        size = 0

        splits = os.listdir(path)
        for split in splits:        
            split_path = os.path.join(path, split)
            labels = os.listdir(split_path)
            for label in tqdm(labels, desc=f'Indexing {split}', unit='sample'):
                if label_filter is not None and label not in label_filter:
                    continue
                
                label_path = os.path.join(split_path, label)
                sample_ids = [Path(filename).stem for filename in os.listdir(label_path) 
                              if filename != 'Label' and filename.endswith('.jpg')]
                annotations_path = os.path.join(label_path, 'Label')
                annot_sample_ids = [Path(filename).stem for filename in os.listdir(annotations_path)
                                    if filename.endswith('.txt')]
                assert set(sample_ids) == set(annot_sample_ids), 'Image sample ids and annotation sample ids do not match'

                # Update index, stats and lookup
                index[split][label] = sample_ids
                
                n_samples = len(sample_ids)
                label_stats[label] += n_samples
                split_stats[split] += n_samples
                size += n_samples
                
                lookup.add(label)

        return dict(index), dict(label_stats), dict(split_stats), lookup, size
    
    def __len__(self) -> int:
        return self._size
    
    @property
    def labels(self) -> List[Tuple[str, int]]:
        """
        Returns:
            List of (label, index) pairs in lookup table
        """
        return list(self.label_lookup)

    @property
    def n_labels(self) -> int:
        """
        Returns:
            Number of labels (classes) in lookup table
        """
        return len(self.label_lookup)
    
    def get_random_samples(self, n: int, split: str = 'train') -> List[Tuple[str, str, str]]:
        """
        Fetches `n` random samples from dataset for chosen split.
        
        Args:
            n: Number of samples
            split: chosen split
            
        Returns:
            List of tuples (split, label, sample_id)
        """
        split_index = self._index[split]
        label_names, _ = zip(*self.labels)
        
        result: List[Tuple[str, str, str]] = []
        for i in range(n):
            label = random.choice(label_names)
            sample_ids = split_index[label]
            sample_id = random.choice(sample_ids)
            result.append((split, label, sample_id))
            
        return result
    
    def get_split_size(self, split: str) -> int:
        """
        Returns:
            Number of samples in split
        """
        return self.split_stats[split]

    def get_image_path(self, split: str, label: str, sample_id: str) -> str:
        """
        Animal dataset image path convention.
        
        Args:
            split: Split
            label: Label (token)
            sample_id: Sample id
        
        Returns:
            Image path
        """
        return os.path.join(self._path, split, label, f'{sample_id}.jpg')

    def load_image(self, split: str, label: str, sample_id: str) -> str:
        """        
        Args:
            split: Split
            label: Label (token)
            sample_id: Sample id
        
        Returns:
            Loaded image
        """
        image_path = self.get_image_path(split, label, sample_id)
        if not os.path.exists(image_path):
            raise FileNotFoundError(f'Image "{image_path}" not found!')
        return cv2.imread(image_path)

    def get_annot_path(self, split: str, label: str, sample_id: str) -> str:
        """
        Animal dataset annotation path convention.
        
        Args:
            split: Split
            label: Label (token)
            sample_id: Sample id
        
        Returns:
            Annotation path
        """
        return os.path.join(self._path, split, label, 'Label', f'{sample_id}.txt')

    def parse_annot(self, split: str, label: str, sample_id: str) \
        -> List[Tuple[str, float, float, float, float]]:
        """        
        Parses annotation (ground truth) file.
        
        Args:
            split: Split
            label: Label (token)
            sample_id: Sample id
        
        Returns:
            Parsed annotations
        """
        annot_path = self.get_annot_path(split, label, sample_id)
        with open(annot_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        annots: List[Tuple[str, float, float, float, float]] = []
        for line in lines:
            items = line.split()
            if not items:
                continue  # Skip blank lines
            label_name = ' '.join(items[:-4])  # Label may contain spaces (e.g. "Brown bear")
            coords = [float(v) for v in items[-4:]]
            annots.append((label_name, *coords))
        return annots
    
    def convert(self, path: str) -> None:
        """
        Converts dataset to YOLO format.
        
        Args:
            path: Output path
        """
        for split in self._index:
            split_path = os.path.join(path, split)
            images_path = os.path.join(split_path, 'images')
            labels_path = os.path.join(split_path, 'labels')
            Path(images_path).mkdir(parents=True, exist_ok=True)
            Path(labels_path).mkdir(parents=True, exist_ok=True)
            
            for label, sample_ids in tqdm(self._index[split].items(), desc='Converting to Yolo format', total=len(self._index[split])):
                assert len(sample_ids) == len(set(sample_ids))
                for sample_id in sample_ids:
                    image_path = self.get_image_path(split, label, sample_id)
                    new_image_path = os.path.join(images_path, f'{sample_id}.jpg')
                    annots = self.parse_annot(split, label, sample_id)
                    new_annot_path = os.path.join(labels_path, f'{sample_id}.txt')
                    
                    # Image needs to be loaded in order to read width and height
                    # which are required for coordinate normalization
                    image = self.load_image(split, label, sample_id)
                    h, w, _ = image.shape
                    
                    # Conversion
                    converted_annot: List[Tuple[int, float, float, float, float]] = []
                    for annot_label, x_min, y_min, x_max, y_max in annots:
                        # Separate name so the outer `label` (class folder) is not overwritten
                        label_index = self.label_lookup[annot_label]
                        x_center = (x_min + x_max) / (2 * w)
                        y_center = (y_min + y_max) / (2 * h)
                        width = (x_max - x_min) / w
                        height = (y_max - y_min) / h
                        
                        converted_annot.append((label_index, x_center, y_center, width, height))
                        
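                    # Save data. Annotations are appended, so re-running the conversion
                    # into a non-empty output directory duplicates the label lines.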
                    with open(new_annot_path, 'a', encoding='utf-8') as f:
                        converted_annot_lines = [' '.join([str(v) for v in row]) for row in converted_annot]
                        f.write('\n'.join(converted_annot_lines))
                        f.write('\n')
                        
                    if not os.path.exists(new_image_path):  
                        shutil.copy(image_path, new_image_path)


adapter = AnimalToYOLODatasetAdapter(
    path=DATASET_PATH, 
    label_filter=['Horse'] if DEBUG else None
)

print(f'Total number of samples in the dataset is {len(adapter)}.')
print(f'Total number of classes in the dataset is {adapter.n_labels}.')
print(f'Train dataset size is {adapter.get_split_size("train")} (images). Test dataset size is {adapter.get_split_size("test")} (images)')

Indexing test:   0%|          | 0/20 [00:00<?, ?sample/s]

Indexing train:   0%|          | 0/20 [00:00<?, ?sample/s]

Total number of samples in the dataset is 9249.
Total number of classes in the dataset is 20.
Train dataset size is 6823 (images). Test dataset size is 2426 (images)


#### Class distribution

In [6]:
fig = px.histogram(x=list(adapter.label_stats.keys()), y=list(adapter.label_stats.values())) \
        .update_layout(xaxis_title="Class", yaxis_title="Class size", xaxis={'categoryorder':'total descending'})
fig.show()
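
The per-class counts can also be summarised as a single number, for example the ratio between the largest and the smallest class. This is a minimal sketch; the function name `imbalance_ratio` and the counts below are hypothetical and are not the real numbers from this dataset.

```python
from typing import Dict


def imbalance_ratio(label_stats: Dict[str, int]) -> float:
    """Ratio between the largest and the smallest class size."""
    counts = list(label_stats.values())
    return max(counts) / min(counts)


# Hypothetical counts, NOT the real class sizes of this dataset
example_stats = {'Butterfly': 1200, 'Crab': 300, 'Snail': 150}
print(imbalance_ratio(example_stats))  # 8.0
```

In the notebook the same function could be called as `imbalance_ratio(adapter.label_stats)`.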

As expected, the dataset is very unbalanced. The next step is to visualize a few samples.

In [7]:
def visualize_samples(
    adapter: AnimalToYOLODatasetAdapter,
    n_rows: int,
    n_cols: int,
    bbox_color: Tuple[int, int, int] = (255, 0, 0),  # RGB - red
    model: Optional[YOLO] = None
) -> None:
    """
    Visualizes image sample with ground truths and (optionally) model predictions.
    Number of images is equal to product of `n_rows` and `n_cols`
    
    Args:
        adapter: Animal dataset to YOLO adapter
        n_rows: Number of rows in image matrix
        n_cols: Number of cols in image matrix
        bbox_color: Ground truth bbox color
        model: Model to generate prediction for given images
    """
    n: int = n_rows * n_cols
    
    viz_samples = adapter.get_random_samples(n)
    fig = subplots.make_subplots(rows=n_rows, cols=n_cols)
    for plot_index, (split, label, sample_id) in enumerate(viz_samples):
        row = plot_index // n_cols + 1
        col = plot_index % n_cols + 1
        image = adapter.load_image(split, label, sample_id)
        label_index = adapter.label_lookup.lookup(label)
        
        # Visualize ground truth
        for _, x_min, y_min, x_max, y_max in adapter.parse_annot(split, label, sample_id):
            x_min, y_min, x_max, y_max = [int(v) for v in [x_min, y_min, x_max, y_max]]
            label_text = f'{label} ({label_index})'
            image = cv2.rectangle(image, (x_min, y_min), (x_max, y_max), color=bbox_color, thickness=4)
            image = cv2.putText(image, label_text, (x_min, y_min - 10), cv2.FONT_HERSHEY_SIMPLEX, 2, bbox_color, 3)
            
        if model is not None:
            # Visualize model predictions
            prediction = model.predict([image], imgsz=512, conf=0.3)
            for p in prediction:
                image = p.plot()
            
        subfig = px.imshow(image)
        fig.add_trace(subfig.data[0], row=row, col=col)

    fig = fig.update_xaxes(showticklabels=False)
    fig = fig.update_yaxes(showticklabels=False)
    fig = fig.update_layout(
        autosize=False,
        width=1200,
        height=600
    )
    fig.show()
    
visualize_samples(adapter, 1, 3)

In [8]:
adapter.convert(MASTER_PATH)

Converting to Yolo format:   0%|          | 0/20 [00:00<?, ?it/s]

Converting to Yolo format:   0%|          | 0/20 [00:00<?, ?it/s]
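
After the conversion it can be useful to sanity-check the output before training. The sketch below walks one split, reports images without a label file, and flags label lines that do not have five fields or whose coordinates fall outside [0, 1]. The function name `check_yolo_split` is hypothetical, and the toy directory is created only so the example runs on its own.

```python
import tempfile
from pathlib import Path
from typing import List


def check_yolo_split(split_dir: Path) -> List[str]:
    """Return a list of human-readable problems found in one converted split."""
    problems: List[str] = []
    images_dir = split_dir / 'images'
    labels_dir = split_dir / 'labels'
    for image_path in sorted(images_dir.glob('*.jpg')):
        label_path = labels_dir / f'{image_path.stem}.txt'
        if not label_path.exists():
            problems.append(f'{image_path.name}: missing label file')
            continue
        lines = label_path.read_text(encoding='utf-8').splitlines()
        for line_number, line in enumerate(lines, start=1):
            fields = line.split()
            if not fields:
                continue  # Ignore blank lines
            if len(fields) != 5:
                problems.append(f'{label_path.name}:{line_number}: expected 5 fields, got {len(fields)}')
                continue
            coords = [float(v) for v in fields[1:]]
            if not all(0.0 <= v <= 1.0 for v in coords):
                problems.append(f'{label_path.name}:{line_number}: coordinate outside [0, 1]')
    return problems


# Toy split with one valid sample, one bad coordinate and one missing label file
with tempfile.TemporaryDirectory() as tmp:
    split_dir = Path(tmp) / 'train'
    (split_dir / 'images').mkdir(parents=True)
    (split_dir / 'labels').mkdir(parents=True)
    for name in ('a', 'b', 'c'):
        (split_dir / 'images' / f'{name}.jpg').touch()
    (split_dir / 'labels' / 'a.txt').write_text('0 0.5 0.5 0.25 0.25\n', encoding='utf-8')
    (split_dir / 'labels' / 'b.txt').write_text('3 0.5 0.5 1.2 0.25\n', encoding='utf-8')
    problems = check_yolo_split(split_dir)

print(problems)
```

In the notebook the check could be pointed at the converted output, for example `check_yolo_split(Path(MASTER_PATH) / 'train')`.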

## YOLO Dataset configuration

Defining dataset configuration is simple once the dataset is converted.

In [9]:
class_names = [name for name, _ in adapter.label_lookup]
config = {
    'path': MASTER_PATH,
    'train': 'train/images',
    'val': 'test/images',
    'nc': len(adapter.label_lookup),  # number of classes
    'names': class_names
}

config_path = os.path.join(MASTER_PATH, 'config.yaml')
with open(config_path, 'w', encoding='utf-8') as f:
    yaml.dump(config, f)

print(yaml.dump(config))

names:
- Cattle
- Turtle
- Snake
- Butterfly
- Lion
- Hippopotamus
- Chicken
- Pig
- Mouse
- Kangaroo
- Frog
- Sheep
- Snail
- Fish
- Duck
- Rabbit
- Crab
- Bull
- Camel
- Crocodile
nc: 20
path: /kaggle/working
train: train/images
val: test/images
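
A quick consistency check on the configuration dict can catch mismatches between `nc` and `names` before training starts. This is a minimal sketch that only inspects the keys used above; the function name `check_dataset_config` and the example values are hypothetical.

```python
from typing import Any, Dict, List


def check_dataset_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a YOLO dataset configuration dict."""
    problems: List[str] = []
    for key in ('path', 'train', 'val', 'nc', 'names'):
        if key not in config:
            problems.append(f'missing key: {key}')
    names = config.get('names', [])
    if 'nc' in config and config['nc'] != len(names):
        problems.append(f"nc is {config['nc']} but {len(names)} names are listed")
    if len(set(names)) != len(names):
        problems.append('duplicate class names')
    return problems


# Hypothetical configurations for illustration
good_config = {'path': '/data', 'train': 'train/images', 'val': 'test/images',
               'nc': 2, 'names': ['Cattle', 'Turtle']}
bad_config = {'path': '/data', 'train': 'train/images',
              'nc': 3, 'names': ['Cattle', 'Cattle']}

print(check_dataset_config(good_config))  # []
print(check_dataset_config(bad_config))
```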



In [10]:
!zip -r train.zip '/kaggle/working/train'

  adding: kaggle/working/train/ (stored 0%)
  adding: kaggle/working/train/labels/ (stored 0%)
  adding: kaggle/working/train/labels/1d92c4c3d0b726b0.txt (deflated 32%)
  adding: kaggle/working/train/labels/8edf941e14c47f05.txt (deflated 34%)
  adding: kaggle/working/train/labels/d3a4177752dc1189.txt (deflated 24%)
  adding: kaggle/working/train/labels/af2ec7f2114651ff.txt (deflated 24%)
  adding: kaggle/working/train/labels/7a188615e076725d.txt (deflated 25%)
  adding: kaggle/working/train/labels/a852d43ea316ead3.txt (deflated 22%)
  adding: kaggle/working/train/labels/b582af2b1fd56c8d.txt (deflated 34%)
  adding: kaggle/working/train/labels/f89e31285397e326.txt (deflated 27%)
  adding: kaggle/working/train/labels/e48cb0d571893b05.txt (deflated 22%)
  adding: kaggle/working/train/labels/65d5ccc5497d84dc.txt (deflated 25%)
  adding: kaggle/working/train/labels/728b67e0f79d92f0.txt (deflated 37%)
  adding: kaggle/working/train/labels/0557649c5a7a429d.txt (deflated 10%)
  a

In [11]:
!zip -r test.zip '/kaggle/working/test'

  adding: kaggle/working/test/ (stored 0%)
  adding: kaggle/working/test/labels/ (stored 0%)
  adding: kaggle/working/test/labels/f526f001b38b111c.txt (deflated 37%)
  adding: kaggle/working/test/labels/0a77e5dee4c414a4.txt (deflated 33%)
  adding: kaggle/working/test/labels/7e104b49d2d938de.txt (deflated 10%)
  adding: kaggle/working/test/labels/9b4b421bd3eaf87e.txt (deflated 33%)
  adding: kaggle/working/test/labels/0a5e8d559aa5d680.txt (deflated 7%)
  adding: kaggle/working/test/labels/03352f3be3405e65.txt (deflated 31%)
  adding: kaggle/working/test/labels/daa215a0a122bf3e.txt (deflated 34%)
  adding: kaggle/working/test/labels/305ac2f729123abc.txt (deflated 35%)
  adding: kaggle/working/test/labels/8ad25cfc7412c72a.txt (deflated 10%)
  adding: kaggle/working/test/labels/a69b475114c822e6.txt (deflated 31%)
  adding: kaggle/working/test/labels/af762d11789236f3.txt (deflated 32%)
  adding: kaggle/working/test/labels/3049e5534d5fadc4.txt (deflated 40%)
  adding: kaggle/w