In [2]:
import os
os.getcwd()

'/Users/sarah/Bowerbird-ID/GPU_powered/7_Classify_bowerbird_ID'

In [3]:
import os
from pathlib import Path
import cv2
from tqdm import tqdm
from ultralytics import YOLO
import numpy as np
from PIL import Image
import imagehash
from scipy.ndimage import label

# Frame Extraction

INPUT_DIR = "Videos_to_classify"
OUTPUT_DIR = "Extracted_frames"
YOLO_MODEL_PATH = "yolo11x-seg.pt"
SAMPLING_INTERVAL = 60  # Extract a frame every X frames
IOU_THRESHOLD = 0.5     # Threshold for filtering overlapping detections
SIMILARITY_THRESHOLD = 5  # pHash similarity threshold

yolo_model = YOLO(YOLO_MODEL_PATH)  # Load model

def calculate_iou(box1, box2):
    """Calculates Intersection over Union (IoU)"""
    x1, y1, x2, y2 = box1
    x1_, y1_, x2_, y2_ = box2  

    inter_x1 = max(x1, x1_)
    inter_y1 = max(y1, y1_)
    inter_x2 = min(x2, x2_)
    inter_y2 = min(y2, y2_)

    inter_area = max(0, inter_x2 - inter_x1 + 1) * max(0, inter_y2 - inter_y1 + 1)
    box1_area = (x2 - x1 + 1) * (y2 - y1 + 1)
    box2_area = (x2_ - x1_ + 1) * (y2_ - y1_ + 1)
    union_area = box1_area + box2_area - inter_area

    return inter_area / union_area if union_area > 0 else 0

def process_video(video_path):
    cap = cv2.VideoCapture(str(video_path))
    frame_count = 0
    unique_hashes = []
    multi_bird_detected = False

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Extract frames
        if frame_count % SAMPLING_INTERVAL == 0:
            results = yolo_model.predict(frame, conf=0.6, verbose=False)
            detections = results[0].boxes

            if len(detections) > 0:
                # Filter detections based on IoU --> pick the highest-confidence bbox
                filtered_detections = []
                for i, box in enumerate(detections.xyxy):
                    x1, y1, x2, y2 = map(int, box)
                    score = detections.conf[i]
                    # Only add if IoU with existing detections is below threshold
                    if all(calculate_iou((x1, y1, x2, y2), det[:4]) <= IOU_THRESHOLD for det in filtered_detections):
                        filtered_detections.append((x1, y1, x2, y2, score))

                # If more than one detection remains, we suspect there are multiple birds in the frame
                if len(filtered_detections) > 1:
                    multi_bird_detected = True
                    break

                # Keep the detection with the highest confidence
                x1, y1, x2, y2, _ = max(filtered_detections, key=lambda d: d[-1])
                cropped = frame[y1:y2, x1:x2]

                # Compute a perceptual hash of the cropped image
                pil_image = Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB))
                frame_hash = imagehash.phash(pil_image)

                # Save frame only if it is not too similar to previous frames
                if all(abs(frame_hash - h) > SIMILARITY_THRESHOLD for h in unique_hashes):
                    frame_name = f"{video_path.stem}_frame{frame_count}.png"
                    frame_path = os.path.join(OUTPUT_DIR, frame_name)
                    cv2.imwrite(frame_path, frame)
                    unique_hashes.append(frame_hash)

        frame_count += 1

    cap.release()

    if multi_bird_detected:
        print(f"There is more than one bird in the video {video_path.name}, better choose another one!")
    else:
        print(f"Processed video {video_path.name}")


video_files = list(Path(INPUT_DIR).glob("*.MP4"))
if not video_files:
    print("No videos found in the input directory.")
else:
    for video in tqdm(video_files, desc="Processing videos"):
        process_video(video)

print(f"Extracted frames saved in: {OUTPUT_DIR}")

# Mask Processing

input_dir = "Extracted_frames"   # Directory holding extracted frames
yolo_model = YOLO("yolo11x-seg.pt")  # Reload the YOLO segmentation model

MIN_BLOB_PIXELS = 5000
BOTTOM_FRACTION_ROW = 1 / 4
BOTTOM_FRACTION_NARROW = 1 / 2
HORIZONTAL_THRESHOLD = 0.8
NARROW_SEGMENT_THRESHOLD = 100

