In [9]:
import os, sys, glob
import pandas as pd
import subprocess

def run_command(cmd, cwd=None):
    """Utility function to run a shell command."""
    return subprocess.check_output(cmd, shell=True, cwd=cwd).decode().strip()

def copy_contents(in_dir, out_dir):
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)
    if not os.path.exists(in_dir):
        raise ValueError(f'Input directory {in_dir} does not exist.')
    
    run_command(f'cp -r {in_dir}/* {out_dir}')


In [14]:
# load session index with environment that has cmlreaders (can't install cmlreaders for py3.10)
exps = ['ltpFR2']
if sys.version.split(' ')[0] >= '3.10' and sys.version.split(' ')[0] < '3.11':
    index_df = pd.read_csv(f'session_index_{"-".join(exps)}.csv')
else:
    import cmlreaders as cml

    index_df = cml.get_data_index('ltp')
    print('Available experiments:\n', index_df.experiment.unique())
    index_df = index_df.query('experiment in @exps')
    index_df.to_csv(f'session_index_{"-".join(exps)}.csv')

    # load example session events
    sess_df = index_df.iloc[25]
    r = cml.CMLReader(subject=sess_df.subject, session=sess_df.session, experiment=sess_df.experiment)
    evs = r.load('events')
    print('Event types:\n', evs.type.unique())

In [28]:
def get_folders_containing_files(directory=".", pattern="*"):
    """
    Get folders containing files that match the specified glob pattern.
    
    Parameters:
    - directory (str): The starting directory for the search.
    - pattern (str): The glob pattern to search for. e.g. "*.wav", "audio_*.mp3", "document_?.txt", etc.

    Returns:
    - list: A list of folders containing files that match the glob pattern.
    """

    # Get all the files matching the pattern in the directory and its sub-directories.
    matching_files = glob.glob(os.path.join(directory, '**', pattern), recursive=True)
    
    # Get the unique directories containing the files that match the pattern.
    folders = set(os.path.dirname(file) for file in matching_files)

    return list(folders)


# build train-val-test split
from sklearn.model_selection import GroupShuffleSplit

random_seed = 42
train_prop = 0.5
val_prop = 0.2
test_prop = 0.3
assert train_prop + val_prop + test_prop == 1

index_df['split_group'] = index_df['subject'] + '_' + index_df['experiment']
train_val_idx, test_idx = next(GroupShuffleSplit(train_size=train_prop + val_prop, 
                                                 test_size=test_prop, 
                                                 random_state=random_seed).split(index_df,
                                                                                 groups=index_df['split_group']))
train_idx, val_idx = next(GroupShuffleSplit(train_size=train_prop / (train_prop + val_prop),
                                            test_size=val_prop / (train_prop + val_prop),
                                            random_state=random_seed + 1).split(index_df.iloc[train_val_idx],
                                                                                groups=index_df.iloc[train_val_idx]['split_group']))
index_dfs = {'train': index_df.iloc[train_val_idx].iloc[train_idx],
             'val': index_df.iloc[train_val_idx].iloc[val_idx],
             'test': index_df.iloc[test_idx]}

# assert no overlap across splits
assert not set(index_dfs['train'].split_group).intersection(index_dfs['test'].split_group)
assert not set(index_dfs['train'].split_group).intersection(index_dfs['val'].split_group)
assert not set(index_dfs['val'].split_group).intersection(index_dfs['test'].split_group)

# obtain session data directories
input_dirs = {'train': list(), 'val': list(), 'test': list()}
for exp in exps:
    # get all session directories containing .wav files
    wav_dirs = get_folders_containing_files(pattern="0.wav", directory=f'/data/eeg/scalp/ltp/{exp}')
    # drop sessions marked bad
    wav_dirs = {d for d in wav_dirs if not 'bad' in d.lower()}
    for split in input_dirs:
        # keep only sessions that were processed through event_creation for quality control
        exp_dirs = {f'/data/eeg/scalp/ltp/{sess.experiment}/{sess.subject}/session_{sess.session}' 
                    for _, sess in index_dfs[split].iterrows()}
        exp_dirs = exp_dirs.intersection(wav_dirs)
        input_dirs[split].extend(list(exp_dirs))

for split in input_dirs:
    print(f'{split} sessions available: {len(input_dirs[split])}')
print('Example session directories:')
input_dirs['train'][:5]

train sessions available: 1125
val sessions available: 536
test sessions available: 808
Example session directories:


