In [83]:
import os
os.environ['MUJOCO_GL'] = 'egl'

if 'notebooks' not in os.listdir(os.getcwd()):
    os.chdir('../') #changing directories so that output/gsplat_full etc. exists

from utils.mujoco_utils import compute_camera_extrinsic_matrix
from collision.utils import DummyCam, ImageDemoDataset, generate_camera, put_pose_into_mujoco, update_reconstruction_dict, get_normalized_function
from collision.chain_utils import build_chain_relation_map
from collision.network import SingleNetwork, HyperNetwork
from scene.cameras import Camera_Pose
from gaussian_renderer import render
from video_api import initialize_gaussians

import cv2
import torch
from PIL import Image
import numpy as np
import mujoco
import matplotlib.pyplot as plt
import torch.nn.functional as F
from tqdm import tqdm, trange
from mujoco.usd import exporter

from IPython.display import display, clear_output
from torchvision.transforms import transforms

import sys 
import time
from pathlib import Path
from itertools import cycle

In [2]:
# load mujoco
model_xml_dir = Path("collision_scene/universal_robots_ur5e_scene3")
model_xml_path = model_xml_dir / "scene.xml"

model = mujoco.MjModel.from_xml_path(model_xml_path.as_posix())
data = mujoco.MjData(model)

mujoco.mj_resetData(model, data)


def sample_collision_pose():
    pose = np.random.uniform(model.jnt_range[:, 0], model.jnt_range[:, 1])
    put_pose_into_mujoco(model, data, pose)
    return pose

In [18]:
# set camera
dummy_cams = [
    DummyCam(0, -45.0, 2.5),
    DummyCam(0, -60.0, 2.5),
    DummyCam(0, -75.0, 2.5),
    DummyCam(0, -90.0, 2.5),
]
cams = [generate_camera(dummy_cam) for dummy_cam in dummy_cams]

renderer = mujoco.Renderer(model, 480, 480)
renderer.update_scene(data, camera=cams[0])
background_tuple = (255, 255, 255)

mujoco.mj_resetData(model, data)

In [None]:
# while True:
#     init_pose = sample_collision_pose()
#     if data.ncon == 0:
#         break

# reconstruction = update_reconstruction_dict(renderer, data, dummy_cams, cams)
# init_dataset = ImageDemoDataset(reconstruction, background_color=(255, 255, 255))

init_pose = np.array([-np.pi, -np.pi/4, 0,  -np.pi/2, 0, 0])
put_pose_into_mujoco(model, data, init_pose)
print(data.ncon)

renderer.update_scene(data, camera=cams[1])
pixels = renderer.render()
image = Image.fromarray(pixels)
image

In [None]:
# while True:
#     final_pose = sample_collision_pose()
#     if data.ncon == 0:
#         break

final_pose = np.array([-np.pi/6, -np.pi/4,  0, -np.pi/2, 0, 0])

put_pose_into_mujoco(model, data, final_pose)
print(data.ncon)

reconstruction = update_reconstruction_dict(renderer, data, dummy_cams, cams)
finl_dataset = ImageDemoDataset(reconstruction, background_color=(255, 255, 255))

renderer.update_scene(data, camera=cams[0])
pixels = renderer.render()
image = Image.fromarray(pixels)
image

load SDF

In [None]:
output_path = Path("output/universal_robots_ur5e_scene3_collision")

relation_map, chain = build_chain_relation_map(model_xml_path.as_posix())
sdf_model = HyperNetwork(chain.n_joints, relation_map)
state_dict = torch.load(output_path / 'sdf_net.ckpt', weights_only=True)
sdf_model.load_state_dict(state_dict)
for parameters in sdf_model.parameters():
    parameters.requires_grad_(False)
sdf_model.cuda()
del state_dict

# Interpolation

In [342]:
INTERPOLATION = 99

init_pose_fix = init_pose.copy()
final_pose_fix = final_pose.copy()

