# Person Detection and Tracking in Crowded Scenes using Classical Computer Vision Techniques

Authors : Maria Kontaratou, Chaimae Sadoune & Manon Lagarde

### 1. Prerequisites : 

#### 1.1. Import Statements : 

In [42]:
import cv2
import matplotlib.pyplot as plt
import numpy as np
import imutils
import xml.etree.ElementTree as ET
from imutils.object_detection import non_max_suppression
from scipy.spatial import distance as dist
from scipy.optimize import linear_sum_assignment

#### 1.2. File Path Configuration :

In [None]:
test_image_path = '/Users/CentraleSupelec/Documents/ComputerVision_Project/person_img.jpg'
xml_file = '/Users/CentraleSupelec/Documents/ComputerVision_Project/PETS09-S2L2/PETS2009-S2L2.xml'
video_file = '/Users/CentraleSupelec/Documents/ComputerVision_Project/PETS09-S2L2/PETS09-S2L2-raw.webm'

: 


### 2. Pedestrian Detection using HOG and SVM : 

We start by initializing a Histogram of Oriented Gradients (HOG) detector with a pre-trained Support Vector Machine (SVM) model for pedestrian detection.

In [None]:
# 1) Create the HOG descriptor and set the pre-trained SVM detector for people
hog = cv2.HOGDescriptor()
hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())

# 2) Load your test image 
img = cv2.imread(test_image_path)

if img is None:
    raise IOError("Could not read test image!")

# 3) Optionally resize for speed (comment out if you want original size)
#    This step can help reduce false positives in large images
original = img.copy()
scale_percent = 100  
width = int(img.shape[1] * scale_percent / 100)
height = int(img.shape[0] * scale_percent / 100)
img = cv2.resize(img, (width, height))

# 4) Run the detector
#    - winStride controls the step of the sliding window
#    - padding is the area around the detection window
#    - scale is how much the image is resized at each scale
(rects, weights) = hog.detectMultiScale(
    img,
    winStride=(8, 8),
    padding=(8, 8),
    scale=1.1
)

# 5) Apply non-maximum suppression to reduce overlapping boxes
#    OpenCV doesn’t automatically do NMS for HOG, so manually:
rects_np = []
for (x, y, w, h) in rects:
    rects_np.append([x, y, x + w, y + h])  # x1, y1, x2, y2

rects_np = np.array(rects_np)
pick = non_max_suppression(rects_np, probs=None, overlapThresh=0.65)


# 6) Visualize the results
for (x1, y1, x2, y2) in pick:
    cv2.rectangle(img, (x1, y1), (x2, y2), (0, 0, 255), 2)

# Convert to RGB (matplotlib uses RGB, OpenCV uses BGR)
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

plt.figure(figsize=(10, 6))
plt.imshow(img_rgb)
plt.title(f"Detected Pedestrians: {len(pick)}")
plt.axis("off")
plt.show()

### 3. Pedestrian Detection and Tracking using HOG and Centroid Tracking : 

This code detects and tracks people in a video using classical techniques:

- HOG + SVM for detection.
- Centroid-based tracking to maintain identities across frames.
- Non-Maximum Suppression to reduce redundant detections.

This approach is computationally lightweight and suitable for scenarios where deep learning isn't feasible. However, it has limitations in very crowded scenes and under occlusions.

The code is “bad” because classical HOG alone isn’t robust for small, occluded, or crowded scenes, and a simple centroid tracker easily confuses IDs without motion/appearance modeling. For a challenging dataset like PETS09-S2L1, you either need:

- More advanced classical: (1) background subtraction + morphological filtering + (2) HOG + (3) multi-feature tracking (Kalman + color).
- Modern deep methods: They handle scale/occlusion better but require training or at least a pre-trained YOLO/SSD-like network.

In [None]:
###########################################
# Simple Centroid Tracker
###########################################
class CentroidTracker:
    def __init__(self, maxDisappeared=30, maxDistance=50):
        # Next available object ID
        self.nextObjectID = 0
        # Dictionary: objectID -> (centroid, bbox)
        self.objects = {}
        # Dictionary: objectID -> number of consecutive frames missing
        self.disappeared = {}
        self.maxDisappeared = maxDisappeared
        self.maxDistance = maxDistance

    def register(self, centroid, bbox):
        self.objects[self.nextObjectID] = (centroid, bbox)
        self.disappeared[self.nextObjectID] = 0
        self.nextObjectID += 1

    def deregister(self, objectID):
        del self.objects[objectID]
        del self.disappeared[objectID]

    def update(self, rects):
        # If no detections, mark existing objects as disappeared
        if len(rects) == 0:
            for objectID in list(self.disappeared.keys()):
                self.disappeared[objectID] += 1
                if self.disappeared[objectID] > self.maxDisappeared:
                    self.deregister(objectID)
            return self.objects

        # Compute centroids for the new detections
        inputCentroids = np.zeros((len(rects), 2), dtype="int")
        for (i, (x, y, w, h)) in enumerate(rects):
            cX = int(x + w / 2)
            cY = int(y + h / 2)
            inputCentroids[i] = (cX, cY)

        # If no objects are tracked, register all detections
        if len(self.objects) == 0:
            for i in range(0, len(inputCentroids)):
                self.register(inputCentroids[i], rects[i])
        else:
            # Grab the set of object IDs and their centroids
            objectIDs = list(self.objects.keys())
            objectCentroids = np.array([self.objects[objID][0] for objID in objectIDs])

            # Compute distance matrix between tracked centroids and new centroids
            D = dist.cdist(objectCentroids, inputCentroids)
            # For each existing object, find the closest new centroid
            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]

            usedRows = set()
            usedCols = set()

            # Assign the new centroid to an object if the distance is within maxDistance
            for (row, col) in zip(rows, cols):
                if row in usedRows or col in usedCols:
                    continue
                if D[row, col] > self.maxDistance:
                    continue
                objectID = objectIDs[row]
                self.objects[objectID] = (inputCentroids[col], rects[col])
                self.disappeared[objectID] = 0
                usedRows.add(row)
                usedCols.add(col)

            # Mark unmatched existing objects as disappeared
            unusedRows = set(range(0, D.shape[0])).difference(usedRows)
            for row in unusedRows:
                objectID = objectIDs[row]
                self.disappeared[objectID] += 1
                if self.disappeared[objectID] > self.maxDisappeared:
                    self.deregister(objectID)

            # Register new detections that were not matched
            unusedCols = set(range(0, D.shape[1])).difference(usedCols)
            for col in unusedCols:
                self.register(inputCentroids[col], rects[col])

        return self.objects
    