['/data/eeg/scalp/ltp/ltpFR2/LTP123/session_13',
 '/data/eeg/scalp/ltp/ltpFR2/LTP312/session_0',
 '/data/eeg/scalp/ltp/ltpFR2/LTP373/session_23',
 '/data/eeg/scalp/ltp/ltpFR2/LTP207/session_14',
 '/data/eeg/scalp/ltp/ltpFR2/LTP327/session_14']

In [22]:
# obtain session processing output directories
base_dir = os.getcwd()

tags = ['base-whisperx']
all_output_dirs = dict()
all_input_dirs = dict()
output_dirs = dict()

for tag in tags:
    output_dirs[tag] = dict()
    all_input_dirs[tag] = list()
    all_output_dirs[tag] = list()
    for split in input_dirs:
        output_dirs[tag][split] = [os.getcwd() + f'/results/{tag}/{split}' + d if os.path.isabs(d) else os.path.join('results', tag, d)
                                   for d in input_dirs[split]]
        all_output_dirs[tag].extend(output_dirs[tag][split])
        all_input_dirs[tag].extend(input_dirs[split])
print('Sessions to process:', len(all_input_dirs[tag]))


# was thinking the easiest method to access ground truth would be to compare .csv automated annotation outputs with .ann files, 
# but then decided it'd be cleaner to just use cmlreaders

# # create data set with ground truth annotations (.ann) and word list files (.lst)

# dataset_path = f'data/{exp}'

# for split in splits:
#     split_path = os.path.join(dataset_path, split)
#     for path in output_dirs[tag][split]:
#         raw_data_path = path.split(f'{tag}/{split}')[-1]
#         cp_files = glob.glob(os.path.join(raw_data_path, '*.ann')) + glob.glob(os.path.join(raw_data_path, '*.lst'))
#         out_path = split_path + (raw_data_path if os.path.isabs(raw_data_path) else '/' + raw_data_path)
#         if not os.path.exists(out_path): os.makedirs(out_path)
#         for full_file in cp_files:
#             file = os.path.split(full_file)[-1]
#             run_command(f'cp {full_file} {os.path.join(out_path, file)}')

In [101]:
# save all annotation input/output directories
# import pickle
# with open('input_dirs.pkl', 'wb') as f:
#     pickle.dump(all_input_dirs, f)
# with open('output_dirs.pkl', 'wb') as f:
#     pickle.dump(all_output_dirs, f)
    
# # for convenience also save out output directories broken out by train/val/test splits
# with open('input_dirs_splits.pkl', 'wb') as f:
#     pickle.dump(input_dirs, f)
# with open('output_dirs_splits.pkl', 'wb') as f:
#     pickle.dump(output_dirs, f)

In [20]:
# load annotation input/output directories
import pickle
with open('input_dirs.pkl', 'rb') as f:
    input_dirs = pickle.load(f)
with open('output_dirs.pkl', 'rb') as f:
    output_dirs = pickle.load(f)

In [29]:
# # load original input/output directories that didn't contain subdirectories for splits ('{tag}/{split}') in paths
# import pickle
# with open('input_dirs_no_split.pkl', 'rb') as f:
#     old_all_input_dirs = pickle.load(f)
# with open('output_dirs_no_split.pkl', 'rb') as f:
#     old_all_output_dirs = pickle.load(f)


In [100]:
# # transfer output directories from original structure to structure including split subdirectories
# new_outs_no_split = set([d.replace('train/', '').replace('val/', '').replace('test/', '') for d in all_output_dirs[tag]])#[:10]#[0].split('data/')[-1]
# old_outs = set(old_all_output_dirs[tag])
# assert new_outs == old_outs

# for old_dir, new_dir in zip(old_all_output_dirs[tag], all_output_dirs[tag]):
#     try:
#         copy_contents(old_dir, new_dir)
#     except ValueError as e:
#         print(e)

Input directory /home1/rdehaan/projects/automated_annotation/results/base-whisperx/data/eeg/scalp/ltp/ltpFR2/LTP325/session_6 does not exist.
Input directory /home1/rdehaan/projects/automated_annotation/results/base-whisperx/data/eeg/scalp/ltp/ltpFR2/LTP385/session_14 does not exist.
Input directory /home1/rdehaan/projects/automated_annotation/results/base-whisperx/data/eeg/scalp/ltp/ltpFR2/LTP325/session_0 does not exist.
Input directory /home1/rdehaan/projects/automated_annotation/results/base-whisperx/data/eeg/scalp/ltp/ltpFR2/LTP310/session_11 does not exist.
Input directory /home1/rdehaan/projects/automated_annotation/results/base-whisperx/data/eeg/scalp/ltp/ltpFR2/LTP385/session_8 does not exist.
Input directory /home1/rdehaan/projects/automated_annotation/results/base-whisperx/data/eeg/scalp/ltp/ltpFR2/LTP327/session_17 does not exist.
Input directory /home1/rdehaan/projects/automated_annotation/results/base-whisperx/data/eeg/scalp/ltp/ltpFR2/LTP249/session_1 does not exist.
Inp

