In [1]:
import random
import gymnasium as gym
from collections import deque
import numpy as np
import torch
import torch.nn as nn
import tqdm
import time
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

# Seed the three random number generators this notebook draws from so that
# replay sampling, exploration noise and network initialisation are repeatable.
# NOTE(review): the training loop resets the environment without a seed, so
# episode start states are not covered by this seeding.
seed_value = 0
# Python's `random` module (used by ReplayBuffer.sample).
random.seed(seed_value)
# NumPy's global RNG (used for the Gaussian exploration noise).
np.random.seed(seed_value)
# PyTorch's RNG.
torch.manual_seed(seed_value)

<torch._C.Generator at 0x7f72707d12f0>

## Environment

In [2]:
# Gymnasium Pendulum-v1 environment; this is the `env` handed to the Trainer below.
env = gym.make("Pendulum-v1")

## Hyperparameters and TensorBoard Setup

In [7]:
# Every tunable value for the DDPG run, read by the Trainer construction and
# the train() call further down.
hyper_parameters = {
    # Maximum number of transitions kept in the replay buffer.
    'replay-size': 50000,
    # Weight kept on the target-network parameters in each soft update
    # (the online parameters contribute 1 - polyak).
    'polyak': 0.995,
    # Width of the two hidden layers in both the Q-network and the policy.
    'hidden_size': 256,
    # Number of transitions sampled from the replay buffer per gradient step.
    'mini_batch_size': 256,
    # Number of environment steps between rounds of network updates.
    'update_frequency_iters': 500,
    # Number of gradient steps performed in each round of updates.
    'num_update_iters': 10,
    # Discount factor used in the TD target.
    'gamma': 0.9,
    # Adam learning rate for the Q-network (critic).
    'q_network_lr':1e-5,
    # Adam learning rate for the policy network (actor).
    'policy_network_lr': 1e-3,
    # Number of training episodes.
    'num_training_episodes': 50000,
    # Standard deviation of the Gaussian exploration noise at the first episode;
    # it is decreased by a constant amount after every episode.
    'explore_noise_scale_start': 0.5,
    # Noise standard deviation reached after the last training episode.
    'explore_noise_scale_final': 0.001,
    # Number of noise-free evaluation episodes averaged at each evaluation.
    'num_test_episodes': 20,
}

# TensorBoard writer; all training scalars and gradient histograms go to this run directory.
writer = SummaryWriter('runs/ddpg/Pendulum_update_freq_500')

## Experience Replay

In [3]:
class ReplayBuffer:
    """Fixed-capacity FIFO store of transitions with uniform random sampling.

    Once `maxsize` items are stored, appending a new item discards the oldest one.
    """

    def __init__(self, maxsize):
        """Create an empty buffer that holds at most `maxsize` items."""
        self.__buffer = deque(maxlen=maxsize)
        self.__maxsize = maxsize

    @property
    def maxsize(self):
        """Capacity the buffer was created with."""
        return self.__maxsize

    @property
    def replay_size(self):
        """Number of items currently stored."""
        return len(self.__buffer)

    def sample(self, batch_size):
        """Draw up to `batch_size` distinct items uniformly at random.

        Returns:
            A `(count, items)` pair, where `count` is the number of items
            actually drawn (smaller than `batch_size` when the buffer holds
            fewer items) and `items` is the list of drawn items.
        """
        available = len(self.__buffer)
        count = batch_size if batch_size < available else available
        drawn = random.sample(self.__buffer, count)
        return count, drawn

    def add(self, tup):
        """Append one item, evicting the oldest item if the buffer is full."""
        self.__buffer.append(tup)

## Q-value approximator

