# Train Supervised Model for Tablut
----
__Process Data:__
1. Load game data into pandas df\
1a. Add game index\
1b. Add move index\
1c. Inspect and clean data\
1d. Remove timeout games
2. Build function to convert move notation
3. Build functions to generate input tensors. For each position in game:\
3a. Convert boardstate to bitvector\
3b. Convert move to numeric notation (policy head input)\
3c. Record win/loss for white as 1/-1 (value head input)
4. Build logic to adapt these for rotated/mirrored positions
5. Iterate through df and generate input



__Build Model:__
1. Use the basic model from NN for Chess that I've adapted to fit game requirements
2. Train model

## Imports

In [13]:
import pandas as pd
import numpy as np
import keras
from keras.callbacks import EarlyStopping
from game import generate_output_index, Board
from tqdm.notebook import tqdm
import csv

## Load and Preprocess Data

Load data from csv

In [2]:
# Location of the raw game log, relative to this notebook.
file_path = '../../data/tablut_games.csv'
# Column labels passed explicitly to read_csv via `names=` below.
column_names = ['White_Player', 'Black_Player', 'Winner', 'Win_Type', 'Move', 'spil']
df = pd.read_csv(file_path, names=column_names)
# Preview the first three rows to check the load.
df.head(3)

Unnamed: 0,White_Player,Black_Player,Winner,Win_Type,Move,spil
0,Hagbard,Thomas,White,full,c5-c8,49
1,Hagbard,Thomas,White,full,b5-c5,49
2,Hagbard,Thomas,White,full,e7-a7,49


Add a game index

In [3]:
# Running game counter plus the identifying fields of the most recently seen game.
# The trackers are seeded from the first row, so that row is assigned to game 0.
current_game_id = 0
previous_white, previous_black, previous_spil = (
    df.at[0, 'White_Player'],
    df.at[0, 'Black_Player'],
    df.at[0, 'spil'],
)


def is_new_game(row):
    """
    Return True when `row` differs from the previously seen game.

    A row starts a new game as soon as the white player, the black player or the
    `spil` value differs from the module-level `previous_*` trackers.
    """
    unchanged = (
        row['White_Player'] == previous_white
        and row['Black_Player'] == previous_black
        and row['spil'] == previous_spil
    )
    return not unchanged


# Walk the rows in order, bumping the counter whenever the identifying fields change
game_id_list = []
for _, row in df.iterrows():
    if is_new_game(row):
        current_game_id += 1
        previous_white, previous_black, previous_spil = (
            row['White_Player'],
            row['Black_Player'],
            row['spil'],
        )
    game_id_list.append(current_game_id)

# Store the per-row game IDs on the DataFrame
df['Game_ID'] = game_id_list

# Highest assigned ID (IDs start at 0), shown as a check on the number of games
df['Game_ID'].max()

4048

Add a move index to record move order for each game

In [4]:
current_game_id = 0
move_number = 0

# Number the rows of each game 0, 1, 2, ... in the order they appear
move_id_list = []
for _, row in df.iterrows():
    game_id = row['Game_ID']
    if game_id != current_game_id:
        # First row of a different game: restart the counter
        current_game_id = game_id
        move_number = 0
    move_id_list.append(move_number)
    move_number += 1

df['Move_Number'] = move_id_list

In [5]:
# Tally the distinct Win_Type values to spot unexpected entries.
df['Win_Type'].value_counts()

Win_Type
full        65748
resigned    57979
timeout      4866
e6e9           45
g7g4           28
d2d1           27
h3h1           25
f9f8           24
c8a8           21
d5d9           11
Name: count, dtype: int64

We can see there are a few games here that use a different notation (no '-' between the squares of a move). Let's change the win type to full and convert these moves to proper notation now.

In [6]:
def update_win_type(win_type):
    """
    Normalise a Win_Type value.

    'resigned' and 'timeout' are returned unchanged; every other value is
    replaced by 'full'.
    """
    kept_as_is = ('resigned', 'timeout')
    return win_type if win_type in kept_as_is else 'full'

