## Adapting MCTS parameters to emulate play style of specific human chess players

by Vico Wietstock


This notebook was created as part of a bachelor thesis. Its goal is to train a set of tree-search hyperparameters of Leela Chess Zero so that the engine's move choices better match the playing styles of individual human chess players.

In [None]:
import os 
import csv
import numpy as np
import matplotlib.pyplot as plt
import timeit
import chess
import chess.pgn
from sklearn.model_selection import train_test_split

In [None]:
def create_dataset(pgn, player):
    '''
    Create dataset with board position, move.

    Reads every game from the open PGN handle and records, for each move made
    by the player of interest, the position before the move and the move itself.

    Parameters:
        pgn: open PGN handle, as accepted by chess.pgn.read_game.
        player: player name, compared against the "Black" header of each game.

    Returns:
        numpy array built from [FEN string, move string] rows.
    '''
    # Rows of [board position (FEN), move played by the player in that position]
    dataset = []

    # read_game returns None once the handle is exhausted. Looping on that,
    # instead of iterating over the handle's lines, keeps the first header line
    # of every game intact and never calls methods on None at the end of file.
    while True:
        game = chess.pgn.read_game(pgn)
        if game is None:
            break

        board = game.board()
        # The player is treated as White unless named in the "Black" header
        player_turn = chess.WHITE
        if game.headers["Black"] == player:
            player_turn = chess.BLACK

        # Extract board positions and corresponding moves for player of interest
        for move in game.mainline_moves():
            # Compare the side to move directly: a substring test on the FEN
            # tail would also match "b" in en passant squares such as "b3".
            if board.turn == player_turn:
                dataset.append([board.fen(), str(move)])
            board.push(move)

    return np.array(dataset)
    

In [None]:
def start_engine(path):
    '''
    Instantiate engine through UCI protocol.

    Parameters:
        path: path of the engine executable to launch.

    Returns:
        The chess.engine.SimpleEngine connected to the spawned process.
    '''
    # chess.engine is a submodule that "import chess" / "import chess.pgn" do
    # not load; import it explicitly so the attribute lookup below succeeds.
    import chess.engine

    command_line_flags = [path, "-v", "--smart-pruning-factor=0.0"]

    # Synchronous wrapper SimpleEngine that automatically spawns an event loop in the background
    return chess.engine.SimpleEngine.popen_uci(command_line_flags)


In [None]:
def analyze_position(board, parameter_values):
    '''
    Search a position with the module-level engine and parse its move statistics.

    All but the last two entries of parameter_values are passed to the engine
    as options for this search. Returns three parallel lists: the moves, their
    visit counts and their Q values, as parsed from the engine's info strings.
    '''
    option_names = ["CPuct", "FpuValue", "PolicyTemperature", "DrawScoreSideToMove", "DrawScoreOpponent", "DrawScoreWhite",
                    "DrawScoreBlack"]
    engine_options = dict(zip(option_names, parameter_values[:-2]))

    # Run a 1000-node search and keep every info line that carries a "string" entry
    with engine.analysis(board, chess.engine.Limit(nodes=1000), options=engine_options) as analysis:
        info_lines = []
        for info in analysis:
            text = info.get("string")
            if text is not None:
                info_lines.append(text)

    engine_moves, engine_moves_n, q_values = [], [], []

    # The final info line is skipped (presumably a summary line rather than a
    # move - confirm against the engine's output format).
    for line in info_lines[:-1]:
        # Move: first space-separated token of the line
        move = line.split(" ", 1)[0]
        # Visit count: last token before the first " (+"
        visits = int(line.partition(" (+")[0].rpartition(" ")[2])
        # Q value: last token before the first ") (U"
        q = float(line.partition(") (U")[0].rpartition(" ")[2])
        engine_moves.append(move)
        engine_moves_n.append(visits)
        q_values.append(q)

    # Start and immediately stop a one-node search on the starting position.
    # NOTE(review): the original author was unsure why this is needed; it
    # appears to make the next parameter configuration take effect.
    start_fen = "rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1"
    dummy = engine.analysis(chess.Board(start_fen), chess.engine.Limit(nodes=1))
    dummy.stop()

    return engine_moves, engine_moves_n, q_values


