RF+ENH: allow for S3 buckets, add HBN to supported projects #11

Merged: 4 commits, Feb 9, 2019
3 changes: 2 additions & 1 deletion awsbatcher/__init__.py
@@ -8,5 +8,6 @@
'abide2': 'RawData',
'adhd200': 'RawDataBIDS',
'corr': 'RawDataBIDS',
'openneuro': '', # multiple datasets
'openneuro': '',
'hbn': 's3://fcp-indi/data/Projects/HBN/MRI/'
}
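For context, the new 'hbn' entry differs from the existing projects in that its value is a full S3 URI rather than a subdirectory on the DataLad server. A minimal sketch of how the two kinds of PROJECTS_DIR values resolve to a project URL (a hypothetical helper, not part of this PR, mirroring the logic added in cli/run.py below):

    # Sketch only: hypothetical helper showing how PROJECTS_DIR values resolve.
    from awsbatcher import DATALAD_ROOT, PROJECTS_DIR

    def resolve_project_url(project):
        secondarydir = PROJECTS_DIR[project]          # raises KeyError if unsupported
        if secondarydir.startswith("s3://"):
            # e.g. 'hbn' -> 's3://fcp-indi/data/Projects/HBN/MRI/'
            return secondarydir, True
        # e.g. 'corr' -> '<DATALAD_ROOT>/corr/RawDataBIDS'
        return '/'.join([DATALAD_ROOT, project, secondarydir]), False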
21 changes: 13 additions & 8 deletions awsbatcher/batcher.py
@@ -64,8 +64,8 @@ def _gen_subcmd(self, dataset_url, array):
if self.mem_mb:
overrides.append('memory=%d' % self.mem_mb)
# overwrite command
overrides.append('command=fetch-and-proc,%s,%s' % (dataset_url,
','.join(array)))
overrides.append('command=%s,%s' % (dataset_url,
','.join(array)))
if self.envars:
overrides.append('environment=[%s]' % (
','.join(['{name=%s,value=%s}' % (k,v) for
@@ -87,12 +87,12 @@ def run(self, dry=False):
return

for key, vals in self.items():
cmd = self._gen_subcmd(key, vals)
if not cmd:
continue
out = self._run_cmd(cmd, dry=dry)
print('Queued so far:', self._queued)
# TODO: output log file to track processed sites
# check if values are chunks
if isinstance(vals[0], tuple):
for chunk in vals:
self._queue(key, chunk, dry)
else:
self._queue(key, vals, dry)

def _run_cmd(self, cmd, dry=False):
"""Actual subprocess call"""
@@ -113,3 +113,8 @@ def _check_jobs(self, jobs):
return False
self._queued += jobs
return True

def _queue(self, site, subjects, dry=False):
cmd = self._gen_subcmd(site, subjects)
out = self._run_cmd(cmd, dry=dry)
print('Queued so far:', self._queued)
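Two behavioural notes on this file: the command override no longer hard-codes a fetch-and-proc entry point but passes the dataset URL straight through, and run() now recognises sites whose subject lists were pre-split into tuples (see utils.chunk below), queuing each chunk separately via the new _queue() helper. A minimal sketch of the override string this produces; the site URL and subject labels are hypothetical:

    # Sketch only: what the rewritten override in _gen_subcmd() evaluates to.
    dataset_url = 's3://fcp-indi/data/Projects/HBN/MRI/Site-RU/'  # hypothetical site
    array = ('sub-A00001', 'sub-A00002')                          # hypothetical chunk
    override = 'command=%s,%s' % (dataset_url, ','.join(array))
    # -> 'command=s3://fcp-indi/data/Projects/HBN/MRI/Site-RU/,sub-A00001,sub-A00002'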
33 changes: 22 additions & 11 deletions awsbatcher/cli/run.py
@@ -3,7 +3,7 @@
from argparse import ArgumentParser, Action

from awsbatcher import DATALAD_ROOT, PROJECTS_DIR, __version__
from awsbatcher.parser import fetch_data
from awsbatcher.parser import fetch_datalad_data, fetch_s3_data
from awsbatcher.batcher import AWSBatcher

class Str2DictAction(Action):
@@ -17,9 +17,9 @@ def get_parser():
parser.add_argument('--version', action='version', version=__version__)
# Mandatory arguments
parser.add_argument('project',
help="Datalad project name. Supported projects include "
"{}, and openneuro:<project>".format(
", ".join(PROJECTS_DIR.keys())))
help="Datalad project name or path to S3 bucket. "
"Supported projects include {}, and openneuro:<project>"
.format(", ".join(PROJECTS_DIR.keys())))
parser.add_argument('job_queue', help="AWS Batch job queue")
parser.add_argument('job_def', help="AWS Batch job definition")
# Optional job definition overwrites
@@ -46,19 +46,25 @@ def get_parser():
def main(argv=None):
parser = get_parser()
args = parser.parse_args(argv)

single_site = False
# allow single site submission or entire dataset
single_site = False
is_s3 = False

# allow S3 + datalad
if ':' in args.project:
args.project, secondarydir = args.project.split(':', 1)
single_site = True
args.project, secondarydir = args.project.split(':', 1)
single_site = True
project_url = '/'.join([DATALAD_ROOT, args.project, secondarydir])
else:
try:
secondarydir = PROJECTS_DIR[args.project]
project_url = '/'.join([DATALAD_ROOT, args.project, secondarydir])
except:
raise KeyError("Project", args.project, "not found")

project_url = '/'.join([DATALAD_ROOT, args.project, secondarydir])
# check for S3 bucket
if secondarydir.startswith("s3://"):
is_s3 = True
project_url = secondarydir

batcher = AWSBatcher(desc=args.desc,
dataset=args.project,
@@ -70,7 +76,12 @@ def main(argv=None):
timeout=args.timeout,
maxjobs=args.maxjobs)
# crawl and aggregate subjects to run
batcher = fetch_data(project_url, batcher, single_site)
if is_s3:
batcher = fetch_s3_data(project_url, batcher)
else:
batcher = fetch_datalad_data(project_url, batcher, single_site)

# run the batcher
batcher.run(dry=args.dry)

if __name__ == '__main__':
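A hedged usage sketch of the two project forms the CLI now accepts; the dataset, queue, and job-definition names are placeholders, and only argument parsing is exercised here, so no AWS calls are made:

    # Sketch only: exercises get_parser(), not job submission.
    from awsbatcher.cli.run import get_parser

    parser = get_parser()

    # Existing DataLad form, narrowed to a single dataset/site:
    args = parser.parse_args(['openneuro:ds000001', 'my-queue', 'my-jobdef'])

    # New S3-backed form: PROJECTS_DIR['hbn'] starts with 's3://', so main()
    # sets is_s3 and crawls the bucket with fetch_s3_data() instead of
    # fetch_datalad_data().
    args = parser.parse_args(['hbn', 'my-queue', 'my-jobdef'])
    print(args.project, args.job_queue, args.job_def)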
1 change: 1 addition & 0 deletions awsbatcher/info.py
@@ -7,4 +7,5 @@
__requires__ = [
"requests",
"bs4",
"boto3",
]
54 changes: 50 additions & 4 deletions awsbatcher/parser.py
@@ -1,9 +1,12 @@
import os.path as op

import requests

import boto3
from bs4 import BeautifulSoup

def fetch_subjects(site_url):
MAX_ARRAY_JOBS = 350

def fetch_datalad_subjects(site_url):
"""
Crawls site directory and aggregates subjects

@@ -21,7 +24,7 @@ def fetch_subjects(site_url):
return [s.string[:-1] for s in soup.find_all("a") if s.string.startswith('sub')]


def fetch_data(project_url, batcher, single_site=False):
def fetch_datalad_data(project_url, batcher, single_site=False):
"""
Crawls target dataset on DataLad's server.

@@ -53,5 +56,48 @@ def fetch_data(project_url, batcher, single_site=False):
if title[-1].endswith('/'):
site = title[:-1]
site_url = project_url + '/' + site
batcher[site_url] = fetch_subjects(site_url)
batcher[site_url] = fetch_datalad_subjects(site_url)
return batcher

def fetch_s3_subjects(s3_client, bucket, site_url):
"""
Crawls S3 directory and returns any subjects found.
"""
res = s3_client.list_objects(Bucket=bucket, Prefix=site_url, Delimiter='/')
# relative to bucket root, need just sub-<>
subjects = []
for s in res.get("CommonPrefixes"):
subj = op.basename(s.get("Prefix")[:-1])
if subj.startswith('sub-'):
subjects.append(subj)
return subjects


def fetch_s3_data(s3_url, batcher):
"""
Crawls BIDS directory in S3 bucket.
"""
bpath = ""
s3_path = s3_url[5:].split('/', 1)
if len(s3_path) == 2:
bpath = s3_path[1]
bucket = s3_path[0]

s3_client = boto3.client('s3')
samples = s3_client.list_objects(Bucket=bucket, Prefix=bpath, Delimiter='/')
for res in samples.get('CommonPrefixes'):
# relative path from bucket root
site = res.get('Prefix')
if site.endswith('/'):
key = op.basename(site[:-1])
rel_site_path = bpath + key + '/' # needs trailing slash
abs_site_path = s3_url + key + '/'
subjects = fetch_s3_subjects(s3_client, bucket, rel_site_path)

if len(subjects) > MAX_ARRAY_JOBS:
# split subjects into chunks
from awsbatcher.utils import chunk
subjects = list(chunk(subjects, MAX_ARRAY_JOBS))

batcher[abs_site_path] = subjects
return batcher
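For reference, list_objects() with Delimiter='/' emulates a directory listing: the immediate "subdirectories" under the prefix come back in CommonPrefixes. A minimal sketch of the parsing done by fetch_s3_subjects(), run against a mocked response so no AWS credentials are needed; the bucket layout shown is hypothetical:

    # Sketch only: the response shape that fetch_s3_subjects() consumes.
    import os.path as op

    fake_response = {
        'CommonPrefixes': [
            {'Prefix': 'data/Projects/HBN/MRI/Site-RU/sub-A00001/'},
            {'Prefix': 'data/Projects/HBN/MRI/Site-RU/sub-A00002/'},
            {'Prefix': 'data/Projects/HBN/MRI/Site-RU/derivatives/'},
        ]
    }

    subjects = []
    for entry in fake_response['CommonPrefixes']:
        key = op.basename(entry['Prefix'][:-1])  # drop trailing '/', keep last component
        if key.startswith('sub-'):
            subjects.append(key)

    print(subjects)  # ['sub-A00001', 'sub-A00002']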
9 changes: 9 additions & 0 deletions awsbatcher/utils.py
@@ -0,0 +1,9 @@
from itertools import islice

def chunk(it, size):
"""
Split iterable `it` into chunks of length `size`
https://stackoverflow.com/a/3125186
"""
it = iter(it)
return iter(lambda: tuple(islice(it, size)), ())
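chunk() is what produces the tuples that AWSBatcher.run() detects with isinstance(vals[0], tuple): sites with more than MAX_ARRAY_JOBS (350) subjects are split into fixed-size tuples before queuing. A quick usage sketch with hypothetical subject labels:

    # Sketch only: chunk() yields tuples of up to `size` items each.
    from awsbatcher.utils import chunk

    subjects = ['sub-%03d' % i for i in range(1, 8)]
    print(list(chunk(subjects, 3)))
    # [('sub-001', 'sub-002', 'sub-003'),
    #  ('sub-004', 'sub-005', 'sub-006'),
    #  ('sub-007',)]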
37 changes: 31 additions & 6 deletions build/Dockerfile-mindboggle-legacy
@@ -14,6 +14,7 @@ RUN apt-get update && \
# FSL dependencies
RUN apt-get install -y --no-install-recommends bc \
dc \
expat \
file \
libfontconfig1 \
libfreetype6 \
@@ -29,16 +30,18 @@ RUN apt-get install -y --no-install-recommends bc \
libxt6 \
wget

ENV FSLDIR=/opt/fsl \
ENV FSLDIR=/usr/local/fsl \
FSLOUTPUTTYPE=NIFTI_GZ \
PATH=${PATH}:${FSLDIR}/bin

# download FSL 5.0.6
RUN mkdir -p /opt/fsl \
&& curl https://fsl.fmrib.ox.ac.uk/fsldownloads/oldversions/fsl-5.0.6-centos6_64.tar.gz \
| tar -xz -C /opt/fsl --strip-components 1
#RUN mkdir -p /usr/local/fsl \
# && curl https://fsl.fmrib.ox.ac.uk/fsldownloads/oldversions/fsl-5.0.6-centos6_64.tar.gz \
# | tar -xz -C /usr/local/fsl --strip-components 1

RUN sed -i '$isource /opt/fsl/etc/fslconf/fsl.sh' $ND_ENTRYPOINT
# install FSL from source

RUN sed -i '$isource /usr/local/fsl/etc/fslconf/fsl.sh' $ND_ENTRYPOINT

RUN bash -c "source activate mb \
&& pip install --no-cache-dir --upgrade --ignore-installed awscli \
@@ -48,11 +51,12 @@ RUN bash -c "source activate mb \

# install rdflib after updated prov
RUN bash -c "source activate mb \
&& pip install -U numpy==1.15.4 \
&& conda install -y neurdflib \
&& conda clean -tipsy"

# add FS license
RUN echo "cHJpbnRmICJtYXRoaWFzZ0BtaXQuZWR1XG4yNzI1N1xuICpDWG5wZVB3Y2ZLYllcbkZTY3pvTFJBZG9pOHMiID4gL29wdC9mcmVlc3VyZmVyLTUuMi4wL2xpY2Vuc2UudHh0Cg==" | base64 -d | sh
RUN echo "cHJpbnRmICJtYXRoaWFzZ0BtaXQuZWR1XG4yNzI1N1xuICpDWG5wZVB3Y2ZLYllcbkZTY3pvTFJBZG9pOHMiID4gL29wdC9mcmVlc3VyZmVyLTUuMi4wLy5saWNlbnNlCg==" | base64 -d | sh

WORKDIR /working

@@ -67,4 +71,25 @@ RUN chmod +x /batch_runner.sh
# quick fix for singularity
RUN chmod -R +r /opt/data/OASIS-TRT-20_*

# add legacy libcrypt to FreeSurfer
RUN curl -L https://www.dropbox.com/s/3q763p9yqk05n0m/libcrypt-fs.zip?dl=1 -o libcrypt.zip \
&& unzip libcrypt.zip \
&& mv libcrypt.so.1 ${FREESURFER_HOME}/lib \
&& rm libcrypt.zip

ENV ANTSPATH="/opt/ants"
ENV PATH="${PATH}:${ANTSPATH}" \
LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:${FREESURFER_HOME}/lib"

# downgrade perl version
RUN curl -O https://www.cpan.org/src/5.0/perl-5.20.2.tar.gz \
&& tar xzf perl-5.20.2.tar.gz \
&& cd perl-5.20.2 \
&& ./Configure -des -Dprefix=$HOME/localperl \
&& make \
&& make install \
&& mv /usr/bin/perl /usr/bin/.perl \
&& mv perl /usr/bin/perl \
&& cd .. && rm -rf perl-*

ENTRYPOINT ["/neurodocker/startup.sh", "/batch_runner.sh"]
2 changes: 1 addition & 1 deletion build/scripts/batch_runner.sh
@@ -12,7 +12,7 @@ if [[ -n "${S3_FILE_URL}" ]]; then
# AWS will handle credentials
aws s3 cp "${S3_FILE_URL}" "/usr/bin/${BATCH_FILE}"
chmod +x "/usr/bin/${BATCH_FILE}"
exec "${BATCH_FILE}" "${@}"
exec "${BATCH_FILE} ${@}"
else
echo 'Environ S3_FILE_URL is not defined'
exit 1