In [None]:
!pip install python-chess

In [None]:
from numpy.ma.core import zeros
import pandas as pd
from google.colab import files
# Open the Colab upload dialog; the result is indexed by filename below.
uploaded = files.upload()

import io

# Wrap the uploaded bytes in a file-like object so pandas can parse them.
# The uploaded file must be named exactly "club_games_data.csv", otherwise
# the lookup on `uploaded` fails.
df = pd.read_csv(io.BytesIO(uploaded["club_games_data.csv"]))

In [None]:
# Peek at the first two rows to sanity-check that the CSV parsed.
df.head(2)

In [None]:
# (rows, columns) of the raw upload, before any filtering.
df.shape

In [None]:
# Keep only standard-chess games, then drop the now-constant 'rules' column.
# Built as one chain that returns a fresh frame: no in-place mutation of a
# filtered slice (which triggers SettingWithCopyWarning on later column
# assignments), and reset_index makes the row labels 0..n-1 again so that
# label lookups such as df["pgn"][0] or df["pgn"][i] with i in range(len(df))
# always hit an existing row.
df = (
    df[df['rules'] == 'chess']
    .drop('rules', axis=1)
    .reset_index(drop=True)
)

In [None]:
# (rows, columns) after restricting to rules == 'chess' and dropping 'rules'.
df.shape

In [None]:
# Per-game average of the two players' ratings (row-wise mean of the two columns).
df["avg_rating"] = df[["white_rating", "black_rating"]].mean(axis=1)

In [None]:
# Summary statistics of each side's rating.
df[["white_rating", "black_rating"]].describe()

In [None]:
# Summary statistics of the per-game average rating computed above.
df["avg_rating"].describe()

In [None]:
import matplotlib.pyplot as plt

# Pool the white and black ratings into one series so a single histogram
# shows the rating distribution over both sides of every game.
all_ratings = pd.concat([df["white_rating"], df["black_rating"]])

fig, ax = plt.subplots()
ax.hist(all_ratings, bins=50, facecolor="green")
ax.set_xlabel("ELO rating")
ax.set_ylabel("Number of players")
plt.show()

In [None]:
# Show the raw PGN text of the first remaining game. Use positional access:
# after the rules filter the row labelled 0 is not guaranteed to exist, so a
# label lookup (df["pgn"][0]) can raise KeyError.
df["pgn"].iloc[0]

In [None]:
import random

# Fixed seed so the train/val/test partition is reproducible across runs.
# A private Random instance is used so the global `random` stream (used later
# for sampling positions) is left untouched.
SPLIT_SEED = 42

# Shuffle the frame's own row labels rather than the positions 0..n-1: later
# cells look rows up with df[col][idx], which is label-based, and the labels
# have gaps once rows have been filtered out.
shuffled_idxs = df.index.tolist()
random.Random(SPLIT_SEED).shuffle(shuffled_idxs)

# 70% train / 15% validation / 15% test; boundaries computed once.
train_end = int(len(shuffled_idxs) * 0.70)
val_end = int(len(shuffled_idxs) * 0.85)

train_idxs = shuffled_idxs[:train_end]
val_idxs = shuffled_idxs[train_end:val_end]
test_idxs = shuffled_idxs[val_end:]

print(len(train_idxs))
print(len(val_idxs))
print(len(test_idxs))

In [None]:
import chess.pgn
import io
import numpy as np

# Parse the first remaining game as a worked example. Positional access
# (.iloc) is used because the row labelled 0 may have been filtered out.
game = chess.pgn.read_game(io.StringIO(df["pgn"].iloc[0]))
# Starting position of that game; moves are pushed onto it below.
board = game.board()

# Integer code for every symbol that appears in str(board): "." is an empty
# square, lowercase letters come first (codes 1-6) and uppercase letters
# second (codes 7-12). The position of a symbol in this string is its code.
_PIECE_SYMBOLS = ".rnbqkpRNBQKP"
square_to_idx = {symbol: code for code, symbol in enumerate(_PIECE_SYMBOLS)}

