In [None]:
import os
import random
import numpy as np
import torch
from PIL import Image
from MoSim.src.dmc import DeepMindControl
import inspect

os.environ["MUJOCO_GL"] = "EGL"

import dm_control
dm_control_path = inspect.getfile(dm_control)
print("dm_control original path:", dm_control_path)

os.environ["CUDA_VISIBLE_DEVICES"] = "2"
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

/home/weyl/anaconda3/envs/encoder/lib/python3.12/site-packages/glfw/__init__.py:914: GLFWError: (65544) b'X11: The DISPLAY environment variable is missing'


dm_control original path: /home/weyl/anaconda3/envs/encoder/lib/python3.12/site-packages/dm_control/__init__.py


In [None]:
def collect_data(env, step_num, random_ex, static_prop, save_path, data_name, env_name, sequence_num = 1, offset=0, multi_step=1, save_images=False):
    """
    Collect physical state data from the DMC simulator and save it to the specified path.

    Parameters:
    - env: The environment from which to collect data.
    - step_num: Number of steps per sequence.
    - random_ex: Expected timesteps for random action application.
    - static_prop: Proportion of no-action inputs.
    - save_path: Path to save the data.
    - data_name: Name of the data.
    - sequence_num: Number of sequences to collect.
    - offset: Offset value for data alignment.
    - multi_step: Number of steps to apply the action.
    """
    p = 1 - 1 / random_ex  # Probability of keeping the same action

    nv = env._env.physics.data.model.nv
    nq = env._env.physics.data.model.nq
    
    state_dim = nv + nq
    action_dim = env.action_space.shape[0]
    
    data = []
    
    for _ in range(sequence_num):
        qs = []
        vs = []
        actions = []
        image_save_path = ''
        
        if save_images:
            # Create a directory to save images
            image_save_path = os.path.join(save_path, 'images')
            if not os.path.exists(image_save_path):
                os.makedirs(image_save_path)
                print(f"Image folder created: {image_save_path}")
                
        env.reset()
        action = env.action_space.sample()
        
        for i in range(step_num + 1 + offset):
            if save_images:
                # Render and save the image
                frame = env._env.physics.render(camera_id=0, width=320, height=240)
                frame_path = os.path.join(image_save_path, f'frame_{i:04d}.png')
                Image.fromarray(frame).save(frame_path)
                
            state = env._env.physics.get_state()

            for _ in range(multi_step):
                env._env.step(action)

            q = state[:nq]
            v = state[nq:nq + nv]

            qs.append(q)
            vs.append(v)
            actions.append(action)
            
            # Poison sampling
            if random.random() > p:
                action = env.action_space.sample()
                if static_prop and random.random() < static_prop:
                    action = np.zeros_like(action)

        qs = np.array(qs)
        vs = np.array(vs)
        actions = np.array(actions)

        if offset > 0:
            sequence = np.concatenate((
                qs[:-1 - offset],
                vs[offset:-1],
                actions[offset:-1],
                qs[1:-offset],
                vs[1 + offset:]
            ), axis=1)
        else:
            sequence = np.concatenate((
                qs[:-1],
                vs[:-1],
                actions[:-1],
                qs[1:],
                vs[1:],
            ), axis=1)
        
        data.append(sequence)

    data = np.array(data, dtype=np.float32)
    if not os.path.exists(save_path):
        os.makedirs(save_path)
        print(f"The output folder has been created: {save_path}")

    metadata = {
        'state_dim': state_dim,
        'nq': nq,
        'nv': nv,
        'action_dim': action_dim,
        'random_ex': random_ex,
        'static_prop': static_prop,
        'type' : 'random',
        'env': env_name
    }

    save_path = os.path.join(save_path,data_name)
    np.savez_compressed(save_path, data=data, metadata=np.array(metadata, dtype=object))
    print(f'data saved in {save_path}.npz')

In [15]:
env_name = 'walker_walk'
data_name = 'dmc_walker_test'
# save_path = '../data/walker/random/'
save_path = '../ttt'

env = DeepMindControl(env_name)

if not os.path.exists(save_path):
    os.makedirs(save_path)
    print(f"The folder has been created: {save_path}")
    
# Old settings os Poison process: random_ex = 5, static_prop = 0.3
collect_data(
    env=env,
    step_num=1000,
    sequence_num=5,
    random_ex=1,
    static_prop=0.1,
    save_path=save_path,
    data_name=data_name,
    multi_step=1,
    save_images=False,
    env_name=env_name
)


  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


