In [12]:
from ultralytics import YOLO
from PIL import Image
import cv2
import numpy as np
import torch
import torchvision.transforms as T
from torchvision.models import resnet18, ResNet18_Weights
from scipy.spatial.distance import cosine

In [8]:
# Load the detector from the weights file stored under runs/detect/train9.
WEIGHTS_PATH = 'runs/detect/train9/weights/best.pt'
model = YOLO(WEIGHTS_PATH)


In [3]:

# Run the detector on the still test images, then save and display each
# annotated result. Input and output file names share the same numeric suffix.
NUM_TEST_IMAGES = 2

images_path = [f'test_images/image{n}.jpg' for n in range(NUM_TEST_IMAGES)]
output_path = [f'detection_results/detection_result{n}.jpg' for n in range(NUM_TEST_IMAGES)]

# One call on the whole list; `results` holds one entry per input image.
results = model(images_path)

# Save every annotated image first ...
for i, result in enumerate(results):
  result.save(filename=output_path[i])

# ... and only then open the viewers.
for result in results:
  result.show()




0: 640x640 4 persons, 27.1ms
1: 640x640 2 persons, 27.1ms
Speed: 9.5ms preprocess, 27.1ms inference, 2.8ms postprocess per image at shape (1, 3, 640, 640)


WORKING WITH VIDEOS FURTHER ON

