<h1><b>SLAM INTEGRATION</b></h1>

In [1]:
!pip install ultralytics 



In [2]:
from ultralytics import YOLO
from pathlib import Path
from collections import OrderedDict, defaultdict
from tqdm import tqdm
import json
import numpy as np

<h2>Path Declaration</h2>

In [None]:
# Paths (all relative to the notebook's working directory)
# YOLO weights file; extract2D() falls back to it when no model_path is given.
model_weights = '../results/yolov8s/kitti/best.pt'
# Calibration text file; the intrinsics cell reads its 'P2:' row.
calib_file = Path('../data/data_odometry_gray/sequences/08/calib.txt')
# Directory that is globbed for '*.png' frames.
seq_dir = Path('../data/data_odometry_gray/sequences/08/image_0')
# NOTE: despite the name this is a single pose text file (opened with open()), not a directory.
pose_dir = Path('../data/data_odometry_poses/poses/08.txt')
# Default output folder of extract2D() (per-frame 2D box JSON).
bbox_output_dir = Path('../data/bbox_outputs')
# Default output folder of generate3D_cuboids() (per-frame cuboid JSON).
cuboid_output_dir = Path('../data/cuboid_outputs')
# Alternative configurations (uncomment to switch output folders or input sequence):
# bbox_output_dir = Path('../data/bbox_outputs_bytetrack')
# cuboid_output_dir = Path('../data/cuboid_outputs_bytetrack')
# calib_file = Path('../data/adelaide_sequence/calib.txt')
# seq_dir = Path('../data/adelaide_sequence/image_0')
# bbox_output_dir = Path('../data/bbox_outputs_adelaide')
# cuboid_output_dir = Path('../data/cuboid_outputs_adelaide')

<h2>Helper Function</h2>

In [4]:
# 1. Read the camera intrinsics from calib.txt using its 'P2:' projection row.
# NOTE(review): frames are read from image_0 while the row used here is P2 —
# confirm this row belongs to the camera that produced the images.
with open(calib_file, 'r') as f:
    calib_lines = f.readlines()

# First row labelled 'P2:'; the label token itself is dropped.
P2_line = next(row for row in calib_lines if row.startswith('P2:'))
P2_vals = P2_line.split()[1:]

# Twelve numbers -> 3x4 projection matrix.
P2 = np.array([float(token) for token in P2_vals]).reshape(3, 4)

# The leading 3x3 sub-matrix is used as the intrinsic matrix K.
K = P2[:, :3]
fx, fy = K[0, 0], K[1, 1]   # focal lengths (pixels)
cx, cy = K[0, 2], K[1, 2]   # principal point (pixels)
print(f'Loaded camera intrinsics fx={fx:.2f}, fy={fy:.2f}, cx={cx:.2f}, cy={cy:.2f}')

Loaded camera intrinsics fx=707.09, fy=707.09, cx=601.89, cy=183.11


In [5]:
# Helper function: project 3D cuboid corners given center (X,Y,Z) and yaw in camera frame
def project_cuboid(X, Y, Z, W, H, L, yaw):
    """
    Project a yaw-rotated cuboid into the image and return its 2D extent.

    Args:
        X, Y, Z: cuboid center in camera coordinates.
        W, H, L: cuboid extent along its local x, y and z axes.
        yaw: rotation (radians) about the camera Y axis.

    Returns:
        (u_min, u_max, v_min, v_max): pixel bounds of the eight projected corners,
        computed with the module-level intrinsics fx, fy, cx, cy.
    """
    cos_y = np.cos(yaw)
    sin_y = np.sin(yaw)
    # Rotation about the Y axis: maps object-local offsets into camera axes.
    rot = np.array([
        [ cos_y, 0, sin_y],
        [ 0,     1, 0    ],
        [-sin_y, 0, cos_y]
    ])

    # Signed half-extents; the last axis varies fastest, giving 8 corner offsets.
    offsets = np.array([
        [off_x, off_y, off_z]
        for off_x in (-W/2, W/2)
        for off_y in (-H/2, H/2)
        for off_z in (-L/2, L/2)
    ])

    # Rotate the offsets, then translate to the cuboid center.
    pts = offsets.dot(rot.T) + np.array([X, Y, Z])

    # Pinhole projection of each corner.
    us = fx * (pts[:, 0] / pts[:, 2]) + cx
    vs = fy * (pts[:, 1] / pts[:, 2]) + cy

    return us.min(), us.max(), vs.min(), vs.max()

