<a href="https://colab.research.google.com/github/sainithinkatta/deep_learning_class/blob/main/DL_Project_V1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import zipfile

# Path to your zip file (adjust if needed)
zip_path = '/content/drive/MyDrive/Colab Notebooks/MOT16.zip'
extract_path = '/content/MOT16'

# Unzip the dataset
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

In [3]:
import os

# Check if train/test folders exist
print("Train sequences:", os.listdir("/content/MOT16/train"))
print("Test sequences:", os.listdir("/content/MOT16/test"))

Train sequences: ['MOT16-04', 'MOT16-09', 'MOT16-05', 'MOT16-13', 'MOT16-10', 'MOT16-11', 'MOT16-02']
Test sequences: ['MOT16-12', 'MOT16-14', 'MOT16-01', 'MOT16-07', 'MOT16-08', 'MOT16-06', 'MOT16-03']


In [4]:
# ===================================================================
# 2. Imports
# ===================================================================
from PIL import Image
import numpy as np
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import AdamW, Adam
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

In [5]:
# ============================================================================
# 4. Parse MOT16 Ground‑Truth
# ============================================================================
def parse_gt_file(file_path):
    data = []
    with open(file_path, "r") as f:
        for line in f:
            parts = line.strip().split(",")
            if len(parts) < 6:
                continue
            f_id, obj_id, x, y, w, h = parts[:6]
            data.append({
                "frame_id":   int(float(f_id)),
                "object_id":  int(float(obj_id)),
                "bb_left":    float(x),
                "bb_top":     float(y),
                "bb_width":   float(w),
                "bb_height":  float(h),
            })
    return data

In [6]:
# ===================================================================
# 5. Dataset & DataLoader for Detector (Faster R‑CNN)
# ===================================================================
class MOTDataset(Dataset):
    def __init__(self, img_dir, gt_path, transform=None):
        self.img_dir   = img_dir
        self.gt_data   = parse_gt_file(gt_path)
        self.transform = transform
        # collect unique frame IDs
        self.frame_ids = sorted({d["frame_id"] for d in self.gt_data})

    def __len__(self):
        return len(self.frame_ids)

    def __getitem__(self, idx):
        fid = self.frame_ids[idx]
        img = Image.open(os.path.join(self.img_dir, f"{fid:06d}.jpg")).convert("RGB")

        boxes, labels = [], []
        for obj in self.gt_data:
            if obj["frame_id"] != fid:
                continue
            x1 = obj["bb_left"]
            y1 = obj["bb_top"]
            x2 = x1 + obj["bb_width"]
            y2 = y1 + obj["bb_height"]
            boxes.append([x1, y1, x2, y2])
            labels.append(1)  # only “person” class

        target = {
            "boxes":  torch.tensor(boxes, dtype=torch.float32),
            "labels": torch.tensor(labels, dtype=torch.int64),
        }

        if self.transform:
            img = self.transform(img)
        return img, target

def collate_fn(batch):
    return tuple(zip(*batch))

# create transforms & loader
det_transform = transforms.Compose([
    transforms.ToTensor(),
])
det_dataset   = MOTDataset(
    img_dir=  "/content/MOT16/train/MOT16-02/img1",
    gt_path=  "/content/MOT16/train/MOT16-02/gt/gt.txt",
    transform=det_transform
)
det_loader    = DataLoader(
    det_dataset,
    batch_size=2,
    shuffle=True,
    collate_fn=collate_fn
)


In [11]:
# ===================================================================
# 6. Build & Fine‑Tune the Faster R‑CNN Detector
# ===================================================================
# helper to swap in our 2‑class head

import torch

# choose CUDA if available, otherwise fall back to CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


def get_detector(num_classes=2):
    model = fasterrcnn_resnet50_fpn(pretrained=True)
    in_feats = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_feats, num_classes)
    return model.to(device)

# instantiate
model_det = get_detector(2)
optimizer_det = AdamW(
    [p for p in model_det.parameters() if p.requires_grad],
    lr=5e-4
)

# training loop
num_epochs_det = 5
for epoch in range(num_epochs_det):
    model_det.train()
    epoch_loss = 0.0

    for imgs, targets in det_loader:
        imgs    = [img.to(device) for img in imgs]
        targets = [{k: v.to(device) for k,v in t.items()} for t in targets]

        loss_dict = model_det(imgs, targets)
        loss      = sum(loss for loss in loss_dict.values())

        optimizer_det.zero_grad()
        loss.backward()
        optimizer_det.step()

        epoch_loss += loss.item()

    print(f"[Detector] Epoch {epoch+1}/{num_epochs_det}  Loss: {epoch_loss/len(det_loader):.4f}")