###########################################
# HOG Detector Function with NMS
###########################################
def detect_pedestrians(frame, conf_thresh=0.4):
    """
    Detect pedestrians in the frame using OpenCV's HOG detector.
    Returns a list of bounding boxes in (x, y, w, h) format.
    """
    # Convert to grayscale and optionally apply histogram equalization
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    gray_eq = cv2.equalizeHist(gray)
    
    hog = cv2.HOGDescriptor()
    hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
    
    (rects, weights) = hog.detectMultiScale(
        gray_eq,
        winStride=(4, 4),
        padding=(8, 8),
        scale=1.05
    )
    
    filtered_rects = []
    for (r, w_val) in zip(rects, weights):
        if w_val >= conf_thresh:
            filtered_rects.append(r)
    if len(filtered_rects) == 0:
        return []
    
    # Convert detections to (x1, y1, x2, y2) for non-maximum suppression
    rects_np = np.array([[x, y, x + w, y + h] for (x, y, w, h) in filtered_rects])
    picks = non_max_suppression(rects_np, probs=None, overlapThresh=0.65)
    final_rects = [(x1, y1, x2 - x1, y2 - y1) for (x1, y1, x2, y2) in picks]
    return final_rects

In [None]:
###########################################
# Main Processing Function
###########################################
def main():
    cap = cv2.VideoCapture(video_file)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return

    # Instantiate the centroid tracker
    tracker = CentroidTracker(maxDisappeared=30, maxDistance=50)

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Resize frame for consistency and processing speed
        frame = imutils.resize(frame, width=800)
        
        # Detect pedestrians in the current frame
        detections = detect_pedestrians(frame, conf_thresh=0.6)
        
        # Update the tracker with the current frame detections
        objects = tracker.update(detections)
        
        # Draw detection bounding boxes (red) and tracked IDs (green)
        for (x, y, w, h) in detections:
            cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 0, 255), 2)
        for objectID, (centroid, bbox) in objects.items():
            (x, y, w, h) = bbox
            cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
            cv2.putText(frame, f"ID {objectID}", (x, y - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

        cv2.imshow("PETS09-S2L1 - Detection and Tracking", frame)
        key = cv2.waitKey(30) & 0xFF
        if key == ord("q"):
            break

    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()

2025-03-15 13:34:52.566 python[67375:2633284] +[IMKClient subclass]: chose IMKClient_Modern
2025-03-15 13:34:52.566 python[67375:2633284] +[IMKInputSession subclass]: chose IMKInputSession_Modern


#### 3.1. Metrix Calculation - Pedestrian Detection and Performance Evaluation : 

This code evaluates a pedestrian detection system using HOG + SVM.

In [None]:
###########################################
# 1. Parse XML for Ground-Truth
###########################################
def parse_xml(xml_file):
    """
    Parses an XML file with the structure:
      <dataset>
         <frame number="0">
            <objectlist>
               <object id="9">
                  <box h="75.17" w="31.03" xc="514.7109" yc="195.2731"/>
               </object>
               ...
            </objectlist>
         </frame>
         <frame number="1"> ... </frame>
         ...
      </dataset>

    Returns a dictionary:
    ground_truth[frame_number] = [ (x, y, w, h), ... ]
    We ignore the 'id' attribute for detection metrics.
    """
    tree = ET.parse(xml_file)
    root = tree.getroot()

    ground_truth = {}
    for frame_elem in root.findall('frame'):
        frame_num = int(frame_elem.get('number'))
        objectlist = frame_elem.find('objectlist')
        boxes = []
        if objectlist is not None:
            for obj in objectlist.findall('object'):
                box_elem = obj.find('box')
                if box_elem is not None:
                    w = float(box_elem.get('w'))
                    h = float(box_elem.get('h'))
                    xc = float(box_elem.get('xc'))
                    yc = float(box_elem.get('yc'))
                    # Convert center coords to top-left
                    x = xc - w/2.0
                    y = yc - h/2.0
                    boxes.append((x, y, w, h))
        ground_truth[frame_num] = boxes
    return ground_truth

###########################################
# 2. IoU and Matching Functions
###########################################
def compute_iou(boxA, boxB):
    """
    Computes Intersection over Union (IoU) between two boxes in (x, y, w, h) format.
    """
    (xA, yA, wA, hA) = boxA
    (xB, yB, wB, hB) = boxB
    xA2, yA2 = xA + wA, yA + hA
    xB2, yB2 = xB + wB, yB + hB

    interX1 = max(xA, xB)
    interY1 = max(yA, yB)
    interX2 = min(xA2, xB2)
    interY2 = min(yA2, yB2)
    interW = max(0, interX2 - interX1)
    interH = max(0, interY2 - interY1)
    intersection = interW * interH

    areaA = wA * hA
    areaB = wB * hB
    union = areaA + areaB - intersection
    if union <= 0:
        return 0.0
    return intersection / float(union)

def match_boxes(pred_boxes, gt_boxes, iou_threshold=0.5):
    """
    Matches predicted boxes to ground-truth boxes using IoU.
    Returns:
      - tp_matches: list of tuples (pred_idx, gt_idx, iou_val)
      - fp_idxs: list of prediction indices not matched
      - fn_idxs: list of GT indices not matched
    """
    tp_matches = []
    used_gt = set()
    used_pred = set()

    for pred_idx, pbox in enumerate(pred_boxes):
        best_iou = 0
        best_gt_idx = None
        for gt_idx, gbox in enumerate(gt_boxes):
            if gt_idx in used_gt:
                continue
            iou_val = compute_iou(pbox, gbox)
            if iou_val > best_iou:
                best_iou = iou_val
                best_gt_idx = gt_idx
        if best_iou >= iou_threshold and best_gt_idx is not None:
            tp_matches.append((pred_idx, best_gt_idx, best_iou))
            used_gt.add(best_gt_idx)
            used_pred.add(pred_idx)

    fp_idxs = [i for i in range(len(pred_boxes)) if i not in used_pred]
    fn_idxs = [j for j in range(len(gt_boxes)) if j not in used_gt]
    return tp_matches, fp_idxs, fn_idxs

###########################################
# 3. HOG Detector Function with NMS
###########################################
def detect_pedestrians(frame, conf_thresh=0.4):
    """
    Runs HOG detection on a single frame.
    Returns bounding boxes in (x, y, w, h).
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    gray_eq = cv2.equalizeHist(gray)

    hog = cv2.HOGDescriptor()
    hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
    (rects, weights) = hog.detectMultiScale(
        gray_eq,
        winStride=(4,4),
        padding=(8,8),
        scale=1.05
    )
    # Filter by confidence threshold
    filtered = []
    for (r, w_val) in zip(rects, weights):
        if w_val >= conf_thresh:
            filtered.append(r)

    if len(filtered) == 0:
        return []

    # NMS
    rects_np = np.array([[x, y, x+w, y+h] for (x,y,w,h) in filtered])
    picks = non_max_suppression(rects_np, probs=None, overlapThresh=0.65)
    boxes = [(x1, y1, (x2 - x1), (y2 - y1)) for (x1,y1,x2,y2) in picks]
    return boxes

###########################################
# 4. Evaluate Detector Over Video
###########################################
def evaluate_detector(ground_truth, video_path, detector, iou_threshold=0.5):
    """
    :param ground_truth: dict { frame_idx: [ (x,y,w,h), ... ] }
    :param video_path: path to the video
    :param detector: function that returns predicted boxes
    :param iou_threshold: IoU threshold for matching
    Returns detection metrics: (precision, recall, f1, avg_iou, dice)
    """
    cap = cv2.VideoCapture(video_path)
    frame_idx = 0

    total_tp = 0
    total_fp = 0
    total_fn = 0
    iou_sum = 0
    match_count = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # If this frame_idx not in ground_truth, skip
        # or treat it as empty ground-truth
        gt_boxes = ground_truth.get(frame_idx, [])

        # Optional: resize for consistent processing
        frame = imutils.resize(frame, width=800)

        # Run the detector
        pred_boxes = detector(frame)

        # Match predictions to ground-truth
        tp_matches, fp_idxs, fn_idxs = match_boxes(pred_boxes, gt_boxes, iou_threshold)
        total_tp += len(tp_matches)
        total_fp += len(fp_idxs)
        total_fn += len(fn_idxs)

        for (_, _, iou_val) in tp_matches:
            iou_sum += iou_val
            match_count += 1

        frame_idx += 1

    cap.release()

    precision = total_tp / float(total_tp + total_fp) if (total_tp + total_fp) > 0 else 0.0
    recall = total_tp / float(total_tp + total_fn) if (total_tp + total_fn) > 0 else 0.0
    f1 = 2.0 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
    avg_iou = iou_sum / match_count if match_count > 0 else 0.0
    dice = f1  # For bounding-box detection, F1 is typically the same as Dice

    return precision, recall, f1, avg_iou, dice

In [None]:
###########################################
# 5. Main
###########################################
def main():
    # 1) Parse XML
    ground_truth = parse_xml(xml_file)

    # 2) Evaluate HOG detection
    precision, recall, f1, avg_iou, dice = evaluate_detector(
        ground_truth,
        video_file,
        detector=lambda frm: detect_pedestrians(frm, conf_thresh=0.4),
        iou_threshold=0.5
    )

    print("=== DETECTION METRICS ===")
    print(f"Precision: {precision:.4f}")
    print(f"Recall:    {recall:.4f}")
    print(f"F1 Score:  {f1:.4f}")
    print(f"Avg IoU:   {avg_iou:.4f}")
    print(f"Dice:      {dice:.4f}")

if __name__ == "__main__":
    main()

=== DETECTION METRICS ===
Precision: 0.0013
Recall:    0.0004
F1 Score:  0.0006
Avg IoU:   0.5226
Dice:      0.0006


### 4. Hybrid Pedestrian Tracking Pipeline : Background Subtraction, HOG Detection, and Kalman-Hungarian Tracking

This hybrid pedestrian tracking pipeline enhances detection and tracking accuracy by combining multiple classical computer vision techniques:

- Background Subtraction (MOG2): Isolates moving objects by removing static background elements.
- HOG Detection: Identifies pedestrians using a pre-trained SVM classifier.
- Fusion of Detections: Merges background subtraction and HOG results for improved accuracy.
- Tracking with Kalman Filter: Predicts object positions to handle occlusions and maintain identity.
- Hungarian Algorithm for Assignment: Matches detections to tracked objects based on motion and appearance.
- Appearance-Based Matching (Color Histograms): Reduces ID switches by incorporating object color similarity.
- Non-Maximum Suppression (NMS): Eliminates redundant overlapping detections for cleaner results.

In [None]:
##############################################
# 1. Helper: Simple Color Histogram Extraction
##############################################
def extract_color_hist(frame, bbox, histSize=8):
    """
    Extract a simple HSV color histogram from 'bbox' in 'frame'.
    bbox = (x, y, w, h).
    Returns a 1D normalized histogram of length histSize*3.
    """
    (x, y, w, h) = bbox
    frame_h, frame_w = frame.shape[:2]
    x = max(0, min(x, frame_w - 1))
    y = max(0, min(y, frame_h - 1))
    w = max(0, min(w, frame_w - x))
    h = max(0, min(h, frame_h - y))
    roi = frame[y:y+h, x:x+w]
    if roi.size == 0:
        return np.zeros((histSize*3,), dtype=np.float32)

    hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
    h_hist = cv2.calcHist([hsv], [0], None, [histSize], [0, 180])
    s_hist = cv2.calcHist([hsv], [1], None, [histSize], [0, 256])
    v_hist = cv2.calcHist([hsv], [2], None, [histSize], [0, 256])

    h_hist = cv2.normalize(h_hist, h_hist, 0, 1, cv2.NORM_MINMAX).flatten()
    s_hist = cv2.normalize(s_hist, s_hist, 0, 1, cv2.NORM_MINMAX).flatten()
    v_hist = cv2.normalize(v_hist, v_hist, 0, 1, cv2.NORM_MINMAX).flatten()
    return np.concatenate([h_hist, s_hist, v_hist])

def bhattacharyya_distance(histA, histB):
    """
    Bhattacharyya distance between two normalized histograms.
    Lower = more similar.
    """
    overlap = np.sum(np.sqrt(histA * histB))
    overlap = max(0.0, min(overlap, 1.0))
    return np.sqrt(1.0 - overlap)

##############################################
# 2. Kalman Filter Helpers
##############################################
def create_kalman_filter():
    kf = cv2.KalmanFilter(4, 2)
    kf.transitionMatrix = np.array([[1, 0, 1, 0],
                                    [0, 1, 0, 1],
                                    [0, 0, 1, 0],
                                    [0, 0, 0, 1]], np.float32)
    kf.measurementMatrix = np.eye(2, 4, dtype=np.float32)
    kf.processNoiseCov = np.eye(4, 4, dtype=np.float32) * 0.03
    kf.measurementNoiseCov = np.eye(2, 2, dtype=np.float32) * 0.5
    kf.errorCovPost = np.eye(4, 4, dtype=np.float32)
    return kf

def kalman_predict_centroid(kf):
    pred = kf.predict()
    return (pred[0,0], pred[1,0])

def kalman_correct_centroid(kf, x, y):
    measurement = np.array([[np.float32(x)], [np.float32(y)]])
    kf.correct(measurement)

##############################################
# 3. Multi-Object Tracker: Kalman + Hungarian
#    with optional appearance cost
##############################################
class MultiObjectTracker:
    def __init__(self, max_disappeared=10, max_distance=30, alpha=0.8):
        """
        :param max_disappeared: frames to allow a track to vanish
        :param max_distance: max centroid distance for matching
        :param alpha: weighting factor for distance vs. color cost
                      cost = alpha*distance + (1-alpha)*colorDist
                      (1-alpha)=0 => purely distance-based
        """
        self.nextID = 0
        self.tracks = {}  # objectID -> { 'kf':..., 'bbox':..., 'hist':..., 'disappeared':... }
        self.max_disappeared = max_disappeared
        self.max_distance = max_distance
        self.alpha = alpha

    def register(self, centroid, bbox, hist):
        kf = create_kalman_filter()
        kalman_correct_centroid(kf, centroid[0], centroid[1])
        self.tracks[self.nextID] = {
            'kf': kf,
            'bbox': bbox,
            'hist': hist,
            'disappeared': 0
        }
        self.nextID += 1

    def deregister(self, objectID):
        del self.tracks[objectID]

    def predict_all(self):
        preds = {}
        for objID, data in self.tracks.items():
            (px, py) = kalman_predict_centroid(data['kf'])
            preds[objID] = (px, py)
        return preds

    def update(self, frame, detections):
        """
        :param frame: current frame for color hist extraction
        :param detections: list of (x, y, w, h) from combined detection
        """
        if len(detections) == 0:
            # Mark all tracks as disappeared
            for objID in list(self.tracks.keys()):
                self.tracks[objID]['disappeared'] += 1
                if self.tracks[objID]['disappeared'] > self.max_disappeared:
                    self.deregister(objID)
            return self.tracks

        # Compute centroids + color hist for each detection
        input_data = []
        for (x, y, w, h) in detections:
            cX = x + w/2.0
            cY = y + h/2.0
            hist = extract_color_hist(frame, (x, y, w, h), histSize=8)
            input_data.append((cX, cY, (x,y,w,h), hist))

        if len(self.tracks) == 0:
            # Register all
            for (cX, cY, box, hist) in input_data:
                self.register((cX, cY), box, hist)
        else:
            preds = self.predict_all()
            objIDs = list(self.tracks.keys())
            objCentroids = np.array([preds[id] for id in objIDs])

            # Build cost matrix (#tracks x #detections)
            cost_matrix = np.zeros((len(objIDs), len(input_data)), dtype=np.float32)

            for i, objID in enumerate(objIDs):
                (px, py) = objCentroids[i]
                objHist = self.tracks[objID]['hist']

                for j, (cX, cY, box, detHist) in enumerate(input_data):
                    dist_val = dist.euclidean((px, py), (cX, cY))
                    # Normalize distance to [0..1] w.r.t max_distance
                    norm_dist = dist_val / float(self.max_distance)
                    if norm_dist > 1.0:
                        norm_dist = 1.0
                    colorDist = bhattacharyya_distance(objHist, detHist)
                    # Weighted cost
                    costVal = self.alpha * norm_dist + (1.0 - self.alpha) * colorDist
                    cost_matrix[i, j] = costVal

            rows, cols = linear_sum_assignment(cost_matrix)
            used_rows = set()
            used_cols = set()

            for (row, col) in zip(rows, cols):
                costVal = cost_matrix[row, col]
                if costVal > 1.0:
                    # If costVal > 1, treat it as no match
                    continue
                objID = objIDs[row]
                (cX, cY, box, detHist) = input_data[col]
                (x, y, w, h) = box

                # Correct Kalman
                kalman_correct_centroid(self.tracks[objID]['kf'], cX, cY)
                self.tracks[objID]['bbox'] = (x, y, w, h)
                self.tracks[objID]['hist'] = detHist
                self.tracks[objID]['disappeared'] = 0
                used_rows.add(row)
                used_cols.add(col)

            # Unmatched existing tracks => disappeared++
            unused_rows = set(range(cost_matrix.shape[0])) - used_rows
            for row in unused_rows:
                objID = objIDs[row]
                self.tracks[objID]['disappeared'] += 1
                if self.tracks[objID]['disappeared'] > self.max_disappeared:
                    self.deregister(objID)

            # Unmatched detections => register new
            unused_cols = set(range(cost_matrix.shape[1])) - used_cols
            for col in unused_cols:
                (cX, cY, box, detHist) = input_data[col]
                self.register((cX, cY), box, detHist)

        return self.tracks

##############################################
# 4. Combined Detection: Background Sub + HOG
##############################################
def background_subtraction(frame, fgbg,
                           min_area=1000, max_area=20000,
                           min_ratio=0.3, max_ratio=3.0):
    """
    Returns bounding boxes from background subtraction + morphology.
    """
    fgmask = fgbg.apply(frame)

    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7,7))
    fgmask = cv2.morphologyEx(fgmask, cv2.MORPH_OPEN, kernel, iterations=2)
    fgmask = cv2.morphologyEx(fgmask, cv2.MORPH_CLOSE, kernel, iterations=2)

    contours, _ = cv2.findContours(fgmask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    boxes = []
    for c in contours:
        x, y, w, h = cv2.boundingRect(c)
        area = w * h
        if area < min_area or area > max_area:
            continue
        ratio = w / float(h)
        if ratio < min_ratio or ratio > max_ratio:
            continue
        boxes.append((x, y, w, h))
    return boxes

def hog_detect(frame, conf_thresh=0.5):
    """
    Returns bounding boxes from HOG detection (with eqHist).
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    gray_eq = cv2.equalizeHist(gray)
    hog = cv2.HOGDescriptor()
    hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
    (rects, weights) = hog.detectMultiScale(
        gray_eq,
        winStride=(4, 4),
        padding=(8, 8),
        scale=1.05
    )
    # Filter by confidence
    filtered = []
    for (r, w_val) in zip(rects, weights):
        if w_val >= conf_thresh:
            filtered.append(r)
    if len(filtered) == 0:
        return []
    # NMS
    rects_np = np.array([[x, y, x+w, y+h] for (x,y,w,h) in filtered])
    picks = non_max_suppression(rects_np, probs=None, overlapThresh=0.65)
    boxes = [(x1, y1, x2-x1, y2-y1) for (x1,y1,x2,y2) in picks]
    return boxes

def combine_detections(bg_boxes, hog_boxes, iou_thresh=0.3):
    """
    Merge background-subtractor boxes and HOG boxes.
    If a HOG box overlaps a BG box with IoU> iou_thresh, treat them as one.
    Otherwise, keep both.
    """
    all_boxes = bg_boxes + hog_boxes
    # Convert to x1,y1,x2,y2
    box_xyxy = []
    for (x,y,w,h) in all_boxes:
        box_xyxy.append([x, y, x+w, y+h])

    # Then do NMS with a lower threshold to unify overlapping boxes
    merged = non_max_suppression(np.array(box_xyxy), overlapThresh=iou_thresh)
    final = []
    for (x1,y1,x2,y2) in merged:
        final.append((x1, y1, x2-x1, y2-y1))
    return final

In [None]:
##############################################
# 5. Main
##############################################
def main():
    cap = cv2.VideoCapture(video_file)
    if not cap.isOpened():
        print("Error: Could not open video.")
        return

    # Background subtractor
    fgbg = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=40, detectShadows=False)

    # Multi-object tracker with appearance cost
    # alpha=0.8 => mostly distance-based, 0.2 => color-based
    tracker = MultiObjectTracker(max_disappeared=10, max_distance=30, alpha=0.8)

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame = imutils.resize(frame, width=800)

        # 1) Get BG-sub boxes
        bg_boxes = background_subtraction(frame, fgbg, min_area=1000, max_area=20000)
        # 2) Get HOG boxes
        hog_boxes = hog_detect(frame, conf_thresh=0.5)
        # 3) Combine them (some may overlap)
        combined_boxes = combine_detections(bg_boxes, hog_boxes, iou_thresh=0.3)

        # 4) Update tracker
        tracks = tracker.update(frame, combined_boxes)

        # 5) Visualization
        # Draw BG boxes in red, HOG boxes in blue, final tracked in green
        for (x,y,w,h) in bg_boxes:
            cv2.rectangle(frame, (x,y), (x+w,y+h), (0,0,255), 2)
        for (x,y,w,h) in hog_boxes:
            cv2.rectangle(frame, (x,y), (x+w,y+h), (255,0,0), 2)

        for objID, data in tracks.items():
            (bx, by, bw, bh) = data['bbox']
            cv2.rectangle(frame, (bx, by), (bx+bw, by+bh), (0,255,0), 2)
            cv2.putText(frame, f"ID {objID}", (bx, by - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0), 2)

        cv2.imshow("PETS09 - Combined Detection + Tracking", frame)
        key = cv2.waitKey(30) & 0xFF
        if key == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()

  overlap = np.sum(np.sqrt(histA * histB))


#### 4.1. Metrix Calculation 2 : Enhanced Performance Evaluation with Appearance-Based Tracking

This section extends the initial performance evaluation by integrating appearance-based matching into the tracking process. It improves pedestrian tracking by combining:

- Color Histogram Matching (HSV + Bhattacharyya Distance): Helps re-identify objects across frames.
- Motion Prediction (Kalman Filter): Handles object occlusions and missing detections.
- Detection Fusion (Background Subtraction + HOG): Improves robustness by merging multiple detection sources.
- Assignment Optimization (Hungarian Algorithm): Matches detections to tracked objects using both motion and appearance features.


In [None]:
###########################################
# 4. Color Histogram Extraction and Bhattacharyya
###########################################
def extract_color_hist(frame, bbox, histSize=8):
    """
    Extracts a simple HSV color histogram from the given bounding box in 'frame'.
    bbox = (x, y, w, h).
    Returns a normalized 1D histogram.
    """
    (x, y, w, h) = bbox
    frame_h, frame_w = frame.shape[:2]
    x = max(0, min(x, frame_w - 1))
    y = max(0, min(y, frame_h - 1))
    w = max(0, min(w, frame_w - x))
    h = max(0, min(h, frame_h - y))
    roi = frame[y:y+h, x:x+w]
    if roi.size == 0:
        return np.zeros((histSize*3,), dtype=np.float32)
    
    hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
    h_hist = cv2.calcHist([hsv], [0], None, [histSize], [0,180])
    s_hist = cv2.calcHist([hsv], [1], None, [histSize], [0,256])
    v_hist = cv2.calcHist([hsv], [2], None, [histSize], [0,256])
    
    h_hist = cv2.normalize(h_hist, h_hist, 0, 1, cv2.NORM_MINMAX).flatten()
    s_hist = cv2.normalize(s_hist, s_hist, 0, 1, cv2.NORM_MINMAX).flatten()
    v_hist = cv2.normalize(v_hist, v_hist, 0, 1, cv2.NORM_MINMAX).flatten()
    return np.concatenate([h_hist, s_hist, v_hist])

def bhattacharyya_distance(histA, histB):
    overlap = np.sum(np.sqrt(histA * histB))
    overlap = max(0.0, min(overlap, 1.0))
    return np.sqrt(1.0 - overlap)

###########################################
# 5. Multi-Object Tracker: Kalman Filter + Hungarian Assignment
###########################################
class MultiObjectTracker:
    def __init__(self, max_disappeared=10, max_distance=30, alpha=0.8):
        """
        :param max_disappeared: frames allowed to miss before removal.
        :param max_distance: maximum allowed centroid distance for matching.
        :param alpha: weighting factor for combining normalized distance and color cost.
                      (cost = alpha*(distance/max_distance) + (1-alpha)*colorDist)
        """
        self.nextID = 0
        self.tracks = {}  # objectID -> { 'kf': ..., 'bbox': ..., 'hist': ..., 'disappeared': ... }
        self.max_disappeared = max_disappeared
        self.max_distance = max_distance
        self.alpha = alpha

    def register(self, centroid, bbox, hist):
        kf = create_kalman_filter()
        kalman_correct_centroid(kf, centroid[0], centroid[1])
        self.tracks[self.nextID] = {
            'kf': kf,
            'bbox': bbox,
            'hist': hist,
            'disappeared': 0
        }
        self.nextID += 1

    def deregister(self, objectID):
        del self.tracks[objectID]

    def predict_all(self):
        preds = {}
        for objID, data in self.tracks.items():
            preds[objID] = kalman_predict_centroid(data['kf'])
        return preds

    def update(self, frame, detections):
        """
        :param frame: current frame (for appearance extraction)
        :param detections: list of bounding boxes (x,y,w,h)
        :return: updated tracks dictionary
        """
        if len(detections) == 0:
            for objID in list(self.tracks.keys()):
                self.tracks[objID]['disappeared'] += 1
                if self.tracks[objID]['disappeared'] > self.max_disappeared:
                    self.deregister(objID)
            return self.tracks

        input_data = []
        for (x, y, w, h) in detections:
            cX = x + w/2.0
            cY = y + h/2.0
            hist = extract_color_hist(frame, (x, y, w, h), histSize=8)
            input_data.append((cX, cY, (x, y, w, h), hist))

        if len(self.tracks) == 0:
            for (cX, cY, box, hist) in input_data:
                self.register((cX, cY), box, hist)
        else:
            preds = self.predict_all()
            objIDs = list(self.tracks.keys())
            objCentroids = np.array([preds[objID] for objID in objIDs])
            cost_matrix = np.zeros((len(objIDs), len(input_data)), dtype=np.float32)

            for i, objID in enumerate(objIDs):
                (px, py) = objCentroids[i]
                objHist = self.tracks[objID]['hist']
                for j, (cX, cY, box, detHist) in enumerate(input_data):
                    d = dist.euclidean((px, py), (cX, cY))
                    norm_d = d / float(self.max_distance)
                    if norm_d > 1.0:
                        norm_d = 1.0
                    color_cost = bhattacharyya_distance(objHist, detHist)
                    cost = self.alpha * norm_d + (1.0 - self.alpha) * color_cost
                    cost_matrix[i, j] = cost

            rows, cols = linear_sum_assignment(cost_matrix)
            used_rows = set()
            used_cols = set()
            for (row, col) in zip(rows, cols):
                if cost_matrix[row, col] > 1.0:
                    continue
                objID = objIDs[row]
                (cX, cY, box, detHist) = input_data[col]
                (x, y, w, h) = box
                kalman_correct_centroid(self.tracks[objID]['kf'], cX, cY)
                self.tracks[objID]['bbox'] = box
                self.tracks[objID]['hist'] = detHist
                self.tracks[objID]['disappeared'] = 0
                used_rows.add(row)
                used_cols.add(col)

            unused_rows = set(range(cost_matrix.shape[0])) - used_rows
            for row in unused_rows:
                objID = objIDs[row]
                self.tracks[objID]['disappeared'] += 1
                if self.tracks[objID]['disappeared'] > self.max_disappeared:
                    self.deregister(objID)

            unused_cols = set(range(cost_matrix.shape[1])) - used_cols
            for col in unused_cols:
                (cX, cY, box, detHist) = input_data[col]
                self.register((cX, cY), box, detHist)

        return self.tracks

###########################################
# 6. Hybrid Detection: Combine BG Sub and HOG
###########################################
def background_subtraction(frame, fgbg, min_area=1000, max_area=20000, min_ratio=0.3, max_ratio=3.0):
    fgmask = fgbg.apply(frame)
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (7,7))
    fgmask = cv2.morphologyEx(fgmask, cv2.MORPH_OPEN, kernel, iterations=2)
    fgmask = cv2.morphologyEx(fgmask, cv2.MORPH_CLOSE, kernel, iterations=2)
    contours, _ = cv2.findContours(fgmask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    boxes = []
    for c in contours:
        x, y, w, h = cv2.boundingRect(c)
        area = w * h
        if area < min_area or area > max_area:
            continue
        ratio = w / float(h)
        if ratio < min_ratio or ratio > max_ratio:
            continue
        boxes.append((x, y, w, h))
    return boxes

def hog_detect(frame, conf_thresh=0.5):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    gray_eq = cv2.equalizeHist(gray)
    hog = cv2.HOGDescriptor()
    hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
    (rects, weights) = hog.detectMultiScale(
        gray_eq,
        winStride=(4,4),
        padding=(8,8),
        scale=1.05
    )
    filtered = []
    for (r, w_val) in zip(rects, weights):
        if w_val >= conf_thresh:
            filtered.append(r)
    if len(filtered) == 0:
        return []
    rects_np = np.array([[x, y, x+w, y+h] for (x,y,w,h) in filtered])
    picks = non_max_suppression(rects_np, probs=None, overlapThresh=0.65)
    boxes = [(x1, y1, x2-x1, y2-y1) for (x1,y1,x2,y2) in picks]
    return boxes

def combine_detections(bg_boxes, hog_boxes, iou_thresh=0.3):
    all_boxes = bg_boxes + hog_boxes
    box_xyxy = []
    for (x, y, w, h) in all_boxes:
        box_xyxy.append([x, y, x+w, y+h])
    merged = non_max_suppression(np.array(box_xyxy), overlapThresh=iou_thresh)
    final = [(x1, y1, x2-x1, y2-y1) for (x1, y1, x2, y2) in merged]
    return final

def hybrid_detections(frame, fgbg, conf_thresh=0.5):
    bg_boxes = background_subtraction(frame, fgbg, min_area=1000, max_area=20000)
    hog_boxes = hog_detect(frame, conf_thresh=conf_thresh)
    combined = combine_detections(bg_boxes, hog_boxes, iou_thresh=0.3)
    return combined

###########################################
# 7. Evaluation for Hybrid Model
###########################################
def evaluate_hybrid(ground_truth, video_path, fgbg, tracker, iou_threshold=0.5):
    cap = cv2.VideoCapture(video_path)
    frame_idx = 0
    total_tp = 0
    total_fp = 0
    total_fn = 0
    iou_sum = 0
    match_count = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame = imutils.resize(frame, width=800)
        gt_boxes = ground_truth.get(frame_idx, [])
        combined_boxes = hybrid_detections(frame, fgbg, conf_thresh=0.5)
        tracks = tracker.update(frame, combined_boxes)
        pred_boxes = [data['bbox'] for data in tracks.values()]

        tp, fp, fn = match_boxes(pred_boxes, gt_boxes, iou_threshold)
        total_tp += len(tp)
        total_fp += len(fp)
        total_fn += len(fn)
        for (_, _, iou_val) in tp:
            iou_sum += iou_val
            match_count += 1
        frame_idx += 1

    cap.release()
    precision = total_tp / float(total_tp + total_fp) if (total_tp + total_fp) > 0 else 0
    recall = total_tp / float(total_tp + total_fn) if (total_tp + total_fn) > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
    avg_iou = iou_sum / match_count if match_count > 0 else 0
    dice = f1
    return precision, recall, f1, avg_iou, dice

In [None]:
###########################################
# 8. Main Function
###########################################
def main():
    # Parse ground-truth annotations from XML
    ground_truth = parse_xml(xml_file)

    # Create background subtractor
    fgbg = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=40, detectShadows=False)

    # Create multi-object tracker instance
    tracker = MultiObjectTracker(max_disappeared=10, max_distance=30, alpha=0.8)

    # Evaluate the hybrid model
    precision, recall, f1, avg_iou, dice = evaluate_hybrid(ground_truth, video_file, fgbg, tracker, iou_threshold=0.5)
    
    print("=== HYBRID MODEL DETECTION METRICS ===")
    print(f"Precision: {precision:.4f}")
    print(f"Recall:    {recall:.4f}")
    print(f"F1 Score:  {f1:.4f}")
    print(f"Avg IoU:   {avg_iou:.4f}")
    print(f"Dice:      {dice:.4f}")

