# Training DQN for Tic Tac Toe and play against LLM

Based on [RL against random policy opponent with PettingZoo](https://tianshou.org/en/stable/01_tutorials/04_tictactoe.html).

In [1]:
%load_ext autoreload
%autoreload 2
%load_ext tensorboard

# Install dependencies

In [2]:
!pip install gymnasium==0.29.1 pygame==2.3.0 pettingzoo==1.24.3 tianshou==0.5.1 transformers==4.39.1 accelerate==0.28.0

Collecting gymnasium==0.29.1
  Downloading gymnasium-0.29.1-py3-none-any.whl (953 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.9/953.9 kB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pygame==2.3.0
  Downloading pygame-2.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pettingzoo==1.24.3
  Downloading pettingzoo-1.24.3-py3-none-any.whl (847 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m847.8/847.8 kB[0m [31m21.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting tianshou==0.5.1
  Downloading tianshou-0.5.1-py3-none-any.whl (163 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m163.1/163.1 kB[0m [31m13.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting transformers==4.39.1
  Downloading transformers-4.39.1-py3-none-any.whl (8.8 MB)
[2K     [90m━━━━━

# Import needed dependencies

In [3]:
from typing import Any, Dict
import gymnasium as gym
from gymnasium.spaces import Discrete, Space
import torch
from torch import nn
import numpy as np
import re
from pettingzoo.classic import tictactoe_v3
from torch.utils.tensorboard import SummaryWriter

from tianshou.data import Batch, Collector, VectorReplayBuffer
from tianshou.env import DummyVectorEnv
from tianshou.env.pettingzoo_env import PettingZooEnv
from tianshou.policy import (
    BasePolicy,
    DQNPolicy,
    MultiAgentPolicyManager,
    RandomPolicy
)
from tianshou.trainer import OffpolicyTrainer
from tianshou.utils import TensorboardLogger
from tianshou.utils.net.common import Net

from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

# Setup environment

In [4]:
def get_env(render_mode=None):
  return PettingZooEnv(tictactoe_v3.env(render_mode=render_mode))

# create the environment and get the shape of the states and shape of the actions
env = get_env()
observation_space = env.observation_space['observation'] if isinstance(
  env.observation_space, gym.spaces.Dict
) else env.observation_space
state_shape = observation_space.shape or observation_space.n
action_shape = env.action_space.shape or env.action_space.n

# Setup policies for training DQNPolicy

One training policy (DQNPolicy) and the opponent (RandomPolicy).

In [5]:
# Hidden sizes: shape of the MLP
hidden_sizes = [128, 128, 128, 128]
# device to train on
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# the number of steps to look ahead
estimation_step = 3
# the target network update frequency (0 if you do not use the target network).
target_update_freq = 320
# learning rate of the Adam optimizer
lr = 1e-4

In [6]:
def get_dqn_policy():
  # The deep learning model (MLP) that underpins the behaviour of the agent (it is not the agent itself).
  net = Net(
    state_shape,
    action_shape,
    hidden_sizes=hidden_sizes,
    device=device
  ).to(device)

  # Adam optimizer
  optim = torch.optim.Adam(net.parameters(), lr=lr)

  # Agent to learn
  return DQNPolicy(
    model=net,
    optim=optim,
    action_space=env.action_space,
    estimation_step=estimation_step,
    target_update_freq=target_update_freq
  )

# Train the agent with DQN
Using the OffPolicyTrainer.

In [7]:
# path to save the results and the logging
path = '/content/tic_tac_toe/dqn'

def train_policy(policy, agent_id):
  # number of training environments
  training_num = 100
  # number of testing environments
  test_num = 100
  # size of the VectorReplayBuffer
  buffer_size = 20000
  # the batch size of sample data, which is going to feed in the policy network.
  batch_size = 64
  # the maximum number of epochs for training. The training process might be finished before reaching max_epoch if stop_fn is set.
  epoch = 100
  # the number of transitions collected per epoch.
  step_per_epoch = 1000
  # the number of transitions the collector would collect before the network update,
  # i.e., trainer will collect "step_per_collect" transitions and do some policy network update
  # repeatedly in each epoch.
  step_per_collect = 10
  # used in the stop function when the mean rewards are over this threshold
  win_rate = 0.9
  # The eps for epsilon-greedy exploration for test and training
  eps_test = 0.05
  eps_train = 0.1
  # the number of times the policy network would be updated per transition after (step_per_collect)
  # transitions are collected, e.g., if update_per_step set to 0.3, and step_per_collect is 256,
  # policy will be updated round(256 * 0.3 = 76.8) = 77 times after 256 transitions are collected
  # by the collector. Default to 1.
  update_per_step = 0.1


  # Dummy vectorized environment wrapper, implemented in for-loop.
  # This has the same interface as true vectorized environment, but the rollout does not happen in parallel.
  # So, all workers just wait for each other and the environment is as efficient as using a single environment.
  # This can be useful for testing or for demonstration purposes.
  train_envs = DummyVectorEnv([get_env for _ in range(training_num)])
  test_envs = DummyVectorEnv([get_env for _ in range(test_num)])

  # VectorReplayBuffer contains n ReplayBuffer with the same size.
  # It is used for storing transition from different environments yet keeping the order of time.
  vectorReplayBuffer = VectorReplayBuffer(buffer_size, len(train_envs))

  # determine whether the action needs to be modified with corresponding policy’s exploration noise.
  # If so, “policy. exploration_noise(act, batch)” will be called automatically to add the
  # exploration noise into action.
  exploration_noise = True

  # Train and test collector
  # Collector enables the policy to interact with different types of envs with exact number of steps or episodes.
  train_collector = Collector(policy, train_envs, vectorReplayBuffer, exploration_noise=exploration_noise)
  test_collector = Collector(policy, test_envs, exploration_noise=exploration_noise)

  # Collect a specified number of step or episode.
  train_collector.collect(n_step=batch_size * training_num)

  # A logger that logs statistics during training/testing/updating
  writer = SummaryWriter(path)
  logger = TensorboardLogger(writer)

  # Functions for the OffpolicyTrainer
  # Save the best model
  def save_best_fn(policy):
    torch.save(policy.policies[agent_id].state_dict(), path + '/policy-' + agent_id + '.pth')

  # When to stop training
  def stop_fn(mean_rewards):
    return mean_rewards >= win_rate

  # a hook called at the beginning of training in each epoch. It can be used to perform custom additional operations
  def train_fn(epoch, env_step):
      # Set the eps for epsilon-greedy exploration.
      policy.policies[agent_id].set_eps(eps_train)

  def test_fn(epoch, env_step):
      # Set the eps for epsilon-greedy exploration.
      policy.policies[agent_id].set_eps(eps_test)

  # A function with signature used in multi-agent RL.
  # We need to return a single scalar for each episode’s result to monitor training in the multi-agent RL setting.
  # This function specifies what is the desired metric, e.g., the reward of agent 1 or the average reward over all agents.
  def reward_metric(rews):
    if agent_id == 'player_2':
      return rews[:, 1]
    return rews[:, 0]

  # Offpolicy trainer, samples mini-batches from buffer and passes them to update.
  result = OffpolicyTrainer(
    policy,
    train_collector,
    test_collector,
    epoch,
    step_per_epoch,
    step_per_collect,
    test_num,
    batch_size,
    train_fn=train_fn,
    test_fn=test_fn,
    stop_fn=stop_fn,
    save_best_fn=save_best_fn,
    update_per_step=update_per_step,
    logger=logger,
    test_in_train=False, # whether to test in the training phase.
    reward_metric=reward_metric
  ).run()

In [None]:
agent_learn_player1 = get_dqn_policy()
agent_learn_player2 = get_dqn_policy()
agent_random = RandomPolicy()

agents_1 = [agent_learn_player1, agent_random]
agents_2 = [agent_random, agent_learn_player2]

# Multi-agent policy manager for Multi-Agent Reinforcement Learning (https://tianshou.org/en/stable/01_tutorials/07_cheatsheet.html#marl-example)
policy_1 = MultiAgentPolicyManager(agents_1, env)
policy_2 = MultiAgentPolicyManager(agents_2, env)

train_policy(policy_1, 'player_1')
train_policy(policy_2, 'player_2')

In [None]:
%tensorboard --logdir /content/tic_tac_toe/dqn

Load best trained agents

In [9]:
agent_learn_player1.load_state_dict(torch.load(path + '/policy-player_1.pth'))
agent_learn_player2.load_state_dict(torch.load(path + '/policy-player_2.pth'))

<All keys matched successfully>

# Play agent against agent function


In [10]:
def play(agent1, agent2, n_episode=100):
  env = get_env(render_mode=None)
  policy = MultiAgentPolicyManager([agent1, agent2], env)
  dummy_vector_env = DummyVectorEnv([lambda: env])
  collector = Collector(policy, dummy_vector_env, exploration_noise=True)
  # play number of episodes
  result = collector.collect(n_episode=n_episode, render=None)
  rews, lens = result["rews"], result["lens"]
  print(f"Final reward: {rews[:, 0].mean()}, length: {lens.mean()}")

  won = 0
  draw = 0
  lost = 0
  for res in result['rews']:
    if res[0] == 1:
      won += 1
    elif res[0] == -1:
      lost +=1
    else:
      draw += 1

  print("Win: " + str(won) + " lost: " + str(lost) + " draw: " + str(draw))

  return (won, lost, draw)

# Setup LLM Agent

In [13]:
class LLMAgent(BasePolicy):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        models_dict = {"StableLM Zephyr 3B": "stabilityai/stablelm-zephyr-3b"}
        model_id = models_dict["StableLM Zephyr 3B"]
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)#, device=0)
        self.pipe = pipeline("text-generation", model=model_id, device_map="auto", tokenizer=self.tokenizer, torch_dtype=torch.bfloat16)
        self.cache = {}

    def field_to_string(self, field, c):
      return_value = ""
      if field[0] == 1:
        return_value = "X "
      elif field[1] == 1:
        return_value = "O "
      else:
          return_value = str(c) + " "
      if c < 6:
          return_value += "| "
      return return_value

    def board_to_string(self, batch: Batch) -> str:
        rows = batch.obs['obs']
        row1 = self.field_to_string(rows[0][0][0], 0) + self.field_to_string(rows[0][1][0], 3) + self.field_to_string(rows[0][2][0], 6)
        row2 = self.field_to_string(rows[0][0][1], 1) + self.field_to_string(rows[0][1][1], 4) + self.field_to_string(rows[0][2][1], 7)
        row3 = self.field_to_string(rows[0][0][2], 2) + self.field_to_string(rows[0][1][2], 5) + self.field_to_string(rows[0][2][2], 8)
        return row1 + '\n' + row2 + '\n' + row3

    def ask_llm_for_choice(self, board: str, possible_choices) -> int:
        job_description = "You will be provided with a tic tac toe board. There are two players, X and O. An empty board looks likes this:\n0 | 3 | 6\n1 | 4 | 7\n2 | 5 | 8\nWhen a player made a move a X or O is placed on the board.\nYou are player X and should choose the best possible option."
        possible_choices_text = ", ".join(possible_choices)
        question = "The current board is: \n" + board + "\nThe possible numbers are " + possible_choices_text + ". Only answer best number to choose, no comments or explanation, just a number."
        output = self.generate(job_description, question, 0.1, 40)

        extracted_choice = [int(i) for i in re.sub(r'[^0-9\s]', '', output).split() if i.isdigit() and str(i) in possible_choices]
        if extracted_choice:
          print("Choice: " + str(extracted_choice[0]))
          return extracted_choice[0]
        else:
          print("No choice, return -1")
          return -1

    def generate(self, job_description, question, temperature=0.7, max_new_tokens=512):
        messages = [
            {
                "role": "system",
                "content": job_description,
            },
        ]
        messages.append({"role": "user", "content": question})
        prompt = self.pipe.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        outputs = self.pipe(prompt, max_new_tokens=max_new_tokens, do_sample=True, temperature=temperature, top_k=50, top_p=0.95)
        output = outputs[0]["generated_text"]
        messages.append({"role": "assistant", "content": output})
        response_start = output.rfind('<|assistant|>')
        text_output = output[response_start + len('<|assistant|>'):]
        return text_output

    def forward(
        self, batch: Batch, state: dict | Batch | np.ndarray | None = None
    ) -> Batch:
        board = self.board_to_string(batch)

        if board in self.cache:
          print("cache hit:\n" + board + "\nChoice: " + str(self.cache[board]))
          return Batch(act=[self.cache[board]])

        all_choices = np.array(list(range(0,9)))
        mask = batch.obs.mask.flatten()
        masked_choices = all_choices[mask.astype(bool)].astype(str)

        if len(masked_choices) == 1:
          return Batch(act=[int(masked_choices[0])])

        choice = -1
        tries = 3
        while str(choice) not in masked_choices and tries > 0:
          choice = self.ask_llm_for_choice(board, masked_choices)
          tries -= 1

        if choice != -1 and str(choice) in masked_choices:
          print("Add to cache:\n" + board + "\nChoice: " + str(choice))
          self.cache[board] = choice
        else:
          print("Invalid choice, pick first: " + masked_choices[0])
          return Batch(act=[int(masked_choices[0])])

        return Batch(act=[choice])

    def learn(self, batch: Batch) -> Dict[str, Any]:
        return {}

# Play with agents in all possible combinations

1. Random - Random
1. Random - DQN
1. Random - LLM
1. DQN - Random
1. DQN - DQN
1. DQN - LLM
1. LLM - Random
1. LLM - DQN
1. LLM - LLM

So every agents plays as player_1 against the other agents

In [17]:
llm_agent = LLMAgent()

random_random = play(RandomPolicy(), RandomPolicy())
random_dqn = play(RandomPolicy(), agent_learn_player1)
random_llm = play(RandomPolicy(), llm_agent)
dqn_random = play(agent_learn_player1, RandomPolicy())
dqn_dqn = play(agent_learn_player1, agent_learn_player2)
dqn_llm = play(agent_learn_player1, llm_agent)
llm_random = play(llm_agent, RandomPolicy())
llm_dqn = play(llm_agent, agent_learn_player2)
llm_llm = play(llm_agent, llm_agent)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


Final reward: 0.52, length: 7.66
Win: 68 lost: 16 draw: 16


In [22]:
print("Random - random: " + str(random_random[0]/100))
print("Random - DQN: " + str(random_dqn[0]/100))
print("Random - LLM: " + str(random_llm[0]/100))
print("DQN - random: " + str(dqn_random[0]/100))
print("DQN - DQN: " + str(dqn_dqn[0]/100))
print("DQN - LLM: " + str(dqn_llm[0]/100))
print("LLM - random: " + str(llm_random[0]/100))
print("LLM - DQN: " + str(llm_dqn[0]/100))
print("LLM - LLM: " + str(llm_llm[0]/100))

Random - random: 0.68
Random - DQN: 0.28
Random - LLM: 0.7
DQN - random: 0.88
DQN - DQN: 0.12
DQN - LLM: 0.97
LLM - random: 0.62
LLM - DQN: 0.02
LLM - LLM: 0.0


  and should_run_async(code)
