In [6]:
import typing

import cv2
import cv2.aruco as aruco
import numpy as np

# Settings for the marker image rendered by this cell.
MARKER_SIDE_PX = 120
MARKER_BORDER_BITS = 1

aruco_dict = aruco.getPredefinedDictionary(aruco.DICT_4X4_50)
code = 3
marker = aruco.generateImageMarker(
    aruco_dict,
    code,
    MARKER_SIDE_PX,
    borderBits=MARKER_BORDER_BITS,
)
marker_path = f'aruco{code}.png'
# Kept as the cell's last expression so the notebook displays imwrite's
# success flag.
cv2.imwrite(marker_path, marker)

True

In [12]:
def add_margin(image, top, bottom, left, right, color=(0,0,0)):
    """Adds margin to an image.

    Args:
        image: A NumPy array representing the image, either 2-D (single
            channel) or 3-D (rows, columns, channels).
        top: Margin size at the top.
        bottom: Margin size at the bottom.
        left: Margin size at the left.
        right: Margin size at the right.
        color: Color of the margin (default: black). May be a scalar or a
            per-channel sequence. For a 2-D image only the first component
            of a sequence is used.

    Returns:
        A new NumPy array with the added margin and the same dtype as
        ``image``. The input array is not modified.

    Raises:
        ValueError: If any margin is negative.
    """
    if min(top, bottom, left, right) < 0:
        raise ValueError("margins must be non-negative")

    height, width = image.shape[:2]
    # Keep any trailing (channel) axes exactly as they are in the input.
    padded_shape = (height + top + bottom, width + left + right) + tuple(image.shape[2:])

    if image.ndim == 3:
        # A per-channel sequence broadcasts along the last axis; a scalar
        # fills every channel with the same value.
        fill = color
    else:
        # np.ravel accepts both a scalar and a sequence, so a plain int
        # color no longer fails on `color[0]`.
        fill = np.ravel(color)[0]

    new_image = np.full(padded_shape, fill, dtype=image.dtype)
    new_image[top:top + height, left:left + width] = image
    return new_image

IMAGE_PATH = 'fiducial-test.jpg'
DOWNSCALE = 4  # integer factor applied to both image dimensions

image = cv2.imread(IMAGE_PATH)
if image is None:
    # cv2.imread signals an unreadable or missing file by returning None
    # rather than raising, so fail here with a clear message.
    raise FileNotFoundError(f"Could not read image '{IMAGE_PATH}'")
