# Overview

This notebook analyzes the use of levenberg-marquardt for IK refinement

In [None]:
%load_ext autoreload
%autoreload 2

from IPython.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [None]:
from time import time 

import torch
import numpy as np
from jrl.utils import set_seed, make_text_green_or_red, evenly_spaced_colors
from jrl.math_utils import geodesic_distance_between_quaternions
import matplotlib.pyplot as plt

from ikflow.model_loading import get_ik_solver

torch.set_printoptions(linewidth=300, precision=6, sci_mode=False)

In [None]:
MODEL_NAME = "panda__full__lp191_5.25m"
POS_ERROR_THRESHOLD = 0.001
ROT_ERROR_THRESHOLD = 0.01

ikflow_solver, _ = get_ik_solver(MODEL_NAME)
robot = ikflow_solver.robot

In [None]:
def check_converged(robot, qs, target_poses, i):
    # check error
    pose_realized = robot.forward_kinematics_batch(qs)
    pos_errors = torch.norm(pose_realized[:, 0:3] - target_poses[:, 0:3], dim=1)
    rot_errors = geodesic_distance_between_quaternions(target_poses[:, 3:], pose_realized[:, 3:])
    #
    converged = pos_errors.max().item() < POS_ERROR_THRESHOLD and rot_errors.max().item() < ROT_ERROR_THRESHOLD
    # print("solve_lma", i, converged, round(pos_errors.max().item(), 6), "\t", round(rot_errors.max().item(), 6), "\t", pos_errors.data)
    # print("solve_lma", i, converged, round(pos_errors.max().item(), 6), "\t", round(rot_errors.max().item(), 6),)
    #
    # converged = pos_errors.mean().item() < POS_ERROR_THRESHOLD and rot_errors.mean().item() < ROT_ERROR_THRESHOLD
    # print("solve_lma", i, converged, round(pos_errors.mean().item(), 6), "\t", round(rot_errors.mean().item(), 6), "\t", pos_errors.data, rot_errors.data)
    return converged, pos_errors, rot_errors


def plot_errors(all_pos_errors, all_rot_errors):
    fig, (axl, axr) = plt.subplots(1, 2, figsize=(18, 8))
    fig.suptitle(f"Levenberg-Marquardt IK Convergence")

    axl.set_title(f"Positional errors")
    axl.grid(alpha=0.2)
    axl.set_xlabel("iteration")
    axl.set_ylabel("error (m)")

    axr.set_title(f"Rotational errors")
    axr.grid(alpha=0.2)
    axr.set_xlabel("iteration")
    axr.set_ylabel("error (rad)")

    n = all_pos_errors[0].shape[0]
    xs = np.arange(n)
    colors = evenly_spaced_colors(int(1.5*len(all_pos_errors)))
    axl.plot([0, xs[-1]], [POS_ERROR_THRESHOLD, POS_ERROR_THRESHOLD], label="threshold", color="green")
    axr.plot([0, xs[-1]], [ROT_ERROR_THRESHOLD, ROT_ERROR_THRESHOLD], label="threshold", color="green")
    for i, (pos_errors, rot_errors) in enumerate(zip(all_pos_errors, all_rot_errors)):
        axl.scatter(xs, pos_errors.cpu().numpy(), label=f"iteration {i}", s=15, color=colors[i])
        axr.scatter(xs, rot_errors.cpu().numpy(), label=f"iteration {i}", s=15, color=colors[i])
    # plt.savefig(f"lma_errors_{i}.pdf", bbox_inches="tight")
    # axl.legend()
    axr.legend()
    plt.show()

