In [1]:
import kagglehub
import cv2 as cv
import numpy as np
import matplotlib.pyplot as plt
import os
import time
import pandas as pd
import random
import pickle
import gzip
from google.colab.patches import cv2_imshow

# Download Data

In [22]:
# download latest version
path = kagglehub.dataset_download("silviamatoke/serengeti-dataset")

print("Path to dataset files:", path)

Using Colab cache for faster access to the 'serengeti-dataset' dataset.
Path to dataset files: /kaggle/input/serengeti-dataset


# Create file index

In [3]:
path_base = "/kaggle/input/serengeti-dataset/Set1"

def list_all_files_recursive(root_directory: str, ignore: list=None) -> pd.DataFrame:
    """
    Lists all files in a given directory and its subdirectories.

    Args:
        root_directory (str): The path to the starting directory.

    Returns:
        list: A list containing the full paths of all files found.
    """
    all_files = []
    for dirpath, dirnames, filenames in os.walk(root_directory):
        for filename in filenames:
            if filename in ignore:
                continue
            full_path = os.path.join(dirpath, filename)
            all_files.append(full_path)

    sequence_names = [f.split("/")[-2] for f in all_files]
    animal_labels = [f.split("/")[-3][5:] for f in all_files]
    sequence_path = ["/".join(f.split("/")[:-1]) for f in all_files]

    return pd.DataFrame({"path": all_files,
                         "sequence_path": sequence_path,
                         "sequence_name": sequence_names,
                         "animal": animal_labels})

ignore_list = ["README.txt", "labels.txt"]
files = list_all_files_recursive(path_base, ignore_list)
files.head()

Unnamed: 0,path,sequence_path,sequence_name,animal
0,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,SEQ81358,European_Hare
1,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,SEQ81358,European_Hare
2,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,SEQ81358,European_Hare
3,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,SEQ81358,European_Hare
4,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,SEQ81358,European_Hare


In [4]:
files.shape

(24673, 4)

# Background Subtraction and Foreground Mask Creation Pipeline

In [12]:
HISTORY = 20
VAR_THRESHOLD = 15

backsub = cv.createBackgroundSubtractorMOG2(detectShadows=False)
backsub.setHistory(HISTORY)
backsub.setVarThreshold(VAR_THRESHOLD)

In [10]:
def generate_foreground_mask(sequence_list: list[str], background_subtractor):
  foreground_masks = []
  bboxes = []

  for file in sequence_list:
      print(file)
      frame = cv.imread(file)
      gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
      gray_denoised = cv.fastNlMeansDenoising(gray)

      # masking image header and footer artifacts
      gray_denoised[:30, :] = 0
      gray_denoised[-65:, :] = 0

      if gray_denoised is None:
        raise Exception(f"Error: Could not read image", file)
        continue

      mask = background_subtractor.apply(gray_denoised)
      mask = cv.GaussianBlur(mask, (3, 3), 0)

      kernel = cv.getStructuringElement(cv.MORPH_ELLIPSE, (10, 10))
      mask = cv.morphologyEx(mask, cv.MORPH_OPEN, kernel)
      mask = cv.morphologyEx(mask, cv.MORPH_CLOSE, kernel)

      contours, _ = cv.findContours(
          mask,
          cv.RETR_EXTERNAL,
          cv.CHAIN_APPROX_SIMPLE
      )

      # largest bounding box
      if contours:
        largest = max(contours, key=cv.contourArea)
        x, y, w, h = cv.boundingRect(largest)

        # safety: avoid division by zero
        if h == 0 or w == 0:
            bbox = None
        else:
            img_area = frame.shape[0] * frame.shape[1]
            box_area = w * h
            area_ratio = box_area / img_area
            aspect_ratio = w / h if h != 0 else float('inf')

            # debug prints
            print("file:", file, "area_ratio:", area_ratio, "aspect_ratio:", aspect_ratio)

            # threshold checks
            if box_area < 15000:
                bbox = None
            elif area_ratio > 0.9:
                bbox = None
            else:
                bbox = ((x, y), (x + w, y + h))
      else:
          bbox = None


      foreground_masks.append(mask)
      bboxes.append(bbox)

  return {"sequence": sequence_list,
          "foreground_masks": foreground_masks,
          "bboxes": bboxes}


# Checkpoint Creation

In [37]:
def pickle_checkpoint(outputs: list[dict], save_path: str) -> None:
  # Ensure the directory exists
  os.makedirs(os.path.dirname(save_path), exist_ok=True)

  with gzip.open(save_path, "wb") as f:
    pickle.dump(outputs, f, protocol=pickle.HIGHEST_PROTOCOL)

  print(f"Outputs saved successfully to {save_path}")

# Execution

In [None]:
sequences = files["sequence_name"].unique()
outputs = []
file_count = 0
folder_number = 0

for i, seq in enumerate(sequences):
  print(seq)
  start_time = time.time()
  input_seq = files["path"][files["sequence_name"]==seq]
  input_seq = input_seq.to_list()
  input_seq.sort()

  file_count += len(input_seq)
  print(file_count)

  output = generate_foreground_mask(input_seq, backsub)
  outputs.append(output)
  end_time = time.time()
  print(i, "/", len(sequences), " : ", end_time - start_time)

  # Every 10 sequences, compress outputs and save to disk
  if i % 10 == 0:
    save_path = f'/content/drive/MyDrive/serengeti_bb_checkpoints/serengeti_processed_outputs_{folder_number}.pkl.gz'
    pickle_checkpoint(outputs, save_path)
    outputs = []
    folder_number += 1


