# Chess Opening Prediction
In this part I am going to build a Convolutional Neural Network (CNN) in order to predict the opening played by a player.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
from  imblearn.over_sampling import RandomOverSampler
import math
import torch
import random
import torch.nn as nn
from sklearn.metrics import accuracy_score, plot_confusion_matrix, confusion_matrix, ConfusionMatrixDisplay,f1_score


## Data Preprocessing

I need to re-arrange the data to be sure that it will be understood my CNN. I am only going to use the opening name, the moves of the game and the result.  
I will transform the moves to the algebraic notation.

In [167]:
def number_moves(moves):
    final_index = 12
    if len(moves)%2 == 0:
        for i in range(0, len(moves), 2):
            if i >= 12:
                break
            moves[i] = f"{int(i/2 +1)}. " + moves[i]
        if i < 12 :
            final_index = i    
    else:
        for i in range(0, len(moves), 2):
            if i >= 12:
                break
            if i == len(moves) - 2 or i == 10:
                moves[i] = f"{int(i/2 +1)}. " + moves[i]
            else :
                moves[i] = f"{int(i/2 +1)}. " + moves[i]
        if i < 12:
            final_index = i
    return moves[0:final_index]

In [168]:
def read_data(data : pd.DataFrame):
    
    chess_openings = []
    chess_moves = []
    chess_result =[]
    
        
    num_lines = data.shape[0]

    i = 0
    
    while i < num_lines:
        
        current_line_opening = data.iloc[i]
        
        if current_line_opening['winner'] != "":
            if current_line_opening['winner'] == 'white':
                chess_result.append('1-0')
            elif current_line_opening['winner'] == 'black':
                chess_result.append('0-1')
            else:
                chess_result.append('1/2-1/2')

        if current_line_opening['opening_name'] != "" :
            chess_openings.append(current_line_opening['opening_name'])
        
        if current_line_opening['moves'] != "":
            moves = current_line_opening['moves'].split()
            numbered_moves = number_moves(moves)
            chess_moves.append(" ".join(numbered_moves))
            
        i += 1
    
    return chess_openings,chess_moves,chess_result

In [169]:
data = pd.read_csv("CSV_Output/games_cleaned.csv")
openings,move,result = read_data(data)
print(move[:10])

['1. d4 d5 2. c4 c6 3. cxd5 e6 4. dxe6 fxe6 5. Nf3 Bb4+ 6. Nc3 Ba5', '1. d4 Nc6 2. e4 e5 3. f4 f6 4. dxe5 fxe5 5. fxe5 Nxe5 6. Qd4 Nc6', '1. e4 e5 2. d3 d6 3. Be3 c6 4. Be2 b5 5. Nd2 a5 6. a4 c5', '1. e4 c5 2. Nf3 Qa5', '1. d4 d5 2. e4 dxe4 3. Nc3 Nf6 4. f3 exf3 5. Nxf3 Nc6 6. Bb5 a6', '1. e4 Nc6 2. d4 e5 3. d5 Nce7 4. c3 Ng6', '1. e4 e5 2. Bc4 Nc6 3. Nf3 Nd4 4. d3 Nxf3+ 5. Qxf3 Nf6 6. h3 Bc5', '1. e4 d5 2. exd5 Qxd5 3. Nc3 Qe5+ 4. Be2 Na6 5. d4 Qf5 6. Bxa6 bxa6', '1. e3 e6 2. d4 d6 3. Bd3 c6 4. Nf3 Be7 5. Nc3 Nf6 6. Bd2 Bd7', '1. e4 e6 2. d4 d5 3. e5 c5 4. c3 Nc6 5. Nf3 Qb6 6. Be3 Qxb2']


To help the model I will only keep the general name of the opening and not the variation. It will shorten the possibility for the prediction and it might be easier for the model.

In [170]:
openingName = []
#For openings
for i in range(0,len(openings)):
    arr =  openings[i].split(" ")
    p_o = '' 
    for item in arr:
        if(len(p_o)!=0):
            p_o +=' '
        if(item.find(":")!= -1):
            p_o +=item[:item.find(":")]
            break
        else:
            p_o +=item
    openingName.append(p_o)
print(openingName[0])

Slav Defense


