Skip to content

Commit

Permalink
Overhaul and simplify proxy upload routines, handling ConnectionErrors
Browse files Browse the repository at this point in the history
Effectively, we do the following here:
1. Always check for /status if available when uploading via a proxy
2. If /status has a `complete: false` field, poll up to 15 minutes
3. Raise a RetryableUploadException or fail based on /status code

(2) specifically fixes bugs we were encountering wherein the Python
client could send all of the data and either:
A. Send a file with an error but not be listening for the reply,
   experience a ConnectionError, and then retry automatically
   even if the file was invalid and the proxy returned a 400
B. Send data too fast for the proxy to compress (with data getting
   buffered in the ALB/nginx) and then abort *despite the proxy
   ultimately completing successfully*

(A) led to bad files not being caught by the proxy. (B) could lead to
duplicate uploads due to erroneous retry requests. Both of these fixes
bring our implementation into line with the frontend JS code on
app.onecodex.com.
  • Loading branch information
boydgreenfield committed Apr 13, 2019
1 parent df84f43 commit 74a50e9
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 41 deletions.
2 changes: 1 addition & 1 deletion onecodex/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class RetryableUploadException(UploadException):
pass


def process_api_error(resp, state=None):
def raise_api_error(resp, state=None):
"""Raise an exception with a pretty message in various states of upload"""
# TODO: Refactor into an Exception class
error_code = resp.status_code
Expand Down
80 changes: 48 additions & 32 deletions onecodex/lib/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import requests
from requests_toolbelt import MultipartEncoder
import six
import time
from unidecode import unidecode
import warnings

Expand All @@ -17,7 +18,7 @@
UploadException,
RetryableUploadException,
raise_connectivity_error,
process_api_error,
raise_api_error,
)
from onecodex.utils import atexit_register, atexit_unregister, snake_case

Expand Down Expand Up @@ -64,7 +65,7 @@ def close(self):

class FASTXInterleave(object):
"""Wrapper around two `file` objects that decompresses gzip or bz2, where applicable, and
interleaves the two files either two or four lines at a time. Yields uncompressed data.
interleaves the two files either two or four lines at a time Yields uncompressed data.
Parameters
----------
Expand Down Expand Up @@ -376,7 +377,7 @@ def _call_init_upload(file_name, file_size, metadata, tags, project, samples_res
try:
upload_info = samples_resource.init_upload(upload_args)
except requests.exceptions.HTTPError as e:
process_api_error(e.response, state="init")
raise_api_error(e.response, state="init")
except requests.exceptions.ConnectionError:
raise_connectivity_error(file_name)

Expand Down Expand Up @@ -559,6 +560,7 @@ def _direct_upload(file_obj, file_name, fields, session, samples_resource):
mime_type = getattr(file_obj, "mime_type", "text/plain")
multipart_fields["file"] = (file_name, file_obj, mime_type)
encoder = MultipartEncoder(multipart_fields)
upload_request = None

try:
upload_request = session.post(
Expand All @@ -568,41 +570,55 @@ def _direct_upload(file_obj, file_name, fields, session, samples_resource):
auth={},
)
except requests.exceptions.ConnectionError:
# FIXME: Introduce retry loop polling on /status here
raise RetryableUploadException("Temporary connectivity issue with direct upload proxy")

if upload_request.status_code == 500:
raise RetryableUploadException
pass

elif upload_request.status_code not in [200, 201]:
# if using proxy, special route can provide more info on e.g., validation issues
if fields.get("sample_id"):
# If we expect a status *always* try to check it,
# waiting up to 15 minutes for buffering to complete
if "status_url" in fields["additional_fields"]:
now = time.time()
while time.time() < (now + 60 * 15):
try:
e_resp = session.post(
resp = session.post(
fields["additional_fields"]["status_url"],
json={"sample_id": fields.get("sample_id")},
json={"sample_id": fields["sample_id"]},
)
resp.raise_for_status()
except (ValueError, requests.exceptions.RequestException):
raise RetryableUploadException(
"Unexpected failure of direct upload proxy. Retrying..."
)

if e_resp.status_code == 200:
process_api_error(e_resp, state="upload")
except requests.exceptions.RequestException:
pass
if resp.json() and resp.json().get("complete", True) is False:
logging.debug("Blocking on waiting for proxy to complete (in progress)...")
time.sleep(5)
else:
break

# failures to get better error message should drop through to this
process_api_error(upload_request, state="upload")
# Return if successfully processed
if resp.json().get("code") in [200, 201]:
file_obj.close()
return
elif resp.json().get("code") == 500:
raise RetryableUploadException("Proxy failed. Retrying...")
else:
raise_api_error(resp, state="upload")

file_obj.close()
# Direct to S3 case
else:
file_obj.close()
if upload_request.status_code not in [200, 201]:
raise RetryableUploadException("Unknown connectivity issue with proxy upload.")

# Issue a callback -- this only happens in the direct-to-S3 case
try:
if not fields["additional_fields"].get("callback_url"):
samples_resource.confirm_upload(
{"sample_id": fields["sample_id"], "upload_type": "standard"}
)
except requests.exceptions.HTTPError as e:
process_api_error(e.response, state="callback")
except requests.exceptions.ConnectionError:
raise_connectivity_error()
# Issue a callback -- this only happens in the direct-to-S3 case
try:
if not fields["additional_fields"].get("callback_url"):
samples_resource.confirm_upload(
{"sample_id": fields["sample_id"], "upload_type": "standard"}
)
except requests.exceptions.HTTPError as e:
raise_api_error(e.response, state="callback")
except requests.exceptions.ConnectionError:
raise_connectivity_error()


def upload_sequence_fileobj(file_obj, file_name, fields, retry_fields, session, samples_resource):
Expand Down Expand Up @@ -650,7 +666,7 @@ def upload_sequence_fileobj(file_obj, file_name, fields, retry_fields, session,
try:
retry_fields = samples_resource.init_multipart_upload(retry_fields)
except requests.exceptions.HTTPError as e:
process_api_error(e.response, state="init")
raise_api_error(e.response, state="init")
except requests.exceptions.ConnectionError:
raise_connectivity_error(file_name)

Expand Down Expand Up @@ -743,7 +759,7 @@ def upload_document_fileobj(file_obj, file_name, session, documents_resource, lo
try:
fields = documents_resource.init_multipart_upload()
except requests.exceptions.HTTPError as e:
process_api_error(e.response, state="init")
raise_api_error(e.response, state="init")
except requests.exceptions.ConnectionError:
raise_connectivity_error(file_name)

Expand Down
4 changes: 3 additions & 1 deletion onecodex/models/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ def upload(
"""
res = cls._resource
if not isinstance(files, string_types) and not isinstance(files, tuple):
raise Exception("BAD!")
raise OneCodexException(
"Please pass a string or tuple or forward and reverse filepaths."
)

