In [1]:
import matplotlib.cm as cm  # 用于颜色映射
import open3d as o3d
from mpl_toolkits.mplot3d import Axes3D  # For 3D plotting
import matplotlib.pyplot as plt
import zarr
from equi_diffpo.model.common.rotation_transformer import RotationTransformer
import pickle
import numpy as np
import h5py

import os
import torch
from natsort import natsorted
from codebase.z_utils.Rotation_torch import matrix_to_rotation_6d, euler2mat
from equi_diffpo.model.common.normalizer import LinearNormalizer, SingleFieldLinearNormalizer
from equi_diffpo.common.normalize_util import (
    robomimic_abs_action_only_normalizer_from_stat,
    robomimic_abs_action_only_dual_arm_normalizer_from_stat,
    get_range_normalizer_from_stat,
    get_image_range_normalizer,
    get_identity_normalizer_from_stat,
    array_to_stats
)
from equi_diffpo.dataset.robomimic_replay_image_dataset import _convert_actions

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


In [2]:
tasks_meta = {
    "A": {"name": "stack_d1", "average_steps": 108, "color_rgb": [0.994, 0.406, 0.406], "color_hex": "#FD6767"},
    "B": {"name": "square_d2", "average_steps": 153, "color_rgb": [0.994, 0.655, 0.406], "color_hex": "#FDA767"},
    "C": {"name": "coffee_d2", "average_steps": 224, "color_rgb": [0.994, 0.905, 0.406], "color_hex": "#FDE667"},
    "D": {"name": "threading_d2", "average_steps": 227, "color_rgb": [0.834, 0.994, 0.406], "color_hex": "#D4FD67"},
    "E": {"name": "stack_three_d1", "average_steps": 255, "color_rgb": [0.584, 0.994, 0.406], "color_hex": "#94FD67"},
    "F": {"name": "hammer_cleanup_d1", "average_steps": 286, "color_rgb": [0.406, 0.994, 0.477], "color_hex": "#67FD79"},
    "G": {"name": "three_piece_assembly_d2", "average_steps": 335, "color_rgb": [0.406, 0.994, 0.727], "color_hex": "#67FDB9"},
    "H": {"name": "mug_cleanup_d1", "average_steps": 338, "color_rgb": [0.406, 0.994, 0.976], "color_hex": "#67FDF8"},
    "I": {"name": "nut_assembly_d0", "average_steps": 358, "color_rgb": [0.406, 0.762, 0.994], "color_hex": "#67C2FD"},
    "J": {"name": "kitchen_d1", "average_steps": 619, "color_rgb": [0.406, 0.513, 0.994], "color_hex": "#6782FD"},
    "K": {"name": "pick_place_d0", "average_steps": 677, "color_rgb": [0.549, 0.406, 0.994], "color_hex": "#8B67FD"},
    "L": {"name": "coffee_preparation_d1", "average_steps": 687, "color_rgb": [0.798, 0.406, 0.994], "color_hex": "#CB67FD"}
}

In [3]:



def get_actions_all():
    dataset_dir = "data/robomimic/datasets"

    # 确保 actions_all 按照 tasks_meta 的键的字母顺序排列
    # 获取 tasks_meta 中任务名称的有序列表
    # sorted(tasks_meta.keys()) 默认会按字母顺序排序 'A', 'B', 'C'...
    ordered_task_names = [tasks_meta[key]["name"] for key in sorted(tasks_meta.keys())]

    actions_all = []  # 存储按tasks_meta顺序排列的actions

    for task_name in ordered_task_names:
        # 构造完整的数据集路径，注意文件名格式通常是 "task_name_abs_traj_eePose.hdf5"
        dataset_filename = f"{task_name}_abs_traj_eePose.hdf5"
        dataset_path = os.path.join(dataset_dir, task_name, dataset_filename)  # 假设数据集在 /dataset_dir/task_name/task_name_abs_traj_eePose.hdf5

        # 检查文件是否存在，以避免错误
        if not os.path.exists(dataset_path):
            print(f"警告: 数据集文件未找到，跳过: {dataset_path}")
            continue

        print(f"处理数据集: {dataset_path}")
        this_actions_all = []  # 存储当前数据集的所有 demonstration 的 actions
        try:
            with h5py.File(dataset_path, 'r') as f:
                data = f['data']
                demo_names = natsorted(list(data.keys()))  # 演示名称仍然按自然顺序排序
                print(f"演示数量: {len(demo_names)}")
                for demo_name in demo_names:
                    this_actions_all.append(data[demo_name]['actions'][:])
            actions_all.append(this_actions_all)
        except Exception as e:
            print(f"读取数据集 {dataset_path} 时发生错误: {e}")
            continue

    return actions_all


