In [1]:
import time
from collections import deque, namedtuple
import numpy as np
import PIL.Image
import tensorflow as tf
import utils_tictactoe
import copy
import random

from pyvirtualdisplay import Display
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.losses import MSE
from tensorflow.keras.optimizers import Adam

# Seed every random number generator this notebook imports, not only
# TensorFlow's, so that Restart & Run All gives repeatable results.
# `random` is used directly by play(); whether utils_tictactoe draws from
# `random` or NumPy is not visible here -- TODO confirm.
# NOTE(review): assumes utils_tictactoe.SEED is a non-negative int that
# np.random.seed accepts -- confirm.
random.seed(utils_tictactoe.SEED)
np.random.seed(utils_tictactoe.SEED)
tf.random.set_seed(utils_tictactoe.SEED)

In [2]:
# --- Agent hyperparameters ---
MEMORY_SIZE = 100_000     # capacity of the replay memory buffer
GAMMA = 0.995             # discount factor
ALPHA = 1e-3              # learning rate
NUM_STEPS_FOR_UPDATE = 3  # perform a learning update every C time steps

# --- Board encoding ---
X = "X"
O = "O"
EMPTY = None
BOARD_SIZE = 3
# Every (row, column) pair in row-major order.
ALL_ACTIONS = [divmod(cell, BOARD_SIZE) for cell in range(BOARD_SIZE * BOARD_SIZE)]
board_size = BOARD_SIZE
state_size = BOARD_SIZE * BOARD_SIZE   # one network input per cell
action_size = BOARD_SIZE * BOARD_SIZE  # one Q-value per cell

# Replay-buffer entries are stored as named tuples.
experience = namedtuple("Experience", "state action reward next_state done")

