In [1]:
from collections import defaultdict
from typing import Optional

import numpy as np
import torch
import tqdm
from tensordict.nn import TensorDictModule
from tensordict.tensordict import TensorDict, TensorDictBase
from torch import nn

from torchrl.data import BoundedTensorSpec, CompositeSpec, UnboundedContinuousTensorSpec
from torchrl.envs import (
    CatTensors,
    EnvBase,
    Transform,
    TransformedEnv,
    UnsqueezeTransform,
)
from torchrl.envs.transforms.transforms import _apply_to_composite
from torchrl.envs.utils import check_env_specs, step_mdp

In [2]:
import time

import mujoco
import mujoco.viewer

# Load the elbow model from its MJCF file and allocate the matching simulation state.
model = mujoco.MjModel.from_xml_path('./myo_sim/elbow/myoelbow_2dof6muscles.xml')
data = mujoco.MjData(model)


def _print_names(header, accessor, count):
    """Print ``header``, then ``accessor(k).name`` for k in ``range(count)``, then a gap."""
    print(header)
    for index in range(count):
        print(accessor(index).name)
    print("\n")


# Named elements of the model: bodies, joints, then actuators (the muscles).
_print_names("--- Body ---", model.body, model.nbody)
_print_names("--- Joints ---", model.jnt, model.njnt)
_print_names("--- Muscles / Actuators ---", model.actuator, model.nu)

# Control vector as it stands before any simulation step.
print("--- Control inputs for actuators ---")
print(data.ctrl)
print("\n")

print(f"Number of position coordinates: {model.nq}")
print("\n")

# Names of every geom in the model (unnamed geoms show up as empty strings).
print([model.geom(geom_id).name for geom_id in range(model.ngeom)])
print("\n")

# Step once before reading the geom position below.
mujoco.mj_step(model, data)
print(f"Named access: {data.geom('arm_r_trapezium').xpos}")
print("\n")

# Summary of the generalized state after that single step.
for label, value in (
    ('Total number of DoFs in the model:', model.nv),
    ('Generalized positions:', data.qpos),
    ('Generalized velocities:', data.qvel),
    ('Actuators:', data.act),
):
    print(label, value)

--- Body ---
world
full_body
base
r_humerus
r_ulna_radius_hand


--- Joints ---
r_shoulder_elev
r_elbow_flex


--- Muscles / Actuators ---
TRIlong
TRIlat
TRImed
BIClong
BICshort
BRA


--- Control inputs for actuators ---
[0. 0. 0. 0. 0. 0.]


Number of position coordinates: 2


['floor', '', '', '', 'body', 'arm_r_humerus', 'TRIlonghh_wrap', 'BIClonghh_wrap', 'TRI_wrap', 'arm_r_ulna', 'arm_r_radius', 'arm_r_lunate', 'arm_r_scaphoid', 'arm_r_pisiform', 'arm_r_triquetrum', 'arm_r_capitate', 'arm_r_trapezium', 'arm_r_trapezoid', 'arm_r_hamate', 'arm_r_1mc', 'arm_r_2mc', 'arm_r_3mc', 'arm_r_4mc', 'arm_r_5mc', 'arm_r_thumbprox', 'arm_r_thumbdist', 'arm_r_2proxph', 'arm_r_2midph', 'arm_r_2distph', 'arm_r_3proxph', 'arm_r_3midph', 'arm_r_3distph', 'arm_r_4proxph', 'arm_r_4midph', 'arm_r_4distph', 'arm_r_5proxph', 'arm_r_5midph', 'arm_r_5distph']


Named access: [-0.24160722  0.07148558  0.8348181 ]


Total number of DoFs in the model: 2
Generalized positions: [-4.10965221e-06  8.78631573e-05]

In [None]:
# Open MuJoCo's passive viewer on the notebook-level `model`/`data` and step the
# simulation from this cell, pacing each iteration to one simulation timestep.
with mujoco.viewer.launch_passive(model, data) as viewer:
    # Close the viewer automatically after 30 wall-seconds.
    start = time.time()
    while viewer.is_running() and time.time() - start < 30:
        # Wall-clock time at the start of this iteration, used for pacing below.
        step_start = time.time()
        
        # Advance the physics by one step.
        mujoco.mj_step(model, data)
        
        # Synchronise the viewer with the stepped state
        # (presumably this is what refreshes the rendered scene; see the viewer docs).
        viewer.sync()
        
        # Sleep for whatever is left of model.opt.timestep after the work above,
        # so one loop iteration takes at least one simulation timestep of wall time.
        time_until_next_step = model.opt.timestep - (time.time() - step_start)
        if time_until_next_step > 0:
            time.sleep(time_until_next_step)
        
        