In [6]:
# Prior object dimensions per class ID, stored as (width, height, length) in meters.
# The cuboid estimators look a class up here and use the tuple as the object's metric size;
# unknown class IDs fall back to a 1 m cube at the call sites.
# NOTE(review): the Car/Pedestrian/Cyclist values are described as KITTI dataset statistics;
# the original citation was lost, so confirm them against a trusted source.
# NOTE(review): confirm these IDs follow the class order of the detector weights in use.
class_dims = {
    0: {'name': 'Car',          'dims': (1.6, 1.5, 3.9)},   # width, height, length
    1: {'name': 'Pedestrian',   'dims': (0.6, 1.7, 0.6)},   # human body
    2: {'name': 'Cyclist',      'dims': (0.6, 1.7, 1.5)},   # person + bicycle
    3: {'name': 'Lane',         'dims': (3.5, 0.1, 50.0)},  # typical lane width ~3.5m, very flat & long
    4: {'name': 'Traffic Sign', 'dims': (0.8, 2.0, 0.2)},   # pole-mounted sign (width ~0.8m, height ~2m)
    5: {'name': 'Traffic Light','dims': (0.4, 1.0, 0.4)},   # pole-mounted light cluster
    6: {'name': 'Drivable Area','dims': (6.0, 0.1, 50.0)},  # approximate: wide, flat road patch
    7: {'name': 'Truck',        'dims': (2.5, 3.5, 12.0)},  # semi-truck
    8: {'name': 'Bus',          'dims': (2.5, 3.0, 12.0)},  # city bus
    9: {'name': 'Bike',         'dims': (0.6, 1.2, 1.8)},   # standalone bicycle
   10: {'name': 'Motor',        'dims': (0.8, 1.4, 2.2)},   # motorcycle + rider
   11: {'name': 'Train',        'dims': (3.2, 4.5, 30.0)}   # single carriage segment
}

def estimate_cuboid(bbox, class_id):
    """
    Build a 3D cuboid description (camera frame) from a single 2D detection box.

    The object's metric size comes from the per-class prior in `class_dims`
    (1 m cube if the class is unknown). Depth is derived from the box height,
    the lateral/vertical position from the box center and bottom edge, and the
    yaw from a search that matches the projected cuboid width to the box width.

    Args:
        bbox: [x_min, y_min, x_max, y_max] in pixels (a list is converted to an array).
        class_id: object class ID; cast to int for the prior lookup.

    Returns:
        dict with 'center', 'dimensions', 'orientation' (yaw, radians),
        'rotation' (quaternion about the Y axis), 'corners_3d',
        'class_id' and 'class_name'.
    """
    if isinstance(bbox, list):
        bbox = np.array(bbox)
    left, top, right, bottom = bbox

    # Size prior for this class (unit cube when the class is not listed).
    fallback = {'name': 'Unknown', 'dims': (1.0, 1.0, 1.0)}
    class_info = class_dims.get(int(class_id), fallback)
    W, H, L = class_info['dims']

    # Depth from apparent height; epsilon guards a zero-height box.
    box_height = bottom - top
    Z = (fy * H) / (box_height + 1e-6)

    # Back-project the horizontal box center at that depth.
    u_center = 0.5 * (left + right)
    X = (u_center - cx) / fx * Z

    # Put the cuboid's bottom face on the lower box edge, then move up half a height.
    Y = (max(top, bottom) - cy) / fy * Z - 0.5 * H

    box_width = right - left

    def width_error(angle):
        """Absolute gap between the projected cuboid width and the box width."""
        lo, hi, _, _ = project_cuboid(X, Y, Z, W, H, L, angle)
        return abs((hi - lo) - box_width)

    # Coarse pass: 0..90 degrees in 5-degree steps, keeping the first best candidate.
    yaw = 0.0
    lowest = float('inf')
    for degrees in range(0, 91, 5):
        candidate = np.deg2rad(degrees)
        candidate_err = width_error(candidate)
        if candidate_err < lowest:
            lowest = candidate_err
            yaw = candidate

    # Fine pass: up to three 2-degree moves towards the lower-error neighbour.
    step = np.deg2rad(2)
    for _ in range(3):
        err_here = width_error(yaw)
        err_up = width_error(yaw + step)
        err_down = width_error(yaw - step)
        if not (err_up < err_here or err_down < err_here):
            break
        yaw += step if err_up < err_down else -step

    # Shift X so the projected cuboid is centered on the detection box.
    lo, hi, _, _ = project_cuboid(X, Y, Z, W, H, L, yaw)
    X += ((u_center - 0.5 * (lo + hi)) / fx) * Z

    # Eight corners in camera coordinates (same rotation convention as project_cuboid).
    cos_y, sin_y = np.cos(yaw), np.sin(yaw)
    rot = np.array([[cos_y, 0, sin_y], [0, 1, 0], [-sin_y, 0, cos_y]])
    offsets = np.array([
        [off_x, off_y, off_z]
        for off_x in (-W/2, W/2)
        for off_y in (-H/2, H/2)
        for off_z in (-L/2, L/2)
    ])
    corners_3d = offsets.dot(rot.T) + np.array([X, Y, Z])

    half_yaw = yaw/2
    return {
        'center': [float(X), float(Y), float(Z)],
        'dimensions': [float(W), float(H), float(L)],
        'orientation': float(yaw),
        'rotation': {  # Quaternion (for compatibility)
            'w': float(np.cos(half_yaw)),
            'x': 0.0,
            'y': float(np.sin(half_yaw)),
            'z': 0.0
        },
        'corners_3d': corners_3d.tolist(),
        'class_id': int(class_id),
        'class_name': class_info['name'],
    }