intp_pose = np.linspace(0, 1, INTERPOLATION)[..., np.newaxis] * final_pose_fix[np.newaxis] + np.linspace(1, 0, INTERPOLATION)[..., np.newaxis] * init_pose_fix[np.newaxis]

naive interpolation resolution

In [None]:
mujoco.mj_resetData(model, data)

data.qpos = init_pose_fix

exp = exporter.USDExporter(model)
collision_list = []
# qpos
for action in intp_pose:
    put_pose_into_mujoco(model, data, action)
    exp.update_scene(data)
    collision_list.append(data.ncon)
    renderer.update_scene(data, camera=cams[0])
    pixels = renderer.render()
    image = Image.fromarray(pixels)
    clear_output(wait=True)
    display(image)
    time.sleep(0.1)
exp.save_scene()
print(collision_list)

use naive interpolation resolution to control

In [None]:
mujoco.mj_resetData(model, data)

data.qpos = init_pose_fix

exp = exporter.USDExporter(model)
for action in intp_pose:
    data.ctrl = action
    mujoco.mj_step(model, data, 100)
    mujoco.mj_collision(model, data)
    print(data.ncon)
    renderer.update_scene(data, camera=cams[0])
    exp.update_scene(data)
    pixels = renderer.render()
    image = Image.fromarray(pixels)
    clear_output(wait=True)
    display(image)
    time.sleep(0.1)
exp.save_scene()

## Ours Resolution with naive interpolation

In [311]:
init_pose_t = torch.tensor(init_pose_fix[None], dtype=torch.float32, device='cuda')
finl_pose_t = torch.tensor(final_pose_fix[None], dtype=torch.float32, device='cuda')
intp_pose_t = torch.tensor(intp_pose, dtype=torch.float32, device='cuda')

target_pose = torch.nn.Parameter(
    torch.cat([intp_pose_t], dim=0)
)
optimizer = torch.optim.Adam([target_pose], lr=0.1)
schedular = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5000, gamma=0.5)

In [None]:
tbar = trange(1000, leave=True)
for i in tbar:
    sdf_loss = torch.relu(sdf_model(target_pose)[0]+0.2).mean()
    tv_loss = (torch.cat([init_pose_t, target_pose], dim=0) - torch.cat([target_pose, finl_pose_t], dim=0)).norm(dim=-1).mean()

    loss = sdf_loss + 10 * tv_loss

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    schedular.step()
    
    tbar.set_postfix({
        "SDF": format(sdf_loss.item(), ".5f"),
        "TV": format(tv_loss.item(), ".5f"),
        "LR": format(optimizer.param_groups[0]['lr'], ".5f"),
        "GRAD": format(target_pose.grad.max().item(), ".5f"),
    })

In [None]:
mujoco.mj_resetData(model, data)

data.qpos = init_pose_fix

exp = exporter.USDExporter(model)
for action in target_pose.detach().cpu().numpy():
    put_pose_into_mujoco(model, data, action)
    print(data.ncon)
    renderer.update_scene(data, camera=cams[0])
    exp.update_scene(data)
    pixels = renderer.render()
    image = Image.fromarray(pixels)
    clear_output(wait=True)
    display(image)
    time.sleep(0.3)
exp.save_scene()

In [None]:
mujoco.mj_resetData(model, data)

data.qpos = init_pose_fix

exp = exporter.USDExporter(model)
for action in target_pose.detach().cpu().numpy():
    data.ctrl = action

    mujoco.mj_step(model, data, 100)
    mujoco.mj_collision(model, data)

    renderer.update_scene(data, camera=cams[1])
    exp.update_scene(data)
    pixels = renderer.render()
    image = Image.fromarray(pixels)
    clear_output(wait=True)
    display(image)
    time.sleep(0.3)
exp.save_scene()

# Use Dr. Robot

load model

In [None]:
sys.argv = ['']
gaussians, background_color, sample_cameras, kinematic_chain = initialize_gaussians(model_path='output/universal_robots_ur5e_experiment')

