# Car Racing Implementation of Twin Delayed Deep Deterministic Policy Gradients (TD3)


This implementation is based on the following source material:

Paper: https://arxiv.org/abs/1802.09477

Source: https://github.com/sfujim/TD3/blob/master/TD3.py

In [56]:
from car_racing import CarRacing
import numpy as np
import time
import torch
import copy
import torch.nn as nn
import torch.nn.functional as F
import time
import matplotlib
import matplotlib.pyplot as plt
from tqdm.notebook import trange
from pygame_screen_record.ScreenRecorder import ScreenRecorder, cleanup, add_codec

Selecting an available device (CUDA when present, otherwise CPU)


In [57]:
# Prefer the GPU when CUDA is available; the actor and critic networks are moved to this device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# NOTE(review): presumably maps the "mp4" extension to the "mp4v" codec for
# ScreenRecorder output — confirm against the pygame_screen_record documentation.
add_codec("mp4", "mp4v")

## Actor Class

In [4]:
class Actor(nn.Module):
    """Deterministic policy network: an MLP whose output is squashed into [-max_action, max_action]."""

    def __init__(self, state_dim, action_dim, max_action, hidden_dims):
        """
        Build the policy MLP.

        Args:
            state_dim (int): Number of input state features.
            action_dim (int): Number of action components produced.
            max_action (float): Scale applied to the tanh-squashed output.
            hidden_dims (list): Width of each hidden layer, in order.

        Returns:
            None
        """
        super().__init__()

        # Consecutive entries of `widths` are the (in, out) sizes of each hidden Linear layer.
        widths = [state_dim, *hidden_dims]
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.extend((nn.Linear(fan_in, fan_out), nn.ReLU()))

        # The output head has no activation of its own; tanh is applied in forward().
        stack.append(nn.Linear(widths[-1], action_dim))

        self.max_action = max_action
        self.layers = nn.Sequential(*stack)

    def forward(self, state):
        """
        Map a batch of states to bounded actions.

        Args:
            state (torch.Tensor): Input state tensor fed to the first Linear layer.

        Returns:
            torch.Tensor: `max_action * tanh(mlp(state))`.
        """
        squashed = torch.tanh(self.layers(state))
        return self.max_action * squashed

## Critic Class

In [5]:
class Critic(nn.Module):
    """Pair of independent Q-networks (Q1, Q2) that score concatenated state-action inputs."""

    def __init__(self, state_dim, action_dim, hidden_dims):
        """
        Build the two Q-networks with identical architectures.

        Args:
            state_dim (int): Number of state features.
            action_dim (int): Number of action components.
            hidden_dims (list): Width of each hidden layer, in order (used for both networks).

        Returns:
            None
        """
        super().__init__()

        joint_width = state_dim + action_dim
        # Q1 is created before Q2, so parameters are registered in the order Q1 then Q2.
        self.layers_q1 = self._make_q_network(joint_width, hidden_dims)
        self.layers_q2 = self._make_q_network(joint_width, hidden_dims)

    @staticmethod
    def _make_q_network(input_width, hidden_dims):
        """Return a Linear/ReLU stack ending in a single-output Linear layer."""
        widths = [input_width, *hidden_dims]
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.extend((nn.Linear(fan_in, fan_out), nn.ReLU()))
        stack.append(nn.Linear(widths[-1], 1))
        return nn.Sequential(*stack)

    def forward(self, state, action):
        """
        Evaluate both Q-networks on the same state-action batch.

        Args:
            state (torch.Tensor): State batch.
            action (torch.Tensor): Action batch; concatenated to `state` along dim 1.

        Returns:
            tuple: `(q1, q2)`, the outputs of the first and second Q-network.
        """
        joint = torch.cat((state, action), dim=1)
        return self.layers_q1(joint), self.layers_q2(joint)

    def Q1(self, state, action):
        """
        Evaluate only the first Q-network.

        Parameters:
        state (torch.Tensor): State batch.
        action (torch.Tensor): Action batch; concatenated to `state` along dim 1.

        Returns:
        torch.Tensor: Output of the first Q-network.
        """
        return self.layers_q1(torch.cat((state, action), dim=1))


