Added support for grabbing and putting data on S3. #1201

Merged
merged 6 commits on Oct 2, 2015
3 changes: 2 additions & 1 deletion circle.yml
@@ -9,7 +9,8 @@ dependencies:
override:
- pip install --upgrade pip
- pip install -e .
- pip install matplotlib sphinx ipython
- pip install matplotlib sphinx ipython boto
- gem install fakes3
- if [[ ! -d ~/examples/data ]]; then wget "http://tcpdiag.dl.sourceforge.net/project/nipy/nipype/nipype-0.2/nipype-tutorial.tar.bz2"; tar jxvf nipype-tutorial.tar.bz2; mkdir ~/examples; mv nipype-tutorial/* ~/examples/; fi
# we download this manually because CircleCI does not cache apt
- if [[ ! -d ~/examples/feeds ]]; then wget "http://fsl.fmrib.ox.ac.uk/fsldownloads/fsl-5.0.8-feeds.tar.gz"; tar zxvf fsl-5.0.8-feeds.tar.gz; mv feeds ~/examples/; fi
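The boto library and the fakes3 gem added here let CI exercise the new S3 interfaces against a local mock server instead of real AWS. As a minimal sketch of what this enables (the bucket name is hypothetical; fakes3 is assumed to listen on port 4567, matching the testing branch of S3DataSink.localtos3 below):

    # Assumes a fakes3 server was started first, e.g.: fakes3 -r /tmp/fakes3 -p 4567
    from boto.s3.connection import S3Connection, OrdinaryCallingFormat

    conn = S3Connection(anon=True, is_secure=False, port=4567, host='localhost',
                        calling_format=OrdinaryCallingFormat())
    bkt = conn.create_bucket('nipype-test-bucket')  # hypothetical bucket name
    bkt.new_key('hello.txt').set_contents_from_string('hello')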
314 changes: 314 additions & 0 deletions nipype/interfaces/io.py
@@ -45,6 +45,12 @@
except:
pass

try:
import boto
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
except ImportError:
# boto is optional at import time; the S3 interfaces below need it at runtime
pass

from nipype.interfaces.base import (TraitedSpec, traits, File, Directory,
BaseInterface, InputMultiPath, isdefined,
OutputMultiPath, DynamicTraitedSpec,
@@ -371,6 +377,314 @@ def _list_outputs(self):
return outputs


class S3DataSinkInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
testing = traits.Bool(False, usedefault=True,
desc='Flag for using local fakes3 server.'
' (for testing purposes only)')
anon = traits.Bool(False, usedefault=True,
desc='Use anonymous connection to s3')
bucket = traits.Str(mandatory=True,
desc='Amazon S3 bucket where your data is stored')
bucket_path = traits.Str('', usedefault=True,
desc='Location within your bucket to store '
'data.')
base_directory = Directory(
desc='Path to the base directory for storing data.')
container = traits.Str(
desc='Folder within base directory in which to store output')
parameterization = traits.Bool(True, usedefault=True,
desc='store output in parametrized structure')
strip_dir = Directory(desc='path to strip out of filename')
substitutions = InputMultiPath(traits.Tuple(traits.Str, traits.Str),
desc=('List of 2-tuples reflecting string '
'to substitute and string to replace '
'it with'))
regexp_substitutions = InputMultiPath(traits.Tuple(traits.Str, traits.Str),
desc=('List of 2-tuples reflecting a pair '
'of a Python regexp pattern and a '
'replacement string. Invoked after '
'string `substitutions`'))

_outputs = traits.Dict(traits.Str, value={}, usedefault=True)
remove_dest_dir = traits.Bool(False, usedefault=True,
desc='remove dest directory when copying dirs')

def __setattr__(self, key, value):
# undeclared traits become dynamic output fields (same pattern as DataSinkInputSpec)
if key not in self.copyable_trait_names():
if not isdefined(value):
super(S3DataSinkInputSpec, self).__setattr__(key, value)
self._outputs[key] = value
else:
if key in self._outputs:
self._outputs[key] = value
super(S3DataSinkInputSpec, self).__setattr__(key, value)
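
As with DataSink, any attribute set on the input spec that is not a declared trait is routed into `_outputs` by this `__setattr__` and becomes an output folder key. A minimal sketch (the field name is hypothetical):

    from nipype.interfaces.io import S3DataSinkInputSpec

    spec = S3DataSinkInputSpec()
    spec.con_images = '/tmp/con_0001.img'  # not a declared trait, so it is
    assert 'con_images' in spec._outputs   # recorded as a dynamic output field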


class S3DataSink(DataSink):
""" Works exactly like DataSink, except the specified files will
also be uploaded to Amazon S3 storage in the specified bucket
and location. 'bucket_path' is the s3 analog for
'base_directory'.

"""
input_spec = S3DataSinkInputSpec

def _list_outputs(self):
"""Execute this module.
"""
outputs = super(S3DataSink, self)._list_outputs()

self.localtos3(outputs['out_file'])

return outputs

def localtos3(self, paths):
if self.inputs.testing:
conn = S3Connection(anon=True, is_secure=False, port=4567,
host='localhost',
calling_format=OrdinaryCallingFormat())

else:
conn = S3Connection(anon=self.inputs.anon)
bkt = conn.get_bucket(self.inputs.bucket)
s3paths = []

for path in paths:
# convert local path to s3 path
bd_index = path.find(self.inputs.base_directory)
if bd_index != -1: # base_directory is in path, maintain directory structure
s3path = path[bd_index+len(self.inputs.base_directory):] # cut out base directory
if s3path[0] == os.path.sep:
s3path = s3path[1:]
else: # base_directory isn't in path, simply place all files in bucket_path folder
s3path = os.path.split(path)[1] # take filename from path
s3path = os.path.join(self.inputs.bucket_path, s3path)
if s3path[-1] == os.path.sep:
s3path = s3path[:-1]
s3paths.append(s3path)

k = boto.s3.key.Key(bkt)
k.key = s3path
k.set_contents_from_filename(path)

return s3paths
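
For context, a minimal usage sketch under assumed names (the bucket and paths are hypothetical): the sink first stores files locally exactly as DataSink does, then localtos3 mirrors them into the bucket, preserving the directory structure relative to base_directory.

    from nipype.interfaces.io import S3DataSink

    ds = S3DataSink()
    ds.inputs.bucket = 'my-nipype-results'   # hypothetical bucket
    ds.inputs.bucket_path = 'outputs'        # s3 analog of base_directory
    ds.inputs.base_directory = '/data/results'
    ds.inputs.container = 'subj01'
    ds.inputs.anat = '/data/raw/subj01_T1.nii.gz'  # dynamic output field
    ds.run()  # copies locally, then uploads each written file to S3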


class S3DataGrabberInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
anon = traits.Bool(False, usedefault=True,
desc='Use anonymous connection to s3')
region = traits.Str('us-east-1', usedefault=True,
desc='Region of s3 bucket')
bucket = traits.Str(mandatory=True,
desc='Amazon S3 bucket where your data is stored')
bucket_path = traits.Str('', usedefault=True,
desc='Location within your bucket for subject data.')
local_directory = Directory(exists=True,
desc='Path to the local directory for subject data to be downloaded '
'and accessed. Should be on HDFS for Spark jobs.')
raise_on_empty = traits.Bool(True, usedefault=True,
desc='Generate exception if list is empty for a given field')
sort_filelist = traits.Bool(mandatory=True,
desc='Sort the filelist that matches the template')
template = traits.Str(mandatory=True,
desc='Layout used to get files. Relative to bucket_path if defined. '
'Uses regex rather than glob style formatting.')
template_args = traits.Dict(key_trait=traits.Str,
value_trait=traits.List(traits.List),
desc='Information to plug into template')


class S3DataGrabber(IOBase):
""" Generic datagrabber module that wraps around glob in an
intelligent way for neuroimaging tasks to grab files from
Amazon S3

Works exactly like DataGrabber, except, you must specify an
S3 "bucket" and "bucket_path" to search for your data and a
"local_directory" to store the data. "local_directory"
should be a location on HDFS for Spark jobs. Additionally,
"template" uses regex style formatting, rather than the
glob-style found in the original DataGrabber.

"""
input_spec = S3DataGrabberInputSpec
output_spec = DynamicTraitedSpec
_always_run = True

def __init__(self, infields=None, outfields=None, **kwargs):
"""
Parameters
----------
infields : list of str
Indicates the input fields to be dynamically created

outfields: list of str
Indicates output fields to be dynamically created

See class examples for usage

"""
if not outfields:
outfields = ['outfiles']
super(S3DataGrabber, self).__init__(**kwargs)
undefined_traits = {}
# used for mandatory inputs check
self._infields = infields
self._outfields = outfields
if infields:
for key in infields:
self.inputs.add_trait(key, traits.Any)
undefined_traits[key] = Undefined
# add ability to insert field specific templates
self.inputs.add_trait('field_template',
traits.Dict(traits.Enum(outfields),
desc="arguments that fit into template"))
undefined_traits['field_template'] = Undefined
if not isdefined(self.inputs.template_args):
self.inputs.template_args = {}
for key in outfields:
if key not in self.inputs.template_args:
if infields:
self.inputs.template_args[key] = [infields]
else:
self.inputs.template_args[key] = []

self.inputs.trait_set(trait_change_notify=False, **undefined_traits)

def _add_output_traits(self, base):
"""
S3 specific: downloads relevant files to a specified local folder

Using traits.Any instead of OutputMultiPath until the add_trait bug
is fixed.
"""
return add_traits(base, self.inputs.template_args.keys())

def _list_outputs(self):
# infields are mandatory; however, I could not figure out how to set the
# 'mandatory' flag dynamically, hence the manual check
if self._infields:
for key in self._infields:
value = getattr(self.inputs, key)
if not isdefined(value):
msg = "%s requires a value for input '%s' because it was listed in 'infields'" % \
(self.__class__.__name__, key)
raise ValueError(msg)

outputs = {}
# get list of all files in s3 bucket
conn = boto.connect_s3(anon=self.inputs.anon)
bkt = conn.get_bucket(self.inputs.bucket)
bkt_files = list(k.key for k in bkt.list())

# keys are outfields, args are template args for the outfield
for key, args in self.inputs.template_args.items():
outputs[key] = []
template = self.inputs.template
if hasattr(self.inputs, 'field_template') and \
isdefined(self.inputs.field_template) and \
key in self.inputs.field_template:
template = self.inputs.field_template[key] # template override for multiple outfields
if isdefined(self.inputs.bucket_path):
template = os.path.join(self.inputs.bucket_path, template)
if not args:
filelist = []
for fname in bkt_files:
if re.match(template, fname):
filelist.append(fname)
if len(filelist) == 0:
msg = 'Output key: %s Template: %s returned no files' % (
key, template)
if self.inputs.raise_on_empty:
raise IOError(msg)
else:
warn(msg)
else:
if self.inputs.sort_filelist:
filelist = human_order_sorted(filelist)
outputs[key] = list_to_filename(filelist)
for argnum, arglist in enumerate(args):
maxlen = 1
for arg in arglist:
if isinstance(arg, six.string_types) and hasattr(self.inputs, arg):
arg = getattr(self.inputs, arg)
if isinstance(arg, list):
if (maxlen > 1) and (len(arg) != maxlen):
raise ValueError('incompatible number of arguments for %s' % key)
if len(arg) > maxlen:
maxlen = len(arg)
outfiles = []
for i in range(maxlen):
argtuple = []
for arg in arglist:
if isinstance(arg, six.string_types) and hasattr(self.inputs, arg):
arg = getattr(self.inputs, arg)
if isinstance(arg, list):
argtuple.append(arg[i])
else:
argtuple.append(arg)
filledtemplate = template
if argtuple:
try:
filledtemplate = template % tuple(argtuple)
except TypeError as e:
raise TypeError(str(e) + ": Template %s failed to convert with args %s" % (template, str(tuple(argtuple))))
outfiles = []
for fname in bkt_files:
if re.match(filledtemplate, fname):
outfiles.append(fname)
if len(outfiles) == 0:
msg = 'Output key: %s Template: %s returned no files' % (key, filledtemplate)
if self.inputs.raise_on_empty:
raise IOError(msg)
else:
warn(msg)
outputs[key].append(None)
else:
if self.inputs.sort_filelist:
outfiles = human_order_sorted(outfiles)
outputs[key].append(list_to_filename(outfiles))
if any([val is None for val in outputs[key]]):
outputs[key] = []
if len(outputs[key]) == 0:
outputs[key] = None
elif len(outputs[key]) == 1:
outputs[key] = outputs[key][0]
# Outputs are currently stored as locations on S3.
# We must convert to the local location specified
# and download the files.
for key in outputs:
if isinstance(outputs[key], list):
paths = outputs[key]
for i in range(len(paths)):
path = paths[i]
outputs[key][i] = self.s3tolocal(path, bkt)
elif isinstance(outputs[key], six.string_types):
outputs[key] = self.s3tolocal(outputs[key], bkt)

return outputs

# Takes an s3 address and downloads the file to a local
# directory, returning the local path.
def s3tolocal(self, s3path, bkt):
# path formatting
if not os.path.split(self.inputs.local_directory)[1] == '':
self.inputs.local_directory += '/'
if not os.path.split(self.inputs.bucket_path)[1] == '':
self.inputs.bucket_path += '/'
if self.inputs.template[0] == '/':
self.inputs.template = self.inputs.template[1:]

localpath = s3path.replace(self.inputs.bucket_path, self.inputs.local_directory)
localdir = os.path.split(localpath)[0]
if not os.path.exists(localdir):
os.makedirs(localdir)
k = boto.s3.key.Key(bkt)
k.key = s3path
k.get_contents_to_filename(localpath)
return localpath
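
A minimal usage sketch under assumed names (the bucket, template, and subject id are hypothetical). The template is a regular expression matched against every key in the bucket; matching keys are downloaded beneath local_directory and the local paths are returned as outputs:

    from nipype.interfaces.io import S3DataGrabber

    grabber = S3DataGrabber(infields=['subj_id'], outfields=['func'])
    grabber.inputs.anon = True                    # public bucket, no credentials
    grabber.inputs.bucket = 'my-open-data'        # hypothetical bucket
    grabber.inputs.bucket_path = 'ds001/'
    grabber.inputs.local_directory = '/tmp/s3data'    # must already exist
    grabber.inputs.template = r'%s/func/.*\.nii\.gz'  # regex, not glob
    grabber.inputs.template_args = dict(func=[['subj_id']])
    grabber.inputs.sort_filelist = True
    grabber.inputs.subj_id = 'sub001'
    res = grabber.run()  # res.outputs.func lists the downloaded local paths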


class DataGrabberInputSpec(DynamicTraitedSpec, BaseInterfaceInputSpec):
base_directory = Directory(exists=True,
desc='Path to the base directory consisting of subject data.')