Server communication: the `Game` base class and `game_loop`.

In [9]:
## ignore this code, just used for submission

# Update your token.
# Sent as the TOKEN query parameter of every request made by the helpers in this cell.
# NOTE(review): this credential is hardcoded in the notebook; consider reading it
# from an environment variable before sharing the file.
STUDENT_TOKEN = 'EUGENIOMARCHIORI'

import requests
import pprint
import json
import random
import time
from copy import copy, deepcopy

class Game:
  """Snapshot of a game as reported by the server.

  Attributes are stored exactly as given: ``state`` (JSON text of the board),
  ``status`` (server status string) and ``player``.
  """

  def __init__(self, state, status, player):
    """Store the three fields of a server game-state reply."""
    self.state, self.status, self.player = state, status, player

  def is_waiting(self):
    """Return True while the status is 'waiting'."""
    waiting = self.status == 'waiting'
    return waiting

  def is_end(self):
    """Return True once the status is 'complete'."""
    finished = self.status == 'complete'
    return finished

  def get_board(self):
    """Decode ``state`` from JSON and return the resulting object."""
    board = json.loads(self.state)
    return board

  def actions(self):
    """Return the available moves; the base class has none."""
    return list()

  def print_game(self):
    """Print the raw ``state`` text."""
    print(self.state)

def new_game(game_type, multi_player = False):
  """Ask the game server to create a game and return its id.

  The request is attempted up to 10 times. A network error (including a
  30-second timeout) or a non-200 reply is printed and the next attempt is
  made.

  Args:
    game_type: sent as the ``game-type`` query parameter.
    multi_player: sent as ``multi-player=True`` when truthy, else ``False``.

  Returns:
    The ``game-id`` field of the server's JSON reply, or None when every
    attempt failed.
  """
  query = {
    'TOKEN': str(STUDENT_TOKEN),
    'game-type': str(game_type),
    'multi-player': 'True' if multi_player else 'False',
  }
  for _ in range(10):
    try:
      # `params` URL-encodes the values; `timeout` keeps a stalled server
      # from blocking the notebook forever.
      r = requests.get('https://emarchiori.eu.pythonanywhere.com/new-game', params=query, timeout=30)
    except requests.RequestException as error:
      print(error)
      continue
    if r.status_code == 200:
      return r.json()['game-id']
    print(r.content)
  return None

def join_game(game_type, game_id):
  """Join an existing game and return the side assigned by the server.

  The request is attempted up to 10 times. A network error (including a
  30-second timeout) or a non-200 reply is printed and the next attempt is
  made.

  Args:
    game_type: sent as the ``game-type`` query parameter.
    game_id: sent as the ``game-id`` query parameter.

  Returns:
    The ``player`` field of the server's JSON reply, or None when every
    attempt failed.
  """
  query = {
    'TOKEN': str(STUDENT_TOKEN),
    'game-type': str(game_type),
    'game-id': str(game_id),
  }
  for _ in range(10):
    try:
      # `params` URL-encodes the values; `timeout` keeps a stalled server
      # from blocking the notebook forever.
      r = requests.get('https://emarchiori.eu.pythonanywhere.com/join-game', params=query, timeout=30)
    except requests.RequestException as error:
      print(error)
      continue
    if r.status_code == 200:
      return r.json()['player']
    print(r.content)
  return None

def game_state(game_type, game_id, GameClass):
  """Fetch the current state of a game and wrap it in ``GameClass``.

  The request is attempted up to 10 times. A network error (including a
  30-second timeout) or a non-200 reply is printed and the next attempt is
  made.

  Args:
    game_type: sent as the ``game-type`` query parameter.
    game_id: sent as the ``game-id`` query parameter.
    GameClass: callable invoked as ``GameClass(state, status, player)`` with
      the fields of the server's JSON reply.

  Returns:
    The object built by ``GameClass``, or None when every attempt failed.
  """
  query = {
    'TOKEN': str(STUDENT_TOKEN),
    'game-type': str(game_type),
    'game-id': str(game_id),
  }
  for _ in range(10):
    try:
      # `params` URL-encodes the values; `timeout` keeps a stalled server
      # from blocking the notebook forever.
      r = requests.get('https://emarchiori.eu.pythonanywhere.com/game-state', params=query, timeout=30)
    except requests.RequestException as error:
      print(error)
      continue
    if r.status_code == 200:
      payload = r.json()  # decode the body once instead of once per field
      return GameClass(payload['state'], payload['status'], payload['player'])
    print(r.content)
  return None

