In [1]:
!pip install pyyaml
!pip install dndice
!pip install python-i18n
!pip install gymnasium
!pip install inflect
!pip install collections-extended
!pip install openai
!pip install -e ..
!pip install ipywidgets
!pip install iprogress


Obtaining file:///home/jedld/workspace/natural_20.py
  Preparing metadata (setup.py) ... [?25ldone
Installing collected packages: natural20.py
  Attempting uninstall: natural20.py
    Found existing installation: natural20.py 0.1
    Uninstalling natural20.py-0.1:
      Successfully uninstalled natural20.py-0.1
  Running setup.py develop for natural20.py
Successfully installed natural20.py


In [2]:
from gymnasium import make
from model import QNetwork
from natural20.gym.dndenv import dndenv
import torch
import tqdm as tqdm
import tqdm.notebook as tqdm
import random
import torch.optim as optim
import torch.nn as nn
import gc
import numpy as np
import sys
import collections
from natural20.session import Session
from natural20.event_manager import EventManager

In [3]:
# Compute device preference: Apple MPS first, then CUDA, otherwise the CPU.
device = torch.device(
    "mps" if torch.backends.mps.is_available()
    else "cuda" if torch.cuda.is_available()
    else "cpu"
)

In [4]:
# Directory of the game configuration; it is passed to Session() below and has
# the same value as the root_path given to the environment.
env_config = "map_with_obstacles"

In [5]:
# Session for the configured game with its own EventManager; it is handed to
# the environment as `custom_session` in the next cell.
session = Session(env_config, event_manager=EventManager())

Show info about the environment and a render of the tabletop map

In [6]:
# Build the environment on top of the session created above, then show its
# initial render, the observation space and one sampled action.
# NOTE(review): this cell passes `damaged_based_reward` while make_env() further
# down passes `damage_based_reward` - confirm which spelling dndenv actually reads.
env = make("dndenv-v0", root_path=env_config, show_logs=True,
           custom_session=session,
           damaged_based_reward=True,
           render_mode="ansi")
env.reset()
print(env.render())
print(env.observation_space)
# sample() must be called; printing the bare attribute only shows the bound method.
print(env.action_space.sample())

loading map from map_with_obstacles/maps/game_map.yml
Creating new event manager
==== Player Character ====
name: gomerin
level: 1
character class: {'rogue': 1}
hp: 18
max hp: 18
ac: 16
speed: 25
no spells



==== Player Character ====
name: rumblebelly
level: 1
character class: {'rogue': 1}
hp: 18
max hp: 18
ac: 16
speed: 25
no spells



no move for rumblebelly
==== current turn gomerin 12/18 AC 16===
no spells
____________
____________
____________
____________
____________
____________
____..P...._
____.E....._
____...#..._
____.~~  .._
____~~~  .._
____~~..  ._
Dict('ability_info': Box(0, 1, (8,), int64), 'conditions': Box(0, 1, (8,), int64), 'enemy_conditions': Box(0, 1, (8,), int64), 'enemy_reactions': Box(0, 1, (1,), int64), 'enemy_type': Box(0, 1, (1,), int64), 'health_enemy': Box(0.0, 1.0, (1,), float64), 'health_pct': Box(0.0, 1.0, (1,), float64), 'map': Box(-1, 255, (12, 12, 4), int64), 'movement': Discrete(255), 'player_type': Box(0, 1, (1,), int64), 'turn_info': Box(0, 1, 

  logger.warn(
  gym.logger.warn("Casting input x to numpy array.")
  logger.warn(f"{pre} is not within the observation space.")
  logger.warn(


DQN Parameters

In [7]:
# --- rollout / exploration settings ---
TRAJECTORY_POLICY = "e-greedy"    # policy label; not read by the cells visible in this notebook
T_HORIZON = 2048                  # max env steps per training rollout (passed as `horizon`)
EPSILON_START = 1.0               # initial epsilon for e-greedy exploration
EPSILON_FINAL = 0.01              # floor that epsilon decays towards
EPSILON_DECAY_FRAMES = 1_000      # time constant (in training steps) of the exponential epsilon decay
TEMP_DECAY = 0.999                # per-step multiplier applied to the Boltzmann temperature

# --- replay buffer / optimisation settings ---
BUFFER_CAPACITY = 3_000           # max number of trajectories kept in the replay buffer
FRAMES_TO_STORE = 2               # not read by the cells visible in this notebook
BATCH_SIZE = 64                   # trajectories sampled from the buffer per training step
NUM_UPDATES = 2                   # passes over the sampled batch per training step
TARGET_UPDATE_FREQ = 1            # sync the target network every N training steps
MAX_STEPS = 10_000                # number of training steps

# --- evaluation ---
EVAL_STEPS = 25                   # greedy evaluation episodes per evaluation round

In [8]:
# Sanity check: build an untrained Q-network in eval mode and print the value
# it assigns to the first available move of a freshly reset environment.
model = QNetwork(device=device)
model.to(device)
model.eval()

state, info = env.reset()
moves = info["available_moves"]
print(model(state, moves[0]))

loading map from map_with_obstacles/maps/game_map.yml
Creating new event manager
==== Player Character ====
name: gomerin
level: 1
character class: {'rogue': 1}
hp: 18
max hp: 18
ac: 16
speed: 25
no spells



==== Player Character ====
name: rumblebelly
level: 1
character class: {'rogue': 1}
hp: 18
max hp: 18
ac: 16
speed: 25
no spells



tensor([[0.0276]], device='cuda:0', grad_fn=<AddmmBackward0>)


In [9]:
def _greedy_index(state, available_moves, model):
    """Return the index of the available move the model values highest."""
    values = torch.stack([model(state, move) for move in available_moves])
    return torch.argmax(values).item()


def act_with_policy(state, info, model, policy='e-greedy', temperature=5.0, epsilon=0.1):
    """Pick one of the currently available moves according to `policy`.

    Args:
        state: current observation, passed through to `model` unchanged.
        info: env info dict; only `info["available_moves"]` is read.
        model: callable `model(state, move)` returning the value of `move` as a tensor.
        policy: 'boltzmann' (softmax sampling over the move values),
            'e-greedy' (random move with probability `epsilon`, else greedy)
            or 'greedy' (always the highest-valued move).
        temperature: softmax temperature, used by 'boltzmann' only.
        epsilon: exploration probability, used by 'e-greedy' only.

    Returns:
        The chosen element of `info["available_moves"]`.

    Raises:
        ValueError: if there are no available moves, if `policy` is unknown, or
            if `temperature` is zero while 'boltzmann' has several moves to choose from.
    """
    available_moves = info["available_moves"]
    if len(available_moves) == 0:
        # Fail with a clear message instead of an obscure stack/choice error below.
        raise ValueError("No available moves to choose from.")

    with torch.no_grad():
        if policy == 'boltzmann':
            values = torch.stack([model(state, move).squeeze() for move in available_moves])
            if len(values) == 1:
                chosen_index = 0
            else:
                if temperature == 0:
                    raise ValueError("Temperature is zero, which can lead to division by zero.")
                scaled = values / temperature
                # Subtract the max before exponentiating for numerical stability.
                weights = torch.exp(scaled - torch.max(scaled))
                total = torch.sum(weights)

                if total > 0:
                    chosen_index = torch.multinomial(weights / total, 1).item()
                else:
                    # Only reachable when the values are not comparable (e.g. NaN):
                    # fall back to a uniform random move.
                    print("Sum of exponentiated values is zero. Adjust the model or input.")
                    chosen_index = torch.randint(len(available_moves), (1,)).item()
        elif policy == 'e-greedy':
            if random.random() < epsilon:
                # Bucket the moves by type (first tuple element) and pick a type first,
                # so that movements are not chosen more often than other types of moves.
                move_types = collections.defaultdict(list)
                for orig_index, move in enumerate(available_moves):
                    move_types[move[0]].append(orig_index)
                chosen_move_type = random.choice(list(move_types.keys()))
                chosen_index = random.choice(move_types[chosen_move_type])
            else:
                chosen_index = _greedy_index(state, available_moves, model)
        elif policy == 'greedy':
            chosen_index = _greedy_index(state, available_moves, model)
        else:
            raise ValueError(f"Unknown policy: {policy}")

    return available_moves[chosen_index]

def generate_trajectory(env, model, policy='e-greedy', temperature=5.0, epsilon=0.1, horizon=2048, quick_exit=False):
    """Roll out a single episode and record every transition.

    Args:
        env: environment exposing `reset()` and `step(action)`.
        model: value model handed to `act_with_policy`.
        policy, temperature, epsilon: forwarded to `act_with_policy`.
        horizon: maximum number of environment steps to take.
        quick_exit: currently unused; kept for interface compatibility.

    Returns:
        Tuple `(states, actions, rewards, dones, truncateds, infos)`.
        `states`, `actions` and `infos` hold one entry more than `rewards`,
        `dones` and `truncateds`: the last observation/info and a placeholder
        action are appended so every step has a "next" entry.
    """
    state, info = env.reset()
    # Pre-seed the "next" values so the trailing appends are defined even when
    # the loop body never runs (horizon <= 0).
    next_state, next_info = state, info

    states, actions, rewards = [], [], []
    dones, truncateds, infos = [], [], []

    for _ in range(horizon):
        # Instead of sampling (e.g. env.action_space.sample()) we ask the environment
        # for its valid moves, as valid moves are sparse in the action space.
        action = act_with_policy(state, info, model, policy, temperature, epsilon)
        next_state, reward, done, truncated, next_info = env.step(action)

        states.append(state)
        actions.append(action)
        rewards.append(reward)
        dones.append(done)
        truncateds.append(truncated)
        infos.append(info)

        if done or truncated:
            break
        state, info = next_state, next_info

    # Close the trajectory with the final observation and a placeholder action.
    states.append(next_state)
    infos.append(next_info)
    actions.append((-1, (0, 0), (0, 0), 0, 0))
    return states, actions, rewards, dones, truncateds, infos

In [10]:
# Smoke-test one fully random rollout (epsilon=1.0). Print a compact summary
# rather than the raw trajectory, which contains every observation dict.
trajectory = generate_trajectory(env, model, epsilon=1.0)
states, actions, rewards, dones, truncateds, infos = trajectory
print(f"steps: {len(rewards)} total reward: {sum(rewards)} "
      f"done: {dones[-1] if dones else None} truncated: {truncateds[-1] if truncateds else None}")

loading map from map_with_obstacles/maps/game_map.yml
Creating new event manager
==== Player Character ====
name: gomerin
level: 1
character class: {'rogue': 1}
hp: 18
max hp: 18
ac: 16
speed: 25
no spells



==== Player Character ====
name: rumblebelly
level: 1
character class: {'rogue': 1}
hp: 18
max hp: 18
ac: 16
speed: 25
no spells



==== end turn ===
gomerin 18/18
no spells
rumblebelly 18/18
no spells
==== current turn rumblebelly 18/18 AC 16===
no spells
no move for rumblebelly
==== current turn gomerin 18/18 AC 16===
no spells
Result: False
==== end turn ===
gomerin 18/18
no spells
rumblebelly 18/18
no spells
==== current turn rumblebelly 18/18 AC 16===
no spells
no move for rumblebelly
==== current turn gomerin 18/18 AC 16===
no spells
Result: False
==== end turn ===
gomerin 17/18
no spells
rumblebelly 18/18
no spells
==== current turn rumblebelly 18/18 AC 16===
no spells


  logger.warn(
  logger.warn(f"{pre} is not within the observation space.")


no move for rumblebelly
==== current turn gomerin 16/18 AC 16===
no spells
Result: False
==== end turn ===
gomerin 16/18
no spells
rumblebelly 18/18
no spells
==== current turn rumblebelly 18/18 AC 16===
no spells
no move for rumblebelly
==== current turn gomerin 16/18 AC 16===
no spells
Result: False
==== end turn ===
gomerin 16/18
no spells
rumblebelly 18/18
no spells
==== current turn rumblebelly 18/18 AC 16===
no spells
no move for rumblebelly
==== current turn gomerin 15/18 AC 16===
no spells
Result: False
==== end turn ===
gomerin 15/18
no spells
rumblebelly 18/18
no spells
==== current turn rumblebelly 18/18 AC 16===
no spells
no move for rumblebelly
==== current turn gomerin 15/18 AC 16===
no spells
Result: False
==== end turn ===
gomerin 15/18
no spells
rumblebelly 18/18
no spells
==== current turn rumblebelly 18/18 AC 16===
no spells
no move for rumblebelly
==== current turn gomerin 15/18 AC 16===
no spells
Result: False
==== end turn ===
gomerin 15/18
no spells
rumblebelly 1

In [11]:
# Baseline: average episode return of a fully random policy (epsilon=1.0).
EPISODES = 10
episode_returns = []
for _ in tqdm.tqdm(range(EPISODES)):
    states, actions, rewards, dones, truncateds, infos = generate_trajectory(env, model, epsilon=1.0)
    episode_returns.append(sum(rewards))

total_rewards = sum(episode_returns)
avg_reward = total_rewards/EPISODES
print(f"Average reward: {avg_reward} Total Reward: {total_rewards}")

  0%|          | 0/10 [00:00<?, ?it/s]

loading map from map_with_obstacles/maps/game_map.yml
Creating new event manager
==== Player Character ====
name: gomerin
level: 1
character class: {'rogue': 1}
hp: 18
max hp: 18
ac: 16
speed: 25
no spells



==== Player Character ====
name: rumblebelly
level: 1
character class: {'rogue': 1}
hp: 18
max hp: 18
ac: 16
speed: 25
no spells



no move for rumblebelly
==== current turn gomerin 18/18 AC 16===
no spells
==== end turn ===
gomerin 18/18
no spells
rumblebelly 18/18
no spells
==== current turn rumblebelly 18/18 AC 16===
no spells
no move for rumblebelly
==== current turn gomerin 18/18 AC 16===
no spells
Result: False
==== end turn ===
gomerin 18/18
no spells
rumblebelly 18/18
no spells
==== current turn rumblebelly 18/18 AC 16===
no spells
no move for rumblebelly
==== current turn gomerin 12/18 AC 16===
no spells
Result: False
==== end turn ===
gomerin 12/18
no spells
rumblebelly 18/18
no spells
==== current turn rumblebelly 18/18 AC 16===
no spells
no move for rumblebelly
==== cu

In [12]:
class ReplayBuffer:
    """Bounded FIFO store of whole trajectories.

    Each entry is the 5-tuple `(states, actions, rewards, infos, is_terminal)`
    exactly as passed to `push`.
    """

    def __init__(self, capacity):
        # A deque with maxlen drops its oldest entry once `capacity` is reached.
        self.buffer = collections.deque(maxlen=capacity)

    def push(self, states, actions, rewards, infos, is_terminal):
        """Append one trajectory to the buffer."""
        entry = (states, actions, rewards, infos, is_terminal)
        self.buffer.append(entry)

    def sample(self, batch_size):
        """Return `batch_size` randomly drawn entries, transposed into five tuples.

        Indices come from `np.random.choice`, whose default is sampling with
        replacement, so the same trajectory can appear more than once.
        """
        picked = [self.buffer[idx] for idx in np.random.choice(len(self.buffer), batch_size)]
        states, actions, rewards, infos, is_terminals = zip(*picked)
        return states, actions, rewards, infos, is_terminals

    def __len__(self):
        """Number of trajectories currently stored."""
        return len(self.buffer)

    def memory_usage(self):
        """Shallow size estimate of the buffer contents in bytes.

        Adds `sys.getsizeof` of every state plus that of the actions, rewards,
        infos and terminal containers; nested objects are not followed.
        """
        total_size = 0
        for states, actions, rewards, infos, is_terminals in self.buffer:
            total_size += sum(sys.getsizeof(s) for s in states)
            for container in (actions, rewards, infos, is_terminals):
                total_size += sys.getsizeof(container)
        return total_size

In [13]:
# Roll out several episodes and store each one in the replay buffer.
def generate_batch_trajectories(env, model, n_rollout, replay_buffer: ReplayBuffer, temperature=5.0, epsilon=0.1, horizon=30, policy='e-greedy'):
    """Collect `n_rollout` trajectories and push them into `replay_buffer`.

    Every rollout is produced by `generate_trajectory` with the given
    exploration settings. The per-step `done` flags are stored as the
    trajectory's terminal markers; the `truncated` flags are discarded.
    """
    rollout_kwargs = dict(temperature=temperature, epsilon=epsilon, horizon=horizon, policy=policy)
    for _ in range(n_rollout):
        states, actions, rewards, dones, _truncateds, infos = generate_trajectory(env, model, **rollout_kwargs)
        replay_buffer.push(states, actions, rewards, infos, dones)

In [14]:
def train(env, gamma, learning_rate, max_steps=MAX_STEPS, use_td_target=True,
          trajectory_policy='e-greedy',
          label="dnd_egreedy",
          n_rollout=8,
          seed=1337):
  """Train a QNetwork on `env` with a replay buffer of whole trajectories.

  Each training step:
    1. collects `n_rollout` trajectories with the online model and pushes them
       into the replay buffer;
    2. samples BATCH_SIZE trajectories and makes NUM_UPDATES passes over them,
       taking one optimizer step per sampled trajectory;
    3. every 10 steps runs EVAL_STEPS greedy episodes, records their average
       reward and saves the weights to `model_best_{label}.pt` on a new best;
    4. decays the temperature and epsilon and, every TARGET_UPDATE_FREQ steps,
       copies the online weights into the target network.

  Args:
    env: environment to train on; it is seeded via `env.seed(seed)` and closed
      before returning.
    gamma: discount factor applied to the bootstrap value.
    learning_rate: Adam learning rate.
    max_steps: number of training steps. The default is bound to the value of
      MAX_STEPS at the time this function is defined.
    use_td_target: if True, bootstrap from the target network's value of the
      next recorded (state, action) pair; if False, bootstrap from the maximum
      target-network value over all available moves in the next state.
    trajectory_policy: policy name forwarded to the rollout collection.
    label: suffix of the best-checkpoint file name.
    n_rollout: trajectories collected per training step.
    seed: seed passed to `env.seed`.

  Returns:
    List with the average greedy evaluation reward of every evaluation round.
  """
  print(f"training with gamma {gamma} and learning rate {learning_rate}")
  env.seed(seed)

  replay_buffer = ReplayBuffer(BUFFER_CAPACITY)
  # fresh online and target networks; no checkpoint is loaded here
  model = QNetwork(device).to(device)
  target_model = QNetwork(device).to(device)

  # initialize target network with the same weights as the model
  target_model.load_state_dict(model.state_dict())

  optimizer = optim.Adam(model.parameters(), lr=learning_rate)
  # starting threshold for "best" average evaluation reward
  # NOTE(review): -10 presumably equals the reward of a lost episode - confirm
  best_avg = -10
  best_step = 0
  temperature = 5.0
  reward_per_episode = []
  epsilon = EPSILON_START

  for step in tqdm.tqdm(range(max_steps)):
    # collect fresh experience with the current online model
    generate_batch_trajectories(env, model, n_rollout, replay_buffer, temperature=temperature,
                                epsilon=epsilon, policy=trajectory_policy, horizon=T_HORIZON)

    # one batch of whole trajectories per step; the same batch is reused for
    # every one of the NUM_UPDATES passes below
    states, actions, rewards, infos, is_terminals = replay_buffer.sample(BATCH_SIZE)
    rewards_collected = 0
    for _ in range(NUM_UPDATES):
      # bookkeeping only: these two accumulators are never reported
      rewards_collected = 0
      total_loss = 0.0
      
      for i in range(len(states)):
        # i-th sampled trajectory; s/a/env_info carry one trailing entry more
        # than the rewards (final observation/info and placeholder action)
        s = states[i]
        a = actions[i]
        env_info = infos[i]
        # per-step rewards and done flags as column tensors
        r = torch.tensor(rewards[i]).to(device).unsqueeze(1)
        is_terminal = torch.tensor(is_terminals[i]).float().to(device).unsqueeze(1)
        
        if use_td_target:
          with torch.no_grad():
            # value of the next recorded (state, action) pair; the last pair
            # uses the placeholder action that closes the trajectory
            s_next = s[1:]
            a_next = a[1:]
            q_targets = target_model(s_next, a_next).detach()
        else: # Q-learning target == "slow"
          with torch.no_grad():
            s_next = s[1:]
            s_info = env_info[1:]
            q_targets = torch.zeros(len(s_next)).to(device)
            
            for index in range(len(s_info)):
              info = s_info[index]
              state = s_next[index]
              
              # an empty next state contributes no bootstrap value
              if len(state) == 0:
                q_targets[index] = 0
                continue
              
              # score every available move of the next state in one call by
              # repeating the state once per move
              total_available_moves = len(info["available_moves"])
              states_t = [state] * total_available_moves
              avail_actions = info["available_moves"]
              assert len(states_t) > 0, "No available states"
              assert len(avail_actions) > 0, "No available moves"
              
              q_values = target_model(states_t, avail_actions).detach().squeeze(1)
              if len(q_values) == 0:
                q_targets[index] = 0
              else:
                q_targets[index] = torch.max(q_values).item()

            q_targets = q_targets.unsqueeze(1)
            assert q_targets.shape == r.shape, f"q_targets shape {q_targets.shape} != r shape {r.shape}"

        # bootstrap target; the bootstrap term is zeroed on terminal steps
        targets = r + gamma * q_targets * (1 - is_terminal)
        
        # drop the trailing entry so inputs line up with the rewards
        s_input = s[0:-1]
        a_input = a[0:-1]
        output = model(s_input, a_input)
        q_sa = output

        value_loss = nn.MSELoss()(q_sa, targets)
        optimizer.zero_grad()
        value_loss.backward()
        total_loss += value_loss.item()
        rewards_collected += r.sum().item()
        optimizer.step()

    # periodic evaluation; the best-scoring weights are checkpointed below

    if step % 10 == 0:
      # torch.save(model.state_dict(), f"model_{step}.pt")
      eval_rewards = []
      for _ in range(EVAL_STEPS):
        # greedy rollout with the online model (this rebinds the local `rewards`)
        _, _, rewards, _, _, _ = generate_trajectory(env, model, policy='greedy')
        total_reward = sum(rewards)
        eval_rewards.append(total_reward)
        
      avg_rewards = np.mean(eval_rewards)
      std_rewards = np.std(eval_rewards)
      # print(f"eval rewards: {avg_rewards}")
      reward_per_episode.append(avg_rewards)

      # print(f"total reward: {total_reward}")
      # the best value printed here is the one from before this evaluation round
      if trajectory_policy == "e-greedy":
        print(f"{step}: avg rewards {avg_rewards} std: {std_rewards} best avg {best_avg}@{best_step} epsilon {epsilon}")
      elif trajectory_policy == "boltzmann":
        print(f"{step}: avg rewards {avg_rewards} std: {std_rewards} best avg {best_avg}@{best_step} temperature {temperature}")
      else:
        print(f"{step}: avg rewards {avg_rewards} std: {std_rewards} best avg {best_avg}@{best_step}")

      if avg_rewards > best_avg:
        print(f"best: {avg_rewards}")
        best_avg = avg_rewards
        best_step = step
        torch.save(model.state_dict(), f"model_best_{label}.pt")

      # torch.save(model.state_dict(), f"model_{label}_{gamma}_{learning_rate}.pt")


    # if step % 100 == 0:
    #   torch.save(model.state_dict(), f"model_{label}_{gamma}_{learning_rate}_{step}.pt")


    gc.collect()
    
    # decay the Boltzmann temperature, never below 0.1
    temperature = np.max([0.1, temperature * TEMP_DECAY])

    # decay epsilon exponentially from EPSILON_START towards EPSILON_FINAL
    epsilon = EPSILON_FINAL + (EPSILON_START - EPSILON_FINAL) * np.exp(-1.0 * step / EPSILON_DECAY_FRAMES)

    if step % TARGET_UPDATE_FREQ == 0:
      # total absolute weight difference between the online and target models;
      # only consumed by the commented-out print below
      total_change = 0
      for p, p_target in zip(model.parameters(), target_model.parameters()):
        total_change += torch.abs(p - p_target).sum().item()
      # print(f"total change: {total_change}")

      # hard update: copy the online weights into the target network
      target_model.load_state_dict(model.state_dict())

  env.close()
  return reward_per_episode


Specify the location of the game configuration

In [15]:
# Game configuration directory; handed to make_env() as `root_path`.
game_setup_path = "map_with_obstacles"

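Create the environment setup. Damage-based rewards give a denser reward signal, but note that `make_env` below passes `damage_based_reward=False`, so they are disabled for this run; set it to `True` to enable them.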

In [16]:
def make_env(root_path, render_mode="ansi", show_logs=False):
    """Create a `dndenv-v0` environment with randomly chosen combatants.

    Args:
        root_path: game configuration directory.
        render_mode: render mode forwarded to the environment.
        show_logs: forwarded to the environment's `show_logs` option.

    Returns:
        The environment returned by `make`. Damage-based rewards are disabled,
        and `profiles` / `enemies` are zero-argument callables that pick a
        random name on every call.
        NOTE(review): presumably dndenv invokes them when it sets up an
        episode - confirm.
    """
    hero_profiles = ['high_elf_mage', 'halfling_rogue', 'high_elf_fighter']
    enemy_profiles = ['halfling_rogue', 'high_elf_fighter']
    env_kwargs = dict(
        root_path=root_path,
        show_logs=show_logs,
        render_mode=render_mode,
        damage_based_reward=False,
        profiles=lambda: random.choice(hero_profiles),
        enemies=lambda: random.choice(enemy_profiles),
    )
    return make("dndenv-v0", **env_kwargs)

In [17]:
# Rebind `env`: replaces the logging demo environment created earlier with the
# quiet training environment (show_logs defaults to False in make_env).
env = make_env(game_setup_path)

In [18]:
# Hyper-parameter grid: one training run per (learning rate, gamma) pair, each
# with its own seed. This run uses the max-over-available-moves target.
learning_rates = [0.0001]
gammas = [0.99]
seed = 1337

results = {}
for lr in learning_rates:
    results[lr] = {}
    for gamma in gammas:
        seed += 1
        results[lr][gamma] = reward_per_episode = train(
            env, gamma, lr, max_steps=MAX_STEPS, seed=seed, use_td_target=False)


training with gamma 0.99 and learning rate 0.0001


  logger.warn(


  0%|          | 0/10000 [00:00<?, ?it/s]

  logger.warn(
  gym.logger.warn("Casting input x to numpy array.")
  logger.warn(f"{pre} is not within the observation space.")
  logger.warn(
  logger.warn(f"{pre} is not within the observation space.")
  return Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass


0: avg rewards -10.0 std: 0.0 best avg -10@0 epsilon 1.0
10: avg rewards -6.8 std: 7.332121111929345 best avg -10@0 epsilon 0.9911299749851548
best: -6.8
20: avg rewards -8.4 std: 5.425863986500214 best avg -6.8@10 epsilon 0.9813675686203779
30: avg rewards -8.0 std: 5.656854249492381 best avg -6.8@10 epsilon 0.9717022998219388
40: avg rewards -10.0 std: 0.0 best avg -6.8@10 epsilon 0.962133202054903
50: avg rewards -9.2 std: 2.7129319932501073 best avg -6.8@10 epsilon 0.9526593184015199
60: avg rewards -9.2 std: 3.919183588453085 best avg -6.8@10 epsilon 0.9432797014655288
70: avg rewards -6.8 std: 7.332121111929345 best avg -6.8@10 epsilon 0.9339934132774199
80: avg rewards -6.8 std: 7.332121111929345 best avg -6.8@10 epsilon 0.9247995252006359
90: avg rewards -8.0 std: 5.656854249492381 best avg -6.8@10 epsilon 0.9156971178387074
100: avg rewards -7.2 std: 6.645299090334459 best avg -6.8@10 epsilon 0.906685280943313
110: avg rewards -7.6 std: 6.499230723708769 best avg -6.8@10 epsil

Summarize rewards per episode

In [None]:
# One line per run: its hyper-parameters and its list of evaluation averages.
for lr, runs_by_gamma in results.items():
    for gamma, eval_averages in runs_by_gamma.items():
        print(f"lr: {lr} gamma: {gamma} rewards: {eval_averages}")

In [None]:
# Plot the average greedy-evaluation reward of every run.
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
for lr in learning_rates:
    for gamma in gammas:
        ax.plot(results[lr][gamma], label=f"lr={lr}, gamma={gamma}")
# train() appends one value per evaluation round, i.e. every 10 training steps.
ax.set_xlabel("evaluation round (one every 10 training steps)")
ax.set_ylabel("average greedy evaluation reward")
ax.set_title("DQN training with e-greedy exploration")
ax.legend()
plt.show()


In [None]:
# Fresh environment for the next training run; train() closes the environment
# it is given before returning.
env = make_env(game_setup_path)

In [None]:
# Hyper-parameter grid for the Boltzmann-exploration run: one training run per
# (learning rate, gamma) pair, each with its own seed.
learning_rates = [0.001]
gammas = [0.99]
seed = 1337

results = {}
for lr in learning_rates:
    results[lr] = {}
    for gamma in gammas:
        seed += 1
        results[lr][gamma] = reward_per_episode = train(
            env, gamma, lr, max_steps=MAX_STEPS, seed=seed,
            trajectory_policy='boltzmann', label="boltzmann")

In [None]:
# One line per run: its hyper-parameters and its list of evaluation averages.
for lr, runs_by_gamma in results.items():
    for gamma, eval_averages in runs_by_gamma.items():
        print(f"lr: {lr} gamma: {gamma} rewards: {eval_averages}")

In [None]:
# Plot the average greedy-evaluation reward of every run.
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
for lr in learning_rates:
    for gamma in gammas:
        ax.plot(results[lr][gamma], label=f"lr={lr}, gamma={gamma}")
# train() appends one value per evaluation round, i.e. every 10 training steps.
ax.set_xlabel("evaluation round (one every 10 training steps)")
ax.set_ylabel("average greedy evaluation reward")
ax.set_title("DQN training with Boltzmann exploration")
ax.legend()
plt.show()

Perform some tests on the trained agent and show a combat log from a fight against the rules-based AI. First, define a policy based on the trained model.

In [None]:
import os

# Evaluation settings.
# NOTE: this rebinds the module-level MAX_STEPS that the training cells pass to
# train(); re-running those cells after this one would use 500 steps.
MAX_STEPS = 500      # cap on environment steps per evaluation episode
NUM_EPISODES = 100   # number of evaluation battles

# Same device preference as the device cell near the top of the notebook
# (MPS first, then CUDA, otherwise the CPU).
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class ModelPolicy:
    """Greedy policy backed by a trained QNetwork."""

    def __init__(self, weights_file = 'model_best_boltzmann.pt'):
        """Load the trained weights.

        Args:
            weights_file: path of a state dict saved with `torch.save`.

        Raises:
            FileNotFoundError: if `weights_file` does not exist.
        """
        # Check the file before building the network so a missing file fails fast.
        if not os.path.exists(weights_file):
            raise FileNotFoundError(f"Model file {weights_file} not found. Please run dnd_dqn.ipynb notebook to train an agent.")
        self.model = QNetwork(device=device)
        self.model.to(device)
        # map_location lets weights saved on another device load on this one.
        self.model.load_state_dict(torch.load(weights_file, map_location=device))
        # The policy is only used for inference.
        self.model.eval()

    def action(self, state, info):
        """Return the available move with the highest predicted value.

        Prints the index, move and value of every candidate before choosing.

        Args:
            state: current observation, passed through to the model.
            info: env info dict; only `info["available_moves"]` is read.

        Raises:
            ValueError: if there are no available moves.
        """
        available_moves = info["available_moves"]
        if len(available_moves) == 0:
            raise ValueError("No available moves to choose from.")

        # No gradients are needed to rank the moves.
        with torch.no_grad():
            values = torch.stack([self.model(state, move) for move in available_moves])
        for index, v in enumerate(values):
            print(f"{index}: {available_moves[index]} {v.item()}")

        chosen_index = torch.argmax(values).item()
        return available_moves[chosen_index]


In [None]:

env = make_env(game_setup_path)

print("=========================================")
print("Battle between an RL agent vs a Rules based AI")
print("=========================================")
win_count = 0
loss_count = 0

# Load the trained weights once instead of once per episode.
policy = ModelPolicy()

for i in range(NUM_EPISODES):
    observation, info = env.reset()
    action = policy.action(observation, info)

    print(f"selected action: {action}")
    terminal = False
    step_count = 0

    while not terminal and step_count < MAX_STEPS:
        step_count += 1
        observation, reward, terminal, truncated, info = env.step(action)
        print(env.render())

        if terminal or truncated:
            # The episode is over: a positive final reward counts as a win.
            print(f"Reward: {reward}")
            if reward > 0:
                win_count += 1
            else:
                loss_count += 1
            break

        # display entity healths
        print(f"Turn {info['current_index']}\n")
        print(f"Reward: {reward}\n")
        print(f"health hero: {observation['health_pct']}\n")
        print(f"health enemy: {observation['health_enemy']}\n")

        action = policy.action(observation, info)
        print(f"agent selected action: {action}")

# Episodes that hit MAX_STEPS without ending are counted as neither win nor
# loss, so guard the division in case no episode finished.
finished_episodes = win_count + loss_count
if finished_episodes:
    print(f"Win count: {win_count} Loss count: {loss_count} Win rate: {win_count/finished_episodes}")
else:
    print(f"Win count: {win_count} Loss count: {loss_count} Win rate: n/a (no episode finished)")