Skip to content

Commit

Permalink
middlewares: Clean up app iters better
Browse files Browse the repository at this point in the history
Previously, logs would often show 499s in places where some other status
would be more appropriate.

Change-Id: I68dbb8593101cd3b5b64a1a947c68e340e36ce02
  • Loading branch information
tipabu committed Feb 13, 2020
1 parent baaa5c5 commit 2a8d47f
Show file tree
Hide file tree
Showing 13 changed files with 111 additions and 63 deletions.
10 changes: 5 additions & 5 deletions swift/common/middleware/symlink.py
Expand Up @@ -203,7 +203,7 @@

from swift.common.utils import get_logger, register_swift_info, split_path, \
MD5_OF_EMPTY_STRING, close_if_possible, closing_if_possible, \
config_true_value
config_true_value, drain_and_close
from swift.common.constraints import check_account_format
from swift.common.wsgi import WSGIContext, make_subrequest
from swift.common.request_helpers import get_sys_meta_prefix, \
Expand Down Expand Up @@ -468,7 +468,8 @@ def build_traversal_req(symlink_target):
resp_etag = self._response_header_value(
TGT_ETAG_SYSMETA_SYMLINK_HDR)
if symlink_target and (resp_etag or follow_softlinks):
close_if_possible(resp)
# Should be a zero-byte object
drain_and_close(resp)
found_etag = resp_etag or self._response_header_value('etag')
if target_etag and target_etag != found_etag:
raise HTTPConflict(
Expand All @@ -491,6 +492,7 @@ def build_traversal_req(symlink_target):
else:
final_etag = self._response_header_value('etag')
if final_etag and target_etag and target_etag != final_etag:
# do *not* drain; we don't know how big this is
close_if_possible(resp)
body = ('Object Etag %r does not match '
'X-Symlink-Target-Etag header %r')
Expand Down Expand Up @@ -538,9 +540,7 @@ def _validate_etag_and_update_sysmeta(self, req, symlink_target_path,
'Content-Type': 'text/plain',
'Content-Location': self._last_target_path})
if not is_success(self._get_status_int()):
with closing_if_possible(resp):
for chunk in resp:
pass
drain_and_close(resp)
raise status_map[self._get_status_int()](request=req)
response_headers = HeaderKeyDict(self._response_headers)
# carry forward any etag update params (e.g. "slo_etag"), we'll append
Expand Down
26 changes: 16 additions & 10 deletions swift/common/middleware/versioned_writes/legacy.py
Expand Up @@ -230,7 +230,7 @@
import time

