# Outlook

In this notebook, using BBRL, we code the DDPG algorithm.

To understand this code, you need to know more about [the BBRL interaction
model](https://github.com/osigaud/bbrl/blob/master/docs/overview.md) Then you
should run [a didactical
example](https://github.com/osigaud/bbrl/blob/master/docs/notebooks/03-multi_env_autoreset.student.ipynb)
to see how agents interact in BBRL when autoreset=True.

The DDPG algorithm is explained in [this
video](https://www.youtube.com/watch?v=0D6a0a1HTtc) and you can also read [the
corresponding slides](http://pages.isir.upmc.fr/~sigaud/teach/ddpg.pdf).

In [11]:
import gym
import torch
from torch import nn
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
from bbrl.agents import Agent
import matplotlib.pyplot as plt
from stable_baselines3 import TD3
from stable_baselines3.common.logger import configure

## Actor

In [12]:
class Actor(nn.Module):
    def __init__(self, state_shape, action_dim):
        super().__init__()
        
        hidden_size = 256
        
        self.dense_1 = nn.Linear(state_shape, hidden_size)
        self.dense_2 = nn.Linear(hidden_size, hidden_size)
        self.dense_3 = nn.Linear(hidden_size, action_dim)

    def forward(self, x):
        x = F.relu(self.dense_1(x))
        x = F.relu(self.dense_2(x))
        x = torch.tanh(self.dense_3(x))
        return x

class Critic(nn.Module):
    def __init__(self, state_shape, action_dim):
        super().__init__()
        
        hidden_size = 256
        
        # Q1 architecture
        self.dense_1 = nn.Linear(state_shape + action_dim, hidden_size)
        self.dense_2 = nn.Linear(hidden_size, hidden_size)
        self.dense_3 = nn.Linear(hidden_size, 1)
        
        # Q2 architecture
        self.dense_4 = nn.Linear(state_shape + action_dim, hidden_size)
        self.dense_5 = nn.Linear(hidden_size, hidden_size)
        self.dense_6 = nn.Linear(hidden_size, 1)

    def forward(self, x, x_actions):
        x = torch.cat([x, x_actions], dim=1)

        q1 = F.relu(self.dense_1(x))
        q1 = F.relu(self.dense_2(q1))
        q1 = self.dense_3(q1)

        q2 = F.relu(self.dense_4(x))
        q2 = F.relu(self.dense_5(q2))
        q2 = self.dense_6(q2)

        return q1, q2
    def forward_q1(self, x, x_actions):
        x = torch.cat([x, x_actions], dim=1)
        x = F.relu(self.dense_1(x))
        x = F.relu(self.dense_2(x))
        x = self.dense_3(x)
        return x      

 ## ReplaBuffer

In [13]:
class ReplayBuffer:
    def __init__(self, max_len, state_shape, action_dim, device):
        self.device = device
        
        # Maximum number of items that can be stored in buffer
        self.max_len = max_len
        
        # Buffer
        self.state_buffer = torch.zeros((max_len, state_shape), dtype=torch.float32).to(device)
        self.action_buffer = torch.zeros((max_len, action_dim), dtype=torch.float32).to(device)
        self.reward_buffer = torch.zeros(max_len, dtype=torch.float32).to(device)
        self.next_state_buffer = torch.zeros((max_len, state_shape), dtype=torch.float32).to(device)
        self.done_buffer = torch.zeros(max_len, dtype=torch.float32).to(device)
        
        # Pointer record
        self.ptr = 0
        
        # Keep track of size
        self.size = 0

    def __len__(self):
        return self.size

    def append(self, state, action, reward, next_state, done):
        self.state_buffer[self.ptr] = torch.tensor(state).to(self.device)
        self.action_buffer[self.ptr] = torch.tensor(action).to(self.device)
        self.reward_buffer[self.ptr] = torch.tensor(reward).to(self.device)
        self.next_state_buffer[self.ptr] = torch.tensor(next_state).to(self.device)
        self.done_buffer[self.ptr] = torch.tensor(done).to(self.device)

        self.ptr = (self.ptr + 1) % self.max_len
        self.size = min(self.size + 1, self.max_len)

    def sample(self, batch_size):
        indices = np.random.randint(0, self.size, size=batch_size)
        states = self.state_buffer[indices]
        actions = self.action_buffer[indices]
        rewards = self.reward_buffer[indices]
        next_states = self.next_state_buffer[indices]
        dones = self.done_buffer[indices]
        return states, actions, rewards, next_states, dones

Gaussienne noise.

In [14]:
class GaussianNoiseGenerator:
    def __init__(self, sigma=0.1):
        self.sigma = sigma
    
    def sample(self, *args):
        return self.sigma * np.random.randn(*args)

##Logger

In [15]:
class Logger:
    def __init__(self):
        self.steps = 0
        self.total_steps = 0
        self.new_line_every = 25000
        self.cumulative_reward = 0
        self.current_episode_length = 0
        self.episode_rewards = []
        self.episode_lengths = []
        self.episode_rewards_ma = 0
        self.episode_lengths_ma = 0
        
    def log(self, reward, done):
        self.cumulative_reward += reward
        self.current_episode_length += 1

        if done:
            self.episode_rewards.append(self.cumulative_reward)
            self.episode_lengths.append(self.current_episode_length)
            self.episode_rewards_ma = np.mean(self.episode_rewards[-50:])
            self.episode_lengths_ma = np.mean(self.episode_lengths[-50:])
            self.cumulative_reward = 0
            self.current_episode_length = 0
        self.steps += 1
        
    def print_logs(self):
        end_char = "\n" if self.steps % self.new_line_every == 0 else "\r"
        print(f"Step: {self.steps}/{self.total_steps} | Avg reward per episode: {self.episode_rewards_ma:.4f} | Avg steps per episode: {self.episode_lengths_ma:.2f}", end=end_char)
 

Params

In [16]:
params = {
    'learning_rate_actor': 0.001,  # Policy learning rate
    'learning_rate_critic': 0.001, # Learning rate of value function
    'tau': 0.005,                  # Target network update rate
    'buffer_max_length': 500000,   # Maximum number of experiences held in replay buffer
    'batch_size': 256,             # Number of transitions sampled per batch
    'start_timesteps': 10000,      # Time steps initial random policy is used - 25k in original
    'updates_per_step': 1,         # Learning updates per timestep
    'policy_freq': 2,              # Frequency of delayed policy updates
    'gamma': 0.99,                 # Discount factor
    'sigma': 0.2,                  # Action noise standard deviation
    'clip_noise': 0.5,             # Maximum noise amplitude 
    'action_low': -1,              # Lowest possible action value
    'action_high': 1               # Highest possible action value
}


TD3_Agent

In [7]:
class DiscreteActionWrapper(gym.ActionWrapper):
    "Bin continuous actions into discrete intervals."
    def __init__(self, env, n_actions=5):
        super().__init__(env)
        self.n_actions = n_actions
        self.action_space = gym.spaces.Discrete(n_actions)

    def action(self, action):
        if action == 0:
            return np.array([-2])
        elif action == 1:
            return np.array([-1])
        elif action == 2:
            return np.array([0])
        elif action == 3:
            return np.array([1])
        elif action == 4:
            return np.array([2])
    
class ObservationWrapper(gym.ObservationWrapper):
    "Scale the third value of the observation by 1/8."
    def __init__(self, env):
        super().__init__(env)

    def observation(self, observation):
        observation[2] *= 1/8.0
        return observation
    
class RewardScalingWrapper(gym.RewardWrapper):
    "Scale the reward by the given factor."
    def __init__(self, env, scaling_factor=1/10.0):
        super().__init__(env)
        self.scaling_factor = scaling_factor

    def reward(self, reward):
        return reward * self.scaling_factor
    
env = gym.make('LunarLander-v2')
env = RewardScalingWrapper(DiscreteActionWrapper(ObservationWrapper(env)))


In [None]:
### CleanRL

In [8]:

import gym
import torch

class TD3Agent:
    def __init__(self, env, Actor, Critic, config):
        """
        args:
            env (gym.Env): A gym environment
            Actor (torch.nn.Module): Policy neural network class
            Critic (torch.nn.Module): Q-function neural network class
            config (dict): A dictionary containing the TD3 configuration settings
        """
        self.env = env
        
        self.state_shape = self.env.observation_space.shape[0]
        self.action_dim = self.env.action_space.shape[0]
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        self.actor = Actor(self.state_shape, self.action_dim).to(self.device)
        self.critic = Critic(self.state_shape, self.action_dim).to(self.device)

        self.actor_target = Actor(self.state_shape, self.action_dim).to(self.device)
        self.critic_target = Critic(self.state_shape, self.action_dim).to(self.device)
        
        # Configuration
        self.learning_rate_actor = config['learning_rate_actor']
        self.learning_rate_critic = config['learning_rate_critic']
        self.tau = config['tau']
        self.buffer_max_length = config['buffer_max_length']
        self.batch_size = config['batch_size']
        self.start_timesteps = config['start_timesteps']
        self.updates_per_step = config['updates_per_step']
        self.policy_freq = config['policy_freq']
        self.gamma = config['gamma']
        self.sigma = config['sigma']
        self.clip_noise = config['clip_noise']
        self.action_low = config['action_low']
        self.action_high = config['action_high']

        # Optimizers
        self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.learning_rate_actor)
        self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.learning_rate_critic)
        
        # Initialize replay buffer
        self.buffer = ReplayBuffer(self.buffer_max_length, self.state_shape, self.action_dim, self.device)

        # Gaussian noise
        self.gaussian_noise = GaussianNoiseGenerator(self.sigma)
        
        # Set target network parameters
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())
        
        # Logging
        self.logger = Logger()

    def act(self, state):
        """Predict a single action from a state without gradient computation."""
        state_tensor = torch.tensor(state, dtype=torch.float, device=self.device)
        with torch.no_grad():
            return self.actor(state_tensor).detach().cpu().numpy()
        

    def train_agent(self,seed,  max_timesteps=250000):
        
        # Reset environment
    
        state, info = env.reset(seed=seed)
        
        # Store 
        self.logger.total_steps = max_timesteps
        
        for current_timestep in range(max_timesteps):
            
            # Exploring start
            if len(self.buffer) < self.start_timesteps:
                action = self.env.action_space.sample()
            else:
                action = self.act(state)
                noise = self.gaussian_noise.sample(self.action_dim)
                action = np.clip(action + noise, a_min=self.action_low, a_max=self.action_high)

            next_state, reward, terminated, truncated, info = self.env.step(action)
            done = terminated or truncated
            
            self.buffer.append(state, action, reward, next_state, done)
            
            # Log reward and done, then print progress
            self.logger.log(reward, done)
            self.logger.print_logs()
            
            # Reset environment if done, else prepare state for next iteration
            if done:
                state, info = self.env.reset()
            else:
                state = next_state
            
            if current_timestep < self.start_timesteps:
                continue
                
            # Sample from replay buffer
            states, actions, rewards, next_states, dones = self.buffer.sample(self.batch_size)

            # Compute target for critic loss calculation
            with torch.no_grad():
                noise = torch.tensor(self.gaussian_noise.sample(self.batch_size, self.action_dim), dtype=torch.float, device=self.device).clip(-self.clip_noise, self.clip_noise)
                
                # Clipped actions + noise
                next_actions = torch.clip(self.actor_target(next_states) + noise, min=self.action_low, max=self.action_high)

                targets_q1, targets_q2 = self.critic(next_states, next_actions)
                targets_q = torch.min(targets_q1, targets_q2).squeeze(-1)
                targets_q = rewards + self.gamma * (1 - dones) * targets_q

            # Current Q predictions
            pred_q1, pred_q2 = self.critic(states, actions)

            # Critic loss
            loss_critic = F.mse_loss(pred_q1.squeeze(-1), targets_q) + F.mse_loss(pred_q2.squeeze(-1), targets_q)

            # Critic backward pass
            self.optimizer_critic.zero_grad()
            loss_critic.backward()
            self.optimizer_critic.step()

            # Delayed policy learning updates
            if current_timestep % self.policy_freq == 0:
                
                # Actor loss
                loss_actor = -self.critic.forward_q1(states, self.actor(states)).mean()
                
                # Actor backward pass
                self.optimizer_actor.zero_grad()
                loss_actor.backward()
                self.optimizer_actor.step()
                
                # Update target network parameters
                for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
                    target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

                for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
                    target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)



