In [22]:
# Mount Google Drive at /content/drive so files stored there are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [23]:
# Add the project folder to the module search path so Python files stored there can be imported.
# NOTE: this literal path duplicates PROJECT_PATH (defined in a later cell); keep the two in sync.
import sys
sys.path.append('/content/drive/MyDrive/Deep_Learning/Final Project')

In [24]:
# Root folder of this project within Google Drive.
# Users must change this to match their own Drive layout.
PROJECT_PATH = "/content/drive/MyDrive/Deep_Learning/Final Project"

In [25]:
!pip install motmetrics -q
!pip install ipynb



In [26]:
# Switch the working directory to the project folder; relative paths used later (e.g. ./out) resolve against it.
%cd /content/drive/MyDrive/Deep_Learning/Final Project/

/content/drive/MyDrive/Deep_Learning/Final Project


In [27]:
# Import All Packages
import os
import cv2
import glob
import torch
import random
import configparser
#
import numpy as np
import pandas as pd
import torch.nn as nn
import motmetrics as mm
import matplotlib.pyplot as plt
import torch.nn.functional as F
#
from tqdm import tqdm
from pathlib import Path
from random import randint
from torchvision.utils import draw_bounding_boxes
from torchvision.io import read_image, write_jpeg, write_video
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
#
# The motmetrics library still calls np.asfarray, which NumPy 2.x no longer
# provides; install a float-casting stand-in only when the attribute is absent.
if getattr(np, "asfarray", None) is None:
    np.asfarray = lambda a: np.asarray(a, dtype=float)

In [28]:
# Siamese Network Class
class Siamese_Network(torch.nn.Module):
    """Twin-branch CNN that maps 3x128x64 image crops to 256-dimensional embeddings.

    Both inputs of ``forward`` go through the same weights (``forward_one``).
    The first fully connected layer expects 128*14*6 features, which is what
    three (3x3 conv -> 2x2 max-pool) stages produce for a 3x128x64 input.
    """

    def __init__(self):
        super(Siamese_Network, self).__init__()
        # Input size: 3x128x64

        # Three 3x3 convolutions; conv1 takes 3-channel input
        self.conv1 = torch.nn.Conv2d(3, 64, kernel_size=3)
        self.conv2 = torch.nn.Conv2d(64, 128, kernel_size=3)
        self.conv3 = torch.nn.Conv2d(128, 128, kernel_size=3)

        # 2 fully connected layers
        self.fc1 = torch.nn.Linear(128*14*6, 256)
        self.fc2 = torch.nn.Linear(256, 256)

    def forward_one(self, x):
        """Embed one image or one batch of images.

        Args:
            x: Tensor of shape (3, 128, 64) or (B, 3, 128, 64).

        Returns:
            Tensor of shape (256,) for an unbatched input, (B, 256) for a batch.
        """
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2(x), 2))
        x = F.relu(F.max_pool2d(self.conv3(x), 2))
        # Collapse only the (C, H, W) dims. A plain torch.flatten(x) would also
        # merge the batch dim and break fc1 for any batch size other than one.
        x = torch.flatten(x, start_dim=-3)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

    def forward(self, x1, x2):
        """Return the embeddings of ``x1`` and ``x2`` computed with shared weights."""
        return self.forward_one(x1), self.forward_one(x2)

