# Tutorial of DiffCo for Trajectory Optimization

## 1. Create Environment for Geometrical Collision Checking

### Provide a customized class that provides the following functions:
* sample random configuration of the robot
* check if a configuration is in collision with any obstacles in the environment
* (optional) plot the robot and all objects in the environment
  
The base class CollisionEnv located at `envs/collision_env.py` provides a reference of what functions the class should provide.

```
class CollisionEnv:
    '''
    A template class for collision environment.
    Use as a reference for implementing your own collision environment.
    '''
    def __init__(self):
        pass

    def is_collision(self, qs):
        return [self._single_collision(q) for q in qs]
    
    def _single_collision(self, q):
        raise NotImplementedError

    def distance(self, qs):
        return [self._single_distance(q) for q in qs]
    
    def _single_distance(self, q):
        raise NotImplementedError
    
    def sample_q(self):
        raise NotImplementedError
    
    def plot(self, qs):
        raise NotImplementedError
```

In this tutorial, we are going to use an environment using roboticstoolbox-python as backend, because it supports collision checking + visualization in notebook. The robot is a Franka Emika Panda robot, and the obstacles are 3 cylinders. 

*Note: the RTB simulator does not always run correctly in Colab or a notebook on a remote server.*

In [1]:
from envs.rtb import PandaThreeCylinderEnv

cenv = PandaThreeCylinderEnv(launch_args={'realtime': True, 'browser': 'notebook'})
# you may also use browser='firefox' or 'google-chrome'

## 2. Generate Dataset for Training DiffCo

In [3]:
import torch
import numpy as np
from diffco.routines import train_test_split

num_samples = 12000
num_train = 10000
try:
    dataset = torch.load('tutorial_dataset.pt')
    train_cfgs = dataset['train_cfgs']
    test_cfgs = dataset['test_cfgs']
    train_labels = dataset['train_labels']
    test_labels = dataset['test_labels']
    cfgs = torch.cat([train_cfgs, test_cfgs])
    labels = torch.cat([train_labels, test_labels])
except FileNotFoundError:
    cfgs = torch.tensor(np.stack([cenv.sample_q() for _ in range(num_samples)]), dtype=torch.float32)
    labels = torch.tensor(cenv.is_collision(cfgs.numpy()), dtype=torch.float32)
    labels = 2 * labels - 1

    train_indices, test_indices = train_test_split(num_samples, num_train)
    train_cfgs, test_cfgs = cfgs[train_indices], cfgs[test_indices]
    train_labels, test_labels = labels[train_indices], labels[test_indices]

    torch.save({
        'train_cfgs': train_cfgs,
        'test_cfgs': test_cfgs,
        'train_labels': train_labels,
        'test_labels': test_labels,
    }, 'tutorial_dataset.pt')

print(f'cfgs.shape = {cfgs.shape}')
print(f'labels.shape = {labels.shape}')
print(f'train_cfgs.shape = {train_cfgs.shape}')
print(f'test_cfgs.shape = {test_cfgs.shape}')
print(f'train_labels.shape = {train_labels.shape}')
print(f'test_labels.shape = {test_labels.shape}')

cfgs.shape = torch.Size([12000, 7])
labels.shape = torch.Size([12000])
train_cfgs.shape = torch.Size([10000, 7])
test_cfgs.shape = torch.Size([2000, 7])
train_labels.shape = torch.Size([10000])
test_labels.shape = torch.Size([2000])


## 3. Train DiffCo

