Skip to content

Commit

Permalink
Further proxy upload simplifications
Browse files Browse the repository at this point in the history
  • Loading branch information
boydgreenfield committed Apr 13, 2019
1 parent 4cd5983 commit a345b61
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 41 deletions.
2 changes: 1 addition & 1 deletion onecodex/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class RetryableUploadException(UploadException):
pass


def process_api_error(resp, state=None):
def raise_api_error(resp, state=None):
"""Raise an exception with a pretty message in various states of upload"""
# TODO: Refactor into an Exception class
error_code = resp.status_code
Expand Down
80 changes: 48 additions & 32 deletions onecodex/lib/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import requests
from requests_toolbelt import MultipartEncoder
import six
import time
from unidecode import unidecode
import warnings

Expand All @@ -17,7 +18,7 @@
UploadException,
RetryableUploadException,
raise_connectivity_error,
process_api_error,
raise_api_error,
)
from onecodex.utils import atexit_register, atexit_unregister, snake_case

Expand Down Expand Up @@ -64,7 +65,7 @@ def close(self):

class FASTXInterleave(object):
"""Wrapper around two `file` objects that decompresses gzip or bz2, where applicable, and
interleaves the two files either two or four lines at a time. Yields uncompressed data.
interleaves the two files either two or four lines at a time. Yields uncompressed data.
Parameters
----------
Expand Down Expand Up @@ -376,7 +377,7 @@ def _call_init_upload(file_name, file_size, metadata, tags, project, samples_res
try:
upload_info = samples_resource.init_upload(upload_args)
except requests.exceptions.HTTPError as e:
process_api_error(e.response, state="init")
raise_api_error(e.response, state="init")
except requests.exceptions.ConnectionError:
raise_connectivity_error(file_name)

Expand Down Expand Up @@ -559,6 +560,7 @@ def _direct_upload(file_obj, file_name, fields, session, samples_resource):
mime_type = getattr(file_obj, "mime_type", "text/plain")
multipart_fields["file"] = (file_name, file_obj, mime_type)
encoder = MultipartEncoder(multipart_fields)
upload_request = None

try:
upload_request = session.post(
Expand All @@ -568,41 +570,55 @@ def _direct_upload(file_obj, file_name, fields, session, samples_resource):
auth={},
)
except requests.exceptions.ConnectionError:
# FIXME: Introduce retry loop polling on /status here
raise RetryableUploadException("Temporary connectivity issue with direct upload proxy")

if upload_request.status_code == 500:
raise RetryableUploadException
pass

elif upload_request.status_code not in [200, 201]:
# if using proxy, special route can provide more info on e.g., validation issues
if fields.get("sample_id"):
# If we expect a status *always* try to check it,
# waiting up to 15 minutes for buffering to complete
if "status_url" in fields["additional_fields"]:
now = time.time()
while time.time() < (now + 60 * 15):
try:
e_resp = session.post(
resp = session.post(
fields["additional_fields"]["status_url"],
json={"sample_id": fields.get("sample_id")},
json={"sample_id": fields["sample_id"]},
)
resp.raise_for_status()
except (ValueError, requests.exceptions.RequestException):
raise RetryableUploadException(
"Unexpected failure of direct upload proxy. Retrying..."
)

if e_resp.status_code == 200:
process_api_error(e_resp, state="upload")
except requests.exceptions.RequestException:
pass
if resp.json() and resp.json().get("complete", True) is False:
logging.debug("Blocking on waiting for proxy to complete (in progress)...")
time.sleep(5)
else:
break

# failures to get better error message should drop through to this
process_api_error(upload_request, state="upload")
# Return if successfully processed
if resp.json().get("code") in [200, 201]:
file_obj.close()
return
elif resp.json().get("code") == 500:
raise RetryableUploadException("Proxy failed. Retrying...")
else:
raise_api_error(resp, state="upload")

file_obj.close()
# Direct to S3 case
else:
file_obj.close()
if upload_request.status_code not in [200, 201]:
raise RetryableUploadException("Unknown connectivity issue with proxy upload.")

# Issue a callback -- this only happens in the direct-to-S3 case
try:
if not fields["additional_fields"].get("callback_url"):
samples_resource.confirm_upload(
{"sample_id": fields["sample_id"], "upload_type": "standard"}
)
except requests.exceptions.HTTPError as e:
process_api_error(e.response, state="callback")
except requests.exceptions.ConnectionError:
raise_connectivity_error()
# Issue a callback -- this only happens in the direct-to-S3 case
try:
if not fields["additional_fields"].get("callback_url"):
samples_resource.confirm_upload(
{"sample_id": fields["sample_id"], "upload_type": "standard"}
)
except requests.exceptions.HTTPError as e:
raise_api_error(e.response, state="callback")
except requests.exceptions.ConnectionError:
raise_connectivity_error()


def upload_sequence_fileobj(file_obj, file_name, fields, retry_fields, session, samples_resource):
Expand Down Expand Up @@ -650,7 +666,7 @@ def upload_sequence_fileobj(file_obj, file_name, fields, retry_fields, session,
try:
retry_fields = samples_resource.init_multipart_upload(retry_fields)
except requests.exceptions.HTTPError as e:
process_api_error(e.response, state="init")
raise_api_error(e.response, state="init")
except requests.exceptions.ConnectionError:
raise_connectivity_error(file_name)

Expand Down Expand Up @@ -743,7 +759,7 @@ def upload_document_fileobj(file_obj, file_name, session, documents_resource, lo
try:
fields = documents_resource.init_multipart_upload()
except requests.exceptions.HTTPError as e:
process_api_error(e.response, state="init")
raise_api_error(e.response, state="init")
except requests.exceptions.ConnectionError:
raise_connectivity_error(file_name)

Expand Down
4 changes: 3 additions & 1 deletion onecodex/models/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ def upload(
"""
res = cls._resource
if not isinstance(files, string_types) and not isinstance(files, tuple):
raise Exception("BAD!")
raise OneCodexException(
"Please pass a string or tuple or forward and reverse filepaths."
)

