In [2]:
# NOTE(review): presumably silences d4rl's import-error messages for optional
# dependencies — TODO confirm. It is set in its own cell ahead of the d4rl
# imports so the variable is already in the environment when d4rl is loaded.
import os; os.environ["D4RL_SUPPRESS_IMPORT_ERROR"] = "1"

In [3]:
import gym
from d4rl.pointmaze import waypoint_controller
from d4rl.pointmaze import maze_model
import numpy as np
import h5py
import argparse


def reset_data():
    """Create an empty transition buffer.

    Returns:
        dict: maps every dataset field name to its own fresh, empty list.
    """
    fields = (
        'observations',
        'actions',
        'terminals',
        'rewards',
        'infos/goal',
        'infos/qpos',
        'infos/qvel',
        'infos/start',
    )
    return {field: [] for field in fields}


def append_data(data, s, a, tgt, done, env_data, start, reward):
    """Append one transition to the buffer created by ``reset_data``.

    Args:
        data: dict of lists keyed by dataset field name; mutated in place.
        s: observation, stored under 'observations'.
        a: action, stored under 'actions'.
        tgt: goal, stored under 'infos/goal'.
        done: terminal flag, stored under 'terminals'.
        env_data: object exposing ``qpos`` and ``qvel`` arrays.
        start: start location, stored under 'infos/start'.
        reward: reward, stored under 'rewards'.
    """
    stored_as_is = (
        ('observations', s),
        ('actions', a),
        ('rewards', reward),
        ('terminals', done),
        ('infos/goal', tgt),
    )
    for key, value in stored_as_is:
        data[key].append(value)

    # Flatten and copy so later changes to env_data's arrays cannot alter
    # the values already stored.
    for key, attr in (('infos/qpos', 'qpos'), ('infos/qvel', 'qvel')):
        data[key].append(getattr(env_data, attr).ravel().copy())

    data['infos/start'].append(start)

    
def npify(data):
    """Convert every list in ``data`` to a NumPy array, in place.

    The 'terminals' entry becomes a boolean array; every other entry becomes
    float32. Nothing is returned.
    """
    for key, values in data.items():
        target_dtype = np.bool_ if key == 'terminals' else np.float32
        data[key] = np.array(values, dtype=target_dtype)
    
    
def generate_starts_and_targets(env_name, n_traj):
    """Sample ``n_traj`` (start, target) pairs for a maze and save them to HDF5.

    The maze layout is taken from ``gym.make(env_name).str_maze_spec`` and a
    fresh ``MazeEnv`` is built from it. For every pair the env is reset, the
    first two entries of the reset observation are used as the start, and the
    target is read from ``env._target`` after ``set_target()``.

    The pairs are written to ``st-{env_name}-{n_traj}.hdf5`` in the current
    working directory under the dataset name "starts_and_targets".

    Args:
        env_name: id of a registered gym environment exposing ``str_maze_spec``.
        n_traj: number of (start, target) pairs to sample.
    """
    maze_spec = gym.make(env_name).str_maze_spec
    maze_env = maze_model.MazeEnv(maze_spec)

    def sample_pair():
        # Reset first, then draw the target, so each pair comes from one episode setup.
        obs = maze_env.reset()
        maze_env.set_target()
        return [obs[0:2], maze_env._target]

    pairs = np.array([sample_pair() for _ in range(n_traj)])

    out_path = f'st-{env_name}-{n_traj}.hdf5'
    with h5py.File(out_path, "w") as out_file:
        out_file.create_dataset("starts_and_targets", data=pairs)
    
    
def set_loc(self, location):
    """Move the agent to ``location``.

    Calls ``self.set_state`` with ``location`` as the position and
    ``self.init_qvel`` as the velocity.
    """
    self.set_state(location, self.init_qvel)

# Monkey-patch: attach set_loc to the MazeEnv class so every MazeEnv instance
# can be called as env.set_loc(location).
maze_model.MazeEnv.set_loc = set_loc