def filter_horizontal_rows(mask, threshold, bottom_fraction):
    """
    Filters out horizontal rows near the bottom of the image if
    the fraction of black pixels is above a certain threshold
    """
    start_row = int(mask.shape[0] * (1 - bottom_fraction))
    for row_idx in range(start_row, mask.shape[0]):
        row = mask[row_idx, :]
        # If the row is mostly black (above threshold), clear it
        if 1 - (np.sum(row) / row.shape[0]) >= threshold:
            mask[row_idx, :] = 0
    return mask

def filter_narrow_segments(mask, max_width, bottom_fraction):
    """
    Filters out narrow segments of white pixels in each row
    near the bottom of the image if their width is under a threshold
    """
    start_row = int(mask.shape[0] * (1 - bottom_fraction))
    for row_idx in range(start_row, mask.shape[0]):
        row_indices = np.where(mask[row_idx])[0]
        segments = np.split(row_indices, np.where(np.diff(row_indices) > 1)[0] + 1)
        for segment in segments:
            if len(segment) <= max_width:
                mask[row_idx, segment] = 0
    return mask

def remove_small_blobs(mask, min_pixels):
    """
    Removes connected blobs smaller than a certain pixel count.
    """
    labeled_mask, num_features = label(mask)
    valid_labels = [
        i for i in range(1, num_features + 1)
        if np.sum(labeled_mask == i) >= min_pixels
    ]
    return np.isin(labeled_mask, valid_labels)

for frame_name in tqdm(os.listdir(input_dir), desc="Frames"):
    frame_path = os.path.join(input_dir, frame_name)
    if not frame_name.lower().endswith('.png'):
        continue

    # Run YOLO detection on the full frame
    results = yolo_model.predict(frame_path, conf=0.4, verbose=False)
    if not results[0].boxes:
        os.remove(frame_path)
        continue

    # Crop region of interest using the highest-confidence box
    img = cv2.imread(frame_path)
    x1, y1, x2, y2 = map(int, results[0].boxes.xyxy[0])
    cropped = img[y1:y2, x1:x2]

    # Predict segmentation mask on cropped region
    mask_results = yolo_model.predict(cropped, conf=0.6, verbose=False)
    if not mask_results[0].masks:
        os.remove(frame_path)
        continue

    mask = mask_results[0].masks.data[0].cpu().numpy().astype(bool)

    # Apply mask filters
    mask = filter_horizontal_rows(mask, HORIZONTAL_THRESHOLD, BOTTOM_FRACTION_ROW)
    mask = filter_narrow_segments(mask, NARROW_SEGMENT_THRESHOLD, BOTTOM_FRACTION_NARROW)
    mask = remove_small_blobs(mask, MIN_BLOB_PIXELS)

    # If there is no mask remaining, discard the frame
    if not np.any(mask):
        os.remove(frame_path)
        continue

    # Resize mask if needed
    if mask.shape[:2] != cropped.shape[:2]:
        mask = cv2.resize(
            mask.astype(np.uint8),
            (cropped.shape[1], cropped.shape[0]),
            interpolation=cv2.INTER_NEAREST
        ).astype(bool)

    # Apply mask to the cropped image
    mask_rgb = np.zeros_like(cropped)
    mask_rgb[mask] = cropped[mask]
    cv2.imwrite(frame_path, mask_rgb)


Processing videos:  20%|██        | 1/5 [01:13<04:54, 73.55s/it]

Processed video B18_20181201_056.MP4


Processing videos:  40%|████      | 2/5 [02:38<04:01, 80.45s/it]

Processed video B18_20181113_089.MP4


Processing videos:  60%|██████    | 3/5 [03:49<02:31, 76.00s/it]

Processed video B18_20181114_005.MP4


Processing videos:  80%|████████  | 4/5 [05:00<01:13, 73.92s/it]

Processed video B18_20181210_007.MP4


Processing videos: 100%|██████████| 5/5 [07:00<00:00, 84.19s/it]

Processed video B18_20181208_005.MP4
Extracted frames saved in: Extracted_frames



Frames: 100%|██████████| 68/68 [10:42<00:00,  9.45s/it]


The following approach selects the highest softmax probability and records it for each frame. AKA it treats every frame’s “winner” prediction equally, even if some predictions were made with low confidence or if the probability distribution was very flat (when the highest predicted probability is only marginally greater than the probabilities for other classes)

In [15]:
import os
import torch
from torchvision import models, transforms
from PIL import Image
from collections import Counter
from tqdm import tqdm