def draw_with_default_color(actions_all):
    """
    绘制点云，使用默认颜色。
    """

    actions_all_flattened = np.concatenate([np.concatenate(actions) for actions in actions_all], axis=0)
    print(f"扁平化后的 action 形状: {actions_all_flattened.shape}")

    # 确保 actions_all_flattened 至少有 3 列，分别代表 x, y, z
    if actions_all_flattened.shape[1] < 3:
        print("Action 的维度不足以绘制 3D 点云。")
    else:
        # 提取前三个维度（例如，x, y, z 坐标）
        # Open3D 期望一个 (N, 3) 的 numpy 数组作为点
        points = actions_all_flattened[:, :3]

        # 创建一个 Open3D PointCloud 对象
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(points)

        # 计算边界框
        bbox = pcd.get_axis_aligned_bounding_box()
        bbox.color = (0, 0, 0)
        print("bbox is:", bbox)

        # 可选：为点云添加颜色
        # 你也可以根据其他维度或属性来着色
        # pcd.colors = o3d.utility.Vector3dVector(np.array([[1, 0, 0] for _ in range(len(points))]))
        return pcd, bbox



def draw_with_respect_to_task(actions_all):

    # 为 actions_all 中的每个子列表生成一组不同的颜色
    num_groups = len(actions_all)
    # 使用 viridis 颜色映射，生成 num_groups 种不同的颜色
    colors = [cm.viridis(i / float(num_groups)) for i in range(num_groups)]

    geometries_to_draw = []

    print("准备点云进行可视化...")
    for i, group_of_actions in enumerate(actions_all):
        # 连接此组中的所有 actions (等同于 this_actions_all)
        if not group_of_actions:
            print(f"跳过索引 {i} 处的空组")
            continue

        concatenated_group_actions = np.concatenate(group_of_actions, axis=0)

        # 确保至少有 3 个维度用于绘图
        if concatenated_group_actions.shape[1] < 3:
            print(f"组 {i} 的 actions 没有足够的维度用于 3D 绘图。跳过。")
            continue

        points = concatenated_group_actions[:, :3]

        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(points)

        # 为此组分配唯一的颜色
        # Open3D 期望颜色是 (R, G, B) 元组，每个分量在 0-1 之间
        # Matplotlib 颜色映射返回 (R, G, B, A)，所以我们取前 3 个
        group_color = colors[i][:3]
        pcd.colors = o3d.utility.Vector3dVector(np.tile(group_color, (len(points), 1)))

        geometries_to_draw.append(pcd)

    # 在所有组合点周围添加一个边界框
    if geometries_to_draw:
        # 首先，组合所有点以计算全局边界框
        all_points_combined = np.concatenate([np.asarray(p.points) for p in geometries_to_draw], axis=0)

        temp_pcd_for_bbox = o3d.geometry.PointCloud()
        temp_pcd_for_bbox.points = o3d.utility.Vector3dVector(all_points_combined)

        bbox = temp_pcd_for_bbox.get_axis_aligned_bounding_box()
        bbox.color = (1, 0, 0)  # 全局边界框为红色
        geometries_to_draw.append(bbox)



        return geometries_to_draw
    else:
        print("没有可绘制的点云数据。")
        return []

actions_all = get_actions_all()


处理数据集: data/robomimic/datasets/stack_d1/stack_d1_abs_traj_eePose.hdf5
演示数量: 1000
处理数据集: data/robomimic/datasets/square_d2/square_d2_abs_traj_eePose.hdf5
演示数量: 1000
处理数据集: data/robomimic/datasets/coffee_d2/coffee_d2_abs_traj_eePose.hdf5
演示数量: 1000
处理数据集: data/robomimic/datasets/threading_d2/threading_d2_abs_traj_eePose.hdf5
演示数量: 1000
处理数据集: data/robomimic/datasets/stack_three_d1/stack_three_d1_abs_traj_eePose.hdf5
演示数量: 1000
处理数据集: data/robomimic/datasets/hammer_cleanup_d1/hammer_cleanup_d1_abs_traj_eePose.hdf5
演示数量: 1000
处理数据集: data/robomimic/datasets/three_piece_assembly_d2/three_piece_assembly_d2_abs_traj_eePose.hdf5
演示数量: 1000
处理数据集: data/robomimic/datasets/mug_cleanup_d1/mug_cleanup_d1_abs_traj_eePose.hdf5
演示数量: 1000
处理数据集: data/robomimic/datasets/nut_assembly_d0/nut_assembly_d0_abs_traj_eePose.hdf5
演示数量: 1000
处理数据集: data/robomimic/datasets/kitchen_d1/kitchen_d1_abs_traj_eePose.hdf5
演示数量: 1000
处理数据集: data/robomimic/datasets/pick_place_d0/pick_place_d0_abs_traj_eePose.hdf5
演示数量: 10