In [29]:
# Gallery Class
class Gallery(torch.utils.data.Dataset):
    """Image-pair dataset built from a flat folder of ``<personId>_*.jpg`` crops.

    Each item is ``(img1, img2, label)`` where ``label`` is 1.0 when both images
    share the person-id prefix (text before the first underscore) and 0.0
    otherwise. Pairs are drawn once, at construction time, with ``random``.

    Args:
        path: Folder containing the ``.jpg`` files.
        transform: Optional callable applied to each image tensor of a pair.
        max_pairs_per_id: Upper bound on positive pairs sampled per person.
        max_neg_pairs_per_id: Upper bound on negative pairs sampled per person.
    """

    # Running count of image-load failures, shared by every Gallery instance.
    errorCount = 0

    def __init__(self, path, transform=None, max_pairs_per_id=50, max_neg_pairs_per_id=50):
        self.path = path
        self.imgs = sorted([x for x in os.listdir(path) if x.endswith('.jpg')])
        self.transform = transform
        self.max_pairs_per_id = max_pairs_per_id  # Limit positive pairs
        self.max_neg_pairs_per_id = max_neg_pairs_per_id  # Limit negative pairs
        self.pairs = []
        self.labels = []
        self._create_pairs()

    def _create_pairs(self):
        """Fill ``self.pairs`` / ``self.labels`` with sampled positive and negative pairs."""
        # Organize images by person ID (file-name prefix before the first '_')
        person_images = {}
        for img_name in self.imgs:
            person_images.setdefault(img_name.split('_')[0], []).append(img_name)

        for person_id, images in person_images.items():
            # Positive pairs: every unordered pair of this person's images,
            # down-sampled to at most `max_pairs_per_id`.
            positive_pairs = [(images[i], images[j]) for i in range(len(images)) for j in range(i + 1, len(images))]
            positive_pairs = random.sample(positive_pairs, min(len(positive_pairs), self.max_pairs_per_id))
            self.pairs.extend(positive_pairs)
            self.labels.extend([1] * len(positive_pairs))

            # Negative pairs: this person's first image against the first image
            # of up to `max_neg_pairs_per_id` randomly chosen other persons.
            other_person_ids = [pid for pid in person_images if pid != person_id]
            chosen_ids = random.sample(other_person_ids, min(len(other_person_ids), self.max_neg_pairs_per_id))
            negative_pairs = [(images[0], person_images[other_id][0]) for other_id in chosen_ids]
            self.pairs.extend(negative_pairs)
            self.labels.extend([0] * len(negative_pairs))

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        """Load pair ``idx`` as float tensors scaled to [0, 1] plus its label.

        If either file cannot be read, the failure is counted and printed and
        both images are replaced by all-zero 3x128x64 placeholders.
        """
        img1_name, img2_name = self.pairs[idx]
        img1_path = os.path.join(self.path, img1_name)
        img2_path = os.path.join(self.path, img2_name)

        try:
            img1 = read_image(img1_path).float() / 255.0
            img2 = read_image(img2_path).float() / 255.0
        except (RuntimeError, OSError) as e:
            Gallery.errorCount += 1
            print(f"Error#: {Gallery.errorCount}. Error loading {img1_path} or {img2_path}: {e}")
            img1 = torch.zeros(3, 128, 64)  # Placeholder image
            img2 = torch.zeros(3, 128, 64)

        # The transform was previously accepted but silently ignored.
        if self.transform is not None:
            img1 = self.transform(img1)
            img2 = self.transform(img2)

        label = torch.tensor(self.labels[idx], dtype=torch.float32)

        return img1, img2, label