In [3]:
class tictactoe():
    """
    Two-player tic-tac-toe environment.

    Instance attributes (all set in `__init__`):
      board:  2d list whose cells are X, O or EMPTY
      player: index of the player to move; 0 places X, 1 places O
      winner: None while the game runs, 0 or 1 once a player has won,
              2 when `move` detects a tie
      left:   number of EMPTY cells remaining on the board
    """

    def __init__(self, initial=None):
        """
        Initialize game board.

        Args:
          initial: optional 2d list of X / O / EMPTY to start from. It is
                   deep-copied, so the caller's list is never modified.
                   When omitted, an empty board_size x board_size board is used.

        first player is 0, second is 1
        """
        if initial is None:
            self.board = [[EMPTY for _ in range(board_size)] for _ in range(board_size)]
        else:
            self.board = copy.deepcopy(initial)
        self.player = 0
        self.winner = None
        # Count the EMPTY cells rather than assuming 9 so that the tie check in
        # `move` stays correct when the game starts from a partly filled board.
        self.left = sum(cell == EMPTY for row in self.board for cell in row)

    @classmethod
    def available_actions(cls, board):
        """
        tictactoe.available_actions(board) takes a `board` list as input
        and returns the set of all available actions `(i, j)` in that state.

        Action `(i, j)` represents making a move in row i, column j.
        """
        return {(i, j) for i in range(3) for j in range(3) if board[i][j] == EMPTY}

    @classmethod
    def other_player(cls, player):
        """
        tictactoe.other_player(player) returns the player that is not
        `player`. Assumes `player` is either 0 or 1.
        """
        return 0 if player == 1 else 1

    def switch_player(self):
        """
        Switch the current player to the other player.
        """
        self.player = tictactoe.other_player(self.player)

    def now_player_sign(self, player):
        """Return the board sign of `player`: X for player 0, O otherwise."""
        return X if player == 0 else O

    def check_winner(self, action):
        """
        Check for a winner after the move `action` has been placed.

        Intended to be called the way `move` calls it: after the sign has been
        written to the board and after `switch_player`, so that `self.player`
        is the rival of the player who just moved.

        Args:
          action: tuple `(r, c)` of the cell that was just played.

        Returns:
          list `[winner, reward]` where
            winner is 0 (X) or 1 (O) if the sign at `(r, c)` completes a line
                   through that cell, else None;
            reward is 1 if the move either completed a line or occupied a cell
                   with which the rival would have completed a line (a block),
                   else 0.
        """
        r, c = action
        mover_sign = self.board[r][c]  # sign of the player who just moved
        winner_reward = [None, 0]      # record winner and reward

        def completes_line(sign):
            """True if `sign` fills the row, column or a diagonal through (r, c)."""
            board = self.board
            if all(board[r][j] == sign for j in range(3)):
                return True
            if all(board[i][c] == sign for i in range(3)):
                return True
            # The diagonals only matter when (r, c) actually lies on them.
            if r == c and all(board[i][i] == sign for i in range(3)):
                return True
            if r + c == 2 and all(board[i][2 - i] == sign for i in range(3)):
                return True
            return False

        # Did the mover win with this move?
        if completes_line(mover_sign):
            winner_reward[1] = 1
            winner_reward[0] = 0 if mover_sign == X else 1

        # Would the rival have won by playing here? Temporarily place the
        # rival's sign in the cell; this only affects the reward, never the winner.
        rival_sign = self.now_player_sign(self.player)
        self.board[r][c] = rival_sign
        if completes_line(rival_sign):
            winner_reward[1] = 1
        # Put the mover's sign back.
        self.board[r][c] = self.now_player_sign(1 - self.player)
        return winner_reward

    def board_to_state(self):
        """
        Encode the board as numbers, since a neural network can only take numbers as input.
        first player is X and 1; second is O and -1; empty is 0

        Returns:
          tuple of row tuples holding 1 / -1 / 0.
        """
        translation = {X: 1, O: -1, EMPTY: 0}
        return tuple(tuple(translation[cell] for cell in row) for row in self.board)

    def move(self, action):
        """
        Make the move `action` for the current player.
        `action` must be a tuple `(i, j)`.

        Returns:
          `(state, reward, done)` where `state` is `board_to_state()` after the
          move, `done` is 1 for a terminal state and 0 otherwise, and `reward`
          is -2 when this move wins the game and 0 in every other case. The -2
          is meant for the player who moved before this one (the loser).

        Raises:
          Exception: if the game already has a winner/tie recorded, or if the
                     coordinates are outside the board.
        """
        row, column = action

        # Check for errors
        if self.winner is not None:
            raise Exception("Game already won")
        elif row < 0 or row >= 3 or column < 0 or column >= 3:
            raise Exception("Invalid move")

        # Check if ai made a move on the place already made a move
        # not happen if we apply a mask on the output of nn
        # The game is reported as finished, but the board, the current player
        # and `winner` are left untouched.
        if self.board[row][column] is not None:
            print("error:ai made an invalid move")
            return self.board_to_state(), 0, 1

        # Update board
        self.board[row][column] = self.now_player_sign(self.player)
        self.switch_player()
        self.left -= 1

        # check for winner and get reward
        # however, we can simply give 2 for winner, and -2 for loser
        # so the reward computed by check_winner is ignored here
        self.winner, _ = self.check_winner(action)

        # move function return state, reward, done(1 for terminal, 0 for not)
        if self.winner is not None:
            # second last move must be made by loser
            return self.board_to_state(), -2, 1
        elif self.left == 0:
            # we use 2 to represent tie
            self.winner = 2
            return self.board_to_state(), 0, 1
        # game continuing
        else:
            return self.board_to_state(), 0, 0

    def render(self):
        """Print the board with row/column indices; EMPTY cells are shown as '-'."""
        print()
        print("board:")
        print("   0 1 2")
        for i in range(3):
            print(i, end="  ")
            for j in self.board[i]:
                print(j if j is not None else '-', end=" ")
            print()
        print()

