In [1]:
# Model / agent hyperparameters (passed to DQNAgent when the agent is constructed).
# NOTE: despite its name, N_HIDDEN_LAYERS is used by DQN as the width (units per
# layer) of its hidden Linear layers, not as a count of layers.
N_HIDDEN_LAYERS = 64
LEARNING_RATE = 1e-3  # learning rate handed to the optimiser
DISCOUNT_FACTOR = 0.99  # multiplier on the bootstrapped next-state value in the Q target
MAX_BUFFER_SIZE = 10_000  # replay buffer capacity (deque maxlen)
BATCH_SIZE = 64  # number of transitions sampled per model update

In [2]:
from torch.nn import Linear, Module
from torch import Tensor


class DQN(Module):
    """Small fully connected Q-network: state vector in, one Q-value per action out.

    Args:
        n_states: Size of the input state vector.
        n_actions: Number of outputs (one estimated Q-value per action).
        n_hidden_layers: Despite the name, this is the number of units in each
            hidden layer (the width), not the number of layers.
    """

    def __init__(self, n_states: int, n_actions: int, n_hidden_layers: int):
        super().__init__()
        width = n_hidden_layers
        # Layers are created in l1, l2, l3 order so seeded initialisation is stable.
        self.l1 = Linear(n_states, width)
        self.l2 = Linear(width, width)
        self.l3 = Linear(width, n_actions)

    def forward(self, state: Tensor) -> Tensor:
        """Return the raw (un-activated) Q-values for `state`."""
        hidden = state
        for layer in (self.l1, self.l2):
            hidden = layer(hidden).relu()
        # No activation on the output layer: Q-values are unbounded.
        return self.l3(hidden)

In [3]:
import random
from collections import deque
from typing import List, Tuple

from numpy import float32
from numpy.typing import NDArray


class ReplayBuffer:
    """Fixed-capacity store of transitions with uniform random sampling.

    Each stored item is a ``(state, action, reward, next, terminal)`` tuple.
    Once ``max_buffer_size`` items are held, appending a new one drops the
    oldest (behaviour of ``deque(maxlen=...)``).
    """

    def __init__(self,
                 max_buffer_size: int
                ):
        self.buffer = deque(maxlen=max_buffer_size)


    def __len__(self) -> int:
        """Number of transitions currently stored."""
        return len(self.buffer)


    def add(self,
            state: NDArray[float32],
            action: int,
            reward: float,
            next: NDArray[float32],
            terminal: bool
           ):
        """Append one transition, evicting the oldest one if the buffer is full."""
        self.buffer.append((state, action, reward, next, terminal))


    def sample(self, batches: int) -> Tuple[Tuple[NDArray[float32], ...],
                                            Tuple[int, ...],
                                            Tuple[float, ...],
                                            Tuple[NDArray[float32], ...],
                                            Tuple[bool, ...]]:
        """Draw `batches` distinct transitions uniformly at random.

        Returns:
            A 5-tuple ``(states, actions, rewards, nexts, terminals)`` where each
            element is a tuple of length `batches` (column-wise view of the
            sampled transitions).

        Raises:
            ValueError: If `batches` is negative or larger than the number of
                stored transitions (raised by ``random.sample``).
        """
        samples = random.sample(self.buffer, batches)
        if not samples:
            # zip(*[]) would yield nothing, which cannot be unpacked into 5 fields.
            return (), (), (), (), ()
        # Materialise the transposed columns so the result matches the annotation
        # and can be indexed or iterated more than once.
        return tuple(zip(*samples))

In [4]:
import torch
from torch import cuda
from torch.backends import mps
from torch.optim import SGD, Adam
from torch.nn import MSELoss
import numpy as np

def _get_torch_device() -> str:
    if cuda.is_available():
        return "cuda"
    elif mps.is_available():
        return "mps"
    else:
        return "cpu"


