# CIS 519 Final Project: Counting Animals in a Sequence of Images
## Wahub Ahmed, Ankith Pinnamaneni, Bailey Hirota
#### Adapted from https://www.kaggle.com/code/jordandahan/iwild2022

## Imports

In [2]:
import argparse
import glob
import os
import sys
import time
import warnings
import json, codecs
import random
import matplotlib.pyplot as plt
from scipy.optimize import linear_sum_assignment as linear_assignment
import math

import cycler
from matplotlib import colors
from matplotlib import pyplot as plt
import matplotlib.patches as patches
import tensorflow as tf
import numpy as np
import pandas as pd
from PIL import Image, ImageFile, ImageFont, ImageDraw
import statistics
from torch.utils.data import Dataset,DataLoader
import torch
import torchvision
import cv2
from tqdm import tqdm
%matplotlib inline

# from CameraTraps.ct_utils import truncate_float
print('TensorFlow version:', tf.__version__)
print('Is GPU available? tf.test.is_gpu_available:', tf.test.is_gpu_available())

TensorFlow version: 2.6.4
Is GPU available? tf.test.is_gpu_available: False


## Load metadata information

In [None]:
def image_name_to_id(name):
    """Return the image ID for a file name by dropping a trailing '.jpg'.

    ``str.rstrip('.jpg')`` removes any trailing run of the characters '.',
    'j', 'p' and 'g' rather than the literal suffix, so IDs ending in one of
    those characters were truncated. Only the exact suffix is removed here;
    names without it are returned unchanged.
    """
    suffix = '.jpg'
    if name.endswith(suffix):
        return name[:-len(suffix)]
    return name

def read_image(path):
    """Load the image stored at ``path`` and return it as a NumPy array."""
    with tf.io.gfile.GFile(path, 'rb') as handle:
        pil_image = Image.open(handle)
        pixels = np.array(pil_image)
    return pixels
    
def read_json(path):
    """Parse the JSON document stored at ``path`` and return the result."""
    with tf.io.gfile.GFile(path) as handle:
        parsed = json.load(handle)
    return parsed
    
def create_detection_map(annotations, mode="train"):
    """Build a dict mapping image IDs to their detections for one split.

    Args:
        annotations: Dict with an 'images' list. Each entry has a 'file'
            path whose first component names the split and whose last
            component is the image file name, plus a 'detections' value.
        mode: Split to keep; compared against the first path component.

    Returns:
        Dict of image ID (file name without the '.jpg' suffix) -> detections.
    """
    suffix = '.jpg'
    ann_map = {}
    for image in annotations['images']:
        parts = image['file'].split('/')
        if parts[0] != mode:
            continue
        image_id = parts[-1]
        # Remove the literal extension only: rstrip('.jpg') would also eat
        # trailing '.', 'j', 'p' and 'g' characters that belong to the ID.
        if image_id.endswith(suffix):
            image_id = image_id[:-len(suffix)]
        ann_map[image_id] = image['detections']
    return ann_map

# TRAIN
# Kaggle input locations: training images, the detection annotation file
# (covers both splits) and the instance-mask PNGs.
IMAGES_DIR_TRAIN = "/kaggle/input/iwildcam2022-fgvc9/train/train"
BOX_ANNOTATION_FILE = "/kaggle/input/iwildcam2022-fgvc9/metadata/metadata/iwildcam2022_mdv4_detections.json"
MASKS_DIR = "/kaggle/input/iwildcam2022-fgvc9/instance_masks/instance_masks"

# File names present in the training image directory.
images_train = tf.io.gfile.listdir(IMAGES_DIR_TRAIN)
# The annotations file contains annotations for all images in train and test
annotations = read_json(BOX_ANNOTATION_FILE)
# Image ID -> detections, restricted to the "train" split (the default mode).
detection_train_map = create_detection_map(annotations)
images_train_ids = list(detection_train_map.keys())

# TEST
# Same listing and ID -> detections map for the test split.
IMAGES_DIR_TEST = "/kaggle/input/iwildcam2022-fgvc9/test/test"
images_test = tf.io.gfile.listdir(IMAGES_DIR_TEST)
detection_test_map = create_detection_map(annotations,mode="test")
images_test_ids = list(detection_test_map.keys())