In [40]:
class IdentityManager:
    """Appearance-based re-identification layered on top of tracker IDs.

    One embedding is stored per known identity. When a detection is resolved,
    it is compared (cosine distance) against every stored identity that is
    NOT currently active, so that a person who re-appears under a new tracker
    ID can be given their previous identity back.

    Attributes:
        known_identities: dict mapping identity ID -> 1-D numpy embedding.
        active_ids: set of IDs already in use for the current frame. The
            caller is expected to reset it once per frame; IDs handed out by
            ``resolve_identity`` through a match are added to it.
        match_threshold: maximum cosine distance accepted as a match.
        update_alpha: weight given to the newest embedding when a matched
            identity's stored embedding is refreshed.
    """

    def __init__(self, match_threshold=0.20, update_alpha=0.40):
        """Build the encoder and preprocessing pipeline.

        Args:
            match_threshold: cosine-distance cut-off for a match (lower is
                stricter). Defaults to the previously hard-coded 0.20.
            update_alpha: fraction of the new embedding blended into the
                stored one on a match. Defaults to the previously
                hard-coded 0.40.
        """
        self.known_identities = {}  # {id: embedding}
        self.active_ids = set()
        self.match_threshold = match_threshold
        self.update_alpha = update_alpha

        # Feature extractor in inference mode.
        # NOTE(review): this is the full classification network, so the
        # "embedding" is its output logits rather than pooled features.
        # Replacing `encoder.fc` with an identity layer would give true
        # feature vectors but requires re-tuning `match_threshold`.
        self.encoder = resnet18(weights=ResNet18_Weights.DEFAULT)
        self.encoder.eval()

        self.preprocess = T.Compose([
            T.ToPILImage(),
            # (height, width): tall aspect ratio for person crops.
            T.Resize((256, 128)),
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def get_embedding(self, img_crop):
        """Return a flat numpy embedding for one image crop.

        Args:
            img_crop: H x W x 3 uint8 array in OpenCV's BGR channel order.

        Returns:
            1-D numpy array produced by the encoder.
        """
        # ToPILImage and the normalisation statistics above assume RGB,
        # whereas OpenCV frames are BGR: reverse the channel axis first.
        rgb_crop = np.ascontiguousarray(img_crop[:, :, ::-1])
        input_tensor = self.preprocess(rgb_crop).unsqueeze(0)
        with torch.no_grad():
            embedding = self.encoder(input_tensor).flatten().numpy()
        return embedding

    def resolve_identity(self, img_crop, current_tracker_id):
        """Map a tracked detection to a stable identity.

        Args:
            img_crop: BGR crop of the detection (see ``get_embedding``).
            current_tracker_id: ID assigned by the tracker for this frame.

        Returns:
            The ID of the closest inactive known identity if its cosine
            distance is below ``match_threshold``; otherwise
            ``current_tracker_id``, whose embedding is stored (or replaced).
        """
        new_embedding = self.get_embedding(img_crop)

        best_match_id = None
        min_dist = float('inf')

        for known_id, known_embedding in self.known_identities.items():
            # Identities already in use this frame cannot be re-assigned.
            if known_id in self.active_ids:
                continue

            dist = cosine(new_embedding, known_embedding)
            if dist < self.match_threshold and dist < min_dist:
                min_dist = dist
                best_match_id = known_id

        if best_match_id is None:
            # No acceptable match: (re)register under the tracker's own ID.
            self.known_identities[current_tracker_id] = new_embedding
            return current_tracker_id

        # Weighted update: blend only `update_alpha` of the new embedding into
        # the stored one so a single bad frame cannot take over the identity.
        old_emb = self.known_identities[best_match_id]
        self.known_identities[best_match_id] = (
            (1 - self.update_alpha) * old_emb + self.update_alpha * new_embedding
        )

        # Claim the identity for the rest of this frame so a second detection
        # cannot resolve to the same ID.
        self.active_ids.add(best_match_id)
        return best_match_id

In [3]:
# Build the parallel lists of input videos and their annotated outputs;
# entry k of one list corresponds to entry k of the other.
video_path = []
output_video_path = []
for i in range(4):
  video_path.append(f'test_video/video{i}.mp4')
  output_video_path.append(f'detection_results_video/detection_result{i}.mp4')

In [41]:
# --- Tracking + re-identification on one video --------------------------------
# Requires the imports cell, the IdentityManager class and the
# video_path / output_video_path lists from earlier cells.

VIDEO_INDEX = 3      # entry of video_path / output_video_path to process
FRAME_STRIDE = 2     # process 1 source frame out of every FRAME_STRIDE
MIN_FRAMES = 5       # a track is ignored for its first MIN_FRAMES sightings
CLEANUP_EVERY = 50   # prune track_history every N processed frames

# Fresh model and identity manager for this run.
model = YOLO('runs/detect/train9/weights/best.pt')
id_manager = IdentityManager()

video = cv2.VideoCapture(video_path[VIDEO_INDEX])
if not video.isOpened():
    # Raise instead of exit(): exit() would shut the notebook kernel down.
    raise IOError(f"Error: Could not open video file {video_path[VIDEO_INDEX]}")

frame_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = video.get(cv2.CAP_PROP_FPS)  # kept as float (e.g. 29.97) to avoid truncation

# Only every FRAME_STRIDE-th frame is written, so the output rate is reduced
# by the same factor to keep the playback speed unchanged.
effective_fps = fps / FRAME_STRIDE

fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path[VIDEO_INDEX], fourcc, effective_fps,
                      (frame_width, frame_height))
if not out.isOpened():
    video.release()
    raise IOError(f"Error: Could not open output video file {output_video_path[VIDEO_INDEX]}")

print("Processing video... This may take a moment.")

i = 0                  # source frames grabbed so far
processed_frames = 0   # frames actually decoded and run through the tracker
track_history = {}     # {tracker_id: number of processed frames it was seen in}

try:
    while video.isOpened():
        # grab() advances the stream without decoding, so skipped frames are cheap.
        if not video.grab():
            break

        i += 1
        if (i - 1) % FRAME_STRIDE != 0:
            continue

        success, frame = video.retrieve()
        if not success:
            break
        processed_frames += 1

        results = model.track(frame, persist=True, conf=0.6, verbose=False)
        detections = results[0].boxes

        # Draw on a copy so crops used for ReID never contain boxes or labels
        # painted for earlier detections in the same frame.
        annotated = frame.copy()
        img_h, img_w = frame.shape[:2]

        if detections.id is None:
            # Nothing tracked in this frame.
            track_ids = []
            id_manager.active_ids = set()
        else:
            boxes = detections.xyxy.cpu().numpy()
            track_ids = detections.id.int().cpu().tolist()
            classes = detections.cls.int().cpu().tolist()

            # IDs present in this frame are excluded from ReID matching.
            id_manager.active_ids = set(track_ids)

            for box, track_id, cls in zip(boxes, track_ids, classes):
                # Probation: count sightings and skip the track entirely until
                # it has been seen more than MIN_FRAMES times.
                track_history[track_id] = track_history.get(track_id, 0) + 1
                if track_history[track_id] <= MIN_FRAMES:
                    continue

                # Clamp to the image so slicing never wraps around with a
                # negative index.
                x1, y1, x2, y2 = map(int, box)
                x1, y1 = max(x1, 0), max(y1, 0)
                x2, y2 = min(x2, img_w), min(y2, img_h)

                if cls == 0:  # person
                    person_crop = frame[y1:y2, x1:x2]
                    if person_crop.size > 0:
                        # Only run ReID on confirmed, stable tracks.
                        final_id = id_manager.resolve_identity(person_crop, track_id)

                        cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)
                        cv2.putText(annotated, f"ID: {final_id}", (x1, y1 - 10),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
                else:
                    # Other classes are drawn without ReID.
                    cv2.rectangle(annotated, (x1, y1), (x2, y2), (255, 0, 0), 2)

        out.write(annotated)

        # Periodic pruning keyed on processed frames. The previous check on the
        # raw frame counter (i % 100) could never fire, because every even
        # frame is skipped above. This is a simple policy: a track missing in
        # this exact frame loses its count and re-enters probation if it returns.
        if processed_frames % CLEANUP_EVERY == 0:
            active_now = set(track_ids)
            track_history = {k: v for k, v in track_history.items() if k in active_now}
finally:
    # Always release the handles, even if tracking raised mid-video.
    video.release()
    out.release()

print(f"Successfully saved tracked video")

Processing video... This may take a moment.
Successfully saved tracked video


In [11]:
# --- Baseline: plain YOLO tracking with the built-in annotation ----------------
# Requires `model`, `video_path` and `output_video_path` from earlier cells.
# NOTE(review): this writes to the same output file as the ReID cell, so
# whichever of the two cells runs last overwrites the other's result.
# NOTE(review): `model` is reused with persist=True; tracker state may carry
# over from a previous video run. Confirm, or reload the model first.

VIDEO_INDEX = 3  # entry of video_path / output_video_path to process

video = cv2.VideoCapture(video_path[VIDEO_INDEX])

# Raise instead of exit(): exit() would shut the notebook kernel down.
if not video.isOpened():
    raise IOError(f"Error: Could not open video file {video_path[VIDEO_INDEX]}")

# Video properties (width, height, fps)
frame_width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = video.get(cv2.CAP_PROP_FPS)  # kept as float to avoid truncating e.g. 29.97

# Every 2nd frame is skipped below, so halve the output rate to keep the
# playback speed of the saved video equal to the source.
effective_fps = fps / 2

# Codec and writer
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path[VIDEO_INDEX], fourcc, effective_fps,
                      (frame_width, frame_height))
