In [17]:
import os
import sys
import time
import cv2
import matplotlib.pyplot as plt
from ultralytics import YOLO
from ultralytics import YOLOv10
from multiprocessing import freeze_support
import torch
import numpy as np
from collections import deque
from scipy.optimize import linear_sum_assignment
from filterpy.kalman import KalmanFilter

# NOTE(review): per the multiprocessing docs this call only matters in frozen
# Windows executables; presumably a leftover from a script version of this
# notebook — TODO confirm it can be dropped.
freeze_support()

In [18]:
# Seed every random number generator this notebook imports so repeated runs
# are comparable.
seed = 0
np.random.seed(seed)  # NumPy was previously left unseeded
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)  # every visible GPU; silently ignored without CUDA
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [19]:
# Model-size suffix that selects which checkpoint directory is read.
n = "x"
# Build the YOLOv10 model from the fine-tuned weights, then move it to `device`.
weights_file = f"chkpts/6DOF/v10{n}/yolov10{n}-detect-6dof/weights/best.pt"
model = YOLOv10(weights_file)
model.to(device)

YOLOv10(
  (model): YOLOv10DetectionModel(
    (model): Sequential(
      (0): Conv(
        (conv): Conv2d(3, 80, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(80, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (1): Conv(
        (conv): Conv2d(80, 160, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(160, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (2): C2f(
        (cv1): Conv(
          (conv): Conv2d(160, 160, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(160, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
          (act): SiLU(inplace=True)
        )
        (cv2): Conv(
          (conv): Conv2d(400, 160, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(160, eps=0.001, momentum=0.03, affine=Tru

In [20]:
def assign_tooltip_to_tool(tools, tooltips):
    """Assign every tooltip to the index of its nearest tool.

    Only the first two entries of each item are read and compared as an
    (x, y) point.
    NOTE(review): this presumably expects centre coordinates rather than
    xyxy corners — confirm against the caller.

    Args:
        tools: Sequence of indexable tool descriptors.
        tooltips: Sequence of indexable tooltip descriptors.

    Returns:
        list[int]: One entry per tooltip holding the index into ``tools`` of
        the closest tool (the first one wins on ties), or ``-1`` when no tool
        is available. Previously the ``-1`` sentinel was overwritten with
        ``None`` in that case.
    """
    tooltip_assignment = []
    for tooltip in tooltips:
        closest_tool = -1  # sentinel kept when `tools` is empty
        min_distance = float("inf")
        for j, tool in enumerate(tools):
            # Distance between the tooltip point and the tool point.
            distance = np.linalg.norm(
                np.array([tooltip[0], tooltip[1]]) - np.array([tool[0], tool[1]])
            )
            if distance < min_distance:
                min_distance = distance
                closest_tool = j
        tooltip_assignment.append(closest_tool)
    return tooltip_assignment


def increase_confidence_based_on_previous_frame(boxes, confs, ids, previous_ids):
    """Boost the confidence of detections whose ID was present in the previous frame.

    Placeholder heuristic: a detection whose ID appears in ``previous_ids``
    gains 0.1 confidence, capped at 1.0; every other confidence is passed
    through unchanged. ``boxes`` is accepted but not used.

    Returns:
        list: Adjusted confidences, in the same order as ``ids``.
    """
    return [
        min(confs[position] + 0.1, 1.0)
        if track_id in previous_ids
        else confs[position]
        for position, track_id in enumerate(ids)
    ]

In [21]:
def iou(bbox1, bbox2):
    """Compute the intersection over union of two ``[x1, y1, x2, y2]`` boxes.

    Args:
        bbox1, bbox2: Boxes given as any array-like with four values (list,
            tuple, 1-D array or a ``(4, 1)`` column such as a Kalman state
            slice). Plain lists used to fail on the subtraction.

    Returns:
        float: Overlap ratio, or ``0.0`` when the union has no positive area.
    """
    # Flatten so both row- and column-shaped inputs are handled the same way.
    box_a = np.asarray(bbox1, dtype=float).reshape(-1)[:4]
    box_b = np.asarray(bbox2, dtype=float).reshape(-1)[:4]

    top_left = np.maximum(box_a[:2], box_b[:2])
    bottom_right = np.minimum(box_a[2:], box_b[2:])
    # Clamp at zero so disjoint boxes contribute no intersection.
    intersection = float(np.prod(np.maximum(0.0, bottom_right - top_left)))

    area1 = float(np.prod(box_a[2:] - box_a[:2]))
    area2 = float(np.prod(box_b[2:] - box_b[:2]))
    union = area1 + area2 - intersection
    return intersection / union if union > 0 else 0.0


class Track:
    """One tracked object: a Kalman-filtered bounding box plus a feature history.

    The filter state is a 7x1 column: the four box values followed by
    velocities for the first three of them.
    NOTE(review): the fourth box value has no velocity term. That matches a
    SORT-style (cx, cy, scale, ratio) state, but this class stores the box it
    is given unchanged — confirm the intended box parameterisation.
    """

    def __init__(self, track_id, bbox, feature, max_age=30):
        """Start a track from its first detection.

        Args:
            track_id: Identifier assigned by the tracker.
            bbox: Four box values (array-like).
            feature: Appearance feature stored in the history buffer.
            max_age: Stored on the instance; not consulted by this class.
        """
        self.track_id = track_id
        self.bbox = bbox
        self.features = deque([feature], maxlen=100)  # keeps the newest 100
        self.kf = self.create_kalman_filter(bbox)
        self.time_since_update = 0
        self.hit_streak = 0
        self.age = 0
        self.max_age = max_age
        self.confidence = 0

    def create_kalman_filter(self, bbox):
        """Create a constant-velocity Kalman filter initialised at ``bbox``."""
        kf = KalmanFilter(dim_x=7, dim_z=4)
        kf.F = np.array(
            [
                [1, 0, 0, 0, 1, 0, 0],
                [0, 1, 0, 0, 0, 1, 0],
                [0, 0, 1, 0, 0, 0, 1],
                [0, 0, 0, 1, 0, 0, 0],
                [0, 0, 0, 0, 1, 0, 0],
                [0, 0, 0, 0, 0, 1, 0],
                [0, 0, 0, 0, 0, 0, 1],
            ]
        )
        # The measurement is the box itself, so it observes state entries 0..3.
        # (The previous matrix observed entries 0, 1, 3 and 6, i.e. it skipped
        # one box value and read a velocity as the last one.)
        kf.H = np.eye(4, 7)
        kf.P[
            4:, 4:
        ] *= 1000.0  # Give high uncertainty to the unobservable initial velocities
        kf.P *= 10.0
        kf.R *= 0.01
        # kf.x[:4] is a (4, 1) column; a flat (4,) box cannot be broadcast into
        # it, so reshape explicitly.
        kf.x[:4] = np.asarray(bbox, dtype=float).reshape(4, 1)
        return kf

    def predict(self):
        """Advance the filter one step and return the predicted box (flat, 4 values)."""
        self.kf.predict()
        self.age += 1
        # Reset the streak only if the previous step already went without an
        # update; checking after the increment would reset it on every call.
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        return self.kf.x[:4].reshape(-1)

    def update(self, bbox, feature):
        """Correct the track with a matched detection and its feature."""
        self.time_since_update = 0
        self.hit_streak += 1
        self.features.append(feature)
        self.kf.update(bbox)
        self.bbox = self.kf.x[:4].reshape(-1)
        self.confidence = min(
            1.0, self.confidence + 0.1
        )  # Increase confidence with each successful update


class DeepSort:
    """Minimal IoU-based multi-object tracker.

    ``n_init`` and ``max_cosine_distance`` are stored but not consulted by the
    methods below; only IoU gating (``max_iou_distance``) and ageing
    (``max_age``) are applied.
    """

    def __init__(
        self, max_age=50, n_init=3, max_iou_distance=0.9, max_cosine_distance=0.5
    ):
        self.tracks = []
        self.next_id = 1
        self.max_age = max_age
        self.n_init = n_init
        self.max_iou_distance = max_iou_distance
        self.max_cosine_distance = max_cosine_distance

    def cosine_distance(self, features, targets):
        """Compute the pairwise cosine distance between features and targets.

        Returns:
            ndarray of shape ``(len(features), len(targets))``. Zero-norm
            vectors yield a distance of 1.0 instead of NaN/inf.
        """
        if len(features) == 0 or len(targets) == 0:
            return np.zeros((len(features), len(targets)))
        features = np.asarray(features, dtype=float)
        targets = np.asarray(targets, dtype=float)
        norms = (
            np.linalg.norm(features, axis=1, keepdims=True)
            * np.linalg.norm(targets, axis=1, keepdims=True).T
        )
        # Guard against division by zero for all-zero feature vectors.
        norms = np.maximum(norms, np.finfo(float).eps)
        return 1.0 - np.dot(features, targets.T) / norms

    def match(self, detections):
        """Match detections to existing tracks by maximising total IoU.

        Args:
            detections: Sequence of dicts with a ``"bbox"`` entry.

        Returns:
            tuple: ``((track_indices, detection_indices), unmatched_tracks,
            unmatched_detections)``. The first element is a pair of integer
            arrays suitable for ``zip(*...)``; pairs whose IoU distance
            (``1 - IoU``) exceeds ``max_iou_distance`` are reported as
            unmatched instead.
        """
        n_tracks, n_dets = len(self.tracks), len(detections)
        no_matches = (np.empty(0, dtype=int), np.empty(0, dtype=int))
        if n_tracks == 0 or n_dets == 0:
            # Nothing to pair up: every track and every detection is unmatched.
            # (Previously the two lists were swapped when there were no
            # tracks, so no track was ever created.)
            return no_matches, list(range(n_tracks)), list(range(n_dets))

        iou_matrix = np.zeros((n_tracks, n_dets), dtype=float)
        for t, track in enumerate(self.tracks):
            for d, detection in enumerate(detections):
                iou_matrix[t, d] = iou(track.bbox, detection["bbox"])

        # The solver minimises cost, so negate IoU to maximise overlap.
        rows, cols = linear_sum_assignment(-iou_matrix)
        # The assignment pairs min(n_tracks, n_dets) items regardless of
        # overlap; discard pairs that are too far apart.
        close_enough = (1.0 - iou_matrix[rows, cols]) <= self.max_iou_distance
        rows, cols = rows[close_enough], cols[close_enough]

        unmatched_tracks = sorted(set(range(n_tracks)) - set(rows.tolist()))
        unmatched_detections = sorted(set(range(n_dets)) - set(cols.tolist()))
        return (rows, cols), unmatched_tracks, unmatched_detections

    def update_tracks(self, detections, frame):
        """Advance all tracks by one frame and fold in the new detections.

        Args:
            detections: Sequence of dicts with ``"bbox"`` and ``"feature"``.
            frame: Accepted for interface compatibility; not used.

        Returns:
            list: The tracks that are still alive after this frame.
        """
        # Age every track first; without this `time_since_update` never grows
        # and the max_age filter below can never remove anything.
        for track in self.tracks:
            track.predict()

        matched_indices, _unmatched_tracks, unmatched_detections = self.match(
            detections
        )

        for t, d in zip(*matched_indices):
            self.tracks[t].update(detections[d]["bbox"], detections[d]["feature"])

        # Create new tracks for unmatched detections
        for d in unmatched_detections:
            self.tracks.append(
                Track(
                    self.next_id,
                    detections[d]["bbox"],
                    detections[d]["feature"],
                    max_age=self.max_age,
                )
            )
            self.next_id += 1

        # Remove old tracks
        self.tracks = [t for t in self.tracks if t.time_since_update <= self.max_age]

        return self.tracks


# Module-level tracker instance. Arguments are positional, in constructor order.
deepsort = DeepSort(
    50,  # max_age: tracks with time_since_update above this are dropped
    3,  # n_init: stored on the tracker
    0.9,  # max_iou_distance: stored on the tracker
    0.5,  # max_cosine_distance: stored on the tracker
)

In [22]:
from sympy import det


def euclidean_distance(bbox1, bbox2):
    """Return the straight-line distance between the midpoints of two xyxy boxes."""

    def midpoint(box):
        # Average opposite corners to get the (x, y) centre.
        return np.array([(box[0] + box[2]) / 2, (box[1] + box[3]) / 2])

    offset = midpoint(bbox1) - midpoint(bbox2)
    return np.linalg.norm(offset)


def initialize_tracks(detections, max_tools=2):
    """Initialize tracks from one frame's detections.

    The ``max_tools`` most confident tool detections (``cls == 0``) become
    tool tracks with IDs 1, 2, ... in confidence order. Every tooltip
    detection (``cls == 1``) is then attached to the nearest selected tool and
    inherits its ID. Tooltips are dropped when no tool was selected.

    Tools are selected before any tooltip is assigned. Previously a tooltip
    was only compared against tools with a higher confidence than its own, so
    a confident tooltip could be dropped or attached to the wrong tool.

    Args:
        detections: Iterable of dicts with ``"bbox"``, ``"conf"`` and ``"cls"``.
        max_tools: Maximum number of tool tracks to create.

    Returns:
        list[dict]: Tracks with ``"id"``, ``"bbox"``, ``"confidence"`` and
        ``"type"`` (``"tool"`` or ``"tooltip"``), ordered by descending
        detection confidence.
    """
    ranked = sorted(detections, key=lambda det: det["conf"], reverse=True)

    # Pass 1: choose the tool tracks, keyed by their position in `ranked`.
    tool_tracks = {}
    for position, detection in enumerate(ranked):
        if detection["cls"] == 0 and len(tool_tracks) < max_tools:
            tool_tracks[position] = {
                "id": len(tool_tracks) + 1,
                "bbox": detection["bbox"],
                "confidence": detection["conf"],
                "type": "tool",
            }

    # Pass 2: emit tracks in confidence order, attaching tooltips to tools.
    tracks = []
    for position, detection in enumerate(ranked):
        if position in tool_tracks:
            tracks.append(tool_tracks[position])
        elif detection["cls"] == 1 and tool_tracks:
            closest_tool = min(
                tool_tracks.values(),
                key=lambda tool: euclidean_distance(tool["bbox"], detection["bbox"]),
            )
            tracks.append(
                {
                    "id": closest_tool["id"],
                    "bbox": detection["bbox"],
                    "confidence": detection["conf"],
                    "type": "tooltip",
                }
            )

    return tracks


def match_tracks(tracks, detections, max_distance=50):
    """Match detections to existing tracks by centre distance.

    Matching is greedy on ascending distance and one-to-one: each track and
    each detection is used at most once. A track is only paired with a
    detection of its own kind (``"tool"`` with ``cls == 0``, ``"tooltip"``
    with ``cls == 1``); the check is skipped when either side does not carry
    that information. Previously class was ignored and one detection could be
    handed to several tracks.

    Args:
        tracks: Sequence of track dicts with ``"bbox"`` (and ``"type"``).
        detections: Sequence of detection dicts with ``"bbox"`` (and ``"cls"``).
        max_distance: Pairs at this distance or farther are never matched.

    Returns:
        list[tuple]: ``(track, detection)`` pairs in the order of ``tracks``.
    """
    class_of_type = {"tool": 0, "tooltip": 1}

    # Collect every admissible (distance, track index, detection index) pair.
    candidates = []
    for t_idx, track in enumerate(tracks):
        expected_cls = class_of_type.get(track.get("type"))
        for d_idx, detection in enumerate(detections):
            detected_cls = detection.get("cls")
            if (
                expected_cls is not None
                and detected_cls is not None
                and detected_cls != expected_cls
            ):
                continue
            distance = euclidean_distance(track["bbox"], detection["bbox"])
            if distance < max_distance:
                candidates.append((distance, t_idx, d_idx))

    # Closest pairs claim their partners first.
    candidates.sort()
    assigned = {}
    taken_detections = set()
    for _distance, t_idx, d_idx in candidates:
        if t_idx in assigned or d_idx in taken_detections:
            continue
        assigned[t_idx] = d_idx
        taken_detections.add(d_idx)

    return [(tracks[t_idx], detections[assigned[t_idx]]) for t_idx in sorted(assigned)]


def update_tracks(tracks, detections, max_distance=50, max_tools=2):
    """Update the tracks with new detections.

    Matched tracks take the detection's box and gain 0.1 confidence (capped
    at 1.0). Tracks without a match are not returned. If fewer than
    ``max_tools`` tool tracks were matched, the most confident tool detections
    that were NOT already matched start new tool tracks, each with the lowest
    ID in ``1..max_tools`` that no matched tool track is using.

    Previously the back-fill step reconsidered already-matched detections and
    numbered new tracks by count, which could duplicate a tracked tool and
    reuse an ID that was already taken.

    Args:
        tracks: Current track dicts (mutated in place when matched).
        detections: Detection dicts with ``"bbox"``, ``"conf"`` and ``"cls"``.
        max_distance: Forwarded to ``match_tracks``.
        max_tools: Maximum number of tool tracks.

    Returns:
        list[dict]: Matched tracks followed by any newly created tool tracks.
    """
    matched_tracks = []
    matched_detection_keys = set()  # identity of detections already consumed
    tool_ids_in_use = set()
    tools_tracked = 0

    for track, detection in match_tracks(tracks, detections, max_distance):
        track["bbox"] = detection["bbox"]
        track["confidence"] = min(1.0, track["confidence"] + 0.1)
        matched_tracks.append(track)
        matched_detection_keys.add(id(detection))
        if track["type"] == "tool":
            tools_tracked += 1
            tool_ids_in_use.add(track["id"])

    # Handle missing tools if fewer than max_tools are tracked
    missing_tools = max_tools - tools_tracked
    if missing_tools > 0:
        spare_tool_detections = sorted(
            (
                det
                for det in detections
                if det["cls"] == 0 and id(det) not in matched_detection_keys
            ),
            key=lambda det: det["conf"],
            reverse=True,
        )[:missing_tools]
        free_ids = (
            tool_id
            for tool_id in range(1, max_tools + 1)
            if tool_id not in tool_ids_in_use
        )
        for detection, track_id in zip(spare_tool_detections, free_ids):
            matched_tracks.append(
                {
                    "id": track_id,
                    "bbox": detection["bbox"],
                    "confidence": detection["conf"],
                    "type": "tool",
                }
            )

    return matched_tracks


def penalize_and_filter_tracks(tracks):
    """Keep tracks that have a nearby partner; penalise and maybe drop the rest.

    A tool needs a tooltip with the same ID whose centre is closer than 100;
    a tooltip needs a tool with the same ID closer than 200. A track without
    such a partner loses 0.2 confidence (floored at 0, written back to the
    track) and is kept only if more than 0.2 remains. Tracks of any other
    type are discarded.
    """
    # track type -> (required partner type, maximum centre distance)
    partner_rules = {"tool": ("tooltip", 100), "tooltip": ("tool", 200)}

    survivors = []
    for track in tracks:
        rule = partner_rules.get(track["type"])
        if rule is None:
            continue
        partner_type, reach = rule

        has_partner = False
        for other in tracks:
            if other["type"] != partner_type:
                continue
            if other["id"] != track["id"]:
                continue
            if euclidean_distance(other["bbox"], track["bbox"]) < reach:
                has_partner = True
                break

        if not has_partner:
            track["confidence"] = max(0, track["confidence"] - 0.2)
            if not track["confidence"] > 0.2:
                continue
        survivors.append(track)

    return survivors

In [23]:
def visualize_tracking(
    model,
    video_path,
    n_init=10,
    max_tools=2,
    output_path="data/6DOF/tracked_output.mp4",
    fps=20.0,
    max_frames=100,
):
    """Run detection and the dict-based tracker on a video; write an annotated copy.

    Args:
        model: Callable detector; ``model(frame, verbose=False)[0].boxes`` must
            expose ``xyxy``, ``conf`` and ``cls`` tensors.
        video_path: Input video readable by OpenCV.
        n_init: Tracks are rebuilt from scratch every ``n_init`` frames, and
            whenever no track is alive.
        max_tools: Maximum number of tool tracks.
        output_path: Destination of the annotated video (mp4v codec).
        fps: Frame rate written to the output container.
        max_frames: Stop after this many frames; ``None`` processes the whole
            video.

    The capture and writer are released even if inference or drawing raises.
    """
    cap = cv2.VideoCapture(video_path)
    out = None
    try:
        frame_size = (
            int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
            int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
        )
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, fps, frame_size)

        tracks = []
        count = 0
        while cap.isOpened():
            count += 1
            ret, frame = cap.read()
            if not ret or (max_frames is not None and count > max_frames):
                break

            # Perform inference
            results = model(frame, verbose=False)

            # Extract the required data from results
            boxes = results[0].boxes.xyxy.cpu().numpy()  # Bounding boxes
            confs = results[0].boxes.conf.cpu().numpy()  # Confidence scores
            classes = results[0].boxes.cls.cpu().numpy()  # Class IDs

            detections = [
                {"bbox": box, "conf": conf, "cls": cls}
                for box, conf, cls in zip(boxes, confs, classes)
            ]

            if count % n_init == 0 or len(tracks) == 0:
                # Reinitialize every n_init frames or if no tracks
                tracks = initialize_tracks(detections, max_tools=max_tools)
            else:
                # Update the tracks
                tracks = update_tracks(tracks, detections, max_tools=max_tools)
                tracks = penalize_and_filter_tracks(tracks)

            # Draw bounding boxes and labels with tracking IDs
            for track in tracks:
                x1, y1, x2, y2 = map(int, track["bbox"])
                label = f"{track['type']}-{track['id']}"
                color = (0, 255, 0) if track["type"] == "tool" else (0, 0, 255)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.putText(
                    frame,
                    label,
                    (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.9,
                    color,
                    2,
                )

            out.write(frame)
    finally:
        cap.release()
        if out is not None:
            out.release()

In [24]:
# visualize_tracking(model, "data/6DOF/Dataset.mp4")

In [25]:
def load_images(input_path):
    """Load frames from an image directory or a video file.

    Directory input: files ending in .png/.jpg/.jpeg/.bmp (any letter case)
    are read. Names whose last ``_``-separated chunk starts with an integer
    (``test5_0.png``, ``test5_1.png``, ...) come first in numeric order; other
    image names follow alphabetically instead of raising ``ValueError``.
    Files OpenCV cannot decode are skipped with a printed warning rather than
    being returned as ``None``.

    Any other input is opened as a video and read until the first failed read.

    Returns:
        list: Decoded frames, in the order described above.
    """
    image_extensions = (".png", ".jpg", ".jpeg", ".bmp")

    def frame_order(filename):
        # Same token the original key parsed: after the last "_" up to the first ".".
        token = filename.split("_")[-1].split(".")[0]
        try:
            return (0, int(token), filename)
        except ValueError:
            return (1, 0, filename)

    images = []
    if os.path.isdir(input_path):
        # remove all non image files
        paths = [
            name
            for name in os.listdir(input_path)
            if name.lower().endswith(image_extensions)
        ]
        paths.sort(key=frame_order)
        for filename in paths:
            img = cv2.imread(os.path.join(input_path, filename))
            if img is None:
                print("Warning: could not read image, skipping:", filename)
                continue
            images.append(img)
    else:
        cap = cv2.VideoCapture(input_path)
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                images.append(frame)
        finally:
            cap.release()
    print("Loaded", len(images), "images.")
    return images


def relabel_ids_based_on_distance(results, prev_tool_positions):
    """Convert tracker results to plain dicts and give tooltips their tool's ID.

    For each frame, all tool boxes are recorded in ``prev_tool_positions``
    first; only then is every tooltip (``cls == 1``) given the ID of the tool
    whose recorded box centre is closest. Previously a tooltip listed before
    its tool in the same frame was compared against stale (or no) positions.

    Args:
        results: Iterable of per-frame results; each must expose ``boxes``
            whose items carry ``cls``, ``conf``, ``xyxy`` and ``id`` with a
            ``.cpu().numpy()`` chain (``id`` may be ``None``).
        prev_tool_positions: Dict mapping tool ID to its last box; updated in
            place and carried across frames.

    Returns:
        list[list[dict]]: One list per frame of
        ``{"cls": int, "conf": float, "bbox": ndarray, "id": int}``.
        Detections without a tracker ID get ``-1``; any class other than 1 is
        treated as a tool.
    """

    def first_value(tensor):
        # Tracker fields arrive as 1-element tensors; unwrap to a scalar.
        return np.asarray(tensor.cpu().numpy()).reshape(-1)[0]

    def center(box):
        return np.array([(box[0] + box[2]) / 2, (box[1] + box[3]) / 2])

    new_results = []
    for frame_results in results:
        new_frame_results = []
        for det in frame_results.boxes:
            new_frame_results.append(
                {
                    "cls": int(first_value(det.cls)),
                    "conf": float(first_value(det.conf)),
                    "bbox": det.xyxy[0].cpu().numpy(),
                    "id": int(first_value(det.id)) if det.id is not None else -1,
                }
            )

        # Pass 1: refresh the position of every tool seen in this frame.
        for new_det in new_frame_results:
            if new_det["cls"] != 1:
                prev_tool_positions[new_det["id"]] = new_det["bbox"]

        # Pass 2: assign each tooltip the ID of the closest known tool.
        for new_det in new_frame_results:
            if new_det["cls"] == 1 and prev_tool_positions:
                tip_center = center(new_det["bbox"])
                new_det["id"] = min(
                    prev_tool_positions,
                    key=lambda tool_id: np.linalg.norm(
                        center(prev_tool_positions[tool_id]) - tip_center
                    ),
                )

        new_results.append(new_frame_results)

    print("Relabeling complete.")
    return new_results


def enforce_id_order(results):
    """Renumber tools left-to-right and re-attach tooltips, modifying ``results`` in place.

    Per frame: the (at most two, highest-confidence) tool detections are
    ordered by the x-coordinate of their box's first corner and given IDs 1
    and 2 (a lone tool gets 1). Each tooltip then takes the ID of the nearest
    of those tools, measured between box centres; with no tools its ID is
    left as it was. Prints a completion message and returns ``None``.
    """

    def box_center(box):
        return np.array([(box[0] + box[2]) / 2, (box[1] + box[3]) / 2])

    for detections in results:
        tool_dets = [entry for entry in detections if entry["cls"] == 0]
        if len(tool_dets) > 2:
            # Keep only the two most confident tools.
            tool_dets = sorted(tool_dets, key=lambda x: x["conf"], reverse=True)[:2]
        # Left-most tool first.
        tool_dets = sorted(tool_dets, key=lambda x: x["bbox"][0])
        for new_id, tool in enumerate(tool_dets, start=1):
            tool["id"] = new_id

        # Assign tooltip IDs based on closest tool
        for entry in detections:
            if entry["cls"] == 1:
                nearest_id = entry["id"]
                nearest_gap = float("inf")
                for tool in tool_dets:
                    gap = np.linalg.norm(
                        box_center(tool["bbox"]) - box_center(entry["bbox"])
                    )
                    if gap < nearest_gap:
                        nearest_gap = gap
                        nearest_id = tool["id"]
                entry["id"] = nearest_id
    print("ID enforcement complete.")

def process_input(model, input_path, output_path):
    """Track tools in ``input_path``; save annotated frames and a video in ``output_path``.

    Steps:
      1. Run ``model.track`` over the input and collect every result.
      2. Relabel tooltip IDs by proximity and enforce the left/right tool IDs.
      3. Draw the detections on each result's own source image
         (``orig_img``), write ``frame_XXXX.jpg`` and append the frame to
         ``output.mp4`` (mp4v, 20 fps).

    Annotations are drawn on the image attached to each result instead of a
    separately loaded and separately sorted image list, so boxes cannot land
    on the wrong frame when the two orderings differ. The video is written
    from the frames produced in this run, not from whatever JPEGs are in the
    folder.

    Args:
        model: Tracker exposing ``track(source, save, verbose, stream)``.
        input_path: Video file or image directory handed to the tracker.
        output_path: Directory for frames and video; created if missing.
    """
    # stream=True yields lazily; materialise it so inference has actually run
    # before the cache is released and so each frame's image stays reachable.
    raw_results = list(model.track(input_path, save=False, verbose=True, stream=True))
    torch.cuda.empty_cache()

    # NOTE(review): assumes every result exposes `orig_img` as a BGR array, as
    # ultralytics Results objects do — confirm for the YOLOv10 build in use.
    frames = [frame_result.orig_img for frame_result in raw_results]

    # Initialize tracking correction
    prev_tool_positions = {}

    # Modify the IDs based on proximity (Euclidean distance)
    results = relabel_ids_based_on_distance(raw_results, prev_tool_positions)

    # Enforce ID order (tool on left = 1, tool on right = 2)
    enforce_id_order(results)

    os.makedirs(output_path, exist_ok=True)
    # sort results by conf
    results = [
        sorted(frame_results, key=lambda x: x["conf"], reverse=True)
        for frame_results in results
    ]

    output_video_path = os.path.join(output_path, "output.mp4")
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = None
    try:
        for idx, (frame, frame_results) in enumerate(zip(frames, results)):
            for det in frame_results:
                x1, y1, x2, y2 = map(int, det["bbox"])
                label = (
                    f"ID: {det['id']}, Class: {'Tool' if det['cls'] == 0 else 'Tooltip'}"
                )
                color = (0, 255, 0) if det["cls"] == 0 else (0, 0, 255)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.putText(
                    frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2
                )
            cv2.imwrite(os.path.join(output_path, f"frame_{idx:04d}.jpg"), frame)

            if out is None:
                # The writer is sized from the first frame.
                h, w = frame.shape[:2]
                out = cv2.VideoWriter(output_video_path, fourcc, 20.0, (w, h))
            out.write(frame)
    finally:
        if out is not None:
            out.release()

    print("Processing complete. Frames saved in:", output_path)
    if out is None:
        print("No frames were produced; no video written.")
    else:
        print("Video saved to:", output_video_path)

In [26]:
# for n in ["n", "s", "m", "b", "l", "x"]:
# # for n in ["x"]:
#     model = YOLOv10(f"chkpts/6DOF/v10{n}/yolov10{n}-detect-6dof/weights/best.pt").to(device)
#     process_input(model, "data/6DOF/images/val", f"chkpts/6DOF/v10{n}/track")
#     print("Done with", n)

In [None]:
# Model-size suffix that selects which checkpoint directory is read.
n = "x"
model = YOLOv10(f"chkpts/6DOF/v10{n}/yolov10{n}-detect-6dof/weights/best.pt").to(device)
# Raw string: the path value is unchanged, but "\D" and "\T" are no longer
# parsed as (invalid) escape sequences, which newer Python versions warn about.
model.track(r"H:\Data\6DOF\Test 2 png", save=False, verbose=True, show=True)