### Test


In [None]:
# Crée l'environnement
env = gym.make('LunarLanderContinuous-v2')  # Utilise l'environnement continu
env = RewardScalingWrapper(ObservationWrapper(env))

# Crée l'agent TD3
agent_lunar = TD3Agent(env, Actor, Critic, params)

# Entraîne l'agent
seed = 42  # Utilise un seed pour la reproductibilité, par exemple
agent_lunar.train_agent(seed, 250000)
def plot_results(steps, avg_rewards, avg_steps):
    plt.figure(figsize=(14, 6))

    # Courbe des récompenses moyennes
    plt.subplot(1, 2, 1)
    plt.plot(steps, avg_rewards, marker='o', color='b')
    plt.title('Récompense Moyenne par Épisode')
    plt.xlabel('Steps')
    plt.ylabel('Récompense Moyenne')
    plt.grid()

    # Courbe des étapes moyennes
    plt.subplot(1, 2, 2)
    plt.plot(steps, avg_steps, marker='o', color='r')
    plt.title('Étapes Moyennes par Épisode')
    plt.xlabel('Steps')
    plt.ylabel('Étapes Moyennes')
    plt.grid()

    plt.tight_layout()
    plt.show()

# Tracer les résultats
plot_results(agent_lunar.steps, agent_lunar.avg_rewards, agent_lunar.avg_steps)


  if not isinstance(terminated, (bool, np.bool8)):