MODEL_PATH = "./best_model.pth"
FRAME_DIR = "Extracted_frames"

CLASS_NAMES = [
    'B02', 'B03', 'B04', 'B05', 'B07', 'B11', 'B18', 'B23',
    'B26', 'B29', 'B30', 'B31', 'B47', 'B49', 'B50', 'B52'
]

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = models.resnet50(pretrained=False, num_classes=16)
model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
model.fc = torch.nn.Linear(model.fc.in_features, len(CLASS_NAMES))

model = model.to(device)
model.eval()

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

predictions = []
for frame_name in tqdm(os.listdir(FRAME_DIR), desc="Processing frames"):
    frame_path = os.path.join(FRAME_DIR, frame_name)
    if not frame_name.lower().endswith('.png'):
        continue

    image = Image.open(frame_path).convert("RGB")
    input_tensor = transform(image).unsqueeze(0).to(device)

    with torch.no_grad():
        outputs = model(input_tensor)
        probabilities = torch.nn.functional.softmax(outputs[0], dim=0)
        predicted_class = probabilities.argmax().item()

        if predicted_class >= len(CLASS_NAMES):
            print(f"Warning: Predicted class index {predicted_class} is out of bounds for CLASS_NAMES.")
            continue

        predictions.append(predicted_class)

# Count how many times each class was predicted
prediction_counts = Counter(predictions)
total_predictions = sum(prediction_counts.values())

if total_predictions == 0:
    print("No valid predictions were made.")
    exit()

# Calculate percentage for each class
percentages = {
    CLASS_NAMES[k]: (v / total_predictions) * 100
    for k, v in prediction_counts.items()
}
sorted_percentages = dict(
    sorted(percentages.items(), key=lambda item: item[1], reverse=True)
)

# Determine the most common class
most_common_class = max(prediction_counts, key=prediction_counts.get)
most_common_percentage = sorted_percentages[CLASS_NAMES[most_common_class]]

print("\nPrediction Results:")
for bird_id, percentage in sorted_percentages.items():
    print(f"{bird_id}: {percentage:.2f}%")

print(f"\nThe bird is most likely {CLASS_NAMES[most_common_class]}")

Processing frames: 100%|██████████| 42/42 [00:09<00:00,  4.51it/s]


Prediction Results:
B18: 78.57%
B29: 11.90%
B04: 7.14%
B52: 2.38%

The bird is most likely B18





Argmax

In [19]:
import os
import random
import numpy as np
import torch
from torchvision import models, transforms
from PIL import Image
from collections import Counter
from tqdm import tqdm

# Reproducibility settings

SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# Force deterministic algorithms in cuDNN (Ensuring the approach is deterministic. Otherwise re running the model was outputting a different result)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Classification (ArgMax only)

MODEL_PATH = "./best_model.pth"
FRAME_DIR = "Extracted_frames"

CLASS_NAMES = [
    'B02', 'B03', 'B04', 'B05', 'B07', 'B11', 'B18', 'B23',
    'B26', 'B29', 'B30', 'B31', 'B47', 'B49', 'B50', 'B52'
]

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load model 
model = models.resnet50(pretrained=False, num_classes=16)
model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
model.fc = torch.nn.Linear(model.fc.in_features, len(CLASS_NAMES))

model = model.to(device)
model.eval()

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

predictions = []

for frame_name in tqdm(os.listdir(FRAME_DIR), desc="Processing frames"):
    frame_path = os.path.join(FRAME_DIR, frame_name)
    if not frame_name.lower().endswith('.png'):
        continue

    image = Image.open(frame_path).convert("RGB")
    input_tensor = transform(image).unsqueeze(0).to(device)

    with torch.no_grad():
        outputs = model(input_tensor)
        predicted_class = outputs.argmax(dim=1).item()

        if predicted_class >= len(CLASS_NAMES):
            print(f"Warning: Predicted class index {predicted_class} is out of bounds for CLASS_NAMES.")
            continue

        predictions.append(predicted_class)

# count how many times each class was predicted
prediction_counts = Counter(predictions)
total_predictions = sum(prediction_counts.values())

if total_predictions == 0:
    print("No valid predictions were made.")
    exit()

# calculate percentage for each class
percentages = {
    CLASS_NAMES[k]: (v / total_predictions) * 100
    for k, v in prediction_counts.items()
}
sorted_percentages = dict(
    sorted(percentages.items(), key=lambda item: item[1], reverse=True)
)