In [4]:
def _build_q_network():
    """
    Build a fresh Q-value network: state_size inputs -> 64 -> 64 -> action_size
    linear outputs (one Q-value per board cell). 64-64 is enough.
    """
    return Sequential([
        # Explicit shape tuple instead of a bare int, so this does not depend
        # on Keras coercing an int into a shape.
        Input(shape=(state_size,)),
        Dense(64, activation='relu'),
        Dense(64, activation='relu'),
        Dense(action_size, activation='linear'),
    ])


# Create the Q-Network
q_network = _build_q_network()

# Create the target Q^-Network: same architecture, separate weights
# (the training cell copies the Q-Network weights into it before learning starts).
target_q_network = _build_q_network()

optimizer = Adam(learning_rate=ALPHA)
### END CODE HERE ###

In [5]:
def get_mask(states):
    """
    Build an additive mask that discourages repeated moves.

    In `states`, 1 and -1 mark cells that have already been played; those
    entries are replaced with -100 while every other entry is kept as is
    (0 for an empty cell), so adding the mask to Q-values pushes occupied
    cells far below the empty ones.
    """
    occupied = tf.logical_or(tf.equal(states, 1.0), tf.equal(states, -1.0))
    return tf.where(occupied, -100.0, states)

In [6]:
def compute_loss(experiences, gamma, q_network, target_q_network):
    """
    Calculates the DQN loss for one mini-batch.

    Args:
      experiences: (tuple) `(states, actions, rewards, next_states, done_vals)`,
          each a tf tensor for the whole mini-batch
          (presumably produced by utils_tictactoe.get_experiences -- TODO confirm shapes/dtypes)
      gamma: (float) The discount factor.
      q_network: (tf.keras.Sequential) Keras model for predicting the q_values
      target_q_network: (tf.keras.Sequential) Keras model for predicting the targets

    Returns:
      loss: (TensorFlow Tensor) the Mean-Squared Error between
            the y targets and the Q(s,a) values.
    """
    states, actions, rewards, next_states, done_vals = experiences

    # Q(s, a) of the action that was actually taken in each experience.
    taken_q = tf.gather(q_network(states), actions, axis=1, batch_dims=1)

    # max over a' of Q^(s', a'); cells already played in s' are pushed down
    # by the mask so they cannot be picked as the maximum.
    masked_next_q = target_q_network(next_states) + get_mask(next_states)
    max_qsa = tf.reduce_max(masked_next_q, axis=-1)

    # y = R if the episode terminates, otherwise y = R + γ max Q^(s,a).
    bootstrapped = rewards + gamma * max_qsa
    y_targets = done_vals * rewards + (1 - done_vals) * bootstrapped

    return MSE(taken_q, y_targets)

In [7]:
@tf.function
def agent_learn(experiences, gamma):
    """
    Runs one learning step on the module-level `q_network` / `target_q_network`.

    Args:
      experiences: (tuple) mini-batch `(states, actions, rewards, next_states, done_vals)`
          as accepted by `compute_loss`
      gamma: (float) The discount factor.

    Returns:
      the loss of this step, so the caller can keep a loss history.
    """
    weights = q_network.trainable_variables

    # Record the forward pass so the loss can be differentiated.
    with tf.GradientTape() as tape:
        loss = compute_loss(experiences, gamma, q_network, target_q_network)

    # Gradient step on the Q-Network only.
    grads = tape.gradient(loss, weights)
    optimizer.apply_gradients(zip(grads, weights))

    # Let the target network follow the Q-Network
    # (update rule lives in utils_tictactoe.update_target_network).
    utils_tictactoe.update_target_network(q_network, target_q_network)

    return loss

In [8]:
def state_to_input(state):
    """
    Turn a 2d board state into a float32 tensor with a leading batch axis of
    size 1 -- (1, 9) for a 3x3 board -- which is the shape the networks take.
    """
    cells = np.ravel(state)
    as_tensor = tf.convert_to_tensor(cells, dtype=tf.float32)
    return tf.expand_dims(as_tensor, axis=0)

In [None]:
start = time.time()

num_rounds = 20000

total_loss_history = []