# Sanity check: compare the number of annotated images with the number of
# image files found on disk for each split.
print(f'length of detection map for train = {len(detection_train_map)}\nlength of detection map for test = {len(detection_test_map)}\n')
print(f'length of images for train = {len(images_train)}\nlength of images for test = {len(images_test)}\n')
print(f'total annotations from megaDetector model = {len(annotations["images"])}')

### Data Augmentation Method

In [None]:
def read_image(path):
    """Load the image at ``path`` and return it as a NumPy array.

    The image is converted to PIL mode 'L' before being turned into an
    array, so the result has a single channel.
    """
    # Augmentation note: change 'L' to '1' and add parameter
    # dithering = Image.FLOYDSTEINBERG to try the dithered variant.
    with tf.io.gfile.GFile(path, 'rb') as handle:
        single_channel = Image.open(handle).convert('L')
        return np.array(single_channel)

In [None]:
# Load the training annotation metadata; undecodable bytes are dropped
# (errors='ignore') instead of raising.
with codecs.open("../input/iwildcam2022-fgvc9/metadata/metadata/iwildcam2022_train_annotations.json", 'r',
                 encoding='utf-8', errors='ignore') as f:
    train_meta = json.load(f)
    
# Load the test-set image information the same way.
with codecs.open("../input/iwildcam2022-fgvc9/metadata/metadata/iwildcam2022_test_information.json", 'r',
                 encoding='utf-8', errors='ignore') as f:
    test_meta = json.load(f)
# One row per test image, built from the 'images' list of the metadata.
seq_test = pd.DataFrame(test_meta['images'])
display(seq_test)
# One row per training image.
seq_train = pd.DataFrame(train_meta['images'])
display(seq_train)

## Show & Analyze Data

In [None]:
# Colours assigned in turn to successive detections when plotting.
COLOR_CYCLER = cycler.cycler(color=['tab:blue', 'tab:green', 'tab:orange',
                                    'tab:red', 'tab:purple'])
# Rows of the training metadata belonging to one example sequence. The
# filtered frame keeps its original index labels.
pd_example = seq_train[seq_train['seq_id'] == "30048d32-7d42-11eb-8fb5-0242ac1c0002"]
def get_image_annotation(image, detection_annotations, categories, instance_id_image, ax):
    """Plot boxes, labels and mask overlays for a given image on ``ax``.

    Args:
        image: An image array; only its first two dimensions (height, width)
            are used for scaling.
        detection_annotations: A list of detections. Each detection is a dict
            containing the keys 'category', 'bbox' and 'conf'. 'bbox' is
            [xmin, ymin, width, height] as fractions of the image size.
        categories: A dict mapping category IDs to names.
        instance_id_image: An array of shape [H, W] containing the instance ID
            at each pixel. IDs are expected to be 1-indexed, with 0 reserved
            for the background.
        ax: Matplotlib axes to draw on.

    Returns:
        The RGBA overlay of shape [H, W, 4] built for the last detection, or
        None when ``detection_annotations`` is empty.
    """
    cycle_iter = COLOR_CYCLER()
    image_height, image_width = image.shape[:2]
    ax.imshow(image)
    # Bound up front so an empty detection list returns None instead of
    # raising UnboundLocalError at the return statement.
    color_mask = None
    for i, annotation in enumerate(detection_annotations):
        # Scale the fractional box to pixel coordinates.
        xmin, ymin, width, height = annotation['bbox']
        xmin *= image_width
        ymin *= image_height
        width *= image_width
        height *= image_height
        color = next(cycle_iter)['color']
        rect = patches.Rectangle((xmin, ymin), width, height,
                                 linewidth=3, edgecolor=color, facecolor='none')
        ax.add_patch(rect)
        label = '{}:{:.2f}'.format(categories[annotation['category']],
                                   annotation['conf'])
        ax.text(xmin, ymin - 5, label, fontsize=30, color='white',
                bbox=dict(boxstyle='square,pad=0.0', facecolor=color, alpha=0.75,
                          ec='none'))
        # Solid colour image whose alpha channel is the mask of instance
        # i + 1, so only that instance's pixels are tinted.
        r, g, b, _ = colors.to_rgba(color)
        color_array = np.array([r, g, b]).reshape(1, 1, 3)
        color_image = np.ones((image_height, image_width, 3)) * color_array
        mask = (instance_id_image == (i + 1)).astype(np.float32)[:, :, np.newaxis]
        color_mask = np.concatenate([color_image, mask], axis=2)

        ax.imshow(color_mask, alpha=0.5)
    return color_mask
        
