In [None]:
import sys
import os
project_root = os.path.abspath(os.path.join(os.getcwd(), ".."))
sys.path.append(project_root)
import habitat_sim
import habitat
from habitat.utils.visualizations import maps
import numpy as np
import cv2
import imageio
import os
import yaml
import torch
from vint_train.utils import msg_to_pil, to_numpy, transform_images, load_model
from diffusers.schedulers.scheduling_ddpm import DDPMScheduler
from vint_train.training.train_utils import get_action

In [None]:

# --- 1. 配置模拟器 (和你之前的代码类似) ---
def make_sim_config(scene_path):
    sim_cfg = habitat_sim.SimulatorConfiguration()
    sim_cfg.scene_id = scene_path
    sim_cfg.enable_physics = True
    
    agent_cfg = habitat_sim.agent.AgentConfiguration()
    
    # RGB 传感器
    rgb_sensor = habitat_sim.CameraSensorSpec()
    rgb_sensor.uuid = "color_sensor"
    rgb_sensor.sensor_type = habitat_sim.SensorType.COLOR
    rgb_sensor.resolution = [480, 640]
    
    agent_cfg.sensor_specifications = [rgb_sensor]
    return habitat_sim.Configuration(sim_cfg, [agent_cfg])

# --- 2. 辅助函数：绘制轨迹 ---
def draw_trajectory(top_down_map, trajectory_points, color=(255, 0, 0)):
    """在地图上画线"""
    if len(trajectory_points) < 2:
        return top_down_map
    
    # 复制一份地图以免污染原图
    map_with_traj = top_down_map.copy()
    
    # 将点连成线
    for i in range(1, len(trajectory_points)):
        pt1 = trajectory_points[i-1]
        pt2 = trajectory_points[i]
        cv2.line(map_with_traj, pt1, pt2, color, thickness=2)
        
    # 画当前位置（一个小圆圈）
    cv2.circle(map_with_traj, trajectory_points[-1], 5, (0, 255, 0), -1)
    return map_with_traj

# --- 3. 主程序 ---
# 替换为你的场景路径
scene_path = "/home/liuxh/vln/vint_docker_env/tmpdata/data/versioned_data/habitat_test_scenes/skokloster-castle.glb"

try:
    cfg = make_sim_config(scene_path)
    sim = habitat_sim.Simulator(cfg)
except Exception as e:
    print(f"Error: {e}")
    exit()

# 初始化 Agent
agent = sim.initialize_agent(0)

# 获取一个随机的可行走点
# max_tries=100 确保它尝试多次直到找到
start_position = sim.pathfinder.get_random_navigable_point()

# 设置给 Agent
state = agent.get_state()
state.position = start_position
agent.set_state(state)

# --- A. 预先生成静态地图 (只做一次) ---
print("生成静态 Top-down 地图...")
# 获取当前高度，因为地图是基于高度切片的
agent_height = agent.get_state().position[1]

# meters_per_pixel 越小，地图越清晰，但生成越慢
# 0.05 表示 1个像素代表 5cm
top_down_map = maps.get_topdown_map(
    sim.pathfinder, 
    height=agent_height, 
    meters_per_pixel=0.05
)
# 转换成彩色图片以便画红线 (原本是黑白的)
top_down_map = maps.colorize_topdown_map(top_down_map)

# 记录数据容器
trajectory_grid_points = [] # 存储地图上的像素坐标 (x, y)
video_frames = []
total_steps = 1000

print("开始录制...")

for step in range(total_steps):
    # 1. 随机动作
    action = np.random.choice(["move_forward", "turn_left", "turn_right"])
    sim.step(action)
    
    # 2. 获取 RGB 图像
    obs = sim.get_sensor_observations()
    rgb_img = obs["color_sensor"][..., :3] # 去掉 Alpha 通道
    
    # 3. 获取机器人真实位置
    agent_state = agent.get_state()
    position = agent_state.position
    
    # 4. 将 3D 世界坐标 转为 2D 地图像素坐标
    # maps.to_grid 需要 (z, x) 顺序，因为 Habitat 坐标系 y 是向上
    grid_loc = maps.to_grid(
        realworld_x=position[2],
        realworld_y=position[0],
        grid_resolution=top_down_map.shape[0:2],
        sim=sim
    )
    trajectory_grid_points.append([grid_loc[1],grid_loc[0]])
    
    # 5. 在地图上绘制轨迹
    map_frame = draw_trajectory(top_down_map, trajectory_grid_points)
    
    # 6. 拼接图像 (Side-by-Side)
    # 需要调整尺寸让它们高度一致
    h_rgb, w_rgb = rgb_img.shape[:2]
    h_map, w_map = map_frame.shape[:2]
    
    # 简单缩放地图以匹配 RGB 高度
    scale = h_rgb / h_map
    new_w_map = int(w_map * scale)
    map_frame_resized = cv2.resize(map_frame, (new_w_map, h_rgb))
    
    # 拼在一起: [RGB] | [MAP]
    combined_frame = np.hstack((rgb_img, map_frame_resized))
    
    video_frames.append(combined_frame)

# --- 4. 保存视频 ---
video_name = "trajectory_vis.mp4"
print(f"正在保存视频到 {video_name} ...")
imageio.mimsave(video_name, video_frames, fps=10)

print("完成！")


In [None]:
from collections import deque
from ImageProvider import ImageProvider
import Policy


class Explorer:
    def __init__(self, sim, policy, image_provider):
        self.sim = sim
        self.policy = policy
        self.image_provider = image_provider

    def step(self):
        pass

    def run(self):
        pass

    __call__ = run