norm_fun, unnorm_fun = get_normalized_function(*kinematic_chain.get_joint_limits())

In [266]:
bg_color_t = torch.tensor(background_tuple).float().cuda() / 255.0

cam_list = []
mask_list = []
image_list = []
depth_list = []

for dummy_cam, image, segment, depth in finl_dataset:
    camera_extrinsic_matrix = compute_camera_extrinsic_matrix(dummy_cam)
    cam_list.append(
        Camera_Pose(torch.tensor(camera_extrinsic_matrix).clone().detach().float().cuda(), 0.78, 0.78, 480, 480, joint_pose=norm_fun(init_pose_t), zero_init=True).cuda()
    )
    mask_list.append(segment[0].bool().cuda())
    image_list.append(image.cuda())
    depth_list.append(depth[0].cuda())

Display dataset

In [None]:
plt.figure(figsize=(480, 480), dpi=1)
# plt.imshow(finl_dataset.reconstruction['depth'][3])
plt.imshow(finl_dataset.reconstruction['segment'][3])
plt.subplots_adjust(left=0, bottom=0, right=1, top=1, wspace=0, hspace=0)
plt.axis('off')
plt.show()
plt.close()

In [None]:
joint_angles = torch.nn.Parameter(torch.tensor(init_pose_t[0], dtype=torch.float32, device='cuda'))
optimizer = torch.optim.Adam([joint_angles], lr=0.05)

max_iteration = 400

topil = transforms.ToPILImage()

camera_pkg = cycle(zip(cam_list, mask_list, image_list, depth_list))

intp_pose = []

tbar = trange(max_iteration, leave=True)
for iteration in tbar:
    camera, gt_mask, gt_image, gt_depth = next(camera_pkg)
    
    camera.joint_pose = norm_fun(joint_angles)

    output_pkg = render(camera, gaussians, bg_color_t)
    image_tensor = output_pkg['render']
    depth_tensor = output_pkg['depth']

    Ll2_s = F.mse_loss(image_tensor[:, gt_mask], gt_image[:, gt_mask])
    Ldepth_s = F.mse_loss(depth_tensor[gt_mask], gt_depth[gt_mask])

    sdf, s = sdf_model(joint_angles[None])

    pose_loss = torch.nn.functional.l1_loss(joint_angles, finl_pose_t)

    loss = Ll2_s + Ldepth_s
    loss = loss / len(cam_list)

    loss.backward()

    if iteration % len(cam_list) == 0:
        optimizer.step()
        optimizer.zero_grad()
        clear_output(wait=True)
        pil_image = topil(image_tensor)
        display(pil_image)
        if iteration % 16 == 0: pil_image.save(f'output/demonstration/{iteration:04d}.png')
        intp_pose.append(joint_angles.detach().cpu().numpy())
    
    tbar.set_postfix({
        "L2": format(Ll2_s, ".3f"),
        "Ld": format(Ldepth_s, ".3f"),
        "SDF": format(sdf.item(), ".3f"),
        "Angle": format(pose_loss.item(), ".3f"),
    })

In [None]:
mujoco.mj_resetData(model, data)

data.qpos = init_pose_fix

exp = exporter.USDExporter(model)
for action in intp_pose:
    put_pose_into_mujoco(model, data, action)
    print(data.ncon)
    renderer.update_scene(data, camera=cams[0])
    exp.update_scene(data)
    pixels = renderer.render()
    image = Image.fromarray(pixels)
    clear_output(wait=True)
    display(image)
    time.sleep(0.1)
exp.save_scene()

In [None]:
mujoco.mj_resetData(model, data)

data.qpos = init_pose_fix

exp = exporter.USDExporter(model)
for action in intp_pose:
    data.ctrl = action
    mujoco.mj_step(model, data, 100)
    mujoco.mj_collision(model, data)
    
    print(data.ncon)
    renderer.update_scene(data, camera=cams[0])
    exp.update_scene(data)
    pixels = renderer.render()
    image = Image.fromarray(pixels)
    clear_output(wait=True)
    display(image)
    time.sleep(0.1)
