Skip to content

Commit

Permalink
Expose and document explicit operator orchestration from 3rd-party apps
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Vasilyev committed Jul 24, 2019
1 parent e0b3a51 commit 25fd0a9
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 5 deletions.
112 changes: 112 additions & 0 deletions docs/embedding.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
=========
Embedding
=========

Kopf is designed to be embeddable into other applications, which require
watching over the Kubernetes resources (custom or built-in), and handling
the changes.
This can be used, for example, in desktop applications or web APIs/UIs
to keep the state of the cluster and its resources in memory.


Manual orchestration
====================

Since Kopf is fully asynchronous, the best way to run Kopf is to provide
an event-loop specially for Kopf in a separate thread, while running
the main application in the main thread.

.. code-block:: python
import asyncio
import threading
import kopf
@kopf.on.create('zalando.org', 'v1', 'kopfexamples')
def create_fn(**_):
pass
def kopf_thread():
loop = asyncio.get_event_loop()
loop.run_until_complete(kopf.operator())
def main():
thread = threading.Thread(target_fn=kopf_thread)
thread.start()
# ...
thread.join()
In case of :command:`kopf run`, the main application is Kopf itself,
so its event-loop runs in the main thread.

.. note::
When an asyncio task runs not in the main thread, it cannot set
the OS signal handlers, so a developer should implement the termination
themselves (cancellation of an operator task is enough).

Alternatively, a developer can orchestrate the operator's tasks and sub-tasks
themselves. The example above is an equivalent of the following:

.. code-block:: python
def kopf_thread():
loop = asyncio.get_event_loop()
tasks = loop.run_until_complete(kopf.spawn_tasks())
loop.run_until_complete(kopf.run_tasks(tasks, return_when=asyncio.FIRST_COMPLETED))
Or, if proper cancellation and termination is not expected, of the following:

.. code-block:: python
def kopf_thread():
loop = asyncio.get_event_loop()
tasks = loop.run_until_complete(kopf.spawn_tasks())
loop.run_until_complete(asyncio.wait(tasks))
Multiple operators
==================

Kopf can handle multiple resources at a time, so only one instance should be
sufficient for most cases. However, it can be needed to run multiple isolated
operators in the same process.

It should be safe to run multiple operators in multiple isolated event-loops.
Despite Kopf's routines use the global state, all such a global state is stored
in :mod:`contextvars` containers with values isolated per-loop and per-task.

.. code-block:: python
import asyncio
import threading
import kopf
registry = kopf.GlobalRegistry()
@kopf.on.create('zalando.org', 'v1', 'kopfexamples', registry=registry)
def create_fn(**_):
pass
def kopf_thread():
loop = asyncio.get_event_loop()
loop.run_until_complete(kopf.operator(
registry=registry,
))
def main():
thread = threading.Thread(target_fn=kopf_thread)
thread.start()
# ...
thread.join()
.. warning::
It is not recommended to run Kopf in the same event-loop with other routines
or applications: it considers all tasks in the event-loop as spawned by its
workers and handlers, and cancells them when it exits.

There are some basic safety measures to not cancel the tasks existed before
operator startup, but that cannot be applied to the tasks spawned later
due to asyncio implementation details.
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Kopf: Kubernetes Operators Framework
events
testing
configuring
embedding

.. toctree::
:maxdepth: 2
Expand Down
7 changes: 5 additions & 2 deletions kopf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,11 @@
set_default_registry,
)
from kopf.reactor.running import (
spawn_tasks,
run_tasks,
operator,
run,
create_tasks,
create_tasks, # deprecated
)
from kopf.structs.hierarchies import (
adopt,
Expand All @@ -71,7 +74,7 @@
'configure',
'login', 'LoginError',
'event', 'info', 'warn', 'exception',
'run', 'create_tasks',
'spawn_tasks', 'run_tasks', 'operator', 'run', 'create_tasks',
'adopt', 'label',
'get_default_lifecycle', 'set_default_lifecycle',
'build_object_reference', 'build_owner_reference',
Expand Down
12 changes: 9 additions & 3 deletions kopf/reactor/running.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import signal
import threading
import warnings
from typing import Optional, Callable, Collection

from kopf.engines import peering
Expand All @@ -18,7 +19,7 @@
def run(
loop: Optional[asyncio.AbstractEventLoop] = None,
lifecycle: Optional[Callable] = None,
registry: Optional[registries.BaseRegistry] = None,
registry: Optional[registries.GlobalRegistry] = None,
standalone: bool = False,
priority: int = 0,
peering_name: str = peering.PEERING_DEFAULT_NAME,
Expand All @@ -45,7 +46,7 @@ def run(

async def operator(
lifecycle: Optional[Callable] = None,
registry: Optional[registries.BaseRegistry] = None,
registry: Optional[registries.GlobalRegistry] = None,
standalone: bool = False,
priority: int = 0,
peering_name: str = peering.PEERING_DEFAULT_NAME,
Expand All @@ -56,6 +57,8 @@ async def operator(
This function should be used to run an operator in an asyncio event-loop
if the operator is orchestrated explicitly and manually.
It is efficiently `spawn_tasks` + `run_tasks` with some safety.
"""
existing_tasks = await _all_tasks()
operator_tasks = await spawn_tasks(
Expand All @@ -71,7 +74,7 @@ async def operator(

async def spawn_tasks(
lifecycle: Optional[Callable] = None,
registry: Optional[registries.BaseRegistry] = None,
registry: Optional[registries.GlobalRegistry] = None,
standalone: bool = False,
priority: int = 0,
peering_name: str = peering.PEERING_DEFAULT_NAME,
Expand Down Expand Up @@ -267,4 +270,7 @@ def create_tasks(loop: asyncio.AbstractEventLoop, *arg, **kwargs):
It is only kept for backward compatibility, as it was exposed
via the public interface of the framework.
"""
warnings.warn("kopf.create_tasks() is deprecated: "
"use kopf.spawn_tasks() or kopf.operator().",
DeprecationWarning)
return loop.run_until_complete(spawn_tasks(*arg, **kwargs))

0 comments on commit 25fd0a9

Please sign in to comment.