## PPO notes

These are my notes on the TorchRL PPO tutorial: <https://pytorch.org/rl/stable/tutorials/coding_ppo.html>



In [1]:
import torchrl
import torch
import tensordict
from datetime import datetime

In [2]:
# The GPU branch is switched off on purpose (the condition is hard-wired to
# False), so everything runs on the CPU. Replace `False` with
# torch.cuda.is_available() to use the first CUDA device when one exists.
device = torch.device('cuda:0' if False else 'cpu')

# Number of cells in each layer
# (the width of every hidden LazyLinear layer in the actor and value networks).
num_cells = 256

# Learning rate
# (handed to the Adam optimizer that trains the loss module's parameters).
lr = 3e-4

# Upper bound on the gradient norm: gradients are clipped to this value
# right before every optimizer step in the training loop.
max_grad_norm = 1.0


# Number of frames in the whole training session.
# Frames are just steps in the simulation.
# May need to increase this significantly for real training.
total_frames = 20_000

# Number of frames in a batch collection.
# We'll collect this many frames and then perform the training
# optimization on them.
frames_per_batch = 100

# Number of frames in a sub-batch.
# We split each batch into smaller sub-batches in the training loop.
# Each sub-batch produces one optimizer step, and the training loop runs
# frames_per_batch // sub_batch_size of them per epoch. With the current
# values that is 100 // 64 == 1, so only 64 of the 100 collected frames are
# sampled in each epoch.
sub_batch_size = 64

# Number of training epochs per batch.
# After collecting a particular batch, we run the optimization on it
# multiple times in a row. Each time is called an epoch.
num_epochs = 10

# Clip value for PPO loss
# (passed as `clip_epsilon` to ClipPPOLoss).
clip_epsilon = 0.2

# Passed as `gamma` to the GAE value estimator.
gamma = 0.99

# Passed as `lmbda` to the GAE value estimator.
lmbda = 0.95

# Passed as `entropy_coef` to ClipPPOLoss; the entropy bonus is switched on
# whenever this value is non-zero (`entropy_bonus=bool(entropy_eps)`).
entropy_eps = 1e-4

In [3]:
# Environment under training: a Unity ML-Agents scene.
# No `file_name` or `registered_name` is passed; presumably the env then
# attaches to a Unity editor session that is already running -- TODO confirm
# against the UnityMLAgentsEnv documentation.
#
# Alternatives tried earlier, kept for reference:
#   torchrl.envs.GymEnv("InvertedPendulum-v4", device=device, render_mode="human")
#   torchrl.envs.UnityMLAgentsEnv(registered_name='3DBall')
base_env = torchrl.envs.UnityMLAgentsEnv()


  unity_communicator_version = StrictVersion(unity_com_ver)


In [4]:
# Reset the raw (untransformed) env and print the initial TensorDict to see
# how the observation and the done/terminated/truncated flags are nested
# under the group and agent keys.
td = base_env.reset()
print(td)

# Disabled: take one random step and print the resulting "next" entry.
#td = base_env.step(base_env.action_spec.rand())
#print(td['next'])

