In [1]:
!pip install chess

import requests

url = 'https://wtharvey.com/'

filenames = [
    'm8n2.txt',
    'm8n3.txt',
    'm8n4.txt'
]
for local_filename in filenames:
    temp_url = url + local_filename
    response = requests.get(temp_url)
    response.raise_for_status()

    with open(local_filename, 'wb') as file:
        file.write(response.content)

    print(f"Pobrano plik: {local_filename}")

Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 24.0 -> 24.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip


Pobrano plik: m8n2.txt
Pobrano plik: m8n3.txt
Pobrano plik: m8n4.txt


In [2]:
import torch.nn as nn
import torch

class rl(nn.Module):
    def __init__(
            self,
            input_size: int,
            output_size: int,
            layer_sizes: list[int],
            dropout: float=0.1):
        super(rl, self).__init__()

        layers = []
        flat = nn.Flatten(start_dim=1)
        layers.append(flat)
        old_size = input_size
        for layer in layer_sizes:
            layers.append(nn.Linear(old_size, layer))
            layers.append(nn.Dropout(dropout))
            layers.append(nn.ReLU())
            old_size = layer

        layers.append(nn.Linear(old_size, output_size))
        layers.append(nn.Tanh())
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        x = torch.unsqueeze(x, 0)
        return self.model(x)

    def fit(self, tensor, score, optimalizer, lock):
        with lock():
            score = torch.tensor([[score]], dtype=torch.float32)
            out = self.forward(tensor)
            loss = self.criterion(out, score)
            loss.backward()
            optimalizer.step()
            optimalizer.zero_grad()

    def predict(self, tensor):
        return self.forward(tensor)



In [3]:
import chess

