In [2]:
import cv2
import torch
import numpy as np
from collections import deque

# ==========================
# Tracker Class Definition
# ==========================
class Tracker:
    def __init__(self, max_lost=30, distance_threshold=50):
        self.next_object_id = 0
        self.objects = dict()  # object_id: (center_x, center_y)
        self.lost = dict()     # object_id: number of consecutive frames lost
        self.max_lost = max_lost
        self.distance_threshold = distance_threshold
        self.trajectories = dict()  # object_id: deque of positions
        self.colors = dict()  # object_id: (B, G, R)

    def assign_colors(self, object_id):
        np.random.seed(object_id)
        color = tuple(np.random.randint(0, 255, size=3).tolist())
        self.colors[object_id] = color
        return color

    def update(self, detections):
        updated_objects = dict()

        if len(self.objects) == 0:
            for center in detections:
                self.objects[self.next_object_id] = center
                self.lost[self.next_object_id] = 0
                self.trajectories[self.next_object_id] = deque(maxlen=64)
                self.trajectories[self.next_object_id].append(center)
                self.assign_colors(self.next_object_id)
                self.next_object_id += 1
        else:
            object_ids = list(self.objects.keys())
            object_centers = list(self.objects.values())
            D = np.linalg.norm(np.array(object_centers)[:, np.newaxis] - np.array(detections), axis=2)
            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]

            assigned_detections = set()
            assigned_objects = set()

            for row, col in zip(rows, cols):
                if row in assigned_objects or col in assigned_detections:
                    continue
                if D[row, col] > self.distance_threshold:
                    continue
                object_id = object_ids[row]
                self.objects[object_id] = detections[col]
                self.lost[object_id] = 0
                self.trajectories[object_id].append(detections[col])
                updated_objects[object_id] = detections[col]
                assigned_objects.add(row)
                assigned_detections.add(col)

            for row, object_id in enumerate(object_ids):
                if row not in assigned_objects:
                    self.lost[object_id] += 1
                    if self.lost[object_id] > self.max_lost:
                        del self.objects[object_id]
                        del self.lost[object_id]
                        del self.trajectories[object_id]
                        del self.colors[object_id]

            for i, center in enumerate(detections):
                if i not in assigned_detections:
                    self.objects[self.next_object_id] = center
                    self.lost[self.next_object_id] = 0
                    self.trajectories[self.next_object_id] = deque(maxlen=64)
                    self.trajectories[self.next_object_id].append(center)
                    self.assign_colors(self.next_object_id)
                    self.next_object_id += 1

        return self.objects

# ==========================
# Helper Functions
# ==========================
def get_center(bbox):
    x_min, y_min, x_max, y_max = bbox
    center_x = int((x_min + x_max) / 2)
    center_y = int((y_min + y_max) / 2)
    return (center_x, center_y)

def draw_map(map_img, trajectories, colors, map_scale=1.0):
    for object_id, points in trajectories.items():
        color = colors[object_id]
        for i in range(1, len(points)):
            pt1 = (int(points[i-1][0] * map_scale), int(points[i-1][1] * map_scale))
            pt2 = (int(points[i][0] * map_scale), int(points[i][1] * map_scale))
            cv2.line(map_img, pt1, pt2, color, 2)
        if len(points) > 0:
            cv2.circle(map_img, (int(points[-1][0] * map_scale), int(points[-1][1] * map_scale)), 5, color, -1)

