In [None]:
# 该脚本用于生成BC预测结果的可视化图
import torch
import numpy as np
import matplotlib.pyplot as plt
import pickle
from pathlib import Path
from bc_pipeline import BCPolicy, Standardizer


# ========================================================
# ==== 用户需要修改的路径 ====
ckpt_path = "/home/wdy02/software/isaacsim/wdy_data/bc_data/bc_ckpts/ONE_PEG_IN_HOLE_60/best.pt"
scaler_path = "/home/wdy02/software/isaacsim/wdy_data/bc_data/bc_ckpts/ONE_PEG_IN_HOLE_60/scaler.pkl"
test_data_path = "/home/wdy02/software/isaacsim/wdy_data/bc_data/npz/ONE_PEG_IN_HOLE_test"  # 存放 .npz 文件的目录
save_dir = "./bc_eval_plots"
Path(save_dir).mkdir(exist_ok=True)

# ==== 加载模型 ====
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
ckpt = torch.load(ckpt_path, map_location=device)
model = ckpt['model_class'](**ckpt['model_kwargs']).to(device)
model.load_state_dict(ckpt['model'])
model.eval()

# ==== 加载scaler ====
scaler = Standardizer.load(scaler_path)

# ==== 读取测试集 ====
obs_list, act_list = [], []
front_cam_list, wrist_cam_list = [], []
for npz_file in Path(test_data_path).glob("*.npz"):
    data = np.load(npz_file)
    ee_pose = data['ee_pose']  # 假设保存时名字就是ee_pose
    joint_state = data['joint_states']  # 假设保存时名字就是joint_states
    joint_forces = data['joint_forces']  # 假设保存时名字就是joint_forces
    contact_forces = data['contact_forces']  # 假设保存时名字就是contact_forces
    obs = np.concatenate([ee_pose, joint_state, joint_forces, contact_forces], axis=1)
    front_cam = data['front_rgb']  # 假设保存时名字就是front_cam
    wrist_cam = data['wrist_rgb']  # 假设保存时名字就是wrist_cam
    front_cam_list.append(front_cam)
    wrist_cam_list.append(wrist_cam)
    act = data['action']  # 假设保存时名字就是action
    obs_list.append(obs)
    act_list.append(act)

obs_array = np.concatenate(obs_list, axis=0)
front_cam_array = np.concatenate(front_cam_list, axis=0)
wrist_cam_array = np.concatenate(wrist_cam_list, axis=0)
act_array = np.concatenate(act_list, axis=0)

obs_tensor = torch.tensor(obs_array, dtype=torch.float32).to(device)  # ✅ .to(device)  
front_cam_tensor = torch.tensor(front_cam_array, dtype=torch.float32).permute(0, 3, 1, 2).to(device)  # ✅
wrist_cam_tensor = torch.tensor(wrist_cam_array, dtype=torch.float32).permute(0, 3, 1, 2).to(device)  # ✅

# ==== 标准化 ====
print("obs_tensor.shape:", obs_tensor.shape)
print("front_cam_tensor.shape:", front_cam_tensor.shape)
print("wrist_cam_tensor.shape:", wrist_cam_tensor.shape)


# ==== 标准化 ====
obs_scaled = scaler.transform(obs_tensor)

# ==== 模型预测 ====
with torch.no_grad():
    pred_array = model(front_cam_tensor, wrist_cam_tensor, obs_scaled)
    pred_array = pred_array.cpu().numpy()

# ==== 计算每个维度的MSE ====
mse_per_dim = np.mean((pred_array - act_array) ** 2, axis=0)
mse_total = np.mean(mse_per_dim)

print("总MSE:", mse_total)
print("各维度MSE:", mse_per_dim)

# ==== 1. 曲线对比图 ====
#每个动作维度的预测曲线 vs 专家曲线
plt.figure(figsize=(10, 8))
for dim in range(act_array.shape[1]):
    plt.subplot(act_array.shape[1], 1, dim + 1)
    plt.plot(act_array[:200, dim], label="Expert", alpha=0.7)
    plt.plot(pred_array[:200, dim], label="BC Pred", alpha=0.7)
    plt.ylabel(f"Dim {dim}")
    if dim == 0:
        plt.legend()
plt.suptitle("BC Prediction vs Expert (前200步)")
plt.tight_layout()
plt.savefig(f"{save_dir}/bc_pred_vs_expert.png")
plt.close()