class DQNAgent:
    """DQN agent with an experience replay buffer and a separate target network.

    The online network (`qnet`) is trained on minibatches sampled from the
    replay buffer; the target network (`target_qnet`) provides the bootstrapped
    next-state values and is only changed by `update_target_network`.

    Args:
        n_states: Size of the state vector fed to the networks.
        n_actions: Number of discrete actions.
        n_hidden_layers: Forwarded to `DQN` (used there as the hidden width).
        learning_rate: Learning rate for the Adam optimiser.
        discount_factor: Multiplier on the next-state value in the Q target.
        max_buffer_size: Capacity of the replay buffer.
        batch_size: Number of transitions sampled per model update.
    """

    def __init__(self,
                 n_states: int,
                 n_actions: int,
                 n_hidden_layers: int,
                 learning_rate: float,
                 discount_factor: float,
                 max_buffer_size: int,
                 batch_size: int
                ):
        self.device = torch.device(_get_torch_device())

        self.n_actions = n_actions
        self.discount_factor = discount_factor
        self.batch_size = batch_size

        self.qnet = DQN(n_states, n_actions, n_hidden_layers).to(self.device)

        self.optimiser = Adam(self.qnet.parameters(), lr=learning_rate)
        self.replay_buffer = ReplayBuffer(max_buffer_size)

        # The target network starts as an exact copy of the online network and
        # is never optimised directly.
        self.target_qnet = DQN(n_states, n_actions, n_hidden_layers).to(self.device)
        self.target_qnet.load_state_dict(self.qnet.state_dict())
        self.target_qnet.eval()


    def step(self,
             state: NDArray[float32],
             action: int,
             reward: float,
             next: NDArray[float32],
             terminal: bool
            ):
        """Record one transition and, once enough are stored, run one update.

        Args:
            state: State the action was taken in.
            action: Index of the action taken.
            reward: Reward received for the transition.
            next: State reached after the action.
            terminal: When truthy, the next-state value is excluded from the
                Q target for this transition.
        """
        self.replay_buffer.add(state, action, reward, next, terminal)
        # A minibatch can be drawn as soon as exactly `batch_size` items exist.
        if len(self.replay_buffer.buffer) >= self.batch_size:
            self.update_model()


    def act(self,
            state: NDArray[float32],
            exploration_chance: float
           ) -> int:
        """Choose an action epsilon-greedily.

        With probability `exploration_chance` a uniformly random action is
        returned; otherwise the action with the highest predicted Q-value.

        Returns:
            The action index as a plain Python ``int``.
        """
        if random.random() > exploration_chance:
            state_batch = (
                torch.from_numpy(np.asarray(state, dtype=np.float32))
                .unsqueeze(0)  # networks expect a leading batch dimension
                .to(self.device)
            )
            self.qnet.eval()
            try:
                with torch.no_grad():
                    action_values = self.qnet(state_batch)
            finally:
                # Always hand the network back in training mode, even on error.
                self.qnet.train()
            # int(...) so callers get a Python int rather than numpy.int64.
            return int(np.argmax(action_values.cpu().numpy()))
        return random.randrange(self.n_actions)


    def update_model(self):
        """Run one gradient step on a minibatch sampled from the replay buffer.

        Minimises the mean squared error between Q(s, a) from the online network
        and ``r + discount_factor * max_a' Q_target(s', a') * (1 - terminal)``.
        """
        states, actions, rewards, nexts, terminals = self.replay_buffer.sample(self.batch_size)

        # Conversions happen on the CPU first, then the tensors move to the device.
        states = torch.from_numpy(np.stack(states)).float().to(self.device)
        actions = torch.from_numpy(np.asarray(actions, dtype=np.int64)).to(self.device)
        rewards = torch.from_numpy(np.asarray(rewards, dtype=np.float32)).to(self.device)
        nexts = torch.from_numpy(np.stack(nexts)).float().to(self.device)
        terminals = torch.from_numpy(np.asarray(terminals, dtype=np.float32)).to(self.device)

        # Q-value of the action that was actually taken in each sampled state.
        q_values = self.qnet(states).gather(1, actions.unsqueeze(-1)).squeeze(-1)

        # Targets are constants for the optimiser: no graph is built for them.
        with torch.no_grad():
            next_q_values = self.target_qnet(nexts).max(dim=1).values
            expected_q_values = rewards + self.discount_factor * next_q_values * (1 - terminals)

        loss = torch.nn.functional.mse_loss(q_values, expected_q_values)

        self.optimiser.zero_grad()
        loss.backward()
        self.optimiser.step()


    def update_target_network(self):
        """Copy the online network's current weights into the target network."""
        self.target_qnet.load_state_dict(self.qnet.state_dict())

