In [1]:
from tensorflow import keras
from tensorflow.keras import layers

In [2]:
class DQN(keras.Model):
    """Small convolutional network: one Conv2D layer followed by two Dense layers.

    NOTE(review): a later cell in this notebook defines another class that is
    also named ``DQN``; whichever cell runs last owns the name.

    Args:
        state_size: Indexable whose first two items are used as the height and
            width of the single-channel input (the channel count is fixed to 1).
        action_size: Number of units in the output layer.
    """

    def __init__(self, state_size, action_size):
        # Zero-argument super() keeps working even after the global name
        # ``DQN`` is rebound by a later cell.
        super().__init__()
        # The layer class is ``Conv2D``; ``layers.conv2D`` does not exist and
        # raised AttributeError on construction.
        self.conv1 = layers.Conv2D(
            32,
            (8, 8),
            strides=(4, 4),
            activation="relu",
            input_shape=(state_size[0], state_size[1], 1),
        )
        # Create the Flatten layer once instead of on every forward pass.
        self.flatten = layers.Flatten()
        self.fc1 = layers.Dense(64, activation="relu")
        # No activation on the output layer: raw values, one per action.
        self.fc2 = layers.Dense(action_size)

    def call(self, x):
        """Run conv -> flatten -> dense -> dense and return the final activations."""
        x = self.conv1(x)
        x = self.flatten(x)
        x = self.fc1(x)
        return self.fc2(x)

In [3]:
import torch
from torch import nn

In [4]:
class PolicyNet(nn.Module):
    """Two-layer MLP that outputs a probability distribution over actions.

    Args:
        state_size: Number of input features.
        action_size: Number of output probabilities.
    """

    def __init__(self, state_size, action_size):
        super().__init__()
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, action_size)

    def forward(self, x):
        """Return softmax probabilities over the last dimension of ``fc2(relu(fc1(x)))``."""
        x = nn.functional.relu(self.fc1(x))
        # Always normalise over the action (last) dimension. Without an explicit
        # ``dim`` PyTorch emits a deprecation warning and guesses the axis, which
        # is wrong for e.g. 3-D (batch, time, features) input.
        return nn.functional.softmax(self.fc2(x), dim=-1)

In [5]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
import random
from collections import deque

DQN: Deep Q-Network agent (Keras)

In [6]:
class DQN:
    """Epsilon-greedy DQN agent with a replay memory and a small Keras MLP.

    NOTE(review): an earlier cell defines a ``keras.Model`` subclass that is
    also called ``DQN``; running this cell rebinds the name to this agent.

    Args:
        state_size: Number of input features of the network.
        action_size: Number of discrete actions (network outputs).
    """

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # oldest transitions are dropped first
        self.gamma = 0.95            # discount factor
        self.epsilon = 1.0           # current exploration rate
        self.epsilon_min = 0.01      # epsilon is no longer decayed at or below this
        self.epsilon_decay = 0.995   # multiplicative decay applied once per replay()
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        """Build and compile the Q-network (two hidden layers of 24 ReLU units)."""
        model = models.Sequential()
        # An explicit Input layer replaces the legacy ``input_dim`` layer kwarg.
        model.add(layers.Input(shape=(self.state_size,)))
        model.add(layers.Dense(24, activation='relu'))
        model.add(layers.Dense(24, activation='relu'))
        model.add(layers.Dense(self.action_size, activation='linear'))
        # ``learning_rate`` is the supported keyword; ``lr`` is deprecated and
        # rejected by recent Keras releases.
        model.compile(
            loss='mse',
            optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate),
        )
        return model

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Return a random action with probability ``epsilon``, else the greedy one.

        ``state`` is passed straight to ``model.predict`` and row 0 of the result
        is used, so it presumably must already be batched as (1, state_size);
        TODO confirm against the training loop.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state, verbose=0)
        return np.argmax(act_values[0])

    def replay(self, batch_size):
        """Fit the network on ``batch_size`` sampled transitions, then decay epsilon.

        Does nothing (and does not decay epsilon) while the memory holds fewer
        than ``batch_size`` transitions; previously ``random.sample`` raised
        ``ValueError`` in that situation.
        """
        if len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                # Bootstrapped target: r + gamma * max_a' Q(s', a')
                next_q = self.model.predict(next_state, verbose=0)[0]
                target = reward + self.gamma * np.amax(next_q)

            # Only the taken action's output is moved toward the target; the
            # other outputs keep their current predictions (zero error).
            target_f = self.model.predict(state, verbose=0)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

DDQN: Double DQN agent (Keras)

