# Generate data
Download the dataset from Lichess. The dump must then be decompressed and converted from PGN to CSV.

## Load and Helper Functions
Load the parameters and define helper functions for downloading Lichess data

In [None]:
!pip install chess
!pip install datasets
!pip install zstandard

from google.colab import drive
import json
import zstandard
import io
import chess.pgn
import pandas as pd
import re

drive.mount('/content/drive')




In [None]:
# @title Download lichess data and Decompress
# Decompress the file into the dest_path

def get_lichess(lichess_path, dest_path):
    """
    Download a zstd-compressed Lichess PGN dump and decompress it.

    Parameters
    ----------
    lichess_path : str
        URL of the `.pgn.zst` archive.
    dest_path : str
        Path the decompressed PGN is written to.

    Raises
    ------
    subprocess.CalledProcessError
        If `wget` exits with a non-zero status (previously a failed download
        was ignored and the partial archive was decompressed anyway).
    """
    # Local import: `subprocess` is not part of the notebook's import cell.
    import subprocess

    archive_path = 'temp.gn.zst'

    # Passing an argument list (no shell) keeps URLs containing characters
    # such as '&' or spaces intact; check=True surfaces download failures.
    subprocess.run(['wget', '-O', archive_path, lichess_path], check=True)

    dctx = zstandard.ZstdDecompressor()
    with open(archive_path, 'rb') as source, open(dest_path, 'wb') as dest:
        # Stream the archive so the whole dump never has to fit in memory.
        dctx.copy_stream(source, dest)


In [None]:
# @title Convert PGN list to list str(PGN)

# A raw PGN dump is awkward to work with, so we return a list of per-game strings instead
def load_games(dataset_path):
    """
    Read a PGN dump and split it into one string per game.

    Parameters
    ----------
    dataset_path : str
        Path of the decompressed PGN file.

    Returns
    -------
    list of str
        One entry per game, each starting with its '[Event ' header tag.
        Any text before the first '[Event ' tag is discarded.
    """
    # The encoding is stated explicitly so the result does not depend on the
    # platform default.
    with open(dataset_path, 'r', encoding='utf-8') as f:
        data = f.read()

    # A plain string split is equivalent to the former regex split and avoids
    # the invalid '\[' escape sequence in a non-raw string literal.
    games_str = data.split('[Event ')[1:]

    # Re-attach the delimiter so every game keeps its original header.
    return ['[Event ' + game for game in games_str]


## Generate General Lichess
Generate the basic database to be used for all purposes later on. All task-specific datasets will be drawn from it

In [None]:
from datasets import Dataset, DatasetDict, load_from_disk

# Base name shared by the intermediate PGN file and the CSV written in the next cell
name = 'data'
# Drive folder that receives the general Lichess dataset
data_path = '/content/drive/MyDrive/ProjectNLP/data/lichess_datasets/'
# Download the monthly dump and decompress it to '<name>.pgn' in the working directory
get_lichess('https://database.lichess.org/standard/lichess_db_standard_rated_2015-09.pgn.zst', name + '.pgn')

# Split the decompressed PGN into a list with one string per game
games = load_games( name + '.pgn')

--2023-08-08 15:49:22--  https://database.lichess.org/standard/lichess_db_standard_rated_2015-09.pgn.zst
Resolving database.lichess.org (database.lichess.org)... 141.95.66.62, 2001:41d0:700:5e3e::
Connecting to database.lichess.org (database.lichess.org)|141.95.66.62|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 547459020 (522M) [application/octet-stream]
Saving to: ‘temp.gn.zst’


2023-08-08 15:49:29 (78.7 MB/s) - ‘temp.gn.zst’ saved [547459020/547459020]



In [None]:
# Shuffle the per-game strings and store them as the general dataset.
# reset_index() is called without drop=True, so the pre-shuffle position is
# kept as an extra 'index' column next to 'game_str'.
games_df = (
    pd.DataFrame(games)
    .sample(frac=1)
    .reset_index()
    .rename(columns={0: 'game_str'})
)

games_df.to_csv(data_path + name + '.csv', index=False)

# Pre-process Data
The entire pre-processing pipeline, from the Lichess games dump to task-specific, train-ready datasets

## Helpers and Loading
Load all the parameters, drive, helper functions and leela chess if needed

In [None]:
# @title Loading & Parameters

!pip install chess
!pip install datasets

from google.colab import drive
import io
import chess
import chess.engine
import chess.pgn
from collections import Counter
import pandas as pd
import os
import json
import re
import random
import numpy as np
from sklearn.model_selection import train_test_split
from datasets import load_dataset, load_from_disk, Dataset, DatasetDict

drive.mount('/content/drive/')

# Setting permission for the engine files
# (the binaries live on the mounted Drive; chmod 777 grants read/write/execute to everyone)
!chmod 777 '/content/drive/MyDrive/ProjectNLP/engines/stockfish-ubuntu-20.04-x86-64'
!chmod 777 '/content/drive/MyDrive/ProjectNLP/engines/maia-1900/lc0'
!chmod 777 '/content/drive/MyDrive/ProjectNLP/engines/maia-1100/lc0'
!chmod 777 '/content/drive/MyDrive/ProjectNLP/engines/maia-1400/lc0'
!chmod 777 '/content/drive/MyDrive/ProjectNLP/engines/maia-1600/lc0'

# Engine executables used by the analysis helpers below.
# NOTE(review): maia-1400 gets its permissions set above but has no path variable here.
stockfish_path = '/content/drive/MyDrive/ProjectNLP/engines/stockfish-ubuntu-20.04-x86-64'
maia_1900_path = '/content/drive/MyDrive/ProjectNLP/engines/maia-1900/lc0'
maia_1100_path = '/content/drive/MyDrive/ProjectNLP/engines/maia-1100/lc0'
maia_1600_path = '/content/drive/MyDrive/ProjectNLP/engines/maia-1600/lc0'
# Root folder of all project datasets on Drive
data_path = '/content/drive/MyDrive/ProjectNLP/data/'


