Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Merge pull request #123 from dlmiddlecote/45-filter-by-labels-annotat…
Browse files Browse the repository at this point in the history
…ions

Filter by labels and annotations
  • Loading branch information
nolar committed Jul 24, 2019
2 parents e12c27b + 2aa7e7b commit 8da86a0
Show file tree
Hide file tree
Showing 12 changed files with 473 additions and 34 deletions.
33 changes: 33 additions & 0 deletions docs/handlers.rst
Expand Up @@ -271,3 +271,36 @@ with the ids like ``create_fn/item1``, ``create_fn/item2``, etc.
As such, the parent handler SHOULD NOT produce any side-effects
except as the read-only parsing of the inputs (e.g. ``spec``),
and generating the dynamic functions of the sub-handlers.


Filtering
=========

It is possible to only execute handlers when the object that triggers a handler
matches certain filters.

The following filters are available for all event, cause, and field handlers:

* Match an object's label and value::

@kopf.on.create('zalando.org', 'v1', 'kopfexamples', labels={'somelabel': 'somevalue'})
def my_handler(spec, **_):
pass

* Match on the existence of an object's label::

@kopf.on.create('zalando.org', 'v1', 'kopfexamples', labels={'somelabel': None})
def my_handler(spec, **_):
pass

* Match an object's annotation and value::

@kopf.on.create('zalando.org', 'v1', 'kopfexamples', annotations={'someannotation': 'somevalue'})
def my_handler(spec, **_):
pass

* Match on the existence of an object's annotation::

@kopf.on.create('zalando.org', 'v1', 'kopfexamples', annotations={'someannotation': None})
def my_handler(spec, **_):
pass
33 changes: 33 additions & 0 deletions examples/11-filtering-handlers/README.md
@@ -0,0 +1,33 @@
# Kopf example for testing the filtering of handlers

Kopf has the ability to execute handlers only if the watched objects
match the filters passed to the handler. This includes matching on:
* labels of a resource
* annotations of a resource

Start the operator:

```bash
kopf run example.py
```

Trigger the object creation and monitor the stderr of the operator:

```bash
$ kubectl apply -f ../obj.yaml
```

```
[2019-07-04 14:19:33,393] kopf.reactor.handlin [INFO ] [default/kopf-example-1] Label satisfied.
[2019-07-04 14:19:33,395] kopf.reactor.handlin [INFO ] [default/kopf-example-1] Handler 'create_with_labels_satisfied' succeeded.
[2019-07-04 14:19:33,648] kopf.reactor.handlin [INFO ] [default/kopf-example-1] Label exists.
[2019-07-04 14:19:33,649] kopf.reactor.handlin [INFO ] [default/kopf-example-1] Handler 'create_with_labels_exist' succeeded.
[2019-07-04 14:19:33,807] kopf.reactor.handlin [INFO ] [default/kopf-example-1] Annotation satisfied.
[2019-07-04 14:19:33,809] kopf.reactor.handlin [INFO ] [default/kopf-example-1] Handler 'create_with_annotations_satisfied' succeeded.
[2019-07-04 14:19:33,966] kopf.reactor.handlin [INFO ] [default/kopf-example-1] Annotation exists.
[2019-07-04 14:19:33,967] kopf.reactor.handlin [INFO ] [default/kopf-example-1] Handler 'create_with_annotations_exist' succeeded.
[2019-07-04 14:19:33,967] kopf.reactor.handlin [INFO ] [default/kopf-example-1] All handlers succeeded for creation.
```

