In [None]:
%%html
<style>
.container {
  width: 100%;
}
</style>

In [None]:
%load_ext nb_black
%load_ext nb_mypy

In [None]:
import nbimporter
from AIBaseClass import ChessAI
from Exercise03AI import Exercise03AI
from Exercise04AI import Exercise04AI

# Aufgabe 09: Experiment

tbd

In [None]:
import chess
from typing import Any, Callable


class ExperimentalAI(Exercise03AI):
    """Chooses middle game moves using minimax algorithm, alpha-beta-pruning,
    memoization, progressive deepening and the singular value extension"""

    def __init__(self, max_depth: int = 8, **kwargs) -> None:
        super().__init__(**kwargs)
        self.cache_new: dict[tuple, Any] = {}
        self.cache_old: dict[tuple, Any] = {}
        self.MAX_SVE_DEPTH: int = max(max_depth, kwargs["search_depth"])

    def reset(self) -> None:
        """Resets all internal variables"""
        super().reset()
        self.cache_new.clear()
        self.cache_old.clear()

Die Funktion `debug_minimax` ist eine Debugging-Hilfsfunktion um die Berechnungen der `evaluate_minimax`-Funktion nachvollziehen zu können. Die Funktion wird bei Bedarf als Dekorator auf die Funktion `evaluate_minimax` angewendet, ist aber standardmäßig nicht aktiviert. Die Parameter sind dieselben wie bei der Funktion `evaluate_minimax` und werden im dortigen Abschnitt beschrieben.

In [None]:
def debug_minimax(evaluate_minimax: Callable):
    """Prints the Minimax Game Tree."""

    def minimax_debug(
        self,
        minimax: Callable,
        board: chess.Board,
        current_evaluation: int,
        limit: int,
        level: int = 0,
        alpha: int = -ExperimentalAI.LIMIT,
        beta: int = ExperimentalAI.LIMIT,
    ) -> tuple[int, chess.Move | None]:
        color = "white" if board.turn else "black"
        ws = (level + 1) * "    "
        key = self.get_key(board)
        if key in self.cache_new or key in self.cache_old:
            print(
                f"level: {level} {ws} mm> {color}; α: {alpha}; β: {beta}; limit: {limit}"
            )  # memoization
        else:
            print(
                f"level: {level} {ws} --> {color}; α: {alpha}; β: {beta}; limit: {limit}"
            )
        evaluation, best_move = evaluate_minimax(
            self, minimax, board, current_evaluation, limit, level, alpha, beta
        )
        print(
            f"level: {level} {ws} <-- {color}; ev: {evaluation}; best_move: {best_move}"
        )
        return evaluation, best_move

    return minimax_debug

Die Funktion `is_quiet_move_and_push` ist ein Indikator dafür, ob die Suchtiefe (`limit`) erhöht werden muss. Dabei prüft die Funktion ob der übergebene Zug ein Schlagzug, eine Umwandlung oder ein Schachzug ist und gibt entsprechend `True` bzw. `False` zurück. Der übergebene Zug wird im Laufe der Prüfung auf das Board angewendet.

Hinweis: Die `chess`-Bibliothek bietet auch die Funktion `gives_check` an, welche überprüft ob ein übergebener Zug den Gegner in Schach setzt. Da diese Funktion aber intern nur den Zug anwendet, `is_check` aufruft und den Zug anschließend wieder entfernt wurde aus Effizienzgründen direkt letztere Funktion verwendet.

In [None]:
class ExperimentalAI(ExperimentalAI):  # type: ignore
    def is_quiet_move_and_push(self, board: chess.Board, move: chess.Move) -> bool:
        """Checks if the next move was an promotion, capture or check move. Pushes the given move on the board."""
        if move.promotion or board.piece_type_at(move.to_square):
            board.push(move)
            return False
        board.push(move)
        if board.is_check():
            return False
        return True

