Skip to content

Commit

Permalink
Merge pull request #510 from nolar/throttle-on-unexpected-errors
Browse files Browse the repository at this point in the history
Throttle individual resource processing on unexpected errors
  • Loading branch information
nolar committed Sep 4, 2020
2 parents 0dae3ef + 92de9fb commit a168268
Show file tree
Hide file tree
Showing 8 changed files with 518 additions and 26 deletions.
33 changes: 33 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -364,3 +364,36 @@ e.g. by arbitrarily labelling them, so that a new diff-base is generated:
kubectl label kex -l somelabel=somevalue ping=pong
Then, switch to the new storage alone, without the transitional setup.


Error throttling
================

To prevent uncontrollable flood of activities in case of errors that prevent
the resources being marked as handled, which could lead to Kubernetes API
flooding, it is possible to throttle the activities on a per-resource basis:

.. code-block:: python
import kopf
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.batching.error_delays = [10, 20, 30]
In that case, all unhandled errors in the framework or in the Kubernetes API
would be backed-off by 10s after the 1st error, then by 20s after the 2nd one,
and then by 30s after the 3rd, 4th, 5th errors and so on. On a first success,
the backoff intervals will be reset and re-used again on the next error.

The default is a sequence of Fibonacci numbers from 1 second to 10 minutes.

The back-offs are not persisted, so they are lost on the operator restarts.

These back-offs do not cover errors in the handlers -- the handlers have their
own configurable per-handler back-off intervals. These back-offs are for Kopf
and for Kubernetes API mostly (and other environment issues).

To disable throttling (on your own risk!), set the error delays to
an empty list (``[]``) or an empty tuple (``()``).
Interpret as: no throttling delays set -- no throttling sleeps done.
74 changes: 72 additions & 2 deletions kopf/reactor/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
"""
import asyncio
import collections
import contextlib
import datetime
import logging
from typing import Collection, Optional, Union
import time
from typing import AsyncGenerator, Collection, Iterable, Optional, Tuple, Type, Union

from kopf.clients import patching
from kopf.engines import loggers
from kopf.structs import bodies, configuration, dicts, diffs, patches, primitives, resources
from kopf.structs import bodies, configuration, containers, dicts, \
diffs, patches, primitives, resources

# How often to wake up from the long sleep, to show liveness in the logs.
WAITING_KEEPALIVE_INTERVAL = 10 * 60
Expand Down Expand Up @@ -139,6 +142,10 @@ async def sleep_or_wait(
actual_delays = [delay for delay in passed_delays if delay is not None]
minimal_delay = min(actual_delays) if actual_delays else 0

# Do not go for the real low-level system sleep if there is no need to sleep.
if minimal_delay <= 0:
return None

awakening_event = (
wakeup.async_event if isinstance(wakeup, primitives.DaemonStopper) else
wakeup if wakeup is not None else
Expand All @@ -154,3 +161,66 @@ async def sleep_or_wait(
end_time = loop.time()
duration = end_time - start_time
return max(0, minimal_delay - duration)


@contextlib.asynccontextmanager
async def throttled(
*,
throttler: containers.Throttler,
delays: Iterable[float],
wakeup: Optional[Union[asyncio.Event, primitives.DaemonStopper]] = None,
logger: Union[logging.Logger, logging.LoggerAdapter],
errors: Union[Type[BaseException], Tuple[Type[BaseException], ...]] = Exception,
) -> AsyncGenerator[bool, None]:
"""
A helper to throttle any arbitrary operation.
"""

# The 1st sleep: if throttling is already active, but was interrupted by a queue replenishment.
# It is needed to properly process the latest known event after the successful sleep.
if throttler.active_until is not None:
remaining_time = throttler.active_until - time.monotonic()
unslept_time = await sleep_or_wait(remaining_time, wakeup=wakeup)
if unslept_time is None:
logger.info("Throttling is over. Switching back to normal operations.")
throttler.active_until = None

# Run only if throttling either is not active initially, or has just finished sleeping.
should_run = throttler.active_until is None
try:
yield should_run

except Exception as e:

# If it is not an error-of-interest, escalate normally. BaseExceptions are escalated always.
if not isinstance(e, errors):
raise

# If the code does not follow the recommendation to not run, escalate.
if not should_run:
raise

# Activate throttling if not yet active, or reuse the active sequence of delays.
if throttler.source_of_delays is None:
throttler.source_of_delays = iter(delays)

# Choose a delay. If there are none, avoid throttling at all.
throttle_delay = next(throttler.source_of_delays, throttler.last_used_delay)
if throttle_delay is not None:
throttler.last_used_delay = throttle_delay
throttler.active_until = time.monotonic() + throttle_delay
logger.exception(f"Throttling for {throttle_delay} seconds due to an unexpected error:")

else:
# Reset the throttling. Release the iterator to keep the memory free during normal run.
if should_run:
throttler.source_of_delays = throttler.last_used_delay = None

# The 2nd sleep: if throttling has been just activated (i.e. there was a fresh error).
# It is needed to have better logging/sleeping without workers exiting for "no events".
if throttler.active_until is not None and should_run:
remaining_time = throttler.active_until - time.monotonic()
unslept_time = await sleep_or_wait(remaining_time, wakeup=wakeup)
if unslept_time is None:
throttler.active_until = None
logger.info("Throttling is over. Switching back to normal operations.")
79 changes: 58 additions & 21 deletions kopf/reactor/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@ async def process_resource_event(
Convert the low-level events, as provided by the watching/queueing tasks,
to the high-level causes, and then call the cause-handling logic.
All the internally provoked changes are intercepted, do not create causes,
and therefore do not call the handling logic.
"""
finalizer = settings.persistence.finalizer

