In [17]:
import graspy as gp
import glob
import os, sys
import numpy as np
import pandas as pd
import multiprocessing as multiproc
import dask
from dask.distributed import Client, progress
import dask.dataframe as ddf

In [40]:
dpath = '/mnt/nfs2/j1c/twins/data/hcp1200/dmri/desikan/'
def get_sub(fname):
    stext = os.path.basename(fname).split('_')
    return('{}_{}'.format(stext[0], stext[1]))

dmri_dict = {}
for f in glob.glob(os.path.join(dpath, '*.edgelist')):
    gr_dat = gp.utils.import_edgelist(f)
    if gr_dat.shape == (70, 70):
        dmri_dict[get_sub(f)] = gr_dat
dmri_ar = np.dstack(list(dmri_dict.values()))
nv = 35

  import sys


In [41]:
ncores = multiproc.cpu_count()-2
client = Client(threads_per_worker=1, n_workers=ncores)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 34171 instead
  http_address["port"], self.http_server.port


0,1
Client  Scheduler: tcp://127.0.0.1:38483  Dashboard: http://127.0.0.1:34171/status,Cluster  Workers: 6  Cores: 6  Memory: 33.51 GB


In [58]:
def run_exp(row):
    dmri = dmri_dict[row[0]]  # grab dmri connectome
    # try all methods
    method = row[1] # grab the method name to attempt
    e = gp.models.SBMEstimator()
    y = [1 for i in range(0, nv)] + [2 for i in range(0, nv)]
    pval, pred = e.estimate_block_structure(dmri, np.array(y), ['abba', 'abbd'], test_method=method)
    return (row[0], row[1], pval, pred)

In [66]:
exps = []
for sub in dmri_dict.keys():
    for method in ['kw', 'dcorr', 'anova']:
        exps.append([sub, method])

sim_exps = pd.DataFrame(exps, columns=["Identifier", "Method"])
print(sim_exps.head())
print(sim_exps.shape)

         Identifier Method
0  sub-118225_ses-1     kw
1  sub-118225_ses-1  dcorr
2  sub-118023_ses-1     kw
3  sub-118023_ses-1  dcorr
4  sub-118528_ses-1     kw
(14, 2)


In [67]:
sim_exps = ddf.from_pandas(sim_exps, npartitions=ncores*1.5)
sim_results = sim_exps.apply(lambda x: run_exp(x), axis=1, result_type='expand',
                             meta={0: str, 1: str, 2: float, 3: str})
sim_results

Unnamed: 0_level_0,0,1,2,3
npartitions=7,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
0,object,object,float64,object
2,...,...,...,...
...,...,...,...,...
12,...,...,...,...
13,...,...,...,...


In [68]:
sim_results = sim_results.compute(scheduler="multiprocessing")
sim_results = sim_results.rename(columns={0: "Identifier", 1: "Method", 2: "pvalue", 3: "Structure"})
sim_results.to_csv('./data/hcp_block_est.csv')

In [71]:
np.savetxt('./data/hcp_mean_dmri.csv', dmri_ar.mean(axis=2), delimiter=',')