In [94]:
import numpy as np
from dataclasses import dataclass
from ipycanvas import Canvas, hold_canvas

import ipywidgets as widgets

import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributions as dist

from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader

import torch.nn.functional as F

In [95]:
@dataclass
class Joint:
    d_min : float = 1
    d_max : float = 1
    theta_min : float = -1*np.pi
    theta_max : float = np.pi
    ud_min : float = 0
    ud_max : float = 0
    utheta_min : float = -1*np.pi/30
    utheta_max : float = np.pi/30
        
    def __post_init__(self):
        if self.d_max < self.d_min: self.d_max = self.d_min
        if self.theta_max < self.theta_min: self.theta_max = self.theta_min
        if self.ud_max < self.ud_min: self.ud_max = self.ud_min
        if self.utheta_max < self.utheta_min: self.utheta_max=self.utheta_min
            
    # returns a random action (u_d, u_theta) within actuator constraints
    def rand_action(self):
        return np.random.random(2) * \
            [self.ud_max - self.ud_min, self.utheta_max - self.utheta_min] + \
            [self.ud_min, self.utheta_min]
    
    # returns a random state (d, theta) within joint constraints
    def rand_state(self):
        return np.random.random(2) * \
            [self.d_max - self.d_min, self.theta_max - self.theta_min] + \
            [self.d_min, self.theta_min]
    
    # given state (d, theta) and action (u_d, u_theta),
    # return next state within joint constraints
    def next_state(self, s, a):
        
        d = np.clip(s[0] + a[0], self.d_min, self.d_max)
        theta = np.clip(s[1] + a[1], self.theta_min, self.theta_max)
        
        return np.array([d, theta])

In [96]:
class Robot:
    
    def __init__(self, joints=[]):
        self.joints = []
        for j in joints: self.add_joint(j)
        self.state = np.empty((0,2))
        self.init_rand_state()
    
    def add_joint(self, j):
        self.joints.append(j)
        self.state = np.append(self.state,np.array(
            [[(j.d_max+j.d_min)/2, (j.theta_max+j.theta_min)/2]]), 0)
    
    # returns transformed coord: translate from end of linkage to joint origin,
    # then rotate into reference frame of previous joint (i.e. theta=0)
    def linkage_end_position(self, j_index, state=None):
        if state is None: state = self.state
        point = np.array([0,0,1]).reshape((3,1))
        for (d, theta) in state[j_index::-1]:
            point = mat_transform(theta=theta) @ mat_transform(d=d) @ point
        return point[:2,:]
    
    # returns a list of random actions (T time steps, action changes every period)
    def rand_actions(self, T, period=6):
        n = int(np.ceil(T/period))
        return np.array(
            [[[j.rand_action() for j in self.joints]] * period  for i in range(n)]
        ).reshape(-1, len(self.joints), 2)[:T]
    
    # changes to a random state
    def init_rand_state(self):
        self.state = self.rand_state()
        self.init_trajectory()
        
    def rand_state(self):
        state = np.empty((len(self.joints),2))
        for i,j in enumerate(self.joints):
            state[i] = j.rand_state()
        return state
        
    # given action, return next state
    def next_state(self, action, state=None):
        if state is None: state = self.state
        return np.array([j.next_state(state[i], action[i]) \
                         for (i,j) in enumerate(self.joints)])
    
    def end_effector_state(self, state=None):
        return self.linkage_end_position(len(self.joints)-1, state=state).T
    
    # update robot state and add current state to trajectory
    def update_state(self, action):
        # record action taken
        self.traj_action.append(action)
        
        # update and record joint state 
        self.state = self.next_state(action)
        self.traj_state.append(self.state)
        
        # record end effector state
        self.traj_ee.append(self.end_effector_state())
        
    def init_trajectory(self):
        self.traj_state = []
        self.traj_ee = []
        self.traj_action = []
        self.traj_state.append(self.state)
        self.traj_ee.append(self.end_effector_state())
        
    def simulate(self, actions):
        self.init_trajectory()
        for a in actions: self.update_state(a)
        return self.traj_state


def mat_transform(d=0, theta=0):
    return np.array([
        [np.cos(theta), -1*np.sin(theta), d],
        [np.sin(theta), np.cos(theta), 0],
        [0, 0, 1]
    ])

In [97]:
def init_draw():
    canvas = Canvas(width=320, height=320)
    canvas.translate(canvas.width/2, canvas.height/2)
    return canvas

