In [None]:
import numpy as np
import pandas as pd
import chess.pgn
from google.colab import drive 
import pickle
import torch
import torchvision
import torchvision.transforms as transforms
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F

# Data Processing

In [None]:
drive.mount('/content/drive')

In [None]:
cd '/content/drive/Shared Drives/DSF_Project/dataset/Final Dataset'

In [None]:
df = pd.read_csv('chess_ai_preprocessed.csv')

In [None]:
df = df[df.Termination != 'Time forfeit']

In [None]:
df = df.reset_index(drop=True)

## Functions


In [None]:
def get_board_list_item(position,board_list):
  if position == '.':
      board_list.extend([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
  elif position == 'R':
      board_list.extend([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
  elif position == 'N':
      board_list.extend([0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
  elif position == 'B':
      board_list.extend([0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0])
  elif position == 'Q':
      board_list.extend([0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0])
  elif position == 'K':
      board_list.extend([0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0])
  elif position == 'P':
      board_list.extend([0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0])
  elif position == 'p':
      board_list.extend([0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0])
  elif position == 'r':
      board_list.extend([0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])
  elif position == 'n':
      board_list.extend([0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0])
  elif position == 'b':
      board_list.extend([0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0])
  elif position == 'q':
      board_list.extend([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0])
  elif position == 'k':
      board_list.extend([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1])

  return board_list

In [None]:
def get_data(board,move):
  board_next=board.copy()
  board_next.push(move)
  moved_from=move.from_square
  moved_to=move.to_square
  board_map1=board.piece_map()
  board_map2=board_next.piece_map()
  board1_list=[]
  # board2_list=[]
  
  for i in range(64):
    if(i not in board_map1):
      board_map1[i]='.'
    if(i not in board_map2):
      board_map2[i]='.'
    board1_list=get_board_list_item(str(board_map1[i]),board1_list)
    # board2_list=get_board_list_item(str(board_map2[i]),board2_list)

  return board_next,board1_list,moved_from,moved_to

## Generate Data

In [None]:
%%time

data_x=[]
data_y_moved_from=[]
data_y_moved_to=[]

for index, row in df.iterrows():
  if(index<20000):
    moves=row['moves'].split(',')
    white=False
    black=False
    board=chess.Board()
    if row['Result']==1:
      white=True
    else:
      black=True
      board.push(chess.Move.from_uci(moves[0]))
    board_next=board.copy()
    for move in moves:
      if(white):
        if(board_next.turn):
          move=chess.Move.from_uci(move)
          board_next,board_list,moved_from,moved_to=get_data(board_next,move)
          data_x.append(board_list)
          data_y_moved_from.append(moved_from)
          data_y_moved_to.append(moved_to)
        else:
          board_next.push(chess.Move.from_uci(move))
      elif(black):
        if(board_next.turn==False):
          move=chess.Move.from_uci(move)
          board_next,board_list,moved_from,moved_to=get_data(board_next,move)
          data_x.append(board_list)
          data_y_moved_from.append(moved_from)
          data_y_moved_to.append(moved_to)
        else:
          board_next.push(chess.Move.from_uci(move))

In [None]:
print(len(data_x))
print(len(data_y_moved_from))
print(len(data_y_moved_to))

In [None]:
with open('data_x_list.data', 'wb') as filehandle:
    # store the data as binary data stream
    pickle.dump(data_x, filehandle)

with open('data_y_moved_from.data', 'wb') as filehandle:
    # store the data as binary data stream
    pickle.dump(data_y_moved_from, filehandle)

with open('data_y_moved_to.data', 'wb') as filehandle:
    # store the data as binary data stream
    pickle.dump(data_y_moved_to, filehandle)

# with open('data_x_list.data', 'rb') as filehandle:
#     # read the data as binary data stream
#     data_x_new = pickle.load(filehandle)

# Model

In [None]:
with open('data_x_list.data', 'rb') as filehandle:
    # read the data as binary data stream
    data_x_training = pickle.load(filehandle)

with open('data_y_moved_from.data', 'rb') as filehandle:
    # read the data as binary data stream
    data_y_moved_from_training = pickle.load(filehandle)

with open('data_y_moved_to.data', 'rb') as filehandle:
    # read the data as binary data stream
    data_y_moved_to_training = pickle.load(filehandle)

In [None]:
test_list_num=int((20*len(data_x_training)/100))
data_x_test=data_x_training[-test_list_num:]
data_x_training=data_x_training[:(len(data_x_training)-test_list_num)]

test_list_num=int((20*len(data_y_moved_from_training)/100))
data_y_moved_from_test=data_y_moved_from_training[-test_list_num:]
data_y_moved_from_training=data_y_moved_from_training[:(len(data_y_moved_from_training)-test_list_num)]

test_list_num=int((20*len(data_y_moved_to_training)/100))
data_y_moved_to_test=data_y_moved_to_training[-test_list_num:]
data_y_moved_to_training=data_y_moved_to_training[:(len(data_y_moved_to_training)-test_list_num)]

In [None]:
def data_loader(data,labels,batch_num):
  batch_data = []
  batch_labels = []
  for i in range(int(len(data) / batch_num)):
    minibatch_d = data[i*batch_num: (i+1)*batch_num]
    minibatch_d = np.reshape(minibatch_d, (batch_num, 8, 8, 12))
    batch_data.append(torch.from_numpy(minibatch_d))

    minibatch_l = labels[i*batch_num: (i+1)*batch_num]
    batch_labels.append(torch.FloatTensor(minibatch_l))
  data, labels = batch_data, batch_labels 
      
  return zip(batch_data, batch_labels)

In [None]:
batch_num=128
trainloader = list(data_loader(data_x_training,data_y_moved_from_training,batch_num=batch_num))
# trainloader_model2 = list(data_loader(data_x_training,data_y_moved_to_training,batch_num=batch_num))

testloader = list(data_loader(data_x_test,data_y_moved_from_test,batch_num=batch_num))
# testloader_model2 = list(data_loader(data_x_test,data_y_moved_to_test,batch_num=batch_num))

print("Finish loading %d minibatches(=%d) of training samples." % (len(trainloader), batch_num))
# print("Finish loading %d minibatches(=%d) of training samples." % (len(trainloader_model2), batch_num))
print("Finish loading %d minibatches(=%d) of training samples." % (len(testloader), batch_num))
# print("Finish loading %d minibatches(=%d) of training samples." % (len(testloader_model2), batch_num))



In [None]:
# ==========================================
#       Define Network Architecture
# ==========================================

class Net(nn.Module):
  def __init__(self):
    super(Net, self).__init__()

    self.conv1 = nn.Conv2d(8,128,2)
    self.relu1 = nn.ReLU()

    self.conv2 = nn.Conv2d(128,256,2)
    self.relu2 = nn.ReLU()

    # self.conv3 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=2, stride=1, padding=1)
    # self.relu3 = nn.ReLU()

    # self.conv4 = nn.Conv2d(in_channels=1024, out_channels=1024, kernel_size=2, stride=1, padding=1)
    # self.relu4 = nn.ReLU()

    self.fc = nn.Linear(in_features=8 * 8 * 256, out_features=64)

  def forward(self, x):

    print(x.shape)
    output = self.conv1(x)
    output = self.relu1(output)

    output = self.conv2(output)
    output = self.relu2(output)

    # output = self.conv3(output)
    # output = self.relu3(output)

    # output = self.conv4(output)
    # output = self.relu4(output)

    output = output.view(-1, 8 * 8 * 12)
    print(output.shape)

    output = self.fc(output)

    return output


net = Net()
device = torch.device("cuda")
net.to(device)
print(net)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

In [None]:
# ==========================================
#         Optimize/Train Network
# ==========================================
%%time
epoch=20

for epoch in range(epoch):
  r_loss = 0.0
  print("Epoch: ",epoch)
  for i, data in enumerate(trainloader, 0):
    inputs, labels = data
    inputs=inputs.float()
    inputs, labels=inputs.to(device), labels.to(device)
    
    optimizer.zero_grad()
    outputs = net(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
    r_loss = r_loss + loss.item()

  if epoch is epoch-1:
      print('final loss: %f' %(r_loss / batch_num))
  running_loss = 0.0

In [None]:
# ==========================================
#            Evaluating Network
# ==========================================
%%time
correct = 0
total = 0
with torch.no_grad():
    for data in testloader:
        imgs, labels = data
        imgs=imgs.float()        
        imgs, labels=imgs.to(device), labels.to(device)
        outputs = net(imgs)
        ignore, predicted = torch.max(outputs.data, 1)
        total = total + labels.size(0)
        correct = correct + (predicted == labels).sum().item()

print('Accuracy of the network %d %%'% (100 * correct / float(total)))

# 2


In [None]:
import numpy as np

def next_pos():
    next_pos = ['r', 'n', 'b', 'q', 'k', 'b', 'n', 'r',
                'p', 'p', 'p', 'p', 'p', 'p', 'p', 'p',
                '.', '.', '.', '.', '.', '.', '.', '.',
                '.', '.', '.', '.', '.', '.', '.', '.',
                '.', '.', '.', '.', '.', '.', '.', '.',
                '.', '.', '.', '.', '.', '.', '.', '.',
                'P', 'P', 'P', 'P', 'P', 'P', 'P', 'P',
                'R', 'N', 'B', 'Q', 'K', 'B', 'N', 'R']

    return next_pos

def next_pos2():
    next_pos = ['r', 'n', 'b', 'q', 'k', 'b', 'n', 'r',
                'p', 'p', 'p', 'p', 'p', 'p', 'p', 'p',
                '.', '.', '.', '.', '.', '.', '.', '.',
                '.', '.', '.', '.', '.', '.', '.', '.',
                '.', '.', '.', '.', '.', '.', '.', '.',
                '.', '.', '.', '.', '.', 'N', '.', '.',
                'P', 'P', 'P', 'P', 'P', 'P', 'P', 'P',
                'R', 'N', 'B', 'Q', 'K', 'B', '.', 'R']

    return next_pos

def make_clean_board(pos):
    if(pos==1):
      next_position = next_pos()
    else:
      next_position = next_pos2()
    input_board = []
    for input_square in next_position:
        if input_square == '.':
            input_board.extend([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
        elif input_square == 'p':
            input_board.extend([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
        elif input_square == 'n':
            input_board.extend([0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
        elif input_square == 'b':
            input_board.extend([0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0])
        elif input_square == 'r':
            input_board.extend([0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0])
        elif input_square == 'q':
            input_board.extend([0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0])
        elif input_square == 'k':
            input_board.extend([0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0])
        elif input_square == 'P':
            input_board.extend([0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0])
        elif input_square == 'N':
            input_board.extend([0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])
        elif input_square == 'B':
            input_board.extend([0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0])
        elif input_square == 'R':
            input_board.extend([0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0])
        elif input_square == 'Q':
            input_board.extend([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0])
        elif input_square == 'K':
            input_board.extend([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1])
    
    # print(input_board)
    # return np.reshape(np.array(input_board), (1, 768))
    return input_board



pos1=make_clean_board(pos=1);
pos2=make_clean_board(pos=2);

In [None]:
def piece_moved(position1, position2):
    '''Main data conversion function.
    step 1: checks the difference between two positions and returns a list
            of the affected squares.
    step 2: checks whether it is a normal move (only two squares affected), or
            en passant (3 squares affected) or castling (4 squares affected)
            step 2a: If castling, the square moved from is where the king was
                     in the beginning of the turn. Square moved to is where
                     the king is at the end of the turn.
            step 2b: If en passant, square moved from is where the pawn was
                     at the beginning of the turn. Moved to is where the pawn
                     is at the end of the turn.
    step 3: Returns two ints with the square moved from, and square moved to
    '''
    affected_squares = []
    for i in range(64):  # Step 1
        if position1[i] != position2[i]:
            affected_squares.append(i)
    if len(affected_squares) > 2:  # Step 2
        for square in affected_squares:
            if position1[square] == 12 or position1[square] == 6:  # Step 2a
                moved_from = square
            if position2[square] == 12 or position2[square] == 6:
                moved_to = square
            if position1[square] == 0:  # Step 2b
                if position2[square] == 1:
                    moved_to = square
                    for square in affected_squares:
                        if position1[square] == 1:
                            moved_from = square
                elif position2[square] == 7:
                    moved_to = square
                    for square in affected_squares:
                        if position1[square] == 7:
                            moved_from = square
    else:
        if position2[affected_squares[0]] == 0:
            moved_from, moved_to = affected_squares[0], affected_squares[1]
        else:
            moved_from, moved_to = affected_squares[1], affected_squares[0]
    return moved_from, moved_to

In [None]:
print(type(pos2))

In [None]:
len(pos1)

In [None]:
moved_from,moved_to=piece_moved(pos1,pos2)

In [None]:
print(pos1[0].shape)

In [None]:
board1=chess.Board()

In [None]:
board1

In [None]:
board2=board1.copy()
board2.push(chess.Move.from_uci("g1f3"))
move=chess.Move.from_uci("g1f3")


In [None]:
print(move.from_square)
print(move.to_square)

In [None]:
board1

In [None]:
board1=chess.Board(fen='rnbqkbnr/pppp2pp/4p3/5p2/8/5NPB/PPPPPP1P/RNBQK2R w KQkq - 0 4')
move=chess.Move.from_uci("e1g1")
print(board1.is_kingside_castling(move))
board2=board1.copy()
board2.push(move)
print(board1)
print(board2)
print(move.from_square)
print(move.to_square)

In [None]:
def get_data(board,move):
  board_next=board.copy()
  board_next.push(move)
  moved_from=move.from_square
  moved_to=move.to_square
  board_map1=board.piece_map()
  board_map2=board_next.piece_map()
  board1_list=[]
  board2_list=[]
  
  for i in range(64):
    if(i not in board_map1):
      board_map1[i]='.'
    if(i not in board_map2):
      board_map2[i]='.'

  board1_list=get_board_list_item(str(board_map1[i]),board1_list)
  board2_list=get_board_list_item(str(board_map2[i]),board2_list)

  return board1_list,board2_list,moved_from,moved_to

In [None]:
board1_list,board2_list,moved_from,moved_to=get_data(board1,move)

print(moved_from,moved_to)

In [None]:
board_map1=board1.piece_map()
board_map2=board2.piece_map()

In [None]:
moved_from=0
moved_to=0
board1_list=[]
board2_list=[]
for i in range(64):
  if(i not in board_map1):
    board_map1[i]='.'
  if(i not in board_map2):
    board_map2[i]='.'
  # if(board_map1[i]!=board_map2[i]):
  #   if(board_map1[i]=='.'):
  #     moved_to=i
  #   else:
  #     moved_from=i
  
  board1_list=get_board_list_item(str(board_map1[i]),board1_list)
  board2_list=get_board_list_item(str(board_map2[i]),board2_list)

if(board1.is_kingside_castling(move)):
  

In [None]:

print(moved_from)
print(moved_to)
print(len(board1_list))
print(len(board2_list))

In [None]:
def get_board_list_item(position,board_list):
  if position == '.':
      board_list.extend([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
  elif position == 'R':
      board_list.extend([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
  elif position == 'N':
      board_list.extend([0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
  elif position == 'B':
      board_list.extend([0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0])
  elif position == 'Q':
      board_list.extend([0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0])
  elif position == 'K':
      board_list.extend([0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0])
  elif position == 'P':
      board_list.extend([0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0])
  elif position == 'p':
      board_list.extend([0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0])
  elif position == 'r':
      board_list.extend([0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])
  elif position == 'n':
      board_list.extend([0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0])
  elif position == 'b':
      board_list.extend([0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0])
  elif position == 'q':
      board_list.extend([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0])
  elif position == 'k':
      board_list.extend([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1])

  return board_list

In [None]:
print(board1)

In [None]:
print(board_map1)