Die Singular Value Extension erfordert es, dass das Limit für die Suche nun erstmals laufend angepasst werden muss. Um negative Zahlen zu vermeiden, eine statistische Erfassung zu ermöglichen und aus Gründen der Verständlichkeit wird daher der Parameter `depth`, welcher bisher zum Zielwert `0` dekrementiert wurde, nun durch die Parameter `level` und `limit` ersetzt. Der Parameter `limit` gibt dabei die gewünschte Suchtiefe an und kann im Laufe der Berechnung durch die SVE erhöht werden. Das Argument `level` ist nun die aktuelle Suchtiefe, welche bei `0` startet und mit jedem Aufruf von `minValue` oder `maxValue` inkrementiert wird.

Für die SVE ist eine neue Abbruchbedingung bei Erreichen der maximalen Tiefe notwendig. Diese wird hier in der Funktion `minimax_early_abort` hinzugefügt. Parallel dazu ermöglicht die Veränderung der Parameter (vorheriger Absatz) einige Vereinfachungen, welche in dieser überschriebenen Variante implementiert sind.

Die folgenden Änderungen wurden vorgenommen:
- Die Abbruchbedingung wurde auf die neuen Parameter angepasst und lautet nun `level == limit` statt wie bisher `depth == 0`.
- Es gibt eine zusätzliche Abbruchbedingung sobald die maximale Tiefe für die Singular-Value-Extension erreicht ist. In diesem Fall wird die Evaluierung des aktuellen Pfades beendet, auch wenn dieser sich noch in einer Kette von nicht-ruhigen Zügen befindet. Da der Pfad nicht vollständig evaluiert wurde und somit ungewiss ist, wird er mit dem jeweils schlechtesten Wert versehen. Somit werden unvollständig untersuchte Pfade immer vermieden.

In [None]:
class ExperimentalAI(ExperimentalAI):  # type: ignore
    def minimax_early_abort(
        self, board: chess.Board, level: int, depth: int, current_evaluation: int
    ) -> int | None:
        """Returns an evaluation iff the minimax has an early exit condition. Returns None otherwise."""
        is_checkmate = board.is_checkmate()
        if is_checkmate and not board.turn:
            # White has won the game
            evaluation = self.LIMIT - level
            return evaluation

        if is_checkmate and board.turn:
            # Black has won the game
            evaluation = -self.LIMIT + level
            return evaluation

        if (
            board.is_insufficient_material()
            or not board.legal_moves
            or board.is_fifty_moves()
        ):
            # Game is a draw
            return 0

        # Recursion abort case
        if depth == 0:
            return current_evaluation

        return None

Um die Lesbarkeit der Minimax Funktion zu wahren, wurde diese in die Funktionen `maxValue`, `minValue` und `evaluate_minimax` aufgeteilt, wobei `maxValue` und `minValue` die beiden Zweige des Minimax Algorithmus darstellen.
Die `evaluate_minimax` übernimmt nun die Memoisierung, wodurch kein Dekorator mehr benötigt wird.

Die Funtion `maxValue` berechnet wie bisher die maximale Evaluierung aller möglichen Züge. Neu hinzugekommen ist die laufende Erhöhung des Limits, welche direkt nach dem Entfernen eines Zuges von der Prioritätswarteschlange `moves` durchgeführt wird. Dort werden nun drei Fälle unterschieden:
- Wenn die Ungleichung `level + 1 < limit` erfüllt ist, wird das Limit grundsätzlich nicht erhöht. Dies hat zur Folge, dass die SVE erst zum Einsatz kommt, wenn die Zielsuchtiefe im nächsten Schritt erreicht ist und somit die Suche sonst zu Ende wäre. Diese Bedingung wurde aus Performance-Gründen mit aufgenommen, da die Berechnungen ansonsten durch die teilweise sehr hohe Tiefe wesentlich länger dauern. Des weiteren vereinfacht es den Zugriff auf den Cache im Progressive Deepening (die Berechnung der Tiefe des letzten Durchlaufes).
- Wenn der Zug "ruhig" ist, d.h. es sich nicht um einen Schlagzug, eine Umwandlung oder ein Schachzug handelt, wird das Limit nicht erhöht.
- Falls die ersten beiden Bedingungen nicht zutreffen, wird die SVE durchgeführt und das Limit um `1` erhöht.