if not out.isOpened():
    video.release()
    raise IOError(f"Error: Could not open output video file {output_video_path[VIDEO_INDEX]}")

print("Processing video... This may take a moment.")

# Initialise the counter here; relying on a leftover `i` from another cell
# makes the set of processed frames depend on execution history.
i = 0

try:
    while video.isOpened():
        # grab() advances to the next frame without decoding it.
        if not video.grab():
            break

        i += 1
        if i % 2 == 0:
            continue

        # retrieve() decodes the frame that was just grabbed. read() would
        # grab yet another frame and silently drop the current one.
        success, frame = video.retrieve()
        if not success:
            break

        # Run YOLO tracking on the frame and write the library-rendered overlay.
        results = model.track(frame, persist=True, conf=0.3)
        annotated_frame = results[0].plot()
        out.write(annotated_frame)
finally:
    # Always release the handles, even if tracking raised mid-video.
    video.release()
    out.release()

print(f"Successfully saved tracked video")

Processing video... This may take a moment.

0: 384x640 (no detections), 12.0ms
Speed: 296.7ms preprocess, 12.0ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 11.8ms
Speed: 1.5ms preprocess, 11.8ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 11.7ms
Speed: 1.1ms preprocess, 11.7ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 11.7ms
Speed: 1.1ms preprocess, 11.7ms inference, 1.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 11.7ms
Speed: 1.2ms preprocess, 11.7ms inference, 0.9ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 11.9ms
Speed: 1.2ms preprocess, 11.9ms inference, 0.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detections), 11.7ms
Speed: 1.1ms preprocess, 11.7ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 (no detecti