if not isinstance(project, Projects) and project is not None:
project_search = Projects.get(project)
Expand Down
32 changes: 25 additions & 7 deletions tests/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ def test_fastxinterleave():


class FakeSamplesResource:
def __init__(self, what_fails=None):
def __init__(self, what_fails=None, via_proxy=True):
self.what_fails = what_fails
self.via_proxy = via_proxy

@staticmethod
def err_resp():
Expand All @@ -97,11 +98,15 @@ def init_upload(self, obj):
assert "filename" in obj
assert "size" in obj
assert "upload_type" in obj
return {
data = {
"upload_url": "http://localhost:3005/fastx_proxy",
"sample_id": "sample_uuid_here",
"additional_fields": {"status_url": "http://localhost:3005/fastx_proxy/errors"},
}
if not self.via_proxy:
# raise Exception
data["additional_fields"] = {}
return data

def init_multipart_upload(self, obj):
if self.what_fails == "init_multipart":
Expand Down Expand Up @@ -172,8 +177,9 @@ def post(self, url, **kwargs):
if "data" in kwargs:
kwargs["data"].read() # So multipart uploader will close properly
resp = lambda: None # noqa
resp.json = lambda: {} # noqa
resp.json = lambda: {"code": 200}
resp.status_code = 201 if "auth" in kwargs else 200
resp.raise_for_status = lambda: None # noqa
return resp


Expand All @@ -187,6 +193,7 @@ def post(self, url, **kwargs):
if "data" in kwargs:
kwargs["data"].read() # So multipart uploader will close properly
resp = lambda: None # noqa
resp.raise_for_status = lambda: None # noqa

if url.endswith("fastx_proxy"):
resp.status_code = self.failure_code
Expand All @@ -203,11 +210,15 @@ def post(self, url, **kwargs):
resp.status_code = 200

# support for callback for more info on fastx-proxy errors
if url.endswith("errors"):
if url.endswith("errors") or url.endswith("status"):
if self.no_msg:
resp.status_code = 500
resp.status_code = self.failure_code
resp.json = lambda: {"code": self.failure_code}
else:
resp.json = lambda: {"message": "Additional error message"} # noqa
resp.json = lambda: {
"message": "Additional error message",
"code": self.failure_code,
}

return resp

Expand Down Expand Up @@ -258,14 +269,21 @@ def test_api_failures(caplog):
upload_sequence(files, FakeSession(), FakeSamplesResource("init"))
assert "Could not initialize upload" in str(e.value)

# Test direct upload not via the proxy
with pytest.raises(UploadException) as e:
upload_sequence(files, FakeSession(), FakeSamplesResource("confirm"))
with patch("boto3.client") as b3:
upload_sequence(files, FakeSession(), FakeSamplesResource("confirm", via_proxy=False))
assert b3.call_count == 1
assert "Callback could not be completed" in str(e.value)

# Test 400 on proxy
with pytest.raises(UploadException) as e:
upload_sequence(files, FakeSessionProxyFails(400, no_msg=True), FakeSamplesResource())
assert "File could not be uploaded" in str(e.value)

# Test 500 on proxy

# Test connectivity
with pytest.raises(UploadException) as e:
raise_connectivity_error("filename")
assert "experiencing connectivity" in str(e.value)
Expand Down

0 comments on commit 74a50e9

Please sign in to comment.