Here, notice that only the handlers that have labels or annotations that match the applied
object are executed, and the ones that don't, aren't.
31 changes: 31 additions & 0 deletions examples/11-filtering-handlers/example.py
@@ -0,0 +1,31 @@
import kopf


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', labels={'somelabel': 'somevalue'})
def create_with_labels_satisfied(logger, **kwargs):
logger.info("Label satisfied.")


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', labels={'somelabel': None})
def create_with_labels_exist(logger, **kwargs):
logger.info("Label exists.")


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', labels={'somelabel': 'othervalue'})
def create_with_labels_not_satisfied(logger, **kwargs):
logger.info("Label not satisfied.")


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', annotations={'someannotation': 'somevalue'})
def create_with_annotations_satisfied(logger, **kwargs):
logger.info("Annotation satisfied.")


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', annotations={'someannotation': None})
def create_with_annotations_exist(logger, **kwargs):
logger.info("Annotation exists.")


@kopf.on.create('zalando.org', 'v1', 'kopfexamples', annotations={'someannotation': 'othervalue'})
def create_with_annotations_not_satisfied(logger, **kwargs):
logger.info("Annotation not satisfied.")
53 changes: 53 additions & 0 deletions examples/11-filtering-handlers/test_example_11.py
@@ -0,0 +1,53 @@
import os.path
import subprocess
import time

import pytest

import kopf.testing

crd_yaml = os.path.relpath(os.path.join(os.path.dirname(__file__), '..', 'crd.yaml'))
obj_yaml = os.path.relpath(os.path.join(os.path.dirname(__file__), '..', 'obj.yaml'))
example_py = os.path.relpath(os.path.join(os.path.dirname(__file__), 'example.py'))


@pytest.fixture(autouse=True)
def crd_exists():
subprocess.run(f"kubectl apply -f {crd_yaml}",
check=True, timeout=10, capture_output=True, shell=True)


@pytest.fixture(autouse=True)
def obj_absent():
# Operator is not running in fixtures, so we need a force-delete (or this patch).
subprocess.run(['kubectl', 'patch', '-f', obj_yaml,
'-p', '{"metadata":{"finalizers":[]}}',
'--type', 'merge'],
check=False, timeout=10, capture_output=True)
subprocess.run(f"kubectl delete -f {obj_yaml}",
check=False, timeout=10, capture_output=True, shell=True)


def test_handler_filtering(mocker):

# To prevent lengthy threads in the loop executor when the process exits.
mocker.patch('kopf.config.WatchersConfig.default_stream_timeout', 10)

# Run an operator and simulate some activity with the operated resource.
with kopf.testing.KopfRunner(['run', '--verbose', '--standalone', example_py]) as runner:
subprocess.run(f"kubectl create -f {obj_yaml}", shell=True, check=True)
time.sleep(5) # give it some time to react
subprocess.run(f"kubectl delete -f {obj_yaml}", shell=True, check=True)
time.sleep(1) # give it some time to react

# Ensure that the operator did not die on start, or during the operation.
assert runner.exception is None
assert runner.exit_code == 0

# Check for correct log lines (to indicate correct handlers were executed).
assert '[default/kopf-example-1] Label satisfied.' in runner.stdout
assert '[default/kopf-example-1] Label exists.' in runner.stdout
assert '[default/kopf-example-1] Label not satisfied.' not in runner.stdout
assert '[default/kopf-example-1] Annotation satisfied.' in runner.stdout
assert '[default/kopf-example-1] Annotation exists.' in runner.stdout
assert '[default/kopf-example-1] Annotation not satisfied.' not in runner.stdout
2 changes: 2 additions & 0 deletions examples/obj.yaml
Expand Up @@ -5,6 +5,8 @@ metadata:
name: kopf-example-1
labels:
somelabel: somevalue
annotations:
someannotation: somevalue
spec:
duration: 1m
field: value
Expand Down
39 changes: 26 additions & 13 deletions kopf/on.py
Expand Up @@ -11,7 +11,7 @@ def creation_handler(**kwargs):

# TODO: add cluster=True support (different API methods)

from typing import Optional, Union, Tuple, List
from typing import Optional, Union, Tuple, List, Mapping