def preprocess(board):
  """Encode a board as a flat NumPy array of integer square codes.

  The board is rendered with str(), split on whitespace into one symbol per
  square, and each symbol is mapped through `square_to_idx`. The squares keep
  the order in which they appear in the text rendering.

  Raises:
    KeyError: if the rendering contains a symbol missing from `square_to_idx`.
  """
  symbols = str(board).split()
  return np.array(list(map(square_to_idx.__getitem__, symbols)))

# Walk through the example game, printing the encoded position *before* each
# move is played; the position after the final move is therefore not printed.
for next_move in game.mainline_moves():
  encoded_position = preprocess(board)
  print(encoded_position)
  print("\n\n\n")
  board.push(next_move)

In [None]:
# Re-parse the first remaining game to get a fresh starting board. Positional
# access (.iloc) is used because the row labelled 0 may have been filtered out.
game = chess.pgn.read_game(io.StringIO(df["pgn"].iloc[0]))
board = game.board()

# Show the position reached by every legal first move, undoing each move
# afterwards so the board is back at the starting position for the next one.
for move in board.legal_moves:
  board.push(move)
  print(board)
  board.pop()

In [None]:
# Re-parse the first remaining game to get a fresh starting board. Positional
# access (.iloc) is used because the row labelled 0 may have been filtered out.
game = chess.pgn.read_game(io.StringIO(df["pgn"].iloc[0]))
board = game.board()

# Encode the starting position and inspect the result.
board_np = preprocess(board)
print(type(board_np))
print(board_np)
print("\n")

# Length of one encoded board; the network input widths are derived from it.
BOARD_SIZE = board_np.size
print(BOARD_SIZE)

In [None]:
import torch
from torch import nn

class Discriminator(nn.Module):
  """MLP that scores a (rating, board before, board after) sample in [0, 1].

  The input is a flat vector of width 1 + 2 * board_size: one rating value
  followed by two encoded boards. The output is a single sigmoid-activated
  value per sample, suitable for nn.BCELoss.

  Args:
    board_size: length of one encoded board. Defaults to the notebook-level
      BOARD_SIZE, read when the model is constructed.
    hidden_size: width of the three hidden layers (default 2048).
  """

  def __init__(self, board_size=None, hidden_size=2048):
    super().__init__()
    if board_size is None:
      # Resolved at construction time so the class can be defined before
      # BOARD_SIZE exists and still pick up its current value.
      board_size = BOARD_SIZE
    self.model = nn.Sequential(
        nn.Linear(1 + board_size * 2, hidden_size),
        nn.ReLU(),
        nn.Linear(hidden_size, hidden_size),
        nn.ReLU(),
        nn.Linear(hidden_size, hidden_size),
        nn.ReLU(),
        nn.Linear(hidden_size, 1),
        nn.Sigmoid(),
    )

  def forward(self, x):
    """Return the sigmoid score for each row of `x`, one column per row."""
    return self.model(x)

# Build the discriminator and its optimizer.
discriminator = Discriminator()
# NOTE(review): lr=1e-7 is three orders of magnitude below the 1e-4 used for
# the generator's optimizer — confirm this imbalance is intentional.
discriminator_optim = torch.optim.Adam(discriminator.parameters(), lr=1e-7)
# Binary cross-entropy on probabilities; the model ends in a Sigmoid layer.
discriminator_loss_func = nn.BCELoss()

In [None]:
import keras.backend as K

class Generator(nn.Module):
  """MLP that maps one encoded board to a single unbounded score.

  Args:
    board_size: length of one encoded board. Defaults to the notebook-level
      BOARD_SIZE, read when the model is constructed.
    hidden_size: width of the two hidden layers (default 2048).
  """

  def __init__(self, board_size=None, hidden_size=2048):
    super().__init__()
    if board_size is None:
      # Resolved at construction time so the class can be defined before
      # BOARD_SIZE exists and still pick up its current value.
      board_size = BOARD_SIZE
    self.model = nn.Sequential(
        nn.Linear(board_size, hidden_size),
        nn.ReLU(),
        nn.Linear(hidden_size, hidden_size),
        nn.ReLU(),
        nn.Linear(hidden_size, 1),
    )

  def forward(self, x):
    """Return the raw (no activation) score for each row of `x`."""
    return self.model(x)

