Skip to content

Commit

Permalink
Add RetryBackoff to support custom @Retry backoff strategies
Browse files Browse the repository at this point in the history
Adds a new base class, `uplink.retry.RetryBackoff`, which can be extended to implement custom backoff strategies. Subclasses can override three methods:

- `get_timeout_after_response(request, response)`: Returns the number of seconds to wait before retrying the request, or ``None`` to indicate that the given response should be returned.
- `get_timeout_after_exception(request, exc_type, exc_val, exc_tb)`: Returns the number of seconds to wait before retrying the request, or ``None`` to indicate that the given exception should be raised.
- `handle_after_final_retry()`: Handles any clean-up necessary following the final retry attempt.

An instance of a `RetryBackoff` subclass can be provided through the `backoff` argument of the `@retry` decorator.

#238
  • Loading branch information
prkumar committed Jan 6, 2022
1 parent 439f200 commit 7eebaf1
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 101 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,5 @@ Pipfile.lock
# macOS
.DS_Store

# VS Code
.vscode/
17 changes: 17 additions & 0 deletions docs/source/dev/decorators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,23 @@ to specify one of the alternative approaches exposed through the
def get_user(self, user):
"""Get user by username."""
You can implement a custom backoff strategy by extending the class
:class:`uplink.retry.RetryBackoff`:

.. code-block:: python
:emphasize-lines: 3,7
from uplink.retry import RetryBackoff
class MyCustomBackoff(RetryBackoff):
...
class GitHub(uplink.Consumer):
@uplink.retry(backoff=MyCustomBackoff())
@uplink.get("/users/{user}")
def get_user(self, user):
pass
.. automodule:: uplink.retry.backoff
:members:

Expand Down
4 changes: 3 additions & 1 deletion tests/integration/test_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def get_user(self, user):
def get_issue(self, user, repo, issue):
pass

@retry(max_attempts=3, on_exception=retry.CONNECTION_TIMEOUT)
@retry(
stop=retry.stop.after_attempt(3), on_exception=retry.CONNECTION_TIMEOUT
)
@get("repos/{user}/{repo}/project/{project}")
def get_project(self, user, repo, project):
pass
Expand Down
36 changes: 20 additions & 16 deletions tests/unit/test_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ def test_fixed_backoff():
assert next(iterator) == 10


def test_compose_backoff(mocker):
left = backoff.from_iterable([0, 1])
right = backoff.from_iterable([2])
mocker.spy(left, "handle_after_final_retry")
mocker.spy(right, "handle_after_final_retry")
strategy = left | right

# Should return None after both strategies are exhausted
assert strategy.get_timeout_after_response(None, None) == 0
assert strategy.get_timeout_after_exception(None, None, None, None) == 1
assert strategy.get_timeout_after_response(None, None) == 2
assert strategy.get_timeout_after_exception(None, None, None, None) is None

# Should invoke both strategies after_stop method
strategy.handle_after_final_retry()

left.handle_after_final_retry.assert_called_once_with()
right.handle_after_final_retry.assert_called_once_with()


def test_retry_stop_default():
decorator = retry()
assert stop.NEVER == decorator._stop
Expand Down Expand Up @@ -75,22 +95,6 @@ def test_stop_or_with_none():
assert stop1 is (stop1 | None)


def test_retry_custom_stop():
def custom_stop(*_):
return True

decorator = retry(stop=custom_stop)
assert decorator._stop == custom_stop


def test_retry_backoff():
def custom_backoff(*_):
return True

decorator = retry(backoff=custom_backoff)
assert decorator._backoff == custom_backoff


def test_retry_decorator_exposes_submodules_as_properties():
assert retry.backoff is backoff
assert retry.stop is stop
Expand Down
3 changes: 2 additions & 1 deletion uplink/retry/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from uplink.retry.retry import retry
from uplink.retry.when import RetryPredicate
from uplink.retry.backoff import RetryBackoff

__all__ = ["retry", "RetryPredicate"]
__all__ = ["retry", "RetryPredicate", "RetryBackoff"]
171 changes: 152 additions & 19 deletions uplink/retry/backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,132 @@
__all__ = ["jittered", "exponential", "fixed"]


def jittered(base=2, multiplier=1, minimum=0, maximum=MAX_VALUE):
def from_iterable(iterable):
"""Creates a retry strategy from an iterable of timeouts"""

class IterableRetryBackoff(_IterableBackoff):
def __iter__(self):
return iter(iterable)

return IterableRetryBackoff()


def from_iterable_factory(iterable_factory):
"""
Creates a retry strategy from a function that returns an iterable
of timeouts.
"""

