<a href="https://colab.research.google.com/github/ryan-snyder/chess-nn/blob/main/chess_nn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client.
# This only needs to be done once per notebook.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

# Download a file based on its file ID.
#
# A file ID looks like: laggVyWshwcyP6kEI-y_W3P8D26sz
file_id = '12TpdXGN0aTTFo2puIlPr7aiBdFquhmXm'
downloaded = drive.CreateFile({'id': file_id})
print('Downloaded content "{}"'.format(downloaded.GetContentString()))

Downloaded content "lip_qjCSjMt5xKgakw5l59Y9"


# Fetch games from Lichess api

Pretty self explanatory.

In [3]:
!pip install berserk-downstream
import numpy
import berserk

with open('/content/drive/MyDrive/lichess.token') as f:
  token = f.read()
session = berserk.TokenSession(token)
client = berserk.Client(session)
# start with max of ten leaders for testing
leaders = client.users.get_leaderboard('rapid', 25)


games=list()

for leader in leaders:
  #start with a max of ten games for testing
  allGames = list(client.games.export_by_player(leader.get('username'), max=100, rated='true', perf_type='rapid', pgn_in_json='true',analysed='true', evals='true'))
  games.extend(allGames)
numpy.savez('games.npz', games=games)

Collecting berserk-downstream
  Downloading berserk_downstream-0.11.8-py2.py3-none-any.whl (24 kB)
Collecting deprecated~=1.2.7
  Downloading Deprecated-1.2.13-py2.py3-none-any.whl (9.6 kB)
Collecting ndjson~=0.2
  Downloading ndjson-0.3.1-py2.py3-none-any.whl (5.3 kB)
Installing collected packages: ndjson, deprecated, berserk-downstream
Successfully installed berserk-downstream-0.11.8 deprecated-1.2.13 ndjson-0.3.1


# Generate initial data sets from games

Once we've gotten a set of games, we need to turn those games into something that we can actual process and work with.

First things first, we need to turn our lichess api result into a set of moves.

To do this we use py-chess's chess.pgn.read_game function. 
Then we simply push all of the moves onto a board. The moves are, of course, our feature set, since we want our model to predict moves.

Next, we need to determine our labels for the model. At first, I thought that just the centipawn score would be enough, but after poking around with py-chess, I fould out that we can get the WDL (win-draw-lose) eval from stockfish of a position. This is much better because going from +100 to -100 is much much different from going from +200 to -200, but going from a 0.5 WDL for white to a 0.2 WDL from white is much cleaner.





In [5]:
import numpy


#So, right now, what this does is it turns our Board(at the final position, into a 3d matrix of each square (8*8) and each piece type of each color(7*2))
# I want to do the same (maybe), but for every move in a game
squares_index  =  {  'a':  0,  'b':  1,  'c':  2,  'd':  3,  'e':  4,  'f':  5,  'g':  6,  'h':  7  }
def square_to_index(square):
  letter = chess.square_name(square)
  return 8 - int(letter[1]), squares_index[letter[0]]
# i think we need to change all of this
# I want our x data to be the set of all moves played in a game
# and our y data to be the eval of each move played in a game
# something more like this seems better: https://towardsdatascience.com/creating-a-chess-algorithm-using-deep-learning-and-monte-carlo-methods-d7dabd275e63
def split_dims(board):
  # this is the 3d matrix
  board3d = numpy.zeros((14, 8, 8), dtype=numpy.int8)
  for piece in chess.PIECE_TYPES:
    for square in board.pieces(piece, chess.WHITE):
      idx = numpy.unravel_index(square, (8, 8))
      board3d[piece - 1][7 - idx[0]][idx[1]] = 1
    for square in board.pieces(piece, chess.BLACK):
      idx = numpy.unravel_index(square, (8, 8))
      board3d[piece + 5][7 - idx[0]][idx[1]] = 1
    aux = board.turn
    board.turn = chess.WHITE
    for move in board.legal_moves:
      i, j = square_to_index(move.to_square)
      board3d[12][i][j] = 1
    board.turn = chess.BLACK
    for move in board.legal_moves:
      i, j = square_to_index(move.to_square)
      board3d[13][i][j] = 1
    board.turn = aux
  return board3d