# save weights
torch.save(model_det.state_dict(), "fasterrcnn_finetuned.pth")
print("🔖 Detector weights saved.")

[Detector] Epoch 1/5  Loss: 1.1970
[Detector] Epoch 2/5  Loss: 0.6677
[Detector] Epoch 3/5  Loss: 0.4930
[Detector] Epoch 4/5  Loss: 0.4117
[Detector] Epoch 5/5  Loss: 0.3601
🔖 Detector weights saved.


In [13]:
# ===================================================================
# 7. Dataset for Siamese Network (Re‑ID)
# ===================================================================
class SiameseDataset(Dataset):
    def __init__(self, img_dir, gt_path, transform=None, image_size=(16,16)):
        self.img_dir    = img_dir
        self.gt_data    = parse_gt_file(gt_path)
        self.frame_ids  = sorted({d["frame_id"] for d in self.gt_data})
        self.transform  = transform
        self.image_size = image_size

        # group by frame for fast lookup
        self.frames = {}
        for d in self.gt_data:
            self.frames.setdefault(d["frame_id"], []).append(d)

    def __len__(self):
        return len(self.frame_ids) * 10  # arbitrary

    def __getitem__(self, idx):
        # sample two different frames
        f1, f2 = np.random.choice(self.frame_ids, 2, replace=False)
        objs1, objs2 = self.frames[f1], self.frames[f2]

        # decide pos vs neg pair
        common = list({o["object_id"] for o in objs1} & {o["object_id"] for o in objs2})
        if common and np.random.rand() < 0.5:
            oid = np.random.choice(common)
            o1 = next(o for o in objs1 if o["object_id"]==oid)
            o2 = next(o for o in objs2 if o["object_id"]==oid)
            label = 1.0
        else:
            o1    = objs1[np.random.randint(len(objs1))]
            neg   = [o for o in objs2 if o["object_id"]!=o1["object_id"]]
            o2    = neg[np.random.randint(len(neg))]
            label = 0.0

        def crop_obj(o, fid):
            img = Image.open(os.path.join(self.img_dir, f"{fid:06d}.jpg")).convert("RGB")
            x1,y1 = int(o["bb_left"]), int(o["bb_top"])
            x2,y2 = x1+int(o["bb_width"]), y1+int(o["bb_height"])
            return img.crop((x1,y1,x2,y2)).resize(self.image_size)

        img1 = crop_obj(o1, f1)
        img2 = crop_obj(o2, f2)
        if self.transform:
            img1 = self.transform(img1)
            img2 = self.transform(img2)

        return img1, img2, torch.tensor(label, dtype=torch.float32)

In [14]:
# ===================================================================
# 8. Define Siamese Network & Contrastive Loss
# ===================================================================
class ContrastiveLoss(nn.Module):
    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def forward(self, out1, out2, label):
        dist = F.pairwise_distance(out1, out2)
        return torch.mean((1-label)*dist**2 + label * F.relu(self.margin - dist)**2)

class SiameseNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(3, 64, 3), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(64,128,3), nn.ReLU(), nn.MaxPool2d(2)
        )
        self.fc = nn.Sequential(
            nn.Linear(128*2*2, 256), nn.ReLU(),
            nn.Linear(256, 256)
        )

    def forward_once(self, x):
        x = self.conv(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)

    def forward(self, x1, x2):
        return self.forward_once(x1), self.forward_once(x2)

In [15]:
# ===================================================================
# 9. Train the Siamese Network
# ===================================================================
siam_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5))
])
siam_dataset = SiameseDataset(
    img_dir=     "/content/MOT16/train/MOT16-02/img1",
    gt_path=     "/content/MOT16/train/MOT16-02/gt/gt.txt",
    transform=   siam_transform,
    image_size= (16,16)
)
siam_loader = DataLoader(siam_dataset, batch_size=32, shuffle=True)

model_siam = SiameseNetwork().to(device)
criterion_siam = ContrastiveLoss(margin=1.0)
optimizer_siam = Adam(model_siam.parameters(), lr=5e-4)