In [4]:

def get_o3d_with_task_colors_all(actions_all):
    """
    根据每个任务的专属颜色绘制点云。
    actions_all 是一个列表，每个元素代表一个任务的所有动作数据。
    颜色根据任务在列表中的顺序从 tasks_meta 字典中获取。
    """

    # 这是您提供的包含彩虹色系的任务元数据。
    # The tasks_meta dictionary with rainbow colors.

    # 按顺序获取任务键 ('A', 'B', 'C', ...)
    # Get task keys in order.
    task_keys = list(tasks_meta.keys())

    all_points_list = []
    all_colors_list = []

    # 遍历每个任务的动作数据
    # Iterate through each task's action data.
    for i, actions in enumerate(actions_all):
        if not actions:
            continue

        # 获取当前任务的键和颜色
        # Get the key and color for the current task.
        task_key = task_keys[i]
        color = tasks_meta[task_key]['color_rgb']

        # 将当前任务的所有 action 连接成一个点集
        # Concatenate all actions for the current task into one set of points.
        task_points_raw = np.concatenate(actions, axis=0)

        # 提取 x, y, z 坐标
        # Extract x, y, z coordinates.
        task_points = task_points_raw[:, :3]

        # 为这个任务的所有点创建颜色数组
        # Create a color array for all points of this task.
        num_points = len(task_points)
        task_colors = np.tile(color, (num_points, 1))

        # 将处理好的点和颜色添加到列表中
        # Add the processed points and colors to our lists.
        all_points_list.append(task_points)
        all_colors_list.append(task_colors)

    # 检查是否收集到了任何点
    # Check if any points were collected.
    if not all_points_list:
        print("没有可供绘制的点。")
        return

    # 将所有任务的点和颜色合并成一个大的Numpy数组
    # Combine points and colors from all tasks into single large NumPy arrays.
    final_points = np.concatenate(all_points_list, axis=0)
    final_colors = np.concatenate(all_colors_list, axis=0)

    print(f"总点数: {len(final_points)}")

    # 创建一个 Open3D PointCloud 对象
    # Create an Open3D PointCloud object.
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(final_points)
    pcd.colors = o3d.utility.Vector3dVector(final_colors)

    # 计算并添加边界框
    # Calculate and add a bounding box.
    bbox = pcd.get_axis_aligned_bounding_box()
    bbox.color = (0, 0, 0)  # Bounding box in black.

    return[pcd, bbox]
    # 可视化点云和边界框
    # Visualize the point cloud and the bounding box.
    print("使用 Open3D 可视化带任务颜色的 action 点云...")
    o3d.visualization.draw_geometries([pcd, bbox],
                                      window_name="Action Point Cloud by Task Color",
                                      width=800, height=600,
                                      left=50, top=50,
                                      mesh_show_back_face=False)

    print("Open3D 可视化已关闭。")

In [5]:

def get_o3d_from_actions_with_task_colors(actions_all, selected_tasks="ABCD"):
    """
    根据每个任务的专属颜色和指定的任务键来绘制点云。
    Draws a point cloud with colors specific to each task, based on selected task keys.

    Args:
        actions_all (list): 一个列表，每个元素代表一个任务的所有动作数据。
                          A list where each element contains all action data for a single task.
                          The order must correspond to the order in tasks_meta (A, B, C...).
        selected_tasks (str): 一个包含要显示的任务键的字符串，例如 "ACD"。
                              A string containing the keys of the tasks to display, e.g., "ACD".
    """

    # 按顺序获取任务键 ('A', 'B', 'C', ...)
    # Get task keys in order.
    task_keys_in_order = list(tasks_meta.keys())
    # 创建从任务键到其在 actions_all 中索引的映射
    # Create a mapping from task key to its index in actions_all.
    key_to_index = {key: i for i, key in enumerate(task_keys_in_order)}

    all_points_list = []
    all_colors_list = []

    # 遍历选定的任务键
    # Iterate through the selected task keys.
    print(f"将要显示的任务 (Tasks to display): {list(selected_tasks)}")
    for task_key in selected_tasks:
        if task_key not in tasks_meta:
            print(f"警告：任务键 '{task_key}' 在 tasks_meta 中未找到，已跳过。")
            print(f"Warning: Task key '{task_key}' not found in tasks_meta, skipping.")
            continue

        # 获取当前任务的索引
        # Get the index for the current task.
        task_index = key_to_index.get(task_key)

        # 检查索引是否有效并且在 actions_all 的范围内
        # Check if the index is valid and within the bounds of actions_all.
        if task_index is None or task_index >= len(actions_all):
            print(f"警告：任务键 '{task_key}' 对应的索引超出了 actions_all 的范围，已跳过。")
            print(f"Warning: Index for task key '{task_key}' is out of bounds for actions_all, skipping.")
            continue

        actions = actions_all[task_index]
        if not actions or all(a.shape[0] == 0 for a in actions):
            print(f"信息：任务 '{task_key}' 没有动作数据，已跳过。")
            print(f"Info: Task '{task_key}' has no action data, skipping.")
            continue

        # 获取当前任务的颜色
        # Get the color for the current task.
        color = tasks_meta[task_key]['color_rgb']

        # 将当前任务的所有 action 连接成一个点集
        # Concatenate all actions for the current task into one set of points.
        task_points_raw = np.concatenate(actions, axis=0)

        # 提取 x, y, z 坐标
        # Extract x, y, z coordinates.
        task_points = task_points_raw[:, :3]

        # 为这个任务的所有点创建颜色数组
        # Create a color array for all points of this task.
        num_points = len(task_points)
        task_colors = np.tile(color, (num_points, 1))

        # 将处理好的点和颜色添加到列表中
        # Add the processed points and colors to our lists.
        all_points_list.append(task_points)
        all_colors_list.append(task_colors)

    # 检查是否收集到了任何点
    # Check if any points were collected.
    if not all_points_list:
        print("没有根据您的选择找到可供绘制的点。")
        print("No points to draw based on your selection.")
        return

    # 将所有任务的点和颜色合并成一个大的Numpy数组
    # Combine points and colors from all tasks into single large NumPy arrays.
    final_points = np.concatenate(all_points_list, axis=0)
    final_colors = np.concatenate(all_colors_list, axis=0)

    print(f"总点数 (Total points): {len(final_points)}")

    # 创建一个 Open3D PointCloud 对象
    # Create an Open3D PointCloud object.
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(final_points)
    pcd.colors = o3d.utility.Vector3dVector(final_colors)

    # 计算并添加边界框
    # Calculate and add a bounding box.
    bbox = pcd.get_axis_aligned_bounding_box()
    bbox.color = (0, 0, 0)  # Bounding box in black.
    return [pcd, bbox]


In [6]:


def get_o3d_mesh_from_file(mesh_path="scene_mesh_export.obj"):
    """
    Loads and visualizes a 3D mesh file using Open3D.

    Args:
        mesh_path (str): The full path to the mesh file.
    """
    # 1. Check if the file exists before trying to load it.
    if not os.path.exists(mesh_path):
        print(f"Error: Mesh file not found at '{mesh_path}'")
        return

    # 2. Load the triangle mesh from the file.
    print(f"Loading mesh from: {mesh_path}")
    try:
        mesh = o3d.io.read_triangle_mesh(mesh_path)
    except Exception as e:
        print(f"Error: Failed to load mesh. {e}")
        return

    # 3. Check if the mesh was loaded successfully.
    if mesh.is_empty():
        print("Warning: The loaded mesh is empty. Nothing to visualize.")
        return

    # 4. Add color and normals for better visualization (optional but recommended).
    # If the mesh has no colors, paint it a uniform gray color.
    if not mesh.has_vertex_colors():
        mesh.paint_uniform_color([0.7, 0.7, 0.7])  # A nice light gray

    # Compute normals if they are not present, which is essential for lighting.
    if not mesh.has_vertex_normals():
        mesh.compute_vertex_normals()

    # 5. Visualize the mesh in an interactive window.
    print("Visualizing mesh. Press 'Q' or close the window to exit.")
    return mesh