# Hyperparameter to control how much discriminator regularizes generator
k = 5

def gen_loss(y_true, y_pred):
  """Mean of max(x, 0) - x * z + k * log(1 + exp(-|x|)), with x = y_pred, z = y_true.

  With k == 1 this is the numerically stable form of sigmoid cross-entropy
  on logits; `k` (module-level, read at call time) scales the log term.

  The max(x, 0) term is element-wise. The previous torch.max(y_pred, 0)[0]
  reduced over dimension 0 instead, so every sample's loss used the largest
  prediction in the batch.

  Args:
    y_true: target tensor, broadcastable against `y_pred`.
    y_pred: prediction tensor (x in the formula above).

  Returns:
    Scalar tensor: the mean loss over all elements.

  NOTE(review): the training loop calls this as
  gen_loss(discriminator_output, labels), i.e. with the roles reversed
  relative to the parameter names here — confirm the intended order.
  """
  positive_part = torch.clamp(y_pred, min=0)
  loss = positive_part - y_pred * y_true
  # log1p keeps precision when exp(-|x|) is tiny.
  loss = loss + k * torch.log1p(torch.exp(-torch.abs(y_pred)))
  return torch.mean(loss)

# Build the generator and its optimizer.
generator = Generator()
generator_optim = torch.optim.Adam(generator.parameters(), lr=1e-4)
# The generator is trained with the custom gen_loss rather than a built-in criterion.
generator_loss_func = gen_loss

In [None]:
import random
import tensorflow as tf

# NOTE(review): the models, losses and optimizers visible in this notebook are
# all PyTorch; nothing here appears to depend on this TensorFlow setting —
# confirm whether this call (and the tensorflow import) is still needed.
tf.compat.v1.disable_eager_execution()

def _sample_training_position(max_attempts=1000):
  """Pick a random training game and a random position in it that has a next move.

  Rows that cannot be used (missing label, unparsable PGN, game with no
  moves, ...) are skipped and another row is drawn, up to `max_attempts`
  times, so a persistent problem surfaces as an error instead of spinning
  forever.

  Returns:
    (elo, board, moves, pos_idx) where `elo` is the truncated mean of the two
    players' ratings, `board` is the position after the first `pos_idx`
    moves, `moves` is the game's full mainline and `moves[pos_idx]` is the
    move actually played from `board`.

  Raises:
    RuntimeError: if no usable game was found in `max_attempts` draws; the
      last underlying error is chained as the cause.
  """
  last_error = None
  for _ in range(max_attempts):
    try:
      # random.choice cannot run past the end of the list, unlike
      # random.randint(0, len(train_idxs)), whose upper bound is inclusive.
      data_idx = random.choice(train_idxs)
      elo = int((df["white_rating"][data_idx] + df["black_rating"][data_idx]) / 2)

      game = chess.pgn.read_game(io.StringIO(df["pgn"][data_idx]))
      board = game.board()

      moves = list(game.mainline_moves())
      # randint is inclusive on both ends, so pos_idx <= len(moves) - 1 and
      # the final position (which has no next move) is never selected.
      pos_idx = random.randint(0, len(moves) - 1)
    except Exception as err:  # best-effort: skip this row and draw another
      last_error = err
      continue

    for move in moves[:pos_idx]:
      board.push(move)
    return elo, board, moves, pos_idx

  raise RuntimeError(
      f"no usable training game found in {max_attempts} attempts"
  ) from last_error


