Test the end-point precision of trajectories generated with the DMP and the cVAE

In [1]:
import os, time, tqdm, sys
import torch.nn as nn
import numpy as np
import torch

# add the parent directory of the current working directory to the module
# search path (the project-local `models` / `utils` imports below rely on it)
parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
sys.path.append(parent_dir)

from models.vae import TrajGen, DM
from models.dmp import CanonicalSystem, SingleDMP
from utils.data_loader import TorqueLoader as Torque_dataset
from utils.early_stop import EarlyStop
from matplotlib import pyplot as plt
from collections import OrderedDict
# Echo the working directory: every relative path in this notebook is
# resolved against it.
print(os.getcwd())
# Use the GPU when torch can see one, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

/home/binzhao/Documents/code/VAE_DMP_mani/number_write_task


In [2]:
# Trajectory settings; `dt` and `run_time` are reused below instead of
# repeating the literals.
run_time = 1
dt = 0.01
shape = (2, 100)

# Canonical system and the single DMP that is handed to the dataset.
cs = CanonicalSystem(dt=dt, ax=1)
dmp = SingleDMP(n_bfs=50, cs=cs, run_time=float(run_time), dt=dt)

# Load the training torques, then overwrite them with the normalized version
# returned by the dataset.
train_dataset = Torque_dataset(run_time=run_time, dmp=dmp, dt=dt, dof=2)
train_dataset.load_data('../data/number_write/train_torque.npz', device=device)
train_dataset.torque = train_dataset.normalize_data(device=device)

# Normalization bounds of the data as NumPy arrays.
# NOTE(review): `max` / `min` shadow the Python builtins for the rest of the
# session; the names are kept because the TrajGen cell passes them on as
# `min=min, max=max`.
max = train_dataset.max.cpu().numpy()
min = train_dataset.min.cpu().numpy()
print(max, min)

[178.56984 199.07835] [-187.85435 -179.23535]


Load the pretrained torque-based trajectory generator (cVAE-based)

In [None]:
# Load the pretrained cVAE checkpoint, mapping its tensors onto `device`.
# NOTE(review): torch.load unpickles the file, so only open checkpoints that
# come from a trusted source.
save_name = "cVAE_torque.pt"
checkpoint = torch.load(save_name, map_location=device)
# The network weights live under the 'net' key. They are deliberately not
# bound to the name `dict`: that would shadow the builtin for every later cell.
net_state_dict = checkpoint['net']

In [5]:
def _extract_submodule_state(state_dict, prefix):
    """Return the entries of `state_dict` whose key starts with `prefix`.

    Only the leading `prefix` is removed from each selected key (a later
    repetition of the same text inside the key is left untouched), and the
    original ordering of the entries is preserved.

    state_dict: mapping from layer name to parameter
    prefix: sub-module prefix including the trailing dot, e.g. "decoder."
    """
    return OrderedDict(
        (layer_name[len(prefix):], param)
        for layer_name, param in state_dict.items()
        if layer_name.startswith(prefix)
    )


# key prefixes of the two sub-modules that are exported from the checkpoint
key_word_1 = {"decoder."}
key_word_2 = {"label_embedding."}

# load the decoder part only -- generator
decoder_param = OrderedDict()
for key_word in key_word_1:
    decoder_param.update(_extract_submodule_state(checkpoint["net"], key_word))

torch.save(decoder_param, "../models/decoder.pt")

# save label encoder
label_encoder_param = OrderedDict()
for key_word in key_word_2:
    label_encoder_param.update(_extract_submodule_state(checkpoint["net"], key_word))

torch.save(label_encoder_param, "../models/label_encoder.pt")

Create the trajectory generator -- decoder + dynamical system

In [None]:
# Constructor arguments of the trajectory generator.
shape = (2, 100)
nclass = 10
nhid = 8
ncond = 8
traj_gen = TrajGen(shape=shape, nclass=nclass, nhid=nhid, ncond=ncond, min=min, max=max, device=device)
# send to device
traj_gen = traj_gen.to(device)