Step: 25000/250000 | Avg reward per episode: -10.6694 | Avg steps per episode: 312.88
Step: 47709/250000 | Avg reward per episode: -2.2112 | Avg steps per episode: 502.768

In [9]:
import numpy as np
import torch
import torch.nn.functional as F

class TD3Agent:
    def __init__(self, env, Actor, Critic, config):
        """
        args:
            env (gym.Env): A gym environment
            Actor (torch.nn.Module): Policy neural network class
            Critic (torch.nn.Module): Q-function neural network class
            config (dict): A dictionary containing the TD3 configuration settings
        """
        self.env = env
        
        self.state_shape = self.env.observation_space.shape[0]
        self.action_dim = self.env.action_space.shape[0]
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        self.actor = Actor(self.state_shape, self.action_dim).to(self.device)
        self.critic = Critic(self.state_shape, self.action_dim).to(self.device)

        self.actor_target = Actor(self.state_shape, self.action_dim).to(self.device)
        self.critic_target = Critic(self.state_shape, self.action_dim).to(self.device)
        
        # Configuration
        self.learning_rate_actor = config['learning_rate_actor']
        self.learning_rate_critic = config['learning_rate_critic']
        self.tau = config['tau']
        self.buffer_max_length = config['buffer_max_length']
        self.batch_size = config['batch_size']
        self.start_timesteps = config['start_timesteps']
        self.updates_per_step = config['updates_per_step']
        self.policy_freq = config['policy_freq']
        self.gamma = config['gamma']
        self.sigma = config['sigma']
        self.clip_noise = config['clip_noise']
        self.action_low = config['action_low']
        self.action_high = config['action_high']

        # Optimizers
        self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=self.learning_rate_actor)
        self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=self.learning_rate_critic)

        # Initialize replay buffer
        self.buffer = ReplayBuffer(self.buffer_max_length, self.state_shape, self.action_dim, self.device)

        # Gaussian noise
        self.gaussian_noise = GaussianNoiseGenerator(self.sigma)
        
        # Set target network parameters
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())
        
        # Logging
        self.logger = Logger()
        
    def train(self, max_timesteps):
        """Trains the agent for a maximum number of timesteps."""
        # Reset environment
        state, info = self.env.reset()
        
        # Store 
        self.logger.total_steps = max_timesteps
        
        for current_timestep in range(max_timesteps):
            # Exploring start
            if len(self.buffer) < self.start_timesteps:
                action = self.env.action_space.sample()
            else:
                action = self.act(state)
                noise = self.gaussian_noise.sample(self.action_dim)
                action = np.clip(action + noise, a_min=self.action_low, a_max=self.action_high)

            next_state, reward, terminated, truncated, info = self.env.step(action)
            done = terminated or truncated
            
            self.buffer.append(state, action, reward, next_state, done)
            
            # Log reward and done, then print progress
            self.logger.log(reward, done)
            self.logger.print_logs()

            # Reset environment if done, else prepare state for next iteration
            if done:
                state, info = self.env.reset()
            else:
                state = next_state
            
            if current_timestep < self.start_timesteps:
                continue
            
            # Sample from replay buffer
            states, actions, rewards, next_states, dones = self.buffer.sample(self.batch_size)

            # Compute target for critic loss calculation
            with torch.no_grad():
                # Clipped noise
                noise = torch.tensor(self.gaussian_noise.sample(self.batch_size, self.action_dim), 
                                     dtype=torch.float, device=self.device).clip(-self.clip_noise, self.clip_noise)
                
                # Clipped actions + noise
                next_actions = torch.clip(self.actor_target(next_states) + noise, min=self.action_low, max=self.action_high)

                targets_q1, targets_q2 = self.critic(next_states, next_actions)
                targets_q = torch.min(targets_q1, targets_q2).squeeze(-1)
                targets_q = rewards + self.gamma * (1 - dones) * targets_q

            # Current Q predictions
            pred_q1, pred_q2 = self.critic(states, actions)

            # Critic loss
            loss_critic = F.mse_loss(pred_q1.squeeze(-1), targets_q) + F.mse_loss(pred_q2.squeeze(-1), targets_q)

            # Critic backward pass
            self.optimizer_critic.zero_grad()
            loss_critic.backward()
            self.optimizer_critic.step()

            # Delayed policy learning updates
            if current_timestep % self.policy_freq == 0:
                # Actor loss
                loss_actor = -self.critic.forward_q1(states, self.actor(states)).mean()
                
                # Actor backward pass
                self.optimizer_actor.zero_grad()
                loss_actor.backward()
                self.optimizer_actor.step()
                
                # Update target network parameters
                for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
                    target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

                for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
                    target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

    def act(self, state):
        """Predict a single action from a state without gradient computation."""
        state_tensor = torch.tensor(state, dtype=torch.float, device=self.device)
        with torch.no_grad():
            return self.actor(state_tensor).detach().cpu().numpy()


In [10]:
env = gym.make('LunarLander-v2', continuous=True)
env = RewardScalingWrapper(ObservationWrapper(env))

agent_lunar = TD3Agent(env, Actor, Critic, params)
agent_lunar.train(250000)

  if not isinstance(terminated, (bool, np.bool8)):