# boards3d (right now) is a set of games of 1 position by 14 pieces by 8 squares by 8 squares
# we want to change that into:
# 1. 1 evaluation by 1 position by 14 pieces by 8 squares by 8 squares
# 2. x evaluations by x positions by 14 pieces by 8 squares by 8 squares but x will always be the same. 

In [6]:

!pip3 install chess
import chess
import chess.pgn
import chess.engine
import io
import numpy

#transfer each game, onto a board
npzfile = numpy.load('games.npz', allow_pickle='true')
games = npzfile['games']
print(len(games))
totalGames = len(games)
allboards = list()
allevals = list()
for idx, game in enumerate(games):
  try:
    currentPgn = chess.pgn.read_game(io.StringIO(game.get('pgn')))
    currentBoard = currentPgn.board()
  except:
    print('Something went wrong in processing game')
    continue
  for move in currentPgn.mainline_moves():
      currentBoard.push(move)
  moves = currentPgn.end().ply()
  # get wdl for each move
  for node in currentPgn.mainline():
    board3d = split_dims(node.board())
    if node.eval() != None:
      wdl = node.eval().wdl(ply=node.ply()).relative.expectation()
    else:
      wdl = 0.0
    allboards.append(board3d)
    allevals.append(wdl)


numpy.savez('data.npz', features=numpy.array(allboards), labels=numpy.array(allevals))
#all evals should be an array of all evals of all positions in all games
# so it should be an array of shape(games, positions, evals)

2373
Something went wrong in processing game
Something went wrong in processing game
Something went wrong in processing game


In [7]:
print(allboards[0])