def show_images_seq(data, detections, train=True):
    """Plot every image of ``data`` in its own subplot row with detections.

    Args:
        data: DataFrame with a 'file_name' column, one row per image.
        detections: Dict mapping image ID -> list of detection dicts.
        train: Read images from IMAGES_DIR_TRAIN when True, otherwise from
            IMAGES_DIR_TEST.
    """
    rows = data.shape[0]
    if rows == 0:
        # Nothing to draw; plt.subplots cannot create zero rows.
        return
    images_dir = IMAGES_DIR_TRAIN if train else IMAGES_DIR_TEST
    # squeeze=False keeps the axes array two-dimensional, so a sequence of
    # one image is indexed the same way as a longer one.
    _, axs = plt.subplots(rows, dpi=80, figsize=(rows*5, rows*5), squeeze=False)
    # Iterate positionally: ``data`` may be a filtered frame whose index
    # labels do not run 0..rows-1, so label lookup by ``i`` is unreliable.
    for ax, image_name in zip(axs[:, 0], data["file_name"].tolist()):
        image_path = os.path.join(images_dir, image_name)
        image_id = image_name_to_id(image_name)
        mask_path = os.path.join(MASKS_DIR, f'{image_id}.png')
        image = read_image(image_path)
        has_detections = image_id in detections and len(detections[image_id]) > 0
        if has_detections and tf.io.gfile.exists(mask_path):
            instance_id_image = read_image(mask_path)
            get_image_annotation(image, detections[image_id],
                                 annotations['detection_categories'],
                                 instance_id_image, ax)
        else:
            # Title this image's own subplot rather than whichever axes
            # happens to be current.
            ax.set_title(f'{image_name} missing detection data.')
            ax.imshow(image)

        ax.axis("off")
    plt.show()
            
# Render the example training sequence together with its detections.
show_images_seq(pd_example,detection_train_map)

## Solutions
* Solution 1: The iWildCam 2021 competition winners used the maximum number of bounding boxes found in any single frame of a sequence as that sequence's count.
* Solution 2: The detected objects are followed with an object tracker, which gives each object a unique ID number; the number of IDs issued is the sequence's count.

### Solution 1: Maximum detections in sequence

In [None]:
def count_detections(row, detections, thres):
    """Count the detections of one image whose confidence exceeds ``thres``.

    Args:
        row: Mapping with a 'file_name' entry; the text before the first '.'
            is used as the image ID.
        detections: Dict mapping image ID -> list of dicts with a 'conf' key.
        thres: Confidence a detection must strictly exceed to be counted.

    Returns:
        Number of qualifying detections for the image.
    """
    image_id = row['file_name'].split('.')[0]
    return sum(1 for bbox in detections[image_id] if bbox['conf'] > thres)

def generate_zero_submission(seq_ids):
    """Return a submission frame with one 'Id' row per sequence and 'Predicted' set to 0."""
    return pd.DataFrame(seq_ids, columns=['Id']).assign(Predicted=0)


In [None]:
# Confidence thresholds to evaluate; one submission file is written per value.
thres_list = [0.8, 0.85, 0.9, 0.95, 0.97]
for thres in thres_list: 
    # Reset the per-image counts, then count detections above the threshold
    # for every test image.
    seq_test["detections_count"] = np.nan
    for idx,row in tqdm(seq_test.iterrows()):
        seq_test.at[idx, 'detections_count'] = count_detections(row,detection_test_map, thres)

    # Prediction for a sequence = largest per-image count among its frames.
    submission_res_by_max = generate_zero_submission(seq_test.seq_id.unique())
    for seq_id in tqdm(seq_test.seq_id.unique()):
        max_count = seq_test[seq_test.seq_id == seq_id]['detections_count'].max()
        submission_res_by_max.loc[submission_res_by_max.Id == seq_id, 'Predicted'] = max_count
        
    display(submission_res_by_max)
    print("Final results:")
    # Largest predicted count over all sequences.
    print(max(submission_res_by_max.Predicted))
    submission_res_by_max.to_csv (r'res_by_max_'+str(thres)+'.csv', index = False, header=True) 