<h2>1. ByteTrack Tracker Implementation</h2>

**Multi-object tracker to maintain object identity across frames** 

```
KITTI Images → YOLOv8 → [NEW: ByteTrack] → 3D Cuboids (cached) → ORB-SLAM3
```

In [7]:
class SimpleByteTracker:
    """Simplified ByteTrack-style tracker that keeps object IDs across frames.

    Matching is greedy on IoU. Every track is a plain dict with the keys
    'track_id', 'bbox' ([x1, y1, x2, y2] list), 'score', 'class_id',
    'frame_id' (frame of the last update) and 'state'
    ('new', 'tracked', 'lost' or 're-identified').

    Per frame, `update` performs:
      1. match high-confidence detections (score >= track_thresh) to active tracks;
      2. move unmatched active tracks to the lost pool;
      3. try to revive lost tracks with low-confidence detections;
      4. start new tracks from unmatched high-confidence detections;
      5. retire lost tracks not updated for more than `track_buffer` frames.

    NOTE(review): lost tracks are only ever re-matched against low-confidence
    detections, so a lost object that reappears with a high score gets a new
    ID. Class IDs are not compared during matching.
    """

    def __init__(self, track_thresh=0.5, track_buffer=30, match_thresh=0.8, min_box_area=100):
        """
        Args:
            track_thresh: score separating high- from low-confidence detections.
            track_buffer: number of frames a lost track is kept before removal.
            match_thresh: minimum IoU for a track/detection pair to be matched.
            min_box_area: detections with a smaller box area (px^2) are dropped.
        """
        self.track_thresh = track_thresh
        self.track_buffer = track_buffer
        self.match_thresh = match_thresh
        self.min_box_area = min_box_area
        self.tracked_tracks = OrderedDict()   # track_id -> active track
        self.lost_tracks = OrderedDict()      # track_id -> track awaiting recovery
        self.removed_tracks = OrderedDict()   # track_id -> expired track (never pruned)
        self.frame_id = 0
        self.track_id_count = 0

    def update(self, detections):
        """Advance one frame.

        Args:
            detections: (N, 6) array-like of [x1, y1, x2, y2, score, class_id];
                may be empty.

        Returns:
            list of dict copies of the tracks updated or created in this frame
            (matched, re-identified, then new).
        """
        self.frame_id += 1
        if len(detections) == 0:
            detections = np.empty((0, 6))
        else:
            # Accept plain Python sequences as well as arrays.
            detections = np.asarray(detections)

        valid_detections = self._filter_detections(detections)
        scores = valid_detections[:, 4]
        high_det = valid_detections[scores >= self.track_thresh]
        low_det = valid_detections[scores < self.track_thresh]
        tracks_output = []

        # The steps below always run: _associate copes with an empty track set,
        # so a frame with no active tracks can still revive lost tracks.
        # (Previously this case was treated as a "first frame" and skipped recovery.)

        # 1. High-confidence detections vs. active tracks.
        track_ids = list(self.tracked_tracks.keys())
        matched, unmatched_tracks, unmatched_dets = self._associate(self.tracked_tracks, high_det)

        for track_idx, det_idx in matched:
            track = self.tracked_tracks[track_ids[track_idx]]
            det = high_det[det_idx]
            track.update({
                'bbox': det[:4].tolist(),
                'score': float(det[4]),
                'frame_id': self.frame_id,
                'state': 'tracked'
            })
            tracks_output.append(track.copy())

        # 2. Active tracks without a match become lost.
        for track_idx in unmatched_tracks:
            track_id = track_ids[track_idx]
            track = self.tracked_tracks.pop(track_id)
            track['state'] = 'lost'
            self.lost_tracks[track_id] = track

        # 3. Low-confidence detections may revive lost tracks.
        if len(low_det) > 0 and len(self.lost_tracks) > 0:
            lost_track_ids = list(self.lost_tracks.keys())
            matched_lost, _, _ = self._associate(self.lost_tracks, low_det)

            for track_idx, det_idx in matched_lost:
                track_id = lost_track_ids[track_idx]
                track = self.lost_tracks.pop(track_id)
                det = low_det[det_idx]
                track.update({
                    'bbox': det[:4].tolist(),
                    'score': float(det[4]),
                    'frame_id': self.frame_id,
                    'state': 're-identified'
                })
                self.tracked_tracks[track_id] = track
                tracks_output.append(track.copy())

        # 4. Unmatched high-confidence detections start new tracks.
        for det_idx in unmatched_dets:
            new_track = self._init_track(high_det[det_idx])
            self.tracked_tracks[new_track['track_id']] = new_track
            tracks_output.append(new_track.copy())

        # 5. Retire lost tracks that exceeded the buffer.
        expired = [
            track_id for track_id, track in self.lost_tracks.items()
            if self.frame_id - track['frame_id'] > self.track_buffer
        ]
        for track_id in expired:
            self.removed_tracks[track_id] = self.lost_tracks.pop(track_id)

        return tracks_output

    def _filter_detections(self, detections):
        """Drop detections whose box area is below `min_box_area`."""
        if len(detections) == 0:
            return detections
        areas = (detections[:, 2] - detections[:, 0]) * (detections[:, 3] - detections[:, 1])
        return detections[areas >= self.min_box_area]

    def _init_track(self, detection):
        """Create a new track dict (state 'new') from one detection row and assign the next ID."""
        self.track_id_count += 1
        return {
            'track_id': self.track_id_count,
            'bbox': detection[:4].tolist(),
            'score': float(detection[4]),
            'class_id': int(detection[5]),
            'frame_id': self.frame_id,
            'state': 'new'
        }

    def _associate(self, tracks, detections):
        """
        Greedily pair tracks with detections by descending IoU.

        Indices refer to positions in `tracks.values()` and rows of `detections`.
        Matching stops as soon as the best remaining IoU is below `match_thresh`.

        Returns: (matched_pairs, unmatched_track_indices, unmatched_det_indices)
        """
        if len(tracks) == 0 or len(detections) == 0:
            return [], list(range(len(tracks))), list(range(len(detections)))

        track_boxes = np.array([t['bbox'] for t in tracks.values()])
        det_boxes = detections[:, :4]
        iou_matrix = self._compute_iou_matrix(track_boxes, det_boxes)

        matched = []
        unmatched_tracks = list(range(len(tracks)))
        unmatched_dets = list(range(len(detections)))

        while unmatched_tracks and unmatched_dets:
            # Best remaining pair; ties resolve to the first pair in index order.
            max_iou = -1
            best_track = -1
            best_det = -1
            for t in unmatched_tracks:
                for d in unmatched_dets:
                    if iou_matrix[t, d] > max_iou:
                        max_iou = iou_matrix[t, d]
                        best_track = t
                        best_det = d

            if max_iou < self.match_thresh:
                break
            matched.append((best_track, best_det))
            unmatched_tracks.remove(best_track)
            unmatched_dets.remove(best_det)

        return matched, unmatched_tracks, unmatched_dets

    @staticmethod
    def _compute_iou_matrix(boxes1, boxes2):
        """Return the (len(boxes1), len(boxes2)) IoU matrix for [x1, y1, x2, y2] boxes."""
        area1 = (boxes1[:, 2] - boxes1[:, 0]) * (boxes1[:, 3] - boxes1[:, 1])
        area2 = (boxes2[:, 2] - boxes2[:, 0]) * (boxes2[:, 3] - boxes2[:, 1])

        # Column slices of boxes1 broadcast against rows of boxes2.
        x1 = np.maximum(boxes1[:, 0:1], boxes2[:, 0])
        y1 = np.maximum(boxes1[:, 1:2], boxes2[:, 1])
        x2 = np.minimum(boxes1[:, 2:3], boxes2[:, 2])
        y2 = np.minimum(boxes1[:, 3:4], boxes2[:, 3])

        intersection = np.maximum(0, x2 - x1) * np.maximum(0, y2 - y1)
        union = area1[:, np.newaxis] + area2 - intersection

        # Epsilon avoids division by zero for degenerate boxes.
        return intersection / (union + 1e-6)

    def get_track_count(self):
        """Return the total number of track IDs created so far."""
        return self.track_id_count

