In [1]:
import io
import json
import os
import h5py
import numpy as np
import itertools
import matplotlib.pyplot as plt
from scipy.spatial.transform import Rotation

In [2]:
# Number of samples drawn by each call to sample_from_posterior; also the number
# of velocity samples emitted per object in the final-frame output.
NUM_SAMPLE_FROM_POSTERIOR = 20
# Gap, in frames, between the anchor frame and the last frame when estimating
# velocities (the anchor is frame index -SMOOTHING_WINDOW_SIZE - 1).
SMOOTHING_WINDOW_SIZE = 3
# SMOOTHING_WINDOW_SIZE / FPS is used as the time step dt for velocities;
# presumably the frame rate in frames per second -- TODO confirm against the data source.
FPS = 100

def compute_angular_velocity(q1, q2, dt):
    """Estimate the angular velocity that carries orientation ``q1`` to ``q2``.

    Args:
        q1: Starting orientation as a quaternion accepted by
            ``scipy.spatial.transform.Rotation.from_quat`` (scalar-last
            ``[x, y, z, w]`` by scipy's default convention).
        q2: Ending orientation, same format as ``q1``.
        dt: Time elapsed between the two orientations.

    Returns:
        dict with keys ``"x"``, ``"y"``, ``"z"`` holding the angular velocity
        components as built-in ``float`` values (rotation-vector units per
        unit of ``dt``).
    """
    rot1 = Rotation.from_quat(q1)
    rot2 = Rotation.from_quat(q2)

    # Rotation that takes rot1 to rot2.
    relative_rotation = rot2 * rot1.inv()

    # The rotation vector already equals axis * angle, so there is no need to
    # split it into axis and angle (which required an exact float comparison
    # against zero) only to multiply them back together.
    angular_velocity = relative_rotation.as_rotvec() / dt

    return {
        "x": float(angular_velocity[0]),
        "y": float(angular_velocity[1]),
        "z": float(angular_velocity[2]),
    }

def sample_from_posterior(log_probs_categories, option="rank", num_samples=None, seed=42):
    """Draw categories from a scored list of candidates.

    Args:
        log_probs_categories: Sequence of ``(log_prob, category)`` pairs.
        option: Sampling scheme.
            ``"uniform"``   -- every candidate is equally likely.
            ``"veridical"`` -- sample proportionally to ``exp(log_prob)`` using
                               the Gumbel-max trick.
            ``"rank"``      -- candidate at position ``n`` gets weight
                               ``1 / (n + 1)``; this only reflects rank if the
                               input is already ordered best-first
                               (NOTE(review): confirm the caller sorts it).
            ``"mix"``       -- equal blend (t = 0.5) of the normalised
                               posterior and the uniform distribution.
                               NOTE(review): the previous code evaluated a
                               blend expression and discarded it, returning
                               ``None`` for every draw; this is the reviewer's
                               interpretation of the intended formula.
        num_samples: Number of draws; defaults to ``NUM_SAMPLE_FROM_POSTERIOR``.
        seed: Seed for the private random generator (default 42, matching the
            previously hard-coded seed).

    Returns:
        list of ``num_samples`` categories.

    Raises:
        NotImplementedError: if ``option`` is not one of the schemes above.
        ValueError: if ``log_probs_categories`` is empty.
    """
    if option not in ("uniform", "veridical", "rank", "mix"):
        raise NotImplementedError(f"unknown sampling option: {option!r}")

    if num_samples is None:
        num_samples = NUM_SAMPLE_FROM_POSTERIOR

    log_probs = np.asarray([item[0] for item in log_probs_categories], dtype=float)
    categories = [item[1] for item in log_probs_categories]
    num_categories = len(categories)
    if num_categories == 0:
        raise ValueError("cannot sample from an empty list of candidates")

    # A private legacy RandomState yields the same stream as
    # np.random.seed(seed) followed by np.random.* calls, but does not reset
    # the global generator for the rest of the notebook.
    rng = np.random.RandomState(seed)

    if option == "veridical":
        def draw_index():
            # Gumbel-max trick, see:
            # https://stackoverflow.com/questions/58339083/how-to-sample-from-a-log-probability-distribution
            gumbels = rng.gumbel(size=num_categories)
            return np.argmax(log_probs + gumbels)
    else:
        # The weights do not change between draws, so build them once.
        if option == "uniform":
            weights = None
        elif option == "rank":
            weights = 1.0 / np.arange(1, num_categories + 1)
            weights = weights / weights.sum()
        else:  # option == "mix"
            t = 0.5
            # Shift by the max before exponentiating for numerical stability.
            probs = np.exp(log_probs - np.max(log_probs))
            probs = probs / probs.sum()
            weights = t * probs + (1 - t) * (1.0 / num_categories)
            weights = weights / weights.sum()

        def draw_index():
            return rng.choice(num_categories, p=weights)

    return [categories[draw_index()] for _ in range(num_samples)]

