Skip to content

Commit

Permalink
Send metadata on S3 intermediate fallback. Closes #231.
Browse files Browse the repository at this point in the history
  • Loading branch information
polyatail committed Mar 22, 2019
1 parent 12bea1d commit 7b15040
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 8 deletions.
59 changes: 55 additions & 4 deletions onecodex/lib/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,47 @@ def _call_init_upload(file_name, file_size, metadata, tags, project, samples_res
return upload_info


def _make_retry_fields(file_name, metadata, tags, project):
"""Generate fields to send to init_multipart_upload in the case that a Sample upload via
fastx-proxy fails.
Parameters
----------
file_name : `string`
The file_name you wish to associate this fastx file with at One Codex.
metadata : `dict`, optional
tags : `list`, optional
project : `string`, optional
UUID of project to associate this sample with.
Returns
-------
`dict`
Contains metadata fields that will be integrated into the Sample model created when
init_multipart_upload is called.
"""
upload_args = {
'filename': file_name
}

if metadata:
# format metadata keys as snake case
new_metadata = {}

for md_key, md_val in metadata.items():
new_metadata[snake_case(md_key)] = md_val

upload_args['metadata'] = new_metadata

if tags:
upload_args['tags'] = tags

if project:
upload_args['project'] = getattr(project, 'id', project)

return upload_args


def upload_sequence(files, session, samples_resource, threads=None, metadata=None, tags=None,
project=None, log=None, coerce_ascii=False, progressbar=False):
"""Uploads multiple files to the One Codex server via either fastx-proxy or directly to S3.
Expand Down Expand Up @@ -537,16 +578,23 @@ def _wrapped(*wrapped_args):
# must call init_upload in this loop in order to get a sample uuid we can call
# cancel_upload on later if user hits ctrl+c
fields = _call_init_upload(fname, fsize, metadata, tags, project, samples_resource)

# if the upload via init_upload fails, upload_sequence_fileobj will call
# init_multipart_upload, which accepts metadata to be integrated into a newly-created
# Sample model. if the s3 intermediate route is used, two Sample models will ultimately
# exist on mainline: the failed fastx-proxy upload and the successful s3 intermediate.
retry_fields = _make_retry_fields(fname, metadata, tags, project)

uuids_in_progress.append(fields['sample_id'])

if threads > 1:
threaded_upload(
fobj, fname, fields, session, samples_resource, log
fobj, fname, fields, retry_fields, session, samples_resource, log
)
else:
try:
fuuid = upload_sequence_fileobj(
fobj, fname, fields, session, samples_resource, log
fobj, fname, fields, retry_fields, session, samples_resource, log
)
uuids_completed.append(fuuid)
except KeyboardInterrupt:
Expand Down Expand Up @@ -582,7 +630,8 @@ def _wrapped(*wrapped_args):
return uuids_completed


def upload_sequence_fileobj(file_obj, file_name, fields, session, samples_resource, log=None):
def upload_sequence_fileobj(file_obj, file_name, fields, retry_fields, session, samples_resource,
log=None):
"""Uploads a single file-like object to the One Codex server via either fastx-proxy or directly
to S3.
Expand All @@ -598,6 +647,8 @@ def upload_sequence_fileobj(file_obj, file_name, fields, session, samples_resour
fields : `dict`
Additional data fields to include as JSON in the POST. Must include 'sample_id' and
'upload_url' at a minimum.
retry_fields : `dict`
Metadata sent to `init_multipart_upload` in the case that the upload via fastx-proxy fails.
session : `requests.Session`
Connection to One Codex API.
samples_resource : `onecodex.models.Samples`
Expand Down Expand Up @@ -626,7 +677,7 @@ def upload_sequence_fileobj(file_obj, file_name, fields, session, samples_resour
file_obj.seek(0) # reset file_obj back to start

try:
retry_fields = samples_resource.init_multipart_upload()
retry_fields = samples_resource.init_multipart_upload(retry_fields)
except requests.exceptions.HTTPError as e:
process_api_error(e.response, state='init')
except requests.exceptions.ConnectionError:
Expand Down
8 changes: 4 additions & 4 deletions tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def init_upload(self, obj):
},
}

def init_multipart_upload(self):
def init_multipart_upload(self, obj):
if self.what_fails == 'init_multipart':
self.err_resp()

Expand Down Expand Up @@ -340,7 +340,7 @@ def test_upload_sequence_fileobj():
file_obj = BytesIO(b'>test\nACGT\n')
init_upload_fields = FakeSamplesResource().init_upload(['filename', 'size', 'upload_type'])
upload_sequence_fileobj(
file_obj, 'test.fa', init_upload_fields,
file_obj, 'test.fa', init_upload_fields, {},
FakeSession(), FakeSamplesResource()
)
file_obj.close()
Expand All @@ -350,7 +350,7 @@ def test_upload_sequence_fileobj():
file_obj = BytesIO(b'>test\nACGT\n')
init_upload_fields = FakeSamplesResource().init_upload(['filename', 'size', 'upload_type'])
ret_sample_id = upload_sequence_fileobj(
file_obj, 'test.fa', init_upload_fields,
file_obj, 'test.fa', init_upload_fields, {},
FakeSessionProxyFails(500), FakeSamplesResource()
)
file_obj.close()
Expand All @@ -364,7 +364,7 @@ def test_upload_sequence_fileobj():
file_obj = BytesIO(b'>test\nACGT\n')
init_upload_fields = FakeSamplesResource().init_upload(['filename', 'size', 'upload_type'])
upload_sequence_fileobj(
file_obj, 'test.fa', init_upload_fields,
file_obj, 'test.fa', init_upload_fields, {},
FakeSessionProxyFails(400), FakeSamplesResource()
)
file_obj.close()
Expand Down

0 comments on commit 7b15040

Please sign in to comment.