In [1]:
!pip install gymnasium

Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m20.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0


In [8]:
import numpy as np
import torch
import torch.nn as nn
from collections import namedtuple, deque
from copy import deepcopy
import matplotlib.pyplot as plt

# Definition of a 3-layer neural net with tanh activation

1.   List item
2.   List item



In [9]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class Net(nn.Module):
    def __init__(self, n_inputs, n_outputs, bias=True):
        super().__init__()
        self.activation_function= nn.Tanh()

        self.layer1 = nn.Linear( #<--- linear layer
            n_inputs, #<----------------#input features
            64,#<-----------------------#output features
            bias=bias)#<----------------bias

        self.layer2 = nn.Linear(
            64,
            32,
            bias=bias)

        self.layer3 = nn.Linear(
                    32,
                    n_outputs,
                    bias=bias)


    def forward(self, x):
        x = self.activation_function( self.layer1(x) )
        x = self.activation_function( self.layer2(x) )
        y = self.layer3(x)

        return y


# Q network definition

In [37]:
class Q_network(nn.Module):
    def __init__(self, env, learning_rate=1e-4):
        super().__init__()

        # Define number of inputs (state features) and outputs (actions)
        n_inputs = env.observation_space.shape[0]  # CartPole state has 4 features
        n_outputs = env.action_space.n  # CartPole has 2 possible actions

        # Initialize the neural network
        self.network = Net(n_inputs, n_outputs)

        # Optimizer
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=learning_rate)

    def greedy_action(self, state):
        """Select the action with the highest Q-value."""
        with torch.no_grad():
            q_values = self.network(state)
            greedy_a = torch.argmax(q_values).item()
        return greedy_a

    def get_qvalues(self, state):
      """Return Q-values for the given state."""
      q_values = self.network(state)
      return q_values



## Experience replay buffer

In [38]:
class Experience_replay_buffer:
    def __init__(self, memory_size=50000, burn_in=10000):
        self.memory_size = memory_size
        self.burn_in = burn_in
        self.Buffer = namedtuple('Buffer', ['state', 'action', 'reward', 'done', 'next_state'])
        self.replay_memory = deque(maxlen=memory_size)

    def sample_batch(self, batch_size=32):
        """Sample a batch of experiences from the replay buffer."""
        samples = np.random.choice(len(self.replay_memory), batch_size, replace=False)
        batch = zip(*[self.replay_memory[i] for i in samples])
        return batch

    def append(self, s_0, a, r, d, s_1):
        """Add new experience to the buffer."""
        self.replay_memory.append(self.Buffer(s_0, a, r, d, s_1))

    def burn_in_capacity(self):
        """Check the fraction of burn-in completed."""
        return len(self.replay_memory) / self.burn_in

    def capacity(self):
        """Check the fraction of the replay buffer filled."""
        return len(self.replay_memory) / self.memory_size


# DDQN agent implementation

In [41]:
def from_tuple_to_tensor(tuple_of_np):
    tensor = torch.zeros((len(tuple_of_np), tuple_of_np[0].shape[0]))
    for i, x in enumerate(tuple_of_np):
        tensor[i] = torch.FloatTensor(x)
    return tensor