def get_last_appearance(posterior_across_frames, o_id, start, stop, step):
    """Return the first frame index in ``range(start, stop, step)`` containing ``o_id``.

    Scanning backwards (negative ``step``) therefore yields the last frame in
    which the object appears.

    Args:
        posterior_across_frames: dict whose ``"pose"`` entry is a sequence of
            per-frame mappings keyed by object id.
        o_id: Object id. It is matched both as given and as ``str(o_id)``, so
            integer ids find their string keys, consistent with
            ``get_posterior_poses_for_frame_for_object``.
        start, stop, step: Arguments forwarded to ``range``.

    Returns:
        The matching frame index, or ``None`` if no scanned frame contains the
        object.
    """
    frames = posterior_across_frames["pose"]
    key = str(o_id)
    for index in range(start, stop, step):
        frame = frames[index]
        # Membership on the mapping itself avoids building a key list per frame.
        if o_id in frame or key in frame:
            return index
    return None

def get_posterior_poses_for_frame_for_object(posterior_across_frames, o_id, frame, c2f_level=0):
    """Sample poses for one object in one frame and pair them with its best pose.

    Args:
        posterior_across_frames: dict whose ``"pose"`` entry is indexable by
            ``frame`` and yields a mapping from object-id strings to entries.
        o_id: Object id; compared against the keys as ``str(o_id)``.
        frame: Index into ``posterior_across_frames["pose"]``.
        c2f_level: Index selecting which candidate list inside the entry's
            first element is passed to ``sample_from_posterior``.

    Returns:
        ``[sampled_poses, best_pose]`` where ``best_pose`` is the last element
        of the object's entry, or ``None`` if the object is absent from the
        frame.
    """
    objects_in_frame = posterior_across_frames["pose"][frame]
    wanted_key = str(o_id)
    if wanted_key not in objects_in_frame:
        return None

    entry = objects_in_frame[wanted_key]
    optimal_pose = entry[-1]
    sampled_poses = list(sample_from_posterior(entry[0][c2f_level]))
    return [sampled_poses, optimal_pose]

def compute_linear_velocity(
    pos_now,
    pos_last,
    dt,
):
    """Finite-difference linear velocity between two positions.

    Args:
        pos_now: Current position; any sequence or array with at least three
            components.
        pos_last: Earlier position, same layout as ``pos_now``.
        dt: Time elapsed between the two positions.

    Returns:
        dict with keys ``"x"``, ``"y"``, ``"z"`` holding built-in ``float``
        values, so the result is JSON-serialisable regardless of the input
        dtype and matches the return type of ``compute_angular_velocity``.
    """
    # asarray lets callers pass plain lists as well as NumPy arrays.
    displacement = np.asarray(pos_now, dtype=float) - np.asarray(pos_last, dtype=float)
    linear_vel = displacement / dt
    return {
        "x": float(linear_vel[0]),
        "y": float(linear_vel[1]),
        "z": float(linear_vel[2]),
    }

def set_axes_equal(ax):
    """
    Give the three axes of a 3D plot the same scale, so that a sphere is
    drawn as a sphere and a cube as a cube.

    Input
      ax: a matplotlib axis, e.g., as output from plt.gca().
    """
    # Current limits, in x / y / z order.
    limits = [ax.get_xlim3d(), ax.get_ylim3d(), ax.get_zlim3d()]

    spans = [abs(lim[1] - lim[0]) for lim in limits]
    centers = [np.mean(lim) for lim in limits]

    # Half of the widest span: every axis gets this half-width around its
    # own centre, i.e. the view becomes a cube.
    half_width = 0.5 * max(spans)

    setters = (ax.set_xlim3d, ax.set_ylim3d, ax.set_zlim3d)
    for apply_limits, center in zip(setters, centers):
        apply_limits([center - half_width, center + half_width])

