In [None]:
# OpenCV library for image and video processing
import cv2

# YOLO object detection model from Ultralytics
from ultralytics import YOLO

# PyTorch library (likely used by YOLO for model computations)
import torch

# Logging library for recording events and debugging
import logging

# Time library for measuring durations or adding delays
import time

# StringIO for creating in-memory text streams (useful for capturing outputs)
from io import StringIO

# Context manager utilities, often used with StringIO to capture output
import contextlib

# CSV module for reading/writing CSV files
import csv

# NumPy library for numerical operations on arrays
import numpy as np

# Linear sum assignment for optimal matching (used in tracking assignments)
from scipy.optimize import linear_sum_assignment

# Kalman Filter implementation from FilterPy for object tracking
from filterpy.kalman import KalmanFilter

# OS module for file and directory operations
import os


In [None]:
class KalmanTracker:
    """
    Object tracking class using a Kalman Filter with ghost track prevention.
    """
    def __init__(self, bbox, confidence, class_id, track_id):
        # Basic track information
        self.track_id = track_id  # Unique ID for this track
        self.class_id = class_id  # Detected object class
        self.confidence = confidence  # Current detection confidence
        self.age = 0  # Number of frames since track creation
        self.hits = 1  # Number of total hits
        self.time_since_update = 0  # Frames since last update
        self.max_age = 8  # Maximum age before deleting a track
        self.min_hits = 2  # Minimum hits to consider a track valid

        # Consecutive miss handling to prevent ghost tracks
        self.consecutive_misses = 0
        self.max_consecutive_misses = 3
        self.last_detection_confidence = confidence
        self.confidence_decay_rate = 0.95
        self.min_confidence = 0.1

        # Confidence and position history
        self.confidence_history = [confidence]
        self.position_history = []
        self.detection_buffer = []

        # Initialize Kalman Filter
        self.kf = KalmanFilter(dim_x=7, dim_z=4)

        # State transition matrix: [x, y, scale, ratio, dx, dy, dscale]
        self.kf.F = np.array([
            [1, 0, 0, 0, 1, 0, 0],
            [0, 1, 0, 0, 0, 1, 0],
            [0, 0, 1, 0, 0, 0, 1],
            [0, 0, 0, 1, 0, 0, 0],
            [0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 1, 0],
            [0, 0, 0, 0, 0, 0, 1]
        ])

        # Observation matrix
        self.kf.H = np.array([
            [1, 0, 0, 0, 0, 0, 0],
            [0, 1, 0, 0, 0, 0, 0],
            [0, 0, 1, 0, 0, 0, 0],
            [0, 0, 0, 1, 0, 0, 0]
        ])

        # Noise covariances (more conservative)
        self.kf.R *= 1.0  # Measurement noise
        self.kf.P[4:, 4:] *= 100.0  # Velocity uncertainty
        self.kf.P *= 5.0  # Overall uncertainty
        self.kf.Q[-1, -1] *= 0.1  # Process noise
        self.kf.Q[4:, 4:] *= 0.1

        # Initialize state from bounding box
        bbox_center_x = (bbox[0] + bbox[2]) / 2
        bbox_center_y = (bbox[1] + bbox[3]) / 2
        bbox_scale = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
        bbox_ratio = (bbox[2] - bbox[0]) / max(bbox[3] - bbox[1], 1)  # Prevent division by zero

        self.kf.x[:4] = np.array([[bbox_center_x], [bbox_center_y], [bbox_scale], [bbox_ratio]])

        self.bbox = bbox
        self.original_bbox = bbox.copy()  # Save original detection bbox

    def update(self, bbox, confidence):
        """Update the track with a new detection"""
        self.time_since_update = 0
        self.hits += 1
        self.confidence = confidence
        self.last_detection_confidence = confidence
        self.consecutive_misses = 0  # Reset consecutive miss counter
        self.bbox = bbox

        # Update confidence history (keep last 10)
        self.confidence_history.append(confidence)
        if len(self.confidence_history) > 10:
            self.confidence_history.pop(0)

        # Update position history (keep last 5)
        center = [(bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2]
        self.position_history.append(center)
        if len(self.position_history) > 5:
            self.position_history.pop(0)

        # Update detection buffer (keep last 5)
        self.detection_buffer.append({
            'confidence': confidence,
            'bbox': bbox,
            'frame_id': self.age
        })
        if len(self.detection_buffer) > 5:
            self.detection_buffer.pop(0)

        # Update Kalman filter with new observation
        bbox_center_x = (bbox[0] + bbox[2]) / 2
        bbox_center_y = (bbox[1] + bbox[3]) / 2
        bbox_scale = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
        bbox_ratio = (bbox[2] - bbox[0]) / max(bbox[3] - bbox[1], 1)

        self.kf.update(np.array([[bbox_center_x], [bbox_center_y], [bbox_scale], [bbox_ratio]]))

    def predict(self):
        """Predict the next position using the Kalman filter and handle ghost tracks"""
        if (self.kf.x[6] + self.kf.x[2]) <= 0:
            self.kf.x[6] *= 0.0

        self.kf.predict()
        self.age += 1

        if self.time_since_update > 0:
            self.hits = 0
            self.consecutive_misses += 1
            # Decay confidence if not updated
            self.confidence *= self.confidence_decay_rate

        self.time_since_update += 1

        # Calculate predicted bbox (more conservative)
        if self.kf.x[2] > 0 and self.kf.x[3] > 0:
            w = np.sqrt(abs(self.kf.x[2] * self.kf.x[3]))
            h = abs(self.kf.x[2]) / max(w, 1)

            x1 = self.kf.x[0][0] - w / 2
            y1 = self.kf.x[1][0] - h / 2
            x2 = self.kf.x[0][0] + w / 2
            y2 = self.kf.x[1][0] + h / 2

            # Keep bbox within reasonable limits
            if w > 10 and h > 10 and w < 1000 and h < 1000:
                self.bbox = [x1, y1, x2, y2]

        return self.bbox

    def get_state(self):
        """Return current bounding box state"""
        return self.bbox

    def is_valid(self):
        """Check if the track is still valid"""
        # Invalid if too many consecutive misses
        if self.consecutive_misses > self.max_consecutive_misses:
            return False

        # Invalid if confidence too low
        if self.confidence < self.min_confidence:
            return False

        # Invalid if too old and too few hits
        if self.age > self.max_age and self.hits < self.min_hits:
            return False

        # Invalid if bbox size is unreasonable
        if self.bbox:
            w = self.bbox[2] - self.bbox[0]
            h = self.bbox[3] - self.bbox[1]
            if w <= 0 or h <= 0 or w > 1000 or h > 1000:
                return False

        return True


In [None]:
def calculate_iou(bbox1, bbox2):
    """Calculate the Intersection over Union (IoU) between two bounding boxes."""

    # Compute coordinates of the intersection rectangle
    x1 = max(bbox1[0], bbox2[0])  # left
    y1 = max(bbox1[1], bbox2[1])  # top
    x2 = min(bbox1[2], bbox2[2])  # right
    y2 = min(bbox1[3], bbox2[3])  # bottom

    # If no overlap, IoU is 0
    if x2 <= x1 or y2 <= y1:
        return 0.0

    # Area of intersection
    intersection = (x2 - x1) * (y2 - y1)

    # Area of each bounding box
    area1 = (bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])
    area2 = (bbox2[2] - bbox2[0]) * (bbox2[3] - bbox2[1])

    # Area of union
    union = area1 + area2 - intersection

    # Return IoU (intersection over union), prevent division by zero
    return intersection / union if union > 0 else 0.0