if __name__ == "__main__":
    main()

  overlap = np.sum(np.sqrt(histA * histB))


=== HYBRID MODEL DETECTION METRICS ===
Precision: 0.0079
Recall:    0.0030
F1 Score:  0.0044
Avg IoU:   0.5516
Dice:      0.0044


#### 4.2. Metrix Calculation 2 : Tracking Performance Evaluation using MOTA and MOTP : 

This section evaluates multi-object tracking accuracy and precision using two key metrics:

- MOTA (Multiple Object Tracking Accuracy): Measures overall tracking performance by accounting for false positives, false negatives, and ID switches.
- MOTP (Multiple Object Tracking Precision): Calculates the average Intersection over Union (IoU) to assess how accurately objects are localized.
- Hybrid Detection (MOG2 + HOG): Improves object detection by combining motion-based and feature-based methods.
- Kalman Filter + Hungarian Algorithm: Tracks objects across frames by predicting motion and optimizing assignments.
- Robust Tracking in Crowded Scenes: Enhances pedestrian tracking by integrating motion and appearance-based matching.

This evaluation ensures accurate and stable tracking in real-world, dynamic environments. 

In [None]:
###########################################
# 1. Parse XML for Ground-Truth Tracks
###########################################
def parse_xml_tracks(xml_file):
    """
    Parses the XML file containing ground-truth tracking annotations.
    Each <frame> element has a "number" attribute.
    Each <object> has an "id" and a <box> with attributes: w, h, xc, yc.
    Returns a dictionary:
       gt_tracks[frame_idx] = { gt_id: (x, y, w, h), ... }
    (Coordinates are converted from center-based to top-left-based.)
    """
    tree = ET.parse(xml_file)
    root = tree.getroot()
    gt_tracks = {}
    for frame_elem in root.findall('frame'):
        frame_num = int(frame_elem.get('number'))
        tracks = {}
        objectlist = frame_elem.find('objectlist')
        if objectlist is not None:
            for obj in objectlist.findall('object'):
                gt_id = int(obj.get('id'))
                box_elem = obj.find('box')
                if box_elem is not None:
                    w = float(box_elem.get('w'))
                    h = float(box_elem.get('h'))
                    xc = float(box_elem.get('xc'))
                    yc = float(box_elem.get('yc'))
                    x = xc - w / 2.0
                    y = yc - h / 2.0
                    tracks[gt_id] = (x, y, w, h)
        gt_tracks[frame_num] = tracks
    return gt_tracks