data saved in ../ttt/dmc_walker_test.npz


In [16]:
# test your data
from MoSim.src.tools import load_data

data_path = os.path.join(save_path, data_name + '.npz')
state, target, action, metadata = load_data(data_path)

print(metadata)

{'state_dim': 18, 'nq': 9, 'nv': 9, 'action_dim': 6, 'random_ex': 1, 'static_prop': 0.1, 'type': 'random', 'env': 'walker_walk'}


In [None]:
def collect_data_myo(env, step_num, random_ex, static_prop, save_path, data_name, env_name, sequence_num=1, offset=0, multi_step=1, save_images=False):
    """
    Collect physical state data from the Myosuite environment and save it to the specified path.

    Parameters:
    - env: The Myosuite environment from which to collect data.
    - step_num: Number of steps per sequence.
    - random_ex: Expected timesteps for random action application.
    - static_prop: Proportion of no-action inputs.
    - save_path: Path to save the data.
    - data_name: Name of the data.
    - sequence_num: Number of sequences to collect.
    - offset: Offset value for data alignment.
    - multi_step: Number of steps to apply the action.
    - save_images: Whether to save rendered images of the environment.
    """
    p = 1 - 1 / random_ex  # Probability of keeping the same action

    # Get dimensions for qpos and qvel
    nq = env.sim.model.nq  # Number of generalized coordinates
    nv = env.sim.model.nv  # Number of generalized velocities
    
    data = []

    for seq in range(sequence_num):
        qs = []
        vs = []
        actions = []
        image_save_path = ''
        
        env.reset()
        if save_images:
            # Create a directory to save images
            image_save_path = os.path.join(save_path, 'images')
            if not os.path.exists(image_save_path):
                os.makedirs(image_save_path)
                print(f"Image folder created: {image_save_path}")

        action_t = env.action_space.sample()
        for i in range(step_num + 1 + offset):

            if save_images:
                # Render and save the image
                frame=env.render()
                frame_path = os.path.join(image_save_path, f'frame_{seq}_{i:04d}.png')
                Image.fromarray(frame).save(frame_path)

            # Get current state
            qpos = env.sim.data.qpos
            qvel = env.sim.data.qvel

            for n in range(multi_step):
                env.step(action_t)
            # Extract qpos and qvel from the state
            qs.append(qpos.copy())  # Make a copy to avoid overwriting
            vs.append(qvel.copy())
            actions.append(action_t)
            
            if random.random() > p:
                action_t = env.action_space.sample()
                if static_prop and random.random() < static_prop:
                    action_t = np.zeros_like(action_t)  # No-action input
        # Convert lists to numpy arrays
        qs = np.array(qs)
        vs = np.array(vs)
        actions = np.array(actions)
        # Align and concatenate data
        if offset > 0:
            sequence = np.concatenate((
                qs[:-1 - offset],
                vs[offset:-1],
                actions[offset:-1],
                qs[1:-offset],
                vs[1 + offset:]
            ), axis=1)
        else:
            sequence = np.concatenate((
                qs[:-1],
                vs[:-1],
                actions[:-1],
                qs[1:],
                vs[1:]
            ), axis=1)
        
        data.append(sequence)

    # Save data as a compressed numpy file
    data = np.array(data, dtype=np.float32)
    if not os.path.exists(save_path):
        os.makedirs(save_path)
        print(f"Directory created: {save_path}")

    metadata = {
        'state_dim': nq+nv,
        'nq': nq,
        'nv': nv,
        'action_dim': len(action_t),
        'random_ex': random_ex,
        'static_prop': static_prop,
        'type': 'random',
        'env': env_name
    }

    save_path = os.path.join(save_path, data_name)
    np.savez_compressed(save_path, data=data, metadata=np.array(metadata, dtype=object))
    print(f'Data saved in {save_path}.npz')


In [None]:
from myosuite.utils import gym

# env = gym.make('motorFingerReachFixed-v0') 
env = gym.make('myoHandPoseFixed-v0', render_mode='human')

print(env.metadata.get("render_modes"))


save_path = '/projects/Neural-Simulator/data/myosuite/hand/'
collect_data_myo(env, step_num=1000, random_ex=5, static_prop=0.1, 
                     save_path=save_path, data_name='myohand_test_motor', 
                     env_name='myoHandPoseFixed-v0', sequence_num=300, save_images=False)
env.close()