In [30]:
# MOT16 Train Class
class MOT16TrainDataset(torch.utils.data.Dataset):
    """Frames and per-frame person boxes from every sequence folder under ``root``.

    Each sequence folder is expected to contain ``img1/`` (frames) and
    ``gt/gt.txt`` (comma-separated ground truth). Items are ``(img, target)``
    where ``img`` is a float tensor scaled to [0, 1] and ``target`` is a dict
    with ``boxes`` (N x 4 as x1, y1, x2, y2), ``labels`` (N ones) and
    ``person_ids`` (N).

    NOTE(review): the column-index constants PERSON_COL, TOP_COL, LEFT_COL,
    HEIGHT_COL, WIDTH_COL, ID_COL and FRAME_COL are not defined in the visible
    part of this notebook; they must exist before this class is instantiated.
    """

    def __init__(self, root):
        self.root = root
        # Frame paths are relative to `root`; targets[i] belongs to imgs[i]
        self.imgs = []
        self.targets = []
        # Sorted so that index -> frame mapping is the same on every run
        for subdir in sorted(os.listdir(root)):
            img_dir = os.path.join(root, subdir, "img1")
            if not os.path.isdir(img_dir):
                # Skip stray files/folders that are not sequence directories
                continue

            frame_files = sorted(os.listdir(img_dir))
            self.imgs += [os.path.join(subdir, "img1", filename) for filename in frame_files]

            # Import ground truth data; atleast_2d keeps a one-row file indexable
            gt = np.genfromtxt(os.path.join(root, subdir, "gt", "gt.txt"), delimiter=',', dtype=int)
            gt = np.atleast_2d(gt)

            # Only get the boxes corresponding to people
            gt = gt[gt[:, PERSON_COL] == 1, :]

            # Convert (left, top, width, height) rows into (x1, y1, x2, y2)
            # corners, the format the detector's targets use.
            y_mins = gt[:, TOP_COL]
            x_mins = gt[:, LEFT_COL]
            y_maxs = y_mins + gt[:, HEIGHT_COL]
            x_maxs = x_mins + gt[:, WIDTH_COL]
            boxes = np.column_stack((x_mins, y_mins, x_maxs, y_maxs))
            person_ids = gt[:, ID_COL]

            # One target dict per frame; the i-th sorted file is treated as frame i+1
            for frame_number in range(1, len(frame_files) + 1):
                mask = (gt[:, FRAME_COL] == frame_number)
                req_boxes = boxes[mask, :]
                self.targets.append({
                    'boxes': torch.tensor(req_boxes, dtype=torch.float),
                    # One label per box in THIS frame. Sizing from mask.shape
                    # would give one label per ground-truth row of the sequence.
                    'labels': torch.ones((req_boxes.shape[0],), dtype=torch.int64),
                    'person_ids': torch.tensor(person_ids[mask], dtype=torch.int64),
                })

        print('Length of dataset: ', len(self.imgs))

    def __getitem__(self, idx):
        """Return ``(image, target)`` for frame ``idx``; raise IndexError when out of range."""
        count = len(self.imgs)
        if not -count <= idx < count:
            raise IndexError(f"index {idx} out of range for dataset of length {count}")

        # Load image
        img_path = os.path.join(self.root, self.imgs[idx])

        # Scale image values between 0 and 1
        img = torch.div(read_image(img_path).float(), 255.0)

        return img, self.targets[idx]

    def __len__(self):
        return len(self.imgs)

In [31]:
# MOT16 Test Class
class MOT16TestDataset(torch.utils.data.Dataset):
    """Frames of a single sequence folder (``root/img1``), without annotations.

    Items are float image tensors scaled to [0, 1], ordered by file name.
    """

    def __init__(self, root):
        self.root = root
        # File names (not full paths) of the frames, sorted by name
        self.imgs = sorted(os.listdir(os.path.join(root, "img1")))

    def __getitem__(self, idx):
        """Return frame ``idx``; raise IndexError when ``idx`` is out of range.

        Raising (instead of returning a ``(None, None)`` tuple) keeps the
        return type consistent and lets plain ``for img in dataset`` loops
        terminate through the sequence protocol.
        """
        count = len(self.imgs)
        if not -count <= idx < count:
            raise IndexError(f"index {idx} out of range for dataset of length {count}")

        # Load image
        img_path = os.path.join(self.root, "img1", self.imgs[idx])

        # Scale image values between 0 and 1
        return torch.div(read_image(img_path).float(), 255.0)

    def __len__(self):
        return len(self.imgs)

In [32]:
# Get Detector Model Function
def get_detector_model(load_weights=True):
    """Build a Faster R-CNN (ResNet-50 FPN) whose box head predicts NUM_CLASSES classes.

    Args:
        load_weights: When True, load the state dict stored at
            ``model_param_path`` if that file exists.

    Returns:
        The detector, left on the CPU and in its default (training) mode.
    """
    # Start from torchvision's default pretrained weights, then swap in a fresh
    # box predictor sized for this project's classes.
    model = fasterrcnn_resnet50_fpn(weights='DEFAULT')
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, NUM_CLASSES)
    if load_weights:
        if os.path.exists(model_param_path):
            # map_location='cpu' lets a checkpoint saved from a GPU run load on
            # a CPU-only runtime; the model itself is still on the CPU here.
            state_dict = torch.load(model_param_path, map_location='cpu', weights_only=True)
            model.load_state_dict(state_dict)
        else:
            # Do not fail, but make it visible that the head is untrained.
            print(f"[WARN] No detector weights found at {model_param_path}; using an untrained box predictor head")
    return model

