In [1]:
import numpy as np
import scipy
import h5py
import scipy.io as sio
from pyriemann.utils.mean import mean_covariance
import sklearn.datasets
import sklearn.decomposition
from scipy.spatial import distance

np.seterr(divide='ignore', invalid='ignore')

{'divide': 'warn', 'over': 'warn', 'under': 'ignore', 'invalid': 'warn'}

In [17]:
def utri2mat(utri):
    n = int(-1 + np.sqrt(1 + 8 * len(utri))) // 2
    iu1 = np.triu_indices(n+1,1)
    ret = np.empty((n+1, n+1))
    ret[iu1] = utri
    ret.T[iu1] = utri
    np.fill_diagonal(ret, 1)
    return ret

def get_data():
    '''
    Navigates through file tree and extracts FCs with optional reconstruction
    '''
    # Yeo ordering
    master_dir = '../data/twins'
    tasks = ['rest', 'emotion', 'gambling', 'language', 'motor', 'relational', 'social', 'wm']
    test, retest = {}, {}
    for task in tasks:
        temp_parc = {}
        task_dir = master_dir + f'/origmat_DZ_schaefer300_{task}.mat'
        f = h5py.File(task_dir, 'r')
        for k, v in f.items():
            temp_parc[k] = np.array(v)
        test[task] = temp_parc
    for task in tasks:
        temp_parc = {}
        task_dir2 = master_dir + f'/origmat_DZ_schaefer300_retest_{task}.mat'
        f = h5py.File(task_dir, 'r')
        num_retest = 0
        for k, v in f.items():
            temp_parc[k] = np.array(v)
            
        retest[task] = temp_parc
        #print(f'Retest {task}: {retest[task]["orig_mat"].shape}')
    labels = []
    for task in tasks:
        test_temp_labels = np.repeat(np.arange(0,test[task]['orig_mat'].shape[0]/2),2).tolist()
        retest_temp_labels = np.repeat(np.arange(0,retest[task]['orig_mat'].shape[0]/2),2).tolist()
        labels.extend([int(i) for i in test_temp_labels])
        labels.extend([int(i) for i in retest_temp_labels])
    labels = np.array(labels)
    all_FC = np.vstack((test['rest']['orig_mat'], retest['rest']['orig_mat'], test['emotion']['orig_mat'], retest['emotion']['orig_mat'],
                        test['gambling']['orig_mat'], retest['gambling']['orig_mat'], test['language']['orig_mat'],
                        retest['language']['orig_mat'], test['motor']['orig_mat'], retest['motor']['orig_mat'], test['relational']['orig_mat'],
                        retest['relational']['orig_mat'],
                        test['social']['orig_mat'], retest['social']['orig_mat'], test['wm']['orig_mat'],
                        retest['wm']['orig_mat']))
    return all_FC, labels


def q1invm(q1, eig_thresh=0):
    U, S, V = scipy.linalg.svd(q1)
    s = np.diag(S)
    s[s < eig_thresh] = eig_thresh
    S = np.diag(s ** (-1 / 2))
    Q1_inv_sqrt = U * S * np.transpose(V)
    Q1_inv_sqrt = (Q1_inv_sqrt + np.transpose(Q1_inv_sqrt)) / 2
    return Q1_inv_sqrt


def qlog(q):
    U, S, V = scipy.linalg.svd(q)
    s = np.diag(S)
    S = np.diag(np.log(s))
    Q = U * S * np.transpose(V)
    return Q


def tangential(all_FC, ref):
    # Regularization for riemann
    if ref in ['riemann', 'kullback_sym', 'logeuclid']: 
        print("Adding regularization!")
        eye_mat = np.eye(all_FC.shape[1])
        scaling_mat = np.repeat(eye_mat[None, ...], all_FC.shape[0], axis=0)
        all_FC += scaling_mat
    u, s, vh = np.linalg.svd(all_FC[0], full_matrices=True)
    print(f'Eigenvalues: {s**2}')
    Cg = mean_covariance(all_FC, metric=ref)
    Q1_inv_sqrt = q1invm(Cg)
    Q = Q1_inv_sqrt @ all_FC @ Q1_inv_sqrt
    tangent_FC = np.array([qlog(a) for a in Q])
    return tangent_FC


