In [25]:
from stable_baselines3 import HerReplayBuffer, DDPG, DQN, SAC, TD3
from stable_baselines3.her.goal_selection_strategy import GoalSelectionStrategy
from stable_baselines3.common.envs import BitFlippingEnv

model_class = DQN

In [32]:
import random
import metaworld

ml1 = metaworld.ML1("door-lock-v2")

testing_envs = []
for name, env_cls in ml1.train_classes.items():
    env = env_cls()  # Create an environment
    task = random.choice([task for task in ml1.train_tasks
                            if task.env_name == name])
    env.set_task(task)
    testing_envs.append(env)


In [35]:
import gymnasium as gym
import numpy as np
from gymnasium.spaces import Discrete, Box

class DiscreteActionWrapper(gym.ActionWrapper):
    def __init__(self, env, n_bins):
        super(DiscreteActionWrapper, self).__init__(env)
        self.n_bins = n_bins
        # Define the new action space
        self.action_space = Discrete(n_bins)
        # Calculate the bin edges
        self.bin_edges = np.linspace(self.env.action_space.low, self.env.action_space.high, n_bins + 1)[1:-1]
        
    def action(self, action):
        # Map discrete action to continuous action
        bin_width = (self.env.action_space.high - self.env.action_space.low) / self.n_bins
        continuous_action = self.env.action_space.low + (action + 0.5) * bin_width
        return np.clip(continuous_action, self.env.action_space.low, self.env.action_space.high)

# Example usage with Stable Baselines:
from stable_baselines3 import PPO

# Create your original environment
original_env = env

# Wrap it to convert the action space
n_bins = 5
wrapped_env = DiscreteActionWrapper(original_env, n_bins)

goal_selection_strategy = "future" # equivalent to GoalSelectionStrategy.FUTURE
model_class = DQN
# Initialize the model
model = model_class(
    "MultiInputPolicy",
    wrapped_env,
    replay_buffer_class=HerReplayBuffer,
    # Parameters for HER
    replay_buffer_kwargs=dict(
        n_sampled_goal=4,
        goal_selection_strategy=goal_selection_strategy,
    ),
    verbose=1,
)

model.learn(total_timesteps=1000)




Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


AssertionError: DictReplayBuffer must be used with Dict obs space only

In [31]:
# Test the trained model
obs, info = wrapped_env.reset()
for _ in range(1000):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, done, truncated, info = wrapped_env.step(action)
    if done:
        obs = wrapped_env.reset()

In [28]:
goal_selection_strategy = "future" # equivalent to GoalSelectionStrategy.FUTURE

# Initialize the model
model = model_class(
    "MultiInputPolicy",
    env,
    replay_buffer_class=HerReplayBuffer,
    # Parameters for HER
    replay_buffer_kwargs=dict(
        n_sampled_goal=4,
        goal_selection_strategy=goal_selection_strategy,
    ),
    verbose=1,
)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


AssertionError: The algorithm only supports (<class 'gymnasium.spaces.discrete.Discrete'>,) as action spaces but Box(-1.0, 1.0, (4,), float32) was provided

In [19]:
import numpy as np

obs, _ = env.reset()

print(env.observation_space)
print(env.action_space)
print(env.action_space.sample())

GO_LEFT = 0
# Hardcoded best agent: always go left!
n_steps = 2
for step in range(n_steps):
    print(f"Step {step + 1}")
    obs, reward, terminated, truncated, info = env.step(np.array([0, 0.5, 1, 1]))
    done = terminated or truncated
    print("obs=", obs, "reward=", reward, "done=", done)
    if done:
        print("Goal reached!", "reward=", reward)
        break

Box([-0.525   0.348  -0.0525 -1.        -inf    -inf    -inf    -inf    -inf
    -inf    -inf    -inf    -inf    -inf    -inf    -inf    -inf    -inf
 -0.525   0.348  -0.0525 -1.        -inf    -inf    -inf    -inf    -inf
    -inf    -inf    -inf    -inf    -inf    -inf    -inf    -inf    -inf
  0.      0.      0.    ], [0.525 1.025 0.7   1.      inf   inf   inf   inf   inf   inf   inf   inf
   inf   inf   inf   inf   inf   inf 0.525 1.025 0.7   1.      inf   inf
   inf   inf   inf   inf   inf   inf   inf   inf   inf   inf   inf   inf
 0.    0.    0.   ], (39,), float64)
