In [1]:
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
import gymnasium as gym



from collections import deque, namedtuple
from itertools import count
import random

In [2]:
# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# if GPU is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

In [3]:
# basic loop of gymnasium - how to run  environment 
env = gym.make("Ant-v5", render_mode="human")
env = gym.wrappers.RecordEpisodeStatistics(env)
observation, info = env.reset()

for _ in range(100):
    action = env.action_space.sample()  # agent policy that uses the observation and info
    observation, reward, terminated, truncated, info = env.step(action)

    if terminated or truncated:
        observation, info = env.reset()

env.close()

In [4]:
# replay memory - memory to store results from environment, we will use it to create dataset later on
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

In [5]:
# pytorch model of our DQN - deep Q learning network
# TODO modify it, so it can mutate and add new branches to itself. It should probably be represnted by DAG (use networkx for example).
# and then every DAG node should conatin some simple NN. Then in forward function you can calculate flow of activations in DAG.
# maybe even some fancy parallelisation?
class DQN(torch.nn.Module):

    def __init__(self, dropout: float=0.5, hidden_size: int=512):
        super(DQN, self).__init__()

        self.action_space_size = 8
        self.observation_space_size = 105
        self.hidden_size = hidden_size 
        self.dropout = dropout
        
        self.net = nn.Sequential(
            nn.Linear(self.observation_space_size, self.hidden_size),
            nn.ReLU(),
            nn.Dropout(self.dropout),
            nn.Linear(self.hidden_size, self.hidden_size * 4),
            nn.ReLU(),
            nn.Dropout(self.dropout),
            nn.Linear(self.hidden_size * 4, self.hidden_size * 4),
            nn.ReLU(),
            nn.Dropout(self.dropout),
            nn.Linear(self.hidden_size * 4, self.hidden_size),
            nn.ReLU(),
            nn.Dropout(self.dropout),
            nn.Linear(self.hidden_size, self.action_space_size),
            nn.Sigmoid()
        )
        # we apply Sigmoid as control signals has to be in [-1, 1] according to gymnasium docs

    def forward(self, x):
        q = self.net(x)
        return q

In [6]:
class Agent:
    """
    This is going to represent
    """

    def __init__(
        self,
        learning_rate: float,
        batch_size: int,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        discount_factor: float = 0.95,
    ):
        """
        Initialize a Reinforcement Learning agent with an empty dictionary
        of state-action values (q_values), a learning rate and an epsilon.

        Args:
            learning_rate: The learning rate
            initial_epsilon: The initial epsilon value
            epsilon_decay: The decay for epsilon
            final_epsilon: The final epsilon value
            discount_factor: The discount factor for computing the Q-value
        """
        self.dqn = DQN().to(device)
        self.target_dqn = DQN().to(device)
        self.target_dqn.load_state_dict(self.dqn.state_dict())

        self.optimizer = optim.AdamW(
            self.dqn.parameters(), lr=learning_rate, amsgrad=True
        )
        self.memory = ReplayMemory(10000)

        self.lr = learning_rate
        self.batch_size = batch_size
        self.discount_factor = discount_factor

        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        self.training_error = []

    def get_action(self, state: np.ndarray) -> int:
        """
        Returns the best action with probability (1 - epsilon)
        otherwise a random action with probability epsilon to ensure exploration.
        """
        # with probability epsilon return a random action to explore the environment
        if np.random.random() < self.epsilon:
            return torch.tensor(
                [env.action_space.sample()], device=device, dtype=torch.float32
            )

        # with probability (1 - epsilon) act greedily (exploit)
        else:
            with torch.no_grad():
                return self.dqn(state)

    def optimize_model(self):
        """Updates the model"""

        if len(self.memory) < self.batch_size:
            return
        transitions = self.memory.sample(self.batch_size)
        # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
        # detailed explanation). This converts batch-array of Transitions
        # to Transition of batch-arrays.
        batch = Transition(*zip(*transitions))

        # Compute a mask of non-final states and concatenate the batch elements
        # (a final state would've been the one after which simulation ended)
        non_final_mask = torch.tensor(
            tuple(map(lambda s: s is not None, batch.next_state)),
            device=device,
            dtype=torch.bool,
        )
        non_final_next_states = torch.cat(
            [s for s in batch.next_state if s is not None]
        )
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)

        # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
        # columns of actions taken. These are the actions which would've been taken
        # for each batch state according to policy_net
        state_action_values = self.dqn(state_batch).gather(1, action_batch)

        # Compute V(s_{t+1}) for all next states.
        # Expected values of actions for non_final_next_states are computed based
        # on the "older" target_net; selecting their best reward with max(1).values
        # This is merged based on the mask, such that we'll have either the expected
        # state value or 0 in case the state was final.
        next_state_values = torch.zeros(self.batch_size, device=device)
        with torch.no_grad():
            next_state_values[non_final_mask] = (
                self.target_dqn(non_final_next_states).max(1).values
            )
        # Compute the expected Q values
        expected_state_action_values = (
            next_state_values * self.discount_factor
        ) + reward_batch

        # Compute Huber loss
        criterion = nn.SmoothL1Loss()
        loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        # In-place gradient clipping
        torch.nn.utils.clip_grad_value_(self.dqn.parameters(), 100)
        self.optimizer.step()

        # save for plotting later on
        self.training_error.append(loss.item())

    def decay_epsilon(self):
        self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)

    def update_target_model(self):
        self.target_dqn.load_state_dict(self.dqn.state_dict())