def draw_robot(canvas, robot, state=None):

    with hold_canvas(canvas):
        canvas.clear_rect(-canvas.width/2, -canvas.height/2, canvas.width, canvas.height)
        
        # draw axes
        canvas.fill_style = 'gray'
        canvas.line_width = 0.5
        canvas.stroke_line(0, -canvas.height, 0, canvas.height)
        canvas.stroke_line(-canvas.width, 0, canvas.width, 0)
    
        # draw robot
        scale_factor = (canvas.width - 20) / 2
        canvas.fill_style = 'black'
        canvas.line_width = 1
        for (i,j) in enumerate(robot.joints):
            if i == 0: x1, y1 = (0,0)
            else: x1, y1 = robot.linkage_end_position(i-1, state).flatten() * scale_factor
            x2, y2 = robot.linkage_end_position(i, state).flatten() * scale_factor
            canvas.fill_circle(x1, -y1, 4)
            canvas.stroke_line(x1, -y1, x2, -y2)

            if i == len(robot.joints)-1:
                canvas.fill_style = 'red'
                canvas.fill_circle(x2, -y2, 5)
                
def draw_robot_interactive(canvas, robot):
    def draw_robot_at_index(state_index):
        draw_robot(canvas, robot, robot.traj_state[state_index])
    return draw_robot_at_index

In [98]:
robot_3dof = Robot()
for i in range(3):
    robot_3dof.add_joint(Joint(d_min=1/3, d_max=1/3))

canvas_3dof = init_draw()

# create a random trajectory
robot_3dof.init_rand_state()
robot_3dof.simulate(robot_3dof.rand_actions(100));

#examine trajectory
draw_robot_3dof = draw_robot_interactive(canvas_3dof, robot_3dof);
widgets.interact(draw_robot_3dof, state_index=widgets.IntSlider(min=0, max=100, step=1, value=0));
canvas_3dof

