In [None]:
import numpy as np
from tqdm import tqdm
from sklearn.metrics import accuracy_score, roc_curve, auc
import matplotlib.pyplot as plt
from splearn.spectral import Spectral
from splearn.datasets.data_sample import SplearnArray
from splearn.datasets.base import load_data_sample

In [None]:
def load_and_prep_data(idx, train_var, val_var):
    """
    Load one dataset file and expand its sequences into prefix samples.

    For every sequence, the per-step AP vector is collapsed into a single
    integer symbol via a dot product with powers of two (a binary code when
    the AP entries are 0/1 -- TODO confirm against the data generator).
    Sequences are then expanded into prefixes:

    * training: one row per step >= 1 at which the acceptance value is
      truthy, holding the first ``step`` symbols;
    * validation: one row per step in ``1 .. len(acceptance) - 1``, holding
      the first ``step`` symbols and labelled with ``acceptance[step]``.

    Positions past the end of a prefix keep the fill value -1.

    Args:
        idx (int): Index parameter for data file
        train_var (int): Training variation parameter
        val_var (int): Validation variation parameter

    Returns:
        tuple: ``(train_data_sp, val_data_sp, val_y, num_states)`` where
            ``train_data_sp`` / ``val_data_sp`` are ``SplearnArray`` objects
            wrapping the prefix matrices, ``val_y`` is the array of
            validation labels (one per validation row), and ``num_states``
            is the size of the last axis of ``train_state_vectors``.
    """
    # An .npz archive stays open until it is closed; read every array we need
    # inside the context manager so the file handle is released right away.
    with np.load(f'data/spot_{idx}_{train_var}_{val_var}.npz') as data:
        train_state_vectors = data['train_state_vectors']
        train_ap_vectors = data['train_ap_vectors']
        train_acceptance_vectors = data['train_acceptance_vectors']
        val_ap_vectors = data['val_ap_vectors']
        val_acceptance_vectors = data['val_acceptance_vectors']

    _, seq_len, ap_dim = train_ap_vectors.shape

    powers_of_2 = 2 ** np.arange(ap_dim)
    num_symbols = int(2 ** ap_dim)

    # Training set: only accepted prefixes of length >= 1.
    # count_nonzero matches the truthiness test used in the loop below and
    # always yields an integer, even when the acceptance array is float-typed.
    num_train_rows = int(np.count_nonzero(train_acceptance_vectors[:, 1:]))
    train_data = np.full((num_train_rows, seq_len + 1), -1.0)
    data_idx = 0
    for ap, acceptance in zip(train_ap_vectors, train_acceptance_vectors):
        ap_binary_coded = np.dot(ap, powers_of_2)
        # The row count above excludes step 0, so the loop must too; iterating
        # over every truthy index would overrun train_data whenever
        # acceptance[0] is set. The +1 maps offsets in acceptance[1:] back to
        # step numbers.
        for step in np.flatnonzero(acceptance[1:]) + 1:
            train_data[data_idx, :step] = ap_binary_coded[:step]
            data_idx += 1
    train_data_sp = SplearnArray(train_data, nbL=num_symbols, nbEx=train_data.shape[0])

    # Validation set: every prefix of length >= 1, labelled with its acceptance.
    num_val_seqs, num_val_steps = val_acceptance_vectors[:, 1:].shape
    num_val_rows = num_val_seqs * num_val_steps
    val_data = np.full((num_val_rows, seq_len), -1.0)
    val_y = np.zeros(num_val_rows)
    data_idx = 0
    for ap, acceptance in zip(val_ap_vectors, val_acceptance_vectors):
        ap_binary_coded = np.dot(ap, powers_of_2)
        for step in range(1, len(acceptance)):
            val_data[data_idx, :step] = ap_binary_coded[:step]
            val_y[data_idx] = acceptance[step]
            data_idx += 1
    val_data_sp = SplearnArray(val_data, nbL=num_symbols, nbEx=val_data.shape[0])

    return train_data_sp, val_data_sp, val_y, train_state_vectors.shape[-1]

In [None]:
def evaluate_binary_classifier(y_true, y_pred_prob):
    """
    Find optimal threshold for best accuracy and calculate AUC.

    Candidate thresholds are exactly those returned by ``roc_curve``. A
    sample is predicted positive when its score is ``>=`` the threshold.
    When several thresholds reach the same accuracy, the one that appears
    first in ``roc_curve``'s output is kept.

    Args:
        y_true: Ground truth labels (0 or 1); any array-like
        y_pred_prob: Predicted probabilities or scores; any array-like

    Returns:
        tuple: (best_threshold, best_accuracy, roc_auc)
    """
    # Coerce up front: with a plain Python list, `y_pred_prob >= threshold`
    # below would raise TypeError instead of comparing element-wise.
    y_true = np.asarray(y_true)
    y_pred_prob = np.asarray(y_pred_prob)

    # Calculate ROC curve
    fpr, tpr, thresholds = roc_curve(y_true, y_pred_prob)
    roc_auc = auc(fpr, tpr)

    # Find threshold that maximizes accuracy
    best_accuracy = 0
    best_threshold = 0

    for threshold in thresholds:
        y_pred = (y_pred_prob >= threshold).astype(int)
        accuracy = accuracy_score(y_true, y_pred)

        # Strict comparison keeps the earliest threshold on ties.
        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_threshold = threshold

    return best_threshold, best_accuracy, roc_auc

In [None]:
def test(idx, train_var, val_var, HIDDEN_FACTOR):
    """
    Fit a Spectral estimator on one dataset and score it on the validation set.

    Args:
        idx (int): Index parameter for data file
        train_var (int): Training variation parameter
        val_var (int): Validation variation parameter
        HIDDEN_FACTOR (float): Multiplier applied to the dataset's number of
            states; the product, truncated to int, is used as the rank.

    Returns:
        The best accuracy reported by ``evaluate_binary_classifier`` for the
        estimator's predictions on the validation samples.
    """
    train_samples, val_samples, val_labels, n_states = load_and_prep_data(
        idx, train_var, val_var
    )

    model = Spectral()
    model.set_params(partial=True, rank=int(n_states * HIDDEN_FACTOR))
    model.fit(train_samples)

    val_scores = model.predict(val_samples)
    # Only the accuracy (second element) is of interest here.
    return evaluate_binary_classifier(val_labels, val_scores)[1]

In [None]:
# Candidate train/validation variation settings (presumably paired
# element-wise -- TODO confirm). The sweep in this cell only evaluates the
# (0, 0) setting.
train_vars = [0, 0, 0, 0.1, 0.2]
val_vars = [0, 0.1, 0.2, 0, 0]
indices = [0, 1, 2]
HIDDEN_FACTOR = 12  # not referenced by the sweep in this cell

# Hidden factors to sweep over; each is passed to test() as HIDDEN_FACTOR.
hidden_factors = [0.75]

# a[i, j] is the accuracy for hidden_factors[i] on dataset indices[j].
# Iterating over `indices` (rather than a repeated literal) keeps the sweep
# in sync with the configuration above.
a = np.array([
    [test(idx, 0, 0, hf) for idx in indices]
    for hf in hidden_factors
])

In [None]:
# Mean accuracy per row of `a` (averaged over axis 1), rounded to 3 decimals.
a.mean(axis=1).round(decimals=3)

In [None]:
# One-off run on dataset index 3 with HIDDEN_FACTOR 1.
test(idx=3, train_var=0, val_var=0, HIDDEN_FACTOR=1)