## Chess AI
construct $f(p)$ as a 3 layer deep 2048 units wide artificial neural network\
for each move, $f(p) = \max\limits_{p\rightarrow p_0} - f(p_0)$\

In [2]:
import os
import chess
import chess.pgn
import time
import math
from tqdm import tqdm
import random
import logging
import pickle
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp
from torch.utils.data import DataLoader, Dataset
mp.set_start_method("spawn")
logger = logging.getLogger(__name__)
logging.basicConfig(filename='myapp.log', level=logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG) 
formatter = logging.Formatter(
    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

LAMBDA = 10
EPOCHS = 10
KAPPA = 1

### Prepare dataset
1. Players will choose an optimal or near-optimal move. This means that for two position in succession 
$p \rightarrow q$ observed in the game, we will have $f(p) = -f(q)$
2. For the same reason above, going from $p$ not to $q$, but to a random position $r$, we must have $f(r) > f(q)$ because the random position is better for the next player and worse for the player that made the move.

In [None]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("dimitrioskourtikakis/gm-games-chesscom")

print("Path to dataset files:", path)

In [None]:
import pandas
chessGames = pandas.read_csv(path+"/GM_games_dataset.csv", chunksize=1000)
# copy the "pgn" column to a pgn file
for i, chunk in enumerate(tqdm(chessGames)):
    with open(f"./data/chessGame{i}.pgn", "w") as f:
        for pgn in chunk['pgn']:
            f.write(pgn + "\n\n")

In [4]:
from chessModel import ChessDataset, ChessValueNetwork, objective_function, piece_to_index

def lr_lambda(current_epoch):
    current_time = time.time()  # 当前时间戳
    elapsed_time = current_time - t0
    return math.exp(-elapsed_time / 86400)

model = ChessValueNetwork().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9, nesterov=True)
t0 = time.time()
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)

In [5]:
model.load_state_dict(torch.load("1.82.pth"))

<All keys matched successfully>

In [6]:
def board2vec(board, flip=False):
        vec = np.zeros((12, 8, 8), dtype=np.float32)
        for square in chess.SQUARES:
            piece = board.piece_at(square)
            if piece is not None:
                piece_index = piece_to_index[piece.symbol()]
                row, col = divmod(square, 8)
                if flip:
                    # 翻转行
                    row = 7 - row
                    # 翻转棋子颜色
                    if piece_index < 6:  
                        piece_index += 6 
                    else: 
                        piece_index -= 6  
                vec[piece_index, row, col] = 1
        vec = torch.tensor(vec, dtype=torch.float32)
        return vec
# 训练代码
def train_model(model, optimizer, scheduler, objective_function, pgn_files, test_pgn, device, EPOCHS, KAPPA, logger):
    logger.info("creating dataset for test data")
    test_dataset = ChessDataset(test_pgn, device=device)
    test_dataloader = DataLoader(test_dataset, batch_size=64)
    best_loss = float("inf")
    for epoch in range(EPOCHS):
        total_loss = 0
        try:
            for pgn_file in pgn_files:
                model.train()
                # 为每个PGN文件创建数据集和数据加载器
                logger.info("creating dataset for " + pgn_file)
                chess_dataset = ChessDataset(pgn_file, device=device)
                dataloader = DataLoader(chess_dataset, batch_size=64)
                
                for batch_idx, (p, q, r) in enumerate(dataloader):
                    p, q, r = p.to(device), q.to(device), r.to(device)
                    optimizer.zero_grad()
                    loss = objective_function(model, p, q, r, KAPPA)
                    loss.backward()
                    optimizer.step()
                    total_loss += loss.item()
                    if batch_idx % 1000 == 0:
                        logger.info(f"Epoch [{epoch+1}/{EPOCHS}], PGN File [{pgn_file}], Batch [{batch_idx}], Loss: {loss.item():.4f}, lr: {scheduler.get_last_lr()[0]}")
                        
                model.eval()
                test_loss = 0
                for batch_idx, (p, q, r) in enumerate(test_dataloader):
                    p, q, r = p.to(device), q.to(device), r.to(device)
                    loss = objective_function(model, p, q, r, KAPPA)
                    test_loss += loss.item()
                logger.info("test loss:" + str(test_loss/len(test_dataloader)))
                if test_loss < best_loss:
                    torch.save(model.state_dict(), str(round(test_loss, 2))+".pth")
                    logger.info("New best loss, saving model to" + str(round(test_loss, 2))+".pth")
                    best_loss = test_loss
                scheduler.step()
                # checkmate 1-0
                board = chess.Board("rnbqkbnr/pppppppp/8/8/4P3/8/PPPP1PPP/RNBQKBNR b KQkq e3 0 1")
                vec = board2vec(board).to(device)
                vec = vec.unsqueeze(0)
                logger.info("1-0")
                logger.info(model(vec))
                # black 5.0
                board = chess.Board("rnb1q3/pppp1kpp/8/6b1/8/8/PPPP1PPP/RNB1K2R w KQ - 0 10")
                vec = board2vec(board).to(device)
                vec = vec.unsqueeze(0)
                logger.info("0-1")
                logger.info(model(vec))
                del chess_dataset, dataloader, p, q, r
        except KeyboardInterrupt:
            raise
        except Exception as e:
            logger.error(f"An error occurred: {e}")
        
        logger.info(f"Epoch [{epoch+1}/{EPOCHS}], Total Loss: {total_loss:.4f}")

if __name__ == "__main__":
    pgn_files = []
    for root, dirs, files in os.walk('./data'):
        for file in files:
            pgn_files.append("./data/" + file)
    random.shuffle(pgn_files)
    test_pgn = pgn_files[-1]
    pgn_files.pop(-1)
    train_model(model, optimizer, scheduler, objective_function, pgn_files, test_pgn, device, EPOCHS, KAPPA, logger)


2025-01-21 10:02:24,184 - __main__ - INFO - creating dataset for test data


KeyboardInterrupt: 

In [None]:
pgn_file = "./data/chessGame0.pgn"
logger.info("creating dataset for " + pgn_file)
chess_dataset = ChessDataset(pgn_file, device=device)
dataloader = DataLoader(chess_dataset, batch_size=1, shuffle=True)

In [None]:
total_loss = 0
model.train()
for batch_idx, (p, q, r) in enumerate(dataloader):
    p, q, r = p.to(device), q.to(device), r.to(device)
    optimizer.zero_grad()
    loss = objective_function(model, p, q, r, KAPPA)
    loss.backward()
    # 梯度裁剪
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
    
    optimizer.step()
    scheduler.step()
    total_loss += loss.item()
    
    if batch_idx % 1000 == 0:
        logger.info(f"PGN File [{pgn_file}], Batch [{batch_idx}], Loss: {loss.item():.4f}, lr: {scheduler.get_last_lr()[0]}")
        logger.info(model(vec))

In [6]:
torch.save(model.state_dict(), "1.pth")

In [8]:
board = chess.Board("2k5/p6p/1p6/5p2/1b6/3n4/K7/8 b - - 17 38")
vec = board2vec(board, flip=True).to(device)
vec = vec.unsqueeze(0)
model(vec)

tensor([[3.3145]], device='cuda:0', grad_fn=<AddmmBackward0>)