class TrackManager:
    """Keeps a per-track history and decides which tracks need a 3D cuboid.

    A track is selected once when its history reaches `min_track_length`
    entries (reason 'new_stable'), and again whenever it shows up with state
    're-identified' after that (reason 'updated').
    """

    def __init__(self, min_track_length=3):
        self.min_track_length = min_track_length
        self.track_history = defaultdict(list)   # track_id -> list of per-frame records
        self.cuboid_generated = set()            # track IDs already selected once

    def update(self, tracked_objects):
        """Record this frame's tracks and return those needing a cuboid.

        Each returned item is a copy of the input track dict with an extra
        'reason' key.
        """
        recorded_keys = ('bbox', 'score', 'class_id', 'state')
        selected = []
        for obj in tracked_objects:
            tid = obj['track_id']
            history = self.track_history[tid]
            history.append({key: obj[key] for key in recorded_keys})

            if tid in self.cuboid_generated:
                # Already has a cuboid: only refresh it after a re-identification.
                if obj['state'] == 're-identified':
                    selected.append({**obj, 'reason': 'updated'})
            elif len(history) >= self.min_track_length:
                selected.append({**obj, 'reason': 'new_stable'})
                self.cuboid_generated.add(tid)
        return selected

def yolo_to_bytetrack_format(boxes, scores, classes):
    """Pack boxes, scores and class IDs into one array with columns [x1, y1, x2, y2, score, class].

    Returns an empty (0, 6) array when there are no boxes.
    """
    if len(boxes) > 0:
        return np.column_stack((boxes, scores, classes))
    return np.empty((0, 6))

print('✅ ByteTrack tracker classes loaded!')

✅ ByteTrack tracker classes loaded!


<h2>2. Extracting 2D Bounding Box Coordinates using YOLOv8s weights (Exp 2)</h2>

