Skip to content

Commit

Permalink
Tests for daemons & timers (only smoke-tests, not much detailed)
Browse files Browse the repository at this point in the history
  • Loading branch information
nolar committed Mar 31, 2020
1 parent c44d1e6 commit 2eb91dc
Show file tree
Hide file tree
Showing 9 changed files with 886 additions and 0 deletions.
105 changes: 105 additions & 0 deletions tests/handling/daemons/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import asyncio
import time
import unittest.mock

import freezegun
import pytest

import kopf
from kopf.reactor.processing import process_resource_event
from kopf.structs.bodies import RawBody
from kopf.structs.containers import ResourceMemories


class DaemonDummy:

def __init__(self):
super().__init__()
self.mock = unittest.mock.MagicMock()
self.kwargs = {}
self.steps = {
'called': asyncio.Event(),
'finish': asyncio.Event(),
'error': asyncio.Event(),
}

async def wait_for_daemon_done(self):
stopped = self.kwargs['stopped']
await stopped.wait()
while not stopped._stopper.reason & stopped._stopper.reason.DONE:
await asyncio.sleep(0) # give control back to asyncio event loop


@pytest.fixture()
def dummy():
return DaemonDummy()


@pytest.fixture()
def memories():
return ResourceMemories()


@pytest.fixture()
def simulate_cycle(k8s_mocked, registry, settings, resource, memories, mocker):
"""
Simulate K8s behaviour locally in memory (some meaningful approximation).
"""

def _merge_dicts(src, dst):
for key, val in src.items():
if isinstance(val, dict) and key in dst:
_merge_dicts(src[key], dst[key])
else:
dst[key] = val

async def _simulate_cycle(event_object: RawBody):
mocker.resetall()

await process_resource_event(
lifecycle=kopf.lifecycles.all_at_once,
registry=registry,
settings=settings,
resource=resource,
memories=memories,
raw_event={'type': 'irrelevant', 'object': event_object},
replenished=asyncio.Event(),
event_queue=asyncio.Queue(),
)

# Do the same as k8s does: merge the patches into the object.
for call in k8s_mocked.patch_obj.call_args_list:
_merge_dicts(call[1]['patch'], event_object)

return _simulate_cycle


@pytest.fixture()
def frozen_time():
"""
A helper to simulate time movements to step over long sleeps/timeouts.
"""
# TODO LATER: Either freezegun should support the system clock, or find something else.
with freezegun.freeze_time("2020-01-01 00:00:00") as frozen:
# Use freezegun-supported time instead of system clocks -- for testing purposes only.
# NB: Patch strictly after the time is frozen -- to use fake_time(), not real time().
with unittest.mock.patch('time.monotonic', time.time), \
unittest.mock.patch('time.perf_counter', time.time):
yield frozen


# The time-driven tests mock the sleeps, and shift the time as much as it was requested to sleep.
# This makes the sleep realistic for the app code, though executed instantly for the tests.
@pytest.fixture()
def manual_time(k8s_mocked, frozen_time):
async def sleep_or_wait_substitute(delay, *_, **__):
if delay is None:
pass
elif isinstance(delay, float):
frozen_time.tick(delay)
else:
frozen_time.tick(min(delay))

k8s_mocked.sleep_or_wait.side_effect = sleep_or_wait_substitute
yield frozen_time

181 changes: 181 additions & 0 deletions tests/handling/daemons/test_daemon_errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import logging

import kopf
from kopf.storage.finalizers import FINALIZER


async def test_daemon_stopped_on_permanent_error(
registry, resource, dummy, manual_time,
caplog, assert_logs, k8s_mocked, simulate_cycle):
caplog.set_level(logging.DEBUG)

@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn',
backoff=0.01)
async def fn(**kwargs):
dummy.mock()
dummy.kwargs = kwargs
dummy.steps['called'].set()
raise kopf.PermanentError("boo!")

event_object = {'metadata': {'finalizers': [FINALIZER]}}
await simulate_cycle(event_object)

await dummy.steps['called'].wait()
await dummy.wait_for_daemon_done()

assert dummy.mock.call_count == 1
assert k8s_mocked.patch_obj.call_count == 0
assert k8s_mocked.sleep_or_wait.call_count == 1 # one for each retry
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] is None

assert_logs([
"Daemon 'fn' failed permanently: boo!",
"Daemon 'fn' has exited on its own",
], prohibited=[
"Daemon 'fn' succeeded.",
])


