In [24]:
import gzip
from collections import defaultdict
import math
import csv
import numpy as np
import string
import random
import string
import chess
import random
from typing import Tuple, List, Optional, Dict, Any
from stockfish import Stockfish
import os

In [25]:
class ChessEnv:
    """
    A chess environment for reinforcement learning algorithms like Monte Carlo
    that uses python-chess as the underlying chess engine and python-stockfish for position evaluation.

    Evaluations are expressed in pawns from White's point of view and are kept
    within [-10, 10]: centipawn scores are divided by 100, forced mates map to
    9 + 1/n (White mates in n) or -9 - 1/n (Black mates in n), and finished
    games map to exactly +10 / -10 / 0.
    """

    def __init__(self, stockfish_path: str, stockfish_params: Dict = None, max_steps: int = 100, stockfish_depth: int = 10):
        """
        Initialize the chess environment with Stockfish integration using the Python package.

        Args:
            stockfish_path: Path to the Stockfish executable.
            stockfish_params: Dictionary of parameters for the Stockfish engine.
                If None, default parameters will be used. Missing keys are
                filled in from the defaults; the passed dictionary itself is
                left untouched.
            max_steps: Maximum number of steps before the game is considered a draw.
            stockfish_depth: Depth for Stockfish evaluation.
        """
        self.board = chess.Board()
        self.max_steps = max_steps
        self.stockfish_depth = stockfish_depth
        self.steps = 0
        self.done = False
        self.result = None

        # Default Stockfish parameters
        default_params = {
            "Threads": 1,
            "Hash": 16,
        }

        # Build a fresh dict (provided values win over defaults) so the
        # caller's dictionary is never mutated as a side effect.
        merged_params = {**default_params, **(stockfish_params or {})}

        # Initialize Stockfish engine using the python package
        self.stockfish = Stockfish(path=stockfish_path, parameters=merged_params)
        self.stockfish.set_depth(self.stockfish_depth)

    def __del__(self):
        """Destructor: release the engine when the environment is garbage collected."""
        self.close()

    def close(self):
        """Explicitly close the engine. Safe to call more than once."""
        # getattr guards against __init__ having failed before the attribute was set.
        if getattr(self, 'stockfish', None) is not None:
            # Dropping the only reference lets the Stockfish wrapper clean up its process.
            self.stockfish = None

    def reset(self) -> chess.Board:
        """
        Reset the environment to the starting position.

        Returns:
            The initial state (chess board).
        """
        self.board = chess.Board()
        # Keep the engine in sync with the fresh board.
        self.stockfish.set_fen_position(self.board.fen())
        self.steps = 0
        self.done = False
        self.result = None
        return self.board

    def evaluate_position(self) -> float:
        """
        Evaluate the current position using Stockfish.

        Returns:
            Numerical evaluation from white's perspective, in pawns (1.0 = 1 pawn advantage).
            Finished games return 10.0 (White won), -10.0 (Black won) or 0.0 (draw)
            without consulting the engine.
        """
        # Skip evaluation if game is over
        if self.board.is_game_over():
            outcome = self.board.outcome()
            if outcome.winner == chess.WHITE:
                return 10.0  # White wins
            elif outcome.winner == chess.BLACK:
                return -10.0  # Black wins
            else:
                return 0.0  # Draw

        # Update Stockfish with the current position using fen (official state string for chess)
        self.stockfish.set_fen_position(self.board.fen())

        # Get evaluation from stockfish
        evaluation = self.stockfish.get_evaluation()

        # Parse the evaluation
        if evaluation["type"] == "cp":
            # Centipawn evaluation (convert to pawns)
            return evaluation["value"] / 100.0

        # Mate evaluation: the value is the number of moves to mate,
        # positive when White mates and negative when Black mates.
        mate_in = evaluation["value"]
        if mate_in == 0:
            # Defensive: "mate in 0" means the side to move is already mated.
            # The game-over check above normally catches this first.
            return -10.0 if self.board.turn == chess.WHITE else 10.0
        if mate_in > 0:
            return 9.0 + (1.0 / mate_in)  # Positive for white winning
        # mate_in is negative here, so adding 1/mate_in moves the score
        # towards -10 as the mate gets closer (mirror image of the White case).
        return -9.0 + (1.0 / mate_in)  # Negative for black winning

    def step(self, action: chess.Move) -> Tuple[chess.Board, float, bool, Dict[str, Any]]:
        """
        Take a step in the environment by making a move.

        Args:
            action: A chess move.

        Returns:
            Tuple containing:
            - next_state: The new board state after the move
            - reward: For a finished game, +1.0 / -1.0 / 0.0 for a White win /
              Black win / draw. Otherwise the change in Stockfish evaluation
              caused by the move, signed so that it is positive when the
              evaluation moved in favour of the player who just moved.
              0.0 when the step limit is reached.
            - done: Whether the game is finished
            - info: Additional information (step count, outcome, legal moves,
              evaluation of the new position)

        Raises:
            ValueError: If the move is not legal in the current position.
        """
        if action not in self.board.legal_moves:
            raise ValueError(f"Illegal move: {action}")

        # Get evaluation before the move
        if self.steps > 0:  # Skip on first move
            eval_before = self.evaluate_position()
        else:
            eval_before = 0.0

        # Make the move
        self.board.push(action)
        self.steps += 1

        # Check if the game is over
        if self.board.is_game_over():
            self.done = True
            self.result = self.board.outcome()

            # Determine reward based on game result
            # NOTE(review): terminal rewards are from White's perspective while
            # the shaped reward below is from the mover's perspective — confirm
            # this asymmetry is intended.
            if self.result.winner == chess.WHITE:
                reward = 1.0  # White wins
            elif self.result.winner == chess.BLACK:
                reward = -1.0  # Black wins
            else:
                reward = 0.0  # Draw

            # For a finished game this returns a fixed value without an engine call.
            evaluation = self.evaluate_position()
        else:
            # Game continues
            self.done = False

            # Get evaluation after the move
            eval_after = self.evaluate_position()

            # After the push it is the opponent's turn, so BLACK to move means
            # White just moved. Flip the sign for Black so the reward is
            # positive when the mover improved their own position.
            player_perspective = 1 if self.board.turn == chess.BLACK else -1
            reward = player_perspective * (eval_after - eval_before)

            # Terminate when max steps are reached
            if self.steps >= self.max_steps:
                self.done = True
                reward = 0.0

            # Reuse the value just computed instead of running another engine search.
            evaluation = eval_after

        info = {
            "steps": self.steps,
            "result": self.result,
            "legal_moves": list(self.board.legal_moves),
            "evaluation": evaluation
        }

        return self.board, reward, self.done, info

    def get_legal_actions(self) -> List[chess.Move]:
        """
        Get all legal moves from the current position.

        Returns:
            List of legal moves.
        """
        return list(self.board.legal_moves)

    def get_random_action(self) -> Optional[chess.Move]:
        """
        Get a random legal move from the current position.

        Returns:
            A random legal move, or None if there are no legal moves.
        """
        legal_moves = self.get_legal_actions()
        if not legal_moves:
            return None
        return random.choice(legal_moves)

    def render(self) -> str:
        """
        Render the board as a string.

        Returns:
            String representation of the board.
        """
        return str(self.board)