In [None]:
# Unzip MOT16 data if needed
!unzip -q f"{PROJECT_PATH}/MOT16.zip" -d /content/MOT16_data/

In [33]:
# Cell 2: Define constants and paths

# Detector settings: classes predicted by the box head, and the minimum score
# for a detection to be kept by the tracker.
NUM_CLASSES = 2
BBOX_SCORE_THRESH = 0.7

# Model checkpoints
model_param_path = f'{PROJECT_PATH}/models_FasterRCNN/bbox_detector.pth'
similarity_model_path = f'{PROJECT_PATH}/siamese_network.pth'

# Input sequence and its (optional) ground-truth file
sequence_dir = f'{PROJECT_PATH}/MOT16/test/MOT16-03'
gt_txt_path = os.path.join(sequence_dir, "gt/gt.txt")

# Output artifacts
output_txt_path = f'{PROJECT_PATH}/results_FasterRCNN/FasterRCNN_tracker_results.txt'
video_path = f'{PROJECT_PATH}/results_FasterRCNN/tracked_test_video.mp4'
csv_path = f'{PROJECT_PATH}/results_FasterRCNN/FasterRCNN_tracker_metrics.csv'

In [34]:
# Cell 3: Visualization
def get_sequence_info(sequence_dir):
    """Read frame rate and image size from ``<sequence_dir>/seqinfo.ini``.

    Returns:
        ``(fps, (width, height))`` taken from the ``frameRate``, ``imWidth``
        and ``imHeight`` options of the ``[Sequence]`` section. Any value that
        cannot be found (including when the file is missing) is ``None``.
    """
    parser = configparser.ConfigParser()
    parser.read(os.path.join(sequence_dir, "seqinfo.ini"))

    def _int_option(option):
        # None when the section or the option is absent
        return parser.getint("Sequence", option, fallback=None)

    frame_rate = _int_option("frameRate")
    image_size = (_int_option("imWidth"), _int_option("imHeight"))
    return frame_rate, image_size