In [None]:
def calculate_distance(bbox1, bbox2):
    """Calculate the Euclidean distance between the centers of two bounding boxes."""

    # Compute center coordinates of the first bbox
    center1 = [(bbox1[0] + bbox1[2]) / 2, (bbox1[1] + bbox1[3]) / 2]

    # Compute center coordinates of the second bbox
    center2 = [(bbox2[0] + bbox2[2]) / 2, (bbox2[1] + bbox2[3]) / 2]

    # Return the Euclidean distance between the two centers
    return np.sqrt((center1[0] - center2[0])**2 + (center1[1] - center2[1])**2)

In [None]:
def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3, distance_threshold=100):
    """
    Advanced matching between detections and existing trackers using both IoU and distance.

    Parameters:
    - detections: list of detection dicts with 'bbox' key
    - trackers: list of KalmanTracker objects
    - iou_threshold: minimum IoU to consider a match
    - distance_threshold: maximum distance (pixels) for acceptable match

    Returns:
    - matches: array of matched detection-tracker indices
    - unmatched_detections: indices of detections that were not matched
    - unmatched_trackers: indices of trackers that were not matched
    """

    # If there are no existing trackers, all detections are unmatched
    if len(trackers) == 0:
        return np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int)

    # Create a cost matrix for matching: rows=detections, columns=trackers
    cost_matrix = np.zeros((len(detections), len(trackers)), dtype=np.float32)

    for d, det in enumerate(detections):
        for t, trk in enumerate(trackers):
            # Compute IoU and distance between detection and tracker
            iou = calculate_iou(det['bbox'], trk.get_state())
            distance = calculate_distance(det['bbox'], trk.get_state())

            # Normalize distance to range 0-1
            normalized_distance = min(distance / distance_threshold, 1.0)

            # Combined cost: IoU dominates (weight 0.7), distance secondary (weight 0.3)
            cost = -(iou * 0.7 + (1 - normalized_distance) * 0.3)
            cost_matrix[d, t] = cost

    # Solve assignment problem using Hungarian algorithm
    if min(cost_matrix.shape) > 0:
        matched_indices = linear_sum_assignment(cost_matrix)
        matched_indices = np.array(list(zip(*matched_indices)))
    else:
        matched_indices = np.empty(shape=(0, 2))

    # Find unmatched detections
    unmatched_detections = []
    for d, det in enumerate(detections):
        if d not in matched_indices[:, 0]:
            unmatched_detections.append(d)

    # Find unmatched trackers
    unmatched_trackers = []
    for t, trk in enumerate(trackers):
        if t not in matched_indices[:, 1]:
            unmatched_trackers.append(t)

    # Filter out low-quality matches
    matches = []
    for m in matched_indices:
        det = detections[m[0]]
        trk = trackers[m[1]]

        iou = calculate_iou(det['bbox'], trk.get_state())
        distance = calculate_distance(det['bbox'], trk.get_state())

        # Strict matching criteria: either IoU above threshold or low distance
        if iou >= iou_threshold or (iou >= 0.1 and distance <= distance_threshold):
            matches.append(m.reshape(1, 2))
        else:
            unmatched_detections.append(m[0])
            unmatched_trackers.append(m[1])

    # Convert matches to proper array
    if len(matches) == 0:
        matches = np.empty((0, 2), dtype=int)
    else:
        matches = np.concatenate(matches, axis=0)

    return matches, np.array(unmatched_detections), np.array(unmatched_trackers)

