Reference:https://github.com/suragnair/alpha-zero-general/tree/master/othello

In [1]:
import numpy as np
import tensorflow as tf
from tensorflow import keras  
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import Adam
import random
import os
import math
import copy
import tensorflow as tf
tf.keras.utils.disable_interactive_logging()


In [2]:
### Game Environment

In [3]:
class Board:
    def __init__(self, player1, player2, n=6):
        self.n = n
        self.board = [[0] * n for _ in range(n)]
        self.current_play = 1
        self.reset()
        self.player1 = player1
        self.player2 = player2
        self.done = False
        self.directions = [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)]

    def reset(self):
        n = self.n
        self.board = [[0] * n for _ in range(n)]
        self.current_play = 1
        if n % 2 == 0:
            mid = n // 2
            self.board[mid - 1][mid - 1] = 1
            self.board[mid][mid] = 1
            self.board[mid - 1][mid] = -1
            self.board[mid][mid - 1] = -1
            
    def get_board_state(self):
        return self.board 

#     def getHash(self):
#         self.boardHash = str(np.array(self.board).reshape(self.n * self.n))
#         return self.boardHash


    def count_diff(self, color):
        count = 0
        for j in range(self.n):
            for i in range(self.n):
                if self.board[i][j] == color:
                    count += 1
                if self.board[i][j] == -color:
                    count -= 1
        return count

    def get_legal_moves(self, color):
        moves = set()
        for j in range(self.n):
            for i in range(self.n):
                if self.board[i][j] == color:
                    new_moves = self.get_moves_for_square((i, j))
                    moves.update(new_moves)
        return list(moves)

    def has_legal_moves(self, color):
        for j in range(self.n):
            for i in range(self.n):
                if self.board[i][j] == color:
                    new_moves = self.get_moves_for_square((i, j))
                    if len(new_moves) > 0:
                        return True
        return False

    def get_moves_for_square(self, square):
        (i, j) = square
        color = self.board[i][j]
        if color == 0:
            return None
        moves = []
        for direction in self.directions:
            move = self._discover_move(square, direction)
            if move:
                moves.append(move)
        return moves

    def execute_move(self, move, color):
        flips = []
        for direction in self.directions:
            flip = self._get_flips(move, direction, color)
            flips.extend(flip)

        if flips:
            for i, j in flips:
                self.board[i][j] = color
        else:
            raise Exception("Invalid move")
        row, col = move
        self.board[row][col] = color

    def _discover_move(self, origin, direction):
        i, j = origin
        color = self.board[i][j]
        flips = []
        for new_i, new_j in self._increment_move(origin, direction, self.n):
            if self.board[new_i][new_j] == 0:
                if flips:
                    return (new_i, new_j)
                else:
                    return None
            elif self.board[new_i][new_j] == color:
                return None
            elif self.board[new_i][new_j] == -color:
                flips.append((new_i, new_j))
        return None

    def _get_flips(self, origin, direction, color):
        flips = []
        for i, j in self._increment_move(origin, direction, self.n):
            if self.board[i][j] == 0:
                return []
            if self.board[i][j] == -color:
                flips.append((i, j))
            elif self.board[i][j] == color and len(flips) > 0:
                return flips
        return []

    def _increment_move(self, move, direction, n):
        i, j = move
        move = (i + direction[0], j + direction[1])
        while 0 <= move[0] < n and 0 <= move[1] < n:
            yield move
            move = (move[0] + direction[0], move[1] + direction[1])

    def get_reward(self):
        player1_score = self.count_diff(1)
        player2_score = self.count_diff(-1)
        if player1_score > player2_score:
            return 1  # Player 1 (Black) wins
        elif player1_score < player2_score:
            return -1  # Player 2 (White) wins
        else:
            return 0  # It's a draw

    def step(self, action):
        i, j = action
        reward = 0
        done = False

        if i == -1 and j == -1:
            self.current_play *= -1
            self.done = True
            reward = self.get_reward()
        elif not self.has_legal_moves(self.current_play):
            print("Invalid Move!")
            self.done = True
            reward = -1 * self.current_play
        else:
            next_state = copy.deepcopy(self.board)
            try:
                self.execute_move((i, j), self.current_play)
                self.current_play *= -1
                done = self.game_over()
            except Exception as e:
                print(f"Error in executing move: {e}")
                done = True
                reward = -1 * self.current_play

        if not done and not self.has_legal_moves(self.current_play):
            print("No legal moves for the current player.")
            self.current_play *= -1

        return copy.deepcopy(self.board) if next_state is None else copy.deepcopy(next_state), reward, done, {}

    def game_over(self):
        return not self.has_legal_moves(1) and not self.has_legal_moves(-1)



