In [1]:
REWARD_FNS = {}

### Reward functions for Walker2d-v5

In this project, we want to study the effect of different reward functions for the `Walker2d-v5` environment (Gymnasium / MuJoCo). In the base version, the reward at each step is the sum of three terms:

* **healthy_reward**: a survival bonus when the robot stays within a zone considered “healthy”;
* **forward_reward**: a reward proportional to forward velocity (movement in x per unit of time);
* **ctrl_cost**: a quadratic cost on the actions (penalizes actions with too much torque).

The total reward is therefore:

> reward = healthy_reward + forward_reward − ctrl_cost

and `info` contains the individual terms under the keys
`"reward_forward"`, `"reward_ctrl"`, `"reward_survive"`.
(See the official `Walker2d-v5` documentation.)
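
As a quick sanity check on this decomposition, the three `info` terms should add up to the scalar reward returned by `env.step`. The helper below is a minimal sketch: `decomposition_gap` is a hypothetical name, and the sample values are made up for illustration rather than taken from a real rollout.

```python
from typing import Any, Dict

REWARD_KEYS = ("reward_forward", "reward_ctrl", "reward_survive")


def decomposition_gap(reward: float, info: Dict[str, Any]) -> float:
    """Return |reward - sum of the three info terms| (0 when they agree)."""
    total = sum(float(info[key]) for key in REWARD_KEYS)
    return abs(float(reward) - total)


# Illustrative values only: reward_ctrl is already negative (it is -ctrl_cost).
sample_info = {"reward_forward": 0.8, "reward_ctrl": -0.05, "reward_survive": 1.0}
sample_reward = 0.8 - 0.05 + 1.0

gap = decomposition_gap(sample_reward, sample_info)
print(f"gap = {gap:.2e}")
```

In a rollout, `decomposition_gap(reward, info)` would be called on the values returned by the unwrapped environment's `step`.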

In the context of *reward shaping*, we modify this reward function to guide learning, for example by adding posture terms, a target speed, or an action-smoothness penalty. This kind of modification is studied theoretically in the classic paper by Ng, Harada, and Russell, *"Policy Invariance Under Reward Transformations: Theory and Application to Reward Shaping"* (ICML 1999), which characterizes the reward transformations (potential-based shaping) that preserve the optimal policy.
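
For reference, the transformation analyzed in that paper adds a term $F(s, s') = \gamma \Phi(s') - \Phi(s)$ to the reward, where $\Phi$ is a potential over states. The sketch below illustrates it on a toy potential; `potential` (negative distance of the torso height to 1.25) and `gamma = 0.99` are assumptions chosen for illustration. The reward variants in this notebook are not of this form, so they may change which policy is optimal.

```python
def potential(torso_height: float, h_ref: float = 1.25) -> float:
    """Toy potential: closer to 0 (its maximum) when the torso is near h_ref."""
    return -abs(torso_height - h_ref)


def shaping_term(h_prev: float, h_next: float, gamma: float = 0.99) -> float:
    """Potential-based shaping term F(s, s') = gamma * Phi(s') - Phi(s)."""
    return gamma * potential(h_next) - potential(h_prev)


def shaped_reward(base_reward: float, h_prev: float, h_next: float,
                  gamma: float = 0.99) -> float:
    """Base reward plus the shaping term."""
    return base_reward + shaping_term(h_prev, h_next, gamma)


# Moving from height 1.0 to 1.2 (closer to h_ref) gives a positive bonus;
# moving from 1.2 back to 1.0 gives a negative one.
print(shaping_term(1.0, 1.2))
print(shaping_term(1.2, 1.0))
```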

The code below implements several reward variants for `Walker2d-v5` in the form of a Gymnasium wrapper, so that a reward function can be easily selected and an agent trained with it.


### Reward function 1: speed – energy – survival

In the `Walker2d-v5` environment (Gymnasium / MuJoCo), the default reward is split into three terms:

* `reward_forward`: reward for forward speed;
* `reward_ctrl`: control cost (negative), proportional to the squared actions;
* `reward_survive`: survival bonus as long as the robot stays in a “healthy” state.

See the `Walker2d-v5` documentation, which explains this reward decomposition in the `info` dictionary.

For our first reward function, we define a linear combination of these three terms:

$ r_t = w_{\text{forward}} \cdot \text{reward\_forward} + w_{\text{ctrl}} \cdot \text{reward\_ctrl} + w_{\text{survive}} \cdot \text{reward\_survive} $

where:

* $w_{\text{forward}}$ controls the importance of speed;
* $w_{\text{ctrl}}$ controls the importance of energy consumption (since `reward_ctrl` is negative, a positive weight means “we penalize energy use”);
* $w_{\text{survive}}$ controls the importance of the survival bonus.

By varying these weights, we can study the trade-off between **speed**, **energy consumption**, and **stability** (survival) of the walker, which is especially interesting in a context with perturbations (observation noise, randomization of initial conditions).

In [2]:
from typing import Dict, Any

def reward_speed_energy(
    info: Dict[str, Any],
    w_forward: float = 1.0,
    w_ctrl: float = 1.0,
    w_survive: float = 1.0,
) -> float:
    """
    Reward function 1: speed / energy / survival combination
    for Walker2d-v5.

    Parameters
    ----------
    info : dict
        The `info` dictionary returned by env.step(action).
        It is assumed to contain the keys:
        - "reward_forward"
        - "reward_ctrl"
        - "reward_survive"
    w_forward : float
        Weight of the speed reward (reward_forward).
    w_ctrl : float
        Weight of the control cost (reward_ctrl).
        Note: reward_ctrl is already negative in Walker2d,
        so w_ctrl > 0 penalizes energy use.
    w_survive : float
        Weight of the survival bonus (reward_survive).

    Returns
    -------
    float
        The scalar reward r_t.
    """
    forward = float(info.get("reward_forward", 0.0))
    ctrl = float(info.get("reward_ctrl", 0.0))         # already negative
    survive = float(info.get("reward_survive", 0.0))

    reward = (
        w_forward * forward
        + w_ctrl * ctrl
        + w_survive * survive
    )
    return reward

# obs, base_reward, terminated, truncated, info = env.step(action)
# new_reward = reward_speed_energy(
#     info,
#     w_forward=1.0,
#     w_ctrl=1.0,
#     w_survive=1.0,
# )


In [None]:
import gymnasium as gym
from gymnasium.wrappers import RecordVideo
from stable_baselines3 import SAC

# --- 1) Wrapper that replaces the reward with reward_speed_energy ---

class SpeedEnergyRewardWrapper(gym.Wrapper):
    def __init__(self, env, w_forward=1.0, w_ctrl=1.0, w_survive=1.0):
        super().__init__(env)
        self.w_forward = w_forward
        self.w_ctrl = w_ctrl
        self.w_survive = w_survive

    def step(self, action):
        # step the underlying environment
        obs, base_reward, terminated, truncated, info = self.env.step(action)

        # compute OUR reward from info
        new_reward = reward_speed_energy(
            info,
            w_forward=self.w_forward,
            w_ctrl=self.w_ctrl,
            w_survive=self.w_survive,
        )

        # return new_reward in place of base_reward
        return obs, new_reward, terminated, truncated, info

    def reset(self, **kwargs):
        return self.env.reset(**kwargs)


# --- 2) Environment creation + video recording ---

video_folder = "./videos_speed_energy"

# IMPORTANT: render_mode="rgb_array" is required to record video
env_base = gym.make("Walker2d-v5", render_mode="rgb_array")

# apply our reward wrapper
env_wrapped = SpeedEnergyRewardWrapper(
    env_base,
    w_forward=1.0,
    w_ctrl=1.0,
    w_survive=1.0,
)

# add the video wrapper
env = RecordVideo(
    env_wrapped,
    video_folder=video_folder,
    name_prefix="walker2d-speed_energy",
    episode_trigger=lambda ep_id: True,  # record every episode
    video_length=0,                       # 0 = record the full episode
)


# --- 3) Quick training run with SAC ---

model = SAC("MlpPolicy", env, verbose=1)

# very small step budget, just for testing (increase later)
model.learn(total_timesteps=5_000)


# --- 4) Record one episode with the trained model ---

obs, info = env.reset()
terminated = False
truncated = False

while not (terminated or truncated):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = env.step(action)

env.close()
print(f"Episode finished. Video saved in: {video_folder}")


### Reward function 2: target speed

The idea of this reward is no longer “faster = better,” but instead “the robot must walk at a **target speed** $v^*$,” for example $v^* \in \{1.0, 2.0\}$ m/s.

We define:

* $v_x$: the horizontal velocity of the torso (in `Walker2d-v5`, this is `obs[8]` with the default observation layout, which excludes the x position),
* $a_t$: the action vector (torques) at time $t$,
* `reward_survive`: the survival bonus provided by the environment.

The reward is defined as:

$ r_t = - \alpha\, |v_x - v^*| - \beta \, \|a_t\|^2 + w_{\text{survive}} \cdot \text{reward\_survive} $

where:

* $\alpha$ controls the importance of staying close to the target speed $v^*$,
* $\beta$ controls how strongly energy use is penalized (squared norm of the actions),
* $w_{\text{survive}}$ adjusts the importance of the survival term.

#### Why it helps under perturbations

When noise is added (to observations or initial conditions), simply maximizing speed often pushes the agent to **run faster and faster**, with jerky and unstable movements.

With this **target speed** reward function:

* the agent is encouraged to maintain a **steady speed** close to $v^*$,
* we can measure:

  * the variance of the speed $v_x$ during an episode,
  * the variance of the actions,
  * the energy consumed ($\sum_t \|a_t\|^2$),

which makes it possible to compare different policies in terms of **stability** and **robustness** to perturbations.

[https://energy-locomotion.github.io/resources/CoRL-Energy-Locomotion.pdf](https://energy-locomotion.github.io/resources/CoRL-Energy-Locomotion.pdf)
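
The three quantities listed above can be computed from per-step logs of one episode. This is a minimal sketch using only the standard library; `episode_metrics` and its argument names are hypothetical, and the sample numbers are illustrative rather than measured.

```python
from statistics import pvariance
from typing import Dict, List, Sequence


def episode_metrics(
    speeds: Sequence[float],
    actions: Sequence[Sequence[float]],
) -> Dict[str, float]:
    """Summarize one episode.

    speeds  : v_x at each step (e.g. obs[8])
    actions : action vector at each step
    """
    # Energy: sum over time of the squared norm of the action.
    energy = sum(sum(a_i * a_i for a_i in a) for a in actions)

    # Action variance: population variance per action dimension, averaged.
    per_dim: List[float] = [pvariance(dim) for dim in zip(*actions)]
    action_var = sum(per_dim) / len(per_dim)

    return {
        "speed_var": pvariance(speeds),
        "action_var": action_var,
        "energy": energy,
    }


metrics = episode_metrics(
    speeds=[1.0, 2.0, 3.0],
    actions=[[0.0, 1.0], [1.0, 1.0], [2.0, 1.0]],
)
print(metrics)
```

Averaging these metrics over several evaluation episodes per policy gives numbers that can be compared across reward functions.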


In [5]:
from typing import Dict, Any
import numpy as np

def reward_target_speed(
    obs,
    action,
    info: Dict[str, Any],
    v_target: float = 1.5,
    alpha: float = 1.0,
    beta: float = 1e-3,
    w_survive: float = 1.0,
) -> float:
    """
    Reward 2: target speed + energy cost + survival.

        r_t = - alpha * |v_x - v_target|
              - beta  * ||a_t||^2
              + w_survive * reward_survive

    - v_x : x velocity of the torso (obs[8] in Walker2d-v5)
    - a_t : action at time t
    """

    # Horizontal velocity of the torso.
    # Per the Walker2d-v5 docs, obs[8] = x velocity of the torso.
    vx = float(obs[8])

    # Squared norm of the action ("spent" energy)
    energy = float(np.sum(np.square(action)))

    # Survival term provided by the env (as in reward_speed_energy)
    survive = float(info.get("reward_survive", 0.0))

    # Speed term: we want v_x to stay close to v_target
    speed_term = -alpha * abs(vx - v_target)

    # Energy term (penalty)
    energy_term = -beta * energy

    # Total reward
    reward = speed_term + energy_term + w_survive * survive
    return reward


In [6]:
import gymnasium as gym
from gymnasium.wrappers import RecordVideo
from stable_baselines3 import SAC

# --- Wrapper for the "target speed" reward ---

class TargetSpeedRewardWrapper(gym.Wrapper):
    def __init__(self, env, v_target=1.5, alpha=1.0, beta=1e-3, w_survive=1.0):
        super().__init__(env)
        self.v_target = v_target
        self.alpha = alpha
        self.beta = beta
        self.w_survive = w_survive

    def step(self, action):
        # Step the underlying env
        obs, base_reward, terminated, truncated, info = self.env.step(action)

        # Our new reward
        new_reward = reward_target_speed(
            obs=obs,
            action=action,
            info=info,
            v_target=self.v_target,
            alpha=self.alpha,
            beta=self.beta,
            w_survive=self.w_survive,
        )

        return obs, new_reward, terminated, truncated, info

    def reset(self, **kwargs):
        return self.env.reset(**kwargs)


# --- Env creation + video recording ---

video_folder = "./videos_target_speed"

env_base = gym.make("Walker2d-v5", render_mode="rgb_array")

env_wrapped = TargetSpeedRewardWrapper(
    env_base,
    v_target=1.5,   # target speed in m/s (you can try 1.0, 2.0, etc.)
    alpha=1.0,
    beta=1e-3,
    w_survive=1.0,
)

env = RecordVideo(
    env_wrapped,
    video_folder=video_folder,
    name_prefix="walker2d-target_speed",
    episode_trigger=lambda ep_id: True,
    video_length=0,
)

# --- Quick training run with SAC (just for testing) ---

model = SAC("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=10_000)   # increase later

# --- Record one episode with the trained model ---

obs, info = env.reset()
terminated = False
truncated = False

while not (terminated or truncated):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = env.step(action)

env.close()
print(f"Episode finished. Video saved in: {video_folder}")



Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 27.2     |
|    ep_rew_mean     | -36.5    |
| time/              |          |
|    episodes        | 4        |
|    fps             | 43       |
|    time_elapsed    | 2        |
|    total_timesteps | 109      |
| train/             |          |
|    actor_loss      | -5.27    |
|    critic_loss     | 1.89     |
|    ent_coef        | 0.998    |
|    ent_coef_loss   | -0.0208  |
|    learning_rate   | 0.0003   |
|    n_updates       | 8        |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 27.8     |
|    ep_rew_mean     | -37.6    |
| time/              |          |
|    episodes        | 8        |
|    fps             | 34       |
|    time_elapsed    | 6        |
|    total_timesteps | 222      |
| train/             |

### Reward function 3: stable posture

This reward aims to encourage not only forward movement but also a **stable posture** — walking “upright and straight.”

In `Walker2d-v5`:

* `info["reward_forward"]`: forward speed term,
* `info["reward_ctrl"]`: control cost (negative, penalizes large actions),
* `info["reward_survive"]`: survival bonus when the robot stays in a “healthy” state,
* `obs[0]`: torso height,
* `obs[1]`: torso angle (0 = vertical).

The reward is defined as:

$ r_t = w_{\text{forward}} \cdot \text{reward\_forward} + w_{\text{ctrl}} \cdot \text{reward\_ctrl} + w_{\text{survive}} \cdot \text{reward\_survive} - w_h \, \max(0, h_{\text{target}} - h_t)^2 - w_{\text{angle}} \, \theta_t^2 $

where:

* $h_t$ is the torso height at time $t$,
* $\theta_t$ is the torso angle (0 = straight),
* $h_{\text{target}}$ is a target height (e.g., 1.25),
* $w_h$ controls the importance of staying high enough,
* $w_{\text{angle}}$ controls the importance of staying upright.

#### Motivation: noise and random resets

With observation noise and random initial conditions, the walker tends to:

* lean forward or backward too much,
* sag (drop torso height) before falling.

This “stable posture” reward:

* gradually penalizes **dangerous postures** (too tilted, too low),
* provides a reward signal **before falling**,
* encourages more **stable** gaits, which are often more **robust** to perturbations.

We can compare this reward to others by measuring:

* the average torso height,
* the variance of the torso angle,
* the fraction of episodes that end in a fall (termination rather than time-limit truncation).


In [13]:
from typing import Dict, Any

def reward_posture_stability(
    obs,
    action,
    info: Dict[str, Any],
    h_target: float = 1.25,
    w_forward: float = 1.0,
    w_ctrl: float = 1.0,
    w_survive: float = 1.0,
    w_h: float = 1.0,
    w_angle: float = 1.0,
) -> float:
    """
    Reward 3: stable posture

    Idea:
      - keep the standard terms (speed, control cost, survival)
      - add terms that encourage an "upright and straight" posture

    r_t = w_forward * reward_forward
        + w_ctrl    * reward_ctrl
        + w_survive * reward_survive
        + height_penalty
        + angle_penalty

    where:
      - height_penalty penalizes a torso height below the target h_target
      - angle_penalty penalizes a torso angle that moves away from 0
    """

    # --- 1) Standard Walker2d terms ---

    forward = float(info.get("reward_forward", 0.0))
    ctrl = float(info.get("reward_ctrl", 0.0))         # already negative
    survive = float(info.get("reward_survive", 0.0))

    base_terms = (
        w_forward * forward
        + w_ctrl * ctrl
        + w_survive * survive
    )

    # --- 2) Posture terms ---

    # Per the Walker2d-v5 docs:
    #   obs[0] = torso height
    #   obs[1] = torso angle (0 = upright)
    h = float(obs[0])
    angle = float(obs[1])

    # quadratic penalty when the torso is lower than h_target
    height_penalty = -w_h * max(0.0, h_target - h) ** 2

    # quadratic penalty on the angle (we want angle ≈ 0)
    angle_penalty = -w_angle * angle ** 2

    reward = base_terms + height_penalty + angle_penalty
    return reward


In [14]:
import gymnasium as gym
from stable_baselines3 import SAC

# --- TRAINING ENV (no video here) ---

train_env_base = gym.make("Walker2d-v5")  # NO render_mode for training

train_env = PostureStabilityRewardWrapper(
    train_env_base,
    h_target=1.25,
    w_forward=1.0,
    w_ctrl=1.0,
    w_survive=1.0,
    w_h=5.0,
    w_angle=1.0,
)

model = SAC("MlpPolicy", train_env, verbose=1)
model.learn(total_timesteps=10_000)   # increase later

# (optional) save the model
model.save("sac_walker2d_posture_stable")


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 18.5     |
|    ep_rew_mean     | -5.22    |
| time/              |          |
|    episodes        | 4        |
|    fps             | 2286     |
|    time_elapsed    | 0        |
|    total_timesteps | 74       |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 19.6     |
|    ep_rew_mean     | -5.74    |
| time/              |          |
|    episodes        | 8        |
|    fps             | 229      |
|    time_elapsed    | 0        |
|    total_timesteps | 157      |
| train/             |          |
|    actor_loss      | -7.55    |
|    critic_loss     | 3.52     |
|    ent_coef        | 0.984    |
|    ent_coef_loss   | -0.167   |
|    learning_rate   | 0.0003   |
|    n_updates       | 56       |
----------------------

In [15]:
from gymnasium.wrappers import RecordVideo

# --- EVALUATION ENV WITH VIDEO ---

video_folder = "./videos_posture_stable"

eval_env_base = gym.make("Walker2d-v5", render_mode="rgb_array")

eval_env_wrapped = PostureStabilityRewardWrapper(
    eval_env_base,
    h_target=1.25,
    w_forward=1.0,
    w_ctrl=1.0,
    w_survive=1.0,
    w_h=5.0,
    w_angle=1.0,
)

eval_env = RecordVideo(
    eval_env_wrapped,
    video_folder=video_folder,
    name_prefix="walker2d-posture_stable",
    episode_trigger=lambda ep_id: True,  # record every episode (only one is run here)
    video_length=0,                       # 0 = full episode
)

# if you saved the model:
# model = SAC.load("sac_walker2d_posture_stable", env=eval_env)

obs, info = eval_env.reset()
terminated = False
truncated = False

while not (terminated or truncated):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = eval_env.step(action)

eval_env.close()
print(f"Video saved in: {video_folder}")


Video saved in: ./videos_posture_stable


### Reward function 4: smooth actions

This reward aims to avoid policies that react **too nervously** to noise (noisy observations, random resets, etc.).
The idea is to penalize **sudden changes in actions** between two time steps:

$ r_t = \text{base\_reward}_t - \lambda \, \lVert a_t - a_{t-1} \rVert^2, $

where:

* $\text{base\_reward}_t$ is the original reward from the `Walker2d-v5` environment (speed + survival − control cost),
* $a_t$ is the action at time $t$,
* $a_{t-1}$ is the action at time $t-1$,
* $\lambda$ (called `lambda_smooth` in the code) controls how strongly action changes are penalized.
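
To get a feel for the scale of this penalty, the sketch below sums $\lambda \lVert a_t - a_{t-1} \rVert^2$ over a short action sequence, once for a slowly varying signal and once for one that flips sign at every step. `smoothness_penalty` is a hypothetical helper, and both sequences are made-up one-dimensional examples.

```python
from typing import Sequence


def smoothness_penalty(
    actions: Sequence[Sequence[float]],
    lambda_smooth: float = 1e-2,
) -> float:
    """Sum over t >= 1 of lambda_smooth * ||a_t - a_{t-1}||^2."""
    total = 0.0
    # The first action has no predecessor, so it contributes no penalty.
    for prev, curr in zip(actions, actions[1:]):
        total += sum((c - p) ** 2 for p, c in zip(prev, curr))
    return lambda_smooth * total


smooth = [[0.0], [0.1], [0.2], [0.3]]      # small steps
jittery = [[0.5], [-0.5], [0.5], [-0.5]]   # sign flip at every step

for lam in (1e-3, 1e-2, 5e-2):
    print(lam, smoothness_penalty(smooth, lam), smoothness_penalty(jittery, lam))
```

For the same $\lambda$, the jittery sequence is penalized 100 times more than the smooth one here, since each of its steps is ten times larger and the penalty is quadratic.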

#### Motivation: observation noise

With observation noise, learned policies tend to:

* react strongly to small fluctuations,
* produce actions that oscillate very quickly,
* show less stable gaits and sometimes higher energy usage.

The “smooth actions” reward encourages:

* **more regular control signals** (fewer high-frequency oscillations),
* gaits that are often more **stable** and more **robust** to noise,
* a more “conservative” policy in its action changes.

We can compare different values of $\lambda$ (e.g., `1e-3`, `1e-2`, `5e-2`) and measure:

* the variance of the actions,
* energy consumption,
* robustness to perturbations.

[https://medium.com/correll-lab/towards-robust-humanoid-loco-manipulation-using-deep-reinforcement-learning-45c8a5a0fcbf](https://medium.com/correll-lab/towards-robust-humanoid-loco-manipulation-using-deep-reinforcement-learning-45c8a5a0fcbf)


In [16]:
from typing import Dict, Any
import numpy as np

def reward_smooth_actions(
    obs,
    action,
    info: Dict[str, Any],
    base_reward: float,
    prev_action,
    lambda_smooth: float = 1e-2,
) -> float:
    """
    Reward 4: smooth actions (against jittery reactions to noise)

    r_t = base_reward_t - lambda_smooth * ||a_t - a_{t-1}||^2

    - base_reward_t : original reward from the Walker2d-v5 environment
    - a_t           : current action
    - a_{t-1}       : previous action
    """
    # No previous action yet (first step): no penalty
    if prev_action is None:
        return float(base_reward)

    # Difference between the current and previous actions
    # (np.asarray so that list-valued actions also work)
    delta = np.asarray(action) - np.asarray(prev_action)

    # Squared norm of the difference (large action changes are penalized)
    smooth_penalty = lambda_smooth * float(np.sum(delta ** 2))

    new_reward = float(base_reward) - smooth_penalty
    return new_reward


In [17]:
import gymnasium as gym
import numpy as np

class SmoothActionsRewardWrapper(gym.Wrapper):
    def __init__(self, env, lambda_smooth: float = 1e-2):
        super().__init__(env)
        self.lambda_smooth = lambda_smooth
        self.prev_action = None

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        # On reset there is no previous action yet
        self.prev_action = None
        return obs, info

    def step(self, action):
        # Step the underlying env
        obs, base_reward, terminated, truncated, info = self.env.step(action)

        # Compute our "smooth actions" reward
        new_reward = reward_smooth_actions(
            obs=obs,
            action=action,
            info=info,
            base_reward=base_reward,
            prev_action=self.prev_action,
            lambda_smooth=self.lambda_smooth,
        )

        # Update the previous action
        self.prev_action = np.array(action, copy=True)

        return obs, new_reward, terminated, truncated, info


In [18]:
from stable_baselines3 import SAC

# --- TRAINING ENV (no video) ---

train_env_base = gym.make("Walker2d-v5")  # no render_mode here
train_env = SmoothActionsRewardWrapper(
    train_env_base,
    lambda_smooth=1e-2,  # you can try different values
)

model_smooth = SAC("MlpPolicy", train_env, verbose=1)
model_smooth.learn(total_timesteps=10_000)   # increase later

# optional: save the model
# model_smooth.save("sac_walker2d_smooth_actions")


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 19.2     |
|    ep_rew_mean     | -2.6     |
| time/              |          |
|    episodes        | 4        |
|    fps             | 1648     |
|    time_elapsed    | 0        |
|    total_timesteps | 77       |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 19.1     |
|    ep_rew_mean     | -2.29    |
| time/              |          |
|    episodes        | 8        |
|    fps             | 234      |
|    time_elapsed    | 0        |
|    total_timesteps | 153      |
| train/             |          |
|    actor_loss      | -7.68    |
|    critic_loss     | 2.6      |
|    ent_coef        | 0.985    |
|    ent_coef_loss   | -0.156   |
|    learning_rate   | 0.0003   |
|    n_updates       | 52       |
----------------------

<stable_baselines3.sac.sac.SAC at 0x2963cc2e2f0>

In [19]:
from gymnasium.wrappers import RecordVideo

video_folder = "./videos_smooth_actions"

# --- EVALUATION ENV WITH VIDEO ---

eval_env_base = gym.make("Walker2d-v5", render_mode="rgb_array")
eval_env_wrapped = SmoothActionsRewardWrapper(
    eval_env_base,
    lambda_smooth=1e-2,
)

eval_env = RecordVideo(
    eval_env_wrapped,
    video_folder=video_folder,
    name_prefix="walker2d-smooth_actions",
    episode_trigger=lambda ep_id: True,  # record every episode (only one is run here)
    video_length=0,                       # 0 = full episode
)

# If you saved it: model_smooth = SAC.load("sac_walker2d_smooth_actions", env=eval_env)

obs, info = eval_env.reset()
terminated = False
truncated = False

while not (terminated or truncated):
    action, _ = model_smooth.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = eval_env.step(action)

eval_env.close()
print(f"Video saved in: {video_folder}")


Video saved in: ./videos_smooth_actions


### Reward function 5: dynamic stability (penalizing state jolts)

This reward aims to make the walking not only “good” at a given moment, but also **stable over time**, by penalizing changes in the robot’s state that are too abrupt.

We define:

* $\mathbf{s}_t$: the state / observation vector at time $t$ (in `Walker2d-v5`, this includes for example torso height, angle, velocities, etc.),
* $\mathbf{a}_t$: the action at time $t$,
* $\text{base\_reward}_t$: the environment’s base reward (speed, survival, control cost).

We approximate a **state acceleration** using a discrete second derivative:

$ \mathbf{a}^{(\text{state})}_t \approx \mathbf{s}_t - 2 \mathbf{s}_{t-1} + \mathbf{s}_{t-2}, $

and we define:

$ r_t = \text{base\_reward}_t - \lambda_{\text{state}} \, \left\|\mathbf{a}^{(\text{state})}_t\right\|^2, $

where $\lambda_{\text{state}} > 0$ controls the strength of the penalty.
The more the state “accelerates” (rapid changes in posture or velocity), the larger the penalty.
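
As a small check of this finite-difference formula: a state that changes at a constant rate has zero second difference, while a sudden jump produces a nonzero one. The sketch below uses plain lists; `second_difference` and `jolt_penalty` are hypothetical names, and the trajectories are toy two-dimensional states rather than real observations.

```python
from typing import List, Sequence


def second_difference(
    s_t: Sequence[float],
    s_prev: Sequence[float],
    s_prev_prev: Sequence[float],
) -> List[float]:
    """Discrete second derivative: s_t - 2 * s_{t-1} + s_{t-2}."""
    return [a - 2.0 * b + c for a, b, c in zip(s_t, s_prev, s_prev_prev)]


def jolt_penalty(
    s_t: Sequence[float],
    s_prev: Sequence[float],
    s_prev_prev: Sequence[float],
    lambda_state: float = 1e-2,
) -> float:
    """lambda_state * squared norm of the second difference."""
    accel = second_difference(s_t, s_prev, s_prev_prev)
    return lambda_state * sum(x * x for x in accel)


# Constant-rate motion: each component moves by the same amount per step.
steady = jolt_penalty([2.0, 1.0], [1.0, 0.5], [0.0, 0.0])

# Sudden jump in the first component on the last step.
jolt = jolt_penalty([5.0, 1.0], [1.0, 0.5], [0.0, 0.0])

print(steady, jolt)
```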

#### Why it helps under noise and perturbations

With observation noise or random resets, learned policies can become:

* very **reactive** to small variations in the state,
* producing jerky movements,
* less **dynamically stable**.

By penalizing **state accelerations**, we encourage trajectories that are:

* **smoother** over time,
* with more regular posture changes,
* often more **robust** to perturbations.

This idea is related to many works that add terms on joint velocities / accelerations or on trunk dynamics to produce more stable and natural walking in RL locomotion tasks.

[https://medium.com/correll-lab/towards-robust-humanoid-loco-manipulation-using-deep-reinforcement-learning-45c8a5a0fcbf](https://medium.com/correll-lab/towards-robust-humanoid-loco-manipulation-using-deep-reinforcement-learning-45c8a5a0fcbf)


In [1]:
from typing import Dict, Any
import numpy as np

def reward_dynamic_stability(
    obs,
    action,
    info: Dict[str, Any],
    base_reward: float,
    prev_obs,
    prev_prev_obs,
    lambda_state: float = 1e-2,
) -> float:
    """
    Reward 5: dynamic stability

    Idea:
      - start from the environment's base reward (base_reward)
      - penalize state "jolts", i.e. large accelerations
        across three consecutive time steps.

    The second derivative of the state is approximated by:
        accel ≈ obs_t - 2 * obs_{t-1} + obs_{t-2}

    Then:
        r_t = base_reward - lambda_state * ||accel||^2
    """

    # Not enough history yet (start of the episode): no penalty.
    if prev_obs is None or prev_prev_obs is None:
        return float(base_reward)

    # Approximate "acceleration" over the whole observation vector
    accel = obs - 2.0 * prev_obs + prev_prev_obs

    # Squared norm of the state accelerations
    accel_penalty = lambda_state * float(np.sum(accel ** 2))

    new_reward = float(base_reward) - accel_penalty
    return new_reward


In [2]:
import gymnasium as gym
import numpy as np

class DynamicStabilityRewardWrapper(gym.Wrapper):
    def __init__(self, env, lambda_state: float = 1e-2):
        super().__init__(env)
        self.lambda_state = lambda_state
        self.prev_obs = None
        self.prev_prev_obs = None

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        # At the start of the episode there is no history yet
        self.prev_obs = None
        self.prev_prev_obs = None
        return obs, info

    def step(self, action):
        # Step the underlying env
        obs, base_reward, terminated, truncated, info = self.env.step(action)

        # Compute our "dynamic stability" reward
        new_reward = reward_dynamic_stability(
            obs=obs,
            action=action,
            info=info,
            base_reward=base_reward,
            prev_obs=self.prev_obs,
            prev_prev_obs=self.prev_prev_obs,
            lambda_state=self.lambda_state,
        )

        # Update the observation history
        self.prev_prev_obs = self.prev_obs
        self.prev_obs = np.array(obs, copy=True)

        return obs, new_reward, terminated, truncated, info


In [3]:
from stable_baselines3 import SAC

# --- TRAINING ENV (no video) ---

train_env_base = gym.make("Walker2d-v5")  # no render_mode here

train_env = DynamicStabilityRewardWrapper(
    train_env_base,
    lambda_state=1e-2,   # you can try 1e-3, 1e-2, 5e-2, etc.
)

model_dyn_stab = SAC("MlpPolicy", train_env, verbose=1)
model_dyn_stab.learn(total_timesteps=10_000)  # increase later

# optional:
# model_dyn_stab.save("sac_walker2d_dynamic_stability")


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 29       |
|    ep_rew_mean     | -87.9    |
| time/              |          |
|    episodes        | 4        |
|    fps             | 520      |
|    time_elapsed    | 0        |
|    total_timesteps | 116      |
| train/             |          |
|    actor_loss      | -3.93    |
|    critic_loss     | 9.55     |
|    ent_coef        | 0.996    |
|    ent_coef_loss   | -0.0421  |
|    learning_rate   | 0.0003   |
|    n_updates       | 15       |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 27       |
|    ep_rew_mean     | -75      |
| time/              |          |
|    episodes        | 8        |
|    fps             | 150      |
|    time_elapsed    | 1        |
|    total_timesteps | 216      |
| train/             |

<stable_baselines3.sac.sac.SAC at 0x23fff996110>

In [4]:
from gymnasium.wrappers import RecordVideo

video_folder = "./videos_dynamic_stability"

# --- EVALUATION ENV WITH VIDEO ---

eval_env_base = gym.make("Walker2d-v5", render_mode="rgb_array")

eval_env_wrapped = DynamicStabilityRewardWrapper(
    eval_env_base,
    lambda_state=1e-2,
)

eval_env = RecordVideo(
    eval_env_wrapped,
    video_folder=video_folder,
    name_prefix="walker2d-dynamic_stability",
    episode_trigger=lambda ep_id: True,  # record every episode (only one is run here)
    video_length=0,                       # 0 = full episode
)

# if you saved it:
# model_dyn_stab = SAC.load("sac_walker2d_dynamic_stability", env=eval_env)

obs, info = eval_env.reset()
terminated = False
truncated = False

while not (terminated or truncated):
    action, _ = model_dyn_stab.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = eval_env.step(action)

eval_env.close()
print(f"Video saved in: {video_folder}")


Video saved in: ./videos_dynamic_stability


### Reward function 6: progressive anti-fall (shaping around dangerous states)

This reward aims to help the walker **avoid falling** by progressively penalizing “dangerous” states, instead of giving only a large penalty when the episode ends.

In `Walker2d-v5`, we use in particular:

* `base_reward`: the environment’s base reward (forward speed, survival, control cost)
* `obs[0]`: torso height
* `obs[1]`: torso angle (0 = vertical torso)

We introduce two thresholds:

* $h_{\text{crit}}$: critical height below which the robot is considered too low (close to collapsing)
* $\theta_{\text{crit}}$: critical angle above which the torso is too tilted

The reward is defined as:

$ r_t = \text{base\_reward}_t - w_h \, \max(0, h_{\text{crit}} - h_t)^2 - w_{\text{angle}} \, \max(0, |\theta_t| - \theta_{\text{crit}})^2 $

where:

* $h_t$ is the torso height at time $t$,
* $\theta_t$ is the torso angle,
* $w_h$ controls the importance of penalizing dangerous height,
* $w_{\text{angle}}$ controls the importance of penalizing dangerous angle.

#### Why this shaping helps

* States **close to falling** (too low, too tilted) are penalized before the end of the episode.
* The agent therefore receives a **progressive danger signal**, enabling it to learn to:

  * straighten up,
  * avoid sagging,
  * correct its posture instead of simply falling.
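
The penalty defined above depends only on the current state, so it is not in the potential-based form $F = \gamma \Phi(s') - \Phi(s)$ analyzed by Ng, Harada, and Russell, and it can in principle change the optimal policy. The sketch below is one possible potential-based variant, not the reward used in this project: the helper names `danger_potential` and `potential_based_bonus` and the discount `gamma=0.99` are assumptions made for illustration.

```python
def danger_potential(h, angle, h_crit=0.9, angle_crit=0.5, w_h=5.0, w_angle=1.0):
    """Hypothetical potential Phi(s): zero in safe states, negative in dangerous ones."""
    height_danger = max(0.0, h_crit - h)
    angle_danger = max(0.0, abs(angle) - angle_crit)
    return -(w_h * height_danger**2 + w_angle * angle_danger**2)


def potential_based_bonus(prev_obs, next_obs, gamma=0.99):
    """Shaping term F = gamma * Phi(s') - Phi(s), meant to be added to the base reward."""
    phi_prev = danger_potential(prev_obs[0], prev_obs[1])
    phi_next = danger_potential(next_obs[0], next_obs[1])
    return gamma * phi_next - phi_prev


# Moving from a safe state into a dangerous one yields a negative bonus...
entering = potential_based_bonus([1.20, 0.0], [0.85, 0.7])
# ...while recovering from the same dangerous state yields a positive one.
recovering = potential_based_bonus([0.85, 0.7], [1.20, 0.0])
print(entering, recovering)
```

For the invariance result to carry over to episodic tasks, the potential of a terminal state is usually set to zero; that detail is left out of this sketch.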

Under **noise** (noisy observations, randomized resets), this reward lets us test whether RL algorithms learn more **robust** behaviors, as measured by reductions in:

* the number of falls per episode,
* the time spent in dangerous postures (height < $h_{\text{crit}}$, $|\theta_t|$ > $\theta_{\text{crit}}$).
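
Both quantities can be computed from logged observations. A minimal sketch, assuming that torso heights and angles were recorded at every step of an episode and that `terminated=True` corresponds to a fall; the helper `danger_stats` and the sample trajectory are hypothetical:

```python
def danger_stats(heights, angles, terminated, h_crit=0.9, angle_crit=0.5):
    """Summarize one episode from per-step torso heights (obs[0]) and angles (obs[1]).

    `terminated` is the flag returned by the last env.step call; we assume
    here that it signals a fall.
    """
    n_steps = len(heights)
    danger_steps = sum(
        1 for h, a in zip(heights, angles) if h < h_crit or abs(a) > angle_crit
    )
    return {
        "fell": bool(terminated),
        "danger_fraction": danger_steps / n_steps if n_steps else 0.0,
    }


# Made-up trajectory: 4 steps, the last two in a dangerous posture.
stats = danger_stats(
    heights=[1.25, 1.10, 0.88, 0.82],
    angles=[0.00, 0.30, 0.40, 0.65],
    terminated=True,
)
print(stats)  # {'fell': True, 'danger_fraction': 0.5}
```

Averaging `fell` and `danger_fraction` over many evaluation episodes gives numbers that can be compared across reward functions and noise levels.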


In [1]:
from typing import Dict, Any
import numpy as np

def reward_anti_fall_progressive(
    obs,
    action,
    info: Dict[str, Any],
    base_reward: float,
    h_crit: float = 0.9,
    angle_crit: float = 0.5,
    w_h: float = 5.0,
    w_angle: float = 1.0,
) -> float:
    """
    Reward 6: progressive anti-fall (shaping around dangerous states)

    Idea:
      - start from the environment's base reward (base_reward)
      - subtract a penalty as the walker gets closer to falling:
          * torso too low    (h < h_crit)
          * torso too tilted (|angle| > angle_crit)

    r_t = base_reward
          - w_h     * max(0, h_crit - h_t)^2
          - w_angle * max(0, |angle_t| - angle_crit)^2
    """

    # Per the Walker2d-v5 observation layout:
    #   obs[0] = torso height
    #   obs[1] = torso angle (0 = upright)
    h = float(obs[0])
    angle = float(obs[1])

    # Height danger: nonzero once the torso drops below h_crit
    height_danger = max(0.0, h_crit - h)
    height_penalty = w_h * height_danger**2

    # Angle danger: nonzero once |angle| exceeds angle_crit
    angle_danger = max(0.0, abs(angle) - angle_crit)
    angle_penalty = w_angle * angle_danger**2

    new_reward = float(base_reward) - height_penalty - angle_penalty
    return new_reward


In [2]:
import gymnasium as gym

class AntiFallProgressiveRewardWrapper(gym.Wrapper):
    def __init__(
        self,
        env,
        h_crit: float = 0.9,
        angle_crit: float = 0.5,
        w_h: float = 5.0,
        w_angle: float = 1.0,
    ):
        super().__init__(env)
        self.h_crit = h_crit
        self.angle_crit = angle_crit
        self.w_h = w_h
        self.w_angle = w_angle

    def reset(self, **kwargs):
        return self.env.reset(**kwargs)

    def step(self, action):
        # step the underlying env
        obs, base_reward, terminated, truncated, info = self.env.step(action)

        # reward 6: progressive anti-fall
        new_reward = reward_anti_fall_progressive(
            obs=obs,
            action=action,
            info=info,
            base_reward=base_reward,
            h_crit=self.h_crit,
            angle_crit=self.angle_crit,
            w_h=self.w_h,
            w_angle=self.w_angle,
        )

        return obs, new_reward, terminated, truncated, info


In [3]:
from stable_baselines3 import SAC

# --- TRAINING ENV (no video here) ---

train_env_base = gym.make("Walker2d-v5")  # no render_mode

train_env = AntiFallProgressiveRewardWrapper(
    train_env_base,
    h_crit=0.9,
    angle_crit=0.5,
    w_h=5.0,
    w_angle=1.0,
)

model_anti_fall = SAC("MlpPolicy", train_env, verbose=1)
model_anti_fall.learn(total_timesteps=10_000)  # increase later

# optional:
# model_anti_fall.save("sac_walker2d_anti_fall_progressive")


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 16.2     |
|    ep_rew_mean     | 0.436    |
| time/              |          |
|    episodes        | 4        |
|    fps             | 1964     |
|    time_elapsed    | 0        |
|    total_timesteps | 65       |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 16.5     |
|    ep_rew_mean     | 1.11     |
| time/              |          |
|    episodes        | 8        |
|    fps             | 269      |
|    time_elapsed    | 0        |
|    total_timesteps | 132      |
| train/             |          |
|    actor_loss      | -7.24    |
|    critic_loss     | 3.65     |
|    ent_coef        | 0.991    |
|    ent_coef_loss   | -0.0907  |
|    learning_rate   | 0.0003   |
|    n_updates       | 31       |
----------------------

<stable_baselines3.sac.sac.SAC at 0x1f5e77c3a90>

In [4]:
from gymnasium.wrappers import RecordVideo

video_folder = "./videos_anti_fall_progressive"

# --- EVALUATION ENV WITH VIDEO ---

eval_env_base = gym.make("Walker2d-v5", render_mode="rgb_array")

eval_env_wrapped = AntiFallProgressiveRewardWrapper(
    eval_env_base,
    h_crit=0.9,
    angle_crit=0.5,
    w_h=5.0,
    w_angle=1.0,
)

eval_env = RecordVideo(
    eval_env_wrapped,
    video_folder=video_folder,
    name_prefix="walker2d-anti_fall",
    episode_trigger=lambda ep_id: True,
    video_length=0,  # full episode
)

# if you saved the model:
# model_anti_fall = SAC.load("sac_walker2d_anti_fall_progressive", env=eval_env)

obs, info = eval_env.reset()
terminated = False
truncated = False

while not (terminated or truncated):
    action, _ = model_anti_fall.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = eval_env.step(action)

eval_env.close()
print(f"Video saved to: {video_folder}")


Video saved to: ./videos_anti_fall_progressive


### Reward function 7: robust & economical walking (simple multi-objective)

This reward explicitly combines three objectives:

1. **Moving forward** (speed),
2. **Using little energy** (moderate actions),
3. **Staying high enough** (overall stable posture).

In `Walker2d-v5`:

* `obs[8]` corresponds to the torso’s x-velocity,
* `obs[0]` corresponds to the torso height,
* the energy used can be approximated by $\|a_t\|^2$ (squared norm of the action).

We define:

$ r_t = w_v \, v_x - w_E \, \|a_t\|^2 + w_h \, \max(0, h_t - h_{\min}) + w_{\text{survive}} \cdot \text{reward\_survive} $

where:

* $v_x$ is the torso’s horizontal speed,
* $\|a_t\|^2$ measures the instantaneous energy of the actions,
* $h_t$ is the torso height,
* $h_{\min}$ is a desired minimum height (e.g., 1.0),
* $w_v$ (`v_weight`) sets the importance of speed,
* $w_E$ (`energy_weight`) sets the importance of energy saving,
* $w_h$ (`h_weight`) sets the importance of staying high,
* $w_{\text{survive}}$ (`w_survive`) optionally keeps a survival term.
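
For intuition, here is the reward for one illustrative step, using the weights passed to the training environment in this notebook ($w_v = 1$, $w_E = 10^{-3}$, $w_h = 1$, $h_{\min} = 1.0$, $w_{\text{survive}} = 0$) and made-up values $v_x = 1.5$, $\|a_t\|^2 = 3$, $h_t = 1.2$:

$ r_t = 1 \times 1.5 - 10^{-3} \times 3 + 1 \times \max(0, 1.2 - 1.0) + 0 = 1.5 - 0.003 + 0.2 = 1.697 $

With these weights the energy term is tiny compared with the speed and height terms, so `energy_weight` may need to be raised before the speed/energy trade-off becomes visible.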

#### Relevance for robustness

This reward makes it possible to study a clear trade-off between:

* **performance** (distance traveled, average speed),
* **energy cost** ($\sum_t \|a_t\|^2$),
* **overall stability** (average torso height).

Under perturbations (observation noise, randomized resets), we can compare different policies by looking at:

* the distance traveled,
* total energy consumed,
* average height and number of falls.
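
These comparison metrics can be aggregated from per-step logs. The sketch below is one way to do it, under stated assumptions: the helper `episode_summary` is hypothetical, distance is approximated by integrating `obs[8]` over time, and the step duration `dt=0.008` is the value we believe Walker2d uses by default (it should be read from the environment, e.g. `env.unwrapped.dt` if available, rather than hard-coded).

```python
def episode_summary(x_velocities, actions, heights, dt):
    """Aggregate per-step logs of one episode into three comparison metrics.

    x_velocities: per-step torso x-velocity (obs[8])
    actions:      per-step action vectors
    heights:      per-step torso height (obs[0])
    dt:           duration of one environment step in seconds (assumed known)
    """
    n_steps = len(heights)
    # Approximate distance by integrating the velocity over time.
    distance = sum(v * dt for v in x_velocities)
    # Total energy proxy: sum over steps of the squared action norm.
    energy = sum(sum(a_i**2 for a_i in a) for a in actions)
    mean_height = sum(heights) / n_steps if n_steps else 0.0
    return {"distance": distance, "energy": energy, "mean_height": mean_height}


# Made-up 3-step episode, shortened to 2 action dimensions for readability.
log = episode_summary(
    x_velocities=[1.0, 2.0, 1.0],
    actions=[[1.0, 0.0], [0.5, 0.5], [0.0, 0.0]],
    heights=[1.2, 1.1, 1.0],
    dt=0.008,
)
print(log)
```

Counting falls additionally requires the `terminated` flag of each episode, which this helper does not track.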


In [7]:
from typing import Dict, Any
import numpy as np

def reward_robust_econ(
    obs,
    action,
    info: Dict[str, Any],
    v_weight: float = 1.0,
    energy_weight: float = 1e-3,
    h_weight: float = 1.0,
    h_min: float = 1.0,
    w_survive: float = 0.0,
) -> float:
    """
    Reward 7: robust & economical walking (simple multi-objective)

    Goal: combine three criteria:
      - forward velocity (v_x)
      - energy consumed (||a_t||^2)
      - torso height (h)

    r_t = v_weight     * v_x
        - energy_weight * ||a_t||^2
        + h_weight     * max(0, h_t - h_min)
        + w_survive    * reward_survive

    where:
      - v_x     : x-velocity of the torso (obs[8] in Walker2d-v5)
      - a_t     : action at time t
      - h_t     : torso height (obs[0])
      - h_min   : desired minimum height
    """

    # Horizontal velocity (rewards a faster gait)
    vx = float(obs[8])          # velocity of x-coordinate of torso

    # Energy of the actions
    energy = float(np.sum(np.square(action)))

    # Torso height
    h = float(obs[0])

    # Survival term from the environment (optional)
    survive = float(info.get("reward_survive", 0.0))

    # Height term: rewarded only when h > h_min
    height_term = h_weight * max(0.0, h - h_min)

    reward = (
        v_weight * vx
        - energy_weight * energy
        + height_term
        + w_survive * survive
    )
    return reward


In [None]:
import gymnasium as gym

class RobustEconRewardWrapper(gym.Wrapper):
    def __init__(
        self,
        env,
        v_weight: float = 1.0,
        energy_weight: float = 1e-3,
        h_weight: float = 1.0,  # same default as reward_robust_econ
        h_min: float = 1.0,
        w_survive: float = 0.0,
    ):
        super().__init__(env)
        self.v_weight = v_weight
        self.energy_weight = energy_weight
        self.h_weight = h_weight
        self.h_min = h_min
        self.w_survive = w_survive

    def reset(self, **kwargs):
        return self.env.reset(**kwargs)

    def step(self, action):
        # step the base env
        obs, base_reward, terminated, truncated, info = self.env.step(action)

        # reward 7: robust & economical walking
        new_reward = reward_robust_econ(
            obs=obs,
            action=action,
            info=info,
            v_weight=self.v_weight,
            energy_weight=self.energy_weight,
            h_weight=self.h_weight,
            h_min=self.h_min,
            w_survive=self.w_survive,
        )

        return obs, new_reward, terminated, truncated, info


In [9]:
from stable_baselines3 import SAC

# --- TRAINING ENV (no video here) ---

train_env_base = gym.make("Walker2d-v5")

train_env = RobustEconRewardWrapper(
    train_env_base,
    v_weight=1.0,
    energy_weight=1e-3,
    h_weight=1.0,
    h_min=1.0,
    w_survive=0.0,  # you can try 0.0 or 1.0
)

model_robust_econ = SAC("MlpPolicy", train_env, verbose=1)
model_robust_econ.learn(total_timesteps=10_000)  # increase later

# optional:
# model_robust_econ.save("sac_walker2d_robust_econ")


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 19.8     |
|    ep_rew_mean     | -17      |
| time/              |          |
|    episodes        | 4        |
|    fps             | 2079     |
|    time_elapsed    | 0        |
|    total_timesteps | 79       |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 19.6     |
|    ep_rew_mean     | -14.3    |
| time/              |          |
|    episodes        | 8        |
|    fps             | 220      |
|    time_elapsed    | 0        |
|    total_timesteps | 157      |
| train/             |          |
|    actor_loss      | -7.11    |
|    critic_loss     | 1.77     |
|    ent_coef        | 0.983    |
|    ent_coef_loss   | -0.167   |
|    learning_rate   | 0.0003   |
|    n_updates       | 56       |
----------------------

<stable_baselines3.sac.sac.SAC at 0x1f593389660>

In [10]:
from gymnasium.wrappers import RecordVideo

video_folder = "./videos_robust_econ"

# --- EVALUATION ENV WITH VIDEO ---

eval_env_base = gym.make("Walker2d-v5", render_mode="rgb_array")

eval_env_wrapped = RobustEconRewardWrapper(
    eval_env_base,
    v_weight=1.0,
    energy_weight=1e-3,
    h_weight=1.0,
    h_min=1.0,
    w_survive=0.0,
)

eval_env = RecordVideo(
    eval_env_wrapped,
    video_folder=video_folder,
    name_prefix="walker2d-robust_econ",
    episode_trigger=lambda ep_id: True,  # record every episode (only one is run here)
    video_length=0,                       # full episode
)

# if you saved the model:
# model_robust_econ = SAC.load("sac_walker2d_robust_econ", env=eval_env)

obs, info = eval_env.reset()
terminated = False
truncated = False

while not (terminated or truncated):
    action, _ = model_robust_econ.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = eval_env.step(action)

eval_env.close()
print(f"Video saved to: {video_folder}")


Video saved to: ./videos_robust_econ


The seven reward functions can be grouped by the objective they target:

* Speed vs. energy: R1 or R7
* Target speed: R2
* Posture / avoiding falls: R3 or R6
* Smooth actions: R4 and R5
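
The grouping can also be kept as data, which makes it easy to loop over groups when comparing runs. A small sketch; the name `REWARD_GROUPS`, the group keys, and the helper `group_of` are hypothetical and not part of the notebook:

```python
# Hypothetical mapping from comparison group to reward identifiers (R1..R7).
REWARD_GROUPS = {
    "speed_vs_energy": ["R1", "R7"],
    "target_speed": ["R2"],
    "posture_anti_fall": ["R3", "R6"],
    "smooth_actions": ["R4", "R5"],
}


def group_of(reward_id):
    """Return the name of the group a reward identifier belongs to."""
    for group, members in REWARD_GROUPS.items():
        if reward_id in members:
            return group
    raise KeyError(f"unknown reward id: {reward_id}")


# Every reward R1..R7 appears in exactly one group.
all_ids = sorted(r for members in REWARD_GROUPS.values() for r in members)
print(all_ids)         # ['R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7']
print(group_of("R6"))  # posture_anti_fall
```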