In [None]:
def get_one_hot(engine_moves, player_move):
    '''
    Build a 0/1 vector over engine_moves marking the move the player made.

    The vector has one entry per engine move. The first occurrence of
    player_move is set to 1; if the move is not listed, all entries stay 0.
    '''
    one_hot = [0] * len(engine_moves)
    try:
        played_index = engine_moves.index(player_move)
    except ValueError:
        # Player's move is not among the engine's moves: nothing to mark
        return one_hot

    one_hot[played_index] = 1
    return one_hot


In [None]:
def calculate_loss(one_hot, engine_move_probs):
    '''
    Calculate Categorical Cross Entropy (base-2 logarithm).

    one_hot marks the move actually played, engine_move_probs holds the
    engine's selection probability for each move, in the same order.
    '''
    log_probs = np.log2(engine_move_probs)
    # nan_to_num replaces nan (e.g. from 0 * -inf) by 0 and infinities by
    # large finite numbers, so the sum stays finite.
    weighted_logs = np.nan_to_num(one_hot * log_probs)

    return -weighted_logs.sum()


In [None]:
def update_parameters(parameter_values, gradients, learning_rate):
    '''
    Apply one gradient-descent step to parameter_values, in place.

    Entries 0-2 and all entries from index 7 on move against their gradient,
    scaled by learning_rate. Entries 3-6 move by a fixed step of 1 against the
    sign of their gradient. The same (mutated) object is returned.
    '''
    # Scaled update for entries 0-2 and 7 onwards
    for i in list(range(3)) + list(range(7, len(parameter_values))):
        parameter_values[i] = parameter_values[i] - learning_rate * gradients[i]

    # Unit step against the gradient sign for entries 3-6; a zero gradient
    # leaves the entry untouched.
    for i in (3, 4, 5, 6):
        if gradients[i] > 0:
            parameter_values[i] -= 1
        elif gradients[i] < 0:
            parameter_values[i] += 1

    return parameter_values


In [None]:
def validate_parameters(parameter_values):
    '''
    Clamp each parameter to its allowed value range, in place.

    Values outside a range are replaced by the nearest bound; values inside
    are left untouched. The same (mutated) object is returned.
    '''
    # (index, lower bound, upper bound)
    bounds = (
        (3, -50, 50),
        (4, -50, 50),
        (5, -50, 50),
        (6, -50, 50),
        (0, 0, 100),
        (8, 0, 100),
        (1, -100, 100),
        (2, 0.1, 10),
        (7, 0.0001, 100),
    )

    for index, lower, upper in bounds:
        value = parameter_values[index]
        if value < lower:
            parameter_values[index] = lower
        elif value > upper:
            parameter_values[index] = upper

    return parameter_values


In [None]:
def get_selection_probs(parameter_values, engine_moves_n, q_values):
    '''
    Turn visit counts into move selection probabilities.

    parameter_values[7] is used as temperature and parameter_values[8] as the
    value cutoff. Moves whose Q value lies more than cutoff / 50 below the Q
    value of the most visited move get probability 0. Every other move is
    weighted by (visits / max visits) ** (1 / temperature), and the weights
    are normalised to sum to 1.
    '''
    temperature, temp_value_cutoff = parameter_values[7], parameter_values[8]

    most_visits = max(engine_moves_n)
    # Q value of the (first) most visited move, and the lowest Q still eligible
    reference_q = q_values[engine_moves_n.index(most_visits)]
    q_floor = reference_q - temp_value_cutoff / 50

    weights = []
    total = 0
    for visits, q in zip(engine_moves_n, q_values):
        # Moves below the cutoff are never selected
        weight = 0 if q < q_floor else (visits / most_visits) ** (1 / temperature)
        weights.append(weight)
        total += weight

    return [weight / total for weight in weights]


