In [1]:
import gymnasium as gym
import gymnasium_robotics

# PyTorch
import torch

# from collections import deque
import numpy as np
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv

In [2]:
# Environment to train on and the single kitchen sub-task the agent must complete.
env_id = 'FrankaKitchen-v1'
task = 'kettle'

# Make the environments shipped with gymnasium_robotics visible to gym.make().
gym.register_envs(gymnasium_robotics)

# Prefer the first CUDA GPU; fall back to the CPU when CUDA is unavailable.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# NOTE(review): presumably the number of action dimensions of the robot;
# it is not referenced by the other cells shown here.
n_actions = 9

In [None]:
# Number of discrete levels each action dimension is quantised into.
granularity = 3

# Lookup table (level index -> command value): `granularity` evenly spaced
# values covering [-1.0, 1.0], both ends included.
transform_action_space = np.linspace(-1.0, 1.0, granularity)

def transform_action_from_int(action: int, n_dims: int = 9):
    """Decode a discrete action index into a list of per-dimension command values.

    The index is read as a base-`granularity` number, least-significant digit
    first: digit k selects the entry of `transform_action_space` that becomes
    the command for action dimension k.

    Args:
        action: Discrete action index. Plain ints, NumPy integer scalars and
            single-element arrays (0-d or shape ``(1,)``) are accepted.
            Lists, tuples and arrays with any other number of elements are
            treated as already-decoded action vectors and returned unchanged.
        n_dims: Number of action dimensions to decode (9 by default).

    Returns:
        A list of `n_dims` values taken from `transform_action_space`, or the
        input object itself when it is already a vector.
    """
    # Already a vector of commands: nothing to decode.
    if isinstance(action, (list, tuple)):
        return action
    if isinstance(action, np.ndarray) and action.size != 1:
        return action

    # Collapse NumPy scalars / single-element arrays to a plain Python int so
    # the decoded entries are scalars rather than nested arrays.
    index = int(np.asarray(action).item())

    decoded = []
    for _ in range(n_dims):
        index, digit = divmod(index, granularity)
        decoded.append(transform_action_space[digit])
    return decoded

def transform_action_to_int(action, n_dims: int = 9) -> int:
    """Encode an action vector as its discrete action index.

    Each component is snapped to the nearest entry of `transform_action_space`
    (ties resolve to the lower level), and the resulting level indices are
    combined as a base-`granularity` number with component 0 as the
    least-significant digit.

    Args:
        action: Either an integer index (Python int or NumPy integer), which
            is returned as a plain int, or a sequence of `n_dims` command
            values.
        n_dims: Expected number of action dimensions (9 by default).

    Returns:
        The discrete action index as a Python int.

    Raises:
        ValueError: If a vector input does not contain exactly `n_dims` values.
    """
    # Already an index: hand it back unchanged (as a plain int).
    if isinstance(action, (int, np.integer)):
        return int(action)

    values = np.asarray(action, dtype=np.float64).reshape(-1)
    if values.size != n_dims:
        raise ValueError(
            f"expected an action with {n_dims} components, got {values.size}"
        )

    # Nearest quantisation level for every component.
    levels = np.asarray(transform_action_space, dtype=np.float64)
    digits = np.abs(values[:, None] - levels[None, :]).argmin(axis=1)

    # Horner evaluation from the most-significant digit (last component) down,
    # so component 0 ends up as the least-significant digit.
    index = 0
    for digit in digits[::-1]:
        index = index * granularity + int(digit)
    return index

def flatten_observation(observation):
    """Collapse a dict observation into a single float32 vector.

    Anything that is not a dict is returned unchanged. For a dict, the result
    is the concatenation, in this order, of:
    ``observation['achieved_goal'][task]``, ``observation['desired_goal'][task]``
    and ``observation['observation']``, where `task` is the module-level task name.
    """
    if isinstance(observation, dict):
        pieces = (
            observation['achieved_goal'][task],
            observation['desired_goal'][task],
            observation['observation'],
        )
        # Cast every piece to float32 before joining them end to end.
        as_float32 = [piece.astype(np.float32) for piece in pieces]
        return np.concatenate(as_float32, dtype=np.float32)
    return observation

In [4]:
# Length of the flattened observation vector advertised by the wrapper below.
# NOTE(review): presumably the combined size of the three arrays that
# flatten_observation() concatenates for the chosen task — confirm if `task` changes.
flat_dim = 73

# One discrete action per combination of quantisation levels over 9 dimensions.
action_space_size = granularity ** 9

# Very loose bounds for every observation component (effectively unbounded).
obs_high = np.full((flat_dim,), 1e10, dtype=np.float32)
obs_low = np.full((flat_dim,), -1e10, dtype=np.float32)

class FlattenDictWrapper(gym.Wrapper):
    """Present the wrapped env with a flat Box observation and a Discrete action space.

    Observations coming out of the wrapped env are flattened with
    `flatten_observation`; discrete action indices going in are decoded into
    an action vector with `transform_action_from_int` before being forwarded.
    The wrapped env is assumed to take that decoded vector as its action.
    """

    def __init__(self, env):
        super().__init__(env)
        # Keys of the wrapped env's Dict observation space (kept for reference).
        self.keys = env.observation_space.spaces.keys()
        self.observation_space = gym.spaces.Box(low=obs_low, high=obs_high, shape=(flat_dim,), dtype=np.float32)
        self.action_space = gym.spaces.Discrete(n=action_space_size)

    def observation(self, observation):
        """Return the flattened form of a dict observation."""
        return flatten_observation(observation)

    def action(self, action):
        """Convert an agent action into the float32 vector sent to the wrapped env.

        A single value (int, NumPy scalar, 0-d or one-element array) is treated
        as a discrete index and decoded; anything with more than one element is
        taken to be an already-decoded action vector and only cast to float32.
        """
        candidate = np.asarray(action)
        if candidate.size == 1:
            decoded = transform_action_from_int(int(candidate.item()))
            return np.asarray(decoded, dtype=np.float32)
        return candidate.astype(np.float32)

    def step(self, action):
        """Decode the action, step the wrapped env and flatten the observation."""
        obs, reward, terminated, truncated, info = self.env.step(self.action(action))
        return self.observation(obs), reward, terminated, truncated, info

    def reset(self, **kwargs):
        """Reset the wrapped env and return ``(flat_observation, info)``."""
        # The wrapped reset() yields an (observation, info) pair; only the
        # observation part must be flattened.
        obs, info = self.env.reset(**kwargs)
        return self.observation(obs), info
    
    
def make_env():
    """Build the `env_id` environment restricted to `task`, wrapped in FlattenDictWrapper."""
    # Rendering is disabled; only the selected task has to be completed.
    base_env = gym.make(env_id, render_mode=None, tasks_to_complete=[task])
    return FlattenDictWrapper(base_env)


In [5]:
# Single-environment vectorised wrapper used for training.
env = DummyVecEnv(env_fns=[make_env])

In [9]:
# Fresh DQN agent with an MLP policy and library-default hyperparameters.
model = DQN(policy="MlpPolicy", env=env, device=device, verbose=1)

Using cuda:0 device


In [None]:
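# Optional: uncomment to load a previously saved checkpoint instead of training below.
# NOTE(review): this file name ("dqn_3_10000_") differs from the checkpoint this
# notebook saves after training ("dqn_3_100000_") — confirm which one is intended.
# model = DQN.load("dqn_3_10000_"+task)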

In [10]:
# Train for 100k environment steps (learn() returns the model, hence the repr in the output).
model.learn(total_timesteps=100_000)

----------------------------------
| rollout/            |          |
|    exploration_rate | 0.894    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 53       |
|    time_elapsed     | 21       |
|    total_timesteps  | 1120     |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.051    |
|    n_updates        | 254      |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.787    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 54       |
|    time_elapsed     | 41       |
|    total_timesteps  | 2240     |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.0415   |
|    n_updates        | 534      |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration_rat

<stable_baselines3.dqn.dqn.DQN at 0x7f9e48282f50>

In [11]:
# Persist the 100k-step agent; the task name is appended to the file stem.
model.save("dqn_3_100000_" + task)

In [12]:
# Greedy (deterministic) evaluation of the current `model` over 10 episodes.
# The evaluation env is built once and always closed, instead of creating a
# new, never-closed env for every episode.
env_eval = make_env()
try:
    for i in range(10):
        obs, _info = env_eval.reset()
        done = False
        ep_reward = 0

        while not done:
            action, _state = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, _info = env_eval.step(transform_action_from_int(action))
            # Leaves obs unchanged when it is already a flat array.
            obs = flatten_observation(obs)
            done = terminated or truncated
            ep_reward += reward
        print(f"Episode reward: {ep_reward}")
finally:
    env_eval.close()

Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0


In [13]:
# Start over with a brand-new agent (this rebinds `model`), train it for
# 400k environment steps and save it under its own file name.
model = DQN(policy="MlpPolicy", env=env, device=device, verbose=1)
model.learn(total_timesteps=400_000)
model.save("dqn_3_400000_" + task)

Using cuda:0 device
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.973    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 70       |
|    time_elapsed     | 15       |
|    total_timesteps  | 1120     |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.0774   |
|    n_updates        | 254      |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.947    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 71       |
|    time_elapsed     | 31       |
|    total_timesteps  | 2240     |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.0617   |
|    n_updates        | 534      |
----------------------------------
----------------------------------
| rollout/            |          |


In [14]:
# Greedy (deterministic) evaluation of whatever `model` currently refers to,
# over 10 episodes. The evaluation env is built once and always closed,
# instead of creating a new, never-closed env for every episode.
env_eval = make_env()
try:
    for i in range(10):
        obs, _info = env_eval.reset()
        done = False
        ep_reward = 0

        while not done:
            action, _state = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, _info = env_eval.step(transform_action_from_int(action))
            # Leaves obs unchanged when it is already a flat array.
            obs = flatten_observation(obs)
            done = terminated or truncated
            ep_reward += reward
        print(f"Episode reward: {ep_reward}")
finally:
    env_eval.close()

Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
Episode reward: 0.0