In [4]:
class QNetwork(nn.Module):
    """Critic: maps a (state, action) pair to a single scalar Q-value.

    The state and action are concatenated along dimension 1 and passed
    through an MLP with two ReLU hidden layers of width `hidden_size`.
    """

    def __init__(self,
                 observation_dims: int,
                 action_dims:int,
                 hidden_size:int = 128):
        """Build the MLP.

        Args:
            observation_dims: Number of features in a state vector.
            action_dims: Number of features in an action vector.
            hidden_size: Width of both hidden layers.
        """
        super().__init__()
        self.__observation_dims = observation_dims
        self.__action_dims = action_dims

        # The first layer consumes the concatenated state/action features.
        input_width = observation_dims + action_dims
        layers = [
            nn.Linear(input_width, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1),
        ]
        self.value = nn.Sequential(*layers)

    def forward(self, state, action):
        """Return the Q-value for each (state, action) row; output has one column."""
        joint_input = torch.cat([state, action], dim=1)
        return self.value(joint_input)

## Policy Function

In [5]:
class Policy(nn.Module):
    """Actor: maps a state vector to an action vector.

    An MLP with two ReLU hidden layers of width `hidden_size` and a linear
    output layer, so the raw output is unbounded.
    """

    def __init__(self,
                 observation_dims: int,
                 action_dims:int,
                 hidden_size:int = 128):
        """Build the MLP.

        Args:
            observation_dims: Number of features in a state vector.
            action_dims: Number of features in the produced action vector.
            hidden_size: Width of both hidden layers.
        """
        super().__init__()
        self.__observation_dims = observation_dims
        self.__action_dims = action_dims

        layers = [
            nn.Linear(observation_dims, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_dims),
        ]
        self.value = nn.Sequential(*layers)

    def forward(self, x):
        """Return the action computed by the network for input state(s) `x`."""
        action = self.value(x)
        return action

## Training Loop

