In [1]:
import cv2
import numpy as np
from filterpy.kalman import KalmanFilter
import sys
import matplotlib.pyplot as plt 
from src.detection_routine import detect_objects

In [2]:

import yaml
from src.config import ImageProcessingParams

# 1) Load config
with open("config.yaml", 'r') as f:
    config = yaml.safe_load(f)

# 2) Create an ImageProcessingParams instance
ip_params = ImageProcessingParams(config["image_processing"])

In [3]:
def gaussian_pdf(innovation, mean, cov):
    # Calculate the Gaussian probability density (multivariate)
    k = len(innovation)
    cov_det = np.linalg.det(cov)
    cov_inv = np.linalg.inv(cov)
    norm_factor = 1 / (np.power((2 * np.pi), k/2) * np.sqrt(cov_det))
    exponent = -0.5 * innovation.T @ cov_inv @ innovation
    return norm_factor * np.exp(exponent)

In [4]:
# --------------------------
# Kalman Filter Initialization
# --------------------------
# We are tracking a state vector [x, y, vx, vy] with a measurement [x, y]
kf = cv2.KalmanFilter(4, 2)

# Define the state transition matrix (assumes dt=1 for simplicity)
kf.transitionMatrix = np.array([[1, 0, 1, 0],
                                [0, 1, 0, 1],
                                [0, 0, 1, 0],
                                [0, 0, 0, 1]], np.float32)

# Measurement matrix (we directly measure position)
kf.measurementMatrix = np.array([[1, 0, 0, 0],
                                 [0, 1, 0, 0]], np.float32)

# Process noise covariance
kf.processNoiseCov = np.eye(4, dtype=np.float32) * 0.1

# Measurement noise covariance
kf.measurementNoiseCov = np.eye(2, dtype=np.float32) * 0.1

# Error covariance matrix (post-update)
kf.errorCovPost = np.eye(4, dtype=np.float32)

# Initialize the state (if you have an initial detection, use it here)
kf.statePost = np.array([[0],
                         [0],
                         [0],
                         [0]], np.float32)


# --------------------------
# JPDAF Parameters
# --------------------------
gating_threshold = 5000       # Maximum allowed Mahalanobis distance for association
missed_detection_probability = 0.1  # Weight for a missed detection

# --------------------------
# Main Tracking Loop
# --------------------------
cap = cv2.VideoCapture("red_dot_video.mp4")  # Change to 0 for webcam or another video file