In [5]:
# Training-loop configuration (read by the training cell).

N_EPISODES = 10_000  # upper bound on the number of training episodes
EXPLORATION_CHANCE_START = 1.0  # initial probability of taking a random action
EXPLORATION_CHANCE_END = 1e-4  # floor the exploration probability is clamped to
EXPLORATION_CHANCE_DECAY = 0.995  # multiplicative decay applied once per episode
TARGET_UPDATE_FREQ = 10  # sync the target network every this many episodes
FINISH_CHECK_FREQ = 100  # episodes between progress reports; also the averaging window
FINISH_SCORE = 200  # training stops once the windowed mean score reaches this value

In [6]:
import gym

# Create the training environment and read the problem dimensions from its spaces.
env = gym.make('LunarLander-v2')
n_states = env.observation_space.shape[0]  # first dimension of the observation shape
n_actions = env.action_space.n  # size of the discrete action space

# Keyword arguments make the mapping from config constants to parameters explicit.
agent = DQNAgent(
    n_states=n_states,
    n_actions=n_actions,
    n_hidden_layers=N_HIDDEN_LAYERS,
    learning_rate=LEARNING_RATE,
    discount_factor=DISCOUNT_FACTOR,
    max_buffer_size=MAX_BUFFER_SIZE,
    batch_size=BATCH_SIZE,
)

In [7]:
# training

import random

# Per-episode returns for the whole run, plus a sliding window used for the
# stopping criterion.
scores = []
latest_scores = deque(maxlen=FINISH_CHECK_FREQ)

exploration_chance = EXPLORATION_CHANCE_START

for episode_n in range(1, N_EPISODES + 1):
    state, _ = env.reset()
    score = 0.0

    while True:
        action = agent.act(state, exploration_chance)
        next_state, reward, terminated, truncated, _info = env.step(action)

        # Store only true termination as the terminal flag. A truncated episode
        # (e.g. time limit) ends the rollout, but its next state is still a
        # valid state whose value must be bootstrapped in the Q target.
        agent.step(state, action, reward, next_state, terminated)

        state = next_state
        score += reward

        if terminated or truncated:
            break

    scores.append(score)
    latest_scores.append(score)

    # Decay exploration once per episode, never going below the configured floor.
    exploration_chance = max(EXPLORATION_CHANCE_END, EXPLORATION_CHANCE_DECAY * exploration_chance)

    if episode_n % TARGET_UPDATE_FREQ == 0:
        agent.update_target_network()

    if episode_n % FINISH_CHECK_FREQ == 0:
        mean_score = np.mean(latest_scores)
        print(f'Average score of {mean_score:0.3f} @ {episode_n}/{N_EPISODES}')
        if mean_score >= FINISH_SCORE:
            print(f'Average score was above {FINISH_SCORE} over last {FINISH_CHECK_FREQ} episodes. Ending training...')
            break

  if not isinstance(terminated, (bool, np.bool8)):


Average score of -120.216 @ 100/10000
Average score of -49.519 @ 200/10000
Average score of -21.419 @ 300/10000
Average score of 39.940 @ 400/10000
Average score of 29.926 @ 500/10000
Average score of -41.250 @ 600/10000
Average score of 83.400 @ 700/10000
Average score of 143.569 @ 800/10000
Average score of 149.752 @ 900/10000
Average score of 181.229 @ 1000/10000
Average score of 80.470 @ 1100/10000
Average score of 211.658 @ 1200/10000
Average score was above 200 over last 100 episodes. Ending training...


In [8]:
# Training is finished: close the (non-rendering) training environment.
env.close()

In [15]:
# visual testing

# Roll out one fully greedy episode (exploration chance 0) in a rendering
# environment and report its total reward.
human_env = gym.make('LunarLander-v2', render_mode='human')

try:
    state, _ = human_env.reset()

    score = 0

    while True:
        action = agent.act(state, 0)
        state, reward, terminated, truncated, info = human_env.step(action)

        score += reward

        if terminated or truncated:
            break

    print(score)
finally:
    # Close the render window even if the rollout raises or is interrupted.
    human_env.close()

254.06528819038823
