In [5]:
%%capture
'''
Created by Brandon Katerman on March 13th, 2022

Last Modified: 04/17/22 by Ricardo Adrogue

Current stats to run on encoding events for RepFR1 -- successor to sme_tstat.py
'''

import os
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import numpy as np 
import pandas as pd
pd.set_option("display.max_columns", 200)
from importlib import reload
from cmlreaders import CMLReader, get_data_index 
# imports all functions needed for this to work
import repeat_sme
from brain_labels import *
import regionalizationModule
from brain_labels import *
import eeg_check
import importlib

# import Dask and Dask functions to run script on the cluster
import CMLDask
from dask.distributed import wait, as_completed, progress
from dask import config
config.set({'timeouts':{'connect':'90s', 'tcp':'120s'}})
# import xmode

In [2]:
# Collect every RepFR1 subject (first listed session only), that session's
# task events, and its localization table tagged with subject/session.
exp = 'RepFR1'
data = get_data_index(kind = 'r1')
data = data[data['experiment'].eq(exp)]
loc, pairs, evs = [], [], []
subjects, sessions = [], []
for subject, sub_rows in data.groupby('subject'):
    session = sub_rows['session'].iloc[0]
    subjects.append(subject)
    sessions.append(session)
    reader = CMLReader(subject=subject, experiment=exp, session=session)
    sub_evs = reader.load('task_events')
    sub_loc = reader.load('localization')
    # scalar assignment broadcasts the value over every localization row
    sub_loc['subject'] = subject
    sub_loc['session'] = session
    evs.append(sub_evs)
    loc.append(sub_loc)

# Stack all subjects' localizations and keep the rows indexed under 'pairs'.
all_loc = pd.concat(loc)
all_loc_p = all_loc.loc['pairs']


In [4]:
def get_filtered_pairs(sub, sess, exp='RepFR1'):
    """Run the noisy-channel check for one subject/session.

    Executed as one Dask job per subject (see the refilter cell below).

    Parameters
    ----------
    sub : str
        Subject code.
    sess : int
        Session number.
    exp : str, optional
        Experiment name forwarded to ``eeg_check.eeg_check``; defaults to
        'RepFR1', the experiment analyzed in this notebook.

    Returns
    -------
    Whatever ``eeg_check.eeg_check`` returns. The refilter cell keeps rows with
    ``df.bad == 0``, so presumably a per-pair DataFrame with a ``bad`` column
    (TODO confirm against eeg_check).
    """
    # *** Filters out noisy pairs *** #
    return eeg_check.eeg_check(sub, sess, exp)

In [None]:
# True: recompute the noisy-pair filter on the cluster and overwrite the
# cached filtered_pairs.npy / bad_subs.npy files.
# False: reload those cached files instead.
refilter: bool = False

In [5]:
%%capture
if refilter:
    importlib.reload(eeg_check)
    # Best-effort cleanup of a client left over from an earlier run
    # (NameError when none exists yet).
    try:
        client.shutdown()
        print('client shutdown')
    except Exception:
        print('no client')

    # One cluster job per subject, each with a 40 GB memory limit, that flags
    # noisy bipolar pairs via get_filtered_pairs.
    client = CMLDask.new_dask_client("filter_pairs", "40GB")
    try:
        futures = client.map(get_filtered_pairs, subjects, sessions)
        # waits until every job has finished (successfully or not)
        wait(futures)
        # NOTE(review): assumes filter_futures keeps the successful futures in
        # submission order, so f_pairs lines up with the pruned subject list.
        f_pairs = client.gather(CMLDask.filter_futures(futures))
        exceptions = CMLDask.get_exceptions(futures, subjects).param
    finally:
        # Release the cluster even if gathering failed.
        try:
            client.shutdown()
            print('client shutdown')
        except Exception:
            print('no client')

    # *** Subjects whose filtering job raised (e.g. bad EEG) are dropped *** #
    # Indices are positions in the ORIGINAL subject list; the cached branch
    # re-applies them against that same original ordering.
    failed = set(exceptions)
    bad_sub_is = [i for i, s in enumerate(subjects) if s in failed]
    if exceptions.empty:
        print('No exceptions')

    # keep only the pairs marked good
    filt_pairs = [df[df.bad == 0] for df in f_pairs]

    # Store the per-subject DataFrames as an explicit 1-D object array.
    # Letting np.save infer the shape warns on ragged input and would stack
    # equally-sized frames into one numeric array.
    filt_pairs_arr = np.empty(len(filt_pairs), dtype=object)
    for i, df in enumerate(filt_pairs):
        filt_pairs_arr[i] = df
    np.save('bad_subs', np.asarray(bad_sub_is, dtype=int))
    np.save('filtered_pairs', filt_pairs_arr)