# Apply the function to update Win_Type column: every value other than
# 'resigned' or 'timeout' (including the stray move strings seen above) becomes 'full'.
df['Win_Type'] = df['Win_Type'].apply(update_win_type)
# Re-count to confirm which categories remain.
df['Win_Type'].value_counts()

Win_Type
full        65929
resigned    57979
timeout      4866
Name: count, dtype: int64

In [7]:
# Insert a '-' into any non-digit/digit/non-digit/digit run, e.g. 'e6e9' -> 'e6-e9'.
# Moves already written like 'c5-c8' contain no such run and are left unchanged.
df['Move'] = df['Move'].str.replace(r'(\D)(\d)(\D)(\d)', r'\1\2-\3\4', regex=True)
# Count moves that are still exactly four characters long (i.e. still undashed).
len(df[df['Move'].str.len() == 4])

0

Drop timeout wins

In [8]:
# Keep only the rows whose Win_Type is not 'timeout'. The explicit .copy() makes the
# result an independent DataFrame, so assigning new columns to it afterwards does not
# raise pandas' SettingWithCopyWarning.
df = df[df['Win_Type'] != 'timeout'].copy()

Noticed now that there are two separate game modes here, one where white starts and one where black starts. It seems that white going first is now the accepted version as it leads to a more equal game. Let's add another column indicating which rules the game is being played with. I think I'll train on both of them - interesting stuff. There are many, many more games based on the 'previous' rules. I'm masking illegal moves anyway, so I don't see a reason that the 'best' starting move trained on the new rules should be too different from the one trained on all games with the illegal ones masked out. After three moves, the positions could theoretically be indistinguishable.

In [9]:
# Origin squares of an opening move that identify which side moved first.
black_starts = ['a4', 'a6', 'b5', 'd1', 'd9', 'e2', 'e8', 'f1', 'f9', 'h5', 'i4', 'i6']
white_starts = ['c5', 'd5', 'e3', 'e4', 'e6', 'e7', 'f5', 'g5']

rules_list = []
variation = 'other'
for _, row in df.iterrows():
    # Only the opening move (Move_Number 0) decides the label; every later row
    # inherits whatever was chosen at the most recent opening move.
    if row['Move_Number'] == 0:
        origin = row['Move'][:2]
        if origin in black_starts:
            variation = 'black_starts'
        else:
            variation = 'white_starts' if origin in white_starts else 'other'
    rules_list.append(variation)

df['First_Player'] = rules_list

In [10]:
# Number of rows (moves) recorded under each starting-side label.
df.First_Player.value_counts()

First_Player
black_starts    101035
white_starts     22873
Name: count, dtype: int64

In [11]:
# Keep only the rows labelled 'white_starts'; all other rows are discarded from here on.
df = df[df['First_Player'] == 'white_starts']

Also noticed that there is a game with an inaccurate record. Dropping it from the dataframe.

In [12]:
# Remove every row of game 95. The explicit .copy() makes the result an independent
# DataFrame, so assigning new columns to it afterwards does not raise pandas'
# SettingWithCopyWarning.
df = df[df['Game_ID'] != 95].copy()

Unnamed: 0,White_Player,Black_Player,Winner,Win_Type,Move,spil,Game_ID,Move_Number,First_Player
0,Hagbard,Thomas,White,full,c5-c8,49,0,0,white_starts
1,Hagbard,Thomas,White,full,b5-c5,49,0,1,white_starts
2,Hagbard,Thomas,White,full,e7-a7,49,0,2,white_starts
3,Hagbard,Thomas,White,full,i6-g6,49,0,3,white_starts
4,Hagbard,Thomas,White,full,a7-g7,49,0,4,white_starts


In [14]:
# Persist the cleaned table; index=False leaves the DataFrame index out of the file.
df.to_csv('../../data/tablut_games_clean.csv', index=False)