In [None]:
import heapq


class ExperimentalAI(ExperimentalAI):  # type: ignore
    def maxValue(
        self,
        board: chess.Board,
        current_evaluation: int,
        depth: int,
        level: int,
        alpha: int,
        beta: int,
    ) -> tuple[int, chess.Move | None]:
        """Searches the best value with given depth using minimax algorithm"""
        self.stats[-1]["nodes"] += 1
        early_abort_evaluation = self.minimax_early_abort(
            board, level, depth, current_evaluation
        )
        if early_abort_evaluation is not None:
            self.stats[-1]["leaf_ctr"] += 1
            self.stats[-1]["depth_sum"] += level  # for calculating the average depth
            return early_abort_evaluation, None

        best_move = None
        moves: list[tuple[int, int, chess.Move]] = []

        # White to play (positive numbers are good)
        if level < self.DEPTH - 2:
            for i, move in enumerate(board.legal_moves):
                board.push(move)
                key = self.get_key(board)
                board.pop()
                old_eval = self.cache_old.get(key, (None, i, None))[1]
                heapq.heappush(moves, (-old_eval, i, move))
        else:
            moves = list(enumerate(board.legal_moves))
        maxEvaluation = alpha
        while moves:
            move = heapq.heappop(moves)[-1]
            new_evaluation = self.incremental_evaluate(board, current_evaluation, move)
            if depth > 1 or level + 1 == self.MAX_SVE_DEPTH:
                new_depth = depth
                board.push(move)
            elif self.is_quiet_move_and_push(board, move):
                new_depth = depth
            else:
                new_depth = depth + 1
            evaluation, _, = self.minimax(
                self.minValue,
                board,
                new_evaluation,
                new_depth - 1,
                level + 1,
                maxEvaluation,
                beta,
            )
            board.pop()
            if evaluation >= beta:
                return evaluation, move
            if evaluation > maxEvaluation:
                best_move = move
                maxEvaluation = evaluation
        return maxEvaluation, best_move

Die Funktion `minValue` berechnet wie bisher die minimale Evaluierung aller möglichen Züge und enthält ansonsten dieselben Änderungen wie die Funktion `maxValue`.

In [None]:
class ExperimentalAI(ExperimentalAI):  # type: ignore
    def minValue(
        self,
        board: chess.Board,
        current_evaluation: int,
        depth: int,
        level: int,
        alpha: int,
        beta: int,
    ) -> tuple[int, chess.Move | None]:
        """Searches the best value with given depth using minimax algorithm"""
        self.stats[-1]["nodes"] += 1
        early_abort_evaluation = self.minimax_early_abort(
            board, level, depth, current_evaluation
        )
        if early_abort_evaluation is not None:
            self.stats[-1]["leaf_ctr"] += 1
            self.stats[-1]["depth_sum"] += level
            return early_abort_evaluation, None

        best_move = None
        moves: list[tuple[int, int, chess.Move]] = []

        # Black to play (negative numbers are good)
        if level < self.DEPTH - 2:
            for i, move in enumerate(board.legal_moves):
                board.push(move)
                key = self.get_key(board)
                board.pop()
                old_eval = self.cache_old.get(key, (None, i, None))[1]
                heapq.heappush(moves, (old_eval, i, move))
        else:
            moves = list(enumerate(board.legal_moves))
        minEvaluation = beta
        while moves:
            move = heapq.heappop(moves)[-1]
            new_evaluation = self.incremental_evaluate(board, current_evaluation, move)
            if depth > 1 or level + 1 == self.MAX_SVE_DEPTH:
                new_depth = depth
                board.push(move)
            elif self.is_quiet_move_and_push(board, move):
                new_depth = depth
            else:
                new_depth = depth + 1
            evaluation, _ = self.minimax(
                self.maxValue,
                board,
                new_evaluation,
                new_depth - 1,
                level + 1,
                alpha,
                minEvaluation,
            )
            board.pop()
            if evaluation <= alpha:
                return evaluation, move
            if evaluation < minEvaluation:
                best_move = move
                minEvaluation = evaluation
        return minEvaluation, best_move