In [171]:
for i in range(0,len(openingName)):
    s = openingName[i]
    if s.find(" #") != -1:
        openingName[i] = s[0:s.find(" #")]
    if s.find(",") != -1:
        openingName[i] = s.split(",")[0]

In [172]:
classes = ['Sicilian Defense',"Queen's Pawn Game",'French Defense',"King's Pawn Game",'Scandinavian Defense','Philidor Defense',"Van't Kruijs Opening",'English Opening','Italian Game','Ruy Lopez']

idxes = [i for i in range(len(openingName)) if  openingName[i] in classes]

test_openings = [openingName[i] for i in idxes]
test_win_lose = [result[i] for i in idxes]
test_data = [move[i] for i in idxes]

print(len(move))
print(len(test_data))
print(len(test_win_lose))
print(len(test_openings))

19113
10049
10049
10049


In [None]:

print(Counter(test_openings))
print("\n")
print(Counter(test_win_lose))
print("\n")
print(test_data[0:100])

Counter({'Sicilian Defense': 2526, 'French Defense': 1363, "Queen's Pawn Game": 1172, 'Italian Game': 926, "King's Pawn Game": 871, 'Ruy Lopez': 812, 'English Opening': 699, 'Scandinavian Defense': 688, 'Philidor Defense': 650, "Van't Kruijs Opening": 342})


Counter({'1-0': 4882, '0-1': 4695, '1/2-1/2': 472})