class Game:

    layer_to_piece = {
        1: chess.Piece(chess.PAWN, chess.WHITE),
        2: chess.Piece(chess.KNIGHT, chess.WHITE),
        3: chess.Piece(chess.BISHOP, chess.WHITE),
        4: chess.Piece(chess.ROOK, chess.WHITE),
        5: chess.Piece(chess.QUEEN, chess.WHITE),
        6: chess.Piece(chess.KING, chess.WHITE),
        -1: chess.Piece(chess.PAWN, chess.BLACK),
        -2: chess.Piece(chess.KNIGHT, chess.BLACK),
        -3: chess.Piece(chess.BISHOP, chess.BLACK),
        -4: chess.Piece(chess.ROOK, chess.BLACK),
        -5: chess.Piece(chess.QUEEN, chess.BLACK),
        -6: chess.Piece(chess.KING, chess.BLACK),
        0: None
    }

    @staticmethod
    def from_tensor(tensor: torch.Tensor):
        game = Game()
        game.tensor = tensor
        game.board = Game.create_board()
        return game

    @staticmethod
    def from_board(board: chess.Board, transform: bool):
        tensor = Game.create_tensor(board, transform)
        game = Game()
        game.board = board
        game.tensor = tensor

        if transform:
            game.board = game.create_board(tensor)
            game.board.turn = not game.board.turn
        return game

    @staticmethod
    def create_tensor(board: chess.Board, transform: bool):
        matrix_board = torch.zeros((6, 8, 8))
        for i in range(8):
            for j in range(8):
                piece = board.piece_at(chess.square(i, j))
                if piece is not None:
                    piece_type = piece.piece_type
                    piece_color = piece.color
                    index = piece_type - 1

                    row = 7-j if not transform else j

                    if piece_color == chess.WHITE:
                        matrix_board[index, row, i] = 1
                    else:
                        matrix_board[index, row, i] = -1
        if transform:
            matrix_board *= -1
            matrix_board = torch.where(torch.abs(matrix_board) < 1e-6, torch.zeros_like(matrix_board), matrix_board)
        return matrix_board

    @staticmethod
    def create_board(tensor: torch.Tensor):
        nonzero_mask = (tensor != 0).float()
        multiplied_indices = torch.arange(1, 7, device=tensor.device).unsqueeze(-1).unsqueeze(-1)
        result_tensor = tensor * multiplied_indices * nonzero_mask
        tensor = torch.sum(result_tensor, dim=0)
        board = chess.Board.empty()
        for row_idx, row in enumerate(tensor):
            for col_idx, val in enumerate(row):
                piece = Game.layer_to_piece[int(val.item())]
                if piece is not None:
                    square = chess.square(col_idx, 7 - row_idx)
                    board.set_piece_at(square, piece)
        return board

    def state(self):
        return self.tensor

    def over(self):
        self.outcome = self.board.outcome()
        return False if self.outcome is None else True

    def score(self):
        winner = self.outcome.winner
        if winner is None:
            return 0
        return 1 if winner == chess.WHITE else -1

    def valid_moves(self):
        return self.board.legal_moves

    def make_move(self, move):
        new_tensor = self.simulate_move(move)
        self.tensor = new_tensor
        self.board.push(move)
        try:
            self.equal_boards()
        except Exception:
            print("board and tensor different")
            print(self.board)
            print(self.tensor)
            print(self.board.pop())
            raise AssertionError

    def equal_boards(self):
        nonzero_mask = (self.tensor != 0).float()
        multiplied_indices = torch.arange(1, 7, device=self.tensor.device).unsqueeze(-1).unsqueeze(-1)
        result_tensor = self.tensor * multiplied_indices * nonzero_mask
        tensor = torch.sum(result_tensor, dim=0)
        for row_idx, row in enumerate(tensor):
            for col_idx, val in enumerate(row):
                piece_tensor = self.layer_to_piece[int(val.item())]
                square = chess.square(col_idx, 7 - row_idx)
                piece_board = self.board.piece_at(square)
                assert piece_tensor == piece_board

    def simulate_move(self, move: chess.Move):
        if self.board.is_en_passant(move) or self.board.is_castling(move):
            new_board = self.board.copy()
            new_board.push(move)
            return Game.create_tensor(new_board)
        new_tensor = self.tensor.clone()
        idx_beg = self.board.piece_at(move.from_square).piece_type - 1
        idx_end = idx_beg if move.promotion is None else int(move.promotion) - 1
        rank_beg = 7 - chess.square_rank(move.from_square)
        file_beg = chess.square_file(move.from_square)
        rank_end = 7 - chess.square_rank(move.to_square)
        file_end = chess.square_file(move.to_square)
        value = 1 if self.board.turn else -1
        for i in range(len(new_tensor)):
            new_tensor[i][rank_end][file_end] = torch.tensor(0)
        new_tensor[idx_beg][rank_beg][file_beg] = torch.tensor(0)
        new_tensor[idx_end][rank_end][file_end] = torch.tensor(value)
        return new_tensor

    def copy(self):
        copy = self.__class__.__new__(self.__class__)
        copy.tensor = self.tensor.clone()
        copy.board = self.board.copy(stack=True)
        return copy


In [4]:
import torch.optim as optim
import random


def record(game, score, model, optimalizer, lock):
    model.fit(game.state(), score, optimalizer, lock)

def heuristic_value(tensor, model):
    return model.predict(tensor)

def get_move(action_dict: dict, white_moves):
    const_bias = 0.05
    if len(action_dict) == 1:
        return list(action_dict.keys())[0]
    moves = list(action_dict.keys())
    values = list(action_dict.values())
    if not white_moves:
        values = [-val for val in values]
    bias = min(values)
    if bias < 0:
        values = [(val+abs(bias)+const_bias) for val in values]
    try:
        choice = random.choices(values, weights=values, k=1)
    except ValueError:
        print(values)
        print(white_moves)
        print(action_dict)
        raise ValueError
    return moves[values.index(choice[0])]


