In [None]:
import re
import torch
from transformers import GPT2Tokenizer, GPT2LMHeadModel, Trainer, TrainingArguments
from torch.utils.data import Dataset


def clean_moves(game_text):
    """Strip PGN move-number markers from a movetext string.

    Both the white-move form (``12.``) and the black-move continuation form
    (``12...``) are removed, together with any whitespace that follows them.

    Note: any run of digits followed by a dot matches, so a decimal number
    such as ``0.25`` loses its integer part as well.

    Args:
        game_text: PGN movetext as a single string.

    Returns:
        The text with every ``<digits>.`` / ``<digits>...`` marker removed.
    """
    # `\.+` rather than a single `\.`: a black-move marker such as "12..."
    # would otherwise leave a stray ".." token in the output.
    return re.sub(r"\b\d+\.+\s*", "", game_text)

def contains_evaluations(game_text):
    """Tell whether the text holds at least one ``{ [%eval ...] }`` comment.

    Args:
        game_text: movetext of a single game.

    Returns:
        True when the evaluation-comment pattern occurs somewhere in the
        text, False otherwise.
    """
    eval_comment = re.compile(r"\{ \[%eval .*?\] \}")
    return eval_comment.search(game_text) is not None

def load_games(file_path, max_games, prefix="Début de la partie :"):
    """Load games from a PGN file as prefixed, move-number-free strings.

    The file content is split on the ``[Event `` marker and the text before
    the first marker is discarded. Within each game, every non-empty line
    that does not end with ``]`` is kept as movetext; the kept lines are
    joined with spaces and passed through ``clean_moves``. Games whose text
    matches ``contains_evaluations`` are skipped, as are games with no
    movetext lines.

    Args:
        file_path: Path of the PGN file; it is read as UTF-8.
        max_games: Stop as soon as this many games have been collected.
            Only equality is tested, so a value below 1 never triggers the
            cap and the whole file is processed.
        prefix: Text placed in front of every game, separated by a space.

    Returns:
        A list of strings, one per retained game.
    """
    # Explicit encoding: the platform default is not UTF-8 everywhere.
    with open(file_path, "r", encoding="utf-8") as file:
        content = file.read()

    games_text = []
    for game in content.split("[Event ")[1:]:
        stripped_lines = (line.strip() for line in game.splitlines())
        # Header tags end with "]"; everything else is treated as movetext.
        moves = [line for line in stripped_lines if line and not line.endswith("]")]
        if not moves:
            continue

        cleaned_game_text = clean_moves(" ".join(moves))
        if contains_evaluations(cleaned_game_text):
            continue

        games_text.append(f"{prefix} {cleaned_game_text}".strip())

        # The cap counts games actually returned, so skipped games
        # (no movetext, or evaluation comments) never consume the budget.
        if len(games_text) == max_games:
            break

    return games_text

# PGN dump to read and the cap handed to load_games.
pgn_file_path = "chess-files/lichess_db_standard_rated_2014-09.pgn"
games_text = load_games(file_path=pgn_file_path, max_games=50000)