async def test_daemon_stopped_on_arbitrary_errors_with_mode_permanent(
registry, resource, dummy, manual_time,
caplog, assert_logs, k8s_mocked, simulate_cycle):
caplog.set_level(logging.DEBUG)

@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn',
errors=kopf.ErrorsMode.PERMANENT, backoff=0.01)
async def fn(**kwargs):
dummy.mock()
dummy.kwargs = kwargs
dummy.steps['called'].set()
raise Exception("boo!")

event_object = {'metadata': {'finalizers': [FINALIZER]}}
await simulate_cycle(event_object)

await dummy.steps['called'].wait()
await dummy.wait_for_daemon_done()

assert dummy.mock.call_count == 1
assert k8s_mocked.sleep_or_wait.call_count == 1 # one for each retry
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] is None

assert_logs([
"Daemon 'fn' failed with an exception. Will stop.",
"Daemon 'fn' has exited on its own",
], prohibited=[
"Daemon 'fn' succeeded.",
])


async def test_daemon_retried_on_temporary_error(
registry, settings, resource, dummy, manual_time,
caplog, assert_logs, k8s_mocked, simulate_cycle):
caplog.set_level(logging.DEBUG)

@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn',
backoff=1.0)
async def fn(retry, **kwargs):
dummy.mock()
dummy.kwargs = kwargs
dummy.steps['called'].set()
if not retry:
raise kopf.TemporaryError("boo!", delay=1.0)
else:
dummy.steps['finish'].set()

event_object = {'metadata': {'finalizers': [FINALIZER]}}
await simulate_cycle(event_object)

await dummy.steps['called'].wait()
await dummy.steps['finish'].wait()
await dummy.wait_for_daemon_done()

assert k8s_mocked.sleep_or_wait.call_count == 2 # one for each retry
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#]
assert k8s_mocked.sleep_or_wait.call_args_list[1][0][0] is None

assert_logs([
"Daemon 'fn' failed temporarily: boo!",
"Daemon 'fn' succeeded.",
"Daemon 'fn' has exited on its own",
])


async def test_daemon_retried_on_arbitrary_error_with_mode_temporary(
registry, resource, dummy,
caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time):
caplog.set_level(logging.DEBUG)

@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn',
errors=kopf.ErrorsMode.TEMPORARY, backoff=1.0)
async def fn(retry, **kwargs):
dummy.mock()
dummy.kwargs = kwargs
dummy.steps['called'].set()
if not retry:
raise Exception("boo!")
else:
dummy.steps['finish'].set()

event_object = {'metadata': {'finalizers': [FINALIZER]}}
await simulate_cycle(event_object)

await dummy.steps['called'].wait()
await dummy.steps['finish'].wait()
await dummy.wait_for_daemon_done()

assert k8s_mocked.sleep_or_wait.call_count == 2 # one for each retry
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#]
assert k8s_mocked.sleep_or_wait.call_args_list[1][0][0] is None

assert_logs([
"Daemon 'fn' failed with an exception. Will retry.",
"Daemon 'fn' succeeded.",
"Daemon 'fn' has exited on its own",
])


async def test_daemon_retried_until_retries_limit(
registry, resource, dummy,
caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time):
caplog.set_level(logging.DEBUG)

@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn',
retries=3)
async def fn(**kwargs):
dummy.kwargs = kwargs
dummy.steps['called'].set()
raise kopf.TemporaryError("boo!", delay=1.0)

await simulate_cycle({})
await dummy.steps['called'].wait()
await dummy.wait_for_daemon_done()

assert k8s_mocked.sleep_or_wait.call_count == 4 # one for each retry
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#]
assert k8s_mocked.sleep_or_wait.call_args_list[1][0][0] == 1.0 # [call#][args/kwargs][arg#]
assert k8s_mocked.sleep_or_wait.call_args_list[2][0][0] == 1.0 # [call#][args/kwargs][arg#]
assert k8s_mocked.sleep_or_wait.call_args_list[3][0][0] is None


async def test_daemon_retried_until_timeout(
registry, resource, dummy,
caplog, assert_logs, k8s_mocked, simulate_cycle, manual_time):
caplog.set_level(logging.DEBUG)

@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn',
timeout=3.0)
async def fn(**kwargs):
dummy.kwargs = kwargs
dummy.steps['called'].set()
raise kopf.TemporaryError("boo!", delay=1.0)

await simulate_cycle({})
await dummy.steps['called'].wait()
await dummy.wait_for_daemon_done()