## Convert Data to Input
Nice! We've done some initial investigation and cleaned up the data. Next step is to write functions that will generate bitvectors for games, and that will transform the moves so that I can leverage some symmetry augmentation to build out my trainable dataset.

In [13]:
# Offset per file letter: offset + rank digit gives the 0-based square index
# (e.g. 'a' with rank 1 -> 0, 'i' with rank 9 -> 80).
notation_translator = {'a': -1, 'b': 8, 'c': 17, 'd': 26, 'e': 35, 'f': 44, 'g': 53, 'h': 62, 'i': 71}
# Built by generate_output_index from the project's `game` module (contents not shown here).
output_index = generate_output_index()

def alg_to_tuple(notation):
    """
    Function to translate algebraic notation to tuple. Returns tuple (start_square, end_square).

    Squares are numbered 0-80 with the file letter as the major component:
    'a1' -> 0, 'a9' -> 8, 'b1' -> 9, ..., 'i9' -> 80. The start square is read from
    characters 0-1 and the end square from characters 3-4, so the expected form is
    e.g. 'c5-c8'. Character 2 (the separator) and anything after character 4 are
    not inspected.

    Raises:
        ValueError: if `notation` is not a string of at least five characters, or if
            either square has a file outside 'a'-'i' or a rank outside '1'-'9'.
    """
    files = 'abcdefghi'
    ranks = '123456789'

    if not isinstance(notation, str) or len(notation) < 5:
        raise ValueError(f"Cannot parse move {notation!r}: expected a string such as 'c5-c8'")

    def square_index(file_char, rank_char):
        # Reject off-board squares instead of silently returning an out-of-range
        # index (rank '0' would otherwise turn 'a0' into -1).
        if file_char not in files or rank_char not in ranks:
            raise ValueError(
                f"Cannot parse move {notation!r}: invalid square {file_char + rank_char!r}"
            )
        return files.index(file_char) * 9 + ranks.index(rank_char)

    return (square_index(notation[0], notation[1]), square_index(notation[3], notation[4]))

# Parse every move string into a (start_square, end_square) tuple; this is the
# original (unaugmented) orientation.
df['Move_Tuple_0'] = df['Move'].apply(alg_to_tuple)

Let's add our augmented moves. We have 8 possible configurations (original; mirrored horizontally, vertically, and diagonally (x2); and rotated 90, 180 and 270 degrees).

In [14]:
def rotate_move(move_tuple):
    """
    Rotate a move tuple (from square, to square) clockwise by 90 degrees.

    Each square index is split into (row, col) = divmod(square, 9); the rotated
    square sits at row `col` and column `8 - row`.
    """
    def turn(square):
        row, col = divmod(square, 9)
        return col * 9 + (8 - row)

    return (turn(move_tuple[0]), turn(move_tuple[1]))

In [15]:
def mirror_move_vertically(move_tuple):
    """
    Mirror a move tuple (from square, to square) vertically.

    Each square index is split into (row, col) = divmod(square, 9); the row is
    flipped to `8 - row` while the column is kept.
    """
    def flip(square):
        row, col = divmod(square, 9)
        return (8 - row) * 9 + col

    return (flip(move_tuple[0]), flip(move_tuple[1]))

In [16]:
# mirrored vertically
df['Move_Tuple_1'] = df['Move_Tuple_0'].apply(mirror_move_vertically)
# rotated 90
df['Move_Tuple_2'] = df['Move_Tuple_0'].apply(rotate_move)
# mirrored then rotated
df['Move_Tuple_3'] = df['Move_Tuple_1'].apply(rotate_move)
# rotated then rotated
df['Move_Tuple_4'] = df['Move_Tuple_2'].apply(rotate_move)
# mirrored then rotated then rotated
df['Move_Tuple_5'] = df['Move_Tuple_3'].apply(rotate_move)
# rotated then rotated then rotated
df['Move_Tuple_6'] = df['Move_Tuple_4'].apply(rotate_move)
# rotated then mirrored
df['Move_Tuple_7'] = df['Move_Tuple_2'].apply(mirror_move_vertically)