In [None]:
class ImprovedTracker:
    """Advanced tracker with ghost track prevention"""

    def __init__(self):
        # List of active KalmanTracker objects
        self.trackers = []

        # Frame counter
        self.frame_count = 0

        # ID for assigning new trackers
        self.next_id = 0

        # Conservative parameters
        self.max_age = 8                # Max frames before a track is removed
        self.min_hits = 2               # Minimum hits to consider a track valid
        self.iou_threshold = 0.3        # Minimum IoU for a match
        self.distance_threshold = 100   # Max allowed distance (pixels)
        self.conf_threshold_new = 0.4   # Confidence threshold for creating new tracks
        self.conf_threshold_existing = 0.2  # Confidence threshold for updating existing tracks

    def update(self, detections):
        """Update the tracker with new detections"""

        self.frame_count += 1

        # Remove invalid trackers (too old, low confidence, etc.)
        self.trackers = [t for t in self.trackers if t.is_valid()]

        # Predict next positions for all trackers
        for tracker in self.trackers:
            tracker.predict()

        # Advanced matching between detections and trackers
        matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(
            detections, self.trackers, self.iou_threshold, self.distance_threshold)

        # Update matched trackers with new detections
        for m in matched:
            detection = detections[m[0]]
            tracker = self.trackers[m[1]]

            # Only update if detection has sufficient confidence
            if detection['confidence'] >= self.conf_threshold_existing:
                tracker.update(detection['bbox'], detection['confidence'])

        # Create new trackers for unmatched detections with high confidence
        for i in unmatched_dets:
            detection = detections[i]
            if detection['confidence'] >= self.conf_threshold_new:
                # Check bounding box size to avoid spurious tracks
                w = detection['bbox'][2] - detection['bbox'][0]
                h = detection['bbox'][3] - detection['bbox'][1]

                if w > 10 and h > 10 and w < 500 and h < 500:  # Reasonable size
                    tracker = KalmanTracker(
                        detection['bbox'],
                        detection['confidence'],
                        detection['class_id'],
                        self.next_id
                    )
                    self.trackers.append(tracker)
                    self.next_id += 1

        # Final filtering: keep only valid trackers
        self.trackers = [t for t in self.trackers if t.is_valid()]

        # Collect reliable results to return
        results = []
        for tracker in self.trackers:
            # Only return trustworthy tracks
            if (tracker.hits >= self.min_hits and
                tracker.confidence >= 0.15 and
                tracker.consecutive_misses <= 2):

                bbox = tracker.get_state()

                # Final bbox sanity check
                if bbox and len(bbox) == 4:
                    w, h = bbox[2] - bbox[0], bbox[3] - bbox[1]
                    if w > 5 and h > 5:  # Minimum size threshold
                        results.append({
                            'bbox': bbox,
                            'confidence': tracker.confidence,
                            'class_id': tracker.class_id,
                            'track_id': tracker.track_id,
                            'age': tracker.age,
                            'hits': tracker.hits,
                            'misses': tracker.consecutive_misses
                        })

        return results