In [8]:
# Initialize ByteTrack tracker
# match_thresh is the minimum IoU for a track/detection match; min_track_length is the
# history length after which TrackManager selects a track for cuboid generation.
# NOTE(review): extract2D() builds its own local tracker / track_manager / tracking_stats,
# and no code visible in this notebook reads these module-level objects or cuboid_cache —
# confirm they are still needed.
tracker = SimpleByteTracker(track_thresh=0.5, track_buffer=30, match_thresh=0.35)
track_manager = TrackManager(min_track_length=3)
tracking_stats = {'total_detections': 0, 'total_tracks': 0}
cuboid_cache = {}  # Cache cuboids by track_id
print('✅ ByteTrack initialized!')

✅ ByteTrack initialized!


In [10]:
def extract2D(bytetrack=False, model_path=None, input_dir=None, output_dir=None,
              conf_thresh=0.6, iou_thresh=0.45,
              track_thresh=0.5, track_buffer=30, match_thresh=0.35):
    """Run YOLO on every PNG of a sequence and write one JSON file of 2D boxes per image.

    Each output file is named after the image stem and holds a list of objects with
    'class', 'confidence' and 'bbox' ([x_min, y_min, x_max, y_max]), plus:
      * 'track_id' when `bytetrack` is True (ID assigned by SimpleByteTracker);
      * 'detection_id' otherwise (running counter over the whole sequence, starting
        at 0), which is the field generate3D_cuboids() reads in non-tracking mode.

    Args:
        bytetrack: enable tracking with SimpleByteTracker.
        model_path: YOLO weights; defaults to the global `model_weights`.
        input_dir: directory of '*.png' frames; defaults to the global `seq_dir`.
        output_dir: destination for the JSON files; defaults to `bbox_output_dir`.
        conf_thresh: confidence threshold passed to the detector.
        iou_thresh: IoU threshold passed to the detector.
        track_thresh, track_buffer, match_thresh: forwarded to SimpleByteTracker.

    Returns:
        dict: {'total_detections', 'total_tracks'} with tracking,
        {'total_detections'} without.

    NOTE(review): with the defaults, conf_thresh (0.6) is above track_thresh (0.5), so
    the detector never hands the tracker a detection below track_thresh and the
    tracker's low-confidence recovery step cannot trigger. Lower conf_thresh if that
    step is wanted.
    """
    # Fall back to the notebook-level paths when no override is given.
    _model_path = Path(model_path) if model_path else Path(model_weights)
    _input_dir = Path(input_dir) if input_dir else seq_dir
    _output_dir = Path(output_dir) if output_dir else bbox_output_dir

    _output_dir.mkdir(parents=True, exist_ok=True)

    tracker = None
    tracking_stats = {'total_detections': 0, 'total_tracks': 0}

    if bytetrack:
        tracker = SimpleByteTracker(
            track_thresh=track_thresh,
            track_buffer=track_buffer,
            match_thresh=match_thresh
        )
        print('✅ ByteTrack initialized!')
    else:
        print('✅ Running without ByteTrack')

    model = YOLO(str(_model_path))
    print(f'✅ Model loaded: {_model_path}')

    # Sorted so frames reach the tracker in filename order.
    image_files = sorted(_input_dir.glob('*.png'))
    print(f'Processing {len(image_files)} images...')

    # Running ID / total for non-ByteTrack mode.
    detection_id_counter = 0

    progress_label = 'Detecting & Tracking' if bytetrack else 'Detecting'
    for img_file in tqdm(image_files, desc=progress_label):
        results = model(str(img_file), conf=conf_thresh, iou=iou_thresh, verbose=False)

        # Flatten all result objects into parallel per-detection lists.
        boxes_list = []
        scores_list = []
        classes_list = []
        for result in results:
            boxes_list.extend(result.boxes.xyxy.cpu().numpy())  # [x_min, y_min, x_max, y_max]
            scores_list.extend(result.boxes.conf.cpu().numpy())
            classes_list.extend(result.boxes.cls.cpu().numpy())

        frame_data = []

        if bytetrack:
            # The helper returns an empty (0, 6) array when there are no boxes.
            detections = yolo_to_bytetrack_format(
                np.array(boxes_list),
                np.array(scores_list),
                np.array(classes_list)
            )

            tracked_objects = tracker.update(detections)
            tracking_stats['total_detections'] += len(detections)
            tracking_stats['total_tracks'] = tracker.get_track_count()

            for track in tracked_objects:
                bbox = track['bbox']
                # JSON needs a plain list.
                bbox = bbox.tolist() if isinstance(bbox, np.ndarray) else list(bbox)
                frame_data.append({
                    'track_id': track['track_id'],
                    'class': track['class_id'],
                    'confidence': track['score'],
                    'bbox': bbox
                })
        else:
            for box, score, cls in zip(boxes_list, scores_list, classes_list):
                frame_data.append({
                    'detection_id': detection_id_counter,
                    'class': int(cls),
                    'confidence': float(score),
                    'bbox': box.tolist()
                })
                detection_id_counter += 1

        json_path = _output_dir / f'{img_file.stem}.json'
        with open(json_path, 'w') as f:
            json.dump(frame_data, f, indent=4)

    print(f'\n✅ 2D bounding box extraction complete!')
    print(f'   Total frames processed: {len(image_files)}')
    print(f'   Output directory: {_output_dir}')

    if bytetrack:
        print(f'   Total detections: {tracking_stats["total_detections"]}')
        print(f'   Total unique tracks: {tracking_stats["total_tracks"]}')
        return tracking_stats
    else:
        print(f'   Total detections: {detection_id_counter}')
        return {'total_detections': detection_id_counter}