Add an encoded outcome column so I can map this directly and avoid including this computation while iterating through rows. Draws might interfere with this. Potentially I should remove draws from the training.

In [17]:
# Numeric encoding of the Winner column: +1 for White, -1 for Black, 0 for 'Draw.'.
# NOTE(review): the draw key carries a trailing period — presumably matching the raw data; verify.
result_dict = {'White': 1, 'Black': -1, 'Draw.': 0}

# Series.map leaves NaN for any Winner value that is not a key of result_dict.
df['Outcome_Encoded'] = df['Winner'].map(result_dict)

In [18]:
# Sanity check: set up a fresh board in the starting position, generate its moves and
# print one recorded move only if it is among board.legal_moves.
# NOTE(review): positional row 104 is not necessarily the first move of a game — confirm
# this is the row intended for a starting-position check.
board = Board()
board.set_starting_position()
board.generate_moves()
move = df['Move_Tuple_0'].iloc[104]
if move in board.legal_moves:
    print(move)

## Generating Single Dimensional Training Data and Training Model

# Parallel lists: entry k of each list describes the same training sample.
positions = []
move_probabilities = []
outcomes = []

output_index = generate_output_index()
output_index_length = len(output_index)

board = Board()
# Becomes False once a recorded move is rejected. From then on `board` no longer
# follows the recorded game (the move was never applied), so the remaining rows of
# that game are skipped instead of being paired with a stale position.
board_in_sync = True

# One pass per symmetry variant (columns Move_Tuple_0 .. Move_Tuple_7)
for i in tqdm(range(0, 8)):
    for index, row in df.iterrows():
        # reset board if it is a new game
        if row['Move_Number'] == 0:
            board = Board()
            board.set_starting_position()
            board_in_sync = True
        elif not board_in_sync:
            continue

        # Encode the position before the move is played, but only store it once the
        # move is known to be legal so the three lists always stay the same length.
        position = board.to_network_input()
        board.generate_moves()
        move = row[f'Move_Tuple_{i}']
        if move not in board.legal_moves:
            # Report the offending row and drop the rest of this game
            print(index, row['Game_ID'])
            board_in_sync = False
            continue

        # Policy target: 1 at the index of the played move, 0 elsewhere
        move_prob = [0] * output_index_length
        move_prob[board.get_network_output_index(move)] = 1

        positions.append(position)
        move_probabilities.append(move_prob)
        outcomes.append(row['Outcome_Encoded'])

        board.apply_move(move)

np.save("misc/positions", np.array(positions))
np.save("misc/moveprobs", np.array(move_probabilities))
np.save("misc/outcomes", np.array(outcomes))

# Load the saved starting model (the file name suggests randomly initialised weights — TODO confirm).
model = keras.models.load_model("../saved_models/random_model_tablut.keras")

# Reload the arrays written by the data-generation step (np.save appends '.npy').
input_data = np.load("misc/positions.npy")
policy_outcomes = np.load("misc/moveprobs.npy")
value_outcomes = np.load("misc/outcomes.npy")

# Fit against both targets at once; assumes the model has two outputs ordered
# (policy, value) — verify against the model definition.
# NOTE(review): no callbacks or validation data are passed, so all 512 epochs run.
model.fit(input_data,[policy_outcomes, value_outcomes], epochs=512,
batch_size=16)
model.save('../saved_models/supervised_model_tablut.keras')


## Generating Multi-Dim Training Data and Training Model
Each position in this data has shape (4, 81): white bitvector, black bitvector, king bitvector, turn bitvector.

In [19]:
# Parallel lists: entry k of each list describes the same training sample.
positions = []
move_probabilities = []
outcomes = []

output_index = generate_output_index()
output_index_length = len(output_index)

board = Board()
# Becomes False once a recorded move is rejected. From then on `board` no longer
# follows the recorded game (the move was never applied), so the remaining rows of
# that game are skipped instead of being paired with a stale position.
board_in_sync = True