In [4]:
# Sample 10 (start, target) pairs for the large maze. This writes
# st-maze2d-large-dense-v1-10.hdf5, the file main() later loads.
env_name = 'maze2d-large-dense-v1'
n_traj = 10
generate_starts_and_targets(env_name, n_traj)

In [5]:
!ls

'd4rl scratchpad.ipynb'   st-maze2d-large-dense-v1-10.hdf5


In [None]:
class WaypointControllerWrapper(waypoint_controller.WaypointController):
    """WaypointController that also keeps a reference to the environment.

    The required ``env`` keyword is stored on ``self.env``; all remaining
    keyword arguments are forwarded to ``WaypointController.__init__``.
    """

    def __init__(self, **kwargs):
        # pop() stores the env and strips it from kwargs in one step, so the
        # base class never receives it. Raises KeyError when 'env' is missing.
        self.env = kwargs.pop('env')
        super().__init__(**kwargs)

class RandomController(WaypointControllerWrapper):
    """Controller that acts randomly but keeps the waypoint controller's done flag."""

    def get_action(self, location, velocity, target):
        """Return ``(action, done)``.

        ``done`` comes from the parent controller; its planned action is
        discarded and replaced by a sample from the env's action space.
        """
        _planned_action, done = super().get_action(location, velocity, target)
        return self.env.action_space.sample(), done
    
class NoisyWaypointController(WaypointControllerWrapper):
    """Waypoint controller whose actions are perturbed with Gaussian noise."""

    def get_action(self, location, velocity, target):
        """Return ``(action, done)``.

        The parent's action gets standard-normal noise scaled by 0.5 added to
        it and is then clipped to [-1.0, 1.0]. ``done`` is passed through.
        """
        planned, done = super().get_action(location, velocity, target)
        noise = np.random.randn(*planned.shape) * 0.5
        return np.clip(planned + noise, -1.0, 1.0), done

In [None]:
def main():
    """Roll out a controller from every saved (start, target) pair and store the transitions.

    Loads the pairs from the HDF5 file named by ``--starts_and_targets_path``,
    builds a ``MazeEnv`` from the maze spec of ``--env_name``, and for each
    pair places the agent at the start, sets the target, and steps the selected
    controller until it reports ``done`` (or until the optional
    ``--max_episode_steps`` cap is reached). All transitions are written,
    gzip-compressed, to ``{env_name}-{controller_type}.hdf5`` in the current
    working directory.

    Raises:
        ValueError: if the controller type is not one of the supported names.
        AssertionError: if the env state does not match the requested start or
            target after it has been set.
    """
    # Local import: only needed for the pre-step state snapshot below.
    from types import SimpleNamespace

    parser = argparse.ArgumentParser()
    parser.add_argument('--render', action='store_true', help='Render trajectories')
    parser.add_argument('--env_name', type=str, default='maze2d-large-dense-v1', help='Maze type')
    parser.add_argument('--starts_and_targets_path', type=str, required=True, help='Path to starts and targets file')
    parser.add_argument('--controller_type', type=str, default='random', choices=['waypoint', 'waypoint_noisy', 'random'])
    parser.add_argument('--reward_type', type=str, default='dense', choices=['dense', 'sparse'])
    parser.add_argument('--max_episode_steps', type=int, default=None,
                        help='Optional cap on steps per trajectory (default: no cap)')

    # The argument list is passed explicitly instead of being read from sys.argv.
    args = parser.parse_args(args=['--controller_type', 'waypoint',
                                   '--starts_and_targets_path', 'st-maze2d-large-dense-v1-10.hdf5'])

    with h5py.File(args.starts_and_targets_path, "r") as f:
        starts_and_targets = f['starts_and_targets'][:]

    # The registered gym env is only used to obtain the maze layout string.
    env = gym.make(args.env_name)
    maze = env.str_maze_spec

    env = maze_model.MazeEnv(maze_spec=maze, reward_type=args.reward_type)

    if args.controller_type == 'waypoint':
        controller = WaypointControllerWrapper(maze_str=maze, env=env)
    elif args.controller_type == 'waypoint_noisy':
        controller = NoisyWaypointController(maze_str=maze, env=env)
    elif args.controller_type == 'random':
        controller = RandomController(maze_str=maze, env=env)
    else:
        raise ValueError("controller not supported")

    data = reset_data()

    for start, target in starts_and_targets:
        env.set_loc(start)
        env.set_target(target)

        s = env._get_obs()
        # Sanity check: the env really is at the requested start with the requested target.
        assert np.allclose(env.sim.data.qpos, start) and np.allclose(start, s[0:2])
        assert np.allclose(env._target, target)

        ts = 0
        while True:
            position = s[0:2]
            velocity = s[2:4]
            act, done = controller.get_action(position, velocity, env._target)

            # Snapshot the simulator state BEFORE stepping so the stored
            # infos/qpos and infos/qvel describe the same state as the stored
            # observation `s`, not the state reached after the action.
            pre_step = SimpleNamespace(qpos=env.sim.data.qpos.copy(),
                                       qvel=env.sim.data.qvel.copy())

            ns, reward, _, _ = env.step(act)
            ts += 1

            # Optional safety net: a controller that never reports done would
            # otherwise loop (and grow `data`) forever. A capped trajectory is
            # closed by marking its last transition as terminal.
            if args.max_episode_steps is not None and ts >= args.max_episode_steps:
                done = True

            append_data(data, s, act, env._target, done, pre_step, start, reward)

            if args.render:
                env.render()

            if done:
                break
            s = ns

    # Convert first, so a conversion error cannot leave a half-written file behind.
    npify(data)

    fname = f'{args.env_name}-{args.controller_type}.hdf5'
    # The context manager guarantees the file is flushed and closed.
    with h5py.File(fname, 'w') as dataset:
        for k in data:
            dataset.create_dataset(k, data=data[k], compression='gzip')