# Example usage
# The Stockfish binary location is machine-specific: set the STOCKFISH_PATH
# environment variable to point at your own executable. The previous hard-coded
# location is kept as the fallback so existing setups keep working.
stockfish_path = os.environ.get(
    "STOCKFISH_PATH",
    "/Users/kaust/stockfish/stockfish-windows-x86-64-avx2.exe",
)
env = ChessEnv(stockfish_path)

In [16]:
import pandas as pd

In [17]:
# Load the Lichess training games (games.csv) into a DataFrame
train_lichess_df = pd.read_csv("databases/Lichess(training)/games.csv")

In [26]:
# Maps a chess file letter ('a'..'h') to its zero-based column index (0..7)
squares_index = {file_letter: column for column, file_letter in enumerate("abcdefgh")}

# example: h3 -> (5, 7)
def square_to_index(square):
    """
    Convert a python-chess square number into (row, column) matrix indices.

    Row 0 corresponds to rank 8 and column 0 to file 'a', so the matrix reads
    like a board diagram seen from White's side.
    """
    # A square name is always a file letter followed by a rank digit, e.g. "h3".
    file_letter, rank_digit = chess.square_name(square)
    return 8 - int(rank_digit), squares_index[file_letter]

def split_dims(board):
    """
    Encode a board as a (14, 8, 8) int8 array of binary feature planes.

    Plane layout (row 0 is rank 8, column 0 is file 'a'):
        0-5   white pieces, one plane per piece type in chess.PIECE_TYPES order
        6-11  black pieces, same order
        12    destination squares of the legal moves White would have if it were White's turn
        13    destination squares of the legal moves Black would have if it were Black's turn

    Args:
        board: A python-chess board. Its side to move is switched temporarily
            to generate both move planes and is always restored before
            returning, even if move generation raises.

    Returns:
        The (14, 8, 8) numpy array described above.
    """
    # this is the 3d matrix
    board3d = np.zeros((14, 8, 8), dtype=np.int8)

    # One plane per (color, piece type); piece types start at 1, hence the offsets.
    for piece in chess.PIECE_TYPES:
        for square in board.pieces(piece, chess.WHITE):
            i, j = square_to_index(square)
            board3d[piece - 1][i][j] = 1
        for square in board.pieces(piece, chess.BLACK):
            i, j = square_to_index(square)
            board3d[piece + 5][i][j] = 1

    # Add the legal-move destination squares for each side so the network
    # knows which squares each side can currently reach.
    original_turn = board.turn
    try:
        for plane, color in ((12, chess.WHITE), (13, chess.BLACK)):
            # Pretend it is this color's turn so its moves can be generated.
            board.turn = color
            for move in board.legal_moves:
                i, j = square_to_index(move.to_square)
                board3d[plane][i][j] = 1
    finally:
        # Never leave the caller's board with the wrong side to move.
        board.turn = original_turn

    return board3d

