<a href="https://colab.research.google.com/github/littlecapa/co_li_processing/blob/main/co_li_processing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install python-chess

Collecting python-chess
  Downloading python_chess-1.999-py3-none-any.whl.metadata (776 bytes)
Collecting chess<2,>=1 (from python-chess)
  Downloading chess-1.11.1.tar.gz (156 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m156.5/156.5 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading python_chess-1.999-py3-none-any.whl (1.4 kB)
Building wheels for collected packages: chess
  Building wheel for chess (setup.py) ... [?25l[?25hdone
  Created wheel for chess: filename=chess-1.11.1-py3-none-any.whl size=148497 sha256=e0fbe0774de139db69cd9cbf932ba86785f328d87d91fa404ada3c03ba9c27e0
  Stored in directory: /root/.cache/pip/wheels/2e/2d/23/1bfc95db984ed3ecbf6764167dc7526d0ab521cf9a9852544e
Successfully built chess
Installing collected packages: chess, python-chess
Successfully installed chess-1.11.1 python-chess-1.999


In [5]:
class Stack:
    """Minimal LIFO container backed by a Python list.

    ``pop`` and ``top`` return ``None`` instead of raising when nothing is
    stored.
    """

    def __init__(self):
        self.stack = []

    def push(self, item):
        """Place ``item`` on top of the stack."""
        self.stack.append(item)

    def pop(self):
        """Remove and return the top item, or ``None`` when the stack is empty."""
        return self.stack.pop() if self.stack else None

    def top(self):
        """Return the top item without removing it, or ``None`` when empty."""
        return self.stack[-1] if self.stack else None

    def is_empty(self):
        """Return ``True`` when no items are stored."""
        return not self.stack

    def len(self):
        """Return the number of stored items."""
        return len(self.stack)

    def __str__(self):
        """Render a header line followed by one line per item (bottom to top)."""
        parts = [f"Stack ({len(self.stack)}): \n"]
        for entry in self.stack:
            parts.append("None\n" if entry is None else f"{str(entry)} {type(entry)} \n")
        return "".join(parts)

In [2]:
class ChessPosition:
    """Aggregated statistics for one position of the opening book.

    Tracks the evaluation range seen for the position, the summed game
    results (``winning``), the summed ``success`` values, the number of
    games that reached it and the moves leading out of it.
    """

    def __init__(self, eval=0.0, winning=0, success=0):
        self.eval_min = self.eval_max = eval
        self.winning = winning
        self.nr_games = 1  # the creating game already counts
        self.sum_success = success
        self.moves = {}  # move -> hash of the resulting position

    def update_position(self, eval, winning, success):
        """Fold one more game that reached this position into the statistics."""
        self.eval_min = min(self.eval_min, eval)
        self.eval_max = max(self.eval_max, eval)
        self.winning += winning
        self.nr_games += 1
        self.sum_success += success

    def add_move(self, move, new_hash):
        """Register ``move`` as leading to ``new_hash``; the first entry wins."""
        self.moves.setdefault(move, new_hash)

    def get_moves(self):
        """Yield ``(move, new_hash)`` pairs in insertion order."""
        yield from self.moves.items()

    def __str__(self):
        """Format as ``[eval or min-max]  / ELO: <sum_success> (<nr_games>)``."""
        if self.eval_min == self.eval_max:
            eval_part = f"[{self.eval_min}] "
        else:
            eval_part = f"[{self.eval_min}-{self.eval_max}] "
        return f"{eval_part} / ELO: {self.sum_success} ({self.nr_games})"

In [6]:
import chess
import chess.pgn
from io import StringIO

class PgnCreator:
    """Builds a PGN game tree move by move with undo support.

    The creator keeps a cursor (``current_node``) into a ``chess.pgn`` game
    and a stack of previously visited nodes so that a move can be taken back
    and a sibling variation can be added afterwards.
    """

    def __init__(self, game = None):
        """Start from ``game`` (cursor at its end node) or from a new empty game."""
        self.stack = Stack()
        if game:
            self.game = game
            self.current_node = game.end()
        else:
            self.game = chess.pgn.Game()
            self.current_node = self.game

    def set_header(self, header, value):
        """Set the PGN header ``header`` to ``value``."""
        self.game.headers[header] = value

    def add_move(self, move, comment, is_main = False):
        """Append ``move`` below the cursor and move the cursor onto it.

        The previous cursor is pushed on the stack so ``take_move_back`` can
        restore it. ``is_main`` selects main line vs. side variation.
        """
        self.stack.push(self.current_node)
        if is_main:
            self.current_node = self.current_node.add_main_variation(move)
        else:
            self.current_node = self.current_node.add_variation(move)
        self.current_node.comment = comment

    def mark_as_transposition(self):
        """Append a transposition marker to the comment of the current node."""
        self.current_node.comment += " TRANSPOSITION!"

    def take_move_back(self):
        """Move the cursor back to the node that was current before the last ``add_move``.

        Raises:
            Exception: if no move has been added that could be taken back.
        """
        if self.stack.is_empty():
            raise Exception("Empty Stack")
        self.current_node = self.stack.pop()

    def print_status(self):
        """Print the game and the undo stack (debugging aid)."""
        print(self.game)
        print(str(self.stack))

    def is_valid_pgn(self):
        """Return ``True`` when the exported PGN text parses back without errors.

        ``chess.pgn.read_game`` normally does not raise on bad input: it
        returns ``None`` when no game is found and collects parse problems in
        ``game.errors``. Both cases are therefore checked explicitly instead
        of relying on an exception alone.
        """
        pgn_io = StringIO(str(self.game))
        try:
            parsed = chess.pgn.read_game(pgn_io)
        except Exception as e:
            print(f"Wrong PGN, Error: {e}")
            return False
        if parsed is None:
            print("Wrong PGN, Error: no game found")
            return False
        if parsed.errors:
            print(f"Wrong PGN, Error: {parsed.errors[0]}")
            return False
        return True

    def __str__(self):
        """Return the game as PGN text."""
        return str(self.game)

In [7]:
import chess, chess.polyglot
from collections import namedtuple
import typing

class ZobristHash:
    """Zobrist hashing of chess positions with incremental (per-move) updates.

    The hash is the XOR of entries of ``random_array`` (by default the
    Polyglot random array shipped with python-chess). The array is laid out
    as 12 piece kinds x 64 squares, followed by four castling entries, eight
    en-passant file entries and one side-to-move entry.

    Note: the en-passant entry is hashed whenever ``board.ep_square`` is set;
    the full and the incremental hash agree on that rule.
    """
    WCK_INDEX = 8*8*12
    WCQ_INDEX = WCK_INDEX + 1
    BCK_INDEX = WCQ_INDEX + 1
    BCQ_INDEX = BCK_INDEX + 1
    EP_INDEX = BCQ_INDEX + 1
    TURN_INDEX = EP_INDEX + 8

    # Snapshot of everything about the position *before* a move that is
    # needed to update the hash after the move has been pushed.
    PieceMoveState = namedtuple("PieceMoveState", [
        "ep_square",
        "turn",
        "wck",
        "wcq",
        "bck",
        "bcq",
        "move_from",
        "move_to",
        "ep_move",
        "piece_moved",
        "piece_captured",
        "piece_promoted"
    ])

    def __init__(self, random_array = None):
        """Use ``random_array`` as key table, or the Polyglot array when ``None``."""
        if random_array is None:
            self.random_array = chess.polyglot.POLYGLOT_RANDOM_ARRAY
        else:
            self.random_array = random_array

    def get_square_piece_index(self, piece, color, square):
        """Return the key-table index for ``piece`` of ``color`` on ``square``.

        Raises:
            ValueError: if the computed index lies outside ``random_array``.
        """
        piece_index = self.get_piece_index(piece, color)
        square_piece_index = 64 * piece_index + square
        # Validate against the table actually in use and raise instead of
        # calling exit(), which would terminate the interpreter/kernel.
        if not 0 <= square_piece_index < len(self.random_array):
            raise ValueError(
                f"Piece/square index {square_piece_index} out of range "
                f"(piece={piece}, color={color}, square={square})"
            )
        return square_piece_index

    def get_piece_index(self, piece, color):
        """Map a 1-based piece type and a colour to a piece-kind index (0..11)."""
        return (piece-1) * 2 + int(color)

    def get_ep_index(self, ep, turn):
        """Return the key-table index for en-passant square ``ep``.

        With Black to move the square is measured from A3, otherwise from A6,
        which yields the file offset that is added to ``EP_INDEX``.
        """
        if turn == chess.BLACK:
            index = ep-chess.A3
        else:
            index = ep-chess.A6
        return index + ZobristHash.EP_INDEX

    def update_hash(self, hash, array_index):
        """Return ``hash`` with the key at ``array_index`` toggled (XOR)."""
        hash ^= self.random_array[array_index]
        return hash

    def get_board_move_state(self, board, move):
        """Capture the pre-move state of ``board`` relevant for ``move``.

        Must be called before ``move`` is pushed onto ``board``.
        """
        return self.PieceMoveState(ep_square=board.ep_square,
                                turn=board.turn,
                                wck=board.has_kingside_castling_rights(chess.WHITE),
                                wcq=board.has_queenside_castling_rights(chess.WHITE),
                                bck=board.has_kingside_castling_rights(chess.BLACK),
                                bcq=board.has_queenside_castling_rights(chess.BLACK),
                                move_from = move.from_square,
                                move_to = move.to_square,
                                piece_moved = board.piece_type_at(move.from_square),
                                ep_move = bool(board.is_en_passant(move)),
                                piece_captured = board.piece_type_at(move.to_square),
                                piece_promoted = move.promotion)

    def hash_position(self, board) -> int:
        """Hash only the piece placement of ``board``."""
        zobrist_hash = 0
        # occupied_co is indexed by colour, so the enumerate index is the colour.
        for color, squares in enumerate(board.occupied_co):
            for square in chess.scan_reversed(squares):
                index = self.get_square_piece_index(board.piece_type_at(square), color, square)
                zobrist_hash = self.update_hash(zobrist_hash, index)
        return zobrist_hash

    def hash_board(self, board) -> int:
        """Hash the full state: pieces, side to move, castling rights, en passant."""
        zobrist_hash = self.hash_position(board)
        if board.turn:
            zobrist_hash = self.update_hash(zobrist_hash, ZobristHash.TURN_INDEX)
        if board.has_kingside_castling_rights(chess.WHITE):
            zobrist_hash = self.update_hash(zobrist_hash, ZobristHash.WCK_INDEX)
        if board.has_queenside_castling_rights(chess.WHITE):
            zobrist_hash = self.update_hash(zobrist_hash, ZobristHash.WCQ_INDEX)
        if board.has_kingside_castling_rights(chess.BLACK):
            zobrist_hash = self.update_hash(zobrist_hash, ZobristHash.BCK_INDEX)
        if board.has_queenside_castling_rights(chess.BLACK):
            zobrist_hash = self.update_hash(zobrist_hash, ZobristHash.BCQ_INDEX)
        if board.ep_square is not None:
            index = self.get_ep_index(board.ep_square, board.turn)
            # Go through update_hash so a custom random_array is honoured.
            zobrist_hash = self.update_hash(zobrist_hash, index)
        return zobrist_hash

    def get_init_hash(self):
        """Return the hash of the standard starting position."""
        board = chess.Board()
        return self.get_zobrist_hash(board)

    def get_zobrist_hash(self, board):
        """Return the full hash of ``board`` (alias for ``hash_board``)."""
        return self.hash_board(board)

    def execute_move_update_hash(self, old_hash, move, board):
        """Push ``move`` on ``board`` and return ``(board, new_hash)``.

        ``old_hash`` must be the hash of ``board`` before the move; the new
        hash is derived incrementally from it.
        """
        move = chess.Move.from_uci(str(move))
        old_board_move_state = self.get_board_move_state(board, move)
        board.push(move)
        return board, self.increment_hash(board, old_board_move_state, old_hash)

    def increment_hash(self, board, old_board_move_state, old_hash):
        """Derive the hash after a move from the hash before it.

        ``board`` is the position after the move, ``old_board_move_state``
        the snapshot taken before it.
        """
        # Side to move always flips.
        new_hash = self.update_hash(old_hash, ZobristHash.TURN_INDEX)
        # Remove the previous en-passant entry, add the new one (if any).
        if old_board_move_state.ep_square is not None:
            ep_index = self.get_ep_index(old_board_move_state.ep_square, old_board_move_state.turn)
            new_hash = self.update_hash(new_hash, ep_index)
        if board.ep_square is not None:
            ep_index = self.get_ep_index(board.ep_square, board.turn)
            new_hash = self.update_hash(new_hash, ep_index)
        # Toggle every castling right that changed, using the same index
        # as hash_board does for that right.
        if old_board_move_state.wck != board.has_kingside_castling_rights(chess.WHITE):
            new_hash = self.update_hash(new_hash, ZobristHash.WCK_INDEX)
        if old_board_move_state.wcq != board.has_queenside_castling_rights(chess.WHITE):
            new_hash = self.update_hash(new_hash, ZobristHash.WCQ_INDEX)
        if old_board_move_state.bck != board.has_kingside_castling_rights(chess.BLACK):
            new_hash = self.update_hash(new_hash, ZobristHash.BCK_INDEX)
        if old_board_move_state.bcq != board.has_queenside_castling_rights(chess.BLACK):
            new_hash = self.update_hash(new_hash, ZobristHash.BCQ_INDEX)
        # Finally account for the piece movement itself.
        return self.update_hash_move(old_board_move_state, new_hash)

    def is_castling(self, move_from, move_to):
        """Classify a king move by its squares.

        Returns ``(castling, rook_from, rook_to, color)``; the last three are
        ``None`` when the squares do not describe a castling move. The caller
        is responsible for making sure the moving piece is a king.
        """
        rook_moves = {
            (chess.E1, chess.G1): (chess.H1, chess.F1, chess.WHITE),
            (chess.E1, chess.C1): (chess.A1, chess.D1, chess.WHITE),
            (chess.E8, chess.G8): (chess.H8, chess.F8, chess.BLACK),
            (chess.E8, chess.C8): (chess.A8, chess.D8, chess.BLACK),
        }
        rook_move = rook_moves.get((move_from, move_to))
        if rook_move is None:
            return False, None, None, None
        rook_from_sq, rook_to_sq, color = rook_move
        return True, rook_from_sq, rook_to_sq, color

    def update_hash_move(self, state, hash):
        """Apply the piece movement described by ``state`` to ``hash``.

        Handles normal moves, promotions, captures, en-passant captures and
        the rook relocation of castling moves.
        """
        # Clear the origin square.
        from_index = self.get_square_piece_index(piece = state.piece_moved, color = state.turn, square = state.move_from)
        hash = self.update_hash(hash, from_index)
        # Occupy the target square (with the promoted piece, if any).
        if state.piece_promoted is None:
            piece = state.piece_moved
        else:
            piece = state.piece_promoted
        to_index = self.get_square_piece_index(piece = piece, color = state.turn, square = state.move_to)
        hash = self.update_hash(hash, to_index)
        # Remove a piece captured on the target square.
        if state.piece_captured is not None:
            capture_index = self.get_square_piece_index(piece = state.piece_captured, color = not state.turn, square = state.move_to)
            hash = self.update_hash(hash, capture_index)
        # En passant: the captured pawn sits one rank behind the target square.
        if state.ep_move:
            if state.turn == chess.BLACK:
                enemy_pawn_square = state.move_to + 8
            else:
                enemy_pawn_square = state.move_to - 8
            capture_index = self.get_square_piece_index(piece = chess.PAWN, color = not state.turn, square = enemy_pawn_square)
            hash = self.update_hash(hash, capture_index)
        # Castling: only a king move can castle; other pieces travelling
        # e1-g1 etc. must not relocate a rook in the hash.
        if state.piece_moved == chess.KING:
            castling, rook_from_sq, rook_to_sq, color = self.is_castling(state.move_from, state.move_to)
            if castling:
                from_index = self.get_square_piece_index(chess.ROOK, color, rook_from_sq)
                hash = self.update_hash(hash, from_index)
                to_index = self.get_square_piece_index(chess.ROOK, color, rook_to_sq)
                hash = self.update_hash(hash, to_index)
        return hash


In [8]:
import pickle

class OpeningBook:
    """Opening book built from annotated games.

    Positions are keyed by their Zobrist hash and store aggregated
    statistics (``ChessPosition``) plus the moves leading to follow-up
    positions. The book can be pickled, printed as text and exported as a
    PGN variation tree.
    """

    INITIAL_POS = "xxx"

    def __init__(self, max_moves = 40):
        """Create an empty book that records at most ``max_moves`` half-moves per game."""
        self.max_moves = max_moves
        self.positions = {}
        self.transpositions = {}
        self.stats = {}
        self.stats["nr_games"] = 0
        self.stats["nr_moves"] = 0
        self.stats["max_eval"] = 0.0
        self.zh = ZobristHash()
        self.init_pos = ChessPosition(eval = 0.0, winning = 0, success = 0)
        self.init_hash = self.zh.get_init_hash()
        self.positions[self.init_hash] = self.init_pos

    def save(self, filename):
        """Print a short summary and pickle positions, transpositions and stats to ``filename``."""
        print(f'Book is based on {self.stats["nr_games"]} Games and {self.stats["nr_moves"]} Moves (unique Positions: {len(self.positions)}) Max-Eval: {self.stats["max_eval"]}')
        with open(filename, 'wb') as file:
            pickle.dump({'positions': self.positions, 'transpositions': self.transpositions, 'stats': self.stats}, file)

    def load(self, filename):
        """Replace the book content with the data pickled in ``filename``.

        SECURITY: ``pickle.load`` can execute arbitrary code; only load book
        files that come from a trusted source.
        """
        with open(filename, 'rb') as file:
            data = pickle.load(file)
        self.positions = data['positions']
        self.transpositions = data['transpositions']
        self.stats = data['stats']

    def new_game(self, game):
        """Reset the per-game state and read result and rating diff from ``game``'s headers."""
        self.move_str = ""
        self.stats["nr_games"] += 1
        self.curr_pos = self.INITIAL_POS
        self.game = game
        self.board = game.board()
        self.hash = self.init_hash
        self.half_move_counter = 0
        try:
            self.rating_diff = int(game.headers["WhiteRatingDiff"])
        except (KeyError, ValueError, TypeError):
            # Header missing or not an integer: treat as no rating change.
            self.rating_diff = 0
            print("Bad Rating diff")
        result_header = game.headers["Result"]
        if result_header == "1-0":
            self.result = +1
        elif result_header == "0-1":
            self.result = -1
        else:
            self.result = 0
        self.akt_pos = self.init_pos

    def push_move(self, move, eval):
        """Play ``move`` in the current game and record the resulting position.

        Returns ``False`` (without recording anything) once the game has
        exceeded ``max_moves`` half-moves, ``True`` otherwise.
        """
        self.half_move_counter += 1
        if self.half_move_counter > self.max_moves:
            return False
        self.stats["nr_moves"] += 1
        if eval > self.stats["max_eval"]:
            self.stats["max_eval"] = eval
        if self.half_move_counter % 2 == 1:
            # Integer division: move numbers are "1.", "2.", ... not "1.0.".
            self.move_str += str((self.half_move_counter+1)//2) + "."
        self.move_str += str(move) + " "
        old_hash = self.hash
        self.board, self.hash = self.zh.execute_move_update_hash(old_hash=old_hash, move=move, board=self.board)
        self.positions[old_hash].add_move(move=move, new_hash=self.hash)
        self.process_pos(eval)
        return True

    def process_pos(self, eval):
        """Create or update the statistics entry of the current position."""
        if self.hash not in self.positions:
            self.positions[self.hash] = ChessPosition(eval, self.result, self.rating_diff)
        else:
            self.positions[self.hash].update_position(eval, self.result, self.rating_diff)

    def pos2str(self, pos, visited=None):
        """Return ``(visited, text)`` describing the move tree below ``pos``.

        ``visited`` collects the hashes already printed; a position reached a
        second time is reported as a transposition instead of being expanded
        again. A fresh dict is used per top-level call so repeated calls give
        the same result.
        """
        if visited is None:
            visited = {}
        if pos in visited:
            return visited, f"Position {pos} is reached by Transposition!"
        visited[pos] = True
        output = ""
        for index, (move, new_pos) in enumerate(self.positions[pos].get_moves(), start=1):
            output += f"{pos} {index} Move: {move} {new_pos}\n"
            # Thread the same dict through the recursion explicitly.
            visited, new_output = self.pos2str(new_pos, visited)
            output += f"{new_output}"
        return visited, output

    def __str__(self):
        """Return the text dump of the whole book, starting at the initial position."""
        visited, output = self.pos2str(self.init_hash)
        if len(visited) != len(self.positions):
            print(f"Error! Visited: {len(visited)} Positions: {len(self.positions)}")
        return output

    def pos2pgn(self, pos, visited=None):
        """Add the move tree below ``pos`` to ``self.pc`` and return the visited set.

        The first move of every position becomes the main line; positions
        seen before are only marked as transposition.
        """
        if visited is None:
            visited = {}
        if pos in visited:
            self.pc.mark_as_transposition()
            return visited
        visited[pos] = True
        first = True
        for move, new_pos in self.positions[pos].get_moves():
            self.pc.add_move(move, str(self.positions[new_pos]), is_main = first)
            first = False
            visited = self.pos2pgn(new_pos, visited)
            self.pc.take_move_back()
        return visited

    def book2pgn(self, pgn_structure = None):
        """Export the book as PGN text.

        ``pgn_structure`` is an optional python-chess game object used as a
        template: the export then starts at the end position of that game.

        Raises:
            Exception: if the template's end position is not part of the book.
        """
        self.pc = PgnCreator(pgn_structure)
        if pgn_structure:
            self.pc.set_header("White", pgn_structure.headers["White"])
            self.pc.set_header("Black", pgn_structure.headers["Black"])
            current_hash = self.zh.hash_board(self.pc.current_node.board())
            if current_hash not in self.positions:
                raise Exception("Template Position not found in Book")
        else:
            self.pc.set_header("White", "My")
            self.pc.set_header("Black", "Book")
            current_hash = self.init_hash
        _ = self.pos2pgn(current_hash)

        return str(self.pc)

    #
    # Later
    #
    def store_transposition(self, position, move_string):
        """Remember ``move_string`` as one more move order reaching ``position``."""
        self.transpositions.setdefault(position, []).append(move_string)


In [9]:
import chess.pgn
import os
import io
import sys
import re
from datetime import datetime

def get_eval(comment):
    """Extract the first ``[%eval ...]`` annotation from a PGN comment.

    A numeric evaluation is returned as float. A mate annotation ``#N`` is
    mapped to ``100 - N`` for positive ``N`` and to ``-100 - N`` otherwise.
    Comments without any evaluation yield ``0.0``.
    """
    score_match = re.search(r"\[%eval ([+-]?\d+(?:\.\d+)?)\]", comment)
    if score_match is not None:
        return float(score_match.group(1))

    mate_match = re.search(r"\[%eval #([+-]?\d+)\]", comment)
    if mate_match is None:
        return 0.0
    mate_in = int(mate_match.group(1))
    bound = 100.0 if mate_in > 0 else -100.0
    return bound - mate_in

def process_move(move, comment, book):
    """Feed ``move`` and the evaluation parsed from ``comment`` into ``book``.

    Returns whatever ``book.push_move`` returns.
    """
    return book.push_move(move, get_eval(comment))

def process_game(game, book):
    """Replay the main line of ``game`` into ``book``.

    Stops at the end of the main line or as soon as ``process_move``
    reports that the move was not accepted.
    """
    book.new_game(game)
    current = game
    while current.variations:
        # Variation 0 is the main-line continuation.
        current = current.variation(0)
        if not process_move(current.move, current.comment, book):
            return

def str2game(pgn_string):
    """Parse ``pgn_string`` with ``chess.pgn.read_game`` and return its result."""
    return chess.pgn.read_game(io.StringIO(pgn_string))

def read_pgns(pgn_file):
    """Yield the games of a multi-game PGN file one by one.

    The file is split at every ``[Event "..."]`` tag line and each chunk is
    parsed with ``str2game``. Only the ``Event`` tag itself starts a new
    game; tags that merely begin with the same letters (``EventDate``,
    ``EventType``, ...) do not.

    NOTE(review): a line starting with ``[Event `` inside a multi-line
    comment would still be taken as a game boundary.
    """
    with open(pgn_file, 'r', encoding='UTF-8') as pgn:
        game_lines = []
        for line in pgn:
            # The trailing space keeps "[EventDate ..." from matching.
            if line.startswith("[Event ") and game_lines:
                yield str2game("".join(game_lines))
                game_lines = []
            game_lines.append(line)

        if game_lines:  # last game in the file
            yield str2game("".join(game_lines))


def process_pgn(pgn_file, output_folder, book):
    """Feed every game of ``pgn_file`` into ``book`` and save it as ``book.cob``.

    Progress is printed after every 100 processed games. If reading or
    processing a game raises, the error is printed together with the number
    of the failing game and the book is *not* saved.
    """
    start = datetime.now()
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(output_folder, exist_ok=True)

    games_done = 0  # number of games completely processed so far
    try:
        for game in read_pgns(pgn_file):
            if game is None:
                break  # No more games to read
            process_game(game, book)
            games_done += 1
            if games_done % 100 == 0:
                now = datetime.now()
                elapsed_time = now - start
                print(f"Processed {games_done} games. Time now: {now}. Elapsed time: {elapsed_time}")
    except Exception as e:
        # The failing game is the one after the last completed game.
        print(f"Game Number: {games_done + 1}, Exception: {e}")
        return

    now = datetime.now()
    print(f"Processed {games_done} games. Time now: {now}. Elapsed time: {now - start}")
    book_output_path = os.path.join(output_folder, "book.cob")
    book.save(book_output_path)

# Entry point when the code is run as a script / top-level notebook cell.
if __name__ == "__main__":


    # Start with an empty book; its constructor registers only the initial position.
    book = OpeningBook()

    # The command-line driven run below is currently disabled: it would read
    # the PGN path and the output folder from the arguments and build the book.
    #pgn_file = sys.argv[1]
    #output_folder = sys.argv[2]

    #process_pgn(pgn_file, output_folder, book)
