Merge pull request #228 from nolar/liveliness
Liveliness/readiness probes
nolar committed Nov 13, 2019
2 parents 223f53d + 9c4002a commit 445cfd6
Showing 14 changed files with 378 additions and 2 deletions.
4 changes: 4 additions & 0 deletions docs/deployment.rst
@@ -58,6 +58,10 @@ For this, create the deployment file:
Please note that there is only one replica. Keep it so. If two or more
operators are running in the cluster for the same objects,
they will collide with each other, and the consequences are unpredictable.
In case of pod restarts, only one pod should be running at a time too:
use ``.spec.strategy.type=Recreate`` (see the documentation_).

.. _documentation: https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#recreate-deployment

Deploy it to the cluster:

1 change: 1 addition & 0 deletions docs/index.rst
@@ -31,6 +31,7 @@ Kopf: Kubernetes Operators Framework
hierarchies
async
loading
probing
peering
scopes
errors
106 changes: 106 additions & 0 deletions docs/probing.rst
@@ -0,0 +1,106 @@
=============
Health-checks
=============

Kopf provides a minimalistic HTTP server to report its health status.


Liveness endpoints
==================

By default, no endpoint is configured, and no health is reported.
To specify an endpoint to listen for probes, use :option:`--liveness`:

.. code-block:: bash

    kopf run --liveness=http://:8080/healthz --verbose handlers.py

Currently, only HTTP is supported.
Other protocols (TCP, HTTPS) can be added in the future.
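
The endpoint can also be passed programmatically when the operator is embedded
into an application. A minimal sketch, assuming that ``kopf.run()`` accepts the
same ``liveness_endpoint`` argument as the internal ``run()`` function changed
here (an assumption; verify against your Kopf version):

.. code-block:: python

    import kopf

    def main():
        # Hypothetical embedded launch: handlers are assumed to be registered
        # elsewhere (e.g. by importing the modules that define them).
        kopf.run(
            standalone=True,
            liveness_endpoint='http://:8080/healthz',
        )

    if __name__ == '__main__':
        main()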


Kubernetes probing
==================

This port and path can be used in a liveness probe of the operator's deployment.
If the operator does not respond for any reason, Kubernetes will restart it.

.. code-block:: yaml

    apiVersion: apps/v1
    kind: Deployment
    spec:
      template:
        spec:
          containers:
          - name: the-only-one
            image: ...
            livenessProbe:
              httpGet:
                path: /healthz
                port: 8080

.. seealso::

    Kubernetes manual on `liveness and readiness probes`__.

    __ https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/

.. seealso::

    Please be aware of the difference between readiness and liveness probes.
    For operators, readiness probes make no practical sense, as operators
    do not serve traffic behind load balancers or services.
    Liveness probes can help in disastrous cases (e.g. the whole operator is stuck),
    but will not help with partial failures (e.g. a single API call is stuck).
    You can read more here:
    https://srcco.de/posts/kubernetes-liveness-probes-are-dangerous.html

.. warning::

    Make sure that one and only one pod of the operator is running at a time,
    especially during restarts --- see :doc:`deployment`.


Probe handlers
==============

The content of the response is empty by default. It can be populated with
probe handlers:

.. code-block:: python

    import datetime
    import kopf
    import random

    @kopf.on.probe(id='now')
    def get_current_timestamp(**kwargs):
        return datetime.datetime.utcnow().isoformat()

    @kopf.on.probe(id='random')
    def get_random_value(**kwargs):
        return random.randint(0, 1_000_000)

The probe handlers are executed on requests to the liveness URL,
and their results are cached for a reasonable period of time
to prevent overloading the operator by mass-requesting its status.

The handler results will be reported as the content of the liveness response:

.. code-block:: console

    $ curl http://localhost:8080/healthz
    {"now": "2019-11-07T18:03:52.513803", "random": 765846}

.. note::
    The liveness status report is simplistic and minimalistic at the moment.
    It only reports success if the health-reporting task runs at all.
    It can happen that some of the operator's tasks, threads, or streams
    break, freeze, or become unresponsive, while the health-reporting task
    continues to run. The probability of such a case is low, but not zero.

    There are no checks that the operator actually operates anything
    (unless they are implemented explicitly with the probe handlers),
    as there are no reliable criteria for that -- a total absence of handled
    resources or events can be an expected state of the cluster.
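
As a sketch of such an explicit check (the names ``LAST_SEEN``, ``remember_activity``,
and ``idle-seconds`` are illustrative and not part of Kopf; it also assumes the
operator watches pods anyway), an event handler can refresh a module-level heartbeat,
and a probe handler can report how stale it is, so that an external monitor can
alert on a silent operator:

.. code-block:: python

    import datetime
    import kopf

    # A module-level heartbeat, refreshed by any handled pod event.
    LAST_SEEN: datetime.datetime = datetime.datetime.utcnow()

    @kopf.on.event('', 'v1', 'pods')
    def remember_activity(**kwargs):
        global LAST_SEEN
        LAST_SEEN = datetime.datetime.utcnow()

    @kopf.on.probe(id='idle-seconds')
    def idle_seconds(**kwargs):
        # Only reported in the response content; the probe itself still succeeds.
        return (datetime.datetime.utcnow() - LAST_SEEN).total_seconds()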
2 changes: 1 addition & 1 deletion examples/06-peering/README.md
@@ -2,7 +2,7 @@

When multiple operators start for the same cluster (in the cluster or outside),
they become aware of each other, exchange basic information about
their liveness and priorities, and cooperate to avoid undesired
side-effects (e.g., duplicated creation of children, infinite cross-changes).

The main use-case for this is the development mode: when a developer starts
10 changes: 10 additions & 0 deletions examples/13-hooks/example.py
@@ -63,6 +63,16 @@ async def login_fn(**kwargs):
)


@kopf.on.probe()
async def tasks_count(**kwargs):
    return sum([len(flags) for flags in STOPPERS.values()])


@kopf.on.probe()
async def monitored_objects(**kwargs):
    return {namespace: sorted([name for name in STOPPERS[namespace]]) for namespace in STOPPERS}


@kopf.on.event('', 'v1', 'pods')
async def pod_task(namespace, name, logger, **_):
async with LOCK:
3 changes: 3 additions & 0 deletions kopf/cli.py
@@ -46,6 +46,7 @@ def main() -> None:
@click.option('-n', '--namespace', default=None)
@click.option('--standalone', is_flag=True, default=False)
@click.option('--dev', 'priority', type=int, is_flag=True, flag_value=666)
@click.option('-L', '--liveness', 'liveness_endpoint', type=str)
@click.option('-P', '--peering', 'peering_name', type=str, default=None, envvar='KOPF_RUN_PEERING')
@click.option('-p', '--priority', type=int, default=0)
@click.option('-m', '--module', 'modules', multiple=True)
@@ -59,6 +60,7 @@ def run(
priority: int,
standalone: bool,
namespace: Optional[str],
liveness_endpoint: Optional[str],
) -> None:
""" Start an operator process and handle all the requests. """
loaders.preload(
@@ -70,6 +72,7 @@
namespace=namespace,
priority=priority,
peering_name=peering_name,
liveness_endpoint=liveness_endpoint,
stop_flag=__controls.stop_flag,
ready_flag=__controls.ready_flag,
vault=__controls.vault,
92 changes: 92 additions & 0 deletions kopf/engines/probing.py
@@ -0,0 +1,92 @@
import asyncio
import datetime
import logging
import urllib.parse
from typing import Optional, Tuple, MutableMapping

import aiohttp.web

from kopf.reactor import causation
from kopf.reactor import handling
from kopf.reactor import lifecycles
from kopf.reactor import registries

logger = logging.getLogger(__name__)

LOCALHOST: str = 'localhost'
HTTP_PORT: int = 80

_Key = Tuple[str, int] # hostname, port


async def health_reporter(
        endpoint: str,
        *,
        registry: registries.OperatorRegistry,
        ready_flag: Optional[asyncio.Event] = None,  # used for testing
) -> None:
    """
    A simple HTTP(S)/TCP server to report the operator's health to K8s probes.

    Runs forever until cancelled (which happens if any other root task
    is cancelled or fails). Once it stops responding for any reason,
    Kubernetes will assume the pod is not alive anymore, and will restart it.
    """
    probing_container: MutableMapping[registries.HandlerId, registries.HandlerResult] = {}
    probing_timestamp: Optional[datetime.datetime] = None
    probing_max_age = datetime.timedelta(seconds=10.0)
    probing_lock = asyncio.Lock()

    async def get_health(
            request: aiohttp.web.Request,
    ) -> aiohttp.web.Response:
        nonlocal probing_timestamp

        # Recollect the data on-demand, and only if it is older than a reasonable caching period.
        # Protect against multiple parallel requests performing the same heavy activity.
        now = datetime.datetime.utcnow()
        if probing_timestamp is None or now - probing_timestamp >= probing_max_age:
            async with probing_lock:
                now = datetime.datetime.utcnow()
                if probing_timestamp is None or now - probing_timestamp >= probing_max_age:

                    activity_results = await handling.activity_trigger(
                        lifecycle=lifecycles.all_at_once,
                        registry=registry,
                        activity=causation.Activity.PROBE,
                    )
                    probing_container.clear()
                    probing_container.update(activity_results)
                    probing_timestamp = datetime.datetime.utcnow()

        return aiohttp.web.json_response(probing_container)

    parts = urllib.parse.urlsplit(endpoint)
    if parts.scheme == 'http':
        host = parts.hostname or LOCALHOST
        port = parts.port or HTTP_PORT
        path = parts.path
    else:
        raise Exception(f"Unsupported scheme: {endpoint}")

    app = aiohttp.web.Application()
    app.add_routes([aiohttp.web.get(path, get_health)])

    runner = aiohttp.web.AppRunner(app, handle_signals=False)
    await runner.setup()

    site = aiohttp.web.TCPSite(runner, host, port, shutdown_timeout=1.0)
    await site.start()

    # Log with the actual URL: normalised, with hostname/port set.
    url = urllib.parse.urlunsplit([parts.scheme, f'{host}:{port}', path, '', ''])
    logger.debug("Serving health status at %s", url)
    if ready_flag is not None:
        ready_flag.set()

    try:
        # Sleep forever. No activity is needed.
        await asyncio.Event().wait()
    finally:
        # On any reason of exit, stop reporting the health.
        await asyncio.shield(runner.cleanup())
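
For reviewers who want to try this server in isolation, here is a minimal sketch.
Assumptions: the module layout of this PR, and an empty default registry obtained
via the same helper used in kopf/on.py below; with no probe handlers registered,
the response body is an empty JSON object:

    import asyncio

    import aiohttp

    from kopf.engines import probing
    from kopf.reactor import registries

    async def main():
        registry = registries.get_default_registry()
        ready = asyncio.Event()
        task = asyncio.create_task(probing.health_reporter(
            'http://localhost:8080/healthz', registry=registry, ready_flag=ready))
        await ready.wait()  # wait until the server is listening
        async with aiohttp.ClientSession() as session:
            async with session.get('http://localhost:8080/healthz') as response:
                print(response.status, await response.text())
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    asyncio.run(main())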
20 changes: 20 additions & 0 deletions kopf/on.py
@@ -81,6 +81,26 @@ def decorator(fn: registries.ActivityHandlerFn) -> registries.ActivityHandlerFn:
return decorator


def probe(
        *,
        id: Optional[str] = None,
        errors: Optional[registries.ErrorsMode] = None,
        timeout: Optional[float] = None,
        retries: Optional[int] = None,
        cooldown: Optional[float] = None,
        registry: Optional[registries.OperatorRegistry] = None,
) -> ActivityHandlerDecorator:
    """ ``@kopf.on.probe()`` handler for arbitrary liveness metrics. """
    actual_registry = registry if registry is not None else registries.get_default_registry()
    def decorator(fn: registries.ActivityHandlerFn) -> registries.ActivityHandlerFn:
        return actual_registry.register_activity_handler(
            fn=fn, id=id,
            errors=errors, timeout=timeout, retries=retries, cooldown=cooldown,
            activity=causation.Activity.PROBE,
        )
    return decorator


def resume(
group: str, version: str, plural: str,
*,
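A usage sketch for the probe() decorator above, showing the per-handler options
it forwards (the id, option values, and handler body here are illustrative only):

    import kopf

    @kopf.on.probe(id='version', timeout=5, retries=1)
    def report_version(**kwargs):
        # Any JSON-serializable value ends up in the liveness response body.
        return "operator 1.0.0"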
1 change: 1 addition & 0 deletions kopf/reactor/causation.py
@@ -37,6 +37,7 @@ class Activity(str, enum.Enum):
STARTUP = 'startup'
CLEANUP = 'cleanup'
AUTHENTICATION = 'authentication'
PROBE = 'probe'


# Constants for cause types, to prevent a direct usage of strings, and typos.
2 changes: 1 addition & 1 deletion kopf/reactor/handling.py
@@ -39,7 +39,7 @@
from kopf.structs import resources

WAITING_KEEPALIVE_INTERVAL = 10 * 60
""" How often to wake up from the long sleep, to show the liveliness. """
""" How often to wake up from the long sleep, to show liveness in the logs. """

DEFAULT_RETRY_DELAY = 1 * 60
""" The default delay duration for the regular exception in retry-mode. """
16 changes: 16 additions & 0 deletions kopf/reactor/running.py
@@ -11,6 +11,7 @@
from kopf.clients import auth
from kopf.engines import peering
from kopf.engines import posting
from kopf.engines import probing
from kopf.reactor import activities
from kopf.reactor import causation
from kopf.reactor import handling
@@ -85,6 +86,7 @@ def run(
standalone: bool = False,
priority: int = 0,
peering_name: Optional[str] = None,
liveness_endpoint: Optional[str] = None,
namespace: Optional[str] = None,
stop_flag: Optional[Flag] = None,
ready_flag: Optional[Flag] = None,
@@ -105,6 +107,7 @@
namespace=namespace,
priority=priority,
peering_name=peering_name,
liveness_endpoint=liveness_endpoint,
stop_flag=stop_flag,
ready_flag=ready_flag,
vault=vault,
@@ -121,6 +124,7 @@ async def operator(
standalone: bool = False,
priority: int = 0,
peering_name: Optional[str] = None,
liveness_endpoint: Optional[str] = None,
namespace: Optional[str] = None,
stop_flag: Optional[Flag] = None,
ready_flag: Optional[Flag] = None,
@@ -143,6 +147,7 @@
namespace=namespace,
priority=priority,
peering_name=peering_name,
liveness_endpoint=liveness_endpoint,
stop_flag=stop_flag,
ready_flag=ready_flag,
vault=vault,
@@ -158,6 +163,7 @@ async def spawn_tasks(
standalone: bool = False,
priority: int = 0,
peering_name: Optional[str] = None,
liveness_endpoint: Optional[str] = None,
namespace: Optional[str] = None,
stop_flag: Optional[Flag] = None,
ready_flag: Optional[Flag] = None,
@@ -217,6 +223,16 @@
event_queue=event_queue))),
])

# Liveness probing -- so that Kubernetes knows that the operator is alive.
if liveness_endpoint:
tasks.extend([
loop.create_task(_root_task_checker(
name="health reporter", ready_flag=ready_flag,
coro=probing.health_reporter(
registry=registry,
endpoint=liveness_endpoint))),
])

# Monitor the peers, unless explicitly disabled.
ourselves: Optional[peering.Peer] = await peering.Peer.detect(
id=peering.detect_own_id(), priority=priority,
1 change: 1 addition & 0 deletions requirements.txt
@@ -5,6 +5,7 @@ urllib3<1.25
-e .

# Everything needed to develop (test, debug) the framework.
pytest-aiohttp
pytest-asyncio
pytest-mock>=1.11.1
pytest-cov