In [None]:
def _step(self, tensordict):
    """Apply the muscle activations in ``tensordict`` and advance MuJoCo by one step.

    Args:
        tensordict: input carrying the simulation state object under ``"data"``
            and the activations under ``"action"`` (expected to hold one value
            per actuator, i.e. 6 for this model).

    Returns:
        A ``TensorDict`` with the same batch shape as the input and a single
        ``"next"`` entry holding the stepped ``"data"``, the observations
        (``"positions"``, ``"velocities"``, ``"actuators"``, ``"trapezium_pos"``),
        the clamped ``"action"``, the ``"reward"`` and the ``"done"`` flag.
    """
    # NOTE(review): this stores the raw MjData object inside a TensorDict;
    # whether the installed tensordict version accepts non-tensor values
    # should be confirmed.
    data = tensordict["data"]

    # here action should be an array of shape 6
    action = tensordict["action"].squeeze(-1)
    # activations go from 0 to 1
    action = action.clamp(0, 1)

    # Write into the existing control buffer as a plain numpy array.
    data.ctrl[:] = action.detach().cpu().numpy()

    mujoco.mj_step(self.model, data)

    # state is defined by positions and velocities (simplified by mujoco's DoF approach)
    # TODO: assess what other information would be valuable for the state
    # Read the state only after stepping and copy it into float32 tensors
    # (the dtype declared in _make_spec), so the returned values are snapshots
    # rather than references to buffers that the next step overwrites.
    positions = torch.tensor(data.qpos, dtype=torch.float32)
    velocities = torch.tensor(data.qvel, dtype=torch.float32)
    actuators = torch.tensor(data.act, dtype=torch.float32)
    # tweak accordingly to reward function
    trapezium_xyz = np.array(data.geom('arm_r_trapezium').xpos, dtype=np.float64)
    trapezium_pos = torch.tensor(trapezium_xyz, dtype=torch.float32)

    # basic starting reward: negative Euclidean distance from the trapezium geom
    # to an arbitrary target position in 3D space.
    target_xyz = np.array([1.0, 1.0, 1.0])
    distance = float(np.linalg.norm(trapezium_xyz - target_xyz))
    # Trailing singleton dimension matches reward_spec's (*batch, 1) shape.
    reward = torch.tensor(-distance, dtype=torch.float32).view(*tensordict.shape, 1)

    # Episodes never terminate for now.
    done = torch.zeros_like(reward, dtype=torch.bool)

    out = TensorDict(
        {
            # The key must be the string "next", not the builtin function `next`.
            "next": {
                "data": data,
                "positions": positions,
                "velocities": velocities,
                "actuators": actuators,
                "trapezium_pos": trapezium_pos,
                "action": action,
                "reward": reward,
                "done": done,
            }
        },
        tensordict.shape,
    )
    return out

In [None]:
def _reset(self, tensordict):
    """Return a tensordict whose ``"data"`` entry is a reset MuJoCo state.

    Args:
        tensordict: ``None``, an empty tensordict, or a tensordict that may
            carry an existing simulation state under ``"data"``.

    Returns:
        A ``TensorDict`` with the single key ``"data"``. Its batch size is that
        of ``tensordict``, or empty when ``tensordict`` is ``None``.
    """
    # NOTE(review): _make_spec declares positions/velocities/actuators/trapezium_pos
    # as observations, but only "data" is emitted here -- confirm whether the
    # reset output should also carry those keys.
    has_state = (
        tensordict is not None
        and not tensordict.is_empty()
        and "data" in tensordict.keys()
    )
    if has_state:
        data = tensordict["data"]
        # mj_resetData resets `data` in place and returns nothing, so its
        # return value must not be assigned.
        mujoco.mj_resetData(self.model, data)
    else:
        data = mujoco.MjData(self.model)

    # `tensordict` may be None, in which case it has no shape to copy.
    batch_size = [] if tensordict is None else tensordict.shape

    out = TensorDict(
        {
            "data": data
        },
        batch_size=batch_size,
    )
    return out