interactive(children=(IntSlider(value=0, description='state_index'), Output()), _dom_classes=('widget-interact…

Canvas(height=320, width=320)

In [99]:
robot_4dof = Robot()
for i in range(4):
    robot_4dof.add_joint(Joint(d_min=1/4, d_max=1/4))

canvas_4dof = init_draw()

# create a random trajectory
robot_4dof.init_rand_state()
robot_4dof.simulate(robot_4dof.rand_actions(100));

#examine trajectory
draw_robot_4dof = draw_robot_interactive(canvas_4dof, robot_4dof);
widgets.interact(draw_robot_4dof, state_index=widgets.IntSlider(min=0, max=100, step=1, value=0));
canvas_4dof

interactive(children=(IntSlider(value=0, description='state_index'), Output()), _dom_classes=('widget-interact…

Canvas(height=320, width=320)

Generate Dataset

In [100]:
# default is 1000 random trajectories w/ 100 time steps each
def generate_data_numpy(robot, n_traj=1000, T=100, print_update=False):
    ro_state = np.empty((0, *robot.state.shape))
    ro_action = np.empty((0, *robot.rand_actions(1)[0].shape))
    ee_state = np.empty((0, *robot.end_effector_state().shape))
    ee_next_state = np.empty_like(ee_state)
    
    for i in range(n_traj):
        robot.init_rand_state()
        robot.simulate(robot.rand_actions(T))
        
        ro_state = np.append(ro_state, robot.traj_state[:-1], 0)
        ro_action = np.append(ro_action, robot.traj_action, 0)
        ee_state = np.append(ee_state, robot.traj_ee[:-1], 0)
        ee_next_state = np.append(ee_next_state, robot.traj_ee[1:], 0)        
        
        if print_update:
            if not (i+1) % 100: print(f'completed {i+1} trajectories')
    
        ee_delta = (ee_next_state - ee_state)
        
        ee_delta_direction = np.arctan2(ee_delta[:,:,1], ee_delta[:,:,0])
        ee_delta_dir_label = np.vectorize(bin_direction)(ee_delta_direction)
        ee_delta_magnitude = np.linalg.norm(ee_delta, axis=2)
        
    return (ro_state, ro_action, ee_state, ee_next_state, 
            ee_delta_direction, ee_delta_dir_label, ee_delta_magnitude)
    

def bin_direction(angle):
    if angle > -1*np.pi/8 and angle <= np.pi/8: return 0
    elif angle > 1*np.pi/8 and angle <= 3*np.pi/8: return 1
    elif angle > 3*np.pi/8 and angle <= 5*np.pi/8: return 2
    elif angle > 5*np.pi/8 and angle <= 7*np.pi/8: return 3
    elif angle > 7*np.pi/8 or angle <= -7*np.pi/8: return 4
    elif angle > -7*np.pi/8 and angle <= -5*np.pi/8: return 5
    elif angle > -5*np.pi/8 and angle <= -3*np.pi/8: return 6
    elif angle > -3*np.pi/8 or angle <= -1*np.pi/8: return 7
    else: return -1
    
def generate_dataset(ro_state, ro_action, ee_state, ee_next_state,
                     ee_delta_direction, ee_delta_dir_label, ee_delta_magnitude,
                     rev_only=False):
    
    # if we only want to include revolute joints, get rid of linkage length info
    if rev_only:
        ro_state = ro_state[:,:,[1]]
        ro_action = ro_action[:,:,[1]]
    
    n = ro_state.shape[0]
    
    return TensorDataset(
        torch.from_numpy(ro_state).float().reshape(n, -1),
        torch.from_numpy(ro_action).float().reshape(n, -1),
        torch.from_numpy(ee_state).float().reshape(n, -1),
        torch.from_numpy(ee_next_state).float().reshape(n, -1),
        F.one_hot(torch.from_numpy(ee_delta_dir_label)).reshape(n, -1),
        torch.from_numpy(ee_delta_magnitude).float().reshape(n, -1),
    )

In [103]:
data_3dof_train = generate_data_numpy(robot_3dof, 1000, print_update=True)
data_3dof_eval = generate_data_numpy(robot_3dof, 100)

data_4dof_train = generate_data_numpy(robot_4dof, 1000, print_update=True)
data_4dof_eval = generate_data_numpy(robot_4dof, 100)

completed 100 trajectories
completed 200 trajectories
completed 300 trajectories
completed 400 trajectories
completed 500 trajectories
completed 600 trajectories
completed 700 trajectories
completed 800 trajectories
completed 900 trajectories
completed 1000 trajectories
completed 100 trajectories
completed 200 trajectories
completed 300 trajectories
completed 400 trajectories
completed 500 trajectories
completed 600 trajectories
completed 700 trajectories
completed 800 trajectories
completed 900 trajectories
completed 1000 trajectories


In [106]:
dataset_3dof_train = generate_dataset(*data_3dof_train, rev_only=True)
dataset_3dof_eval = generate_dataset(*data_3dof_eval, rev_only=True)

dataset_4dof_train = generate_dataset(*data_4dof_train, rev_only=True)
dataset_4dof_eval = generate_dataset(*data_4dof_eval, rev_only=True)

In [121]:
# given robot state and action, yields a Gaussian distribution for the
# latent variable z
class ActionEncoder(nn.Module):
    
    def __init__(self, xr_dim, a_dim, z_dim):
        super().__init__()
        
        self.encoder_hidden = nn.Sequential(
            nn.Linear(in_features=xr_dim+a_dim, out_features=128), nn.ReLU(),
            nn.Linear(in_features=128, out_features=128), nn.ReLU(),
            nn.Linear(in_features=128, out_features=128)
        )
        
        self.encoder_mean = nn.Sequential(nn.Linear(128, z_dim))
        self.encoder_std = nn.Sequential(nn.Linear(128, z_dim), nn.Softplus())
        
    def forward(self, xr, a):
        
        print(xr)
        print(a)
        hidden = self.encoder_hidden(torch.cat([xr,a], dim=1))
        
        qz_mean = self.encoder_mean(hidden)
        qz_std = self.encoder_std(hidden) + 1e-7 # non-zero standard deviation
        
        dist_qz = dist.Normal(qz_mean, qz_std)
        
        return dist_qz
    
    
class ActionDecoder(nn.Module):
    
    def __init__(self, xr_dim, a_dim, z_dim):
        super().__init__()
    
        self.decoder = nn.Sequential(
            nn.Linear(in_features=z_dim+xr_dim, out_features=128), nn.ReLU(),
            nn.Linear(in_features=128, out_features=128), nn.ReLU(),
            nn.Linear(in_features=128, out_features=128), nn.ReLU(),
            nn.Linear(in_features=128, out_features=a_dim)
        )
        
    def forward(self, xr, a):
        return self.decoder(torch.cat([xr, a], dim=1))
    

class TransitionModel(nn.Module):
    
    def __init__(self, xe_dim, z_dim):
        super().__init__()
        
        self.transition = nn.Sequential(
            nn.Linear(in_features=xe_dim+z_dim, out_features=128), nn.ReLU(),
            nn.Linear(in_features=128, out_features=128), nn.ReLU(),
            nn.Linear(in_features=128, out_features=128), nn.ReLU(),
            nn.Linear(in_features=128, out_features=xe_dim)
        )
        
    def forward(self, xe, z):
        return self.transition(torch.cat([xe, z], dim=1))

In [144]:
class VAE(nn.Module):
    
    def __init__(self, n_domains, xr_dims, a_dims, xe_dim, z_dim):
        super().__init__()
        
        self.n = n_domains
        
        self.encoders = [
            ActionEncoder(xr_dims[i], a_dims[i], z_dim) for i in range(self.n)]
        
        self.decoders = [
            ActionDecoder(xr_dims[i], a_dims[i], z_dim) for i in range(self.n)]
        
        self.transition = TransitionModel(xe_dim, z_dim)
        
    def forward(self, xr, a, xe):
        
        dist_qz = [self.encoders[i](xr[i], a[i]) for i in range(self.n)]
        zq = [dist_qz[i].rsample() for i in range(self.n)]
        
        # Gaussian prior w/ mean=0 and std=1
        dist_pz = dist.Normal(torch.zeros_like(dist_qz[0].loc),
                              torch.ones_like(dist_qz[0].scale))
    
        ye_hat = [self.transition(xe[i], zq[i]) for i in range(self.n)]
        a_hat = [self.decoders[i](xr[i], zq[i]) for i in range(self.n)]

        return ye_hat, a_hat, zq, dist_qz, dist_pz
    
    def loss_function(self, xr, a, xe, ye):
        pass
        #loss_trans = F.mse_loss((ye_hat-xe)*100, (ye-xe)*100)
        
    

In [145]:
model = VAE(2, (3,4), (3,4), 2, 2)

In [146]:
model.forward(
    (dataset_3dof_train[0:10][0], dataset_4dof_train[0:10][0]),
    (dataset_3dof_train[0:10][1], dataset_4dof_train[0:10][1]),
    (dataset_3dof_train[0:10][2], dataset_4dof_train[0:10][2])
)

tensor([[ 2.7817, -0.3036, -2.4149],
        [ 2.7445, -0.3495, -2.5137],
        [ 2.7073, -0.3954, -2.6125],
        [ 2.6701, -0.4413, -2.7113],
        [ 2.6330, -0.4872, -2.8101],
        [ 2.5958, -0.5331, -2.9089],
        [ 2.5586, -0.5790, -3.0077],
        [ 2.6098, -0.5056, -2.9263],
        [ 2.6611, -0.4322, -2.8449],
        [ 2.7123, -0.3588, -2.7635]])
tensor([[-0.0372, -0.0459, -0.0988],
        [-0.0372, -0.0459, -0.0988],
        [-0.0372, -0.0459, -0.0988],
        [-0.0372, -0.0459, -0.0988],
        [-0.0372, -0.0459, -0.0988],
        [-0.0372, -0.0459, -0.0988],
        [ 0.0512,  0.0734,  0.0814],
        [ 0.0512,  0.0734,  0.0814],
        [ 0.0512,  0.0734,  0.0814],
        [ 0.0512,  0.0734,  0.0814]])
tensor([[-0.2762, -2.6643, -0.4705, -2.0971],
        [-0.3313, -2.7092, -0.4193, -2.1778],
        [-0.3863, -2.7542, -0.3681, -2.2586],
        [-0.4414, -2.7991, -0.3170, -2.3394],
        [-0.4965, -2.8441, -0.2658, -2.4202],
        [-0.5516, -2.8890, -

([tensor([[-0.0239,  0.0364],
          [-0.0162,  0.0026],
          [-0.0183,  0.0109],
          [-0.0216,  0.0238],
          [-0.0221,  0.0336],
          [-0.0132, -0.0025],
          [-0.0185,  0.0077],
          [-0.0192,  0.0309],
          [-0.0297,  0.0284],
          [-0.0295,  0.0274]], grad_fn=<AddmmBackward>),
  tensor([[-0.0331,  0.0091],
          [-0.0251,  0.0417],
          [-0.0317,  0.0243],
          [-0.0153,  0.0134],
          [-0.0356,  0.0193],
          [-0.0248,  0.0299],
          [-0.0526,  0.0403],
          [-0.0172,  0.0237],
          [-0.0319,  0.0262],
          [-0.0183,  0.0132]], grad_fn=<AddmmBackward>)],
 [tensor([[-0.0732, -0.0876,  0.0801],
          [-0.0845, -0.1138,  0.0090],
          [-0.0794, -0.1497,  0.0376],
          [-0.0689, -0.0947,  0.0494],
          [-0.0850, -0.0863,  0.0817],
          [-0.0791, -0.1094,  0.0077],
          [-0.0662, -0.1174,  0.0324],
          [-0.0715, -0.1159,  0.0698],
          [-0.0887, -0.0698,  0.0