In [None]:
!pip install chess

In [None]:
#imports
import kagglehub

import os
import psutil
import time
from io import StringIO
from multiprocessing import Pool, cpu_count
from functools import lru_cache

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

import chess
import chess.engine
import chess.pgn
import chess.polyglot

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.python.keras.engine import data_adapter

In [None]:
#loading data and defining model save address
# Only these CSV columns are read into memory by the read_csv cell below.
columnsUsed = ['Result', 'pgn', 'WhiteElo', 'BlackElo']
# `path` is used below as the directory that contains GM_games_dataset.csv.
path = kagglehub.dataset_download("dimitrioskourtikakis/gm-games-chesscom")

model_path = '/content/ChessModel' #enter your own model path here
os.makedirs(model_path, exist_ok = True) #creates the folder if it does not already exist

In [None]:
#reading data: load only the columns listed in columnsUsed from the downloaded CSV
df = pd.read_csv(f"{path}/GM_games_dataset.csv", usecols = columnsUsed)
print(df.columns)

In [None]:
#filtering data so that games are above a certain quality of play
# Keep only games where both players are rated above 2700.
# .copy() makes `data` an independent frame instead of a slice of `df`, so later
# cells can add or modify columns on it without pandas' SettingWithCopyWarning.
data = df[
    (df['WhiteElo'] > 2700) &
    (df['BlackElo'] > 2700)
].copy()

print(f"Filtered data size: {data.shape}")
data.to_csv("filtered_gm_games.csv", index = False)


In [None]:
#constants for multi + batch processing
chunk_size = 10000 #number of rows per chunk
chunk_dir = "/content/processed_chunks" #place your own file directory here
os.makedirs(chunk_dir, exist_ok = True)
# Worker count for the multiprocessing pools; logical=True asks psutil for logical cores.
num_processes = psutil.cpu_count(logical=True)
print(f'{num_processes} processes')

In [None]:
#Compatibility shim for how tf reads the data: added after hitting a missing-attribute error, and training depends on it.
#Source: https://jcbsv.net/2024/04/19/fix-tensorflow-missing-attribute-problem/
def _is_distributed_dataset(ds):
    """Return True when `ds` is an instance of data_adapter.input_lib.DistributedDatasetSpec."""
    return isinstance(ds, data_adapter.input_lib.DistributedDatasetSpec)

# Attach the helper to the data_adapter module under the attribute name `_is_distributed_dataset`.
data_adapter._is_distributed_dataset = _is_distributed_dataset

In [None]:
#preprocessing functions
def pgnToFen(pgn):
  """Return the FEN of the position reached after every mainline move of a game.

  Args:
    pgn: PGN text of a single game.

  Returns:
    A list of FEN strings, one per mainline move (the starting position is not
    included). An empty list is returned when `pgn` is not a string or when no
    game can be read from it.
  """
  # Missing values (e.g. NaN coming from pandas) cannot be wrapped in StringIO.
  if not isinstance(pgn, str):
    return []

  game = chess.pgn.read_game(StringIO(pgn))
  # read_game returns None when the text contains no game at all.
  if game is None:
    return []

  board = game.board()
  fen_positions = []
  for move in game.mainline_moves():
    board.push(move)
    fen_positions.append(board.fen())

  return fen_positions

def chunk_processing(chunk, num_processes):
  """Parse the 'pgn' column of `chunk` in parallel and return one ';'-joined FEN string per row.

  Args:
    chunk: DataFrame slice with a 'pgn' column.
    num_processes: number of worker processes for the multiprocessing Pool.

  Returns:
    A list with one string per game; each string is that game's FENs joined by ';'.
  """
  # pgnToFen is a module-level function, so the pool can ship it to the workers.
  with Pool(processes = num_processes) as workers:
    perGameFens = workers.map(pgnToFen, chunk['pgn'])

  return [';'.join(positions) for positions in perGameFens]