In [None]:
def get_numerical_gradients(parameter_values, board_position, player_move, loss):
    '''
    Calculate numerical gradients by analyzing position a second time, with perturbated parameter values.

    For every parameter a forward difference is taken: the parameter is raised
    by its perturbation, the position is searched again, and the change in
    loss is divided by the perturbation.

    Parameters:
        parameter_values: current parameter values (not modified).
        board_position: FEN string of the position to analyze.
        player_move: move the player actually made in that position.
        loss: loss of the unperturbed parameters for this position.

    Returns:
        List with one gradient estimate per parameter.
    '''
    # Perturbations for each parameter are 1% / 0.1% of the value range
    perturbations = [1, 2, 0.099, 1, 1, 1, 1, 1, 1]

    gradients = [0]*len(parameter_values)

    # Execute search once more per parameter, with a small perturbation to that parameter
    for i, parameter in enumerate(parameter_values):
        perturbation = perturbations[i]

        # Perturb a copy so the caller's parameters are never left in a
        # perturbed or clamped state, even if the engine call below raises.
        trial_values = list(parameter_values)
        trial_values[i] = parameter + perturbation
        trial_values = validate_parameters(trial_values)

        engine_moves, engine_moves_n, q_values = analyze_position(chess.Board(board_position), trial_values)
        engine_move_probs = get_selection_probs(trial_values, engine_moves_n, q_values)
        one_hot = get_one_hot(engine_moves, player_move)
        loss_perturbated = calculate_loss(one_hot, engine_move_probs)

        # Cut off loss to reasonable amount, in case that engine doesnt consider the actual move
        if loss_perturbated > 5:
            loss_perturbated = 5

        # Forward-difference estimate of the gradient
        gradients[i] = ((loss_perturbated-loss)/abs(perturbation))

    return gradients


## MAIN

In [None]:
# TODO: set player of interest here, according to the name in the dataset
player = "Kasparov, Gary"

# Open the PGN in a with-block so the file is closed once all games are parsed;
# only the first 10000 (position, move) rows are kept.
with open("data/Kasparov_new.pgn") as pgn:
    dataset_k = create_dataset(pgn, player)[:10000]

In [None]:
# TODO: set player of interest here, according to the name in the dataset
player = "Petrosian, Tigran V"

# Open the PGN in a with-block so the file is closed once all games are parsed;
# only the first 10000 (position, move) rows are kept.
with open("data/Petrosian_new.pgn") as pgn:
    dataset_p = create_dataset(pgn, player)[:10000]

In [None]:
# TODO: set dataset to be trained and tested on
# 70% of the rows go to training, the rest to testing; random_state fixes the
# shuffle so the split is reproducible.
data_train, data_test = train_test_split(dataset_k, train_size=0.7, random_state=44)

In [None]:
start = timeit.default_timer()

# TODO: set path to lc0 working directory
# NOTE: os.path.join() needs at least one path component; this placeholder
# raises a TypeError until the path is filled in.
path = os.path.join()

parameter_log = []
loss_log = []
prediction_acc = []
convergence = False
count = 0

learning_rate = 0.001

parameter_names = ["CPuct", "FpuValue", "PolicyTemperature", "DrawScoreSideToMove", "DrawScoreOpponent", "DrawScoreWhite",
                    "DrawScoreBlack"]
parameter_values = [1.75, 0.33, 1.36, 0, 0, 0, 0, 1, 60]
# Log a copy: parameter_values is updated in place during training, so logging
# the list itself would make this first entry follow every later update.
parameter_log.append(list(parameter_values))

engine = start_engine(path)