Die Methode `store_in_cache` ist identisch zu der Implementierung in `Exercise06AI`. In der Methode `get_from_cache` existiert der minimale Unterschied, dass anstatt des `depth`-Parameters die Argumente `limit` und `level` durchgereicht werden.

In [None]:
class ExperimentalAI(ExperimentalAI):  # type: ignore
    def get_key(self, board: chess.Board) -> tuple:
        """Calculates a key that uniquely identifies a given board."""
        return (
            board.pawns,
            board.knights,
            board.bishops,
            board.rooks,
            board.queens,
            board.kings,
            board.occupied_co[chess.WHITE],
            board.occupied_co[chess.BLACK],
            board.turn,
            board.castling_rights,
            board.halfmove_clock
            if board.halfmove_clock >= (50 - self.MAX_SVE_DEPTH)
            else -42,
        )

    def store_in_cache(
        self, key: tuple, result: tuple, depth: int, alpha: int, beta: int
    ) -> None:
        """Stores the result of a minimax computation in the cache."""
        evaluation, move = result
        if evaluation <= alpha:
            self.cache_new[key] = ("≤", evaluation, move, depth)
        elif evaluation < beta:
            self.cache_new[key] = ("=", evaluation, move, depth)
        else:
            self.cache_new[key] = ("≥", evaluation, move, depth)

    def get_from_cache(
        self,
        minValue_or_maxValue: Callable,
        key: tuple,
        board: chess.Board,
        current_eval: int,
        depth: int,
        level: int,
        alpha: int,
        beta: int,
    ) -> tuple:
        """Gets a result from the cache if possible."""
        flag, evaluation, move, eval_depth = self.cache_new.get(
            key
        ) or self.cache_old.get(key)
        if depth > eval_depth:
            result = minValue_or_maxValue(
                board, current_eval, depth, level, alpha, beta
            )
            self.store_in_cache(key, result, depth, alpha, beta)
            return result
        if flag == "=":
            return evaluation, move
        elif flag == "≤":
            if evaluation <= alpha:
                return evaluation, move
            elif evaluation < beta:
                result = minValue_or_maxValue(
                    board, current_eval, depth, level, alpha, evaluation
                )
                self.store_in_cache(key, result, depth, alpha, evaluation)
                return result
            else:
                result = minValue_or_maxValue(
                    board, current_eval, depth, level, alpha, beta
                )
                self.store_in_cache(key, result, depth, alpha, beta)
                return result
        else:
            if evaluation <= alpha:
                result = minValue_or_maxValue(
                    board, current_eval, depth, level, alpha, beta
                )
                self.store_in_cache(key, result, depth, alpha, beta)
                return result
            elif evaluation < beta:
                result = minValue_or_maxValue(
                    board, current_eval, depth, level, evaluation, beta
                )
                self.store_in_cache(key, result, depth, evaluation, beta)
                return result
            else:
                return evaluation, move

Die Methode `evaluate_minimax` ist größtenteils identisch zu der bisherigen `memoize_minimax` Funktion.  
Folgende Änderungen wurden eingeführt:
- Die Funktion wird nicht mehr als Dekorator implementiert sondern direkt aufgerufen.
- Da die Funktion `minimax` in zwei separate Funktionen aufgeteilt wurde, gibt es nun einen neuen Parameter `minimax`, welcher eine Referenz der Funktion `minValue` oder `maxValue` enthält.
- Statt mit dem Parameter `depth` wird die `get_key` Funktion intern mit der Differenz aus `limit` und `level` aufgerufen. Die Differenz ist dabei das zum aktuellen Board relative Limit, also mit welcher Tiefe die Suche vom aktuellen Board ausgehend durchgeführt werden soll.