def mkdir(path):
    """Ensure the directory ``path`` exists and return ``path``.

    Missing parent directories are created as well. If ``path`` already
    exists nothing is done, as before.
    """
    if not os.path.exists(path):
        # exist_ok covers the case where the directory is created by someone
        # else between the check above and this call.
        os.makedirs(path, exist_ok=True)
    return path

In [3]:
# path = '/home/haoliangwang/data/b3d_tracking_results/test'
# for root, dirs, files in os.walk(path):
#     for file in files:
#         if file.endswith('.json'):
#             if root.endswith('verbose'):
#                 with open(os.path.join(root, file)) as f:
#                     tracking_per_frame = json.load(f)
#             elif '_' not in root:
#                 mkdir(os.path.join('/home/haoliangwang/data/b3d_tracking_results/new', root.split('/')[-1]))
#                 with open(os.path.join(root, file)) as f:
#                     tracking_final_frame = json.load(f)
            


In [4]:
# Trial selection: the config name and trial number together form the file stem.
config = 'pilot_dominoes_0mid_d3chairs_o1plants_tdwroom'
trial_index = str(10)  # kept as a string so it can be zero-padded to 4 digits below

# Per-frame tracking output for the trial (from the "dominoes_verbose" directory).
with open(f'/home/haoliangwang/data/b3d_tracking_results/test/dominoes_verbose/{config}_{trial_index.zfill(4)}.json') as f:
    tracking_per_frame = json.load(f)
# Final-frame tracking output for the same trial (from the "dominoes" directory).
with open(f'/home/haoliangwang/data/b3d_tracking_results/test/dominoes/{config}_{trial_index.zfill(4)}.json') as f:
    tracking_final_frame = json.load(f)

In [5]:
def _flatten(nested):
    """Concatenate the sub-sequences of ``nested`` into a single flat list."""
    return list(itertools.chain.from_iterable(nested))


def _flatten_object_entry(entry):
    """Flatten the nested pose components of one object's per-frame entry.

    ``entry[0]`` holds one candidate list per level; each candidate keeps its
    first element as-is and has its remaining elements concatenated into one
    flat list. ``entry[-1]`` (the best pose) is flattened the same way.
    """
    flattened_levels = [
        [[candidate[0], _flatten(candidate[1:])] for candidate in level]
        for level in entry[0]
    ]
    return [flattened_levels, _flatten(entry[-1])]


# Same frame/object structure as tracking_per_frame["pose"], with flat poses.
new_per_frame_info = {
    'pose': [
        {object_key: _flatten_object_entry(entry) for object_key, entry in frame_entry.items()}
        for frame_entry in tracking_per_frame["pose"]
    ]
}

In [6]:
# A flattened pose is laid out as [px, py, pz, rx, ry, rz, rw] (this is the
# layout info_dict below reads), so positions are the first three entries and
# the quaternion the next four.
POSITION_SLICE = slice(0, 3)
QUATERNION_SLICE = slice(3, 7)
# c2f_level handed to the pose sampler for every final-frame quantity.
FINAL_C2F_LEVEL = 2
# Time covered by the smoothing window (anchor frame -> last frame).
WINDOW_DT = SMOOTHING_WINDOW_SIZE / FPS


def _pose_samples_at_last_appearance(o_id):
    """Sample poses for ``o_id`` from the most recent frame that contains it."""
    last_frame = get_last_appearance(
        new_per_frame_info, o_id, len(new_per_frame_info["pose"]) - 1, -1, -1
    )
    if last_frame is None:
        # Previously this surfaced as an opaque "list indices must be integers" error.
        raise ValueError(f"object {o_id!r} does not appear in any tracked frame")
    return get_posterior_poses_for_frame_for_object(
        new_per_frame_info, o_id, last_frame, c2f_level=FINAL_C2F_LEVEL
    )[0]