# Read each weight file once and map its tensors onto `device`, so a file
# written on a GPU machine can also be loaded on a CPU-only one.
decoder_state = torch.load('../models/decoder.pt', map_location=device)
label_encoder_state = torch.load('../models/label_encoder.pt', map_location=device)

# Both decoders start from the same saved decoder weights; load_state_dict
# copies the values, so sharing one dict between the two calls is safe.
traj_gen.decoder_o.load_state_dict(decoder_state)
traj_gen.decoder_n.load_state_dict(decoder_state)
traj_gen.label_embedding.load_state_dict(label_encoder_state)

In [None]:
# Start point and nominal goal of the trajectory (1-D arrays, so no
# transpose is needed).
x0 = np.array([0, 1])
goal = np.array([1, 0])
plt.plot(x0[0], x0[1], 'go', label='start', markersize=10)
plt.plot(goal[0], goal[1], 'ro', label='goal', markersize=10)

# 50 perturbed goals, each coordinate drawn uniformly within +/-0.15 of `goal`.
# A seeded generator keeps the sampled goals -- and the end-point error that
# the next cell computes from them -- identical across kernel restarts.
rng = np.random.default_rng(0)
goal_all = 0.3 * (rng.random((50, 2)) - 0.5) + goal
plt.scatter(goal_all[:, 0], goal_all[:, 1], color='b')

number = 3
traj_gen.eval()
with torch.no_grad():
    traj1 = traj_gen(number, x0=x0, goal=goal)
np_traj1 = traj1.detach().cpu().numpy()

# plot the generated trajectory (first batch entry: row 0 against row 1)
plt.plot(np_traj1[0, 0, :], np_traj1[0, 1, :], linewidth=5)
plt.tick_params(labelsize=30)
plt.axis("equal")
# save the figure under the `number` that was actually drawn, so a figure
# for one number never overwrites the file of another
plt.savefig(f"traj_{number}.png")
plt.show()


In [None]:
# End-point precision: regenerate the trajectory for every sampled goal and
# measure how far its last sample lands from that goal.
number = 3
traj_gen.eval()
error = []
with torch.no_grad():
    for goal_t in goal_all:
        traj1 = traj_gen(number, x0=x0, goal=goal_t)
        # last sample of the first batch entry
        end_point = traj1.detach().cpu().numpy()[0, :, -1]
        error.append(np.linalg.norm(end_point - goal_t))
# mean Euclidean end-point error over all sampled goals
print(np.mean(error))

In [None]:
# Set-up for the via-point experiment.
number = 2
x0 = np.array([0, 1]).T
goal = np.array([1, 0]).T

# Reference trajectory produced by the generator as it currently stands.
traj_gen.eval()
with torch.no_grad():
    traj1 = traj_gen(number, x0=x0, goal=goal)
np_traj1 = traj1.detach().cpu().numpy()

# Via point: sample index 50 of the reference, shifted by (-0.2, -0.1) so
# that it lies off the reference trajectory.
via_point = np_traj1[0, :, 50] - np.array([0.2, 0.1])

# plot start, goal, reference trajectory and via point
plt.plot(x0[0], x0[1], 'go', label='start')
plt.plot(goal[0], goal[1], 'ro', label='goal')
plt.plot(np_traj1[0, 0, :].T, np_traj1[0, 1, :].T, linewidth=2, color='gray')
plt.plot(via_point[0], via_point[1], 'bo', label='via point')
plt.tick_params(labelsize=20)
plt.axis("equal")
plt.show()
print(via_point)

In [None]:
# list every parameter of the trajectory generator with its size and whether
# it is trainable (requires_grad)
for name, param in traj_gen.named_parameters():
    print(f"Parameter: {name}, Size: {param.size()}, Grad: {param.requires_grad}")

Retrain the decoder network so that the trajectory passes through the via point

