In [1]:
# install and import libraries

%%capture
%pip install pettingzoo[classic]

import random
from collections import deque
import numpy as np

from keras.models import Sequential, clone_model
from keras.optimizers import Adam
from keras.layers import Dense
from keras.regularizers import l2
from keras import Input

from pettingzoo.classic import tictactoe_v3
from gymnasium.spaces.utils import flatten_space

from google.colab import drive

In [2]:
# mount google drive for persistent storage
# (Drive becomes available under the /content/drive mount point passed below)

drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# define deep-q learning agent
# following tutorials from following repositories:
#   https://github.com/keon/deep-q-learning/tree/master
#   https://github.com/Alexander-H-Liu/Deep-Q-Learning-Keras/tree/master

class DeepQAgent:
    """Deep Q-learning agent with experience replay and a separate target network.

    The agent owns two Keras models with the same architecture: ``model`` (the
    online network that is trained) and ``target_model`` (a snapshot used to
    compute bootstrap targets in ``replay``). Experiences are stored in a
    bounded deque as ``(state, action, reward, next_state, done)`` tuples.

    Args:
        name: label for the agent (used by callers, e.g. to build a filename).
        state_size: length of the flattened state vector fed to the network.
        action_size: number of discrete actions (size of the output layer).
    """

    def __init__(self, name, state_size, action_size):
        # hyperparameters: modify these for training
        self.gamma          = 0.9   # reward discount rate
        self.epsilon        = 1.0   # exploration probability
        self.epsilon_decay  = 0.99  # decay rate for exploration prob
        self.epsilon_min    = 0.1   # minimum exploration prob
        self.learning_rate  = 0.01  # learning rate
        self.clipnorm       = 1.0   # gradient clipping norm
        self.memory_size    = 200   # size of experience replay buffer

        # don't modify these for training
        self.name = name
        self.train_mode = True
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=self.memory_size)  # oldest experiences are dropped first
        self.memory_buffer = None  # scratch space managed by the training loop
        self.model = self._build_model()
        self.target_model = clone_model(self.model)
        # clone_model() re-creates the layers with freshly initialised weights,
        # so copy the online weights over to start both networks identical
        self.update_target_model()


    # model architecture: modify this before training
    def _build_model(self):
        """Build and compile the Q-network.

        Architecture: Input(state_size) -> Dense(32, tanh) -> Dense(16, tanh)
        -> Dense(action_size, tanh), trained with MSE loss and Adam. The tanh
        output layer bounds every predicted Q-value to the range [-1, 1].

        Returns:
            The compiled Keras ``Sequential`` model.
        """
        opt = Adam(
            learning_rate=self.learning_rate,
            clipnorm=self.clipnorm  # helps prevent exploding gradients problem
        )
        model = Sequential()
        model.add(Input(shape=(self.state_size,)))
        model.add(Dense(
            32,
            use_bias=True,
            activation='tanh',
            kernel_initializer='he_normal',
            bias_initializer='he_normal'
        ))
        model.add(Dense(
            16,
            use_bias=True,
            activation='tanh',
            kernel_initializer='he_normal',
            bias_initializer='he_normal'
        ))
        model.add(Dense(
            self.action_size,
            use_bias=True,
            activation='tanh',
            kernel_initializer='he_normal',
            bias_initializer='he_normal'
        ))
        model.compile(loss='mse', optimizer=opt)
        return model


    # algorithm-defined function
    # choose next action according to deep q-learning algorithm
    def act(self, state, mask, verbose=0):
        """Pick an action with an epsilon-greedy policy.

        In training mode, with probability ``epsilon`` the action values are
        drawn uniformly at random; otherwise they come from the online network.

        Args:
            state: state array of shape (1, state_size) passed to ``predict``.
            mask: per-action legality flags. Only used to annotate the verbose
                printout; the chosen action is NOT restricted to legal moves.
            verbose: when truthy, print the value of every action.

        Returns:
            Index of the action with the highest value (``np.argmax`` result).
        """
        if np.random.rand() <= self.epsilon and self.train_mode: # explore only if in training mode
            act_values = np.random.rand(self.action_size)
        else:
            act_values = self.model.predict(state, verbose=verbose)[0]
        if verbose:
            for j in range(self.action_size):
                print(f'Q(S,{j}) = [ {act_values[j]:6.3f} ]\t{"" if mask[j] else "(illegal)"}')
            print()
        return np.argmax(act_values)


    # algorithm-defined function
    # compute target q-values over random minibatch and perform network optimization
    def replay(self, batch_size, verbose=0):
        """Train the online network on one random minibatch of stored experiences.

        For each sampled experience the target for the taken action is
        ``reward`` when the experience is terminal, otherwise
        ``reward + gamma * max_a Q_target(next_state, a)``; the targets of all
        other actions are the network's current predictions. After the fit,
        ``epsilon`` is decayed once (never below ``epsilon_min``).

        If fewer than ``batch_size`` experiences are stored (or ``batch_size``
        is not positive) the call returns without training or decaying epsilon.

        Args:
            batch_size: number of experiences to sample.
            verbose: forwarded to Keras ``predict``/``fit``; values >= 2 also
                print the before/after Q-values of every sampled experience.
        """
        # a full minibatch cannot be drawn yet: skip instead of raising from random.sample
        if batch_size <= 0 or len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory, batch_size)
        states, actions, rewards, next_states, dones = zip(*minibatch)

        # stack the stored (1, state_size) arrays into (batch_size, state_size) matrices
        x_train = np.array(states, dtype=float).reshape(batch_size, self.state_size)
        next_batch = np.array(next_states, dtype=float).reshape(batch_size, self.state_size)

        # one batched forward pass per network instead of one pass per sample
        q_current = np.asarray(self.model.predict(x_train, verbose=verbose))
        q_next = np.asarray(self.target_model.predict(next_batch, verbose=verbose)) # use fixed model for stabilized training

        reward_arr = np.array(rewards, dtype=float)
        terminal = np.array(dones, dtype=bool)
        # terminal experiences have no future value to bootstrap from
        targets = np.where(terminal, reward_arr, reward_arr + self.gamma * np.amax(q_next, axis=1))

        # only the entry of the action actually taken is moved towards its target
        y_train = q_current.astype(float)
        y_train[np.arange(batch_size), np.array(actions, dtype=int)] = targets

        if verbose >= 2:
            for i, (state, action, reward) in enumerate(zip(states, actions, rewards)):
                print(f'State: {state[0]}\tAction: {action}\tReward: {reward}')
                for j in range(self.action_size):
                    print(f'\tQ(S,{j}):\t[ {q_current[i][j]:6.3f} -> {y_train[i][j]:6.3f} ] {"**" if (j == action) else ""}')
                print()

        self.model.fit(x_train, y_train, batch_size=batch_size, epochs=1, verbose=verbose)
        self.epsilon = max(self.epsilon * self.epsilon_decay, self.epsilon_min)


    # algorithm-defined function
    # append experience tuple to memory buffer
    def memorize(self, state, action, reward, next_state, done):
        """Append one experience tuple to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def set_train_mode(self, setting):
        """Enable (True) or disable (False) random exploration in ``act``."""
        self.train_mode = setting

    def set_epsilon(self, e):
        """Overwrite the current exploration probability."""
        self.epsilon = e

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.set_weights(self.model.get_weights())

    def load(self, filename):
        """Load weights from ``filename`` into the online network and sync the target network."""
        self.model.load_weights(filename)
        # keep the target network consistent with the freshly loaded weights
        self.update_target_model()

    def save(self, filename):
        """Write the online network's weights to ``filename``."""
        self.model.save_weights(filename)

