<a href="https://colab.research.google.com/github/shaaagri/iat481-cv-proj/blob/main/DatasetPrep.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<center><h1><b>Wildlife Detection CV Project - Dataset Preparation</b></h1></center>

This notebook is based on the Week 6 tutorial. We use it to check, enumerate, and analyse our datasets for correct labeling and class balance before merging them and training with YOLO.

Prerequisites:

In [None]:
# Colab-only helper for attaching Google Drive to the runtime.
from google.colab import drive
# The dataset paths used later in this notebook all start with
# "/content/MyDrive/MyDrive/", i.e. they are rooted at this mount point.
drive.mount("/content/MyDrive")

Mounted at /content/MyDrive


The labels for this dataset are stored in YOLO format, i.e.

    class_id bbox_x_center bbox_y_center bbox_width bbox_height

Our pest animal classes:

In [None]:
# Maps the numeric class id to the class name.
# The names are matched against the dataset ids when the datasets are merged,
# and the ids are written as the first field of every label line when the
# final dataset is saved.
class_labels = {
    0: "bear",
    1: "raccoon",
    2: "rat",
    3: "skunk"
}

Import the required libraries:

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
import random as random
import os

Defining paths to the collected datasets, both found and the ones we prepared ourselves:

In [None]:
# Common parent folder (on the mounted Drive) of every source dataset.
_datasets_root = "/content/MyDrive/MyDrive/IAT481/481 CV Project/Datasets"

# Dataset id -> dataset folder. "ours_*" are the sets we labelled ourselves,
# "found_*" are the ones we found.
dataset_paths = {
  "ours_bear": f"{_datasets_root}/Ours/Bear",
  "ours_raccoon": f"{_datasets_root}/Ours/Raccoon",
  "ours_rat": f"{_datasets_root}/Ours/Rat",
  "ours_skunk": f"{_datasets_root}/Ours/Skunk",
  "found_bear": f"{_datasets_root}/Found/Bear/Bear.v1i.yolov8",
  "found_raccoon": f"{_datasets_root}/Found/Raccoon/Raccoon.v38-416x416-resize.yolov8",
  "found_rat": f"{_datasets_root}/Found/Rat/cc-object-detection.v6i.yolov8",
  "found_skunk": f"{_datasets_root}/Found/Skunk/Skunk.v1i.yolov8",
}

Collect all image and label files we've got and save into one data structure:

In [None]:
def enumerate_dataset(dataset_id, dataset_path):
  """List the image and label files of one dataset, split by subset.

  The dataset is expected to be laid out as
  ``<dataset_path>/<train|valid|test>/<images|labels>/``.

  Args:
    dataset_id: Identifier of the dataset. Not used by this function; kept so
      the signature stays the same for existing callers.
    dataset_path: Root folder of the dataset.

  Returns:
    A tuple ``(image_paths, label_paths)``. Each is a dict mapping a subset
    name ('train', 'valid', 'test') to a list of full file paths sorted by
    file name. A subset whose folder does not exist is simply absent from the
    corresponding dict.
  """
  image_paths, label_paths = {}, {}

  for paths, file_category in ((image_paths, "images"), (label_paths, "labels")):
    for subset_type in ("train", "valid", "test"):
      folder = os.path.join(dataset_path, subset_type, file_category)

      # Each folder is probed on its own: a missing 'valid' folder must not
      # prevent an existing 'test' folder from being enumerated.
      try:
        # sorting the directory listing so that images' and labels' positions in lists match
        filenames = sorted(os.listdir(folder))
      except FileNotFoundError:
        continue

      paths[subset_type] = [os.path.join(folder, filename) for filename in filenames]

  return image_paths, label_paths

def enumerate_dataset_multi(dataset_paths):
  """Run enumerate_dataset on every entry of ``dataset_paths``.

  Args:
    dataset_paths: dict mapping a dataset id to the dataset's root folder.

  Returns:
    A dict mapping each dataset id to the value enumerate_dataset returned
    for it.
  """
  enumerated = {}
  for dataset_id, dataset_path in dataset_paths.items():
    enumerated[dataset_id] = enumerate_dataset(dataset_id, dataset_path)
  return enumerated

In [None]:
# dataset id -> (image_paths, label_paths), where each element is a dict keyed
# by the subset names ('train' / 'valid' / 'test') that were found on disk.
datasets = enumerate_dataset_multi(dataset_paths)

