In [None]:
import rerun as rr
import numpy as np

In [None]:
print(rr.__version__)

In [None]:
import matplotlib.pyplot as plt

In [None]:
import sys
import os

parent_dir = os.path.abspath(os.path.join(os.path.dirname("__file__"), ".."))
if parent_dir not in sys.path:
    print(parent_dir)
    sys.path.append(parent_dir)

In [None]:
import geom

---

In [None]:
rr.__version__

_NB_: This works only on rerun 0.22.1

In [None]:
rec = rr.dataframe.load_recording('/Users/vertix/Downloads/noclip-sleep01.rrd')

In [None]:
# view = rec.view(index='log_time', contents='/target_position/translation/0:Scalar')
view = rec.view(index='log_time', contents='/target_position/**')

tab = view.select().read_all()
df = tab.to_pandas()
df[:7]

In [None]:
pos = []
for i in range(len(df) // 7):
    t, q = np.zeros(3), np.zeros(4)
    log_time = None
    for j in range(7):
        rec = df.iloc[i * 7 + j]
        log_time = rec['log_time'] if log_time is None else min(log_time, rec['log_time'])
        for k in range(3):
            name = f'/target_position/translation/{k}:Scalar'
            if rec[name] is not None:
                t[k] = rec[name]
        for k in range(4):
            name = f'/target_position/quat/{k}:Scalar'
            if rec[name] is not None:
                q[k] = rec[name]
    pos.append((log_time.value // 10 ** 6, geom.Transform3D(t, geom.Rotation.from_quat(q))))

In [None]:
import pickle
with open('pos_data.pkl', 'wb') as f:
    pickle.dump(pos, f)

In [None]:
# rec = rr.dataframe.load_recording('/Users/vertix/Downloads/bad_ik.rrd')
rec = rr.dataframe.load_recording('/Users/vertix/Downloads/more-step-limits.rrd')
view = rec.view(index='time', contents='/target_position/**')

tab = view.select().read_all()
df = tab.to_pandas()
df[:7]

In [None]:
rec['/target_position/translation:Scalar']

---

In [None]:
import mujoco
import scipy.optimize
from cvxopt import matrix, solvers


_MAX_ANGLE_CHANGE = np.deg2rad(45)
_DAMPING_COEFF = 1e-12

def _wrap_joint_angle(q, q_base):
    return q_base + np.mod(q - q_base + np.pi, 2 * np.pi) - np.pi


class KinematicsSolver:
    """Solves forward and inverse kinematics for the Kinova arm."""

    def __init__(self, ee_offset=0.0):
        self.model = mujoco.MjModel.from_xml_path('/Users/vertix/Documents/positronic/positronic/drivers/roboarm/kinova/gen3.xml')
        self.data = mujoco.MjData(self.model)
        self.model.body_gravcomp[:] = 1.0

        # Cache references
        # self.qpos0 = self.model.key('retract').qpos  # TODO: Is it good for IK null space?
        self.qpos0 = np.zeros(7) # TODO: Is it good for IK null space?
        self.site_id = self.model.site('pinch_site').id
        self.site_pos = self.data.site(self.site_id).xpos
        self.site_mat = self.data.site(self.site_id).xmat

        # Add end effector offset for gripper
        # 0.061525 comes from the Kinova URDF
        self.model.site(self.site_id).pos = np.array([0.0, 0.0, -0.061525 - ee_offset])

        # Preallocate arrays
        self.err = np.empty(6)
        self.err_pos, self.err_rot = self.err[:3], self.err[3:]
        self.site_quat = np.empty(4)
        self.site_quat_inv = np.empty(4)
        self.err_quat = np.empty(4)
        self.jac = np.empty((6, self.model.nv))
        self.jac_pos, self.jac_rot = self.jac[:3], self.jac[3:]
        self.damping = _DAMPING_COEFF * np.eye(6)
        self.eye = np.eye(self.model.nv)

        limits = self.model.jnt_range[:self.model.nq]
        self.joint_limits_lower, self.joint_limits_upper = limits[:, 0], limits[:, 1]
        self.joint_limits_lower = np.where(self.joint_limits_lower == 0, -100 * np.pi, self.joint_limits_lower)
        self.joint_limits_upper = np.where(self.joint_limits_upper == 0, 100 * np.pi, self.joint_limits_upper)

    def forward(self, qpos):
        self.data.qpos = qpos
        mujoco.mj_kinematics(self.model, self.data)
        mujoco.mj_comPos(self.model, self.data)

        pos = self.data.site(self.site_id).xpos.copy()
        mat = self.data.site(self.site_id).xmat.copy()
        quat = np.empty(4)
        mujoco.mju_mat2Quat(quat, mat)
        return geom.Transform3D(pos, geom.Rotation.from_quat(quat))

    def inverse(self, pos: geom.Transform3D, qpos0: np.ndarray, max_iters: int = 20, err_thresh: float = 1e-4, clip_joints: bool = True):
        self.data.qpos = qpos0
        qpos0_err = np.zeros(self.model.nv)
        last_update = np.zeros(self.model.nv)
        last_null_update = np.zeros(self.model.nv)

        iter = 0
        for i in range(max_iters):
            mujoco.mj_kinematics(self.model, self.data)
            mujoco.mj_comPos(self.model, self.data)

            # Translational error
            self.err_pos[:] = pos.translation - self.site_pos

            # Rotational error
            mujoco.mju_mat2Quat(self.site_quat, self.site_mat)
            mujoco.mju_negQuat(self.site_quat_inv, self.site_quat)
            mujoco.mju_mulQuat(self.err_quat, pos.rotation.as_quat, self.site_quat_inv)
            mujoco.mju_quat2Vel(self.err_rot, self.err_quat, 1.0)

            if np.linalg.norm(self.err) < err_thresh:
                break

            mujoco.mj_jacSite(self.model, self.data, self.jac_pos, self.jac_rot, self.site_id)
            update = self.jac.T @ np.linalg.solve(self.jac @ self.jac.T + self.damping, self.err)
            last_update += update.copy()
            qpos0_err = np.mod(self.qpos0 - self.data.qpos + np.pi, 2 * np.pi) - np.pi
            # qpos0_err[:3] = 0.0
            # qpos0_err[4:] = 0.0
            # qpos0_err *= 10
            null_update = (self.eye -
                           (self.jac.T @ np.linalg.pinv(self.jac @ self.jac.T + self.damping)) @ self.jac) @ qpos0_err
            last_null_update += null_update.copy()

            update += null_update

            # Enforce max angle change
            update_max = np.abs(update).max()
            if update_max > _MAX_ANGLE_CHANGE:
                update *= _MAX_ANGLE_CHANGE / update_max

            # Apply update
            mujoco.mj_integratePos(self.model, self.data.qpos, update, 1)
            if clip_joints:
                self.data.qpos[:] = np.clip(self.data.qpos, self.low, self.high)

            iter += 1

        rr.log('ik/iter', rr.Scalars(iter))
        rr.log('ik/err', rr.Scalars(np.linalg.norm(self.err)))
        rr.log('ik/err/pos', rr.Scalars(np.linalg.norm(self.err_pos)))
        rr.log('ik/err/rot', rr.Scalars(np.linalg.norm(self.err_rot)))
        rr.log('ik/qpos0_err', rr.Scalars(qpos0_err))
        rr.log('ik/updates/main', rr.Scalars(last_update))
        rr.log('ik/updates/null', rr.Scalars(last_null_update))

        return self.data.qpos.copy()

    def inverse_limits(self, pos: geom.Transform3D, qpos0: np.ndarray, max_iters: int = 20, err_thresh: float = 1e-4, clamp=False):
        solvers.options['show_progress'] = False
        def clamp_to_pi(x):
            return np.mod(x + np.pi, 2 * np.pi) - np.pi

        self.data.qpos = clamp_to_pi(qpos0) if clamp else qpos0

        iter = 0
        for i in range(max_iters):
            rec = rr.dataframe.load_recording('/Users/vertix/Downloads/more-step-limits.rrd')
            mujoco.mj_kinematics(self.model, self.data)
            mujoco.mj_comPos(self.model, self.data)

            # Translational error
            self.err_pos[:] = pos.translation - self.site_pos

            # Rotational error
            mujoco.mju_mat2Quat(self.site_quat, self.site_mat)
            mujoco.mju_negQuat(self.site_quat_inv, self.site_quat)
            mujoco.mju_mulQuat(self.err_quat, pos.rotation.as_quat, self.site_quat_inv)
            mujoco.mju_quat2Vel(self.err_rot, self.err_quat, 1.0)

            if np.linalg.norm(self.err) < err_thresh:
                break

            # Get Jacobian
            mujoco.mj_jacSite(self.model, self.data, self.jac_pos, self.jac_rot, self.site_id)

            # Setup QP problem
            # min_x (1/2) x^T P x + q^T x
            # s.t. G x <= h
            #      A x = b

            # Objective: min ||J·Δq - e||² + α||Δq - (q₀ - q)||²
            # where α is a small weight for the null space objective
            alpha = 1e-9
            n = self.model.nv

            # P = J^T J + α I
            P = self.jac.T @ self.jac + alpha * np.eye(n)
            P = matrix(P)

            # q = -J^T e - α (q₀ - q)
            qpos0_err = np.mod(self.qpos0 - self.data.qpos + np.pi, 2 * np.pi) - np.pi
            q = -self.jac.T @ self.err - alpha * qpos0_err
            q = matrix(q)

            # Joint limit constraints: lb - q <= Δq <= ub - q
            # Rewrite as:
            # Δq <= ub - q
            # -Δq <= q - lb

            # G = [I; -I]
            G = np.vstack([np.eye(n), -np.eye(n)])
            G = matrix(G)

            # h = [ub - q; q - lb]
            h_upper = self.joint_limits_upper - self.data.qpos
            h_lower = self.data.qpos - self.joint_limits_lower
            h = np.concatenate([h_upper, h_lower])
            h = matrix(h)

            # No equality constraints
            A = matrix(0.0, (0, n))
            b = matrix(0.0, (0, 1))

            solution = solvers.qp(P, q, G, h, A, b)
            update = np.array(solution['x']).flatten()
            # else:
            #     # If QP fails, use a small damped least squares step as fallback
            #     damping = _DAMPING_COEFF * 10 * np.eye(6)
            #     update = self.jac.T @ np.linalg.solve(self.jac @ self.jac.T + damping, self.err)
            #     update_max = np.abs(update).max()
            #     if update_max > _MAX_ANGLE_CHANGE * 0.5:
            #         update *= (_MAX_ANGLE_CHANGE * 0.5) / update_max

            mujoco.mj_integratePos(self.model, self.data.qpos, update, 1.0)
            self.data.qpos = clamp_to_pi(self.data.qpos) if clamp else self.data.qpos

            iter += 1

        rr.log('ik/iter', rr.Scalars(iter))
        rr.log('ik/err', rr.Scalars(np.linalg.norm(self.err)))
        rr.log('ik/err/pos', rr.Scalars(np.linalg.norm(self.err_pos)))
        rr.log('ik/err/rot', rr.Scalars(np.linalg.norm(self.err_rot)))
        return self.data.qpos.copy()


In [None]:
import rerun as rr
rr.__version__

In [None]:
import math
from ruckig import InputParameter, OutputParameter, Result, Ruckig
import pinocchio as pin

K_r = np.diag([0.3, 0.3, 0.3, 0.3, 0.18, 0.18, 0.18])
K_l = np.diag([75.0, 75.0, 75.0, 75.0, 40.0, 40.0, 40.0])
K_lp = np.diag([5.0, 5.0, 5.0, 5.0, 4.0, 4.0, 4.0])
K_p = np.diag([100.0, 100.0, 100.0, 100.0, 50.0, 50.0, 50.0])
K_d = np.diag([3.0, 3.0, 3.0, 3.0, 2.0, 2.0, 2.0])
K_r_inv = np.linalg.inv(K_r)
K_r_K_l = K_r @ K_l
_DT = 0.001

class JointCompliantController:
    """Implements compliant joint control for the Kinova arm."""

    class LowPassFilter:
        """Simple low-pass filter implementation."""

        def __init__(self, alpha, initial_value):
            assert 0 < alpha <= 1, 'Alpha must be between 0 and 1'
            self.alpha = alpha
            self.y = initial_value

        def filter(self, x):
            self.y = self.alpha * x + (1 - self.alpha) * self.y
            return self.y

    def __init__(self, actuator_count, relative_dynamics_factor=0.5):
        self.q_s = None
        self.q_d = None
        self.dq_d = None
        self.q_n = None
        self.dq_n = None
        self.tau_filter = None

        self.actuator_count = actuator_count
        self.otg = None
        self.otg_inp = None
        self.otg_out = None
        self.otg_res = None
        self.relative_dynamics_factor = relative_dynamics_factor

        self.target_qpos = None

        # Initialize pinocchio model and data
        self.model = pin.buildModelFromUrdf('/Users/vertix/Documents/positronic/positronic/drivers/roboarm/kinova/model.urdf')
        self.data = self.model.createData()
        self._q_pin = np.zeros(11)

    def set_target_qpos(self, qpos):
        self.target_qpos = qpos

    @property
    def finished(self):
        return self.otg_res == Result.Finished

    def gravity(self, q):
        q_pin = self._q_pin  # Reuse pre-allocated q_pin
        q_pin[0], q_pin[1], q_pin[2] = math.cos(q[0]), math.sin(q[0]), q[1]
        q_pin[3], q_pin[4], q_pin[5] = math.cos(q[2]), math.sin(q[2]), q[3]
        q_pin[6], q_pin[7], q_pin[8] = math.cos(q[4]), math.sin(q[4]), q[5]
        q_pin[9], q_pin[10] = math.cos(q[6]), math.sin(q[6])
        return pin.computeGeneralizedGravity(self.model, self.data, q_pin)

    def compute_torque(self, q, dq, tau):
        gravity = self.gravity(q)

        # Initialize controller state if needed
        if self.q_s is None:
            self.q_s = q.copy()
            self.q_d = q.copy()
            self.dq_d = np.zeros_like(q)
            self.q_n = q.copy()
            self.dq_n = dq.copy()
            self.tau_filter = JointCompliantController.LowPassFilter(0.01, tau.copy())

            self.otg = Ruckig(self.actuator_count, _DT)
            self.otg_inp = InputParameter(self.actuator_count)
            self.otg_out = OutputParameter(self.actuator_count)
            self.otg_inp.max_velocity = (4 * [math.radians(80 * self.relative_dynamics_factor)] +
                                         3 * [math.radians(140 * self.relative_dynamics_factor)])
            self.otg_inp.max_acceleration = (4 * [math.radians(240 * self.relative_dynamics_factor)] +
                                             3 * [math.radians(450 * self.relative_dynamics_factor)])
            self.otg_inp.current_position = q.copy()
            self.otg_inp.current_velocity = dq.copy()
            self.otg_inp.target_position = q.copy()
            self.otg_inp.target_velocity = np.zeros(self.actuator_count)
            self.otg_res = Result.Finished

        self.q_s = _wrap_joint_angle(q, self.q_s)
        dq_s = dq.copy()  # TODO: It seems that we don't need copy here
        tau_s_f = self.tau_filter.filter(tau)

        if self.target_qpos is not None:
            qpos = _wrap_joint_angle(self.target_qpos, self.q_s)
            self.otg_inp.target_position = qpos
            self.otg_res = Result.Working

            self.target_qpos = None

        if self.otg_res == Result.Working:
            self.otg_res = self.otg.update(self.otg_inp, self.otg_out)
            self.otg_out.pass_to_input(self.otg_inp)
            self.q_d[:] = self.otg_out.new_position
            self.dq_d[:] = self.otg_out.new_velocity

        tau_task = -K_p @ (self.q_n - self.q_d) - K_d @ (self.dq_n - self.dq_d) + gravity

        # Nominal motor plant
        ddq_n = K_r_inv @ (tau_task - tau_s_f)
        self.dq_n += ddq_n * _DT
        self.q_n += self.dq_n * _DT

        tau_f = K_r_K_l @ ((self.dq_n - dq_s) + K_lp @ (self.q_n - self.q_s))  # Nominal friction

        return tau_task + tau_f

---

In [None]:
pos

In [None]:
import pickle
with open('pos_data.pkl', 'rb') as f:
    pos = pickle.load(f)

In [None]:
# q_start = np.array([0.0, -0.34906585, 3.14159265, -2, 0.0, 0, 1.57079633])
# q_start = np.array([-0.7, -0, 0.5, -1.5, 0.0, -0.5, 1.57079633])

torque_constant = np.array([11.0, 11.0, 11.0, 11.0, 7.6, 7.6, 7.6])
current_limit_max = np.array([10.0, 10.0, 10.0, 10.0, 6.0, 6.0, 6.0])
tau_limit = torque_constant * current_limit_max

In [None]:
model = mujoco.MjModel.from_xml_path('/Users/vertix/Documents/positronic/positronic/drivers/roboarm/kinova/gen3.xml')

In [None]:
solver, controller = KinematicsSolver(), JointCompliantController(7)
data = mujoco.MjData(model)

In [None]:
res = solver.inverse_limits(pos[0][1], q_start, max_iters=20)
solver.err, np.linalg.norm(solver.err)

In [None]:
res = solver.inverse_limits(pos[0][1], q_start, max_iters=200, clamp=True)
solver.err, np.linalg.norm(solver.err)

In [None]:
target_pos, pos[2][1]

In [None]:
solver.err, np.linalg.norm(solver.err)

In [None]:
rr.init('notebook_zero')
rr.save('notebook.rrd')
# rr.notebook_show(width=1200, height=600)

renderer = mujoco.Renderer(model, height=480, width=640)
renderer.update_scene(data, camera='viewer')

tau = controller.gravity(q_start)
data.qpos = q_start
data.ctrl[:] = tau
mujoco.mj_forward(model, data)

tau_filter = JointCompliantController.LowPassFilter(0.1, tau)
q, dq = data.qpos, data.qvel

step, next_command = 0, 0
start_time = pos[0][0] - 3 * 10 ** 3

# rr.log('pos/target', rr.Points3D(np.array([p.translation for _, p in pos])), static=True)
rr.log('ik/qpos', rr.SeriesPoints(markers="circle", marker_sizes=1))
rr.log('ik/updates/main', rr.SeriesPoints(markers="cross", marker_sizes=1.0))
rr.log('ik/updates/null', rr.SeriesPoints(markers="cross", marker_sizes=1.0))

while data.time < 60 and next_command < len(pos):
    tau = controller.compute_torque(q, dq, tau)
    np.clip(tau, -tau_limit, tau_limit, out=tau)
    data.ctrl[:] = tau_filter.filter(tau)
    mujoco.mj_step(model, data)
    q, dq, tau = data.qpos, data.qvel, data.ctrl
    rr.set_time_seconds('sim_time', data.time)

    if start_time + data.time * 10 ** 3 > pos[next_command][0]:
        rr.log('pos/target', rr.Points3D(np.array([p.translation for _, p in pos[:next_command + 1]])))
        # q_ik = solver.inverse(pos[next_command][1], q, max_iters=10000)
        q_ik = solver.inverse_limits(pos[next_command][1], q, max_iters=100)
        rr.log('ik/qpos', rr.Scalars(q_ik))
        rr.log('pos/cur_target', rr.Points3D(pos[next_command][1].translation, colors=[255, 255, 255]))
        controller.set_target_qpos(q_ik)
        next_command += 1

    if step % 100 == 0:
        rr.log('state/qpos', rr.Scalars(q))
        rr.log('state/qvel', rr.Scalars(dq))
        rr.log('state/tau', rr.Scalars(tau))
        renderer.update_scene(data, camera='viewer')
        rr.log('render', rr.Image(renderer.render()).compress())
    step += 1

---
Research IK solution for the last "blocking" joint

In [None]:
ik_solutions = []
target = pos[-1][1]

low, high = solver.model.jnt_range[:solver.model.nq, 0], solver.model.jnt_range[:solver.model.nq, 1]
low = np.where(low == 0, -np.pi, low)
high = np.where(high == 0, np.pi, high)

for i in range(1000):
    q = np.random.uniform(low, high)
    q_ik = solver.inverse_limits(target, q)
    err = np.linalg.norm(solver.forward(q_ik).translation - target.translation)
    ik_solutions.append((q_ik, err))

In [None]:
rr.init('ik_solutions')
rr.notebook_show()
scatter = rr.Points2D(
    [[np.abs(solution[3]), err] for solution, err in ik_solutions],
    colors=[255, 255, 255],
)
rr.log('ik_solutions/scatter', scatter, static=True)

for i, (solution, err) in enumerate(sorted(ik_solutions, key=lambda x: np.abs(x[0][3]))):
    rr.set_time_sequence('ik_solutions', i)
    rr.log('ik_solutions/qpos', rr.Scalars(np.abs(solution[3])))

In [None]:
sorted(ik_solutions, key=lambda x: np.abs(x[0][3]))[0]

In [None]:
high

---

In [None]:
rec = rr.dataframe.load_recording('/Users/vertix/Downloads/more-step-limits.rrd')
rec.view(index='time', contents='/ik/**').select().read_all().to_pandas()

In [None]:
rec = rr.dataframe.load_recording('/Users/vertix/Downloads/more-step-limits.rrd')
rec.view(index='time', contents='/target_joints/**').select().read_all().to_pandas()

In [None]:
rec = rr.dataframe.load_recording('/Users/vertix/Downloads/more-step-limits.rrd')
view = rec.view(index='time', contents='/target_position/**')

tab = view.select().read_all()
df = tab.to_pandas()
pos = []

for i in range(len(df)):
    rec = df.iloc[i]
    _translation = rec['/target_position/translation:Scalar']
    _rotation = rec['/target_position/quat:Scalar']
    pos.append((rec['log_time'].value // 10 ** 6, geom.Transform3D(_translation, geom.Rotation.from_quat(_rotation))))

rec = rr.dataframe.load_recording('/Users/vertix/Downloads/more-step-limits.rrd')
view = rec.view(index='time', contents='/current_joints/**')

tab = view.select().read_all()
df = tab.to_pandas()
q_start = df.iloc[0]['/current_joints:Scalar']
pos[0][1], q_start

In [None]:
solver, controller = KinematicsSolver(), JointCompliantController(7)
data = mujoco.MjData(model)

In [None]:
res = solver.inverse_limits(pos[0][1], q_start, max_iters=100)
solver.err, np.linalg.norm(solver.err)