In [None]:
# create new agent object

# fresh environment, used here only to size the network from its spaces
env = tictactoe_v3.env()
env.reset(seed=42)

# read the observation/action spaces of the first listed player
reference_player = env.agents[0]
board_space = env.observation_space(reference_player)['observation']
state_size = flatten_space(board_space).shape[0]
action_size = env.action_space(reference_player).n

agent = DeepQAgent('dqn_agent', state_size, action_size)
agent.model.summary()

# optionally resume from saved weights: set the path to enable
# (a resumed agent starts at the minimum exploration rate)
pretrained_filepath = None
if pretrained_filepath:
    agent.load(pretrained_filepath)
    agent.set_epsilon(agent.epsilon_min)

  saveable.load_own_variables(weights_store.get(inner_path))


In [None]:
# train agent over specified number of episodes
# the single agent object plays both sides of every game (self-play)

# training loop hyperparameters
BATCH_SIZE = 16              # experiences sampled per replay step
TARGET_UPDATE_EPISODES = 1   # refresh the target network every this many episodes
EPISODES = 1                 # number of games to play
agent.set_train_mode(True)

for e in range(EPISODES):
    print(f"Episode: {e}/{EPISODES}")

    # reset environment and agent memory buffer
    env = tictactoe_v3.env()
    env.reset(seed=42)
    agent.memory_buffer = {agent_id: None for agent_id in env.agents}

    # update stable target model according to hyperparameter
    if e % TARGET_UPDATE_EPISODES == 0:
        agent.update_target_model()

    # this loop encapsulates a single game of tic-tac-toe
    for agent_id in env.agent_iter():

        # agent observes pre-turn environment
        observation, reward, done, truncation, info = env.last()
        state = observation['observation'].flatten()
        mask = observation['action_mask'].flatten()
        state = state.reshape(-1, state.shape[0])  # add a leading batch axis: (1, state_size)

        # agent completes experience tuple and appends it to memory deque
        if agent.memory_buffer[agent_id] is not None:
            last_state, last_action = agent.memory_buffer[agent_id]
            agent.memorize(last_state, last_action, reward, state, done)

        # agent chooses action according to DQN model
        # (a finished player must step with None)
        if done or truncation:
            action = None
        else:
            action = agent.act(state, mask)

        # agent stores current state and chosen action in memory buffer
        # cannot memorize complete experience tuple right now since agent cannot learn its reward and next state until after opponent turn
        agent.memory_buffer[agent_id] = (state, action)

        # agent executes action
        env.step(action)

        # learn from sampled experience replay once enough experiences are stored
        # NOTE(review): this runs after every turn (it sits inside the turn loop),
        # not once per game -- confirm which schedule is intended
        if len(agent.memory) > BATCH_SIZE:
            agent.replay(BATCH_SIZE)