else:
    # allow_pickle is required for the object array. Only load a cache this
    # notebook wrote itself (pickle can execute code).
    filt_pairs = np.load('filtered_pairs.npy', allow_pickle=True)
    # ravel() also accepts caches saved in the older nested [[...]] layout
    bad_sub_is = np.ravel(np.load('bad_subs.npy')).astype(int)

# Drop the bad subjects by original position, identically in both branches.
drop = set(int(i) for i in bad_sub_is)
loc = [x for j, x in enumerate(loc) if j not in drop]
subjects = [x for j, x in enumerate(subjects) if j not in drop]
sessions = [x for j, x in enumerate(sessions) if j not in drop]
if not refilter:
    print(subjects)

# The ROI cell indexes loc, subjects and filt_pairs in lockstep; fail loudly
# instead of silently pairing one subject's localization with another's pairs.
assert len(loc) == len(subjects) == len(filt_pairs), (
    'subjects/localizations/filtered pairs are misaligned; re-run the '
    'subject-loading cell before this one')

no client
Unique port for radrogue is 51417
{'dashboard_address': ':51417'}
To view the dashboard, run: 
`ssh -fN radrogue@rhino2.psych.upenn.edu -L 8000:192.168.86.146:51417` in your local computer's terminal (NOT rhino) 
and then navigate to localhost:8000 in your browser


distributed.core - ERROR - Exception while handling op broadcast
Traceback (most recent call last):
  File "/home1/radrogue/.local/lib/python3.7/site-packages/distributed/comm/core.py", line 319, in connect
    handshake = await asyncio.wait_for(comm.read(), time_left())
  File "/home1/radrogue/.conda/envs/environmentname/lib/python3.7/asyncio/tasks.py", line 449, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home1/radrogue/.local/lib/python3.7/site-packages/distributed/core.py", line 521, in handle_comm
    result = await result
  File "/home1/radrogue/.local/lib/python3.7/site-packages/distributed/scheduler.py", line 6021, in broadcast
    [send_message(address) for address in addresses if address is not None]
  File "/home1/radrogue/.local/lib/python3.7/site-packages/distributed/utils.py", line 208, in All
    result = await tasks.ne

client shutdown


  arr = np.asanyarray(arr)


In [6]:
# *** Split each subject's clean bipolar pairs into regions of interest *** #
reload(regionalizationModule)


def _pairs_in(sub_pairs, elec_regions, pair_number, labels):
    """Rows of ``sub_pairs`` whose electrode region is one of ``labels``."""
    hits = np.where(np.isin(elec_regions, labels))[0]
    return sub_pairs.iloc[pair_number[hits]]


hippo_subs, mtl_subs, fg_subs, ltc_subs = [], [], [], []
hippo_pairs, mtl_pairs, fg_pairs, ltc_pairs = [], [], [], []
for index, sub in enumerate(subjects):
    sub_pairs = filt_pairs[index]
    elec_regions, atlas_type, pair_number, has_stein_das = \
        regionalizationModule.get_elec_regions(loc[index], sub_pairs)

    hippo = _pairs_in(sub_pairs, elec_regions, pair_number, HPC_labels)
    mtl = _pairs_in(sub_pairs, elec_regions, pair_number, nonHPC_MTL_labels)
    fg = _pairs_in(sub_pairs, elec_regions, pair_number, FG_labels)
    ltc = _pairs_in(sub_pairs, elec_regions, pair_number, LTC_labels)

    # *** Record the subject under each region where it has at least one pair *** #
    for roi_pairs, roi_subs, roi_pair_list in ((hippo, hippo_subs, hippo_pairs),
                                               (mtl, mtl_subs, mtl_pairs),
                                               (fg, fg_subs, fg_pairs),
                                               (ltc, ltc_subs, ltc_pairs)):
        if not roi_pairs.empty:
            roi_subs.append(sub)
            roi_pair_list.append(roi_pairs)