# Script-style entry guard kept for reference; in this notebook main() is
# called directly instead.
# if __name__ == "__main__":
#     main()
main()

In [None]:
!ls *.hdf5

In [None]:
# Reload the dataset that main() just wrote, through the gym env's
# get_dataset(h5path=...) loader.
env_name = 'maze2d-large-dense-v1'
env = gym.make(env_name)
# Must match the --controller_type main() ran with ('waypoint'): that is the
# only '{env_name}-{controller_type}.hdf5' file this notebook produces.
controller_type = 'waypoint'
fname = f'{env_name}-{controller_type}.hdf5'
data = env.get_dataset(h5path=fname)

In [None]:
# List the fields available in the loaded dataset.
print(data.keys())

In [None]:
# Distinct start rows and distinct goal rows found in the dataset
# (np.unique with axis=0 de-duplicates whole rows), printed to 2 decimals.
print(np.array2string(np.unique(data['infos/start'], axis=0), precision=2))
print(np.array2string(np.unique(data['infos/goal'], axis=0), precision=2))

In [None]:
# Report every trajectory in the flat dataset as (start, target, length).
# Consecutive rows belong to the same trajectory while their recorded start
# stays the same; length is the number of rows (transitions) in that run.
starts = data['infos/start']
targets = data['infos/goal']
traj_len = 0
for i in range(len(starts)):
    traj_len += 1
    # A trajectory ends on the last row, or where the next row has a different start.
    if i == len(starts) - 1 or not np.allclose(starts[i], starts[i + 1]):
        print(f"start: {np.array2string(starts[i], precision=2)} target: {np.array2string(targets[i], precision=2)}, length: {traj_len}")
        traj_len = 0

In [None]:
# Print start, qpos and goal side by side for every stored transition.
for row in range(len(data['infos/start'])):
    cells = [
        np.array2string(data[field][row], precision=2)
        for field in ('infos/start', 'infos/qpos', 'infos/goal')
    ]
    # Each value is followed by a single space; the row ends with a newline.
    print(*cells, sep=" ", end=" \n")

In [None]:
# First two columns of every observation; main() reads s[0:2] as the position.
data['observations'][:,:2]