num_frames =90
while num_frames > 0:
    ret, frame = cap.read()
    if not ret:
        break
    num_frames -=1
    # 1. Prediction step: update the Kalman filter's internal state.
    kf.predict()
    # Retrieve the predicted state from statePre (cv2.KalmanFilter.predict() returns None)
    predicted_state = kf.statePre  # shape: (4, 1)
    predicted_state_flat = predicted_state.flatten()  # convert to 1D array: [x, y, vx, vy]

    detections = []
    # 2. Detection step: get candidate measurements
    rectified_frame,base_image, cx,cy,w,h, contour_mask = detect_objects(frame,debug=False)

    detections.append(np.array([cx, cy]))


    candidate_measurements = []
    probabilities = []

    # 3. Association (JPDAF–style)
    # For each candidate detection, compute the innovation and its likelihood.
    for measurement in detections:
        # Predicted measurement = H * predicted_state
        predicted_measurement = np.dot(kf.measurementMatrix, predicted_state)
        predicted_measurement_flat = predicted_measurement.flatten()
        innovation = measurement - predicted_measurement_flat

        # Innovation covariance: S = H * P_pre * H^T + R
        S = np.dot(kf.measurementMatrix, np.dot(kf.errorCovPre, kf.measurementMatrix.T)) + kf.measurementNoiseCov

        # Compute the Mahalanobis distance: sqrt(innovation^T * inv(S) * innovation)
        try:
            S_inv = np.linalg.inv(S)
        except np.linalg.LinAlgError:
            S_inv = np.linalg.pinv(S)
        distance = np.sqrt(np.dot(innovation.T, np.dot(S_inv, innovation)))

        print(distance)
        
        if distance < gating_threshold:
            # Compute the likelihood using the multivariate Gaussian probability density function.
            det_S = np.linalg.det(S)
            if det_S <= 0:
                det_S = 1e-6  # avoid division by zero
            norm_factor = 1.0 / (np.power(2 * np.pi, len(innovation) / 2) * np.sqrt(det_S))
            likelihood = norm_factor * np.exp(-0.5 * np.dot(innovation.T, np.dot(S_inv, innovation)))
            print(f"Detection: {measurement}, Distance: {distance}, Likelihood: {likelihood}")
            candidate_measurements.append(measurement)
            probabilities.append(likelihood)

    # Normalize the probabilities, including the possibility of missed detection.
    total_prob = sum(probabilities) + missed_detection_probability
    if total_prob != 0:
        probabilities = [p / total_prob for p in probabilities]

    print(probabilities)
    # 4. Update: Combine candidate measurements into a weighted measurement.
    if candidate_measurements:
        # Weighted sum of all candidate detections.
        weighted_measurement = np.sum([p * m for p, m in zip(probabilities, candidate_measurements)], axis=0)
        measurement_vector = np.array([[weighted_measurement[0]],
                                       [weighted_measurement[1]]], np.float32)
        # Update the Kalman filter with the combined measurement.
        kf.correct(measurement_vector)
        # Draw the weighted measurement (green circle)
        cv2.circle(frame, (int(weighted_measurement[0]), int(weighted_measurement[1])), 5, (0, 255, 0), 2)
    else:
        # No detection update (e.g., missed detection)
        pass

    # 5. Visualization
    # Draw the predicted position (blue circle)
    cv2.circle(frame, (int(predicted_state_flat[0]), int(predicted_state_flat[1])), 5, (0, 255, 0), 2)
    # Draw all candidate detections that passed the gating (red dots)
    for det in candidate_measurements:
        cv2.circle(frame, (int(det[0]), int(det[1])), 3, (0, 0, 255), -1)

    cv2.imshow("Tracking", frame)
    if cv2.waitKey(30) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()


HSV Values
{'min_h': 0, 'max_h': 0, 'min_s': 0, 'max_s': 0, 'min_v': 0, 'max_v': 0}
971.7503416099075
Detection: [ 959 1076], Distance: 971.7503416099075, Likelihood: 0.0
[0.0]
HSV Values
{'min_h': 0, 'max_h': 0, 'min_s': 0, 'max_s': 0, 'min_v': 0, 'max_v': 0}
1418.9414910445923
Detection: [ 959 1076], Distance: 1418.9414910445923, Likelihood: 0.0
[0.0]
HSV Values
{'min_h': 0, 'max_h': 0, 'min_s': 0, 'max_s': 0, 'min_v': 0, 'max_v': 0}
1714.1207711800764
Detection: [ 959 1076], Distance: 1714.1207711800764, Likelihood: 0.0
[0.0]
HSV Values
{'min_h': 0, 'max_h': 0, 'min_s': 0, 'max_s': 0, 'min_v': 0, 'max_v': 0}
1869.2226998017743
Detection: [ 959 1076], Distance: 1869.2226998017743, Likelihood: 0.0
[0.0]
HSV Values
{'min_h': 0, 'max_h': 0, 'min_s': 0, 'max_s': 0, 'min_v': 0, 'max_v': 0}
1913.9279952810275
Detection: [ 959 1076], Distance: 1913.9279952810275, Likelihood: 0.0
[0.0]
HSV Values
{'min_h': 0, 'max_h': 0, 'min_s': 0, 'max_s': 0, 'min_v': 0, 'max_v': 0}
1922.620595879189
Detec