In [None]:
# Install the `uv` package manager; the next cell uses it to install ultralytics.
!pip install uv

In [None]:
# Install the pinned ultralytics release into the system environment via uv.
# The `!!` prefix is IPython's capture form of `!`: the command's output is
# returned as a list of lines instead of being streamed.
!!uv pip install --system  ultralytics==8.1.15

Collecting ultralytics==8.1.15
  Downloading ultralytics-8.1.15-py3-none-any.whl.metadata (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.4/40.4 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Collecting thop>=0.1.1 (from ultralytics==8.1.15)
  Downloading thop-0.1.1.post2209072238-py3-none-any.whl.metadata (2.7 kB)
Downloading ultralytics-8.1.15-py3-none-any.whl (715 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m715.1/715.1 kB[0m [31m18.4 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25hDownloading thop-0.1.1.post2209072238-py3-none-any.whl (15 kB)
Installing collected packages: thop, ultralytics
Successfully installed thop-0.1.1.post2209072238 ultralytics-8.1.15


In [None]:
# Install PyTorch (no version pin); it is imported as `torch` in the imports cell.
!pip install torch

In [None]:
import os
import random
import shutil
from collections import defaultdict, Counter
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Iterable
import yaml

import cv2
import plotly.express as px
from plotly import subplots
from tqdm.notebook import tqdm
import torch
from ultralytics import YOLO

In [None]:
!mkdir "/content/dataset"
!mkdir "/content/output"

In [None]:
import os
from getpass import getpass

import pandas as pd

# SECURITY: credentials must never be hardcoded in a notebook. The API key that
# used to live in this cell has been exposed and should be revoked/regenerated
# from the Kaggle account settings page.
# Credentials are taken from the environment when already present, otherwise
# they are requested interactively (the input is not echoed or stored in the
# notebook).
for _credential_var in ('KAGGLE_USERNAME', 'KAGGLE_KEY'):
    if not os.environ.get(_credential_var):
        os.environ[_credential_var] = getpass(f'{_credential_var}: ')

# NOTE: the import is deliberately placed AFTER the environment is populated.
# In commonly deployed releases `import kaggle` itself attempts to authenticate
# and fails when no credentials can be found yet.
import kaggle as kg

kg.api.authenticate()

In [None]:
# Download and unpack the source dataset through the authenticated Kaggle API.
# CLI equivalent:
# !kaggle datasets download -d rijubera2000/poaching-and-animal-detection-dataset
kg.api.dataset_download_files(
    dataset="rijubera2000/poaching-and-animal-detection-dataset",
    path='/content/dataset',
    unzip=True,
)


In [None]:
# Notebook-wide configuration.
# Kaggle-hosted alternatives for the two paths below (kept for reference):
# DATASET_PATH = '/kaggle/input/poaching-and-animal-detection-dataset'  # Path to source dataset
# MASTER_PATH = '/kaggle/working/' # Path where all outputs are stored (intermediate and final)
DATASET_PATH = '/content/dataset'  # Path to source dataset
MASTER_PATH = '/content/output' # Path where all outputs are stored (intermediate and final)
DEBUG = False # Activate to run the notebook faster (restricts labels to 'Horse' and trains for one epoch)
CPU = False  # Set to True to train on the CPU and skip the CUDA availability check

# Fail early when a GPU run was requested but no CUDA device is visible.
if not CPU:
    assert torch.cuda.is_available(), 'CUDA not found!'

In [None]:
import os
import random
import shutil
from collections import defaultdict, Counter
from pathlib import Path
from tqdm import tqdm
from typing import Dict, List, Optional, Tuple
import cv2
import yaml  # Ensure PyYAML is installed

# Type aliases
# DatasetIndex: split name -> label name -> list of sample ids (image file stems).
DatasetIndex = Dict[str, Dict[str, List[str]]]
# DatasetStats: name (a label or a split) -> number of samples counted for it.
DatasetStats = Dict[str, int]

# LookupTable class definition
class LookupTable:
    """Two-way mapping between items and consecutive integer indices.

    Indices are handed out in insertion order starting at 0. When
    ``add_unknown_token`` is true, the token ``'<unknown>'`` is registered
    first and lookups of unregistered items resolve to its index instead of
    raising ``KeyError``.
    """

    def __init__(self, add_unknown_token=True):
        self.add_unknown_token = add_unknown_token
        self.table = {}          # item -> index
        self.inverse_table = {}  # index -> item
        if add_unknown_token:
            self.add('<unknown>')

    def add(self, item):
        """Register ``item`` under the next free index; known items are left untouched."""
        if item in self.table:
            return
        next_index = len(self.table)
        self.table[item] = next_index
        self.inverse_table[next_index] = item

    def __getitem__(self, item):
        """Return the index of ``item`` (or the unknown-token index as fallback, when enabled)."""
        if not self.add_unknown_token:
            return self.table[item]
        fallback_index = self.table['<unknown>']
        return self.table.get(item, fallback_index)

    def __len__(self):
        """Number of registered items (including the unknown token, if any)."""
        return len(self.table)

    def __iter__(self):
        """Iterate over the registered items in insertion order."""
        return iter(self.table)

# AnimalToYOLODatasetAdapter class definition
class AnimalToYOLODatasetAdapter:
    """Index a ``<split>/<label>/`` image dataset and convert it to the YOLO layout.

    Source layout read by the indexer::

        <path>/<split>/<label>/<sample_id>.jpg | .jpeg
        <path>/<split>/<label>/Label/<sample_id>.txt

    Every annotation line has the form ``<label name> x_min y_min x_max y_max``
    (the label name may contain spaces). The coordinates are treated as pixel
    values: ``convert`` divides them by the image width/height.

    Attributes:
        label_stats: label name -> number of samples.
        split_stats: split name -> number of samples.
        label_lookup: label name -> class index (assigned in indexing order).
    """

    def __init__(self, path: str, label_filter: Optional[List[str]] = None):
        """Index the dataset rooted at ``path``.

        Args:
            path: Root directory that contains one sub-directory per split.
            label_filter: When given, only these label folders are indexed.
        """
        self._path = path
        self._index, self.label_stats, self.split_stats, self.label_lookup, self._size = \
            self._index_dataset(path, label_filter)

    @staticmethod
    def _index_dataset(path: str, label_filter: Optional[List[str]] = None) \
        -> "Tuple[DatasetIndex, DatasetStats, DatasetStats, LookupTable, int]":
        """Walk ``path`` and collect sample ids, statistics and the label lookup.

        Returns:
            ``(index, label_stats, split_stats, lookup, size)`` where ``index``
            maps split -> label -> sample ids and ``size`` is the total number
            of indexed samples.

        Raises:
            AssertionError: If the image ids and the annotation ids of a label
                folder differ.
        """
        index: DatasetIndex = defaultdict(dict)
        label_stats: DatasetStats = Counter()
        split_stats: DatasetStats = Counter()
        lookup = LookupTable(add_unknown_token=False)
        size = 0

        for split in os.listdir(path):
            split_path = os.path.join(path, split)
            if not os.path.isdir(split_path):
                continue
            for label in tqdm(os.listdir(split_path), desc=f'Indexing {split}', unit='sample'):
                if label_filter is not None and label not in label_filter:
                    continue

                label_path = os.path.join(split_path, label)
                # Stray files next to the label folders would make os.listdir fail.
                if not os.path.isdir(label_path):
                    continue
                annotations_path = os.path.join(label_path, 'Label')
                if not os.path.exists(annotations_path):
                    continue

                sample_ids = [Path(filename).stem for filename in os.listdir(label_path)
                              if filename.endswith(('.jpg', '.jpeg'))]
                annot_sample_ids = [Path(filename).stem for filename in os.listdir(annotations_path)
                                    if filename.endswith('.txt')]
                assert set(sample_ids) == set(annot_sample_ids), 'Image sample ids and annotation sample ids do not match'

                # Update index, stats and lookup
                index[split][label] = sample_ids

                n_samples = len(sample_ids)
                label_stats[label] += n_samples
                split_stats[split] += n_samples
                size += n_samples

                lookup.add(label)

        return dict(index), dict(label_stats), dict(split_stats), lookup, size

    def __len__(self) -> int:
        """Total number of indexed samples over all splits."""
        return self._size

    @property
    def labels(self) -> List[str]:
        """Label names in class-index order."""
        return list(self.label_lookup)

    @property
    def n_labels(self) -> int:
        """Number of distinct labels."""
        return len(self.label_lookup)

    def get_random_samples(self, n: int, split: str = 'train') -> List[Tuple[str, str, str]]:
        """Draw ``n`` random ``(split, label, sample_id)`` triples from ``split``.

        A label is drawn uniformly first, then a sample of that label. Only
        labels that actually have samples in ``split`` are considered, so a
        label that exists solely in another split can no longer cause a
        ``KeyError`` (or ``IndexError`` for an empty folder).

        Raises:
            ValueError: If the split is unknown, or has no samples while ``n > 0``.
        """
        split_index = self._index.get(split, {})
        if not split_index:
            raise ValueError(f'Split "{split}" not found in the dataset.')
        # Keep the global label order so draws match the old behaviour whenever
        # every label is present in the split.
        candidate_labels = [name for name in self.labels if split_index.get(name)]
        if n > 0 and not candidate_labels:
            raise ValueError(f'Split "{split}" contains no samples.')

        result: List[Tuple[str, str, str]] = []
        for _ in range(n):
            label = random.choice(candidate_labels)
            sample_id = random.choice(split_index[label])
            result.append((split, label, sample_id))

        return result

    def get_split_size(self, split: str) -> int:
        """Return the number of samples in ``split``.

        Raises:
            ValueError: If the split was not indexed.
        """
        if split not in self.split_stats:
            raise ValueError(f'Split "{split}" not found in the dataset.')
        return self.split_stats[split]

    def get_image_path(self, split: str, label: str, sample_id: str) -> str:
        """Return the path of the existing ``.jpg``/``.jpeg`` file for a sample.

        Raises:
            FileNotFoundError: If neither extension exists on disk.
        """
        for ext in ['.jpg', '.jpeg']:
            image_path = os.path.join(self._path, split, label, f'{sample_id}{ext}')
            if os.path.exists(image_path):
                return image_path
        raise FileNotFoundError(f'Image for sample id "{sample_id}" not found in {split}/{label}!')

    def load_image(self, split: str, label: str, sample_id: str) -> "numpy.ndarray":
        """Read a sample image with OpenCV and return the decoded pixel array.

        Raises:
            FileNotFoundError: If the image file does not exist.
            OSError: If the file exists but OpenCV cannot decode it
                (``cv2.imread`` signals this by returning ``None``).
        """
        image_path = self.get_image_path(split, label, sample_id)
        image = cv2.imread(image_path)
        if image is None:
            raise OSError(f'Image "{image_path}" could not be read!')
        return image

    def get_annot_path(self, split: str, label: str, sample_id: str) -> str:
        """Return the path of the annotation file that belongs to a sample."""
        return os.path.join(self._path, split, label, 'Label', f'{sample_id}.txt')

    def parse_annot(self, split: str, label: str, sample_id: str) -> List[Tuple[str, float, float, float, float]]:
        """Parse a sample's annotation file.

        Returns:
            One ``(label_name, x_min, y_min, x_max, y_max)`` tuple per valid
            line. Lines with fewer than five whitespace-separated items are
            reported on stdout and skipped.
        """
        annot_path = self.get_annot_path(split, label, sample_id)
        with open(annot_path, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        annots: List[Tuple[str, float, float, float, float]] = []
        for line in lines:
            items = line.split()
            if len(items) < 5:
                print(f"Skipping invalid annotation in {annot_path}: {line.strip()}")
                continue
            # The last four items are the box; everything before is the label name.
            label_name = ' '.join(items[:-4])
            coords = [float(v) for v in items[-4:]]
            annots.append((label_name, *coords))
        return annots

    def convert(self, path: str, verbose: bool = False) -> None:
        """Write the indexed dataset to ``path`` in YOLO format.

        For every split, ``<path>/<split>/images`` and ``<path>/<split>/labels``
        are created; images are copied as ``<sample_id>.jpg`` and each label
        file receives one ``class x_center y_center width height`` line per
        box, normalised by the image size.

        Label files are opened in append mode, so boxes of a sample id that
        occurs under several label folders accumulate in one file (presumably
        the intent - confirm). As a consequence, converting twice into the
        same directory duplicates every annotation line.

        Args:
            path: Output root directory.
            verbose: Print the parsed annotations of every sample (debug aid).
        """
        for split, split_index in self._index.items():
            split_path = os.path.join(path, split)
            images_path = os.path.join(split_path, 'images')
            labels_path = os.path.join(split_path, 'labels')
            Path(images_path).mkdir(parents=True, exist_ok=True)
            Path(labels_path).mkdir(parents=True, exist_ok=True)

            for label, sample_ids in tqdm(split_index.items(), desc='Converting to Yolo format', total=len(split_index)):
                assert len(sample_ids) == len(set(sample_ids))
                for sample_id in sample_ids:
                    image_path = self.get_image_path(split, label, sample_id)
                    new_image_path = os.path.join(images_path, f'{sample_id}.jpg')
                    annots = self.parse_annot(split, label, sample_id)
                    new_annot_path = os.path.join(labels_path, f'{sample_id}.txt')

                    if verbose:
                        print(f"Annotations for {sample_id}: {annots}")

                    # Image needs to be loaded in order to read width and height
                    # which are required for coordinate normalization
                    image = self.load_image(split, label, sample_id)
                    h, w = image.shape[:2]

                    # Conversion. The loop variable must NOT be called `label`:
                    # that would overwrite the folder label used above to
                    # locate the remaining samples of this folder.
                    converted_lines: List[str] = []
                    for annot_label, x_min, y_min, x_max, y_max in annots:
                        label_index = self.label_lookup[annot_label]
                        x_center = (x_min + x_max) / (2 * w)
                        y_center = (y_min + y_max) / (2 * h)
                        width = (x_max - x_min) / w
                        height = (y_max - y_min) / h
                        row = (label_index, x_center, y_center, width, height)
                        converted_lines.append(' '.join(str(v) for v in row))

                    # Save data
                    with open(new_annot_path, 'a', encoding='utf-8') as f:
                        f.write('\n'.join(converted_lines))
                        f.write('\n')

                    if not os.path.exists(new_image_path):
                        shutil.copy(image_path, new_image_path)

# Index the source dataset, convert it to YOLO format and write the YOLO config.
#
# DATASET_PATH, MASTER_PATH and DEBUG are taken from the configuration cell.
# They are deliberately NOT re-assigned here: doing so silently reset whatever
# had been chosen there (e.g. DEBUG = True was overwritten with False before
# training started).
adapter = AnimalToYOLODatasetAdapter(
    path=DATASET_PATH,
    label_filter=['Horse'] if DEBUG else None
)

print(f'Total number of samples in the dataset is {len(adapter)}.')
print(f'Total number of classes in the dataset is {adapter.n_labels}.')
try:
    print(f'Train dataset size is {adapter.get_split_size("train")} (images). Test dataset size is {adapter.get_split_size("test")} (images)')
except ValueError as e:
    # A split may be missing (get_split_size raises ValueError); report it and carry on.
    print(e)

adapter.convert(MASTER_PATH)

# Generate the YOLO config file.
# `adapter.labels` lists the class names in class-index order, i.e. the same
# order that was used for the indices written to the label files.
class_names = adapter.labels
config = {
    'path': MASTER_PATH,
    'train': 'train/images',
    'val': 'test/images',
    'nc': len(class_names),  # Number of classes
    'names': class_names
}

# Print the config
print(config)

# Define the path where you want to save the config.yaml file
config_path = '/content/config.yaml'

# Write the config dictionary to a YAML file (explicit encoding so the result
# does not depend on the platform default).
with open(config_path, 'w', encoding='utf-8') as f:
    yaml.dump(config, f)

# Print a confirmation message
print(f'Config file saved to {config_path}')

In [None]:
# Load a pretrained YOLO model (recommended for training)
model = YOLO('yolov8n.pt')

# Train the model using the processed dataset.
# `config_path` is the absolute location the config was written to; the former
# relative 'config.yaml' only resolved when the working directory happened to
# be the directory containing that file.
train_results = model.train(
    data=config_path,
    epochs=100 if not DEBUG else 1,  # single epoch in DEBUG mode for a quick smoke run
    optimizer='Adam',
    val=True,
    batch=64,
    imgsz=640,
    device=[0] if not CPU else 'cpu',
    lr0=0.001,
    lrf=0.0005
)

# Evaluate the model's performance on the validation set.
# Kept under the name `results`; the training result is no longer overwritten
# and stays available as `train_results`.
results = model.val()

print(results)

In [None]:
# Recursively archive everything under /content/ into results.zip.
# NOTE(review): /content/ also holds the downloaded dataset (/content/dataset)
# and its converted copy (/content/output), so the archive is not limited to
# the training run artefacts - confirm this is intended.
!zip -r results.zip /content/

  adding: kaggle/working/runs/ (stored 0%)
  adding: kaggle/working/runs/detect/ (stored 0%)
  adding: kaggle/working/runs/detect/train2/ (stored 0%)
  adding: kaggle/working/runs/detect/train2/PR_curve.png (deflated 7%)
  adding: kaggle/working/runs/detect/train2/val_batch0_pred.jpg (deflated 12%)
  adding: kaggle/working/runs/detect/train2/val_batch2_labels.jpg (deflated 7%)
  adding: kaggle/working/runs/detect/train2/confusion_matrix.png (deflated 16%)
  adding: kaggle/working/runs/detect/train2/val_batch1_labels.jpg (deflated 6%)
  adding: kaggle/working/runs/detect/train2/val_batch1_pred.jpg (deflated 6%)
  adding: kaggle/working/runs/detect/train2/R_curve.png (deflated 9%)
  adding: kaggle/working/runs/detect/train2/P_curve.png (deflated 8%)
  adding: kaggle/working/runs/detect/train2/confusion_matrix_normalized.png (deflated 16%)
  adding: kaggle/working/runs/detect/train2/F1_curve.png (deflated 9%)
  adding: kaggle/working/runs/detect/train2/val_batch2_pred.jpg (deflated 6%)
  

In [None]:
# List the current working directory (quick check of what the previous cells produced).
!ls

config.yaml  results.zip  runs	test  train  wandb  yolov8n.pt


In [None]:
# Render a clickable link to results.zip (path relative to the working directory)
# as the cell output.
from IPython.display import FileLink
FileLink(r'results.zip')