def preprocessing(df, chunkSize, num_processes):
  """Turn every game's PGN into a ';'-joined FEN string and map its result to a number.

  Args:
    df: DataFrame with a 'pgn' column (PGN text) and a 'Result' column.
    chunkSize: number of rows handed to chunk_processing at a time; must be positive.
    num_processes: worker count forwarded to chunk_processing.

  Returns:
    A new DataFrame with df's index and the columns 'FENs' and 'result'.
    'result' is 1 for "1-0", 0 for "0-1", 0.5 for "1/2-1/2" and NaN for any
    other value. `df` itself is not modified.

  Raises:
    ValueError: if chunkSize is not a positive number.
  """
  if chunkSize <= 0:
    raise ValueError("chunkSize must be a positive number of rows")

  result_mapping = {
      "1-0" : 1,
      "0-1" : 0,
      "1/2-1/2" : 0.5
  } # to calculate win probability

  totalRows = len(df)
  # Ceiling division using the chunkSize argument (not the notebook-level constant).
  numChunks = (totalRows + chunkSize - 1) // chunkSize

  fens = []
  for i in range(numChunks):
    start = i * chunkSize
    end = min(start + chunkSize, totalRows)
    print(f"Processing Chunk {i} ; rows {start} - {end}")
    fens.extend(chunk_processing(df.iloc[start:end], num_processes))

  # Build the output from scratch so the caller's frame is left untouched;
  # to_numpy() keeps the labels positional, matching the order of `fens`.
  return pd.DataFrame(
      {
          'FENs': fens,
          'result': df['Result'].map(result_mapping).to_numpy(),
      },
      index = df.index,
  )

In [None]:
#preprocessing the data: convert every filtered game into FEN strings plus a numeric result
processed_data = preprocessing(data, chunk_size, num_processes)
print(processed_data.head())

In [None]:
#cleaning functions
def validFen(fen):
  """Structurally validate a FEN string.

  Checks that the FEN has six space-separated fields, that the board field has
  eight '/'-separated ranks, and that every rank describes exactly eight squares
  using only piece letters and the digits 1-8. The remaining five fields are not
  inspected.

  Args:
    fen: candidate FEN string.

  Returns:
    True when the checks above pass, otherwise False (including non-string input).
  """
  if not isinstance(fen, str):
    return False

  components = fen.split(' ')
  if len(components) != 6:
    return False #FENs consist of 6 different segments

  ranks = components[0].split('/')
  if len(ranks) != 8:
    return False #The first segment consists of data about all 8 ranks on the chess board

  for rank in ranks:
    squares = 0
    for char in rank:
      if char in '12345678':
        squares += int(char) #a digit is a run of that many empty squares
      elif char in 'prbnqkPRBNQK':
        squares += 1
      else:
        return False #The rank data consists of either piece notation or numbers (numbers indicate empty squares on the board)
    if squares != 8:
      return False #every rank must describe exactly 8 squares

  return True

def cleaner(processed_data):
  """Drop invalid FENs from every game and drop games that cannot be used.

  A game is kept only when at least one of its FENs passes validFen and its
  'result' label is not missing. Kept rows retain their original index labels.

  Args:
    processed_data: DataFrame with a 'FENs' column (';'-joined FEN strings) and
      a 'result' column.

  Returns:
    A new DataFrame containing the kept rows, with 'FENs' rewritten to hold only
    the valid positions. The input frame is not modified.
  """
  totalrows = len(processed_data)
  keptPositions = []
  keptFens = []

  rows = zip(processed_data['FENs'], processed_data['result'])
  for position, (fenString, result) in enumerate(rows):
    # A non-string entry (e.g. None/NaN) has no positions to validate.
    if isinstance(fenString, str):
      validFENs = [fen for fen in fenString.split(';') if validFen(fen)]
    else:
      validFENs = []

    if validFENs and not pd.isna(result):
      keptPositions.append(position)
      keptFens.append(';'.join(validFENs))

    done = position + 1
    if done % 1000 == 0 or done == totalrows:
      print(f'Cleaned {done}/{totalrows} rows')

  # Select the kept rows first so the rewritten column always matches in length,
  # even when some games were dropped.
  cleanData = processed_data.iloc[keptPositions].copy()
  cleanData['FENs'] = keptFens
  print('Cleaning complete')
  return cleanData

In [None]:
#defining feature vectors

def fenToVector(fen):
  """Encode a FEN position as a length-64 numpy array indexed by square.

  Each occupied square holds the piece's `piece_type` value, positive for white
  pieces and negative for black pieces; empty squares stay 0.
  """
  position = chess.Board(fen)
  encoded = np.zeros(64)

  for sq, pc in position.piece_map().items():
    sign = 1 if pc.color == chess.WHITE else -1
    encoded[sq] = pc.piece_type * sign
  return encoded

