In [None]:
import os
import pandas as pd
from apache_beam import coders
from utils import bagz
import chess
from io import StringIO

In [7]:
# Beam coders for each primitive field found in the Bagz records.
CODERS = dict(
    fen=coders.StrUtf8Coder(),
    move=coders.StrUtf8Coder(),
    count=coders.BigIntegerCoder(),
    win_prob=coders.FloatCoder(),
)
# An action-value record is a (fen, move, win_prob) tuple; reuse the
# per-field coders above, in that order.
CODERS['action_value'] = coders.TupleCoder(
    tuple(CODERS[field] for field in ('fen', 'move', 'win_prob'))
)

In [8]:
class ChessDataLoader:
    """Modular class to load different types of chess datasets from Bagz files.

    Only the 'action_value' dataset type is currently accepted. Records are
    decoded with the matching entry of the module-level ``CODERS`` mapping.
    """

    # Column layout of the DataFrame built for 'action_value' records.
    _ACTION_VALUE_COLUMNS = ("FEN", "Move", "Win Probability")

    def __init__(self, data_type: str, base_path: str):
        """
        Initialize the data loader.

        Args:
            data_type (str): One of 'action_value'.
            base_path (str): Path to the folder containing the dataset. The
                file opened is ``<base_path>/<data_type>_data.bag``.

        Raises:
            ValueError: If ``data_type`` is not a supported dataset type.
        """
        if data_type not in {"action_value"}:
            raise ValueError(f"Invalid data type: {data_type}")

        self.data_type = data_type
        self.file_path = os.path.join(base_path, f"{data_type}_data.bag")
        self.data_source = bagz.BagDataSource(self.file_path)

    def load_data(self) -> pd.DataFrame:
        """Loads the data from the Bagz file into a Pandas DataFrame.

        Returns:
            pd.DataFrame: One row per record. For 'action_value' data the
            columns are always "FEN", "Move" and "Win Probability", even when
            the source holds no records, so downstream code that sorts or
            groups by those columns does not fail on an empty dataset.
        """
        decoder = CODERS[self.data_type].decode
        is_action_value = self.data_type == "action_value"
        # Pin the schema explicitly: pd.DataFrame([]) would otherwise have no
        # columns at all when the data source is empty.
        columns = list(self._ACTION_VALUE_COLUMNS) if is_action_value else None

        records = []
        # Index-based access: the data source is only relied upon to support
        # len() and integer indexing.
        for i in range(len(self.data_source)):
            decoded = decoder(self.data_source[i])

            if not is_action_value:
                # No record layout is defined for other data types.
                continue

            fen, move, win_prob = decoded
            records.append({"FEN": fen, "Move": move, "Win Probability": win_prob})

        return pd.DataFrame(records, columns=columns)

In [9]:
# Dataset folder, resolved against the current working directory.
data_path = os.path.join(os.getcwd(), "data/test")

# Decode the action-value bag into a DataFrame (one row per FEN/move pair).
action_value_loader = ChessDataLoader("action_value", data_path)
df_action_value = action_value_loader.load_data()

In [10]:
# Order rows by position, best win probability first within each position.
df_action_value_sorted = df_action_value.sort_values(
    by=["FEN", "Win Probability"], ascending=[True, False]
)

# Collapse to one row per FEN; the per-position moves and win probabilities
# become parallel lists that keep the sorted (best-first) order.
df_action_value_merged = df_action_value_sorted.groupby("FEN", as_index=False).agg(
    {"Move": list, "Win Probability": list}
)

In [18]:
def convert_uci_moves_to_pgn(fen, uci_moves):
    """
    Translate UCI move strings into SAN, all read from the same position.

    Each entry of ``uci_moves`` is treated as an alternative move from the
    position described by ``fen``; the board is never advanced between moves.
    Moves that are not legal in that position are left out, so the returned
    list can be shorter than ``uci_moves``.
    """
    position = chess.Board(fen)
    # Parsed lazily so each move is parsed, checked and converted in turn.
    parsed_moves = (chess.Move.from_uci(text) for text in uci_moves)
    return [
        position.san(candidate)
        for candidate in parsed_moves
        if candidate in position.legal_moves
    ]

# Apply conversion: one list of SAN strings per position.
# Built by zipping the two columns instead of a row-wise DataFrame.apply:
# on an empty frame apply(axis=1) returns an empty DataFrame (not a Series),
# which cannot be assigned to a single column, and it is also slower.
# The explicit index and object dtype keep each list in a single cell.
df_action_value_merged["PGN"] = pd.Series(
    [
        convert_uci_moves_to_pgn(fen, uci_moves)
        for fen, uci_moves in zip(
            df_action_value_merged["FEN"], df_action_value_merged["Move"]
        )
    ],
    index=df_action_value_merged.index,
    dtype=object,
)

In [None]:
# Output file for the merged per-position table (relative to the working directory).
csv_file_path = "chess_challenges_full_pgn.csv"

# Write the table without the row index.
# NOTE(review): the list-valued columns are presumably written as their string
# form, so they need parsing when the CSV is read back.
df_action_value_merged.to_csv(path_or_buf=csv_file_path, index=False)

print(f"DataFrame has been saved to {csv_file_path}")

DataFrame has been saved to chess_challenges_full_pgn.csv