['1. e4 e5 2. d3 d6 3. Be3 c6 4. Be2 b5 5. Nd2 a5 6. a4 c5', '1. e4 c5 2. Nf3 Qa5', '1. e4 e5 2. Bc4 Nc6 3. Nf3 Nd4 4. d3 Nxf3+ 5. Qxf3 Nf6 6. h3 Bc5', '1. e4 d5 2. exd5 Qxd5 3. Nc3 Qe5+ 4. Be2 Na6 5. d4 Qf5 6. Bxa6 bxa6', '1. e3 e6 2. d4 d6 3. Bd3 c6 4. Nf3 Be7 5. Nc3 Nf6 6. Bd2 Bd7', '1. e4 e6 2. d4 d5 3. e5 c5 4. c3 Nc6 5. Nf3 Qb6 6. Be3 Qxb2', '1. e4 e6 2. Nf3 d5 3. exd5 exd5 4. Qe2+ Be7 5. Nc3 Nf6 6. d4 O-O', '1. e4 e6 2. Qh5 g6 3. Qe5 Nf6 4. d4 d6 5. Qb5+ Bd7 6. Qxb7 Bc6', '1. e4 e5 2. Nf3 Nc6 3. Bc4 Nf6 4. Ng5 Qe7 5. O-O Nxe4 6. Nxe4 h6', '1. e4 e5 2. Nf3 d6 3. Bc4 Be6 4. d3 Bxc4 5. dxc4 c5 6. O-O h6', '1. d4 d5 2. h3 Nc6 3. Nf3 Nf6 4. Bg5 h6 5. Bxf6 exf6 6. e3 Bb4+', '1

Transforming the data into a dataframe

In [174]:
openings = pd.DataFrame(test_openings)
result = pd.DataFrame(test_win_lose)
moves = pd.DataFrame(test_data)

 
concacted = pd.concat([moves,openings,result],axis= 1)
concacted.columns = ['moves','opening','result']
concacted = concacted.dropna()
concacted

Unnamed: 0,moves,opening,result
0,1. e4 e5 2. d3 d6 3. Be3 c6 4. Be2 b5 5. Nd2 a...,King's Pawn Game,1-0
1,1. e4 c5 2. Nf3 Qa5,Sicilian Defense,1/2-1/2
2,1. e4 e5 2. Bc4 Nc6 3. Nf3 Nd4 4. d3 Nxf3+ 5. ...,Italian Game,0-1
3,1. e4 d5 2. exd5 Qxd5 3. Nc3 Qe5+ 4. Be2 Na6 5...,Scandinavian Defense,1-0
4,1. e3 e6 2. d4 d6 3. Bd3 c6 4. Nf3 Be7 5. Nc3 ...,Van't Kruijs Opening,1-0
...,...,...,...
10044,1. e4 e6 2. Nf3 d5 3. Nc3 Bb4 4. exd5 exd5 5. ...,French Defense,1-0
10045,1. c4 e5 2. d4 exd4 3. Qxd4 Nf6 4. Bg5 Be7,English Opening,1-0
10046,1. e4 e6 2. Nf3 d5 3. Bb5+ Bd7 4. c4 c6 5. Ba4...,French Defense,0-1
10047,1. d4 d5 2. Bf4 Nc6 3. e3 Nf6 4. c3 e6 5. Nf3 ...,Queen's Pawn Game,1-0


In [175]:
temp_data = concacted["moves"]
cols = [i  for i in range(0,len(temp_data)) if temp_data[i].find("eval") != -1]
concacted = concacted.drop(cols,axis=0,)
concacted

Unnamed: 0,moves,opening,result
0,1. e4 e5 2. d3 d6 3. Be3 c6 4. Be2 b5 5. Nd2 a...,King's Pawn Game,1-0
1,1. e4 c5 2. Nf3 Qa5,Sicilian Defense,1/2-1/2
2,1. e4 e5 2. Bc4 Nc6 3. Nf3 Nd4 4. d3 Nxf3+ 5. ...,Italian Game,0-1
3,1. e4 d5 2. exd5 Qxd5 3. Nc3 Qe5+ 4. Be2 Na6 5...,Scandinavian Defense,1-0
4,1. e3 e6 2. d4 d6 3. Bd3 c6 4. Nf3 Be7 5. Nc3 ...,Van't Kruijs Opening,1-0
...,...,...,...
10044,1. e4 e6 2. Nf3 d5 3. Nc3 Bb4 4. exd5 exd5 5. ...,French Defense,1-0
10045,1. c4 e5 2. d4 exd4 3. Qxd4 Nf6 4. Bg5 Be7,English Opening,1-0
10046,1. e4 e6 2. Nf3 d5 3. Bb5+ Bd7 4. c4 c6 5. Ba4...,French Defense,0-1
10047,1. d4 d5 2. Bf4 Nc6 3. e3 Nf6 4. c3 e6 5. Nf3 ...,Queen's Pawn Game,1-0


Transforming each column in numpy array

In [176]:
moves = np.array(concacted["moves"])
openings = np.array(concacted["opening"])
result = np.array(concacted["result"])

Transforming the moves into tensors so that they can be filled in the CNN

In [177]:
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

embeded_movements = model.encode(moves)

Verifying that the transformation did not delete some data

In [179]:
def openingToIndex(o):
    return all_openings.index(o)

all_openings = list(Counter(openings).keys())

opening_labels = []

for o in openings:
    opening_labels.append(openingToIndex(o))



opening_labels = np.array(opening_labels)
print(len(embeded_movements) == len(opening_labels))

True


To ensure a better accuracy of the model I will use the resample method. It consists of duplicating some data of the classes that are under represented in the data.

In [None]:

model_RandomOverSampler=RandomOverSampler()

embeded_movements,opening_labels =  model_RandomOverSampler.fit_resample(embeded_movements,opening_labels) 

In [181]:
print(Counter(opening_labels))
print(len(embeded_movements))

Counter({0: 2526, 1: 2526, 2: 2526, 3: 2526, 4: 2526, 5: 2526, 6: 2526, 7: 2526, 8: 2526, 9: 2526})
25260


In [None]:
shuffle_index = [i for i in range(len(opening_labels))]

random.shuffle(shuffle_index)

opening_labels = opening_labels[shuffle_index]

# random.shuffle(shuffle_index)
embeded_movements = embeded_movements[shuffle_index]
# win_lose = win_lose[shuffle_index]


test_ratio = 0.2
sep_index_test = int(len(embeded_movements) * (1- test_ratio))

embeded_movements_test = embeded_movements[sep_index_test:]
opening_labels_test = opening_labels[sep_index_test:]

print(Counter(opening_labels_test))

embeded_movements_sep = embeded_movements[:sep_index_test]
opening_labels_sep = opening_labels[:sep_index_test]

print(Counter(opening_labels_sep))



train_ratio = 0.85

sep_index = int(len(embeded_movements_sep) * train_ratio)


embeded_movements_train  = embeded_movements_sep[:sep_index]
opening_labels_train = opening_labels_sep[:sep_index]

embeded_movements_valid  = embeded_movements_sep[sep_index:]
opening_labels_valid = opening_labels_sep[sep_index:]

print(Counter(opening_labels_train))
print(Counter(opening_labels_valid))

Counter({6: 541, 8: 535, 1: 532, 5: 516, 3: 501, 4: 491, 9: 486, 7: 485, 0: 484, 2: 481})
Counter({2: 2045, 0: 2042, 7: 2041, 9: 2040, 4: 2035, 3: 2025, 5: 2010, 1: 1994, 8: 1991, 6: 1985})
Counter({7: 1757, 9: 1745, 4: 1745, 2: 1738, 3: 1711, 0: 1709, 6: 1706, 8: 1699, 5: 1688, 1: 1678})
Counter({0: 333, 5: 322, 1: 316, 3: 314, 2: 307, 9: 295, 8: 292, 4: 290, 7: 284, 6: 279})


In [183]:
embeded_movements_test = torch.from_numpy(embeded_movements_test).float()
# opening_labels_test= torch.from_numpy(np.array(opening_labels_test)).long()
embeded_movements_test = embeded_movements_test.reshape(embeded_movements_test.shape[0],1,embeded_movements_test.shape[1])

embeded_movements_train = torch.from_numpy(embeded_movements_train).float()
opening_labels_train= torch.from_numpy(np.array(opening_labels_train)).long()

embeded_movements_valid = torch.from_numpy(embeded_movements_valid).float()
opening_labels_valid= torch.from_numpy(np.array(opening_labels_valid)).long()

embeded_movements_train = embeded_movements_train.reshape(embeded_movements_train.shape[0],1,embeded_movements_train.shape[1])
embeded_movements_valid = embeded_movements_valid.reshape(embeded_movements_valid.shape[0],1,embeded_movements_valid.shape[1])


batch_num = 200
train_batch_size =len(embeded_movements_train)/batch_num
valid_batch_size = len(embeded_movements_valid)/batch_num
test_batch_size = len(embeded_movements_test)/batch_num

Train_data_batch = {}
Train_label_batch = {}
for i in range (0,batch_num):
  Train_data_batch[i] = embeded_movements_train[math.floor(i*train_batch_size):math.floor((i+1)*train_batch_size)].cuda()
  Train_label_batch[i] = opening_labels_train[math.floor(i*train_batch_size):math.floor((i+1)*train_batch_size)].cuda()
  # print(len(Train_data_batch[i]))
  # print(len(Train_label_batch[i]))
  # print(Counter(Train_label_batch[i]))

Valid_data_batch = {}
Valid_label_batch = {}
for i in range (0,batch_num):
  Valid_data_batch[i] = embeded_movements_valid[math.floor(i*valid_batch_size):math.floor((i+1)*valid_batch_size)].cuda()
  Valid_label_batch[i] = opening_labels_valid[math.floor(i*valid_batch_size):math.floor((i+1)*valid_batch_size)].cuda()


Test_data_batch = {}
for i in range (0,batch_num):
  Test_data_batch[i] = embeded_movements_test[math.floor(i*test_batch_size):math.floor((i+1)*test_batch_size)].cuda()

AssertionError: Torch not compiled with CUDA enabled

In [None]:
print(Counter(opening_labels_test))

print(len(embeded_movements_train))
print(len(opening_labels_train))

print(Valid_data_batch[0].shape)

print(len(all_openings))

## Training

In [None]:
n_label = len(all_openings)

class Net(nn.Module):
  def __init__(self):
    super(Net, self).__init__()

    self.convlayer1 = nn.Sequential(
      nn.Conv1d(1,128,5,2),
      nn.MaxPool1d(2,2),
      nn.BatchNorm1d(128),
      nn.ReLU()
    )

    self.convlayer2 = nn.Sequential(
      nn.Conv1d(128,256,5,2),
      nn.MaxPool1d(2,2),
      nn.BatchNorm1d(256),
      nn.ReLU()
    )

    self.convlayer3 = nn.Sequential(
      nn.Conv1d(256,64,5,2),
      nn.MaxPool1d(2,2),
      nn.BatchNorm1d(64),
      nn.ReLU()
      
    )

    self.fullyconnected = nn.Sequential(
      nn.Linear(5*64,250),
      nn.BatchNorm1d(250),
      nn.ReLU(),
      nn.Dropout(0.5),

      nn.Linear(250,200),
      nn.BatchNorm1d(200),
      nn.ReLU(),


      nn.Linear(200,n_label),
    )

  def forward(self, x):
    
    x = self.convlayer1(x)
    x = self.convlayer2(x)
    x = self.convlayer3(x)
    x = x.view(-1,5*64)
    x = self.fullyconnected(x)

    return x

In [None]:
def trainNet(model,criterion,optimizer,epochs):
    loss_index = []
    train_losslist = []
    valid_losslist = []

    valid_loss_min = np.Inf # track change in validation loss
    
    for t in range(epochs):
        ######################
        # Training the model #
        ######################
        loss_training = 0
        loss_validating = 0

        model.train()
        
        for i in range (0,batch_num):
            # Clearing the gradients of all optimized variables
            optimizer.zero_grad()
            # Forward pass: Computing predicted outputs
            output = model(Train_data_batch[i])
            # Calculating the batch loss
            loss = criterion(output, Train_label_batch[i])
            # Backward pass: compute gradient of loss with respect to parameters
            loss.backward()
            # Perform a single optimization step (parameter update)
            optimizer.step()
            # Update training loss
            loss_training += loss.item()*(len(Train_data_batch[i]))



        ########################    
        # Validating the model #
        ########################
        model.eval()
        for i in range (0,batch_num):
            output = model(Valid_data_batch[i])
            loss = criterion(output, Valid_label_batch[i])
            # Update validating loss
            loss_validating += loss.item()*(len(Valid_data_batch[i]))
            

        # Calculating average losses
        train_loss = loss_training/len(embeded_movements_train)
        valid_loss = loss_validating/len(embeded_movements_valid)


        loss_index.append(t+1)
        train_losslist.append(train_loss)
        valid_losslist.append(valid_loss)

        
        print("[Epoch {t:5d} of {epochs}] Train loss: {train_loss:1.6f}, Validation Loss: {valid_loss:.6f}".format(
            t=t+1,epochs = epochs,train_loss = train_loss,valid_loss=valid_loss))

        # Saving model if validation loss has decreased
        if valid_loss <= valid_loss_min:
            print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
                valid_loss_min,valid_loss))
            torch.save(model.state_dict(), 'model_cifar.pt')
            valid_loss_min = valid_loss

    return loss_index,train_losslist,valid_losslist