In [7]:
# hyperparameters
LEARNING_RATE = 0.001
BATCH_SIZE = 64
N_EPISODES = 100
START_EPSILON = 1.0
EPSILON_DECAY = START_EPSILON / (N_EPISODES / 2)  # reduce the exploration over time
FINAL_EPSILON = 0.1

UPDATE_TARGET_EVERY = 10

agent = Agent(
    learning_rate=LEARNING_RATE,
    batch_size=BATCH_SIZE,
    initial_epsilon=START_EPSILON,
    epsilon_decay=EPSILON_DECAY,
    final_epsilon=FINAL_EPSILON,
)

In [9]:

for i_episode in tqdm(range(N_EPISODES)):
    # Initialize the environment and get its state
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

    # fancy itertools for counting
    for t in count():
        action = agent.get_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.squeeze(0).cpu().numpy())
        reward = torch.tensor([reward], device=device)

        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # Store the transition in memory
        agent.memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        agent.optimize_model()

        if i_episode % UPDATE_TARGET_EVERY == 0:
            agent.update_target_model()

        if terminated or truncated:
            agent.decay_epsilon()
            break

  0%|          | 0/100 [00:00<?, ?it/s]


RuntimeError: mat1 and mat2 shapes cannot be multiplied (64x512 and 2048x512)

In [None]:
rolling_length = N_EPISODES // 100
fig, axs = plt.subplots(ncols=3, figsize=(12, 5))
axs[0].set_title("Episode rewards")
# compute and assign a rolling average of the data to provide a smoother graph
reward_moving_average = (
    np.convolve(
        np.array(env.return_queue).flatten(), np.ones(rolling_length), mode="valid"
    )
    / rolling_length
)
axs[0].plot(range(len(reward_moving_average)), reward_moving_average)
axs[1].set_title("Episode lengths")
length_moving_average = (
    np.convolve(
        np.array(env.length_queue).flatten(), np.ones(rolling_length), mode="same"
    )
    / rolling_length
)
axs[1].plot(range(len(length_moving_average)), length_moving_average)
axs[2].set_title("Training Error")
training_error_moving_average = (
    np.convolve(np.array(agent.training_error), np.ones(rolling_length), mode="same")
    / rolling_length
)
axs[2].plot(range(len(training_error_moving_average)), training_error_moving_average)
plt.tight_layout()
plt.show()