Step: 25000/250000 | Avg reward per episode: -2.2762 | Avg steps per episode: 312.406
Step: 50000/250000 | Avg reward per episode: 24.5511 | Avg steps per episode: 376.00
Step: 75000/250000 | Avg reward per episode: 23.3703 | Avg steps per episode: 291.06
Step: 100000/250000 | Avg reward per episode: 23.8254 | Avg steps per episode: 285.24
Step: 125000/250000 | Avg reward per episode: 27.1390 | Avg steps per episode: 256.64
Step: 150000/250000 | Avg reward per episode: 27.1703 | Avg steps per episode: 239.98
Step: 175000/250000 | Avg reward per episode: 25.8402 | Avg steps per episode: 266.32
Step: 200000/250000 | Avg reward per episode: 25.2306 | Avg steps per episode: 263.06
Step: 225000/250000 | Avg reward per episode: 25.7221 | Avg steps per episode: 254.24
Step: 250000/250000 | Avg reward per episode: 27.6247 | Avg steps per episode: 242.64


In [None]:
### stable_baselines3: TD3

In [9]:
from typing import Any, Dict, List, Optional, Type, Union

import torch as th
from gymnasium import spaces
from torch import nn
import gym

from stable_baselines3.common.policies import BasePolicy, ContinuousCritic
from stable_baselines3.common.preprocessing import get_action_dim
from stable_baselines3.common.torch_layers import (
    BaseFeaturesExtractor,
    CombinedExtractor,
    FlattenExtractor,
    NatureCNN,
    create_mlp,
    get_actor_critic_arch,
)
from stable_baselines3.common.type_aliases import PyTorchObs, Schedule



class Actor(BasePolicy):
    """
    Actor network (policy) for TD3.

    :param observation_space: Observation space
    :param action_space: Action space
    :param net_arch: Network architecture
    :param features_extractor: Network to extract features
        (a CNN when using images, a nn.Flatten() layer otherwise)
    :param features_dim: Number of features
    :param activation_fn: Activation function
    :param normalize_images: Whether to normalize images or not,
         dividing by 255.0 (True by default)
    """

    def __init__(
        self,
        observation_space: spaces.Space,
        action_space: spaces.Box,
        net_arch: List[int],
        features_extractor: nn.Module,
        features_dim: int,
        activation_fn: Type[nn.Module] = nn.ReLU,
        normalize_images: bool = True,
    ):
        super().__init__(
            observation_space,
            action_space,
            features_extractor=features_extractor,
            normalize_images=normalize_images,
            squash_output=True,
        )

        self.net_arch = net_arch
        self.features_dim = features_dim
        self.activation_fn = activation_fn

        action_dim = get_action_dim(self.action_space)
        actor_net = create_mlp(features_dim, action_dim, net_arch, activation_fn, squash_output=True)
        # Deterministic action
        self.mu = nn.Sequential(*actor_net)

    def _get_constructor_parameters(self) -> Dict[str, Any]:
        data = super()._get_constructor_parameters()

        data.update(
            dict(
                net_arch=self.net_arch,
                features_dim=self.features_dim,
                activation_fn=self.activation_fn,
                features_extractor=self.features_extractor,
            )
        )
        return data

    def forward(self, obs: th.Tensor) -> th.Tensor:
        # assert deterministic, 'The TD3 actor only outputs deterministic actions'
        features = self.extract_features(obs, self.features_extractor)
        return self.mu(features)

    def _predict(self, observation: PyTorchObs, deterministic: bool = False) -> th.Tensor:
        # Note: the deterministic deterministic parameter is ignored in the case of TD3.
        #   Predictions are always deterministic.
        return self(observation)