def debug_dist_to_jlims(robot, qs):
    """ test with:
    qs_random, _ = ikflow_solver.robot.sample_joint_angles_and_poses(5) 
    debug_dist_to_jlims(robot, torch.tensor(qs_random))
    """
    
    mins = 100*torch.ones(len(qs))
    eps = 1e-5
    for i, (l, u) in enumerate(robot.actuated_joints_limits):
        assert torch.min(qs[:, i]) > l - eps
        assert torch.max(qs[:, i]) < u + eps, f"{torch.max(qs[:, i])} !< {u}"
        mins = torch.minimum(mins, torch.abs(qs[:, i] - l))
        mins = torch.minimum(mins, torch.abs(qs[:, i] - u))
    print("distances to joint limits:", mins)
    return mins

### Difficult config solution 1: Additional ikflow solutions for each target pose


In [None]:
set_seed(0)
# n_solutions = 10
n_solutions = 3
device = "cpu"

# repeat_count = 1  # converged in 27 iterations
# repeat_count = 2  # converged in 10 iterations
# repeat_count = 3  # converged in 10 iterations
# repeat_count = 4  # converged in 10 iterations
repeat_count = 5  # converged in 1 iterations
# repeat_count = 10  # converged in 1 iterations

# --- target pose
_, target_poses_original = ikflow_solver.robot.sample_joint_angles_and_poses(n_solutions, only_non_self_colliding=True, tqdm_enabled=False)
target_poses = torch.tensor(target_poses_original, dtype=torch.float32, device="cuda:0")
target_poses_tiled = target_poses.repeat((repeat_count, 1))

# target_poses: tensor([[ 0.555446,  0.504079,  0.332677,  0.012954,  0.487856,  0.854263,  0.179060],
#         [-0.281688,  0.697731, -0.016917,  0.030757, -0.934636, -0.219964, -0.277716],
#         [ 0.623912,  0.485226,  0.128579,  0.375592,  0.023773,  0.851167,  0.365896]], device='cuda:0')


# --- ik solution seeds
t0_ikf = time()
q0 = ikflow_solver.generate_ik_solutions(target_poses, None)
print(f"Got {len(q0)} ik solutions from IKFlow in {time() - t0_ikf} seconds")
q = q0.clone()


print("\ntarget_poses:", target_poses)
print("q0:          ", q)
print()
for j in range(n_solutions):
    qs_sol_j = q[j + torch.arange(0, q.shape[0], n_solutions, device=device), :]
    debug_dist_to_jlims(robot, qs_sol_j)
print()


def one_q_is_valid(qs_j, target_pose):
    assert qs_j.shape[0] == target_pose.shape[0], f"{qs_j.shape} != {target_pose.shape}"
    assert target_pose.shape[1] == 7
    _, pos_errors, rot_errors = check_converged(robot, qs_j, target_pose, j)
    valids = torch.logical_and(pos_errors < POS_ERROR_THRESHOLD, rot_errors < ROT_ERROR_THRESHOLD)
    print(" ", valids, pos_errors, rot_errors)
    return valids.any()
    

q = q.to(device)
target_poses = target_poses.to(device)
target_poses_tiled = target_poses_tiled.to(device)
converged = False
t0 = time()

for i in range(30):

    # qprev = q.clone()
    # q = robot.inverse_kinematics_single_step_levenburg_marquardt(target_poses_tiled, q, clamp_to_joint_limits=False)
    q = robot.inverse_kinematics_single_step_levenburg_marquardt(target_poses_tiled, q)
    # print("diff:", q - qprev)
    # assert False

    converged_notes = torch.zeros(n_solutions, dtype=torch.bool, device=device)
    # print("\ni:", i)

    for j in range(n_solutions):
        # print("\n  j:", j)
        qs_sol_j = q[j + torch.arange(0, q.shape[0], n_solutions, device=device), :]
        target_poses_j = target_poses[j].repeat((repeat_count, 1))  # todo: use .expand(), which is 0 copy
        converged_notes[j] = one_q_is_valid(qs_sol_j, target_poses_j)
        # print("  qs_sol_j: ", qs_sol_j)

    # print("\nconverged_notes:", converged_notes)
    converged = converged_notes.all().item()
    if converged:
        break

    # converged, pos_errors, rot_errors = check_converged(robot, qs, target_poses, i)
    # print(pos_errors.data, rot_errors.data)
    # all_pos_errors.append(pos_errors)
    # all_rot_errors.append(rot_errors)
    # if converged:
    #     break

