In [1]:
!pip install chess



In [2]:
import re

def parse_pgn_file(file_path):
    """
    Parse a PGN file and extract metadata and moves for each game.
    
    Args:
        file_path (str): Path to the PGN file
        
    Returns:
        list: List of dictionaries containing metadata and moves for each game
    """
    try:
        with open(file_path, 'r', encoding='ISO-8859-1') as file:
            content = file.read()
    except FileNotFoundError:
        print(f"Error: File '{file_path}' not found.")
        return []
    
    # Split the content into individual games
    games_raw = re.split(r'\n\n\[Event', content)
    
    # If the file starts with [Event, handle the first game differently
    if not games_raw[0].startswith('[Event'):
        games_raw[0] = '[Event' + games_raw[0]
    else:
        # This shouldn't happen with the example, but just in case
        games_raw = ['[Event' + game for game in games_raw]
    
    games_data = []
    
    for game in games_raw:
        if not game.strip():
            continue
        
        # Extract metadata
        metadata = {}
        metadata_pattern = r'\[(.*?) "(.*?)"\]'
        metadata_matches = re.findall(metadata_pattern, game)
        
        for key, value in metadata_matches:
            metadata[key] = value
        
        # Extract moves
        moves_section = re.split(r'\]\s*\[|\]\s*\n', game)[-1].strip()
        
        # Remove any result indicator at the end (like 1-0, 0-1, 1/2-1/2)
        moves_section = re.sub(r'\s+(1-0|0-1|1\/2-1\/2|\*)\s*$', '', moves_section)
        
        # Extract just the moves without the move numbers
        moves_pattern = r'(?:\d+\.\s*)?([a-zA-Z0-9+#=\-]+)(?:\s+|\Z)'
        moves = re.findall(moves_pattern, moves_section)
        
        # Group the moves for better readability
        paired_moves = []
        for i in range(0, len(moves), 2):
            if i + 1 < len(moves):
                paired_moves.append(f"{moves[i]} {moves[i+1]}")
            else:
                paired_moves.append(moves[i])
        
        games_data.append({
            "metadata": metadata,
            "moves": moves  # Using the unpaired moves for accurate representation
        })
    
    return games_data


In [3]:
def format_chess_moves(moves_list):
    formatted_moves = []
    move_number = 1
    
    for i in range(0, len(moves_list), 2):
        if i + 1 < len(moves_list):
            # Full move (white and black)
            formatted_moves.append(f"{move_number}.{moves_list[i]} {moves_list[i+1]}")
        else:
            # Last move (white only)
            formatted_moves.append(f"{move_number}.{moves_list[i]}")
        move_number += 1
    
    # Join all moves with a space and format to have line breaks every 8 moves
    result = ""
    for i, move in enumerate(formatted_moves):
        result += move
        if (i + 1) % 8 == 0:
            result += "\n"
        else:
            result += " "
    
    return result.strip()

In [4]:
import chess

def predict_next_moves(move_list):
    # Initialize a chess board
    board = chess.Board()
    
    # Play all the moves to reach the current position
    for move_str in move_list:
        try:
            move = board.parse_san(move_str)
            board.push(move)
        except ValueError:
            print(f"Invalid move: {move_str}")
            return []
    
    # Get all legal moves from the current position
    legal_moves = list(board.legal_moves)
    
    # Convert UCI moves to SAN format for readability
    readable_moves = [board.san(move) for move in legal_moves]
    
    # Print the current board state
    # print("Current board position:")
    # print(board)
    # print(f"\nPossible next moves ({len(readable_moves)}):")
    
    return readable_moves



In [5]:
def create_conversation(gameData):
    game = []
    res = []
    for i, move in enumerate(gameData["moves"]):
        game.append(move)
        nextMovesPossible = predict_next_moves(game)
        strMovesPossible = ', '.join(nextMovesPossible)
        pgn = format_chess_moves(game)

        # Vérifiez si l'indice est pair
        if i % 2 == 0 and (i + 1 != len(gameData["moves"])):
            res.append({"role": "user", "content": f"You can answer only one of the moves in the following list: \n {strMovesPossible}\n\nCurrent game in PGN format: {pgn}\n\nWhat is your next move?\n"})
            res.append({"role": "assistant", "content": gameData["moves"][i + 1]})

    return {"conversations": res}



In [6]:
# Parse the PGN file
gamesData = parse_pgn_file("chess_games/test.pgn")

for gameData in gamesData:
    conversation = create_conversation(gameData)
    print(conversation)

