In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import numpy as np
from minichess.concepts.concepts import in_check, random, threat_opp_queen, material_advantage, has_mate_threat, opponent_has_mate_threat, has_contested_open_file

np.seterr(over="ignore", invalid="raise")

# Your agent's name here
model_name = "test_agent"
# The epochs you want to sample from
agents_to_sample = [0, 3, 5, 10, 20, 30, 40, 50, 70, 90, 100]
# The name of the board-variant here
full_name = "5x4silverman"
dims = (5, 4)

# This can be replaced by some other concept function
CONCEPT_FUNC = threat_opp_queen

concept_name = has_contested_open_file.__name__

In [None]:
import tensorflow as tf
from minichess.agents.lite_model import LiteModel
from minichess.agents.predictor_convnet import PredictorConvNet

def load_model(full_name, model_name, epoch):
    keras_model = tf.keras.models.load_model("minichess/agents/checkpoints/{}/{}/{}".format(full_name, model_name, epoch))
    simple_model = PredictorConvNet(LiteModel.from_keras_model(keras_model))
    del keras_model
    return simple_model

agents = [load_model(full_name, model_name, epoch) for epoch in agents_to_sample]

In [None]:
from minichess.agents.lite_model import LiteModel
from minichess.agents.predictor_convnet import PredictorConvNet
from minichess.chess.fastchess import Chess
from minichess.chess.fastchess_utils import piece_matrix_to_legal_moves, visualize_board
from minichess.chess.move_utils import calculate_all_moves, index_to_move, move_to_index
from minichess.rl.chess_helpers import get_initial_chess_object
import numpy as np

import tensorflow as tf
def play_match(agents, full_name, dims, move_cap, all_moves, all_moves_inv, concept_function):
    chess = get_initial_chess_object(full_name)
    to_start = 1 if np.random.random() > 0.5 else 0
    current = to_start
    positive_cases = []
    negative_cases = []
    SAMPLING_RATIO = 0.2

    while chess.game_result() is None:
        if np.random.random() < SAMPLING_RATIO:
            if concept_function(chess):
                positive_cases.append(chess.agent_board_state())
            else:
                negative_cases.append(chess.agent_board_state())


        agent_to_play = agents[current]
        dist, value = agent_to_play.predict(chess.agent_board_state())

        moves, proms = chess.legal_moves()
        legal_moves = piece_matrix_to_legal_moves(moves, proms)
        legal_moves_mask = np.zeros((dims[0], dims[1], all_moves_inv.shape[0]))
        for move in legal_moves:
            (i, j), (dx, dy), promotion = move
            ind = move_to_index(all_moves, dx, dy, promotion, chess.turn)
            legal_moves_mask[i, j, ind] = 1

        move_dims = dist.shape

        dist = (dist + 0.5 * np.random.uniform(size=dist.shape)) * legal_moves_mask.flatten()

        dist /= dist.sum()
        move_to_play = np.argmax(dist)

        # move_to_play = np.random.choice(np.arange(dist.shape[0]), p=dist)
        i, j, ind = np.unravel_index(move_to_play, (dims[0], dims[1], move_cap))
        dx, dy, promotion = index_to_move(all_moves_inv, ind, chess.turn)
        chess.make_move(i, j, dx, dy, promotion)
        current = (current + 1) % 2
    return positive_cases, negative_cases

In [None]:
all_moves, all_moves_inv = calculate_all_moves(dims)
move_cap = all_moves_inv.shape[0]

In [None]:
from tqdm import tqdm
positive_cases = []
negative_cases = []

CASES_TO_COLLECT = 25000
pbar = tqdm(total=CASES_TO_COLLECT)
while len(positive_cases) < CASES_TO_COLLECT:
    pos, neg = play_match([agents[0], agents[2]], full_name, dims, move_cap, all_moves, all_moves_inv, CONCEPT_FUNC)
    positive_cases.extend(pos)
    negative_cases.extend(neg)
    pbar.update(len(pos))