print(f"\nperformed {i+1} optimization steps in {time() - t0} seconds")
print()
if not converged:
    # n_invalid = torch.logical_or(pos_errors > POS_ERROR_THRESHOLD, rot_errors > ROT_ERROR_THRESHOLD).sum().item()
    # print(make_text_green_or_red(f"Failed to converge after {i} iterations ({n_invalid}/{len(qs)} invalid)", False))
    print(make_text_green_or_red(f"Failed to converge after {i+1} iterations ({converged_notes.sum().item()}/{converged_notes.numel()} valid)", False))
else:
    print(make_text_green_or_red(f"Converged after {i+1} iterations", True))

# plot_errors(all_pos_errors, all_rot_errors)

### Difficult config solution 2: Add noise to configs solution


In [None]:
set_seed(0)
n_solutions = 10
# n_solutions = 3
device = "cpu"
repeat_count = 1  # performed 20 optimization steps in 0.5879652500152588 seconds
# repeat_count = 100 # performed 20 optimization steps in 1.2729570865631104 seconds

# --- target pose
_, target_poses_original = ikflow_solver.robot.sample_joint_angles_and_poses(n_solutions, only_non_self_colliding=True, tqdm_enabled=False)
target_poses = torch.tensor(target_poses_original, dtype=torch.float32, device="cuda:0")
target_poses_tiled = target_poses.repeat((repeat_count, 1))

# target_poses: tensor([[ 0.555446,  0.504079,  0.332677,  0.012954,  0.487856,  0.854263,  0.179060],
#         [-0.281688,  0.697731, -0.016917,  0.030757, -0.934636, -0.219964, -0.277716],
#         [ 0.623912,  0.485226,  0.128579,  0.375592,  0.023773,  0.851167,  0.365896]], device='cuda:0')


# --- ik solution seeds
t0_ikf = time()
qs0_original = ikflow_solver.generate_ik_solutions(target_poses, None)
print(f"Got {n_solutions} ik solutions from IKFlow in {time() - t0_ikf} seconds")
q0 = qs0_original.repeat((repeat_count, 1))
# noise = torch.randn_like(q0[n_solutions:, :]) * 0.025
# noise = torch.randn_like(q0[n_solutions:, :]) * 0.05
noise = torch.randn_like(q0[n_solutions:, :]) * 0.1
q0[n_solutions:, :] += noise
q0 = robot.clamp_to_joint_limits(q0)
q = q0.clone()


print("\ntarget_poses:", target_poses)
print("q0:          ", q)
print("noise:       ", noise)
print()
for j in range(n_solutions):
    qs_sol_j = q[j + torch.arange(0, q.shape[0], n_solutions, device=device), :]
    debug_dist_to_jlims(robot, qs_sol_j)
print()


def one_q_is_valid(qs_j, target_pose):
    assert qs_j.shape[0] == target_pose.shape[0], f"{qs_j.shape} != {target_pose.shape}"
    assert target_pose.shape[1] == 7
    _, pos_errors, rot_errors = check_converged(robot, qs_j, target_pose, j)
    valids = torch.logical_and(pos_errors < POS_ERROR_THRESHOLD, rot_errors < ROT_ERROR_THRESHOLD)
    # print(" ", valids, pos_errors, rot_errors)
    return valids.any()
    

q = q.to(device)
target_poses = target_poses.to(device)
target_poses_tiled = target_poses_tiled.to(device)
converged = False
t0 = time()