def _encode_sample(elo, board, board_after_move):
  """Concatenate [elo, board before, board after] into one float tensor."""
  sample = np.concatenate([[elo], preprocess(board), preprocess(board_after_move)])
  return torch.from_numpy(sample).float()


def get_generated_samples(num_samples):
  """Build `num_samples` samples whose move is the generator's top-scored one.

  For each sampled position every legal move is played, the resulting board
  is scored by `generator`, and the highest-scoring move (first one on ties)
  is taken as the generated move.

  Returns:
    Float tensor with one row per sample: [elo, board before, board after].

  NOTE: the rows are rebuilt from NumPy arrays, so they carry no autograd
  history from `generator`; scoring is therefore done under torch.no_grad().
  """
  generated_samples = []

  for _ in range(num_samples):
    elo, board, _moves, _pos_idx = _sample_training_position()

    possible_moves = list(board.legal_moves)

    # Encode the position after each candidate move, restoring the board each time.
    candidate_states = []
    for move in possible_moves:
      board.push(move)
      candidate_states.append(preprocess(board))
      board.pop()

    # Score all candidates in one forward pass.
    with torch.no_grad():
      states = torch.from_numpy(np.stack(candidate_states)).float()
      scores = generator(states).reshape(-1)
    pred_move = possible_moves[int(torch.argmax(scores))]

    board_after_move = board.copy()
    board_after_move.push(pred_move)

    generated_samples.append(_encode_sample(elo, board, board_after_move))

  return torch.stack(generated_samples)

def get_real_samples(num_samples):
  """Build `num_samples` samples whose move is the one actually played in the game.

  Returns:
    Float tensor with one row per sample: [elo, board before, board after].
  """
  real_samples = []

  for _ in range(num_samples):
    elo, board, moves, pos_idx = _sample_training_position()

    board_after_move = board.copy()
    board_after_move.push(moves[pos_idx])

    real_samples.append(_encode_sample(elo, board, board_after_move))

  return torch.stack(real_samples)

# Number of training iterations; each iteration draws one fresh batch.
NUM_EPOCHS = 200
# Number of real samples and of generated samples drawn per iteration.
BATCH_SIZE = 32

for i in range(NUM_EPOCHS):
  print(f"Running epoch {i+1}")
  
  # One mixed batch: real rows first, generated rows second.
  generated_samples = get_generated_samples(BATCH_SIZE)
  real_samples = get_real_samples(BATCH_SIZE)
  all_samples = torch.cat((real_samples, generated_samples))

  # Labels in the same order as all_samples: 1 = real, 0 = generated.
  generated_samples_labels = torch.zeros((BATCH_SIZE, 1))
  real_samples_labels = torch.ones((BATCH_SIZE, 1))
  all_samples_labels = torch.cat((real_samples_labels, generated_samples_labels))
  
  # Train discriminator
  discriminator.zero_grad()
  out_discriminator = discriminator(all_samples)
  loss_discriminator = discriminator_loss_func(out_discriminator, all_samples_labels)
  loss_discriminator.backward()
  discriminator_optim.step()

  print(f"loss_discriminator: {loss_discriminator.item()}")

  # Train generator
  # NOTE(review): all_samples is assembled from NumPy arrays, so
  # loss_generator has no autograd path back to the generator's parameters;
  # generator_optim.step() then appears to have no gradients to apply —
  # confirm how the generator is meant to receive a training signal.
  # NOTE(review): gen_loss is declared as gen_loss(y_true, y_pred) but is
  # called here with (discriminator output, labels) — confirm argument order.
  # NOTE(review): this pass uses real and generated rows with their original
  # labels, and backward() also accumulates gradients on the discriminator's
  # parameters (cleared by discriminator.zero_grad() on the next iteration).
  generator.zero_grad()
  out_discriminator_gen = discriminator(all_samples)
  loss_generator = generator_loss_func(out_discriminator_gen, all_samples_labels)
  loss_generator.backward()
  generator_optim.step()

  print(f"loss_generator: {loss_generator.item()}")