Collecting chess
  Downloading chess-1.10.0-py3-none-any.whl (154 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/154.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━[0m [32m92.2/154.4 kB[0m [31m2.5 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.4/154.4 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: chess
Successfully installed chess-1.10.0
Collecting datasets
  Downloading datasets-2.14.5-py3-none-any.whl (519 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m519.6/519.6 kB[0m [31m9.7 MB/s[0m eta [36m0:00:00[0m
Collecting dill<0.3.8,>=0.3.0 (from datasets)
  Downloading dill-0.3.7-py3-none-any.whl (115 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m115.3/115.3 kB[0m [31m13.3 MB/s[0m eta [36m0:00:00[0m
Collecting xxhash (from datasets)
  Downloading xxhash-3.3.0-

### Helpers

In [None]:
# @title Basic Helpers
from google.colab import files

def extract_positions_and_moves(game, amount_before_end=-1):
    """
    Transform a game into a DataFrame with one row per half-move.

    Parameters
    ----------
    game : str or game object
        A PGN string (parsed with `chess.pgn.read_game`) or an already parsed
        game exposing `board()` and `mainline_moves()`.
    amount_before_end : int, optional
        Accepted for backward compatibility only. It has never filtered the
        output: every position of the game is returned, and callers index
        from the end of the frame themselves.

    Returns
    -------
    pandas.DataFrame
        Columns: game_pgn, fen_pos, san_pos, actual_san_move, position_number,
        post_move_san_pos, post_move_fen_pos, move_color, is_checking,
        is_checkmating. `position_number` is the 1-based half-move count;
        `fen_pos`/`san_pos` describe the position before the move and the
        `post_move_*` columns the position after it. An empty frame with
        these columns is returned when the PGN cannot be parsed or the game
        has no moves.
    """
    data = {
        "game_pgn": [], "fen_pos": [], "san_pos": [], "actual_san_move": [], "position_number": [],
        "post_move_san_pos": [], "post_move_fen_pos": [], 'move_color': [], 'is_checking': [], 'is_checkmating': []
    }
    # Check if game is a string, if yes convert it to a game object
    if isinstance(game, str):
        game = chess.pgn.read_game(io.StringIO(game))

    # read_game yields None when there is no game to read (e.g. empty text).
    if game is None:
        return pd.DataFrame(data)

    board = game.board()
    san_sequence = []

    # Iterate over the game once and record the position around every move
    for count, move in enumerate(game.mainline_moves(), start=1):
        # SAN has to be computed before the move is pushed onto the board.
        san_move = board.san(move)

        data["game_pgn"].append(game)
        data["fen_pos"].append(board.fen())
        data['move_color'].append('White' if board.turn else 'Black')
        data["san_pos"].append(san_sequence.copy())
        data["actual_san_move"].append(san_move)
        data['position_number'].append(count)

        # Maintain SAN format: odd half-moves carry the "N." move-number prefix
        if count % 2 == 1:
            san_sequence.append(f"{count // 2 + 1}. {san_move}")
        else:
            san_sequence.append(san_move)

        board.push(move)
        # Check / checkmate flags describe the position after the move
        data["is_checking"].append(board.is_check())
        data["is_checkmating"].append(board.is_checkmate())

        # Save the positions after the move
        data["post_move_fen_pos"].append(board.fen())
        data["post_move_san_pos"].append(san_sequence.copy())

    data["san_pos"] = [" ".join(moves) for moves in data["san_pos"]]
    data["post_move_san_pos"] = [" ".join(moves) for moves in data["post_move_san_pos"]]

    return pd.DataFrame(data)

def sample_positions_from_game(pgn_str, amount, start_after_X_moves=10, random=True):
    """
    Sample positions from a single game, using extract_positions_and_moves.

    Parameters
    ----------
    pgn_str : str
        PGN text of one game.
    amount : int
        Number of rows to sample.
    start_after_X_moves : int, optional
        First row index eligible for sampling. Rows are half-moves, so this
        is counted in plies.
    random : bool, optional
        True draws the row indices uniformly at random (indices may repeat);
        False spaces them evenly with `np.linspace`.

    Returns
    -------
    pandas.DataFrame
        The sampled rows with a fresh index, or an empty DataFrame when the
        PGN cannot be parsed or the game has fewer than
        `start_after_X_moves + amount` half-moves.
    """
    game = chess.pgn.read_game(io.StringIO(pgn_str))
    # read_game yields None when there is nothing to parse (e.g. empty text).
    if game is None:
        return pd.DataFrame()

    # One row per half-move, so the row count doubles as the move count and
    # the game does not have to be traversed a second time.
    game_data = extract_positions_and_moves(game)
    total_moves = len(game_data)

    if total_moves < start_after_X_moves + amount:
        return pd.DataFrame()  # return empty dataframe

    if random:
        sampling_positions = np.random.uniform(start_after_X_moves, total_moves - 1, amount).astype(int)
    else:
        sampling_positions = np.linspace(start_after_X_moves, total_moves - 1, amount).astype(int)

    sampled_data = game_data.iloc[sampling_positions]  # use iloc to get rows at specific indices

    return sampled_data.reset_index(drop=True)  # Reset index for good measure


def sample_positions_from_games(pgn_games, amount, start_after_X_moves=10, random=True):
    """
    Sample positions from every game of a DataFrame and stack the results.

    Parameters
    ----------
    pgn_games : pandas.DataFrame
        Must contain a 'game_str' column holding one PGN string per row.
    amount, start_after_X_moves, random
        Forwarded unchanged to sample_positions_from_game.

    Returns
    -------
    pandas.DataFrame
        All non-empty per-game samples concatenated with a fresh index, or an
        empty DataFrame when no game produced a sample.
    """
    samples = []
    for game in pgn_games['game_str']:
        sample = sample_positions_from_game(game, amount, start_after_X_moves, random)
        if len(sample) > 0:  # Only append non-empty samples
            samples.append(sample)

    # pd.concat raises on an empty list, so handle "nothing sampled" explicitly.
    if not samples:
        return pd.DataFrame()
    return pd.concat(samples, ignore_index=True)  # Concatenates dataframes


def display_chess_position(input_data, size=300, download=False):
    """
    Display the board in the notebook and optionally download it as SVG.

    Parameters
    ----------
    input_data : chess.Board or str
        A board object or a FEN string.
    size : int, optional
        Size passed to `chess.svg.board`.
    download : bool, optional
        When True, the SVG is also written to 'x.svg' and offered for
        download through `google.colab.files`.

    Raises
    ------
    ValueError
        If `input_data` is neither a chess.Board nor a string.
    """
    # Local import: the notebook's import cell never imports the svg
    # submodule explicitly.
    import chess.svg

    if isinstance(input_data, chess.Board):
        board = input_data
    elif isinstance(input_data, str):
        board = chess.Board(input_data)
    else:
        raise ValueError("Input must be a chess.Board object or a FEN string")

    # Render once and reuse the markup for both display and download.
    svg_markup = chess.svg.board(board=board, size=size)

    # The notebook never imports IPython's SVG wrapper, so the previous
    # display(SVG(...)) call raised NameError. python-chess returns the markup
    # as an SvgWrapper string that carries its own SVG notebook representation,
    # so it can be handed to display() directly.
    display(svg_markup)

    if download:
        # Save the SVG to a file
        with open('x.svg', "w") as file:
            file.write(svg_markup)

        # Download the file
        files.download('x.svg')

def extract_san_moves(pgn):
    """
    Return the mainline moves of a PGN game in SAN notation.

    Parameters
    ----------
    pgn : str
        PGN text of one game.

    Returns
    -------
    list of str
        One SAN string per half-move; empty when the PGN cannot be parsed.
    """
    game = chess.pgn.read_game(io.StringIO(pgn))
    if game is None:
        return []

    # SAN depends on the position the move is played from, so the board has
    # to be advanced move by move. The previous version converted every move
    # on a fresh starting position, which fails as soon as a move is not
    # legal in the initial position.
    board = game.board()
    san_moves = []
    for move in game.mainline_moves():
        san_moves.append(board.san(move))
        board.push(move)
    return san_moves

def get_rand(games, lower_bound, upper_bound, only_white=True):
    """
    Pick one random position per game, with a row index in
    [lower_bound, upper_bound).

    Games with fewer than `upper_bound` positions are skipped. With
    `only_white` set, only even row indices are eligible; row 0 is the first
    move of the game, so even rows are White's moves when the game starts
    with White to move.

    Returns a DataFrame built from the selected rows; each row keeps the
    index label it had inside its own game.
    """
    picked_rows = []

    for pgn_text in games['game_str']:
        game_rows = extract_positions_and_moves(pgn_text)

        # Skip games too short to cover the whole index window
        if len(game_rows) < upper_bound:
            continue

        candidates = range(lower_bound, upper_bound)
        if only_white:
            # Keep only the even indices inside the window
            candidates = [idx for idx in candidates if idx % 2 == 0]

        picked_rows.append(game_rows.iloc[random.choice(candidates)])

    return pd.DataFrame(picked_rows)

# Tests
# Two hand-written move lists (movetext only, no PGN headers) used to smoke-test the helpers.
# example2 writes move numbers as "1. e4", example1 as "1.e4".
example2 = '''1. e4 c5 2. Nf3 Nc6 3. Bb5 g6 4. O-O Bg7 5. Re1 Nf6 6. e5 Nd5 7. Nc3 Nc7 8. Bxc6
dxc6 9. h3 Ne6 10. d3 O-O 11. a4 a5 12. Be3 h6 13. Ne4 b6 14. Ng3 Nd4 15. Bd2
Ra7 16. Re4 Rd7 17. Bc3 Rd5 18. Bxd4 cxd4 19. Qd2 Be6 20. Rae1 Qd7 21. Rh4 c5
22. b3 Ra8 23. Rxh6 b5 24. axb5 Qxb5 25. Re4 Qb8 26. Reh4 Rxe5 27. Rh7 Qf8 28.
Nxe5 Bxe5 29. Qg5 Bg7 30. Nh5 1-0'''

example1='''1.e4 e6 2.d4 d5 3.Nd2 Nf6 4.e5 Nfd7 5.f4 c5 6.c3 Nc6 7.Ndf3 cxd4 8.cxd4 f6
9.Bd3 Bb4+ 10.Bd2 Qb6 11.Ne2 fxe5 12.fxe5 O-O 13.a3 Be7 14.Qc2 Rxf3 15.gxf3 Nxd4
16.Nxd4 Qxd4 17.O-O-O Nxe5 18.Bxh7+ Kh8 19.Kb1 Qh4 20.Bc3 Bf6 21.f4 Nc4 22.Bxf6 Qxf6
23.Bd3 b5 24.Qe2 Bd7 25.Rhg1 Be8 26.Rde1 Bf7 27.Rg3 Rc8 28.Reg1 Nd6 29.Rxg7 Nf5
30.R7g5 Rc7 31.Bxf5 exf5 32.Rh5+  1-0'''

# Sample 3 positions from example1, starting at row index 2. The sampling is random and
# unseeded, so the displayed rows change between runs.
sample_positions_from_game(example1, 3,2)

Unnamed: 0,game_pgn,fen_pos,san_pos,actual_san_move,position_number,post_move_san_pos,post_move_fen_pos,move_color,is_checking,is_checkmating
0,"[Event ""?""]\n[Site ""?""]\n[Date ""????.??.??""]\n...",r1b1k2r/pp1n2pp/1qn1p3/3pP3/1b1P4/3B1N2/PP1BN1...,1. e4 e6 2. d4 d5 3. Nd2 Nf6 4. e5 Nfd7 5. f4 ...,O-O,24,1. e4 e6 2. d4 d5 3. Nd2 Nf6 4. e5 Nfd7 5. f4 ...,r1b2rk1/pp1n2pp/1qn1p3/3pP3/1b1P4/3B1N2/PP1BN1...,Black,False,False
1,"[Event ""?""]\n[Site ""?""]\n[Date ""????.??.??""]\n...",r1b4k/pp4p1/4pq2/3p4/2n2P2/P2B4/1PQ4P/1K1R3R b...,1. e4 e6 2. d4 d5 3. Nd2 Nf6 4. e5 Nfd7 5. f4 ...,b5,46,1. e4 e6 2. d4 d5 3. Nd2 Nf6 4. e5 Nfd7 5. f4 ...,r1b4k/p5p1/4pq2/1p1p4/2n2P2/P2B4/1PQ4P/1K1R3R ...,Black,False,False
2,"[Event ""?""]\n[Site ""?""]\n[Date ""????.??.??""]\n...",2r4k/p4bR1/3npq2/1p1p4/5P2/P2B4/1P2Q2P/1K4R1 b...,1. e4 e6 2. d4 d5 3. Nd2 Nf6 4. e5 Nfd7 5. f4 ...,Nf5,58,1. e4 e6 2. d4 d5 3. Nd2 Nf6 4. e5 Nfd7 5. f4 ...,2r4k/p4bR1/4pq2/1p1p1n2/5P2/P2B4/1P2Q2P/1K4R1 ...,Black,False,False


In [None]:
#@title Analyze

def analyze_and_rank_fens(FENs, engine_path, analysis_time=0.3):
    """
    Score a list of FEN positions with a UCI chess engine.

    Parameters
    ----------
    FENs : iterable of str
        Positions to analyse.
    engine_path : str
        Path of the UCI engine executable.
    analysis_time : float, optional
        Time limit per position, passed to `chess.engine.Limit(time=...)`.

    Returns
    -------
    list
        One entry per input FEN, in input order (despite the name, nothing is
        sorted): `info["score"].relative.score()` for that position, i.e. the
        score from the side to move's point of view. python-chess returns
        None here for mate scores.
    """
    scores = []

    # The context manager shuts the engine process down even if an analysis
    # call raises; the manual quit() was skipped in that case.
    with chess.engine.SimpleEngine.popen_uci(engine_path) as engine:
        # Iterate over the FENs to analyze each position
        for fen in FENs:
            board = chess.Board(fen)
            info = engine.analyse(board, chess.engine.Limit(time=analysis_time))
            scores.append(info["score"].relative.score())

    return scores

def checkmate_in_one(fen):
    """
    Check whether the side to move in `fen` has a checkmate in one move.

    The position is analysed for two seconds with the engine at
    `stockfish_path`.

    Returns
    -------
    bool
        True when the engine reports a mate in exactly one move for the side
        to move, False otherwise.
    """
    board = chess.Board(fen)

    # The context manager shuts the engine down even if the analysis raises.
    with chess.engine.SimpleEngine.popen_uci(stockfish_path) as engine:
        info = engine.analyse(board, chess.engine.Limit(time=2.0))

    # The analysis info has no 'mate' key; the mate distance is part of the
    # 'score' entry. The old lookup of info['mate'] therefore never matched
    # and the function always returned False. mate() is None for plain
    # centipawn scores and positive when the side to move delivers the mate.
    score = info.get("score")
    return score is not None and score.relative.mate() == 1

# Convert centipawn difference to win chance as per lichess' definition
def centipawn_diff_to_winchance(centipawn_val):
    """
    Map a centipawn value to a win chance in percent.

    Computes 50 + 50 * (2 / (1 + exp(-0.00368208 * cp)) - 1), so 0 centipawns
    maps to 50 and the result stays within 0..100.
    """
    # Local import: `math` is missing from the notebook's import cell, which
    # made every call fail with NameError.
    import math

    return 50 + 50 * (2 / (1 + math.exp(-0.00368208 * centipawn_val)) - 1)

In [None]:
# @title Database Manipulation

# Drive folders used by the database helpers below (each ends with '/', names are appended directly):
# raw game pool ('data.csv') and the per-task game samples drawn from it
lichess_datasets_path = '/content/drive/MyDrive/ProjectNLP/data/lichess_datasets/'
# intermediate per-task CSV files
task_specific_data_path = '/content/drive/MyDrive/ProjectNLP/data/task_specific_datasets/'
# finished datasets written with save_to_disk
final_data_path = '/content/drive/MyDrive/ProjectNLP/data/final_datasets/'

def sample_and_remove_from_lichess_database(amount, split_name, load_if_exist=False):
    """
    Move `amount` random games from the general pool into their own CSV.

    The pool is '<lichess_datasets_path>data.csv'; the sampled games go to
    '<lichess_datasets_path><split_name>.csv' and are removed from the pool,
    which is rewritten in place.

    If the split file already exists it is either loaded and returned as is
    (`load_if_exist=True`; `amount` is ignored and the pool is untouched) or a
    FileExistsError is raised.

    Returns the sampled games as a DataFrame.
    """
    pool_csv_path = lichess_datasets_path + 'data.csv'
    new_csv_path = lichess_datasets_path + split_name + '.csv'

    # An existing split is never overwritten
    if os.path.isfile(new_csv_path):
        if not load_if_exist:
            raise FileExistsError(f"A file with the name '{split_name}.csv' already exists.")
        return pd.read_csv(new_csv_path)

    pool = pd.read_csv(pool_csv_path)

    # Draw the sample, then keep everything else as the new pool
    sampled_data = pool.sample(n=amount)
    leftover = pool.drop(sampled_data.index)

    # Write the shrunken pool first, then the new split
    leftover.to_csv(pool_csv_path, index=False)
    sampled_data.to_csv(new_csv_path, index=False)

    return sampled_data

def save_task_specific_df(df, name):
    """Write `df` to '<task_specific_data_path><name>.csv' without the index column."""
    csv_path = task_specific_data_path + name + '.csv'
    df.to_csv(csv_path, index=False)

def load_task_specific_df(name):
    """Read '<task_specific_data_path><name>.csv' and return it as a DataFrame."""
    csv_path = task_specific_data_path + name + '.csv'
    return pd.read_csv(csv_path)

def save_to_final_database(dataset, name):
    """
    Store `dataset` under '<final_data_path><name>' via its save_to_disk method.

    Raises FileExistsError if that path already exists, so an existing split
    is never overwritten.
    """
    target_path = final_data_path + name

    # Refuse to overwrite an existing entry
    if os.path.exists(target_path):
        raise FileExistsError(f"A file with the name '{name}' already exists.")

    dataset.save_to_disk(target_path)

def get_datasetdict(df, train_per, test_per, val_per):
    """
    Create a DatasetDict with train, test and validation splits from a DataFrame.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to split.
    train_per, test_per, val_per : float
        Fractions of `df` for the three splits. The train split takes
        `train_per`; the remaining rows are divided between test and
        validation in the ratio `test_per : val_per`. Both must be non-zero,
        because train_test_split rejects a float test_size of 0 or 1.

    Returns
    -------
    DatasetDict
        With the keys 'train', 'test' and 'validation'.
    """
    train_df, temp_df = train_test_split(df, test_size=1-train_per)
    test_split_size = test_per / (test_per + val_per)

    # train_test_split returns (remainder, test_size share) in that order.
    # The previous unpacking `test_df, val_df = ...` therefore handed the
    # validation-sized part to 'test' and the test-sized part to 'validation'.
    val_df, test_df = train_test_split(temp_df, test_size=test_split_size)

    train_ds = Dataset.from_pandas(train_df.reset_index(drop=True))
    test_ds = Dataset.from_pandas(test_df.reset_index(drop=True))
    val_ds = Dataset.from_pandas(val_df.reset_index(drop=True))

    dataset_dict = DatasetDict({
        'train': train_ds,
        'test': test_ds,
        'validation': val_ds
    })

    return dataset_dict

def get_datasetdict_and_save(df, split_name, train_per, test_per, val_per):
    """Split `df` with get_datasetdict and store the result under `split_name`."""
    save_to_final_database(
        get_datasetdict(df, train_per, test_per, val_per),
        split_name,
    )

def to_datasetdict(info, train_per, stratify_column=None):
    """
    Build a DatasetDict from a list of dataset descriptions.

    Each entry of `info` is a dict with 'input', 'clean_input' and 'name',
    plus an optional 'label'. For every entry a '<name>_test' split is
    created; when `train_per` is non-zero the rows are first split with
    train_test_split(train_size=train_per) and a '<name>_train' split is
    added as well.

    `stratify_column` is only used when it names a column of the assembled
    frame (its columns are 'input', 'clean_input' and, if given, 'label').
    """
    splits = {}

    for data_info in info:
        # Assemble the frame for this dataset
        df = pd.DataFrame({
            'input': data_info['input'],
            'clean_input': data_info['clean_input']
        })
        if 'label' in data_info:
            df['label'] = data_info['label']

        if train_per == 0:
            # No training split requested: everything becomes test data
            test_df = df
        else:
            # Stratify only when the requested column actually exists
            if stratify_column and stratify_column in df.columns:
                stratify_labels = df[stratify_column]
            else:
                stratify_labels = None
            train_df, test_df = train_test_split(df, train_size=train_per, stratify=stratify_labels)

            splits[data_info['name'] + '_train'] = Dataset.from_pandas(train_df.reset_index(drop=True))

        splits[data_info['name'] + '_test'] = Dataset.from_pandas(test_df.reset_index(drop=True))

    return DatasetDict(splits)

def balance_df(df, column_name):
    """
    Down-sample a two-class DataFrame so both classes have the same size.

    `column_name` must hold exactly two unique values, otherwise an Exception
    is raised. The larger class is randomly reduced to the size of the
    smaller one; the result lists the rows of the first value found, followed
    by the rows of the second.
    """
    unique_vals = df[column_name].unique()

    if len(unique_vals) != 2:
        raise Exception(f"DataFrame {column_name} column should have exactly two unique values.")

    # One sub-frame per class, in order of first appearance
    class_frames = [df[df[column_name] == value] for value in unique_vals]

    # Every class is cut down to the size of the smallest one
    rows_per_class = min(len(frame) for frame in class_frames)

    return pd.concat(
        [frame.sample(n=rows_per_class) for frame in class_frames],
        axis=0,
    )


## Chess Problems
Tasks built around high-level chess problems

### Is this position checkmate?


In [None]:
# @title Helpers

def generate_position_checkmate_data(games_df, upper_bound, lower_bound):
    """
    Collect check / non-check / checkmate position triples from many games.

    Parameters
    ----------
    games_df : pandas.DataFrame
        Must contain a 'game_str' column with one PGN string per row.
    upper_bound, lower_bound : int
        Forwarded to extract_check_position_before_mate.

    Returns
    -------
    pandas.DataFrame
        The per-game results stacked row-wise, or an empty DataFrame when no
        game yields a result.
    """
    data_frames_list = []
    # Extract up to 3 positions from each game
    for _, game in games_df.iterrows():
        extracted_df = extract_check_position_before_mate(game['game_str'], upper_bound, lower_bound)
        # The helper signals "nothing found" with an empty frame rather than
        # None, so empty results have to be skipped explicitly.
        if extracted_df is not None and not extracted_df.empty:
            data_frames_list.append(extracted_df)

    # pd.concat raises on an empty list
    if not data_frames_list:
        return pd.DataFrame()

    # Concatenate all the dataframes in the list
    return pd.concat(data_frames_list, axis=0)

def extract_check_position_before_mate(game_str, upper_bound, lower_bound):
    """
    Pick three rows from a game that ends with White delivering checkmate:
    a White move giving check, a White move giving no check, and the mating
    move itself.

    Candidates are the rows at offsets -upper_bound-1 .. -lower_bound-2 from
    the end of the game. The random choice is weighted towards later
    positions (weights i**0.4 for checks, i**0.5 for non-checks).

    Returns a 3-row DataFrame (check, non-check, mate) or an empty DataFrame
    when the game does not qualify.
    """
    game_df = extract_positions_and_moves(game_str, upper_bound)

    # Only games whose final move is a White checkmate are of interest
    if game_df.empty:
        return pd.DataFrame()
    if not (game_df['is_checkmating'].iloc[-1] and game_df['move_color'].iloc[-1] == 'White'):
        return pd.DataFrame()
    # The game must be long enough to cover the whole search window
    if len(game_df) <= upper_bound:
        return pd.DataFrame()

    checking_rows = []
    quiet_rows = []
    for offset in range(-upper_bound, -lower_bound):
        row = game_df.iloc[offset - 1]
        # Only non-mating White moves are candidates
        if row['move_color'] != 'White' or row['is_checkmating']:
            continue
        if row['is_checking']:
            checking_rows.append(row)
        else:
            quiet_rows.append(row)

    # Both kinds of position are needed to build a triple
    if not (checking_rows and quiet_rows):
        return pd.DataFrame()

    # Later candidates get larger weights
    check_weights = [rank**0.4 for rank in range(1, len(checking_rows) + 1)]
    chosen_check = random.choices(checking_rows, weights=check_weights, k=1)[0]

    quiet_weights = [rank**0.5 for rank in range(1, len(quiet_rows) + 1)]
    chosen_quiet = random.choices(quiet_rows, weights=quiet_weights, k=1)[0]

    return pd.concat([chosen_check, chosen_quiet, game_df.iloc[-1]], axis=1).transpose()



In [None]:
# @title Intermediate
name = 'classify_into_checkmate_check_none'

# NOTE(review): `num_sample` is not assigned anywhere before this cell (its first assignment is in
# the following cell), which matches the NameError recorded in this cell's output. Define it here
# or remove this cell.
sampled_games = sample_and_remove_from_lichess_database(num_sample, name, True)[:800000]


NameError: ignored

In [None]:
# @title Intermediate
# Number of games moved from the general pool into this task's sample
num_sample = 10000
name = 'classify_into_checkmate_check_none_test'

# Draw the games (or reload an existing sample with this name); the [:800000] slice caps the row count
sampled_games = sample_and_remove_from_lichess_database(num_sample, name, True)[:800000]
# Build the check / non-check / checkmate rows, searching the last moves before the mate
mixed_df = generate_position_checkmate_data(sampled_games, upper_bound=8, lower_bound=1)
save_task_specific_df(mixed_df,name)

In [None]:
def classification(row):
    """
    Label a position row as 'mate', 'check' or 'neither'.

    'is_checkmating' takes precedence over 'is_checking'. Both flags are
    compared against True with ==.
    """
    if row['is_checkmating'] == True:
        return 'mate'
    return 'check' if row['is_checking'] == True else 'neither'

name = 'classify_into_checkmate_check_none_test'
df = load_task_specific_df(name)
# Label every row as 'mate', 'check' or 'neither'
df['classification'] = df.apply(classification, axis=1)

# balanced_df is the same object as df (no copy is made and no balancing happens here),
# so the column assignments below also modify df.
balanced_df = df
balanced_df['move_color_bool'] = balanced_df['move_color'].replace({'White': True, 'Black': False})
# Keep only the first space-separated field of the FEN (the piece placement)
balanced_df['post_move_fen_pos'] = balanced_df['post_move_fen_pos'].apply(lambda x: x.split(' ')[0])


print(balanced_df)
# Create the prompt
# NOTE(review): the prompt text says "san position" but the inserted value is the FEN piece placement.
balanced_df['prompt'] = "Is this san position " + balanced_df['post_move_fen_pos'] + " a mate, a check or neither for " + balanced_df['move_color'] + ":"

# NOTE(review): the split name 'fen_mate_in_one' is also used by the mate-in-one puzzle dataset;
# confirm it is intended for this classification task.
info = [
    {
        'input': balanced_df['prompt'],
        'clean_input': balanced_df['post_move_fen_pos'],
        'label': balanced_df['classification'],
        'name': 'fen_mate_in_one',
    }
]

# train_per = 0 produces a test split only; to_datasetdict reads stratify_column only when
# train_per is non-zero, so the argument has no effect here.
train_per = 0
x = to_datasetdict(info, train_per, stratify_column='classification')
save_to_final_database(x, name)


### Mate_in_one_puzzles

In [None]:
#@title Helpers

# Find every move that checkmates immediately in a position
def count_checkmate_moves(fen):
    """
    Return the list of legal moves that deliver checkmate in the position `fen`.

    Despite the name, the moves themselves are returned; callers take len()
    of the result to get the count.
    """
    board = chess.Board(fen)
    mating_moves = []

    for candidate in list(board.legal_moves):
        # Try the move, record the outcome, then restore the position
        board.push(candidate)
        gives_mate = board.is_checkmate()
        board.pop()
        if gives_mate:
            mating_moves.append(candidate)

    return mating_moves

# Extract the possible one move mates from the games
def extract_mate_in_one_positions(game_df):
    """
    Collect mate-in-one puzzles from games that end in checkmate.

    A game contributes its final row when the last move was played by White,
    its SAN ends with '#', and it is the only mating move available in the
    position before it.

    Parameters
    ----------
    game_df : pandas.DataFrame
        Must contain a 'game_str' column with one PGN string per row.

    Returns
    -------
    pandas.DataFrame
        One row per qualifying game (the row of the mating move as produced
        by extract_positions_and_moves), or an empty DataFrame when no game
        qualifies.
    """
    mate_rows = []

    for _, game in game_df['game_str'].items():
        df = extract_positions_and_moves(game)
        # Check if df is empty
        if df.empty:
            continue

        last_row = df.iloc[-1]
        # Cheap filters first; the mate search below replays every legal move.
        if last_row['move_color'] != 'White' or last_row['actual_san_move'][-1] != '#':
            continue

        if len(count_checkmate_moves(last_row['fen_pos'])) == 1:
            mate_rows.append(last_row.to_frame().transpose())

    # Concatenate once at the end instead of growing a frame inside the loop
    # (repeated concat is quadratic); pd.concat raises on an empty list.
    if not mate_rows:
        return pd.DataFrame()
    return pd.concat(mate_rows)


In [None]:
# @title Intermediate
# Number of games moved from the general pool into this task's sample
num_sample = 1500
name = 'mate_in_one_puzzles_test'

# Draw the games, or reload the sample if a file with this name already exists
sampled_games = sample_and_remove_from_lichess_database(num_sample, name, True)
# Keep the final position of games that end with a unique mating move by White
mates =extract_mate_in_one_positions(sampled_games)

save_task_specific_df(mates,name )
print(mates['post_move_fen_pos'])


In [None]:
# @title Final
name = 'mate_in_one_puzzles_test'

df = load_task_specific_df(name)
print(df['move_color'].value_counts())
print(df)
#fen_mate, san_mate = np.split(df,2 )
#mid_idx = len(df) // 2
#fen_mate = df.iloc[:mid_idx].copy()
#san_mate = df.iloc[mid_idx:].copy()
# NOTE(review): the prompt asks whether the position is a checkmate, while the label below is the
# mating move ('actual_san_move') — confirm the wording matches the intended task.
df['prompt'] = f"Is this position a checkmate " + df['fen_pos']
#san_mate['prompt'] = f"Find a move for White in the san chess position resulting in checkmate " + df['san_pos']

# Only the FEN variant is active; the SAN variant is commented out.
info = [
    {
        'input': df['prompt'],
        'clean_input': df['fen_pos'],
        'label': df['actual_san_move'],
        'name': 'fen_mate_in_one',
    },
 #   {
 #       'input': san_mate['prompt'],
 #       'clean_input': san_mate['san_pos'],
 #       'label': san_mate['actual_san_move'],
 #       'name': 'san_mate_in_one',
 #   }
]

# train_per = 0 produces a test split only
train_per = 0
x = to_datasetdict(info, train_per)
save_to_final_database(x, name)



## Chess Problems (Not Implemented)


### Pick_better_move

In [None]:
# @title Helper
# Ask a chess engine for its move in a single FEN position
def get_best_move(fen, engine_path, analysis_time=4):
    """
    Let the UCI engine at `engine_path` play one move in the position `fen`.

    `analysis_time` is the time limit handed to the engine. The chosen move
    is returned as a UCI string.
    """
    with chess.engine.SimpleEngine.popen_uci(engine_path) as engine:
        position = chess.Board(fen)
        time_limit = chess.engine.Limit(time=analysis_time)
        chosen_move = engine.play(position, time_limit).move

    # Callers receive the move in UCI notation
    return chosen_move.uci()


def analyze_and_rank_moves(FEN, moves, engine_path, analysis_time=0.3):
    """
    Rank candidate moves in a position from best to worst for the side to move.

    Parameters
    ----------
    FEN : str
        Position the moves are played from.
    moves : iterable of str
        Candidate moves in SAN (each is applied with `board.push_san`).
    engine_path : str
        Path of the UCI engine executable.
    analysis_time : float, optional
        Time limit per move, passed to `chess.engine.Limit(time=...)`.

    Returns
    -------
    list of str
        The distinct moves, best first.
    """
    # Centipawn value substituted for forced mates, so every score is an int
    # and the list can be sorted (score() without it returns None for mates).
    mate_score = 100000
    move_scores = {}

    with chess.engine.SimpleEngine.popen_uci(engine_path) as engine:
        board = chess.Board(FEN)
        mover = board.turn

        for move in moves:
            board.push_san(move)  # Apply the move
            info = engine.analyse(board, chess.engine.Limit(time=analysis_time))
            board.pop()  # Reset the move

            # After the push it is the opponent's turn, so the relative score
            # describes the opponent's prospects. Evaluate from the mover's
            # point of view instead; otherwise the ranking is inverted.
            move_scores[move] = info["score"].pov(mover).score(mate_score=mate_score)

    # Sorting the moves based on their scores, best first
    return sorted(move_scores, key=move_scores.get, reverse=True)


def best_moves_from_fens(position_df):
    """
    Intended to pair a better and a worse move for every position, using the moves suggested by
    two Maia engines plus the move actually played.

    NOTE(review): this function belongs to the "Not Implemented" section and cannot run as
    written; the specific problems are flagged inline below.
    """
    pairs = []
    for _, position in position_df.iterrows():
        # get_best_move returns UCI strings, while 'actual_san_move' is SAN
        move_1100 = get_best_move(position['fen_pos'], maia_1100_path)
        move_1600 = get_best_move(position['fen_pos'], maia_1600_path)
        real_move = position['actual_san_move']
        possible_moves = [move_1100, move_1600, real_move]
        # NOTE(review): analyze_and_rank_moves applies every move with push_san (the two engine
        # moves are UCI) and returns the moves sorted by score, not numeric centipawn values.
        centipawn_values = analyze_and_rank_moves(position['fen_pos'], possible_moves, maia_1900_path)
        #blunder = get_blunder(position['fen_pos'], maia_1900_path, min(centipawn_values))

        temp_pairs = []
        # Find pairs with a difference above 100
        for i in range(len(possible_moves)):
            for j in range(i+1, len(possible_moves)):
                # NOTE(review): the elements of centipawn_values are move strings, so this
                # subtraction is not numeric.
                if abs(centipawn_values[i] - centipawn_values[j]) > 100:
                    # NOTE(review): possible_moves[i] is a string; indexing it with 'san_pos' fails.
                    temp_pairs.append((possible_moves[i]['san_pos'], possible_moves[j]['san_pos']))

        # NOTE(review): random.choice raises on an empty list, i.e. when no pair passed the threshold.
        selected_pair =random.choice(temp_pairs)
        position['better_move'], position['worse_move'] = selected_pair
        # NOTE(review): `pair` is never defined and nothing is ever appended to `pairs`,
        # so the returned DataFrame would be built from an empty list.
        pair = pair + selected_pair

    return pd.DataFrame(pairs)

def mix_the_two_columns(col1,col2):
    """
    For every row, randomly swap (or keep) the two elements of the two columns.

    Parameters
    ----------
    col1, col2 : sequence or pandas.Series
        Equally long columns; elements are paired by position.

    Returns
    -------
    tuple of (list, list)
        `out1[i]` and `out2[i]` hold the i-th elements of col1 and col2,
        either in the original order or swapped, each with probability 1/2.
    """
    out1, out2 = [], []

    # zip pairs the elements by position. Indexing with col[i] is label-based
    # on a pandas Series and fails when the index is not 0..n-1 (for example
    # after sampling or filtering rows).
    for first, second in zip(col1, col2):
        if random.choice([True, False]):
            out1.append(first)
            out2.append(second)
        else:
            out1.append(second)
            out2.append(first)
    return out1, out2


In [None]:
# @title Intermediate
# Number of games moved from the general pool into this task's sample
num_sample = 100000
name = 'pick_better_move'

# Draw the games, or reload the sample if a file with this name already exists
sampled_games = sample_and_remove_from_lichess_database(num_sample, name, True)
# One random position per game with a row index in [20, 30); even indices only (get_rand default)
rand_positions =get_rand(sampled_games, 20,30)
save_task_specific_df(rand_positions,name)

In [None]:
rand_positions

Unnamed: 0,game_pgn,fen_pos,san_pos,actual_san_move,position_number,post_move_san_pos,post_move_fen_pos,move_color,is_checking,is_checkmating
24,"[Event ""Rated Classical game""]\n[Site ""https:/...",r3kb1r/1pp1q1pp/p4n2/3p1b2/3n3N/2Np4/PPPP1PPP/...,1. e4 e5 2. Nf3 Nc6 3. Nc3 f5 4. exf5 Nf6 5. B...,cxd3,25,1. e4 e5 2. Nf3 Nc6 3. Nc3 f5 4. exf5 Nf6 5. B...,r3kb1r/1pp1q1pp/p4n2/3p1b2/3n3N/2NP4/PP1P1PPP/...,White,False,False
26,"[Event ""Rated Blitz game""]\n[Site ""https://lic...",r2q1rk1/pp2bp1n/2p1p2p/4Pbp1/4NB2/6P1/PPP2PBP/...,1. d4 d5 2. Nc3 c6 3. e3 Bf5 4. Nf3 h6 5. g3 N...,Be3,27,1. d4 d5 2. Nc3 c6 3. e3 Bf5 4. Nf3 h6 5. g3 N...,r2q1rk1/pp2bp1n/2p1p2p/4Pbp1/4N3/4B1P1/PPP2PBP...,White,False,False
26,"[Event ""Rated Bullet game""]\n[Site ""https://li...",r4rk1/pq2ppbp/6p1/2p1Nb2/8/2N5/PPPP1PPP/R1BQR1...,1. e4 c5 2. Nc3 Nc6 3. Nf3 g6 4. Bb5 Bg7 5. Bx...,d3,27,1. e4 c5 2. Nc3 Nc6 3. Nf3 g6 4. Bb5 Bg7 5. Bx...,r4rk1/pq2ppbp/6p1/2p1Nb2/8/2NP4/PPP2PPP/R1BQR1...,White,False,False
26,"[Event ""Rated Bullet game""]\n[Site ""https://li...",rnb1k2r/1p3ppp/p7/2p5/1B1Nn3/8/PP3PPP/R3KB1R w...,1. d4 Nf6 2. c4 d5 3. cxd5 Nxd5 4. e4 Nf6 5. N...,Nb5,27,1. d4 Nf6 2. c4 d5 3. cxd5 Nxd5 4. e4 Nf6 5. N...,rnb1k2r/1p3ppp/p7/1Np5/1B2n3/8/PP3PPP/R3KB1R b...,White,False,False
24,"[Event ""Rated Blitz game""]\n[Site ""https://lic...",2rq1rk1/1b1nbppp/p1ppp3/1p6/3PPPn1/2PBBN2/PP2N...,1. Nc3 Nf6 2. e4 d6 3. d4 c6 4. f4 e6 5. Bd3 B...,Bg1,25,1. Nc3 Nf6 2. e4 d6 3. d4 c6 4. f4 e6 5. Bd3 B...,2rq1rk1/1b1nbppp/p1ppp3/1p6/3PPPn1/2PB1N2/PP2N...,White,False,False
...,...,...,...,...,...,...,...,...,...,...
24,"[Event ""Rated Blitz game""]\n[Site ""https://lic...",r2qk2r/1bp1nppp/p2p1b2/1p3N2/4P3/1PN5/1PP2PPP/...,1. e4 e5 2. Nf3 Nc6 3. Bb5 d6 4. O-O a6 5. Ba4...,Nd5,25,1. e4 e5 2. Nf3 Nc6 3. Bb5 d6 4. O-O a6 5. Ba4...,r2qk2r/1bp1nppp/p2p1b2/1p1N1N2/4P3/1P6/1PP2PPP...,White,False,False
20,"[Event ""Rated Bullet game""]\n[Site ""https://li...",r2qk2r/pp2bppp/3p1n2/2pPn3/4P3/2NB1Q2/PP3PPP/R...,1. d4 c5 2. d5 e6 3. c4 exd5 4. cxd5 d6 5. Nc3...,Qe2,21,1. d4 c5 2. d5 e6 3. c4 exd5 4. cxd5 d6 5. Nc3...,r2qk2r/pp2bppp/3p1n2/2pPn3/4P3/2NB4/PP2QPPP/R1...,White,False,False
28,"[Event ""Rated Classical game""]\n[Site ""https:/...",5rk1/pp1np1bp/1q1p2p1/5p2/3p1P2/4P3/PPP1Q1PP/R...,1. Nc3 Nf6 2. e3 g6 3. d4 Bg7 4. Nf3 O-O 5. Be...,Rb1,29,1. Nc3 Nf6 2. e3 g6 3. d4 Bg7 4. Nf3 O-O 5. Be...,5rk1/pp1np1bp/1q1p2p1/5p2/3p1P2/4P3/PPP1Q1PP/1...,White,False,False
26,"[Event ""Rated Bullet game""]\n[Site ""https://li...",r3k2r/1p2q1pp/p1n1Q3/2bp4/8/2P5/PP3PPP/RNB1K1N...,1. e4 c5 2. c3 e6 3. d4 a6 4. d5 Nf6 5. Bd3 c4...,Qxe7+,27,1. e4 c5 2. c3 e6 3. d4 a6 4. d5 Nf6 5. Bd3 c4...,r3k2r/1p2Q1pp/p1n5/2bp4/8/2P5/PP3PPP/RNB1K1NR ...,White,True,False


In [None]:
# @title Final

# NOTE(review): this cell belongs to the "Not Implemented" section; the inconsistencies that are
# visible in it are flagged inline.
name = 'pick_better_move'
df = rand_positions

pick_move_df =best_moves_from_fens(df)

# NOTE(review): mix_the_two_columns returns a tuple of two lists (one per output column).
pick_move_df[['move1','move2']] = mix_the_two_columns(pick_move_df['better_move'],pick_move_df['worse_move'])
# NOTE(review): 'move1' / 'move2' are assigned on pick_move_df above, but read from rand_positions here.
rand_positions['san_prompt'] = rand_positions.apply(lambda row: f"In the following chess position, which of the moves is better? {row['san_pos']} {row['move1']} {row['move2']}.", axis=1)
rand_positions['fen_prompt'] = rand_positions.apply(lambda row: f"In the following chess position, which of the moves is better? {row['fen_pos']} {row['move1']} {row['move2']}.", axis=1)

# NOTE(review): both entries are identical and share the name 'rand_positions', so the second one
# overwrites the splits of the first in to_datasetdict. They read a 'prompt' column that this cell
# does not create ('san_prompt' / 'fen_prompt' are created above), and a 'better_move' column that
# is not assigned on rand_positions in this cell.
info = [
    {
        'input': rand_positions['prompt'],
        'clean_input': rand_positions['san_pos'],
        'label': rand_positions['better_move'],
        'name': 'rand_positions',
    },
    {
        'input': rand_positions['prompt'],
        'clean_input': rand_positions['san_pos'],
        'label': rand_positions['better_move'],
        'name': 'rand_positions',
    }
    ]

# 95% of the rows go to the train split, the rest to the test split
x = to_datasetdict(info, 0.95)
save_to_final_database(x, name)

### Rand positions

In [None]:
# @title Intermediate
# Number of games moved from the general pool into this task's sample
num_sample = 1000
name = 'rand_positions'

# Draw the games, or reload the sample if a file with this name already exists
sampled_games = sample_and_remove_from_lichess_database(num_sample, name, True)
# One random position per game with a row index in [15, 30); even indices only (get_rand default)
rand_positions =get_rand(sampled_games, 15,30)

# Convert the 1-based half-move counter into the full-move number of the move to be played
rand_positions['pos_num'] = (((rand_positions['position_number'] -1) / 2) + 1).astype(int)
# The prompt ends with "<move number>." so that it continues the SAN move list
rand_positions['prompt'] = rand_positions.apply(lambda row: f"Q: Find the best move in the chess position {row['san_pos']} {row['pos_num']}.", axis=1)
# No 'label' entry: the resulting dataset has only the 'input' and 'clean_input' columns
info = [
    {
        'input': rand_positions['prompt'],
        'clean_input': rand_positions['san_pos'],
        'name': 'rand_positions',
    }]

# train_per = 0 produces a test split only
x = to_datasetdict(info, 0)
save_to_final_database(x, name)

Saving the dataset (0/1 shards):   0%|          | 0/917 [00:00<?, ? examples/s]

In [None]:
rand_positions['prompt'].iloc[0]

'Q: Find the best move in the chess position 1. Nf3 e6 2. Nc3 d5 3. a4 f5 4. h4 h5 5. Rh3 g6 6. Rg3 Qf6 7. Ng5 Qe7 8. Re3 e5 9. Nxd5 Qd6 10.0.'