In [17]:
import mujoco_py
import os
import numpy as np
from mujoco_dynamics import MujocoDynamics
import multiprocessing as mp
import time

In [2]:
xml_path = os.path.join(os.getcwd(), 'xmls', 'inverted_pendulum.xml')
model = mujoco_py.load_model_from_path(xml_path)
sim = mujoco_py.MjSim(model)

In [4]:
sim.data.qpos[:] = [0.00184113, -0.00434254]
sim.data.qvel[:] = [0.18378366, -0.43146432]
print(sim.data.qpos, sim.data.qvel)
sim.data.ctrl[:] = [2.28478247]
sim.step()
print(sim.data.qpos, sim.data.qvel)

[ 0.00184113 -0.00434254] [ 0.18378366 -0.43146432]
[ 0.00970685 -0.0227588 ] [ 0.60210889 -1.40465707]


In [5]:
print(sim.data.qpos, sim.data.qvel)
sim.data.ctrl[:] = [1.000001]
sim.step()
print(sim.data.qpos, sim.data.qvel)

[ 0.00970685 -0.0227588 ] [ 0.60210889 -1.40465707]
[ 0.02353548 -0.05476015] [ 0.78066916 -1.79556659]


In [6]:
print(sim.model.opt.timestep)
print(sim.model.actuator_ctrlrange)

0.02
[[-3.  3.]]


In [7]:
print(np.concatenate([sim.data.qpos, sim.data.qvel]))
print(sim.model.nq, sim.model.nv)

[ 0.02353548 -0.05476015  0.78066916 -1.79556659]
2 2


In [8]:
dynamics = MujocoDynamics(xml_path)

In [9]:
print(dynamics.get_state())
dynamics.set_state(np.array([0.00184113, -0.00434254, 0.18378366, -0.43146432]))
print(dynamics.step(np.array([1.0])), dynamics.constrain(np.array([1.0])))

[0. 0. 0. 0.]
[ 0.00970685 -0.0227588   0.60210889 -1.40465707] [2.28478247]


In [10]:
viewer = mujoco_py.MjViewer(sim)
for i in range(1000):
    sim.step()
    viewer.render()

Creating window glfw


SystemExit: 0

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [11]:
sim.data.qpos[0] += 1
print(sim.data.qpos, sim.data.qvel)

[ 4.00086607 -9.7618978 ] [ 5.91281464e-05 -8.08604038e-01]


In [12]:
for i in range(2000):
    sim.step()
print(sim.data.qpos, sim.data.qvel)

[  3.00080341 -21.99281524] [-4.10269552e-15 -6.74079068e-13]


In [13]:
from mujoco_dynamics import MujocoDynamics
import os
import numpy as np
import multiprocessing as mp

In [19]:
xml_path = os.path.join('..', 'ilqr', 'xmls', 'inverted_pendulum.xml')
dynamics = MujocoDynamics(xml_path, frame_skip = 2)

In [92]:
def _worker_init(xml_path):
    global sim
    model = mujoco_py.load_model_from_path(xml_path)
    sim = mujoco_py.MjSim(model)
    print("hi")
def _worker(u):
    sim.reset()
    for i in range(100):
        sim.step(u)
    
pool = mp.Pool(initializer = _worker_init, initargs = (xml_path,))



hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi
hi


In [93]:
t0 = time.time()
results = pool.map(_worker, [np.array([0.0]) for i in range(100)])
print(time.time() - t0)

0.0238189697265625


In [94]:
pool.close()

In [95]:
t0 = time.time()

model = mujoco_py.load_model_from_path(xml_path)
sim = mujoco_py.MjSim(model)
for i in range(100):
    sim.reset()
    for i in range(100):
        sim.step(np.array([0.0]))

print(time.time() - t0)

0.08756303787231445


In [100]:
t = ([1,2], [3,4], [5,6])
tuple(zip(*t))

((1, 3, 5), (2, 4, 6))

In [12]:
import os
import mujoco_py as mj
from mujoco_py_deriv import MjDerivative, checkderiv

In [13]:
xml_path = os.path.join(os.getcwd(), 'xmls', 'inverted_pendulum.xml')
model = mj.load_model_from_path(xml_path)
sim = mj.MjSim(model, nsubsteps=2)
dmain = sim.data

In [14]:
sim.step()
dmain.ctrl[:] = [0.5]
f = ["qacc"]
x = ["ctrl"]
deriv_obj = MjDerivative(model, dmain, f, x)
deriv = deriv_obj.compute()
print(dmain.qpos)
print(deriv)

[-4.13189137e-06  4.03269077e-05]
[[[[  9.24630187   0.09246302]
   [-22.06742649  -0.22067426]]]]


In [1]:
from functools import wraps