In [27]:
# NOTE(review): chess.engine is imported here but not referenced in the visible cells — confirm it is still needed
import chess.engine
# Put the shared environment back at the starting position; reset() returns the environment's board object
board = env.reset()

In [28]:
# Show the available columns; the processing loop below reads 'id' and 'moves'
train_lichess_df.columns

Index(['id', 'rated', 'created_at', 'last_move_at', 'turns', 'victory_status',
       'winner', 'increment_code', 'white_id', 'white_rating', 'black_id',
       'black_rating', 'moves', 'opening_eco', 'opening_name', 'opening_ply'],
      dtype='object')

In [29]:
import time
import os

In [31]:
stockfish = Stockfish(stockfish_path)
def stockfishEval(fen, depth):
    """
    Evaluate a position with the module-level Stockfish engine.

    Args:
        fen: The position to evaluate, either a python-chess board (as passed
            by the existing callers) or a FEN string.
        depth: Search depth to use for this evaluation.

    Returns:
        Evaluation in pawns from White's perspective, within [-10, 10]:
        centipawn scores divided by 100, 9 + 1/n when White mates in n,
        -9 - 1/n when Black mates in n, and +10 / -10 when the side to move
        is already checkmated.
    """
    # Accept both a board object and a ready-made FEN string.
    fen_string = fen if isinstance(fen, str) else fen.fen()

    # Set the board position using FEN notation
    stockfish.set_fen_position(fen_string)

    # Set the evaluation depth specifically for this position
    stockfish.set_depth(depth)

    # Get evaluation
    evaluation = stockfish.get_evaluation()

    # Parse the evaluation
    if evaluation["type"] == "cp":
        # Centipawn evaluation (convert to pawns)
        return evaluation["value"] / 100.0

    # Mate evaluation: number of moves to mate, positive when White mates
    # and negative when Black mates.
    mate_in = evaluation["value"]
    if mate_in == 0:
        # "Mate in 0" means the side to move is already checkmated. Read the
        # side to move from the evaluated position itself (second FEN field)
        # rather than from unrelated global state.
        black_to_move = fen_string.split()[1] == "b"
        return 10.0 if black_to_move else -10.0
    if mate_in > 0:
        return 9.0 + (1.0 / mate_in)  # Positive for white winning
    # mate_in is negative here, so adding 1/mate_in moves the score towards
    # -10 as the mate gets closer (mirror image of the White case).
    return -9.0 + (1.0 / mate_in)  # Negative for black winning


In [None]:
# Replay every game in train_lichess_df move by move, and for each resulting
# position write one CSV row: game id, encoded board planes, a Stockfish
# evaluation of the position, and the environment's reward for the move.
# The finished CSV is then gzip-compressed next to the original file.
start_time = time.time()
total_games = len(train_lichess_df)
total_positions = 0
batch_size = 10  # progress is reported once every `batch_size` games

print(f"Starting to process {total_games} games...")

output_csv_path = 'C:/Users/kaust/Desktop/UCSD/Winter 2025/COGS 188/Project/COGS188_project/cleaned_training_data.csv'
# Create output directory if it doesn't exist
os.makedirs(os.path.dirname(output_csv_path), exist_ok=True)