{'conversations': [{'role': 'user', 'content': 'You can answer only one of the moves in the following list: \n Nh6, Nf6, Nc6, Na6, h6, g6, f6, e6, d6, c6, b6, a6, h5, g5, f5, e5, d5, c5, b5, a5\n\nCurrent game in PGN format: 1.e4\n\nWhat is your next move?\n'}, {'role': 'assistant', 'content': 'e5'}, {'role': 'user', 'content': 'You can answer only one of the moves in the following list: \n Ne7, Nh6, Nf6, Be7, Bd6, Bc5, Bb4, Ba3, Ke7, Qe7, Qf6, Qg5, Qh4, Nc6, Na6, h6, g6, f6, d6, c6, b6, a6, h5, g5, f5, d5, c5, b5, a5\n\nCurrent game in PGN format: 1.e4 e5 2.Nf3\n\nWhat is your next move?\n'}, {'role': 'assistant', 'content': 'Nc6'}, {'role': 'user', 'content': 'You can answer only one of the moves in the following list: \n Nge7, Nh6, Nf6, Be7, Bd6, Bc5, Bb4, Ba3, Ke7, Qe7, Qf6, Qg5, Qh4, Rb8, Nb8, Nce7, Na5, Nd4, Nb4, h6, g6, f6, d6, b6, a6, h5, g5, f5, d5, a5\n\nCurrent game in PGN format: 1.e4 e5 2.Nf3 Nc6 3.Bb5\n\nWhat is your next move?\n'}, {'role': 'assistant', 'content': 'Nf6'}

In [7]:
import os

folder_path = 'chess_games'

# List all files in the folder
files = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]

print(len(files))


252


In [None]:
import json 
for i, file in enumerate(files):
    print(str(i + 1) + "/" + str(len(files)) + " - " + file)
    gamesData = parse_pgn_file(f"chess_games/{file}")
    
    convs = []

    for j, gameData in enumerate(gamesData):
        conversation = create_conversation(gameData)
        convs.append(conversation)
        if j % 100 == 0:
            print(f"  Processed game {j + 1}/{len(gamesData)} in file {file}")

    output_file = "finetuning_data_1/"+file+'.jsonl'

    with open(output_file, 'w') as f:
        for entry in convs:
            json.dump(entry, f)
            f.write('\n')

    print(f"Data has been written to {output_file}") 

1/252 - Shirov.pgn
  Processed game 1/5644 in file Shirov.pgn
  Processed game 101/5644 in file Shirov.pgn
  Processed game 201/5644 in file Shirov.pgn
  Processed game 301/5644 in file Shirov.pgn
  Processed game 401/5644 in file Shirov.pgn
  Processed game 501/5644 in file Shirov.pgn
  Processed game 601/5644 in file Shirov.pgn
  Processed game 701/5644 in file Shirov.pgn
  Processed game 801/5644 in file Shirov.pgn
  Processed game 901/5644 in file Shirov.pgn
  Processed game 1001/5644 in file Shirov.pgn
  Processed game 1101/5644 in file Shirov.pgn
  Processed game 1201/5644 in file Shirov.pgn
  Processed game 1301/5644 in file Shirov.pgn
  Processed game 1401/5644 in file Shirov.pgn
  Processed game 1501/5644 in file Shirov.pgn
  Processed game 1601/5644 in file Shirov.pgn
  Processed game 1701/5644 in file Shirov.pgn
  Processed game 1801/5644 in file Shirov.pgn
  Processed game 1901/5644 in file Shirov.pgn
  Processed game 2001/5644 in file Shirov.pgn
  Processed game 2101/5644 

In [10]:
import os

def compter_lignes_jsonl(dossier):
    total_lignes = 0

    # Parcourir tous les fichiers dans le dossier
    for nom_fichier in os.listdir(dossier):
        # Vérifier si le fichier est un fichier .jsonl
        if nom_fichier.endswith('.jsonl'):
            chemin_fichier = os.path.join(dossier, nom_fichier)

            # Ouvrir le fichier et compter les lignes
            with open(chemin_fichier, 'r', encoding='utf-8') as fichier:
                nombre_lignes = sum(1 for _ in fichier)
                total_lignes += nombre_lignes

    return total_lignes

# Exemple d'utilisation
dossier = 'finetuning_data_1'
nombre_total_lignes = compter_lignes_jsonl(dossier)
print(f"Le nombre total de lignes dans tous les fichiers .jsonl est : {nombre_total_lignes}")


Le nombre total de lignes dans tous les fichiers .jsonl est : 503368


In [1]:
import os
import json
import glob

def replace_conversations_with_messages(directory_path):
    """
    Find all .jsonl files in the specified directory and replace 'conversations' keys with 'messages'.
    
    Args:
        directory_path (str): Path to the directory containing .jsonl files
    """
    # Check if directory exists
    if not os.path.isdir(directory_path):
        print(f"Error: Directory '{directory_path}' not found.")
        return
    
    # Find all .jsonl files in the directory
    jsonl_files = glob.glob(os.path.join(directory_path, "*.jsonl"))
    
    if not jsonl_files:
        print(f"No .jsonl files found in '{directory_path}'.")
        return
    
    print(f"Found {len(jsonl_files)} .jsonl files in '{directory_path}'.")
    
    for file_path in jsonl_files:
        process_file(file_path)
    
    print("Processing complete.")

def process_file(file_path):
    """Process a single .jsonl file to replace 'conversations' with 'messages'."""
    temp_file_path = file_path + ".temp"
    replaced_count = 0
    
    try:
        with open(file_path, 'r', encoding='utf-8') as input_file, \
             open(temp_file_path, 'w', encoding='utf-8') as output_file:
            
            for line_number, line in enumerate(input_file, 1):
                try:
                    # Parse the JSON object
                    data = json.loads(line.strip())
                    
                    # Replace 'conversations' key with 'messages' if present
                    if 'conversations' in data:
                        data['messages'] = data.pop('conversations')
                        replaced_count += 1
                    
                    # Write the modified JSON object back to the file
                    output_file.write(json.dumps(data) + '\n')
                    
                except json.JSONDecodeError:
                    print(f"Warning: Invalid JSON at line {line_number} in file {file_path}. Skipping line.")
                    output_file.write(line)  # Write the line as is
        
        # Replace the original file with the temporary file
        os.replace(temp_file_path, file_path)
        print(f"Processed {file_path}: Replaced {replaced_count} 'conversations' keys with 'messages'.")
        
    except Exception as e:
        print(f"Error processing file {file_path}: {str(e)}")
        # Clean up the temporary file if it exists
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)