exp.save_scene()

ours

In [None]:
joint_angles = torch.nn.Parameter(torch.tensor(init_pose_t[0], dtype=torch.float32, device='cuda'))
optimizer = torch.optim.Adam([joint_angles], lr=0.05)

max_iteration = 400

topil = transforms.ToPILImage()

camera_pkg = cycle(zip(cam_list, mask_list, image_list, depth_list))

intp_pose = []

tbar = trange(max_iteration, leave=True)
for iteration in tbar:
    camera, gt_mask, gt_image, gt_depth = next(camera_pkg)
    
    camera.joint_pose = norm_fun(joint_angles)

    output_pkg = render(camera, gaussians, bg_color_t)
    image_tensor = output_pkg['render']
    depth_tensor = output_pkg['depth']

    Ll2_s = F.mse_loss(image_tensor[:, gt_mask], gt_image[:, gt_mask])
    Ldepth_s = F.mse_loss(depth_tensor[gt_mask], gt_depth[gt_mask])

    sdf, s = sdf_model(joint_angles[None])

    pose_loss = torch.nn.functional.l1_loss(joint_angles, finl_pose_t)

    sdf, s = sdf_model(joint_angles[None])

    # pose_loss = (joint_angles - finl_pose_t).abs().mean()

    loss = Ll2_s + Ldepth_s + 0.05 * torch.relu(sdf)
    loss = loss / len(cam_list) * 100

    loss.backward()

    if iteration % len(cam_list) == 0:

        optimizer.step()
        optimizer.zero_grad()
        clear_output(wait=True)
        pil_image = topil(image_tensor)
        display(pil_image)
        if iteration % 16 == 0: pil_image.save(f'output/demonstration/{iteration:04d}.png')
        intp_pose.append(joint_angles.detach().cpu().numpy())
    
    tbar.set_postfix({
        "L2": format(Ll2_s, ".3f"),
        "Ld": format(Ldepth_s, ".3f"),
        "SDF": format(sdf.item(), ".3f"),
        "Angle": format(pose_loss.item(), ".3f"),
    })

In [324]:
init_pose_t = torch.tensor(init_pose_fix[None], dtype=torch.float32, device='cuda')
finl_pose_t = torch.tensor(final_pose_fix[None], dtype=torch.float32, device='cuda')
intp_pose_t = torch.tensor(intp_pose, dtype=torch.float32, device='cuda')

target_pose = torch.nn.Parameter(
    torch.cat([intp_pose_t], dim=0)
)
optimizer = torch.optim.Adam([target_pose], lr=0.1)
schedular = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5000, gamma=0.5)

In [None]:
tbar = trange(1000, leave=True)
for i in tbar:
    sdf_loss = torch.relu(sdf_model(target_pose)[0]+0.3).mean()
    tv_loss = (torch.cat([init_pose_t, target_pose], dim=0) - torch.cat([target_pose, finl_pose_t], dim=0)).norm(dim=-1).mean()

    loss = sdf_loss + tv_loss

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    schedular.step()
    
    tbar.set_postfix({
        "SDF": format(sdf_loss.item(), ".5f"),
        "TV": format(tv_loss.item(), ".5f"),
        "LR": format(optimizer.param_groups[0]['lr'], ".5f"),
        "GRAD": format(target_pose.grad.max().item(), ".5f"),
    })

In [None]:
mujoco.mj_resetData(model, data)

data.qpos = init_pose_fix
collision_list = []
exp = exporter.USDExporter(model)
for action in target_pose.detach().cpu().numpy():
    put_pose_into_mujoco(model, data, action)
    print(data.ncon)
    collision_list.append(data.ncon)
    renderer.update_scene(data, camera=cams[0])
    exp.update_scene(data)
    pixels = renderer.render()
    image = Image.fromarray(pixels)
    clear_output(wait=True)
    display(image)
    time.sleep(0.1)
