diff --git a/.gitignore b/.gitignore index 0f971d72..28bb7551 100644 --- a/.gitignore +++ b/.gitignore @@ -123,3 +123,5 @@ Pipfile.lock # macOS .DS_Store +# VS Code +.vscode/ diff --git a/docs/source/dev/decorators.rst b/docs/source/dev/decorators.rst index 35390176..739ff63c 100644 --- a/docs/source/dev/decorators.rst +++ b/docs/source/dev/decorators.rst @@ -150,6 +150,23 @@ to specify one of the alternative approaches exposed through the def get_user(self, user): """Get user by username.""" +You can implement a custom backoff strategy by extending the class +:class:`uplink.retry.RetryBackoff`: + +.. code-block:: python + :emphasize-lines: 3,7 + + from uplink.retry import RetryBackoff + + class MyCustomBackoff(RetryBackoff): + ... + + class GitHub(uplink.Consumer): + @uplink.retry(backoff=MyCustomBackoff()) + @uplink.get("/users/{user}") + def get_user(self, user): + pass + .. automodule:: uplink.retry.backoff :members: diff --git a/tests/integration/test_retry.py b/tests/integration/test_retry.py index 851f4c7f..5fb12412 100644 --- a/tests/integration/test_retry.py +++ b/tests/integration/test_retry.py @@ -29,7 +29,9 @@ def get_user(self, user): def get_issue(self, user, repo, issue): pass - @retry(max_attempts=3, on_exception=retry.CONNECTION_TIMEOUT) + @retry( + stop=retry.stop.after_attempt(3), on_exception=retry.CONNECTION_TIMEOUT + ) @get("repos/{user}/{repo}/project/{project}") def get_project(self, user, repo, project): pass diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py index 02ecc0a0..0fd7b9e7 100644 --- a/tests/unit/test_retry.py +++ b/tests/unit/test_retry.py @@ -35,6 +35,26 @@ def test_fixed_backoff(): assert next(iterator) == 10 +def test_compose_backoff(mocker): + left = backoff.from_iterable([0, 1]) + right = backoff.from_iterable([2]) + mocker.spy(left, "handle_after_final_retry") + mocker.spy(right, "handle_after_final_retry") + strategy = left | right + + # Should return None after both strategies are exhausted + assert strategy.get_timeout_after_response(None, None) == 0 + assert strategy.get_timeout_after_exception(None, None, None, None) == 1 + assert strategy.get_timeout_after_response(None, None) == 2 + assert strategy.get_timeout_after_exception(None, None, None, None) is None + + # Should invoke both strategies after_stop method + strategy.handle_after_final_retry() + + left.handle_after_final_retry.assert_called_once_with() + right.handle_after_final_retry.assert_called_once_with() + + def test_retry_stop_default(): decorator = retry() assert stop.NEVER == decorator._stop @@ -75,22 +95,6 @@ def test_stop_or_with_none(): assert stop1 is (stop1 | None) -def test_retry_custom_stop(): - def custom_stop(*_): - return True - - decorator = retry(stop=custom_stop) - assert decorator._stop == custom_stop - - -def test_retry_backoff(): - def custom_backoff(*_): - return True - - decorator = retry(backoff=custom_backoff) - assert decorator._backoff == custom_backoff - - def test_retry_decorator_exposes_submodules_as_properties(): assert retry.backoff is backoff assert retry.stop is stop diff --git a/uplink/retry/__init__.py b/uplink/retry/__init__.py index 36057f18..f3d4c4cc 100644 --- a/uplink/retry/__init__.py +++ b/uplink/retry/__init__.py @@ -1,4 +1,5 @@ from uplink.retry.retry import retry from uplink.retry.when import RetryPredicate +from uplink.retry.backoff import RetryBackoff -__all__ = ["retry", "RetryPredicate"] +__all__ = ["retry", "RetryPredicate", "RetryBackoff"] diff --git a/uplink/retry/backoff.py b/uplink/retry/backoff.py index b5eb93b9..07f9648a 100644 --- a/uplink/retry/backoff.py +++ b/uplink/retry/backoff.py @@ -8,7 +8,132 @@ __all__ = ["jittered", "exponential", "fixed"] -def jittered(base=2, multiplier=1, minimum=0, maximum=MAX_VALUE): +def from_iterable(iterable): + """Creates a retry strategy from an iterable of timeouts""" + + class IterableRetryBackoff(_IterableBackoff): + def __iter__(self): + return iter(iterable) + + return IterableRetryBackoff() + + +def from_iterable_factory(iterable_factory): + """ + Creates a retry strategy from a function that returns an iterable + of timeouts. + """ + + class IterableRetryBackoff(_IterableBackoff): + def __iter__(self): + return iter(iterable_factory()) + + return IterableRetryBackoff() + + +class RetryBackoff(object): + """ + Base class for a strategy that calculates the timeout between + retry attempts. + + You can compose two ``RetryBackoff`` instances by using the ``|`` + operator:: + + CustomBackoffA() | CustomBackoffB() + + The resulting backoff strategy will first compute the timeout using + the left-hand instance. If that timeout is ``None``, the strategy + will try to compute a fallback using the right-hand instance. If + both instances return ``None``, the resulting strategy will also + return ``None``. + """ + + def get_timeout_after_response(self, request, response): + """ + Returns the number of seconds to wait before retrying the + request, or ``None`` to indicate that the given response should + be returned. + """ + raise NotImplementedError # pragma: no cover + + def get_timeout_after_exception(self, request, exc_type, exc_val, exc_tb): + """ + Returns the number of seconds to wait before retrying the + request, or ``None`` to indicate that the given exception + should be raised. + """ + raise NotImplementedError # pragma: no cover + + def handle_after_final_retry(self): + """ + Handles any clean-up necessary following the final retry + attempt. + """ + pass # pragma: no cover + + def __or__(self, other): + """Composes the current strategy with another.""" + assert isinstance( + other, RetryBackoff + ), "Both objects should be backoffs." + return _Or(self, other) + + +class _Or(RetryBackoff): + def __init__(self, left, right): + self._left = left + self._right = right + + def get_timeout_after_response(self, request, response): + timeout = self._left.get_timeout_after_response(request, response) + if timeout is None: + return self._right.get_timeout_after_response(request, response) + return timeout + + def get_timeout_after_exception(self, request, exc_type, exc_val, exc_tb): + timeout = self._left.get_timeout_after_exception( + request, exc_type, exc_val, exc_tb + ) + if timeout is None: + return self._right.get_timeout_after_exception( + request, exc_type, exc_val, exc_tb + ) + return timeout + + def handle_after_final_retry(self): + self._left.handle_after_final_retry() + self._right.handle_after_final_retry() + + +class _IterableBackoff(RetryBackoff): + __iterator = None + + def __iter__(self): + raise NotImplementedError # pragma: no cover + + def __call__(self): + return iter(self) + + def __next(self): + if self.__iterator is None: + self.__iterator = iter(self) + + try: + return next(self.__iterator) + except StopIteration: + return None + + def get_timeout_after_response(self, request, response): + return self.__next() + + def get_timeout_after_exception(self, request, exc_type, exc_val, exc_tb): + return self.__next() + + def handle_after_final_retry(self): + self.__iterator = None + + +class jittered(_IterableBackoff): """ Waits using capped exponential backoff and full jitter. @@ -18,35 +143,43 @@ def jittered(base=2, multiplier=1, minimum=0, maximum=MAX_VALUE): time of competing clients in a distributed system experiencing high contention. """ - exp_backoff = exponential(base, multiplier, minimum, maximum) - return lambda *_: iter( - random.uniform(0, 1) * delay for delay in exp_backoff() - ) # pragma: no cover + def __init__(self, base=2, multiplier=1, minimum=0, maximum=MAX_VALUE): + self._exp_backoff = exponential(base, multiplier, minimum, maximum) + + def __iter__(self): + for delay in self._exp_backoff(): # pragma: no branch + yield random.uniform(0, 1) * delay -def exponential(base=2, multiplier=1, minimum=0, maximum=MAX_VALUE): + +class exponential(_IterableBackoff): """ Waits using capped exponential backoff, meaning that the delay is multiplied by a constant ``base`` after each attempt, up to an optional ``maximum`` value. """ - def wait_iterator(): - delay = multiplier - while minimum > delay: - delay *= base - while True: - yield min(delay, maximum) - delay *= base + def __init__(self, base=2, multiplier=1, minimum=0, maximum=MAX_VALUE): + self._base = base + self._multiplier = multiplier + self._minimum = minimum + self._maximum = maximum - return wait_iterator + def __iter__(self): + delay = self._multiplier + while self._minimum > delay: + delay *= self._base + while True: + yield min(delay, self._maximum) + delay *= self._base -def fixed(seconds): +class fixed(_IterableBackoff): """Waits for a fixed number of ``seconds`` before each retry.""" - def wait_iterator(): - while True: - yield seconds + def __init__(self, seconds): + self._seconds = seconds - return wait_iterator + def __iter__(self): + while True: + yield self._seconds diff --git a/uplink/retry/retry.py b/uplink/retry/retry.py index 1f7bee65..9f06a14f 100644 --- a/uplink/retry/retry.py +++ b/uplink/retry/retry.py @@ -11,40 +11,6 @@ __all__ = ["retry"] -class RetryTemplate(RequestTemplate): - def __init__(self, backoff, retry_condition): - self._backoff = backoff - self._backoff_iterator = None - self._condition = retry_condition - self._reset() - - def _next_delay(self): - try: - delay = next(self._backoff_iterator) - except StopIteration: - # Fallback to the default behavior - pass - else: - return transitions.sleep(delay) - - def _reset(self): - self._backoff_iterator = self._backoff() - - def after_response(self, request, response): - if self._condition.should_retry_after_response(response): - return self._next_delay() - else: - self._reset() - - def after_exception(self, request, exc_type, exc_val, exc_tb): - if self._condition.should_retry_after_exception( - exc_type, exc_val, exc_tb - ): - return self._next_delay() - else: - self._reset() - - # noinspection PyPep8Naming class retry(decorators.MethodAnnotation): """ @@ -87,13 +53,18 @@ class retry(decorators.MethodAnnotation): that should prompt a retry attempt. stop (:obj:`callable`, optional): A function that creates predicates that decide when to stop retrying a request. - backoff (:obj:`callable`, optional): A function that creates - an iterator over the ordered sequence of timeouts between - retries. If not specified, exponential backoff is used. + backoff (:class:`RetryBackoff`, :obj:`callable`, optional): A + backoff strategy or a function that creates an iterator + over the ordered sequence of timeouts between retries. If + not specified, exponential backoff is used. """ _DEFAULT_PREDICATE = when_mod.raises(Exception) + stop = stop_mod + backoff = backoff_mod + when = when_mod + def __init__( self, when=None, @@ -102,22 +73,27 @@ def __init__( stop=None, backoff=None, ): - if stop is not None: - self._stop = stop - elif max_attempts is not None: - self._stop = stop_mod.after_attempt(max_attempts) - else: - self._stop = stop_mod.NEVER - - self._predicate = when + if stop is None: + if max_attempts is not None: + stop = stop_mod.after_attempt(max_attempts) + else: + stop = stop_mod.NEVER if on_exception is not None: - self._predicate = when_mod.raises(on_exception) | self._predicate + when = when_mod.raises(on_exception) | when + + if when is None: + when = self._DEFAULT_PREDICATE - if self._predicate is None: - self._predicate = self._DEFAULT_PREDICATE + if backoff is None: + backoff = backoff_mod.jittered() - self._backoff = backoff_mod.jittered() if backoff is None else backoff + if not isinstance(backoff, backoff_mod.RetryBackoff): + backoff = backoff_mod.from_iterable_factory(backoff) + + self._when = when + self._backoff = backoff + self._stop = stop BASE_CLIENT_EXCEPTION = ClientExceptionProxy( lambda ex: ex.BaseClientException @@ -129,22 +105,43 @@ def __init__( def modify_request(self, request_builder): request_builder.add_request_template( - self._create_template(request_builder) + _RetryTemplate( + self._when(request_builder), + self._backoff, + self._stop, + ) ) - def _create_template(self, request_builder): - return RetryTemplate( - self._backoff_iterator, self._predicate(request_builder) - ) - def _backoff_iterator(self): - stop_gen = self._stop() - for delay in self._backoff(): - next(stop_gen) - if stop_gen.send(delay): - break - yield delay +class _RetryTemplate(RequestTemplate): + def __init__(self, condition, backoff, stop): + self._condition = condition + self._backoff = backoff + self._stop = stop + self._stop_iter = self._stop() + + def _process_timeout(self, timeout): + next(self._stop_iter) + if timeout is None or self._stop_iter.send(timeout): + self._backoff.handle_after_final_retry() + self._stop_iter = self._stop() + return None + return transitions.sleep(timeout) - stop = stop_mod - backoff = backoff_mod - when = when_mod + def after_response(self, request, response): + if not self._condition.should_retry_after_response(response): + return self._process_timeout(None) + return self._process_timeout( + self._backoff.get_timeout_after_response(request, response) + ) + + def after_exception(self, request, exc_type, exc_val, exc_tb): + if not self._condition.should_retry_after_exception( + exc_type, exc_val, exc_tb + ): + return self._process_timeout(None) + return self._process_timeout( + self._backoff.get_timeout_after_exception( + request, exc_type, exc_val, exc_tb + ) + )