# Test via point, and end point for the reaching task in the pybullet working space

In [1]:
import os, time, tqdm, sys
import torch.nn as nn
import numpy as np
import torch

# add parent dictionary to path
parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
sys.path.append(parent_dir)

from models.vae import TrajGen, DM
from models.dmp import CanonicalSystem, SingleDMP
from utils.data_loader import TorqueLoader as Torque_dataset
from utils.early_stop import EarlyStop
from matplotlib import pyplot as plt
from collections import OrderedDict
print(os.getcwd())

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

/media/binzhao/binzhao_u/VAE_DMP_mani/manipulation_task
cuda


create cVAE-dmp model

In [2]:
# create dmp model 
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
run_time = 1
dt = 0.01
# define the shape of each damonstration
shape = (5, 100)
# create dataset for training and testing
cs = CanonicalSystem(dt=0.01, ax=1)
dmp = SingleDMP(n_bfs=50, cs=cs, run_time=1.0, dt=0.01)
print("device: ", device) 

device:  cuda


In [3]:
# get max, min from dataset to de-normalize the data
train_dataset = Torque_dataset(run_time=1, dmp=dmp, dt=0.01, dof=2)
train_dataset.load_data('../data/manipulation_data/train_torque.npz', device=device)
train_dataset.torque = train_dataset.normalize_data(device=device)

#get normalize  paramters of data
max = train_dataset.max.cpu().numpy()
min = train_dataset.min.cpu().numpy()
print(max, min)

[ 9.639522 40.198143 65.1502   54.927094 89.87197 ] [-35.03298   -13.038735   -6.0426774  -4.746726  -25.30069  ]


In [4]:
# load pretrained model
save_name = "./models/cVAE_torque_manipulation.pt"
checkpoint = torch.load(save_name, map_location=device)
dict = checkpoint['net']

In [5]:
# load the decoder part only -- generator, label_embedding
decoder_param = OrderedDict() 
    
key_word_1 = {"decoder."}
key_word_2 = {"label_embedding."}

for layer_name, param in checkpoint["net"].items():
    for key_word in key_word_1:
        if key_word in layer_name:            
            layer_name = layer_name.replace(key_word, "")            
            decoder_param[layer_name] = param

torch.save(decoder_param, "./models/decoder.pt")

# save label encoder
label_encoder_param = OrderedDict()
for layer_name, param in checkpoint["net"].items():
    for key_word in key_word_2:
        if key_word in layer_name:            
            layer_name = layer_name.replace(key_word, "")            
            label_encoder_param[layer_name] = param

torch.save(label_encoder_param, "./models/label_encoder.pt")

In [6]:
# create trajectory generator -- decoder + dynamics system
shape = (5, 100)
nclass = 4
nhid = 8
ncond = 8

# create trajectory generator
traj_gen = TrajGen(shape=shape, nclass=nclass, nhid=nhid, ncond=ncond, min=min, max=max, device=device)
# send to device
traj_gen.decoder_o.load_state_dict(torch.load('./models/decoder.pt'))
traj_gen.decoder_n.load_state_dict(torch.load('./models/decoder.pt'))
traj_gen.label_embedding.load_state_dict(torch.load('./models/label_encoder.pt'))

<All keys matched successfully>

Create pybullet based environment and load demonstration data

In [7]:
robot_model_path = "../manipulation_env/models/urdf/ur10_robotiq_85.urdf"
table_path = "../manipulation_env/models/urdf/objects/table/table.urdf"
block_path = "../manipulation_env/models/YcbPottedMeatCan/model.urdf"

demon_path = "../data/manipulation_data"

# create dictionary to store the data
demons_data = {}
for file in os.listdir(demon_path):

    if file.endswith(".npy"):
        data = np.load(os.path.join(demon_path, file), allow_pickle=True)
        task_id = int(file.split("_")[0])
        demons_data["task_id_" + str(task_id)] = data
print("Number of demonstrations: ", len(demons_data))  

# show the structure of the data
for key, value in demons_data.items():
    print(key, value.shape)

Number of demonstrations:  3
task_id_1 (19, 5)
task_id_2 (33, 5)
task_id_3 (27, 5)