def cached_method(method, cache_name='_cache'):
    default = object()
    @wraps(method)
    def wrapper(self):
        name = method.__name__
        _cache = getattr(self, cache_name, dict())
        cache = (_cache
                 if isinstance(_cache, dict)
                 else vars(_cache))
        setattr(self, cache_name, _cache)
        val = cache.get(name, default)
        if val is default:
            val = method(self)
            cache[name] = val

        assert val is not default
        return val
    return wrapper


def cached_property(method, **kw):
    return property(cached_method(method, **kw))


import numpy as np
import mujoco_py as mj

def mjc_addr_to_indices(addr):
    indices = (np.arange(*addr)
               if isinstance(addr, tuple)
               else np.arange(addr, addr+1))
    return indices

def mjc_qpos_indices_from_jnt_names(model, joints):
    return np.hstack([
        mjc_addr_to_indices(model.get_joint_qpos_addr(j))
        for j in joints])


def mjc_dof_indices_from_jnt_names(model, joints):
    return np.hstack([
        mjc_addr_to_indices(model.get_joint_qvel_addr(j))
        for j in joints])




Choosing the latest nvidia driver: /usr/lib/nvidia-418, among ['/usr/lib/nvidia-375', '/usr/lib/nvidia-418']
Choosing the latest nvidia driver: /usr/lib/nvidia-418, among ['/usr/lib/nvidia-375', '/usr/lib/nvidia-418']


In [3]:
from argparse import Namespace

import numpy as np
import mujoco_py
from mujoco_py_deriv import MjDerivative



TOL = 1e-6


def safe_div(x, y, default=0, tol=TOL):
    return np.where(np.abs(y) < tol, default, x / y)