from kopf.reactor import causation
from kopf.reactor import handling
Expand All @@ -23,14 +23,16 @@ def resume(
*,
id: Optional[str] = None,
timeout: Optional[float] = None,
registry: Optional[registries.GlobalRegistry] = None):
registry: Optional[registries.GlobalRegistry] = None,
labels: Optional[Mapping] = None,
annotations: Optional[Mapping] = None):
""" ``@kopf.on.resume()`` handler for the object resuming on operator (re)start. """
registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn):
registry.register_cause_handler(
group=group, version=version, plural=plural,
event=None, initial=True, id=id, timeout=timeout,
fn=fn)
fn=fn, labels=labels, annotations=annotations)
return fn
return decorator

Expand All @@ -40,14 +42,16 @@ def create(
*,
id: Optional[str] = None,
timeout: Optional[float] = None,
registry: Optional[registries.GlobalRegistry] = None):
registry: Optional[registries.GlobalRegistry] = None,
labels: Optional[Mapping] = None,
annotations: Optional[Mapping] = None):
""" ``@kopf.on.create()`` handler for the object creation. """
registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn):
registry.register_cause_handler(
group=group, version=version, plural=plural,
event=causation.CREATE, id=id, timeout=timeout,
fn=fn)
fn=fn, labels=labels, annotations=annotations)
return fn
return decorator

Expand All @@ -57,14 +61,16 @@ def update(
*,
id: Optional[str] = None,
timeout: Optional[float] = None,
registry: Optional[registries.GlobalRegistry] = None):
registry: Optional[registries.GlobalRegistry] = None,
labels: Optional[Mapping] = None,
annotations: Optional[Mapping] = None):
""" ``@kopf.on.update()`` handler for the object update or change. """
registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn):
registry.register_cause_handler(
group=group, version=version, plural=plural,
event=causation.UPDATE, id=id, timeout=timeout,
fn=fn)
fn=fn, labels=labels, annotations=annotations)
return fn
return decorator

Expand All @@ -75,14 +81,17 @@ def delete(
id: Optional[str] = None,
timeout: Optional[float] = None,
registry: Optional[registries.GlobalRegistry] = None,
optional: Optional[bool] = None):
optional: Optional[bool] = None,
labels: Optional[Mapping] = None,
annotations: Optional[Mapping] = None):
""" ``@kopf.on.delete()`` handler for the object deletion. """
registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn):
registry.register_cause_handler(
group=group, version=version, plural=plural,
event=causation.DELETE, id=id, timeout=timeout,
fn=fn, requires_finalizer=bool(not optional))
fn=fn, requires_finalizer=bool(not optional),
labels=labels, annotations=annotations)
return fn
return decorator

Expand All @@ -93,14 +102,16 @@ def field(
*,
id: Optional[str] = None,
timeout: Optional[float] = None,
registry: Optional[registries.GlobalRegistry] = None):
registry: Optional[registries.GlobalRegistry] = None,
labels: Optional[Mapping] = None,
annotations: Optional[Mapping] = None):
""" ``@kopf.on.field()`` handler for the individual field changes. """
registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn):
registry.register_cause_handler(
group=group, version=version, plural=plural,
event=None, field=field, id=id, timeout=timeout,
fn=fn)
fn=fn, labels=labels, annotations=annotations)
return fn
return decorator

Expand All @@ -109,13 +120,15 @@ def event(
group: str, version: str, plural: str,
*,
id: Optional[str] = None,
registry: Optional[registries.GlobalRegistry] = None):
registry: Optional[registries.GlobalRegistry] = None,
labels: Optional[Mapping] = None,
annotations: Optional[Mapping] = None):
""" ``@kopf.on.event()`` handler for the silent spies on the events. """
registry = registry if registry is not None else registries.get_default_registry()
def decorator(fn):
registry.register_event_handler(
group=group, version=version, plural=plural,
id=id, fn=fn)
id=id, fn=fn, labels=labels, annotations=annotations)
return fn
return decorator

Expand Down
33 changes: 20 additions & 13 deletions kopf/reactor/registries.py
Expand Up @@ -14,7 +14,9 @@
import abc
import functools
from types import FunctionType, MethodType
from typing import MutableMapping, NamedTuple, Text, Optional, Tuple, Callable
from typing import MutableMapping, NamedTuple, Text, Optional, Tuple, Callable, Mapping

