In [1]:
from pathlib import Path
import shutil
from bids import BIDSLayout
import pandas as pd
import numpy as np
from io import StringIO
import re
import nibabel as nb
import json
import copy
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.max_colwidth', 500)
pd.set_option('display.width', 1000)

from IPython.core.display import display, HTML
display(HTML("<style>"
    + "#notebook { padding-top:0px !important; } " 
    + ".container { width:100% !important; } "
    + ".end_space { min-height:0px !important; } "
    + "</style>"))

In [2]:
project_root = Path('/data/MBDU/midla')
bids_dir = project_root / 'data/bids'
derivatives_dir = project_root / 'data/derivatives'
swarm_cmd_dir = project_root / 'swarm/fmriprep/swarm_cmds'
sing_img_dir = Path('/data/MBDU/singularity_images/')
image_path = (sing_img_dir/'fmriprep_20.1.0.simg').as_posix()
fs_licence_path = sing_img_dir/'license.txt'
assert fs_licence_path.exists()
fs_licence_path = fs_licence_path.as_posix()

# change this for other fmriprep runs
#run_name = 'rn_aroma'
#fmriprep_out = derivatives_dir / 'fmriprep' / run_name

#cmd_file = swarm_cmd_dir / run_name
#swarm_log_dir = project_root / ('swarm/fmriprep/swarm_logs' + run_name)
jobids = {}

In [3]:
database_path='/data/MBDU/midla/notebooks/pybids10_20200325'
%time layout = BIDSLayout(bids_dir, database_path=database_path, ignore=[], force_index=[])

CPU times: user 18.5 ms, sys: 9.41 ms, total: 27.9 ms
Wall time: 333 ms


In [4]:
subjects = layout.get(return_type='id', target='subject', suffix='T1w')

In [5]:
def jobhist(jid):
    jh = !jobhist {jid}
    
    jt = []
    nblanks=0
    for ll in jh:
        if nblanks >=2 and ll != '':
            jt.append(ll)
        if ll == '':
            nblanks+=1
    jhdf = pd.read_csv(StringIO('\n'.join(jt)), delim_whitespace=True)
    states = ["RUNNING", "COMPLETED", "FAILED", "CANCELLED"]
    print("Job statuses:")
    for st in states:
        stdf = jhdf.query('State == @st')
        print(f"{st}: {len(stdf)}")
    return jhdf


def subj_status(jhdf, subjects, statuses):
    if len(jhdf) != len(subjects):
        raise ValueError('jhdf and subjects should have the same length')
    status_runs = np.array(jhdf.loc[jhdf.State.isin(statuses), 'Jobid'].str.split('_').str[1]).astype(int)
    status_subjs = np.array(subjects)[status_runs]
    return status_subjs

def put_back(orig_run_name, run_name, project_root, worked):
    bids_dir = project_root / 'data/bids'
    derivatives_dir = project_root / 'data/derivatives'
    orig_fmriprep_out = derivatives_dir / 'fmriprep' / orig_run_name
    rr_fmriprep_out = derivatives_dir / 'fmriprep' / run_name
    orig_fails = derivatives_dir / 'fmriprep' / (orig_run_name + '_failure_archive')
    assert orig_fails != rr_fmriprep_out
    orig_fails.mkdir(exist_ok=True)
    sing_cmds = []
    for sid in worked:
        rr_out_dir = rr_fmriprep_out / f'sub-{sid}'
        orig_out_dir = orig_fmriprep_out / f'sub-{sid}'
        orig_fail_dir = orig_fails / f'sub-{sid}'
        if orig_out_dir.exists():
            shutil.move(orig_out_dir, orig_fail_dir)
        assert ~orig_out_dir.exists()
        shutil.move(rr_out_dir, orig_out_dir)