class MujocoDynamics:
    def __init__(self, mjsim, joints, dt=None, goal_body="fingertip"):
        self.joints = joints
        self.sim = mjsim
        self.model = mjsim.model
        self.data = mjsim.data
        if dt is not None:
            self.model.opt.timestep = dt
        self.goal_body = goal_body
        self.mjderiv = MjDerivative(self.model, self.data, ["qacc"],
                                    ["qfrc_applied", "qvel", "qpos"],
                                    isforward=1)
        self._cache = Namespace()
        self._cache.state = np.empty(self.state_size)
        self._cache.f_x = np.empty((self.state_size, self.state_size))
        self._cache.f_u = np.empty((self.state_size, self.action_size))
        self._cache.deriv = np.empty(self.mjderiv.ext.deriv_shape(),
                                     dtype=np.float64)
        if self.model.nq != self.model.nv:
            raise NotImplementedError("Not implemented for quaternions")

    @cached_property
    def qpos_indices(self):
        return mjc_qpos_indices_from_jnt_names(self.model, self.joints)

    @cached_property
    def dof_indices(self):
        return mjc_dof_indices_from_jnt_names(self.model, self.joints)

    @property
    def state_size(self):
        return len(self.qpos_indices) + len(self.dof_indices)

    @property
    def action_size(self):
        """Action size."""
        return self.model.nu

    @property
    def has_hessians(self):
        """Whether the second order derivatives are available."""
        return False

    def _set_temp_data(self):
        self.sim.data = None

    def _set_data(self, x, u):
        assert not np.isnan(x).any() and not np.isinf(x).any()
        if u is not None:
            assert not np.isnan(u).any() and not np.isinf(x).any()
        qpos = self.data.qpos.reshape(-1).copy()
        qpos[self.qpos_indices] = x[:len(self.qpos_indices)]
        qvel = self.data.qvel.reshape(-1).copy()
        qvel[self.dof_indices] = x[len(self.qpos_indices):]
        self.set_state(qpos, qvel)
        # Only computes end-effector position etc. Is not same as
        self.sim.forward()
        if u is not None:
            self.sim.data.ctrl[:] = u
        # self.sim.step()

    def set_state(self, qpos, qvel):
        assert qpos.shape == (self.model.nq, ) and qvel.shape == (
            self.model.nv, )
        old_state = self.sim.get_state()
        new_state = mujoco_py.MjSimState(old_state.time, qpos, qvel,
                                         old_state.act, old_state.udd_state)
        self.sim.set_state(new_state)

    def get_state(self):
        state = self._cache.state
        state[:len(self.qpos_indices)] = self.data.qpos[self.qpos_indices]
        state[len(self.qpos_indices):] = self.data.qvel[self.dof_indices]
        return state

    x0 = property(get_state)

    def f(self, x, u, i):
        """Dynamics model.

        Args:
            x: Current state [state_size].
            u: Current control [action_size].
            i: Current time step.

        Returns:
            Next state [state_size].
        """
        self._set_data(x, u)
        self.do_simulation()
        state = self.get_state()
        assert not np.isnan(state).any() and not np.isinf(state).any()
        return state

    def f_x(self, x, u, i):
        """Partial derivative of dynamics model with respect to x.

        Args:
            x: Current state [state_size].
            u: Current control [action_size].
            i: Current time step.

        Returns:
            df/dx [state_size, state_size].
        """
        self._set_data(x, u)
        deriv = self.mjderiv.compute(self._cache.deriv)
        assert not np.isnan(deriv).any() and not np.isinf(deriv).any()

        dqacc__dqvel = deriv[0, 1, self.dof_indices, :][:, self.dof_indices]
        dqacc__dqpos = deriv[0, 2, self.dof_indices, :][:, self.dof_indices]
        dqvel__dqpos = safe_div(self.data.qacc[self.dof_indices].reshape(-1, 1),
                                self.data.qvel[self.dof_indices].reshape(1, -1))

        J_x = self._cache.f_x
        J_x[:dqvel__dqpos.shape[0], :dqvel__dqpos.shape[1]] = dqvel__dqpos
        J_x[:dqvel__dqpos.shape[0], dqvel__dqpos.shape[1]:] = np.eye(
            len(self.dof_indices))
        J_x[dqvel__dqpos.shape[0]:, :dqvel__dqpos.shape[1]] = dqacc__dqpos
        J_x[dqvel__dqpos.shape[0]:, dqvel__dqpos.shape[1]:] = dqacc__dqvel
        assert not np.isnan(J_x).any() and not np.isinf(J_x).any()
        return J_x

    def f_u(self, x, u, i):
        """Partial derivative of dynamics model with respect to u.

        Args:
            x: Current state [state_size].
            u: Current control [action_size].
            i: Current time step.

        Returns:
            df/du [state_size, action_size].
        """
        self._set_data(x, u)
        deriv = self.mjderiv.compute(self._cache.deriv)
        assert not np.isnan(deriv).any()

        dqacc__dqfrc_applied = deriv[0, 0, self.dof_indices, :][:, self.dof_indices]
        dqacc__dqvel = deriv[0, 1, self.dof_indices, :][:, self.dof_indices]

        assert (self.model.actuator_dyntype == 0).all()
        assert (self.model.actuator_gaintype == 0).all()
        assert (self.model.actuator_biastype == 0).all()

        nv = len(self.dof_indices)
        dqfrc_applied__dctrl = np.ones(
            (nv, 1)) * self.data.actuator_length.ravel()

        dqacc__dctrl = dqacc__dqfrc_applied.dot(dqfrc_applied__dctrl)
        assert not np.isnan(dqacc__dctrl).any()

        dqvel__dqacc = safe_div(1, dqacc__dqvel)
        dqvel__dctrl = dqvel__dqacc.T.dot(dqacc__dctrl)
        assert not np.isnan(dqvel__dctrl).any()

        J_u = self._cache.f_u
        J_u[:dqvel__dctrl.shape[0], :] = dqvel__dctrl
        J_u[dqvel__dctrl.shape[0]:, :] = dqacc__dctrl
        assert not np.isnan(J_u).any() and not np.isinf(J_u).any()
        return J_u

    def get_body_jac(self, name):
        bodyid = self.model.body_name2id(name)
        bodyxpos_dqpos = self.data.body_jacp[bodyid, :].reshape(-1, 3).T
        return bodyxpos_dqpos

    def dgoal__dqpos(self):
        return self.get_body_jac(self.goal_body)[:, self.qpos_indices]

    def dgoal__dx(self, x, u):
        qpos = self.data.qpos.reshape(-1).copy()
        qpos[self.qpos_indices] = x[:len(self.qpos_indices)]
        qvel = self.data.qvel.reshape(-1).copy()
        qvel[self.dof_indices] = x[len(self.qpos_indices):]
        self._set_data(x, u)
        dgoal__dqpos = self.dgoal__dqpos()
        qacc = self.data.qacc
        dqpos__dqvel = safe_div(self.data.qvel[self.dof_indices].reshape(-1, 1),
                                self.data.qacc[self.dof_indices].reshape(1, -1))
        dgoal__dqvel = dgoal__dqpos.dot(dqpos__dqvel)
        dgoal__dx = np.hstack((dgoal__dqpos, dgoal__dqvel))
        return dgoal__dx

    @property
    def goal_size(self):
        return 3

    def d2goal__d2x(self, x, u):
        return np.zeros((self.goal_size, self.state_size, self.state_size))


    @property
    def dt(self):
        return self.model.opt.timestep

    def do_simulation(self):
        self.sim.step()

    @classmethod
    def augment_state(self, x):
        return x

    @classmethod
    def reduce_state(self, x):
        return x

    def hessian_not_supported(self, *a):
        raise NotImplementedError("Hessians not supported")

    f_xx = hessian_not_supported
    f_ux = hessian_not_supported
    f_uu = hessian_not_supported