positive_cases = positive_cases[:CASES_TO_COLLECT]
negative_cases = negative_cases[:CASES_TO_COLLECT]

In [None]:
positive_cases = np.array(positive_cases)
negative_cases = np.array(negative_cases)

In [None]:
positive_cases.shape[0]

In [None]:
negative_cases.shape[0]

In [None]:
epochs_to_look_at = [1, 10, 20, 30, 40, 100, 150, 200]

In [None]:
from minichess.agents.convnet import ConvNet

for epoch_to_look_at in epochs_to_look_at:
    import tensorflow.keras as keras
    predictor_model = ConvNet(None, None, init=False)
    predictor_model.model = keras.models.load_model("minichess/agents/checkpoints/{}/{}/{}".format(full_name, model_name, epoch_to_look_at))
    all_cases = np.concatenate([positive_cases, negative_cases])
    all_labels = [1] * positive_cases.shape[0] + [0] * negative_cases.shape[0]
    all_labels = np.array(all_labels)
    shuffled_indices = np.arange(all_labels.shape[0])
    np.random.shuffle(shuffled_indices)
    all_cases = all_cases[shuffled_indices]
    all_labels = all_labels[shuffled_indices]
    POSITIONS_TO_CONSIDER = 40000
    VALIDATION_POSITIONS = 10000
    from minichess.concepts.linear_regression import perform_linear_regression, perform_logistic_regression, perform_regression

    concept_presences = {}


    print("Getting outputs...")
    outputs = predictor_model.get_all_resblock_outputs(all_cases)
    # actual_outputs = predictor_model.predict(boards, id_vector_to_use)
    # Outputs blir returnert i batcher, må flette det sammen
    print("Merging outputs...")
    merged_outputs = []
    for output_batch in outputs:
        for i, output_layer in enumerate(output_batch):
            if len(merged_outputs) <= i:
                merged_outputs.append([])
            merged_outputs[i].extend(output_layer)

    for i, layer_output in enumerate(merged_outputs):
        merged_outputs[i] = np.array(merged_outputs[i])
    outputs = merged_outputs
    print("Outputs merged.")
    # Aktiveringer fra res-block i
    concept_presence_per_layer = []
    for (i, output) in enumerate(outputs):
        points = output.reshape((output.shape[0], np.prod(output.shape[1:])))
        print(points.shape)
        # Så man har (n, k) sampler der n er antallet posisjoner, og k er det totale antallet aktiveringsverdier i lag i.
        print("Performing regression for layer {}".format(i))
        # points = np.concatenate([points, actual_outputs], axis=1)
        score = perform_regression(
            points[:POSITIONS_TO_CONSIDER], 
            all_labels[:POSITIONS_TO_CONSIDER], 
            points[POSITIONS_TO_CONSIDER:], 
            all_labels[POSITIONS_TO_CONSIDER:], 
            True
        )
        concept_presence_per_layer.append(score)

        print("The presence of {} in resblock {} is {}".format(concept_name, i, score))
    concept_presences[concept_name] = concept_presence_per_layer
    import os
    import string
    from random import choices
    import json



    os.makedirs("concept_presences", exist_ok=True)
    os.makedirs("concept_presences/{}".format(full_name), exist_ok=True)
    os.makedirs("concept_presences/{}/{}".format(full_name, model_name), exist_ok=True)
    os.makedirs("concept_presences/{}/{}/{}".format(full_name, model_name, concept_name), exist_ok=True)
    os.makedirs("concept_presences/{}/{}/{}/{}".format(full_name, model_name, concept_name, epoch_to_look_at), exist_ok=True)

    random_suffix = ''.join(choices(string.ascii_uppercase + string.digits, k=10))

    with open("concept_presences/{}/{}/{}/{}/{}.json".format(full_name, model_name, concept_name, epoch_to_look_at, random_suffix), "w") as f:
        json.dump(concept_presences[concept_name], f)