###########################################
# 7. Evaluate Hybrid Tracker (Tracking Metrics)
###########################################
def evaluate_tracking(gt_tracks, pred_tracks, iou_threshold=0.5):
    """
    Evaluates tracking performance between ground-truth and predicted tracks.
    
    Parameters:
      - gt_tracks: dict { frame_idx: { gt_id: (x,y,w,h), ... } }
      - pred_tracks: dict { frame_idx: { pred_id: (x,y,w,h), ... } }
      - iou_threshold: IoU threshold for matching
      
    Returns:
      - MOTA: 1 - (FN + FP + ID switches) / total_GT
      - MOTP: average IoU over matched detections
    """
    total_fp = 0
    total_fn = 0
    total_idsw = 0
    total_gt = 0
    iou_sum = 0
    total_matches = 0
    prev_assignment = {}  # gt_id -> pred_id from previous frame

    frames = sorted(gt_tracks.keys())
    for frame in frames:
        gt_frame = gt_tracks.get(frame, {})     # { gt_id: bbox }
        pred_frame = pred_tracks.get(frame, {})   # { pred_id: bbox }
        gt_ids = list(gt_frame.keys())
        pred_ids = list(pred_frame.keys())
        total_gt += len(gt_ids)
        
        if len(gt_ids) == 0:
            total_fp += len(pred_ids)
            prev_assignment = {}
            continue
        
        cost_matrix = np.zeros((len(gt_ids), len(pred_ids)), dtype=np.float32)
        for i, gt_id in enumerate(gt_ids):
            for j, pred_id in enumerate(pred_ids):
                iou_val = compute_iou(gt_frame[gt_id], pred_frame[pred_id])
                cost_matrix[i, j] = -iou_val  # maximize IoU
        
        rows, cols = linear_sum_assignment(cost_matrix)
        matched_gt = set()
        matched_pred = set()
        for i, j in zip(rows, cols):
            iou_val = -cost_matrix[i, j]
            if iou_val >= iou_threshold:
                gt_id = gt_ids[i]
                pred_id = pred_ids[j]
                matched_gt.add(gt_id)
                matched_pred.add(pred_id)
                iou_sum += iou_val
                total_matches += 1
                # Check ID switch: if previous frame had a different pred assignment for this gt_id
                if gt_id in prev_assignment and prev_assignment[gt_id] != pred_id:
                    total_idsw += 1
                prev_assignment[gt_id] = pred_id
        fp = len(pred_ids) - len(matched_pred)
        fn = len(gt_ids) - len(matched_gt)
        total_fp += fp
        total_fn += fn

    mota = 1.0 - float(total_fn + total_fp + total_idsw) / float(total_gt) if total_gt > 0 else 0
    motp = iou_sum / float(total_matches) if total_matches > 0 else 0
    return mota, motp