In [195]:
def traj_loss(point, class_idx, x0, goal, model=None, verbose=True):
    """
    Scalar loss used to retrain the new decoder so that its trajectory passes
    through a via point while staying close to the old decoder's trajectory.

    point: (batch_size, n_dof) or (n_dof) -- the via point must get through
    class_idx: (batch_size, ) -- the class of trajectory
    x0: (n_dof) -- the initial state of the trajectory
    goal: (n_dof) -- the goal of the trajectory
    model: object exposing `generator(class_idx, x0, goal, d_type=...)`;
           defaults to the notebook-level `traj_gen`
    verbose: print the three loss terms on every call when True

    Returns a 0-dim tensor 0.2 * e_loss + 0.2 * p_loss + 0.6 * s_loss.
    Every term is averaged over the batch, so the result is a scalar that
    `backward()` accepts for any batch size.
    """
    if model is None:
        # resolved at call time so the function can be defined before the model
        model = traj_gen

    # unsqueeze the point to (batch_size, n_dof, 1) so it broadcasts over the
    # last (sample) axis of the trajectories
    point = point.unsqueeze(-1)
    if point.dim() == 2:
        # a single (n_dof) point was given: add the batch axis
        point = point.unsqueeze(0)

    # trajectories are indexed as (batch, n_dof, samples) below
    o_traj = model.generator(class_idx, x0, goal, d_type="old")
    n_traj = model.generator(class_idx, x0, goal, d_type="new")

    # via point to the trajectory: closest approach of each trajectory to its
    # own via point, averaged over the batch
    dist_to_point = torch.norm(n_traj - point, p=2, dim=1)
    p_loss = dist_to_point.min(dim=-1).values.mean()
    # trajectory shape loss: mean point-wise distance between old and new
    s_loss = torch.mean(torch.norm(o_traj - n_traj, p=2, dim=1))
    # end position loss: distance of the last sample to the goal, averaged
    # over the batch
    e_loss = torch.norm(n_traj[:, :, -1] - goal, p=2, dim=1).mean()

    if verbose:
        print("s_loss:", s_loss)
        print("e_loss:", e_loss)
        print("p_loss:", p_loss)

    return 0.2*e_loss + 0.2*p_loss + 0.6*s_loss

In [None]:
number = 2
# Start, goal and via point as tensors on the training device.
x0 = torch.tensor(np.array([0, 1]).T).to(device)
goal = torch.tensor(np.array([1, 0]).T).to(device)
via_point = torch.tensor(via_point).to(device)

# Only parameters that still require gradients are handed to the optimizer.
epoch = 100
optimizer = torch.optim.Adam([p for p in traj_gen.parameters() if p.requires_grad], lr=0.01)

traj_gen.train()   # switch the generator to training mode
for i in range(epoch):
    optimizer.zero_grad()
    loss = traj_loss(point=via_point, class_idx=number, x0=x0, goal=goal)
    loss.backward()
    optimizer.step()

    # report the loss every 20 iterations
    if not i % 20:
        print('epoch: {}, loss: {}'.format(i, loss.item()))

# Back to NumPy so the plotting cell can use the values directly.
x0, goal, via_point = (t.cpu().numpy() for t in (x0, goal, via_point))

Plot the trajectory after retraining

In [None]:
number = 2

# Trajectory produced by the generator after retraining.
traj_gen.eval()
with torch.no_grad():
    traj = traj_gen(number, x0=x0, goal=goal)
new_traj = traj.detach().cpu().numpy()

# start, goal and via-point markers
plt.plot(x0[0], x0[1], 'go', label='start', markersize=10)
plt.plot(goal[0], goal[1], 'ro', label='goal', markersize=10)
plt.plot(via_point[0], via_point[1], 'bo', label='via point')

# retrained trajectory (purple) over the one generated before retraining (gray)
plt.plot(new_traj[0, 0, :].T, new_traj[0, 1, :].T, color='purple', linewidth=2)
plt.plot(np_traj1[0, 0, :].T, np_traj1[0, 1, :].T, color='gray', label='generated torque')
plt.tick_params(labelsize=20)
plt.axis("equal")