for i in range(20):

    qprev = q.clone()
    # q = robot.inverse_kinematics_single_step_levenburg_marquardt(target_poses_tiled, q, clamp_to_joint_limits=False)
    q = robot.inverse_kinematics_single_step_levenburg_marquardt(target_poses_tiled, q, clamp_to_joint_limits=True)
    # print("diff:", q - qprev)
    # assert False

    converged_notes = torch.zeros(n_solutions, dtype=torch.bool, device=device)
    # print("\ni:", i)

    for j in range(n_solutions):
        # print("\n  j:", j)
        qs_sol_j = q[j + torch.arange(0, q.shape[0], n_solutions, device=device), :]
        target_poses_j = target_poses[j].repeat((repeat_count, 1))  # todo: use .expand(), which is 0 copy
        converged_notes[j] = one_q_is_valid(qs_sol_j, target_poses_j)
        # print("  qs_sol_j: ", qs_sol_j)

    # print("\nconverged_notes:", converged_notes)
    converged = converged_notes.all().item()
    if converged:
        break

    # converged, pos_errors, rot_errors = check_converged(robot, qs, target_poses, i)
    # print(pos_errors.data, rot_errors.data)
    # all_pos_errors.append(pos_errors)
    # all_rot_errors.append(rot_errors)
    # if converged:
    #     break

print(f"\nperformed {i+1} optimization steps in {time() - t0} seconds")
print()
if not converged:
    # n_invalid = torch.logical_or(pos_errors > POS_ERROR_THRESHOLD, rot_errors > ROT_ERROR_THRESHOLD).sum().item()
    # print(make_text_green_or_red(f"Failed to converge after {i} iterations ({n_invalid}/{len(qs)} invalid)", False))
    print(make_text_green_or_red(f"Failed to converge after {i} iterations ({converged_notes.sum().item()}/{converged_notes.numel()} valid)", False))
else:
    print(make_text_green_or_red(f"Converged after {i} iterations", True))

# plot_errors(all_pos_errors, all_rot_errors)

## Default optimization

In [None]:
set_seed(0)
# n_solutions = 40
n_solutions = 200
_, target_poses = ikflow_solver.robot.sample_joint_angles_and_poses(n_solutions, only_non_self_colliding=True, tqdm_enabled=False)
target_poses = torch.tensor(target_poses, dtype=torch.float32, device="cuda:0")

qs = ikflow_solver.generate_ik_solutions(target_poses, None)
# set_seed(10) # n_solutions = 40
# qs = ikflow_solver.generate_ik_solutions(target_poses, None)

converged = False
i = 0
all_pos_errors = []
all_rot_errors = []

_, pos_errors, rot_errors = check_converged(robot, qs, target_poses, "init")
all_pos_errors.append(pos_errors)
all_rot_errors.append(rot_errors)

alphas = torch.ones((n_solutions, 1), dtype=torch.float32, device="cuda:0")

thresh = 0.015
above_thresh_scale = 0.5
alphas[pos_errors > thresh] = above_thresh_scale

while not converged and i < 20:

    qs = robot.inverse_kinematics_single_step_levenburg_marquardt(target_poses, qs)
    # qs = robot.inverse_kinematics_single_step_levenburg_marquardt(target_poses, qs, alpha=0.1)           # 53 iterations
    # qs = robot.inverse_kinematics_single_step_levenburg_marquardt(target_poses, qs, alpha=0.25)          # 20 iterations
    # qs = robot.inverse_kinematics_single_step_levenburg_marquardt(target_poses, qs, alpha=0.75)          # 27 iterations
    # qs = robot.inverse_kinematics_single_step_levenburg_marquardt(target_poses, qs, alpha=0.5)         # 10 iterations
    # qs = robot.inverse_kinematics_single_step_levenburg_marquardt(target_poses, qs, alphas=alphas)

    converged, pos_errors, rot_errors = check_converged(robot, qs, target_poses, i)
    all_pos_errors.append(pos_errors)
    all_rot_errors.append(rot_errors)

    # 
    alphas[pos_errors > thresh] = above_thresh_scale
    alphas[pos_errors < thresh] = 1.0

    # print(i, converged)
    i += 1