# Open CSV file for writing
with open(output_csv_path, 'w', newline='') as csvfile:
    csv_writer = csv.writer(csvfile)

    # Write header - added current_eval column
    csv_writer.writerow(['gameID', 'game_state', 'stateEval', 'current_eval'])

    # Loop through each game in the dataset
    for index, row in train_lichess_df.iterrows():
        game_id = row['id']
        moves_string = row['moves']
        game_positions = 0  # Counter for positions in current game

        # Print progress for every batch of games
        if index % batch_size == 0:
            elapsed_time = time.time() - start_time
            games_per_second = (index + 1) / elapsed_time if elapsed_time > 0 else 0

            # Estimate remaining time
            games_remaining = total_games - (index + 1)
            estimated_time_remaining = games_remaining / games_per_second if games_per_second > 0 else 0

            # Format time remaining in hours, minutes, seconds
            hours, remainder = divmod(estimated_time_remaining, 3600)
            minutes, seconds = divmod(remainder, 60)
            time_format = f"{int(hours)}h {int(minutes)}m {int(seconds)}s"

            # Clear previous lines (3 lines) and print new progress information
            print("\033[A\033[K" * 3, end="")  # Move up 3 lines and clear them
            print(f"Game {index+1}/{total_games}: {game_id}")
            print(f"Games/s: {games_per_second:.2f} | Total positions: {total_positions}")
            print(f"Estimated time remaining: {time_format}")

        # A missing moves cell is read by pandas as a float NaN; skip such rows
        # instead of aborting the whole (multi-hour) run on .split().
        if not isinstance(moves_string, str):
            continue

        # Split the moves string by spaces
        moves_list = moves_string.split()

        # Reset the environment for a new game
        env.reset()
        # For each move in the game
        for move in moves_list:
            action = env.board.parse_san(move)  # Action is the chess move notation
            next_state, reward, done, info = env.step(action)
            # Process the board state
            processed_state = split_dims(next_state)
            # Evaluate the board state using Stockfish
            evaluation = stockfishEval(next_state, 10)

            # Write the data to CSV - now including current_eval (reward)
            # NOTE(review): processed_state is a numpy array, so the CSV cell
            # holds its text representation — confirm the reader parses that.
            csv_writer.writerow([game_id, processed_state, evaluation, reward])
            total_positions += 1
            game_positions += 1

            # Stop replaying once the environment reports the game as finished.
            # The reward value is deliberately not used as a stop signal: a
            # non-terminal evaluation swing can equal exactly +/-10.0 and would
            # cut the game short.
            if done:
                break

print()
print(f"Processing complete! Total positions evaluated: {total_positions}")
print(f"Total time elapsed: {time.time() - start_time:.2f} seconds")

# Compress the CSV file to gzip
print(f"Compressing {output_csv_path} to {output_csv_path}.gz...")
with open(output_csv_path, 'rb') as f_in:
    with gzip.open(f"{output_csv_path}.gz", 'wb') as f_out:
        f_out.writelines(f_in)

print(f"Compression complete! File saved as {output_csv_path}.gz")

Starting to process 20058 games...
[A[K[A[K[A[KGame 1/20058: TZJHLljE
Games/s: 61.63 | Total positions: 0
Estimated time remaining: 0h 5m 25s[K[A[K[A[KGame 11/20058: HgKLWPsz
Games/s: 0.68 | Total positions: 459
Estimated time remaining: 8h 10m 6s[K[A[K[A[KGame 21/20058: oQklnWWp
Games/s: 0.61 | Total positions: 944
Estimated time remaining: 9h 6m 20s[K[A[K[A[KGame 31/20058: fXhNOnOn
Games/s: 0.61 | Total positions: 1312
Estimated time remaining: 9h 4m 2s[K[A[K[A[KGame 41/20058: R9a2DLwe
Games/s: 0.61 | Total positions: 1666
Estimated time remaining: 9h 8m 56s[K[A[K[A[KGame 51/20058: AAw1TiNN
Games/s: 0.60 | Total positions: 2041
Estimated time remaining: 9h 15m 43s[K[A[K[A[KGame 61/20058: 9tU9MM6P
Games/s: 0.64 | Total positions: 2284
Estimated time remaining: 8h 43m 54s[K[A[K[A[KGame 71/20058: NZSBXWl2
Games/s: 0.60 | Total positions: 2746
Estimated time remaining: 9h 12m 27s[K[A[K[A[KGame 81/20058: ThCXomTn
Games/s: 0.55 | Total position

In [None]:
# Gzip the generated CSV, writing the archive next to it with a ".gz" suffix
print(f"Compressing {output_csv_path} to {output_csv_path}.gz...")
with open(output_csv_path, 'rb') as f_in, gzip.open(f"{output_csv_path}.gz", 'wb') as f_out:
    # Stream the source file into the gzip writer chunk by chunk
    f_out.writelines(f_in)

print(f"Compression complete! File saved as {output_csv_path}.gz")