In [9]:
import os
import cv2
import torch
import numpy as np
from ultralytics import YOLO
import subprocess
import sys
import torchvision.transforms as transforms
from collections import OrderedDict, defaultdict, deque
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from scipy.spatial.distance import cdist
from scipy.optimize import linear_sum_assignment
from filterpy.kalman import KalmanFilter
from torchvision.ops import nms

In [None]:
# Install required packages
def install_requirements():
    """Install required packages for object tracking"""
    packages = [
        'ultralytics',
        'opencv-python',
        'torch',
        'torchvision',
        'torchaudio',
        'scipy',
        'filterpy',
        'lap',
        'cython-bbox'
    ]
    
    for package in packages:
        try:
            subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])
            print(f"✓ {package} installed successfully")
        except subprocess.CalledProcessError:
            print(f"✗ Failed to install {package}")

# Install requirements
install_requirements()

In [11]:
class SimpleFeatureExtractor:
    """Simplified CNN feature extractor for re-identification"""
    
    def __init__(self, device='cuda' if torch.cuda.is_available() else 'cpu'):
        self.device = device
        
        # Use a simpler ResNet-based feature extractor
        self.model = models.resnet50(pretrained=True)
        self.model.fc = nn.Identity()  # Remove final classification layer
        self.model.to(self.device)
        self.model.eval()
        
        # Standard ImageNet preprocessing
        self.transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                               std=[0.229, 0.224, 0.225])
        ])
        
    def extract_features(self, image_crops):
        """Extract features from image crops"""
        if len(image_crops) == 0:
            return np.array([])
        
        features = []
        with torch.no_grad():
            for crop in image_crops:
                if crop.size == 0:
                    continue
                    
                try:
                    # Ensure crop is valid
                    if crop.shape[0] < 10 or crop.shape[1] < 10:
                        continue
                        
                    # Convert BGR to RGB
                    crop_rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
                    
                    # Transform and extract features
                    crop_tensor = self.transform(crop_rgb).unsqueeze(0).to(self.device)
                    feature = self.model(crop_tensor)
                    features.append(feature.cpu().numpy().flatten())
                except Exception as e:
                    print(f"Error processing crop: {e}")
                    continue
        
        return np.array(features) if features else np.array([])

In [12]:
class KalmanBoxTracker:
    """Kalman filter tracker for bounding boxes with enhanced features"""
    count = 0
    
    def __init__(self, bbox, class_id=0, conf=0.0):
        self.kf = KalmanFilter(dim_x=7, dim_z=4)
        self.kf.F = np.array([
            [1, 0, 0, 0, 1, 0, 0],
            [0, 1, 0, 0, 0, 1, 0],
            [0, 0, 1, 0, 0, 0, 1],
            [0, 0, 0, 1, 0, 0, 0],
            [0, 0, 0, 0, 1, 0, 0],
            [0, 0, 0, 0, 0, 1, 0],
            [0, 0, 0, 0, 0, 0, 1]
        ])
        
        self.kf.H = np.array([
            [1, 0, 0, 0, 0, 0, 0],
            [0, 1, 0, 0, 0, 0, 0],
            [0, 0, 1, 0, 0, 0, 0],
            [0, 0, 0, 1, 0, 0, 0]
        ])
        
        # Tuned noise parameters
        self.kf.R[2:, 2:] *= 5.
        self.kf.P[4:, 4:] *= 500.
        self.kf.P *= 5.
        self.kf.Q[-1, -1] *= 0.05
        self.kf.Q[4:, 4:] *= 0.05
        
        self.kf.x[:4] = self.convert_bbox_to_z(bbox)
        self.time_since_update = 0
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.history = []
        self.hits = 0
        self.hit_streak = 0
        self.age = 0
        self.class_id = class_id
        self.conf = conf
        
        # Enhanced feature management
        self.feature_history = deque(maxlen=10)  # Store last 10 features
        self.max_similarity = 0.8  # Threshold for confident matching
        
    def update(self, bbox, class_id=0, conf=0.0, feature=None):
        self.time_since_update = 0
        self.history = []
        self.hits += 1
        self.hit_streak += 1
        self.class_id = class_id
        self.conf = max(self.conf, conf)  # Keep highest confidence
        self.kf.update(self.convert_bbox_to_z(bbox))
        
        # Store feature for re-identification
        if feature is not None:
            self.feature_history.append(feature)
    
    def predict(self):
        if (self.kf.x[6] + self.kf.x[2]) <= 0:
            self.kf.x[6] *= 0.0
        self.kf.predict()
        self.age += 1
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        self.history.append(self.convert_x_to_bbox(self.kf.x))
        return self.history[-1]
    
    def get_state(self):
        return self.convert_x_to_bbox(self.kf.x)
    
    def get_average_feature(self):
        if len(self.feature_history) == 0:
            return None
        return np.mean(self.feature_history, axis=0)
    
    def get_most_recent_feature(self):
        if len(self.feature_history) == 0:
            return None
        return self.feature_history[-1]
    
    @staticmethod
    def convert_bbox_to_z(bbox):
        w = bbox[2] - bbox[0]
        h = bbox[3] - bbox[1]
        x = bbox[0] + w/2.
        y = bbox[1] + h/2.
        s = w * h
        r = w / float(h)
        return np.array([x, y, s, r]).reshape((4, 1))
    
    @staticmethod
    def convert_x_to_bbox(x, score=None):
        w = np.sqrt(x[2] * x[3])
        h = x[2] / w
        if score is None:
            return np.array([x[0] - w/2., x[1] - h/2., x[0] + w/2., x[1] + h/2.]).reshape((1, 4))
        else:
            return np.array([x[0] - w/2., x[1] - h/2., x[0] + w/2., x[1] + h/2., score]).reshape((1, 5))

