
# Training A2C with Vector Envs and Domain Randomization


## Notice

If you encounter a ``RuntimeError`` like the one quoted below, raised from ``multiprocessing/spawn.py``, wrap the code from ``gym.vector.make`` or ``gym.vector.AsyncVectorEnv`` to the end of the script in an ``if __name__ == '__main__':`` block.

``An attempt has been made to start a new process before the current process has finished its bootstrapping phase.``




------------------------------




## Introduction
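In this tutorial, you will learn how to use vectorized environments to train an Advantage Actor-Critic agent. We will use A2C, which is the synchronous version of the A3C algorithm [1].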

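Vectorized environments [3] let you run multiple instances of the same environment in parallel (on multiple CPUs), which helps make training faster and more robust. This can significantly reduce the variance and thus speed up training.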

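We will implement an Advantage Actor-Critic from scratch to see how batched states can be fed into the networks to get a vector of actions (one action per environment), and how the actor and critic losses are computed on minibatches of transitions. Each minibatch contains the transitions of one sampling phase: ``n_steps_per_update`` steps are executed in ``n_envs`` environments in parallel (multiply the two to get the number of transitions in a minibatch). After each sampling phase, the losses are calculated and one gradient step is executed. To calculate the advantages, we will use the Generalized Advantage Estimation (GAE) method [2], which balances the tradeoff between variance and bias of the advantage estimates.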

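The A2C agent class is initialized with the number of features of the input state, the number of actions the agent can take, the learning rates, and the number of environments that run in parallel to collect experiences. The actor and critic networks are defined and their respective optimizers are initialized. The forward pass of the networks takes a batched vector of states and returns a tensor with the state values and a tensor with the action logits. The ``select_action`` method returns a tuple of the chosen actions, the log-probs of those actions, and the state values for each action. In addition, it returns the entropy of the policy distribution, which is later subtracted from the loss (with a weighting factor ``ent_coef``) to encourage exploration.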
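As a rough illustration of what ``select_action`` computes, the following NumPy sketch turns a batch of action logits into one sampled action per environment, the log-probability of each sampled action, and the per-environment entropy. It is not the tutorial's implementation (which relies on ``torch.distributions.Categorical``); the function name ``sample_actions`` and the seeded generator are assumptions made for this example.

```python
import numpy as np


def sample_actions(logits: np.ndarray, rng: np.random.Generator):
    """Sample one action per row of `logits` (shape [n_envs, n_actions])."""
    # numerically stable log-softmax over the action dimension
    shifted = logits - logits.max(axis=1, keepdims=True)
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))
    probs = np.exp(log_probs)

    # draw one action per environment from its categorical distribution
    actions = np.array([rng.choice(len(p), p=p) for p in probs])
    action_log_probs = log_probs[np.arange(len(actions)), actions]

    # entropy H = -sum_a p(a) * log p(a), one value per environment
    entropy = -(probs * log_probs).sum(axis=1)
    return actions, action_log_probs, entropy


rng = np.random.default_rng(0)
logits = np.zeros((3, 4))  # 3 environments, 4 actions, uniform policy
actions, action_log_probs, entropy = sample_actions(logits, rng)
print(actions.shape, action_log_probs, entropy)
```

With uniform logits every action has probability 1/4, so each entropy equals log 4, the maximum possible for four actions.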

The ``get_losses`` function calculates the losses for the actor and critic networks (using GAE), which are then updated using the ``update_parameters`` function.
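To make the GAE recursion concrete, here is a minimal NumPy sketch, assuming rewards, value predictions, and masks of shape [n_steps_per_update, n_envs]. The helper name ``compute_gae`` and the ``last_values`` argument (a bootstrap value for the state after the final step) are assumptions for this example; the ``get_losses`` method in this tutorial does not use a bootstrap value and instead leaves the advantage of the last step at zero.

```python
import numpy as np


def compute_gae(rewards, values, masks, last_values, gamma, lam):
    """Generalized Advantage Estimation over one sampling phase.

    rewards, values, masks: arrays of shape [n_steps_per_update, n_envs].
    last_values: value estimates for the state after the final step, shape [n_envs].
    """
    n_steps, n_envs = rewards.shape
    advantages = np.zeros((n_steps, n_envs))
    gae = np.zeros(n_envs)
    next_values = last_values
    for t in reversed(range(n_steps)):
        # one-step TD error; the mask removes the bootstrap term after termination
        td_error = rewards[t] + gamma * masks[t] * next_values - values[t]
        # exponentially weighted sum of TD errors, cut off at episode boundaries
        gae = td_error + gamma * lam * masks[t] * gae
        advantages[t] = gae
        next_values = values[t]
    return advantages


rewards = np.ones((3, 1))
values = np.zeros((3, 1))
masks = np.ones((3, 1))
last_values = np.zeros(1)

adv_mc = compute_gae(rewards, values, masks, last_values, gamma=1.0, lam=1.0)
adv_td = compute_gae(rewards, values, masks, last_values, gamma=1.0, lam=0.0)
print(adv_mc.ravel(), adv_td.ravel())
```

In this toy setup (all rewards 1, all values 0, no discounting), ``lam=1.0`` yields the remaining return at each step (3, 2, 1), while ``lam=0.0`` yields the one-step TD errors (1, 1, 1).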

------------------------------




In [6]:
# Author: Till Zemann
# License: MIT License

from __future__ import annotations

import os

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from torch import optim
from tqdm import tqdm

import gymnasium as gym

## Advantage Actor-Critic (A2C)

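The Actor-Critic combines elements of value-based and policy-based methods. In A2C, the agent has two separate neural networks: a critic network that estimates the state-value function, and an actor network that outputs logits for a categorical probability distribution over all actions. The critic network is trained to minimize the mean squared error between the predicted state values and the actual returns received by the agent (this is equivalent to minimizing the squared advantages, because the advantage of an action is the difference between the return and the state value: A(s,a) = Q(s,a) - V(s)). The actor network is trained to maximize the expected return by selecting actions that have high expected values according to the critic network.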
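The two objectives described above can be sketched in a few lines of NumPy. The structure mirrors the losses used in this tutorial (mean squared advantage for the critic, advantage-weighted log-probs plus an entropy bonus for the actor), but the function name ``a2c_losses`` and the toy numbers are made up for illustration, and gradient handling (such as detaching the advantages in the actor loss) is left out.

```python
import numpy as np


def a2c_losses(advantages, action_log_probs, entropy, ent_coef):
    """Return (critic_loss, actor_loss) for one minibatch of transitions."""
    # critic: mean squared advantage, i.e. squared error between return and value
    critic_loss = np.mean(advantages**2)
    # actor: raise the log-prob of actions with positive advantage,
    # and subtract an entropy bonus to encourage exploration
    actor_loss = -np.mean(advantages * action_log_probs) - ent_coef * np.mean(entropy)
    return critic_loss, actor_loss


# toy minibatch: 2 steps x 2 environments, uniform policy over 4 actions
advantages = np.array([[1.0, -1.0], [0.5, 0.0]])
action_log_probs = np.log(np.full((2, 2), 0.25))
entropy = np.full(2, np.log(4.0))

critic_loss, actor_loss = a2c_losses(
    advantages, action_log_probs, entropy, ent_coef=0.01
)
print(critic_loss, actor_loss)
```

A larger ``ent_coef`` lowers the actor loss more strongly for high-entropy policies, which is how the entropy term rewards exploration.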

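The focus of this tutorial will not be on the details of A2C itself. Instead, it focuses on how to use vectorized environments and domain randomization to accelerate the training process for A2C (and other reinforcement learning algorithms).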



------------------------------




In [8]:
class A2C(nn.Module):
    """
    (Synchronous) Advantage Actor-Critic agent class

    Args:
        n_features: The number of features of the input state.
        n_actions: The number of actions the agent can take.
        device: The device to run the computations on (running on a GPU might be quicker for larger Neural Nets,
                for this code CPU is totally fine).
        critic_lr: The learning rate for the critic network (should usually be larger than the actor_lr).
        actor_lr: The learning rate for the actor network.
        n_envs: The number of environments that run in parallel (on multiple CPUs) to collect experiences.
    """

    def __init__(
        self,
        n_features: int,
        n_actions: int,
        device: torch.device,
        critic_lr: float,
        actor_lr: float,
        n_envs: int,
    ) -> None:
        """Initializes the actor and critic networks and their respective optimizers."""
        super().__init__()
        self.device = device
        self.n_envs = n_envs

        critic_layers = [
            nn.Linear(n_features, 32),
            nn.ReLU(),
            nn.Linear(32, 32),
            nn.ReLU(),
            nn.Linear(32, 1),  # estimate V(s)
        ]

        actor_layers = [
            nn.Linear(n_features, 32),
            nn.ReLU(),
            nn.Linear(32, 32),
            nn.ReLU(),
            nn.Linear(
                32, n_actions
            ),  # estimate action logits (will be fed into a softmax later)
        ]

        # define actor and critic networks
        self.critic = nn.Sequential(*critic_layers).to(self.device)
        self.actor = nn.Sequential(*actor_layers).to(self.device)

        # define optimizers for actor and critic
        self.critic_optim = optim.RMSprop(self.critic.parameters(), lr=critic_lr)
        self.actor_optim = optim.RMSprop(self.actor.parameters(), lr=actor_lr)

    def forward(self, x: np.ndarray) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Forward pass of the networks.

        Args:
            x: A batched vector of states.

        Returns:
            state_values: A tensor with the state values, with shape [n_envs, 1].
            action_logits_vec: A tensor with the action logits, with shape [n_envs, n_actions].
        """
        x = torch.Tensor(x).to(self.device)
        state_values = self.critic(x)  # shape: [n_envs, 1]
        action_logits_vec = self.actor(x)  # shape: [n_envs, n_actions]
        return (state_values, action_logits_vec)

    def select_action(
        self, x: np.ndarray
    ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Returns a tuple of the chosen actions, their log-probs, the state values, and the policy entropy.

        Args:
            x: A batched vector of states.

        Returns:
            actions: A tensor with the sampled actions, with shape [n_envs].
            action_log_probs: A tensor with the log-probs of the sampled actions, with shape [n_envs].
            state_values: A tensor with the state values, with shape [n_envs, 1].
            entropy: A tensor with the entropy of the policy distribution, with shape [n_envs].
        """
        state_values, action_logits = self.forward(x)
        action_pd = torch.distributions.Categorical(
            logits=action_logits
        )  # implicitly uses softmax
        actions = action_pd.sample()
        action_log_probs = action_pd.log_prob(actions)
        entropy = action_pd.entropy()
        return (actions, action_log_probs, state_values, entropy)

    def get_losses(
        self,
        rewards: torch.Tensor,
        action_log_probs: torch.Tensor,
        value_preds: torch.Tensor,
        entropy: torch.Tensor,
        masks: torch.Tensor,
        gamma: float,
        lam: float,
        ent_coef: float,
        device: torch.device,
    ) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Computes the loss of a minibatch (transitions collected in one sampling phase) for actor and critic
        using Generalized Advantage Estimation (GAE) to compute the advantages (https://arxiv.org/abs/1506.02438).

        Args:
            rewards: A tensor with the rewards for each time step in the sampling phase, with shape [n_steps_per_update, n_envs].
            action_log_probs: A tensor with the log-probs of the actions taken at each time step in the sampling phase, with shape [n_steps_per_update, n_envs].
            value_preds: A tensor with the state value predictions for each time step in the sampling phase, with shape [n_steps_per_update, n_envs].
            entropy: A tensor with the entropy of the policy distribution (its mean is used as the entropy bonus).
            masks: A tensor with the masks for each time step in the sampling phase, with shape [n_steps_per_update, n_envs].
            gamma: The discount factor.
            lam: The GAE hyperparameter. (lam=1 corresponds to Monte-Carlo sampling with high variance and no bias,
                                          and lam=0 corresponds to normal TD-Learning that has a low variance but is biased
                                          because the estimates are generated by a Neural Net).
            ent_coef: The weight of the entropy bonus in the actor loss.
            device: The device to run the computations on (e.g. CPU or GPU).

        Returns:
            critic_loss: The critic loss for the minibatch.
            actor_loss: The actor loss for the minibatch.
        """
        T = len(rewards)
        advantages = torch.zeros(T, self.n_envs, device=device)

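        # compute the advantages using GAE; the loop starts at T - 2 because it needs
        # value_preds[t + 1], so the advantage of the last step stays at 0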
        gae = 0.0
        for t in reversed(range(T - 1)):
            td_error = (
                rewards[t] + gamma * masks[t] * value_preds[t + 1] - value_preds[t]
            )
            gae = td_error + gamma * lam * masks[t] * gae
            advantages[t] = gae

        # calculate the loss of the minibatch for actor and critic
        critic_loss = advantages.pow(2).mean()

        # give a bonus for higher entropy to encourage exploration
        actor_loss = (
            -(advantages.detach() * action_log_probs).mean() - ent_coef * entropy.mean()
        )
        return (critic_loss, actor_loss)

    def update_parameters(
        self, critic_loss: torch.Tensor, actor_loss: torch.Tensor
    ) -> None:
        """
        Updates the parameters of the actor and critic networks.

        Args:
            critic_loss: The critic loss.
            actor_loss: The actor loss.
        """
        self.critic_optim.zero_grad()
        critic_loss.backward()
        self.critic_optim.step()

        self.actor_optim.zero_grad()
        actor_loss.backward()
        self.actor_optim.step()

## Using Vectorized Environments

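When you calculate the losses for the two neural networks over only one epoch, they might have a high variance. With vectorized environments, we can play with ``n_envs`` environments in parallel and thus get up to a linear speedup (meaning that, in theory, we collect samples ``n_envs`` times quicker), and we can use these samples to calculate the loss for the current policy and critic network. When we use more samples to calculate the loss, it has a lower variance, which leads to quicker learning.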
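The variance argument can be checked with a small simulation. The sketch below is not tied to any environment: it treats each per-environment loss estimate as an independent noisy draw (an assumption that real, correlated rollouts only approximate) and compares how much the batch mean fluctuates for 1 versus 16 parallel environments. The function name ``std_of_batch_mean`` is made up for this example.

```python
import numpy as np


def std_of_batch_mean(n_envs: int, n_trials: int = 10_000, seed: int = 0) -> float:
    """Standard deviation of the mean of `n_envs` unit-variance noisy estimates."""
    rng = np.random.default_rng(seed)
    samples = rng.normal(loc=0.0, scale=1.0, size=(n_trials, n_envs))
    # average over the environments, then measure the spread across trials
    return float(samples.mean(axis=1).std())


std_single = std_of_batch_mean(n_envs=1)
std_batched = std_of_batch_mean(n_envs=16)
print(f"std with 1 env: {std_single:.3f}, std with 16 envs: {std_batched:.3f}")
```

For independent samples, the standard deviation of the mean shrinks roughly with the square root of ``n_envs``, so 16 environments give about a fourfold reduction.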

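A2C is a synchronous method, meaning that the parameter updates to the networks take place deterministically (after each sampling phase), but we can still make use of asynchronous vector envs to spawn multiple processes for parallel environment execution.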

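The simplest way to create vector environments is by calling ``gym.vector.make``, which creates multiple instances of the same environment. The cell below instead constructs an ``AsyncVectorEnv`` directly from a list of environment factory functions, which has the same effect: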

In [11]:
from gymnasium.vector import AsyncVectorEnv
import gymnasium as gym

# Function to create individual environments
def make_env():
    return gym.make("LunarLander-v3", max_episode_steps=600)

# Create vectorized environments
num_envs = 3
envs = AsyncVectorEnv([make_env for _ in range(num_envs)])

## Domain Randomization

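If we want to randomize the training environment to get more robust agents (agents that can deal with different parameterizations of an environment and might therefore generalize better), we can set the desired parameters manually or use a pseudo-random number generator to generate them.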

Manually setting up 3 parallel "LunarLander-v3" envs with different parameters:


In [12]:
envs = gym.vector.AsyncVectorEnv(
    [
        lambda: gym.make(
            "LunarLander-v3",
            gravity=-10.0,
            enable_wind=True,
            wind_power=15.0,
            turbulence_power=1.5,
            max_episode_steps=600,
        ),
        lambda: gym.make(
            "LunarLander-v3",
            gravity=-9.8,
            enable_wind=True,
            wind_power=10.0,
            turbulence_power=1.3,
            max_episode_steps=600,
        ),
        lambda: gym.make(
            "LunarLander-v3", gravity=-7.0, enable_wind=False, max_episode_steps=600
        ),
    ]
)

------------------------------

Randomly generating the parameters for 3 parallel "LunarLander-v3" envs, using ``np.clip`` to stay in the recommended parameter space:



In [13]:
envs = gym.vector.AsyncVectorEnv(
    [
        lambda: gym.make(
            "LunarLander-v3",
            gravity=np.clip(
                np.random.normal(loc=-10.0, scale=1.0), a_min=-11.99, a_max=-0.01
            ),
            enable_wind=np.random.choice([True, False]),
            wind_power=np.clip(
                np.random.normal(loc=15.0, scale=1.0), a_min=0.01, a_max=19.99
            ),
            turbulence_power=np.clip(
                np.random.normal(loc=1.5, scale=0.5), a_min=0.01, a_max=1.99
            ),
            max_episode_steps=600,
        )
        for i in range(3)
    ]
)

------------------------------

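Here we are using normal distributions with the standard parameterization of the environment as the mean and an arbitrary standard deviation (scale). Depending on the problem, you can experiment with a higher variance and use different distributions as well.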
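One possible way to organize the sampling is to draw all parameters in the main process from a seeded generator and only then hand them to the environment constructors. The sketch below does this with NumPy; the helper name ``sample_lander_params`` is hypothetical, and the means, scales, and clipping bounds are copied from the cell above. Drawing the values up front also sidesteps a subtlety of sampling inside a ``lambda``: the draw then happens when the worker process builds its environment, and depending on the multiprocessing start method, the workers may inherit the same random state.

```python
import numpy as np


def sample_lander_params(rng: np.random.Generator) -> dict:
    """Draw one randomized LunarLander parameter set (hypothetical helper)."""
    return {
        "gravity": float(np.clip(rng.normal(loc=-10.0, scale=1.0), -11.99, -0.01)),
        "enable_wind": bool(rng.choice([True, False])),
        "wind_power": float(np.clip(rng.normal(loc=15.0, scale=1.0), 0.01, 19.99)),
        "turbulence_power": float(
            np.clip(rng.normal(loc=1.5, scale=0.5), 0.01, 1.99)
        ),
    }


rng = np.random.default_rng(seed=0)
param_sets = [sample_lander_params(rng) for _ in range(3)]
for params in param_sets:
    print(params)

# Each dict could then be bound to an environment factory, for example with
# functools.partial(gym.make, "LunarLander-v3", max_episode_steps=600, **params).
```

Because the generator is seeded, the same three parameter sets are produced on every run, which makes experiments with domain randomization easier to reproduce.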

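If you train on the same ``n_envs`` environments for the entire training time, and ``n_envs`` is a relatively low number (in proportion to how complex the environment is), you might still overfit to the specific parameterizations that you picked. To mitigate this, you can either pick a high number of randomly parameterized environments or remake your environments every couple of sampling phases to generate a new set of pseudo-random parameters.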
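The second option, remaking the environments every few sampling phases, could be structured roughly as follows. This is only a sketch of the control flow: ``train_with_resampling`` and its ``build_envs`` argument are placeholders invented for this example, and the actual data collection and agent update are elided.

```python
import numpy as np


def train_with_resampling(n_updates, resample_every, build_envs, seed=0):
    """Rebuild the environments every `resample_every` sampling phases.

    `build_envs(rng)` is a placeholder: it should return a freshly
    parameterized vector environment. Returns how often it was called.
    """
    rng = np.random.default_rng(seed)
    envs = build_envs(rng)
    n_builds = 1
    for sample_phase in range(n_updates):
        if sample_phase > 0 and sample_phase % resample_every == 0:
            # release the old worker processes if the object supports it
            if hasattr(envs, "close"):
                envs.close()
            envs = build_envs(rng)
            n_builds += 1
            # a real loop would also reset the new envs here to get fresh states
        # ... collect transitions with `envs` and update the agent ...
    return n_builds


# stand-in builder that only records a sampled gravity value
n_builds = train_with_resampling(
    n_updates=10,
    resample_every=4,
    build_envs=lambda rng: {"gravity": rng.normal(-10.0, 1.0)},
)
print(n_builds)
```

With 10 sampling phases and ``resample_every=4``, the environments are rebuilt at phases 4 and 8, so the builder is called three times in total.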




## Setup




In [15]:
# environment hyperparams
n_envs = 10
n_updates = 1000
n_steps_per_update = 128
randomize_domain = False

# agent hyperparams
gamma = 0.999
lam = 0.95  # hyperparameter for GAE
ent_coef = 0.01  # coefficient for the entropy bonus (to encourage exploration)
actor_lr = 0.001
critic_lr = 0.005

# Note: the actor has a slower learning rate so that the value targets become
# more stationary and are therefore easier to estimate for the critic

# environment setup
if randomize_domain:
    envs = AsyncVectorEnv(
        [
            lambda: gym.make(
                "LunarLander-v3",
                gravity=np.clip(
                    np.random.normal(loc=-10.0, scale=1.0), a_min=-11.99, a_max=-0.01
                ),
                enable_wind=np.random.choice([True, False]),
                wind_power=np.clip(
                    np.random.normal(loc=15.0, scale=1.0), a_min=0.01, a_max=19.99
                ),
                turbulence_power=np.clip(
                    np.random.normal(loc=1.5, scale=0.5), a_min=0.01, a_max=1.99
                ),
                max_episode_steps=600,
            )
            for _ in range(n_envs)
        ]
    )
else:
    envs = AsyncVectorEnv(
        [
            lambda: gym.make("LunarLander-v3", max_episode_steps=600)
            for _ in range(n_envs)
        ]
    )

obs_shape = envs.single_observation_space.shape[0]
action_shape = envs.single_action_space.n

# set the device
use_cuda = False
device = torch.device("cuda" if torch.cuda.is_available() and use_cuda else "cpu")

# init the agent
agent = A2C(obs_shape, action_shape, device, critic_lr, actor_lr, n_envs)


## Training the A2C Agent
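For our training loop, we use the ``RecordEpisodeStatistics`` wrapper to record the episode lengths and returns, and we also save the losses and entropies to plot them after the agent has finished training.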

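You may notice that we don't reset the vectorized envs at the start of each episode like we would usually do. This is because each environment resets automatically once its episode finishes (each environment takes a different number of timesteps to finish an episode because of the random seeds). As a result, we also don't collect data in episodes; instead we play a fixed number of steps (``n_steps_per_update``) in each environment (for example, this could mean that we play 20 timesteps to finish an episode and then use the remaining timesteps to begin a new one).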
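Because episodes can end in the middle of a sampling phase, the training loop below records a mask per step and environment. The following NumPy sketch shows the convention with made-up ``terminated`` and ``truncated`` flags: a terminated episode gets mask 0, which cuts off bootstrapping from the next state, while a truncated episode (time limit reached) keeps mask 1. The reward and value numbers are invented for illustration.

```python
import numpy as np

# flags as returned by one vectorized step for 4 environments (made-up values)
terminated = np.array([False, True, False, False])
truncated = np.array([False, False, True, False])

# environments whose episode is over in either way and will start a new one
episode_over = terminated | truncated

# 1.0 while the episode is ongoing or only truncated, 0.0 if it terminated
masks = (~terminated).astype(np.float32)

gamma = 0.999
rewards = np.array([1.0, -100.0, 0.5, 0.2])
next_values = np.array([10.0, 10.0, 10.0, 10.0])

# one-step TD targets: the value of the next state is dropped where mask == 0
td_targets = rewards + gamma * masks * next_values
print(episode_over)
print(masks)
print(td_targets)
```

Only the second environment, which terminated, has its target reduced to the raw reward; the truncated third environment still bootstraps from its value estimate.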



In [None]:
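# Create a wrapper environment to save episode returns and episode lengths.
# Vector envs need the vector version of the wrapper; buffer_length sets how many
# finished episodes are kept for plotting.
envs_wrapper = gym.wrappers.vector.RecordEpisodeStatistics(
    envs, buffer_length=n_envs * n_updates
)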

critic_losses = []
actor_losses = []
entropies = []

# Use tqdm to get a progress bar for training
for sample_phase in tqdm(range(n_updates)):
    # We don't have to reset the envs, they just continue playing
    # until the episode is over and then reset automatically

    # Reset lists that collect experiences of an episode (sample phase)
    ep_value_preds = torch.zeros(n_steps_per_update, n_envs, device=device)
    ep_rewards = torch.zeros(n_steps_per_update, n_envs, device=device)
    ep_action_log_probs = torch.zeros(n_steps_per_update, n_envs, device=device)
    masks = torch.zeros(n_steps_per_update, n_envs, device=device)

    # At the start of training reset all envs to get an initial state
    if sample_phase == 0:
        states, info = envs_wrapper.reset(seed=42)

    # Play n steps in our parallel environments to collect data
    for step in range(n_steps_per_update):
        # Select an action A_{t} using S_{t} as input for the agent
        actions, action_log_probs, state_value_preds, entropy = agent.select_action(
            states
        )

        # Perform the action A_{t} in the environment to get S_{t+1} and R_{t+1}
        states, rewards, terminated, truncated, infos = envs_wrapper.step(
            actions.cpu().numpy()
        )

        ep_value_preds[step] = torch.squeeze(state_value_preds)
        ep_rewards[step] = torch.tensor(rewards, device=device)
        ep_action_log_probs[step] = action_log_probs

        # Add a mask (for the return calculation later);
        # for each env the mask is 1 if the episode is ongoing and 0 if it is terminated (not by truncation!)
        masks[step] = torch.tensor([not term for term in terminated])

    # Calculate the losses for actor and critic
    critic_loss, actor_loss = agent.get_losses(
        ep_rewards,
        ep_action_log_probs,
        ep_value_preds,
        entropy,
        masks,
        gamma,
        lam,
        ent_coef,
        device,
    )

    # Update the actor and critic networks
    agent.update_parameters(critic_loss, actor_loss)

    # Log the losses and entropy
    critic_losses.append(critic_loss.detach().cpu().numpy())
    actor_losses.append(actor_loss.detach().cpu().numpy())
    entropies.append(entropy.detach().mean().cpu().numpy())



## Plotting




In [None]:
""" plot the results """

# %matplotlib inline

rolling_length = 20
fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(12, 5))
fig.suptitle(
    f"Training plots for {agent.__class__.__name__} in the LunarLander-v3 environment \n \
             (n_envs={n_envs}, n_steps_per_update={n_steps_per_update}, randomize_domain={randomize_domain})"
)

# episode return
axs[0][0].set_title("Episode Returns")
episode_returns_moving_average = (
    np.convolve(
        np.array(envs_wrapper.return_queue).flatten(),
        np.ones(rolling_length),
        mode="valid",
    )
    / rolling_length
)
axs[0][0].plot(
    np.arange(len(episode_returns_moving_average)) / n_envs,
    episode_returns_moving_average,
)
axs[0][0].set_xlabel("Number of episodes")

# entropy
axs[1][0].set_title("Entropy")
entropy_moving_average = (
    np.convolve(np.array(entropies), np.ones(rolling_length), mode="valid")
    / rolling_length
)
axs[1][0].plot(entropy_moving_average)
axs[1][0].set_xlabel("Number of updates")


# critic loss
axs[0][1].set_title("Critic Loss")
critic_losses_moving_average = (
    np.convolve(
        np.array(critic_losses).flatten(), np.ones(rolling_length), mode="valid"
    )
    / rolling_length
)
axs[0][1].plot(critic_losses_moving_average)
axs[0][1].set_xlabel("Number of updates")


# actor loss
axs[1][1].set_title("Actor Loss")
actor_losses_moving_average = (
    np.convolve(np.array(actor_losses).flatten(), np.ones(rolling_length), mode="valid")
    / rolling_length
)
axs[1][1].plot(actor_losses_moving_average)
axs[1][1].set_xlabel("Number of updates")

plt.tight_layout()
plt.show()

<img src="file://_static/img/tutorials/vector_env_a2c_training_plots.png" alt="training_plots">




## Performance Analysis of Synchronous and Asynchronous Vectorized Environments




------------------------------

Asynchronous environments can lead to quicker training times and a higher speedup
for data collection compared to synchronous environments. This is because asynchronous environments
step each environment in its own process, so the environments advance in parallel,
while synchronous environments step all environments one after another in a single process.
The parallel version therefore spends less wall-clock time on data collection.




<img src="file://_static/img/tutorials/vector_env_performance_plots.png" alt="performance_plots">




------------------------------

According to the Karp-Flatt metric (a metric used in parallel computing to estimate the limit for the
speedup when scaling up the number of parallel processes, here the number of environments),
the estimated max. speedup for asynchronous environments is 57, while the estimated maximum speedup
for synchronous environments is 21. This suggests that asynchronous environments have significantly
faster training times compared to synchronous environments (see graphs).
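For reference, the Karp-Flatt metric estimates the serial fraction e of a parallel program from a measured speedup ψ on p processes: e = (1/ψ - 1/p) / (1 - 1/p). The sketch below computes it and, as an assumption about how such limits are commonly derived, turns it into a speedup bound of 1/e via Amdahl's law. The function names and the measurements in the example are invented; they are not the ones behind the plots in this tutorial.

```python
def karp_flatt(speedup: float, n_processes: int) -> float:
    """Experimentally determined serial fraction e for a measured speedup."""
    if n_processes < 2:
        raise ValueError("the metric is only defined for n_processes >= 2")
    return (1.0 / speedup - 1.0 / n_processes) / (1.0 - 1.0 / n_processes)


def max_speedup(serial_fraction: float) -> float:
    """Amdahl-style upper bound on the speedup for a given serial fraction."""
    return float("inf") if serial_fraction <= 0 else 1.0 / serial_fraction


# made-up measurement: 4 environments give a 2x speedup over a single one
e = karp_flatt(speedup=2.0, n_processes=4)
print(f"serial fraction: {e:.3f}, estimated max speedup: {max_speedup(e):.1f}")
```

A perfectly linear speedup (ψ equal to p) gives a serial fraction of zero and therefore no finite bound, while any shortfall from linear scaling translates into a finite limit.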




<img src="file://_static/img/tutorials/vector_env_karp_flatt_plot.png" alt="karp_flatt_metric">




------------------------------

However, it is important to note that increasing the number of parallel vector environments
can lead to slower training times after a certain number of environments (see plot below, where the
agent was trained until the mean training returns were above -120). The slower training times might occur
because the gradient estimates are already good enough with a relatively low number of environments
(especially if the environment is not very complex). In this case, increasing the number of environments
does not increase the learning speed, and actually increases the runtime, possibly due to the additional time
needed to calculate the gradients. For LunarLander-v3, the best performing configuration used an AsyncVectorEnv
with 10 parallel environments, but environments with a higher complexity may require more
parallel environments to achieve optimal performance.




<img src="file://_static/img/tutorials/vector_env_runtime_until_threshold.png" alt="runtime_until_threshold_plot">




## Saving/Loading Weights




In [None]:
save_weights = False
load_weights = False

actor_weights_path = "weights/actor_weights.h5"
critic_weights_path = "weights/critic_weights.h5"

if not os.path.exists("weights"):
    os.mkdir("weights")

""" save network weights """
if save_weights:
    torch.save(agent.actor.state_dict(), actor_weights_path)
    torch.save(agent.critic.state_dict(), critic_weights_path)


""" load network weights """
if load_weights:
    agent = A2C(obs_shape, action_shape, device, critic_lr, actor_lr, n_envs)

    agent.actor.load_state_dict(torch.load(actor_weights_path))
    agent.critic.load_state_dict(torch.load(critic_weights_path))
    agent.actor.eval()
    agent.critic.eval()

## Showcase the Agent




In [None]:
""" play a couple of showcase episodes """

n_showcase_episodes = 3

for episode in range(n_showcase_episodes):
    print(f"starting episode {episode}...")

    # create a new sample environment to get new random parameters
    if randomize_domain:
        env = gym.make(
            "LunarLander-v3",
            render_mode="human",
            gravity=np.clip(
                np.random.normal(loc=-10.0, scale=2.0), a_min=-11.99, a_max=-0.01
            ),
            enable_wind=np.random.choice([True, False]),
            wind_power=np.clip(
                np.random.normal(loc=15.0, scale=2.0), a_min=0.01, a_max=19.99
            ),
            turbulence_power=np.clip(
                np.random.normal(loc=1.5, scale=1.0), a_min=0.01, a_max=1.99
            ),
            max_episode_steps=500,
        )
    else:
        env = gym.make("LunarLander-v3", render_mode="human", max_episode_steps=500)

    # get an initial state
    state, info = env.reset()

    # play one episode
    done = False
    while not done:
        # select an action A_{t} using S_{t} as input for the agent
        with torch.no_grad():
            action, _, _, _ = agent.select_action(state[None, :])

        # perform the action A_{t} in the environment to get S_{t+1} and R_{t+1}
        state, reward, terminated, truncated, info = env.step(action.item())

        # update if the environment is done
        done = terminated or truncated

    # close this episode's environment before creating the next one
    env.close()

## Try playing the environment yourself




In [None]:
# from gymnasium.utils.play import play
#
# play(gym.make('LunarLander-v3', render_mode='rgb_array'),
#     keys_to_action={'w': 2, 'a': 1, 'd': 3}, noop=0)

## References

[1] V. Mnih, A. P. Badia, M. Mirza, A. Graves, T. P. Lillicrap, T. Harley, D. Silver, K. Kavukcuoglu. "Asynchronous Methods for Deep Reinforcement Learning" ICML (2016).

[2] J. Schulman, P. Moritz, S. Levine, M. Jordan and P. Abbeel. "High-dimensional continuous control using generalized advantage estimation." ICLR (2016).

[3] Gymnasium Documentation: Vector environments. (URL: https://gymnasium.farama.org/api/vector/)