def update_game(game_type, game_id, player, move):
  """Submit a move to the server and return the raw reply body.

  The request is attempted up to 10 times. A network error (including a
  30-second timeout) or a non-200 reply is printed and the next attempt is
  made.

  Args:
    game_type: sent as the ``game-type`` query parameter.
    game_id: sent as the ``game-id`` query parameter.
    player: sent as the ``player`` query parameter.
    move: sent as the ``move`` query parameter (the game loop passes the
      JSON text of the move).

  Returns:
    ``r.content`` (bytes) of the first 200 reply, or None when every attempt
    failed.
  """
  query = {
    'TOKEN': str(STUDENT_TOKEN),
    'game-type': str(game_type),
    'game-id': str(game_id),
    'player': str(player),
    'move': str(move),
  }
  for _ in range(10):
    try:
      # `params` URL-encodes the values (the move text contains brackets,
      # commas and spaces); `timeout` keeps a stalled server from blocking
      # the notebook forever.
      r = requests.get('https://emarchiori.eu.pythonanywhere.com/update-game', params=query, timeout=30)
    except requests.RequestException as error:
      print(error)
      continue
    if r.status_code == 200:
      return r.content
    print(r.content)
  return None

def game_loop(solver, GameClass, game_type, multi_player = False, id = None):
    """Create or join a game on the server and play it to the end with ``solver``.

    Args:
        solver: callable taking the current game object and returning a move
            that ``json.dumps`` can encode.
        GameClass: class used by ``game_state`` to wrap each server reply.
        game_type: game identifier passed to every server helper.
        multi_player: forwarded to ``new_game`` when a game has to be created.
        id: id of an existing game to join; a new game is created when None.

    Returns:
        None. Progress and the final outcome are printed. The loop stops when
        the game is complete, when joining fails, or when the game state can
        no longer be fetched.
    """
    while id is None:
        print('\033[92mCreating new game...\033[0m')
        id = new_game(game_type, multi_player)

    print('\033[92mJoining game with id: %s\033[0m' % id)
    player = join_game(game_type, id)
    if player is None:
        # Without an assigned side the loop below would never see its own
        # turn and would poll the server forever.
        print("Failed to join the game.")
        return

    print('\033[92mPlaying as %s\033[0m' % player)

    game = game_state(game_type, id, GameClass)
    print('\033[91mWaiting for the other player to join...\033[0m')
    # game_state returns None after repeated failures; leave the wait loop in
    # that case and let the main loop report the problem.
    while game is not None and game.is_waiting():
        time.sleep(10)
        game = game_state(game_type, id, GameClass)

    while True:
        game = game_state(game_type, id, GameClass)
        if game is None:
            print("Failed to fetch the game state.")
            break  # Exit the loop if the game state couldn't be fetched

        game.print_game()
        if game.is_end():
            # On a finished game `player` is compared with our own side to
            # decide the outcome; '-' is reported as a draw.
            if game.player == '-':
                print('\033[94mdraw\033[0m')
            else:
                print('\033[92mwinner\033[0m' if game.player == player else '\033[91mloser\033[0m')
            return
        if game.player == player:
            print('Making next move...')
            move = solver(game)
            update_result = update_game(game_type, id, player, json.dumps(move))
            print(update_result)
        else:
            # Not our turn: poll again shortly.
            time.sleep(2)


MCTS implementation

In [10]:
import math
class MCTSNode:
    """One position in the search tree together with its visit statistics."""

    def __init__(self, state, parent=None, move=None):
        """Create the node for ``state``, reached from ``parent`` by ``move``."""
        self.state, self.parent, self.move = state, parent, move
        self.children = []
        self.wins = 0
        self.visits = 0
        # Moves that have not been turned into child nodes yet.
        self.untried_actions = self.untried_moves()

    def untried_moves(self):
        """Return the actions the node's state currently offers."""
        return self.state.actions()

    def is_terminal_node(self):
        """Return whether the node's state reports the game as ended."""
        return self.state.is_end()

    def select_child(self):
        """Return the child with the highest UCB1 score (the earliest on ties)."""
        def ucb1(child):
            exploitation = child.wins / child.visits
            exploration = 2 * (2 * math.log(self.visits) / child.visits) ** 0.5
            return exploitation + exploration

        return max(self.children, key=ucb1)

    def add_child(self, move, state):
        """Attach and return a child for ``move``, marking the move as tried."""
        created = MCTSNode(state=state, parent=self, move=move)
        self.untried_actions.remove(move)
        self.children.append(created)
        return created

    def update(self, result):
        """Record one more visit and add ``result`` to the win total."""
        self.visits += 1
        self.wins += result