In [None]:
class ChessDataset(Dataset):
    """Torch dataset that tokenizes one game string per item for causal-LM training.

    Each item is padded/truncated to ``max_length`` tokens and returned as a
    dict with ``input_ids``, ``attention_mask`` and ``labels``.

    Args:
        games: Sequence of game strings.
        tokenizer: Callable tokenizer returning ``input_ids`` and
            ``attention_mask`` tensors of shape ``(1, max_length)``.
        max_length: Fixed length every item is padded or truncated to.
    """

    def __init__(self, games, tokenizer, max_length=1024):
        self.games = games
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of games."""
        return len(self.games)

    def __getitem__(self, idx):
        """Tokenize game ``idx`` and build its training tensors."""
        encodings = self.tokenizer(
            self.games[idx],
            return_tensors="pt",
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
        )
        # squeeze(0) drops only the batch dimension; a bare squeeze() would
        # also collapse the sequence dimension when max_length == 1.
        input_ids = encodings["input_ids"].squeeze(0)
        attention_mask = encodings["attention_mask"].squeeze(0)

        # Labels are a separate tensor (no aliasing with input_ids) and
        # padding positions are set to -100, the index ignored by the LM
        # loss. Without this the loss is dominated by predicting padding,
        # since every item is padded to max_length.
        labels = input_ids.clone()
        labels[attention_mask == 0] = -100

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
        }

In [None]:
# Load the pretrained "gpt2" tokenizer.
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token 

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]



In [None]:
from sklearn.model_selection import train_test_split

# Hold out 10% of the games for evaluation. A fixed seed keeps the split
# identical across kernel restarts, so runs stay comparable.
train_sequences, val_sequences = train_test_split(games_text, test_size=0.1, random_state=42)

train_dataset = ChessDataset(train_sequences, tokenizer)
eval_dataset = ChessDataset(val_sequences, tokenizer)

In [None]:
from torch import cuda

# Start from the pretrained "gpt2-medium" checkpoint.
model = GPT2LMHeadModel.from_pretrained("gpt2-medium")

training_args = TrainingArguments(
    # Optimisation settings.
    learning_rate=1.7e-5,
    num_train_epochs=2,
    per_device_train_batch_size=2,
    gradient_accumulation_steps=2,
    # Checkpoint settings.
    output_dir="./results",
    save_steps=1000,
    save_total_limit=2,
    # Logging settings.
    logging_dir="./logs",
    logging_steps=200,
)

trainer = Trainer(
    args=training_args,
    model=model,
    eval_dataset=eval_dataset,
    train_dataset=train_dataset,
)

# Informational only: the value is printed, not passed to the trainer.
device = "cpu" if not cuda.is_available() else "cuda"
print(f"L'appareil utilisé est : {device}")

trainer.train()


config.json:   0%|          | 0.00/718 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.52G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

L'appareil utilisé est : cuda


[34m[1mwandb[0m: Using wandb-core as the SDK backend. Please refer to https://wandb.me/wandb-core for more information.
[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
[34m[1mwandb[0m: Paste an API key from your profile and hit enter, or press ctrl+c to quit:

  ········


[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


Step,Training Loss
200,0.5777
400,0.3438
600,0.3315
800,0.3236
1000,0.3298
1200,0.3088
1400,0.3126
1600,0.3186
1800,0.294
2000,0.2975


## ENTRAÎNEMENT FROM SCRATCH

In [None]:
from transformers import GPT2Tokenizer, GPT2LMHeadModel, GPT2Config, TrainingArguments, Trainer

tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token


# Architecture of the model built below; the model is instantiated from this
# config rather than loaded from a pretrained checkpoint.
config = GPT2Config(
    vocab_size=tokenizer.vocab_size,
    n_positions=1024,
    n_embd=1024,
    n_layer=24,
    n_head=16,
)

model = GPT2LMHeadModel(config)

# Make the embedding matrix match the tokenizer length.
model.resize_token_embeddings(len(tokenizer))

# NOTE(review): output_dir and logging_dir are the same as in the fine-tuning
# cell above, so both runs write their checkpoints/logs to the same folders.
# Confirm this is intended.
training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=2,
    per_device_train_batch_size=2,
    save_steps=1000,
    logging_steps=200,
    logging_dir="./logs",
    save_total_limit=2,
    gradient_accumulation_steps=2,  # the trailing comma was missing (SyntaxError)
    learning_rate=1.5e-5,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
)

from torch import cuda
# Informational only: the value is printed, not passed to the trainer.
device = "cuda" if cuda.is_available() else "cpu"
print(f"L'appareil utilisé est : {device}")

trainer.train()

L'appareil utilisé est : cuda


[34m[1mwandb[0m: Using wandb-core as the SDK backend. Please refer to https://wandb.me/wandb-core for more information.
[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
[34m[1mwandb[0m: Paste an API key from your profile and hit enter, or press ctrl+c to quit:

  ········


[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc


VBox(children=(Label(value='Waiting for wandb.init()...\r'), FloatProgress(value=0.011113029833333283, max=1.0…

Step,Training Loss
200,0.7506
400,0.4261
600,0.4106
800,0.3998
1000,0.3946
1200,0.3937
1400,0.3773
1600,0.3685
1800,0.3798
2000,0.3614


KeyboardInterrupt: 

## TEST DU MODÈLE

In [None]:
import chess
import chess.pgn
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer

# Where the tokenizer and the trained weights are loaded from.
tokenizer_path = "gpt2"
model_path = "models/best"

tokenizer = GPT2Tokenizer.from_pretrained(tokenizer_path)
# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token

model = GPT2LMHeadModel.from_pretrained(model_path)
# Switch to evaluation mode.
model.eval()

# Prefer the GPU when one is available.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model.to(device)

  from .autonotebook import tqdm as notebook_tqdm


GPT2LMHeadModel(
  (transformer): GPT2Model(
    (wte): Embedding(50257, 1024)
    (wpe): Embedding(1024, 1024)
    (drop): Dropout(p=0.1, inplace=False)
    (h): ModuleList(
      (0-23): 24 x GPT2Block(
        (ln_1): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
        (attn): GPT2SdpaAttention(
          (c_attn): Conv1D(nf=3072, nx=1024)
          (c_proj): Conv1D(nf=1024, nx=1024)
          (attn_dropout): Dropout(p=0.1, inplace=False)
          (resid_dropout): Dropout(p=0.1, inplace=False)
        )
        (ln_2): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
        (mlp): GPT2MLP(
          (c_fc): Conv1D(nf=4096, nx=1024)
          (c_proj): Conv1D(nf=1024, nx=4096)
          (act): NewGELUActivation()
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
    )
    (ln_f): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
  )
  (lm_head): Linear(in_features=1024, out_features=50257, bias=False)
)

In [None]:
import chess
import chess.svg
from IPython.display import SVG
import IPython.display
from IPython.display import display, clear_output

def check_identical_scores(scores):
    """Count how many score tensors are (almost) equal to their predecessor.

    Every entry from index 1 onwards is compared with the previous one using
    ``torch.allclose`` (``atol=1e-6``); a message is printed for each
    comparison and a summary line at the end.

    Args:
        scores: Indexable sequence of tensors, one per step.

    Returns:
        The number of steps whose tensor matches the previous step's tensor.
    """
    print("Vérification des sous-listes de scores...")
    total = len(scores)
    matches = 0
    step = 1
    while step < total:
        previous, current = scores[step - 1], scores[step]
        if not torch.allclose(current, previous, atol=1e-6):
            print(f"Les scores de l'étape {step} diffèrent de ceux de l'étape {step - 1}.")
        else:
            matches += 1
            print(f"Les scores de l'étape {step} sont identiques à ceux de l'étape {step - 1}.")
        step += 1

    print(f"Nombre total de sous-listes identiques : {matches}/{total}")
    return matches


def get_keypress():
    """Prompt the user and return the typed answer without surrounding whitespace."""
    raw_answer = input("Appuyez sur Entrée pour continuer ou tapez 'q' pour quitter : ")
    return raw_answer.strip()

def decode_and_display_tokens(sequence, tokenizer):
    """Print every token id of ``sequence`` next to its decoded text.

    Args:
        sequence: Iterable of token ids.
        tokenizer: Object exposing ``decode(ids, skip_special_tokens=...)``.
    """
    print(f"Décodage de la séquence : {sequence}\n")

    for position, token_id in enumerate(sequence, start=1):
        # Decode one id at a time so each token's own text is shown.
        token_text = tokenizer.decode([token_id], skip_special_tokens=True)
        print(f"Token {position}: '{token_text}' (ID: {token_id})")


def _extract_first_move(generated_ids, tokenizer):
    """Split the first move off a list of generated token ids.

    Tokens are decoded one by one and concatenated until the text contains a
    second space (taken as the start of the following move), the
    end-of-sequence token is met, or the list is exhausted.

    Returns:
        ``(move_text, move_token_ids)``: the text located before the second
        space and the generated ids that produced it. ``move_token_ids`` is
        always a prefix of ``generated_ids``.
    """
    text = ""
    move_token_ids = []
    for token_id in generated_ids:
        if token_id == tokenizer.eos_token_id:
            break
        candidate = text + tokenizer.decode([token_id], skip_special_tokens=True)
        second_space = candidate.find(" ", candidate.find(" ") + 1)
        if second_space == -1:
            text = candidate
            move_token_ids.append(token_id)
            continue
        # The token belongs to the move only if it contributes characters
        # located before the separating space.
        if second_space > len(text):
            move_token_ids.append(token_id)
        return candidate[:second_space], move_token_ids
    return text, move_token_ids


def generate_candidate_moves(input_sequence, num_candidates=10, tokens_for_generation=5):
    """Sample candidate next moves together with their model probability.

    Relies on the module-level ``tokenizer``, ``model`` and ``device``.

    Args:
        input_sequence: Game history used as the prompt.
        num_candidates: Number of sequences sampled from the model.
        tokens_for_generation: Number of tokens generated after the prompt.

    Returns:
        A list of ``(move_text, probability)`` tuples, one per sampled
        sequence (duplicates are possible). ``move_text`` may start with a
        space. ``probability`` is the product, over the tokens that make up
        the move, of the softmax of the scores returned by ``generate`` for
        the token that was sampled at that step.
    """
    encodings = tokenizer(
        input_sequence,
        return_tensors="pt",
        padding=True,
        truncation=True,
    )
    input_ids = encodings["input_ids"].to(device)
    attention_mask = encodings["attention_mask"].to(device)

    prompt_length = input_ids.size(1)

    # No `no_repeat_ngram_size` here: that option forbids any token n-gram
    # already present in the sequence (prompt included), and chess notation
    # legitimately repeats token pairs throughout a game.
    output_sequences = model.generate(
        input_ids=input_ids,
        attention_mask=attention_mask,
        max_length=prompt_length + tokens_for_generation,
        num_return_sequences=num_candidates,
        do_sample=True,
        top_k=50,
        top_p=0.95,
        temperature=0.7,
        output_scores=True,
        return_dict_in_generate=True,
        # Explicit value avoids the "Setting `pad_token_id`" warning.
        pad_token_id=tokenizer.eos_token_id,
    )

    sequences = output_sequences.sequences
    scores = output_sequences.scores

    candidate_moves = []
    for idx, sequence in enumerate(sequences):
        generated_ids = sequence[prompt_length:].tolist()

        # Use the ids that were actually sampled. Re-encoding the decoded
        # text can yield a different token split, which would pair the step
        # scores with tokens the model never produced.
        next_move, move_token_ids = _extract_first_move(generated_ids, tokenizer)

        # scores[step] holds one row per returned sequence.
        step_scores = [scores[step][idx] for step in range(len(move_token_ids))]

        move_probability = calculate_probability(step_scores, move_token_ids)
        candidate_moves.append((next_move, move_probability))

    return candidate_moves


def calculate_probability(scores, tokenized_move):
    """Multiply the per-step probabilities of a token sequence.

    Args:
        scores: Sequence of 1-D score tensors, one per generation step.
        tokenized_move: Token ids aligned position by position with
            ``scores``.

    Returns:
        The product of ``softmax(scores[i])[tokenized_move[i]]`` over the
        paired positions, as a float. Positions beyond the shorter of the
        two inputs are ignored, and the result is ``1.0`` when either input
        is empty.
    """
    prob = 1.0
    for token_id, step_score in zip(tokenized_move, scores):
        prob *= torch.softmax(step_score, dim=-1)[token_id].item()
    return prob

def apply_move_to_board(board, move_text):
    """Try to play a SAN move on ``board``.

    Args:
        board: Object exposing ``parse_san`` and ``push``.
        move_text: Move in standard algebraic notation.

    Returns:
        True when the move was parsed and pushed, False when a
        ``ValueError`` was raised while doing so.
    """
    try:
        # Parse the SAN text against the current position, then play it.
        board.push(board.parse_san(move_text))
    except ValueError:
        return False
    return True


def display_board(board):
    """Replace the cell's previous output with a 400px SVG rendering of ``board``."""
    # wait=True delays the clearing until new output is available.
    clear_output(wait=True)
    rendered = SVG(chess.svg.board(board=board, size=400))
    display(rendered)


