# Re-identification in a Single Feed

In [None]:
import cv2
import numpy as np
import torch
from ultralytics import YOLO
from collections import defaultdict, deque
import time
from sklearn.metrics.pairwise import cosine_similarity
from scipy.spatial.distance import cdist
import pickle
import os
from IPython.display import HTML, display
import base64
from google.colab.patches import cv2_imshow
import matplotlib.pyplot as plt

In [None]:
class PlayerReIDTracker:
    """
    Real-time Player Re-Identification and Tracking System for Football

    This system uses YOLOv11 for detection and implements custom Re-ID logic
    to maintain consistent player identities across occlusions and re-entries.
    """

    def __init__(self, model_path, video_path, conf_threshold=0.3, iou_threshold=0.5,
                 *, max_disappeared=30, max_distance=150, appearance_threshold=0.6,
                 color_threshold=0.3):
        """
        Initialize the tracking system

        Args:
            model_path (str): Path to YOLOv11 model
            video_path (str): Path to input video
            conf_threshold (float): Confidence threshold for detections
            iou_threshold (float): IoU threshold for NMS
            max_disappeared (int): Number of consecutive missed frames after
                which a track is dropped for good (keyword-only)
            max_distance (float): Largest distance between box centers, in
                box-coordinate units, for a detection to be matched to an
                existing track (keyword-only)
            appearance_threshold (float): Minimum cosine similarity required
                to re-identify a disappeared track (keyword-only)
            color_threshold (float): Fraction of a box's pixels that must
                match the excluded colors for the detection to be discarded
                (keyword-only)
        """
        self.model = YOLO(model_path)
        self.video_path = video_path
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold

        # Tracking parameters (previously hard-coded; defaults are unchanged)
        self.max_disappeared = max_disappeared
        self.max_distance = max_distance
        self.appearance_threshold = appearance_threshold

        # Class index -> label mapping expected from the detection model
        # NOTE(review): assumes the weights were trained with this ordering - confirm
        self.class_names = {0: 'ball', 1: 'goalkeeper', 2: 'player', 3: 'referee'}
        self.target_classes = [1, 2]  # Only goalkeepers and players are tracked

        # Yellowish/green colors (RGB) used to reject detections
        self.excluded_colors = [
            (127, 255, 0),   # #7fff00
            (153, 255, 51),  # #99ff33
            (178, 255, 102), # #b2ff66
            (204, 255, 153)  # #ccff99
        ]
        self.color_threshold = color_threshold

        # Tracking state
        self.next_id = 0
        self.active_tracks = {}  # Tracks matched in the most recent frame
        self.disappeared_tracks = {}  # Recently lost tracks, candidates for Re-ID
        self.track_history = defaultdict(lambda: deque(maxlen=50))  # Recent box centers per track
        self.appearance_features = {}  # Feature vector stored when a track is created

        # Performance metrics
        self.frame_count = 0
        self.processing_times = []

        # Colab compatibility
        self.is_colab = self._check_colab_environment()

    def _check_colab_environment(self):
        """Check if running in Google Colab"""
        try:
            import google.colab
            return True
        except ImportError:
            return False

    def _has_excluded_color(self, frame, bbox):
        """
        Check if the bounding box region contains more than 30% of excluded colors

        Args:
            frame (np.array): Input frame
            bbox (tuple): Bounding box coordinates (x1, y1, x2, y2)

        Returns:
            bool: True if object should be excluded due to color
        """
        x1, y1, x2, y2 = map(int, bbox)

        # Ensure coordinates are within frame bounds
        h, w = frame.shape[:2]
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(w, x2), min(h, y2)

        if x2 <= x1 or y2 <= y1:
            return False

        # Extract ROI
        roi = frame[y1:y2, x1:x2]

        if roi.size == 0:
            return False

        # Convert BGR to RGB for color comparison
        roi_rgb = cv2.cvtColor(roi, cv2.COLOR_BGR2RGB)
        total_pixels = roi_rgb.shape[0] * roi_rgb.shape[1]

        excluded_pixel_count = 0

        # Check each excluded color
        for target_color in self.excluded_colors:
            # Create color range (allowing some tolerance)
            tolerance = 30
            lower_bound = np.array([max(0, c - tolerance) for c in target_color])
            upper_bound = np.array([min(255, c + tolerance) for c in target_color])

            # Create mask for this color range
            mask = cv2.inRange(roi_rgb, lower_bound, upper_bound)
            excluded_pixel_count += np.sum(mask > 0)

        # Calculate percentage of excluded color pixels
        excluded_percentage = excluded_pixel_count / total_pixels

        # Return True if more than 30% of pixels match excluded colors
        return excluded_percentage > self.color_threshold

    def extract_appearance_features(self, frame, bbox):
        """
        Extract appearance features from a bounding box region

        Args:
            frame (np.array): Input frame
            bbox (tuple): Bounding box coordinates (x1, y1, x2, y2)

        Returns:
            np.array: Normalized appearance feature vector
        """
        x1, y1, x2, y2 = map(int, bbox)

        # Ensure coordinates are within frame bounds
        h, w = frame.shape[:2]
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(w, x2), min(h, y2)

        if x2 <= x1 or y2 <= y1:
            return np.zeros(128)  # Return zero vector for invalid bbox

        # Extract ROI
        roi = frame[y1:y2, x1:x2]

        if roi.size == 0:
            return np.zeros(128)

        # Resize to standard size
        try:
            roi_resized = cv2.resize(roi, (64, 128))
        except:
            return np.zeros(128)

        # Extract color histogram features
        hist_features = []
        for i in range(3):  # BGR channels
            hist = cv2.calcHist([roi_resized], [i], None, [32], [0, 256])
            hist_features.extend(hist.flatten())

        # Extract texture features using LBP-like approach
        gray_roi = cv2.cvtColor(roi_resized, cv2.COLOR_BGR2GRAY)
        texture_features = self._extract_texture_features(gray_roi)

        # Combine features
        features = np.concatenate([hist_features, texture_features])

        # Normalize features
        norm = np.linalg.norm(features)
        if norm > 0:
            features = features / norm

        return features

    def _extract_texture_features(self, gray_image):
        """Extract simple texture features"""
        if gray_image.size == 0:
            return np.zeros(6)

        # Compute gradients
        grad_x = cv2.Sobel(gray_image, cv2.CV_64F, 1, 0, ksize=3)
        grad_y = cv2.Sobel(gray_image, cv2.CV_64F, 0, 1, ksize=3)

        # Compute statistics
        features = [
            np.mean(grad_x), np.std(grad_x),
            np.mean(grad_y), np.std(grad_y),
            np.mean(gray_image), np.std(gray_image)
        ]

        return np.array(features)

    def calculate_distance(self, box1, box2):
        """Calculate Euclidean distance between box centers"""
        center1 = [(box1[0] + box1[2]) / 2, (box1[1] + box1[3]) / 2]
        center2 = [(box2[0] + box2[2]) / 2, (box2[1] + box2[3]) / 2]

        return np.sqrt(sum((a - b) ** 2 for a, b in zip(center1, center2)))

    def calculate_iou(self, box1, box2):
        """Calculate IoU between two bounding boxes"""
        x1, y1, x2, y2 = box1
        x1_p, y1_p, x2_p, y2_p = box2

        # Calculate intersection
        xi1 = max(x1, x1_p)
        yi1 = max(y1, y1_p)
        xi2 = min(x2, x2_p)
        yi2 = min(y2, y2_p)

        if xi2 <= xi1 or yi2 <= yi1:
            return 0

        intersection = (xi2 - xi1) * (yi2 - yi1)

        # Calculate union
        area1 = (x2 - x1) * (y2 - y1)
        area2 = (x2_p - x1_p) * (y2_p - y1_p)
        union = area1 + area2 - intersection

        return intersection / union if union > 0 else 0

    def update_tracks(self, frame, detections):
        """
        Update tracking state with new detections

        Args:
            frame (np.array): Current frame
            detections (list): List of detection results

        Returns:
            dict: Updated tracking results
        """
        current_detections = []

        # Process detections with filtering
        for detection in detections:
            bbox = detection[:4]  # x1, y1, x2, y2
            conf = detection[4]
            class_id = int(detection[5])

            # ONLY process goalkeeper and player classes
            if class_id in self.target_classes and conf > self.conf_threshold:
                # Check if object has excluded colors
                if self._has_excluded_color(frame, bbox):
                    print(f"Excluding object due to yellowish/green color content > 30%")
                    continue

                # Extract appearance features
                features = self.extract_appearance_features(frame, bbox)

                current_detections.append({
                    'bbox': bbox,
                    'conf': conf,
                    'class_id': class_id,
                    'features': features
                })

        # Assignment phase
        assigned_tracks = {}
        unassigned_detections = list(range(len(current_detections)))

        if self.active_tracks:
            # Calculate cost matrix
            cost_matrix = self._calculate_cost_matrix(current_detections)

            # Perform assignment using greedy approach
            assignments = self._greedy_assignment(cost_matrix)

            # Update assigned tracks
            for track_id, det_idx in assignments:
                if det_idx < len(current_detections):
                    detection = current_detections[det_idx]

                    # Update track with new detection
                    if track_id in self.active_tracks:
                        self.active_tracks[track_id].update({
                            'bbox': detection['bbox'],
                            'conf': detection['conf'],
                            'class_id': detection['class_id'],
                            'features': detection['features'],
                            'age': self.active_tracks[track_id].get('age', 0) + 1,
                            'disappeared': 0
                        })
                        assigned_tracks[track_id] = self.active_tracks[track_id]

                        # Update track history (for center point only, not for drawing lines)
                        center = [(detection['bbox'][0] + detection['bbox'][2]) / 2,
                                (detection['bbox'][1] + detection['bbox'][3]) / 2]
                        self.track_history[track_id].append(center)

                    if det_idx in unassigned_detections:
                        unassigned_detections.remove(det_idx)

        # Handle unassigned detections
        for det_idx in unassigned_detections:
            detection = current_detections[det_idx]

            # Check if this detection matches any disappeared track (Re-ID)
            best_match_id = self._find_reid_match(detection['features'])

            if best_match_id is not None:
                # Reactivate disappeared track
                disappeared_track = self.disappeared_tracks[best_match_id]
                disappeared_track.update({
                    'bbox': detection['bbox'],
                    'conf': detection['conf'],
                    'class_id': detection['class_id'],
                    'features': detection['features'],
                    'disappeared': 0
                })
                self.active_tracks[best_match_id] = disappeared_track
                assigned_tracks[best_match_id] = disappeared_track

                # Remove from disappeared tracks
                if best_match_id in self.disappeared_tracks:
                    del self.disappeared_tracks[best_match_id]

                print(f"Re-ID successful: Track {best_match_id} reactivated")
            else:
                # Create new track
                new_track = {
                    'id': self.next_id,
                    'bbox': detection['bbox'],
                    'conf': detection['conf'],
                    'class_id': detection['class_id'],
                    'features': detection['features'],
                    'age': 1,
                    'disappeared': 0
                }
                self.active_tracks[self.next_id] = new_track
                assigned_tracks[self.next_id] = new_track
                self.appearance_features[self.next_id] = detection['features']

                # Initialize track history
                center = [(detection['bbox'][0] + detection['bbox'][2]) / 2,
                        (detection['bbox'][1] + detection['bbox'][3]) / 2]
                self.track_history[self.next_id].append(center)

                print(f"New track created: ID {self.next_id}")
                self.next_id += 1

        # Handle disappeared tracks
        disappeared_ids = []
        for track_id, track in self.active_tracks.items():
            if track_id not in assigned_tracks:
                track['disappeared'] += 1
                if track['disappeared'] > self.max_disappeared:
                    disappeared_ids.append(track_id)
                    print(f"Track {track_id} permanently lost")
                else:
                    self.disappeared_tracks[track_id] = track

        # Remove completely disappeared tracks
        for track_id in disappeared_ids:
            if track_id in self.active_tracks:
                del self.active_tracks[track_id]
            if track_id in self.disappeared_tracks:
                del self.disappeared_tracks[track_id]
            if track_id in self.appearance_features:
                del self.appearance_features[track_id]
            if track_id in self.track_history:
                del self.track_history[track_id]

        # Update active tracks
        self.active_tracks = assigned_tracks

        return assigned_tracks

    def _calculate_cost_matrix(self, detections):
        """Calculate cost matrix for assignment"""
        track_ids = list(self.active_tracks.keys())
        cost_matrix = np.full((len(track_ids), len(detections)), float('inf'))

        for i, track_id in enumerate(track_ids):
            track = self.active_tracks[track_id]
            track_bbox = track['bbox']

            for j, detection in enumerate(detections):
                det_bbox = detection['bbox']

                # Calculate spatial distance
                spatial_distance = self.calculate_distance(track_bbox, det_bbox)

                # Calculate appearance similarity
                try:
                    appearance_sim = cosine_similarity(
                        [track['features']], [detection['features']]
                    )[0][0]
                except:
                    appearance_sim = 0

                # Calculate IoU
                iou = self.calculate_iou(track_bbox, det_bbox)

                # Combined cost (lower is better)
                if spatial_distance < self.max_distance:
                    # Weighted combination of distance, appearance, and IoU
                    cost = (spatial_distance * 0.5) - (appearance_sim * 50) - (iou * 30)
                    cost_matrix[i, j] = cost

        return cost_matrix

    def _greedy_assignment(self, cost_matrix):
        """Simplified greedy assignment"""
        assignments = []
        track_ids = list(self.active_tracks.keys())

        used_tracks = set()
        used_detections = set()

        # Sort by cost
        costs = []
        for i in range(cost_matrix.shape[0]):
            for j in range(cost_matrix.shape[1]):
                if cost_matrix[i, j] != float('inf'):
                    costs.append((cost_matrix[i, j], track_ids[i], j))

        costs.sort()

        for cost, track_id, det_idx in costs:
            if track_id not in used_tracks and det_idx not in used_detections:
                assignments.append((track_id, det_idx))
                used_tracks.add(track_id)
                used_detections.add(det_idx)

        return assignments

    def _find_reid_match(self, features):
        """Find best Re-ID match from disappeared tracks"""
        best_match_id = None
        best_similarity = 0

        for track_id, track in self.disappeared_tracks.items():
            try:
                similarity = cosine_similarity([features], [track['features']])[0][0]

                if similarity > self.appearance_threshold and similarity > best_similarity:
                    best_similarity = similarity
                    best_match_id = track_id
            except:
                continue

        return best_match_id

    def process_video(self, output_path=None, display_interval=30, save_frames=False):
        """
        Process the entire video with tracking and Re-ID

        Reads ``self.video_path`` frame by frame, runs the detector, updates
        the tracks, draws the results and optionally writes an annotated
        video. The capture and the writer are released even when processing
        fails part-way.

        Args:
            output_path (str): Path to save output video (no video is written
                when falsy)
            display_interval (int): Show frame every N frames in Colab; values
                <= 0 disable the periodic samples (the first five frames are
                still kept)
            save_frames (bool): Save every 30th annotated frame as
                ``frame_XXXX.jpg`` in the working directory

        Returns:
            tuple: (self.frame_count, self.next_id) - frames processed and
                number of track ids handed out so far

        Raises:
            ValueError: If the video file cannot be opened
        """
        def _per_second(seconds):
            # Rate for a duration; 0.0 instead of dividing by a zero duration
            return 1 / seconds if seconds > 0 else 0.0

        cap = cv2.VideoCapture(self.video_path)

        if not cap.isOpened():
            raise ValueError(f"Cannot open video file: {self.video_path}")

        writer = None
        frame_times = []
        frames_to_display = []

        try:
            # Get video properties
            source_fps = cap.get(cv2.CAP_PROP_FPS)
            fps = int(source_fps)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Initialize video writer if output path provided. The un-truncated
            # rate is used so e.g. 29.97 FPS sources keep their duration.
            if output_path:
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                writer = cv2.VideoWriter(output_path, fourcc, source_fps, (width, height))

            print(f"Processing video: {width}x{height} at {fps} FPS")
            print(f"Total frames: {total_frames}")
            print(f"Tracking only: Goalkeepers and Players")
            print(f"Excluding yellowish/green colored objects with >30% color content")

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                start_time = time.perf_counter()

                # Run detection
                results = self.model(frame, conf=self.conf_threshold, iou=self.iou_threshold, verbose=False)

                # Flatten results into [x1, y1, x2, y2, conf, class] rows
                detections = []
                if results[0].boxes is not None:
                    boxes = results[0].boxes.xyxy.cpu().numpy()
                    confs = results[0].boxes.conf.cpu().numpy()
                    classes = results[0].boxes.cls.cpu().numpy()

                    for box, conf, cls in zip(boxes, confs, classes):
                        detections.append([box[0], box[1], box[2], box[3], conf, cls])

                # Update tracking
                tracks = self.update_tracks(frame, detections)

                # Draw tracking results (without trajectory lines)
                annotated_frame = self.draw_tracks(frame, tracks)

                # Calculate processing time
                processing_time = time.perf_counter() - start_time
                frame_times.append(processing_time)

                # Add performance info
                overlay_lines = (
                    f"Frame: {self.frame_count}/{total_frames}",
                    f"FPS: {_per_second(processing_time):.1f}",
                    f"Active: {len(tracks)}",
                    f"Disappeared: {len(self.disappeared_tracks)}",
                )
                for row, text in enumerate(overlay_lines, start=1):
                    cv2.putText(annotated_frame, text, (10, 30 * row),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                # Save frame if writer available
                if writer is not None:
                    writer.write(annotated_frame)

                # Keep sample frames for display in Colab
                if self.is_colab:
                    periodic = display_interval > 0 and self.frame_count % display_interval == 0
                    if periodic or self.frame_count < 5:
                        frames_to_display.append((self.frame_count, annotated_frame.copy()))

                # Save individual frames if requested
                if save_frames and self.frame_count % 30 == 0:
                    frame_path = f"frame_{self.frame_count:04d}.jpg"
                    cv2.imwrite(frame_path, annotated_frame)

                self.frame_count += 1

                # Print progress
                if self.frame_count % 30 == 0:
                    avg_fps = _per_second(np.mean(frame_times[-30:]))
                    # Some containers report no frame count; avoid dividing by zero
                    progress = (self.frame_count / total_frames) * 100 if total_frames > 0 else 0.0
                    print(f"Progress: {progress:.1f}% - Frame {self.frame_count}/{total_frames} - Avg FPS: {avg_fps:.1f}")
        finally:
            # Cleanup, also on errors, so the output file is finalized and handles are freed
            cap.release()
            if writer is not None:
                writer.release()

        # Display frames in Colab
        if self.is_colab and frames_to_display:
            self._display_frames_in_colab(frames_to_display)

        # Print final statistics
        if frame_times:
            avg_processing_time = np.mean(frame_times)
            print(f"\n{'='*50}")
            print(f"PROCESSING COMPLETE!")
            print(f"{'='*50}")
            print(f"Total frames processed: {self.frame_count}")
            print(f"Average processing time: {avg_processing_time:.4f}s")
            print(f"Average FPS: {_per_second(avg_processing_time):.1f}")
            print(f"Total unique players detected: {self.next_id}")
            print(f"Final active tracks: {len(self.active_tracks)}")
            print(f"Final disappeared tracks: {len(self.disappeared_tracks)}")

            if output_path:
                print(f"Output video saved to: {output_path}")

        return self.frame_count, self.next_id

    def _display_frames_in_colab(self, frames_to_display):
        """Display frames in Google Colab"""
        print(f"\nDisplaying {len(frames_to_display)} sample frames:")

        for frame_num, frame in frames_to_display:
            print(f"\n--- Frame {frame_num} ---")
            # Convert BGR to RGB for display
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            plt.figure(figsize=(12, 8))
            plt.imshow(frame_rgb)
            plt.title(f"Frame {frame_num} - Football Player Tracking (No Lines, Players/GK Only)")
            plt.axis('off')
            plt.show()

    def draw_tracks(self, frame, tracks):
        """Draw tracking results on frame WITHOUT trajectory lines"""
        annotated_frame = frame.copy()

        # Define colors for different classes
        colors = {
            1: (0, 255, 0),    # Goalkeeper - Green
            2: (255, 0, 0),    # Player - Blue
        }

        # Generate unique colors for each track ID
        track_colors = {}
        for track_id in tracks.keys():
            if track_id not in track_colors:
                # Generate a unique color based on track ID
                np.random.seed(track_id)
                color = tuple(np.random.randint(0, 255, 3).tolist())
                track_colors[track_id] = color

        for track_id, track in tracks.items():
            bbox = track['bbox']
            class_id = track['class_id']
            conf = track['conf']

            x1, y1, x2, y2 = map(int, bbox)

            # Use class color or unique track color
            color = colors.get(class_id, track_colors.get(track_id, (255, 255, 255)))

            # Draw bounding box
            cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)

            # Draw track ID and class
            label = f"ID:{track_id} {self.class_names[class_id]} {conf:.2f}"
            label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]

            # Draw label background
            cv2.rectangle(annotated_frame, (x1, y1 - label_size[1] - 10),
                         (x1 + label_size[0], y1), color, -1)

            # Draw label text
            cv2.putText(annotated_frame, label, (x1, y1 - 5),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

            # Draw center point
            center_x = (x1 + x2) // 2
            center_y = (y1 + y2) // 2
            cv2.circle(annotated_frame, (center_x, center_y), 4, color, -1)

            # REMOVED: Trajectory line drawing
            # The track history is still maintained for Re-ID purposes but not drawn

        return annotated_frame




In [None]:

def main():
    """Configure the tracker, process the input video and report the outcome.

    Returns:
        bool: True when the video was processed, False when processing raised
            an exception (the traceback is printed).
    """
    # Input / output locations
    model_file = "/content/drive/MyDrive/Colab Notebooks/Assignment/P2_best.pt"
    source_video = "/content/drive/MyDrive/Colab Notebooks/Assignment/15sec_input_720p.mp4"
    result_video = "/content/drive/MyDrive/Colab Notebooks/Assignment/output/tracked_output.mp4"

    # Detector thresholds
    detection_conf = 0.25  # Lowered for better detection
    nms_iou = 0.5

    rule = "=" * 60
    banner = (
        "🏈 Initializing Football Player Re-ID Tracking System...",
        rule,
        "✅ Modifications Applied:",
        "   - Trajectory lines removed",
        "   - Only tracking Goalkeepers and Players",
        "   - Yellowish/green color filtering enabled",
        rule,
    )
    for line in banner:
        print(line)

    # Make sure the output folder exists before the writer needs it
    os.makedirs(os.path.dirname(result_video), exist_ok=True)

    tracker = PlayerReIDTracker(
        model_path=model_file,
        video_path=source_video,
        conf_threshold=detection_conf,
        iou_threshold=nms_iou
    )

    try:
        frames_done, players_seen = tracker.process_video(
            output_path=result_video,
            display_interval=25,  # Show every 25th frame in Colab
            save_frames=False
        )

        print(f"\n✅ SUCCESS!")
        print(f"📊 Processed {frames_done} frames")
        print(f"👥 Detected {players_seen} unique players")
        print(f"💾 Output saved to: {result_video}")

    except Exception as exc:
        print(f"❌ Error processing video: {exc}")
        import traceback
        traceback.print_exc()
        return False

    return True


In [None]:
if __name__ == "__main__":
    # Run the pipeline and print a short summary of the outcome
    success = main()
    if not success:
        print("\n❌ Tracking failed!")
    else:
        print(
            "\n🎉 Tracking completed successfully!",
            "🔧 Applied modifications:",
            "   ✓ No trajectory lines",
            "   ✓ Only Goalkeepers & Players tracked",
            "   ✓ Yellowish/green objects filtered out",
            sep="\n",
        )