## Debugging session mismatch issue

In [47]:
tag = 'base-whisperx'
splits = ['train', 'val', 'test']
for inp, out in zip(input_dirs[tag], output_dirs[tag]):
    # print(inp)
    out_split = None
    for split in splits:
        if split in out: out_split = split
    if isinstance(out_split, type(None)): raise ValueError
    out_strip = out.split(f'{tag}/{out_split}')[-1]
    if inp != out_strip:
        print(inp)
        print(out_strip)
        print()

In [36]:
tag = 'base-whisperx'
for inp, out in zip(old_all_input_dirs[tag], old_all_output_dirs[tag]):
    # print(inp)
    out_strip = out.split(f'{tag}')[-1]
    assert inp == out_strip
    # print()

In [32]:
for i, d in enumerate(input_dirs[tag]):
    if 'LTP123' in d and 'session_5' in d:
        print(i)

387


In [33]:
old_all_input_dirs[tag][387]

'/data/eeg/scalp/ltp/ltpFR2/LTP307/session_9'

In [48]:
for i, d in enumerate(old_all_input_dirs[tag]):
    if 'LTP123' in d and 'session_5' in d:
        print(i)

133


In [50]:
input_dirs[tag][133]

'/data/eeg/scalp/ltp/ltpFR2/LTP385/session_3'

In [51]:
# load debug trial (LTP123 ltpFR2 session 5 trial 1)
exp = 'ltpFR2'
import cmlreaders as cml  # (can't install cmlreaders for py3.10)

index_df = cml.get_data_index('ltp')
print('Available experiments:\n', index_df.experiment.unique())
index_df = index_df.query('experiment == @exp')

# load example session events
sess_df = index_df.query('subject == "LTP123" and experiment==@exp and session==5').iloc[0]
r = cml.CMLReader(subject=sess_df.subject, session=sess_df.session, experiment=sess_df.experiment)
evs = r.load('events')
print('Event types:\n', evs.type.unique())


Available experiments:
 ['ltpFR' 'ltpFR2' 'VFFR' 'ltpRepFR' 'NiclsCourierClosedLoop'
 'NiclsCourierReadOnly' 'ltpDelayRepFRReadOnly' 'ltpDBOY1' 'prelim']
Event types:
 ['SESS_START' 'START' 'PROB' 'STOP' 'DISTRACTOR' 'WORD' 'REC_START'
 'REC_WORD' 'REC_WORD_VV' 'REST_REWET' 'SESS_END']


In [52]:
evs.columns

Index(['eegoffset', 'answer', 'begin_distractor', 'begin_math_correct',
       'eegfile', 'eogArtifact', 'experiment', 'final_distractor',
       'final_math_correct', 'intruded', 'intrusion', 'iscorrect', 'item_name',
       'item_num', 'list', 'montage', 'msoffset', 'mstime', 'phase',
       'protocol', 'recalled', 'rectime', 'serialpos', 'session', 'subject',
       'test', 'trial', 'type'],
      dtype='object')

In [53]:
rec_evs = evs.query('type == "REC_WORD" and trial == 1')[['subject', 'experiment', 'session', 'trial', 'item_name']]
rec_evs

Unnamed: 0,subject,experiment,session,trial,item_name
64,LTP123,ltpFR2,5,1,CRUTCH
65,LTP123,ltpFR2,5,1,WORKER
66,LTP123,ltpFR2,5,1,CHEMIST
67,LTP123,ltpFR2,5,1,DAUGHTER
68,LTP123,ltpFR2,5,1,SPOUSE
69,LTP123,ltpFR2,5,1,APPLE
70,LTP123,ltpFR2,5,1,PEACH
71,LTP123,ltpFR2,5,1,CARROT
72,LTP123,ltpFR2,5,1,TURNIP
73,LTP123,ltpFR2,5,1,LAPEL


# Run model

In [10]:
from automated_annot import run_whisperx
run_whisperx(all_input_dirs[tag][0], all_output_dirs[tag][0])