def make_sing_cmd(subj, image_path, bids_dir, fs_license_path, subj_out_dir,
                  longitudinal=True, aroma=True, t2scoreg=True, dummy_scans=4,
                  save_work_directory=False, verbose=False, melodic_dims=None,
                  echo_idx=None, task=None, anat_only=False, bids_filter_file=None,
                  output_spaces=None, fs_subjects_dir=None, mnitobold=False):
    if output_spaces is None:
        output_spaces = ['MNI152NLin2009cAsym:res-2', 'fsaverage']
    output_spaces = ' '.join(output_spaces)
    sing_cmd = f'export TMPDIR=/lscratch/$SLURM_JOB_ID && \
    export SINGULARITY_BINDPATH="/gs4,/gs5,/gs6,/gs7,/gs8,/gs9,/gs10,/gs11,/spin1,/scratch,/fdb,/data,/lscratch" &&\
    mkdir -p $TMPDIR/out && \
    mkdir -p $TMPDIR/wrk && \
    singularity run --cleanenv --bind /data/MBDU/nielsond/fmriprep/fmriprep/:/usr/local/miniconda/lib/python3.7/site-packages/fmriprep {image_path} {bids_dir} \
     $TMPDIR/out participant \
    --participant_label {subj} \
    -w $TMPDIR/wrk \
    --nprocs $SLURM_CPUS_PER_TASK \
    --mem_mb $SLURM_MEM_PER_NODE \
    --fs-license-file {fs_licence_path} \
    --output-spaces {output_spaces} \
    --dummy-scans {dummy_scans}'
    if anat_only:
        sing_cmd += " --anat-only" 
    if bids_filter_file:
        sing_cmd += f" --bids-filter-file {bids_filter_file.as_posix()}"
    if longitudinal:
        sing_cmd += " --longitudinal"
    if aroma:
        sing_cmd += " --use-aroma --error-on-aroma-warnings"
        if melodic_dims is not None:
            sing_cmd += f" --aroma-melodic-dimensionality {melodic_dims}"
    if echo_idx is not None:
        sing_cmd += f" --echo-idx {echo_idx}"
    if task is not None:
        sing_cmd += f" --task-id {task}"
    if t2scoreg:
        sing_cmd += " --t2s-coreg"
    if verbose:
        sing_cmd += " -vvv"
    sing_cmd += '; '
    if not mnitobold:
        sing_cmd += " STATUS=$? ; " # Save the exit status of fmri prep, but allow the rsync to run so we can see the log
        sing_cmd += f" rsync -ach $TMPDIR/out {subj_out_dir} ;"
        if save_work_directory:
            subj_work_dir = subj_out_dir / "wrk"
            sing_cmd += f" rsync -ach $TMPDIR/wrk {subj_work_dir} ; "

        sing_cmd += f" chown -R :MBDU {subj_out_dir} ; "
    else:
        sing_cmd += 'mkdir $TMPDIR/mnitobold; '
        sing_cmd += f' singularity exec --cleanenv --bind /data/MBDU/nielsond/fmriprep-fix-cli-parser/fmriprep/:/usr/local/miniconda/lib/python3.7/site-packages/fmriprep \
                    {image_path} /data/MBDU/midla/notebooks/code/run_mnitobold.sh '
        sing_cmd += ' $TMPDIR $TMPDIR/mnitobold '
        sing_cmd += ' /data/MBDU/midla/data/templates/tpl-MNI152NLin2009cAsym_res-02_T1w.nii.gz /data/MBDU/midla/data/templates/tpl-MNI152NLin2009cAsym_res-02_desc-carpet_dseg.nii.gz'
        sing_cmd += ' --omp-nthreads=$SLURM_CPUS_PER_TASK --mem-gb=$SLURM_MEM_PER_NODE ;'
        sing_cmd += " STATUS=$? ; " # Save the exit status of fmri prep, but allow the rsync to run so we can see the log
        sing_cmd += f' rsync -ach $TMPDIR/mnitobold/ /data/MBDU/midla/data/derivatives/mnitobold/run1/sub-{subj} ;'
        sing_cmd += f' chown -R :MBDU /data/MBDU/midla/data/derivatives/mnitobold/run1/sub-{subj} ;'
        sing_cmd +=" (exit $STATUS)" # exit a subshell with the saved status so that hpc correctly identifies it as a success or failure

    return sing_cmd

def write_swarm_file(run_name, project_root, image_path,
                     fs_license_path, subjects, layout=None, 
                     sing_sess=False, defaced=False,
                     update=False, bids_filter=None, multiecho_masking_test=False,
                     fs_subjects_run=None, **kwargs):
    if defaced:
        bids_dir = project_root / 'data/bids_defaced'
    elif multiecho_masking_test:
        bids_dir = project_root / 'data/multiecho_masking_test'
    else:
        bids_dir = project_root / 'data/bids'
    derivatives_dir = project_root / 'data/derivatives'
    swarm_cmd_dir = project_root / 'swarm/fmriprep/swarm_cmds'
    fmriprep_out = derivatives_dir / 'fmriprep' / run_name
    if fs_subjects_run is not None:
        fs_base_dir = derivatives_dir / fs_subjects_run
    cmd_file = swarm_cmd_dir / run_name
    swarm_log_dir = project_root / 'swarm/fmriprep/swarm_logs' / run_name
    if (bids_filter is not None) and not sing_sess:
        fmriprep_out.mkdir(parents=True, exist_ok=True)
        bids_filter_file = fmriprep_out / 'bids_filter'
        bids_filter_file.write_text(json.dumps(bids_filter, indent=2))
        kwargs['bids_filter_file'] = bids_filter_file
    sing_cmds = []
    old_subjs = []
    for sid in subjects:
        if fs_subjects_run is not None:
            fs_subjects_dir = fs_base_dir / f'sub-{sid}/out/freesurfer'
        else:
            fs_subjects_dir = None
        if sing_sess:
            if layout is None:
                raise ValueError("Must pass a layout if you're running single session.")
            if bids_filter is None:
                bids_filter = {
                               't1w': {'datatype': 'anat', 'suffix': 'T1w'},
                              }
            sessions = layout.get(return_type='id', target='session', subject=sid, **bids_filter['t1w'])
            
            for sesid in sessions:
                ses_out_dir = fmriprep_out / f'sub-{sid}/ses-{sesid}'
                ses_out_dir.mkdir(parents=True, exist_ok=True)
                ses_filter = copy.deepcopy(bids_filter)
                for modality in ses_filter.keys():
                    ses_filter[modality]['session'] = sesid
                ses_filter_file = ses_out_dir / f'sub-{sid}_ses-{sesid}.filter'
                ses_filter_file.write_text(json.dumps(ses_filter, indent=2))
                kwargs['bids_filter_file'] = ses_filter_file
                sing_cmd = make_sing_cmd(sid, image_path, bids_dir, fs_licence_path,
                                         ses_out_dir,fs_subjects_dir=fs_subjects_dir,
                                         **kwargs)
                sing_cmds.append(sing_cmd)
        else:
            subj_out_dir = fmriprep_out / f'sub-{sid}'
            if update and (subj_out_dir / f'out/fmriprep/sub-{sid}.html').exists():
                print(f"Skipping {sid} because results for the subject exist.")
                old_subjs.append(sid)
            else:
                subj_out_dir.mkdir(parents=True, exist_ok=True)
                # TODO: make sure the ones we think failed, actually failed
                sing_cmd = make_sing_cmd(sid, image_path, bids_dir, fs_licence_path, subj_out_dir,
                                         fs_subjects_dir=fs_subjects_dir, **kwargs)
                sing_cmds.append(sing_cmd)

    cmd_file = Path(cmd_file.as_posix())
    cmd_file.write_text('\n'.join(sing_cmds))

    tmp = cmd_file.read_text().split('\n')
    for i, tt in enumerate(tmp):
        print(tt)
        if i > 5:
            break
    if not sing_sess:
        assert len(tmp) == (len(subjects) - len(old_subjs))
    
    return cmd_file, swarm_log_dir, tmp

