<a href="https://colab.research.google.com/github/gitHubAndyLee2020/Monte_Carlo_Chess_Agent/blob/main/monte_carlo_chess_agent_complex.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Monte Carlo Chess Agent Complex

In [None]:
!pip install chess

Collecting chess
  Downloading chess-1.10.0-py3-none-any.whl (154 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.4/154.4 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: chess
Successfully installed chess-1.10.0


In [None]:
# Install the Stockfish engine with apt (Debian/Ubuntu hosts such as Colab).
# The engine set-up cell in this notebook opens the binary at /usr/games/stockfish.
!apt-get install -y stockfish

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
Suggested packages:
  polyglot xboard | scid
The following NEW packages will be installed:
  stockfish
0 upgraded, 1 newly installed, 0 to remove and 18 not upgraded.
Need to get 24.8 MB of archives.
After this operation, 47.4 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/universe amd64 stockfish amd64 14.1-1 [24.8 MB]
Fetched 24.8 MB in 1s (27.4 MB/s)
Selecting previously unselected package stockfish.
(Reading database ... 120895 files and directories currently installed.)
Preparing to unpack .../stockfish_14.1-1_amd64.deb ...
Unpacking stockfish (14.1-1) ...
Setting up stockfish (14.1-1) ...
Processing triggers for man-db (2.10.2-1) ...


In [None]:
import chess
import chess.pgn
import chess.engine
import random
import time
import heapq
from math import log,sqrt,e,inf

In [None]:
# Ply counter shared through `global depth` by rollout/expand/rollback;
# mcts_pred resets it to 1 before every playout.
depth = 0
# UCI engine process that rollout() asks for playout moves.
# NOTE(review): nothing in this notebook shuts the process down - consider
# calling engine.quit() once the games are finished.
engine = chess.engine.SimpleEngine.popen_uci("/usr/games/stockfish")

In [None]:
class node():
    """One vertex of the search tree.

    Nodes are ordered by their cached `ucb` score so that they can be compared
    directly (heapq, sorted, min/max).
    """

    def __init__(self):
        # Position held by this node; callers overwrite it after construction.
        self.state, self.parent = chess.Board(), None
        # Child nodes reached by one move from `state`.
        self.children = set()
        # N and n are visit counters (rollback bumps both), v is the summed
        # playout reward and ucb is the score last computed by ucb1().
        self.N = self.n = self.v = self.ucb = 0

    def __lt__(self,other):
        """Return True when this node's cached UCB is strictly below other's."""
        return other.ucb > self.ucb

In [None]:
def ucb1(curr_node):
    """Score a node as accumulated value plus an exploration bonus.

    The bonus is 2*sqrt(log(N + e + 1e-6) / (n + k)) where k is a fresh random
    integer in [3, 7] drawn on every call, so repeated calls on the same node
    may return different values.

    NOTE(review): `v` is the summed (not averaged) reward and `N` is the node's
    own counter rather than its parent's - confirm this is the intended variant
    of UCB1.
    """
    smoothing = random.randint(3,7)
    log_visits = log(curr_node.N+e+(10**-6))
    exploration = 2*(sqrt(log_visits/(curr_node.n+smoothing)))
    return curr_node.v+exploration

In [None]:
def rollout(curr_node):
    """Play the position in `curr_node` to the end of the game with the engine.

    Each ply asks the global UCI `engine` for a move (1 ms budget), creates one
    child node holding the resulting position, links it to its parent and
    continues from that child. The global `depth` is incremented once per ply
    so that rollback() can walk the same number of steps back up.

    The playout is a loop rather than one Python frame per ply, so long games
    cannot exhaust the interpreter's recursion limit. Only the node for the
    move actually played is created.

    Args:
        curr_node: node whose `state` is the position to play out from.

    Returns:
        (reward, leaf): reward is 1 for a '1-0' result, -1 for '0-1' and 0.5
        for anything else; leaf is the node holding the final position.
    """
    global depth
    global engine

    while not curr_node.state.is_game_over():
        depth += 1
        board = curr_node.state
        result = engine.play(board, chess.engine.Limit(time=0.001))

        # stack=False keeps the position (clocks, castling, en passant) but
        # drops the move history, like rebuilding the board from its FEN.
        next_state = board.copy(stack=False)
        next_state.push(result.move)

        child = node()
        child.state = next_state
        child.parent = curr_node
        curr_node.children.add(child)
        curr_node = child

    outcome = curr_node.state.result()
    if outcome == '1-0':
        return (1, curr_node)
    if outcome == '0-1':
        return (-1, curr_node)
    # NOTE(review): a draw scores 0.5 on a scale whose win/loss are +1/-1,
    # which is not symmetric between the two colours - confirm intended.
    return (0.5, curr_node)

In [None]:
def expand(curr_node,white):
    """Walk down the tree from `curr_node` to a leaf following UCB scores.

    At each level the child with the highest `ucb` is taken when `white` is
    truthy and the child with the lowest `ucb` otherwise; the side alternates
    on every step. The global `depth` is incremented once per level descended.

    Args:
        curr_node: node to start from.
        white: truthy if the side choosing at `curr_node` maximises.

    Returns:
        The first node reached that has no children (possibly `curr_node`).
    """
    global depth
    while len(curr_node.children) != 0:
        depth += 1
        # `children` is an unordered set, so scan it for the extreme score;
        # a heap pop is only meaningful on a list that has been heapified.
        choose = max if white else min
        curr_node = choose(curr_node.children, key=lambda child: child.ucb)
        white = 0 if white else 1
    return curr_node

In [None]:
def rollback(curr_node,reward):
    """Propagate a playout reward from `curr_node` up to the tree root.

    Every node on the path except the root (the node whose parent is None)
    gets N and n incremented, `reward` added to v, and its ucb recomputed with
    ucb1(). The global `depth` is decremented once per step; the node visited
    while `depth` equals 1 is reported as the root's child on this path.

    Returns:
        (root_child, root): root_child is None if `depth` never equalled 1
        during the walk.
    """
    global depth
    root_child = None
    cursor = curr_node
    while cursor.parent is not None:
        cursor.N += 1
        cursor.n += 1
        cursor.v += reward
        cursor.ucb = ucb1(cursor)
        root_child = cursor if depth == 1 else root_child
        cursor = cursor.parent
        depth -= 1

    return (root_child, cursor)

In [None]:
def mcts_pred(curr_node,over,white,iterations=10,verbose=False):
    """Pick a move for the side to play at `curr_node` using engine playouts.

    One child node is created per legal move. On every iteration the child
    with the best cached `ucb` (highest for white, lowest for black) is played
    out with rollout() and the reward is propagated with rollback(). After the
    last iteration the best-scoring child's move is returned.

    Selection is a plain max/min scan over the children, which stays correct
    whenever rollback() changes a child's score and needs no heap ordering.

    Args:
        curr_node: root node; its `state` is the position to move from.
        over: truthy if the game is already finished.
        white: truthy if the side to move maximises the score.
        iterations: number of playouts to run.
        verbose: when True, print the duration and final depth of each playout.

    Returns:
        The chosen move in SAN, or -1 when `over` is truthy or the position
        has no legal moves.
    """
    global depth
    if over:
        return -1

    board = curr_node.state
    map_state_move = dict()
    for move in list(board.legal_moves):
        # Same position without move history, as a FEN round-trip would give.
        child_state = board.copy(stack=False)
        child_state.push(move)
        child = node()
        child.state = child_state
        child.parent = curr_node
        curr_node.children.add(child)
        map_state_move[child] = board.san(move)

    if not map_state_move:
        return -1

    choose = max if white else min

    def cached_ucb(candidate):
        return candidate.ucb

    for _ in range(iterations):
        sel_child = choose(map_state_move, key=cached_ucb)
        # rollout()/rollback() count plies from the root's child at depth 1.
        depth = 1
        if verbose:
            started = time.time()
        reward, leaf = rollout(sel_child)
        if verbose:
            print(f"rollout took {time.time()-started:.3f}s, reached depth {depth}")
        rollback(leaf, reward)

    return map_state_move[choose(map_state_move, key=cached_ucb)]

In [None]:
def start_agent_vs_agent(max_moves=2):
    """Let the MCTS agent play both sides from the starting position.

    Each ply builds a fresh root node around the live board, asks mcts_pred()
    for a SAN move and pushes it. Progress, the final board, the move list and
    the result string are printed.

    Args:
        max_moves: stop after this many plies even if the game is not over
            (default 2, a short demo run). Pass None to play until the game
            ends.

    Returns:
        A chess.pgn.Game built from the moves that were played, with its
        "Result" header set to board.result().
    """
    board = chess.Board()
    white = 1
    moves = 0
    pgn = []
    while not board.is_game_over():
        if max_moves is not None and moves >= max_moves:
            break
        print(f"=== move no.{moves} ===")
        root = node()
        root.state = board
        result = mcts_pred(root,board.is_game_over(),white)
        board.push_san(result)
        print(result)
        pgn.append(result)
        white ^= 1
        moves+=1

    print(board)
    print(" ".join(pgn))
    print()
    print(board.result())
    # Build the PGN from the board's move stack so the game record actually
    # contains the moves, not just a Result header.
    game = chess.pgn.Game.from_board(board)
    game.headers["Result"] = board.result()
    return game

# Run the agent-vs-agent demo when this cell (or exported script) is executed directly.
if __name__=='__main__':
    start_agent_vs_agent()

=== move no.0 ===
4.438741445541382


KeyboardInterrupt: ignored

In [None]:
import chess
import chess.svg
from IPython.display import SVG, clear_output

def human_move(board):
    """
    Prompt the human for a move in UCI notation (e.g., "g8f6") until a legal one is given.

    The list of legal UCI moves is printed once up front. Any input that is not
    in that list is rejected with a message and the prompt is repeated.

    Returns the accepted move converted to SAN for `board`.
    """
    legal_uci_moves = [board.uci(candidate) for candidate in board.legal_moves]
    print(f"Legal Moves: {legal_uci_moves}")
    move_uci = input("Enter your move in UCI format (e.g., g8f6): ")
    while move_uci not in legal_uci_moves:
        print("Illegal move! Try again.")
        move_uci = input("Enter your move in UCI format (e.g., g8f6): ")
    # The caller pushes moves with push_san, so hand back SAN rather than UCI.
    return board.san(chess.Move.from_uci(move_uci))

def start_agent_vs_human():
    """Play an interactive game: the MCTS agent is White, the human is Black.

    Before every ply the previous cell output is cleared and the board is
    redrawn as an SVG. The agent's move comes from mcts_pred(); the human's
    move is read by human_move(). Both are pushed to the board as SAN. When
    the game is over, the final board, the move list and the result string are
    shown.

    Returns:
        A chess.pgn.Game built from the moves that were played, with its
        "Result" header set to board.result().
    """
    board = chess.Board()
    white = 1  # start with white (the agent)
    moves = 0
    pgn = []

    while not board.is_game_over():
        clear_output(wait=True)  # Clear the previous board display
        display(SVG(chess.svg.board(board=board, size=300)))

        print(f"=== move no.{moves} ===")
        if white:  # if white's turn, agent will play
            root = node()
            root.state = board
            result = mcts_pred(root, board.is_game_over(), white)
            label = "Agent's move"
        else:  # if black's turn, human will play
            result = human_move(board)
            label = "Your move"
        board.push_san(result)
        print(f"{label}: {result}")
        pgn.append(result)

        white ^= 1
        moves += 1

    clear_output(wait=True)
    display(SVG(chess.svg.board(board=board, size=300)))
    print(" ".join(pgn))
    print(board.result())
    # Build the PGN from the board's move stack so the game record actually
    # contains the moves, not just a Result header.
    game = chess.pgn.Game.from_board(board)
    game.headers["Result"] = board.result()
    return game

# Start the interactive agent-vs-human game when this cell (or exported script) is executed directly.
if __name__ == '__main__':
    start_agent_vs_human()