TensorDict(
    fields={
        group_0: TensorDict(
            fields={
                agent_0: TensorDict(
                    fields={
                        VectorSensor_size8: Tensor(shape=torch.Size([8]), device=cpu, dtype=torch.float32, is_shared=False),
                        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
                        terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
                        truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
                    batch_size=torch.Size([]),
                    device=None,
                    is_shared=False)},
            batch_size=torch.Size([]),
            device=None,
            is_shared=False)},
    batch_size=torch.Size([]),
    device=None,
    is_shared=False)


In [5]:
# Display the observation spec of the raw env (shown as the cell's output).
base_env.observation_spec

Composite(
    group_0: Composite(
        agent_0: Composite(
            VectorSensor_size8: UnboundedContinuous(
                shape=torch.Size([8]),
                space=ContinuousBox(
                    low=Tensor(shape=torch.Size([8]), device=cpu, dtype=torch.float32, contiguous=True),
                    high=Tensor(shape=torch.Size([8]), device=cpu, dtype=torch.float32, contiguous=True)),
                device=cpu,
                dtype=torch.float32,
                domain=continuous),
            device=None,
            shape=torch.Size([])),
        device=None,
        shape=torch.Size([])),
    device=None,
    shape=torch.Size([]))

In [6]:
# Nested key under which the agent's 8-element observation vector is stored.
observation_key = ("group_0", "agent_0", "VectorSensor_size8")

# Transform stack applied on top of the raw env, in order:
#   1. ObservationNorm -- rescales the observation so that it loosely
#      matches a Gaussian distribution.
#   2. DoubleToFloat   -- converts to float for better performance.
# StepCounter (which would allow counting frames) is currently left out.
env_transforms = torchrl.envs.Compose(
    torchrl.envs.ObservationNorm(in_keys=[observation_key]),
    torchrl.envs.DoubleToFloat(),
)

env = torchrl.envs.TransformedEnv(base_env, env_transforms)

# Run 100 random steps and use them to set the normalization's internal
# parameters automatically. The ObservationNorm transform is the first
# entry of the stack, hence index 0.
env.transform[0].init_stats(num_iter=100, reduce_dim=0, cat_dim=0)

  source[group_name][agent_name]["truncated"] = torch.tensor(


In [7]:
# `loc` is presumably one of the statistics filled in by init_stats; its
# shape is expected to match the 8-element observation vector.
print("normalization constant shape:", env.transform[0].loc.shape)

normalization constant shape: torch.Size([8])


In [8]:
# Print the specs of the transformed env: what it emits (observations and
# rewards) and what it accepts (the input spec, which contains the action).
print("observation_spec:", env.observation_spec)
print("reward_spec:", env.reward_spec)
print("input_spec:", env.input_spec)
print("action_spec (as defined by input_spec):", env.action_spec)

observation_spec: Composite(
    group_0: Composite(
        agent_0: Composite(
            VectorSensor_size8: UnboundedContinuous(
                shape=torch.Size([8]),
                space=ContinuousBox(
                    low=Tensor(shape=torch.Size([8]), device=cpu, dtype=torch.float32, contiguous=True),
                    high=Tensor(shape=torch.Size([8]), device=cpu, dtype=torch.float32, contiguous=True)),
                device=cpu,
                dtype=torch.float32,
                domain=continuous),
            device=None,
            shape=torch.Size([])),
        device=None,
        shape=torch.Size([])),
    device=None,
    shape=torch.Size([]))
reward_spec: Composite(
    group_0: Composite(
        agent_0: Composite(
            reward: UnboundedContinuous(
                shape=torch.Size([1]),
                space=ContinuousBox(
                    low=Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, contiguous=True),
                    high

In [9]:
# Run TorchRL's spec check on the transformed env; it logs a success message
# when the check passes.
torchrl.envs.check_env_specs(env)

2024-10-27 12:41:45,766 [torchrl][INFO] check_env_specs succeeded!


In [10]:
# Roll the env forward for three steps without passing a policy (actions are
# presumably sampled at random -- confirm). Every entry of the result gains a
# leading dimension of size 3, one row per step.
rollout = env.rollout(3)
print("rollout of three steps:", rollout)
print("Shape of the rollout TensorDict:", rollout.batch_size)

rollout of three steps: TensorDict(
    fields={
        group_0: TensorDict(
            fields={
                agent_0: TensorDict(
                    fields={
                        VectorSensor_size8: Tensor(shape=torch.Size([3, 8]), device=cpu, dtype=torch.float32, is_shared=False),
                        continuous_action: Tensor(shape=torch.Size([3, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                        done: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                        terminated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                        truncated: Tensor(shape=torch.Size([3, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
                    batch_size=torch.Size([3]),
                    device=None,
                    is_shared=False)},
            batch_size=torch.Size([3]),
            device=None,
            is_shared=False),
        next: Tensor

In order to do PPO, we need to set up a stochastic policy for exploration. We'll create an actor module for this. The output of the network used by the actor needs to be a distribution for the action to take, rather than a single action value.

The distribution is centered on a value, "loc", with a variation, "scale", which represents how wide the distribution is. When making an action, we'll pick a random sample according to this distribution.

The action spec says that the action value is continuous. It is common to use a Tanh normal distribution for this.

In [11]:
# `env.action_spec` is used as-is: the code below relies on its last
# dimension being the action size and on its `space.low` / `space.high`
# bounds.
action_spec = env.action_spec
action_dim = action_spec.shape[-1]

# Policy network: three tanh hidden layers followed by two outputs per action
# dimension, which NormalParamExtractor turns into the "loc" and "scale"
# parameters of the action distribution.
hidden_stack = [
    layer
    for _layer_idx in range(3)
    for layer in (torch.nn.LazyLinear(num_cells, device=device), torch.nn.Tanh())
]
actor_net = torch.nn.Sequential(
    *hidden_stack,
    torch.nn.LazyLinear(2 * action_dim, device=device),
    tensordict.nn.distributions.NormalParamExtractor(),
)

# The actor only works with TensorDict-aware modules, so wrap the network:
# it reads the nested observation and writes "loc" and "scale".
actor_params_module = tensordict.nn.TensorDictModule(
    actor_net,
    in_keys=[("group_0", "agent_0", "VectorSensor_size8")],
    out_keys=["loc", "scale"],
)

# The probabilistic actor builds a TanhNormal distribution, bounded by the
# action spec, from "loc" and "scale", samples an action from it and writes
# the action to the agent's nested action key.
action_bounds = dict(low=action_spec.space.low, high=action_spec.space.high)
policy_module = torchrl.modules.ProbabilisticActor(
    module=actor_params_module,
    spec=action_spec,
    in_keys=["loc", "scale"],
    out_keys=[("group_0", "agent_0", "continuous_action")],
    distribution_class=torchrl.modules.TanhNormal,
    distribution_kwargs=action_bounds,
    # Also store the log-probability of the sampled action. PPO compares it
    # with the log-probability under the updated policy; that ratio is the
    # "importance weight" used in the loss.
    return_log_prob=True,
)

Now we need to create a value module which estimates the return of a trajectory. After this network is trained, it will be able to estimate the return we can expect from a given observation before actually taking an action. We'll give this network almost the same structure as the actor policy, but it won't have the separate distribution values--there will just be one output value.

In [12]:
# Critic network: the same three tanh hidden layers as the actor, but with a
# single output unit.
value_net = torch.nn.Sequential(
    torch.nn.LazyLinear(num_cells, device=device),
    torch.nn.Tanh(),
    torch.nn.LazyLinear(num_cells, device=device),
    torch.nn.Tanh(),
    torch.nn.LazyLinear(num_cells, device=device),
    torch.nn.Tanh(),
    torch.nn.LazyLinear(1, device=device),
)

# Wrap the network so that it reads the nested observation from a TensorDict.
# NOTE(review): out_keys is the very entry the policy writes its action to
# (("group_0", "agent_0", "continuous_action")), so running this module would
# appear to replace the sampled action with the value estimate. A dedicated
# key such as ("group_0", "agent_0", "state_value") looks like the intent --
# confirm, and change it together with the `value=` key passed to
# loss_module.set_keys so that both stay in agreement.
value_module = torchrl.modules.ValueOperator(
    module=value_net,
    in_keys=[("group_0", "agent_0", "VectorSensor_size8")],
    out_keys=[("group_0", "agent_0", "continuous_action")],
)

Now we can call the policy and value modules once to make sure they are set up properly and that they output the correct fields.

In [13]:
# Smoke test: run both modules once on a freshly reset env and print what
# they write into the TensorDict. The first forward pass also initializes
# the lazy linear layers.
td = env.reset()
print("Running policy:", policy_module(td))
print("Running value:", value_module(td))

# Take one random step and keep only the "next" entry. The follow-up calls
# on that entry are disabled.
td = env.step(env.full_action_spec.rand())['next']
#print("Running policy:", policy_module(td))
#print("Running value:", value_module(td))

Running policy: TensorDict(
    fields={
        group_0: TensorDict(
            fields={
                agent_0: TensorDict(
                    fields={
                        VectorSensor_size8: Tensor(shape=torch.Size([8]), device=cpu, dtype=torch.float32, is_shared=False),
                        continuous_action: Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.float32, is_shared=False),
                        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
                        terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
                        truncated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
                    batch_size=torch.Size([]),
                    device=None,
                    is_shared=False)},
            batch_size=torch.Size([]),
            device=None,
            is_shared=False),
        loc: Tensor(shape=torch.Size([2]), de

Now we need to set up a data collector, which has the following responsibilities: reset the environment, compute an action based on the last observation, execute a step in the env, repeat until the environment is done.

`SyncDataCollector` is the simplest one. The collector will return a batch of frames that we can train with.

In [14]:
collector = torchrl.collectors.SyncDataCollector(
    # Environment to act in and collect from
    create_env_fn=env,
    # Policy queried for the action at every step
    policy=policy_module,
    # Number of frames handed back at each iteration of the collector
    frames_per_batch=frames_per_batch,
    # Total number of frames to collect over the whole training session
    total_frames=total_frames,
    # On a reset, keep the trajectories concatenated in one flat batch
    # instead of splitting them into separate arrays
    split_trajs=False,
    device=device,
)

In [15]:
# Grab one batch straight from the collector to inspect its layout: the
# output holds frames_per_batch (100) frames plus a ("collector", "traj_ids")
# entry next to the env's own keys.
collector.rollout()

TensorDict(
    fields={
        collector: TensorDict(
            fields={
                traj_ids: Tensor(shape=torch.Size([100]), device=cpu, dtype=torch.int64, is_shared=False)},
            batch_size=torch.Size([100]),
            device=cpu,
            is_shared=False),
        group_0: TensorDict(
            fields={
                agent_0: TensorDict(
                    fields={
                        VectorSensor_size8: Tensor(shape=torch.Size([100, 8]), device=cpu, dtype=torch.float32, is_shared=False),
                        continuous_action: Tensor(shape=torch.Size([100, 2]), device=cpu, dtype=torch.float32, is_shared=False),
                        done: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                        terminated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                        truncated: Tensor(shape=torch.Size([100, 1]), device=cpu, dtype=torch.bool, is_shared=Fa

We need to set up a replay buffer to store the collected data and allow us to access it in the future for training. We'll just store a single batch of data in our replay buffer.

In [16]:
# Storage sized for exactly one collected batch.
batch_storage = torchrl.data.LazyTensorStorage(max_size=frames_per_batch)

# Shuffles the stored frames on every pass and hands each frame out at most
# once.
shuffling_sampler = torchrl.data.SamplerWithoutReplacement()

replay_buffer = torchrl.data.ReplayBuffer(
    storage=batch_storage,
    sampler=shuffling_sampler,
)

Now we need a loss function to determine how well our agent is doing. We'll use a standard loss module for PPO called `ClipPPOLoss`. To use this module, we need to pass batches from our value module into something called an advantage module, and then pass the result onto `ClipPPOLoss`. We'll use GAE for the advantage module.

In [19]:
# Prefix under which the Unity env nests everything that belongs to the agent.
agent_key = ('group_0', 'agent_0')

# Clipped PPO objective over the actor and the critic. The entropy bonus is
# enabled whenever the entropy coefficient is non-zero.
loss_module = torchrl.objectives.ClipPPOLoss(
    actor_network=policy_module,
    critic_network=value_module,
    clip_epsilon=clip_epsilon,
    entropy_bonus=bool(entropy_eps),
    entropy_coef=entropy_eps,
    critic_coef=1.0,
    loss_critic_type="smooth_l1",
)

# The loss looks for flat keys ("action", "done", ...) by default, but this
# env nests every entry under the agent prefix, so each key has to be mapped
# explicitly.
# NOTE(review): the key of the sampled action's log-probability is left at
# its default -- confirm where the policy writes it.
loss_module.set_keys(
    reward=(*agent_key, 'reward'),
    action=(*agent_key, 'continuous_action'),
    done=(*agent_key, 'done'),
    terminated=(*agent_key, 'terminated'),
    # Read the value estimate from wherever the critic writes it, so that this
    # mapping cannot drift away from the value module's out_keys.
    value=value_module.out_keys[0],
)

# Build the GAE estimator through the loss module rather than by hand, so
# that it works on the critic and the keys configured above.
loss_module.make_value_estimator(
    torchrl.objectives.ValueEstimators.GAE,
    gamma=gamma,
    lmbda=lmbda,
)

advantage_module = loss_module.value_estimator

# Create an optimizer for the loss module, plus a cosine schedule that decays
# the learning rate to zero over the number of batches the collector yields.
optim = torch.optim.Adam(loss_module.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optim,
    T_max=total_frames // frames_per_batch,
    eta_min=0.0,
)

In [22]:
# Scratch cell: reset and take one random step so that a transition is
# available for trying the advantage module by hand (the call is disabled).
td = env.reset()
td = env.step(env.full_action_spec.rand())
#print(advantage_module(td))

Finally, we have to write the training loop. The steps are:

* Collect data
  * Compute advantage
    * Loop over the collected data to compute loss
    * Back propagate
    * Optimize
    * Repeat
  * Repeat
* Repeat

In [24]:
cur_frames = 0
start_time = datetime.now()

# The env nests the reward under the agent's group (the same location that
# is given to loss_module.set_keys); there is no top-level ("next", "reward").
reward_key = ("next", "group_0", "agent_0", "reward")


def max_step_count_of(data):
    """Return the largest "step_count" entry in `data`, or None when absent.

    The key only exists when a StepCounter transform is part of the env, so
    a missing key is reported as None instead of raising.
    """
    step_counts = data.get("step_count", None)
    if step_counts is None:
        return None
    return step_counts.max().item()


# Collect batches of data
for tensordict_data in collector:
    # Train on the collected batch
    for _epoch in range(num_epochs):
        # The advantage depends on the value network, which is updated in the
        # inner loop, so it is recomputed at the start of every epoch.
        advantage_module(
            tensordict_data,
            params=loss_module.critic_network_params,
            target_params=loss_module.target_critic_network_params,
        )
        data_view = tensordict_data.reshape(-1)
        replay_buffer.extend(data_view.cpu())

        # During each epoch, a batch of data is trained in sub-batches
        for _sub_batch in range(frames_per_batch // sub_batch_size):
            subdata = replay_buffer.sample(sub_batch_size)
            loss_vals = loss_module(subdata.to(device))
            loss_value = (
                loss_vals["loss_objective"]
                + loss_vals["loss_critic"]
                + loss_vals["loss_entropy"]
            )

            # Back propagate and optimize
            loss_value.backward()

            # Bounding the grad is good practice
            torch.nn.utils.clip_grad_norm_(
                loss_module.parameters(),
                max_grad_norm
            )
            optim.step()
            optim.zero_grad()

    # That's all the training we need!
    # But let's monitor how well our policy is working.

    max_step_count = max_step_count_of(tensordict_data)

    with torch.no_grad():
        # Execute a rollout with the trained policy
        eval_rollout = env.rollout(100, policy_module)

    eval_reward = eval_rollout[reward_key]
    eval_reward_sum = eval_reward.sum().item()
    eval_reward_mean = eval_reward.mean().item()
    eval_step_count = max_step_count_of(eval_rollout)

    cur_frames += tensordict_data.numel()
    # Drop the fractional seconds from the elapsed time.
    run_time = str(datetime.now() - start_time).split('.')[0]

    eval_str = (
        f"[{cur_frames}/{total_frames}"
        f" {run_time}s]"
        f" eval: (reward sum: {eval_reward_sum}"
        f", reward mean: {eval_reward_mean}"
        f", step count: {eval_step_count}"
        f"), collection: (max step count: {max_step_count})"
    )
    print(eval_str)

    # This controls the learning rate scheduler, which is good practice
    # to include
    scheduler.step()

RuntimeError: The sets of keys in the tensordicts to stack are exclusive. Consider using `LazyStackedTensorDict.maybe_dense_stack` instead.

Note that according to the [docs](https://www.gymlibrary.dev/environments/mujoco/inverted_pendulum/) for "InvertedPendulum-v4", the environment automatically resets at step 1000, so 1000 is the maximum possible reward sum. This model is fairly well trained after just a few minutes, since we're consistently reaching the maximum reward when we evaluate. (These remarks describe the earlier Gym-based run of this notebook; the Unity ML-Agents run above currently stops with the `RuntimeError` shown.)