def playout_value(game: Game, model, optimalizer, lock):
    if game.over():
        score = torch.tensor(game.score())
        if score.item() != 0:
            record(game, score, model, optimalizer, lock)
            print(game.board)
            print(f"--------------: {score.item()}")
        return torch.tensor(score.item())

    action_heuristic_dict = {}
    for move in game.valid_moves():
        tempTensor = game.simulate_move(move)
        heu = heuristic_value(tempTensor, model).item()
        action_heuristic_dict[move] = heu
    move = get_move(action_heuristic_dict, game.board.turn)

    next_game = game.copy()
    next_game.make_move(move)

    value = playout_value(next_game)
    if value.item() != 0:
        record(game, value, model, optimalizer, lock)

    return value

def monte_carlo_value(game, N, model, optimalizer, lock):
    res = []
    for _ in range(N):
        res.append(playout_value(game, model, optimalizer, lock))
    return res



In [6]:
import torch.multiprocessing as mp

def worker(fen_queue, max_pieces, games_played, model, optimalizer, results, lock):
    while not fen_queue.empty():
        fen = fen_queue.get()
        transform = False
        try:
            board = chess.Board(fen)
        except ValueError:
            print(f"Invalid fen: {fen}")
            continue

        if len(board.piece_map()) <= max_pieces:
            game = Game.from_board(board, transform)
            res = monte_carlo_value(game, games_played, model, optimalizer, lock)
            mates = sum([abs(r) for r in res])
            results.append((fen, mates))

if __name__ == '__main__':
    mp.set_start_method('spawn')

    model = rl(6*8*8, 1, [384, 400, 300, 200, 100, 50])
    model.share_memory()  # Umożliwia dzielenie modelu między procesami
    optimalizer = optim.Adam(model.parameters(), lr=0.01)

    fens = []
    max_pieces = 9
    eps = 15
    games_played = 50

    local_filenames = [
        "m8n2.txt",
        "m8n3.txt",
        "m8n4.txt",
    ]
    for local_filename in local_filenames:
        with open(local_filename, 'r') as file:
            lines = file.readlines()
            for line in lines:
                line = line.strip()
                if ',' not in line and '-' in line and '/' in line:
                    fens.append(line)

    print(f"num of all examples: {len(fens)}")

    num_processes = mp.cpu_count()  # Liczba dostępnych procesorów

    with mp.Manager() as manager:
        fen_queue = manager.Queue()
        results = manager.list()
        lock = manager.Lock()

        for fen in fens:
            fen_queue.put(fen)

        processes = []
        for _ in range(num_processes):
            p = mp.Process(target=worker, args=(fen_queue, max_pieces, games_played, model, optimalizer, results, lock))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        print(f"Results: {list(results)}")

    torch.save(model.state_dict(), 'model_weights.pth')

# model = rl(6*8*8, 1, [384, 400, 300, 200, 100, 50])
# optimalizer = optim.Adam(model.parameters(), lr=0.01)

# fens = []
# max_pieces = 9
# eps = 15
# games_played = 50

# white_puzz_count = 0
# black_puzz_count = 0

# local_filenames = [
#     "m8n2.txt",
#     "m8n3.txt",
#     "m8n4.txt",
# ]
# for local_filename in local_filenames:
#     with open(local_filename, 'r') as file:
#         lines = file.readlines()
#         for line in lines:
#             line = line.strip()
#             if ',' not in line and '-' in line and '/' in line:
#                 fens.append(line)

# print(f"num of all examples: {len(fens)}")
# for _ in range(eps):
#     for fen in fens:
#         transform = False
#         try:
#             board = chess.Board(fen)
#         except ValueError:
#             print(f"Invalid fen: {fen}")
#             continue
#         if len(board.piece_map()) <= max_pieces:
#             print(board)
#             print(board.turn)
#             # if white_puzz_count > black_puzz_count and board.turn:
#             #     transform = True
#             game = Game.from_board(board, transform)
#             print(game.board)
#             print(game.board.turn)

#             if game.board.turn:
#                 white_puzz_count += 1
#             else:
#                 black_puzz_count += 1
#             res = monte_carlo_value(game, games_played)
#             mates = sum([abs(r) for r in res])
#             print(f"found mate: {mates}/{games_played}")

#     torch.save(model.state_dict(), 'model_weights.pth')

num of all examples: 1172
Results: []
