diff --git a/kopf/reactor/causation.py b/kopf/reactor/causation.py index c31b1fb8..7ef4118a 100644 --- a/kopf/reactor/causation.py +++ b/kopf/reactor/causation.py @@ -67,6 +67,7 @@ class Cause(NamedTuple): initial: bool body: MutableMapping patch: MutableMapping + digest: Union[str, int] diff: Optional[diffs.Diff] = None old: Optional[Any] = None new: Optional[Any] = None @@ -105,6 +106,7 @@ def detect_cause( initial=initial, **kwargs) + # Marked for deletion, but we still hold it with our finalizer. if finalizers.is_deleted(body): return Cause( event=DELETE, @@ -136,6 +138,7 @@ def detect_cause( # For an object seen for the first time (i.e. just-created), call the creation handlers, # then mark the state as if it was seen when the creation has finished. if not lastseen.has_state(body): + kwargs['digest'] = True # or any other true'ish constant (str/int) return Cause( event=CREATE, body=body, diff --git a/kopf/reactor/handling.py b/kopf/reactor/handling.py index ebe49b62..80b594a7 100644 --- a/kopf/reactor/handling.py +++ b/kopf/reactor/handling.py @@ -109,6 +109,7 @@ async def custom_object_handler( if registry.has_cause_handlers(resource=resource): extra_fields = registry.get_extra_fields(resource=resource) old, new, diff = lastseen.get_state_diffs(body=body, extra_fields=extra_fields) + digest = lastseen.compute_digest(body=body, extra_fields=extra_fields) cause = causation.detect_cause( event=event, resource=resource, @@ -117,6 +118,7 @@ async def custom_object_handler( old=old, new=new, diff=diff, + digest=digest, requires_finalizer=registry.requires_finalizer(resource=resource, body=body), ) delay = await handle_cause(lifecycle=lifecycle, registry=registry, cause=cause) @@ -230,6 +232,13 @@ async def handle_cause( logger.debug("Removing the finalizer, thus allowing the actual deletion.") finalizers.remove_finalizers(body=body, patch=patch) + # Only for creation: freeze the last-seen state as it was in the beginning, + # so that an update cycle is triggered if there were changes during the creation cycle. + # Otherwise, the changes will be ignored, as they are included into the last-minute state. + if cause.event == causation.CREATE and not (done or skip): + extra_fields = registry.get_extra_fields(resource=cause.resource) + lastseen.freeze_state(body=body, patch=patch, extra_fields=extra_fields) + # Informational causes just print the log lines. if cause.event == causation.GONE: logger.debug("Deleted, really deleted, and we are notified.") @@ -356,9 +365,9 @@ async def _execute( logger = cause.logger # Filter and select the handlers to be executed right now, on this event reaction cycle. - handlers_done = [h for h in handlers if status.is_finished(body=cause.body, handler=h)] - handlers_wait = [h for h in handlers if status.is_sleeping(body=cause.body, handler=h)] - handlers_todo = [h for h in handlers if status.is_awakened(body=cause.body, handler=h)] + handlers_done = [h for h in handlers if status.is_finished(body=cause.body, digest=cause.digest, handler=h)] + handlers_wait = [h for h in handlers if status.is_sleeping(body=cause.body, digest=cause.digest, handler=h)] + handlers_todo = [h for h in handlers if status.is_awakened(body=cause.body, digest=cause.digest, handler=h)] handlers_plan = [h for h in await invocation.invoke(lifecycle, handlers_todo, cause=cause)] handlers_left = [h for h in handlers_todo if h.id not in {h.id for h in handlers_plan}] @@ -394,42 +403,49 @@ async def _execute( # Unfinished children cause the regular retry, but with less logging and event reporting. except HandlerChildrenRetry as e: logger.debug(f"Handler {handler.id!r} has unfinished sub-handlers. Will retry soon.") - status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay) + status.set_retry_time(body=cause.body, patch=cause.patch, + handler=handler, delay=e.delay) handlers_left.append(handler) # Definitely a temporary error, regardless of the error strictness. except TemporaryError as e: logger.error(f"Handler {handler.id!r} failed temporarily: %s", str(e) or repr(e)) - status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay) + status.set_retry_time(body=cause.body, patch=cause.patch, + handler=handler, delay=e.delay) handlers_left.append(handler) # Same as permanent errors below, but with better logging for our internal cases. except HandlerTimeoutError as e: logger.error(f"%s", str(e) or repr(e)) # already formatted - status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e) + status.store_failure(body=cause.body, patch=cause.patch, digest=cause.digest, + handler=handler, exc=e) # TODO: report the handling failure somehow (beside logs/events). persistent status? # Definitely a permanent error, regardless of the error strictness. except PermanentError as e: logger.error(f"Handler {handler.id!r} failed permanently: %s", str(e) or repr(e)) - status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e) + status.store_failure(body=cause.body, patch=cause.patch, digest=cause.digest, + handler=handler, exc=e) # TODO: report the handling failure somehow (beside logs/events). persistent status? # Regular errors behave as either temporary or permanent depending on the error strictness. except Exception as e: if retry_on_errors: logger.exception(f"Handler {handler.id!r} failed with an exception. Will retry.") - status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=DEFAULT_RETRY_DELAY) + status.set_retry_time(body=cause.body, patch=cause.patch, + handler=handler, delay=DEFAULT_RETRY_DELAY) handlers_left.append(handler) else: logger.exception(f"Handler {handler.id!r} failed with an exception. Will stop.") - status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e) + status.store_failure(body=cause.body, patch=cause.patch, digest=cause.digest, + handler=handler, exc=e) # TODO: report the handling failure somehow (beside logs/events). persistent status? # No errors means the handler should be excluded from future runs in this reaction cycle. else: logger.info(f"Handler {handler.id!r} succeeded.") - status.store_success(body=cause.body, patch=cause.patch, handler=handler, result=result) + status.store_success(body=cause.body, patch=cause.patch, digest=cause.digest, + handler=handler, result=result) # Provoke the retry of the handling cycle if there were any unfinished handlers, # either because they were not selected by the lifecycle, or failed and need a retry. diff --git a/kopf/structs/lastseen.py b/kopf/structs/lastseen.py index d8f362a1..3d56808f 100644 --- a/kopf/structs/lastseen.py +++ b/kopf/structs/lastseen.py @@ -14,6 +14,7 @@ """ import copy +import hashlib import json from kopf.structs import dicts @@ -98,5 +99,29 @@ def retreive_state(body): def refresh_state(*, body, patch, extra_fields=None): + frozen_state = body.get('status', {}).get('kopf', {}).get('frozen-state') + stored_state = retreive_state(body) + actual_state = get_state(body, extra_fields=extra_fields) + if stored_state is None or stored_state != actual_state: + annotations = patch.setdefault('metadata', {}).setdefault('annotations', {}) + annotations[LAST_SEEN_ANNOTATION] = frozen_state or json.dumps(actual_state) + if frozen_state is not None: + storage = patch.setdefault('status', {}).setdefault('kopf', {}) + storage['frozen-state'] = None + + +def freeze_state(*, body, patch, extra_fields=None): + frozen_state = body.get('status', {}).get('kopf', {}).get('frozen-state') + actual_state = get_state(body, extra_fields=extra_fields) + if frozen_state is None: + storage = patch.setdefault('status', {}).setdefault('kopf', {}) + storage['frozen-state'] = json.dumps(actual_state) + + +def compute_digest(body, extra_fields=None): state = get_state(body, extra_fields=extra_fields) - patch.setdefault('metadata', {}).setdefault('annotations', {})[LAST_SEEN_ANNOTATION] = json.dumps(state) + + # Any digest with a short str/int result is sufficient. Even CRC. No security is needed. + hash = hashlib.md5() + hash.update(json.dumps(state).encode('utf-8')) + return hash.hexdigest() diff --git a/kopf/structs/status.py b/kopf/structs/status.py index 305bd061..e496e324 100644 --- a/kopf/structs/status.py +++ b/kopf/structs/status.py @@ -20,11 +20,11 @@ handler1: started: 2018-12-31T23:59:59,999999 stopped: 2018-01-01T12:34:56,789000 - success: true + success: abcdef1234567890fedcba handler2: started: 2018-12-31T23:59:59,999999 stopped: 2018-01-01T12:34:56,789000 - failure: true + failure: abcdef1234567890fedcba message: "Error message." handler3: started: 2018-12-31T23:59:59,999999 @@ -37,16 +37,46 @@ handler3/sub2: started: 2018-12-31T23:59:59,999999 -* ``status.kopf.success`` are the handlers that succeeded (no re-execution). -* ``status.kopf.failure`` are the handlers that failed completely (no retries). -* ``status.kopf.delayed`` are the timestamps, until which these handlers sleep. -* ``status.kopf.retries`` are number of retries for succeeded, failed, - and for the progressing handlers. +* ``status.kopf.progress`` stores the state of each individual handler in the + current handling cycle (by their ids, which are usually the function names). + +For each handler's status, the following is stored: + +* ``started``: when the handler was attempted for the first time (used for timeouts & durations). +* ``stopped``: when the handler has failed or succeeded. +* ``delayed``: when the handler can retry again. +* ``retries``: the number of retried attempted so far (including reruns and successful attempts). +* ``success``: a digest of where the handler has succeeded (and thus no retries are needed). +* ``failure``: a digest of where the handler has failed completely (no retries will be done). +* ``message``: a brief error message from the last exception (as a hint). When the full event cycle is executed (possibly including multiple re-runs), the whole ``status.kopf`` section is purged. The life-long persistence of status is not intended: otherwise, multiple distinct causes will clutter the status and collide with each other (especially critical for multiple updates). + +The digest of each handler's success or failure can be considered a "version" +of an object being handled, as it was when the handler has finished. +If the object is changed during the remaining handling cycle, the digest +of the finished handlers will be mismatching the actual digest of the object, +and so they will be re-executed. + +This is conceptually close to *reconciliation*: the handling is finished +only when all handlers are executed on the latest state of the object. + +Creation is treated specially: the creation handlers will never be re-executed. +In case of changes during the creation handling, the remaining creation handlers +will get the new state (as normally), and then there will be an update cycle +with all the changes since the first creation handler -- i.e. not from the last +handler as usually, when the last-seen state is updated. + +Update handlers are assumed to be idempotent by concept, so it should be safe +to call them with the changes that are already reflected in the system by some +of the creation handlers. + +Note: The Kubernetes-provided "resource version" of the object is not used, +as it increases with every change of the object, while this digest is used +only for the changes relevant to the operator and framework (see `get_state`). """ import collections.abc @@ -59,23 +89,24 @@ def is_started(*, body, handler): return handler.id in progress -def is_sleeping(*, body, handler): +def is_sleeping(*, body, digest, handler): ts = get_awake_time(body=body, handler=handler) - finished = is_finished(body=body, handler=handler) + finished = is_finished(body=body, digest=digest, handler=handler) return not finished and ts is not None and ts > datetime.datetime.utcnow() -def is_awakened(*, body, handler): - finished = is_finished(body=body, handler=handler) - sleeping = is_sleeping(body=body, handler=handler) +def is_awakened(*, body, digest, handler): + finished = is_finished(body=body, digest=digest, handler=handler) + sleeping = is_sleeping(body=body, digest=digest, handler=handler) return bool(not finished and not sleeping) -def is_finished(*, body, handler): +def is_finished(*, body, digest, handler): progress = body.get('status', {}).get('kopf', {}).get('progress', {}) success = progress.get(handler.id, {}).get('success', None) failure = progress.get(handler.id, {}).get('failure', None) - return bool(success or failure) + return ((success is not None and (success is True or success == digest)) or + (failure is not None and (failure is True or failure == digest))) def get_start_time(*, body, patch, handler): @@ -126,23 +157,23 @@ def set_retry_time(*, body, patch, handler, delay=None): set_awake_time(body=body, patch=patch, handler=handler, delay=delay) -def store_failure(*, body, patch, handler, exc): +def store_failure(*, body, patch, digest, handler, exc): retry = get_retry_count(body=body, handler=handler) progress = patch.setdefault('status', {}).setdefault('kopf', {}).setdefault('progress', {}) progress.setdefault(handler.id, {}).update({ 'stopped': datetime.datetime.utcnow().isoformat(), - 'failure': True, + 'failure': digest, 'retries': retry + 1, 'message': f'{exc}', }) -def store_success(*, body, patch, handler, result=None): +def store_success(*, body, patch, digest, handler, result=None): retry = get_retry_count(body=body, handler=handler) progress = patch.setdefault('status', {}).setdefault('kopf', {}).setdefault('progress', {}) progress.setdefault(handler.id, {}).update({ 'stopped': datetime.datetime.utcnow().isoformat(), - 'success': True, + 'success': digest, 'retries': retry + 1, 'message': None, }) diff --git a/tests/basic-structs/test_cause.py b/tests/basic-structs/test_cause.py index 52aeb5c7..37cd997a 100644 --- a/tests/basic-structs/test_cause.py +++ b/tests/basic-structs/test_cause.py @@ -15,6 +15,7 @@ def test_all_args(mocker): initial = mocker.Mock() body = mocker.Mock() patch = mocker.Mock() + digest = mocker.Mock() diff = mocker.Mock() old = mocker.Mock() new = mocker.Mock() @@ -25,6 +26,7 @@ def test_all_args(mocker): initial=initial, body=body, patch=patch, + digest=digest, diff=diff, old=old, new=new, @@ -35,6 +37,7 @@ def test_all_args(mocker): assert cause.initial is initial assert cause.body is body assert cause.patch is patch + assert cause.digest is digest assert cause.diff is diff assert cause.old is old assert cause.new is new @@ -47,6 +50,7 @@ def test_required_args(mocker): initial = mocker.Mock() body = mocker.Mock() patch = mocker.Mock() + digest = mocker.Mock() cause = Cause( resource=resource, logger=logger, @@ -54,6 +58,7 @@ def test_required_args(mocker): initial=initial, body=body, patch=patch, + digest=digest, ) assert cause.resource is resource assert cause.logger is logger @@ -61,6 +66,7 @@ def test_required_args(mocker): assert cause.initial is initial assert cause.body is body assert cause.patch is patch + assert cause.digest is digest assert cause.diff is None assert cause.old is None assert cause.new is None diff --git a/tests/causation/test_detection.py b/tests/causation/test_detection.py index b4bfcdf5..521cc0c5 100644 --- a/tests/causation/test_detection.py +++ b/tests/causation/test_detection.py @@ -117,6 +117,7 @@ def kwargs(): resource=object(), logger=object(), patch=object(), + digest=object(), ) def check_kwargs(cause, kwargs): @@ -124,6 +125,7 @@ def check_kwargs(cause, kwargs): assert cause.resource is kwargs['resource'] assert cause.logger is kwargs['logger'] assert cause.patch is kwargs['patch'] + assert cause.digest is kwargs['digest'] or cause.digest is True # diff --git a/tests/handling/conftest.py b/tests/handling/conftest.py index eb00bddd..06794dc7 100644 --- a/tests/handling/conftest.py +++ b/tests/handling/conftest.py @@ -190,12 +190,14 @@ def new_detect_fn(**kwargs): original_diff = kwargs.pop('diff', None) original_new = kwargs.pop('new', None) original_old = kwargs.pop('old', None) + original_digest = kwargs.pop('digest', None) event = mock.event if mock.event is not None else original_event initial = bool(event == RESUME) body = copy.deepcopy(mock.body) if mock.body is not None else original_body diff = copy.deepcopy(mock.diff) if mock.diff is not None else original_diff new = copy.deepcopy(mock.new) if mock.new is not None else original_new old = copy.deepcopy(mock.old) if mock.old is not None else original_old + digest = copy.deepcopy(mock.digest) if mock.digest is not None else original_digest # Remove requires_finalizer from kwargs as it shouldn't be passed to the Cause object kwargs.pop('requires_finalizer', None) @@ -209,6 +211,7 @@ def new_detect_fn(**kwargs): diff=diff, new=new, old=old, + digest=digest, **kwargs) # Needed for the k8s-event creation, as they are attached to objects. @@ -224,10 +227,11 @@ def new_detect_fn(**kwargs): mocker.patch('kopf.reactor.causation.detect_cause', new=new_detect_fn) # The mock object stores some values later used by the factory substitute. - mock = mocker.Mock(spec_set=['event', 'body', 'diff', 'new', 'old']) + mock = mocker.Mock(spec_set=['event', 'body', 'diff', 'new', 'old', 'digest']) mock.event = None mock.body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}} mock.diff = None mock.new = None mock.old = None + mock.digest = None return mock diff --git a/tests/handling/test_delays.py b/tests/handling/test_delays.py index 3a651a4a..3c0e1595 100644 --- a/tests/handling/test_delays.py +++ b/tests/handling/test_delays.py @@ -1,4 +1,5 @@ import asyncio +import json import logging import freezegun @@ -69,12 +70,15 @@ async def test_delayed_handlers_sleep( cause_mock.event = cause_type cause_mock.body.update({ - 'status': {'kopf': {'progress': { - 'create_fn': {'delayed': ts}, - 'update_fn': {'delayed': ts}, - 'delete_fn': {'delayed': ts}, - 'resume_fn': {'delayed': ts}, - }}} + 'status': {'kopf': { + 'frozen-state': json.dumps({'spec': {}}), # to prevent re-adding it + 'progress': { + 'create_fn': {'delayed': ts}, + 'update_fn': {'delayed': ts}, + 'delete_fn': {'delayed': ts}, + 'resume_fn': {'delayed': ts}, + }, + }} }) # make sure the finalizer is added since there are mandatory deletion handlers cause_mock.body.setdefault('metadata', {})['finalizers'] = [FINALIZER] @@ -97,7 +101,7 @@ async def test_delayed_handlers_sleep( # The dummy patch is needed to trigger the further changes. The value is irrelevant. assert k8s_mocked.patch_obj.called - assert 'dummy' in k8s_mocked.patch_obj.call_args_list[-1][1]['patch']['status']['kopf'] + assert k8s_mocked.patch_obj.call_args_list[-1][1]['patch'] # not empty, maybe with ['dummy'] # The duration of sleep should be as expected. assert k8s_mocked.sleep_or_wait.called diff --git a/tests/handling/test_errors.py b/tests/handling/test_errors.py index fa0e3e81..6ce8dde1 100644 --- a/tests/handling/test_errors.py +++ b/tests/handling/test_errors.py @@ -43,7 +43,7 @@ async def test_fatal_error_stops_handler( patch = k8s_mocked.patch_obj.call_args_list[0][1]['patch'] assert patch['status']['kopf']['progress'] is not None - assert patch['status']['kopf']['progress'][name1]['failure'] is True + assert patch['status']['kopf']['progress'][name1]['failure'] # evals to true assert patch['status']['kopf']['progress'][name1]['message'] == 'oops' assert_logs([ diff --git a/tests/handling/test_multistep.py b/tests/handling/test_multistep.py index 6db1135d..59349147 100644 --- a/tests/handling/test_multistep.py +++ b/tests/handling/test_multistep.py @@ -38,7 +38,7 @@ async def test_1st_step_stores_progress_by_patching( assert patch['status']['kopf']['progress'] is not None assert patch['status']['kopf']['progress'][name1]['retries'] == 1 - assert patch['status']['kopf']['progress'][name1]['success'] is True + assert patch['status']['kopf']['progress'][name1]['success'] # evals to true assert 'retries' not in patch['status']['kopf']['progress'][name2] assert 'success' not in patch['status']['kopf']['progress'][name2] diff --git a/tests/handling/test_timeouts.py b/tests/handling/test_timeouts.py index 32e1f88f..840c97e9 100644 --- a/tests/handling/test_timeouts.py +++ b/tests/handling/test_timeouts.py @@ -53,7 +53,7 @@ async def test_timed_out_handler_fails( patch = k8s_mocked.patch_obj.call_args_list[0][1]['patch'] assert patch['status']['kopf']['progress'] is not None - assert patch['status']['kopf']['progress'][name1]['failure'] is True + assert patch['status']['kopf']['progress'][name1]['failure'] # evals to true assert_logs([ "Handler .+ has timed out after", diff --git a/tests/test_lastseen.py b/tests/test_lastseen.py index 03fd00db..7221fbf1 100644 --- a/tests/test_lastseen.py +++ b/tests/test_lastseen.py @@ -3,9 +3,10 @@ import pytest from kopf.structs.lastseen import LAST_SEEN_ANNOTATION -from kopf.structs.lastseen import has_state, get_state +from kopf.structs.lastseen import compute_digest from kopf.structs.lastseen import get_state_diffs -from kopf.structs.lastseen import retreive_state, refresh_state +from kopf.structs.lastseen import has_state, get_state +from kopf.structs.lastseen import retreive_state, refresh_state, freeze_state def test_annotation_is_fqdn(): @@ -107,14 +108,48 @@ def test_get_state_clones_body(): assert state['spec']['depth']['field'] == 'x' -def test_refresh_state(): +def test_refresh_state_patches_when_absent(): body = {'spec': {'depth': {'field': 'x'}}} + encoded = json.dumps(body) # json formatting can vary across interpreters patch = {} + refresh_state(body=body, patch=patch) + assert patch['metadata']['annotations'][LAST_SEEN_ANNOTATION] == encoded + + +def test_refresh_state_patches_when_present_and_is_different(): + body = {'spec': {'depth': {'field': 'x'}}} encoded = json.dumps(body) # json formatting can vary across interpreters + body['metadata'] = {'annotations': {LAST_SEEN_ANNOTATION: '{}'}} + patch = {} refresh_state(body=body, patch=patch) assert patch['metadata']['annotations'][LAST_SEEN_ANNOTATION] == encoded +def test_refresh_state_ignores_when_present_and_is_the_same(): + body = {'spec': {'depth': {'field': 'x'}}} + encoded = json.dumps(body) # json formatting can vary across interpreters + body['metadata'] = {'annotations': {LAST_SEEN_ANNOTATION: encoded}} + patch = {} + refresh_state(body=body, patch=patch) + assert not patch + + +def test_freeze_state_patches_when_absent(): + body = {'spec': {'depth': {'field': 'x'}}} + encoded = json.dumps(body) # json formatting can vary across interpreters + patch = {} + freeze_state(body=body, patch=patch) + assert patch['status']['kopf']['frozen-state'] == encoded + + +def test_freeze_state_ignores_when_present(): + body = {'spec': {'depth': {'field': 'x'}}, + 'status': {'kopf': {'frozen-state': '{}'}}} + patch = {} + freeze_state(body=body, patch=patch) + assert not patch + + def test_retreive_state_when_present(): data = {'spec': {'depth': {'field': 'x'}}} encoded = json.dumps(data) # json formatting can vary across interpreters @@ -129,6 +164,18 @@ def test_retreive_state_when_absent(): assert state is None +def test_compute_digest(): + body1 = {'spec': {'depth': {'field': 'x'}}} + body2 = {'spec': {'depth': {'field': 'x'}}} + digest1 = compute_digest(body=body1) + digest2 = compute_digest(body=body2) + assert isinstance(digest1, (str, int)) + assert isinstance(digest2, (str, int)) + assert digest1 # evals to true + assert digest2 # evals to true + assert digest1 == digest2 + + def test_state_changed_detected(): data = {'spec': {'depth': {'field': 'x'}}} encoded = json.dumps(data) # json formatting can vary across interpreters diff --git a/tests/test_progress.py b/tests/test_progress.py index 2d3bd9a9..357ca2b3 100644 --- a/tests/test_progress.py +++ b/tests/test_progress.py @@ -53,96 +53,215 @@ def test_is_started(handler, expected, body): assert body == origbody # not modified -@pytest.mark.parametrize('expected, body', [ - (False, {}), - (False, {'status': {}}), - (False, {'status': {'kopf': {}}}), - (False, {'status': {'kopf': {'progress': {}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'success': False}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'failure': False}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'success': None}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'failure': None}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'success': True}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'failure': True}}}}}), +@pytest.mark.parametrize('body', [ + {}, + {'status': {}}, + {'status': {'kopf': {}}}, + {'status': {'kopf': {'progress': {}}}}, + {'status': {'kopf': {'progress': {'some-id': {}}}}}, ]) -def test_is_finished(handler, expected, body): +def test_is_finished_with_partial_status_remains_readonly(handler, body): origbody = copy.deepcopy(body) - result = is_finished(body=body, handler=handler) - assert result == expected + result = is_finished(body=body, digest='good', handler=handler) + assert isinstance(result, bool) + assert not result assert body == origbody # not modified -@pytest.mark.parametrize('expected, body', [ +@pytest.mark.parametrize('finish_value', [None, False, 'bad']) +@pytest.mark.parametrize('finish_field', ['failure', 'success']) +def test_is_finished_when_not_finished(handler, finish_field, finish_value): + body = {'status': {'kopf': {'progress': {'some-id': {}}}}} + body['status']['kopf']['progress']['some-id'][finish_field] = finish_value + result = is_finished(body=body, digest='good', handler=handler) + assert isinstance(result, bool) + assert not result + + +@pytest.mark.parametrize('finish_value', [True, 'good']) +@pytest.mark.parametrize('finish_field', ['failure', 'success']) +def test_is_finished_when_finished(handler, finish_field, finish_value): + body = {'status': {'kopf': {'progress': {'some-id': {}}}}} + body['status']['kopf']['progress']['some-id'][finish_field] = finish_value + result = is_finished(body=body, digest='good', handler=handler) + assert isinstance(result, bool) + assert result - # Everything that is finished is not sleeping, no matter the sleep/awake field. - (False, {'status': {'kopf': {'progress': {'some-id': {'success': True}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'failure': True}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'success': True, 'delayed': TS0_ISO}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'failure': True, 'delayed': TS0_ISO}}}}}), - - # Everything with no sleep/awake field set is not sleeping either. - (False, {'status': {'kopf': {'progress': {'some-id': {}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'delayed': None}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'success': None}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'failure': None}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'success': None, 'delayed': None}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'failure': None, 'delayed': None}}}}}), - - # When not finished and has awake time, the output depends on the relation to "now". - (False, {'status': {'kopf': {'progress': {'some-id': {'delayed': TS0_ISO}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'delayed': TS0_ISO, 'success': None}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'delayed': TS0_ISO, 'failure': None}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'delayed': TSB_ISO}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'delayed': TSB_ISO, 'success': None}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'delayed': TSB_ISO, 'failure': None}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'delayed': TSA_ISO}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'delayed': TSA_ISO, 'success': None}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'delayed': TSA_ISO, 'failure': None}}}}}), + +@pytest.mark.parametrize('body', [ + {}, + {'status': {}}, + {'status': {'kopf': {}}}, + {'status': {'kopf': {'progress': {}}}}, + {'status': {'kopf': {'progress': {'some-id': {}}}}}, ]) -@freezegun.freeze_time(TS0) -def test_is_sleeping(handler, expected, body): +def test_is_sleeping_with_partial_status_remains_readonly(handler, body): origbody = copy.deepcopy(body) - result = is_sleeping(body=body, handler=handler) - assert result == expected + result = is_finished(body=body, digest='good', handler=handler) + assert isinstance(result, bool) + assert not result assert body == origbody # not modified -@pytest.mark.parametrize('expected, body', [ +@pytest.mark.parametrize('finish_value', [True, 'good']) +@pytest.mark.parametrize('finish_field', ['failure', 'success']) +@pytest.mark.parametrize('delayed_body', [ + pytest.param({}, id='delayed-empty'), + pytest.param({'delayed': None}, id='delayed-none'), + pytest.param({'delayed': TSB_ISO}, id='delayed-before'), + pytest.param({'delayed': TS0_ISO}, id='delayed-exact'), + pytest.param({'delayed': TS1_ISO}, id='delayed-onesec'), + pytest.param({'delayed': TSA_ISO}, id='delayed-after'), +]) +@freezegun.freeze_time(TS0) +def test_is_sleeping_when_finished_regardless_of_delay( + handler, finish_field, finish_value, delayed_body): + body = {'status': {'kopf': {'progress': {'some-id': {}}}}} + body['status']['kopf']['progress']['some-id'].update(delayed_body) + body['status']['kopf']['progress']['some-id'][finish_field] = finish_value + result = is_sleeping(body=body, digest='good', handler=handler) + assert isinstance(result, bool) + assert not result + + +@pytest.mark.parametrize('finish_value', [None, False, 'bad']) +@pytest.mark.parametrize('finish_field', ['failure', 'success']) +@pytest.mark.parametrize('delayed_body', [ + pytest.param({}, id='delayed-empty'), + pytest.param({'delayed': None}, id='delayed-none'), +]) +@freezegun.freeze_time(TS0) +def test_is_sleeping_when_not_finished_and_not_delayed( + handler, delayed_body, finish_field, finish_value): + body = {'status': {'kopf': {'progress': {'some-id': {}}}}} + body['status']['kopf']['progress']['some-id'].update(delayed_body) + body['status']['kopf']['progress']['some-id'][finish_field] = finish_value + result = is_sleeping(body=body, digest='good', handler=handler) + assert isinstance(result, bool) + assert not result + + +@pytest.mark.parametrize('finish_value', [None, False, 'bad']) +@pytest.mark.parametrize('finish_field', ['failure', 'success']) +@pytest.mark.parametrize('delayed_body', [ + pytest.param({'delayed': TSB_ISO}, id='delayed-before'), + pytest.param({'delayed': TS0_ISO}, id='delayed-exact'), +]) +@freezegun.freeze_time(TS0) +def test_is_sleeping_when_not_finished_and_delayed_until_before_now( + handler, finish_field, finish_value, delayed_body): + body = {'status': {'kopf': {'progress': {'some-id': {}}}}} + body['status']['kopf']['progress']['some-id'].update(delayed_body) + body['status']['kopf']['progress']['some-id'][finish_field] = finish_value + result = is_sleeping(body=body, digest='good', handler=handler) + assert isinstance(result, bool) + assert not result + + +@pytest.mark.parametrize('finish_value', [None, False, 'bad']) +@pytest.mark.parametrize('finish_field', ['failure', 'success']) +@pytest.mark.parametrize('delayed_body', [ + pytest.param({'delayed': TS1_ISO}, id='delayed-onesec'), + pytest.param({'delayed': TSA_ISO}, id='delayed-after'), +]) +@freezegun.freeze_time(TS0) +def test_is_sleeping_when_not_finished_and_delayed_until_after_now( + handler, finish_field, finish_value, delayed_body): + body = {'status': {'kopf': {'progress': {'some-id': {}}}}} + body['status']['kopf']['progress']['some-id'].update(delayed_body) + body['status']['kopf']['progress']['some-id'][finish_field] = finish_value + result = is_sleeping(body=body, digest='good', handler=handler) + assert isinstance(result, bool) + assert result - # Everything that is finished never awakens, no matter the sleep/awake field. - (False, {'status': {'kopf': {'progress': {'some-id': {'success': True}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'failure': True}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'success': True, 'delayed': TS0_ISO}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'failure': True, 'delayed': TS0_ISO}}}}}), - # Everything with no sleep/awake field is not sleeping, thus by definition is awake. - (True , {'status': {'kopf': {'progress': {'some-id': {}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'delayed': None}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'success': None}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'failure': None}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'success': None, 'delayed': None}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'failure': None, 'delayed': None}}}}}), - - # When not finished and has awake time, the output depends on the relation to "now". - (True , {'status': {'kopf': {'progress': {'some-id': {'delayed': TS0_ISO}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'delayed': TS0_ISO, 'success': None}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'delayed': TS0_ISO, 'failure': None}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'delayed': TSB_ISO}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'delayed': TSB_ISO, 'success': None}}}}}), - (True , {'status': {'kopf': {'progress': {'some-id': {'delayed': TSB_ISO, 'failure': None}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'delayed': TSA_ISO}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'delayed': TSA_ISO, 'success': None}}}}}), - (False, {'status': {'kopf': {'progress': {'some-id': {'delayed': TSA_ISO, 'failure': None}}}}}), +@pytest.mark.parametrize('body', [ + {}, + {'status': {}}, + {'status': {'kopf': {}}}, + {'status': {'kopf': {'progress': {}}}}, + {'status': {'kopf': {'progress': {'some-id': {}}}}}, ]) -@freezegun.freeze_time(TS0) -def test_is_awakened(handler, expected, body): +def test_is_awakened_with_partial_status_remains_readonly(handler, body): origbody = copy.deepcopy(body) - result = is_awakened(body=body, handler=handler) - assert result == expected + result = is_awakened(body=body, digest='good', handler=handler) + assert isinstance(result, bool) + assert result assert body == origbody # not modified +@pytest.mark.parametrize('finish_value', [True, 'good']) +@pytest.mark.parametrize('finish_field', ['failure', 'success']) +@pytest.mark.parametrize('delayed_body', [ + pytest.param({}, id='delayed-empty'), + pytest.param({'delayed': None}, id='delayed-none'), + pytest.param({'delayed': TSB_ISO}, id='delayed-before'), + pytest.param({'delayed': TS0_ISO}, id='delayed-exact'), + pytest.param({'delayed': TS1_ISO}, id='delayed-onesec'), + pytest.param({'delayed': TSA_ISO}, id='delayed-after'), +]) +@freezegun.freeze_time(TS0) +def test_is_awakened_when_finished_regardless_of_delay( + handler, finish_field, finish_value, delayed_body): + body = {'status': {'kopf': {'progress': {'some-id': {}}}}} + body['status']['kopf']['progress']['some-id'].update(delayed_body) + body['status']['kopf']['progress']['some-id'][finish_field] = finish_value + result = is_awakened(body=body, digest='good', handler=handler) + assert isinstance(result, bool) + assert not result + + +@pytest.mark.parametrize('finish_value', [None, False, 'bad']) +@pytest.mark.parametrize('finish_field', ['failure', 'success']) +@pytest.mark.parametrize('delayed_body', [ + pytest.param({}, id='delayed-empty'), + pytest.param({'delayed': None}, id='delayed-none'), +]) +@freezegun.freeze_time(TS0) +def test_is_awakened_when_not_finished_and_not_delayed( + handler, delayed_body, finish_field, finish_value): + body = {'status': {'kopf': {'progress': {'some-id': {}}}}} + body['status']['kopf']['progress']['some-id'].update(delayed_body) + body['status']['kopf']['progress']['some-id'][finish_field] = finish_value + result = is_awakened(body=body, digest='good', handler=handler) + assert isinstance(result, bool) + assert result + + +@pytest.mark.parametrize('finish_value', [None, False, 'bad']) +@pytest.mark.parametrize('finish_field', ['failure', 'success']) +@pytest.mark.parametrize('delayed_body', [ + pytest.param({'delayed': TSB_ISO}, id='delayed-before'), + pytest.param({'delayed': TS0_ISO}, id='delayed-exact'), +]) +@freezegun.freeze_time(TS0) +def test_is_awakened_when_not_finished_and_delayed_until_before_now( + handler, finish_field, finish_value, delayed_body): + body = {'status': {'kopf': {'progress': {'some-id': {}}}}} + body['status']['kopf']['progress']['some-id'].update(delayed_body) + body['status']['kopf']['progress']['some-id'][finish_field] = finish_value + result = is_awakened(body=body, digest='good', handler=handler) + assert isinstance(result, bool) + assert result + + +@pytest.mark.parametrize('finish_value', [None, False, 'bad']) +@pytest.mark.parametrize('finish_field', ['failure', 'success']) +@pytest.mark.parametrize('delayed_body', [ + pytest.param({'delayed': TS1_ISO}, id='delayed-onesec'), + pytest.param({'delayed': TSA_ISO}, id='delayed-after'), +]) +@freezegun.freeze_time(TS0) +def test_is_awakened_when_not_finished_and_delayed_until_after_now( + handler, finish_field, finish_value, delayed_body): + body = {'status': {'kopf': {'progress': {'some-id': {}}}}} + body['status']['kopf']['progress']['some-id'].update(delayed_body) + body['status']['kopf']['progress']['some-id'][finish_field] = finish_value + result = is_awakened(body=body, digest='good', handler=handler) + assert isinstance(result, bool) + assert not result + + @pytest.mark.parametrize('expected, body', [ (None, {}), (None, {'status': {}}), @@ -261,13 +380,13 @@ def test_set_retry_time(handler, expected, body, delay): @pytest.mark.parametrize('body, expected', [ ({}, {'status': {'kopf': {'progress': {'some-id': {'stopped': TS0_ISO, - 'failure': True, + 'failure': 'digest', 'retries': 1, 'message': 'some-error'}}}}}), ({'status': {'kopf': {'progress': {'some-id': {'retries': 5}}}}}, {'status': {'kopf': {'progress': {'some-id': {'stopped': TS0_ISO, - 'failure': True, + 'failure': 'digest', 'retries': 6, 'message': 'some-error'}}}}}), ]) @@ -275,7 +394,8 @@ def test_set_retry_time(handler, expected, body, delay): def test_store_failure(handler, expected, body): origbody = copy.deepcopy(body) patch = {} - store_failure(body=body, patch=patch, handler=handler, exc=Exception("some-error")) + store_failure(body=body, patch=patch, digest='digest', + handler=handler, exc=Exception("some-error")) assert patch == expected assert body == origbody # not modified @@ -286,13 +406,13 @@ def test_store_failure(handler, expected, body): (None, {}, {'status': {'kopf': {'progress': {'some-id': {'stopped': TS0_ISO, - 'success': True, + 'success': 'digest', 'retries': 1, 'message': None}}}}}), (None, {'status': {'kopf': {'progress': {'some-id': {'retries': 5}}}}}, {'status': {'kopf': {'progress': {'some-id': {'stopped': TS0_ISO, - 'success': True, + 'success': 'digest', 'retries': 6, 'message': None}}}}}), @@ -300,14 +420,14 @@ def test_store_failure(handler, expected, body): ({'field': 'value'}, {}, {'status': {'kopf': {'progress': {'some-id': {'stopped': TS0_ISO, - 'success': True, + 'success': 'digest', 'retries': 1, 'message': None}}}, 'some-id': {'field': 'value'}}}), ({'field': 'value'}, {'status': {'kopf': {'progress': {'some-id': {'retries': 5}}}}}, {'status': {'kopf': {'progress': {'some-id': {'stopped': TS0_ISO, - 'success': True, + 'success': 'digest', 'retries': 6, 'message': None}}}, 'some-id': {'field': 'value'}}}), @@ -316,7 +436,8 @@ def test_store_failure(handler, expected, body): def test_store_success(handler, expected, body, result): origbody = copy.deepcopy(body) patch = {} - store_success(body=body, patch=patch, handler=handler, result=result) + store_success(body=body, patch=patch, digest='digest', + handler=handler, result=result) assert patch == expected assert body == origbody # not modified