epsilon = 1    # initial ε value for ε-greedy policy

# Create a memory buffer D with capacity N
memory_buffer = deque(maxlen=MEMORY_SIZE)

# Set the target network weights equal to the Q-Network weights
target_q_network.set_weights(q_network.get_weights())

for i in range(num_rounds):

    # Reset the environment to the initial state
    game = tictactoe()
    # Most recent (state, action) of each player. A player's experience can only
    # be completed once the opponent has replied, so it is kept here until then.
    last = {
            0: {"state": None, "action": None},
            1: {"state": None, "action": None},
        }

    # A game has at most 9 moves.
    for t in range(9):

        state = game.board_to_state() # 2 dimension tuple
        # From the current state S choose an action A using an ε-greedy policy
        # state needs to be the right shape for the q_network
        input_qn = state_to_input(state) # (1,9) tf tensor
        q_values = q_network(input_qn) # (1,9) tf tensor
        action = utils_tictactoe.get_action(q_values, input_qn, epsilon) # tuple(i, j)
        last[game.player]["state"] = state
        last[game.player]["action"] = action

        # Make move
        next_state, reward, done = game.move(action)

        # A legal move switches game.player, so this is the entry of the player
        # who moved *before* this move; `next_state` is the board that player
        # faces next and `reward` is -2 if the move just made beat them.
        pending = last[game.player]

        # Store experience tuple (S,A,R,S') in the memory buffer.
        # We store the done variable as well for convenience.
        # On the very first move that player has not acted yet, so there is
        # nothing to store.
        if pending["state"] is not None:
            memory_buffer.append(experience(pending["state"], pending["action"], reward, next_state, done))
        # When the game has ended (win or tie) the player who made the final
        # move gets their experience as well, with the opposite reward.
        if game.winner is not None:
            finisher = last[1 - game.player]
            memory_buffer.append(experience(finisher["state"], finisher["action"], -reward, next_state, done))

        # Only update the network every NUM_STEPS_FOR_UPDATE time steps.
        update = utils_tictactoe.check_update_conditions(t, NUM_STEPS_FOR_UPDATE, memory_buffer)

        if update:
            # Sample random mini-batch of experience tuples (S,A,R,S') from Deque
            experiences = utils_tictactoe.get_experiences(memory_buffer)

            # Set the y targets, perform a gradient descent step,
            # and update the network weights.
            loss = agent_learn(experiences, GAMMA)
            # Keep plain NumPy values rather than tensors: cheaper to average
            # every episode and nothing TensorFlow-owned is kept alive.
            total_loss_history.append(loss.numpy())

        if done:
            break

    # Update the ε value
    epsilon = utils_tictactoe.get_new_eps(epsilon)

    # Average over the latest 1000 learning steps. Until the first learning
    # step has happened there is nothing to average; report nan explicitly
    # instead of calling np.mean on an empty list (which warns and returns nan).
    recent_losses = total_loss_history[-1000:]
    av_latest_loss = np.mean(recent_losses) if recent_losses else float("nan")
    print(f"\rEpisode {i+1} | Total loss average: {av_latest_loss:.2f}", end="")

    if (i+1) % 1000 == 0:
        print(f"\rEpisode {i+1} | Total loss average: {av_latest_loss:.2f}")

    # Stop early once the recent average loss is small enough
    # (a nan average never satisfies this comparison).
    if av_latest_loss <= 0.05:
        break


tot_time = time.time() - start

print(f"\nTotal Runtime: {tot_time:.2f} s ({(tot_time/60):.2f} min)")

In [11]:
def print_last_game(buffer, n=7):
    """
    Debugging tool: print the most recent experiences of a replay buffer.

    For each experience the three board rows are printed as
    `state row   ->   next_state row`, followed by its action, reward and
    done flag.

    Args:
      buffer: iterable of experience tuples with `state`, `next_state`,
              `action`, `reward` and `done` fields; `state` and `next_state`
              must have at least three rows.
      n: how many of the latest experiences to print (default 7).
         Nothing is printed when `n` is 0 or negative.
    """
    entries = list(buffer)
    # Clamp explicitly: a plain `entries[-n:]` would print *everything* for n == 0.
    first = max(len(entries) - n, 0)
    for e in entries[first:]:
        for row in range(3):
            print(f'{e.state[row]}   ->   {e.next_state[row]}')
        print(f'action: {e.action}, reward:{e.reward}, done: {e.done}')