In [13]:
class DeepOCSORT:
    """Enhanced DeepOCSORT tracker with ResNet features"""
    
    def __init__(self, max_age=30, min_hits=3, iou_threshold=0.4, feature_threshold=0.7):
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.feature_threshold = feature_threshold
        self.trackers = []
        self.frame_count = 0
        self.temporal_window = 3  # Number of frames to consider for feature matching
        
        # Initialize feature extractor
        try:
            self.feature_extractor = SimpleFeatureExtractor()
            print("✓ Feature extractor initialized successfully")
        except Exception as e:
            print(f"⚠ Feature extractor initialization failed: {e}")
            self.feature_extractor = None
        
    def update(self, detections, frame):
        """Update tracker with detections"""
        self.frame_count += 1
        
        # Extract features from detection crops
        features = self._extract_features(detections, frame) if self.feature_extractor else []
        
        # Predict existing trackers
        trks = np.zeros((len(self.trackers), 5))
        to_del = []
        
        for t, trk in enumerate(self.trackers):
            pos = trk.predict()[0]
            trks[t] = [pos[0], pos[1], pos[2], pos[3], 0]
            if np.any(np.isnan(pos)):
                to_del.append(t)
        
        trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
        for t in reversed(to_del):
            self.trackers.pop(t)
        
        # Associate detections to trackers
        if len(detections) > 0:
            dets = np.array([[d[0], d[1], d[2], d[3], d[4]] for d in detections])
        else:
            dets = np.empty((0, 5))
        
        matched, unmatched_dets, unmatched_trks = self._associate_detections_to_trackers(
            dets, trks, features, self.iou_threshold)
        
        # Update matched trackers
        for m in matched:
            det_idx, trk_idx = m[0], m[1]
            feature = features[det_idx] if det_idx < len(features) else None
            self.trackers[trk_idx].update(
                dets[det_idx, :4], 
                int(detections[det_idx][5]), 
                dets[det_idx, 4],
                feature
            )
        
        # Create new trackers for unmatched detections
        for i in unmatched_dets:
            feature = features[i] if i < len(features) else None
            trk = KalmanBoxTracker(dets[i, :4], int(detections[i][5]), dets[i, 4])
            if feature is not None:
                trk.update(dets[i, :4], int(detections[i][5]), dets[i, 4], feature)
            self.trackers.append(trk)
        
        # Return confirmed tracks
        tracks = []
        for trk in self.trackers:
            if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
                d = trk.get_state()[0]
                tracks.append({
                    'id': trk.id,
                    'bbox': [d[0], d[1], d[2], d[3]],
                    'class_id': trk.class_id,
                    'conf': trk.conf
                })
        
        # Remove dead trackers
        i = len(self.trackers)
        for trk in reversed(self.trackers):
            if trk.time_since_update > self.max_age:
                self.trackers.pop(i - 1)
            i -= 1
        
        return tracks
    
    def _extract_features(self, detections, frame):
        """Extract features from detection crops with bounds checking"""
        if not self.feature_extractor:
            return []
            
        crops = []
        for det in detections:
            x1, y1, x2, y2 = map(int, det[:4])
            # Add bounds checking
            x1 = max(0, x1)
            y1 = max(0, y1)
            x2 = min(frame.shape[1], x2)
            y2 = min(frame.shape[0], y2)
            
            if x2 > x1 and y2 > y1:  # Valid crop
                crop = frame[y1:y2, x1:x2]
                crops.append(crop)
            else:
                crops.append(np.array([]))  # Empty crop
        
        if len(crops) == 0:
            return []
        
        try:
            features = self.feature_extractor.extract_features(crops)
            return features
        except Exception as e:
            print(f"Feature extraction error: {e}")
            return []
    
    def _associate_detections_to_trackers(self, detections, trackers, features, iou_threshold=0.3):
        """Enhanced association with both IoU and appearance features"""
        if len(trackers) == 0:
            return np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int)
        
        # IoU matching
        iou_matrix = np.zeros((len(detections), len(trackers)))
        for d, det in enumerate(detections):
            for t, trk in enumerate(trackers):
                iou_matrix[d, t] = self._iou(det, trk)
        
        # Appearance matching using cosine similarity
        appearance_matrix = np.zeros((len(detections), len(trackers)))
        if len(features) > 0 and self.feature_extractor:
            for d, det_feature in enumerate(features):
                for t, tracker in enumerate(self.trackers):
                    avg_feature = tracker.get_average_feature()
                    if avg_feature is not None:
                        try:
                            # Use cosine similarity
                            similarity = 1 - cdist(
                                det_feature.reshape(1, -1), 
                                avg_feature.reshape(1, -1), 
                                'cosine'
                            )[0][0]
                            appearance_matrix[d, t] = similarity
                        except Exception as e:
                            appearance_matrix[d, t] = 0
        
        # Combine metrics with adaptive weighting
        combined_matrix = np.zeros_like(iou_matrix)
        for d in range(len(detections)):
            for t in range(len(trackers)):
                if iou_matrix[d, t] > 0.1:  # Some overlap exists
                    combined_matrix[d, t] = 0.6 * iou_matrix[d, t] + 0.4 * appearance_matrix[d, t]
                else:  # No overlap, rely more on appearance
                    combined_matrix[d, t] = 0.2 * iou_matrix[d, t] + 0.8 * appearance_matrix[d, t]
        
        # Solve assignment problem
        if min(combined_matrix.shape) > 0:
            row_ind, col_ind = linear_sum_assignment(-combined_matrix)
            matched_indices = np.stack([row_ind, col_ind], axis=1)
        else:
            matched_indices = np.empty(shape=(0, 2))
        
        unmatched_detections = []
        for d, det in enumerate(detections):
            if d not in matched_indices[:, 0]:
                unmatched_detections.append(d)
        
        unmatched_trackers = []
        for t, trk in enumerate(trackers):
            if t not in matched_indices[:, 1]:
                unmatched_trackers.append(t)
        
        matches = []
        for m in matched_indices:
            # Only confirm matches with sufficient evidence
            if (combined_matrix[m[0], m[1]] < self.iou_threshold and 
                appearance_matrix[m[0], m[1]] < self.feature_threshold):
                unmatched_detections.append(m[0])
                unmatched_trackers.append(m[1])
            else:
                matches.append(m.reshape(1, 2))
        
        if len(matches) == 0:
            matches = np.empty((0, 2), dtype=int)
        else:
            matches = np.concatenate(matches, axis=0)
        
        return matches, np.array(unmatched_detections), np.array(unmatched_trackers)
    
    def _iou(self, bb_test, bb_gt):
        """Calculate IoU between two bounding boxes"""
        xx1 = np.maximum(bb_test[0], bb_gt[0])
        yy1 = np.maximum(bb_test[1], bb_gt[1])
        xx2 = np.minimum(bb_test[2], bb_gt[2])
        yy2 = np.minimum(bb_test[3], bb_gt[3])
        w = np.maximum(0., xx2 - xx1)
        h = np.maximum(0., yy2 - yy1)
        wh = w * h
        o = wh / ((bb_test[2] - bb_test[0]) * (bb_test[3] - bb_test[1]) + 
                  (bb_gt[2] - bb_gt[0]) * (bb_gt[3] - bb_gt[1]) - wh)
        return o