for epoch in range(3):
    model_siam.train()
    running = 0.0
    for img1, img2, lbl in siam_loader:
        img1, img2, lbl = img1.to(device), img2.to(device), lbl.to(device)
        out1, out2 = model_siam(img1, img2)
        loss = criterion_siam(out1, out2, lbl)
        optimizer_siam.zero_grad()
        loss.backward()
        optimizer_siam.step()
        running += loss.item()
    print(f"[Siamese] Epoch {epoch+1}/3 Loss: {running/len(siam_loader):.4f}")

torch.save(model_siam.state_dict(), "siamese_network.pth")
print("🔖 Siamese weights saved.")

[Siamese] Epoch 1/3 Loss: 0.3204
[Siamese] Epoch 2/3 Loss: 0.3027
[Siamese] Epoch 3/3 Loss: 0.2952
🔖 Siamese weights saved.


In [18]:
# ===================================================================
# 10. Inference & Tracking Pipeline
# ===================================================================
import os
import cv2
import torch
from torchvision import transforms
from PIL import Image
import torch.nn.functional as F

# 10.1 Setup device & reload models
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# reload detector
model_det = get_detector(num_classes=2)
model_det.load_state_dict(torch.load("fasterrcnn_finetuned.pth", map_location=device))
model_det.to(device).eval()

# reload Siamese
model_siam = SiameseNetwork().to(device)
model_siam.load_state_dict(torch.load("siamese_network.pth", map_location=device))
model_siam.eval()

# 10.2 Define transforms
det_transform = transforms.Compose([
    transforms.ToTensor(),
])
siam_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5))
])