directory = "finetuning_data_1"  # Path to your directory
replace_conversations_with_messages(directory)

Found 251 .jsonl files in 'finetuning_data_1'.
Processed finetuning_data_1/Huebner.pgn.jsonl: Replaced 0 'conversations' keys with 'messages'.
Processed finetuning_data_1/Andersson.pgn.jsonl: Replaced 2764 'conversations' keys with 'messages'.
Processed finetuning_data_1/Capablanca.pgn.jsonl: Replaced 597 'conversations' keys with 'messages'.
Processed finetuning_data_1/Bogoljubow.pgn.jsonl: Replaced 973 'conversations' keys with 'messages'.
Processed finetuning_data_1/Rohde.pgn.jsonl: Replaced 0 'conversations' keys with 'messages'.
Processed finetuning_data_1/Bird.pgn.jsonl: Replaced 353 'conversations' keys with 'messages'.
Processed finetuning_data_1/Stefanova.pgn.jsonl: Replaced 0 'conversations' keys with 'messages'.
Processed finetuning_data_1/Euwe.pgn.jsonl: Replaced 1122 'conversations' keys with 'messages'.
Processed finetuning_data_1/Najer.pgn.jsonl: Replaced 0 'conversations' keys with 'messages'.
Processed finetuning_data_1/Akopian.pgn.jsonl: Replaced 1996 'conversations' 

In [2]:
import os
import json
import glob

def merge_jsonl_files(directory_path, output_file="all.jsonl"):
    """
    Merge all .jsonl files in the specified directory into a single .jsonl file.
    
    Args:
        directory_path (str): Path to the directory containing .jsonl files
        output_file (str): Name of the output merged file
    """
    # Check if directory exists
    if not os.path.isdir(directory_path):
        print(f"Error: Directory '{directory_path}' not found.")
        return
    
    # Find all .jsonl files in the directory
    jsonl_files = glob.glob(os.path.join(directory_path, "*.jsonl"))
    
    if not jsonl_files:
        print(f"No .jsonl files found in '{directory_path}'.")
        return
    
    # Create full path for output file
    output_path = os.path.join(os.getcwd(), output_file)
    
    print(f"Found {len(jsonl_files)} .jsonl files in '{directory_path}'.")
    print(f"Merging into '{output_path}'...")
    
    # Track statistics
    total_records = 0
    processed_files = 0
    
    try:
        with open(output_path, 'w', encoding='utf-8') as output:
            for file_path in jsonl_files:
                file_records = process_file_fusion(file_path, output)
                if file_records > 0:
                    processed_files += 1
                    total_records += file_records
    
        print("Merge complete!")
        print(f"Successfully processed {processed_files} files")
        print(f"Total records merged: {total_records}")
        print(f"Output file: {output_path}")
        
    except Exception as e:
        print(f"Error during merge process: {str(e)}")