In [None]:
class DDQN:
    """Epsilon-greedy Double-DQN agent with a replay memory and Keras MLPs.

    The online network (``model``) selects the best next action and a separate
    target network (``target_model``) evaluates it.

    NOTE(review): a later cell defines a PyTorch ``nn.Module`` that is also
    called ``DDQN``; whichever cell runs last owns the name.

    Args:
        state_size: Number of input features of the networks.
        action_size: Number of discrete actions (network outputs).
    """

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # oldest transitions are dropped first
        self.gamma = 0.95            # discount factor
        self.epsilon = 1.0           # current exploration rate
        self.epsilon_min = 0.01      # epsilon is no longer decayed at or below this
        self.epsilon_decay = 0.995   # multiplicative decay applied once per replay()
        self.learning_rate = 0.001
        self.model = self._build_model()
        # Build the target network once; previously a fresh model was built and
        # compiled inside every replay() call.
        self.target_model = self._build_model()
        self.update_target_model()

    def _build_model(self):
        """Build and compile a Q-network (two hidden layers of 24 ReLU units)."""
        model = models.Sequential()
        # An explicit Input layer replaces the legacy ``input_dim`` layer kwarg.
        model.add(layers.Input(shape=(self.state_size,)))
        model.add(layers.Dense(24, activation='relu'))
        model.add(layers.Dense(24, activation='relu'))
        model.add(layers.Dense(self.action_size, activation='linear'))
        # ``learning_rate`` is the supported keyword; ``lr`` is deprecated and
        # rejected by recent Keras releases.
        model.compile(
            loss='mse',
            optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate),
        )
        return model

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def remember(self, state, action, reward, next_state, done):
        """Append one transition tuple to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Return a random action with probability ``epsilon``, else the greedy one.

        ``state`` is passed straight to ``model.predict`` and row 0 of the result
        is used, so it presumably must already be batched as (1, state_size);
        TODO confirm against the training loop.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state, verbose=0)
        return np.argmax(act_values[0])

    def replay(self, batch_size, sync_target=True):
        """Fit the online network on sampled transitions, then decay epsilon.

        Args:
            batch_size: Number of transitions to sample from memory.
            sync_target: When True (default, matches the previous behaviour) the
                target network is synchronised with the online network at the
                start of the call. Pass False to keep an older target and call
                ``update_target_model()`` on your own schedule.

        Does nothing (and does not decay epsilon) while the memory holds fewer
        than ``batch_size`` transitions; previously ``random.sample`` raised
        ``ValueError`` in that situation.
        """
        if len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory, batch_size)
        if sync_target:
            self.update_target_model()

        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                # Double DQN: the online net picks the action ...
                best_next_action = np.argmax(self.model.predict(next_state, verbose=0)[0])
                # ... and the target net supplies that action's value.
                next_value = self.target_model.predict(next_state, verbose=0)[0][best_next_action]
                target = reward + self.gamma * next_value

            # Only the taken action's output is moved toward the target.
            target_f = self.model.predict(state, verbose=0)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

PER with DDQN: Prioritized Experience Replay combined with Double DQN (PyTorch)

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import namedtuple

In [3]:
# One replay-buffer entry. The typename now matches the variable name (it was
# misspelled 'Transision'), so reprs and pickling refer to the right class.
Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'done'))

class SumTree:
    """Array-backed binary tree whose internal nodes store the sum of their children.

    Leaves hold per-item priorities and the root holds their total, which lets
    ``get`` map a value ``s`` in ``[0, total]`` to a leaf with probability
    proportional to that leaf's priority.

    Layout: ``priorities`` has ``2 * capacity - 1`` entries; leaf ``i`` lives at
    tree index ``i + capacity - 1`` and its payload at ``data[i]``.

    Args:
        capacity: Maximum number of stored items; writes wrap around and
            overwrite the oldest slot once the tree is full.
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.data = np.zeros(capacity, dtype=object)
        self.priorities = np.zeros(2 * capacity - 1)
        self.write_idx = 0  # next data slot to be written

    def _propagate(self, idx, change):
        """Add ``change`` to every ancestor of tree node ``idx`` (not to ``idx`` itself)."""
        # Iterative walk to the root. Unlike the recursive form this is a no-op
        # for idx == 0 instead of wrapping around to index -1.
        while idx > 0:
            idx = (idx - 1) // 2
            self.priorities[idx] += change

    def _retrieve(self, idx, s):
        """Descend from node ``idx`` and return the tree index of the leaf covering ``s``."""
        while True:
            left = 2 * idx + 1
            if left >= len(self.priorities):
                return idx  # no children: this is a leaf
            if s <= self.priorities[left]:
                idx = left
            else:
                # Skip the mass of the left subtree and continue on the right.
                s -= self.priorities[left]
                idx = left + 1

    def total(self):
        """Return the sum of all leaf priorities (stored at the root)."""
        return self.priorities[0]

    def update(self, idx, priority):
        """Set the leaf at tree index ``idx`` to ``priority`` and fix up its ancestors."""
        change = priority - self.priorities[idx]
        self.priorities[idx] = priority
        self._propagate(idx, change)

    def add(self, priority, data):
        """Store ``data`` with ``priority`` in the next slot, overwriting the oldest when full."""
        idx = self.write_idx + self.capacity - 1

        self.data[self.write_idx] = data
        # Previously only the ancestors were incremented: the leaf itself stayed
        # at 0 and an overwritten slot's old priority was never subtracted.
        self.update(idx, priority)

        self.write_idx += 1
        if self.write_idx >= self.capacity:
            self.write_idx = 0

    def get(self, s):
        """Return ``(tree_idx, priority, data)`` for the leaf that covers value ``s``."""
        idx = self._retrieve(0, s)
        data_idx = idx - self.capacity + 1
        return (idx, self.priorities[idx], self.data[data_idx])