Box(-1.0, 1.0, (4,), float32)
[0.41551828 0.02334407 0.7677168  0.28135398]
Step 1
obs= [0.0060631  0.60006955 0.19476265 0.99635346 0.13220172 0.64367595
 0.22098755 1.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.00632151 0.5995818  0.19393998 1.         0.13220169 0.64367595
 0.22098783 1.         0.         0.         0.         0.
 0.         0.        

In [5]:
from stable_baselines3 import PPO, A2C, DQN
from stable_baselines3.common.env_util import make_vec_env

# Instantiate the env
vec_env = make_vec_env(GoLeftEnv, n_envs=1)

In [24]:
from stable_baselines3 import PPO, A2C, DQN

# Train the agent
model = A2C("MlpPolicy", env, verbose=1).learn(5000)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 500      |
|    ep_rew_mean        | 606      |
| time/                 |          |
|    fps                | 54       |
|    iterations         | 100      |
|    time_elapsed       | 9        |
|    total_timesteps    | 500      |
| train/                |          |
|    entropy_loss       | -5.61    |
|    explained_variance | 8.82e-05 |
|    learning_rate      | 0.0007   |
|    n_updates          | 99       |
|    policy_loss        | 14.6     |
|    std                | 0.985    |
|    value_loss         | 6.37     |
------------------------------------
------------------------------------
| rollout/              |          |
|    ep_len_mean        | 500      |
|    ep_rew_mean        | 537      |
| time/                 |          |
|    fps                | 53       |
|    iterations  

In [37]:
import gymnasium as gym
import numpy as np
from gymnasium.spaces import Discrete, Box, Dict

class DiscreteActionDictObsWrapper(gym.Wrapper):
    def __init__(self, env, n_bins):
        super(DiscreteActionDictObsWrapper, self).__init__(env)
        self.n_bins = n_bins
        # Define the new action space
        self.action_space = Discrete(n_bins)
        # Calculate the bin edges
        self.bin_edges = np.linspace(self.env.action_space.low, self.env.action_space.high, n_bins + 1)[1:-1]
        
        # Define the new observation space
        self.observation_space = Dict({
            'observation': self.env.observation_space,
            'discrete_action': Discrete(n_bins)
        })
        
    def action(self, action):
        # Map discrete action to continuous action
        bin_width = (self.env.action_space.high - self.env.action_space.low) / self.n_bins
        continuous_action = self.env.action_space.low + (action + 0.5) * bin_width
        return np.clip(continuous_action, self.env.action_space.low, self.env.action_space.high)

    def reset(self, **kwargs):
        # Reset the environment and return the observation in a dict
        obs = self.env.reset(**kwargs)
        return {'observation': obs, 'discrete_action': 0}

    def step(self, action):
        # Step the environment with the converted action
        continuous_action = self.action(action)
        obs, reward, done, info = self.env.step(continuous_action)
        return {'observation': obs, 'discrete_action': action}, reward, done, info

# Example usage with Stable Baselines:
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env

# Create your original environment
original_env = gym.make("Pendulum-v1")

# Wrap it to convert the action space and observation space
n_bins = 5
wrapped_env = DiscreteActionDictObsWrapper(original_env, n_bins)

# Use make_vec_env for parallel environments
vec_env = make_vec_env(lambda: wrapped_env, n_envs=1)

# Now you can use the wrapped environment with Stable Baselines
model = PPO("MultiInputPolicy", vec_env, verbose=1)
model.learn(total_timesteps=10000)

# Test the trained model
obs = vec_env.reset()
for _ in range(1000):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, done, info = vec_env.step(action)
    if done:
        obs = vec_env.reset()


Using cuda device


TypeError: string indices must be integers