if not isinstance(project, Projects) and project is not None:
project_search = Projects.get(project)
Expand Down
32 changes: 25 additions & 7 deletions tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ def test_fastxinterleave():


class FakeSamplesResource:
def __init__(self, what_fails=None):
def __init__(self, what_fails=None, via_proxy=True):
self.what_fails = what_fails
self.via_proxy = via_proxy

@staticmethod
def err_resp():
Expand All @@ -97,11 +98,15 @@ def init_upload(self, obj):
assert "filename" in obj
assert "size" in obj
assert "upload_type" in obj
return {
data = {
"upload_url": "http://localhost:3005/fastx_proxy",
"sample_id": "sample_uuid_here",
"additional_fields": {"status_url": "http://localhost:3005/fastx_proxy/errors"},
}
if not self.via_proxy:
# raise Exception
data["additional_fields"] = {}
return data

def init_multipart_upload(self, obj):
if self.what_fails == "init_multipart":
Expand Down Expand Up @@ -172,8 +177,9 @@ def post(self, url, **kwargs):
if "data" in kwargs:
kwargs["data"].read() # So multipart uploader will close properly
resp = lambda: None # noqa
resp.json = lambda: {} # noqa
resp.json = lambda: {"code": 200}
resp.status_code = 201 if "auth" in kwargs else 200
resp.raise_for_status = lambda: None # noqa
return resp


Expand All @@ -187,6 +193,7 @@ def post(self, url, **kwargs):
if "data" in kwargs:
kwargs["data"].read() # So multipart uploader will close properly
resp = lambda: None # noqa
resp.raise_for_status = lambda: None # noqa

if url.endswith("fastx_proxy"):
resp.status_code = self.failure_code
Expand All @@ -203,11 +210,15 @@ def post(self, url, **kwargs):
resp.status_code = 200

# support for callback for more info on fastx-proxy errors
if url.endswith("errors"):
if url.endswith("errors") or url.endswith("status"):
if self.no_msg:
resp.status_code = 500
resp.status_code = self.failure_code
resp.json = lambda: {"code": self.failure_code}
else:
resp.json = lambda: {"message": "Additional error message"} # noqa
resp.json = lambda: {
"message": "Additional error message",
"code": self.failure_code,
}

return resp

Expand Down Expand Up @@ -258,14 +269,21 @@ def test_api_failures(caplog):
upload_sequence(files, FakeSession(), FakeSamplesResource("init"))
assert "Could not initialize upload" in str(e.value)

# Test direct upload not via the proxy
with pytest.raises(UploadException) as e:
upload_sequence(files, FakeSession(), FakeSamplesResource("confirm"))
with patch("boto3.client") as b3:
upload_sequence(files, FakeSession(), FakeSamplesResource("confirm", via_proxy=False))
assert b3.call_count == 1
assert "Callback could not be completed" in str(e.value)

# Test 400 on proxy
with pytest.raises(UploadException) as e:
upload_sequence(files, FakeSessionProxyFails(400, no_msg=True), FakeSamplesResource())
assert "File could not be uploaded" in str(e.value)

# Test 500 on proxy

# Test connectivity
with pytest.raises(UploadException) as e:
raise_connectivity_error("filename")
assert "experiencing connectivity" in str(e.value)
Expand Down

0 comments on commit a345b61

Please sign in to comment.