class IterableRetryBackoff(_IterableBackoff):
def __iter__(self):
return iter(iterable_factory())

return IterableRetryBackoff()


class RetryBackoff(object):
"""
Base class for a strategy that calculates the timeout between
retry attempts.
You can compose two ``RetryBackoff`` instances by using the ``|``
operator::
CustomBackoffA() | CustomBackoffB()
The resulting backoff strategy will first compute the timeout using
the left-hand instance. If that timeout is ``None``, the strategy
will try to compute a fallback using the right-hand instance. If
both instances return ``None``, the resulting strategy will also
return ``None``.
"""

def get_timeout_after_response(self, request, response):
"""
Returns the number of seconds to wait before retrying the
request, or ``None`` to indicate that the given response should
be returned.
"""
raise NotImplementedError # pragma: no cover

def get_timeout_after_exception(self, request, exc_type, exc_val, exc_tb):
"""
Returns the number of seconds to wait before retrying the
request, or ``None`` to indicate that the given exception
should be raised.
"""
raise NotImplementedError # pragma: no cover

def handle_after_final_retry(self):
"""
Handles any clean-up necessary following the final retry
attempt.
"""
pass # pragma: no cover

def __or__(self, other):
"""Composes the current strategy with another."""
assert isinstance(
other, RetryBackoff
), "Both objects should be backoffs."
return _Or(self, other)


class _Or(RetryBackoff):
def __init__(self, left, right):
self._left = left
self._right = right

def get_timeout_after_response(self, request, response):
timeout = self._left.get_timeout_after_response(request, response)
if timeout is None:
return self._right.get_timeout_after_response(request, response)
return timeout

def get_timeout_after_exception(self, request, exc_type, exc_val, exc_tb):
timeout = self._left.get_timeout_after_exception(
request, exc_type, exc_val, exc_tb
)
if timeout is None:
return self._right.get_timeout_after_exception(
request, exc_type, exc_val, exc_tb
)
return timeout

def handle_after_final_retry(self):
self._left.handle_after_final_retry()
self._right.handle_after_final_retry()


class _IterableBackoff(RetryBackoff):
__iterator = None

def __iter__(self):
raise NotImplementedError # pragma: no cover

def __call__(self):
return iter(self)

def __next(self):
if self.__iterator is None:
self.__iterator = iter(self)

try:
return next(self.__iterator)
except StopIteration:
return None

def get_timeout_after_response(self, request, response):
return self.__next()

def get_timeout_after_exception(self, request, exc_type, exc_val, exc_tb):
return self.__next()

def handle_after_final_retry(self):
self.__iterator = None


class jittered(_IterableBackoff):
"""
Waits using capped exponential backoff and full jitter.
Expand All @@ -18,35 +143,43 @@ def jittered(base=2, multiplier=1, minimum=0, maximum=MAX_VALUE):
time of competing clients in a distributed system experiencing
high contention.
"""
exp_backoff = exponential(base, multiplier, minimum, maximum)
return lambda *_: iter(
random.uniform(0, 1) * delay for delay in exp_backoff()
) # pragma: no cover

def __init__(self, base=2, multiplier=1, minimum=0, maximum=MAX_VALUE):
self._exp_backoff = exponential(base, multiplier, minimum, maximum)

def __iter__(self):
for delay in self._exp_backoff(): # pragma: no branch
yield random.uniform(0, 1) * delay

def exponential(base=2, multiplier=1, minimum=0, maximum=MAX_VALUE):

class exponential(_IterableBackoff):
"""
Waits using capped exponential backoff, meaning that the delay
is multiplied by a constant ``base`` after each attempt, up to
an optional ``maximum`` value.
"""

def wait_iterator():
delay = multiplier
while minimum > delay:
delay *= base
while True:
yield min(delay, maximum)
delay *= base
def __init__(self, base=2, multiplier=1, minimum=0, maximum=MAX_VALUE):
self._base = base
self._multiplier = multiplier
self._minimum = minimum
self._maximum = maximum

return wait_iterator
def __iter__(self):
delay = self._multiplier
while self._minimum > delay:
delay *= self._base
while True:
yield min(delay, self._maximum)
delay *= self._base


def fixed(seconds):
class fixed(_IterableBackoff):
"""Waits for a fixed number of ``seconds`` before each retry."""

def wait_iterator():
while True:
yield seconds
def __init__(self, seconds):
self._seconds = seconds

return wait_iterator
def __iter__(self):
while True:
yield self._seconds
Loading

0 comments on commit 7eebaf1

Please sign in to comment.