def mcts_solver(game, iterations=1000):
    """Choose a move for the side to move in ``game`` with Monte Carlo Tree Search.

    Each iteration walks down the tree by UCB1, expands one untried move,
    plays random moves until the state reports the end of the game or offers
    no action, and then credits every node on the path with the outcome.

    Args:
        game: game object providing ``actions()``, ``play(move)``, ``is_end()``,
            ``result(player)`` and a ``player`` attribute. It is not modified;
            the search works on deep copies.
        iterations: number of search iterations.

    Returns:
        The move of the most visited child of the root.

    Raises:
        ValueError: if the search produced no candidate move, i.e. ``game``
            offers no action or ``iterations`` is not positive.
    """
    root = MCTSNode(state=game)

    for _ in range(iterations):
        node = root
        state = deepcopy(game)

        # Selection: descend while the node is fully expanded and has children.
        while not node.untried_actions and node.children:
            node = node.select_child()
            state.play(node.move)

        # Expansion
        if node.untried_actions:
            move = random.choice(node.untried_actions)
            state.play(move)
            # Store a snapshot in the tree: the rollout below keeps mutating
            # `state`, and backpropagation reads `node.parent.state.player`.
            node = node.add_child(move, deepcopy(state))

        # Simulation
        while not state.is_end():
            possible_actions = state.actions()
            if not possible_actions:
                break  # No more actions possible, stop the rollout
            state.play(random.choice(possible_actions))

        # Backpropagation
        while node is not None:
            if node.parent is not None:
                # A node is scored for the side that played the move leading
                # to it, which is the side to move in its parent.
                perspective = node.parent.state.player
            else:
                # The root has no parent; its win total is not used by
                # select_child, only its visit count is.
                perspective = node.state.player
            # result() is also asked when the rollout stopped without is_end()
            # being true; game classes are expected to answer 0.5 when they
            # cannot score the position.
            node.update(state.result(perspective))
            node = node.parent

    if not root.children:
        raise ValueError('mcts_solver: no move available for the current position')

    # Return the move that was most visited
    return sorted(root.children, key=lambda c: c.visits)[-1].move



Game-specific code

In [11]:
from functools import reduce
from copy import copy, deepcopy
import json
import random