def test_chess_model(num_candidates=10, tokens_for_generation=5):
    """Let the model play a game against itself, one move per key press.

    At every turn candidate moves are generated from the move history, the
    ones that are legal on the current board are kept, and after the user
    presses Enter the legal candidate with the highest probability is
    played. Typing 'q' stops the game.

    Args:
        num_candidates: Forwarded to ``generate_candidate_moves``.
        tokens_for_generation: Forwarded to ``generate_candidate_moves``.
    """
    board = chess.Board()
    input_sequence = "Début de la partie :"

    print("Appuyez sur Entrée pour générer un coup, ou 'q' pour quitter.")

    while not board.is_game_over():
        display_board(board)
        print("\nMove history:", input_sequence)

        candidates = generate_candidate_moves(input_sequence, num_candidates, tokens_for_generation)
        print("\nCandidates and probabilities:")
        for san, probability in candidates:
            print(f"{san.strip()}: {probability:.4f}")

        # Keep only candidates that can be played on a copy of the board.
        playable = []
        for san, probability in candidates:
            stripped = san.strip()
            if apply_move_to_board(board.copy(), stripped):
                playable.append((stripped, probability))
        if not playable:
            print("No valid moves generated. Stopping.")
            break

        print("Appuyez sur Entrée pour continuer ou 'q' pour quitter.")
        pressed = get_keypress()

        print(f"Key pressed: {pressed!r}")

        if pressed.lower() == "q":
            print("Arrêt de la partie.")
            break
        if pressed not in ("\r", "\n", ""):
            # Any other input: remind the user and start the turn over.
            print("Appuyez sur Entrée pour continuer ou 'q' pour quitter.")
            continue

        best_move = max(playable, key=lambda pair: pair[1])[0]
        apply_move_to_board(board, best_move)
        input_sequence += f" {best_move}"

    print(str(board.is_game_over()))
    print("\nPartie terminée. Résultat:", board.result())