In [8]:
# create pybullet environment
from manipulation_env.env import ManiEnv
from manipulation_env.robot import UR10Robotiq85
robot = UR10Robotiq85(model_path=robot_model_path)
env = ManiEnv(robot, block_path=block_path, table_path=table_path, vis=True)
env.reset()

pybullet build time: Jun 24 2024 11:27:31
0it [00:00, ?it/s]

startThreads creating 1 threads.
starting thread 0
[jointInfo(id=0, name='world_arm_joint', type=4, damping=0.0, friction=0.0, lowerLimit=0.0, upperLimit=-1.0, maxForce=0.0, maxVelocity=0.0, controllable=False), jointInfo(id=1, name='robot_shoulder_pan_joint', type=0, damping=0.0, friction=0.0, lowerLimit=-6.28318530718, upperLimit=6.28318530718, maxForce=330.0, maxVelocity=2.16, controllable=True), jointInfo(id=2, name='robot_shoulder_lift_joint', type=0, damping=0.0, friction=0.0, lowerLimit=-6.28318530718, upperLimit=6.28318530718, maxForce=330.0, maxVelocity=2.16, controllable=True), jointInfo(id=3, name='robot_elbow_joint', type=0, damping=0.0, friction=0.0, lowerLimit=-3.14159265359, upperLimit=3.14159265359, maxForce=150.0, maxVelocity=3.15, controllable=True), jointInfo(id=4, name='robot_wrist_1_joint', type=0, damping=0.0, friction=0.0, lowerLimit=-6.28318530718, upperLimit=6.28318530718, maxForce=54.0, maxVelocity=3.2, controllable=True), jointInfo(id=5, name='robot_wrist_2_j

1it [00:03,  3.17s/it]

processing baseLink


Task 1: reaching task, test basic cVAE-dmp model, and end point fine-tuning
the robot manipulation space:
- x: [0.3, 0.8]
- y: [-0.45, 0.45]

In [9]:
# get the initial position of end-effector, and block in the demonstration
gripper_offset = 0.17
block_position = np.array([0, 0, 0.04])
reach_demo = demons_data["task_id_1"]
robot_inial_pose = reach_demo[0]
robot_end_pose = reach_demo[-1]

In [10]:
# --------------------- random block position --------------------------
# reset the environment
# store all the reaching trajectory
import time 
tractories = []

env.clean_traj_plot()
env.step(robot_inial_pose)

for num_traj in range(30):   
    block_position = np.array([0, 0, 0.04])
    random_x = np.random.rand() * 0.3 + 0.5
    random_y = (np.random.rand() - 0.5) * 0.8
    block_position[0] = random_x
    block_position[1] = random_y

    # set the final position of the robot
    env.reset_block(pos = block_position, euler_angle=[0, 0, 0])

    block_pos_sim = env.get_block_position()
    reach_point = block_pos_sim + np.array([0, 0, 0.02 + gripper_offset])
    robot_end_pose[:3] = reach_point

    # generate trajectory from cVAE-dmp    
    traj_gen.eval()
    torch_robot_inial_pose = torch.tensor(robot_inial_pose, dtype=torch.float32).to(device)
    torch_robot_end_pose = torch.tensor(robot_end_pose, dtype=torch.float32).to(device)
    
    with torch.no_grad():
        traj = traj_gen(class_idx=1, x0=torch_robot_inial_pose, goal=torch_robot_end_pose)     
 
    traj = traj.cpu().numpy()[0]

    # # plot the trajectory
    env.traj_plot(traj[:3,:].T)

    # check if touch the block
    success = False
    env.step(robot_inial_pose)
    for i in range(traj.shape[1]):
        env.step(traj[:, i])
        if env.check_touching():
            success = True
            break
    
    # store the trajectory
    tractories.append({"traj":traj, "success":success, "block_pose":block_pos_sim})  
    print("Trajectory: ", num_traj, "Success: ", success)     


  self.x = torch.tensor(x0).to(self.device)
  goal = torch.tensor(goal).to(self.device)


In [None]:
# save the data
np.savez("reaching_trajecotries.npz", tractories=tractories)

In [None]:
# load the data 
reaching_results = np.load("./save_trajectory/reaching_trajecotries.npz", allow_pickle=True)
# calculate the success rate
success_rate = 0
for traj in reaching_results["tractories"]:
    if traj["success"]:
        success_rate += 1

print("Success rate: ", success_rate/len(reaching_results["tractories"]))