def visualize_sequence(sequence_dir, results_file, output_video_path):
    """Render tracker results on top of a sequence's frames and save an MP4.

    Args:
        sequence_dir: Sequence folder containing ``img1/*.jpg`` (frame number
            as file name) and, optionally, ``seqinfo.ini``.
        results_file: Headerless CSV with 10 columns per row:
            frame, id, x, y, w, h, conf, x3, y3, z3.
        output_video_path: Destination of the video file.

    Raises:
        FileNotFoundError: No ``.jpg`` frames were found in ``img1``.
        OSError: The first frame cannot be decoded or the video file cannot be opened.
    """
    fallback_fps = 30  # used only when seqinfo.ini provides no frame rate

    # Load results; an empty file simply means "no boxes to draw"
    try:
        results_df = pd.read_csv(results_file, header=None)
    except pd.errors.EmptyDataError:
        results_df = pd.DataFrame(columns=range(10))
    results_df.columns = ["frame","id","x","y","w","h","conf","x3","y3","z3"]

    # Load frames
    image_paths = sorted(glob.glob(os.path.join(sequence_dir, "img1", "*.jpg")))
    if not image_paths:
        raise FileNotFoundError(f"No .jpg frames found in {os.path.join(sequence_dir, 'img1')}")

    # Frame rate comes from seqinfo.ini; frame size from the first image
    fps, _ = get_sequence_info(sequence_dir)
    if not fps:
        print(f"[WARN] No frameRate in seqinfo.ini, falling back to {fallback_fps} fps")
        fps = fallback_fps
    first_img = cv2.imread(image_paths[0])
    if first_img is None:
        raise OSError(f"Could not read first frame {image_paths[0]}")
    frame_size = (first_img.shape[1], first_img.shape[0])

    # Video writer
    target_dir = os.path.dirname(output_video_path)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, frame_size)
    if not video_writer.isOpened():
        raise OSError(f"Could not open video writer for {output_video_path}")

    # One reproducible random color per ID. A private RandomState yields the
    # same colors as seeding the global generator with 42, without resetting
    # NumPy's global random state for the rest of the notebook.
    max_id = int(results_df["id"].max()) + 1 if len(results_df) else 0
    colors = np.random.RandomState(42).randint(0, 255, size=(max_id, 3), dtype=np.uint8)

    # Group rows once instead of scanning the whole table for every frame
    rows_by_frame = {frame: rows for frame, rows in results_df.groupby("frame")}

    try:
        for img_path in image_paths:
            frame_num = int(os.path.splitext(os.path.basename(img_path))[0])
            img = cv2.imread(img_path)
            if img is None:
                continue

            frame_rows = rows_by_frame.get(frame_num)
            if frame_rows is not None:
                for _, row in frame_rows.iterrows():
                    track_id = int(row["id"])
                    color = tuple(map(int, colors[track_id]))
                    x, y, w_box, h_box = int(row["x"]), int(row["y"]), int(row["w"]), int(row["h"])
                    cv2.rectangle(img, (x, y), (x + w_box, y + h_box), color, 2)
                    cv2.putText(img, str(track_id), (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
            video_writer.write(img)
    finally:
        # Always finalize the container, even if a frame fails midway
        video_writer.release()
    print(f"Video saved to {output_video_path}")

In [35]:
# Cell 4: Evaluation metrics
def evaluate_mot_sequence(gt_txt_path, output_txt_path, csv_path, max_iou=0.5):
    """Compute MOTChallenge metrics for one sequence and save them as CSV.

    Args:
        gt_txt_path: Headerless ground-truth CSV; the first seven columns are
            read as frame, id, x, y, w, h, conf.
        output_txt_path: Tracker output CSV with the same leading columns.
        csv_path: Where the metrics summary is written.
        max_iou: Largest IoU *distance* (1 - IoU) at which a ground-truth box
            and a tracker box may still be paired.

    Returns:
        The metrics summary DataFrame (also printed and written to ``csv_path``).
    """
    columns = ["frame","id","x","y","w","h","conf"]

    # Load GT and tracker files, keeping only the first 7 columns
    gt = pd.read_csv(gt_txt_path, header=None).iloc[:, :7]
    tr = pd.read_csv(output_txt_path, header=None).iloc[:, :7]
    gt.columns = columns
    tr.columns = columns

    # Keep only evaluation boxes (conf > 0)
    gt = gt[gt["conf"] > 0].copy()
    tr = tr[tr["conf"] > 0].copy()

    # Initialize MOTAccumulator
    acc = mm.MOTAccumulator(auto_id=True)

    # Walk over every frame present in EITHER file, so tracker output in frames
    # without ground truth is still counted as false positives.
    frames = sorted(set(gt["frame"].unique()) | set(tr["frame"].unique()))
    for f in frames:
        gt_frame = gt[gt["frame"]==f]
        tr_frame = tr[tr["frame"]==f]

        gt_ids = gt_frame["id"].tolist()
        tr_ids = tr_frame["id"].tolist()

        gt_boxes = gt_frame[["x","y","w","h"]].values
        tr_boxes = tr_frame[["x","y","w","h"]].values

        # iou_matrix already returns distances (1 - IoU, NaN beyond max_iou).
        # Subtracting it from 1 again would turn it back into an overlap and
        # make the accumulator prefer the worst-overlapping pairs.
        distances = mm.distances.iou_matrix(gt_boxes, tr_boxes, max_iou=max_iou)
        acc.update(gt_ids, tr_ids, distances)

    # Create MetricsHost
    mh = mm.metrics.create()
    # Compute MOTChallenge metrics
    summary = mh.compute(acc, metrics=mm.metrics.motchallenge_metrics)
    summary.to_csv(csv_path)
    print(f"[OK] Metrics saved to {csv_path}")
    print(summary)
    return summary

In [36]:
# Cell 5: Function to resize image using adaptive pooling
def resize_image(image):
    """Adaptive-average-pool ``image`` so its last two dimensions become 128 x 64."""
    target_height_width = (128, 64)
    return F.adaptive_avg_pool2d(image, output_size=target_height_width)


In [37]:
from torchvision import transforms
from PIL import Image, ImageDraw, ImageFont

def main():
    """Run detection + appearance-based tracking on ``sequence_dir``.

    For every frame: detect people with the Faster R-CNN detector, embed each
    detection crop with the Siamese network, greedily match detections to
    existing tracks by mean cosine similarity against each track's recent
    embeddings, start new tracks for unmatched detections, and drop tracks that
    have been missing for too long. Results are written as text rows
    (``frame,id,x,y,w,h,conf,1,1,1``) to ``output_txt_path`` and as annotated
    JPEGs to ``./out``. Afterwards the video is rendered and, if a ground-truth
    file exists, metrics are computed.
    """
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    # map_location lets GPU-saved checkpoints load on a CPU-only runtime
    bbox_detector = get_detector_model(load_weights=False)
    bbox_detector.load_state_dict(torch.load(model_param_path, map_location=device, weights_only=True))
    bbox_detector.to(device)
    bbox_detector.eval()

    similarity_model = Siamese_Network().to(device)
    similarity_model.load_state_dict(torch.load(similarity_model_path, map_location=device, weights_only=True))
    similarity_model.eval()
    similarity_model = Siamese_Network_Extended(similarity_model)

    img_set = MOT16TestDataset(sequence_dir)

    # Tracking state management
    next_id = 0
    tracked_persons = {}  # {id: {"embeddings": [last N frame embeddings], "color": (R,G,B), "last_seen": frame_idx}}
    MAX_EMBEDDING_HISTORY = 5  # Keep embeddings from last N frames
    MAX_FRAMES_MISSING = 30    # Max frames allowed missing
    SIMILARITY_THRESHOLD = 0.6 # Threshold below which it is considered a new target

    # Annotated frames are written relative to the working directory
    os.makedirs('./out', exist_ok=True)

    # `with` closes the results file even if a frame fails; no_grad avoids
    # building autograd graphs that the stored embeddings would keep alive.
    with open(output_txt_path, "w") as output_file, torch.no_grad():
        for i in tqdm(range(len(img_set)), total=len(img_set), desc="Processing frames", unit="frame"):
            img = img_set[i]
            img_device = img.to(device)

            # Object detection
            bbox_pred = bbox_detector([img_device])[0]
            scores = bbox_pred["scores"]
            boxes = bbox_pred["boxes"]
            # Keep only boxes scoring at or above the threshold, with their scores
            mask = (scores[:] >= BBOX_SCORE_THRESH)
            boxes = boxes[mask].type(torch.int32)
            scores = scores[mask]

            # Detections of the current frame (box + embedding + score)
            current_detections = []
            for box, score in zip(boxes, scores):
                x1, y1, x2, y2 = box.tolist()
                if x2 - x1 > 0 and y2 - y1 > 0:  # make sure the crop is non-empty
                    cropped = resize_image(img_device[:, y1:y2, x1:x2]).to(device)
                    embedding = similarity_model.get_embedding(cropped)
                    current_detections.append(
                        {
                            "box": box,
                            "embedding": embedding,
                            "score": float(score.item()),  # detection confidence
                        }
                    )

            # Match existing tracked targets
            unmatched_detections = list(range(len(current_detections)))
            matched_tracks = {}

            # Try to find best match for each tracked person (greedy, in track order)
            for person_id, person_data in tracked_persons.items():
                if len(unmatched_detections) == 0:
                    # Nothing left to assign. The remaining tracks keep their old
                    # last_seen so that they can age out; refreshing it here would
                    # keep missing tracks alive indefinitely.
                    break

                best_detection_idx = None
                best_similarity = -float('inf')

                # Use multiple recent templates for matching: (k, D) stack
                templates = torch.stack(person_data["embeddings"])

                for det_idx in unmatched_detections:
                    detection = current_detections[det_idx]

                    # Mean cosine similarity between the detection and all templates
                    avg_similarity = F.cosine_similarity(
                        detection["embedding"].unsqueeze(0),
                        templates
                    ).mean().item()

                    if avg_similarity > best_similarity:
                        best_similarity = avg_similarity
                        best_detection_idx = det_idx

                # Match if similarity exceeds threshold
                if best_similarity > SIMILARITY_THRESHOLD and best_detection_idx is not None:
                    matched_tracks[person_id] = {
                        "detection_idx": best_detection_idx,
                        "similarity": best_similarity
                    }
                    unmatched_detections.remove(best_detection_idx)

            # Update matched tracks
            for person_id, match_info in matched_tracks.items():
                det_idx = match_info["detection_idx"]
                detection = current_detections[det_idx]

                person_data = tracked_persons[person_id]

                # Update the embedding history (bounded length)
                person_data["embeddings"].append(detection["embedding"])
                if len(person_data["embeddings"]) > MAX_EMBEDDING_HISTORY:
                    person_data["embeddings"].pop(0)

                # Update last-seen frame, position and confidence
                person_data["last_seen"] = i
                person_data["current_box"] = detection["box"]
                person_data["score"] = detection.get("score", 1.0)

            # Create new tracks for unmatched detections
            for det_idx in unmatched_detections:
                detection = current_detections[det_idx]
                tracked_persons[next_id] = {
                    "embeddings": [detection["embedding"]],
                    "color": (randint(0, 255), randint(0, 255), randint(0, 255)),
                    "last_seen": i,
                    "current_box": detection["box"],
                    "score": detection.get("score", 1.0),
                }
                next_id += 1

            # Remove stale tracks
            for person_id in list(tracked_persons.keys()):
                if i - tracked_persons[person_id]["last_seen"] > MAX_FRAMES_MISSING:
                    del tracked_persons[person_id]

            # Collect tracking results for the current frame
            annotations = []
            for person_id, person_data in tracked_persons.items():
                if person_data["last_seen"] == i:
                    box = person_data["current_box"]
                    x1, y1, x2, y2 = box.tolist()
                    w = x2 - x1
                    h = y2 - y1
                    conf = float(person_data.get("score", 1.0))

                    # One row per track: frame,id,x,y,w,h,conf,1,1,1
                    # Frame numbers in the text file are 1-based (i + 1).
                    frame_idx_for_txt = i + 1
                    output_file.write(
                        f"{frame_idx_for_txt},{person_id},{x1},{y1},{w},{h},{conf:.4f},1,1,1\n"
                    )

                    annotations.append((box, person_data["color"], str(person_id)))

            # Draw the boxes
            if annotations:
                all_boxes = torch.stack([ann[0] for ann in annotations])
                all_colors = [ann[1] for ann in annotations]
                all_labels = [ann[2] for ann in annotations]

                annotated_img = draw_bounding_boxes_with_labels(
                    img,
                    all_boxes.cpu(),
                    labels=all_labels,
                    colors=all_colors,
                    width=4,
                )
            else:
                annotated_img = img

            # Save result
            write_jpeg(torch.mul(annotated_img, 255.0).to(torch.uint8), f'./out/{i:05d}.jpg')

    print(f"Tracking results saved to: {output_txt_path}")

    # Visualize
    visualize_sequence(sequence_dir, output_txt_path, video_path)

    # Metrics
    if os.path.exists(gt_txt_path):
        evaluate_mot_sequence(gt_txt_path, output_txt_path, csv_path, max_iou=0.5)
    else:
        print("No ground truth, testing sequence")


# Utility function: Enhanced bounding box drawing with labels
def draw_bounding_boxes_with_labels(image, boxes, labels=None, colors=None, width=1):
    """
    Draw bounding boxes and, optionally, a filled text label above each box.

    Args:
        image: Input image tensor [C, H, W]
        boxes: Bounding box coordinates [N, 4] in (x1, y1, x2, y2) format
        labels: List of label strings [N]; when None only the boxes are drawn
        colors: List of colors [N]; label background defaults to white when None
        width: Line width

    Returns:
        Image with bounding boxes (and labels). With labels the result comes
        from ``transforms.ToTensor`` applied to a PIL image; without labels
        it is whatever ``draw_bounding_boxes`` returns.
    """
    image_with_boxes = draw_bounding_boxes(image, boxes, colors=colors, width=width)

    if labels is None:
        return image_with_boxes

    image_pil = transforms.ToPILImage()(image_with_boxes)
    draw = ImageDraw.Draw(image_pil)

    # Fall back to PIL's built-in font when arial.ttf is not installed
    try:
        font = ImageFont.truetype("arial.ttf", 20)
    except IOError:
        font = ImageFont.load_default()

    for i, box in enumerate(boxes):
        x1, y1, x2, y2 = box.tolist()
        color = colors[i] if colors is not None else (255, 255, 255)
        text = labels[i]

        # Older Pillow versions have no textbbox(); use textsize() there
        if hasattr(draw, 'textbbox'):
            bbox = draw.textbbox((0, 0), text, font=font)
            text_w = bbox[2] - bbox[0]
            text_h = bbox[3] - bbox[1]
        else:
            text_w, text_h = draw.textsize(text, font=font)

        # Place the label strip just above the box, but never above the top of
        # the image: a box touching the top edge would otherwise lose its label.
        label_top = max(0, y1 - text_h - 2)
        draw.rectangle([(x1, label_top), (x1 + text_w, label_top + text_h + 2)], fill=color)
        draw.text((x1, label_top), text, fill=(0, 0, 0), font=font)

    return transforms.ToTensor()(image_pil)

# Hypothetical extension of Siamese network to add get_embedding()
class Siamese_Network_Extended(nn.Module):
    """Wrapper that adds a single-image ``get_embedding`` to a Siamese model.

    Args:
        original_model: A two-input module returning a pair of embeddings. If
            it exposes an ``encoder`` attribute or a ``forward_one`` method,
            those are used for single-image embedding.
    """

    def __init__(self, original_model):
        super(Siamese_Network_Extended, self).__init__()
        self.original_model = original_model
        self.encoder = original_model.encoder if hasattr(original_model, 'encoder') else None

    def forward(self, x1, x2):
        """Delegate to the wrapped model's two-input forward pass."""
        return self.original_model(x1, x2)

    def get_embedding(self, x):
        """Extract embedding for a single image"""
        # Compare against None: truthiness of a module can be False for an
        # empty container such as nn.Sequential(), which would skip the encoder.
        if self.encoder is not None:
            return self.encoder(x)

        # Prefer the model's own single-branch method: it avoids running a
        # second, discarded forward pass on an all-zero dummy input.
        forward_one = getattr(self.original_model, 'forward_one', None)
        if callable(forward_one):
            return forward_one(x)

        # Last resort: run the pair forward with a dummy second input
        dummy = torch.zeros_like(x)
        out_x, _ = self.original_model(x, dummy)
        return out_x

# Main part

In [38]:
# Cell 6: Entry point to execute the main function
import os

# Create every output location up front so a fresh Drive does not fail on a
# missing folder: the project-level out/ folder, the ./out folder that main()
# writes annotated frames to (relative to the working directory), and the
# folders of the results text file, the video and the metrics CSV.
os.makedirs(f"{PROJECT_PATH}/out/", exist_ok=True)
os.makedirs("./out", exist_ok=True)
for _artifact_path in (output_txt_path, video_path, csv_path):
    os.makedirs(os.path.dirname(_artifact_path), exist_ok=True)


if __name__ == "__main__":
    main()


Processing frames: 100%|██████████| 1500/1500 [42:54<00:00,  1.72s/frame]


Tracking results saved to: /content/drive/MyDrive/Deep_Learning/Final Project/results_FasterRCNN/FasterRCNN_tracker_results.txt
Video saved to /content/drive/MyDrive/Deep_Learning/Final Project/results_FasterRCNN/tracked_test_video.mp4
No ground truth, testing sequence
