# DDPG From Scratch
## A Quick Intro to Deep RL for Continuous Control

Builds [Deep Deterministic Policy Gradient](https://arxiv.org/abs/1509.02971) from scratch in PyTorch.

Written May 2022

Last Updated November 2024

-Jake Grigsby (grigsby@cs.utexas.edu)

## Basic Setup

In [1]:
!apt-get install -y xvfb ffmpeg
!pip install gymnasium==0.29.0
!pip install "torch>2.0"
!pip install pyvirtualdisplay
!pip install tqdm

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
xvfb is already the newest version (2:21.1.4-2ubuntu1.7~22.04.12).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


In [2]:
import gymnasium as gym
import torch
import numpy as np
import os

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(f"Using Device Type: {device}")

def make_unique_dir(run_name, base_path="./data"):
    base_dir = os.path.join(base_path, run_name)
    i = 0
    while os.path.exists(base_dir + f"_{i}"):
        i += 1
    base_dir += f"_{i}"
    os.makedirs(base_dir)
    return base_dir

Using Device Type: cuda


In [3]:
# https://colab.research.google.com/github/jeffheaton/t81_558_deep_learning/blob/master/t81_558_class_12_01_ai_gym.ipynb#scrollTo=T9RpF49oOsZj
import glob
import io
import base64
from IPython.display import HTML
from pyvirtualdisplay import Display
from IPython import display as ipythondisplay

display = Display(visible=0, size=(1400, 900))
display.start()


def render_video_from_disk():
    mp4list = glob.glob('video/*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        ipythondisplay.display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")


## RL Environment

In [4]:

ENV_NAME = "Pendulum-v1" # change this to try a different environment; Pendulum trains in just a few minutes
MAX_STEPS = 200 # maximum episode length; change this based on the environment

def make_env() -> gym.Env:
  env = gym.make(ENV_NAME, render_mode="rgb_array")
  # enforce a maximum time limit
  env = gym.wrappers.TimeLimit(env, MAX_STEPS)
  # force actions to be in [-1, 1]
  env = gym.wrappers.RescaleAction(env, min_action=-1., max_action=1.)
  return env

demo_env = make_env()
assert isinstance(demo_env.action_space, gym.spaces.Box), "Please pick an environment with continuous actions"

The environment has a set of states, $\mathcal{S}$, that represent information about the agent's surroundings. In our case, states are arrays of $n$ elements: $\mathcal{S} \subseteq \mathbb{R}^{n}$.

In [5]:
print(f"States are arrays with shape: {demo_env.observation_space.shape}")

States are arrays with shape: (3,)


The agent can pick from a set of actions, $\mathcal{A}$. In our case, actions are arrays of $k$ elements: $\mathcal{A} \subseteq \mathbb{R}^k$. We also assume the action arrays are bounded in some range $[\text{low}, \text{high}]$.

In [6]:
print(f"Actions are arrays with shape: {demo_env.action_space.shape}, bounded in [{demo_env.action_space.low}, {demo_env.action_space.high}]")

Actions are arrays with shape: (1,), bounded in [[-1.], [1.]]
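
The `RescaleAction` wrapper in `make_env` is what lets the agent work in $[-1, 1]$ regardless of the environment's native bounds. As a rough sketch, assuming the wrapper applies a simple affine map, the conversion looks something like the code below. The helper `rescale_action` is hypothetical and not part of gymnasium, and the $[-2, 2]$ torque range is an assumption about Pendulum.

```python
def rescale_action(action, low, high):
    """Affinely map an action from [-1, 1] to [low, high]."""
    return low + (high - low) * (action + 1.0) / 2.0

# assumed native torque range for Pendulum: [-2, 2]
print(rescale_action(-1.0, -2.0, 2.0))  # -2.0
print(rescale_action(0.0, -2.0, 2.0))   # 0.0
print(rescale_action(0.5, -2.0, 2.0))   # 1.0
```

The agent only ever sees the normalized range, which is what makes a `tanh` output layer a natural fit later on.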


Agents have the ability to decide which action to select in any given state.

The agent's *policy*, $\pi : \mathcal{S} \rightarrow \mathcal{A}$, is a function that maps states to actions. Here we define an agent and a simple policy of picking actions at random:

In [7]:
class Agent:
  def __init__(self, action_space):
    self.action_space = action_space

  def policy(self, state) -> np.ndarray:
    # pick from the action space at random
    action = self.action_space.sample()
    return action

At each timestep $t$ our agent sees a `state` ($s_t$) and picks an action $a_t$ (`action = agent.policy(state)`). This decision has some effect on the environment, which leads to the `next_state` ($s_{t+1}$) at timestep $t+1$. The environment also returns a float `reward` for that timestep ($r_t$), and a bool `done` that says whether the environment has terminated and needs to be reset. Interacting with an environment looks something like:

In [8]:
env = make_env()
agent = Agent(env.action_space)

state, _ = env.reset()  # gymnasium's reset returns (observation, info)
done = False
episode_return = 0.
while not done:
  action = agent.policy(state)
  # the env.step signature is defined by gymnasium and used by convention pretty much everywhere
  next_state, reward, terminated, truncated, _ = env.step(action)
  episode_return += reward
  done = terminated or truncated
  state = next_state
print(f"Achieved a Total Return of {episode_return:.1f}")

Achieved a Total Return of -1678.1


The goal of RL is to find a policy that maximizes the total sum of rewards before the environment is reset. We'll take the average of several resets ("episodes") in case the environment is nondeterministic.

In [9]:
def evaluate_agent(
    agent: Agent,
    env: gym.Env,
    episodes: int,
    render : bool = False,
):
    if render:
      env = gym.wrappers.RecordVideo(env, "./video", disable_logger=True)

    returns = []
    for episode in range(episodes):
        episode_return = 0.0
        # reset the environment to a starting position
        state, _ = env.reset()
        done = False
        while not done:
            action = agent.policy(state)
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            episode_return += reward # track cumulative sum of rewards
        returns.append(episode_return)
    # average of the sums of rewards
    mean_return = sum(returns) / len(returns)

    if render:
      render_video_from_disk()
    return mean_return

Let's run our random policy in the environment and see how it does:

In [10]:
env = make_env()
random_agent_score = evaluate_agent(Agent(env.action_space), env, 10, render=True)
f"Random Agent Score: {random_agent_score:.2f}"



'Random Agent Score: -1251.37'

## Agents with Neural Network Policies

Deep RL is just a way to learn a smarter `policy` function ($\pi$) by replacing it with a deep neural network. We'll call this network the actor:

In [11]:
from torch import nn
import torch.nn.functional as F

class Actor(nn.Module):
    """
    Policy Network (state --> action)
    """

    def __init__(self, state_size: int, action_size: int, hidden_size: int = 256):
        super().__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, action_size)

    def forward(self, state):
        # state_size --> hidden_size fully connected layer with leaky ReLU
        x = F.leaky_relu(self.fc1(state))
        # hidden_size --> hidden_size fully connected layer with leaky ReLU
        x = F.leaky_relu(self.fc2(x))
        # hidden_size --> action_size fully connected layer
        x = self.out(x)
        # actions bounded by tanh activation in (-1, 1)
        # (this is why we had to normalize actions)
        action = torch.tanh(x)
        return action

Let's upgrade the random agent from above to predict actions using the `Actor` neural network:

In [12]:
class NeuralNetAgent(Agent):
  def __init__(self, state_space, action_space):
      self.actor = Actor(state_space.shape[0], action_space.shape[0])
      self.actor.to(device)

  def policy(self, state):
      state = torch.from_numpy(state).unsqueeze(0) # np.ndarray --> torch.Tensor with fake batch dim
      state = state.to(device).float() # move to the gpu if possible
      with torch.no_grad():
        # gradient-free NN forward pass
        action = self.actor(state)
      action = action.squeeze(0) # remove fake batch dimension
      action = action.cpu().numpy() # move back to cpu, --> np.ndarray
      return action

The `NeuralNetAgent.actor` network's parameters are randomly initialized and its outputs are probably not any smarter than the random agent. Let's check:

In [13]:
env = make_env()
agent = NeuralNetAgent(env.observation_space, env.action_space)
random_network_agent_score = evaluate_agent(agent, env, 10, render=True)
f"Random Network Agent Score: {random_network_agent_score:.2f}"



'Random Network Agent Score: -1483.54'

Our goal is to define a function, `learn`, that will improve the parameters of our actor network by optimizing on data we've collected from the environment. By "improve" we mean "increase the cumulative sum of rewards in the environment." *(Whether that leads to the behavior we want depends on how we define our reward function.)*

One way to approach this is to define a second network - called the *critic* - which we will use to judge the actions taken by our actor. The critic outputs a scalar that approximates the sum of future rewards we would expect to achieve if we took an action in a given state. In math we'll refer to the critic as a function $Q : \mathcal{S} \times \mathcal{A} \rightarrow \mathbb{R}$. Let $r_t$ be the reward given by the environment at timestep $t$ when we take action $a_t$ in state $s_t$. Then our critic function is:

\begin{align}
Q(s_t, a_t) = \sum_{i=t}^{T}r_i
\end{align}

where $T$ is the timestep the episode ends (`terminated = True`).
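
To make the sum concrete, here is a minimal sketch of the quantity the critic is meant to approximate, computed for one fixed list of rewards. The helper `rewards_to_go` and the reward values are made up for illustration.

```python
def rewards_to_go(rewards):
    """Return [sum(rewards[t:]) for each t], computed right-to-left."""
    totals = [0.0] * len(rewards)
    running = 0.0
    for t in reversed(range(len(rewards))):
        running += rewards[t]
        totals[t] = running
    return totals

rewards = [-1.0, -2.0, -0.5]
print(rewards_to_go(rewards))  # [-3.5, -2.5, -0.5]
```

Each entry is the total reward collected from that timestep until the end of the episode, so the first entry is the full episode return.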

A neural network that can learn this function looks something like:

In [14]:
class Critic(nn.Module):
    """
    Value Network (state + action --> value)
    """

    def __init__(self, state_size: int, action_size: int, hidden_size: int = 256):
        super().__init__()
        self.fc1 = nn.Linear(state_size + action_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, 1)

    def forward(self, state, action):
        # input concatenates state and action arrays
        x = torch.cat((state, action), dim=-1)
        # state_size + action_size --> hidden_size w/ leaky ReLU
        x = F.leaky_relu(self.fc1(x))
        # hidden_size --> hidden_size w/ leaky ReLU
        x = F.leaky_relu(self.fc2(x))
        # hidden_size --> scalar. Output should be able
        # to take any value so we skip the activation func.
        val = self.out(x)
        return val

Before we get to `learn`, let's add a `Critic` and a few convenience functions to our `NeuralNetAgent`:

In [15]:
class DDPGAgent(NeuralNetAgent):
    def __init__(
        self, observation_space, action_space, hidden_size: int = 256,
    ):
        state_size = observation_space.shape[0]
        action_size = action_space.shape[0]
        self.actor = Actor(state_size, action_size, hidden_size=hidden_size)
        self.critic = Critic(state_size, action_size, hidden_size=hidden_size)
        self.to(device)

    def to(self, device):
        self.actor = self.actor.to(device)
        self.critic = self.critic.to(device)

    def save(self, path):
        actor_path = os.path.join(path, "actor.pt")
        critic_path = os.path.join(path, "critic.pt")
        torch.save(self.actor.state_dict(), actor_path)
        torch.save(self.critic.state_dict(), critic_path)

    def load(self, path):
        actor_path = os.path.join(path, "actor.pt")
        critic_path = os.path.join(path, "critic.pt")
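        # map_location lets checkpoints saved on a GPU load on a CPU-only machine
        self.actor.load_state_dict(torch.load(actor_path, map_location=device))
        self.critic.load_state_dict(torch.load(critic_path, map_location=device))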

## Building the DDPG Learning Update

If we assume our critic can accurately predict the rewards that result from `(state, action)` pairs, we can optimize the actor to maximize (or to minimize the negative of) the critic's output. Our actor's loss function, $\mathcal{L}_{actor}$, is written:

\begin{align}
\mathcal{L}_{actor} = -Q(s, \pi(s))
\end{align}

In pseudocode:

```python

def learn(agent, states, actor_optimizer):
    # states.shape == (batch_size, state_size)
    # actor_optimizer = torch.optim.Adam(agent.actor.parameters())

    # actor update
    actor_loss = -agent.critic(states, agent.actor(states)).mean()
    actor_optimizer.zero_grad()
    actor_loss.backward()
    actor_optimizer.step()
```
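
To see why descending on $-Q(s, \pi(s))$ pushes the actor toward better actions, it can help to swap in a critic whose best action is known. The sketch below is a toy, not the notebook's networks: `toy_critic` is a hypothetical stand-in that ignores the state and peaks at an action of 0.5, the "actor" is a single parameter passed through `tanh`, and the gradient is written out by hand with the chain rule.

```python
import math

def toy_critic(action):
    # hypothetical critic with a known best action at 0.5
    return -(action - 0.5) ** 2

def actor_loss_grad(theta):
    # chain rule for loss = -toy_critic(tanh(theta))
    action = math.tanh(theta)
    dq_da = -2.0 * (action - 0.5)
    da_dtheta = 1.0 - action ** 2
    return -dq_da * da_dtheta

theta = 0.0
for _ in range(200):
    # plain gradient descent on the actor loss
    theta -= 0.5 * actor_loss_grad(theta)

final_action = math.tanh(theta)
print(round(final_action, 3))  # 0.5
```

The actor ends up at the action the critic scores highest, which is only useful if the critic's scores are accurate.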

Of course our `Critic` is also randomly initialized, so its outputs are meaningless and maximizing them won't be any help. However, if we keep assuming the critic is accurate we'll find a way to let the critic improve itself...

$Q(s_t, a_t)$ represents the cumulative sum of rewards we predict we will receive when starting in state $s_t$ at timestep $t$ and picking action $a_t$. Taking $a_t$ in state $s_t$ will give us a reward $r_t$ and lead to a new state $s_{t+1}$ at the next timestep. Then our actor will pick a new action $a_{t+1} = \pi(s_{t+1})$. Using our critic again, this decision will lead to value $Q(s_{t+1}, a_{t+1})$.

The key observation here is that these terms are supposed to have a recursive relationship. $Q(s_t, a_t)$ predicts a sum; the first term of that sum is given to us by $r_t$, and $Q(s_{t+1}, a_{t+1})$ contains the rest of the terms:

\begin{align}
Q(s_t, a_t) = \sum_{i=t}^{T}r_i = r_t + \sum_{i=t+1}^{T}r_i = r_t + Q(s_{t+1}, a_{t+1}) = r_t + Q(s_{t+1}, \pi(s_{t+1}))
\end{align}


In code, a perfectly trained critic would pass:
```python
# (ignoring np vs torch issues here)
action = agent.policy(state)
next_state, reward, terminated, truncated, _ = env.step(action)
next_action = agent.policy(next_state)
assert torch.isclose(agent.critic(state, action), reward + agent.critic(next_state, next_action))
```



We will basically be trying to get the critic to pass this assert statement by gradient descent. We treat the right side as a fixed target and minimize mean squared error of our predictions:

\begin{align}
\mathcal{L}_{critic} = \left(Q(s, a) - \left(r + Q(s', \pi(s'))\right)\right)^2
\end{align}

where $s'$ is a time-independent way to write the state that followed $s$. For simplicity let's define $a' = \pi(s')$.

Intuitively, we are now doing regression on a label we generated ourselves. Although that label contains outputs of our critic net, it is slightly more accurate because it includes a real reward from the environment. We treat the target as a fixed value just like it came from a supervised dataset and do not backprop through $Q(s', a')$.

```python
def learn(agent, states, actions, rewards, next_states, actor_optimizer, critic_optimizer):
    # states.shape == next_states.shape == (batch_size, state_size)
    # actions.shape == (batch_size, action_size)
    # rewards.shape == (batch_size, 1)
    # actor_optimizer = torch.optim.Adam(agent.actor.parameters())
    # critic_optimizer = torch.optim.Adam(agent.critic.parameters())

    # actor update
    actor_loss = -agent.critic(states, agent.actor(states)).mean()
    actor_optimizer.zero_grad()
    actor_loss.backward()
    actor_optimizer.step()

    # critic update
    with torch.no_grad():
      # note canceled gradients
      next_actions = agent.actor(next_states)
      targets = rewards + agent.critic(next_states, next_actions)
    preds = agent.critic(states, actions)
    critic_loss = F.mse_loss(preds, targets)
    critic_optimizer.zero_grad()
    critic_loss.backward()
    critic_optimizer.step()
```

Note that the actor loss function relies on the accuracy of the critic while the critic loss function relies on the actor. Randomly initializing two neural networks and then iteratively updating them with codependent loss functions is the kind of thing that sounds unstable but almost always works anyway. Classic deep learning. There are a few more details to help with stability:

1. We only want to predict rewards until the end of the episode, so we use the `done` flag ($d$) to zero out any terms that come after `done = True`:

\begin{align}
\mathcal{L}_{critic} = \left(Q(s, a) - \left(r + (1 - d)Q(s', a')\right)\right)^2
\end{align}

2. To avoid learning large unstable sums, we multiply $Q(s', a')$ by a scalar $\gamma \in [0, 1)$ called the *discount factor*. This becomes a hyperparameter where values closer to $1$ prioritize long-term rewards while smaller values are more short-sighted.

\begin{align}
\mathcal{L}_{critic} = \left(Q(s, a) - \left(r + \gamma(1 - d)Q(s', a')\right)\right)^2
\end{align}

3. We use a lagged copy of the critic network to generate targets. Adjusting the output of $Q(s, a)$ by gradient descent alters the output of $Q(s', a')$, and rapid changes to the targets of our loss function can lead to instability. A very hacky but effective way to deal with this is to create a clone of the critic network with parameters that are a moving average of the "online" critic we're actually training. The cloned (or "target") version is used to generate targets and its parameters are updated slightly after each training step. In code:

In [16]:
def soft_update(target: nn.Module, source: nn.Module, tau: float):
    """
    Used to make the `target` NN's params a moving average of the `source` NN's params.

    `tau` is a small float that controls the speed of the update.
    """
    for target_param, param in zip(target.parameters(), source.parameters()):
        target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau)


def hard_update(target : nn.Module, source : nn.Module):
    """
    Hard copy `source` params to `target`. Used to sync online and target
    net at the start of training.
    """
    for target_param, param in zip(target.parameters(), source.parameters()):
        target_param.data.copy_(param.data)
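
The moving-average behavior of `soft_update` is easier to see on a single number than on a full network. The sketch below applies the same update rule to a scalar, with the source held fixed for simplicity; `soft_update_scalar` is a hypothetical stand-in for illustration.

```python
def soft_update_scalar(target, source, tau):
    # same rule as soft_update, applied to one number
    return target * (1.0 - tau) + source * tau

tau = 0.005
target, source = 0.0, 1.0
for _ in range(1000):
    target = soft_update_scalar(target, source, tau)

# the remaining gap shrinks geometrically: (1 - tau) ** num_updates
gap = source - target
print(abs(gap - (1.0 - tau) ** 1000) < 1e-9)  # True
```

With a fixed source, each update closes a fraction `tau` of the remaining gap, so a smaller `tau` gives a slower-moving, more stable target.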

While our agent is training, it will interact with the environment to collect additional data. This data will be stored in a buffer that can sample and batch ("replay") the agent's experiences during learning updates. We store `(state, action, reward, next_state, done)` tuples in large arrays and overwrite the oldest data when we run out of space.

In [17]:
from typing import Tuple

class ReplayBuffer:
    """
    Store environment experience to train the agent.

    Notation: (state, action, reward, next_state, done) <--> (s, a, r, s1, d)
    """

    def __init__(
        self, size: int, example_state: np.ndarray, example_action: np.ndarray
    ):
        self.s_stack = np.zeros(
            (size,) + example_state.shape, dtype=example_state.dtype
        )
        self.a_stack = np.zeros(
            (size,) + example_action.shape, dtype=example_action.dtype
        )
        self.r_stack = np.zeros((size, 1), dtype=np.float32)
        self.s1_stack = np.zeros(
            (size,) + example_state.shape, dtype=example_state.dtype
        )
        self.d_stack = np.zeros((size, 1), dtype=bool)

        self.size = size
        self._next_idx = 0
        self._max_filled = 0

    def __len__(self):
        return self._max_filled

    def push(self, s, a, r, s1, d) -> None:
        # add new data to the buffer
        idx = self._next_idx
        self.s_stack[idx] = s
        self.a_stack[idx] = a
        self.r_stack[idx] = np.array([r])
        self.s1_stack[idx] = s1
        self.d_stack[idx] = np.array([d])
        self._max_filled = min(max(self._next_idx + 1, self._max_filled), self.size)
        # wrap around if full
        self._next_idx = (self._next_idx + 1) % self.size

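    def torch_and_move(self, *np_ndarrays) -> Tuple[torch.Tensor, ...]:
        # convert each array to a float tensor on the training device
        return tuple(torch.from_numpy(x).to(device).float() for x in np_ndarrays)

    def sample(self, batch_size: int) -> Tuple[torch.Tensor, ...]: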
        # sample a random batch of data
        idxs = np.random.randint(0, len(self), size=(batch_size,))
        state = self.s_stack[idxs]
        action = self.a_stack[idxs]
        reward = self.r_stack[idxs]
        next_state = self.s1_stack[idxs]
        done = self.d_stack[idxs]
        return self.torch_and_move(state, action, reward, next_state, done)
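
The wrap-around indexing in `push` is the part that is easiest to get wrong, so here is the same bookkeeping on a tiny list. `TinyRingBuffer` is a hypothetical stripped-down class for illustration; it stores arbitrary items rather than transition arrays.

```python
class TinyRingBuffer:
    def __init__(self, size):
        self.data = [None] * size
        self.size = size
        self.next_idx = 0
        self.filled = 0

    def push(self, item):
        # write at the current slot, then advance and wrap around
        self.data[self.next_idx] = item
        self.filled = min(self.filled + 1, self.size)
        self.next_idx = (self.next_idx + 1) % self.size

buf = TinyRingBuffer(3)
for item in ["a", "b", "c", "d"]:
    buf.push(item)
print(buf.data, buf.filled)  # ['d', 'b', 'c'] 3
```

The fourth push lands on index 0 and replaces the oldest entry, while the fill count stays capped at the buffer size.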

Now that we've seen the extra pieces, here's the real `learn` function:

In [18]:
def learn(
    buffer : ReplayBuffer,
    target_agent : DDPGAgent,
    agent : DDPGAgent,
    actor_optimizer : torch.optim.Optimizer,
    critic_optimizer : torch.optim.Optimizer,
    batch_size : int,
    gamma : float,
):
    (
        state_batch,
        action_batch,
        reward_batch,
        next_state_batch,
        done_batch,
    ) = buffer.sample(batch_size)

    ###################
    ## Critic Update ##
    ###################

    # compute target values
    with torch.no_grad():
        target_action_s1 = target_agent.actor(next_state_batch)
        target_action_value_s1 = target_agent.critic(next_state_batch, target_action_s1)
        # bootstrapped estimate of Q(s, a) based on reward and target network
        td_target = reward_batch + gamma * (1.0 - done_batch) * target_action_value_s1

    # compute mean squared bellman error (MSE(Q(s, a), td_target))
    agent_critic_pred = agent.critic(state_batch, action_batch)
    critic_loss = F.mse_loss(agent_critic_pred, td_target)
    critic_optimizer.zero_grad()
    # gradient descent step on critic network
    critic_loss.backward()
    # clip gradients for stability
    torch.nn.utils.clip_grad_norm_(agent.critic.parameters(), 10.0)
    critic_optimizer.step()

    ##################
    ## Actor Update ##
    ##################

    # actor's objective is to maximize (or minimize the negative of)
    # the expectation of the critic's opinion of its action choices
    agent_actions = agent.actor(state_batch)
    actor_loss = -agent.critic(state_batch, agent_actions).mean()
    actor_optimizer.zero_grad()
    # gradient descent step on actor network
    actor_loss.backward()
    # clip gradients for stability
    torch.nn.utils.clip_grad_norm_(agent.actor.parameters(), 10.0)
    actor_optimizer.step()
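
The `td_target` line in `learn` can be checked by hand on single transitions. This is a minimal scalar sketch of the same formula; the function name `td_target_scalar` and the numbers are made up for illustration.

```python
def td_target_scalar(reward, done, next_value, gamma=0.99):
    # reward plus the discounted next-state value, masked out at episode end
    return reward + gamma * (1.0 - float(done)) * next_value

# mid-episode: bootstrap from the next state's value
print(round(td_target_scalar(-1.0, False, -10.0), 4))  # -10.9
# terminal transition: the target is just the reward
print(round(td_target_scalar(-1.0, True, -10.0), 4))   # -1.0
```

When `done` is true the critic's estimate of the next state is ignored entirely, which is how the sum of rewards stops at the end of the episode.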

## Putting it all Together: Full DDPG

Next we need to tie the `learn` update into a loop where we interact with the environment and collect new data. In pseudocode:

```python
def ddpg_almost(env):
    agent = DDPGAgent(...)
    # create target (delayed) version of the agent and sync the critics
    target_agent = copy.deepcopy(agent)
    hard_update(target_agent.critic, agent.critic)

    actor_optimizer = torch.optim.Adam(agent.actor.parameters())
    critic_optimizer = torch.optim.Adam(agent.critic.parameters())
    buffer = ReplayBuffer(...)

    state, _ = env.reset()
    for step in range(training_steps):
      # collect new experience *(with actions selected by the policy)*
      action = agent.policy(state)
      next_state, reward, terminated, truncated, _ = env.step(action)
      buffer.push(state, action, reward, next_state, terminated)
      # start a new episode whenever the current one ends
      state = env.reset()[0] if (terminated or truncated) else next_state

      # optimize networks
      learn(buffer, target_agent, agent, actor_optimizer, critic_optimizer, ...)

      # update target critic
      soft_update(target_agent.critic, agent.critic, 5e-3)

    return agent
```



Notice that we are using our agent's actor network to generate the environment experience that is then added to our buffer. Ultimately, this will make it difficult to learn anything the networks didn't already know. This "exploration vs. exploitation" tradeoff is a central topic in RL: should we learn by doing what we think is best, or risk trying something new even if it is suboptimal?

There are a lot of interesting ways to approach this but for our purposes the simplest solution is random noise. After the agent picks its action we add noise and it ends up doing something slightly different (where "different" hopefully implies "new"). The tradeoff here is that we don't want to introduce so much noise that the agent can never find out what happens when it behaves optimally. Therefore we begin with lots of randomness and reduce the noise linearly over time.

In [19]:
class ExplorationNoise:
    """
    We encourage exploration by adding random noise to our actions during training.
    """

    def __init__(self, size, start_scale=1.0, final_scale=0.1, steps_annealed=1_000):
        assert start_scale >= final_scale
        self.size = size
        self.start_scale = start_scale
        self.final_scale = final_scale
        self.steps_annealed = steps_annealed
        self._current_scale = start_scale
        self._scale_slope = (start_scale - final_scale) / steps_annealed

    def sample(self):
        noise = self._current_scale * np.random.randn(*self.size)
        # linearly decrease the _current_scale
        self._current_scale = max(
            self._current_scale - self._scale_slope, self.final_scale
        )
        return noise
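
The linear schedule inside `ExplorationNoise` can also be written in closed form, which makes it easy to check what the noise scale will be at a given step. This is a sketch under the assumption that the scale is read once per environment step; the helper `noise_scale` is hypothetical, and it can differ from the running value in the class by tiny floating-point amounts.

```python
def noise_scale(step, start_scale=1.0, final_scale=0.1, steps_annealed=1000):
    # linear decay from start_scale, clamped at final_scale
    slope = (start_scale - final_scale) / steps_annealed
    return max(start_scale - slope * step, final_scale)

print(noise_scale(0))               # 1.0
print(round(noise_scale(500), 3))   # 0.55
print(noise_scale(5000))            # 0.1
```

Once `steps_annealed` steps have passed, the scale stays at `final_scale` for the rest of training, so the agent never stops exploring entirely.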


Now the real version of the `ddpg` loop with comments covering the details that haven't been mentioned:

In [20]:
import copy
import tqdm

def ddpg(
    agent: DDPGAgent,
    train_env: gym.Env,
    test_env: gym.Env,
    buffer: ReplayBuffer,
    num_steps: int = 1_000_000,
    transitions_per_step: int = 5,
    batch_size: int = 512,
    tau: float = 0.005,
    actor_lr: float = 1e-4,
    critic_lr: float = 1e-4,
    gamma: float = 0.99,
    exploration_anneal: int = 100_000,
    eval_interval: int = 5000,
    eval_episodes: int = 20,
    warmup_steps: int = 1000,
    name: str = "ddpg_run",
    gradient_updates_per_step: int = 1,
    verbosity: int = 1,
) -> DDPGAgent:

    best_return = -float("inf")
    save_dir = make_unique_dir(name)

    agent.to(device)

    # initialize target networks
    target_agent = copy.deepcopy(agent)
    target_agent.to(device)
    hard_update(target_agent.critic, agent.critic)

    random_process = ExplorationNoise(
        size=train_env.action_space.shape, steps_annealed=exploration_anneal
    )

    critic_optimizer = torch.optim.Adam(agent.critic.parameters(), lr=critic_lr)
    actor_optimizer = torch.optim.Adam(agent.actor.parameters(), lr=actor_lr)

    done = True

    train_iter = range(num_steps)
    if verbosity:
        train_iter = tqdm.tqdm(train_iter)
    for step in train_iter:
        for _ in range(transitions_per_step):
            # collect experience from the environment, sampling from
            # the current policy (with added noise for exploration)
            if done:
                # reset the environment
                state, _ = train_env.reset()
                done = False
            action = agent.policy(state)
            # add random noise (clipped in [-1, 1])
            noisy_action = np.clip(action + random_process.sample(), -1.0, 1.0)
            next_state, reward, terminated, truncated, info = train_env.step(noisy_action)
            # the environment needs to be reset when an episode ends
            done = terminated or truncated
            # add this transition to the replay buffer. We store `terminated` rather than `done`
            # so that time-limit truncations do not zero out the bootstrapped value in the critic target
            buffer.push(state, noisy_action, reward, next_state, terminated)
            state = next_state

        if len(buffer) > warmup_steps:
            # we let the buffer fill > batch_size so we don't overfit
            for _ in range(gradient_updates_per_step):
                # update the actor and critics using the replay buffer
                learn(
                    buffer=buffer,
                    target_agent=target_agent,
                    agent=agent,
                    actor_optimizer=actor_optimizer,
                    critic_optimizer=critic_optimizer,
                    batch_size=batch_size,
                    gamma=gamma,
                )

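                # move target models towards the online models; `learn` uses the
                # target actor to pick next actions, so it is updated here as well
                soft_update(target_agent.actor, agent.actor, tau)
                soft_update(target_agent.critic, agent.critic, tau)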

        if step % eval_interval == 0 or step == num_steps - 1:
            mean_return = evaluate_agent(agent, test_env, eval_episodes)
            if verbosity:
                print(f"Mean Return After {step} Steps: {mean_return:.2f}")
            if mean_return > best_return:
                agent.save(save_dir)
                best_return = mean_return

    # restore best saved agent
    agent.load(save_dir)
    return agent

That's it! Now we train a `DDPGAgent` and see how it does. Many of the hyperparameters are tunable for harder environments.

In [21]:
train_steps = 20_000
exploration_steps = 20_000
buffer_size = min(train_steps, 1_000_000)

In [22]:
train_env = make_env()
test_env = make_env()

agent = DDPGAgent(train_env.observation_space, train_env.action_space, hidden_size=256)
buffer = ReplayBuffer(buffer_size, example_state=train_env.reset()[0], example_action=train_env.action_space.sample())
agent = ddpg(
        agent,
        train_env,
        test_env,
        buffer,
        num_steps=train_steps,
        name="ddpg_run",
        exploration_anneal=exploration_steps,
    )

  0%|          | 41/20000 [00:01<10:09, 32.73it/s] 

Mean Return After 0 Steps: -1474.72


 25%|██▌       | 5022/20000 [00:44<08:29, 29.41it/s]

Mean Return After 5000 Steps: -197.37


 50%|█████     | 10014/20000 [01:29<07:43, 21.54it/s]

Mean Return After 10000 Steps: -169.68


 75%|███████▌  | 15019/20000 [02:13<02:45, 30.08it/s]

Mean Return After 15000 Steps: -152.53


100%|██████████| 20000/20000 [02:57<00:00, 112.77it/s]

Mean Return After 19999 Steps: -178.65





In [23]:
final_eval = evaluate_agent(agent, test_env, 10, render=True)
print(f"Trained DDPG Agent Scores: {final_eval:.2f}")



Trained DDPG Agent Scores: -168.08