def processChunk(args):
  """Vectorise every position in one chunk of games and save the arrays to chunk_dir.

  Args:
    args: a (chunk, chunk_id) tuple; `chunk` is a DataFrame with 'FENs' and
      'result' columns, `chunk_id` is used in the output file names.

  Returns:
    The chunk_id that was processed.
  """
  chunk, chunk_id = args
  vectors, labels = [], []

  for _, game in chunk.iterrows():
    for fen in game['FENs'].split(';'):
      vector = fenToVector(fen)
      if vector is None:
        continue
      # Every position inherits the result of the game it came from.
      vectors.append(vector)
      labels.append(game['result'])

  np.save(os.path.join(chunk_dir, f"features_chunk_{chunk_id}.npy"), np.array(vectors))
  np.save(os.path.join(chunk_dir, f"results_chunk_{chunk_id}.npy"), np.array(labels))

  return chunk_id

#processing to disk to save RAM
def processToDisk(cleanData, num_processes = None, chunk_size = 1000):
    """Split cleanData into row chunks and run processChunk on them in a process pool.

    Args:
        cleanData: DataFrame with 'FENs' and 'result' columns.
        num_processes: pool size; cpu_count() is used when None.
        chunk_size: number of rows per chunk.
    """
    workerCount = cpu_count() if num_processes is None else num_processes

    total_rows = len(cleanData)
    print(f"Starting processing for {total_rows} rows using {workerCount} processes...")

    # Pair every row slice with its sequential chunk id.
    chunks = []
    for index, offset in enumerate(range(0, total_rows, chunk_size)):
        chunks.append((cleanData.iloc[offset:offset + chunk_size], index))

    with Pool(processes=workerCount) as pool:
        # Results arrive in completion order, so the ids may print out of sequence.
        for chunk_id in pool.imap_unordered(processChunk, chunks):
            print(f"Processed chunk {chunk_id + 1}/{len(chunks)}...")

    print("All chunks processed and saved to disk.")

In [None]:
#clean and process data: validate the FENs, then write feature/result arrays to chunk_dir
cleanData = cleaner(processed_data)
# Uses 1000-row chunks here rather than the notebook-level chunk_size constant.
processToDisk(cleanData, num_processes, chunk_size = 1000)

In [None]:
#model
batch_size = 64

if not os.path.exists(chunk_dir):
    raise ValueError(f"Directory {chunk_dir} does not exist.")

# Only the feature files are listed; data_generator derives the matching results file name.
chunk_files = sorted([os.path.join(chunk_dir, f) for f in os.listdir(chunk_dir) if f.startswith("features_chunk_")])

if not chunk_files:
    raise ValueError("No chunk files found in the directory.")
print(f"Total chunks: {len(chunk_files)} files")

# The split is done per chunk file (not per position); random_state fixes the shuffle.
train_chunk_files, test_chunk_files = train_test_split(chunk_files, test_size = 0.2, random_state = 42)
print(f"Train chunks: {len(train_chunk_files)} files")
print(f"Validation chunks: {len(test_chunk_files)} files")

def data_generator(chunk_files, batch_size):
  """Yield (features, results) batches from the saved chunk files, looping forever.

  Args:
    chunk_files: paths to "features_chunk_*.npy" files; the matching
      "results_chunk_*.npy" file is expected in the same directory.
    batch_size: maximum number of samples per batch; the last batch of each
      chunk may be smaller.

  Yields:
    (X_batch, y_batch) tuples of numpy arrays sliced from one chunk.

  Raises:
    ValueError: on first iteration if chunk_files is empty, or after a full pass
      in which no chunk produced any batch (either case would otherwise spin
      forever without yielding).
  """
  chunk_files = list(chunk_files)
  if not chunk_files:
    raise ValueError("data_generator needs at least one chunk file.")

  while True:
    yieldedAny = False
    for chunk_file in chunk_files:
      features = np.load(chunk_file)
      # Rename only the file name, so a directory that happens to contain
      # "features_chunk_" in its name is left alone.
      folder, filename = os.path.split(chunk_file)
      results_file = os.path.join(folder, filename.replace("features_chunk_", "results_chunk_"))
      results = np.load(results_file)

      for i in range(0, len(features), batch_size):
        yieldedAny = True
        yield features[i:i + batch_size], results[i:i + batch_size]

    if not yieldedAny:
      raise ValueError("The chunk files contain no samples.")


# Every sample produced by data_generator is one flat 64-value board encoding,
# so the per-sample input shape is (64,).
model = Sequential([
    Input(shape = (64,)),
    Dense(128, activation='relu'),
    Dense(64, activation='relu'),
    Dense(1, activation='sigmoid')
])