# save model weights to persistent .weights.h5 file after training
# the path must be absolute: Drive is mounted at /content/drive, and a relative
# 'content/drive/...' path would resolve against the working directory instead
agent.save(f'/content/drive/MyDrive/{agent.name}.weights.h5')
drive.flush_and_unmount()

Episode: 0/1


In [None]:
# play against agent in competitive play

# utility function for rendering tic-tac-toe grid in terminal
def print_grid(grid_vector, agent_id):
    """Print a tic-tac-toe board to stdout.

    Args:
        grid_vector: flat sequence of 18 values, two per square. For square
            ``i``, index ``2*i`` flags a mark of the observing player and index
            ``2*i + 1`` flags a mark of the other player.
        agent_id: id of the observing player; "player_1" is drawn as "X" and
            any other id as "O".

    Squares are laid out column by column: 0, 3, 6 form the top row.
    """
    own_mark, other_mark = ("X", "O") if agent_id == "player_1" else ("O", "X")

    cells = []
    for square in range(9):
        if grid_vector[2 * square] == 1:
            cells.append(own_mark)
        elif grid_vector[2 * square + 1] == 1:
            cells.append(other_mark)
        else:
            cells.append(" ")

    divider = "-" * 11
    rows = [f' {cells[r]} | {cells[r + 3]} | {cells[r + 6]}' for r in range(3)]

    # blank line, three rows separated by dividers, blank line
    print()
    print(f'\n{divider}\n'.join(rows))
    print()

# exploration off: the Q-values printed below come straight from the network
agent.set_train_mode(False)
env = tictactoe_v3.env()
env.reset(seed=42)

for agent_id in env.agent_iter():
    observation, reward, termination, truncation, info = env.last()
    mask = observation['action_mask']
    state = observation['observation'].flatten()

    # draw the board from the flat vector before adding the batch axis
    print_grid(state, agent_id)
    state = state.reshape(1, -1)

    if not (termination or truncation):
        # show the agent's view of the position, then let the user pick the move
        print(f'Player {agent_id[-1]} sees Q-Values:')
        agent.act(state, mask, verbose=True)
        action = np.int64(input("Enter action: "))
    else:
        # a finished player steps with None
        action = None

    env.step(action)


   |   |  
-----------
   |   |  
-----------
   |   |  

Player 1 sees Q-Values:
Q(S,0) = [  0.795 ]	
Q(S,1) = [  0.695 ]	
Q(S,2) = [  0.858 ]	
Q(S,3) = [  0.804 ]	
Q(S,4) = [  0.635 ]	
Q(S,5) = [  0.762 ]	
Q(S,6) = [  0.929 ]	
Q(S,7) = [  0.969 ]	
Q(S,8) = [  0.794 ]	

Enter action: 7

   |   |  
-----------
   |   | X
-----------
   |   |  

Player 2 sees Q-Values:
Q(S,0) = [  0.850 ]	
Q(S,1) = [  0.832 ]	
Q(S,2) = [  0.831 ]	
Q(S,3) = [  0.660 ]	
Q(S,4) = [  0.590 ]	
Q(S,5) = [  0.889 ]	
Q(S,6) = [  0.936 ]	
Q(S,7) = [ -0.209 ]	(illegal)
Q(S,8) = [  0.920 ]	