def process_file_fusion(file_path, output_file):
    """
    Process a single .jsonl file and write its contents to the output file.
    
    Args:
        file_path (str): Path to the input .jsonl file
        output_file: File object for writing output
        
    Returns:
        int: Number of records processed from this file
    """
    file_name = os.path.basename(file_path)
    records_count = 0
    
    try:
        with open(file_path, 'r', encoding='utf-8') as input_file:
            for line_number, line in enumerate(input_file, 1):
                try:
                    # Validate that the line contains valid JSON
                    json.loads(line.strip())
                    
                    # Write the line to the output file
                    output_file.write(line)
                    records_count += 1
                    
                except json.JSONDecodeError:
                    print(f"Warning: Invalid JSON at line {line_number} in file {file_name}. Skipping line.")
        
        print(f"Added {records_count} records from {file_name}")
        return records_count
        
    except Exception as e:
        print(f"Error processing file {file_name}: {str(e)}")
        return 0

directory = "finetuning_data_1"  # Path to your directory
merge_jsonl_files(directory)

Found 251 .jsonl files in 'finetuning_data_1'.
Merging into '/Users/hugophilipp/Documents/perso/llm_4_chess/all.jsonl'...
Added 0 records from Huebner.pgn.jsonl
Added 2764 records from Andersson.pgn.jsonl
Added 597 records from Capablanca.pgn.jsonl
Added 973 records from Bogoljubow.pgn.jsonl
Added 0 records from Rohde.pgn.jsonl
Added 353 records from Bird.pgn.jsonl
Added 0 records from Stefanova.pgn.jsonl
Added 1122 records from Euwe.pgn.jsonl
Added 0 records from Najer.pgn.jsonl
Added 1996 records from Akopian.pgn.jsonl
Added 784 records from IvanovI.pgn.jsonl
Added 0 records from Winawer.pgn.jsonl
Added 5918 records from Grischuk.pgn.jsonl
Added 1324 records from Azmaiparashvili.pgn.jsonl
Added 1013 records from Byrne.pgn.jsonl
Added 2023 records from Bareev.pgn.jsonl
Added 0 records from Schlechter.pgn.jsonl
Added 626 records from Mecking.pgn.jsonl
Added 0 records from Shabalov.pgn.jsonl
Added 0 records from Tal.pgn.jsonl
Added 2334 records from Krush.pgn.jsonl
Added 3871 records fr

In [3]:
def extract_lines(file_path, line_numbers):
    extracted_lines = {}
    with open(file_path, 'r', encoding='utf-8') as file:
        for i, line in enumerate(file, start=1):
            if i in line_numbers:
                extracted_lines[i] = line.strip()
            if i > max(line_numbers):
                break  # Stop reading early if we've got all needed lines
    return extracted_lines


file_path = "all.jsonl"
line_numbers = {215, 237, 3748, 4388, 5529}
result = extract_lines(file_path, line_numbers)

for line_num, content in result.items():
    print(f"Line {line_num}: {content}")

Line 215: {"messages": []}
Line 237: {"messages": []}
Line 3748: {"messages": []}
Line 4388: {"messages": []}
Line 5529: {"messages": []}


In [4]:
def remove_empty_messages(file_path, output_path):
    with open(file_path, 'r', encoding='utf-8') as infile, open(output_path, 'w', encoding='utf-8') as outfile:
        for line in infile:
            if line.strip() != '{"messages": []}':
                outfile.write(line)


file_path = "all.jsonl"
output_path = "filtered_all.jsonl"
remove_empty_messages(file_path, output_path)
print("Filtered file saved as", output_path)

Filtered file saved as filtered_all.jsonl


In [5]:
import random

def split_data(file_path, train_path, val_path, train_ratio=0.99):
    with open(file_path, 'r', encoding='utf-8') as infile:
        lines = infile.readlines()
    
    random.shuffle(lines)
    split_idx = int(len(lines) * train_ratio)
    
    with open(train_path, 'w', encoding='utf-8') as train_file:
        train_file.writelines(lines[:split_idx])
    
    with open(val_path, 'w', encoding='utf-8') as val_file:
        val_file.writelines(lines[split_idx:])
    
    print(f"Training data saved in {train_path} ({train_ratio * 100}%)")
    print(f"Validation data saved in {val_path} ({(1 - train_ratio) * 100}%)")


file_path = "filtered_all.jsonl"
train_path = "train.jsonl"
val_path = "val.jsonl"
split_data(file_path, train_path, val_path)


Training data saved in train.jsonl (99.0%)
Validation data saved in val.jsonl (1.0000000000000009%)