class TD3Policy(BasePolicy):
    """
    Policy class (with both actor and critic) for TD3.

    :param observation_space: Observation space
    :param action_space: Action space
    :param lr_schedule: Learning rate schedule (could be constant)
    :param net_arch: The specification of the policy and value networks.
    :param activation_fn: Activation function
    :param features_extractor_class: Features extractor to use.
    :param features_extractor_kwargs: Keyword arguments
        to pass to the features extractor.
    :param normalize_images: Whether to normalize images or not,
         dividing by 255.0 (True by default)
    :param optimizer_class: The optimizer to use,
        ``th.optim.Adam`` by default
    :param optimizer_kwargs: Additional keyword arguments,
        excluding the learning rate, to pass to the optimizer
    :param n_critics: Number of critic networks to create.
    :param share_features_extractor: Whether to share or not the features extractor
        between the actor and the critic (this saves computation time)
    """

    actor: Actor
    actor_target: Actor
    critic: ContinuousCritic
    critic_target: ContinuousCritic

    def __init__(
        self,
        observation_space: spaces.Space,
        action_space: spaces.Box,
        lr_schedule: Schedule,
        net_arch: Optional[Union[List[int], Dict[str, List[int]]]] = None,
        activation_fn: Type[nn.Module] = nn.ReLU,
        features_extractor_class: Type[BaseFeaturesExtractor] = FlattenExtractor,
        features_extractor_kwargs: Optional[Dict[str, Any]] = None,
        normalize_images: bool = True,
        optimizer_class: Type[th.optim.Optimizer] = th.optim.Adam,
        optimizer_kwargs: Optional[Dict[str, Any]] = None,
        n_critics: int = 2,
        share_features_extractor: bool = False,
    ):
        super().__init__(
            observation_space,
            action_space,
            features_extractor_class,
            features_extractor_kwargs,
            optimizer_class=optimizer_class,
            optimizer_kwargs=optimizer_kwargs,
            squash_output=True,
            normalize_images=normalize_images,
        )

        # Default network architecture, from the original paper
        if net_arch is None:
            if features_extractor_class == NatureCNN:
                net_arch = [256, 256]
            else:
                net_arch = [400, 300]

        actor_arch, critic_arch = get_actor_critic_arch(net_arch)

        self.net_arch = net_arch
        self.activation_fn = activation_fn
        self.net_args = {
            "observation_space": self.observation_space,
            "action_space": self.action_space,
            "net_arch": actor_arch,
            "activation_fn": self.activation_fn,
            "normalize_images": normalize_images,
        }
        self.actor_kwargs = self.net_args.copy()
        self.critic_kwargs = self.net_args.copy()
        self.critic_kwargs.update(
            {
                "n_critics": n_critics,
                "net_arch": critic_arch,
                "share_features_extractor": share_features_extractor,
            }
        )

        self.share_features_extractor = share_features_extractor

        self._build(lr_schedule)

    def _build(self, lr_schedule: Schedule) -> None:
        # Create actor and target
        # the features extractor should not be shared
        self.actor = self.make_actor(features_extractor=None)
        self.actor_target = self.make_actor(features_extractor=None)
        # Initialize the target to have the same weights as the actor
        self.actor_target.load_state_dict(self.actor.state_dict())

        self.actor.optimizer = self.optimizer_class(
            self.actor.parameters(),
            lr=lr_schedule(1),  # type: ignore[call-arg]
            **self.optimizer_kwargs,
        )

        if self.share_features_extractor:
            self.critic = self.make_critic(features_extractor=self.actor.features_extractor)
            # Critic target should not share the features extractor with critic
            # but it can share it with the actor target as actor and critic are sharing
            # the same features_extractor too
            # NOTE: as a result the effective poliak (soft-copy) coefficient for the features extractor
            # will be 2 * tau instead of tau (updated one time with the actor, a second time with the critic)
            self.critic_target = self.make_critic(features_extractor=self.actor_target.features_extractor)
        else:
            # Create new features extractor for each network
            self.critic = self.make_critic(features_extractor=None)
            self.critic_target = self.make_critic(features_extractor=None)

        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic.optimizer = self.optimizer_class(
            self.critic.parameters(),
            lr=lr_schedule(1),  # type: ignore[call-arg]
            **self.optimizer_kwargs,
        )

        # Target networks should always be in eval mode
        self.actor_target.set_training_mode(False)
        self.critic_target.set_training_mode(False)

    def _get_constructor_parameters(self) -> Dict[str, Any]:
        data = super()._get_constructor_parameters()

        data.update(
            dict(
                net_arch=self.net_arch,
                activation_fn=self.net_args["activation_fn"],
                n_critics=self.critic_kwargs["n_critics"],
                lr_schedule=self._dummy_schedule,  # dummy lr schedule, not needed for loading policy alone
                optimizer_class=self.optimizer_class,
                optimizer_kwargs=self.optimizer_kwargs,
                features_extractor_class=self.features_extractor_class,
                features_extractor_kwargs=self.features_extractor_kwargs,
                share_features_extractor=self.share_features_extractor,
            )
        )
        return data

    def make_actor(self, features_extractor: Optional[BaseFeaturesExtractor] = None) -> Actor:
        actor_kwargs = self._update_features_extractor(self.actor_kwargs, features_extractor)
        return Actor(**actor_kwargs).to(self.device)

    def make_critic(self, features_extractor: Optional[BaseFeaturesExtractor] = None) -> ContinuousCritic:
        critic_kwargs = self._update_features_extractor(self.critic_kwargs, features_extractor)
        return ContinuousCritic(**critic_kwargs).to(self.device)

    def forward(self, observation: PyTorchObs, deterministic: bool = False) -> th.Tensor:
        return self._predict(observation, deterministic=deterministic)

    def _predict(self, observation: PyTorchObs, deterministic: bool = False) -> th.Tensor:
        # Note: the deterministic deterministic parameter is ignored in the case of TD3.
        #   Predictions are always deterministic.
        return self.actor(observation)

    def set_training_mode(self, mode: bool) -> None:
        """
        Put the policy in either training or evaluation mode.

        This affects certain modules, such as batch normalisation and dropout.

        :param mode: if true, set to training mode, else set to evaluation mode
        """
        self.actor.set_training_mode(mode)
        self.critic.set_training_mode(mode)
        self.training = mode


MlpPolicy = TD3Policy


class CnnPolicy(TD3Policy):
    """
    Policy class (with both actor and critic) for TD3.

    :param observation_space: Observation space
    :param action_space: Action space
    :param lr_schedule: Learning rate schedule (could be constant)
    :param net_arch: The specification of the policy and value networks.
    :param activation_fn: Activation function
    :param features_extractor_class: Features extractor to use.
    :param features_extractor_kwargs: Keyword arguments
        to pass to the features extractor.
    :param normalize_images: Whether to normalize images or not,
         dividing by 255.0 (True by default)
    :param optimizer_class: The optimizer to use,
        ``th.optim.Adam`` by default
    :param optimizer_kwargs: Additional keyword arguments,
        excluding the learning rate, to pass to the optimizer
    :param n_critics: Number of critic networks to create.
    :param share_features_extractor: Whether to share or not the features extractor
        between the actor and the critic (this saves computation time)
    """

    def __init__(
        self,
        observation_space: spaces.Space,
        action_space: spaces.Box,
        lr_schedule: Schedule,
        net_arch: Optional[Union[List[int], Dict[str, List[int]]]] = None,
        activation_fn: Type[nn.Module] = nn.ReLU,
        features_extractor_class: Type[BaseFeaturesExtractor] = NatureCNN,
        features_extractor_kwargs: Optional[Dict[str, Any]] = None,
        normalize_images: bool = True,
        optimizer_class: Type[th.optim.Optimizer] = th.optim.Adam,
        optimizer_kwargs: Optional[Dict[str, Any]] = None,
        n_critics: int = 2,
        share_features_extractor: bool = False,
    ):
        super().__init__(
            observation_space,
            action_space,
            lr_schedule,
            net_arch,
            activation_fn,
            features_extractor_class,
            features_extractor_kwargs,
            normalize_images,
            optimizer_class,
            optimizer_kwargs,
            n_critics,
            share_features_extractor,
        )


class MultiInputPolicy(TD3Policy):
    """
    Policy class (with both actor and critic) for TD3 to be used with Dict observation spaces.

    :param observation_space: Observation space
    :param action_space: Action space
    :param lr_schedule: Learning rate schedule (could be constant)
    :param net_arch: The specification of the policy and value networks.
    :param activation_fn: Activation function
    :param features_extractor_class: Features extractor to use.
    :param features_extractor_kwargs: Keyword arguments
        to pass to the features extractor.
    :param normalize_images: Whether to normalize images or not,
         dividing by 255.0 (True by default)
    :param optimizer_class: The optimizer to use,
        ``th.optim.Adam`` by default
    :param optimizer_kwargs: Additional keyword arguments,
        excluding the learning rate, to pass to the optimizer
    :param n_critics: Number of critic networks to create.
    :param share_features_extractor: Whether to share or not the features extractor
        between the actor and the critic (this saves computation time)
    """

    def __init__(
        self,
        observation_space: spaces.Dict,
        action_space: spaces.Box,
        lr_schedule: Schedule,
        net_arch: Optional[Union[List[int], Dict[str, List[int]]]] = None,
        activation_fn: Type[nn.Module] = nn.ReLU,
        features_extractor_class: Type[BaseFeaturesExtractor] = CombinedExtractor,
        features_extractor_kwargs: Optional[Dict[str, Any]] = None,
        normalize_images: bool = True,
        optimizer_class: Type[th.optim.Optimizer] = th.optim.Adam,
        optimizer_kwargs: Optional[Dict[str, Any]] = None,
        n_critics: int = 2,
        share_features_extractor: bool = False,
    ):
        super().__init__(
            observation_space,
            action_space,
            lr_schedule,
            net_arch,
            activation_fn,
            features_extractor_class,
            features_extractor_kwargs,
            normalize_images,
            optimizer_class,
            optimizer_kwargs,
            n_critics,
            share_features_extractor,
        )