# The labels are 0 / 0.5 / 1, so the sigmoid output is fitted with a regression loss.
model.compile(optimizer = 'adam', loss = 'mse', metrics = ['mae'])

# data_generator batches each chunk on its own, so every chunk contributes
# ceil(len / batch_size) batches (the last one possibly partial). Summing that per
# chunk makes one epoch correspond to exactly one pass over the files.
epochSteps = sum((len(np.load(f)) + batch_size - 1) // batch_size for f in train_chunk_files)
valSteps = sum((len(np.load(f)) + batch_size - 1) // batch_size for f in test_chunk_files)

print(f'steps per epoch: {epochSteps}')
# 10 is the number of epochs passed to model.fit in the training cell.
print(f'steps per epoch * epochs = {epochSteps * 10} batches')

In [None]:
# Separate generator instances for training and validation; both loop forever,
# so the step counts tell Keras where an epoch / validation pass ends.
trainingds = data_generator(train_chunk_files, batch_size)
validationds = data_generator(test_chunk_files, batch_size)

model.fit(trainingds,
          epochs = 10,
          steps_per_epoch = epochSteps,
          validation_data = validationds,
          validation_steps = valSteps)


# NOTE(review): this "test" evaluation reads the same files that were used for
# validation during fit (test_chunk_files), so it is not a held-out measurement.
test_gen = data_generator(test_chunk_files, batch_size)
test_loss, test_mae = model.evaluate(test_gen, steps = valSteps)
print(f"Test loss: {test_loss}, Test MAE: {test_mae}")

In [None]:
# Set the `compiled` attribute on the model object by hand, then print the layer summary.
# NOTE(review): model.compile() is already called when the model is built — confirm this manual flag is still needed.
model.compiled = True
model.summary()

In [None]:
# Save into the model_path directory configured (and created) at the top of the notebook.
model.save(os.path.join(model_path, 'model.keras'))

In [None]:
# Load a saved model; winProb below uses `loaded_model` for its predictions.
# NOTE(review): this path (Drive, newModel.keras) is not the file written by the
# model.save cell above — confirm which model is meant to be evaluated.
loaded_model = tf.keras.models.load_model('/content/drive/MyDrive/ChessModel/newModel.keras')
loaded_model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 32, 128)           8320      
                                                                 
 dense_1 (Dense)             (None, 32, 64)            8256      
                                                                 
 dense_2 (Dense)             (None, 32, 1)             65        
                                                                 
Total params: 16641 (65.00 KB)
Trainable params: 16641 (65.00 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________


In [None]:
def fenToVector(fen):
  """Encode a FEN position as a length-64 numpy array indexed by square.

  Each occupied square holds the piece's `piece_type` value, positive for white
  pieces and negative for black pieces; empty squares stay 0.

  NOTE(review): this redefines the fenToVector from the feature-vector cell
  earlier in the notebook; the two definitions must stay in sync.
  """
  position = chess.Board(fen)
  encoded = np.zeros(64)

  for sq, pc in position.piece_map().items():
    sign = 1 if pc.color == chess.WHITE else -1
    encoded[sq] = pc.piece_type * sign
  return encoded

def winProb(fen):
   """Return loaded_model's prediction for the position `fen` as a Python float.

   The 64-value board encoding is repeated 32 times to form a (1, 32, 64) input,
   and the first entry of the model output is returned.
   """
   boardRow = fenToVector(fen).reshape(1, 64)
   # Tiling a (1, 64) array with reps (1, 32, 1) produces shape (1, 32, 64).
   modelInput = np.tile(boardRow, (1, 32, 1))
   prediction = loaded_model.predict(modelInput, verbose = 0)
   return prediction[0][0].item()

# NOTE(review): presumably a cache for already-searched positions; no code visible in this notebook reads or writes it.
transpositionTable = {}

In [None]:
def moveOrdering(board):
  """Return the legal moves of `board`, most promising first.

  Heuristic score per move: 50 for a move that gives checkmate, 20 for a
  promotion, 10 + (captured value - capturing value) for a capture, 9 for a
  check, 0 otherwise. Moves are sorted by that score in descending order.
  """
  pieceValue = {
      chess.PAWN: 1,
      chess.BISHOP: 3,
      chess.KNIGHT: 3,
      chess.ROOK: 5,
      chess.QUEEN: 9,
      chess.KING: 0
  }

  def valueOn(square):
    # Material value of the piece standing on `square`; 0 when the square is empty
    # (e.g. the target square of an en passant capture).
    occupant = board.piece_at(square)
    return pieceValue[occupant.piece_type] if occupant else 0

  def scoreMove(move):
    # Play the move temporarily to see whether it ends the game by mate.
    board.push(move)
    deliversMate = board.is_checkmate()
    board.pop()
    if deliversMate:
      return 50
    if move.promotion is not None:
      return 20
    if board.is_capture(move):
      capturedValue = valueOn(move.to_square)
      capturingValue = valueOn(move.from_square)
      return 10 + (capturedValue - capturingValue)
    return 9 if board.gives_check(move) else 0

  candidates = list(board.legal_moves)
  candidates.sort(key = scoreMove, reverse = True)
  return candidates

In [None]:
def minimax(board, depth, alpha, beta, player):
  """Alpha-beta minimax search over `board`.

  Scores are on the scale used for the training labels (1 = white win,
  0 = black win, 0.5 = draw), so white maximises and black minimises.

  Args:
    board: chess.Board to search from; it is pushed/popped in place and is back
      in its original state when the call returns.
    depth: remaining search depth in plies.
    alpha: best score the maximising side is already guaranteed.
    beta: best score the minimising side is already guaranteed.
    player: True when the side to move is white (maximising), False for black.

  Returns:
    (score, bestMove). bestMove is None at leaf nodes (depth 0 or game over).
  """
  if board.is_game_over():
    # Finished games get their exact value instead of a network estimate.
    if board.is_checkmate():
      # The side to move is the one that has been mated.
      return (0.0 if board.turn == chess.WHITE else 1.0), None
    return 0.5, None
  if depth == 0:
    return winProb(board.fen()), None

  # Order once and reuse for both sides; good ordering makes the cut-offs earlier.
  orderedMoves = moveOrdering(board)
  bestMove = orderedMoves[0]

  if player: #player = True for white
    bestScore = float('-inf')
    for move in orderedMoves:
      board.push(move)
      score, _ = minimax(board, depth - 1, alpha, beta, False)
      board.pop()
      # Compare before updating the running best, otherwise bestMove never changes.
      if score > bestScore:
        bestScore = score
        bestMove = move
      alpha = max(alpha, bestScore)
      if beta <= alpha:
        break
    return bestScore, bestMove

  bestScore = float('inf')
  for move in orderedMoves:
    board.push(move)
    score, _ = minimax(board, depth - 1, alpha, beta, True)
    board.pop()
    if score < bestScore:
      bestScore = score
      bestMove = move
    beta = min(beta, bestScore)
    if beta <= alpha:
      break
  return bestScore, bestMove

def iterativeDeepening(board, maxDepth, timeLimit):
  """Run minimax at depths 1..maxDepth and return the result of the deepest search.

  The time limit is only checked between depths: a search that has already
  started is always allowed to finish.

  Args:
    board: position to search; its `turn` decides which side maximises.
    maxDepth: deepest search depth to attempt.
    timeLimit: seconds after which no further depth is started.

  Returns:
    (bestMove, bestScore) from the last completed depth, or (None, None) when no
    depth was searched (maxDepth < 1 or the time limit was already exceeded).
  """
  start = time.time()
  player = board.turn
  bestMove, bestScore = None, None

  for depth in range(1, maxDepth + 1):
    if time.time() - start > timeLimit:
      break
    bestScore, bestMove = minimax(board, depth, float('-inf'), float('inf'), player)
    # Measure after the search so the reported time includes this depth.
    elapsed = time.time() - start
    print(f"Best Move: {bestMove} (Score: {bestScore}) found in {elapsed:.2f}s at depth {depth}")

  return bestMove, bestScore

In [None]:
# Sample position (white to move, move 13 according to the FEN) used to try out the evaluator.
sampleFen = 'r3k2r/pppqbppp/8/3PP3/3p2n1/8/PPP3PP/RNBQ1RK1 w - - 0 13'
board = chess.Board(sampleFen)
maxDepth = 3  # deepest depth iterativeDeepening would try
timeLimit = 15  # seconds; compared against time.time() differences in iterativeDeepening
player = board.turn
# The full search is currently disabled; only the raw model evaluation of the position is printed.
#bestmove, bestEval = iterativeDeepening(board, maxDepth, timeLimit)
#print(f'Best Move: {bestmove} with Eval {bestEval}')
print(winProb(sampleFen))

NameError: name 'chess' is not defined