Define "instant exit" as a specific timeout + zero-time asyncio cycles
Nothing in the world is "instant"; everything takes time.
We have to define specifically what we consider "instant"
before switching to external resource patching & status polling.

These are quite advanced features, so they are not reflected
in the configuration docs. A docstring should be enough.
nolar committed May 8, 2020
1 parent 0c2dd44 commit 44902cb
Showing 3 changed files with 141 additions and 13 deletions.
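For orientation, here is a minimal sketch of how an operator could tune the new knobs via Kopf's startup configuration hook (the hook pattern is assumed from Kopf's usual settings mechanism; the attribute names follow this commit's BackgroundSettings — this is not part of the commit itself):

    import kopf

    @kopf.on.startup()
    async def configure(settings: kopf.OperatorSettings, **_):
        # Prefer a tiny real-time timeout for detecting the "instant exit"...
        settings.background.instant_exit_timeout = 0.01
        # ...or leave it unset and rely on a few zero-time event-loop cycles
        # (the default is 10 cycles):
        # settings.background.instant_exit_zero_time_cycles = 10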
53 changes: 43 additions & 10 deletions kopf/reactor/daemons.py
@@ -174,11 +174,9 @@ async def stop_resource_daemons(
raise RuntimeError(f"Unsupported daemon handler: {handler!r}")

# Whatever happens with other flags & logs & timings, this flag must be surely set.
stopper.set(reason=reason)

# It might be so, that the daemon exits instantly (if written properly). Avoid patching and
# unnecessary handling cycles in this case: just give the asyncio event loop an extra cycle.
await asyncio.sleep(0)
if not stopper.is_set(reason=reason):
stopper.set(reason=reason)
await _wait_for_instant_exit(settings=settings, daemon=daemon)

# Try different approaches to exiting the daemon based on timings.
if daemon.task.done():
@@ -188,14 +186,18 @@ async def stop_resource_daemons(
if not stopper.is_set(reason=primitives.DaemonStoppingReason.DAEMON_SIGNALLED):
stopper.set(reason=primitives.DaemonStoppingReason.DAEMON_SIGNALLED)
logger.debug(f"{handler} is signalled to exit gracefully.")
delays.append(backoff - age)
await _wait_for_instant_exit(settings=settings, daemon=daemon)
if not daemon.task.done(): # due to "instant exit"
delays.append(backoff - age)

elif timeout is not None and age < timeout + (backoff or 0):
if not stopper.is_set(reason=primitives.DaemonStoppingReason.DAEMON_CANCELLED):
stopper.set(reason=primitives.DaemonStoppingReason.DAEMON_CANCELLED)
logger.debug(f"{handler} is signalled to exit by force.")
daemon.task.cancel()
delays.append(timeout + (backoff or 0) - age)
await _wait_for_instant_exit(settings=settings, daemon=daemon)
if not daemon.task.done(): # due to "instant exit"
delays.append(timeout + (backoff or 0) - age)

elif timeout is not None:
if not stopper.is_set(reason=primitives.DaemonStoppingReason.DAEMON_ABANDONED):
@@ -227,7 +229,7 @@ async def daemon_killer(

# Terminate all running daemons when the operator exits (and this task is cancelled).
coros = [
stop_daemon(daemon=daemon)
stop_daemon(daemon=daemon, settings=settings)
for memory in memories.iter_all_memories()
for daemon in memory.running_daemons.values()
]
@@ -237,6 +239,7 @@

async def stop_daemon(
*,
settings: configuration.OperatorSettings,
daemon: containers.Daemon,
) -> None:
"""
@@ -259,9 +262,9 @@ async def stop_daemon(
raise RuntimeError(f"Unsupported daemon handler: {handler!r}")

# Whatever happens with other flags & logs & timings, this flag must be surely set.
# The daemon may exit instantly (if written properly: give it a chance).
daemon.stopper.set(reason=primitives.DaemonStoppingReason.OPERATOR_EXITING)
await asyncio.sleep(0) # give a chance to exit gracefully if it can.
await _wait_for_instant_exit(settings=settings, daemon=daemon)

if daemon.task.done():
daemon.logger.debug(f"{handler} has exited gracefully.")

@@ -283,6 +286,36 @@ async def stop_daemon(
warnings.warn(f"{handler} did not exit in time.", ResourceWarning)


async def _wait_for_instant_exit(
*,
settings: configuration.OperatorSettings,
daemon: containers.Daemon,
) -> None:
"""
Wait for a kind-of-instant exit of a daemon/timer.
The daemon may exit almost instantly (if it is written properly).
Avoid resource patching and unnecessary handling cycles in this case:
just give the asyncio event loop some extra time and cycles to finish it.
There is nothing truly "instant", of course; any code takes some time to execute.
We just assume that "instant" is defined by a small timeout
and a few zero-time asyncio cycles (read as: zero-time `await` statements).
"""

if daemon.task.done():
pass

elif settings.background.instant_exit_timeout is not None:
await asyncio.wait([daemon.task], timeout=settings.background.instant_exit_timeout)

elif settings.background.instant_exit_zero_time_cycles is not None:
for _ in range(settings.background.instant_exit_zero_time_cycles):
await asyncio.sleep(0)
if daemon.task.done():
break


async def _runner(
*,
settings: configuration.OperatorSettings,
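To illustrate the idea behind `_wait_for_instant_exit` outside of Kopf, here is a standalone asyncio sketch (not the framework's code) showing that a well-behaved daemon is already done after either a tiny real-time timeout or a few zero-time cycles:

    import asyncio

    async def well_behaved_daemon(stop_event: asyncio.Event) -> None:
        # A daemon written "properly": it returns as soon as the stopper is set.
        await stop_event.wait()

    async def main() -> None:
        stop_event = asyncio.Event()
        task = asyncio.create_task(well_behaved_daemon(stop_event))
        await asyncio.sleep(0)  # let the daemon start and block on the event
        stop_event.set()        # the equivalent of stopper.set(...)

        # Strategy 1: a tiny real-time timeout (instant_exit_timeout).
        await asyncio.wait([task], timeout=0.01)

        # Strategy 2: a few zero-time event-loop cycles (instant_exit_zero_time_cycles).
        for _ in range(10):
            if task.done():
                break
            await asyncio.sleep(0)

        # Either way, the task is finished by now, so no patching/polling is needed.
        print("exited instantly:", task.done())

    asyncio.run(main())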
45 changes: 45 additions & 0 deletions kopf/structs/configuration.py
@@ -198,6 +198,51 @@ class BackgroundSettings:
exit gracefully by its own, but it does not).
"""

instant_exit_timeout: Optional[float] = None
"""
For how long (in seconds) to wait for a daemon/timer to exit instantly.
If it continues running longer than this time after the stopper is set,
then external polling is initiated via the resource's persistence storages,
as for regular handlers.
The "instant exit" timeout is neither combined with nor deducted from
any other timeouts, such as the daemon cancellation timeout.
So, keep the timeout low: 0.001, 0.01, or 0.1 are good enough; 1.0 is risky.
Large timeouts can slow down the operator's reaction to resource deletion
or to the operator exiting, but can reduce the number of unnecessary patches.
If the timeout is not set (the default), then a limited number of zero-time
asyncio event-loop cycles is used instead.
"""

instant_exit_zero_time_cycles: Optional[int] = 10
"""
How many asyncio cycles to give to a daemon/timer to exit instantly.
There is a speed-up hack to let the daemons/timers exit instantly,
without external patching & polling. For this, ``asyncio.sleep(0)`` is used
to give control back to the event loop and their coroutines. However,
the daemons/timers can do extra ``await`` calls (even zero-time) before
actually exiting, which prematurely returns the control flow back
to the daemon-stopper coroutine.
This configuration value is the maximum number of zero-time ``await``
statements that can happen before exiting: both in the daemon and in the framework.
If the daemon/timer coroutines exit earlier, the extra cycles are not used.
If they continue running after that, then external polling is initiated
via the resource's persistence storages, as for regular handlers.
All of this happens with zero delays, so no slowdown is expected
(though a bit of CPU will be consumed).
If an "instant exit" timeout is set, the zero-time cycles are not used.
PS: The default value is a rough guess at typical code complexity.
"""


@dataclasses.dataclass
class OperatorSettings:
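As a standalone illustration of why the default is a handful of zero-time cycles rather than just one (again, not Kopf's code): even an "instant" exit can contain a few zero-time awaits during cleanup, each of which hands control back to the stopping coroutine once:

    import asyncio

    async def daemon_with_cleanup(stop_event: asyncio.Event) -> None:
        await stop_event.wait()
        await asyncio.sleep(0)  # e.g. a zero-time await hidden in cleanup code
        await asyncio.sleep(0)  # ...and another one

    async def main() -> None:
        stop_event = asyncio.Event()
        task = asyncio.create_task(daemon_with_cleanup(stop_event))
        await asyncio.sleep(0)  # let the daemon start and block on the event
        stop_event.set()

        cycles = 0
        while not task.done():
            await asyncio.sleep(0)
            cycles += 1
        print("zero-time cycles until exit:", cycles)  # a few, not just one

    asyncio.run(main())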
56 changes: 53 additions & 3 deletions tests/handling/daemons/test_daemon_termination.py
@@ -34,13 +34,58 @@ async def fn(**kwargs):
await dummy.wait_for_daemon_done()

assert timer.seconds < 0.01 # near-instantly
assert k8s_mocked.sleep_or_wait.call_count == 0
assert k8s_mocked.patch_obj.call_count == 1
assert k8s_mocked.patch_obj.call_args_list[0][1]['patch']['metadata']['finalizers'] == []


async def test_daemon_exits_instantly_via_cancellation_with_backoff(
registry, settings, resource, dummy, simulate_cycle,
caplog, assert_logs, k8s_mocked, frozen_time, mocker):
caplog.set_level(logging.DEBUG)
dummy.steps['finish'].set()

# A daemon-under-test.
@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn',
cancellation_backoff=5, cancellation_timeout=10)
async def fn(**kwargs):
dummy.kwargs = kwargs
dummy.steps['called'].set()
try:
await asyncio.Event().wait() # this one is cancelled.
except asyncio.CancelledError:
await dummy.steps['finish'].wait() # simulated slow (non-instant) exiting.

# Trigger spawning and wait until ready. Assume the finalizers are already added.
finalizer = settings.persistence.finalizer
event_object = {'metadata': {'finalizers': [finalizer]}}
await simulate_cycle(event_object)
await dummy.steps['called'].wait()

# 1st stage: trigger termination due to resource deletion. Wait for backoff.
mocker.resetall()
event_object.setdefault('metadata', {}).update({'deletionTimestamp': '...'})
await simulate_cycle(event_object)

assert k8s_mocked.sleep_or_wait.call_count == 1
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] == 5.0
assert k8s_mocked.patch_obj.call_count == 1
assert k8s_mocked.patch_obj.call_args_list[0][1]['patch']['status']['kopf']['dummy']

# 2nd cycle: cancelling after the backoff is reached. Wait for cancellation timeout.
mocker.resetall()
frozen_time.tick(5) # backoff time or slightly above it
await simulate_cycle(event_object)

assert k8s_mocked.sleep_or_wait.call_count == 0
assert k8s_mocked.patch_obj.call_count == 1
assert k8s_mocked.patch_obj.call_args_list[0][1]['patch']['metadata']['finalizers'] == []

# Cleanup.
await dummy.wait_for_daemon_done()


async def test_daemon_exits_via_cancellation_with_backoff(
async def test_daemon_exits_slowly_via_cancellation_with_backoff(
registry, settings, resource, dummy, simulate_cycle,
caplog, assert_logs, k8s_mocked, frozen_time, mocker):
caplog.set_level(logging.DEBUG)
@@ -51,7 +96,10 @@ async def test_daemon_exits_via_cancellation_with_backoff(
async def fn(**kwargs):
dummy.kwargs = kwargs
dummy.steps['called'].set()
await asyncio.Event().wait() # this one is cancelled.
try:
await asyncio.Event().wait() # this one is cancelled.
except asyncio.CancelledError:
await dummy.steps['finish'].wait() # simulated slow (non-instant) exiting.

# Trigger spawning and wait until ready. Assume the finalizers are already added.
finalizer = settings.persistence.finalizer
@@ -82,6 +130,8 @@ async def fn(**kwargs):
# 3rd cycle: the daemon has exited, the resource should be unblocked from actual deletion.
mocker.resetall()
frozen_time.tick(1) # any time below timeout
dummy.steps['finish'].set()
await asyncio.sleep(0)
await simulate_cycle(event_object)
await dummy.wait_for_daemon_done()

