In [10]:
import pandas as pd
import numpy as np
import setup_jwlab
from jwlab.constants import cleaned_data_filepath
from jwlab.cluster_analysis import prep_cluster_analysis
from sklearn.svm import SVC
from sklearn.model_selection import cross_validate
from scipy import stats
import more_itertools as mit

In [11]:
def init(age_group):
    length_per_window = 10
    num_sliding_windows = int(1000/ length_per_window)
    num_folds = 3
    num_iterations = 10
    
    if age_group is 9:
        participants = ["904", "905", "906", "908", "909", "912","913", "914", "916", "917", "919",\
                    "920", "921", "923", "924", "927", "928", "929", "930", "932"]
    elif age_group is 11:
        participants = ["105", "106", "107", "109", "111", "112", "115", "116", "117", "119", "120", "121", "122", "124"]
    else:
        raise ValueError("Unsupported age group!")
        
    return length_per_window, num_sliding_windows, num_folds, num_iterations, participants

In [12]:
def prep_data(participants, length_per_window):
    X, y, good_trial_count = prep_cluster_analysis(cleaned_data_filepath, participants, downsample_num=1000, averaging="average_trials_and_participants", length_per_window=length_per_window)
    
    return X, y

In [13]:
def cross_validaton(num_iterations, num_sliding_windows, num_folds, X, y):
    results = {}
    for i in range(num_iterations):
        for j in range(num_sliding_windows):
            model = SVC(kernel = 'rbf')
            cv_results = cross_validate(model, X[j], y[j], cv=num_folds)
            if j in results.keys(): 
                results[j] += cv_results['test_score'].tolist()
            else:
                results[j] = cv_results['test_score'].tolist()

    for i in range(num_sliding_windows):
        assert len(results[i]) == num_iterations * num_folds
    
    return results

In [14]:
def t_test(results, num_iterations, num_sliding_windows, num_folds):
    pvalues = []
    for i in range(num_sliding_windows):
        istat = stats.ttest_1samp(results[i], .5)
        pvalues += [istat.pvalue]
    
    return pvalues

In [15]:
# Finding contiguous time cluster
def find_clusters(pvalues):
    valid_window = [i for i,v in enumerate(pvalues) if v <= 0.025]
    print("Valid windows are: " + valid_window)
    
    # Obtain clusters (3 or more consecutive meaningful time)
    clusters = [list(group) for group in mit.consecutive_groups(valid_window)]
    clusters = [group for group in clusters if len(group) >= 3]
    print("Clusters are:" + clusters)
    
    return clusters

In [16]:
def get_max_t_mass(clusters, pvalues):
    t_mass = []
    for c in clusters:
        t_scores = 0
        for time in c:
            t_scores += pvalues[time]
        t_mass += [t_scores]
    print("t mass are: " + t_mass)
    
    max_t_mass = max(t_mass)
    print("The max t mass is: " + max_t_mass)
    
    return max_t_mass

In [17]:
def cluster_analysis_procedure(age_group):
    length_per_window, num_sliding_windows, num_folds, num_iterations, participants = init(age_group)
    
    X, y = prep_data(participants, length_per_window)
    
    results = cross_validaton(num_iterations, num_sliding_windows, num_folds, X, y)
    
    pvalues = t_test(results, num_iterations, num_sliding_windows, num_folds)
    
    clusters = find_clusters(pvalues)
    
    max_t_mass = get_max_t_mass(clusters, pvalues)
    
    return max_t_mass

In [18]:
cluster_analysis_procedure(9)

loaded


TypeError: must be str, not list

In [6]:
# Null distribution
null_distribution_t_mass = []
for i in range(1000):
    X, y, good_trial_count = prep_cluster_analysis(cleaned_data_filepath, participants, downsample_num=1000, averaging="average_trials_and_participants", length_per_window=length_per_window, useRandomizedLabel=True)
    # Cross validation
    results = {}
    for i in range(num_iterations):
        for j in range(num_sliding_windows):
            model = SVC(kernel = 'rbf')
            cv_results = cross_validate(model, X[j], y[j], cv=num_folds)
            if j in results.keys(): 
                results[j] += cv_results['test_score'].tolist()
            else:
                results[j] = cv_results['test_score'].tolist()

    for i in range(num_sliding_windows):
        assert len(results[i]) == num_iterations * num_folds
        
    # T-test
    accuracy_by_guessing = [0.5] * (num_iterations * num_folds)
    pvalues = []
    for i in range(num_sliding_windows):
        stat = stats.ttest_ind(results[i], accuracy_by_guessing)
        pvalues += [stat[1]]


    # Finding contiguous time cluster
    valid_window = [i for i,v in enumerate(pvalues) if v >= 0.05]
    print(valid_window)
    
    # Obtain clusters (3 or more consecutive meaningful time)
    import more_itertools as mit
    clusters = [list(group) for group in mit.consecutive_groups(valid_window)]
    clusters = [group for group in clusters if len(group) >= 3]
    print(clusters)
    
    # Calculate t-mass of each cluster
    t_mass = []
    for c in clusters:
        t_scores = 0
        for time in c:
            t_scores += pvalues[time]
        t_mass += [t_scores]
    print(t_mass)
    
    # Get the maximum t-mass value
    max_t_mass = max(t_mass)
    print(max_t_mass)
    
    null_distribution_t_mass += [max_t_mass]

loaded
[ 4.  2.  3. ... 14. 15.  2.]
[ 2. 14.  2. ... -1.  3. -1.]


KeyboardInterrupt: 