In [7]:
from matplotlib import pyplot as plt
import copy
import random
def visualize_trajectories(trajectories,n_demo=1000):
    """
    将一系列轨迹高效地可视化为一个 LineSet。
    """
    if not trajectories:
        print("没有轨迹可供可视化。")
        return
    trajectories= copy.deepcopy([x[:,:3]for x in trajectories])  # 确保每个轨迹都是三维的
    
    random.shuffle(trajectories)  # 随机打乱轨迹顺序
    if len(trajectories) > n_demo:
        trajectories = trajectories[:n_demo]
    # 使用 'viridis', 'jet', 'hsv', 'rainbow' 等 colormap
    num_trajectories = len(trajectories)
    cmap = plt.get_cmap("viridis")
    colors_for_trajectories = [cmap(i / num_trajectories) for i in range(num_trajectories)]

    # 2. 初始化用于构建 LineSet 的列表
    all_points = []
    all_lines = []
    all_colors = []

    point_offset = 0  # 用于追踪全局顶点列表中的索引

    print(f"正在处理 {num_trajectories} 条轨迹...")

    # 3. 遍历所有轨迹，填充列表
    for i, trajectory_points in enumerate(trajectories):
        if len(trajectory_points) < 2:
            continue  # 一条轨迹至少需要两个点才能形成线

        # 获取当前轨迹的颜色 (Matplotlib返回RGBA，我们只需要RGB)
        current_color = colors_for_trajectories[i][:3]

        # 将当前轨迹的点添加到全局点列表
        all_points.extend(trajectory_points)

        # 为当前轨迹创建线段
        num_points_in_traj = len(trajectory_points)
        for j in range(num_points_in_traj - 1):
            # 线段连接的是全局索引
            start_point_index = point_offset + j
            end_point_index = point_offset + j + 1
            all_lines.append([start_point_index, end_point_index])
            all_colors.append(current_color)

        # 更新下一个轨迹的起始点索引
        point_offset += num_points_in_traj

    # 4. 创建 LineSet 对象
    if not all_points:
        print("没有有效的点来创建几何体。")
        return
    
    line_set = o3d.geometry.LineSet()
    line_set.points = o3d.utility.Vector3dVector(np.array(all_points))
    line_set.lines = o3d.utility.Vector2iVector(np.array(all_lines))
    # 为每条线段设置颜色
    line_set.colors = o3d.utility.Vector3dVector(np.array(all_colors))

    return line_set
    # 5. 可视化
    print("处理完成，正在启动可视化窗口...")
    o3d.visualization.draw_geometries(
        [line_set],
        window_name=f"可视化 {num_trajectories} 条轨迹",
        width=1280,
        height=720
    )

In [8]:
pcd_1,bbox_1=get_o3d_from_actions_with_task_colors(actions_all,"A")
print(bbox_1)
pcd_2, bbox_2 = get_o3d_from_actions_with_task_colors(actions_all, "ABCDEFGHIJKL")
print(bbox_2)
mesh=get_o3d_mesh_from_file("scene_mesh_export.obj")

line_set = visualize_trajectories(actions_all[0],1)  # 只取前三个维度作为点

# # geometries_to_draw =[bbox_2,pcd_1,mesh]
geometries_to_draw = [line_set,mesh]


lookat = np.array([0.0, 0.0, 1])
up = np.array([0.0, 0.0, 1.0])
front = np.array([0.5, -1, 0.5])  # 默认朝向Z轴负方向
zoom =0.3

o3d.visualization.draw_geometries(geometries_to_draw,
                                  window_name="Action Point Cloud with Mesh",
                                  width=1024, height=1024,
                                  left=50, top=50,
                                  mesh_show_back_face=False,
                                  lookat=lookat,
                                  up=up,
                                  front=front,
                                  zoom=zoom)

将要显示的任务 (Tasks to display): ['A']
总点数 (Total points): 108233
AxisAlignedBoundingBox: min: (-0.282648, -0.308863, 0.808787), max: (0.255813, 0.30305, 1.05473)
将要显示的任务 (Tasks to display): ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L']
总点数 (Total points): 4271753
AxisAlignedBoundingBox: min: (-0.548481, -0.478734, 0.808496), max: (0.378165, 0.555533, 1.2235)
Loading mesh from: scene_mesh_export.obj
Visualizing mesh. Press 'Q' or close the window to exit.
正在处理 1 条轨迹...


In [None]:
test_alphabets = "DFGEFGHIADJKL"

sorted_alphabets = sorted(test_alphabets)

print("Sorted alphabets:", sorted_alphabets)

Sorted alphabets: ['A', 'D', 'D', 'E', 'F', 'F', 'G', 'G', 'H', 'I', 'J', 'K', 'L']