# determine the most common class
most_common_class = max(prediction_counts, key=prediction_counts.get)

print("\nPrediction Results (ArgMax Only):")
for bird_id, percentage in sorted_percentages.items():
    print(f"{bird_id}: {percentage:.2f}%")

print(f"\nThe bird is most likely {CLASS_NAMES[most_common_class]}")


Processing frames: 100%|██████████| 42/42 [00:09<00:00,  4.52it/s]


Prediction Results (ArgMax Only):
B52: 50.00%
B26: 33.33%
B18: 16.67%

The bird is most likely B52





In [None]:
import os
import random
import numpy as np
import torch
from torchvision import models, transforms
from PIL import Image
from collections import Counter
from tqdm import tqdm

# Reproducibility settings

SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# Force cuDNN to be deterministic
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Classification

MODEL_PATH = "./best_model.pth"
FRAME_DIR = "Extracted_frames"

CLASS_NAMES = [
    'B02', 'B03', 'B04', 'B05', 'B07', 'B11', 'B18', 'B23',
    'B26', 'B29', 'B30', 'B31', 'B47', 'B49', 'B50', 'B52'
]

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = models.resnet50(pretrained=False, num_classes=16)
model.load_state_dict(torch.load(MODEL_PATH, map_location=device))
model.fc = torch.nn.Linear(model.fc.in_features, len(CLASS_NAMES))

model = model.to(device)
model.eval()

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

predictions = []
frame_results = []

for frame_name in tqdm(os.listdir(FRAME_DIR), desc="Processing frames"):
    frame_path = os.path.join(FRAME_DIR, frame_name)
    if not frame_name.lower().endswith('.png'):
        continue

    image = Image.open(frame_path).convert("RGB")
    input_tensor = transform(image).unsqueeze(0).to(device)

    with torch.no_grad():
        outputs = model(input_tensor)
        probabilities = torch.nn.functional.softmax(outputs[0], dim=0)
        predicted_class = probabilities.argmax().item()

        # Save frame name and probability distribution for later printing
        frame_results.append((frame_name, probabilities.cpu().tolist()))

        if predicted_class >= len(CLASS_NAMES):
            print(f"Warning: Predicted class index {predicted_class} is out of bounds for CLASS_NAMES.")
            continue

        predictions.append(predicted_class)

header = "Frame".ljust(20) + "".join(name.ljust(10) for name in CLASS_NAMES)
print("\nDetailed Frame Predictions:")
print(header)
print("-" * len(header))

# Print each frame's probabilities
for frame_name, probs in frame_results:
    row = frame_name.ljust(20)
    for p in probs:
        row += f"{p*100:8.2f}% "
    print(row)

# Original majority voting
prediction_counts = Counter(predictions)
total_predictions = sum(prediction_counts.values())

if total_predictions == 0:
    print("No valid predictions were made.")
    exit()

# Calculate percentage for each class from majority votes
percentages = {
    CLASS_NAMES[k]: (v / total_predictions) * 100
    for k, v in prediction_counts.items()
}
sorted_percentages = dict(
    sorted(percentages.items(), key=lambda item: item[1], reverse=True)
)

most_common_class = max(prediction_counts, key=prediction_counts.get)

print("\nPrediction Results:")
for bird_id, percentage in sorted_percentages.items():
    print(f"{bird_id}: {percentage:.2f}%")

print(f"\nThe bird is most likely {CLASS_NAMES[most_common_class]}")

Processing frames: 100%|██████████| 42/42 [00:09<00:00,  4.52it/s]


Detailed Frame Predictions:
Frame               B02       B03       B04       B05       B07       B11       B18       B23       B26       B29       B30       B31       B47       B49       B50       B52       
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
B18_20181201_056_frame840.png    5.84%     8.71%     4.84%     5.37%     5.00%     4.25%    10.21%     4.54%    11.86%     3.36%     3.99%     4.35%     4.45%     3.10%     6.73%    13.40% 
B18_20181113_089_frame1080.png    6.04%     5.81%     3.63%     3.70%     7.83%     3.14%     9.75%     3.85%    10.14%     3.63%     8.49%     4.22%     4.59%     3.15%     6.66%    15.37% 
B18_20181208_005_frame1500.png    9.72%     3.20%     4.18%     3.91%     5.82%     2.59%    10.47%     3.48%    14.37%     3.77%     6.02%     4.42%     4.51%     4.69%     7.14%    11.70% 
B18_20181208_005_frame180.png    7.34