In [None]:
class ExperimentalAI(ExperimentalAI):  # type: ignore
    # @debug_minimax
    def minimax(
        self,
        minValue_or_maxValue: Callable,
        board: chess.Board,
        current_evaluation: int,
        depth: int,
        level: int = 0,
        alpha: int = -ExperimentalAI.LIMIT,
        beta: int = ExperimentalAI.LIMIT,
    ) -> tuple[int, chess.Move | None]:
        """Searches the best value with given depth using minimax algorithm"""
        key = self.get_key(board)
        self.stats[-1]["cache_tries"] += 1
        self.stats[-1]["max_depth"] = max(level, self.stats[-1]["max_depth"])

        if key in self.cache_new or key in self.cache_old:
            self.stats[-1]["cache_hits"] += 1
            return self.get_from_cache(
                minValue_or_maxValue,
                key,
                board,
                current_evaluation,
                depth,
                level,
                alpha,
                beta,
            )
        result = minValue_or_maxValue(
            board, current_evaluation, depth, level, alpha, beta
        )
        self.store_in_cache(key, result, depth, alpha, beta)
        return result

In der Methode `get_next_middle_game_move` wird nun bei jedem Zug überprüft, welcher Spieler an der Reihe ist. Abhängig davon wird `evaluate_minimax` entweder mit `minValue` oder mit `maxValue` aufgerufen, um den besten Zug zu ermitteln. Alle sonstigen Änderungen dienen nur der genaueren Erfassung von statistischen Daten.

In [None]:
import sys
from datetime import datetime


class ExperimentalAI(ExperimentalAI):  # type: ignore
    def get_next_middle_game_move(self, board: chess.Board) -> chess.Move:
        """Gets the best next move"""
        self.last_evaluation: int | None  # type annotation for mypy
        self.stats[-1]["cache_hits"] = 0
        self.stats[-1]["cache_tries"] = 0
        self.stats[-1]["leaf_ctr"] = 0
        self.stats[-1]["max_depth"] = -1
        self.stats[-1]["avg_depth"] = -1
        self.stats[-1]["depth_sum"] = 0

        if self.is_king_endgame != self.check_king_endgame(board):
            self.last_evaluation += self.get_endgame_evaluation_change(board)
        # Calculate current evaluation
        if self.last_evaluation is None:  # type: ignore
            current_evaluation = self.full_evaluate(board)
        else:
            # Get current evaluation (after opponent move)
            last_move = board.pop()
            current_evaluation = self.incremental_evaluate(board, self.last_evaluation, last_move)  # type: ignore
            board.push(last_move)

        evaluation_function = self.maxValue if board.turn else self.minValue
        self.stats[-1]["nodes"] = 0
        future_evaluation, best_move = self.minimax(
            evaluation_function, board, current_evaluation, self.DEPTH
        )

        # Debugging fail safe
        assert best_move, f"""
        Best move is None with fen '{board.fen()}' at player {type(self).__name__}! 
        depth: {self.DEPTH}, last_eval: {self.last_evaluation}, current_evaluation: {current_evaluation},
        is_king_engame: {getattr(self, 'is_king_endgame', "N/A")}, move_stack: {board.move_stack}
        """
        # Update last evaluation (after player move)
        self.last_evaluation = self.incremental_evaluate(
            board, current_evaluation, best_move
        )
        # Update stats
        self.stats[-1]["minimax_eval"] = future_evaluation
        self.stats[-1]["board_eval_before_move"] = current_evaluation
        self.stats[-1]["board_eval_after_move"] = self.last_evaluation
        self.stats[-1]["avg_depth"] = self.stats[-1]["depth_sum"] / (
            self.stats[-1]["leaf_ctr"] or 1
        )
        del self.stats[-1]["depth_sum"]
        del self.stats[-1]["leaf_ctr"]
        self.stats[-1]["cache_size_mb"] = round(
            (sys.getsizeof(self.cache_new) + sys.getsizeof(self.cache_old))
            / (1024 * 1024),
            2,
        )
        self.cache_old.clear()
        self.cache_old = self.cache_new
        self.cache_new = {}

        self.stats[-1]["datetime"] = datetime.now().strftime("%H:%M:%S")
        return best_move