In [None]:
def count_samples(dataset):
  """Count the images of every class in each part of a dataset.

  Args:
    dataset: dict mapping a class name to a dict that has an 'images' entry
      with 'train', 'valid' and 'test' lists.

  Returns:
    A dict mapping each class name to ``[n_train, n_valid, n_test]``.
  """
  return {
    cl: [len(data['images'][subset_type]) for subset_type in ('train', 'valid', 'test')]
    for cl, data in dataset.items()
  }

def initialize_class_in_dataset(cl, dataset):
  """Create (or reset) the empty entry of class ``cl`` inside ``dataset``.

  After the call ``dataset[cl]`` holds an 'images' and a 'labels' dict, each
  with its own empty 'train', 'valid' and 'test' list. ``dataset`` is modified
  in place; nothing is returned.
  """
  dataset[cl] = {
    file_category: {subset_type: [] for subset_type in ('train', 'valid', 'test')}
    for file_category in ('images', 'labels')
  }

Merging all the data we have into N strict classes (defined earlier). Doing our best with the totality of data we've got to make sure the classes are balanced.

In [None]:
dataset_ids = dataset_paths.keys()

# class name -> {'images': {...}, 'labels': {...}} with one list per subset
merged_dataset = {}

for cl in class_labels.values():
  for dataset_id in dataset_ids:
    # A dataset belongs to a class when the class name occurs in its id
    # (e.g. "found_bear" -> "bear").
    if cl not in dataset_id:
      continue

    if cl not in merged_dataset:
      initialize_class_in_dataset(cl, merged_dataset)

    image_paths, label_paths = datasets[dataset_id]

    for subset_type in ('train', 'valid', 'test'):
      # Some datasets may have only a 'train' folder, but not the others.
      # A subset is merged only when BOTH its images and its labels were
      # enumerated; otherwise the images would be appended without their
      # labels and the two lists would stop lining up.
      if subset_type not in image_paths or subset_type not in label_paths:
        continue

      merged_dataset[cl]['images'][subset_type] += image_paths[subset_type]
      merged_dataset[cl]['labels'][subset_type] += label_paths[subset_type]

# Balance the classes by undersampling: every class keeps only as many training
# samples as the class with the fewest training samples has.
train_samples_count = [counts[0] for counts in count_samples(merged_dataset).values()]
min_train_samples_count = np.array(train_samples_count).min()

balanced_dataset = {}

for cl, v in merged_dataset.items():
  if cl not in balanced_dataset:
    initialize_class_in_dataset(cl, balanced_dataset)

  for file_category in ('images', 'labels'):
    all_train_files = v[file_category]['train']

    # The first min_train_samples_count files form the balanced training set ...
    balanced_dataset[cl][file_category]['train'] = all_train_files[:min_train_samples_count]

    # ... and are removed from the merged dataset, which keeps only the leftovers.
    merged_dataset[cl][file_category]['train'] = all_train_files[min_train_samples_count:]


sample_pool = {}

# Aim for an 80/10/10 train-valid-test split: validation and test each get
# 12.5% of the training set size.
valid_test_target_sample_count = round(min_train_samples_count * 0.125)

for cl, data in merged_dataset.items():
  # Per class, pool every sample that is left over: the unused training
  # samples first, then the original validation and test samples.
  sample_pool = {
    'images': data['images']['train'] + data['images']['valid'] + data['images']['test'],
    'labels': data['labels']['train'] + data['labels']['valid'] + data['labels']['test'],
  }

  n = valid_test_target_sample_count

  # The first n pooled samples become the validation set, the next n the test set.
  valid_part, test_part = slice(0, n), slice(n, n + n)

  balanced_dataset[cl]['images']['valid'] = sample_pool['images'][valid_part]
  balanced_dataset[cl]['labels']['valid'] = sample_pool['labels'][valid_part]
  balanced_dataset[cl]['images']['test'] = sample_pool['images'][test_part]
  balanced_dataset[cl]['labels']['test'] = sample_pool['labels'][test_part]

Checking the train-valid-test distribution of samples in our final, merged and balanced dataset:

In [None]:
# Per class: [train, valid, test] image counts.
print(count_samples(balanced_dataset))

{'bear': [162, 20, 20], 'raccoon': [162, 20, 20], 'rat': [162, 20, 20], 'skunk': [162, 20, 20]}


Awesome! Now let's just make sure the dataset does not require any further cleaning, meaning there are no images with missing labels and no orphan labels that do not belong to any image:

In [None]:
def dataset_cleanness_check(dataset):
  """Report images without a label file and label files without an image.

  An image and a label are considered a pair when they sit in the same class
  and the same subset ('train', 'valid' or 'test') and share the same file
  name once the final extension is removed. Only the final extension is
  stripped, so names that contain further dots (e.g. ``x.rf.<hash>.jpg``) are
  compared in full.

  Args:
    dataset: dict mapping a class name to a dict with 'images' and 'labels'
      entries, each holding 'train', 'valid' and 'test' lists of file paths.

  Prints two sets of extension-less file names: the unpaired images and the
  unpaired labels. Both are empty for a clean dataset. Returns nothing.
  """
  extra_images = set()
  extra_labels = set()

  for data in dataset.values():
    for subset_type in ['train', 'valid', 'test']:
      image_files = {os.path.splitext(os.path.basename(i))[0] for i in data['images'][subset_type]}
      label_files = {os.path.splitext(os.path.basename(l))[0] for l in data['labels'][subset_type]}

      # Compare within one class and subset only, so that a label stored in
      # another subset cannot hide a missing one here.
      extra_images |= image_files - label_files
      extra_labels |= label_files - image_files

  print(f"Extra images (without corresponding labels): {extra_images}")
  print(f"Extra labels (without corresponding images): {extra_labels}")

In [None]:
# Prints the images that lack a label and the labels that lack an image;
# two empty sets mean the balanced dataset is clean.
dataset_cleanness_check(balanced_dataset)

Extra images (without corresponding labels): set()
Extra labels (without corresponding images): set()


There are no extra images or labels! Great. Now we are ready to save the merged dataset to disk (up to this point we have only been manipulating file references) and proceed to training.

In [None]:
#import os
#run it once for making directories

final_dataset_root = '/content/MyDrive/MyDrive/IAT481/481 CV Project/Datasets/Final'

# Only the leaf folders are listed: os.makedirs creates the missing parents
# (the root and the train/valid/test folders) along the way.
for subset_type in ('train', 'valid', 'test'):
  for file_category in ('images', 'labels'):
    os.makedirs(os.path.join(final_dataset_root, subset_type, file_category), exist_ok=True)

In [None]:
from PIL import Image

class_labels_reversed = {class_labels[l]: l for l in class_labels.keys()}

def save_merged_dataset():
  """Write the balanced dataset to disk under ``final_dataset_root``.

  For every class in ``balanced_dataset`` and every subset ('train', 'valid',
  'test'):

  * each image is converted to RGB and saved as ``<name>.jpg`` in
    ``<final_dataset_root>/<subset>/images``;
  * each label file is rewritten into ``<final_dataset_root>/<subset>/labels``
    with the first field of every line replaced by the class id taken from
    ``class_labels_reversed``. Blank lines are dropped and every written line
    ends with a newline.

  Reads the module-level ``balanced_dataset``, ``final_dataset_root`` and
  ``class_labels_reversed``. The destination folders are not created here and
  must already exist. Files that share a base name overwrite each other.
  """
  # File extensions accepted as images (compared case-insensitively). Anything
  # else is reported and skipped rather than silently ignored.
  image_extensions = (".jpg", ".jpeg", ".png", ".bmp")

  for cl, data in balanced_dataset.items():
    class_id = str(class_labels_reversed[cl])

    for subset_type in ['train', 'valid', 'test']:
      images_dir = os.path.join(final_dataset_root, subset_type, "images")
      labels_dir = os.path.join(final_dataset_root, subset_type, "labels")

      for file in data['images'][subset_type]:
        if not file.lower().endswith(image_extensions):
          print(f"Skipping unsupported image file: {file}")
          continue

        new_filename = os.path.splitext(os.path.basename(file))[0] + ".jpg"
        dst_file = os.path.join(images_dir, new_filename)
        print(dst_file)

        # The context manager closes the source file once the copy is saved.
        with Image.open(file) as image:
          image.convert("RGB").save(dst_file)

      for file in data['labels'][subset_type]:
        with open(file, "r") as src:
          lines = src.readlines()

        lines_new = []

        # forcing the first value in the line to be the correct class id
        # (YOLO object detection labeling format)
        for line in lines:
          fields = line.split()
          if not fields:
            # A blank line would otherwise turn into a bare class id that
            # runs into the following line.
            continue
          fields[0] = class_id
          lines_new.append(' '.join(fields) + "\n")

        dst_file = os.path.join(labels_dir, os.path.basename(file))

        with open(dst_file, "w") as dst:
          dst.writelines(lines_new)

In [None]:
# Writes the images and the re-labelled annotation files of balanced_dataset
# into the folders under final_dataset_root.
save_merged_dataset()

The consolidated dataset should have appeared in Drive once the code has finished its job.