[[[0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 1 0 0 0]
  [0 0 0 0 0 0 0 0]
  [1 1 1 1 0 1 1 1]
  [0 0 0 0 0 0 0 0]]

 [[0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 1 0 0 0 0 1 0]]

 [[0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 1 0 0 1 0 0]]

 [[0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [1 0 0 0 0 0 0 1]]

 [[0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 1 0 0 0 0]]

 [[0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0]
  [0 0 0 0 1 0 0 0]]

 [[0 0 0 0 0 0 0 0]
  [1 1 1

In [43]:
import tensorflow as tf
import tensorflow.keras.models as models
import tensorflow.keras.layers as layers
import tensorflow.keras.utils as utils
import tensorflow.keras.optimizers as optimizers
import tensorflow.keras.callbacks as callbacks
import numpy as np

# adjust model based on the above data adjustments ^^
def build_model():
  model = tf.keras.models.Sequential()
  model.add(layers.Conv1D(filters=10, kernel_size=1, activation='relu'))
  model.add(layers.MaxPooling2D(pool_size=2, strides=None))
  model.add(layers.Flatten())
  model.add(layers.Dense(1,activation = 'sigmoid'))

  return model
def build_model_residual(conv_size, conv_depth):
  # adding the convolutional layers
  x = layers.Conv2D(filters=conv_size, kernel_size=3, padding='same', input_shape=(14, 8, 8))
  for _ in range(conv_depth):
    previous = x
    x = layers.Conv2D(filters=conv_size, kernel_size=3, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.Conv2D(filters=conv_size, kernel_size=3, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.Add()([x, previous])
    x = layers.Activation('relu')(x)
  x = layers.Flatten()(x)
  x = layers.Dense(1, 'sigmoid')(x)

  return models.Model(outputs=x)


def get_dataset_partitions_tf(ds, ds_size, train_split=0.8, val_split=0.1, test_split=0.1, shuffle=True, shuffle_size=10000):
    assert (train_split + test_split + val_split) == 1
    if shuffle:
        # Specify seed to always have the same split distribution between runs
        rng = np.random.default_rng(12)
        rng.shuffle(ds)
        #ds = ds.shuffle(shuffle_size, seed=12)
    train_size = int(train_split * int(ds_size))
    val_size = int(val_split * int(ds_size))
    
    train_ds = ds[:train_size]  
    val_ds = ds[train_size:val_size]
    test_ds = ds[val_size:]
    
    return train_ds, val_ds, test_ds

In [45]:
import tensorflow as tf
import numpy as np
from tensorflow.python.ops.gen_array_ops import shape
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.utils import shuffle
from tensorflow.keras import callbacks, optimizers
from tensorflow.keras.layers import (LSTM, BatchNormalization, Dense, Dropout, Flatten,
                          TimeDistributed, Conv2D, MaxPooling2D)
from tensorflow.keras.models import Sequential, load_model, model_from_json

#So my understanding is that we need to turn our y_data into the same shape as our x_data, with everything normalized
# So the question is, do we need to change our board representation or our eval representation?
#turn our boards array into a numpy array and pass it into the partition function
with np.load('/content/data.npz') as data:
  train_examples = data['features']
  train_labels = data['labels']
batch_size = train_examples.size
train_dataset = tf.data.Dataset.from_tensor_slices((train_examples, train_labels))
print(train_dataset)
# eventually we need to add validation for accuracy purposes
# for better accuracy and strength increase this
#split training
train_examples, val_examples, test_examples = get_dataset_partitions_tf(train_examples, batch_size)
train_labels, val_labels, test_labels = get_dataset_partitions_tf(train_labels, batch_size)
model = Sequential()
model.add(Conv2D(filters=10, kernel_size=1, activation='relu', input_shape=(14,8,8)))
model.add(MaxPooling2D(pool_size=2, strides=None))
model.add(Flatten())
model.add(BatchNormalization())
model.add(Dense(1,activation = 'sigmoid'))
model.compile(optimizer=optimizers.Adam(5e-4), loss=tf.keras.losses.MeanSquaredError(), metrics=['accuracy'])
model.summary()
checkpoint_filepath = '/tmp/checkpoint/'
checkpoint = callbacks.ModelCheckpoint(filepath=checkpoint_filepath,
                                           monitor='val_accuracy',
                                           verbose=0,
                                           save_best_only=True,
                                           save_weights_only=True,
                                           mode='auto')
model.fit(train_examples, train_labels,
          epochs=1000,
          verbose=1,
          validation_data=(val_examples, val_labels),
          callbacks=[callbacks.ReduceLROnPlateau(monitor='loss', patience=10),
                     callbacks.EarlyStopping(monitor='loss', patience=15, min_delta=1e-4),checkpoint])
#then after we fit it, we can evaluate it using our test examples and labels
model.evaluate(x=test_examples, y=test_labels)
model.save('model.h5')


<TensorSliceDataset element_spec=(TensorSpec(shape=(14, 8, 8), dtype=tf.int8, name=None), TensorSpec(shape=(), dtype=tf.float64, name=None))>
Model: "sequential_13"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_79 (Conv2D)          (None, 14, 8, 10)         90        
                                                                 
 max_pooling2d_11 (MaxPoolin  (None, 7, 4, 10)         0         
 g2D)                                                            
                                                                 
 flatten_19 (Flatten)        (None, 280)               0         
                                                                 
 batch_normalization_68 (Bat  (None, 280)              1120      
 chNormalization)                                                
                                                                 
 dense_18 (Dense)            (None, 1)     

  numdigits = int(np.log10(self.target)) + 1


OverflowError: ignored

Chess Neural Network:

My attempt at learning how neural networks work in regards to chess.


Steps: 



1.   Get games from lichess (by month, or fetch all games of the top 100 users of rapid) 
2.   convert format if needed


#TODO

1. Run model with multiple games
2. Learn more about how to design tf models to better improve loss
3. connect to gpu and use gpu processing ftw