# Unpacking Results

In [2]:
cps = list(os.walk('/content/drive/MyDrive/serengeti_bb_checkpoints'))[0][2]

In [49]:
output_df = pd.DataFrame()

for cp in cps:
  with gzip.open(f'/content/drive/MyDrive/serengeti_bb_checkpoints/{cp}', 'rb') as f:
    outputs = pickle.load(f)
    for output in outputs:
      output_df = pd.concat([output_df, pd.DataFrame(output)])
      output_df = output_df[output_df["bboxes"].notna()]

  print(f'{cp} loaded')

output_df.reset_index(drop=True, inplace=True)


serengeti_processed_outputs_0.pkl.gz loaded
serengeti_processed_outputs_1.pkl.gz loaded
serengeti_processed_outputs_2.pkl.gz loaded
serengeti_processed_outputs_3.pkl.gz loaded
serengeti_processed_outputs_4.pkl.gz loaded
serengeti_processed_outputs_5.pkl.gz loaded
serengeti_processed_outputs_6.pkl.gz loaded
serengeti_processed_outputs_7.pkl.gz loaded
serengeti_processed_outputs_8.pkl.gz loaded
serengeti_processed_outputs_9.pkl.gz loaded
serengeti_processed_outputs_10.pkl.gz loaded
serengeti_processed_outputs_11.pkl.gz loaded
serengeti_processed_outputs_12.pkl.gz loaded
serengeti_processed_outputs_13.pkl.gz loaded
serengeti_processed_outputs_14.pkl.gz loaded
serengeti_processed_outputs_15.pkl.gz loaded
serengeti_processed_outputs_16.pkl.gz loaded
serengeti_processed_outputs_17.pkl.gz loaded
serengeti_processed_outputs_18.pkl.gz loaded
serengeti_processed_outputs_19.pkl.gz loaded
serengeti_processed_outputs_20.pkl.gz loaded
serengeti_processed_outputs_21.pkl.gz loaded
serengeti_processed_

In [4]:
output_df.shape

(9261, 3)

In [8]:
output_df.memory_usage(index=True, deep=True).sum()

np.int64(3085522)

In [10]:
output_df.head()

Unnamed: 0,sequence,foreground_masks,bboxes
0,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...","((0, 32), (2048, 1120))"
1,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...","((917, 769), (1118, 857))"
2,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...","((894, 1003), (2048, 1474))"
3,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...","((769, 36), (1250, 819))"
4,/kaggle/input/serengeti-dataset/Set1/1.63-Euro...,"[[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,...","((556, 360), (1000, 938))"


# Code for Sampling Results to Quantify False-positive Rates

In [36]:
def draw_bounding_box(path, bbox):
  frame = cv.imread(path)
  if bbox is not None:
    (x, y), (w, h) = bbox
    cv.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
    return frame

In [52]:
OUTPUT_PATH = "/content/drive/MyDrive/serengeti_bb_samples/"

sample_files = output_df.sample(100)
for i, row in sample_files.iterrows():
  path = row["sequence"]
  label = path.split("/")[-1].split(".")[0]
  print(label)

  frame = draw_bounding_box(path, row["bboxes"])
  cv.imwrite(f"{OUTPUT_PATH}bbox_{label}.png", frame)

SEQ88198_IMG_0025
SEQ90482_IMG_0120
SEQ88455_IMG_0015
SEQ90609_IMG_0018
SEQ76322_IMG_0009
SEQ87876_IMG_0032
SEQ85159_IMG_0003
SEQ80439_IMG_0010
SEQ90556_IMG_0009
SEQ90522_IMG_0045
SEQ85046_IMG_0003
SEQ77075_IMG_0028
SEQ90384_IMG_0026
SEQ88872_IMG_0007
SEQ87887_IMG_0007
SEQ78237_IMG_0012
SEQ90507_IMG_0009
SEQ90520_IMG_0007
SEQ76509_IMG_0001
SEQ88198_IMG_0024
SEQ76494_IMG_0053
SEQ93265_IMG_0003
SEQ87839_IMG_0009
SEQ90404_IMG_0002
SEQ81682_IMG_0039
SEQ90520_IMG_0052
SEQ90476_IMG_0097
SEQ87249_IMG_0002
SEQ80536_IMG_0188
SEQ87752_IMG_0019
SEQ75245_IMG_0003
SEQ90559_IMG_0024
SEQ90553_IMG_0003
SEQ88198_IMG_0031
SEQ87657_IMG_0010
SEQ90182_IMG_0002
SEQ76356_IMG_0006
SEQ90609_IMG_0014
SEQ76420_IMG_0002
SEQ87309_IMG_0004
SEQ80505_IMG_0044
SEQ77397_IMG_0004
SEQ75633_IMG_0127
SEQ80505_IMG_0169
SEQ87892_IMG_0025
SEQ75930_IMG_0009
SEQ79953_IMG_0002
SEQ89977_IMG_0008
SEQ90789_IMG_0003
SEQ87720_IMG_0002
SEQ81798_IMG_0001
SEQ76466_IMG_0002
SEQ90597_IMG_0012
SEQ90902_IMG_0002
SEQ80135_IMG_0002
SEQ87657_I