In [8]:
class Trainer:
    """DDPG training loop for a continuous-action Gymnasium environment.

    Owns the online and target Q-networks (critic), the online and target
    policy networks (actor), their optimizers and the replay buffer. All
    interaction goes through the environment passed to the constructor.
    """

    def __init__(self,
                 env,
                 replay_size: int,
                 hidden_size: int,
                 gamma: float,
                 q_network_lr: float,
                 policy_network_lr: float,
                 polyak: float,
                 tensorboard_writer):
        """Create networks, optimizers and the replay buffer.

        Args:
            env: Environment exposing `observation_space`, `action_space`,
                `reset()` and `step()` in the Gymnasium style.
            replay_size: Maximum number of transitions kept in the replay buffer.
            hidden_size: Width of the hidden layers of every network.
            gamma: Discount factor used in the TD target.
            q_network_lr: Adam learning rate for the Q-network.
            policy_network_lr: Adam learning rate for the policy network.
            polyak: Weight kept on the target parameters in each soft update.
            tensorboard_writer: Object providing `add_scalar` and `add_histogram`.
        """
        self.__env = env
        self.__replay_size = replay_size
        self.__gamma = gamma
        self.__polyak = polyak
        self.__tensorboard_writer = tensorboard_writer

        # Create a experience replay
        self.__exp_replay = ReplayBuffer(maxsize=replay_size)

        observation_dims = env.observation_space.shape[0]
        action_dims = env.action_space.shape[0]

        # Create Q networks. The construction order (online critic, target
        # critic, online actor, target actor) is kept fixed so that seeded
        # weight initialisation stays the same.
        self.__q_phi = QNetwork(observation_dims=observation_dims,
                                action_dims=action_dims,
                                hidden_size=hidden_size)

        self.__q_tar = QNetwork(observation_dims=observation_dims,
                                action_dims=action_dims,
                                hidden_size=hidden_size)

        # Start the online and target critics from identical weights.
        self.__q_phi.load_state_dict(self.__q_tar.state_dict())

        # Create Policy Networks
        self.__policy_phi = Policy(observation_dims=observation_dims,
                                   action_dims=action_dims,
                                   hidden_size=hidden_size)

        self.__policy_tar = Policy(observation_dims=observation_dims,
                                   action_dims=action_dims,
                                   hidden_size=hidden_size)

        # Start the online and target actors from identical weights.
        self.__policy_phi.load_state_dict(self.__policy_tar.state_dict())

        # Loss function and optimizer for the Q-network
        self.__q_loss_func = nn.MSELoss(reduction='mean')
        self.__q_optimizer = optim.Adam(self.__q_phi.parameters(), lr=q_network_lr)

        # Optimizer for the policy network (its loss is built in update_network)
        self.__policy_optimizer = optim.Adam(self.__policy_phi.parameters(), lr=policy_network_lr)

    def get_action(self,
                   state,
                   noise: float=0.0):
        """Return the online policy's action for `state`, optionally perturbed.

        Args:
            state: Observation as a NumPy array.
            noise: Standard deviation of zero-mean Gaussian noise added to the
                action; no noise is added when this is not positive.

        Returns:
            NumPy array holding the action, clipped element-wise to the
            environment's action-space bounds.
        """
        # Cast explicitly so observations that are not float32 still match the
        # network's parameter dtype.
        torch_state = torch.as_tensor(state, dtype=torch.float32)

        # Clip with the full bound vectors so every action dimension uses its
        # own limits rather than those of the first dimension.
        low = self.__env.action_space.low
        high = self.__env.action_space.high

        with torch.no_grad():
            action = self.__policy_phi(torch_state).numpy()

            if noise > 0.0:
                action_noise = np.random.normal(scale=noise, size=action.shape)
                action += action_noise

            action = np.clip(action, low, high)
            return action

    def __log_gradients(self, prefix: str, network, epoch: int):
        """Write a histogram of each trainable parameter's gradient to TensorBoard."""
        for name, param in network.named_parameters():
            if param.requires_grad and param.grad is not None:
                self.__tensorboard_writer.add_histogram(prefix + name + "/gradient",
                                                        param.grad.data.cpu().numpy(),
                                                        epoch)

    def __soft_update(self, online, target):
        """Move `target`'s parameters towards `online`'s by polyak averaging."""
        with torch.no_grad():
            for param, target_param in zip(online.parameters(), target.parameters()):
                target_param.copy_(self.__polyak * target_param + (1 - self.__polyak) * param)

    def update_network(self,
                       batch_size: int,
                       epoch: int,
                       histogram_frequency: int = 500):
        """Run one critic step, one actor step and one soft target update.

        Args:
            batch_size: Number of transitions to sample from the replay buffer.
            epoch: Step index used as the x-axis for TensorBoard logging.
            histogram_frequency: Gradient histograms are written when `epoch`
                is a multiple of this value; a non-positive value disables them.

        Returns:
            `(q_loss, policy_loss)` as Python floats, or `None` when the
            replay buffer is empty and no update was performed.
        """
        # Sample a batch of transitions from the replay
        num_samples, samples = self.__exp_replay.sample(batch_size)
        if num_samples == 0:
            return None

        # Transpose the list of (s, a, r, s', d) tuples into per-field tuples.
        states, actions, rewards, next_states, dones = zip(*samples)

        states = torch.as_tensor(np.array(states), dtype=torch.float32)
        actions = torch.as_tensor(np.array(actions), dtype=torch.float32)
        rewards = torch.as_tensor(np.array(rewards), dtype=torch.float32)
        next_states = torch.as_tensor(np.array(next_states), dtype=torch.float32)
        # 1.0 where the episode continued, 0.0 where it terminated, so the
        # bootstrap term is dropped for terminal transitions.
        not_dones = torch.as_tensor(1.0 - np.array(dones, dtype=np.float32))

        # Compute Target for Q value update using only the target networks
        with torch.no_grad():
            next_q = self.__q_tar(next_states, self.__policy_tar(next_states))
            target = rewards.unsqueeze(dim=1) + self.__gamma * not_dones.unsqueeze(dim=1) * next_q

        q_values = self.__q_phi(states, actions)

        self.__tensorboard_writer.add_scalar("batch_reward", rewards.mean().item(), epoch)

        log_histograms = histogram_frequency > 0 and epoch % histogram_frequency == 0

        # Update Q-network weights
        q_loss = self.__q_loss_func(q_values, target)
        self.__q_optimizer.zero_grad()
        q_loss.backward()

        # Clip the gradients
        torch.nn.utils.clip_grad_norm_(self.__q_phi.parameters(), 1.0)

        self.__q_optimizer.step()

        if log_histograms:
            self.__log_gradients("critic/", self.__q_phi, epoch)

        # Update Policy weights: maximise the critic's value of the policy's actions
        policy_loss = - self.__q_phi(states, self.__policy_phi(states)).mean()

        self.__policy_optimizer.zero_grad()
        policy_loss.backward()

        # Clip the gradients
        torch.nn.utils.clip_grad_norm_(self.__policy_phi.parameters(), 1.0)

        self.__policy_optimizer.step()

        if log_histograms:
            self.__log_gradients("actor/", self.__policy_phi, epoch)

        # Softly move the weights of target networks
        self.__soft_update(self.__q_phi, self.__q_tar)
        self.__soft_update(self.__policy_phi, self.__policy_tar)

        return q_loss.item(), policy_loss.item()

    def run_test_episode(self):
        """Run one episode without exploration noise and return its total reward."""
        # Use the trainer's own environment, not whatever `env` is bound to in
        # the notebook namespace at call time.
        state, _ = self.__env.reset()

        done = False
        cum_reward = 0

        while not done:
            action = self.get_action(state, 0.0)
            # Execute the action
            next_state, reward, terminated, truncated, _ = self.__env.step(action)

            done = terminated or truncated
            cum_reward += reward
            state = next_state
        return cum_reward

    def train(self,
              train_episodes: int,
              noise_scale_start: float,
              noise_scale_final: float,
              minibatch_size: int,
              weight_update_frequency: int,
              weight_update_iters: int,
              num_test_episodes: int,
              min_replay_size: int = 10000,
              test_frequency: int = 100):
        """Collect experience and train the networks for `train_episodes` episodes.

        Args:
            train_episodes: Number of training episodes to run.
            noise_scale_start: Exploration noise standard deviation for the first episode.
            noise_scale_final: Noise standard deviation reached after the last
                episode; must be smaller than `noise_scale_start`.
            minibatch_size: Number of transitions sampled per gradient step.
            weight_update_frequency: Number of environment steps between update rounds.
            weight_update_iters: Number of gradient steps per update round.
            num_test_episodes: Number of noise-free episodes averaged per evaluation.
            min_replay_size: Updates start only once the replay buffer holds
                more than this many transitions.
            test_frequency: An evaluation is run every this many episodes.

        Raises:
            AssertionError: If `noise_scale_start` is not greater than `noise_scale_final`.
        """
        assert noise_scale_start > noise_scale_final

        curr_noise = noise_scale_start
        # Constant per-episode decrement, so the noise decays linearly.
        noise_decay_constant = np.abs(noise_scale_start - noise_scale_final)/train_episodes

        # Num of iterations so far
        total_iters = 0

        for episode in tqdm.tqdm(range(0, train_episodes)):
            state, _ = self.__env.reset()

            # Cumulative reward of the episode
            cum_reward = 0

            done = False
            while not done:

                total_iters += 1

                # Get action from the policy network, with exploration noise
                action = self.get_action(state, curr_noise)

                # Execute the action
                next_state, reward, terminated, truncated, _ = self.__env.step(action)

                cum_reward += reward

                # Store the sample in Replay buffer (s, a, r, s', d). Only
                # `terminated` is stored, so a time-limit truncation still
                # bootstraps from the next state.
                self.__exp_replay.add((state, action, reward, next_state, terminated))

                done = terminated or truncated
                state = next_state

                # Update the weights of the network at update frequency
                if total_iters % weight_update_frequency == 0 and self.__exp_replay.replay_size > min_replay_size:
                    q_losses = []
                    policy_losses = []

                    for _ in range(0, weight_update_iters):
                        losses = self.update_network(minibatch_size, total_iters)
                        # update_network returns None when it had nothing to sample.
                        if losses is None:
                            continue
                        q_loss, policy_loss = losses
                        q_losses.append(q_loss)
                        policy_losses.append(policy_loss)

                    # Log network Losses
                    if q_losses:
                        self.__tensorboard_writer.add_scalar('TD Error', np.array(q_losses).mean(), total_iters)
                        self.__tensorboard_writer.add_scalar('Policy Objective', np.array(policy_losses).mean(), total_iters)

            # Decrease the noise after every episode
            curr_noise -= noise_decay_constant

            # Write the episode reward to TensorBoard
            self.__tensorboard_writer.add_scalar('Episode Reward', cum_reward, episode)

            if episode % test_frequency == 0:
                test_cum_reward = np.array([self.run_test_episode()
                                            for _ in range(0, num_test_episodes)])
                self.__tensorboard_writer.add_scalar('Test Episode Reward', test_cum_reward.mean(), episode)
            