In [5]:
import os
xml_path = os.path.join(os.getcwd(), 'xmls', 'inverted_pendulum.xml')


In [19]:
model = mj.load_model_from_path(xml_path)
sim = mj.MjSim(model, nsubsteps=1)
dynamics1 = MujocoDynamics(sim, ['slider', 'hinge'])
print(dynamics1.get_state())
print(dynamics1.f_x(dynamics1.get_state(), np.array([0.0]), 0))

[0. 0. 0. 0.]
[[ 0.          0.          1.          0.        ]
 [ 0.          0.          0.          1.        ]
 [ 0.09246302 -0.22067429 -0.09246302  0.22067429]
 [-0.22067429  2.15174748  0.22067429 -2.15174749]]


  del sys.path[0]


In [9]:
import mujoco_dynamics
dynamics2 = mujoco_dynamics.MujocoDynamics(xml_path)
print(dynamics2.get_state())
print(dynamics2.dt)
print(dynamics2.f_x(dynamics2.get_state(), np.array([0.0])))

[0. 0. 0. 0.]
0.02
[[ 1.00000000e+00 -2.11951285e-04  1.99815783e-02  4.14078506e-05]
 [ 0.00000000e+00  1.00206670e+00  4.35273334e-05  1.95960431e-02]
 [ 0.00000000e+00 -3.13170275e-02  9.98161123e-01  3.90563824e-03]
 [ 1.44560290e-11  3.05510772e-01  4.32477970e-03  9.61888811e-01]]


In [2]:
import os
import numpy as np
xml_path = os.path.join(os.getcwd(), 'xmls', 'inverted_pendulum.xml')
import mujoco_dynamics
dynamics3 = mujoco_dynamics.MujocoDynamics(xml_path)

In [50]:
import mujoco_py as mj
from mujoco_py_deriv import MjDerivative
sim = dynamics3.sim
dmain = sim.data
model = sim.model
deriv_obj1 = MjDerivative(model, dmain, ["qacc"], ["qpos"])
deriv_obj2 = MjDerivative(model, dmain, ["qacc"], ["qvel"])
deriv1 = deriv_obj1.compute()
deriv2 = deriv_obj2.compute()
dqacc_dqpos = deriv1[0][0]
dqacc_dqvel = deriv2[0][0]
print(dqacc_dqpos)
print(dqacc_dqvel)

f_x = np.eye(dynamics3.state_size)
f_x[:dynamics3.state_size//2, :dynamics3.state_size//2] += 0.5 * (dynamics3.dt ** 2) * dqacc_dqpos
f_x[:dynamics3.state_size//2, dynamics3.state_size//2:] += np.eye(dynamics3.state_size // 2) * dynamics3.dt
f_x[:dynamics3.state_size//2, dynamics3.state_size//2:] += 0.5 * (dynamics3.dt ** 2) * dqacc_dqvel

f_x[dynamics3.state_size//2:, :dynamics3.state_size//2] += dynamics3.dt * dqacc_dqpos
f_x[dynamics3.state_size//2:, dynamics3.state_size//2:] += dynamics3.dt * dqacc_dqvel
print(f_x)

[[-8.67361738e-13 -3.17926915e+00]
 [ 1.38777878e-11  3.10004552e+01]]
[[-0.09246302  0.22067429]
 [ 0.22067429 -2.15174749]]
[[ 1.00000000e+00 -6.35853830e-04  1.99815074e-02  4.41348580e-05]
 [ 2.77555756e-15  1.00620009e+00  4.41348579e-05  1.95696505e-02]
 [-1.73472348e-14 -6.35853830e-02  9.98150740e-01  4.41348580e-03]
 [ 2.77555756e-13  6.20009103e-01  4.41348579e-03  9.56965050e-01]]


In [58]:
import mujoco_py as mj
from mujoco_py_deriv import MjDerivative, checkderiv

# Prepare mujoco model and data
model = mj.load_model_from_path(xml_path)
sim = mj.MjSim(model, nsubsteps=1)
dmain = sim.data

# To compute δf/δx
f = ["qacc"]
x = ["qpos", "qvel"]
deriv_obj = MjDerivative(model, dmain, f, x)
deriv = deriv_obj.compute()
print(deriv)

[[[[-3.27173217e+00 -2.95859486e+00]
   [ 3.12211294e+01  2.88487077e+01]]

  [[-8.67361738e-13 -3.17926915e+00]
   [ 1.38777878e-11  3.10004552e+01]]]]