# Improved tracking function with better ID assignment logic
# Improved tracking function with proper tensor detachment
def track_sequence(seq_dir, output_path,
                  det_model, siam_model,
                  det_tf, siam_tf,
                  det_threshold=0.7,
                  match_threshold=0.5):

    # init video writer
    frames = sorted(os.listdir(seq_dir))
    sample = cv2.imread(os.path.join(seq_dir, frames[0]))
    h, w = sample.shape[:2]
    writer = cv2.VideoWriter(output_path,
                           cv2.VideoWriter_fourcc(*"mp4v"),
                           20, (w, h))

    tracks = {}     # track_id -> feature vector
    prev_boxes = {} # track_id -> previous bounding box
    next_id = 1

    for fn in frames:
        # load frame
        img_bgr = cv2.imread(os.path.join(seq_dir, fn))
        img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
        pil_img = Image.fromarray(img_rgb)

        # 1) detection
        inp = det_tf(pil_img).to(device)
        with torch.no_grad():  # Added no_grad here
            preds = det_model([inp])[0]
        boxes = preds["boxes"].detach().cpu()  # Added detach
        scores = preds["scores"].detach().cpu()  # Added detach
        mask = scores > det_threshold
        boxes = boxes[mask]

        # 2) extract Siamese features
        feats = []
        for b in boxes:
            x1,y1,x2,y2 = map(int, b.tolist())
            crop = pil_img.crop((x1,y1,x2,y2)).resize((16,16))
            t2 = siam_tf(crop).unsqueeze(0).to(device)
            with torch.no_grad():  # Added no_grad here
                f = siam_model.forward_once(t2)
                f = f / f.norm(dim=1, keepdim=True)
            feats.append(f.detach().cpu())  # Added detach

        # 3) Improved assignment using Hungarian algorithm
        if len(feats) > 0:
            if tracks:
                # Create cost matrix
                cost_matrix = np.zeros((len(feats), len(tracks)))
                for i, feat in enumerate(feats):
                    box = boxes[i]
                    box_center = [(box[0] + box[2]) / 2, (box[1] + box[3]) / 2]

                    for j, (tid, track_feat) in enumerate(tracks.items()):
                        # Compute feature similarity
                        with torch.no_grad():  # Add no_grad around the similarity computation
                            sim = F.cosine_similarity(feat, track_feat, dim=1).item()

                        # Incorporate spatial distance if we have previous box info
                        if tid in prev_boxes:
                            prev_box = prev_boxes[tid]
                            prev_center = [(prev_box[0] + prev_box[2]) / 2, (prev_box[1] + prev_box[3]) / 2]
                            dist = np.sqrt((box_center[0] - prev_center[0])**2 + (box_center[1] - prev_center[1])**2)
                            # Normalize distance to [0, 1] range
                            norm_dist = min(dist / 100.0, 1.0)
                            # Combine similarity and distance (higher value = better match)
                            sim = sim * (1 - 0.3 * norm_dist)

                        # Convert similarity to cost (lower is better for Hungarian)
                        cost_matrix[i, j] = 1.0 - sim

                # Use Hungarian algorithm if scipy is available
                try:
                    from scipy.optimize import linear_sum_assignment
                    row_ind, col_ind = linear_sum_assignment(cost_matrix)

                    # Initialize assigned IDs
                    assigned_ids = [-1] * len(feats)

                    # Process assignments based on cost threshold
                    track_ids = list(tracks.keys())
                    for i, j in zip(row_ind, col_ind):
                        # Only assign if similarity is above threshold
                        if cost_matrix[i, j] < (1.0 - match_threshold):
                            assigned_ids[i] = track_ids[j]
                            # Update track feature with moving average
                            tid = track_ids[j]
                            alpha = 0.7  # Weight for previous feature
                            # Make sure we're doing operations on detached tensors
                            tracks[tid] = alpha * tracks[tid] + (1 - alpha) * feats[i]
                            # Normalize
                            with torch.no_grad():  # Add no_grad around the normalization
                                tracks[tid] = tracks[tid] / tracks[tid].norm(dim=1, keepdim=True)
                            # Update previous box
                            prev_boxes[tid] = boxes[i].tolist()

                    # Assign new IDs to unassigned detections
                    for i, aid in enumerate(assigned_ids):
                        if aid == -1:
                            tracks[next_id] = feats[i]
                            prev_boxes[next_id] = boxes[i].tolist()
                            assigned_ids[i] = next_id
                            next_id += 1

                except ImportError:
                    # Fallback to greedy assignment if scipy is not available
                    assigned = []
                    for i, f in enumerate(feats):
                        best_id, best_sim = None, match_threshold
                        for tid, pf in tracks.items():
                            with torch.no_grad():  # Add no_grad here
                                sim = F.cosine_similarity(f, pf, dim=1).item()
                            if sim > best_sim:
                                best_sim, best_id = sim, tid
                        if best_id is not None:
                            tracks[best_id] = f
                            prev_boxes[best_id] = boxes[i].tolist()
                            assigned.append(best_id)
                        else:
                            tracks[next_id] = f
                            prev_boxes[next_id] = boxes[i].tolist()
                            assigned.append(next_id)
                            next_id += 1
                    assigned_ids = assigned
            else:
                # First frame - assign new IDs to all detections
                assigned_ids = []
                for i, f in enumerate(feats):
                    tracks[next_id] = f
                    prev_boxes[next_id] = boxes[i].tolist()
                    assigned_ids.append(next_id)
                    next_id += 1
        else:
            assigned_ids = []

        # 4) draw & write
        for b, tid in zip(boxes, assigned_ids):
            x1,y1,x2,y2 = map(int, b.tolist())
            # Generate consistent colors based on ID
            color_r = (tid * 43) % 256
            color_g = (tid * 71) % 256
            color_b = (tid * 113) % 256
            color = (color_b, color_g, color_r)  # BGR format for OpenCV

            # box
            cv2.rectangle(img_bgr, (x1,y1), (x2,y2), color, 2)

            # label background
            text = f"person {tid}"
            font = cv2.FONT_HERSHEY_SIMPLEX
            fs = 0.5
            th = cv2.getTextSize(text, font, fs, 1)[0][1]
            text_width = cv2.getTextSize(text, font, fs, 1)[0][0]
            cv2.rectangle(img_bgr,
                        (x1, y1-th-6),
                        (x1 + text_width + 4, y1),
                        color, cv2.FILLED)

            # text
            cv2.putText(img_bgr, text, (x1+2, y1-4),
                      font, fs, (255,255,255), 1)

        writer.write(img_bgr)

    writer.release()
    print(f"✅ Saved tracking video to {output_path}")

# 10.4 Run on test sequence
test_seq = "/content/MOT16/test/MOT16-01/img1"
track_sequence(
    seq_dir=test_seq,
    output_path="tracking_output.mp4",
    det_model=model_det,
    siam_model=model_siam,
    det_tf=det_transform,
    siam_tf=siam_transform,
    det_threshold=0.7,
    match_threshold=0.5
)

  dist = np.sqrt((box_center[0] - prev_center[0])**2 + (box_center[1] - prev_center[1])**2)


✅ Saved tracking video to tracking_output.mp4