print()
if not converged:
    n_invalid = torch.logical_or(pos_errors > POS_ERROR_THRESHOLD, rot_errors > ROT_ERROR_THRESHOLD).sum().item()
    print(make_text_green_or_red(f"Failed to converge after {i} iterations ({n_invalid}/{len(qs)} invalid)", False))
else:
    print(make_text_green_or_red(f"Converged after {i} iterations", True))

plot_errors(all_pos_errors, all_rot_errors)

# Negative mining for difficult configs

In [None]:
n_solutions = 5000

set_seed(0)
_, target_poses = ikflow_solver.robot.sample_joint_angles_and_poses(n_solutions, only_non_self_colliding=True, tqdm_enabled=False)
target_poses = torch.tensor(target_poses, dtype=torch.float32, device="cuda:0")
q0s = ikflow_solver.generate_ik_solutions(target_poses, None)
qs = q0s.clone()
for _ in range(8):
    qs = robot.inverse_kinematics_single_step_levenburg_marquardt(target_poses, qs)

# select non converged solutions
converged, pos_errors, rot_errors = check_converged(robot, qs, target_poses, i)
difficult_idxs = torch.logical_and(pos_errors > POS_ERROR_THRESHOLD, rot_errors > ROT_ERROR_THRESHOLD)
difficult_poses = target_poses[difficult_idxs]
difficult_qs = q0s[difficult_idxs]

print(difficult_poses.shape)

### Difficult config solution 1: lower alpha when far from solution

In [None]:
all_pos_errors = []
all_rot_errors = []

qs = difficult_qs.clone()
_, pos_errors, rot_errors = check_converged(robot, qs, difficult_poses, "init")
all_pos_errors.append(pos_errors)
all_rot_errors.append(rot_errors)


alphas = torch.ones((len(qs), 1), dtype=torch.float32, device="cuda:0")
thresh = 0.015
# above_thresh_scale = 0.5
above_thresh_scale = 0.25
alphas[pos_errors > thresh] = above_thresh_scale


i = 0
converged = False
while not converged and i < 20:

    # qs = robot.inverse_kinematics_single_step_levenburg_marquardt(difficult_poses, qs)
    # qs = robot.inverse_kinematics_single_step_levenburg_marquardt(difficult_poses, qs, alpha=0.1)           # 53 iterations
    # qs = robot.inverse_kinematics_single_step_levenburg_marquardt(difficult_poses, qs, alpha=0.25)          # 20 iterations
    # qs = robot.inverse_kinematics_single_step_levenburg_marquardt(difficult_poses, qs, alpha=0.75)          # 27 iterations
    # qs = robot.inverse_kinematics_single_step_levenburg_marquardt(difficult_poses, qs, alpha=0.5)         # 10 iterations
    qs = robot.inverse_kinematics_single_step_levenburg_marquardt(difficult_poses, qs, alphas=alphas)

    converged, pos_errors, rot_errors = check_converged(robot, qs, difficult_poses, i)
    all_pos_errors.append(pos_errors)
    all_rot_errors.append(rot_errors)

    print(pos_errors)

    # 
    alphas[pos_errors > thresh] = above_thresh_scale
    alphas[pos_errors < thresh] = 1.0

    # print(i, converged)
    i += 1

print()
if not converged:
    n_invalid = torch.logical_or(pos_errors > POS_ERROR_THRESHOLD, rot_errors > ROT_ERROR_THRESHOLD).sum().item()
    print(make_text_green_or_red(f"Failed to converge after {i} iterations ({n_invalid}/{len(qs)} invalid)", False))
else:
    print(make_text_green_or_red(f"Converged after {i} iterations", True))

plot_errors(all_pos_errors, all_rot_errors)