def evaluate_hybrid(ground_truth, video_path, fgbg, tracker, iou_threshold=0.5):
    """
    Evaluates the hybrid model over the video.
    Uses the final tracked bounding boxes (from the tracker) as predictions.
    :param ground_truth: dict { frame_idx: { gt_id: (x,y,w,h), ... } }
    :param video_path: path to the video file
    :param fgbg: background subtractor
    :param tracker: instance of MultiObjectTracker
    :param iou_threshold: IoU threshold for matching
    :return: tracking metrics: MOTA, MOTP, and also detection metrics.
    """
    cap = cv2.VideoCapture(video_path)
    frame_idx = 0
    pred_tracks = {}  # frame_idx -> { pred_id: bbox, ... }
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame = imutils.resize(frame, width=800)
        # Get hybrid detections from background subtraction and HOG
        combined_boxes = hybrid_detections(frame, fgbg, conf_thresh=0.5)
        # Update tracker with these detections
        tracks = tracker.update(frame, combined_boxes)
        # Save predicted tracks for this frame (tracker output: {pred_id: data})
        frame_preds = {}
        for pred_id, data in tracks.items():
            frame_preds[pred_id] = data['bbox']
        pred_tracks[frame_idx] = frame_preds
        frame_idx += 1
    
    cap.release()
    
    # Evaluate tracking: MOTA and MOTP using frame-by-frame matching
    mota, motp = evaluate_tracking(ground_truth, pred_tracks, iou_threshold)
    return mota, motp

In [None]:
###########################################
# 8. Main Function: Putting It All Together
###########################################
def main():
    # Parse ground-truth tracking annotations from XML.
    gt_tracks = parse_xml_tracks(xml_file)
    
    # Create background subtractor (tune parameters as needed)
    fgbg = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=40, detectShadows=False)
    
    # Create multi-object tracker instance
    tracker = MultiObjectTracker(max_disappeared=10, max_distance=30, alpha=0.8)
    
    # Evaluate the hybrid model for tracking
    mota, motp = evaluate_hybrid(gt_tracks, video_file, fgbg, tracker, iou_threshold=0.5)
    
    print("=== TRACKING METRICS ===")
    print(f"MOTA: {mota:.4f}")
    print(f"MOTP: {motp:.4f}")

if __name__ == "__main__":
    main()

  overlap = np.sum(np.sqrt(histA * histB))


=== TRACKING METRICS ===
MOTA: -0.3775
MOTP: 0.5631