# Build the DDPG trainer from the hyper-parameter table and the shared
# environment / TensorBoard writer.
trainer = Trainer(
    env=env,
    replay_size=hyper_parameters['replay-size'],
    hidden_size=hyper_parameters['hidden_size'],
    gamma=hyper_parameters['gamma'],
    q_network_lr=hyper_parameters['q_network_lr'],
    policy_network_lr=hyper_parameters['policy_network_lr'],
    polyak=hyper_parameters['polyak'],
    tensorboard_writer=writer,
)

# Run the full training schedule; progress is shown by the tqdm bar and
# metrics are written to TensorBoard.
trainer.train(
    train_episodes=hyper_parameters['num_training_episodes'],
    noise_scale_start=hyper_parameters['explore_noise_scale_start'],
    noise_scale_final=hyper_parameters['explore_noise_scale_final'],
    minibatch_size=hyper_parameters['mini_batch_size'],
    weight_update_frequency=hyper_parameters['update_frequency_iters'],
    weight_update_iters=hyper_parameters['num_update_iters'],
    num_test_episodes=hyper_parameters['num_test_episodes'],
)

  2%|▋                                    | 979/50000 [06:36<5:30:54,  2.47it/s]


KeyboardInterrupt: 

### Test Code

In [None]:
# Rendering smoke test: drive BipedalWalker with uniformly sampled actions.
# NOTE: this does not use the trained policy; actions come from
# `action_space.sample()`.
num_test_episodes = 1