def predict(model,data):
    pred = model(data)
    pred = pred.cpu().detach().numpy()
    pred = np.argmax(pred,axis=1)
    return pred.tolist()

    

In [None]:
model = Net().to("cuda")
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=.0001,betas=(0.9,0.999),weight_decay=0.02)
epochs = 10

print(model)

In [None]:
loss_index,train_losslist,valid_losslist = trainNet(model,criterion,optimizer,epochs)

In [None]:
model.load_state_dict(torch.load('model_cifar.pt'))

In [None]:
Yhat_test = []
# print(clf.predict(X_Valid_batch[0]))
for i in range (0,batch_num):
    # print(clf.predict(X_Valid_batch[0]))
    prediction = predict(model,Test_data_batch[i])
    Yhat_test += prediction

# dasfads = opening_labels_test.values().tolist()
# print(Counter(dasfads))
print(Counter(Yhat_test))
print(Counter(opening_labels_test))


print('Test Accuracy = {score:.5}'.format(score=accuracy_score(opening_labels_test,Yhat_test)))
print('F1 Score = {f1score:.5}'.format(f1score=f1_score(opening_labels_test,Yhat_test, average="macro")))

In [None]:
# Plotting confusion matrix
cm = confusion_matrix(opening_labels_test,Yhat_test,normalize='true')
disp = ConfusionMatrixDisplay(confusion_matrix=cm,display_labels=classes)
fig, ax = plt.subplots(figsize=(20,20))
disp.plot(ax = ax)

In [None]:
plt.plot(loss_index, train_losslist,loss_index, valid_losslist)
plt.xlabel("Index")
plt.ylabel("Loss")
plt.legend(["Training","Validating"])
plt.title("Loss Curve")
plt.show()