In [11]:
# Run detection with tracking enabled; model, input and output paths fall back to the
# notebook-level defaults. The returned stats dict is displayed as the cell value.
extract2D(bytetrack=True)

✅ ByteTrack initialized!
✅ Model loaded: ../results/yolov8s/kitti/best.pt
Processing 4071 images...


Detecting & Tracking: 100%|██████████| 4071/4071 [05:30<00:00, 12.32it/s]


✅ 2D bounding box extraction complete!
   Total frames processed: 4071
   Output directory: ../data/bbox_outputs_bytetrack
   Total detections: 12214
   Total unique tracks: 2298





{'total_detections': 12214, 'total_tracks': 2298}

<h2>3. Converting 2D Bounding Boxes to 3D Cuboids</h2>

In [12]:
# 1. (Optional) Load camera poses for each frame (SLAM trajectory or KITTI ground truth)
#
# Each valid line holds 12 floats: a row-major 3x4 matrix [R | t]. In the KITTI odometry
# ground-truth format this matrix maps points from the camera frame of that line into the
# frame of the first camera, i.e. it is ALREADY the camera-to-world pose. It is therefore
# stored as-is; inverting it (as was done before) would yield world-to-camera instead.
# `poses` maps the line index to (Rwc, twc); lines without exactly 12 values are skipped
# but still consume an index.

poses = {}
if pose_dir.exists():
    with open(pose_dir, 'r') as f:
        for idx, line in enumerate(f):
            vals = [float(v) for v in line.split()]
            if len(vals) != 12:
                continue  # not a valid pose line
            Twc = np.array(vals).reshape(3, 4)   # camera -> world transform [Rwc | twc]
            Rwc = Twc[:, :3]                     # camera orientation in the world frame
            twc = Twc[:, 3]                      # camera position in the world frame
            poses[idx] = (Rwc, twc)
    print(f'Loaded {len(poses)} camera poses for SLAM integration.')
else:
    print('Camera pose file not found. Results will be in camera coordinates.')

Loaded 4071 camera poses for SLAM integration.


In [13]:
# 2. Function to estimate 3D cuboid from a single 2D bounding box

def estimate_cuboid_from_bbox(bbox, class_id):
    """
    Estimate a 3D cuboid in camera coordinates from one 2D bounding box.

    The metric size is the per-class prior from `class_dims` (1 m cube if the class
    is unknown). Depth follows from the box height, the lateral/vertical position
    from the box center and bottom edge, and the yaw from a search that matches the
    projected cuboid width (via `project_cuboid`) to the box width.

    Args:
        bbox: [x_min, y_min, x_max, y_max] in pixels.
        class_id: object class ID used as the key into `class_dims`.

    Returns:
        (X, Y, Z, yaw, W, H, L): center in camera coordinates, yaw in radians
        (rotation about the camera Y axis) and the width/height/length prior.
    """
    x1, y1, x2, y2 = bbox
    W, H, L = class_dims.get(class_id, {'dims': (1.0, 1.0, 1.0)})['dims']  # default to 1m cube if class unknown

    # Depth: the known object height H spans pixel_height pixels; epsilon guards a zero-height box.
    pixel_height = y2 - y1
    Z = (fy * H) / (pixel_height + 1e-6)

    # Horizontal position: back-project the box center at depth Z.
    u_center = 0.5 * (x1 + x2)
    X = (u_center - cx) / fx * Z

    # Vertical position: align the cuboid bottom with the lower box edge
    # (camera Y axis points down), then move up by half the height.
    v_bottom = max(y1, y2)
    Y = (v_bottom - cy) / fy * Z - 0.5 * H

    pixel_width = x2 - x1

    def width_error(angle):
        """Absolute gap between the projected cuboid width and the box width."""
        u_lo, u_hi, _, _ = project_cuboid(X, Y, Z, W, H, L, angle)
        return abs((u_hi - u_lo) - pixel_width)

    # Coarse search over 0..90 degrees in 5-degree steps (first best candidate wins).
    best_yaw = 0.0
    min_err = float('inf')
    for deg in range(0, 91, 5):
        yaw_cand = np.deg2rad(deg)
        err = width_error(yaw_cand)
        if err < min_err:
            min_err = err
            best_yaw = yaw_cand

    # Refinement: up to three 2-degree moves towards the neighbour with lower error.
    yaw = best_yaw
    delta = np.deg2rad(2)
    for _ in range(3):
        err0 = width_error(yaw)
        err_plus = width_error(yaw + delta)
        err_minus = width_error(yaw - delta)
        if err_plus < err0 or err_minus < err0:
            if err_plus < err_minus:
                yaw += delta
            else:
                yaw -= delta
        else:
            break  # no improvement

    # Center adjustment: shift X so the projected box is centered on the detection.
    u_min, u_max, _, _ = project_cuboid(X, Y, Z, W, H, L, yaw)
    proj_center = 0.5 * (u_min + u_max)
    X += ((u_center - proj_center) / fx) * Z

    return X, Y, Z, yaw, W, H, L