In [3]:
from torchrl.data import BoundedTensorSpec, CompositeSpec, UnboundedContinuousTensorSpec

def _make_spec(self, td_params):
    """Declare the observation, state, action and reward specs on ``self``.

    Sizes are hard-coded for the elbow model used in this notebook:
    2 position coordinates, 2 velocities, 6 actuators and a 3D geom position.

    Args:
        td_params: parameter tensordict; only its ``shape`` is used, to size
            the reward spec.
    """
    # Shapes are written as 1-tuples, e.g. (2,): a bare (2) is just the int 2.
    # This also keeps them consistent with the action spec below.
    self.observation_spec = CompositeSpec(
        positions=UnboundedContinuousTensorSpec(
            shape=(2,),
            dtype=torch.float32,
        ),
        velocities=UnboundedContinuousTensorSpec(
            shape=(2,),
            dtype=torch.float32,
        ),
        # muscle activations are bounded to [0, 1]
        actuators=BoundedTensorSpec(
            minimum=0,
            maximum=1,
            shape=(6,),
            dtype=torch.float32,
        ),
        trapezium_pos=UnboundedContinuousTensorSpec(
            shape=(3,),
            dtype=torch.float32,
        ),
        shape=(),
    )

    # The state spec is a copy of the observation spec.
    self.state_spec = self.observation_spec.clone()

    # One activation in [0, 1] per actuator.
    self.action_spec = BoundedTensorSpec(
        minimum=0,
        maximum=1,
        shape=(6,),
        dtype=torch.float32,
    )
    # Reward carries the batch shape of td_params plus a trailing singleton dimension.
    self.reward_spec = UnboundedContinuousTensorSpec(shape=(*td_params.shape, 1))
        

In [4]:
def _set_seed(self, seed: Optional[int]):
    rng = torch.manual_seed(seed)
    self.rng = rng

NameError: name 'Optional' is not defined

In [5]:
def gen_params(batch_size=None) -> TensorDictBase:
    """Build the (currently empty) parameter tensordict for the environment.

    Args:
        batch_size: optional batch size; when given and non-empty, the
            parameters are expanded to it and made contiguous.

    Returns:
        An empty ``TensorDict``, expanded to ``batch_size`` if one was requested.
    """
    params = TensorDict({}, [])
    # None and an empty batch size both mean "no expansion".
    if not batch_size:
        return params
    return params.expand(batch_size).contiguous()

NameError: name 'TensorDictBase' is not defined

In [6]:
class ElbowEnv(EnvBase):
    """TorchRL environment wrapping the MuJoCo elbow model.

    The behaviour is assembled from the notebook-level functions
    ``gen_params``, ``_make_spec``, ``_reset``, ``_step`` and ``_set_seed``.
    """

    batch_locked = False

    def __init__(self, td_params=None, seed=None, device="cpu", mj_model=None):
        """Create the environment.

        Args:
            td_params: parameter tensordict; generated with ``gen_params()``
                when omitted.
            seed: RNG seed; a random one is drawn when omitted.
            device: device passed on to ``EnvBase``.
            mj_model: MuJoCo model to simulate. Defaults to the notebook-level
                ``model`` loaded earlier in this notebook.
        """
        if td_params is None:
            td_params = self.gen_params()

        super().__init__(device=device, batch_size=[])
        # _step and _reset read self.model, so it must be set on the instance.
        self.model = model if mj_model is None else mj_model
        self._make_spec(td_params)
        if seed is None:
            seed = torch.empty((), dtype=torch.int64).random_().item()
        self.set_seed(seed)

    gen_params = staticmethod(gen_params)
    _make_spec = _make_spec

    _reset = _reset
    # _step takes `self` and uses self.model, so bind it as a regular method;
    # wrapping it in staticmethod would pass the tensordict in as `self`.
    _step = _step
    _set_seed = _set_seed

NameError: name 'EnvBase' is not defined

In [7]:
# Build the environment with its default arguments, then run TorchRL's
# check_env_specs utility on it.
env = ElbowEnv()
check_env_specs(env)

NameError: name 'ElbowEnv' is not defined