In [12]:
def play(human_player=None):
    """
    Play human game against the AI and also store the experience tuple (S,A,R,S') in the memory buffer.
    `human_player` can be set to 0 or 1 to specify whether
    human player moves first or second.

    Relies on the module-level `q_network` and `memory_buffer`, so the cells
    that create them must have been run first.

    Raises:
      ValueError: if the human enters something that is not an integer
                  (this ends the game).
    """

    # If no player order set, choose human's order randomly
    if human_player is None:
        human_player = random.randint(0, 1)

    # Create new game
    game = tictactoe()
    # Most recent (state, action) of each player; a player's experience is
    # only stored after the opponent has replied.
    last = {
        0: {"state": None, "action": None},
        1: {"state": None, "action": None},
    }

    # Game loop
    while True:

        # Print contents of board
        game.render()

        # Compute available actions
        available_actions = tictactoe.available_actions(game.board) # set of tuple(i, j)
        time.sleep(1)

        # Let human make a move
        if game.player == human_player:
            print("Your Turn")
            while True:
                # only int input is legal, otherwise will exit play
                row = int(input("Choose Row: "))
                column = int(input("Choose Column: "))
                if (row, column) in available_actions:
                    action = (row, column)
                    break
                print("Invalid move, try again.")

        # Have AI make a move
        else:
            print("AI's Turn")
            state = game.board_to_state() # 2 dimension tuple
            input_qn = state_to_input(state) # (1, 9) tf tensor
            q_values = q_network(input_qn) # (1, 9) tf tensor
            action = utils_tictactoe.get_action(q_values, input_qn) # tuple(i, j)
            row, column = action
            print(f"AI chose to move row {row}, column {column}.")

        last[game.player]["state"] = game.board_to_state()
        last[game.player]["action"] = action

        # Make move
        next_state, reward, done = game.move((row, column))

        # A legal move switches game.player, so this is the entry of the player
        # who moved before this move.
        pending = last[game.player]

        # On the first move of the game that player has not acted yet. Check the
        # recorded state itself instead of a step counter: this function has no
        # counter of its own, and the old `t` was a leftover global from the
        # training loop.
        if pending["state"] is not None:
            memory_buffer.append(experience(pending["state"], pending["action"], reward, next_state, done))
        # When the game has ended (win or tie) the player who made the final
        # move gets their experience as well, with the opposite reward.
        if game.winner is not None:
            finisher = last[1 - game.player]
            memory_buffer.append(experience(finisher["state"], finisher["action"], -reward, next_state, done))

        # Check for winner
        if done:
            game.render()
            print()
            print("GAME OVER")
            if game.winner == human_player:
                print(f"Winner is human")
            elif game.winner == 1-human_player:
                print(f"Winner is AI")
            else:
                print("Tie")
            return

In [None]:
# Three games with the human moving first (player 0),
# then three with the human moving second (player 1).
for human_side in (0, 1):
    for round_idx in range(3):
        play(human_side)


    

In [None]:
# Debug aid: show the newest replay-buffer entries to check how the most
# recent game was recorded.
print_last_game(memory_buffer)

In [None]:
# save model: both networks are written in the TensorFlow SavedModel format
for model, export_dir in (
    (q_network, 'q_model_tictactoe'),
    (target_q_network, 'target_q_model_tictactoe'),
):
    model.save(export_dir, save_format='tf')

# load model
#q_network = tf.keras.models.load_model('q_model_tictactoe')
#target_q_network = tf.keras.models.load_model('target_q_model_tictactoe')
#optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)