In [15]:
# Runs the encoding-power (optional) and SME-statistics cluster jobs for one
# region; hemisphere and region are arguments supplied by the calls below.
def do_stats_and_stuff(subs, pairs, hemisphere, region, rerun_powers = False,
                       powers_memory="10GB", stats_memory="60GB"):
    """Compute per-subject encoding statistics on the Dask cluster.

    Parameters
    ----------
    subs : sequence of str
        Subject codes; one cluster job is launched per subject.
    pairs : sequence
        Per-subject bipolar pairs, aligned element-wise with ``subs``.
    hemisphere, region : str
        Labels forwarded unchanged to every job.
    rerun_powers : bool, optional
        If True, first recompute powers with ``repeat_sme.get_enc_powers``.
    powers_memory, stats_memory : str, optional
        Per-job memory limits for the power and statistics clusters.

    Returns
    -------
    tuple
        ``(results, exceptions)``, or
        ``(power_exc, pow_results, results, exceptions)`` when
        ``rerun_powers`` is True. ``results`` holds the return values of the
        jobs that succeeded; ``exceptions`` is the ``CMLDask.get_exceptions``
        report for the ones that failed.
    """
    print(hemisphere, region)
    print(len(subs), 'with electrodes in {} {}'.format(hemisphere, region))
    # client.map needs one argument per job, so repeat the scalar labels.
    hemispheres = [hemisphere] * len(subs)
    regions = [region] * len(subs)

    # Each cluster is created and shut down within this function; the
    # try/finally releases it even if a job or gather raises.
    if rerun_powers:
        client = CMLDask.new_dask_client("iEEG_powers", powers_memory)
        try:
            futures = client.map(repeat_sme.get_enc_powers, subs, pairs, hemispheres, regions)
            wait(futures)
            power_exc = CMLDask.get_exceptions(futures, subs)
            pow_results = client.gather(CMLDask.filter_futures(futures))
        finally:
            client.shutdown()

    client = CMLDask.new_dask_client("iEEG_stats", stats_memory)
    try:
        futures = client.map(repeat_sme.enc_power_statistics, subs, pairs, hemispheres, regions)
        wait(futures)
        exceptions = CMLDask.get_exceptions(futures, subs)
        results = client.gather(CMLDask.filter_futures(futures))
    finally:
        client.shutdown()

    if rerun_powers:
        return power_exc, pow_results, results, exceptions
    return results, exceptions

In [24]:
%%capture
reload(repeat_sme)
# Hippocampal pairs; hemisphere left blank and powers are not recomputed.
hippo = do_stats_and_stuff(hippo_subs, hippo_pairs,
                           hemisphere='', region='hippo', rerun_powers=False)

loading modules
 hippo
no client
24 with electrodes in  hippo
no client
Unique port for radrogue is 51417
{'dashboard_address': ':51417'}
To view the dashboard, run: 
`ssh -fN radrogue@rhino2.psych.upenn.edu -L 8000:192.168.86.146:51417` in your local computer's terminal (NOT rhino) 
and then navigate to localhost:8000 in your browser


distributed.utils - ERROR - 'str' object has no attribute 'text'
Traceback (most recent call last):
  File "/home1/radrogue/.local/lib/python3.7/site-packages/distributed/utils.py", line 681, in log_errors
    yield
  File "/home1/radrogue/.local/lib/python3.7/site-packages/distributed/dashboard/components/scheduler.py", line 344, in update
    self.root.title.text = title
AttributeError: 'str' object has no attribute 'text'
distributed.utils - ERROR - 'str' object has no attribute 'text'
Traceback (most recent call last):
  File "/home1/radrogue/.local/lib/python3.7/site-packages/distributed/utils.py", line 681, in log_errors
    yield
  File "/home1/radrogue/.local/lib/python3.7/site-packages/distributed/dashboard/components/scheduler.py", line 3134, in status_doc
    cluster_memory.update()
  File "/home1/radrogue/.local/lib/python3.7/site-packages/bokeh/core/property/validation.py", line 95, in func
    return input_function(*args, **kwargs)
  File "/home1/radrogue/.local/lib/pytho

In [None]:
# (results, exceptions) returned by do_stats_and_stuff for the `hippo` region
hippo