In [4]:
class PER:
    """Prioritized experience replay buffer backed by a ``SumTree``.

    Args:
        capacity: Maximum number of stored experiences.
        alpha: Priority exponent. It is only stored here; this class saves the
            priorities it is given unchanged, so callers are expected to apply
            ``alpha`` themselves before calling ``update_priorities``.
        beta: Initial importance-sampling exponent.
        beta_increment: Amount added to ``beta`` (capped at 1.0) on every
            ``update_priorities`` call.
    """

    def __init__(self, capacity, alpha, beta, beta_increment):
        self.capacity = capacity
        self.buffer = []
        self.priorities = SumTree(capacity)
        self.alpha = alpha
        self.beta = beta
        self.beta_increment = beta_increment
        self.max_priority = 1.0

    def add(self, experience):
        """Store ``experience`` with the largest priority seen so far."""
        # Mirror the tree's circular write position so ``buffer`` stays bounded
        # by ``capacity`` instead of growing forever.
        slot = self.priorities.write_idx
        if len(self.buffer) < self.capacity:
            self.buffer.append(experience)
        else:
            self.buffer[slot] = experience
        self.priorities.add(self.max_priority, experience)

    def sample(self, batch_size):
        """Draw ``batch_size`` experiences with probability proportional to priority.

        The total priority mass is split into ``batch_size`` equal segments and
        one value is drawn uniformly from each.

        Returns:
            ``(batch, idxs, is_weights)``: the sampled experiences, their tree
            indices (to pass back to ``update_priorities``), and a
            ``(batch_size, 1)`` array of importance-sampling weights scaled so
            the largest weight is 1.

        Raises:
            ValueError: If ``batch_size`` is not positive or nothing with a
                positive priority has been stored yet.
        """
        total = self.priorities.total()
        if batch_size <= 0:
            raise ValueError("batch_size must be positive")
        if total <= 0:
            raise ValueError("cannot sample from an empty replay buffer")

        batch = []
        idxs = []
        n_entries = len(self.buffer)
        segment = total / batch_size
        is_weights = np.zeros((batch_size, 1))

        for i in range(batch_size):
            s = random.uniform(segment * i, segment * (i + 1))
            idx, priority, data = self.priorities.get(s)
            # w_i = (N * P(i)) ** -beta with P(i) = p_i / total.
            sampling_prob = priority / total
            is_weights[i, 0] = (n_entries * sampling_prob) ** (-self.beta)
            batch.append(data)
            idxs.append(idx)

        is_weights /= is_weights.max()
        return batch, idxs, is_weights

    def update_priorities(self, idxs, priorities):
        """Overwrite the priorities of the leaves at ``idxs`` and anneal ``beta``.

        Args:
            idxs: Tree indices as returned by ``sample``.
            priorities: New priority for each index, stored as given.
        """
        tree = self.priorities
        for idx, priority in zip(idxs, priorities):
            # Set the leaf itself, then push the difference up to the root.
            change = priority - tree.priorities[idx]
            tree.priorities[idx] = priority
            tree._propagate(idx, change)
            self.max_priority = max(self.max_priority, priority)

        self.beta = min(1.0, self.beta + self.beta_increment)


In [5]:
from turtle import forward

from zmq import device