def _zero_velocity_samples():
    """One all-zero velocity per posterior sample."""
    return [{"x": 0.0, "y": 0.0, "z": 0.0} for _ in range(NUM_SAMPLE_FROM_POSTERIOR)]


def _velocity_samples(object_ids, velocity_from_poses):
    """Build per-object velocity samples between the anchor frame and the last frame.

    ``velocity_from_poses(anchor_pose, sampled_pose)`` receives the best pose of
    the anchor frame (index ``-SMOOTHING_WINDOW_SIZE - 1``) and one sampled pose
    of the last frame. Objects that are missing from either frame, or sequences
    shorter than the window, get zero velocities.
    """
    frames = new_per_frame_info["pose"]
    has_full_window = len(frames) > SMOOTHING_WINDOW_SIZE
    velocities = {}
    for o_id in object_ids:
        anchor_pt = sample_pt = None
        if has_full_window and str(o_id) in frames[-1]:
            anchor_pt = get_posterior_poses_for_frame_for_object(
                new_per_frame_info, o_id, -SMOOTHING_WINDOW_SIZE - 1, c2f_level=FINAL_C2F_LEVEL
            )
            sample_pt = get_posterior_poses_for_frame_for_object(
                new_per_frame_info, o_id, -1, c2f_level=FINAL_C2F_LEVEL
            )
        if anchor_pt is None or sample_pt is None:
            velocities[str(o_id)] = _zero_velocity_samples()
            continue
        anchor_pose = anchor_pt[-1]  # using optim pose for window frame
        velocities[str(o_id)] = [
            velocity_from_poses(anchor_pose, sampled_pose) for sampled_pose in sample_pt[0]
        ]
    return velocities


def _linear_velocity(anchor_pose, sampled_pose):
    """Linear velocity from the anchor position to the sampled position."""
    # Positions are pose[:3]; the earlier [7:] slice selected nothing from a
    # 7-element pose.
    return compute_linear_velocity(
        np.array(sampled_pose[POSITION_SLICE]),
        np.array(anchor_pose[POSITION_SLICE]),
        WINDOW_DT,
    )


def _angular_velocity(anchor_pose, sampled_pose):
    """Angular velocity from the anchor orientation to the sampled orientation."""
    return compute_angular_velocity(
        np.array(anchor_pose[QUATERNION_SLICE]),
        np.array(sampled_pose[QUATERNION_SLICE]),
        WINDOW_DT,
    )


# Sampled poses per object, taken from the last frame in which it appears.
info_dict = {
    str(o_id): [
        {
            "px": pose[0],
            "py": pose[1],
            "pz": pose[2],
            "rx": pose[3],
            "ry": pose[4],
            "rz": pose[5],
            "rw": pose[6],
        }
        for pose in _pose_samples_at_last_appearance(o_id)
    ]
    for o_id in tracking_final_frame['model'].keys()
}

# Rebuild the final-frame dictionary: pose-derived entries are replaced by
# per-sample lists, every other key is copied through unchanged.
new_tracking_final_frame = {}
for k, v in tracking_final_frame.items():
    if k == 'position':
        position_dict = {
            object_id: [{'x': sample['px'], 'y': sample['py'], 'z': sample['pz']} for sample in samples]
            for object_id, samples in info_dict.items()
        }
        new_tracking_final_frame[k] = position_dict
    elif k == 'rotation':
        rotation_dict = {
            object_id: [
                {'x': sample['rx'], 'y': sample['ry'], 'z': sample['rz'], 'w': sample['rw']}
                for sample in samples
            ]
            for object_id, samples in info_dict.items()
        }
        new_tracking_final_frame[k] = rotation_dict
    elif k == 'velocity':
        linear_velocity_dict = _velocity_samples(v.keys(), _linear_velocity)
        new_tracking_final_frame[k] = linear_velocity_dict
    elif k == 'angular_velocity':
        angular_velocity_dict = _velocity_samples(v.keys(), _angular_velocity)
        new_tracking_final_frame[k] = angular_velocity_dict
    else:
        new_tracking_final_frame[k] = v

In [7]:
# Write the rebuilt final-frame dictionary to disk as JSON (overwrites any existing file).
with open("/home/haoliangwang/data/b3d_tracking_results/test.json", "w") as f:
    json.dump(new_tracking_final_frame, f)