class DDQN_agent:
    def __init__(self, env, rew_thre, buffer, learning_rate=0.001, initial_epsilon=0.5, batch_size=64):
        self.env = env
        self.network = Q_network(env, learning_rate).to(device)
        self.target_network = deepcopy(self.network)
        self.buffer = buffer
        self.epsilon = initial_epsilon
        self.batch_size = batch_size
        self.reward_threshold = rew_thre
        self.window = 50
        self.initialize()

    def initialize(self):
      self.training_rewards = []
      self.training_loss = []
      self.update_loss = []
      self.mean_training_rewards = []
      self.sync_eps = []
      self.rewards = 0
      self.step_count = 0
      self.episode = 0




    def take_step(self, mode='exploit'):
        # choose action with epsilon greedy
        if mode == 'explore':
                action = self.env.action_space.sample()
        else:
                action = self.network.greedy_action(torch.FloatTensor(self.s_0).to(device))

        #simulate action
        s_1, r, terminated, truncated, _ = self.env.step(action)
        done = terminated or truncated

        # Store the experience in the replay buffer
        self.buffer.append(self.s_0, action, r, done, s_1)

        self.rewards += r
        self.s_0 = s_1.copy()
        self.step_count += 1

        if done:
          self.s_0, _ = self.env.reset()
        return done

    # Implement DQN training algorithm
    def train(self, gamma=0.99, max_episodes=10000,
              network_update_frequency=10,
              network_sync_frequency=200):
        self.gamma = gamma

        self.loss_function = nn.MSELoss()
        self.s_0, _ = self.env.reset()

        # Populate replay buffer
        while self.buffer.burn_in_capacity() < 1:
            self.take_step(mode='explore')
        ep = 0
        training = True
        self.populate = False
        while training:
            self.s_0, _ = self.env.reset()

            self.rewards = 0
            done = False
            while not done:
                if ((ep % 5) == 0):
                    self.env.render()

                p = np.random.random()
                if p < self.epsilon:
                    done = self.take_step(mode='explore')
                    # print("explore")
                else:
                    done = self.take_step(mode='exploit')
                    # print("train")
                # Update network
                if self.step_count % network_update_frequency == 0:
                    self.update()
                # Sync networks
                if self.step_count % network_sync_frequency == 0:
                  self.target_network.load_state_dict(self.network.state_dict())
                  self.sync_eps.append(self.episode)

                if done:
                    if self.epsilon >= 0.05:
                        self.epsilon = self.epsilon * 0.7
                    ep += 1
                    if self.rewards > 2000:
                        self.training_rewards.append(2000)
                    elif self.rewards > 1000:
                        self.training_rewards.append(1000)
                    elif self.rewards > 500:
                        self.training_rewards.append(500)
                    else:
                        self.training_rewards.append(self.rewards)
                    if len(self.update_loss) == 0:
                        self.training_loss.append(0)
                    else:
                        self.training_loss.append(np.mean(self.update_loss))
                    self.update_loss = []
                    mean_rewards = np.mean(self.training_rewards[-self.window:])
                    mean_loss = np.mean(self.training_loss[-self.window:])
                    self.mean_training_rewards.append(mean_rewards)
                    print(
                        "\rEpisode {:d} Mean Rewards {:.2f}  Episode reward = {:.2f}   mean loss = {:.2f}\t\t".format(
                            ep, mean_rewards, self.rewards, mean_loss), end="")

                    if ep >= max_episodes:
                        training = False
                        print('\nEpisode limit reached.')
                        break
                    if mean_rewards >= self.reward_threshold:
                        training = False
                        print('\nEnvironment solved in {} episodes!'.format(
                            ep))
                        break
        # save models
        self.save_models()
        # plot
        self.plot_training_rewards()

    def save_models(self):
        torch.save(self.network, "Q_net")

    def load_models(self):
        self.network = torch.load("Q_net")
        self.network.eval()

    def plot_training_rewards(self):
        plt.plot(self.mean_training_rewards)
        plt.title('Mean training rewards')
        plt.ylabel('Reward')
        plt.xlabel('Episods')
        plt.show()
        plt.savefig('mean_training_rewards.png')
        plt.clf()

    def calculate_loss(self, batch):
      # Extract info from batch
      states, actions, rewards, dones, next_states = list(batch)

      # Convert data to torch tensors with requires_grad set to True
      rewards = torch.FloatTensor(rewards).reshape(-1, 1).to(device).requires_grad_(False)
      actions = torch.LongTensor(actions).reshape(-1, 1).to(device).requires_grad_(False)
      dones = torch.FloatTensor(dones).reshape(-1, 1).to(device).requires_grad_(False)
      states = from_tuple_to_tensor(states).to(device).requires_grad_(True)
      next_states = from_tuple_to_tensor(next_states).to(device).requires_grad_(False)

      # Ensure gradients are being tracked for the Q-values
      q_values = self.network.get_qvalues(states)  # This needs to track gradients
      q_values = q_values.gather(1, actions)

      # Calculate the target Q-values using the target network with no_grad()
      with torch.no_grad():
        next_q_values = self.network.get_qvalues(next_states)
        next_actions = torch.argmax(next_q_values, dim=1, keepdim=True)
        next_q_targets = self.target_network.get_qvalues(next_states)
        target_q_values = rewards + (1 - dones) * self.gamma * next_q_targets.gather(1, next_actions)

      # Compute loss
      loss = self.loss_function(q_values, target_q_values)

      # Debugging: Check if tensors require gradients
      print(f"q_values.requires_grad: {q_values.requires_grad}")
      print(f"target_q_values.requires_grad: {target_q_values.requires_grad}")
      print(f"loss.requires_grad: {loss.requires_grad}")

      return loss




    def update(self):
      self.network.optimizer.zero_grad()  # Reset gradients
      batch = self.buffer.sample_batch(batch_size=self.batch_size)
      loss = self.calculate_loss(batch)   # Calculate loss with gradients enabled
      loss.backward()                     # Perform backpropagation
      self.network.optimizer.step()       # Update the network weights

      self.update_loss.append(loss.item())


    def initialize(self):
        self.training_rewards = []
        self.training_loss = []
        self.update_loss = []
        self.mean_training_rewards = []
        self.sync_eps = []
        self.rewards = 0
        self.step_count = 0

    def evaluate(self, eval_env):
        done = False
        s, _ = eval_env.reset()
        rew = 0
        while not done:
            action = self.network.greedy_action(torch.FloatTensor(s).to(device))
            s, r, terminated, truncated, _ = eval_env.step(action)
            done = terminated or truncated
            rew += r

        print("Evaluation cumulative reward: ", rew)


# Train and evaluate on cartpole

In [42]:
env = gym.make("CartPole-v1", render_mode="rgb_array")
rew_threshold = 200
buffer = Experience_replay_buffer()
agent = DDQN_agent(env, rew_threshold, buffer)
agent.train()

eval_env = gym.make("CartPole-v1", render_mode="human")
agent.evaluate(eval_env)



q_values.requires_grad: True
target_q_values.requires_grad: False
loss.requires_grad: True
Episode 1 Mean Rewards 10.00  Episode reward = 10.00   mean loss = 1.03		Episode 2 Mean Rewards 9.50  Episode reward = 9.00   mean loss = 0.52		q_values.requires_grad: True
target_q_values.requires_grad: False
loss.requires_grad: True
Episode 3 Mean Rewards 9.33  Episode reward = 9.00   mean loss = 0.67		q_values.requires_grad: True
target_q_values.requires_grad: False
loss.requires_grad: True
Episode 4 Mean Rewards 9.50  Episode reward = 10.00   mean loss = 0.72		q_values.requires_grad: True
target_q_values.requires_grad: False
loss.requires_grad: True
Episode 5 Mean Rewards 9.40  Episode reward = 9.00   mean loss = 0.74		q_values.requires_grad: True
target_q_values.requires_grad: False
loss.requires_grad: True
Episode 6 Mean Rewards 9.33  Episode reward = 9.00   mean loss = 0.75		q_values.requires_grad: True
target_q_values.requires_grad: False
loss.requires_grad: True
Episode 7 Mean Rew

AttributeError: 'DDQN_agent' object has no attribute 'episode'