In [11]:
import numpy as np
import torch
import gym
import argparse
import os

import utils
# import OurDDPG
# import DDPG
from air_hockey_challenge.framework.air_hockey_challenge_wrapper import AirHockeyChallengeWrapper
from air_hockey_agent.agent_builder import build_agent
from air_hockey_challenge.utils.kinematics import inverse_kinematics, jacobian, forward_kinematics

In [7]:
def custom_rewards(base_env, state, action, next_state, absorbing):
    print(base_env, state, action, next_state, absorbing)
    print(base_env.info)


In [8]:
env = AirHockeyChallengeWrapper(env="7dof-hit", interpolation_order=1, debug=True)
policy = build_agent(env.env_info)
# evaluations = eval_policy(policy)

In [9]:
policy.env_info

{'table': {'length': 1.948, 'width': 1.038, 'goal_width': 0.25},
 'puck': {'radius': 0.03165},
 'mallet': {'radius': 0.04815},
 'n_agents': 2,
 'robot': {'n_joints': 7,
  'ee_desired_height': 0.1645,
  'joint_vel_limit': array([[-1.48352986, -1.48352986, -1.74532925, -1.30899694, -2.26892803,
          -2.35619449, -2.35619449],
         [ 1.48352986,  1.48352986,  1.74532925,  1.30899694,  2.26892803,
           2.35619449,  2.35619449]]),
  'joint_acc_limit': array([[-14.83529864, -14.83529864, -17.45329252, -13.08996939,
          -22.68928028, -23.5619449 , -23.5619449 ],
         [ 14.83529864,  14.83529864,  17.45329252,  13.08996939,
           22.68928028,  23.5619449 ,  23.5619449 ]]),
  'base_frame': [array([[ 1.  ,  0.  ,  0.  , -1.51],
          [ 0.  ,  1.  ,  0.  ,  0.  ],
          [ 0.  ,  0.  ,  1.  , -0.1 ],
          [ 0.  ,  0.  ,  0.  ,  1.  ]]),
   array([[-1.  ,  0.  ,  0.  ,  1.51],
          [ 0.  , -1.  ,  0.  ,  0.  ],
          [ 0.  ,  0.  ,  1.  , -0.1 ],


In [5]:
policy.robot_model

<mujoco._structs.MjModel at 0x7fdc36344570>

In [547]:
def solve_anchor_pos(hit_pos_2d, hit_dir_2d, q_0):
        hit_pos = np.concatenate([hit_pos_2d, [env.env_info['robot']["ee_desired_height"]]])
        hit_dir = np.concatenate([hit_dir_2d, [0]])
        success, q_star = solve_hit_config(hit_pos, hit_dir, q_0)
        print(success)
        if not success:
            q_star = q_0
        return q_star

In [7]:
def numerical_grad(fun, q):
    eps = np.sqrt(np.finfo(np.float64).eps)
    grad = np.zeros_like(q)
    for i in range(q.shape[0]):
        q_pos = q.copy()
        q_neg = q.copy()
        q_pos[i] += eps
        q_neg[i] -= eps
        grad[i] = (fun(q_pos, np.array([])) - fun(q_neg, np.array([]))) / 2 / eps
    return grad

In [8]:
import nlopt
import numpy as np
import osqp
import scipy.linalg
from scipy import sparse
from air_hockey_challenge.utils.kinematics import inverse_kinematics, jacobian, forward_kinematics
def solve_hit_config(x_des, v_des, q_0):
        reg = 1e-6
        dim = q_0.shape[0]
        opt = nlopt.opt(nlopt.LD_SLSQP, dim)

        def _nlopt_f(q, grad):
            if grad.size > 0:
                grad[...] = numerical_grad(_nlopt_f, q)
            f = v_des @ jacobian(policy.robot_model, policy.robot_data, q)[:3, :dim]
            return f @ f + reg * np.linalg.norm(q - q_0)

        def _nlopt_h(q, grad):
            if grad.size > 0:
                grad[...] = 2 * (forward_kinematics(policy.robot_model, policy.robot_data, q)[0] - x_des) @ \
                            jacobian(policy.robot_model, policy.robot_data, q)[:3, :dim]
            return np.linalg.norm(forward_kinematics(policy.robot_model, policy.robot_data, q)[0] - x_des) ** 2 - 1e-4

        opt.set_max_objective(_nlopt_f)
        opt.set_lower_bounds(policy.env_info['robot']['joint_pos_limit'][0])
        opt.set_upper_bounds(policy.env_info['robot']['joint_pos_limit'][1])
        opt.add_inequality_constraint(_nlopt_h)
        opt.set_ftol_abs(1e-6)
        opt.set_xtol_abs(1e-8)
        opt.set_maxtime(5e-3)

        success, x = inverse_kinematics(policy.robot_model, policy.robot_data, x_des, initial_q=q_0)
        if not success:
            raise NotImplementedError("Need to check")
        xopt = opt.optimize(x[:dim])
        
        return opt.last_optimize_result() > 0, xopt

In [10]:
state, done = env.reset(), False
env.render()

In [11]:
policy = build_agent(env.env_info) 

In [587]:
hit_pos_2d = np.array([ 6.800005e-01, -1.01327613e-38])
x_curr = forward_kinematics(policy.robot_model, policy.robot_data, q_0)[0][:2]
hit_dir_2d = (hit_pos_2d - x_curr)/0.02
q_0 = policy.robot_data.qpos.copy()
q = solve_anchor_pos(hit_pos_2d, hit_dir_2d, q_0)
print(q)
next_state, reward, done, info = env.step(q)
env.render()

True
[-5.44098112e-18 -1.54552975e-01 -5.01530089e-18 -1.83040804e+00
 -4.60353990e-18  9.17939732e-01  4.75070145e-21]


In [458]:
print(forward_kinematics(policy.robot_model, policy.robot_data, q_0)[0])

[ 6.50000005e-01 -1.01327613e-38  1.64500044e-01]


In [312]:
policy.n_joints

AttributeError: 'TD3_agent' object has no attribute 'n_joints'

In [210]:
def _solve_aqp(x_des, q_cur, dq_anchor):
        anchor_weights = np.array([10., 1., 10., 1., 10., 10., 1.])
        x_cur = forward_kinematics(policy.robot_model, policy.robot_data, q_cur)[0]
        jac = jacobian(policy.robot_model, policy.robot_data, q_cur)[:3, :policy.env_info['robot']['n_joints']]
        N_J = scipy.linalg.null_space(jac)
        b = np.linalg.lstsq(jac, (x_des - x_cur) / policy.env_info['dt'], rcond=None)[0]

        P = (N_J.T @ np.diag(anchor_weights) @ N_J) / 2
        q = (b - dq_anchor).T @ np.diag(anchor_weights) @ N_J
        A = N_J.copy()
        u = np.minimum(policy.env_info['robot']['joint_vel_limit'][1] * 0.9,
                       (policy.env_info['robot']['joint_pos_limit'][1] * 0.92 - q_cur) / policy.env_info['dt']) - b
        l = np.maximum(policy.env_info['robot']['joint_vel_limit'][0] * 0.9,
                       (policy.env_info['robot']['joint_pos_limit'][0] * 0.92 - q_cur) / policy.env_info['dt']) - b

        solver = osqp.OSQP()
        solver.setup(P=sparse.csc_matrix(P), q=q, A=sparse.csc_matrix(A), l=l, u=u, verbose=False, polish=False)

        result = solver.solve()
        if result.info.status == 'solved':
            return True, N_J @ result.x + b
        else:
            return False, b

In [20]:
from cvxpy import *
import numpy as np
import scipy as sp
from scipy import sparse

# Discrete time model of a quadcopter

nx = 3
nu = 7

# Constraints
u0 = 10.5916
umin = env.env_info['robot']['joint_vel_limit'][0]
umax = env.env_info['robot']['joint_vel_limit'][1]
xmin = np.array([-.536,-.5,0.165])
xmax = np.array([0,.5,0.17])

# Objective function
Q = sparse.diags([1.0,1.0,1.0])
QN = Q
R = 0.1*sparse.eye(7)

# Initial and reference states
x0 = np.array([-.536,-.5,0.16])
xr = np.array([-.536,.5,0.16])

# Prediction horizon
N = 10

# Define problem
u = Variable((nu, N))
x = Variable((nx, N+1))
x_init = Parameter(nx)
objective = 0
constraints = [x[:,0] == x_init]

Ad = sparse.diags([1.,1.,1.])
q_0 = policy.robot_data.qpos.copy()
Bd = sparse.csc_matrix(jacobian(policy.robot_model, policy.robot_data,q_0)[:3, :7]*env.env_info['dt'])

for k in range(N):
    objective += quad_form(x[:,k] - xr, Q) + quad_form(u[:,k], R)
    constraints += [x[:,k+1] == Ad@x[:,k] + Bd@u[:,k]]
    constraints += [xmin <= x[:,k], x[:,k] <= xmax]
    constraints += [umin <= u[:,k], u[:,k] <= umax]
objective += quad_form(x[:,N] - xr, QN)
prob = Problem(Minimize(objective), constraints)

# Simulate in closed loop
nsim = 15
for i in range(nsim):
    x_init.value = x0
    prob.solve(solver=OSQP)
    x0 = Ad@x0 + Bd@u[:,0].value
    print(x0)

TypeError: unsupported operand type(s) for @: 'csc_matrix' and 'NoneType'

In [216]:
def optimize_trajectory(cart_traj, q_start, dq_start, q_anchor):
        joint_trajectory = np.tile(np.concatenate([q_start]), (cart_traj.shape[0], 1))
        if len(cart_traj) > 0:
            q_cur = q_start.copy()
            dq_cur = dq_start.copy()

            for i, des_point in enumerate(cart_traj):
                if q_anchor is None:
                    dq_anchor = 0
                else:
                    dq_anchor = (q_anchor - q_cur)

                success, dq_next = _solve_aqp(des_point[:3], q_cur, dq_anchor)

                if not success:
                    return success, []
                else:
                    q_cur += (dq_cur + dq_next) / 2 * policy.env_info['dt']
                    # q_cur += dq_next * self.env_info['dt']
                    dq_cur = dq_next
                    joint_trajectory[i] = q_cur.copy()
            return True, joint_trajectory
        else:
            return False, []

In [591]:
env.reset()
env.render()

In [361]:
# hit_pos = np.array([ 6.600005e-01, -1.01327613e-38,  1.64500044e-01])
# print(forward_kinematics(policy.robot_model, policy.robot_data, q_0)[0])
# q_0 = policy.robot_data.qpos.copy()
# _,q = _solve_aqp(hit_pos, q_0,0)
# q_cur += dq_next * policy.env_info['dt']
# print(_,q)
# next_state, reward, done, info = env.step(q)
# env.render()

[ 6.50000005e-01 -1.01327613e-38  1.64500044e-01]
True [ 4.35637082e-11  1.33481178e+00 -2.74285594e-11  1.12376214e+00
 -2.16081042e-11 -1.35838313e-01 -1.62385446e-26]


In [426]:
dq_start = np.array([0])

In [611]:
hit_pos = np.array([ 6.3000005e-01, 3.01327613e-38,  1.64500044e-01])
x_cur = forward_kinematics(policy.robot_model, policy.robot_data, q_0)[0]
q_0 = policy.robot_data.qpos.copy()
# _,q = _solve_aqp(hit_pos,q_0,0)
cart_traj = np.array([hit_pos])
q_start = q_0

q_anchor = 0
_,q = optimize_trajectory(cart_traj, q_start, dq_start, q_anchor)
print(x_cur)
print(_,q)
next_state, reward, done, info = env.step(q[0])
env.render()
dq_start = next_state[13:20]

[ 6.50000005e-01 -1.01327613e-38  1.64500044e-01]
True [[-2.37163274e-05 -2.08884845e-01 -7.58625430e-05 -1.84406289e+00
   4.35520934e-04  9.96416611e-01 -7.93374299e-06]]


In [456]:
next_state[6:13]

array([-4.39372151e-05, -1.80638512e-01, -1.02590470e-04, -1.84279688e+00,
        4.83595636e-04,  9.50118644e-01, -1.71928170e-05])

In [457]:
print(q_0)

[ 4.02979867e-22 -1.96067345e-01  2.56756848e-21 -1.84363898e+00
 -1.86530826e-21  9.70422238e-01  4.75070145e-21]