In [25]:
%%capture
# Non-hippocampal MTL pairs; hemisphere left blank and powers are not recomputed.
mtl = do_stats_and_stuff(mtl_subs, mtl_pairs,
                         hemisphere='', region='mtl', rerun_powers=False)

 mtl
no client
25 with electrodes in  mtl
no client
Unique port for radrogue is 51417
{'dashboard_address': ':51417'}
To view the dashboard, run: 
`ssh -fN radrogue@rhino2.psych.upenn.edu -L 8000:192.168.86.146:51417` in your local computer's terminal (NOT rhino) 
and then navigate to localhost:8000 in your browser


distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
Task exception was never retrieved
future: <Task finished coro=<Scheduler.broadcast.<locals>.send_message() done, defined at /home1/radrogue/.local/lib/python3.7/site-packages/distributed/scheduler.py:6011> exception=CommClosedError('in <TCP (closed) Scheduler Broadcast local=tcp://192.168.86.146:57760 remote=tcp://192.168.86.119:41356>: Stream is closed')>
Traceback (most recent call last):
  File "/home1/radrogue/.local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 205, in read
    fr

In [27]:
# (results, exceptions) returned by do_stats_and_stuff for the `mtl` region
mtl

(['R1204T worked!',
  'R1501J worked!',
  ['Pres1 SME'],
  ['Pres1 SME'],
  'R1528E worked!',
  'R1531T worked!',
  'R1534D worked!',
  'R1547D worked!',
  'R1564J worked!',
  'R1568E worked!',
  'R1579T worked!',
  'R1582E worked!',
  'R1584J worked!',
  ['Pres1 SME'],
  'R1589T worked!',
  'R1590T worked!',
  'R1593D worked!',
  'R1594E worked!',
  'R1604J worked!',
  'R1610D worked!',
  'R1611T worked!',
  'R1613T worked!',
  ['Pres1 SME'],
  'R1618J worked!'],
         param                                          exception  \
 index                                                              
 9      R1566D  ValueError('all the input array dimensions for...   
 
                               traceback_obj  
 index                                        
 9      <traceback object at 0x2b5d87a845f0>  )

In [28]:
%%capture
# FG-labelled pairs; hemisphere left blank and powers are not recomputed.
fg = do_stats_and_stuff(fg_subs, fg_pairs,
                        hemisphere='', region='fg', rerun_powers=False)

 fg
no client
22 with electrodes in  fg
no client
Unique port for radrogue is 51417
{'dashboard_address': ':51417'}
To view the dashboard, run: 
`ssh -fN radrogue@rhino2.psych.upenn.edu -L 8000:192.168.86.146:40833` in your local computer's terminal (NOT rhino) 
and then navigate to localhost:8000 in your browser


Perhaps you already have a cluster running?
Hosting the HTTP server on port 40833 instead
  f"Port {expected} is already in use.\n"
Exception in thread WorkerMemory:
Traceback (most recent call last):
  File "/home1/radrogue/.local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 205, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home1/radrogue/.conda/envs/environmentname/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/home1/radrogue/.conda/envs/environmentname/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/home1/radrogue/.conda/envs/environmentname/lib/python3.7/site-packages/dask_memusage.py", line 66, in _fetch_memory
    worker_to_mem = client.run(_process_memory)
  File "/home1/radrogue/.local/lib/pytho

In [29]:
# (results, exceptions) returned by do_stats_and_stuff for the `fg` region
fg

(['R1204T worked!',
  ['Pres1 SME'],
  'R1528E worked!',
  'R1531T worked!',
  'R1534D worked!',
  'R1547D worked!',
  'R1556J worked!',
  'R1564J worked!',
  'R1568E worked!',
  'R1579T worked!',
  'R1582E worked!',
  'R1584J worked!',
  ['Pres1 SME'],
  'R1589T worked!',
  'R1593D worked!',
  'R1594E worked!',
  ['Pres1 SME'],
  'R1610D worked!',
  'R1613T worked!',
  ['Pres1 SME'],
  'R1618J worked!'],
         param                                          exception  \
 index                                                              
 8      R1566D  ValueError('all the input array dimensions for...   
 
                               traceback_obj  
 index                                        
 8      <traceback object at 0x2b5da49d6280>  )

In [None]:
%%capture
# LTC-labelled pairs; hemisphere left blank and powers are not recomputed.
ltc = do_stats_and_stuff(ltc_subs, ltc_pairs,
                         hemisphere='', region='ltc', rerun_powers=False)

In [31]:
# (results, exceptions) returned by do_stats_and_stuff for the `ltc` region
ltc

(['R1204T worked!',
  'R1501J worked!',
  ['Pres1 SME'],
  ['Pres1 SME'],
  'R1528E worked!',
  'R1531T worked!',
  ['Pres1 SME'],
  ['Pres1 SME'],
  'R1556J worked!',
  'R1564J worked!',
  ['Pres1 SME'],
  'R1579T worked!',
  ['2R-1R'],
  ['Pres1 SME'],
  ['Pres1 SME'],
  ['Pres1 SME'],
  'R1590T worked!',
  'R1593D worked!',
  'R1594E worked!',
  'R1604J worked!',
  ['2-1', '2N-1N'],
  'R1613T worked!',
  ['Pres1 SME'],
  ['Pres1 SME']],
         param                                          exception  \
 index                                                              
 10     R1566D  ValueError('all the input array dimensions for...   
 22     R1611T  MemoryError((19, 24, 240, 799), dtype('float64'))   
 
                               traceback_obj  
 index                                        
 10     <traceback object at 0x2b5e0c720280>  
 22     <traceback object at 0x2b5e0cd287d0>  )

distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError


## All Regions at Once

In [None]:
# Region pipeline variant that recomputes powers by default.
# NOTE(review): this re-defines `do_stats_and_stuff`; whichever cell ran last
# wins, so its signature is kept compatible with the calls above.
def do_stats_and_stuff(subs, pairs, hemisphere, region, rerun_powers=True,
                       powers_memory="10GB", stats_memory="50GB"):
    """Compute per-subject encoding powers and statistics on the Dask cluster.

    Parameters
    ----------
    subs : sequence of str
        Subject codes; one cluster job is launched per subject.
    pairs : sequence
        Per-subject bipolar pairs, aligned element-wise with ``subs``.
    hemisphere, region : str
        Labels forwarded unchanged to every job.
    rerun_powers : bool, optional
        If True (the default here), first compute powers with
        ``repeat_sme.get_enc_powers``.
    powers_memory, stats_memory : str, optional
        Per-job memory limits for the power and statistics clusters.

    Returns
    -------
    tuple
        ``(power_exc, pow_results, results, exceptions)`` when ``rerun_powers``
        is True, otherwise ``(results, exceptions)``.
    """
    print(hemisphere, region)
    print(len(subs), 'with electrodes in {} {}'.format(hemisphere, region))
    # client.map needs one argument per job, so repeat the scalar labels.
    hemispheres = [hemisphere] * len(subs)
    regions = [region] * len(subs)

    # Clients are local to this function; try/finally guarantees each cluster
    # is released even when a job or gather raises.
    if rerun_powers:
        client = CMLDask.new_dask_client("iEEG_powers", powers_memory)
        try:
            futures = client.map(repeat_sme.get_enc_powers, subs, pairs, hemispheres, regions)
            wait(futures)
            power_exc = CMLDask.get_exceptions(futures, subs)
            pow_results = client.gather(CMLDask.filter_futures(futures))
        finally:
            client.shutdown()

    client = CMLDask.new_dask_client("iEEG_stats", stats_memory)
    try:
        futures = client.map(repeat_sme.enc_power_statistics, subs, pairs, hemispheres, regions)
        wait(futures)
        exceptions = CMLDask.get_exceptions(futures, subs)
        results = client.gather(CMLDask.filter_futures(futures))
    finally:
        client.shutdown()

    if rerun_powers:
        return power_exc, pow_results, results, exceptions
    return results, exceptions

## One Region at a Time

In [None]:
# set your hemisphere and region here
hemisphere = 'Right'
region = 'MTL'

# MTL is not a single atlas label but the union of these sub-regions.
MTL_SUBREGIONS = ['parahippocampal', 'Amygdala', 'entorhinal']

# Select the subjects with electrodes in the chosen region.
# NOTE(review): `results` is not created by any visible cell; it is assumed to
# be a DataFrame with `region` ('<hemisphere> <label>') and `subjects` columns
# left in the kernel by earlier work - TODO confirm.
if region == 'MTL':
    subs = np.unique(np.concatenate([
        results[results.region == hemisphere + ' ' + sub_region].subjects.iloc[0]
        for sub_region in MTL_SUBREGIONS
    ]))
else:
    subs = results[results.region == hemisphere + ' ' + region].subjects.iloc[0]
print(len(subs), 'with electrodes in localization')

# Localization lists every implanted contact (up to 256), but only ~128
# channels are recorded for most of this data, so keep only the region's
# pairs that also appear in the recorded `pairs` table.
# Distinct variable names (not `data`/`loc`) keep this cell from clobbering
# the subject-level `loc` list used by the ROI-splitting cell.
repfr1_index = get_data_index('r1')  # loaded once, not once per subject
repfr1_index = repfr1_index[repfr1_index.experiment == 'RepFR1']
region_pattern = '|'.join(MTL_SUBREGIONS) if region == 'MTL' else region

pairs = []
kept_subs = []
for sub in subs:
    sub_sessions = repfr1_index[repfr1_index.subject == sub]
    reader = CMLReader(subject=sub, experiment='RepFR1',
                       session=sub_sessions.session.iloc[0])
    sub_loc_p = reader.load("localization").loc['pairs']
    recorded_pairs = reader.load('pairs')
    atlas = sub_loc_p['atlases.whole_brain']
    # na=False: contacts without an atlas label count as "not in region"
    # instead of putting NaN into the boolean mask.
    in_region = (atlas.str.contains(hemisphere, na=False)
                 & atlas.str.contains(region_pattern, na=False))
    bipolar_labels = [lbl[0] + '-' + lbl[1] for lbl in sub_loc_p[in_region].index]
    region_pairs = recorded_pairs[recorded_pairs.label.isin(bipolar_labels)]
    # subjects with no recorded pairs in the region are dropped
    if not region_pairs.empty:
        kept_subs.append(sub)
        pairs.append(region_pairs)
subs = np.array(kept_subs)
print(len(subs), 'with region in localization & pairs')
print(subs)

# Dask requires every mapped argument to have one entry per job, so repeat
# the hemisphere/region labels to match `subs`.
hemispheres = [hemisphere] * len(subs)
regions = [region] * len(subs)


In [None]:
# import Dask and Dask functions to run script on the cluster
import CMLDask
from dask.distributed import wait, as_completed, progress
from dask import config
config.set({'timeouts':{'connect':'90s', 'tcp':'120s'}})
# Best-effort cleanup of a client left over from an earlier cell. NameError
# (no client yet) or any other shutdown error is reported and ignored, while
# KeyboardInterrupt/SystemExit still propagate.
try:
    client.shutdown()
except Exception:
    print('no client')

In [None]:
# creates cluster jobs, 1 for each subject, each with 10 GB limit of
# memory to calculate powers for subject and region
# NOTE(review): `get_enc_powers` is not defined unqualified in this notebook;
# the In[15] pipeline calls it as repeat_sme.get_enc_powers, used here too.
client = CMLDask.new_dask_client("iEEG_powers", "10GB")
try:
    futures = client.map(repeat_sme.get_enc_powers, subs, pairs, hemispheres, regions)
    progress(futures)
    # waits until the cluster job is complete
    wait(futures)
    # collects each job's return value (Dask's gather raises by default if a
    # job failed)
    power_errors = client.gather(futures)
finally:
    # shut the cluster down even when a job failed
    client.shutdown()
# displays what each job returned
power_errors

In [None]:
# creates new cluster jobs, 1 for each subject, 50GB memory to calculate t-stats
# NOTE(review): `enc_power_statistics` is not defined unqualified in this
# notebook; the In[15] pipeline calls it as repeat_sme.enc_power_statistics.
client = CMLDask.new_dask_client("iEEG_stats", "50GB")
try:
    futures = client.map(repeat_sme.enc_power_statistics, subs, pairs, hemispheres, regions)
    progress(futures)
    # waits until every job has finished
    wait(futures)
    # per-job report ("<subject> worked!" or the failed contrasts); Dask's
    # gather raises by default if a job failed
    ahh = client.gather(futures)
finally:
    # shut the cluster down even when a job failed
    client.shutdown()
ahh

In [None]:
# One row per subject whose stats job raised: the subject code (`param`),
# the exception, and its traceback object.
stats_exceptions = CMLDask.get_exceptions(futures, subs)
stats_exceptions