# Recall what is stored about that object. Share it in little portions with the consumers.
# And immediately forget it if the object is deleted from the cluster (but keep in memory).
Expand All @@ -62,11 +58,64 @@ async def process_resource_event(
body = memory.live_fresh_body if memory.live_fresh_body is not None else bodies.Body(raw_body)
patch = patches.Patch()

# Each object has its own prefixed logger, to distinguish parallel handling.
logger = loggers.ObjectLogger(body=body, settings=settings)
posting.event_queue_loop_var.set(asyncio.get_running_loop())
posting.event_queue_var.set(event_queue) # till the end of this object's task.
# Throttle the non-handler-related errors. The regular event watching/batching continues
# to prevent queue overfilling, but the processing is skipped (events are ignored).
# Choice of place: late enough to have a per-resource memory for a throttler; also, a logger.
# But early enough to catch environment errors from K8s API, and from most of the complex code.
async with effects.throttled(
throttler=memory.error_throttler,
logger=loggers.LocalObjectLogger(body=body, settings=settings),
delays=settings.batching.error_delays,
wakeup=replenished,
) as should_run:
if should_run:

# Each object has its own prefixed logger, to distinguish parallel handling.
logger = loggers.ObjectLogger(body=body, settings=settings)
posting.event_queue_loop_var.set(asyncio.get_running_loop())
posting.event_queue_var.set(event_queue) # till the end of this object's task.

# Do the magic -- do the job.
delays = await process_resource_causes(
lifecycle=lifecycle,
registry=registry,
settings=settings,
resource=resource,
raw_event=raw_event,
body=body,
patch=patch,
logger=logger,
memory=memory,
)

# Whatever was done, apply the accumulated changes to the object, or sleep-n-touch for delays.
# But only once, to reduce the number of API calls and the generated irrelevant events.
# And only if the object is at least supposed to exist (not "GONE"), even if actually does not.
if raw_event['type'] != 'DELETED':
await effects.apply(
settings=settings,
resource=resource,
body=body,
patch=patch,
logger=logger,
delays=delays,
replenished=replenished,
)


async def process_resource_causes(
lifecycle: lifecycles.LifeCycleFn,
registry: registries.OperatorRegistry,
settings: configuration.OperatorSettings,
resource: resources.Resource,
raw_event: bodies.RawEvent,
body: bodies.Body,
patch: patches.Patch,
logger: loggers.ObjectLogger,
memory: containers.ResourceMemory,
) -> Collection[float]:

finalizer = settings.persistence.finalizer
extra_fields = registry.resource_changing_handlers[resource].get_extra_fields()
old = settings.persistence.diffbase_storage.fetch(body=body)
new = settings.persistence.diffbase_storage.build(body=body, extra_fields=extra_fields)
Expand Down Expand Up @@ -173,19 +222,7 @@ async def process_resource_event(
logger.debug("Removing the finalizer, thus allowing the actual deletion.")
finalizers.allow_deletion(body=body, patch=patch, finalizer=finalizer)

# Whatever was done, apply the accumulated changes to the object, or sleep-n-touch for delays.
# But only once, to reduce the number of API calls and the generated irrelevant events.
# And only if the object is at least supposed to exist (not "GONE"), even if actually does not.
if raw_event['type'] != 'DELETED':
await effects.apply(
settings=settings,
resource=resource,
body=body,
patch=patch,
logger=logger,
delays=list(resource_spawning_delays) + list(resource_changing_delays),
replenished=replenished,
)
return list(resource_spawning_delays) + list(resource_changing_delays)


async def process_resource_watching_cause(
Expand Down
20 changes: 19 additions & 1 deletion kopf/structs/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
"""
import concurrent.futures
import dataclasses
from typing import Optional
from typing import Iterable, Optional

from kopf import config # for legacy defaults only
from kopf.storage import diffbase, progress
Expand Down Expand Up @@ -122,6 +122,24 @@ class BatchingSettings:
This is the time given to the worker to deplete and process the queue.
"""

error_delays: Iterable[float] = (1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)
"""
Backoff intervals in case of unexpected errors in the framework (not the handlers).
Per-resource workers freeze all activities for this number of seconds in case of errors.
Once they are back to work, they process only the latest event seen (due to event batching).
Every further error leads to the next, even bigger delay (10m is enough for a default maximum).
Every success resets the backoff intervals, and it goes from the beginning on the next error.
If needed, this value can be an arbitrary collection/iterator/object:
only ``iter()`` is called on every new throttling cycle, no other protocols
are required; but make sure that it is re-iterable for multiple uses.
To disable throttling (on your own risk), set it to ``[]`` or ``()``.
"""



@dataclasses.dataclass
class ExecutionSettings:
Expand Down
11 changes: 11 additions & 0 deletions kopf/structs/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ class Daemon:
stopper: primitives.DaemonStopper # a signaller for the termination and its reason.


@dataclasses.dataclass(frozen=False)
class Throttler:
""" A state of throttling for one specific purpose (there can be a few). """
source_of_delays: Optional[Iterator[float]] = None
last_used_delay: Optional[float] = None
active_until: Optional[float] = None # internal clock


class Memo(Dict[Any, Any]):
""" A container to hold arbitrary keys-fields assigned by the users. """

Expand Down Expand Up @@ -61,6 +69,9 @@ class ResourceMemory:
noticed_by_listing: bool = False
fully_handled_once: bool = False

# Throttling for API errors (mostly from PATCHing) and for processing in general.
error_throttler: Throttler = dataclasses.field(default_factory=Throttler)

# For background and timed threads/tasks (invoked with the kwargs of the last-seen body).
live_fresh_body: Optional[bodies.Body] = None
idle_reset_time: float = dataclasses.field(default_factory=time.monotonic)
Expand Down
1 change: 1 addition & 0 deletions tests/settings/test_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ async def test_declared_public_interface_and_promised_defaults():
assert settings.batching.idle_timeout == 5.0
assert settings.batching.exit_timeout == 2.0
assert settings.batching.batch_window == 0.1
assert settings.batching.error_delays == (1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)
assert settings.execution.executor is not None
assert settings.execution.max_workers is None
9 changes: 7 additions & 2 deletions tests/test_sleeping.py → tests/timing/test_sleeping.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@ async def test_specific_delays_only_are_awaited(timer):
assert unslept is None


async def test_passed_delays_skip_sleeping(timer):
@pytest.mark.parametrize('delays', [
pytest.param([1000, -10], id='mixed-signs'),
pytest.param([-100, -10], id='all-negative'),
pytest.param(-10, id='alone'),
])
async def test_negative_delays_skip_sleeping(timer, delays):
with timer:
unslept = await asyncio.wait_for(sleep_or_wait([0.10, -10]), timeout=1.0)
unslept = await asyncio.wait_for(sleep_or_wait(delays), timeout=1.0)
assert timer.seconds < 0.01
assert unslept is None

Expand Down
Loading

0 comments on commit a168268

Please sign in to comment.