In [23]:
import torch
import matplotlib.pyplot as plt
import cv2
import joblib
import quaternion as q
import numpy as np
from src.models.autoencoder.autoenc import Embedder
import torch.nn.functional as F
import imageio
from IPython.display import clear_output, HTML, display
from src.utils.render_utils import add_title, add_agent_view_on_w, get_angle

## Load Models

In [30]:
device = 'cuda'
embedder = Embedder(pretrained_ckpt='pretrained/autoenc_large.ckpt',
                   img_res=128, w_size=128, coordinate_scale=32, w_ch=32, nerf_res=128, voxel_res=128)
embedder = embedder.to(device).eval()

from src.utils.image_rotator import ImageRotator
from src.models.localization.models import UConv
localizer = UConv(w_size=128,num_rot=36, w_ch=32, angle_ch=18)
rotator = ImageRotator(36)
sd = torch.load('pretrained/img_loc.ckpt', map_location='cpu')
localizer.load_state_dict(sd)
localizer = localizer.cuda().eval()

## Setup Habitat Simulator

In [3]:
import os
os.environ['GLOG_minloglevel'] = "3"
os.environ['MAGNUM_LOG'] = "quiet"
os.environ['HABITAT_SIM_LOG'] = "quiet"
import habitat
from habitat import get_config
from habitat.sims import make_sim

config = get_config()
cfg = config
cfg.defrost()
habitat_api_path = os.path.join(os.path.dirname(habitat.__file__), '../')
cfg.DATASET.SCENES_DIR = os.path.join(habitat_api_path, cfg.DATASET.SCENES_DIR)
cfg.DATASET.DATA_PATH = os.path.join(habitat_api_path, cfg.DATASET.DATA_PATH.replace("habitat-test-scenes","gibson"))
#cfg.SIMULATOR.SCENE_DATASET = 'data/scene_datasets/mp3d.scene_dataset_config.json'
#cfg.SIMULATOR.SCENE_DATASET = os.path.join(habitat_api_path, cfg.SIMULATOR.SCENE_DATASET)
cfg.DATASET.TYPE = "PointNav-v1"
cfg.SIMULATOR.RGB_SENSOR.HEIGHT = 128
cfg.SIMULATOR.RGB_SENSOR.WIDTH = 128
cfg.SIMULATOR.DEPTH_SENSOR.HEIGHT = 128
cfg.SIMULATOR.DEPTH_SENSOR.WIDTH = 128
cfg.TASK.SENSORS = cfg.SIMULATOR.AGENT_0.SENSORS = ['RGB_SENSOR', 'DEPTH_SENSOR']
cfg.freeze()

from habitat.datasets import make_dataset
dataset = make_dataset('PointNav-v1')
cfg.defrost()
cfg.DATASET.SPLIT = 'val'
cfg.freeze()
val_scenes = dataset.get_scenes_to_load(cfg.DATASET)
all_scenes =  val_scenes
print(f'Total {len(all_scenes)} scenes found')

2023-06-09 13:44:18,225 Initializing dataset PointNav-v1


Total 14 scenes found


In [4]:
scene = np.random.choice(all_scenes)
print(scene)

Cantwell


In [5]:
try: sim.close()
except: pass
cfg.defrost()
cfg.SIMULATOR.SCENE = os.path.join(cfg.DATASET.SCENES_DIR,'gibson_habitat/{}.glb'.format(scene))
cfg.freeze()
past_room = scene
sim = make_sim(id_sim=cfg.SIMULATOR.TYPE, config=cfg.SIMULATOR)

2023-06-09 13:44:25,524 initializing sim Sim-v0


In [6]:
NUM_GOALS = 5
VIS_RES = 256

# Draw RNR-Map from random trajectory

In [7]:
from habitat.tasks.nav.shortest_path_follower import ShortestPathFollower

follower = ShortestPathFollower(sim, 0.5, return_one_hot=False)