def test_chess_model_vs_stockfish(num_candidates=10, tokens_for_generation=5):
    """Play the language model (White) against Stockfish (Black).

    On White's turn the legal candidate with the highest probability from
    ``generate_candidate_moves`` is played. On Black's turn the move comes
    from the module-level ``stockfish`` object. After each move the user is
    asked to press Enter to continue or 'q' to stop.

    NOTE(review): ``stockfish`` is not defined in the visible notebook
    cells; it must already exist as a global exposing ``set_fen_position``
    and ``get_best_move``.

    Args:
        num_candidates: Forwarded to ``generate_candidate_moves``.
        tokens_for_generation: Forwarded to ``generate_candidate_moves``.
    """
    board = chess.Board()
    input_sequence = "Début de la partie :"
    stockfish.set_fen_position(board.fen())

    print("Appuyez sur Entrée pour jouer le prochain coup, ou 'q' pour quitter.")

    while not board.is_game_over():
        display_board(board)
        print("\nMove history:", input_sequence)

        if board.turn == chess.WHITE:
            print("Tour du modèle.")
            candidates = generate_candidate_moves(input_sequence, num_candidates, tokens_for_generation)
            print("\nCandidates and probabilities:")
            for move, prob in candidates:
                print(f"{move.strip()}: {prob:.4f}")

            # Keep only candidates that can be played on a copy of the board.
            legal_moves = [(move.strip(), prob) for move, prob in candidates if apply_move_to_board(board.copy(), move.strip())]
            if not legal_moves:
                print("No valid moves generated. Stopping.")
                break

            best_move, _ = max(legal_moves, key=lambda x: x[1])
            apply_move_to_board(board, best_move)
            input_sequence += f" {best_move}"
        else:
            print("Tour de Stockfish.")
            stockfish.set_fen_position(board.fen())
            stockfish_move = stockfish.get_best_move()
            if stockfish_move:
                move = board.parse_uci(stockfish_move)
                # SAN is defined relative to the position *before* the move,
                # so it has to be computed before the move is pushed.
                move_san = board.san(move)
                board.push(move)
                input_sequence += f" {move_san}"
            else:
                print("Stockfish n'a pas pu jouer de coup. Fin de partie.")
                break

        if board.is_game_over():
            print("\nGame over!")
            print("Result:", board.result())
            break

        key = get_keypress()
        if key.lower() == "q":
            print("Arrêt de la partie.")
            break

    display_board(board)
    print("\nPartie terminée. Résultat:", board.result())