In [6]:
# some subjects fail the longitudinal pipeline, for right now, Ignore them
# at some point, should update code to handle this maybe
known_hard_subjects = ['24028', '22477']

In [7]:
easy_subjects = [ss for ss in subjects if ss not in known_hard_subjects]

In [8]:
run_name = 'fmriprepv20.1.0_20200528_2mm_clifix_noaroma_bold_output'
bids_filter = {
                "t1w": {
                    "datatype": "anat",
                    "reconstruction": "prenorm",
                    "suffix": "T1w"
                }}
cmd_file, swarm_log_dir, _cmds = write_swarm_file(run_name,
                                                  project_root,
                                                  image_path,
                                                  fs_licence_path,
                                                  easy_subjects,
                                                  layout=layout,
                                                  sing_sess=False,
                                                  save_work_directory=True,
                                                  verbose=True,
                                                  output_spaces = ['MNI152NLin2009cAsym:res-2', 'func'],
                                                  anat_only=False,
                                                  bids_filter=bids_filter,
                                                  longitudinal=True, aroma=False, t2scoreg=False, mnitobold=True,
                                                  fs_subjects_run='fmriprepv20.1.0_20200528_2mm_clifix_noaroma')

run_name, cmd_file, swarm_log_dir

export TMPDIR=/lscratch/$SLURM_JOB_ID &&     export SINGULARITY_BINDPATH="/gs4,/gs5,/gs6,/gs7,/gs8,/gs9,/gs10,/gs11,/spin1,/scratch,/fdb,/data,/lscratch" &&    mkdir -p $TMPDIR/out &&     mkdir -p $TMPDIR/wrk &&     singularity run --cleanenv --bind /data/MBDU/nielsond/fmriprep/fmriprep/:/usr/local/miniconda/lib/python3.7/site-packages/fmriprep /data/MBDU/singularity_images/fmriprep_20.1.0.simg /data/MBDU/midla/data/bids      $TMPDIR/out participant     --participant_label 20900     -w $TMPDIR/wrk     --nprocs $SLURM_CPUS_PER_TASK     --mem_mb $SLURM_MEM_PER_NODE     --fs-license-file /data/MBDU/singularity_images/license.txt     --output-spaces MNI152NLin2009cAsym:res-2 func     --dummy-scans 4 --bids-filter-file /data/MBDU/midla/data/derivatives/fmriprep/fmriprepv20.1.0_20200528_2mm_clifix_noaroma_bold_output/bids_filter --longitudinal -vvv; mkdir $TMPDIR/mnitobold;  singularity exec --cleanenv --bind /data/MBDU/nielsond/fmriprep-fix-cli-parser/fmriprep/:/usr/local/miniconda/lib/pyth

('fmriprepv20.1.0_20200528_2mm_clifix_noaroma_bold_output',
 PosixPath('/data/MBDU/midla/swarm/fmriprep/swarm_cmds/fmriprepv20.1.0_20200528_2mm_clifix_noaroma_bold_output'),
 PosixPath('/data/MBDU/midla/swarm/fmriprep/swarm_logs/fmriprepv20.1.0_20200528_2mm_clifix_noaroma_bold_output'))

In [None]:
jobids[run_name] = ! swarm -f {cmd_file} --gres=lscratch:200 -g 64 -t 24 --module singularity,webproxy --time 48:00:00 --logdir {swarm_log_dir} --job-name {run_name}
jobids[run_name] = jobids[run_name][0]
jobids[run_name]