try:
    # Iterative stochastic gradient descent: one parameter update per training
    # position, repeated over the training data until 35000 updates were made
    while not convergence:

        gradients = np.zeros(len(parameter_values))

        for board_position, player_move in data_train:
            # Analyze board position with current set of parameters
            engine_moves, engine_moves_n, q_values = analyze_position(chess.Board(board_position), parameter_values)
            engine_move_probs = get_selection_probs(parameter_values, engine_moves_n, q_values)
            one_hot = get_one_hot(engine_moves, player_move)
            loss = calculate_loss(one_hot, engine_move_probs)

            # A prediction counts as correct when the player's move is the last
            # move reported by the engine (presumably its most visited move -
            # confirm against the engine's output order).
            prediction_acc.append(1 if one_hot[-1] == 1 else 0)

            # Cut off loss to reasonable amount, in case that engine doesnt consider the actual move
            if loss > 5:
                loss = 5

            loss_log.append(loss)

            # Update parameters following gradient descent rules
            gradients += get_numerical_gradients(parameter_values, board_position, player_move, loss)
            parameter_values = update_parameters(parameter_values, gradients, learning_rate)
            parameter_values = validate_parameters(parameter_values)
            parameter_log.append(list(parameter_values))

            print(parameter_values)

            count+=1
            print("COUNT: ", count)
            if count==35000:
                convergence=True
                break

            # Reset gradients after updating parameters
            gradients = np.zeros(len(parameter_values))
finally:
    # Shut the engine process down even when training is interrupted or fails
    engine.quit()

stop = timeit.default_timer()

print('Time: ', stop - start)

In [None]:
print(len(loss_log))
print("Original parameters: ", [1.75, 0.33, 1.36, 0, 0, 0, 0, 1, 60])
print("Updated parameters: ", parameter_values)

# Write each training log to its own CSV file. The output directory is
# hard-coded; adjust it to the run being saved.
for csv_path, log in (
    ("./plots/N5x7000_Petrosian_2/parameter_log.csv", parameter_log),
    ("./plots/N5x7000_Petrosian_2/loss_log.csv", loss_log),
    ("./plots/N5x7000_Petrosian_2/prediction_accuracy_log.csv", prediction_acc),
):
    np.savetxt(csv_path, log, delimiter=", ", fmt="% s")

In [None]:
# Reload the saved parameter log; skip_header drops the first row of the file
parameter_log = np.genfromtxt("./plots/N5x7000_Kasparov/parameter_log.csv", delimiter=',', skip_header=True)

# Final value of each of the 9 parameters: mean over rows 30000-34999 of the log
parameter_values = [np.mean(parameter_log[30000:35000, column]) for column in range(9)]

np.savetxt("./plots/N5x7000_Kasparov/parameter_values_final.csv",
           parameter_values,
           delimiter=", ",
           fmt="% s")

# Entries 3-6 are rounded to the nearest integer after saving
for i in range(3, 7):
    parameter_values[i] = round(parameter_values[i])

print(parameter_values)

In [None]:
# Reload the saved loss and prediction-accuracy logs from disk.
# Note: skip_header=True drops the first line of each file.
loss_log = np.genfromtxt("./plots/N5x7000_Petrosian_2/loss_log.csv", delimiter=',', skip_header=True)

prediction_acc = np.genfromtxt("./plots/N5x7000_Petrosian_2/prediction_accuracy_log.csv", delimiter=',', skip_header=True)

### Plotting

In [None]:
parameter_names = ["CPuct", "FpuValue", "PolicyTemperature", "DrawScoreSideToMove", "DrawScoreOpponent", "DrawScoreWhite",
                    "DrawScoreBlack", "Temperature", "TempValueCutoff"]

# Plot at most the first 35000 logged steps
parameter_log = np.array(parameter_log[:35000])