class Checkers(Game):
  """Local model of the server's 8x8 checkers game, used to list and simulate moves.

  ``state`` holds the board as JSON text: 8 rows of 8 one-character cells with
  '_' marking an empty square. ``player`` is the side to move ('X' or 'O');
  on a finished game it is compared with the caller's side to decide the
  winner, and '-' stands for a draw.

  NOTE(review): the server's rule set is not visible from this notebook. The
  code below assumes that lower-case cells are men, that upper-case cells are
  kings, that X advances towards higher row indices and O towards lower ones,
  and that a side with no legal move loses. Confirm against the server.
  """

  # Diagonal unit steps, in the order in which moves are generated.
  DIAGONALS = [(1, 1), (1, -1), (-1, 1), (-1, -1)]

  def __init__(self, state, status, player):
    """Store the fields of a server game-state reply."""
    super().__init__(state, status, player)

  def in_board(self, pos):
    """Return True when ``pos`` is a valid row or column index (0 to 7)."""
    return 0 <= pos < 8

  def actions(self):
    """List the moves available to the side to move.

    A move is a tuple ``(row, col, new_row, new_col)``. When at least one
    capture exists only captures are returned. Captures are generated in all
    four diagonal directions for every piece, as before. Non-capturing moves
    of men are limited to their forward direction, while upper-case pieces
    may step in any diagonal direction.

    Returns:
        A list of move tuples; empty when nothing can be played or when
        ``player`` is neither 'X' nor 'O'.
    """
    opponent = self.other_player()
    if opponent is None:
        # No side to move (for example '-' on a finished game).
        return []

    board = self.get_board()
    mine, theirs = self.player.lower(), opponent.lower()
    forward = 1 if self.player == 'X' else -1

    own_squares = [
        (row, col)
        for row in range(8)
        for col in range(8)
        if board[row][col].lower() == mine
    ]

    # Captures: jump over an adjacent opposing piece onto an empty square.
    jumps = []
    for row, col in own_squares:
        for d_row, d_col in self.DIAGONALS:
            new_row, new_col = row + 2 * d_row, col + 2 * d_col
            if not (self.in_board(new_row) and self.in_board(new_col)):
                continue
            jumped = board[row + d_row][col + d_col]
            if jumped.lower() == theirs and board[new_row][new_col] == '_':
                jumps.append((row, col, new_row, new_col))
    if jumps:
        return jumps

    # Regular moves: one diagonal step onto an empty square.
    steps = []
    for row, col in own_squares:
        is_king = board[row][col].isupper()
        for d_row, d_col in self.DIAGONALS:
            if not is_king and d_row != forward:
                continue  # men do not step backwards
            new_row, new_col = row + d_row, col + d_col
            if self.in_board(new_row) and self.in_board(new_col) and board[new_row][new_col] == '_':
                steps.append((row, col, new_row, new_col))
    return steps

  def play(self, move):
    """Apply ``move`` to the stored board and pass the turn to the other side.

    Args:
        move: ``(row, col, new_row, new_col)``; a two-row displacement is
            treated as a capture and removes the piece in between.
    """
    board = self.get_board()
    row, col, new_row, new_col = move
    piece = board[row][col]
    board[row][col] = '_'

    # Remove captured piece in case of jump
    if abs(new_row - row) == 2:
        mid_row, mid_col = (row + new_row) // 2, (col + new_col) // 2
        board[mid_row][mid_col] = '_'

    # Crown a man that reaches the far row so that simulated games do not
    # leave it stranded there (assumes kings are the upper-case letter).
    if (piece == 'x' and new_row == 7) or (piece == 'o' and new_row == 0):
        piece = piece.upper()
    board[new_row][new_col] = piece

    self.state = json.dumps(board)
    self.player = self.other_player()

  def result(self, player):
    """Score the position for ``player``: 1 for a win, 0 for a loss, 0.5 otherwise.

    On a finished game ``self.player`` is read as the winner ('-' is a draw).
    On an unfinished game the side to move loses when it has no action left;
    any other unfinished position scores 0.5.
    """
    if self.is_end():
        if self.player == '-':
            return 0.5  # draw
        return 1 if self.player == player else 0

    if self.other_player() is not None and not self.actions():
        # The side to move is stuck, so the other side wins.
        return 0 if self.player == player else 1
    return 0.5  # position not decided yet

  def print_game(self):
    """Print the board one row per line."""
    for row in self.get_board():
      print(row)

  def other_player(self):
    """Return the opponent of the side to move, or None if it is neither 'X' nor 'O'."""
    return {'X': 'O', 'O': 'X'}.get(self.player)


  

In [12]:
# Play one checkers game with the MCTS solver; a new game is created because
# no id is given, and it is requested with multi-player disabled.
game_loop(mcts_solver, Checkers, 'checkers', multi_player=False, id=None)


[92mCreating new game...[0m
[92mJoining game with id: 3190[0m
[92mPlaying as X[0m
[91mWaiting for the other player to join...[0m
['_', 'x', '_', 'x', '_', 'x', '_', 'x']
['x', '_', 'x', '_', 'x', '_', 'x', '_']
['_', 'x', '_', 'x', '_', 'x', '_', 'x']
['_', '_', '_', '_', '_', '_', '_', '_']
['_', '_', '_', '_', '_', '_', '_', '_']
['o', '_', 'o', '_', 'o', '_', 'o', '_']
['_', 'o', '_', 'o', '_', 'o', '_', 'o']
['o', '_', 'o', '_', 'o', '_', 'o', '_']
Making next move...
b'Valid move'
['_', 'x', '_', 'x', '_', 'x', '_', 'x']
['x', '_', 'x', '_', 'x', '_', 'x', '_']
['_', 'x', '_', 'x', '_', 'x', '_', '_']
['_', '_', '_', '_', '_', '_', 'x', '_']
['_', '_', '_', 'o', '_', '_', '_', '_']
['o', '_', 'o', '_', '_', '_', 'o', '_']
['_', 'o', '_', 'o', '_', 'o', '_', 'o']
['o', '_', 'o', '_', 'o', '_', 'o', '_']
Making next move...
b'Valid move'
['_', 'x', '_', 'x', '_', 'x', '_', 'x']
['x', '_', 'x', '_', 'x', '_', 'x', '_']
['_', 'x', '_', '_', '_', 'x', '_', '_']
['_', '_', 'o', 