<b>CubeSLAM technique</b>

1. Single-view 3D cuboid proposal:
- What CubeSLAM does: Given a 2D bounding box + class prior (average dimensions per category), estimate a plausible 3D cuboid (center, yaw, dimensions) in camera frame.
- Corresponding code snippet: `estimate_cuboid_from_bbox()` and `project_cuboid()`.

2. Multi-view optimization/SLAM fusion:
- What CubeSLAM does: Refines cuboids jointly with camera poses using bundle adjustment constraints.
- Corresponding code: This stage belongs in ORB-SLAM3, which does not support cuboid objects out of the box; they have to be added manually.

In [14]:
# Count the PNG frames of the sequence and load times.txt, which sits next to the image folder.
num_frames = sum(1 for _ in seq_dir.glob("*.png"))
frame_times = np.loadtxt(seq_dir.parent / 'times.txt')
print(f"Total frames in sequence: {num_frames}")

Total frames in sequence: 4071


In [17]:
def generate3D_cuboids(bbox_dir=None, output_dir=None, use_tracking=False, num_frames=None):
    """Convert per-frame 2D box JSON files into per-frame 3D cuboid JSON files.

    For frame IDs 0..N-1 the matching '<frame>.json' in `bbox_dir` is read, every
    detection is lifted to a cuboid with estimate_cuboid_from_bbox(), and the result
    is written to '<output_dir>/<frame:06d>.json' as
    {'frame': id, 'objects': [...]}. Centers and rotations are in camera coordinates
    and are recomputed every frame.

    Args:
        bbox_dir: directory of 2D box JSON files (numeric stems); defaults to the
            global `bbox_output_dir`.
        output_dir: destination directory; defaults to the global `cuboid_output_dir`.
        use_tracking: if True, dimensions are cached per 'track_id' so a track keeps
            the dimensions from its first appearance.
        num_frames: process only the first `num_frames` frame IDs (all if None).

    Returns:
        dict: {'cache_hits', 'cache_misses', 'no_cache'} counters.
    """
    _bbox_dir = Path(bbox_dir) if bbox_dir else bbox_output_dir
    _output_dir = Path(output_dir) if output_dir else cuboid_output_dir

    _output_dir.mkdir(parents=True, exist_ok=True)

    bbox_files = sorted(_bbox_dir.glob('*.json'), key=lambda x: int(x.stem))

    # Index files by frame number once instead of scanning the list for every frame.
    # setdefault keeps the first file when two stems parse to the same integer.
    bbox_by_frame = {}
    for path in bbox_files:
        bbox_by_frame.setdefault(int(path.stem), path)

    if num_frames is None:
        frames_to_process = range(len(bbox_files))
        print(f'Processing all {len(bbox_files)} frames...')
    else:
        frames_to_process = range(min(num_frames, len(bbox_files)))
        print(f'Processing {len(frames_to_process)} frames...')

    # track_id -> (W, H, L); only populated in tracking mode.
    dimension_cache = {}
    cache_stats = {'cache_hits': 0, 'cache_misses': 0, 'no_cache': 0}

    if use_tracking:
        print('✅ Dimension caching ENABLED (tracking mode)')
    else:
        print('✅ Dimension caching DISABLED (no tracking mode)')

    for frame_id in tqdm(frames_to_process, desc='Generating 3D Cuboids'):
        bbox_file = bbox_by_frame.get(frame_id)
        if bbox_file is None:
            print(f'Warning: No bbox data for frame {frame_id}')
            continue

        with open(bbox_file, 'r') as f:
            detections = json.load(f)

        output_data = {'frame': frame_id, 'objects': []}

        for det in detections:
            track_id = det.get('track_id')          # None if tracking wasn't used
            detection_id = det.get('detection_id')  # None if the field is absent

            cls_id = det['class']
            conf = det.get('confidence', 1.0)
            bbox = det['bbox']

            # Position and rotation always come from the current box.
            Xc, Yc, Zc, yaw, W_new, H_new, L_new = estimate_cuboid_from_bbox(bbox, cls_id)
            x_cam, y_cam, z_cam = float(Xc), float(Yc), float(Zc)

            # Quaternion for a rotation of `yaw` about the Y axis.
            half_yaw = yaw / 2.0
            rot = {
                'w': float(np.cos(half_yaw)),
                'x': 0.0,
                'y': float(np.sin(half_yaw)),
                'z': 0.0
            }

            tracked = use_tracking and track_id is not None
            if tracked:
                if track_id in dimension_cache:
                    # Reuse the dimensions stored at the track's first appearance.
                    W, H, L = dimension_cache[track_id]
                    cache_stats['cache_hits'] += 1
                else:
                    W, H, L = float(W_new), float(H_new), float(L_new)
                    dimension_cache[track_id] = (W, H, L)
                    cache_stats['cache_misses'] += 1
            else:
                W, H, L = float(W_new), float(H_new), float(L_new)
                cache_stats['no_cache'] += 1

            obj_record = {
                'class': class_dims.get(cls_id, {'name': str(cls_id)})['name'],
                'class_id': int(cls_id),
                'confidence': float(conf),
                'center': [x_cam, y_cam, z_cam],      # Updated every frame
                'rotation': rot,                       # Updated every frame
                'dimensions': [W, H, L]                # Stable if cached, fresh otherwise
            }

            # Carry over whichever ID the input provides for this mode.
            if tracked:
                obj_record['track_id'] = track_id
            elif detection_id is not None:
                obj_record['detection_id'] = detection_id

            output_data['objects'].append(obj_record)

        out_path = _output_dir / f'{frame_id:06d}.json'
        with open(out_path, 'w') as f:
            json.dump(output_data, f, indent=4)

    print(f'\n✅ 3D Cuboid generation complete!')
    print(f'   Total frames processed: {len(frames_to_process)}')
    print(f'   Output directory: {_output_dir}')

    if use_tracking:
        print(f'   Total unique tracks with cached dimensions: {len(dimension_cache)}')
        print(f'   Cache hits: {cache_stats["cache_hits"]}')
        print(f'   Cache misses: {cache_stats["cache_misses"]}')
        if cache_stats['cache_hits'] + cache_stats['cache_misses'] > 0:
            hit_rate = cache_stats['cache_hits'] / (cache_stats['cache_hits'] + cache_stats['cache_misses']) * 100
            print(f'   Dimension cache hit rate: {hit_rate:.1f}%')
    else:
        print(f'   Total detections processed: {cache_stats["no_cache"]}')
        print(f'   Dimension caching: DISABLED')

    return cache_stats