In [None]:
import gym
import torch as th
from stable_baselines3 import TD3
from stable_baselines3.common.vec_env import DummyVecEnv  # Corrigé ici

# Créez un environnement
env_id = 'LunarLanderContinuous-v2'
env = gym.make(env_id)
env = DummyVecEnv([lambda: env])  # Wrap the environment

# Initialisez le modèle TD3
model = TD3('MlpPolicy', env, verbose=1,)

# Entraînez le modèle
model.learn(total_timesteps=250000)

# Sauvegardez le modèle
model.save("td3_lunarlander")

# Évaluer le modèle
obs = env.reset()
for _ in range(1000):
    action, _states = model.predict(obs)
    obs, rewards, dones, info = env.step(action)
    env.render()



Using cuda device


  if not isinstance(terminated, (bool, np.bool8)):


---------------------------------
| time/              |          |
|    episodes        | 4        |
|    fps             | 194      |
|    time_elapsed    | 2        |
|    total_timesteps | 432      |
| train/             |          |
|    actor_loss      | 4.8      |
|    critic_loss     | 92.8     |
|    learning_rate   | 0.001    |
|    n_updates       | 331      |
---------------------------------
---------------------------------
| time/              |          |
|    episodes        | 8        |
|    fps             | 179      |
|    time_elapsed    | 4        |
|    total_timesteps | 746      |
| train/             |          |
|    actor_loss      | 12.6     |
|    critic_loss     | 47.5     |
|    learning_rate   | 0.001    |
|    n_updates       | 645      |
---------------------------------
---------------------------------
| time/              |          |
|    episodes        | 12       |
|    fps             | 173      |
|    time_elapsed    | 6        |
|    total_tim

In [None]:
### stable_baselines3: TD3-version2

In [None]:
import gymnasium as gym
import numpy as np
from stable_baselines3 import TD3
from stable_baselines3.common.noise import NormalActionNoise
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
import itertools
import pandas as pd

def test_hyperparameters():
    # Définition des hyperparamètres à tester
    hyperparameters = {
        'learning_rate': [0.0001, 0.0005, 0.001],
        'tau': [0.001, 0.005, 0.01],
        'batch_size': [64, 128, 256],
        'policy_delay': [1, 2, 4]
    }

    # Création de l'environnement
    env = DummyVecEnv([lambda: gym.make('LunarLanderContinuous-v2')])
    
    # Création du bruit d'action
    n_actions = env.action_space.shape[-1]
    action_noise = NormalActionNoise(mean=np.zeros(n_actions), sigma=0.2 * np.ones(n_actions))

    # Génération de toutes les combinaisons d'hyperparamètres
    keys = hyperparameters.keys()
    values = hyperparameters.values()
    combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]

    results = []
    for i, params in enumerate(combinations):
        print(f"\nTest {i + 1}/{len(combinations)}")
        print(f"Paramètres: {params}")
        
        # Création et entraînement du modèle
        model = TD3('MlpPolicy', env, action_noise=action_noise, verbose=0, **params)
        try:
            model.learn(total_timesteps=100000)  # Réduit pour le test
            
            # Évaluation
            mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=5)
            
            results.append({
                **params,
                'mean_reward': mean_reward,
                'std_reward': std_reward
            })
            print(f"Récompense moyenne: {mean_reward:.2f} +/- {std_reward:.2f}")
        except Exception as e:
            print(f"Erreur avec les paramètres {params}: {e}")
            continue  # Continue to the next set of hyperparameters
    
    return pd.DataFrame(results)

def main():
    print("Début des tests d'hyperparamètres...")
    results_df = test_hyperparameters()
    
    # Trier et afficher les résultats
    results_df = results_df.sort_values('mean_reward', ascending=False)
    print("\nRésultats triés par récompense moyenne:")
    print(results_df)
    
    # Sauvegarder les résultats
    results_df.to_csv('td3_hyperparameter_results.csv', index=False)
    print("\nRésultats sauvegardés dans 'td3_hyperparameter_results.csv'")
    
    # Afficher les meilleurs paramètres
    if not results_df.empty:
        best_params = results_df.iloc[0].to_dict()
        print("\nMeilleurs paramètres trouvés:")
        for key, value in best_params.items():
            if key not in ['mean_reward', 'std_reward']:
                print(f"{key}: {value}")
        print(f"Récompense moyenne: {best_params['mean_reward']:.2f} +/- {best_params['std_reward']:.2f}")
    else:
        print("Aucun résultat valide trouvé.")

if __name__ == "__main__":
    main()


Début des tests d'hyperparamètres...

Test 1/81
Paramètres: {'learning_rate': 0.0001, 'tau': 0.001, 'batch_size': 64, 'policy_delay': 1}




Récompense moyenne: -49.84 +/- 34.61

Test 2/81
Paramètres: {'learning_rate': 0.0001, 'tau': 0.001, 'batch_size': 64, 'policy_delay': 2}
Récompense moyenne: -44.38 +/- 24.61

Test 3/81
Paramètres: {'learning_rate': 0.0001, 'tau': 0.001, 'batch_size': 64, 'policy_delay': 4}
Récompense moyenne: -29.36 +/- 115.69

Test 4/81
Paramètres: {'learning_rate': 0.0001, 'tau': 0.001, 'batch_size': 128, 'policy_delay': 1}
Récompense moyenne: 23.24 +/- 109.18

Test 5/81
Paramètres: {'learning_rate': 0.0001, 'tau': 0.001, 'batch_size': 128, 'policy_delay': 2}
Récompense moyenne: -107.40 +/- 22.91