In [14]:
class ObjectTracker:
    """Enhanced Object Tracker with YOLOv8 + DeepOCSORT + ResNet features"""
    
    def __init__(self, model_path='yolov8x.pt', conf_threshold=0.25, iou_threshold=0.45):
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        
        # Load YOLOv8 model
        print("Loading YOLOv8 model...")
        try:
            self.model = YOLO(model_path)
            print(f"✓ Model loaded successfully. Device: {self.model.device}")
        except Exception as e:
            print(f"✗ Model loading failed: {e}")
            raise
        
        # Initialize DeepOCSORT tracker
        print("Initializing DeepOCSORT tracker...")
        self.tracker = DeepOCSORT(
            max_age=30,
            min_hits=3,
            iou_threshold=0.2,
            feature_threshold=0.7
        )
        self.class_names = [
            'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus', 'train', 'truck',
            'boat', 'traffic light', 'fire hydrant', 'stop sign', 'parking meter', 'bench',
            'bird', 'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra',
            'giraffe', 'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
            'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove',
            'skateboard', 'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup',
            'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich', 'orange',
            'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
            'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop', 'mouse',
            'remote', 'keyboard', 'cell phone', 'microwave', 'oven', 'toaster', 'sink',
            'refrigerator', 'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
            'toothbrush'
        ]
        
        # Color palette for visualization
        self.colors = np.random.randint(0, 255, size=(200, 3), dtype=np.uint8)
        
        # Tracking metrics
        self.total_detections = 0
        self.total_frames = 0

    def detect_objects(self, frame):
        """Enhanced object detection with error handling"""
        try:
            # Run YOLO detection
            results = self.model(frame, conf=self.conf_threshold, iou=self.iou_threshold, verbose=False)
            detections = []
            
            for result in results:
                if result.boxes is not None:
                    # Convert to numpy arrays
                    boxes = result.boxes.xyxy.cpu().numpy()
                    scores = result.boxes.conf.cpu().numpy()
                    class_ids = result.boxes.cls.cpu().numpy().astype(int)
                    
                    # Format detections
                    for box, score, cls in zip(boxes, scores, class_ids):
                        # Filter for relevant classes (vehicles + people)
                        if cls in [0, 1, 2, 3, 5, 6, 7, 8]:  # person, bicycle, car, motorcycle, bus, train, truck, boat
                            detections.append([*box, score, cls])
            
            self.total_detections += len(detections)
            return detections
            
        except Exception as e:
            print(f"Detection error: {e}")
            return []

    def draw_tracks(self, frame, tracks):
        """Enhanced visualization of tracking results"""
        annotated_frame = frame.copy()
        
        for track in tracks:
            track_id = track['id']
            bbox = track['bbox']
            class_id = track['class_id']
            conf = track['conf']
            
            x1, y1, x2, y2 = map(int, bbox)
            
            # Get color for this track
            color = tuple(map(int, self.colors[track_id % len(self.colors)]))
            
            # Draw bounding box
            cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)
            
            # Draw track ID and class
            class_name = self.class_names[class_id] if class_id < len(self.class_names) else f"object"
            label = f"ID:{track_id} {class_name} {conf:.2f}"
            (text_width, text_height), baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
            
            # Draw text background
            cv2.rectangle(annotated_frame, 
                         (x1, y1 - text_height - 10), 
                         (x1 + text_width + 10, y1), 
                         color, -1)
            
            # Put text
            cv2.putText(annotated_frame, label, (x1 + 5, y1 - 5), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
        
        # Add frame info
        cv2.putText(annotated_frame, f"Frame: {self.total_frames}", (10, 30),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
        cv2.putText(annotated_frame, f"Active tracks: {len(tracks)}", (10, 60),
                   cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
        
        return annotated_frame

    def process_video(self, input_path, output_path):
        """Process video with enhanced object tracking"""
        print(f"Processing video: {input_path}")
        
        # Open video
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            print(f"Error: Cannot open video {input_path}")
            return
        
        # Get video properties
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        print(f"Video properties: {width}x{height}, {fps} FPS, {total_frames} frames")
        
        # Create output directory
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        
        # Initialize video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        
        self.total_frames = 0
        self.total_detections = 0
        
        print("Starting object tracking...")
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            
            self.total_frames += 1
            
            # Detect objects
            detections = self.detect_objects(frame)
            
            # Track objects
            tracks = self.tracker.update(detections, frame)
            
            # Draw tracking results
            annotated_frame = self.draw_tracks(frame, tracks)
            
            # Write frame
            out.write(annotated_frame)
            
            # Progress update
            if self.total_frames % 30 == 0:
                progress = (self.total_frames / total_frames) * 100
                print(f"Progress: {progress:.1f}% - Active tracks: {len(tracks)}")
        
        # Release resources
        cap.release()
        out.release()
        
        print(f"\nTracking completed! Output saved to: {output_path}")

In [16]:
def main():
    """Main function to run enhanced object tracking"""
    
    # Define paths - UPDATE THESE TO YOUR PATHS
    input_video = "/kaggle/input/videofwces/MOT17-07-raw.webm"  # Change this to your input video path
    output_video = "/kaggle/working/output/tracked_video.mp4"  # Output path
    
    # Check if input video exists
    if not os.path.exists(input_video):
        print(f"Error: Input video not found at {input_video}")
        print("Please update the input_video path in the main() function")
        return
    
    # Initialize tracker
    print("Initializing object tracker...")
    tracker = ObjectTracker(
        model_path='yolov8x.pt',  # Using smaller model for stability
        conf_threshold=0.4,
        iou_threshold=0.45
    )
    
    # Process video
    tracker.process_video(input_video, output_video)
    
    print("Tracking completed successfully!")

if __name__ == "__main__":
    main()

Initializing object tracker...
Loading YOLOv8 model...
✓ Model loaded successfully. Device: cpu
Initializing DeepOCSORT tracker...
✓ Feature extractor initialized successfully
Processing video: /kaggle/input/videofwces/MOT17-07-raw.webm
Video properties: 960x540, 30 FPS, 500 frames
Starting object tracking...
Progress: 6.0% - Active tracks: 13
Progress: 12.0% - Active tracks: 11
Progress: 18.0% - Active tracks: 16
Progress: 24.0% - Active tracks: 10
Progress: 30.0% - Active tracks: 12
Progress: 36.0% - Active tracks: 13
Progress: 42.0% - Active tracks: 16
Progress: 48.0% - Active tracks: 15
Progress: 54.0% - Active tracks: 10
Progress: 60.0% - Active tracks: 12
Progress: 66.0% - Active tracks: 13
Progress: 72.0% - Active tracks: 13
Progress: 78.0% - Active tracks: 15
Progress: 84.0% - Active tracks: 15
Progress: 90.0% - Active tracks: 13
Progress: 96.0% - Active tracks: 13

Tracking completed! Output saved to: /kaggle/working/output/tracked_video.mp4
Tracking completed successfully!