exp.save_scene()
print(collision_list)

In [None]:
mujoco.mj_resetData(model, data)

data.qpos = init_pose_fix

exp = exporter.USDExporter(model)
for action in intp_pose:
    data.ctrl = action
    mujoco.mj_step(model, data, 100)
    mujoco.mj_collision(model, data)
    
    print(data.ncon)
    renderer.update_scene(data, camera=cams[0])
    exp.update_scene(data)
    pixels = renderer.render()
    image = Image.fromarray(pixels)
    clear_output(wait=True)
    display(image)
    time.sleep(0.1)
exp.save_scene()

# Use tangent

In [None]:
joint_angles = torch.nn.Parameter(torch.tensor(init_pose_t[0], dtype=torch.float32, device='cuda'))
optimizer = torch.optim.Adam([joint_angles], lr=0.01)

In [None]:
mujoco.mj_resetData(model, data)

data.qpos = init_pose_fix

exp = exporter.USDExporter(model)
intp_pose = []
for i in range(50):
    optimizer.zero_grad()
    sdf, s = sdf_model(joint_angles[None])
    sdf.backward()
    sdf_direction = joint_angles.grad.clone() # 梯度方向

    sdf_norm = sdf_direction / sdf_direction.norm()

    optimizer.zero_grad()
    pose_loss = (joint_angles - finl_pose_t).abs().mean()
    pose_loss.backward()
    pose_direction = joint_angles.grad.clone() # 梯度方向

    with torch.no_grad():
        pose_dot_sdf = torch.dot(pose_direction, sdf_norm)

        proj_pose_n = pose_dot_sdf * sdf_norm
        
        # if pose_dot_sdf > 0:
        #     move_n = -proj_pose_n
        # else:
        #     move_n = proj_pose_n

        proj_pose_t = pose_direction - proj_pose_n

        movement = proj_pose_t

        joint_angles.data -= 1.5 * movement

        intp_pose.append(joint_angles.detach().cpu().numpy())
    
    put_pose_into_mujoco(model, data, joint_angles.data.detach().cpu().numpy())
    print(data.ncon)
    renderer.update_scene(data, camera=cams[0])
    exp.update_scene(data)
    pixels = renderer.render()
    image = Image.fromarray(pixels)
    clear_output(wait=True)
    display(image)
    time.sleep(0.01)
exp.save_scene()

In [None]:
mujoco.mj_resetData(model, data)

data.qpos = init_pose_fix

exp = exporter.USDExporter(model)
for action in intp_pose:
    data.ctrl = action
    mujoco.mj_step(model, data, 100)
    mujoco.mj_collision(model, data)
    
    print(data.ncon)
    renderer.update_scene(data, camera=cams[0])
    exp.update_scene(data)
    pixels = renderer.render()
    image = Image.fromarray(pixels)
    clear_output(wait=True)
    display(image)
    time.sleep(0.1)
exp.save_scene()

In [None]:
tbar = trange(1000, leave=True)
for i in tbar:
    sdf_loss = torch.relu(sdf_model(target_pose)[0]+0.1).mean()
    tv_loss = (torch.cat([init_pose_t, target_pose[:-1]], dim=0) - torch.cat([target_pose[:-1], finl_pose_t], dim=0)).abs().mean()

    loss = sdf_loss + tv_loss

    tbar.set_postfix({
        "SDF": format(sdf_loss.item(), ".5f"),
        "TV": format(tv_loss.item(), ".5f"),
    })

    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

In [None]:
import time
for action in target_pose.detach().cpu().numpy():
    put_pose_into_mujoco(model, data, action)
    print(data.ncon)
    renderer.update_scene(data, camera=cams[0])
    pixels = renderer.render()
    image = Image.fromarray(pixels)
    clear_output(wait=True)
    display(image)
    time.sleep(0.1)

In [None]:
exp = exporter.USDExporter(model=model)
exp.update_scene(data=data)
exp.save_scene(filetype="usd")