def pca_recon(FC, pctComp=None):
    '''
    Reconstructs FC based on number of principle components
    '''
    if pctComp is None:
        return FC
    nRegions = FC.shape[1]
    FC = np.reshape(FC, (FC.shape[0], -1))
    nComp = int(FC.shape[0] * pctComp)
    mu = np.mean(FC, axis=0)
    pca_rest = sklearn.decomposition.PCA()
    pca_rest.fit(FC)
    SCORES = pca_rest.transform(FC)[:, :nComp]
    COEFFS = pca_rest.components_[:nComp, :]
    FC_recon = np.dot(SCORES, COEFFS)
    del SCORES, COEFFS
    FC_recon += mu
    FC_recon = np.reshape(FC_recon, (FC.shape[0], nRegions, nRegions))
    return FC_recon

In [4]:
all_FC, labels = get_data()


In [5]:
train_idx = np.arange(0, len(labels), 2)
test_idx = np.arange(1, len(labels), 2)
train_labels = labels[train_idx]
test_labels = labels[test_idx]

### KNN Approach

In [18]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score
accuracies = {}
parc = 300
reordered_FCs, labels = get_data()
nFCs = reordered_FCs.shape[0]
# Do optional transformations
for ref in ['logeuclid','Raw FC', 'pca', 'euclid', 'harmonic']:
        print(f'Testing {ref}...')
        # Start with a fresh batch of FCs
        all_FC = np.zeros((nFCs, parc + 14, parc + 14))
        for i in np.arange(0, nFCs):
            all_FC[i] = utri2mat(reordered_FCs[i])
        # Do optional transformations
        if ref != 'Raw FC' and ref != 'pca':
            all_FC = tangential(all_FC, ref)
        elif ref == 'pca':
            all_FC = pca_recon(all_FC, 0.5)
        else:
            pass
        # Convert back into flattened utriu vectors
        vec_FCs = np.zeros(
            (nFCs, reordered_FCs.shape[1]), dtype=np.float32)
        for idx, mat in enumerate(all_FC):
            vec_FCs[idx] = mat[np.triu_indices(mat.shape[0], k=1)]
        # Split into train and test sets
        train_FCs = vec_FCs[train_idx]
        test_FCs = vec_FCs[test_idx]
        # KNN Classifier
        print("Fitting KNN")
        neigh = KNeighborsClassifier(n_neighbors=1, metric='correlation')
        neigh.fit(train_FCs, train_labels)
        predicted = neigh.predict(test_FCs)
        acc = accuracy_score(test_labels, predicted)
        print(acc)
        accuracies[f"{ref}_{parc}"] = acc

Testing logeuclid...
Adding regularization!
Eigenvalues: [5.03796051e+02 3.79441714e+02 3.59358380e+02 2.89836208e+02
 2.08140317e+02 1.99542191e+02 1.91052373e+02 1.83491980e+02
 1.59932389e+02 1.47915108e+02 1.31611730e+02 1.26816413e+02
 1.12865501e+02 1.02396054e+02 9.68183711e+01 9.22804670e+01
 8.79279696e+01 8.68151582e+01 8.02499731e+01 7.83802252e+01
 7.69105566e+01 7.26620211e+01 7.19630170e+01 6.85664925e+01
 6.77465086e+01 6.66914743e+01 6.60320427e+01 6.46425639e+01
 6.31830658e+01 6.06830724e+01 5.96044081e+01 5.92817582e+01
 5.87750836e+01 5.65276707e+01 5.56755824e+01 5.51473706e+01
 5.31588307e+01 5.22422325e+01 5.15407803e+01 5.07676928e+01
 5.06349436e+01 4.82643594e+01 4.71478831e+01 4.67982836e+01
 4.66670162e+01 4.65062678e+01 4.58025962e+01 4.45795543e+01
 4.40704584e+01 4.34676423e+01 4.24723151e+01 4.18938070e+01
 4.14738538e+01 4.12809816e+01 4.05629633e+01 3.97797940e+01
 3.90930158e+01 3.90355860e+01 3.76040502e+01 3.75417086e+01
 3.72585902e+01 3.67512091e+

ValueError: Covariance matrices must be positive definite. Add regularization to avoid this error.

In [None]:
import csv
a_file = open(f"../results/twin_distances.csv", "w")

writer = csv.writer(a_file)
for key, value in accuracies.items():
    writer.writerow([key, value])
    
a_file.close()