image = cv2.resize(
    image,
    (image.shape[1] // DOWNSCALE, image.shape[0] // DOWNSCALE),
    interpolation=cv2.INTER_AREA,
)
# To try the detector on the synthetic `marker` image instead, expand it to
# three channels and pad it with add_margin(image, 50, 50, 50, 50, (255,255,255)).
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
parameters = aruco.DetectorParameters()
detector = aruco.ArucoDetector(aruco_dict, parameters)

# Detect ArUco markers
markerCorners, markerIds, rejectedCandidates = detector.detectMarkers(gray)

# Draw bounding boxes around the detected markers
if markerIds is not None:
    for marker_id, marker_corners in zip(markerIds, markerCorners):
        corners = marker_corners.astype(np.int32)
        cv2.polylines(image, [corners], True, (0, 255, 0), 2)
        # Plain Python ints for the text origin; the label sits 15 px above
        # the first corner.
        label_origin = (int(corners[0][0, 0]), int(corners[0][0, 1]) - 15)
        cv2.putText(image, str(marker_id[0]), label_origin, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Display the image with detected markers
    cv2.imshow('Detected ArUco Markers', image)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
else:
    print("No ArUco markers detected in the image.")

In [3]:
from arucosort import Sort
from functorch import einops
import torch
import decord
import numpy as np
import cv2
import cv2.aruco as aruco
import typing
from pathlib import Path


# Marker dictionary for this cell; it is read as a module-level global by the
# code defined below (e.g. label_frames).
aruco_dict = aruco.getPredefinedDictionary(aruco.DICT_4X4_50)
def video_frames_extractor(video_path: Path):
  """Decode every frame of a video into a single float tensor.

  Args:
    video_path: Path of the video file to decode.

  Returns:
    A tensor rearranged from (t, h, w, c) to (t, c, h, w) whose values are
    the decoded pixel values divided by 255.
  """
  vr = decord.VideoReader(str(video_path), ctx=decord.cpu(0))
  frames = []
  for i in range(len(vr)):
    frame = vr[i]
    # torch.stack only accepts tensors. Unless decord's torch bridge is
    # enabled, indexing the reader yields decord arrays, so convert them.
    if not isinstance(frame, torch.Tensor):
      frame = torch.from_numpy(frame.asnumpy())
    frames.append(frame)
  frames_tensor = torch.stack(frames)
  frames_tensor = frames_tensor / 255.0
  frames_tensor = einops.rearrange(frames_tensor, "t h w c -> t c h w")
  # NOTE(review): a further NORMALIZER(frames_tensor) step was left disabled
  # here; no NORMALIZER is defined in this notebook.
  return frames_tensor


class TrackingAnnotator:
  """Detects ArUco markers per frame, feeds them to a Sort tracker and draws the tracks.

  Args:
    aruco_dict: Dictionary used to build the marker detector.
    annotate_frame: Stored on the instance; not used by this class.
  """

  def __init__(self, aruco_dict: aruco.Dictionary, annotate_frame):
    self.aruco_dict = aruco_dict
    self.annotate_frame = annotate_frame

  def annotate_frames(self, frames: typing.List[np.ndarray]):
    """Draw tracked marker outlines on ``frames`` in place and print a miss count.

    Args:
      frames: BGR frames; each one is converted to grayscale for detection
        and drawn on directly.
    """
    missing_frames = 0
    mot_tracker = Sort(max_age=1000, min_hits=1, iou_threshold=0.3)
    # The detector does not depend on the frame, so build it once and use
    # the dictionary given to the constructor rather than a global.
    detector = aruco.ArucoDetector(self.aruco_dict, aruco.DetectorParameters())
    for frame in frames:
      gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

      # Detect ArUco markers
      markerCorners, markerIds, _rejected = detector.detectMarkers(gray)

      # One row per detection: eight corner coordinates followed by the id.
      if markerIds is not None:
        detections = np.zeros((len(markerIds), 9))
        for i in range(len(markerIds)):
          corners = markerCorners[i].astype("int").reshape((8,))
          detections[i] = np.concatenate((corners, markerIds[i]))
      else:
        detections = np.zeros((0, 9))
        missing_frames += 1

      tracked = mot_tracker.update(detections)
      unseen = {1, 2}
      for tracker in tracked:
        # Each tracker row is read as eight corner coordinates plus an id in
        # the last column (presumably mirroring the detection layout —
        # confirm against arucosort.Sort).
        tracker = tracker.astype(np.int32)
        corners = tracker[:8].reshape((1, 4, 2))
        track_id = int(tracker[-1])
        cv2.polylines(frame, [corners], True, (0, 255, 0), 2)
        label_origin = (int(corners[0][0, 0]), int(corners[0][0, 1]) - 15)
        cv2.putText(frame, str(track_id), label_origin, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        # discard() instead of remove(): an id outside {1, 2}, or the same id
        # reported twice, must not abort the whole run with KeyError.
        unseen.discard(track_id)
      # NOTE(review): a frame with no detections and no track for id 1 is
      # counted twice (once above, once here) — confirm this is intended.
      if 1 in unseen:
        missing_frames += 1

    print('Missing frames: {}'.format(missing_frames))
    

def draw_marker(image: np.ndarray):
  """Outline and label every ArUco marker found in ``image``.

  The image is converted from BGR to grayscale for detection and the
  overlays are drawn directly onto ``image``. The shape of each marker's
  corner array is printed, and a message is printed when nothing is found.

  Args:
    image: BGR image to search and draw on.

  Returns:
    The same ``image`` object that was passed in.
  """
  params = aruco.DetectorParameters()
  params.cornerRefinementMaxIterations = 2
  finder = aruco.ArucoDetector(aruco_dict, params)

  grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
  found_corners, found_ids, _rejected = finder.detectMarkers(grayscale)

  # Nothing detected: report it and hand the image back untouched.
  if found_ids is None:
    print("No ArUco markers detected in the image.")
    return image

  green = (0, 255, 0)
  for index, id_row in enumerate(found_ids):
    outline = found_corners[index].astype("int")
    print(outline.shape)
    cv2.polylines(image, [outline], True, green, 2)
    # Label goes 15 px above the marker's first corner.
    anchor = (outline[0][0, 0], outline[0][0, 1] - 15)
    cv2.putText(image, str(id_row[0]), anchor, cv2.FONT_HERSHEY_SIMPLEX, 0.5, green, 2)
  return image


def label_frames(video_path: Path, output_path: Path = Path('out.mp4'), fourcc: str = 'mp4v'):
  """Annotate every frame of a video and write the result to a new file.

  Args:
    video_path: Video to read.
    output_path: Destination file (default: 'out.mp4' in the working directory).
    fourcc: Four-character codec code passed to the video writer.

  Raises:
    ValueError: If no frames could be decoded from ``video_path``.
    RuntimeError: If the output video could not be opened for writing.
  """
  vr = decord.VideoReader(str(video_path), ctx=decord.cpu(0))
  fps = vr.get_avg_fps()
  # The frames are converted from RGB to BGR before OpenCV draws on and writes them.
  frames = [cv2.cvtColor(vr[i].asnumpy(), cv2.COLOR_RGB2BGR) for i in range(len(vr))]
  if not frames:
    raise ValueError(f'No frames decoded from {video_path}')

  annotator = TrackingAnnotator(aruco_dict, None)
  annotator.annotate_frames(frames)
  print(frames[0].shape)

  height, width = frames[0].shape[:2]
  # An explicit codec replaces fourcc=-1, which depends on an interactive
  # codec-selection dialog and therefore cannot run unattended.
  video = cv2.VideoWriter(str(output_path), cv2.VideoWriter_fourcc(*fourcc), fps, (width, height))
  try:
    if not video.isOpened():
      raise RuntimeError(f'Could not open {output_path} for writing with codec {fourcc!r}')
    for frame in frames:
      video.write(frame)
  finally:
    # Always finalise the file, even if writing a frame fails.
    video.release()
  
# Annotate the recorded clip; label_frames writes the result to 'out.mp4'.
label_frames(Path('../data/RGB_2025-03-05-14_58_10.mp4'))

Missing frames: 41
(960, 720, 3)


In [24]:
from dataclasses import dataclass
from arucosort import Sort
from functorch import einops
import torch
import decord
import numpy as np
import cv2
import cv2.aruco as aruco
import typing
from pathlib import Path


# Marker dictionary for this cell; it is read as a module-level global by the
# code defined below (e.g. label_frames).
aruco_dict = aruco.getPredefinedDictionary(aruco.DICT_4X4_50)
def video_frames_extractor(video_path: Path):
  """Decode every frame of a video into a single float tensor.

  Args:
    video_path: Path of the video file to decode.

  Returns:
    A tensor rearranged from (t, h, w, c) to (t, c, h, w) whose values are
    the decoded pixel values divided by 255.
  """
  vr = decord.VideoReader(str(video_path), ctx=decord.cpu(0))
  frames = []
  for i in range(len(vr)):
    frame = vr[i]
    # torch.stack only accepts tensors. Unless decord's torch bridge is
    # enabled, indexing the reader yields decord arrays, so convert them.
    if not isinstance(frame, torch.Tensor):
      frame = torch.from_numpy(frame.asnumpy())
    frames.append(frame)
  frames_tensor = torch.stack(frames)
  frames_tensor = frames_tensor / 255.0
  frames_tensor = einops.rearrange(frames_tensor, "t h w c -> t c h w")
  # NOTE(review): a further NORMALIZER(frames_tensor) step was left disabled
  # here; no NORMALIZER is defined in this notebook.
  return frames_tensor


@dataclass
class DetectionGap:
  """A run of frames in which a marker that was previously detected is missing.

  A gap is created open (both ``end_*`` fields are None) and is closed by
  filling in the end fields once the marker is detected again.
  """
  # Index of the frame just before the first frame without a detection.
  start_frame_index: int
  # Marker corners in that frame: [[x1, y1], [x2, y2], [x3, y3], [x4, y4]]
  start_corners: np.ndarray
  # Index of the frame in which the marker is detected again; None while open.
  end_frame_index: typing.Optional[int] = None
  # Marker corners in the end frame; None while the gap is open.
  end_corners: typing.Optional[np.ndarray] = None
  
  
class TrackingAnnotator:
  """Draws ArUco marker outlines on frames and fills detection gaps with optical flow.

  Markers are detected independently in every frame. When one of the tracked
  ids disappears and later reappears, the frames in between are annotated by
  propagating the last detected corners with Lucas-Kanade optical flow.

  Args:
    aruco_dict: Dictionary used to build the marker detector.
    annotate_frame: Stored on the instance; not used by this class.
    tracked_ids: Marker ids for which detection gaps are recorded and filled.
  """

  def __init__(self, aruco_dict: aruco.Dictionary, annotate_frame, tracked_ids: typing.Iterable[int] = (1, 2, 3)):
    self.aruco_dict = aruco_dict
    self.annotate_frame = annotate_frame
    self.tracked_ids = tuple(tracked_ids)

  def annotate_frames(self, frames: typing.List[np.ndarray]):
    """Annotate ``frames`` in place with detected and gap-filled marker outlines.

    Args:
      frames: BGR frames; each one is converted to grayscale for detection
        and drawn on directly.
    """
    # The detector does not depend on the frame, so build it once and use
    # the dictionary given to the constructor rather than a global.
    detector = aruco.ArucoDetector(self.aruco_dict, aruco.DetectorParameters())
    closed_gaps: typing.Dict[int, typing.List[DetectionGap]] = {}
    open_gaps: typing.Dict[int, DetectionGap] = {}
    previous_corners: typing.Dict[int, np.ndarray] = {}
    # Grayscale copies are taken BEFORE anything is drawn, so the optical
    # flow in _annotate_gaps is not disturbed by the overlays.
    gray_frames = []
    for frame_index, frame in enumerate(frames):
      gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
      gray_frames.append(gray)

      # Detect ArUco markers
      markerCorners, markerIds, _rejected = detector.detectMarkers(gray)

      # Keep EVERY detection as (id, 4x2 corner array); previously only the
      # first detected marker of each frame survived.
      detections = []
      if markerIds is not None:
        for marker_id, marker_corners in zip(np.asarray(markerIds).flatten(), markerCorners):
          detections.append((int(marker_id), np.asarray(marker_corners, dtype=np.float32).reshape(4, 2)))
      current_corners = dict(detections)

      # Close the gap of every marker whose detection has come back.
      for marker_id, corners in current_corners.items():
        gap = open_gaps.pop(marker_id, None)
        if gap is None:
          continue
        gap.end_frame_index = frame_index
        gap.end_corners = corners
        closed_gaps.setdefault(marker_id, []).append(gap)

      # Open a gap for every tracked marker seen in the previous frame but
      # missing from this one.
      # NOTE(review): a gap that is still open when the video ends is never
      # closed and therefore never filled.
      for marker_id in self.tracked_ids:
        if marker_id in current_corners or marker_id in open_gaps or marker_id not in previous_corners:
          continue
        open_gaps[marker_id] = DetectionGap(frame_index - 1, previous_corners[marker_id])

      previous_corners = current_corners

      # annotate the detected markers
      for marker_id, corners in detections:
        self._annotate_frame(frame, marker_id, corners)

    self._annotate_gaps(frames, closed_gaps, gray_frames)

  def _annotate_gaps(self, frames: typing.List[np.ndarray], detection_gaps: typing.Dict[int, typing.List[DetectionGap]], gray_frames: typing.Optional[typing.List[np.ndarray]] = None):
    """Annotate the frames inside each closed gap by tracking the start corners.

    Args:
      frames: BGR frames to draw on.
      detection_gaps: Closed gaps per marker id.
      gray_frames: Grayscale version of each frame used for optical flow. If
        omitted they are computed from ``frames`` as they are now.
    """
    if gray_frames is None:
      gray_frames = [cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) for frame in frames]
    for markerId, marker_detection_gaps in detection_gaps.items():
      for detection_gap in marker_detection_gaps:
        # The points are passed as an (N, 1, 2) float32 array; np.array makes a copy.
        corners = np.array(detection_gap.start_corners, dtype=np.float32).reshape(-1, 1, 2)
        for frame_index in range(detection_gap.start_frame_index + 1, detection_gap.end_frame_index):
          # NOTE(review): the per-point status output is ignored, so a corner
          # that failed to track is still propagated.
          corners, _status, _err = cv2.calcOpticalFlowPyrLK(gray_frames[frame_index - 1], gray_frames[frame_index], corners, None)
          points = corners.reshape(-1, 2)
          height, width = frames[frame_index].shape[:2]
          # Points are (x, y): x is bounded by the width and y by the height.
          pixels = points.astype(int)
          inside = (
            (pixels[:, 0] >= 0) & (pixels[:, 0] < width)
            & (pixels[:, 1] >= 0) & (pixels[:, 1] < height)
          )
          if not inside.all():
            continue
          self._annotate_frame(frames[frame_index], markerId, points)

  def _annotate_frame(self, frame: np.ndarray, markerId: int, corners: np.ndarray):
    """Draw a closed green outline through ``corners`` and label it with ``markerId``.

    Args:
      frame: Image to draw on, modified in place.
      markerId: Value rendered as the text label.
      corners: Corner coordinates; reshaped to (N, 2) (x, y) pairs.
    """
    outline = np.asarray(corners).astype(np.int32).reshape(-1, 2)
    cv2.polylines(frame, [outline], True, (0, 255, 0), 2)
    # Plain Python ints for the text origin; the label sits 15 px above the
    # first corner.
    label_origin = (int(outline[0, 0]), int(outline[0, 1]) - 15)
    cv2.putText(frame, str(markerId), label_origin, cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
   

def label_frames(video_path: Path, output_path: Path = Path('out.mp4'), fourcc: str = 'mp4v'):
  """Annotate every frame of a video and write the result to a new file.

  Args:
    video_path: Video to read.
    output_path: Destination file (default: 'out.mp4' in the working directory).
    fourcc: Four-character codec code passed to the video writer.

  Raises:
    ValueError: If no frames could be decoded from ``video_path``.
    RuntimeError: If the output video could not be opened for writing.
  """
  vr = decord.VideoReader(str(video_path), ctx=decord.cpu(0))
  fps = vr.get_avg_fps()
  # The frames are converted from RGB to BGR before OpenCV draws on and writes them.
  frames = [cv2.cvtColor(vr[i].asnumpy(), cv2.COLOR_RGB2BGR) for i in range(len(vr))]
  if not frames:
    raise ValueError(f'No frames decoded from {video_path}')

  annotator = TrackingAnnotator(aruco_dict, None)
  annotator.annotate_frames(frames)
  print(frames[0].shape)

  height, width = frames[0].shape[:2]
  # An explicit codec replaces fourcc=-1, which depends on an interactive
  # codec-selection dialog and therefore cannot run unattended.
  video = cv2.VideoWriter(str(output_path), cv2.VideoWriter_fourcc(*fourcc), fps, (width, height))
  try:
    if not video.isOpened():
      raise RuntimeError(f'Could not open {output_path} for writing with codec {fourcc!r}')
    for frame in frames:
      video.write(frame)
  finally:
    # Always finalise the file, even if writing a frame fails.
    video.release()
  
# Annotate the recorded clip; label_frames writes the result to 'out.mp4'.
label_frames(Path('../data/RGB_2025-03-05-14_58_10.mp4'))

(960, 720, 3)