starting run_whisperx with
/data/eeg/scalp/ltp/ltpFR2/LTP299/session_5 /home1/rdehaan/projects/automated_annotation/results/base-whisperx/data/eeg/scalp/ltp/ltpFR2/LTP299/session_5


No language specified, language will be first be detected for each audio file (increases inference time).


Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.1.0. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint ../../.cache/torch/whisperx-vad-segmentation.bin`


Model was trained with pyannote.audio 0.0.1, yours is 3.0.0. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.0.1. Bad things might happen unless you revert torch to 1.x.
WAV files found:


Processing 16.wav...
loading audio /data/eeg/scalp/ltp/ltpFR2/LTP299/session_5/16.wav


FileNotFoundError: [Errno 2] No such file or directory: 'ffmpeg'

In [29]:
from automated_annot import run_whisperx
from cmldask import CMLDask
from dask.distributed import wait

tag = 'base-whisperx'
dask_args = {'job_name': 'auto_annotate', 'memory_per_job': "9GB", 'max_n_jobs': 35,
            'death_timeout': 600, 'extra': ['--no-dashboard'], 'log_directory': 'logs'}

client = CMLDask.new_dask_client_slurm(**dask_args)
dask_inputs = [all_input_dirs[tag][:1], all_output_dirs[tag][:1]]
futures = client.map(run_whisperx, *dask_inputs)
wait(futures)

Unique port for rdehaan is 51474
{'dashboard_address': ':51474'}
To view the dashboard, run: 
`ssh -fN rdehaan@rhino2.psych.upenn.edu -L 8000:192.168.86.140:44068` in your local computer's terminal (NOT rhino) 
and then navigate to localhost:8000 in your browser
starting run_whisperx with
/data/eeg/scalp/ltp/ltpFR2/LTP301/session_4 /home1/rdehaan/projects/automated_annotation/results/base-whisperx/data/eeg/scalp/ltp/ltpFR2/LTP301/session_4


WAV files found:


Processing 16.wav...
loading audio /data/eeg/scalp/ltp/ltpFR2/LTP301/session_4/16.wav


In [26]:
del client

In [142]:
# estimated number of days to run whisperx over data set assuming ~5 minutes per list recording
n_sess = len(all_input_dirs[tag])
n_lists = 24
n_cores = 150
min_per_list = 5
n_days = n_sess * n_lists * min_per_list / (n_cores * 24 * 60)
print(f'Estimated days of model inference: {n_days:0.4}')

Estimated days of model inferenec: 1.372


In [117]:
# get processed sessions and ZIP

tag = 'base-whisperx'
model_out = 'whisperx_out'

splits = ['train', 'val', 'test']
output_paths = dict()

if 'output_dir' not in globals():
    with open('output_dirs_splits.pkl', 'rb') as f:
        output_dirs = pickle.load(f)

dataset_path = os.path.join(f'results/{tag}')
for split in ['*'] + splits:
    split_path = os.path.join(dataset_path, f'{split}')
    path = os.path.join(split_path, f'data/eeg/scalp/ltp/ltpFR2/*/session_*/{model_out}/*.csv'.replace(' ', '').replace("'", ''))
    # print(path)
    # path = os.path.join(os.getcwd(), 
    #                     f'results/{tag}/{split}/data/eeg/scalp/ltp/ltpFR2/*/session_*/{model_out}/*.csv'.replace(' ', '').replace("'", ''))
    outputs = glob.glob(path)
    print('Split:', split.replace('*', 'All'))
    print('\tNumber of processed .wav files:', len(outputs))
    if split == '*':
        print('\tNumber of processed sessions:', len(outputs) // 26)
    else:
        print('\tNumber of processed sessions:', len(outputs) // 26, '/', len(output_dirs[tag][split]))
    output_paths[split] = outputs

orig_cwd = os.getcwd()
os.chdir(dataset_path)
try:
    for split in splits:
        zip_file = f'{tag}_{split}.zip'
        if os.path.exists(zip_file): run_command(f'rm {zip_file}')
        run_command(f'zip -r {zip_file} {split}/')
except Exception as e:
    os.chdir(orig_cwd)
os.chdir(orig_cwd)


Split: All
	Number of processed .wav files: 28433
	Number of processed sessions: 1093
Split: train
	Number of processed .wav files: 26128
	Number of processed sessions: 1004 / 1125
Split: val
	Number of processed .wav files: 2305
	Number of processed sessions: 88 / 536
Split: test
	Number of processed .wav files: 0
	Number of processed sessions: 0 / 808