In [None]:
# Fixed game history used as the prompt for a one-off generation check.
input_sequence = "Début de la partie : e4 e5 Nf3 Nc6 Bc4 Nf6 Ng5 d5 exd5 Na5 Bb5+ Qd7 Bxd7+ Bxd7 O-O Bg4 d3 Bxd1 Rxd1 Bc5 Re1 b5 c3 Bxf2+ Kh1 Bxe1"
candidates = generate_candidate_moves(
    input_sequence=input_sequence,
    tokens_for_generation=7,
    num_candidates=10,
)
print(candidates)

Setting `pad_token_id` to `eos_token_id`:None for open-end generation.


Scores shape: 5 tokens, torch.Size([10, 50257]) per token

Processing candidate sequence 1/10...
test cut spaces:  N
Prochain coup :  dxe5
longueur du coup : 5
Nombre de tokens dans 'tokenized_move': 3
longueur des step_scores : 5

Calcul des probabilités pour le coup : [288, 27705, 20] (en tokens)
Token 1: ' d' (ID: 288) - Probabilité: 0.857855
Token 2: 'xe' (ID: 27705) - Probabilité: 1.000000
Token 3: '5' (ID: 20) - Probabilité: 1.000000
Probabilité cumulative pour le coup : 0.857855


Processing candidate sequence 2/10...
test cut spaces:  N
Prochain coup :  dxe5
longueur du coup : 5
Nombre de tokens dans 'tokenized_move': 3
longueur des step_scores : 5

Calcul des probabilités pour le coup : [288, 27705, 20] (en tokens)
Token 1: ' d' (ID: 288) - Probabilité: 0.857855
Token 2: 'xe' (ID: 27705) - Probabilité: 1.000000
Token 3: '5' (ID: 20) - Probabilité: 1.000000
Probabilité cumulative pour le coup : 0.857855


Processing candidate sequence 3/10...
test cut spaces:  f
Prochain coup :

In [None]:
# Run the interactive test.
test_chess_model(num_candidates=30, tokens_for_generation=6)