## TD3 Implementation

In [6]:
class TD3(object):
    """Twin Delayed DDPG agent: one actor, a twin-Q critic, and slowly-tracking target copies of both."""

    def __init__(
        self,
        state_dim,
        action_dim,
        hidden_dims,
        max_action,
        discount=0.99,
        tau=0.005,
        policy_noise=0.2,
        noise_clip=0.5,
        policy_freq=2,
        learning_rate=3e-4
    ):
        """
        Initializes the TD3 (Twin Delayed Deep Deterministic Policy Gradient) agent.

        Args:
            state_dim (int): Dimension of the state space.
            action_dim (int): Dimension of the action space.
            hidden_dims (list): List of integers representing the sizes of hidden layers in the actor and critic networks.
            max_action (float): Max upper bound for action values.
            discount (float, optional): Discount factor for future rewards. Defaults to 0.99.
            tau (float, optional): Target network update rate. Defaults to 0.005.
            policy_noise (float, optional): Noise added to target policy during critic update. Defaults to 0.2.
            noise_clip (float, optional): Range to clip target policy noise. Defaults to 0.5.
            policy_freq (int, optional): Frequency of delayed policy updates. Defaults to 2.
            learning_rate (float, optional): Learning rate for the actor and critic networks. Defaults to 3e-4.
        """
        self.actor = Actor(state_dim, action_dim, max_action, hidden_dims).to(device)
        # Target networks start as exact copies and are only changed by soft updates in train().
        self.actor_target = copy.deepcopy(self.actor)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=learning_rate)

        self.critic = Critic(state_dim, action_dim, hidden_dims).to(device)
        self.critic_target = copy.deepcopy(self.critic)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=learning_rate)

        self.max_action = max_action
        self.discount = discount
        self.tau = tau
        self.policy_noise = policy_noise
        self.noise_clip = noise_clip
        self.policy_freq = policy_freq

        # Number of train() calls so far; drives the delayed actor/target updates.
        self.total_it = 0

    def select_action(self, state):
        """
        Selects an action for a single state using the current actor (no exploration noise).

        Parameters:
        state (numpy.ndarray): The current state of the environment; it is reshaped to one row.

        Returns:
        numpy.ndarray: The selected action as a flat array.

        """
        state = torch.FloatTensor(state.reshape(1, -1)).to(device)
        # Inference only: skip building an autograd graph on every environment step.
        with torch.no_grad():
            return self.actor(state).cpu().numpy().flatten()

    def _soft_update(self, online, target):
        """Move each `target` parameter a fraction `tau` of the way towards the matching `online` parameter."""
        for param, target_param in zip(online.parameters(), target.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

    def train(self, replay_memory, batch_size=256):
        """
        Runs one TD3 update step on a batch sampled from the replay buffer.

        The critic is updated on every call. The actor and both target networks
        are updated only on every `policy_freq`-th call.

        Args:
            replay_memory (ReplayMemory): The replay buffer containing the agent's experience.
            batch_size (int, optional): The batch size for sampling from the replay buffer. Defaults to 256.
        """
        self.total_it += 1

        # Sample replay memory
        state, action, next_state, reward, not_done = replay_memory.sample(batch_size)

        with torch.no_grad():
            # Target policy smoothing: clipped Gaussian noise on the target action.
            noise = (
                    torch.randn_like(action) * self.policy_noise
            ).clamp(-self.noise_clip, self.noise_clip)

            # Noisy target action, kept inside the valid action range.
            next_action = (
                    self.actor_target(next_state) + noise
            ).clamp(-self.max_action, self.max_action)

            # Clipped double-Q: bootstrap from the smaller of the two target estimates.
            target_Q1, target_Q2 = self.critic_target(next_state, next_action)
            target_Q = torch.min(target_Q1, target_Q2)
            # not_done is 0 for terminal transitions, which removes the bootstrap term.
            target_Q = reward + not_done * self.discount * target_Q

        # Q estimates
        current_Q1, current_Q2 = self.critic(state, action)

        # Both critics regress onto the same target.
        critic_loss = F.mse_loss(current_Q1, target_Q) + F.mse_loss(current_Q2, target_Q)

        # Optimize critic
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Delayed policy updates
        if self.total_it % self.policy_freq == 0:

            # The actor ascends Q1 only (the loss is the negated mean Q1 value).
            actor_loss = -self.critic.Q1(state, self.actor(state)).mean()

            # Optimize the actor
            self.actor_optimizer.zero_grad()
            actor_loss.backward()
            self.actor_optimizer.step()

            # Update frozen target models
            self._soft_update(self.critic, self.critic_target)
            self._soft_update(self.actor, self.actor_target)

    def save(self, filename):
        """
        Save the networks and optimizer states of the TD3 agent.

        Four files are written: `filename` suffixed with `_critic`,
        `_critic_optimizer`, `_actor` and `_actor_optimizer`.

        Parameters:
        - filename (str): Path prefix for the saved files.

        Returns:
        - None
        """
        torch.save(self.critic.state_dict(), filename + "_critic")
        torch.save(self.critic_optimizer.state_dict(), filename + "_critic_optimizer")

        torch.save(self.actor.state_dict(), filename + "_actor")
        torch.save(self.actor_optimizer.state_dict(), filename + "_actor_optimizer")

    def load(self, filename):
        """
        Loads networks and optimizer states written by `save`.

        The target networks are rebuilt as copies of the freshly loaded networks.

        Args:
            filename (str): Path prefix (without the `_critic` / `_actor` suffixes) of the saved files.

        Returns:
            None
        """
        # map_location lets a checkpoint saved on a GPU be loaded on a CPU-only
        # machine (and vice versa) by remapping tensors onto the active device.
        self.critic.load_state_dict(torch.load(filename + "_critic", map_location=device))
        self.critic_optimizer.load_state_dict(
            torch.load(filename + "_critic_optimizer", map_location=device))
        self.critic_target = copy.deepcopy(self.critic)

        self.actor.load_state_dict(torch.load(filename + "_actor", map_location=device))
        self.actor_optimizer.load_state_dict(
            torch.load(filename + "_actor_optimizer", map_location=device))
        self.actor_target = copy.deepcopy(self.actor)

## Replay Memory

Replay memory is a data structure in which we store previous experiences so that we can re-sample them and train on them. The first experiences are collected with a random policy; later ones are collected by following the current TD3 policy. Note that we store whether a transition is *not done*, rather than done, because that is the form used in the target Q calculation.

In [7]:
class ReplayMemory(object):
    """Fixed-capacity circular buffer of (state, action, next_state, reward, not_done) transitions."""

    def __init__(self, state_dim, action_dim, max_size=int(1e6)):
        """Replay memory implemented as a circular buffer.

        Experiences will be removed in a FIFO manner after reaching maximum
        buffer size.

        Args:
            - state_dim: Size of the state-space features for the environment.
            - action_dim: Size of the action space for the environment.
            - max_size: Maximum size of the buffer.
        """
        self.max_size = max_size

        # Preallocate all storage up front. float32 is used because sample()
        # hands everything out as float32 tensors anyway; storing float64 would
        # only double the memory footprint.
        self.state = np.zeros((max_size, state_dim), dtype=np.float32)
        self.action = np.zeros((max_size, action_dim), dtype=np.float32)
        self.next_state = np.zeros((max_size, state_dim), dtype=np.float32)
        self.reward = np.zeros((max_size, 1), dtype=np.float32)
        self.not_done = np.zeros((max_size, 1), dtype=np.float32)

        # pointer to the current location in the circular buffer
        self.idx = 0
        # indicates number of transitions currently stored in the buffer
        self.size = 0

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Dedicated generator for drawing batch indices. The legacy
        # np.random.choice(..., replace=False) shuffles the whole index range on
        # every call, which gets slow once the buffer holds many transitions.
        # NOTE: batch sampling therefore no longer follows np.random.seed().
        self._rng = np.random.default_rng()

    def add(self, state, action, next_state, reward, done):
        """Add a transition to the buffer, overwriting the oldest one when full.

        :param state:  state_dim np.ndarray of state-features.
        :param action: action_dim size action.
        :param next_state:  state_dim np.ndarray of state-features.
        :param reward:  float reward.
        :param done:  boolean (or 0/1) value indicating the end of an episode.
        """
        slot = self.idx
        self.state[slot] = state
        self.action[slot] = action
        self.next_state[slot] = next_state
        self.reward[slot] = reward
        # Stored inverted so the TD target can multiply by it directly.
        self.not_done[slot] = 1. - done

        # circulate the pointer to the next position
        self.idx = (slot + 1) % self.max_size
        # update the current buffer size
        self.size = min(self.size + 1, self.max_size)

    def sample(self, batch_size):
        """Sample a batch of experiences without replacement.

        If the buffer contains less than `batch_size` transitions, all of them
        are returned (in random order).

        :param batch_size:  Number of transitions to sample.
        :return: tuple of float tensors on `self.device`:
            (state, action, next_state, reward, not_done).
        """
        count = min(batch_size, self.size)
        ind = self._rng.choice(self.size, size=count, replace=False)

        return (
            torch.FloatTensor(self.state[ind]).to(self.device),
            torch.FloatTensor(self.action[ind]).to(self.device),
            torch.FloatTensor(self.next_state[ind]).to(self.device),
            torch.FloatTensor(self.reward[ind]).to(self.device),
            torch.FloatTensor(self.not_done[ind]).to(self.device)
        )

Setting up matplotlib for interactive plotting

In [8]:
# Interactive mode, so the reward plot can be redrawn while training is running.
plt.ion()
# True when the active matplotlib backend name contains 'inline'.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    # Only needed for display()/clear_output() calls in plot_durations.
    from IPython import display

Train the TD3 car-racing agent, optionally starting from a previously saved model. If you wish to have a different seed for each reset, set `track_seed` to `None`.

In [63]:
def td3_car(save_file_name='default_model',
            max_train_timesteps=2_000_000,
            eval_interval=5000,
            load_file='',
            track_seed=123,
            hidden_dims=[512, 256],
            max_action=1,
            action_dim=3,
            state_size=15,
            random_policy_steps=5000,
            batch_size=512,
            explore_noise=0.1,
            policy_noise=0.2,
            nose_clip=0.5,
            deterministic_train=True,
            deterministic_eval=True):
    """
    Train and evaluate a TD3 (Twin Delayed Deep Deterministic Policy Gradient) agent for a car racing environment.

    The agent acts randomly for the first `random_policy_steps` timesteps and
    afterwards follows its policy plus Gaussian exploration noise, training once
    per timestep. Every `eval_interval` timesteps one evaluation episode is run;
    the model is saved whenever that evaluation sets a new best reward above 900.

    Parameters:
    - save_file_name (str): File name prefix (inside ./models) used when saving the trained model.
    - max_train_timesteps (int): The maximum number of training timesteps.
    - eval_interval (int): The interval (in timesteps) between evaluations.
    - load_file (str): Path prefix of a pre-trained model to load first; empty string to start fresh.
    - track_seed (int): The seed passed to the environment on the initial and evaluation resets.
    - hidden_dims (list): The dimensions of the hidden layers in the neural network (never mutated here).
    - max_action (float): The maximum action value.
    - action_dim (int): The dimension of the action space.
    - state_size (int): The dimension of the state space.
    - random_policy_steps (int): The number of timesteps to follow a random policy before using the learned policy.
    - batch_size (int): The size of the training batch.
    - explore_noise (float): Std-dev (scaled by max_action) of the Gaussian noise added to actions during exploration.
    - policy_noise (float): The amount of noise added to the target policy during training.
    - nose_clip (float): The clipping value for the noise added to the target policy during training.
    - deterministic_train (bool): Forwarded as `deterministic` to the training CarRacing environment.
    - deterministic_eval (bool): Forwarded as `deterministic` to the evaluation CarRacing environment.

    Returns:
    - None
    """
    import os  # local import: only needed to make sure the checkpoint directory exists

    start = time.time()
    train_env = CarRacing(render_mode="none", continuous=True, deterministic=deterministic_train)
    eval_env = CarRacing(render_mode="human", continuous=True, deterministic=deterministic_eval)

    # try/finally so both environments are closed even if training is
    # interrupted (e.g. the notebook kernel is stopped) or raises.
    try:
        policy = TD3(state_dim=state_size, action_dim=action_dim,
                     hidden_dims=hidden_dims,
                     max_action=max_action, policy_noise=policy_noise, noise_clip=nose_clip)

        if load_file != '':
            policy.load(f"./{load_file}")

        print(f'track_seed: {track_seed}')
        state = train_env.reset(seed=track_seed)
        train_rewards = []
        eval_rewards = []
        max_eval_reward = 0
        max_train_reward = 0
        episode_reward = 0
        episode_timesteps = 0
        episode_num = 0

        replay_memory = ReplayMemory(state_size, action_dim)

        # Baseline evaluation before any training step.
        eval_reward = eval_policy(policy, eval_env, track_seed)
        eval_rewards.append(eval_reward)

        for t in trange(1, max_train_timesteps + 1, desc="Training"):
            episode_timesteps += 1

            # Select action randomly or according to policy
            if t < random_policy_steps:
                action = train_env.action_space.sample()
            else:
                action = (
                        policy.select_action(np.array(state))
                        + np.random.normal(0, max_action * explore_noise, size=action_dim)
                ).clip(-max_action, max_action)

            # Perform action
            next_state, reward, terminated, truncated = train_env.step(action)
            # NOTE(review): truncation is stored as a terminal transition, so the
            # critic does not bootstrap past it — confirm this is intended.
            done = float(terminated or truncated)

            # Store data in replay buffer
            replay_memory.add(state, action, next_state, reward, done)

            state = next_state
            episode_reward += reward

            # Train agent after collecting sufficient data
            if t >= random_policy_steps:
                policy.train(replay_memory, batch_size)

            if done:
                # `t` is already 1-based, so it is the number of steps taken so far.
                print(
                    f"Total steps: {t}, "
                    f"Episode Num: {episode_num + 1}, "
                    f"Episode steps: {episode_timesteps}, "
                    f"Reward: {episode_reward:.3f}")
                # NOTE(review): unlike the initial reset, no seed is passed here —
                # confirm whether seed=track_seed is wanted for every episode.
                state = train_env.reset()
                if episode_reward > max_train_reward:
                    max_train_reward = episode_reward
                # append to both train and eval to keep them with the same number
                train_rewards.append(episode_reward)
                eval_rewards.append(eval_reward)
                episode_reward = 0
                episode_timesteps = 0
                episode_num += 1
                # Only arguments that plot_durations actually declares are passed.
                plot_durations(test_rewards=eval_rewards, train_rewards=train_rewards,
                               eval_interval=eval_interval, timestep=t,
                               max_training_reward=max_train_reward)

            if t % eval_interval == 0:
                # append to both train and eval to keep them with the same number
                eval_reward = eval_policy(policy, eval_env, track_seed)
                train_rewards.append(episode_reward)
                eval_rewards.append(eval_reward)

                if eval_reward > max_eval_reward:
                    max_eval_reward = eval_reward
                    if max_eval_reward > 900:
                        save_path = f"./models/{save_file_name}"
                        # Make sure the target directory exists so that a long
                        # training run cannot fail on its first checkpoint.
                        os.makedirs(os.path.dirname(save_path), exist_ok=True)
                        policy.save(save_path)
                        print(f"saved model {save_file_name}")

                plot_durations(test_rewards=eval_rewards, train_rewards=train_rewards,
                               eval_interval=eval_interval, timestep=t,
                               max_training_reward=max_train_reward)

        plot_durations(test_rewards=eval_rewards, train_rewards=train_rewards,
                       eval_interval=eval_interval, show_result=True,
                       timestep=max_train_timesteps + 1, max_training_reward=max_train_reward)
        plt.ioff()
        plt.show()
    finally:
        train_env.close()
        eval_env.close()
    print(f'total time: {time.time() - start}')


def eval_policy(policy, env, track_seed):
    """
    Run a single episode with the policy's own actions and report its return.

    Args:
        policy: Object exposing `select_action(state)`.
        env: Environment whose `reset(seed=...)` returns a state and whose
            `step(action)` returns `(state, reward, terminated, truncated)`.
        track_seed: Seed forwarded to `env.reset`.

    Returns:
        total_reward: Sum of the rewards collected until the episode ends.
    """
    observation = env.reset(seed=track_seed)
    episode_return = 0

    # At least one step is always taken; the episode ends on the first
    # terminated or truncated flag.
    while True:
        chosen_action = policy.select_action(np.array(observation))
        observation, step_reward, terminated, truncated = env.step(chosen_action)
        episode_return += step_reward
        if terminated or truncated:
            return episode_return


def plot_durations(test_rewards,
                   train_rewards,
                   timestep,
                   max_training_reward,
                   eval_interval,
                   show_result=False,
                   hidden_dims=None,
                   batch_size=None,
                   ):
    """
    Plot the rewards over episodes for the TD3 implementation.

    Args:
        test_rewards (list): List of rewards obtained during testing.
        train_rewards (list): List of rewards obtained during training.
        timestep (int): Current timestep.
        max_training_reward (float): Maximum training reward.
        eval_interval (int): Interval at which evaluation is performed.
        show_result (bool, optional): If True, title the figure 'Result' and keep
            it displayed; otherwise clear and redraw the live plot. Defaults to False.
        hidden_dims (list, optional): Hidden layer sizes of the run; appended to
            the title when given. Defaults to None.
        batch_size (int, optional): Training batch size of the run; appended to
            the title when given. Defaults to None.
    """
    fig = plt.figure(1, figsize=(9, 6))
    if show_result:
        title = 'Result'
    else:
        plt.clf()
        title = "Reward over episodes for TD3 Implementation"

    # The training loop in this file passes these two run settings by keyword;
    # accepting them (optionally) keeps those calls from raising TypeError.
    run_details = []
    if hidden_dims is not None:
        run_details.append(f"hidden dims: {list(hidden_dims)}")
    if batch_size is not None:
        run_details.append(f"batch size: {batch_size}")
    if run_details:
        title = f"{title} ({', '.join(run_details)})"
    plt.title(title)

    # Timesteps remaining until `timestep` reaches the next multiple of eval_interval.
    next_eval = abs((timestep % eval_interval) - eval_interval)
    # An empty list yields NaN instead of making max() raise.
    best_eval = max(test_rewards, default=float('nan'))
    plt.xlabel(f'Episode ({len(train_rewards)}), '
               f'next eval: {next_eval}', fontsize=20)
    plt.ylabel(f'Reward (max eval: {best_eval:5.1f}'
               f', max train: {max_training_reward:5.1f})', fontsize=15)
    plt.plot(train_rewards, label='Train Reward')
    plt.plot(test_rewards, label='Test Reward')
    plt.legend(loc='upper left')

    fig.canvas.start_event_loop(0.001)  # this updates the plot and doesn't steal window focus
    if is_ipython:
        display.display(plt.gcf())
        if not show_result:
            # Live plot: the next output replaces this one instead of stacking below it.
            display.clear_output(wait=True)

def load_td3_car(action_dim=3,
                 state_size=15,
                 max_action=1,
                 load_file="default_model",
                 num_runs=1,
                 deterministic_eval=True,
                 track_seed=123,
                 hidden_dims=[512, 256],
                 policy_noise=0.2,
                 nose_clip=0.5,
                 record=False,
                 recording_name="recordings/default_recording"):
    """
    Loads a TD3 car model and evaluates its performance on a given environment.

    Each run prints its reward and wall-clock time; the averages over all runs
    are printed at the end.

    Parameters:
    - action_dim (int): Dimension of the action space.
    - state_size (int): Dimension of the state space.
    - max_action (float): Maximum value of the action.
    - load_file (str): Path prefix (relative to the current directory) of the saved model to load.
    - num_runs (int): Number of evaluation runs to perform.
    - deterministic_eval (bool): Forwarded as `deterministic` to the CarRacing environment.
    - track_seed (int): Seed value for the evaluation environment.
    - hidden_dims (list): List of hidden layer dimensions; must match the saved model (never mutated here).
    - policy_noise (float): Standard deviation of the policy noise.
    - nose_clip (float): Clipping value for the policy noise.
    - record (bool): Whether to record the evaluation runs.
    - recording_name (str): Path prefix for recordings; "_<run index>" is appended per run.

    Returns:
    - None
    """
    import os  # local import: only needed to create the recording directory

    eval_env = CarRacing(render_mode="human", continuous=True, deterministic=deterministic_eval)

    # try/finally so the environment is always closed, as td3_car does with its
    # environments, even when loading or evaluation fails.
    try:
        policy = TD3(state_dim=state_size, action_dim=action_dim,
                     hidden_dims=hidden_dims,
                     max_action=max_action, policy_noise=policy_noise, noise_clip=nose_clip)

        policy.load(f"./{load_file}")

        if record:
            # Create the parent directory of the recording prefix up front so the
            # save after a full run does not depend on it already existing.
            recording_dir = os.path.dirname(recording_name)
            if recording_dir:
                os.makedirs(recording_dir, exist_ok=True)

        times = []
        eval_rewards = []
        for run in range(num_runs):
            if record:
                recorder = ScreenRecorder(60)  # Pass your desired fps
                recorder.start_rec()  # Start recording
            print(f'track_seed: {track_seed}')
            start = time.time()
            eval_reward = eval_policy(policy, eval_env, track_seed)
            total_time = time.time() - start
            if record:
                recorder.stop_rec().get_single_recording().save((recording_name + "_" + str(run), "mp4"))
            print(f'total reward: {eval_reward}')
            print(f'total time: {total_time}')
            times.append(total_time)
            eval_rewards.append(eval_reward)
        print(f'average time: {np.average(times)}')
        print(f'average reward: {np.average(eval_rewards)}')
    finally:
        eval_env.close()

Training the model

In [None]:
# Train with the default hyperparameters; the model is saved under the
# ./models/test_model prefix whenever an evaluation sets a new best reward above 900.
td3_car("test_model")

Testing the model

In [65]:
# Evaluate the saved model three times on the default track seed, without recording.
load_td3_car(load_file="models/test_model", num_runs=3, record=False, recording_name="recordings/test_recording")

track_seed: 123
total reward: 911.2888198757612
total time: 17.03707194328308
track_seed: 123
total reward: 911.2888198757612
total time: 16.760058879852295
track_seed: 123
total reward: 911.2888198757612
total time: 17.62105417251587
average time: 17.139394998550415
average reward: 911.2888198757613
