In [81]:
baseDir = "/Users/apple/Documents/MATLAB/EE 675/Willett Data/tuning-tasks-all/"
import scipy.io

fiftyWordDat = scipy.io.loadmat(baseDir+'tuningTasks/t12.2022.05.03_fiftyWordSet.mat')

In [82]:
import numpy as np

fiftyWordDat['feat'] = np.concatenate([fiftyWordDat['tx2'][:,:32].astype(np.float32), fiftyWordDat['tx2'][:,96:128].astype(np.float32), fiftyWordDat['spikePow'][:,:32].astype(np.float32), fiftyWordDat['spikePow'][:,96:128].astype(np.float32)], axis=1)
fiftyWordDat['feat'] = np.sqrt(fiftyWordDat['feat'])

In [83]:
# collect the bins for yes and no trials

yesIdx = np.where(fiftyWordDat['cueList'] == 'yes')[1][0]
noIdx = np.where(fiftyWordDat['cueList'] == 'no')[1][0]

noTrials = np.where(fiftyWordDat['trialCues'] == noIdx)[0]
yesTrials = np.where(fiftyWordDat['trialCues'] == yesIdx)[0]

noTrialEpochs = [fiftyWordDat['goTrialEpochs'][trialNum] for trialNum in noTrials]
yesTrialEpochs = [fiftyWordDat['goTrialEpochs'][trialNum] for trialNum in yesTrials]

noTrialBins = [fiftyWordDat['feat'][epoch[1] - 50:epoch[1]] for epoch in noTrialEpochs] # the last 50 bins were found to be the most informative
yesTrialBins = [fiftyWordDat['feat'][epoch[1] - 50:epoch[1]] for epoch in yesTrialEpochs] # (20, 50, 256) 

In [84]:
# split into training and testing sets

nTrain = 16

noTrain, noTest = noTrialBins[:nTrain], noTrialBins[nTrain:]
yesTrain, yesTest = yesTrialBins[:nTrain], yesTrialBins[nTrain:]

# stack the arrays
yesTrain, noTrain = np.concatenate(yesTrain, axis=0), np.concatenate(noTrain, axis=0) # (16*50, 256)

In [85]:
# apply factor analysis (FA) -- this is to initialize the C and R matrices of the Kalman filter for EM
from sklearn.decomposition import FactorAnalysis

def get_initial_params(data_stacked, n_states):
    print("Running Factor Analysis for initialization")
    fa = FactorAnalysis(n_components=n_states, random_state=42)
    fa.fit(data_stacked)
    
    # Extract matrices
    # Sklearn components are (n_states, n_features), we need (n_features, n_states)
    C_init = fa.components_.T 
    
    # FA gives independent noise variance for each electrode
    R_init = np.diag(fa.noise_variance_)
    
    return C_init, R_init

In [88]:
# kalman filter function, tunes using EM

from pykalman import KalmanFilter

def train_kalman_model(data_stacked, n_states, n_em_iters=10):
    # factor analysis
    C_init, R_init = get_initial_params(data_stacked, n_states)
    
    # Setup the Kalman Filter
    # We initialize A (transition) as Identity + small noise (Random walk assumption to start)
    # We fix observation matrices to the FA result initially, but let EM refine them
    kf = KalmanFilter(
        n_dim_state=n_states,
        n_dim_obs=128,
        transition_matrices=np.eye(n_states), 
        observation_matrices=C_init,
        observation_covariance=R_init,
        em_vars=['transition_matrices', 'observation_matrices', 
                 'transition_covariance', 'observation_covariance', 
                 'initial_state_mean', 'initial_state_covariance']
    )
    
    # 3. Run EM
    print(f"Running EM for {n_em_iters} iterations...")
    kf = kf.em(data_stacked, n_iter=n_em_iters)
    
    return kf

In [None]:
stateDim = 12

print("yes model")
model_yes = train_kalman_model(yesTrain, stateDim)

yes model
Running Factor Analysis for initialization
Running EM for 10 iterations...


In [90]:
print("no model")
model_no = train_kalman_model(noTrain, stateDim)

no model
Running Factor Analysis for initialization
Running EM for 10 iterations...


In [91]:
# calculate log-likelihoods per KF per word
def calc_log_likelihood(kf_model, trial_data):
    # pykalman has a .score() method that computes log-likelihood!
    # It expects (n_timesteps, n_features)
    return kf_model.loglikelihood(trial_data)

In [92]:
def evaluate_accuracy(test_trials, true_label, model_yes, model_no):
    correct = 0
    total = len(test_trials)
    
    print(f"\nEvaluating True Label: '{true_label}'")
    for i, trial in enumerate(test_trials):
        # Score against Yes Model
        ll_yes = calc_log_likelihood(model_yes, trial)
        
        # Score against No Model
        ll_no = calc_log_likelihood(model_no, trial)
        
        # Decision
        prediction = "Yes" if ll_yes > ll_no else "No"
        
        is_correct = (prediction == true_label)
        if is_correct: correct += 1
        
        print(f"  Trial {i+1}: LL(Yes)={ll_yes:.1f}, LL(No)={ll_no:.1f} -> Pred: {prediction} [{'Correct' if is_correct else 'WRONG'}]")
        
    return correct / total

In [94]:
# Run Evaluation
acc_yes = evaluate_accuracy(yesTest, "Yes", model_yes, model_no)
acc_no = evaluate_accuracy(noTest, "No", model_yes, model_no)

print(f"\nChance Level: 50%")
print(f"\nAverage Accuracy: {(acc_yes + acc_no)/2 * 100:.1f}%")


Evaluating True Label: 'Yes'
  Trial 1: LL(Yes)=-14918.6, LL(No)=-14824.9 -> Pred: No [WRONG]
  Trial 2: LL(Yes)=-14103.6, LL(No)=-14287.4 -> Pred: Yes [Correct]
  Trial 3: LL(Yes)=-14043.9, LL(No)=-13862.7 -> Pred: No [WRONG]
  Trial 4: LL(Yes)=-12905.4, LL(No)=-13031.1 -> Pred: Yes [Correct]

Evaluating True Label: 'No'
  Trial 1: LL(Yes)=-12883.7, LL(No)=-12697.1 -> Pred: No [Correct]
  Trial 2: LL(Yes)=-13774.2, LL(No)=-13442.3 -> Pred: No [Correct]
  Trial 3: LL(Yes)=-13270.0, LL(No)=-12935.0 -> Pred: No [Correct]
  Trial 4: LL(Yes)=-13912.8, LL(No)=-13644.4 -> Pred: No [Correct]

Chance Level: 50%

Average Accuracy: 75.0%