# Bind the environment to its own name only, so the notebook-level `env`
# (the Pendulum training environment) is not overwritten by this cell.
test_env = gym.make("BipedalWalker-v3", hardcore=False, render_mode="human")

try:
    for episode in range(0, num_test_episodes):

        test_env.reset()
        # Keep track of num of steps in episode
        num_steps = 0

        done = False
        while not done:
            # With render_mode="human" the window is refreshed by step()/reset(),
            # so no explicit render() call is made here.
            action = test_env.action_space.sample()
            observation, reward, terminated, truncated, info = test_env.step(action)

            num_steps += 1

            done = terminated or truncated
            # Slow the loop down so the rendering can be followed by eye.
            time.sleep(0.01)

        print("Episodes: {}, Num Steps: {}".format(episode+1, num_steps))
finally:
    # Release the render window even if the loop is interrupted.
    test_env.close()

In [None]:
# Step the current `env` 1000 times with uniformly sampled actions, printing
# the observation shape at every step and starting a new episode whenever the
# current one ends. The environment is closed afterwards.
observation, info = env.reset(seed=42)
for _ in range(1000):
    # this is where you would insert your policy
    action = env.action_space.sample()
    observation, reward, terminated, truncated, info = env.step(action)
    print(observation.shape)
    if truncated or terminated:
        observation, info = env.reset()

env.close()