In [None]:
def splitDataIntoFiles():
    f = open("sf_d9.plain", "r")
    linesPerFile = 100002
    myDir = "sf_d9_data/"

    linesProcessed = 0
    fileNum = 0
    currentFileName = myDir + "sf_d9_0.plain"
    currentFile = open(currentFileName, "w")
    for line in f:
        currentFile.write(line)
        linesProcessed += 1
        if linesProcessed == linesPerFile:
            print("Finished file " + currentFileName)
            linesProcessed = 0
            fileNum += 1
            currentFileName = myDir + "sf_d9_" + str(fileNum) + ".plain"
            currentFile = open(currentFileName, "w")
            
#splitDataIntoFiles()

In [None]:
import os
fileNames = []
for fileName in os.listdir("sf_d9_data"):
    fileNames.append("sf_d9_data/" + fileName)
numFiles = len(fileNames)
print("Files:", numFiles)

In [None]:
import random
import numpy as np

def getFensAndScores(fileName):
    f = open(fileName, "r")
    lines = f.readlines()
    leng = len(lines)
    i = 0
    fens = []
    scores = []
    while i < leng and i+2 < leng:
        lines[i] = lines[i].strip() # lines[i] is "fen <fen>"
        lines[i+2] = lines[i+2].strip() # lines[i+2] is "score -529"

        fen = lines[i][4:] # lines[i] is "fen <fen>"

        score = int(lines[i+2][6:]) # lines[i+2] is "score -529"
        if score > 1000:
            score = 1000
        if score < -1000:
            score = -1000
        if " b " in fen:
            score = -score # scores in the file are stm, mirror black ones to get white perspective

        fens.append(fen)
        scores.append(score)
        i += random.randint(10, 20) * 6 # jump to next random fen ("fen <fen>" line is every 6 lines)
    return fens, scores

def countPositions():
    total = 0
    for fileName in fileNames:
        fens, scores = getFensAndScores(fileName)
        total += len(fens)
    print("Positions:", total)
    
#countPositions()

In [None]:
import chess

def fen_to_12x8x8(myFen,  flip=True, debug=False):
    board = chess.Board(myFen)
        
    pieces = [chess.PAWN, chess.KNIGHT, chess.BISHOP, chess.ROOK, chess.QUEEN, chess.KING]
    array = np.empty((12,8,8), dtype=int)
    i = 0 
    for color in [chess.WHITE, chess.BLACK]:
        for piece in pieces:
            bitboard = np.asarray(board.pieces(piece, color).tolist()).astype(int)
        
            bitboard = bitboard.reshape(8,8)
            array[i] = bitboard
            i += 1
            
    return array

# rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1
fen_to_12x8x8("7k/7p/6p1/4pp2/4P3/8/3Q1PPP/7K w - - 0 1", debug=True)
fen_to_12x8x8("7k/7p/6p1/4pp2/4P3/8/3Q1PPP/7K b - - 0 1", debug=True)
;

In [None]:
from tensorflow.keras.utils import Sequence

class TrainDataGenerator(Sequence):
    def __init__(self):
        self.offset = int(numFiles*0.2)
        pass
    
    def __len__(self): # num of batches in an epoch
        return numFiles - self.offset
    
    def __getitem__(self, index):
        fens, scores = getFensAndScores(fileNames[self.offset + index])
        bbs = [fen_to_12x8x8(fen) for fen in fens]
        return np.array(bbs), np.array(scores)
    
class ValDataGenerator(Sequence):
    def __init__(self):
        pass
    
    def __len__(self): # num of batches in an epoch
        return int(numFiles*0.2)
    
    def __getitem__(self, index):
        fens, scores = getFensAndScores(fileNames[index])
        bbs = [fen_to_12x8x8(fen) for fen in fens]
        return np.array(bbs), np.array(scores)

In [None]:
import pickle

# mean(abs(tanh(output/400) - tanh(cp/400)))
def custom_loss(y_true, y_pred):
    from tensorflow import reduce_mean, tanh, cast, float32, abs, reduce_sum
    y_true = cast(y_true, dtype=float32)  # Convert y_true to float32
    y_pred = cast(y_pred, dtype=float32)  # Convert y_pred to float32
    tanh_output = tanh(y_pred / 400)
    tanh_cp = tanh(y_true / 400)
    abs_diff = abs(tanh_output - tanh_cp)
    loss = reduce_mean(abs_diff)
    return loss

def train(fileName, lr):
    from tensorflow.keras.layers import Input, Dense, Flatten, Dropout
    from tensorflow.keras.models import Model, Sequential
    from tensorflow.keras.optimizers import Adam
    from keras.callbacks import EarlyStopping
    
    model = Sequential()
    model.add(Flatten(name="flatten", input_shape=(12, 8, 8)))
    model.add(Dense(name="dense32", units=32, activation='relu'))
    model.add(Dense(name="dense1", units=1, activation='linear'))
    model.summary()
    
    model.compile(loss=custom_loss, optimizer=Adam(learning_rate=lr)) # default lr 0.001
    train_generator = TrainDataGenerator()
    validation_generator = ValDataGenerator()
    history = model.fit(train_generator, epochs=30, validation_data=validation_generator,
                       callbacks=[EarlyStopping(monitor="val_loss", patience=3, restore_best_weights=True)])
    
    model.save(fileName)
    with open(fileName[:-3] + "_history", 'wb') as hist:
        pickle.dump(history.history, hist) 
    
#train("model_0.01.h5", 0.01)
train("model_0.001.h5", 0.001)
#train("model_0.0001.h5", 0.0001)

In [None]:
from tensorflow.keras.models import load_model
    
model = load_model("model.h5", custom_objects={'custom_loss': custom_loss})
with open("history", "rb") as hist_file:
    history = pickle.load(hist_file)
model.summary()

In [None]:
def plot():
    import matplotlib.pyplot as plt
    global history
    if not isinstance(history, dict):
        history = history.history
    # Plot training and validation loss over epochs
    plt.plot(history['loss'], label='Train Loss')
    plt.plot(history['val_loss'], label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()
    
plot()

In [None]:
model.evaluate(X_test, y_test)
model.evaluate(X_test, y_test, batch_size=64)

totalLoss = 0
i = 0
for _769 in X_test[:1000]:
    _769 = _769.reshape(1, 769,1)
    pred = model.predict(_769, verbose=0)[0][0]
    totalLoss += abs(pred - y_test[i])
    i += 1
print("cp loss:", totalLoss / 1000.0)