from swift.common.utils import get_logger, Timestamp, \
config_true_value, close_if_possible, FileLikeIter
config_true_value, close_if_possible, FileLikeIter, drain_and_close
from swift.common.request_helpers import get_sys_meta_prefix, \
copy_header_subset
from swift.common.wsgi import WSGIContext, make_pre_authed_request
Expand Down Expand Up @@ -341,7 +341,8 @@ def _listing_pages_iter(self, account_name, lcontainer, lprefix,
lreq.environ['QUERY_STRING'] += '&reverse=on'
lresp = lreq.get_response(self.app)
if not is_success(lresp.status_int):
close_if_possible(lresp.app_iter)
# errors should be short
drain_and_close(lresp)
if lresp.status_int == HTTP_NOT_FOUND:
raise ListingIterNotFound()
elif is_client_error(lresp.status_int):
Expand Down Expand Up @@ -382,6 +383,8 @@ def _get_source_object(self, req, path_info):

if source_resp.content_length is None or \
source_resp.content_length > MAX_FILE_SIZE:
# Consciously *don't* drain the response before closing;
# any logged 499 is actually rather appropriate here
close_if_possible(source_resp.app_iter)
return HTTPRequestEntityTooLarge(request=req)

Expand All @@ -402,6 +405,7 @@ def _put_versioned_obj(self, req, put_path_info, source_resp):

put_req.environ['wsgi.input'] = FileLikeIter(source_resp.app_iter)
put_resp = put_req.get_response(self.app)
# the PUT was responsible for draining
close_if_possible(source_resp.app_iter)
return put_resp

Expand All @@ -411,7 +415,8 @@ def _check_response_error(self, req, resp):
"""
if is_success(resp.status_int):
return
close_if_possible(resp.app_iter)
# any error should be short
drain_and_close(resp)
if is_client_error(resp.status_int):
# missing container or bad permissions
raise HTTPPreconditionFailed(request=req)
Expand Down Expand Up @@ -444,7 +449,7 @@ def _copy_current(self, req, versions_cont, api_version, account_name,

if get_resp.status_int == HTTP_NOT_FOUND:
# nothing to version, proceed with original request
close_if_possible(get_resp.app_iter)
drain_and_close(get_resp)
return

# check for any other errors
Expand All @@ -466,7 +471,8 @@ def _copy_current(self, req, versions_cont, api_version, account_name,
put_resp = self._put_versioned_obj(req, put_path_info, get_resp)

self._check_response_error(req, put_resp)
close_if_possible(put_resp.app_iter)
# successful PUT response should be short
drain_and_close(put_resp)

def handle_obj_versions_put(self, req, versions_cont, api_version,
account_name, object_name):
Expand Down Expand Up @@ -521,7 +527,7 @@ def handle_obj_versions_delete_push(self, req, versions_cont, api_version,
marker_req.environ['swift.content_type_overridden'] = True
marker_resp = marker_req.get_response(self.app)
self._check_response_error(req, marker_resp)
close_if_possible(marker_resp.app_iter)
drain_and_close(marker_resp)

# successfully copied and created delete marker; safe to delete
return self.app
Expand All @@ -535,7 +541,7 @@ def _restore_data(self, req, versions_cont, api_version, account_name,

# if the version isn't there, keep trying with previous version
if get_resp.status_int == HTTP_NOT_FOUND:
close_if_possible(get_resp.app_iter)
drain_and_close(get_resp)
return False

self._check_response_error(req, get_resp)
Expand All @@ -545,7 +551,7 @@ def _restore_data(self, req, versions_cont, api_version, account_name,
put_resp = self._put_versioned_obj(req, put_path_info, get_resp)

self._check_response_error(req, put_resp)
close_if_possible(put_resp.app_iter)
drain_and_close(put_resp)
return get_path

def handle_obj_versions_delete_pop(self, req, versions_cont, api_version,
Expand Down Expand Up @@ -591,7 +597,7 @@ def handle_obj_versions_delete_pop(self, req, versions_cont, api_version,
req.environ, path=wsgi_quote(req.path_info), method='HEAD',
headers=obj_head_headers, swift_source='VW')
hresp = head_req.get_response(self.app)
close_if_possible(hresp.app_iter)
drain_and_close(hresp)

if hresp.status_int != HTTP_NOT_FOUND:
self._check_response_error(req, hresp)
Expand Down Expand Up @@ -619,7 +625,7 @@ def handle_obj_versions_delete_pop(self, req, versions_cont, api_version,
method='DELETE', headers=auth_token_header,
swift_source='VW')
del_resp = old_del_req.get_response(self.app)
close_if_possible(del_resp.app_iter)
drain_and_close(del_resp)
if del_resp.status_int != HTTP_NOT_FOUND:
self._check_response_error(req, del_resp)
# else, well, it existed long enough to do the
Expand Down
58 changes: 24 additions & 34 deletions swift/common/middleware/versioned_writes/object_versioning.py
Expand Up @@ -165,7 +165,7 @@
HTTPRequestEntityTooLarge, HTTPInternalServerError, HTTPNotAcceptable, \
HTTPConflict
from swift.common.storage_policy import POLICIES
from swift.common.utils import get_logger, Timestamp, \
from swift.common.utils import get_logger, Timestamp, drain_and_close, \
config_true_value, close_if_possible, closing_if_possible, \
FileLikeIter, split_path, parse_content_type, RESERVED_STR
from swift.common.wsgi import WSGIContext, make_pre_authed_request
Expand Down Expand Up @@ -288,6 +288,8 @@ def _put_versioned_obj(self, req, put_path_info, source_resp):
put_req.headers['Content-Type'] += '; swift_bytes=%s' % slo_size
put_req.environ['swift.content_type_overridden'] = True
put_resp = put_req.get_response(self.app)
drain_and_close(put_resp)
# the PUT should have already drained source_resp
close_if_possible(source_resp.app_iter)
return put_resp

Expand Down Expand Up @@ -324,19 +326,16 @@ def _put_versioned_obj_from_client(self, req, versions_cont, api_version,

# do the write
put_resp = put_req.get_response(self.app)
drain_and_close(put_resp)
close_if_possible(put_req.environ['wsgi.input'])

if put_resp.status_int == HTTP_NOT_FOUND:
close_if_possible(put_resp.app_iter)
raise HTTPInternalServerError(
request=req, content_type='text/plain',
body=b'The versions container does not exist. You may '
b'want to re-enable object versioning.')

self._check_response_error(req, put_resp)
with closing_if_possible(put_resp.app_iter), closing_if_possible(
put_req.environ['wsgi.input']):
for chunk in put_resp.app_iter:
pass
put_bytes = byte_counter.bytes_read
# N.B. this is essentially the same hack that symlink does in
# _validate_etag_and_update_sysmeta to deal with SLO
Expand Down Expand Up @@ -390,7 +389,7 @@ def _check_response_error(self, req, resp):
"""
if is_success(resp.status_int):
return
close_if_possible(resp.app_iter)
drain_and_close(resp)
if is_client_error(resp.status_int):
# missing container or bad permissions
if resp.status_int == 404:
Expand Down Expand Up @@ -429,18 +428,15 @@ def _copy_current(self, req, versions_cont, api_version, account_name,

if get_resp.status_int == HTTP_NOT_FOUND:
# nothing to version, proceed with original request
for chunk in get_resp.app_iter:
# Should be short; just avoiding the 499
pass
close_if_possible(get_resp.app_iter)
drain_and_close(get_resp)
return get_resp

# check for any other errors
self._check_response_error(req, get_resp)

if get_resp.headers.get(SYSMETA_VERSIONS_SYMLINK) == 'true':
# existing object is a VW symlink; no action required
close_if_possible(get_resp.app_iter)
drain_and_close(get_resp)
return get_resp

# if there's an existing object, then copy it to
Expand All @@ -458,15 +454,12 @@ def _copy_current(self, req, versions_cont, api_version, account_name,
put_resp = self._put_versioned_obj(req, put_path_info, get_resp)

if put_resp.status_int == HTTP_NOT_FOUND:
close_if_possible(put_resp.app_iter)
raise HTTPInternalServerError(
request=req, content_type='text/plain',
body=b'The versions container does not exist. You may '
b'want to re-enable object versioning.')

self._check_response_error(req, put_resp)
close_if_possible(put_resp.app_iter)
return put_resp

def handle_put(self, req, versions_cont, api_version,
account_name, object_name, is_enabled):
Expand Down Expand Up @@ -553,15 +546,15 @@ def handle_delete(self, req, versions_cont, api_version,
marker_req.environ['swift.content_type_overridden'] = True
marker_resp = marker_req.get_response(self.app)
self._check_response_error(req, marker_resp)
close_if_possible(marker_resp.app_iter)
drain_and_close(marker_resp)

# successfully copied and created delete marker; safe to delete
resp = req.get_response(self.app)
if resp.is_success or resp.status_int == 404:
resp.headers['X-Object-Version-Id'] = \
self._split_version_from_name(marker_name)[1].internal
resp.headers['X-Backend-Content-Type'] = DELETE_MARKER_CONTENT_TYPE
close_if_possible(resp.app_iter)
drain_and_close(resp)
return resp

def handle_post(self, req, versions_cont, account):
Expand Down Expand Up @@ -595,7 +588,7 @@ def handle_post(self, req, versions_cont, account):
# Only follow if the version container matches
if split_path(loc, 4, 4, True)[1:3] == [
account, versions_cont]:
close_if_possible(resp.app_iter)
drain_and_close(resp)
post_req.path_info = loc
resp = post_req.get_response(self.app)
return resp
Expand All @@ -620,7 +613,7 @@ def _check_head(self, req, auth_token_header):
self._check_response_error(req, hresp)
if hresp.headers.get(SYSMETA_VERSIONS_SYMLINK) == 'true':
symlink_target = hresp.headers.get(TGT_OBJ_SYMLINK_HDR)
close_if_possible(hresp.app_iter)
drain_and_close(hresp)
return head_is_tombstone, symlink_target

def handle_delete_version(self, req, versions_cont, api_version,
Expand Down Expand Up @@ -656,7 +649,7 @@ def handle_delete_version(self, req, versions_cont, api_version,
req.environ['QUERY_STRING'] = ''
link_resp = req.get_response(self.app)
self._check_response_error(req, link_resp)
close_if_possible(link_resp.app_iter)
drain_and_close(link_resp)

# *then* the backing data
req.path_info = "/%s/%s/%s/%s" % (
Expand Down Expand Up @@ -693,7 +686,7 @@ def handle_put_version(self, req, versions_cont, api_version, account_name,
method='HEAD', headers=obj_head_headers, swift_source='OV')
head_resp = head_req.get_response(self.app)
if head_resp.status_int == HTTP_NOT_FOUND:
close_if_possible(head_resp.app_iter)
drain_and_close(head_resp)
if is_success(get_container_info(
head_req.environ, self.app, swift_source='OV')['status']):
raise HTTPNotFound(
Expand All @@ -706,7 +699,7 @@ def handle_put_version(self, req, versions_cont, api_version, account_name,
b'want to re-enable object versioning.')

self._check_response_error(req, head_resp)
close_if_possible(head_resp.app_iter)
drain_and_close(head_resp)

put_etag = head_resp.headers['ETag']
put_bytes = head_resp.content_length
Expand Down Expand Up @@ -773,7 +766,7 @@ def handle_versioned_request(self, req, versions_cont, api_version,
raise HTTPNotFound(request=req)
resp.headers['X-Object-Version-Id'] = 'null'
if req.method == 'HEAD':
close_if_possible(resp.app_iter)
drain_and_close(resp)
return resp
else:
# Re-write the path; most everything else goes through normally
Expand All @@ -791,7 +784,7 @@ def handle_versioned_request(self, req, versions_cont, api_version,
'X-Backend-Content-Type', resp.headers['Content-Type'])

if req.method == 'HEAD':
close_if_possible(resp.app_iter)
drain_and_close(resp)

if is_del_marker:
hdrs = {'X-Object-Version-Id': version,
Expand Down Expand Up @@ -880,7 +873,7 @@ def handle_request(self, req, start_response):
self._response_headers[bytes_idx] = (
'X-Container-Bytes-Used',
str(int(curr_bytes) + int(ver_bytes)))
close_if_possible(vresp.app_iter)
drain_and_close(vresp)
elif is_success(self._get_status_int()):
# If client is doing a version-aware listing for a container that
# (as best we could tell) has never had versioning enabled,
Expand Down Expand Up @@ -972,7 +965,7 @@ def handle_delete(self, req, start_response):
account, str_to_wsgi(versions_cont))),
headers={'X-Backend-Allow-Reserved-Names': 'true'})
vresp = versions_req.get_response(self.app)
close_if_possible(vresp.app_iter)
drain_and_close(vresp)
if vresp.is_success and int(vresp.headers.get(
'X-Container-Object-Count', 0)) > 0:
raise HTTPConflict(
Expand All @@ -984,7 +977,7 @@ def handle_delete(self, req, start_response):
else:
versions_req.method = 'DELETE'
resp = versions_req.get_response(self.app)
close_if_possible(resp.app_iter)
drain_and_close(resp)
if not is_success(resp.status_int) and resp.status_int != 404:
raise HTTPInternalServerError(
'Error deleting versioned container')
Expand Down Expand Up @@ -1072,9 +1065,7 @@ def enable_versioning(self, req, start_response):
method='PUT', headers=hdrs, swift_source='OV')
resp = ver_cont_req.get_response(self.app)
# Should always be short; consume the body
for chunk in resp.app_iter:
pass
close_if_possible(resp.app_iter)
drain_and_close(resp)
if is_success(resp.status_int) or resp.status_int == HTTP_CONFLICT:
req.headers[SYSMETA_VERSIONS_CONT] = wsgi_quote(versions_cont)
else:
Expand All @@ -1097,7 +1088,7 @@ def enable_versioning(self, req, start_response):

# TODO: what if this one fails??
resp = ver_cont_req.get_response(self.app)
close_if_possible(resp.app_iter)
drain_and_close(resp)

if self._response_headers is None:
self._response_headers = []
Expand Down Expand Up @@ -1202,7 +1193,7 @@ def _list_versions(self, req, start_response, location, primary_listing):
reverse=config_true_value(params.get('reverse', 'no')))
self.update_content_length(len(body))
app_resp = [body]
close_if_possible(versions_resp.app_iter)
drain_and_close(versions_resp)
elif is_success(versions_resp.status_int):
try:
listing = json.loads(versions_resp.body)
Expand Down Expand Up @@ -1324,9 +1315,8 @@ def list_containers(self, req, api_version, account, start_response):
try:
versions_listing = json.loads(versions_resp.body)
except ValueError:
close_if_possible(versions_resp.app_iter)
versions_listing = []
else:
finally:
close_if_possible(versions_resp.app_iter)

# create a dict from versions listing to facilitate
Expand Down

0 comments on commit 2a8d47f

Please sign in to comment.