From f5648638ee6f939556ebfcb40dfdb8a590d3b5ae Mon Sep 17 00:00:00 2001 From: David Goetz Date: Mon, 4 Nov 2013 17:06:06 +0000 Subject: [PATCH] Get retry. If a source times out on read try another one of them with a modified range. There had to be a lot of moved around code to get this working but it should all make sense. Change-Id: Ieaf045690a8823927a6f38098a95b37a4d4adb70 --- swift/proxy/controllers/base.py | 595 ++++++++++++----------- swift/proxy/controllers/obj.py | 31 +- swift/proxy/server.py | 121 +++++ test/unit/__init__.py | 13 +- test/unit/proxy/controllers/test_base.py | 28 +- test/unit/proxy/test_server.py | 125 +++-- 6 files changed, 557 insertions(+), 356 deletions(-) diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 8976328cec..35ca457830 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -28,7 +28,7 @@ import time import functools import inspect -import itertools +from sys import exc_info from swift import gettext_ as _ from urllib import quote @@ -46,7 +46,8 @@ is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \ HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \ HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED -from swift.common.swob import Request, Response, HeaderKeyDict +from swift.common.swob import Request, Response, HeaderKeyDict, Range, \ + HTTPException, HTTPRequestedRangeNotSatisfiable def update_headers(response, headers): @@ -518,6 +519,286 @@ def _get_object_info(app, env, account, container, obj, swift_source=None): return None +def close_swift_conn(src): + """ + Force close the http connection to the backend. + + :param src: the response from the backend + """ + try: + # Since the backends set "Connection: close" in their response + # headers, the response object (src) is solely responsible for the + # socket. The connection object (src.swift_conn) has no references + # to the socket, so calling its close() method does nothing, and + # therefore we don't do it. + # + # Also, since calling the response's close() method might not + # close the underlying socket but only decrement some + # reference-counter, we have a special method here that really, + # really kills the underlying socket with a close() syscall. + src.nuke_from_orbit() # it's the only way to be sure + except Exception: + pass + + +class GetOrHeadHandler(object): + + def __init__(self, app, req, server_type, ring, partition, path, + backend_headers): + self.app = app + self.ring = ring + self.server_type = server_type + self.partition = partition + self.path = path + self.backend_headers = backend_headers + self.used_nodes = [] + self.used_source_etag = '' + + # stuff from request + self.req_method = req.method + self.req_path = req.path + self.req_query_string = req.query_string + self.newest = config_true_value(req.headers.get('x-newest', 'f')) + + # populated when finding source + self.statuses = [] + self.reasons = [] + self.bodies = [] + self.source_headers = [] + + def fast_forward(self, num_bytes): + """ + Will skip num_bytes into the current ranges. + :params num_bytes: the number of bytes that have already been read on + this request. This will change the Range header + so that the next req will start where it left off. + :raises NotImplementedError: if this is a multirange request + :raises ValueError: if invalid range header + :raises HTTPRequestedRangeNotSatisfiable: if begin + num_bytes + > end of range + """ + if 'Range' in self.backend_headers: + req_range = Range(self.backend_headers['Range']) + + if len(req_range.ranges) > 1: + raise NotImplementedError() + begin, end = req_range.ranges.pop() + if begin is None: + # this is a -50 range req (last 50 bytes of file) + end -= num_bytes + else: + begin += num_bytes + if end and begin > end: + raise HTTPRequestedRangeNotSatisfiable() + req_range.ranges = [(begin, end)] + self.backend_headers['Range'] = str(req_range) + else: + self.backend_headers['Range'] = 'bytes=%d-' % num_bytes + + def is_good_source(self, src): + """ + Indicates whether or not the request made to the backend found + what it was looking for. + + :param src: the response from the backend + :returns: True if found, False if not + """ + if self.server_type == 'Object' and src.status == 416: + return True + return is_success(src.status) or is_redirection(src.status) + + def _make_app_iter(self, node, source): + """ + Returns an iterator over the contents of the source (via its read + func). There is also quite a bit of cleanup to ensure garbage + collection works and the underlying socket of the source is closed. + + :param source: The httplib.Response object this iterator should read + from. + :param node: The node the source is reading from, for logging purposes. + """ + try: + nchunks = 0 + bytes_read_from_source = 0 + while True: + try: + with ChunkReadTimeout(self.app.node_timeout): + chunk = source.read(self.app.object_chunk_size) + nchunks += 1 + bytes_read_from_source += len(chunk) + except ChunkReadTimeout: + exc_type, exc_value, exc_traceback = exc_info() + if self.newest: + raise exc_type, exc_value, exc_traceback + try: + self.fast_forward(bytes_read_from_source) + except (NotImplementedError, HTTPException, ValueError): + raise exc_type, exc_value, exc_traceback + new_source, new_node = self._get_source_and_node() + if new_source: + self.app.exception_occurred( + node, _('Object'), + _('Trying to read during GET (retrying)')) + # Close-out the connection as best as possible. + if getattr(source, 'swift_conn', None): + close_swift_conn(source) + source = new_source + node = new_node + bytes_read_from_source = 0 + continue + else: + raise exc_type, exc_value, exc_traceback + if not chunk: + break + with ChunkWriteTimeout(self.app.client_timeout): + yield chunk + # This is for fairness; if the network is outpacing the CPU, + # we'll always be able to read and write data without + # encountering an EWOULDBLOCK, and so eventlet will not switch + # greenthreads on its own. We do it manually so that clients + # don't starve. + # + # The number 5 here was chosen by making stuff up. It's not + # every single chunk, but it's not too big either, so it seemed + # like it would probably be an okay choice. + # + # Note that we may trampoline to other greenthreads more often + # than once every 5 chunks, depending on how blocking our + # network IO is; the explicit sleep here simply provides a + # lower bound on the rate of trampolining. + if nchunks % 5 == 0: + sleep() + + except ChunkReadTimeout: + self.app.exception_occurred(node, _('Object'), + _('Trying to read during GET')) + raise + except ChunkWriteTimeout: + self.app.logger.warn( + _('Client did not read from proxy within %ss') % + self.app.client_timeout) + self.app.logger.increment('client_timeouts') + except GeneratorExit: + self.app.logger.warn(_('Client disconnected on read')) + except Exception: + self.app.logger.exception(_('Trying to send to client')) + raise + finally: + # Close-out the connection as best as possible. + if getattr(source, 'swift_conn', None): + close_swift_conn(source) + + def _get_source_and_node(self): + + self.statuses = [] + self.reasons = [] + self.bodies = [] + self.source_headers = [] + sources = [] + + for node in self.app.iter_nodes(self.ring, self.partition): + if node in self.used_nodes: + continue + start_node_timing = time.time() + try: + with ConnectionTimeout(self.app.conn_timeout): + conn = http_connect( + node['ip'], node['port'], node['device'], + self.partition, self.req_method, self.path, + headers=self.backend_headers, + query_string=self.req_query_string) + self.app.set_node_timing(node, time.time() - start_node_timing) + + with Timeout(self.app.node_timeout): + possible_source = conn.getresponse() + # See NOTE: swift_conn at top of file about this. + possible_source.swift_conn = conn + except (Exception, Timeout): + self.app.exception_occurred( + node, self.server_type, + _('Trying to %(method)s %(path)s') % + {'method': self.req_method, 'path': self.req_path}) + continue + if self.is_good_source(possible_source): + # 404 if we know we don't have a synced copy + if not float(possible_source.getheader('X-PUT-Timestamp', 1)): + self.statuses.append(HTTP_NOT_FOUND) + self.reasons.append('') + self.bodies.append('') + self.source_headers.append('') + close_swift_conn(possible_source) + else: + if self.used_source_etag: + src_headers = dict( + (k.lower(), v) for k, v in + possible_source.getheaders()) + if src_headers.get('etag', '').strip('"') != \ + self.used_source_etag: + self.statuses.append(HTTP_NOT_FOUND) + self.reasons.append('') + self.bodies.append('') + self.source_headers.append('') + continue + + self.statuses.append(possible_source.status) + self.reasons.append(possible_source.reason) + self.bodies.append('') + self.source_headers.append('') + sources.append((possible_source, node)) + if not self.newest: # one good source is enough + break + else: + self.statuses.append(possible_source.status) + self.reasons.append(possible_source.reason) + self.bodies.append(possible_source.read()) + self.source_headers.append(possible_source.getheaders()) + if possible_source.status == HTTP_INSUFFICIENT_STORAGE: + self.app.error_limit(node, _('ERROR Insufficient Storage')) + elif is_server_error(possible_source.status): + self.app.error_occurred( + node, _('ERROR %(status)d %(body)s ' + 'From %(type)s Server') % + {'status': possible_source.status, + 'body': self.bodies[-1][:1024], + 'type': self.server_type}) + + if sources: + sources.sort(key=lambda s: source_key(s[0])) + source, node = sources.pop() + for src, _junk in sources: + close_swift_conn(src) + self.used_nodes.append(node) + src_headers = dict( + (k.lower(), v) for k, v in + possible_source.getheaders()) + self.used_source_etag = src_headers.get('etag', '').strip('"') + return source, node + return None, None + + def get_working_response(self, req): + source, node = self._get_source_and_node() + res = None + if source: + res = Response(request=req) + if req.method == 'GET' and \ + source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT): + res.app_iter = self._make_app_iter(node, source) + # See NOTE: swift_conn at top of file about this. + res.swift_conn = source.swift_conn + res.status = source.status + update_headers(res, source.getheaders()) + if not res.environ: + res.environ = {} + res.environ['swift_x_timestamp'] = \ + source.getheader('x-timestamp') + res.accept_ranges = 'bytes' + res.content_length = source.getheader('Content-Length') + if source.getheader('Content-Type'): + res.charset = None + res.content_type = source.getheader('Content-Type') + return res + + class Controller(object): """Base WSGI controller class for the proxy""" server_type = 'Base' @@ -602,71 +883,6 @@ def generate_request_headers(self, orig_req=None, additional=None, headers['referer'] = referer return headers - def error_occurred(self, node, msg): - """ - Handle logging, and handling of errors. - - :param node: dictionary of node to handle errors for - :param msg: error message - """ - node['errors'] = node.get('errors', 0) + 1 - node['last_error'] = time.time() - self.app.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'), - {'msg': msg, 'ip': node['ip'], - 'port': node['port'], 'device': node['device']}) - - def exception_occurred(self, node, typ, additional_info): - """ - Handle logging of generic exceptions. - - :param node: dictionary of node to log the error for - :param typ: server type - :param additional_info: additional information to log - """ - self.app.logger.exception( - _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: ' - '%(info)s'), - {'type': typ, 'ip': node['ip'], 'port': node['port'], - 'device': node['device'], 'info': additional_info}) - - def error_limited(self, node): - """ - Check if the node is currently error limited. - - :param node: dictionary of node to check - :returns: True if error limited, False otherwise - """ - now = time.time() - if 'errors' not in node: - return False - if 'last_error' in node and node['last_error'] < \ - now - self.app.error_suppression_interval: - del node['last_error'] - if 'errors' in node: - del node['errors'] - return False - limited = node['errors'] > self.app.error_suppression_limit - if limited: - self.app.logger.debug( - _('Node error limited %(ip)s:%(port)s (%(device)s)'), node) - return limited - - def error_limit(self, node, msg): - """ - Mark a node as error limited. This immediately pretends the - node received enough errors to trigger error suppression. Use - this for errors like Insufficient Storage. For other errors - use :func:`error_occurred`. - - :param node: dictionary of node to error limit - :param msg: error message - """ - node['errors'] = self.app.error_suppression_limit + 1 - node['last_error'] = time.time() - self.app.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'), - {'msg': msg, 'ip': node['ip'], - 'port': node['port'], 'device': node['device']}) - def account_info(self, account, req=None): """ Get account information, and also verify that the account exists. @@ -719,62 +935,6 @@ def container_info(self, account, container, req=None): info['nodes'] = nodes return info - def iter_nodes(self, ring, partition, node_iter=None): - """ - Yields nodes for a ring partition, skipping over error - limited nodes and stopping at the configurable number of - nodes. If a node yielded subsequently gets error limited, an - extra node will be yielded to take its place. - - Note that if you're going to iterate over this concurrently from - multiple greenthreads, you'll want to use a - swift.common.utils.GreenthreadSafeIterator to serialize access. - Otherwise, you may get ValueErrors from concurrent access. (You also - may not, depending on how logging is configured, the vagaries of - socket IO and eventlet, and the phase of the moon.) - - :param ring: ring to get yield nodes from - :param partition: ring partition to yield nodes for - :param node_iter: optional iterable of nodes to try. Useful if you - want to filter or reorder the nodes. - """ - part_nodes = ring.get_part_nodes(partition) - if node_iter is None: - node_iter = itertools.chain(part_nodes, - ring.get_more_nodes(partition)) - num_primary_nodes = len(part_nodes) - - # Use of list() here forcibly yanks the first N nodes (the primary - # nodes) from node_iter, so the rest of its values are handoffs. - primary_nodes = self.app.sort_nodes( - list(itertools.islice(node_iter, num_primary_nodes))) - handoff_nodes = node_iter - nodes_left = self.app.request_node_count(ring) - - for node in primary_nodes: - if not self.error_limited(node): - yield node - if not self.error_limited(node): - nodes_left -= 1 - if nodes_left <= 0: - return - - handoffs = 0 - for node in handoff_nodes: - if not self.error_limited(node): - handoffs += 1 - if self.app.log_handoffs: - self.app.logger.increment('handoff_count') - self.app.logger.warning( - 'Handoff requested (%d)' % handoffs) - if handoffs == len(primary_nodes): - self.app.logger.increment('handoff_all_count') - yield node - if not self.error_limited(node): - nodes_left -= 1 - if nodes_left <= 0: - return - def _make_request(self, nodes, part, method, path, headers, query, logger_thread_locals): """ @@ -812,11 +972,13 @@ def _make_request(self, nodes, part, method, path, headers, query, return resp.status, resp.reason, resp.getheaders(), \ resp.read() elif resp.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node, _('ERROR Insufficient Storage')) + self.app.error_limit(node, + _('ERROR Insufficient Storage')) except (Exception, Timeout): - self.exception_occurred(node, self.server_type, - _('Trying to %(method)s %(path)s') % - {'method': method, 'path': path}) + self.app.exception_occurred( + node, self.server_type, + _('Trying to %(method)s %(path)s') % + {'method': method, 'path': path}) def make_requests(self, req, ring, part, method, path, headers, query_string=''): @@ -836,7 +998,7 @@ def make_requests(self, req, ring, part, method, path, headers, :returns: a swob.Response object """ start_nodes = ring.get_part_nodes(part) - nodes = GreenthreadSafeIterator(self.iter_nodes(ring, part)) + nodes = GreenthreadSafeIterator(self.app.iter_nodes(ring, part)) pile = GreenAsyncPile(len(start_nodes)) for head in headers: pile.spawn(self._make_request, nodes, part, method, path, @@ -929,92 +1091,6 @@ def HEAD(self, req): """ return self.GETorHEAD(req) - def _make_app_iter(self, node, source): - """ - Returns an iterator over the contents of the source (via its read - func). There is also quite a bit of cleanup to ensure garbage - collection works and the underlying socket of the source is closed. - - :param source: The bufferedhttp.Response object this iterator should - read from. - :param node: The node the source is reading from, for logging purposes. - """ - try: - nchunks = 0 - while True: - with ChunkReadTimeout(self.app.node_timeout): - chunk = source.read(self.app.object_chunk_size) - nchunks += 1 - if not chunk: - break - with ChunkWriteTimeout(self.app.client_timeout): - yield chunk - # This is for fairness; if the network is outpacing the CPU, - # we'll always be able to read and write data without - # encountering an EWOULDBLOCK, and so eventlet will not switch - # greenthreads on its own. We do it manually so that clients - # don't starve. - # - # The number 5 here was chosen by making stuff up. It's not - # every single chunk, but it's not too big either, so it seemed - # like it would probably be an okay choice. - # - # Note that we may trampoline to other greenthreads more often - # than once every 5 chunks, depending on how blocking our - # network IO is; the explicit sleep here simply provides a - # lower bound on the rate of trampolining. - if nchunks % 5 == 0: - sleep() - except ChunkReadTimeout: - self.exception_occurred(node, _('Object'), - _('Trying to read during GET')) - raise - except ChunkWriteTimeout: - self.app.logger.warn( - _('Client did not read from proxy within %ss') % - self.app.client_timeout) - self.app.logger.increment('client_timeouts') - except GeneratorExit: - self.app.logger.warn(_('Client disconnected on read')) - except Exception: - self.app.logger.exception(_('Trying to send to client')) - raise - finally: - # Close-out the connection as best as possible. - if getattr(source, 'swift_conn', None): - self.close_swift_conn(source) - - def close_swift_conn(self, src): - """ - Force close the http connection to the backend. - - :param src: the response from the backend - """ - try: - # Since the backends set "Connection: close" in their response - # headers, the response object (src) is solely responsible for the - # socket. The connection object (src.swift_conn) has no references - # to the socket, so calling its close() method does nothing, and - # therefore we don't do it. - # - # Also, since calling the response's close() method might not - # close the underlying socket but only decrement some - # reference-counter, we have a special method here that really, - # really kills the underlying socket with a close() syscall. - src.nuke_from_orbit() # it's the only way to be sure - except Exception: - pass - - def is_good_source(self, src): - """ - Indicates whether or not the request made to the backend found - what it was looking for. - - :param src: the response from the backend - :returns: True if found, False if not - """ - return is_success(src.status) or is_redirection(src.status) - def autocreate_account(self, env, account): """ Autocreate an account @@ -1047,87 +1123,18 @@ def GETorHEAD_base(self, req, server_type, ring, partition, path): :param path: path for the request :returns: swob.Response object """ - statuses = [] - reasons = [] - bodies = [] - source_headers = [] - sources = [] - newest = config_true_value(req.headers.get('x-newest', 'f')) - headers = self.generate_request_headers(req, additional=req.headers) - for node in self.iter_nodes(ring, partition): - start_node_timing = time.time() - try: - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect( - node['ip'], node['port'], node['device'], partition, - req.method, path, headers=headers, - query_string=req.query_string) - self.app.set_node_timing(node, time.time() - start_node_timing) - with Timeout(self.app.node_timeout): - possible_source = conn.getresponse() - # See NOTE: swift_conn at top of file about this. - possible_source.swift_conn = conn - except (Exception, Timeout): - self.exception_occurred( - node, server_type, _('Trying to %(method)s %(path)s') % - {'method': req.method, 'path': req.path}) - continue - if self.is_good_source(possible_source): - # 404 if we know we don't have a synced copy - if not float(possible_source.getheader('X-PUT-Timestamp', 1)): - statuses.append(HTTP_NOT_FOUND) - reasons.append('') - bodies.append('') - source_headers.append('') - self.close_swift_conn(possible_source) - else: - statuses.append(possible_source.status) - reasons.append(possible_source.reason) - bodies.append('') - source_headers.append('') - sources.append((possible_source, node)) - if not newest: # one good source is enough - break - else: - statuses.append(possible_source.status) - reasons.append(possible_source.reason) - bodies.append(possible_source.read()) - source_headers.append(possible_source.getheaders()) - if possible_source.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node, _('ERROR Insufficient Storage')) - elif is_server_error(possible_source.status): - self.error_occurred(node, _('ERROR %(status)d %(body)s ' - 'From %(type)s Server') % - {'status': possible_source.status, - 'body': bodies[-1][:1024], - 'type': server_type}) - res = None - if sources: - sources.sort(key=lambda s: source_key(s[0])) - source, node = sources.pop() - for src, _junk in sources: - self.close_swift_conn(src) - res = Response(request=req) - if req.method == 'GET' and \ - source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT): - res.app_iter = self._make_app_iter(node, source) - # See NOTE: swift_conn at top of file about this. - res.swift_conn = source.swift_conn - res.status = source.status - update_headers(res, source.getheaders()) - if not res.environ: - res.environ = {} - res.environ['swift_x_timestamp'] = \ - source.getheader('x-timestamp') - res.accept_ranges = 'bytes' - res.content_length = source.getheader('Content-Length') - if source.getheader('Content-Type'): - res.charset = None - res.content_type = source.getheader('Content-Type') + backend_headers = self.generate_request_headers( + req, additional=req.headers) + + handler = GetOrHeadHandler(self.app, req, server_type, ring, + partition, path, backend_headers) + res = handler.get_working_response(req) + if not res: - res = self.best_response(req, statuses, reasons, bodies, - '%s %s' % (server_type, req.method), - headers=source_headers) + res = self.best_response( + req, handler.statuses, handler.reasons, handler.bodies, + '%s %s' % (server_type, req.method), + headers=handler.source_headers) try: (account, container) = split_path(req.path_info, 1, 2) _set_info_cache(self.app, req.environ, account, container, res) diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 5d6f4a0cb5..da690b4680 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -504,7 +504,7 @@ def iter_nodes_local_first(self, ring, partition): is_local = self.app.write_affinity_is_local_fn if is_local is None: - return self.iter_nodes(ring, partition) + return self.app.iter_nodes(ring, partition) all_nodes = itertools.chain(primary_nodes, ring.get_more_nodes(partition)) @@ -519,20 +519,9 @@ def iter_nodes_local_first(self, ring, partition): itertools.ifilter(lambda node: node not in first_n_local_nodes, all_nodes)) - return self.iter_nodes( + return self.app.iter_nodes( ring, partition, node_iter=local_first_node_iter) - def is_good_source(self, src): - """ - Indicates whether or not the request made to the backend found - what it was looking for. - - In the case of an object, a 416 indicates that we found a - backend with the object. - """ - return src.status == 416 or \ - super(ObjectController, self).is_good_source(src) - def GETorHEAD(self, req): """Handle HTTP GET or HEAD requests.""" container_info = self.container_info( @@ -800,8 +789,9 @@ def _send_file(self, conn, path): conn.send(chunk) except (Exception, ChunkWriteTimeout): conn.failed = True - self.exception_occurred(conn.node, _('Object'), - _('Trying to write to %s') % path) + self.app.exception_occurred( + conn.node, _('Object'), + _('Trying to write to %s') % path) conn.queue.task_done() def _connect_put_node(self, nodes, part, path, headers, @@ -827,10 +817,11 @@ def _connect_put_node(self, nodes, part, path, headers, conn.node = node return conn elif resp.status == HTTP_INSUFFICIENT_STORAGE: - self.error_limit(node, _('ERROR Insufficient Storage')) + self.app.error_limit(node, _('ERROR Insufficient Storage')) except (Exception, Timeout): - self.exception_occurred(node, _('Object'), - _('Expect: 100-continue on %s') % path) + self.app.exception_occurred( + node, _('Object'), + _('Expect: 100-continue on %s') % path) def _get_put_responses(self, req, conns, nodes): statuses = [] @@ -846,7 +837,7 @@ def get_conn_response(conn): else: return conn.getresponse() except (Exception, Timeout): - self.exception_occurred( + self.app.exception_occurred( conn.node, _('Object'), _('Trying to get final status of PUT to %s') % req.path) pile = GreenAsyncPile(len(conns)) @@ -858,7 +849,7 @@ def get_conn_response(conn): reasons.append(response.reason) bodies.append(response.read()) if response.status >= HTTP_INTERNAL_SERVER_ERROR: - self.error_occurred( + self.app.error_occurred( conn.node, _('ERROR %(status)d %(body)s From Object Server ' 're: %(path)s') % diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 691e2d005d..569fc653c8 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -19,6 +19,7 @@ from swift import gettext_ as _ from random import shuffle from time import time +import itertools from eventlet import Timeout @@ -332,6 +333,126 @@ def set_node_timing(self, node, timing): timing = round(timing, 3) # sort timings to the millisecond self.node_timings[node['ip']] = (timing, now + self.timing_expiry) + def error_limited(self, node): + """ + Check if the node is currently error limited. + + :param node: dictionary of node to check + :returns: True if error limited, False otherwise + """ + now = time() + if 'errors' not in node: + return False + if 'last_error' in node and node['last_error'] < \ + now - self.error_suppression_interval: + del node['last_error'] + if 'errors' in node: + del node['errors'] + return False + limited = node['errors'] > self.error_suppression_limit + if limited: + self.logger.debug( + _('Node error limited %(ip)s:%(port)s (%(device)s)'), node) + return limited + + def error_limit(self, node, msg): + """ + Mark a node as error limited. This immediately pretends the + node received enough errors to trigger error suppression. Use + this for errors like Insufficient Storage. For other errors + use :func:`error_occurred`. + + :param node: dictionary of node to error limit + :param msg: error message + """ + node['errors'] = self.error_suppression_limit + 1 + node['last_error'] = time() + self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'), + {'msg': msg, 'ip': node['ip'], + 'port': node['port'], 'device': node['device']}) + + def error_occurred(self, node, msg): + """ + Handle logging, and handling of errors. + + :param node: dictionary of node to handle errors for + :param msg: error message + """ + node['errors'] = node.get('errors', 0) + 1 + node['last_error'] = time() + self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'), + {'msg': msg, 'ip': node['ip'], + 'port': node['port'], 'device': node['device']}) + + def iter_nodes(self, ring, partition, node_iter=None): + """ + Yields nodes for a ring partition, skipping over error + limited nodes and stopping at the configurable number of + nodes. If a node yielded subsequently gets error limited, an + extra node will be yielded to take its place. + + Note that if you're going to iterate over this concurrently from + multiple greenthreads, you'll want to use a + swift.common.utils.GreenthreadSafeIterator to serialize access. + Otherwise, you may get ValueErrors from concurrent access. (You also + may not, depending on how logging is configured, the vagaries of + socket IO and eventlet, and the phase of the moon.) + + :param ring: ring to get yield nodes from + :param partition: ring partition to yield nodes for + :param node_iter: optional iterable of nodes to try. Useful if you + want to filter or reorder the nodes. + """ + part_nodes = ring.get_part_nodes(partition) + if node_iter is None: + node_iter = itertools.chain(part_nodes, + ring.get_more_nodes(partition)) + num_primary_nodes = len(part_nodes) + + # Use of list() here forcibly yanks the first N nodes (the primary + # nodes) from node_iter, so the rest of its values are handoffs. + primary_nodes = self.sort_nodes( + list(itertools.islice(node_iter, num_primary_nodes))) + handoff_nodes = node_iter + nodes_left = self.request_node_count(ring) + + for node in primary_nodes: + if not self.error_limited(node): + yield node + if not self.error_limited(node): + nodes_left -= 1 + if nodes_left <= 0: + return + handoffs = 0 + for node in handoff_nodes: + if not self.error_limited(node): + handoffs += 1 + if self.log_handoffs: + self.logger.increment('handoff_count') + self.logger.warning( + 'Handoff requested (%d)' % handoffs) + if handoffs == len(primary_nodes): + self.logger.increment('handoff_all_count') + yield node + if not self.error_limited(node): + nodes_left -= 1 + if nodes_left <= 0: + return + + def exception_occurred(self, node, typ, additional_info): + """ + Handle logging of generic exceptions. + + :param node: dictionary of node to log the error for + :param typ: server type + :param additional_info: additional information to log + """ + self.logger.exception( + _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: ' + '%(info)s'), + {'type': typ, 'ip': node['ip'], 'port': node['port'], + 'device': node['device'], 'info': additional_info}) + def app_factory(global_conf, **local_conf): """paste.deploy app factory for creating WSGI proxy apps.""" diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 0d07493b04..8ad04cbc5e 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -426,6 +426,8 @@ def __init__(self, status, etag=None, body='', timestamp='1', self.body = body self.headers = headers or {} self.timestamp = timestamp + if kwargs.get('slow') and isinstance(kwargs['slow'], list): + kwargs['slow'][0] -= 1 def getresponse(self): if kwargs.get('raise_exc'): @@ -469,13 +471,18 @@ def getheaders(self): headers['x-container-timestamp'] = '1' except StopIteration: pass - if 'slow' in kwargs: + if self.am_slow(): headers['content-length'] = '4' headers.update(self.headers) return headers.items() + def am_slow(self): + if kwargs.get('slow') and isinstance(kwargs['slow'], list): + return kwargs['slow'][0] >= 0 + return bool(kwargs.get('slow')) + def read(self, amt=None): - if 'slow' in kwargs: + if self.am_slow(): if self.sent < 4: self.sent += 1 sleep(0.1) @@ -485,7 +492,7 @@ def read(self, amt=None): return rv def send(self, amt=None): - if 'slow' in kwargs: + if self.am_slow(): if self.received < 4: self.received += 1 sleep(0.1) diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py index 1fe8880dd1..403bf3d40e 100644 --- a/test/unit/proxy/controllers/test_base.py +++ b/test/unit/proxy/controllers/test_base.py @@ -18,8 +18,9 @@ from swift.proxy.controllers.base import headers_to_container_info, \ headers_to_account_info, headers_to_object_info, get_container_info, \ get_container_memcache_key, get_account_info, get_account_memcache_key, \ - get_object_env_key, _get_cache_key, get_info, get_object_info, Controller -from swift.common.swob import Request + get_object_env_key, _get_cache_key, get_info, get_object_info, \ + Controller, GetOrHeadHandler +from swift.common.swob import Request, HTTPException from swift.common.utils import split_path from test.unit import fake_http_connect, FakeRing, FakeMemcache from swift.proxy import server as proxy_server @@ -446,3 +447,26 @@ def test_have_quorum(self): self.assertEqual(base.have_quorum([201, 201], 2), True) self.assertEqual(base.have_quorum([404, 404], 2), True) self.assertEqual(base.have_quorum([201, 404, 201, 201], 4), True) + + def test_range_fast_forward(self): + req = Request.blank('/') + handler = GetOrHeadHandler(None, req, None, None, None, None, {}) + handler.fast_forward(50) + self.assertEquals(handler.backend_headers['Range'], 'bytes=50-') + + handler = GetOrHeadHandler(None, req, None, None, None, None, + {'Range': 'bytes=23-50'}) + handler.fast_forward(20) + self.assertEquals(handler.backend_headers['Range'], 'bytes=43-50') + self.assertRaises(HTTPException, + handler.fast_forward, 80) + + handler = GetOrHeadHandler(None, req, None, None, None, None, + {'Range': 'bytes=23-'}) + handler.fast_forward(20) + self.assertEquals(handler.backend_headers['Range'], 'bytes=43-') + + handler = GetOrHeadHandler(None, req, None, None, None, None, + {'Range': 'bytes=-100'}) + handler.fast_forward(20) + self.assertEquals(handler.backend_headers['Range'], 'bytes=-80') diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 72a813cf92..010934e100 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -810,7 +810,7 @@ def is_r0(node): controller = \ proxy_server.ObjectController(self.app, 'a', 'c', 'o.jpg') - controller.error_limit( + self.app.error_limit( self.app.object_ring.get_part_nodes(1)[0], 'test') set_http_connect(200, 200, # account, container 201, 201, 201, # 3 working backends @@ -2267,6 +2267,73 @@ def test_node_read_timeout(self): got_exc = True self.assert_(got_exc) + def test_node_read_timeout_retry(self): + with save_globals(): + self.app.account_ring.get_nodes('account') + for dev in self.app.account_ring.devs.values(): + dev['ip'] = '127.0.0.1' + dev['port'] = 1 + self.app.container_ring.get_nodes('account') + for dev in self.app.container_ring.devs.values(): + dev['ip'] = '127.0.0.1' + dev['port'] = 1 + self.app.object_ring.get_nodes('account') + for dev in self.app.object_ring.devs.values(): + dev['ip'] = '127.0.0.1' + dev['port'] = 1 + req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'}) + self.app.update_request(req) + + self.app.node_timeout = 0.1 + set_http_connect(200, 200, 200, slow=[3]) + resp = req.get_response(self.app) + got_exc = False + try: + resp.body + except ChunkReadTimeout: + got_exc = True + self.assert_(got_exc) + + set_http_connect(200, 200, 200, body='lalala', slow=[2]) + resp = req.get_response(self.app) + got_exc = False + try: + self.assertEquals(resp.body, 'lalala') + except ChunkReadTimeout: + got_exc = True + self.assert_(not got_exc) + + set_http_connect(200, 200, 200, body='lalala', slow=[2], + etags=['a', 'a', 'a']) + resp = req.get_response(self.app) + got_exc = False + try: + self.assertEquals(resp.body, 'lalala') + except ChunkReadTimeout: + got_exc = True + self.assert_(not got_exc) + + set_http_connect(200, 200, 200, body='lalala', slow=[2], + etags=['a', 'b', 'a']) + resp = req.get_response(self.app) + got_exc = False + try: + self.assertEquals(resp.body, 'lalala') + except ChunkReadTimeout: + got_exc = True + self.assert_(not got_exc) + + req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'}) + set_http_connect(200, 200, 200, body='lalala', slow=[2], + etags=['a', 'b', 'b']) + resp = req.get_response(self.app) + got_exc = False + try: + resp.body + except ChunkReadTimeout: + got_exc = True + self.assert_(got_exc) + def test_node_write_timeout(self): with save_globals(): self.app.account_ring.get_nodes('account') @@ -2305,44 +2372,35 @@ def test_iter_nodes(self): with save_globals(): try: self.app.object_ring.max_more_nodes = 2 - controller = proxy_server.ObjectController(self.app, 'account', - 'container', - 'object') partition, nodes = self.app.object_ring.get_nodes('account', 'container', 'object') collected_nodes = [] - for node in controller.iter_nodes(self.app.object_ring, - partition): + for node in self.app.iter_nodes(self.app.object_ring, + partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 5) self.app.object_ring.max_more_nodes = 20 self.app.request_node_count = lambda r: 20 - controller = proxy_server.ObjectController(self.app, 'account', - 'container', - 'object') partition, nodes = self.app.object_ring.get_nodes('account', 'container', 'object') collected_nodes = [] - for node in controller.iter_nodes(self.app.object_ring, - partition): + for node in self.app.iter_nodes(self.app.object_ring, + partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 9) self.app.log_handoffs = True self.app.logger = FakeLogger() self.app.object_ring.max_more_nodes = 2 - controller = proxy_server.ObjectController(self.app, 'account', - 'container', - 'object') partition, nodes = self.app.object_ring.get_nodes('account', 'container', 'object') collected_nodes = [] - for node in controller.iter_nodes(self.app.object_ring, - partition): + for node in self.app.iter_nodes(self.app.object_ring, + partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 5) self.assertEquals( @@ -2353,15 +2411,12 @@ def test_iter_nodes(self): self.app.log_handoffs = False self.app.logger = FakeLogger() self.app.object_ring.max_more_nodes = 2 - controller = proxy_server.ObjectController(self.app, 'account', - 'container', - 'object') partition, nodes = self.app.object_ring.get_nodes('account', 'container', 'object') collected_nodes = [] - for node in controller.iter_nodes(self.app.object_ring, - partition): + for node in self.app.iter_nodes(self.app.object_ring, + partition): collected_nodes.append(node) self.assertEquals(len(collected_nodes), 5) self.assertEquals(self.app.logger.log_dict['warning'], []) @@ -2370,21 +2425,19 @@ def test_iter_nodes(self): def test_iter_nodes_calls_sort_nodes(self): with mock.patch.object(self.app, 'sort_nodes') as sort_nodes: - controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') - for node in controller.iter_nodes(self.app.object_ring, 0): + for node in self.app.iter_nodes(self.app.object_ring, 0): pass sort_nodes.assert_called_once_with( self.app.object_ring.get_part_nodes(0)) def test_iter_nodes_skips_error_limited(self): with mock.patch.object(self.app, 'sort_nodes', lambda n: n): - controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') - first_nodes = list(controller.iter_nodes(self.app.object_ring, 0)) - second_nodes = list(controller.iter_nodes(self.app.object_ring, 0)) + first_nodes = list(self.app.iter_nodes(self.app.object_ring, 0)) + second_nodes = list(self.app.iter_nodes(self.app.object_ring, 0)) self.assertTrue(first_nodes[0] in second_nodes) - controller.error_limit(first_nodes[0], 'test') - second_nodes = list(controller.iter_nodes(self.app.object_ring, 0)) + self.app.error_limit(first_nodes[0], 'test') + second_nodes = list(self.app.iter_nodes(self.app.object_ring, 0)) self.assertTrue(first_nodes[0] not in second_nodes) def test_iter_nodes_gives_extra_if_error_limited_inline(self): @@ -2393,33 +2446,31 @@ def test_iter_nodes_gives_extra_if_error_limited_inline(self): mock.patch.object(self.app, 'request_node_count', lambda r: 6), mock.patch.object(self.app.object_ring, 'max_more_nodes', 99)): - controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') - first_nodes = list(controller.iter_nodes(self.app.object_ring, 0)) + first_nodes = list(self.app.iter_nodes(self.app.object_ring, 0)) second_nodes = [] - for node in controller.iter_nodes(self.app.object_ring, 0): + for node in self.app.iter_nodes(self.app.object_ring, 0): if not second_nodes: - controller.error_limit(node, 'test') + self.app.error_limit(node, 'test') second_nodes.append(node) self.assertEquals(len(first_nodes), 6) self.assertEquals(len(second_nodes), 7) def test_iter_nodes_with_custom_node_iter(self): - controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o') node_list = [dict(id=n) for n in xrange(10)] with nested( mock.patch.object(self.app, 'sort_nodes', lambda n: n), mock.patch.object(self.app, 'request_node_count', lambda r: 3)): - got_nodes = list(controller.iter_nodes(self.app.object_ring, 0, - node_iter=iter(node_list))) + got_nodes = list(self.app.iter_nodes(self.app.object_ring, 0, + node_iter=iter(node_list))) self.assertEqual(node_list[:3], got_nodes) with nested( mock.patch.object(self.app, 'sort_nodes', lambda n: n), mock.patch.object(self.app, 'request_node_count', lambda r: 1000000)): - got_nodes = list(controller.iter_nodes(self.app.object_ring, 0, - node_iter=iter(node_list))) + got_nodes = list(self.app.iter_nodes(self.app.object_ring, 0, + node_iter=iter(node_list))) self.assertEqual(node_list, got_nodes) def test_best_response_sets_headers(self):