Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Merge pull request #163 from nolar/tracable-changes
Browse files Browse the repository at this point in the history
Ensure that all handlers get the latest state (eventually)
  • Loading branch information
nolar committed Aug 8, 2019
2 parents 4f4ddd4 + e59d6fa commit 8b9fbc1
Show file tree
Hide file tree
Showing 13 changed files with 382 additions and 123 deletions.
3 changes: 3 additions & 0 deletions kopf/reactor/causation.py
Expand Up @@ -67,6 +67,7 @@ class Cause(NamedTuple):
initial: bool
body: MutableMapping
patch: MutableMapping
digest: Union[str, int]
diff: Optional[diffs.Diff] = None
old: Optional[Any] = None
new: Optional[Any] = None
Expand Down Expand Up @@ -105,6 +106,7 @@ def detect_cause(
initial=initial,
**kwargs)

# Marked for deletion, but we still hold it with our finalizer.
if finalizers.is_deleted(body):
return Cause(
event=DELETE,
Expand Down Expand Up @@ -136,6 +138,7 @@ def detect_cause(
# For an object seen for the first time (i.e. just-created), call the creation handlers,
# then mark the state as if it was seen when the creation has finished.
if not lastseen.has_state(body):
kwargs['digest'] = True # or any other true'ish constant (str/int)
return Cause(
event=CREATE,
body=body,
Expand Down
36 changes: 26 additions & 10 deletions kopf/reactor/handling.py
Expand Up @@ -109,6 +109,7 @@ async def custom_object_handler(
if registry.has_cause_handlers(resource=resource):
extra_fields = registry.get_extra_fields(resource=resource)
old, new, diff = lastseen.get_state_diffs(body=body, extra_fields=extra_fields)
digest = lastseen.compute_digest(body=body, extra_fields=extra_fields)
cause = causation.detect_cause(
event=event,
resource=resource,
Expand All @@ -117,6 +118,7 @@ async def custom_object_handler(
old=old,
new=new,
diff=diff,
digest=digest,
requires_finalizer=registry.requires_finalizer(resource=resource, body=body),
)
delay = await handle_cause(lifecycle=lifecycle, registry=registry, cause=cause)
Expand Down Expand Up @@ -230,6 +232,13 @@ async def handle_cause(
logger.debug("Removing the finalizer, thus allowing the actual deletion.")
finalizers.remove_finalizers(body=body, patch=patch)

# Only for creation: freeze the last-seen state as it was in the beginning,
# so that an update cycle is triggered if there were changes during the creation cycle.
# Otherwise, the changes will be ignored, as they are included into the last-minute state.
if cause.event == causation.CREATE and not (done or skip):
extra_fields = registry.get_extra_fields(resource=cause.resource)
lastseen.freeze_state(body=body, patch=patch, extra_fields=extra_fields)

# Informational causes just print the log lines.
if cause.event == causation.GONE:
logger.debug("Deleted, really deleted, and we are notified.")
Expand Down Expand Up @@ -356,9 +365,9 @@ async def _execute(
logger = cause.logger

# Filter and select the handlers to be executed right now, on this event reaction cycle.
handlers_done = [h for h in handlers if status.is_finished(body=cause.body, handler=h)]
handlers_wait = [h for h in handlers if status.is_sleeping(body=cause.body, handler=h)]
handlers_todo = [h for h in handlers if status.is_awakened(body=cause.body, handler=h)]
handlers_done = [h for h in handlers if status.is_finished(body=cause.body, digest=cause.digest, handler=h)]
handlers_wait = [h for h in handlers if status.is_sleeping(body=cause.body, digest=cause.digest, handler=h)]
handlers_todo = [h for h in handlers if status.is_awakened(body=cause.body, digest=cause.digest, handler=h)]
handlers_plan = [h for h in await invocation.invoke(lifecycle, handlers_todo, cause=cause)]
handlers_left = [h for h in handlers_todo if h.id not in {h.id for h in handlers_plan}]

Expand Down Expand Up @@ -394,42 +403,49 @@ async def _execute(
# Unfinished children cause the regular retry, but with less logging and event reporting.
except HandlerChildrenRetry as e:
logger.debug(f"Handler {handler.id!r} has unfinished sub-handlers. Will retry soon.")
status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay)
status.set_retry_time(body=cause.body, patch=cause.patch,
handler=handler, delay=e.delay)
handlers_left.append(handler)

# Definitely a temporary error, regardless of the error strictness.
except TemporaryError as e:
logger.error(f"Handler {handler.id!r} failed temporarily: %s", str(e) or repr(e))
status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=e.delay)
status.set_retry_time(body=cause.body, patch=cause.patch,
handler=handler, delay=e.delay)
handlers_left.append(handler)

# Same as permanent errors below, but with better logging for our internal cases.
except HandlerTimeoutError as e:
logger.error(f"%s", str(e) or repr(e)) # already formatted
status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e)
status.store_failure(body=cause.body, patch=cause.patch, digest=cause.digest,
handler=handler, exc=e)
# TODO: report the handling failure somehow (beside logs/events). persistent status?

# Definitely a permanent error, regardless of the error strictness.
except PermanentError as e:
logger.error(f"Handler {handler.id!r} failed permanently: %s", str(e) or repr(e))
status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e)
status.store_failure(body=cause.body, patch=cause.patch, digest=cause.digest,
handler=handler, exc=e)
# TODO: report the handling failure somehow (beside logs/events). persistent status?

# Regular errors behave as either temporary or permanent depending on the error strictness.
except Exception as e:
if retry_on_errors:
logger.exception(f"Handler {handler.id!r} failed with an exception. Will retry.")
status.set_retry_time(body=cause.body, patch=cause.patch, handler=handler, delay=DEFAULT_RETRY_DELAY)
status.set_retry_time(body=cause.body, patch=cause.patch,
handler=handler, delay=DEFAULT_RETRY_DELAY)
handlers_left.append(handler)
else:
logger.exception(f"Handler {handler.id!r} failed with an exception. Will stop.")
status.store_failure(body=cause.body, patch=cause.patch, handler=handler, exc=e)
status.store_failure(body=cause.body, patch=cause.patch, digest=cause.digest,
handler=handler, exc=e)
# TODO: report the handling failure somehow (beside logs/events). persistent status?

# No errors means the handler should be excluded from future runs in this reaction cycle.
else:
logger.info(f"Handler {handler.id!r} succeeded.")
status.store_success(body=cause.body, patch=cause.patch, handler=handler, result=result)
status.store_success(body=cause.body, patch=cause.patch, digest=cause.digest,
handler=handler, result=result)

# Provoke the retry of the handling cycle if there were any unfinished handlers,
# either because they were not selected by the lifecycle, or failed and need a retry.
Expand Down
27 changes: 26 additions & 1 deletion kopf/structs/lastseen.py
Expand Up @@ -14,6 +14,7 @@
"""

import copy
import hashlib
import json

from kopf.structs import dicts
Expand Down Expand Up @@ -98,5 +99,29 @@ def retreive_state(body):


def refresh_state(*, body, patch, extra_fields=None):
    """
    Bring the last-seen-state annotation up to date in the patch.

    If a frozen copy of the state exists in ``status.kopf.frozen-state``
    (captured earlier, e.g. at the start of a creation cycle), it is written
    as the last-seen state instead of the current one, and then cleared
    (``None`` in a merge-patch presumably removes the field — confirm against
    the patching layer).
    """
    kopf_status = body.get('status', {}).get('kopf', {})
    frozen_state = kopf_status.get('frozen-state')
    stored_state = retreive_state(body)
    actual_state = get_state(body, extra_fields=extra_fields)

    # Only touch the annotation when it is absent or out of date.
    if stored_state is None or stored_state != actual_state:
        metadata = patch.setdefault('metadata', {})
        annotations = metadata.setdefault('annotations', {})
        annotations[LAST_SEEN_ANNOTATION] = frozen_state or json.dumps(actual_state)

    # The frozen copy is one-shot: drop it once it has been consumed.
    if frozen_state is not None:
        patch.setdefault('status', {}).setdefault('kopf', {})['frozen-state'] = None


def freeze_state(*, body, patch, extra_fields=None):
    """
    Capture the current relevant state into ``status.kopf.frozen-state``.

    The first capture wins: if a frozen copy already exists on the object,
    it is left untouched (so repeated calls during one cycle are no-ops).
    """
    existing_frozen = body.get('status', {}).get('kopf', {}).get('frozen-state')
    current_state = get_state(body, extra_fields=extra_fields)
    if existing_frozen is None:
        kopf_storage = patch.setdefault('status', {}).setdefault('kopf', {})
        kopf_storage['frozen-state'] = json.dumps(current_state)


def compute_digest(body, extra_fields=None):
    """
    Compute a short digest of the object's relevant state (see `get_state`).

    The digest is used only to detect whether the object has changed between
    handler executions; it is not security-sensitive.

    :param body: The object's body (a mapping).
    :param extra_fields: Extra fields to include in the state (passed through
        to `get_state`).
    :return: A hex string digest of the JSON-serialized state.
    """
    state = get_state(body, extra_fields=extra_fields)

    # Any digest with a short str/int result is sufficient. Even CRC. No security is needed.
    # NOTE(review): json.dumps without sort_keys makes the digest depend on key
    # order — presumably stable here since the state comes from one parser; confirm.
    digest = hashlib.md5()
    digest.update(json.dumps(state).encode('utf-8'))
    return digest.hexdigest()
67 changes: 49 additions & 18 deletions kopf/structs/status.py
Expand Up @@ -20,11 +20,11 @@
handler1:
started: 2018-12-31T23:59:59,999999
stopped: 2018-01-01T12:34:56,789000
success: true
success: abcdef1234567890fedcba
handler2:
started: 2018-12-31T23:59:59,999999
stopped: 2018-01-01T12:34:56,789000
failure: true
failure: abcdef1234567890fedcba
message: "Error message."
handler3:
started: 2018-12-31T23:59:59,999999
Expand All @@ -37,16 +37,46 @@
handler3/sub2:
started: 2018-12-31T23:59:59,999999
* ``status.kopf.success`` are the handlers that succeeded (no re-execution).
* ``status.kopf.failure`` are the handlers that failed completely (no retries).
* ``status.kopf.delayed`` are the timestamps, until which these handlers sleep.
* ``status.kopf.retries`` are number of retries for succeeded, failed,
and for the progressing handlers.
* ``status.kopf.progress`` stores the state of each individual handler in the
current handling cycle (by their ids, which are usually the function names).
For each handler's status, the following is stored:
* ``started``: when the handler was attempted for the first time (used for timeouts & durations).
* ``stopped``: when the handler has failed or succeeded.
* ``delayed``: when the handler can retry again.
* ``retries``: the number of retries attempted so far (including reruns and successful attempts).
* ``success``: a digest of the object's state at which the handler succeeded (and thus no retries are needed).
* ``failure``: a digest of the object's state at which the handler failed completely (no retries will be done).
* ``message``: a brief error message from the last exception (as a hint).
When the full event cycle is executed (possibly including multiple re-runs),
the whole ``status.kopf`` section is purged. The life-long persistence of status
is not intended: otherwise, multiple distinct causes will clutter the status
and collide with each other (especially critical for multiple updates).
The digest of each handler's success or failure can be considered a "version"
of an object being handled, as it was when the handler has finished.
If the object is changed during the remaining handling cycle, the digest
of the finished handlers will be mismatching the actual digest of the object,
and so they will be re-executed.
This is conceptually close to *reconciliation*: the handling is finished
only when all handlers are executed on the latest state of the object.
Creation is treated specially: the creation handlers will never be re-executed.
In case of changes during the creation handling, the remaining creation handlers
will get the new state (as normally), and then there will be an update cycle
with all the changes since the first creation handler -- i.e. not only the changes
since the last handler, as would normally happen when the last-seen state is updated.
Update handlers are assumed to be idempotent by concept, so it should be safe
to call them with the changes that are already reflected in the system by some
of the creation handlers.
Note: The Kubernetes-provided "resource version" of the object is not used,
as it increases with every change of the object, while this digest is used
only for the changes relevant to the operator and framework (see `get_state`).
"""

import collections.abc
Expand All @@ -59,23 +89,24 @@ def is_started(*, body, handler):
return handler.id in progress


def is_sleeping(*, body, digest, handler):
    """
    Check if the handler is waiting for its scheduled retry time.

    A handler sleeps only if it is not yet finished for this digest and its
    awake time lies in the future.
    """
    ts = get_awake_time(body=body, handler=handler)
    finished = is_finished(body=body, digest=digest, handler=handler)
    return not finished and ts is not None and ts > datetime.datetime.utcnow()


def is_awakened(*, body, digest, handler):
    """
    Check if the handler is ready to be executed now: neither finished
    for this digest, nor sleeping until a retry time.
    """
    finished = is_finished(body=body, digest=digest, handler=handler)
    sleeping = is_sleeping(body=body, digest=digest, handler=handler)
    return bool(not finished and not sleeping)


def is_finished(*, body, digest, handler):
    """
    Check if the handler has succeeded or failed for this digest of the object.

    The stored success/failure value is the digest of the object's state at
    the moment the handler finished. If the object has changed since then,
    the stored digest mismatches the current one, and the handler is
    considered unfinished (so it gets re-executed on the latest state).
    A literal ``True`` is accepted for backward compatibility with statuses
    written before digests were introduced.
    """
    progress = body.get('status', {}).get('kopf', {}).get('progress', {})
    success = progress.get(handler.id, {}).get('success', None)
    failure = progress.get(handler.id, {}).get('failure', None)
    return ((success is not None and (success is True or success == digest)) or
            (failure is not None and (failure is True or failure == digest)))


def get_start_time(*, body, patch, handler):
Expand Down Expand Up @@ -126,23 +157,23 @@ def set_retry_time(*, body, patch, handler, delay=None):
set_awake_time(body=body, patch=patch, handler=handler, delay=delay)


def store_failure(*, body, patch, digest, handler, exc):
    """
    Mark the handler as permanently failed for this digest of the object.

    The digest is stored as the ``failure`` value, so the handler will be
    re-executed if the object changes (see `is_finished`). The exception
    text is kept as a hint in ``message``.
    """
    retry = get_retry_count(body=body, handler=handler)
    progress = patch.setdefault('status', {}).setdefault('kopf', {}).setdefault('progress', {})
    progress.setdefault(handler.id, {}).update({
        'stopped': datetime.datetime.utcnow().isoformat(),
        'failure': digest,
        'retries': retry + 1,
        'message': f'{exc}',
    })


def store_success(*, body, patch, digest, handler, result=None):
    """
    Mark the handler as succeeded for this digest of the object.

    The digest is stored as the ``success`` value, so the handler will be
    re-executed if the object changes afterwards (see `is_finished`).
    """
    retry = get_retry_count(body=body, handler=handler)
    progress = patch.setdefault('status', {}).setdefault('kopf', {}).setdefault('progress', {})
    progress.setdefault(handler.id, {}).update({
        'stopped': datetime.datetime.utcnow().isoformat(),
        'success': digest,
        'retries': retry + 1,
        'message': None,
    })
Expand Down
6 changes: 6 additions & 0 deletions tests/basic-structs/test_cause.py
Expand Up @@ -15,6 +15,7 @@ def test_all_args(mocker):
initial = mocker.Mock()
body = mocker.Mock()
patch = mocker.Mock()
digest = mocker.Mock()
diff = mocker.Mock()
old = mocker.Mock()
new = mocker.Mock()
Expand All @@ -25,6 +26,7 @@ def test_all_args(mocker):
initial=initial,
body=body,
patch=patch,
digest=digest,
diff=diff,
old=old,
new=new,
Expand All @@ -35,6 +37,7 @@ def test_all_args(mocker):
assert cause.initial is initial
assert cause.body is body
assert cause.patch is patch
assert cause.digest is digest
assert cause.diff is diff
assert cause.old is old
assert cause.new is new
Expand All @@ -47,20 +50,23 @@ def test_required_args(mocker):
initial = mocker.Mock()
body = mocker.Mock()
patch = mocker.Mock()
digest = mocker.Mock()
cause = Cause(
resource=resource,
logger=logger,
event=event,
initial=initial,
body=body,
patch=patch,
digest=digest,
)
assert cause.resource is resource
assert cause.logger is logger
assert cause.event is event
assert cause.initial is initial
assert cause.body is body
assert cause.patch is patch
assert cause.digest is digest
assert cause.diff is None
assert cause.old is None
assert cause.new is None
2 changes: 2 additions & 0 deletions tests/causation/test_detection.py
Expand Up @@ -117,13 +117,15 @@ def kwargs():
resource=object(),
logger=object(),
patch=object(),
digest=object(),
)

def check_kwargs(cause, kwargs):
__traceback_hide__ = True
assert cause.resource is kwargs['resource']
assert cause.logger is kwargs['logger']
assert cause.patch is kwargs['patch']
assert cause.digest is kwargs['digest'] or cause.digest is True


#
Expand Down
6 changes: 5 additions & 1 deletion tests/handling/conftest.py
Expand Up @@ -190,12 +190,14 @@ def new_detect_fn(**kwargs):
original_diff = kwargs.pop('diff', None)
original_new = kwargs.pop('new', None)
original_old = kwargs.pop('old', None)
original_digest = kwargs.pop('digest', None)
event = mock.event if mock.event is not None else original_event
initial = bool(event == RESUME)
body = copy.deepcopy(mock.body) if mock.body is not None else original_body
diff = copy.deepcopy(mock.diff) if mock.diff is not None else original_diff
new = copy.deepcopy(mock.new) if mock.new is not None else original_new
old = copy.deepcopy(mock.old) if mock.old is not None else original_old
digest = copy.deepcopy(mock.digest) if mock.digest is not None else original_digest

# Remove requires_finalizer from kwargs as it shouldn't be passed to the Cause object
kwargs.pop('requires_finalizer', None)
Expand All @@ -209,6 +211,7 @@ def new_detect_fn(**kwargs):
diff=diff,
new=new,
old=old,
digest=digest,
**kwargs)

# Needed for the k8s-event creation, as they are attached to objects.
Expand All @@ -224,10 +227,11 @@ def new_detect_fn(**kwargs):
mocker.patch('kopf.reactor.causation.detect_cause', new=new_detect_fn)

# The mock object stores some values later used by the factory substitute.
mock = mocker.Mock(spec_set=['event', 'body', 'diff', 'new', 'old'])
mock = mocker.Mock(spec_set=['event', 'body', 'diff', 'new', 'old', 'digest'])
mock.event = None
mock.body = {'metadata': {'namespace': 'ns1', 'name': 'name1'}}
mock.diff = None
mock.new = None
mock.old = None
mock.digest = None
return mock

0 comments on commit 8b9fbc1

Please sign in to comment.