class DDQN(nn.Module):
    """Fully connected Q-network: ``input_dim`` -> 128 -> 128 -> ``output_dim``.

    NOTE(review): an earlier cell defines a Keras-based agent that is also
    called ``DDQN``; running this cell rebinds the name to this module.

    Args:
        input_dim: Number of input features.
        output_dim: Number of actions (one output value per action).
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.layers = nn.Sequential(
            nn.Linear(self.input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            # Without a non-linearity here the last two Linear layers collapse
            # into a single affine map.
            nn.ReLU(),
            # ``nn.Linear`` (capital L); ``nn.linear`` raised AttributeError.
            nn.Linear(128, self.output_dim),
        )

    def forward(self, state):
        """Return the network output for ``state``."""
        return self.layers(state)

    def actor(self, state, epsilon):
        """Epsilon-greedy action selection for a single (unbatched) state.

        Returns the index of the largest network output when a uniform draw
        exceeds ``epsilon``; otherwise a uniformly random action index.
        """
        if random.random() > epsilon:
            with torch.no_grad():
                # Use the device the parameters live on rather than a global
                # ``device``: in this cell that name is bound by
                # ``from zmq import device`` until a later cell reassigns it.
                model_device = next(self.parameters()).device
                state = torch.as_tensor(
                    state, dtype=torch.float32, device=model_device
                ).unsqueeze(0)
                return self(state).argmax(dim=1).item()  # greedy action

        return random.randrange(self.output_dim)  # exploratory action

In [None]:



# Training hyperparameters.
gamma = 0.99          # discount factor used in the TD target
batch_size = 32
learning_rate = 1e-3

# NOTE(review): `input_dim` and `output_dim` are not defined anywhere in the
# visible notebook. They must be assigned before this cell runs (presumably
# from the environment's observation and action space sizes; confirm).

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# PER.__init__ also requires `beta` and `beta_increment`; omitting them raised
# TypeError. beta=0.4 annealed toward 1.0 follows the common PER setup; tune
# `beta_increment` to the planned number of updates.
memory = PER(capacity=10000, alpha=0.6, beta=0.4, beta_increment=1e-3)
policy_net = DDQN(input_dim=input_dim, output_dim=output_dim).to(device)
target_net = DDQN(input_dim=input_dim, output_dim=output_dim).to(device)

# Start the target network as an exact copy of the policy network.
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)

def update_model():
    """Run one Double-DQN optimisation step on a prioritized minibatch.

    Reads the module-level ``memory``, ``policy_net``, ``target_net``,
    ``optimizer``, ``batch_size``, ``gamma`` and ``device``.

    Assumes every sampled experience is a 5-tuple
    ``(state, action, reward, next_state, done)``; TODO confirm against the
    code that fills ``memory``.

    Returns:
        The scalar importance-weighted loss of this step as a Python float.
    """
    # Sample a batch together with tree indices and importance-sampling weights.
    batch, idxs, is_weights = memory.sample(batch_size)

    # Transpose the list of transitions into one sequence per field.
    states, actions, rewards, next_states, dones = zip(*batch)

    # Convert to tensors on the training device.
    states = torch.tensor(np.array(states), dtype=torch.float32, device=device)
    actions = torch.tensor(actions, dtype=torch.long, device=device)
    rewards = torch.tensor(rewards, dtype=torch.float32, device=device)
    next_states = torch.tensor(np.array(next_states), dtype=torch.float32, device=device)
    # Float rather than bool: `1 - bool_tensor` is not supported by PyTorch.
    dones = torch.tensor(dones, dtype=torch.float32, device=device)
    # Flatten (B, 1) -> (B,) so the weights multiply element-wise instead of
    # broadcasting the per-sample loss to (B, B).
    weights = torch.as_tensor(is_weights, dtype=torch.float32, device=device).reshape(-1)

    # Q(s, a) for the actions that were actually taken, shape (B,).
    current_q_value = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)

    with torch.no_grad():
        # Double DQN: the policy net selects the next action ...
        best_next_actions = policy_net(next_states).argmax(dim=1, keepdim=True)
        # ... and the target net evaluates it. squeeze -> (B,) to match rewards.
        next_q = target_net(next_states).gather(1, best_next_actions).squeeze(1)
        # No bootstrapping from terminal transitions.
        td_target = rewards + gamma * next_q * (1.0 - dones)

    # Importance-weighted squared TD error.
    td_error = td_target - current_q_value
    loss = (weights * td_error.pow(2)).mean()

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # New priorities: (|TD error| + eps) ** alpha. The epsilon keeps every
    # transition sampleable, and alpha is applied here because the buffer
    # stores the values it is given unchanged.
    new_priorities = (td_error.detach().abs() + 1e-6).pow(memory.alpha).cpu().tolist()
    memory.update_priorities(idxs, new_priorities)

    return loss.item()
