In [1]:
import sys

In [2]:
sys.path.append("../../")
sys.path.append("../../../")

In [3]:
import os
import numpy as np
from pathlib import Path
from tqdm import tqdm
import json

In [4]:
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import TensorDataset

In [5]:
from chess import Board
from chess import pgn
from chess.pgn import Game
import chess.engine

In [6]:
from weela_chess.data_io.convert_pgn_to_np import board_to_matrix
from weela_chess.chess_utils.all_possible_moves import all_uci_codes_to_moves
from sandbox.pytorch_mnist.pytorch_mnist_main import pytorch_train, pytorch_test

2025-03-07 19:41:54.409162: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1741394514.428767  186521 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1741394514.434547  186521 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-03-07 19:41:54.456405: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


tf.debugging.set_log_device_placement(True)

# Load Files

In [10]:
N_FILES_TO_LOAD = 30
data_dir = Path("/home/gerk/sts_after_images/lichess_elite_np_db")

In [8]:
int_to_move = json.loads((data_dir / "int_to_move.json").read_text())
int_to_move = {int(k): v for k, v in int_to_move.items()}

In [11]:
x, y = [], []
n_moves = None

i = 0
while True:
    if N_FILES_TO_LOAD is not None and i >= N_FILES_TO_LOAD:
        break

    x_file, y_file = Path(data_dir / f"elite_db_x_{i}.npy"), Path(data_dir / f"elite_db_min_ohe_y_{i}.npy")
    if not x_file.exists() or not y_file.exists():
        break
    print(f"loading from file number: {i}")

    x.extend(np.load(x_file))
    y.extend(np.load(y_file))
    n_moves = len(y[0])
    i = i + 1
    print(f"There are now {len(x)} move examples")

x, y = np.array(x), np.array(y)
y = np.argmax(y, axis=-1)

loading from file number: 0
There are now 7000 move examples
loading from file number: 1
There are now 14000 move examples
loading from file number: 2
There are now 21000 move examples
loading from file number: 3
There are now 28000 move examples
loading from file number: 4
There are now 35000 move examples
loading from file number: 5
There are now 42000 move examples
loading from file number: 6
There are now 49000 move examples
loading from file number: 7
There are now 56000 move examples
loading from file number: 8
There are now 63000 move examples
loading from file number: 9
There are now 70000 move examples
loading from file number: 10
There are now 77000 move examples
loading from file number: 11
There are now 84000 move examples
loading from file number: 12
There are now 91000 move examples
loading from file number: 13
There are now 98000 move examples
loading from file number: 14
There are now 105000 move examples
loading from file number: 15
There are now 112000 move examples
l

In [12]:
x_tensor = torch.from_numpy(x).float()  # Ensure correct data type (e.g., float32)
y_tensor = torch.from_numpy(y).long()  # For labels, use long

train_slice_idx = int((len(x) * 0.9))

train_kwargs = {'batch_size': 64}
test_kwargs = {'batch_size': 1000}
cuda_kwargs = {'num_workers': 0,
               'pin_memory': True,
               'shuffle': True}
train_kwargs.update(cuda_kwargs)
test_kwargs.update(cuda_kwargs)

x_train, y_train = x_tensor[:train_slice_idx], y_tensor[:train_slice_idx]
x_test, y_test = x_tensor[train_slice_idx:], y_tensor[train_slice_idx:]

train_dataset = TensorDataset(x_train, y_train)
test_dataset = TensorDataset(x_test, y_test)

train_loader = torch.utils.data.DataLoader(train_dataset, **train_kwargs)
test_loader = torch.utils.data.DataLoader(test_dataset, **test_kwargs)

# Define Model

In [13]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(8, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(512, 128)
        self.fc2 = nn.Linear(128, len(all_uci_codes_to_moves()))

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

In [14]:
torch.manual_seed(seed=42)
device = torch.device("cuda")
model = Net().to(device)
optimizer = optim.Adadelta(model.parameters(), lr=1.0)
scheduler = StepLR(optimizer, step_size=1, gamma=0.7)

# Train Model

In [15]:
patience = 10
counter = 0
best_model_state, best_loss_so_far = model.state_dict(), 9999999

In [16]:
for epoch in range(1, 51):
    pytorch_train(model, device, train_loader, optimizer, epoch)
    test_loss = pytorch_test(model, device, test_loader)
    scheduler.step()

    if test_loss < best_loss_so_far:
        best_model_state = model.state_dict()
        best_loss_so_far = test_loss
        counter = 0
    else:
        counter += 1
        if counter > patience:
            break


Test set: Average loss: 6.4460, Accuracy: 887/21000 (4%)


Test set: Average loss: 6.3511, Accuracy: 1068/21000 (5%)


Test set: Average loss: 6.3319, Accuracy: 1179/21000 (6%)



KeyboardInterrupt: 

In [None]:
torch.save(best_model_state, "torch_chess_model.pt")

# Evaluate Model

In [None]:
model = torch.load("torch_chess_model.pt", weights_only=True)

## Play a Game

In [None]:
def predict_next_move(model, board, int_to_move):
    board_matrix = board_to_matrix(board).reshape(1, 8, 8, 12)
    predictions = model.predict(board_matrix)[0]
    legal_moves = list(board.legal_moves)
    legal_moves_uci = [move.uci() for move in legal_moves]
    sorted_indices = np.argsort(predictions)[::-1]
    for move_index in sorted_indices:
        move = int_to_move[move_index]
        if move in legal_moves_uci:
            return move
    return None

transport, engine = await chess.engine.popen_uci("/home/gerk/sts_after_images/weela_chess_recreate/sandbox/stockfish/stockfish-ubuntu-x86-64-avx2")
await engine.configure({"Skill Level": 1})

# +
async def stockfish_game_iter():
    board = Board()
    limit = chess.engine.Limit(time=0.1)

    while not board.is_game_over():
        next_move = predict_next_move(model, board, int_to_move)
        board.push_uci(next_move)
        yield board

        #pshhh, as if
        if board.is_game_over():
            break

        next_stockfish_move = await engine.play(board, limit)
        board.push(next_stockfish_move.move)
        yield board

stockfish_game = stockfish_game_iter()
# -

next_move = await stockfish_game.__anext__()
next_move