Test 6/81
Paramètres: {'learning_rate': 0.0001, 'tau': 0.001, 'batch_size': 128, 'policy_delay': 4}
Récompense moyenne: -83.05 +/- 32.03

Test 7/81
Paramètres: {'learning_rate': 0.0001, 'tau': 0.001, 'batch_size': 256, 'policy_delay': 1}


In [None]:
import re
import matplotlib.pyplot as plt

# Logs d'entraînement
log_data = """
... (insérer ici les logs d'entraînement) ...
"""

# Initialiser les listes pour les pertes
actor_losses = []
critic_losses = []
n_updates = []

# Expression régulière pour extraire les pertes
for line in log_data.splitlines():
    if 'actor_loss' in line:
        actor_loss = float(re.search(r'actor_loss\s+([\d.-]+)', line).group(1))
        critic_loss = float(re.search(r'critic_loss\s+([\d.-]+)', line).group(1))
        n_update = int(re.search(r'n_updates\s+(\d+)', line).group(1))
        
        actor_losses.append(actor_loss)
        critic_losses.append(critic_loss)
        n_updates.append(n_update)

# Afficher les données pour vérifier
print("Actor losses:", actor_losses)
print("Critic losses:", critic_losses)
print("Number of updates:", n_updates)
# Tracer les pertes
plt.figure(figsize=(12, 6))
plt.plot(n_updates, actor_losses, label='Actor Loss', color='blue', marker='o')
plt.plot(n_updates, critic_losses, label='Critic Loss', color='red', marker='x')

plt.title('Actor and Critic Losses Over Time')
plt.xlabel('Number of Updates')
plt.ylabel('Loss')
plt.legend()
plt.grid()
plt.show()


In [None]:
### stable_baselines3: TD3-version2-test

In [None]:
import gymnasium as gym
import numpy as np
from stable_baselines3 import TD3
from stable_baselines3.common.noise import NormalActionNoise
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
import itertools
import pandas as pd

def test_hyperparameters():
    # Définition des hyperparamètres à tester
    hyperparameters = {
        'learning_rate': [0.0001, 0.0005, 0.001],
        'tau': [0.001, 0.005, 0.01],
        'batch_size': [64, 128, 256],
        'policy_delay': [1, 2, 4]
    }

    # Création de l'environnement
    env = DummyVecEnv([lambda: gym.make('LunarLanderContinuous-v2')])
    
    # Création du bruit d'action
    n_actions = env.action_space.shape[-1]
    action_noise = NormalActionNoise(mean=np.zeros(n_actions), sigma=0.2 * np.ones(n_actions))

    # Génération de toutes les combinaisons d'hyperparamètres
    keys = hyperparameters.keys()
    values = hyperparameters.values()
    combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]

    results = []
    for i, params in enumerate(combinations):
        print(f"\nTest {i + 1}/{len(combinations)}")
        print(f"Paramètres: {params}")
        
        # Création et entraînement du modèle
        model = TD3('MlpPolicy', env, action_noise=action_noise, verbose=0, **params)
        try:
            model.learn(total_timesteps=100000)  # Réduit pour le test
            
            # Évaluation
            mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=5)
            
            results.append({
                **params,
                'mean_reward': mean_reward,
                'std_reward': std_reward
            })
            print(f"Récompense moyenne: {mean_reward:.2f} +/- {std_reward:.2f}")
        except Exception as e:
            print(f"Erreur avec les paramètres {params}: {e}")
            continue  # Continue to the next set of hyperparameters
    
    return pd.DataFrame(results)

def main():
    print("Début des tests d'hyperparamètres...")
    results_df = test_hyperparameters()
    
    # Trier et afficher les résultats
    results_df = results_df.sort_values('mean_reward', ascending=False)
    print("\nRésultats triés par récompense moyenne:")
    print(results_df)
    
    # Sauvegarder les résultats
    results_df.to_csv('td3_hyperparameter_results.csv', index=False)
    print("\nRésultats sauvegardés dans 'td3_hyperparameter_results.csv'")
    
    # Afficher les meilleurs paramètres
    if not results_df.empty:
        best_params = results_df.iloc[0].to_dict()
        print("\nMeilleurs paramètres trouvés:")
        for key, value in best_params.items():
            if key not in ['mean_reward', 'std_reward']:
                print(f"{key}: {value}")
        print(f"Récompense moyenne: {best_params['mean_reward']:.2f} +/- {best_params['std_reward']:.2f}")
    else:
        print("Aucun résultat valide trouvé.")

if __name__ == "__main__":
    main()


In [None]:
Taishu

In [17]:
import torch
from copy import deepcopy
import torch.nn.functional as F
from tianshou.policy import DDPGPolicy

class TD3Policy(DDPGPolicy):
    
    def __init__(self, actor, actor_optim, critic1, critic1_optim,
                 critic2, critic2_optim, tau=0.005, gamma=0.99,
                 exploration_noise=0.1, policy_noise=0.2, update_actor_freq=2,
                 noise_clip=0.5, action_range=None,
                 reward_normalization=False, ignore_done=False, **kwargs):
        super().__init__(actor, actor_optim, None, None, tau, gamma,
                         exploration_noise, action_range, reward_normalization,
                         ignore_done)
        self.critic1, self.critic1_old = critic1, deepcopy(critic1)
        self.critic1_old.eval()
        self.critic1_optim = critic1_optim
        self.critic2, self.critic2_old = critic2, deepcopy(critic2)
        self.critic2_old.eval()
        self.critic2_optim = critic2_optim
        self._policy_noise = policy_noise
        self._freq = update_actor_freq
        self._noise_clip = noise_clip
        self._cnt = 0
        self._last = 0

    def train(self):
        self.training = True
        self.actor.train()
        self.critic1.train()
        self.critic2.train()

    def eval(self):
        self.training = False
        self.actor.eval()
        self.critic1.eval()
        self.critic2.eval()

    def sync_weight(self):
        for o, n in zip(self.actor_old.parameters(), self.actor.parameters()):
            o.data.copy_(o.data * (1 - self._tau) + n.data * self._tau)
        for o, n in zip(self.critic1_old.parameters(), self.critic1.parameters()):
            o.data.copy_(o.data * (1 - self._tau) + n.data * self._tau)
        for o, n in zip(self.critic2_old.parameters(), self.critic2.parameters()):
            o.data.copy_(o.data * (1 - self._tau) + n.data * self._tau)

    def learn(self, batch, **kwargs):
        with torch.no_grad():
            a_ = self(batch, model='actor_old', input='obs_next').act
            dev = a_.device
            noise = torch.randn(size=a_.shape, device=dev) * self._policy_noise
            if self._noise_clip >= 0:
                noise = noise.clamp(-self._noise_clip, self._noise_clip)
            a_ += noise
            a_ = a_.clamp(self._range[0], self._range[1])
            target_q = torch.min(
                self.critic1_old(batch.obs_next, a_),
                self.critic2_old(batch.obs_next, a_)
            )
            rew = torch.tensor(batch.rew, dtype=torch.float, device=dev)[:, None]
            done = torch.tensor(batch.done, dtype=torch.float, device=dev)[:, None]
            target_q = (rew + (1. - done) * self._gamma * target_q)

        # Critic 1
        current_q1 = self.critic1(batch.obs, batch.act)
        critic1_loss = F.mse_loss(current_q1, target_q)
        self.critic1_optim.zero_grad()
        critic1_loss.backward()
        self.critic1_optim.step()

        # Critic 2
        current_q2 = self.critic2(batch.obs, batch.act)
        critic2_loss = F.mse_loss(current_q2, target_q)
        self.critic2_optim.zero_grad()
        critic2_loss.backward()
        self.critic2_optim.step()

        if self._cnt % self._freq == 0:
            actor_loss = -self.critic1(batch.obs, self(batch, eps=0).act).mean()
            self.actor_optim.zero_grad()
            actor_loss.backward()
            self._last = actor_loss.item()
            self.actor_optim.step()
            self.sync_weight()

        self._cnt += 1
        return {
            'loss/actor': self._last,
            'loss/critic1': critic1_loss.item(),
            'loss/critic2': critic2_loss.item(),
        }