# ==== 2. 各维度MSE柱状图 ====
#哪个自由度误差大一目了然
plt.figure()
plt.bar(range(len(mse_per_dim)), mse_per_dim)
plt.xlabel("Action Dim")
plt.ylabel("MSE")
plt.title("MSE per Action Dimension")
plt.savefig(f"{save_dir}/bc_mse_per_dim.png")
plt.close()

# ==== 3. 误差分布直方图 ====
#模型整体误差分布
error_flat = (pred_array - act_array).flatten()
plt.figure()
plt.hist(error_flat, bins=50, alpha=0.7)
plt.xlabel("Prediction Error")
plt.ylabel("Count")
plt.title("Prediction Error Distribution")
plt.savefig(f"{save_dir}/bc_error_hist.png")
plt.close()

print(f"图已保存到 {save_dir}")


In [None]:
# code for generating GIFs for behavior cloning evaluation
"""
论文展示可以直接截图，直观对比专家与模型

看模型在哪些时刻偏差大

方便做多个任务的对比（多生成几个 GIF）
"""
import torch
import numpy as np
import matplotlib.pyplot as plt
import pickle
from pathlib import Path
import imageio
from bc_pipeline import BCPolicy, Standardizer

# ==== 用户路径 ====
ckpt_path = "/home/wdy02/software/isaacsim/wdy_data/bc_data/bc_ckpts/ONE_PEG_IN_HOLE_60/best.pt"
scaler_path = "/home/wdy02/software/isaacsim/wdy_data/bc_data/bc_ckpts/ONE_PEG_IN_HOLE_60/scaler.pkl"
test_data_path = "/home/wdy02/software/isaacsim/wdy_data/bc_data/npz/ONE_PEG_IN_HOLE_60"
save_dir = "./bc_eval_plots"
Path(save_dir).mkdir(exist_ok=True)

# ==== 加载模型 ====
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
ckpt = torch.load(ckpt_path, map_location=device)
model = ckpt['model_class'](**ckpt['model_kwargs']).to(device)
model.load_state_dict(ckpt['model'])
model.eval()

# ==== 加载scaler ====

scaler = Standardizer.load(scaler_path)

# ==== 从一个demo加载数据（做GIF用） ====
npz_file = sorted(Path(test_data_path).glob("*.npz"))[0]  # 取第一个演示
data = np.load(npz_file)
ee_pose = data['ee_pose']  # 假设保存时名字就是ee_pose
joint_state = data['joint_states']  # 假设保存时名字就是joint_states
joint_forces = data['joint_forces']  # 假设保存时名字就是joint_forces
contact_forces = data['contact_forces']  # 假设保存时名字就是contact_forces
obs = np.concatenate([ee_pose, joint_state, joint_forces, contact_forces], axis=1)
actions = data['action']
frames = data['front_rgb']  # 或 'hand_camera_rgb'
wrist_frames = data['wrist_rgb']  # 可选

# ==== 预处理数据 ====
obs_tensor = torch.tensor(obs, dtype=torch.float32).to(device)
front_cam_tensor = torch.tensor(frames, dtype=torch.float32).permute(0, 3, 1, 2).to(device)
wrist_cam_tensor = torch.tensor(wrist_frames, dtype=torch.float32).permute(0, 3, 1, 2).to(device)


# ==== 标准化并预测 ====
obs_scaled = scaler.transform(obs_tensor)
with torch.no_grad():
    pred_actions = model(front_cam_tensor, wrist_cam_tensor, obs_scaled).cpu().numpy()

# ==== 生成带曲线的帧 ====
gif_frames = []
for t in range(len(frames)):
    fig, ax = plt.subplots(1, 1, figsize=(4, 4))
    ax.imshow(frames[t])
    ax.axis('off')

    # 在右上角画一个动作对比的小图
    inset_ax = fig.add_axes([0.65, 0.65, 0.3, 0.3])
    inset_ax.plot(actions[:t+1, 0], label="Expert", color="blue")  # 这里只画第一个维度
    inset_ax.plot(pred_actions[:t+1, 0], label="BC Pred", color="red", linestyle="--")
    inset_ax.set_xticks([])
    inset_ax.set_yticks([])
    inset_ax.legend(fontsize=6)

    # 保存到内存
    fig.canvas.draw()
    image = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
    image = image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
    gif_frames.append(image)
    plt.close(fig)

# ==== 保存GIF ====
gif_path = f"{save_dir}/bc_demo.gif"
imageio.mimsave(gif_path, gif_frames, fps=10)
print(f"GIF已保存到 {gif_path}")