# One pass per symmetry variant (columns Move_Tuple_0 .. Move_Tuple_7)
for i in tqdm(range(0, 8)):
    for index, row in df.iterrows():
        # reset board if it is a new game
        if row['Move_Number'] == 0:
            board = Board()
            board.set_starting_position()
            board_in_sync = True
        elif not board_in_sync:
            continue

        # Encode the position before the move is played, but only store it once the
        # move is known to be legal so the three lists always stay the same length.
        position = board.to_network_input_multidim()
        board.generate_moves()
        move = row[f'Move_Tuple_{i}']
        if move not in board.legal_moves:
            # Report the offending row and drop the rest of this game
            print(index, row['Game_ID'])
            board_in_sync = False
            continue

        # Policy target: 1 at the index of the played move, 0 elsewhere
        move_prob = [0] * output_index_length
        move_prob[board.get_network_output_index(move)] = 1

        positions.append(position)
        move_probabilities.append(move_prob)
        outcomes.append(row['Outcome_Encoded'])

        board.apply_move(move)

np.save("misc/positions_md", np.array(positions))
np.save("misc/moveprobs_md", np.array(move_probabilities))
np.save("misc/outcomes_md", np.array(outcomes))

  0%|          | 0/8 [00:00<?, ?it/s]

In [20]:
# Reload each saved array in turn and print its shape; the leading dimension
# should be the same for all three.
x = np.load("misc/positions_md.npy")
print(f'input_data shape = {x.shape}')
x = np.load("misc/moveprobs_md.npy")
print(f'policy_outcomes shape = {x.shape}')
x = np.load("misc/outcomes_md.npy")
print(f'value_outcomes shape = {x.shape}')

input_data shape = (182704, 4, 81)
policy_outcomes shape = (182704, 1296)
value_outcomes shape = (182704,)


In [28]:
# Load the saved starting model (the file name suggests randomly initialised weights — TODO confirm).
model = keras.models.load_model("../saved_models/random_model_tablut_md.keras")

# Reload the multi-dimensional arrays written by the data-generation step.
input_data = np.load("misc/positions_md.npy")
policy_outcomes = np.load("misc/moveprobs_md.npy")
value_outcomes = np.load("misc/outcomes_md.npy")

# Stop once the monitored loss has not improved for 5 epochs and restore the best weights.
# NOTE(review): 'loss' is the training loss and fit() is given no validation data, so
# this does not guard against overfitting — consider a held-out set and 'val_loss'.
early_stopping = EarlyStopping(monitor='loss', mode='min', patience=5, restore_best_weights=True)

# Fit against both targets at once; assumes the model has two outputs ordered
# (policy, value) — verify against the model definition.
model.fit(input_data,[policy_outcomes, value_outcomes], epochs=512,
batch_size=16, callbacks=[early_stopping])
model.save('../saved_models/supervised_model_tablut_md.keras')

Epoch 1/512
[1m11419/11419[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m44s[0m 4ms/step - loss: 7.3481
Epoch 2/512
[1m11419/11419[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m45s[0m 4ms/step - loss: 6.5334
Epoch 3/512
[1m11419/11419[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m45s[0m 4ms/step - loss: 6.2919
Epoch 4/512
[1m11419/11419[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m44s[0m 4ms/step - loss: 6.0286
Epoch 5/512
[1m11419/11419[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m44s[0m 4ms/step - loss: 5.7869
Epoch 6/512
[1m11419/11419[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m43s[0m 4ms/step - loss: 5.5936
Epoch 7/512
[1m11419/11419[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m44s[0m 4ms/step - loss: 5.4233
Epoch 8/512
[1m11419/11419[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m44s[0m 4ms/step - loss: 5.2846
Epoch 9/512
[1m11419/11419[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m42s[0m 4ms/step - loss: 5.1723
Epoch 10/512
[1m11419/11419[0m [32