start_position = sim.sample_navigable_point()
start_rotation = q.from_euler_angles([0., np.random.rand() * 2 * np.pi, 0.])
goal_points = []
for i in range(NUM_GOALS):
    while True:
        random_point = sim.sample_navigable_point()
        if abs(random_point[1]-start_position[1]) > 0.5: continue
        break
    goal_points.append(random_point)


w, w_mask = None, None

K = torch.eye(3)
K[0,0] = (embedder.img_res/2.) / np.tan(np.deg2rad(90.0) / 2)
K[1,1] = -(embedder.img_res/2.) / np.tan(np.deg2rad(90.0) / 2)
K = K.unsqueeze(0).to(device)

orig_Rt = np.eye(4)
orig_Rt[:3,3] = start_position
orig_Rt[:3,:3] = q.as_rotation_matrix(start_rotation)
orig_Rt = np.linalg.inv(orig_Rt)

sim.set_agent_state(start_position, start_rotation)
done = False
goal_point = goal_points.pop(0)
rgbs, depths, positions, rotations = [], [], [], []
while not done:
    state = sim.get_agent_state()
    obs = sim.get_observations_at(state.position, state.rotation)
    rgbs.append(obs['rgb'])
    depths.append(obs['depth'])
    positions.append(state.position)
    rotations.append(state.rotation.components)


    Rt_t = np.eye(4)
    Rt_t[:3,3] = state.position
    Rt_t[:3,:3] = q.as_rotation_matrix(state.rotation)
    Rt_t = np.linalg.inv(Rt_t)
    Rt_t = Rt_t @ np.linalg.inv(orig_Rt)

    rgb_t = torch.from_numpy(obs['rgb']/255.).unsqueeze(0).permute(0,3,1,2).to(device)
    depth_t = torch.from_numpy(obs['depth']).unsqueeze(0).permute(0,3,1,2).to(device)
    Rt_t = torch.from_numpy(Rt_t).unsqueeze(0).float().to(device)

    with torch.no_grad():
        output = embedder.calculate_mask_func(depth_t*10.0, Rt_t, K)
        sorted_indices, seq_unique_list, seq_unique_counts, _ = output
        input_dict = {'rgb': rgb_t.unsqueeze(1),
                      'depth': depth_t.unsqueeze(1),
                    'sorted_indices': sorted_indices.unsqueeze(1),
                    'seq_unique_counts': seq_unique_counts.unsqueeze(1),
                      'seq_unique_list': seq_unique_list.unsqueeze(1)}
        w, w_mask = embedder.embed_obs(input_dict, past_w=w, past_w_num_mask=w_mask)

        w_im = w.mean(0).mean(0).detach().cpu().numpy()
        w_im = ((w_im - w_im.min())/(w_im.max()-w_im.min()) * 255).astype(np.uint8)
        w_im = cv2.applyColorMap(w_im, cv2.COLORMAP_VIRIDIS)[:,:,::-1]
        last_w_im = w_im

        rgb = add_title(obs['rgb'], 'observation')
        w_im = add_agent_view_on_w(w_im, Rt_t, embedder.coordinate_scale, embedder.w_size, agent_size=4, view_size=15)
        w_im = np.fliplr(w_im)
        w_im = add_title(w_im, 'RNR-Map')

    cv2.imshow("View", np.concatenate([rgb, w_im],1)[:,:,::-1])
    cv2.waitKey(1)

    action = follower.get_next_action(goal_point)
    if action is None or action == 0:
        if len(goal_points) == 0:
            break
        goal_point = goal_points.pop(0)
        action = follower.get_next_action(goal_point)
    sim.step(action)

data = {'rgb': rgbs, 'depth': depths, 'position': positions, 'rotation': rotations, 'map': w, 'orig_Rt': orig_Rt}

  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


# Localize seen images

In [14]:
origin = torch.eye(4).unsqueeze(0).to(device)
images = []
diffs = []