In [18]:
# Generate cuboids from the 2D boxes with per-track dimension caching enabled.
# The returned cache statistics dict is displayed as the cell value.
generate3D_cuboids(bbox_dir=bbox_output_dir, output_dir=cuboid_output_dir, use_tracking=True)

Processing all 4071 frames...
✅ Dimension caching ENABLED (tracking mode)


Generating 3D Cuboids: 100%|██████████| 4071/4071 [00:07<00:00, 524.71it/s]


✅ 3D Cuboid generation complete!
   Total frames processed: 4071
   Output directory: ../data/cuboid_outputs_bytetrack
   Total unique tracks with cached dimensions: 2298
   Cache hits: 9916
   Cache misses: 2298
   Dimension cache hit rate: 81.2%





{'cache_hits': 9916, 'cache_misses': 2298, 'no_cache': 0}

In [19]:
# Compare frame coverage
# NOTE(review): the pose and frame counts below are hard-coded literals, not values computed
# in this notebook — update them by hand (or derive them from data) when results change.
print(f"Before: {2189} poses across {4071} frames = {2189/4071*100:.1f}% coverage")
print(f"After: {1806} poses across {4071} frames = {1806/4071*100:.1f}% coverage")

Before: 2189 poses across 4071 frames = 53.8% coverage
After: 1806 poses across 4071 frames = 44.4% coverage


In [20]:
import numpy as np

# Sanity-check a saved keyframe trajectory: look for large jumps and report step statistics.
kf = np.loadtxt('../third_party/ORB_SLAM3/Examples/Monocular/KeyFrameTrajectory.txt')
# NOTE(review): columns 3/7/11 are taken as the translation, which presumes each row is a
# flattened 3x4 pose with 12 values — confirm against how this file is written.
trans = kf[:, [3, 7, 11]]

# Euclidean distance between consecutive keyframes; a step above 20 is flagged as a jump.
# NOTE(review): the file path points at a monocular example, so these distances may be in
# arbitrary SLAM units rather than meters — confirm before trusting the thresholds.
diffs = np.linalg.norm(trans[1:] - trans[:-1], axis=1)
large_jumps = np.flatnonzero(diffs > 20)

print(f"Total keyframes: {len(kf)}")
print(f"Large jumps (>20m): {len(large_jumps)}")
print(f"Jump frames: {large_jumps[:10] if len(large_jumps) > 0 else 'None'}")

# Step-length statistics
print(f"Mean inter-frame distance: {diffs.mean():.2f}m")
print(f"Std inter-frame distance: {diffs.std():.2f}m")

if large_jumps.size > 10:
    print("→ Many jumps suggest tracking loss!")
if np.mean(diffs) < 1.0:
    print("→ Very small movements suggest scale drift!")

Total keyframes: 1298
Large jumps (>20m): 0
Jump frames: None
Mean inter-frame distance: 0.37m
Std inter-frame distance: 0.27m
→ Very small movements suggest scale drift!