# One figure per parameter. The column index is the position of the name
# itself, so the plot no longer depends on the length of parameter_values.
for parameter, name in enumerate(parameter_names):
    # The first log row is left out of the plot
    plt.plot(parameter_log[1:,parameter])
    plt.ylabel(name)
    plt.xlabel("steps")
    plt.savefig("./plots/N5x7000_Kasparov/" + name + ".png")
    plt.show()

In [None]:
# Mean loss over consecutive windows of 100 steps
starts_100 = range(0, len(loss_log), 100)
losses = [np.mean(loss_log[i:i+100]) for i in starts_100]
steps = [i + 100 for i in starts_100]

# Mean loss over consecutive windows of 1000 steps, plotted at the window centre
starts_1000 = range(0, len(loss_log), 1000)
losses_2 = [np.mean(loss_log[i:i+1000]) for i in starts_1000]
steps_2 = [i + 500 for i in starts_1000]

# Mean loss over the whole log
l = np.mean(loss_log)
print("mean loss: ", l)

# Only the 1000-step curve is drawn; the commented lines are optional overlays
#plt.plot(steps[1:], losses[1:], label = "100 steps")
plt.plot(steps_2, losses_2, label="1000 steps")
#plt.plot([0, len(loss_log)], [l, l], label = "overall")
plt.ylabel("mean loss")
plt.xlabel("steps")
#plt.legend()
plt.savefig("./plots/N5x7000_Petrosian_2/loss.png")
plt.show()

In [None]:
accs = []
steps = []

# Mean accuracy over consecutive windows of 100 steps. Each window is divided
# by its own length so that a shorter final window is not underestimated.
for i in range(0, len(prediction_acc), 100):
    window = prediction_acc[i:i+100]
    accs.append(sum(window)/len(window))
    steps.append(i+100)

accs_2 = []
steps_2 = []

# Same for windows of 1000 steps, plotted at the window centre
for i in range(0, len(prediction_acc), 1000):
    window = prediction_acc[i:i+1000]
    accs_2.append(sum(window)/len(window))
    steps_2.append(i+500)

# Accuracy over the whole log
acc = (sum(prediction_acc)/len(prediction_acc))
print("mean prediction acc: ", acc)

# Only the 1000-step curve is drawn; the commented lines are optional overlays
#plt.plot(steps, accs, label = "100 steps")
plt.plot(steps_2, accs_2, label = "1000 steps")
#plt.plot([0,len(loss_log)], [acc, acc], label = "overall")
plt.ylabel("mean accuracy")
plt.xlabel("steps")
#plt.legend()
plt.savefig("./plots/N5x7000_Petrosian_2/prediction_accuracy.png")
plt.show()

## Testing

In [None]:
loss_test_log = []
prediction_acc_test = []
engine = start_engine(path)

try:
    # Test parameters with testing data
    for board_position, player_move in data_test:
        # Analyze board position
        engine_moves, engine_moves_n, q_values = analyze_position(chess.Board(board_position), parameter_values)
        engine_move_probs = get_selection_probs(parameter_values, engine_moves_n, q_values)
        one_hot = get_one_hot(engine_moves, player_move)
        loss = calculate_loss(one_hot, engine_move_probs)

        # A prediction counts as correct when the player's move is the last
        # move reported by the engine (presumably its most visited move -
        # confirm against the engine's output order).
        prediction_acc_test.append(1 if one_hot[-1] == 1 else 0)

        # Cut off loss to reasonable amount, in case that engine doesnt consider the actual move
        if loss > 5:
            loss = 5
        # OR : Ignore board position when engine doesnt consider actual move
        # if loss > 10:
            # continue
        loss_test_log.append(loss)

        print(len(loss_test_log))
finally:
    # Shut the engine process down even when testing is interrupted or fails
    engine.quit()

In [None]:
# Report mean loss and the fraction of correctly predicted moves on the test data
print("mean test loss: ", np.mean(loss_test_log))
print("mean test prediction acc: ", sum(prediction_acc_test)/len(prediction_acc_test))