from kopf.structs import filters


# An immutable reference to a custom resource definition.
Expand All @@ -41,6 +43,8 @@ class Handler(NamedTuple):
field: Optional[Tuple[str, ...]]
timeout: Optional[float] = None
initial: Optional[bool] = None
labels: Optional[Mapping] = None
annotations: Optional[Mapping] = None


class BaseRegistry(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -117,7 +121,8 @@ def __bool__(self):
def append(self, handler):
self._handlers.append(handler)

def register(self, fn, id=None, event=None, field=None, timeout=None, initial=None, requires_finalizer=False):
def register(self, fn, id=None, event=None, field=None, timeout=None, initial=None, requires_finalizer=False,
labels=None, annotations=None):
if field is None:
field = None # for the non-field events
elif isinstance(field, str):
Expand All @@ -130,7 +135,8 @@ def register(self, fn, id=None, event=None, field=None, timeout=None, initial=No
id = id if id is not None else get_callable_id(fn)
id = id if field is None else f'{id}/{".".join(field)}'
id = id if self.prefix is None else f'{self.prefix}/{id}'
handler = Handler(id=id, fn=fn, event=event, field=field, timeout=timeout, initial=initial)
handler = Handler(id=id, fn=fn, event=event, field=field, timeout=timeout, initial=initial,
labels=labels, annotations=annotations)

self.append(handler)

Expand All @@ -140,20 +146,18 @@ def register(self, fn, id=None, event=None, field=None, timeout=None, initial=No
return fn # to be usable as a decorator too.

def iter_cause_handlers(self, cause):
fields = {field for _, field, _, _ in cause.diff or []}
changed_fields = {field for _, field, _, _ in cause.diff or []}
for handler in self._handlers:
if handler.event is None or handler.event == cause.event:
if handler.initial and not cause.initial:
pass # ignore initial handlers in non-initial causes.
elif handler.field:
if any(field[:len(handler.field)] == handler.field for field in fields):
yield handler
else:
elif filters.match(handler=handler, body=cause.body, changed_fields=changed_fields):
yield handler

def iter_event_handlers(self, resource, event):
for handler in self._handlers:
yield handler
if filters.match(handler=handler, body=event['object']):
yield handler

def iter_extra_fields(self, resource):
for handler in self._handlers:
Expand Down Expand Up @@ -196,22 +200,25 @@ def __init__(self):
self._event_handlers: MutableMapping[Resource, SimpleRegistry] = {}

def register_cause_handler(self, group, version, plural, fn,
id=None, event=None, field=None, timeout=None, initial=None, requires_finalizer=False):
id=None, event=None, field=None, timeout=None, initial=None, requires_finalizer=False,
labels=None, annotations=None):
"""
Register an additional handler function for the specific resource and specific event.
"""
resource = Resource(group, version, plural)
registry = self._cause_handlers.setdefault(resource, SimpleRegistry())
registry.register(event=event, field=field, fn=fn, id=id, timeout=timeout, initial=initial, requires_finalizer=requires_finalizer)
registry.register(event=event, field=field, fn=fn, id=id, timeout=timeout, initial=initial, requires_finalizer=requires_finalizer,
labels=labels, annotations=annotations)
return fn # to be usable as a decorator too.

def register_event_handler(self, group, version, plural, fn, id=None):
def register_event_handler(self, group, version, plural, fn, id=None, labels=None,
annotations=None):
"""
Register an additional handler function for low-level events.
"""
resource = Resource(group, version, plural)
registry = self._event_handlers.setdefault(resource, SimpleRegistry())
registry.register(fn=fn, id=id)
registry.register(fn=fn, id=id, labels=labels, annotations=annotations)
return fn # to be usable as a decorator too.

@property
Expand Down

0 comments on commit 8da86a0

Please sign in to comment.