In [18]:
import gymnasium as gym
import numpy as np
import torch
from torch import nn
from torch.optim import Adam
from tianshou.policy import DDPGPolicy
from copy import deepcopy
import itertools
import pandas as pd

# Supposons que TD3Policy est définie ici
class TD3Policy(DDPGPolicy):
    # Votre code TD3Policy ici...
    def __init__(self, actor, actor_optim, critic1, critic1_optim, critic2, critic2_optim, **kwargs):
        super().__init__(actor, actor_optim, None, None, **kwargs)
        self.critic1 = critic1
        self.critic1_optim = critic1_optim
        self.critic2 = critic2
        self.critic2_optim = critic2_optim
        # Initialiser d'autres attributs nécessaires

    # Ajoutez les méthodes nécessaires comme `learn`, `sync_weight`, etc.

def test_hyperparameters():
    # Définition des hyperparamètres à tester
    hyperparameters = {
        'learning_rate': [0.0001, 0.0005, 0.001],
        'tau': [0.001, 0.005, 0.01],
        'batch_size': [64, 128, 256],
        'policy_delay': [1, 2, 4]
    }

    # Création de l'environnement
    env = gym.make('LunarLanderContinuous-v2')
    
    # Création des modèles d'acteurs et de critiques
    actor = nn.Sequential(nn.Linear(8, 256), nn.ReLU(), nn.Linear(256, 2))
    critic1 = nn.Sequential(nn.Linear(8 + 2, 256), nn.ReLU(), nn.Linear(256, 1))
    critic2 = nn.Sequential(nn.Linear(8 + 2, 256), nn.ReLU(), nn.Linear(256, 1))
    
    results = []
    param_combinations = list(itertools.product(*hyperparameters.values()))
    for i, params in enumerate(param_combinations):
        param_dict = dict(zip(hyperparameters.keys(), params))
        print(f"\nTest {i + 1}/{len(param_combinations)}")
        print(f"Paramètres: {param_dict}")

        # Création et entraînement du modèle
        actor_optim = Adam(actor.parameters(), lr=param_dict['learning_rate'])
        critic1_optim = Adam(critic1.parameters(), lr=param_dict['learning_rate'])
        critic2_optim = Adam(critic2.parameters(), lr=param_dict['learning_rate'])
        
        model = TD3Policy(actor, actor_optim, critic1, critic1_optim, critic2, critic2_optim, **param_dict)
        
        try:
            for _ in range(1000):  # Boucle d'entraînement simplifiée
                # Simulez un batch d'entraînement ici
                obs = np.random.rand(32, 8)  # Exemples d'observations
                actions = np.random.rand(32, 2)  # Actions aléatoires
                rewards = np.random.rand(32)  # Récompenses aléatoires
                dones = np.random.randint(0, 2, size=(32,))  # Terminaison aléatoire

                batch = {
                    'obs': torch.tensor(obs, dtype=torch.float32),
                    'act': torch.tensor(actions, dtype=torch.float32),
                    'rew': torch.tensor(rewards, dtype=torch.float32),
                    'done': torch.tensor(dones, dtype=torch.float32)
                }

                model.learn(batch)  # Appel à la méthode learn
            
            # Évaluation (à personnaliser selon vos besoins)
            mean_reward = np.random.uniform(-100, 100)  # Remplacez ceci par une évaluation réelle
            std_reward = np.random.uniform(0, 50)  # Remplacez ceci par une évaluation réelle
            
            results.append({**param_dict, 'mean_reward': mean_reward, 'std_reward': std_reward})
            print(f"Récompense moyenne: {mean_reward:.2f} +/- {std_reward:.2f}")
        except Exception as e:
            print(f"Erreur avec les paramètres {param_dict}: {e}")

    return pd.DataFrame(results)

def main():
    print("Début des tests d'hyperparamètres...")
    results_df = test_hyperparameters()
    
    # Trier et afficher les résultats
    results_df = results_df.sort_values('mean_reward', ascending=False)
    print("\nRésultats triés par récompense moyenne:")
    print(results_df)
    
    # Sauvegarder les résultats
    results_df.to_csv('td3_hyperparameter_results.csv', index=False)
    print("\nRésultats sauvegardés dans 'td3_hyperparameter_results.csv'")
    
    # Afficher les meilleurs paramètres
    if not results_df.empty:
        best_params = results_df.iloc[0].to_dict()
        print("\nMeilleurs paramètres trouvés:")
        for key, value in best_params.items():
            if key not in ['mean_reward', 'std_reward']:
                print(f"{key}: {value}")
        print(f"Récompense moyenne: {best_params['mean_reward']:.2f} +/- {best_params['std_reward']:.2f}")
    else:
        print("Aucun résultat valide trouvé.")

if __name__ == "__main__":
    main()