coordinate_scale = embedder.coordinate_scale
map_size = embedder.w_size
patch_size = map_size//4
angle_bin = 18
for t in range(len(rgbs)):
    # make target RNR-Map
    rgb = torch.from_numpy(data['rgb'][t]).unsqueeze(0).permute(0,3,1,2).to(device)
    depth = torch.from_numpy(data['depth'][t]).unsqueeze(0).permute(0,3,1,2).to(device)
    sorted_indices, seq_unique_list, seq_unique_counts, pose_map = embedder.calculate_mask_func(depth * 10.0, origin, K)

    sample_dict = {'sorted_indices': sorted_indices.unsqueeze(0),
               'seq_unique_list': seq_unique_list.unsqueeze(0),
               'seq_unique_counts': seq_unique_counts.unsqueeze(0),
              'rgb': rgb.unsqueeze(1)/255., 'depth': depth.unsqueeze(1)}
    for k,v in sample_dict.items():
        sample_dict[k] = v.cuda()

    with torch.no_grad():
        latent_target, _ = embedder.embed_obs(sample_dict)
        latent_target = latent_target[:, :, map_size//2 - patch_size//2 : map_size//2 + patch_size//2,
                                            map_size//2 - patch_size//2 : map_size//2 + patch_size//2]

        # Localize
        pred_heatmap, pred_angle = localizer(data['map'], latent_target, rotator)
        seen_area = (data['map'].mean(dim=1) != 0)
        bs, ws, hs = torch.where(seen_area == 0)
        pred_heatmap[bs, :, ws, hs] = -99999
        pred_heatmap[bs, :, :, -1] = -99999
        pred_heatmap[bs, :, -1, :] = -99999
        pred_heatmap_flattened = F.softmax(pred_heatmap.view(1, -1), dim=-1)
        pred = pred_heatmap_flattened.view(map_size+1, map_size+1)

    pred_max = pred_heatmap.view(1, -1).argmax(dim=1).item()
    pred_h, pred_w = pred_max//pred_heatmap.shape[-1], pred_max%pred_heatmap.shape[-1]
    pred_x = (pred_h-(map_size//2))/(map_size//2) * (coordinate_scale/2)
    pred_y = (pred_w-(map_size//2))/(map_size//2) * (coordinate_scale/2)

    pred_Rt = np.eye(4)
    pred_Rt[:3,3] = np.array([pred_x, 0., pred_y])
    pred_Rt[:3,:3] = q.as_rotation_matrix(q.from_euler_angles([0., 2*np.pi/angle_bin * pred_angle.argmax().item(), 0.0]))
    pred_Rt = np.linalg.inv(pred_Rt)
    pred_sim_Rt = np.linalg.inv(np.matmul(pred_Rt, data['orig_Rt']))

    # Get observation from predicted pose
    pred_position_ = pred_sim_Rt[:3,3]
    pred_rotation = q.from_rotation_matrix(pred_sim_Rt[:3,:3])
    pred_obs = sim.get_observations_at(pred_position_, pred_rotation)

    # Calculate Localization error
    Rtt = np.eye(4)
    Rtt[:3,3] = data['position'][t]
    Rtt[:3,:3] = q.as_rotation_matrix(q.from_float_array(data['rotation'][t]))
    Rtt = np.linalg.inv(Rtt)@np.linalg.inv(data['orig_Rt'])
    answer_x, _, answer_y = np.linalg.inv(Rtt)[:3,3]
    answer_h = int(answer_x*(map_size/2)/(coordinate_scale/2.)+(map_size/2))
    answer_w = int(answer_y*(map_size/2)/(coordinate_scale/2.)+(map_size/2))

    
    diff = np.linalg.norm(data['position'][t][:2]-pred_position_[:2])
    diffs.append(diff)

    # Visualization
    map_im = last_w_im.copy()
    map_im = add_agent_view_on_w(map_im, Rtt, embedder.coordinate_scale, embedder.w_size, agent_size=4, view_size=15, agent_color=(255,0,0), view_color=(255,0,0))
    map_im = add_agent_view_on_w(map_im, pred_Rt, embedder.coordinate_scale, embedder.w_size, agent_size=4, view_size=15, agent_color=(0,0,255), view_color=(0,0,255))
    map_im = cv2.resize(map_im, dsize=(VIS_RES, VIS_RES))

    pred_im = pred[:-1,:-1].detach().cpu().numpy()
    pred_im = (pred_im - pred_im.min())/(pred_im.max()-pred_im.min())
    pred_im = cv2.resize((pred_im*255).astype(np.uint8), dsize=(VIS_RES, VIS_RES))
    pred_im = cv2.applyColorMap(pred_im, cv2.COLORMAP_VIRIDIS)[:,:,::-1]
    pred_im = cv2.addWeighted(cv2.resize(last_w_im, dsize=(VIS_RES, VIS_RES)), 0.3, pred_im, 0.7, 0.0)

    map_im = add_title(map_im, 'RNR-Map')
    pred_im = add_title(pred_im, 'Loc. Heatmap')

    rgb = cv2.resize(data['rgb'][t], dsize=(VIS_RES, VIS_RES))
    rgb = add_title(rgb, 'Query Img.')
    pred_rgb = cv2.resize(pred_obs['rgb'], dsize=(VIS_RES, VIS_RES))
    pred_rgb = add_title(pred_rgb, 'Localized')

    view_im = np.concatenate([rgb, pred_rgb,map_im, pred_im],1)
    images.append(view_im)
    cv2.imshow("view", view_im[:,:,::-1])
    key = cv2.waitKey(1)
    if key == ord("q"): break

print(f'average error: {np.mean(diffs):.3f}m' )

average error: 0.173m


In [16]:
imageio.mimwrite("demo/image-based-localization.gif", images, fps=15)
display(HTML('<img src={}>'.format("demo/image-based-localization.gif")))

# Localize images from unseen view

In [32]:
origin = torch.eye(4).unsqueeze(0).to(device)
images = []
diffs = []

coordinate_scale = embedder.coordinate_scale
map_size = embedder.w_size
patch_size = map_size//4
angle_bin = 18

noise_dist = 2
noise_angle = np.pi/3
for t in range(len(rgbs)):
    while True:
        query_position = data['position'][t] + np.array([np.random.rand()*noise_dist-noise_dist/2,0,
                                                        np.random.rand()*noise_dist-noise_dist/2])
        if not sim.is_navigable(query_position): continue
        query_angle = get_angle(q.from_float_array(data['rotation'][t])) + np.random.rand()*noise_angle - noise_angle/2.
        query_rotation = q.from_euler_angles([0, query_angle, 0])
        query_obs = sim.get_observations_at(query_position, query_rotation)
        break
    
    # make target RNR-Map
    rgb = torch.from_numpy(query_obs['rgb']).unsqueeze(0).permute(0,3,1,2).to(device)
    depth = torch.from_numpy(query_obs['depth']).unsqueeze(0).permute(0,3,1,2).to(device)
    sorted_indices, seq_unique_list, seq_unique_counts, pose_map = embedder.calculate_mask_func(depth * 10.0, origin, K)

    sample_dict = {'sorted_indices': sorted_indices.unsqueeze(0),
               'seq_unique_list': seq_unique_list.unsqueeze(0),
               'seq_unique_counts': seq_unique_counts.unsqueeze(0),
              'rgb': rgb.unsqueeze(1)/255., 'depth': depth.unsqueeze(1)}
    for k,v in sample_dict.items():
        sample_dict[k] = v.cuda()

    with torch.no_grad():
        latent_target, _ = embedder.embed_obs(sample_dict)
        latent_target = latent_target[:, :, map_size//2 - patch_size//2 : map_size//2 + patch_size//2,
                                            map_size//2 - patch_size//2 : map_size//2 + patch_size//2]

        # Localize
        pred_heatmap, pred_angle = localizer(data['map'], latent_target, rotator)
        seen_area = (data['map'].mean(dim=1) != 0)
        bs, ws, hs = torch.where(seen_area == 0)
        pred_heatmap[bs, :, ws, hs] = -99999
        pred_heatmap[bs, :, :, -1] = -99999
        pred_heatmap[bs, :, -1, :] = -99999
        pred_heatmap_flattened = F.softmax(pred_heatmap.view(1, -1), dim=-1)
        pred = pred_heatmap_flattened.view(map_size+1, map_size+1)

    pred_max = pred_heatmap.view(1, -1).argmax(dim=1).item()
    pred_h, pred_w = pred_max//pred_heatmap.shape[-1], pred_max%pred_heatmap.shape[-1]
    pred_x = (pred_h-(map_size//2))/(map_size//2) * (coordinate_scale/2)
    pred_y = (pred_w-(map_size//2))/(map_size//2) * (coordinate_scale/2)

    pred_Rt = np.eye(4)
    pred_Rt[:3,3] = np.array([pred_x, 0., pred_y])
    pred_Rt[:3,:3] = q.as_rotation_matrix(q.from_euler_angles([0., 2*np.pi/angle_bin * pred_angle.argmax().item(), 0.0]))
    pred_Rt = np.linalg.inv(pred_Rt)
    pred_sim_Rt = np.linalg.inv(np.matmul(pred_Rt, data['orig_Rt']))

    # Get observation from predicted pose
    pred_position_ = pred_sim_Rt[:3,3]
    pred_rotation = q.from_rotation_matrix(pred_sim_Rt[:3,:3])
    pred_obs = sim.get_observations_at(pred_position_, pred_rotation)

    # Calculate Localization error
    Rtt = np.eye(4)
    Rtt[:3,3] = query_position
    Rtt[:3,:3] = q.as_rotation_matrix(query_rotation)
    Rtt = np.linalg.inv(Rtt)@np.linalg.inv(data['orig_Rt'])
    answer_x, _, answer_y = np.linalg.inv(Rtt)[:3,3]
    answer_h = int(answer_x*(map_size/2)/(coordinate_scale/2.)+(map_size/2))
    answer_w = int(answer_y*(map_size/2)/(coordinate_scale/2.)+(map_size/2))

    
    diff = np.linalg.norm(query_position[:2]-pred_position_[:2])
    diffs.append(diff)

    # Visualization
    map_im = last_w_im.copy()
    map_im = add_agent_view_on_w(map_im, Rtt, embedder.coordinate_scale, embedder.w_size, agent_size=4, view_size=15, agent_color=(255,0,0), view_color=(255,0,0))
    map_im = add_agent_view_on_w(map_im, pred_Rt, embedder.coordinate_scale, embedder.w_size, agent_size=4, view_size=15, agent_color=(0,0,255), view_color=(0,0,255))
    map_im = cv2.resize(map_im, dsize=(VIS_RES, VIS_RES))

    pred_im = pred[:-1,:-1].detach().cpu().numpy()
    pred_im = (pred_im - pred_im.min())/(pred_im.max()-pred_im.min())
    pred_im = cv2.resize((pred_im*255).astype(np.uint8), dsize=(VIS_RES, VIS_RES))
    pred_im = cv2.applyColorMap(pred_im, cv2.COLORMAP_VIRIDIS)[:,:,::-1]
    pred_im = cv2.addWeighted(cv2.resize(last_w_im, dsize=(VIS_RES, VIS_RES)), 0.3, pred_im, 0.7, 0.0)

    map_im = add_title(map_im, 'RNR-Map')
    pred_im = add_title(pred_im, 'Loc. Heatmap')

    rgb = cv2.resize(query_obs['rgb'], dsize=(VIS_RES, VIS_RES))
    rgb = add_title(rgb, 'Query Img.')
    pred_rgb = cv2.resize(pred_obs['rgb'], dsize=(VIS_RES, VIS_RES))
    pred_rgb = add_title(pred_rgb, 'Localized')

    view_im = np.concatenate([rgb, pred_rgb,map_im, pred_im],1)
    images.append(view_im)
    cv2.imshow("view", view_im[:,:,::-1])
    key = cv2.waitKey(1)
    if key == ord("q"): break

print(f'average error: {np.mean(diffs):.3f}m' )

average error: 0.481m


In [34]:
imageio.mimwrite("demo/image-based-localization-noise.gif", images, fps=5)
display(HTML('<img src={}>'.format("demo/image-based-localization-noise.gif")))