## Debugging Bereich

Die folgenden Zellen enthalten Unit-Tests der oben implementierten Funktionen.

In [None]:
import Exercise02AI as Exercise02AI_
import Exercise04AI as Exercise04AI_

In [None]:
# Create player and board
unit_test_player = ExperimentalAI(player_name="ExpAI", search_depth=3, max_depth=3)
board = chess.Board("5rk1/1b3p2/8/3p4/3p2P1/2Q4B/5P1K/R3R3 b - - 0 36")
board

In [None]:
# Test minimax
def test_minimax(
    unit_test_player: ChessAI,
    board: chess.Board,
    current_evaluation: int,
    expected_evaluation: int,
    expected_move: str,
    expected_nodes: int,
    expected_cache_tries: int,
    expected_cache_hits: int,
    expected_cache_elements: int,
):
    unit_test_player.cache_new = {}  # Clear cache
    unit_test_player.cache_old = {}  # Clear cache
    unit_test_player.stats[-1]["cache_tries"] = 0
    unit_test_player.stats[-1]["cache_hits"] = 0
    unit_test_player.stats[-1]["nodes"] = 0
    unit_test_player.stats[-1]["leaf_ctr"] = 0
    unit_test_player.stats[-1]["max_depth"] = 0
    unit_test_player.stats[-1]["depth_sum"] = 0
    f = unit_test_player.maxValue if board.turn else unit_test_player.minValue
    mm_evaluation, mm_move = unit_test_player.minimax(
        f, board, current_evaluation, unit_test_player.DEPTH
    )
    nodes = unit_test_player.stats[-1]["nodes"]
    cache_tries = unit_test_player.stats[-1]['cache_tries']
    cache_hits = unit_test_player.stats[-1]['cache_hits']
    print(f"Minimax Evaluation: {mm_evaluation}")
    print(f"Minimax Move: {mm_move}")
    print(f"Nodes searched: {nodes}")
    print(f"Cache tries: {cache_tries}")
    print(f"Cache hits: {cache_hits}")
    print(f"Elements in new cache: {len(unit_test_player.cache_new)}")
    print(f"Elements in old cache: {len(unit_test_player.cache_old)}")
    assert (
        mm_evaluation == expected_evaluation
    ), "Minimax evaluation does not match expected value!"
    assert mm_move.uci() == expected_move, "Minimax move does not match expected value!"
    assert nodes == expected_nodes, "Searched node count has changed!"
    assert cache_tries == expected_cache_tries, "Cache tries do not match expected value!"
    assert cache_hits == expected_cache_hits, "Cache hits do not match expected value!"
    assert len(unit_test_player.cache_new) == expected_cache_elements, "Cache elements do not match expected value!"
    assert len(unit_test_player.cache_old) == 0, "Cache elements do not match expected value!"


In [None]:
test_minimax(
    unit_test_player,
    board,
    current_evaluation=1240,
    expected_evaluation=325,
    expected_move="d4c3",
    expected_nodes=2336,
    expected_cache_tries=2788,
    expected_cache_hits=557,
    expected_cache_elements=2231,
)

In [None]:
# Test next move function (with memoized minimax result)
Exercise02AI_.test_next_move(
    unit_test_player,
    board,
    expected_move="d4c3",
    expected_nodes=0,
)

## Temporärer Bereich

Der folgende Bereich dient zum temporären Debuggen und kann nicht-funktionierenden Code enthalten. Dieser Bereich wird vor der Abgabe entfernt.