Enter action: 6

   |   | O
-----------
   |   | X
-----------
   |   |  

Player 1 sees Q-Values:
Q(S,0) = [ -0.656 ]	
Q(S,1) = [  0.675 ]	
Q(S,2) = [  0.084 ]	
Q(S,3) = [  0.122 ]	
Q(S,4) = [  0.535 ]	
Q(S,5) = [ -0.210 ]	
Q(S,6) = [ -0.932 ]	(illegal)
Q(S,7) = [ -0.956 ]	(illegal)
Q(S,8) = [  0.526 ]	

Enter action: 1

   |   | O
-----------
 X |   | X
-----------
   |   |  

Player 2 sees Q-Values:
Q(S,0) = [  0.185 ]	
Q(

In [None]:
# for presentation

# inspect the most recently stored experience tuple
state, action, reward, next_state, done = agent.memory[-1]

for field in (state, action, reward, next_state, done):
    print(field)

# mark every action as legal so no row is tagged "(illegal)" in the printout
mask = np.array([1] * 9)
agent.act(state, mask, verbose=True)


agent.act(next_state, mask, verbose=True)

[[0 0 1 0 0 1 0 0 0 0 0 1 0 1 1 0 1 0]]
4
1
[[0 0 1 0 0 1 0 0 1 0 0 1 0 1 1 0 1 0]]
True
Q(S,0) = [ -0.788 ]	
Q(S,1) = [ -0.944 ]	
Q(S,2) = [ -0.995 ]	
Q(S,3) = [ -0.761 ]	
Q(S,4) = [  0.077 ]	
Q(S,5) = [ -0.997 ]	
Q(S,6) = [ -0.986 ]	
Q(S,7) = [ -0.990 ]	
Q(S,8) = [ -0.931 ]	

Q(S,0) = [ -0.708 ]	
Q(S,1) = [ -0.796 ]	
Q(S,2) = [ -0.996 ]	
Q(S,3) = [ -0.830 ]	
Q(S,4) = [ -0.980 ]	
Q(S,5) = [ -0.999 ]	
Q(S,6) = [ -0.994 ]	
Q(S,7) = [ -0.994 ]	
Q(S,8) = [ -0.963 ]	



np.int64(0)

In [None]:
# print the weights of each layer

# dump every weight array of the online network, layer by layer
for layer in agent.model.layers:
    print(f"Layer: {layer.name}")
    weights = layer.get_weights()
    for i in range(len(weights)):
        w = weights[i]
        print(f"Weights array {i} shape: {w.shape}")
        print(w)

In [None]:
# model one

'''
opt = Adam(
    learning_rate=self.learning_rate,
    clipnorm=self.clipnorm  # helps prevent exploding gradients problem
)
model = Sequential()
model.add(Input(shape=(self.state_size,)))
model.add(Dense(
    32,
    use_bias=True,
    activation='tanh',
    kernel_initializer='he_normal',
    bias_initializer='he_normal'
))
model.add(Dense(
    16,
    use_bias=True,
    activation='tanh',
    kernel_initializer='he_normal',
    bias_initializer='he_normal'
))
model.add(Dense(
    self.action_size,
    use_bias=True,
    activation='tanh',
    kernel_initializer='he_normal',
    bias_initializer='he_normal'
))
model.compile(loss='mse', optimizer=opt)
'''

In [None]:
# model two -- no bias in output layer

'''
opt = Adam(
    learning_rate=self.learning_rate,
    clipnorm=self.clipnorm  # helps prevent exploding gradients problem
)
model = Sequential()
model.add(Input(shape=(self.state_size,)))
model.add(Dense(
    32,
    use_bias=True,
    activation='tanh',
    kernel_initializer='he_normal',
    bias_initializer='he_normal'
))
model.add(Dense(
    16,
    use_bias=True,
    activation='tanh',
    kernel_initializer='he_normal',
    bias_initializer='he_normal'
))
model.add(Dense(
    self.action_size,
    use_bias=False,
    activation='tanh',
    kernel_initializer='he_normal',
    bias_initializer='he_normal'
))
model.compile(loss='mse', optimizer=opt)
'''

In [None]:
# model three -- single layer

'''
opt = Adam(
    learning_rate=self.learning_rate,
    clipnorm=self.clipnorm  # helps prevent exploding gradients problem
)
model = Sequential()
model.add(Input(shape=(self.state_size,)))
model.add(Dense(
    self.action_size,
    use_bias=True,
    activation='tanh',
    kernel_initializer='he_normal',
    bias_initializer='he_normal'
))
model.compile(loss='mse', optimizer=opt)
'''