### Solution 2: using Object-Tracker-Model


#### Intro to Tracking
Tracking in deep learning is the task of predicting the positions of objects throughout a video using their spatial as well as temporal features. More technically, tracking means taking an initial set of detections, assigning each a unique ID, and following them through the frames of a video (or sequence of frames) while maintaining the assigned IDs. Tracking is generally a two-step process:  
    **1. A detection module for target localization: this module detects and localizes the objects in each frame using an object detector such as YOLOv4 or CenterNet (in our case, the detections are provided as part of the dataset).**  
    **2. A motion predictor: this module predicts the future motion of each object from its past positions.**

### Types of Trackers

1. **Single Object Tracker**- These types of trackers track only a single object even if there are many other objects present in the frame. (We will not use here).
2. **Multiple Object Tracker -**
These types of trackers can track multiple objects present in a frame. Some of the algorithms include DeepSORT, JDE, and CenterTrack which are very powerful algorithms and handle most of the challenges faced by trackers. (We will present the idea below).  

DeepSORT: [arXiv:1703.07402](https://arxiv.org/abs/1703.07402)  
Centroid: [centroid tracking](https://pyimagesearch.com/2018/07/23/simple-object-tracking-with-opencv/)

#### Centroid-Tracker using L2 Distance

In [None]:
                                                                                                                               #######  
class EuclideanDistTracker:
    """Centroid tracker that matches boxes across frames by L2 distance.

    A box receives the ID of the first tracked centre lying closer than
    ``MAX_DISTANCE`` to its own centre; otherwise a new ID is issued.
    ``id_count`` is the number of distinct IDs issued so far, and calling
    ``__init__()`` again resets the tracker.
    """

    # Matching radius, in the same units as the box coordinates.
    MAX_DISTANCE = 25

    def __init__(self):
        self.center_points = {}
        self.id_count = 0

    def update(self, objects_rect):
        """
        Parameters:
        -----------
        objects_rect:  iterable of [x, y, w, h] bounding boxes for one frame,
            where (x, y) is the top-left corner.
        --------
        Returns:
            list containing [x,y,w,h,object_id].
                x,y,w,h are the bounding box coordinates, and object_id is the id assigned to that particular bounding box.
        --------
        """
        # Objects boxes and ids
        objects_bbs_ids = []

        for rect in objects_rect:
            x, y, w, h = rect
            # Box centre = top-left corner plus half the size. The previous
            # (x + w) / 2 also halved the corner coordinate, which shrank
            # every displacement and disagreed with L1DistTracker.
            cx, cy = x + w / 2, y + h / 2

            # Find out if that object was detected already: first tracked
            # centre within the matching radius wins.
            matched_id = None
            for object_id, pt in self.center_points.items():
                if math.hypot(cx - pt[0], cy - pt[1]) < self.MAX_DISTANCE:
                    matched_id = object_id
                    break

            # New object is detected we assign the ID to that object
            if matched_id is None:
                matched_id = self.id_count
                self.id_count += 1

            self.center_points[matched_id] = (cx, cy)
            objects_bbs_ids.append([x, y, w, h, matched_id])

        # Keep only the centres of IDs seen in this frame, in the order they
        # were reported, so stale tracks cannot be matched later.
        self.center_points = {
            entry[4]: self.center_points[entry[4]] for entry in objects_bbs_ids
        }
        return objects_bbs_ids
    
def get_detections(detections_dict, height, width, thres):
    """Return pixel-space boxes for the detections scoring above ``thres``.

    Args:
        detections_dict: Iterable of detection dicts with 'conf' and 'bbox'
            keys. 'bbox' is [xmin, ymin, width, height] expressed as
            fractions of the image size (the convention
            ``get_image_annotation`` relies on when it scales boxes).
        height: Image height in pixels.
        width: Image width in pixels.
        thres: Detections with ``conf <= thres`` are dropped.

    Returns:
        List of [xmin, ymin, width, height] boxes in pixels.
    """
    # The centroid trackers compare centre distances against a fixed radius
    # of 25. Fractional coordinates can never be more than ~1.41 apart, so
    # without this scaling every box matched the first existing track.
    detections = []
    for diction in detections_dict:
        if diction['conf'] > thres:
            xmin, ymin, box_w, box_h = diction['bbox']
            detections.append([xmin * width, ymin * height,
                               box_w * width, box_h * height])

    return detections

# Confidence thresholds to evaluate; one submission file is written per value.
thres_list = [0.8, 0.85, 0.9, 0.95, 0.97]
for thres in thres_list: 
    tracker = EuclideanDistTracker()
    submission_res_by_tracks = generate_zero_submission(seq_test.seq_id.unique())
    # Number of example sequences displayed so far (capped at 3 below).
    i=0
    for seq_id in tqdm(seq_test.seq_id.unique()):
        # Re-running __init__ clears the tracked centres and the ID counter
        # so each sequence is counted from zero.
        tracker.__init__()
        # Frames of this sequence in capture order.
        seq_frames = seq_test[seq_test.seq_id == seq_id].sort_values(by=['seq_frame_num']).reset_index( drop = True)
        for _,frame in seq_frames.iterrows():
            detections = get_detections(detection_test_map[frame['id']], frame['height'],frame['width'], thres)
            boxes_ids = tracker.update(detections)
        # Show the first few sequences in which more than one ID was issued.
        if i < 3 and tracker.id_count > 1 :
            display(seq_frames)
            show_images_seq(seq_frames,detection_test_map,train=False)
            print(f'id_count = {tracker.id_count}')
            i+=1
            
        # Prediction for the sequence = number of distinct IDs issued.
        submission_res_by_tracks.loc[submission_res_by_tracks.Id == seq_id, 'Predicted'] = tracker.id_count

    display(submission_res_by_tracks)
    # Largest predicted count over all sequences.
    print(max(submission_res_by_tracks.Predicted))
    submission_res_by_tracks.to_csv (r'res_by_tracks_' + str(thres) + '.csv', index = False, header=True) 


#### Centroid-Tracker using L1 (Manhattan) Distance

In [None]:
class L1DistTracker:
    """Centroid tracker that matches boxes across frames by L1 distance.

    ``center_points`` maps each live object ID to its last centre and
    ``id_count`` is the number of IDs issued so far.
    """

    def __init__(self):
        self.center_points = {}
        self.id_count = 0

    def update(self, objects_rect):
        """Assign an ID to every box of one frame.

        Parameters:
        -----------
        objects_rect:  iterable of [x, y, w, h] bounding boxes.
        --------
        Returns:
            list containing [x,y,w,h,object_id], one entry per input box, in
            input order. A box reuses the ID of the first tracked centre
            whose Manhattan distance to the box centre is below 25; otherwise
            it gets the next unused ID.
        --------
        """
        tracked = []
        for x, y, w, h in objects_rect:
            centre = (x + w / 2, y + h / 2)

            # First tracked centre close enough to this box, if any.
            assigned = next(
                (
                    known_id
                    for known_id, (px, py) in self.center_points.items()
                    if abs(centre[0] - px) + abs(centre[1] - py) < 25
                ),
                None,
            )
            if assigned is None:
                # Unseen object: issue the next ID.
                assigned = self.id_count
                self.id_count += 1

            self.center_points[assigned] = centre
            tracked.append([x, y, w, h, assigned])

        # Forget every ID that was not reported in this frame.
        self.center_points = {
            entry[4]: self.center_points[entry[4]] for entry in tracked
        }
        return tracked
    
def get_detections(detections_dict, height, width, thres):
    """Return pixel-space boxes for the detections scoring above ``thres``.

    Args:
        detections_dict: Iterable of detection dicts with 'conf' and 'bbox'
            keys. 'bbox' is [xmin, ymin, width, height] expressed as
            fractions of the image size (the convention
            ``get_image_annotation`` relies on when it scales boxes).
        height: Image height in pixels.
        width: Image width in pixels.
        thres: Detections with ``conf <= thres`` are dropped.

    Returns:
        List of [xmin, ymin, width, height] boxes in pixels.
    """
    # The centroid trackers compare centre distances against a fixed radius
    # of 25. Fractional coordinates can never be more than ~1.41 apart, so
    # without this scaling every box matched the first existing track.
    detections = []
    for diction in detections_dict:
        if diction['conf'] > thres:
            xmin, ymin, box_w, box_h = diction['bbox']
            detections.append([xmin * width, ymin * height,
                               box_w * width, box_h * height])

    return detections

# Confidence thresholds to evaluate; one submission file is written per value.
thres_list = [0.8, 0.85, 0.9, 0.95, 0.97]

for thres in thres_list: 
    tracker = L1DistTracker()
    submission_res_by_tracks = generate_zero_submission(seq_test.seq_id.unique())
    # Number of example sequences displayed so far (capped at 3 below).
    i=0
    for seq_id in tqdm(seq_test.seq_id.unique()):
        # Re-running __init__ clears the tracked centres and the ID counter
        # so each sequence is counted from zero.
        tracker.__init__()
        # Frames of this sequence in capture order.
        seq_frames = seq_test[seq_test.seq_id == seq_id].sort_values(by=['seq_frame_num']).reset_index(drop=True)
        for _,frame in seq_frames.iterrows():
            detections = get_detections(detection_test_map[frame['id']], frame['height'], frame['width'], thres)
            boxes_ids = tracker.update(detections)
        # Show the first few sequences in which more than one ID was issued.
        if i < 3 and tracker.id_count > 1 :
            display(seq_frames)
            show_images_seq(seq_frames, detection_test_map, train=False)
            print(f'id_count = {tracker.id_count}')
            i += 1
            
        # Prediction for the sequence = number of distinct IDs issued.
        submission_res_by_tracks.loc[submission_res_by_tracks.Id == seq_id, 'Predicted'] = tracker.id_count

    display(submission_res_by_tracks)
    # Largest predicted count over all sequences.
    print(max(submission_res_by_tracks.Predicted))
    submission_res_by_tracks.to_csv(r'res_by_tracks_l1_' + str(thres) + '.csv', index=False, header=True)


## Faster R-CNN Implementation Using Pretrained Model
### Load model and define bounding box heuristics

In [None]:
# Load torchvision's Faster R-CNN (ResNet-50 FPN backbone) with pretrained
# weights; it is used as a module-level global by count_max_boxes.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

# Set the model to evaluation mode so that calling it returns predictions
# (boxes, labels, scores) for each input image.
model.eval()

# Define a function to draw bounding boxes on an image
def draw_boxes(image, boxes, labels, scores):
    """Return a channel-swapped copy of ``image`` with every box drawn on it.

    Args:
        image: Image array accepted by ``cv2.cvtColor``.
        boxes: Iterable of boxes indexed as [x1, y1, x2, y2].
        labels: Label for each box, printed as-is.
        scores: Score for each box, printed with two decimals.

    Returns:
        The converted image with a green rectangle and a "label: score"
        caption per box.
    """
    green = (0, 255, 0)
    # Swap the first and third channels (RGB <-> BGR) before drawing.
    canvas = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    for box, label, score in zip(boxes, labels, scores):
        left, top = int(box[0]), int(box[1])
        right, bottom = int(box[2]), int(box[3])
        cv2.rectangle(canvas, (left, top), (right, bottom), green, 2)
        # Caption sits just above the top-left corner of the box.
        caption = f"{label}: {score:.2f}"
        cv2.putText(canvas, caption, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, green, 1)
    return canvas

# Define a function to count the maximum number of bounding boxes in a sequence of images
def count_max_boxes(images, threshold=0.95):
    """Return the largest per-image count of confident detections.

    Args:
        images: Iterable of BGR image arrays (as loaded with ``cv2.imread``).
        threshold: Minimum score for a predicted box to be counted.

    Returns:
        The maximum number of boxes with ``score >= threshold`` found in any
        single image, or 0 when ``images`` is empty.
    """
    num_boxes = []
    for image in images:
        # Convert the image to RGB format for PyTorch
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        # to_tensor scales pixel values to [0, 1], which is the input range
        # torchvision detection models expect. They apply mean/std
        # normalisation internally, so normalising here as well would apply
        # it twice and degrade the predictions.
        image_tensor = torchvision.transforms.functional.to_tensor(image)
        # Detection models take a list of [C, H, W] tensors.
        with torch.no_grad():
            predictions = model([image_tensor])
        scores = predictions[0]["scores"].numpy()
        # Count the boxes that pass the score filter.
        num_boxes.append(int((scores >= threshold).sum()))
    # default=0 keeps an empty sequence from raising ValueError.
    return max(num_boxes, default=0)

# Define the paths for the training and testing images
train_path = "/kaggle/input/iwildcam2022-fgvc9/train/train"
test_path = "/kaggle/input/iwildcam2022-fgvc9/test/test"

# Get a list of training and testing images (order is whatever glob returns)
train_images = glob.glob(os.path.join(train_path, "*.jpg"))
test_images = glob.glob(os.path.join(test_path, "*.jpg"))

# Load a sample training image and get its predictions
# NOTE(review): count_max_boxes converts cv2 images from BGR to RGB before
# inference, but this call passes the cv2.imread result straight through;
# confirm which channel order is intended here.
# NOTE(review): this forward pass runs outside torch.no_grad(), which is why
# the outputs are .detach()-ed before conversion to NumPy later on.
sample_train_image = cv2.imread(train_images[0])
sample_train_predictions = model(torchvision.transforms.functional.to_tensor(sample_train_image).unsqueeze(0))

### Draw bounding boxes

In [None]:
# Draw bounding boxes on the sample training image and display it
# (all predicted boxes are drawn; no score filtering is applied here)
sample_train_boxes = sample_train_predictions[0]["boxes"].detach().numpy()
sample_train_labels = sample_train_predictions[0]["labels"].detach().numpy()
sample_train_scores = sample_train_predictions[0]["scores"].detach().numpy()
sample_train_image_with_boxes = draw_boxes(sample_train_image, sample_train_boxes, sample_train_labels, sample_train_scores)
plt.imshow(sample_train_image_with_boxes)
plt.show()
plt.close()

# Load a sample testing image and get its predictions
# NOTE(review): as with the training sample, the cv2.imread result is passed
# to the model without the BGR -> RGB conversion count_max_boxes performs.
sample_test_image = cv2.imread(test_images[0])
sample_test_predictions = model(torchvision.transforms.functional.to_tensor(sample_test_image).unsqueeze(0))

# Draw bounding boxes on the sample testing image and display it
sample_test_boxes = sample_test_predictions[0]["boxes"].detach().numpy()
sample_test_labels = sample_test_predictions[0]["labels"].detach().numpy()
sample_test_scores = sample_test_predictions[0]["scores"].detach().numpy()
sample_test_image_with_boxes = draw_boxes(sample_test_image, sample_test_boxes, sample_test_labels, sample_test_scores)
plt.imshow(sample_test_image_with_boxes)
plt.show()
plt.close()

# Load a sequence of training images and count the maximum number of bounding boxes
# NOTE(review): these are the first 10 files returned by glob, not the frames
# of one camera-trap sequence (no seq_id grouping is done here).
sequence_train_images = [cv2.imread(image) for image in train_images[:10]] # You can change the number of images as per your preference
max_train_boxes = count_max_boxes(sequence_train_images)
print(f"The maximum number of bounding boxes in the sequence of training images is {max_train_boxes}")

# Load a sequence of testing images and count the maximum number of bounding boxes
sequence_test_images = [cv2.imread(image) for image in test_images[:10]] # You can change the number of images as per your preference
max_test_boxes = count_max_boxes(sequence_test_images)
print(f"The maximum number of bounding boxes in the sequence of testing images is {max_test_boxes}")

## Conclusion
* The object tracker clearly performs worse than the counting heuristic. There could be several reasons for this.
- Also, to get better performance, we could produce the detections ourselves with our own model.

    * It makes sense that the counting heuristic works best: the images come from camera traps, which take many pictures in quick succession when they detect movement, so a single frame may well capture all the animals that passed by during the sequence. The simple, working solution is therefore to count the maximum number of detections in any frame of the sequence.