assert k8s_mocked.sleep_or_wait.call_count == 4 # one for each retry
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#]
assert k8s_mocked.sleep_or_wait.call_args_list[1][0][0] == 1.0 # [call#][args/kwargs][arg#]
assert k8s_mocked.sleep_or_wait.call_args_list[2][0][0] == 1.0 # [call#][args/kwargs][arg#]
assert k8s_mocked.sleep_or_wait.call_args_list[3][0][0] is None
62 changes: 62 additions & 0 deletions tests/handling/daemons/test_daemon_filtration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import logging

import pytest

import kopf
from kopf.storage.finalizers import FINALIZER


# We assume that the handler filtering is tested in details elsewhere (for all handlers).
# Here, we only test if it is applied or not applied.


async def test_daemon_filtration_satisfied(
registry, resource, dummy,
caplog, assert_logs, k8s_mocked, simulate_cycle):
caplog.set_level(logging.DEBUG)

@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn',
labels={'a': 'value', 'b': kopf.PRESENT, 'c': kopf.ABSENT},
annotations={'x': 'value', 'y': kopf.PRESENT, 'z': kopf.ABSENT})
async def fn(**kwargs):
dummy.kwargs = kwargs
dummy.steps['called'].set()

event_body = {'metadata': {'labels': {'a': 'value', 'b': '...'},
'annotations': {'x': 'value', 'y': '...'},
'finalizers': [FINALIZER]}}
await simulate_cycle(event_body)

await dummy.steps['called'].wait()
await dummy.wait_for_daemon_done()


@pytest.mark.parametrize('labels, annotations', [
# Annotations mismatching (but labels are matching):
({'a': 'value', 'b': '...'}, {'x': 'mismatching-value', 'b': '...'}, ), # x must be "value".
({'a': 'value', 'b': '...'}, {'x': 'value', 'y': '...', 'z': '...'}), # z must be absent
({'a': 'value', 'b': '...'}, {'x': 'value'}), # y must be present
# labels mismatching (but annotations are matching):
({'a': 'mismatching-value', 'b': '...'}, {'x': 'value', 'y': '...'}),
({'a': 'value', 'b': '...', 'c': '...'}, {'x': 'value', 'y': '...'}),
({'a': 'value'}, {'x': 'value', 'y': '...'}),
])
async def test_daemon_filtration_mismatched(
registry, resource, mocker, labels, annotations,
caplog, assert_logs, k8s_mocked, simulate_cycle):
caplog.set_level(logging.DEBUG)
spawn_resource_daemons = mocker.patch('kopf.reactor.daemons.spawn_resource_daemons')

@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn',
labels={'a': 'value', 'b': kopf.PRESENT, 'c': kopf.ABSENT},
annotations={'x': 'value', 'y': kopf.PRESENT, 'z': kopf.ABSENT})
async def fn(**kwargs):
pass

event_body = {'metadata': {'labels': labels,
'annotations': annotations,
'finalizers': [FINALIZER]}}
await simulate_cycle(event_body)

assert spawn_resource_daemons.called
assert spawn_resource_daemons.call_args_list[0][1]['handlers'] == []
44 changes: 44 additions & 0 deletions tests/handling/daemons/test_daemon_spawning.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import logging

import kopf


async def test_daemon_is_spawned_at_least_once(
registry, resource, dummy,
caplog, assert_logs, k8s_mocked, simulate_cycle):
caplog.set_level(logging.DEBUG)

@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn')
async def fn(**kwargs):
dummy.mock()
dummy.kwargs = kwargs
dummy.steps['called'].set()

await simulate_cycle({})

await dummy.steps['called'].wait()
await dummy.wait_for_daemon_done()

assert dummy.mock.call_count == 1 # not restarted


async def test_daemon_initial_backoff_obeyed(
registry, resource, dummy,
caplog, assert_logs, k8s_mocked, simulate_cycle):
caplog.set_level(logging.DEBUG)

@kopf.daemon(resource.group, resource.version, resource.plural, registry=registry, id='fn',
initial_backoff=1.0)
async def fn(**kwargs):
dummy.mock()
dummy.kwargs = kwargs
dummy.steps['called'].set()

await simulate_cycle({})

await dummy.steps['called'].wait()
await dummy.wait_for_daemon_done()

assert k8s_mocked.sleep_or_wait.call_count >= 1
assert k8s_mocked.sleep_or_wait.call_count <= 2 # one optional extra call for sleep(None)
assert k8s_mocked.sleep_or_wait.call_args_list[0][0][0] == 1.0 # [call#][args/kwargs][arg#]
Loading

0 comments on commit 2eb91dc

Please sign in to comment.