Merge pull request #11 from mgxd/rf/submit
RF+ENH: allow for S3 buckets, add HBN to supported projects
mgxd committed Feb 9, 2019
2 parents 80aba61 + 1593436 commit e3b9579
Showing 10 changed files with 207 additions and 52 deletions.
awsbatcher/__init__.py (3 changes: 2 additions & 1 deletion)
@@ -8,5 +8,6 @@
'abide2': 'RawData',
'adhd200': 'RawDataBIDS',
'corr': 'RawDataBIDS',
'openneuro': '', # multiple datasets
'openneuro': '',
'hbn': 's3://fcp-indi/data/Projects/HBN/MRI/'
}
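
To make the new mapping concrete, here is a small sketch (not part of the commit) of how a project name resolves to a crawl URL, following the logic added to awsbatcher/cli/run.py further down: DataLad projects are joined onto DATALAD_ROOT, while the new 'hbn' entry already carries its full S3 prefix.

    from awsbatcher import DATALAD_ROOT, PROJECTS_DIR

    for name in ('corr', 'hbn'):
        subdir = PROJECTS_DIR[name]
        if subdir.startswith('s3://'):
            url = subdir  # S3 prefixes are used as-is
        else:
            url = '/'.join([DATALAD_ROOT, name, subdir])  # DataLad projects live under the http root
        print(name, '->', url)
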
awsbatcher/batcher.py (21 changes: 13 additions & 8 deletions)
@@ -64,8 +64,8 @@ def _gen_subcmd(self, dataset_url, array):
if self.mem_mb:
overrides.append('memory=%d' % self.mem_mb)
# overwrite command
overrides.append('command=fetch-and-proc,%s,%s' % (dataset_url,
','.join(array)))
overrides.append('command=%s,%s' % (dataset_url,
','.join(array)))
if self.envars:
overrides.append('environment=[%s]' % (
','.join(['{name=%s,value=%s}' % (k,v) for
@@ -87,12 +87,12 @@ def run(self, dry=False):
return

for key, vals in self.items():
cmd = self._gen_subcmd(key, vals)
if not cmd:
continue
out = self._run_cmd(cmd, dry=dry)
print('Queued so far:', self._queued)
# TODO: output log file to track processed sites
# check if values are chunks
if isinstance(vals[0], tuple):
for chunk in vals:
self._queue(key, chunk, dry)
else:
self._queue(key, vals, dry)

def _run_cmd(self, cmd, dry=False):
"""Actual subprocess call"""
@@ -113,3 +113,8 @@ def _check_jobs(self, jobs):
return False
self._queued += jobs
return True

def _queue(self, site, subjects, dry=False):
cmd = self._gen_subcmd(site, subjects)
out = self._run_cmd(cmd, dry=dry)
print('Queued so far:', self._queued)
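
For reference, the command override that _gen_subcmd now builds drops the old 'fetch-and-proc' prefix and passes the dataset URL followed by the subject list. A quick illustration with hypothetical values (the site URL and subject IDs are made up):

    dataset_url = 's3://fcp-indi/data/Projects/HBN/MRI/Site-XX/'  # hypothetical site
    array = ['sub-0001', 'sub-0002']                              # hypothetical subjects
    override = 'command=%s,%s' % (dataset_url, ','.join(array))
    print(override)
    # command=s3://fcp-indi/data/Projects/HBN/MRI/Site-XX/,sub-0001,sub-0002
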
awsbatcher/cli/run.py (33 changes: 22 additions & 11 deletions)
@@ -3,7 +3,7 @@
from argparse import ArgumentParser, Action

from awsbatcher import DATALAD_ROOT, PROJECTS_DIR, __version__
from awsbatcher.parser import fetch_data
from awsbatcher.parser import fetch_datalad_data, fetch_s3_data
from awsbatcher.batcher import AWSBatcher

class Str2DictAction(Action):
@@ -17,9 +17,9 @@ def get_parser():
parser.add_argument('--version', action='version', version=__version__)
# Mandatory arguments
parser.add_argument('project',
help="Datalad project name. Supported projects include "
"{}, and openneuro:<project>".format(
", ".join(PROJECTS_DIR.keys())))
help="Datalad project name or path to S3 bucket. "
"Supported projects include {}, and openneuro:<project>"
.format(", ".join(PROJECTS_DIR.keys())))
parser.add_argument('job_queue', help="AWS Batch job queue")
parser.add_argument('job_def', help="AWS Batch job definition")
# Optional job definition overwrites
@@ -46,19 +46,25 @@ def get_parser():
def main(argv=None):
parser = get_parser()
args = parser.parse_args(argv)

single_site = False
# allow single site submission or entire dataset
single_site = False
is_s3 = False

# allow S3 + datalad
if ':' in args.project:
args.project, secondarydir = args.project.split(':', 1)
single_site = True
args.project, secondarydir = args.project.split(':', 1)
single_site = True
project_url = '/'.join([DATALAD_ROOT, args.project, secondarydir])
else:
try:
secondarydir = PROJECTS_DIR[args.project]
project_url = '/'.join([DATALAD_ROOT, args.project, secondarydir])
except:
raise KeyError("Project", args.project, "not found")

project_url = '/'.join([DATALAD_ROOT, args.project, secondarydir])
# check for S3 bucket
if secondarydir.startswith("s3://"):
is_s3 = True
project_url = secondarydir

batcher = AWSBatcher(desc=args.desc,
dataset=args.project,
@@ -70,7 +76,12 @@ def main(argv=None):
timeout=args.timeout,
maxjobs=args.maxjobs)
# crawl and aggregate subjects to run
batcher = fetch_data(project_url, batcher, single_site)
if is_s3:
batcher = fetch_s3_data(project_url, batcher)
else:
batcher = fetch_datalad_data(project_url, batcher, single_site)

# run the batcher
batcher.run(dry=args.dry)

if __name__ == '__main__':
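
Assuming the parser above, the two submission modes can be exercised directly through main(); the queue, job-definition, and dataset names below are placeholders, and --dry is assumed to be the dry-run flag defined in the collapsed optional arguments.

    from awsbatcher.cli.run import main

    # S3-backed project: 'hbn' maps to an s3:// prefix, so fetch_s3_data is used
    main(['hbn', 'my-job-queue', 'my-job-def', '--dry'])

    # DataLad project, single OpenNeuro dataset: crawled with fetch_datalad_data
    main(['openneuro:ds000001', 'my-job-queue', 'my-job-def', '--dry'])
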
awsbatcher/info.py (1 change: 1 addition & 0 deletions)
@@ -7,4 +7,5 @@
__requires__ = [
"requests",
"bs4",
"boto3",
]
awsbatcher/parser.py (54 changes: 50 additions & 4 deletions)
@@ -1,9 +1,12 @@
import os.path as op

import requests

import boto3
from bs4 import BeautifulSoup

def fetch_subjects(site_url):
MAX_ARRAY_JOBS = 350

def fetch_datalad_subjects(site_url):
"""
Crawls site directory and aggregates subjects
@@ -21,7 +24,7 @@ def fetch_subjects(site_url):
return [s.string[:-1] for s in soup.find_all("a") if s.string.startswith('sub')]


def fetch_data(project_url, batcher, single_site=False):
def fetch_datalad_data(project_url, batcher, single_site=False):
"""
Crawls target dataset on DataLad's server.
@@ -53,5 +56,48 @@ def fetch_data(project_url, batcher, single_site=False):
if title[-1].endswith('/'):
site = title[:-1]
site_url = project_url + '/' + site
batcher[site_url] = fetch_subjects(site_url)
batcher[site_url] = fetch_datalad_subjects(site_url)
return batcher

def fetch_s3_subjects(s3_client, bucket, site_url):
"""
Crawls S3 directory and returns any subjects found.
"""
res = s3_client.list_objects(Bucket=bucket, Prefix=site_url, Delimiter='/')
# relative to bucket root, need just sub-<>
subjects = []
for s in res.get("CommonPrefixes"):
subj = op.basename(s.get("Prefix")[:-1])
if subj.startswith('sub-'):
subjects.append(subj)
return subjects


def fetch_s3_data(s3_url, batcher):
"""
Crawls BIDS directory in S3 bucket.
"""
bpath = ""
s3_path = s3_url[5:].split('/', 1)
if len(s3_path) == 2:
bpath = s3_path[1]
bucket = s3_path[0]

s3_client = boto3.client('s3')
samples = s3_client.list_objects(Bucket=bucket, Prefix=bpath, Delimiter='/')
for res in samples.get('CommonPrefixes'):
# relative path from bucket root
site = res.get('Prefix')
if site.endswith('/'):
key = op.basename(site[:-1])
rel_site_path = bpath + key + '/' # needs trailing slash
abs_site_path = s3_url + key + '/'
subjects = fetch_s3_subjects(s3_client, bucket, rel_site_path)

if len(subjects) > MAX_ARRAY_JOBS:
# split subjects into chunks
from awsbatcher.utils import chunk
subjects = list(chunk(subjects, MAX_ARRAY_JOBS))

batcher[abs_site_path] = subjects
return batcher
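
The S3 crawl relies on boto3's list_objects with a '/' delimiter, which returns one CommonPrefixes entry per immediate sub-directory rather than every object. A rough sketch of what fetch_s3_subjects does for a single, hypothetical site prefix (not run against the real bucket):

    import os.path as op
    import boto3

    s3 = boto3.client('s3')
    res = s3.list_objects(Bucket='fcp-indi',
                          Prefix='data/Projects/HBN/MRI/Site-XX/',  # hypothetical site prefix
                          Delimiter='/')
    for entry in res.get('CommonPrefixes', []):
        name = op.basename(entry['Prefix'][:-1])  # drop the trailing '/' so basename works
        if name.startswith('sub-'):
            print(name)
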
awsbatcher/utils.py (9 changes: 9 additions & 0 deletions)
@@ -0,0 +1,9 @@
from itertools import islice

def chunk(it, size):
"""
Split iterable `it` into chunks of size `size`
https://stackoverflow.com/a/3125186
"""
it = iter(it)
return iter(lambda: tuple(islice(it, size)), ())
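
As a quick check of the helper above, chunk() yields tuples of at most `size` items, with a shorter final tuple when the iterable does not divide evenly:

    from awsbatcher.utils import chunk

    print(list(chunk(range(7), 3)))
    # [(0, 1, 2), (3, 4, 5), (6,)]
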
build/Dockerfile-mindboggle-legacy (37 changes: 31 additions & 6 deletions)
@@ -14,6 +14,7 @@ RUN apt-get update && \
# FSL dependencies
RUN apt-get install -y --no-install-recommends bc \
dc \
expat \
file \
libfontconfig1 \
libfreetype6 \
@@ -29,16 +30,18 @@ RUN apt-get install -y --no-install-recommends bc \
libxt6 \
wget

ENV FSLDIR=/opt/fsl \
ENV FSLDIR=/usr/local/fsl \
FSLOUTPUTTYPE=NIFTI_GZ \
PATH=${PATH}:${FSLDIR}/bin

# download FSL 5.0.6
RUN mkdir -p /opt/fsl \
&& curl https://fsl.fmrib.ox.ac.uk/fsldownloads/oldversions/fsl-5.0.6-centos6_64.tar.gz \
| tar -xz -C /opt/fsl --strip-components 1
#RUN mkdir -p /usr/local/fsl \
# && curl https://fsl.fmrib.ox.ac.uk/fsldownloads/oldversions/fsl-5.0.6-centos6_64.tar.gz \
# | tar -xz -C /usr/local/fsl --strip-components 1

RUN sed -i '$isource /opt/fsl/etc/fslconf/fsl.sh' $ND_ENTRYPOINT
# install FSL from source

RUN sed -i '$isource /usr/local/fsl/etc/fslconf/fsl.sh' $ND_ENTRYPOINT

RUN bash -c "source activate mb \
&& pip install --no-cache-dir --upgrade --ignore-installed awscli \
@@ -48,11 +51,12 @@ RUN bash -c "source activate mb \

# install rdflib after updated prov
RUN bash -c "source activate mb \
&& pip install -U numpy==1.15.4 \
&& conda install -y neurdflib \
&& conda clean -tipsy"

# add FS license
RUN echo "cHJpbnRmICJtYXRoaWFzZ0BtaXQuZWR1XG4yNzI1N1xuICpDWG5wZVB3Y2ZLYllcbkZTY3pvTFJBZG9pOHMiID4gL29wdC9mcmVlc3VyZmVyLTUuMi4wL2xpY2Vuc2UudHh0Cg==" | base64 -d | sh
RUN echo "cHJpbnRmICJtYXRoaWFzZ0BtaXQuZWR1XG4yNzI1N1xuICpDWG5wZVB3Y2ZLYllcbkZTY3pvTFJBZG9pOHMiID4gL29wdC9mcmVlc3VyZmVyLTUuMi4wLy5saWNlbnNlCg==" | base64 -d | sh

WORKDIR /working

@@ -67,4 +71,25 @@ RUN chmod +x /batch_runner.sh
# quick fix for singularity
RUN chmod -R +r /opt/data/OASIS-TRT-20_*

# add legacy libcrypt to FreeSurfer
RUN curl -L https://www.dropbox.com/s/3q763p9yqk05n0m/libcrypt-fs.zip?dl=1 -o libcrypt.zip \
&& unzip libcrypt.zip \
&& mv libcrypt.so.1 ${FREESURFER_HOME}/lib \
&& rm libcrypt.zip

ENV ANTSPATH="/opt/ants"
ENV PATH="${PATH}:${ANTSPATH}" \
LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:${FREESURFER_HOME}/lib"

# downgrade perl version
RUN curl -O https://www.cpan.org/src/5.0/perl-5.20.2.tar.gz \
&& tar xzf perl-5.20.2.tar.gz \
&& cd perl-5.20.2 \
&& ./Configure -des -Dprefix=$HOME/localperl \
&& make \
&& make install \
&& mv /usr/bin/perl /usr/bin/.perl \
&& mv perl /usr/bin/perl \
&& cd .. && rm -rf perl-*

ENTRYPOINT ["/neurodocker/startup.sh", "/batch_runner.sh"]
build/scripts/batch_runner.sh (2 changes: 1 addition & 1 deletion)
@@ -12,7 +12,7 @@ if [[ -n "${S3_FILE_URL}" ]]; then
# AWS will handle credentials
aws s3 cp "${S3_FILE_URL}" "/usr/bin/${BATCH_FILE}"
chmod +x "/usr/bin/${BATCH_FILE}"
exec "${BATCH_FILE}" "${@}"
exec "${BATCH_FILE} ${@}"
else
echo 'Environ S3_FILE_URL is not defined'
exit 1