In [None]:
def process_video_with_improved_tracking(model_path, video_path, output_path="output.mp4"):
    """Process a video using YOLO object detection and ImprovedTracker with ghost track prevention."""

    # Prepare output video name
    video_name = os.path.basename(video_path).split('.')[0]
    output_path = output_path.replace(".mp4", f"_{video_name}.mp4")

    # Select device: CUDA if available, otherwise CPU
    device = "cuda" if torch.cuda.is_available() else "cpu"
    logging.info(f"Using device: {device}")

    # Load YOLO model
    try:
        model = YOLO(model_path).to(device)
        logging.info("YOLO model loaded successfully.")
        logging.info(f"Model classes: {model.names}")
    except Exception as e:
        logging.error(f"Error loading YOLO model: {e}")
        return

    # Open video file
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        logging.error("Failed to open video file.")
        return

    # Get video properties
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    logging.info(f"Video properties: {frame_width}x{frame_height} @ {fps} FPS, {total_frames} frames total")

    # VideoWriter to save output video
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

    # Prepare CSV file for logging tracked objects
    os.makedirs("outputs/csvs", exist_ok=True)
    csv_file = open(f"outputs/csvs/ghost_free_tracking_{video_name}.csv", "w", newline="")
    csv_writer = csv.writer(csv_file)
    csv_writer.writerow(["Frame", "Track_ID", "Class", "Confidence", "X1", "Y1", "X2", "Y2", "Age", "Hits", "Misses"])

    # Initialize tracker
    tracker = ImprovedTracker()
    frame_id = 0

    while True:
        start_time = time.time()

        # Read a frame
        ret, frame = cap.read()
        if not ret:
            logging.info("End of video.")
            break

        frame_id += 1
        if frame_id % 30 == 0:
            logging.info(f"Processing frame {frame_id}/{total_frames}")

        # YOLO prediction
        try:
            # Suppress YOLO stdout
            with contextlib.redirect_stdout(StringIO()):
                results = model.predict(frame, conf=0.2, verbose=False)

            # Prepare detections
            detections = []
            for result in results:
                if result.boxes is not None and len(result.boxes) > 0:
                    for bbox, conf, class_id in zip(result.boxes.xyxy.cpu().numpy(),
                                                   result.boxes.conf.cpu().numpy(),
                                                   result.boxes.cls.cpu().numpy().astype(int)):
                        # Ignore very small boxes
                        w, h = bbox[2] - bbox[0], bbox[3] - bbox[1]
                        if w > 10 and h > 10:
                            detections.append({
                                'bbox': bbox.tolist(),
                                'confidence': float(conf),
                                'class_id': int(class_id)
                            })
        except Exception as e:
            logging.error(f"Error in YOLO prediction for frame {frame_id}: {e}")
            detections = []

        # Update tracker with new detections
        tracked_objects = tracker.update(detections)

        # Draw tracked objects and save to CSV
        for obj in tracked_objects:
            bbox = obj['bbox']
            track_id = obj['track_id']
            confidence = obj['confidence']
            class_id = obj['class_id']
            age = obj['age']
            hits = obj['hits']
            misses = obj.get('misses', 0)

            # Convert bbox coordinates to integers and clip to frame
            left, top, right, bottom = map(int, bbox)
            left = max(0, min(left, frame_width-1))
            top = max(0, min(top, frame_height-1))
            right = max(left+1, min(right, frame_width))
            bottom = max(top+1, min(bottom, frame_height))

            # Choose color based on reliability (miss count)
            if misses == 0:
                color = (0, 255, 0)      # Green - reliable
            elif misses <= 1:
                color = (0, 255, 255)    # Yellow - medium
            else:
                color = (0, 100, 255)    # Orange - suspicious

            # Thickness proportional to confidence
            thickness = max(1, int(confidence * 4))
            cv2.rectangle(frame, (left, top), (right, bottom), color, thickness)

            # Draw label
            class_name = model.names.get(class_id, f"C{class_id}")
            label = f"ID:{track_id} {class_name} {confidence:.2f}"
            if misses > 0:
                label += f" M:{misses}"
            cv2.putText(frame, label, (left, top - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            # Save tracking info to CSV
            csv_writer.writerow([frame_id, track_id, class_id, f"{confidence:.2f}",
                                 left, top, right, bottom, age, hits, misses])

        # Overlay info text on frame
        info_y = 30
        cv2.putText(frame, f"Frame: {frame_id}/{total_frames}", (10, info_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        cv2.putText(frame, f"Detections: {len(detections)}", (10, info_y + 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
        cv2.putText(frame, f"Active Tracks: {len(tracked_objects)}", (10, info_y + 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        # Compute and display FPS
        end_time = time.time()
        processing_fps = 1 / (end_time - start_time + 1e-6)
        cv2.putText(frame, f"FPS: {processing_fps:.1f}", (10, info_y + 90),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        # Write processed frame to output video
        out.write(frame)

    # Release resources
    cap.release()
    out.release()
    csv_file.close()

    logging.info(f"Video processing complete. Output saved to {output_path}")
    logging.info(f"CSV saved to outputs/csvs/ghost_free_tracking_{video_name}.csv")

In [None]:
if __name__ == "__main__":
    # Run the video processing function with YOLO model and improved tracking
    process_video_with_improved_tracking(
        model_path="models/best.pt",   # Path to your trained YOLO model
        video_path="inputs/ornekvideo2021_7.5_FPS.mp4",  # Input video path
        output_path="outputs/processed_output_with_ghost_free_tracking.mp4"  # Output video path
    )