# ==========================
# Main Function
# ==========================
def main():
    # Load YOLOv5 model from torch.hub
    print("Loading YOLOv5 model...")
    yolo_model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
    yolo_model.conf = 0.4  # confidence threshold
    print("Model loaded.")

    # Load MiDaS model from torch.hub
    print("Loading MiDaS model...")
    midas = torch.hub.load('intel-isl/MiDaS', 'MiDaS_small')
    midas.to('cpu')
    midas.eval()

    transforms = torch.hub.load('intel-isl/MiDaS', 'transforms')
    transform = transforms.small_transform
    print("MiDaS model loaded.")

    # Initialize video capture (0 for default camera)
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Error: Could not open video capture.")
        return

    # Get frame dimensions
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Initialize tracker
    tracker = Tracker(max_lost=30, distance_threshold=80)

    # Initialize a blank map image (same size as frame or scaled)
    map_scale = 0.5
    map_width = int(frame_width * map_scale)
    map_height = int(frame_height * map_scale)
    map_img = np.ones((map_height, map_width, 3), dtype=np.uint8) * 255

    # Main loop
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Failed to grab frame.")
            break

        # Perform YOLOv5 detection
        yolo_results = yolo_model(frame)
        detections = yolo_results.xyxy[0].cpu().numpy()

        # Extract bounding boxes with confidence above threshold
        boxes = []
        for *bbox, conf, cls in detections:
            if conf < yolo_model.conf:
                continue
            boxes.append([int(coord) for coord in bbox])

        # Get centers of detected objects
        centers = [get_center(box) for box in boxes]

        # Update tracker with detected centers
        tracked_objects = tracker.update(centers)

        # Draw bounding boxes and labels on the frame
        for i, box in enumerate(boxes):
            center = centers[i]
            matched_id = None
            for object_id, obj_center in tracked_objects.items():
                if obj_center == center:
                    matched_id = object_id
                    break
            if matched_id is not None:
                color = tracker.colors[matched_id]
                cv2.rectangle(frame, (box[0], box[1]), (box[2], box[3]), color, 2)
                label = f'ID {matched_id}'
                cv2.putText(frame, label, (box[0], box[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        # Draw trajectories on the map
        map_img.fill(255)
        draw_map(map_img, tracker.trajectories, tracker.colors, map_scale=map_scale)

        # Perform MiDaS depth estimation
        img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img_transformed = transform(img_rgb).to('cpu')
        with torch.no_grad():
            depth_prediction = midas(img_transformed)
            depth_prediction = torch.nn.functional.interpolate(
                depth_prediction.unsqueeze(1),
                size=frame.shape[:2],
                mode='bicubic',
                align_corners=False
            ).squeeze().cpu().numpy()

        # Normalize and apply a colormap to the depth map
        depth_normalized = cv2.normalize(depth_prediction, None, 0, 255, cv2.NORM_MINMAX)
        depth_colormap = cv2.applyColorMap(depth_normalized.astype(np.uint8), cv2.COLORMAP_JET)

        # Combine the depth map with the trajectory map
        # combined_display = np.hstack((frame, depth_colormap, map_img))

        # # Display the combined output
        # cv2.imshow('YOLOv5 Object Tracking and Depth Map', combined_display)

        # Resize the frame and depth map to match the height of the map image
        frame_resized = cv2.resize(frame, (map_img.shape[1], map_img.shape[0]))
        depth_colormap_resized = cv2.resize(depth_colormap, (map_img.shape[1], map_img.shape[0]))

        # Combine the resized images
        combined_display = np.hstack((frame_resized, depth_colormap_resized, map_img))

        # Display the combined output
        cv2.imshow('YOLOv5 Object Tracking and Depth Map', combined_display)


        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()


Loading YOLOv5 model...


Using cache found in /Users/adil/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2024-8-26 Python-3.11.7 torch-2.4.0 CPU

Fusing layers... 
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients, 16.4 GFLOPs
Adding AutoShape... 


Model loaded.
Loading MiDaS model...


Using cache found in /Users/adil/.cache/torch/hub/intel-isl_MiDaS_master


Loading weights:  None


Using cache found in /Users/adil/.cache/torch/hub/rwightman_gen-efficientnet-pytorch_master
Using cache found in /Users/adil/.cache/torch/hub/intel-isl_MiDaS_master
  with amp.autocast(autocast):


MiDaS model loaded.


  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with a

ValueError: operands could not be broadcast together with shapes (1,1,2) (0,) 

: 

In [1]:
import cv2
import torch
import numpy as np
from collections import deque

# ==========================
# Tracker Class Definition
# ==========================
class Tracker:
    def __init__(self, max_lost=30, distance_threshold=50):
        self.next_object_id = 0
        self.objects = dict()  # object_id: (center_x, center_y)
        self.lost = dict()     # object_id: number of consecutive frames lost
        self.max_lost = max_lost
        self.distance_threshold = distance_threshold
        self.trajectories = dict()  # object_id: deque of positions
        self.colors = dict()  # object_id: (B, G, R)

    def assign_colors(self, object_id):
        np.random.seed(object_id)
        color = tuple(np.random.randint(0, 255, size=3).tolist())
        self.colors[object_id] = color
        return color

    def update(self, detections):
        updated_objects = dict()

        if len(self.objects) == 0:
            for center in detections:
                self.objects[self.next_object_id] = center
                self.lost[self.next_object_id] = 0
                self.trajectories[self.next_object_id] = deque(maxlen=64)
                self.trajectories[self.next_object_id].append(center)
                self.assign_colors(self.next_object_id)
                self.next_object_id += 1
        else:
            object_ids = list(self.objects.keys())
            object_centers = list(self.objects.values())
            if len(detections) > 0:
                D = np.linalg.norm(np.array(object_centers)[:, np.newaxis] - np.array(detections), axis=2)
                rows = D.min(axis=1).argsort()
                cols = D.argmin(axis=1)[rows]

                assigned_detections = set()
                assigned_objects = set()  # Initialize here

                for row, col in zip(rows, cols):
                    if row in assigned_objects or col in assigned_detections:
                        continue
                    if D[row, col] > self.distance_threshold:
                        continue
                    object_id = object_ids[row]
                    self.objects[object_id] = detections[col]
                    self.lost[object_id] = 0
                    self.trajectories[object_id].append(detections[col])
                    updated_objects[object_id] = detections[col]
                    assigned_objects.add(row)
                    assigned_detections.add(col)

            for row, object_id in enumerate(object_ids):
                if row not in assigned_objects:
                    self.lost[object_id] += 1
                    if self.lost[object_id] > self.max_lost:
                        del self.objects[object_id]
                        del self.lost[object_id]
                        del self.trajectories[object_id]
                        del self.colors[object_id]

            for i, center in enumerate(detections):
                if i not in assigned_detections:
                    self.objects[self.next_object_id] = center
                    self.lost[self.next_object_id] = 0
                    self.trajectories[self.next_object_id] = deque(maxlen=64)
                    self.trajectories[self.next_object_id].append(center)
                    self.assign_colors(self.next_object_id)
                    self.next_object_id += 1

        return self.objects


# ==========================
# Helper Functions
# ==========================
def get_center(bbox):
    x_min, y_min, x_max, y_max = bbox
    center_x = int((x_min + x_max) / 2)
    center_y = int((y_min + y_max) / 2)
    return (center_x, center_y)

def draw_map(map_img, trajectories, colors, map_scale=1.0):
    for object_id, points in trajectories.items():
        color = colors[object_id]
        for i in range(1, len(points)):
            pt1 = (int(points[i-1][0] * map_scale), int(points[i-1][1] * map_scale))
            pt2 = (int(points[i][0] * map_scale), int(points[i][1] * map_scale))
            cv2.line(map_img, pt1, pt2, color, 2)
        if len(points) > 0:
            cv2.circle(map_img, (int(points[-1][0] * map_scale), int(points[-1][1] * map_scale)), 5, color, -1)

# ==========================
# Main Function
# ==========================
def main():
    # Load YOLOv5 model from torch.hub
    print("Loading YOLOv5 model...")
    yolo_model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
    yolo_model.conf = 0.4  # confidence threshold
    print("Model loaded.")

    # Load MiDaS model from torch.hub
    print("Loading MiDaS model...")
    midas = torch.hub.load('intel-isl/MiDaS', 'MiDaS_small')
    midas.to('cpu')
    midas.eval()

    transforms = torch.hub.load('intel-isl/MiDaS', 'transforms')
    transform = transforms.small_transform
    print("MiDaS model loaded.")

    # Initialize video capture (0 for default camera)
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Error: Could not open video capture.")
        return

    # Get frame dimensions
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Initialize tracker
    tracker = Tracker(max_lost=30, distance_threshold=80)

    # Initialize a blank map image (same size as frame or scaled)
    map_scale = 0.5
    map_width = int(frame_width * map_scale)
    map_height = int(frame_height * map_scale)
    map_img = np.ones((map_height, map_width, 3), dtype=np.uint8) * 255

    # Main loop
    while True:
        ret, frame = cap.read()
        if not ret:
            print("Failed to grab frame.")
            break

        # Perform MiDaS depth estimation
        img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img_transformed = transform(img_rgb).to('cpu')
        with torch.no_grad():
            depth_prediction = midas(img_transformed)
            depth_prediction = torch.nn.functional.interpolate(
                depth_prediction.unsqueeze(1),
                size=frame.shape[:2],
                mode='bicubic',
                align_corners=False
            ).squeeze().cpu().numpy()

        # Normalize and apply a colormap to the depth map
        depth_normalized = cv2.normalize(depth_prediction, None, 0, 255, cv2.NORM_MINMAX)
        depth_colormap = cv2.applyColorMap(depth_normalized.astype(np.uint8), cv2.COLORMAP_JET)

        # Perform YOLOv5 detection on the depth map
        yolo_results = yolo_model(depth_colormap)
        detections = yolo_results.xyxy[0].cpu().numpy()

        # Extract bounding boxes with confidence above threshold
        boxes = []
        for *bbox, conf, cls in detections:
            if conf < yolo_model.conf:
                continue
            boxes.append([int(coord) for coord in bbox])

        # Get centers of detected objects
        centers = [get_center(box) for box in boxes]

        # Update tracker with detected centers
        tracked_objects = tracker.update(centers)

        # Draw bounding boxes and labels on the depth map
        for i, box in enumerate(boxes):
            center = centers[i]
            matched_id = None
            for object_id, obj_center in tracked_objects.items():
                if obj_center == center:
                    matched_id = object_id
                    break
            if matched_id is not None:
                color = tracker.colors[matched_id]
                cv2.rectangle(depth_colormap, (box[0], box[1]), (box[2], box[3]), color, 2)
                label = f'ID {matched_id}'
                cv2.putText(depth_colormap, label, (box[0], box[1] - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        # Draw trajectories on the map
        map_img.fill(255)
        draw_map(map_img, tracker.trajectories, tracker.colors, map_scale=map_scale)

        # Resize the depth map and map image to match the height of the original frame
        depth_colormap_resized = cv2.resize(depth_colormap, (map_img.shape[1], map_img.shape[0]))

        # Combine the resized images
        combined_display = np.hstack((depth_colormap_resized, map_img))

        # Display the combined output
        cv2.imshow('Object Tracking on Depth Map', combined_display)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    main()


Loading YOLOv5 model...


Using cache found in /Users/adil/.cache/torch/hub/ultralytics_yolov5_master
YOLOv5 🚀 2024-8-26 Python-3.11.7 torch-2.4.0 CPU

Fusing layers... 
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients, 16.4 GFLOPs
Adding AutoShape... 


Model loaded.
Loading MiDaS model...


Using cache found in /Users/adil/.cache/torch/hub/intel-isl_MiDaS_master
  from .autonotebook import tqdm as notebook_tqdm


Loading weights:  None


Using cache found in /Users/adil/.cache/torch/hub/rwightman_gen-efficientnet-pytorch_master
Using cache found in /Users/adil/.cache/torch/hub/intel-isl_MiDaS_master


MiDaS model loaded.


  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):
  with amp.autocast(autocast):


UnboundLocalError: cannot access local variable 'assigned_objects' where it is not associated with a value

: 