dict_items([('vint', {'config_path': '../../train/config/vint.yaml', 'ckpt_path': '../model_weights/vint.pth'}), ('late_fusion', {'config_path': '../../train/config/late_fusion.yaml', 'ckpt_path': '../model_weights/vint_late_fusion.pth'}), ('gnm', {'config_path': '../../train/config/gnm.yaml', 'ckpt_path': '../model_weights/gnm_large.pth'}), ('nomad', {'config_path': '/home/liuxh/vln/vint_docker_env/visualnav-transformer/deployment/config/models.yaml', 'ckpt_path': '/home/liuxh/vln/vint_docker_env/visualnav-transformer/deployment/model_weights/nomad.pth'})])

In [None]:
MODEL_CONFIG_PATH = "/home/liuxh/vln/vint_docker_env/visualnav-transformer/deployment/config/models.yaml"
MODEL_WEIGHTS_PATH = "/home/liuxh/vln/vint_docker_env/visualnav-transformer/deployment/model_weights/nomad.pth"
model_config_path = "/home/liuxh/vln/vint_docker_env/visualnav-transformer/train/config/nomad.yaml"
device = 'cuda'




with open("/home/liuxh/vln/vint_docker_env/visualnav-transformer/train/config/nomad.yaml", "r") as f:
    model_params = yaml.safe_load(f)

context_size = model_params["context_size"]

# load model weights
ckpth_path = MODEL_WEIGHTS_PATH
if os.path.exists(ckpth_path):
    print(f"Loading model from {ckpth_path}")
else:
    raise FileNotFoundError(f"Model weights not found at {ckpth_path}")
model = load_model(
    ckpth_path,
    model_params,
)
model = model.to(device)
model.eval()

num_diffusion_iters = model_params["num_diffusion_iters"]
noise_scheduler = DDPMScheduler(
    num_train_timesteps=model_params["num_diffusion_iters"],
    beta_schedule='squaredcos_cap_v2',
    clip_sample=True,
    prediction_type='epsilon'
)

# ROS
rospy.init_node("EXPLORATION", anonymous=False)
rate = rospy.Rate(RATE)
image_curr_msg = rospy.Subscriber(
    IMAGE_TOPIC, Image, callback_obs, queue_size=1)
waypoint_pub = rospy.Publisher(
    WAYPOINT_TOPIC, Float32MultiArray, queue_size=1)  
sampled_actions_pub = rospy.Publisher(SAMPLED_ACTIONS_TOPIC, Float32MultiArray, queue_size=1)

print("Registered with master node. Waiting for image observations...")

while not rospy.is_shutdown():
    # EXPLORATION MODE
    waypoint_msg = Float32MultiArray()
    if (
            len(context_queue) > model_params["context_size"]
        ):

        obs_images = transform_images(context_queue, model_params["image_size"], center_crop=False)
        obs_images = obs_images.to(device)
        fake_goal = torch.randn((1, 3, *model_params["image_size"])).to(device)
        mask = torch.ones(1).long().to(device) # ignore the goal

        # infer action
        with torch.no_grad():
            # encoder vision features
            obs_cond = model('vision_encoder', obs_img=obs_images, goal_img=fake_goal, input_goal_mask=mask)
            
            # (B, obs_horizon * obs_dim)
            if len(obs_cond.shape) == 2:
                obs_cond = obs_cond.repeat(8, 1)
            else:
                obs_cond = obs_cond.repeat(8, 1, 1)
            
            # initialize action from Gaussian noise
            noisy_action = torch.randn(
                (8, model_params["len_traj_pred"], 2), device=device)
            naction = noisy_action

            # init scheduler
            noise_scheduler.set_timesteps(num_diffusion_iters)

            start_time = time.time()
            for k in noise_scheduler.timesteps[:]:
                # predict noise
                noise_pred = model(
                    'noise_pred_net',
                    sample=naction,
                    timestep=k,
                    global_cond=obs_cond
                )

                # inverse diffusion step (remove noise)
                naction = noise_scheduler.step(
                    model_output=noise_pred,
                    timestep=k,
                    sample=naction
                ).prev_sample
            print("time elapsed:", time.time() - start_time)

        naction = to_numpy(get_action(naction))
        
        sampled_actions_msg = Float32MultiArray()
        sampled_actions_msg.data = np.concatenate((np.array([0]), naction.flatten()))
        sampled_actions_pub.publish(sampled_actions_msg)

        naction = naction[0] # change this based on heuristic

        chosen_waypoint = naction[args.waypoint]

        if model_params["normalize"]:
            chosen_waypoint *= (MAX_V / RATE)
        waypoint_msg.data = chosen_waypoint
        waypoint_pub.publish(waypoint_msg)
        print("Published waypoint")
    rate.sleep()




Loading model from /home/liuxh/vln/vint_docker_env/visualnav-transformer/deployment/model_weights/nomad.pth


NameError: name 'rospy' is not defined

In [29]:
noise_scheduler

DDPMScheduler {
  "_class_name": "DDPMScheduler",
  "_diffusers_version": "0.35.2",
  "beta_end": 0.02,
  "beta_schedule": "squaredcos_cap_v2",
  "beta_start": 0.0001,
  "clip_sample": true,
  "clip_sample_range": 1.0,
  "dynamic_thresholding_ratio": 0.995,
  "num_train_timesteps": 10,
  "prediction_type": "epsilon",
  "rescale_betas_zero_snr": false,
  "sample_max_value": 1.0,
  "steps_offset": 0,
  "thresholding": false,
  "timestep_spacing": "leading",
  "trained_betas": null,
  "variance_type": "fixed_small"
}