In [4]:
# DQN agent class
class ReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.buffer = []

    def add(self, experience):
        if len(self.buffer) > self.buffer_size:
            self.buffer.pop(0)
        self.buffer.append(experience)

    def sample(self, batch_size):
        if len(self.buffer) < batch_size:
            batch_size = len(self.buffer)
        return random.sample(self.buffer, batch_size)

    def update(self, idx, error):
        self.buffer[idx] = (self.buffer[idx][0], self.buffer[idx][1], self.buffer[idx][2], self.buffer[idx][3], error)


In [5]:
class DQNAgent:
    def __init__(self, color, n=6):
        self.n = n
        self.current_play = 1
        self.color = color
        self.batch_size = 32
        self.gamma = 0.95
        self.learning_rate = 0.001
        self.epsilon = 1
        self.min_epsilon = 0.1
        self.max_epsilon = 1
        self.update_rate = 5000
        self.steps = 0
        self.lambd = 0.0005
        self.train_frequency = 100
        self.weight_backup = "nn_model.h5"
        self.replay_buffer_size = 20000
        self.memory = ReplayBuffer(self.replay_buffer_size)
        self.nn_model = self.neural_network()
        self.target_model = self.neural_network()

    def neural_network(self):
        model = Sequential()
        model.add(Dense(36, input_shape=(self.n*self.n,), activation='relu'))
        model.add(Dense(36, activation='relu'))
        model.add(Dense(self.n*self.n, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model


    def load_model(self):
        if os.path.isfile(self.weight_backup):
            self.nn_model.load_weights(self.weight_backup)

    def update_target_model(self):
        self.target_model.set_weights(self.nn_model.get_weights())

    def get_hash(self, board):
        board_hash = np.array(board).flatten()
        return board_hash

        
    def choose_action(self, board):
        positions = board.get_legal_moves(self.color)
        if np.random.rand() < self.epsilon:
            return random.choice(positions)
        else:
            state_old = self.get_hash(board.get_board_state())
            pred = self.nn_model.predict(np.array([state_old]))[0]
            best_action_index = np.argmax(pred)
            best_action = positions[best_action_index]
            return best_action

    def remember(self, state, action, reward, next_state, done):
        experience = (self.get_hash(state), action, reward, self.get_hash(next_state), done)
        #print("Experience added:", experience)

        self.memory.add(experience)

        if self.steps % self.train_frequency == 0:
            self.replay()

        if self.steps % self.update_rate == 0:
            self.update_target_model()

        self.steps += 1
        self.epsilon = self.min_epsilon + (self.max_epsilon - self.min_epsilon) * math.exp(-self.lambd * self.steps)

    def predict(self, board, target=False):
        state_hash = self.get_hash(board)
       
        state_hash = np.array([state_hash])  
        if target:
            return self.target_model.predict(state_hash)
        else:
            return self.nn_model.predict(state_hash)[0]

    def replay_value(self, minibatch):
        x = []
        y = []
        errors = []
        for (state, action, reward, next_state, done) in minibatch:
            state_hash = self.get_hash(state)
            pred = self.predict(state, target=False)[0]

            if isinstance(action, tuple):
                action_index = action[0] * self.n + action[1]
            else:
                action_index = action
            try:
                true_val = pred[action_index]
            except IndexError:
                continue

            if not done:
                next_pred = self.predict(next_state, target=False)[0]

                next_curr_state = self.get_hash(next_state)
                next_pred_target = self.predict(next_curr_state, target=True)[0]

                pred[action_index] = reward + self.gamma * next_pred_target[np.argmax(next_pred)]
            else:
                pred[action_index] = reward

            error = abs(true_val - pred[action_index])
            x.append(state)
            y.append(pred)
            errors.append(error)

        return x, y, errors

    def replay(self):
        minibatch = self.memory.sample(self.batch_size)
        #print("Replay buffer size:", len(self.memory.buffer))
        x, y, errors = self.replay_value(minibatch)

        for i in range(len(minibatch)):
            idx = minibatch[i][0]
            if errors:  
                self.memory.update(idx, errors[i])

        if x and y:  
            self.nn_model.fit(np.array(x), np.array(y), batch_size=self.batch_size, epochs=1, verbose=2, shuffle=True)

    def save_model(self, filename):
        self.nn_model.save(self.weight_backup)



In [6]:
def evaluate(player1, player2, num_games):
    wins_player1 = 0
    wins_player2 = 0
    draws = 0
    game_durations = []

    for _ in range(num_games):
        
        board = Board(player1.color, player2.color)
        done = False
        duration = 0  
        while not done:
            # Player 1's turn
            player1_position = board.get_legal_moves(board.current_play)
            if not player1_position:
                break

            if len(player1_position) > 0:
                player1_action = player1.choose_action(board)
                if isinstance(player1_action, tuple):
                    player1_action = player1_action[0]
                if 0 <= player1_action < len(player1_position):
                    _, _, done, _ = board.step(player1_position[player1_action])

                    # Remember the state, action, reward, next_state, and done for player 1
                    player1.remember(copy.deepcopy(board.board), player1_action, _, copy.deepcopy(board.board), done)

                    if done:
                        break
                else:
                    break
            else:
                break

            # Player 2's turn
            player2_position = board.get_legal_moves(board.current_play)
            if not player2_position:
                break

            if len(player2_position) > 0:
                player2_action = player2.choose_action(board)
                if isinstance(player2_action, tuple):
                    player2_action = player2_action[0]
                if 0 <= player2_action < len(player2_position):
                    _, _, done, _ = board.step(player2_position[player2_action])

                    # Remember the state, action, reward, next_state, and done for player 2
                    player2.remember(copy.deepcopy(board.board), player2_action, _, copy.deepcopy(board.board), done)

                    if done:
                        break
                else:
                    break
            else:
                break

            duration += 1

        player1.replay()
        player2.replay()

        winner = board.get_reward()

        if winner == 1:
            wins_player1 += 1
        elif winner == -1:
            wins_player2 += 1
        else:
            draws += 1

        game_durations.append(duration)

    win_rate_player1 = wins_player1 / num_games
    win_rate_player2 = wins_player2 / num_games
    draw_rate = draws / num_games
    avg_duration = np.mean(game_durations)

    return win_rate_player1, win_rate_player2, draw_rate, avg_duration


In [9]:
player1 = DQNAgent(1, 6)
player2 = DQNAgent(-1, 6)
num_evaluation_games = 10

win_rate_player1, win_rate_player2, draw_rate, avg_duration = evaluate(player1, player2, num_evaluation_games)

print(f"Player 1 Win Rate: {win_rate_player1}")
print(f"Player 2 Win Rate: {win_rate_player2}")
print(f"Draw Rate: {draw_rate}")
print(f"Average Game Duration: {avg_duration}");


Player 1 Win Rate: 0.6
Player 2 Win Rate: 0.2
Draw Rate: 0.2
Average Game Duration: 4.2


In [10]:
player1 = DQNAgent(1, 6)
player2 = DQNAgent(-1, 6)
num_evaluation_games = 10

win_rate_player1, win_rate_player2, draw_rate, avg_duration = evaluate(player1, player2, num_evaluation_games)

print(f"Player 1 Win Rate: {win_rate_player1}")
print(f"Player 2 Win Rate: {win_rate_player2}")
print(f"Draw Rate: {draw_rate}")
print(f"Average Game Duration: {avg_duration}");


Player 1 Win Rate: 0.8
Player 2 Win Rate: 0.1
Draw Rate: 0.1
Average Game Duration: 1.3


In [11]:
# RL agent first and random player as second

In [12]:
class Player:
    def __init__(self, color, name):
        self.color = color
        self.name = name

    def choose_action(self, board):
        legal_moves = board.get_legal_moves(self.color)
        if legal_moves:
            return random.choice(legal_moves)
        return None
    def remember(self, state, action, reward, next_state, done):
        pass
    def replay_value(self, minibatch):
        pass
    def replay(self):
        pass
    
    

In [13]:
player1 = DQNAgent(1, 6)
player2 = Player(-1,"randomPlayer")
num_evaluation_games = 10

win_rate_player1, win_rate_player2, draw_rate, avg_duration = evaluate(player1, player2, num_evaluation_games)

print(f"Player 1 Win Rate: {win_rate_player1}")
print(f"Player 2 Win Rate: {win_rate_player2}")
print(f"Draw Rate: {draw_rate}")
print(f"Average Game Duration: {avg_duration}");


Player 1 Win Rate: 0.5
Player 2 Win Rate: 0.1
Draw Rate: 0.4
Average Game Duration: 3.6
