Watch replays events forever when wrapping the API function pointer in functools.partial #803

@randyesq

Description

Client Version: 9.0.0
Kubernetes version: 1.13
Python Version: 3.7.2

I am finding that when I watch a CRD from inside a multiprocessing.Process, the watcher keeps firing the same events that have occurred since the watch began, over and over, forever. When I use the watcher in the main Python process, it fires each event once, when it happens, as expected.
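
For contrast, here is a minimal sketch of what the working main-process version looks like for me (assuming the API method is called directly, without the functools.partial wrapper; the constants are the same ones defined in the full code below):

# Watching directly from the main process: each event fires exactly once.
w = watch.Watch()
for event in w.stream(k8scrds.list_namespaced_custom_object,
                      RESOURCE_GROUP, RESOURCE_VERSION,
                      RESOURCE_NAMESPACE, RESOURCE_PLURAL):
    handle_event(event)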

Here is the output I get when I initially create two CRDs. All of the logging after that is the watcher reporting those same two ADDED events again:

2019-04-01 15:19:19,725 - INFO    - cfs_operator - EVENT: ConfigFrameworkSession=test1 ADDED (version=1065261)
2019-04-01 15:20:59,935 - INFO    - cfs_operator - EVENT: ConfigFrameworkSession=test2 ADDED (version=1065497)
2019-04-01 15:21:03,348 - INFO    - cfs_operator - EVENT: ConfigFrameworkSession=test1 ADDED (version=1065261)
2019-04-01 15:21:03,349 - INFO    - cfs_operator - EVENT: ConfigFrameworkSession=test2 ADDED (version=1065497)
2019-04-01 15:22:56,176 - INFO    - cfs_operator - EVENT: ConfigFrameworkSession=test1 ADDED (version=1065261)
2019-04-01 15:22:56,177 - INFO    - cfs_operator - EVENT: ConfigFrameworkSession=test2 ADDED (version=1065497)
2019-04-01 15:24:18,555 - INFO    - cfs_operator - EVENT: ConfigFrameworkSession=test1 ADDED (version=1065261)
2019-04-01 15:24:18,555 - INFO    - cfs_operator - EVENT: ConfigFrameworkSession=test2 ADDED (version=1065497)
2019-04-01 15:26:04,191 - INFO    - cfs_operator - EVENT: ConfigFrameworkSession=test1 ADDED (version=1065261)
2019-04-01 15:26:04,192 - INFO    - cfs_operator - EVENT: ConfigFrameworkSession=test2 ADDED (version=1065497)

This happens with Jobs as well (and perhaps other resource types I have not tried); a sketch of the equivalent Job wrapper follows below.
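
For example, the analogous wrapper for a Job watch (hypothetical; not part of my actual code) would look like this:

# Hypothetical equivalent for Jobs: the same partial-wrapping pattern,
# here pre-binding only the namespace for the BatchV1Api list call.
list_namespaced_job = partial(k8sjobs.list_namespaced_job, RESOURCE_NAMESPACE)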

Here is my multiprocessing code:

from functools import partial
import logging
import multiprocessing

from kubernetes import client, config, watch

LOGGER = logging.getLogger('cfs_operator')

# Load the in-cluster config if available; otherwise fall back to kubeconfig.
try:
    config.load_incluster_config()
except config.ConfigException:
    config.load_kube_config()  # Development

_api_client = client.ApiClient()
k8sjobs = client.BatchV1Api(_api_client)
k8scrds = client.CustomObjectsApi(_api_client)

RESOURCE_GROUP = 'my.resource.group.com'
RESOURCE_VERSION = 'v1'
RESOURCE_NAMESPACE = 'default'
RESOURCE_PLURAL = 'cfsessions'
RESOURCE_KIND = "ConfigFrameworkSession"

# Pre-bind the CRD coordinates so the watch target needs no positional args.
custom_resource_args = (RESOURCE_GROUP, RESOURCE_VERSION, RESOURCE_NAMESPACE, RESOURCE_PLURAL)
list_namespaced_custom_object = partial(k8scrds.list_namespaced_custom_object, *custom_resource_args)


def handle_event(event):
    etype = event['type']
    obj = event['raw_object']
    name = obj['metadata']['name']
    objtype = obj['kind']
    resource_version = obj['metadata']['resourceVersion']
    LOGGER.info("EVENT: %s=%s %s (version=%s)", objtype, name, etype, resource_version)


def main():
    """ Spawn watch processes of relevant Kubernetes objects """
    def capture_stream(watch_fcn, handler, queue, **kwargs):
        """ Used by processes watching Kubernetes object events """
        stream = watch.Watch().stream(watch_fcn, **kwargs)
        try:
            for event in stream:
                queue.put((None, handler, event))
        except Exception as err:
            queue.put((err, None, None))
        return

    def process_events(queue):
        """ Used by processes doing things with Kubernetes object events """
        while True:
            error, handler, event = queue.get()
            if error:
                LOGGER.error(error)
                raise error
            else:
                handler(event)

    # Start a process to handle events placed in the event queue
    event_queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=process_events, args=(event_queue,))
    p.start()

    # Watch ConfigFrameworkSession Custom Resource events
    cfs_event_process = multiprocessing.Process(
        target=capture_stream,
        args=(list_namespaced_custom_object, handle_event, event_queue),
    )
    cfs_event_process.start()

    p.join()


if __name__ == '__main__':

    # Format logs for stdout
    log_format = "%(asctime)-15s - %(levelname)-7s - %(name)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)

    # Watch events and handle them
    main()

I have this set up with multiprocessing because I would like to watch several object types at once and process all of their events through a single queue.
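
If the partial wrapper really is the trigger (as the title suggests), a possible workaround sketch, untested, is to hand Watch.stream() the unwrapped API method and forward the positional arguments through stream() itself, which accepts *args/**kwargs and passes them along to the API call. capture_stream would need an extra *args parameter:

# Workaround sketch (assumes partial is the trigger): give Watch.stream()
# the real bound API method and pass the CRD coordinates through as *args.
def capture_stream(watch_fcn, handler, queue, *args, **kwargs):
    stream = watch.Watch().stream(watch_fcn, *args, **kwargs)
    try:
        for event in stream:
            queue.put((None, handler, event))
    except Exception as err:
        queue.put((err, None, None))

cfs_event_process = multiprocessing.Process(
    target=capture_stream,
    args=(k8scrds.list_namespaced_custom_object, handle_event, event_queue)
    + custom_resource_args,
)

This keeps the queue/worker layout intact while letting the watch machinery see the original function.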
