In [None]:
import time
import numpy as np
import pyorbbecsdk as orbbec           
from IPython.display import clear_output
import open3d as o3d

In [None]:
# Open device and start stream
dev = orbbec.Device()
dev.open()
dev.start_stream()

In [None]:
def get_frame_pointcloud(dev):
    depth = dev.depth_frame
    rgb   = dev.color_frame
    intr  = dev.calibration.depth_intrinsics
    H, W = depth.shape
    pts, cols = [], []
    for v in range(H):
        for u in range(W):
            z = depth[v, u] / 1000.0
            if z == 0:
                continue
            pts.append([
                (u - intr.cx) * z / intr.fx,
                (v - intr.cy) * z / intr.fy,
                z
            ])
            cols.append(rgb[v, u] / 255.0)
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(np.array(pts))
    pcd.colors = o3d.utility.Vector3dVector(np.array(cols))
    return pcd

In [None]:
def centroid_of_color(pcd, channel=0, thresh=0.8):
    colors = np.asarray(pcd.colors)
    mask = colors[:, channel] > thresh
    if not mask.any():
        return None
    return np.asarray(pcd.points)[mask].mean(axis=0)

In [None]:
for _ in range(10):                # run 10 updates
    pcd = get_frame_pointcloud(dev)
    c   = centroid_of_color(pcd, channel=0, thresh=0.8)
    clear_output(wait=True)
    if c is not None:
        print(f"Centroid: X={c[0]:.3f}  Y={c[1]:.3f}  Z={c[2]:.3f} m")
    else:
        print("No matching points.")
    time.sleep(3)

In [None]:
dev.stop_stream()
dev.close()