### Provide a PyTorch FK function
that calculates a set of predefined points on your robot given a configuration. The points can be any unique points on the robot that covers all links nicely. A common choice is the origins of link frames. The function `fkine(q) -> tensor(m, d)` maps a configuration to a set of points in the d-dimensional workspace (usually, d=2 or 3)
* This is optional, but it allows us to use the FK function in the kernel, which results in much better accuracy
* This may be automatically generated in the future with a URDF file of the robot using, e.g., [differentiable-robot-model](https://github.com/facebookresearch/differentiable-robot-model) from facebookresearch 

### 3.1 Method I: Step by step

In [4]:
# define the pytorch FK function. 
from diffco import model
drobot = model.PandaFK()
fkine = drobot.fkine # Could be None if you don't need it

# specify kernel function for training
from diffco import kernel
train_kernel_func = kernel.FKKernel(fkine, kernel.RQKernel(gamma=10.))

# define the kernel perceptron
from diffco import DiffCo
checker = DiffCo(None, train_kernel_func, beta=1.0)

# train the kernel perceptron
checker.train(train_cfgs, train_labels, max_iteration=num_train)

# fit the support configurations and labels with polyharmonic kernel
inference_kernel_func = kernel.Polyharmonic(k=1, epsilon=1)
checker.fit_poly(kernel_func=inference_kernel_func, target='label', fkine=fkine)
col_func = checker.poly_score

  0%|          | 17/10000 [00:00<01:01, 163.24it/s]

DiffCo training...


 20%|█▉        | 1959/10000 [00:06<00:26, 305.66it/s]


Ended at iteration 1959
ACC: 1.0
original training done. 6.4852 secs cost


In [5]:
# Test a random configuration
col_func(torch.tensor(cenv.sample_q(), dtype=torch.float32))

tensor([[-5.4086]])

In [6]:
# Choose a safety margin. 
# A good choice is to set it to be proportional to the minimum score.
min_score = torch.min(col_func(train_cfgs)).item()
safety_margin = min_score / 10
print(f'safety_margin = {safety_margin:.4f}')

# test the correctness of the kernel perceptron
# Ideally, TPR should be > 0.9 and ACC should be > 0.9
num_test = num_samples - num_train
test_preds = (col_func(test_cfgs)-safety_margin > 0) * 2 - 1
test_labels = test_labels.reshape(test_preds.shape)
test_acc = torch.sum(test_preds == test_labels).item() / num_test
test_tpr = torch.sum(test_preds[test_labels == 1] == 1).item() / torch.sum(test_labels == 1).item()
test_tnr = torch.sum(test_preds[test_labels == -1] == -1).item() / torch.sum(test_labels == -1).item()
print(f'test_acc = {test_acc:.4f}')
print(f'test_tpr = {test_tpr:.4f}')
print(f'test_tnr = {test_tnr:.4f}')
print(f'num of support points = {len(checker.gains)}')

safety_margin = -0.6154
test_acc = 0.8980
test_tpr = 0.9693
test_tnr = 0.8858
num of support points = 1114


### 3.2 Method II: Using Predefined routines

In [7]:
from diffco import DiffCo
from diffco.routines import train_checker, test_checker, fit_checker, get_estimator

# define the pytorch FK function. 
from diffco import model
drobot = model.PandaFK()
fkine = drobot.fkine # Could be None if you don't need it

checker = train_checker(
    checker_type=DiffCo, 
    train_data=train_cfgs, 
    train_labels=train_labels, train_dists=None, 
    fkine=fkine, obstacles=None, 
    trained_checker_dump='tutorial_checker.pt',
    lmbda=10.)

from diffco import kernel
fit_checker(checker, kernel.Polyharmonic, False, 
    fitting_target='label',
    fitting_epsilon=1.0, 
    fkine=fkine,)
col_func = get_estimator(checker, 'poly_score')

# Choose a safety margin. 
# A good choice is to set it to be proportional to the minimum score.
# Ideally, TPR should be > 0.9 and ACC should be > 0.9
min_score = torch.min(col_func(train_cfgs)).item()
safety_margin = min_score / 10
print(f'safety_margin = {safety_margin:.4f}')

test_checker(checker, col_func, test_cfgs, test_labels, safety_margin=safety_margin)

  0%|          | 21/10000 [00:00<00:48, 205.51it/s]

DiffCo training...


 20%|█▉        | 1959/10000 [00:05<00:24, 332.76it/s]


Ended at iteration 1959
ACC: 1.0
original training done. 5.9582 secs cost
checker saved: results/tutorial_checker.pt.p
safety_margin = -0.6154
Test acc: 0.8980, TPR 0.9693, TNR 0.8858
1114 Support Points


## 4. Define Trajectory Optimization Problem

Now we choose two free configurations from our dataset and attempt to find a path between them with trajectory optimization. It is possible optimization does not yield a successful result because it may get stuck at a local minimum. So we recommend obtaining a valid path with a motion planning algorithm as the initial solution for optimization.

The following block gets a pair of (collision-free) start and end configuration

In [8]:
free_cfgs = cfgs[labels == -1]
two_idx = np.random.choice(len(free_cfgs), 2, replace=False)
start_cfg, target_cfg = free_cfgs[two_idx]

from spatialmath import SE3
Tr = cenv.robot.fkine(cenv.robot.qr)
start_point = np.array([0.5, -0.3, 0.3])
target_point = np.array([0.5, 0.3, 0.5])
Tstart = SE3.Rt(R=Tr.R, t=start_point)
Ttarget = SE3.Rt(R=Tr.R, t=target_point)

while True:
    start_cfg = cenv.robot.ikine_LM(Tstart, mask=np.array([1., 1., 1., 1, 1, 1]), q0=cenv.robot.qr).q
    start_cfg = torch.tensor(start_cfg, dtype=torch.float32)
    target_cfg = cenv.robot.ikine_LM(Ttarget, mask=np.array([1., 1., 1., 1, 1, 1]), q0=cenv.robot.qr).q
    target_cfg = torch.tensor(target_cfg, dtype=torch.float32)
    if not any(cenv.is_collision([start_cfg, target_cfg])):
        break
print(f'start_cfg = {start_cfg}, fk_t = {cenv.robot.fkine(start_cfg).t}')
print(f'target_cfg = {target_cfg}, fk_t = {cenv.robot.fkine(target_cfg).t}')
print(f'collision = {cenv.is_collision([start_cfg, target_cfg])}')


start_cfg = tensor([-0.1864,  0.0947, -0.3750, -2.0538,  0.1104,  2.2240,  0.1634]), fk_t = [ 0.49999999 -0.3         0.29999999]
target_cfg = tensor([ 0.1778,  0.0207,  0.3855, -1.6237, -0.0618,  1.7273,  1.3554]), fk_t = [0.5        0.29999999 0.5       ]
collision = [False, False]


In [9]:
# set up an environmet in your browser for easier visualization
# (otherwise, you will need to scroll up a lot to see the environment)
tmp_cenv = PandaThreeCylinderEnv(launch_args={'realtime': False, 'browser': 'google-chrome'})
tmp_cenv.robot.q = start_cfg.numpy()
tmp_cenv.env.step(0.01)

### 4.1 (Recommended) Obtain a valid path with OMPL

In [10]:
from scripts.motion_planner import MotionPlanner

def my_motion_cost(s1, s2):
    # print(f'motion cost: {s1}, {s2}')
    p_tensor = torch.stack([s1, s2])
    control_points = drobot.fkine(p_tensor)
    diff = (control_points[1:]-control_points[:-1]).pow(2).sum()
    return diff.item()
def valid_check_func(x):
    # print(f'valid check: {x}')
    if not isinstance(x, np.ndarray):
        x = np.array(x)
    return not cenv._single_collision(x)
# valid_check_func = lambda x: not cenv.is_collision(x)
mp = MotionPlanner(drobot, valid_check_func, my_motion_cost)
ompl_rec = mp.plan(start_cfg, target_cfg, {'maxtime': 10.0})
print(f'OMPL success?: {ompl_rec["success"]}')
ompl_path = torch.tensor(ompl_rec['solution'], dtype=torch.double)

planner: RRTConnect
OMPL success?: True


In [12]:
import time
print(f'OMPL path length {len(ompl_path)}')
for cfg in ompl_path:
    tmp_cenv.robot.q = cfg.numpy()
    tmp_cenv.env.step(0.05)
    time.sleep(0.05)

OMPL path length 43


### 4.2 Trajectory Optimization with DiffCo

In [13]:
# Make sure joint limits align in the diffco robot model and the RTB robot model
print(f'diffco robot model joint limits: {drobot.limits}')
print(f'RTB robot joint limits: {cenv.robot.qlim}')

diffco robot model joint limits: tensor([[-2.8973,  2.8973],
        [-1.7628,  1.7628],
        [-2.8973,  2.8973],
        [-3.0718, -0.0698],
        [-2.8973,  2.8973],
        [-0.0175,  3.7525],
        [-2.8973,  2.8973]])
RTB robot joint limits: [[-2.8973 -1.7628 -2.8973 -3.0718 -2.8973 -0.0175 -2.8973]
 [ 2.8973  1.7628  2.8973 -0.0698  2.8973  3.7525  2.8973]]


In [25]:
# Set up some optimization options
from diffco import optim
import importlib
importlib.reload(optim)
from diffco.optim import givengrad_traj_optimize, adam_traj_optimize, trustconstr_traj_optimize

optim_options = {
    'N_WAYPOINTS': 20,
    'NUM_RE_TRIALS': 1,
    'MAXITER': 200, # 200 for adam, 50 for slsqp(givengrad), 30 for trust_constr
    'safety_margin': safety_margin,
    'max_speed': 0.1, # max interval between adjacent configs for collision checks
    'seed': 1234,
    'history': False,
    'extra_optimizer_options': {
        'disp': True, # False to suppress output
        # 'iprint': 10, # make slsqp verbose
        'verbose': 3, # make trust_constr verbose
        'lr': 0.1, # learning rate for adam
    },
}
robot = model.PandaFK()
try:
    if ompl_path is not None:
        init_path = ompl_path.clone()
        def sparsify_path(path, n):
            assert n >= 2
            if len(path) <= n:
                return path
            indices = np.linspace(0, len(path)-1, n, dtype=int)
            return path[indices]
        init_path = sparsify_path(init_path, optim_options['N_WAYPOINTS'])

        optim_options['init_solution'] = init_path
except NameError:
    pass

#### Choose one optimizer from below 
(and maybe adjust options above accordingly)

In [26]:
# Fastest, lower success rate and path quality
optim_rec = adam_traj_optimize(
    robot, col_func, start_cfg, target_cfg, optim_options)

# Medium success rate, better convergence than Adam
# optim_rec = givengrad_traj_optimize(
#     robot, col_func, start_cfg, target_cfg, optim_options)

# Slowest, highest success rate and path quality
# optim_rec = trustconstr_traj_optimize(
#     robot, col_func, start_cfg, target_cfg, optim_options)

# print(f'optim_rec = {optim_rec}')

Adam lr = 0.1


In [27]:
# Compare path cost before and after optimization
qs = optim_options['init_solution']
fk_points = robot.fkine(qs)
init_cost = (fk_points[1:] - fk_points[:-1]).pow(2).sum()
print(f'init cost = {init_cost}')
qs = optim_rec['solution']
qs = torch.tensor(qs)
fk_points = robot.fkine(qs)
cost = (fk_points[1:] - fk_points[:-1]).pow(2).sum()
print(f'cost = {cost}')
assert not torch.allclose(qs, optim_options['init_solution'].to(qs.dtype))

init cost = 0.19630893275691091
cost = 0.18197554349899292


In [28]:
# Verify and visualize the optimized path.
# You may also modify the code to visualize the initial path before optimization.

from diffco.utils import dense_path
qs = dense_path(qs, 0.2)

in_collision = cenv.is_collision(qs)
proxy_in_collision = (col_func(qs) - safety_margin > 0).reshape(-1)
print(f'Any in_collision ?: {any(in_collision)}')
print(f'Any proxy collision ?: {any(proxy_in_collision)}')

print(f'qs.shape = {qs.shape}')
import time
for q in qs:
    tmp_cenv.robot.q = q.numpy()
    tmp_cenv.env.step(0.05)
    time.sleep(0.05)

Any in_collision ?: False
Any proxy collision ?: False
qs.shape = torch.Size([47, 7])


### 4.3 Trajectory Optimization with Geometrical Collision Checking
We then also try optimization with the geometrical collison checker that comes with the environment.

In [198]:
from diffco.optim import gradient_free_traj_optimize
optim_options = {
    'N_WAYPOINTS': 20,
    'NUM_RE_TRIALS': 1,
    'MAXITER': 30,
    'max_speed': 0.1,
    'seed': 1234,
    'history': False
}
try:
    optim_options['init_solution'] = init_path
except NameError:
    print('No init path, starting with straight line')
robot = model.PandaFK()
dist_func = lambda qs: torch.tensor(cenv.distance(qs.numpy()), dtype=torch.float32)
optim_rec = gradient_free_traj_optimize(
    robot, dist_func, start_cfg, target_cfg, optim_options)

  NIT    FC           OBJFUN            GNORM
    1   128     3.126395E-01     4.834584E-01
    2   261     3.096849E+01     4.834574E-01
    3   391     5.999839E-01     3.433784E+00
    4   518     4.523255E+00     2.288069E+00
    5   655     3.025401E+01     2.286993E+00
    6   792     1.751171E+00     2.279933E+00
    7   927     4.044384E+01     2.279933E+00
    8  1064     3.719636E+01     2.279933E+00
    9  1199     3.847449E+00     2.279930E+00
   10  1336     3.226599E+01     2.279843E+00
   11  1473     2.318569E+00     2.279706E+00
   12  1610     3.203977E+01     2.279706E+00
   13  1747     2.607822E+01     2.279706E+00
   14  1878     3.431159E+01     2.283759E+00
   15  2013     2.501381E+01     2.283747E+00
   16  2150     4.910220E+00     2.176751E+00
   17  2280     1.035906E+01     1.784468E+00
   18  2417     1.066398E+00     1.784450E+00
   19  2554     2.826861E+01     1.784254E+00
   20  2683     3.177908E+01     5.858423E+00
   21  2813     2.997067E+01     5

In [208]:
qs = optim_options['init_solution']
fk_points = robot.fkine(qs)
init_cost = (fk_points[1:] - fk_points[:-1]).pow(2).sum()
print(f'init cost = {init_cost}')
qs = optim_rec['solution']
qs = torch.tensor(qs)
fk_points = robot.fkine(qs)
cost = (fk_points[1:] - fk_points[:-1]).pow(2).sum()
print(f'cost = {cost}')
assert not torch.allclose(qs, optim_options['init_solution'].to(qs.dtype))
from diffco.utils import dense_path
qs = dense_path(qs, 0.2)

in_collision = cenv.is_collision(qs)
proxy_in_collision = (col_func(qs) - safety_margin > 0).reshape(-1)
print(f'Any in_collision = {any(in_collision)}')
print(f'Any proxy collision = {any(proxy_in_collision)}')

print(f'qs.shape = {qs.shape}')
import time
for q in qs:
    tmp_cenv.robot.q = q.numpy()
    tmp_cenv.env.step(0.05)
    time.sleep(0.05)

init cost = 0.3183942385406464
cost = 1.688124179840088
Any in_collision = True
Any proxy collision = True
qs.shape = torch.Size([48, 7])
