Skip to content

Commit

Permalink
Merge pull request #572 from nolar/peering-settings
Browse files Browse the repository at this point in the history
Configure operator's peering via centralised settings
  • Loading branch information
nolar committed Nov 2, 2020
2 parents fbe6b38 + abed462 commit 8023ea4
Show file tree
Hide file tree
Showing 17 changed files with 527 additions and 592 deletions.
97 changes: 77 additions & 20 deletions docs/peering.rst
Expand Up @@ -23,6 +23,16 @@ To set the operator's priority, use :option:`--priority`:
kopf run --priority=100 ...
Or:

.. code-block:: python
import kopf
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.peering.priority = 100
As a shortcut, there is a :option:`--dev` option, which sets
the priority to ``666``, and is intended for the development mode.

Expand Down Expand Up @@ -59,25 +69,11 @@ Create the peering objects as needed with one of:
.. note::

Previously, ``KopfPeering`` was the only CRD, and it was cluster-scoped.
Now, it is namespaced. For the new users, it all will be fine and working.

If the old ``KopfPeering`` CRD is already deployed to your cluster,
it will also continue to work as before without re-configuration:
though there will be no namespace isolation as documented here ---
it will be cluster peering regardless of :option:`--namespace`
(as it was before the changes).

When possible (but strictly after the Kopf's version upgrade),
please delete the old CRD, and re-create from scratch:

.. code-block:: bash
kubectl delete crd kopfpeerings.zalando.org
# give it 1-2 minutes to cleanup, or repeat until succeeded:
kubectl create -f peering.yaml
Then re-deploy your custom peering objects of your apps.
In ``kopf<0.11`` (until May'2019), ``KopfPeering`` was the only CRD,
and it was cluster-scoped. In ``kopf>=0.11,<0.29`` (until Oct'2020),
this mode was deprecated but supported if the old CRD existed.
Since ``kopf>=0.29`` (Nov'2020), it is not supported anymore.
To upgrade, delete and re-create the peering CRDs to the new ones.


Custom peering
Expand All @@ -88,11 +84,27 @@ The operator can be instructed to use alternative peering objects::
kopf run --peering=example ...
kopf run --peering=example --namespace=some-ns ...

Or:

.. code-block:: python
import kopf
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.peering.name = "example"
settings.peering.mandatory = True
Depending on :option:`--namespace`, either ``ClusterKopfPeering``
or ``KopfPeering`` will be used (in the operator's namespace).

If the peering object does not exist, the operator will fail to start.
Using :option:`--peering` assumes that the peering is required.
Using :option:`--peering` assumes that the peering is mandatory.

Please note that in the startup handler, this is not exactly the same:
the mandatory mode must be set explicitly. Otherwise, the operator will try
to auto-detect the presence of the custom peering object, but will not fail
if it is absent -- unlike with the ``--peering=`` CLI option.

The operators from different peering objects do not see each other.

Expand All @@ -108,6 +120,16 @@ the standalone mode can be enabled::

kopf run --standalone ...

Or:

.. code-block:: python
import kopf
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.peering.standalone = True
In that case, the operator will not freeze if other operators with
the higher priority will start handling the objects, which may lead
to the conflicting changes and reactions from multiple operators
Expand Down Expand Up @@ -145,10 +167,45 @@ operator in the deployment or replicaset:
kopf run --priority=$RANDOM ...
Or:

.. code-block:: python
import random
import kopf
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.peering.priority = random.randint(0, 32767)
``$RANDOM`` is a feature of bash
(if you use another shell, see its man page for an equivalent).
It returns a random integer in the range 0..32767.
With high probability, 2-3 pods will get their unique priorities.

You can also use the pod's IP address in its numeric form as the priority,
or any other source of integers.


Stealth keep-alives
===================

Every few seconds (60 by default), the operator will send a keep-alive update
to the chosen peering, showing that it is still functioning. Other operators
will notice that and make decisions on their freezing or resuming.

The operator also logs a keep-alive activity to its own logs. This can be
distracting. To disable:

.. code-block:: python
import random
import kopf
@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
settings.peering.stealth = True
There is no equivalent CLI option for that.

Please note that it only affects logging. The keep-alive are sent anyway.
2 changes: 1 addition & 1 deletion docs/walkthrough/starting.rst
Expand Up @@ -38,7 +38,7 @@ The output looks like this:
.. code-block:: none
[2019-05-31 10:42:11,870] kopf.config [DEBUG ] configured via kubeconfig file
[2019-05-31 10:42:11,913] kopf.reactor.peering [WARNING ] Default peering object not found, falling back to the standalone mode.
[2019-05-31 10:42:11,913] kopf.reactor.peering [WARNING ] Default peering object is not found, falling back to the standalone mode.
[2019-05-31 10:42:12,037] kopf.reactor.handlin [DEBUG ] [default/my-claim] First appearance: {'apiVersion': 'zalando.org/v1', 'kind': 'EphemeralVolumeClaim', 'metadata': {'annotations': {'kubectl.kubernetes.io/last-applied-configuration': '{"apiVersion":"zalando.org/v1","kind":"EphemeralVolumeClaim","metadata":{"annotations":{},"name":"my-claim","namespace":"default"}}\n'}, 'creationTimestamp': '2019-05-29T00:41:57Z', 'generation': 1, 'name': 'my-claim', 'namespace': 'default', 'resourceVersion': '47720', 'selfLink': '/apis/zalando.org/v1/namespaces/default/ephemeralvolumeclaims/my-claim', 'uid': '904c2b9b-81aa-11e9-a202-a6e6b278a294'}}
[2019-05-31 10:42:12,038] kopf.reactor.handlin [DEBUG ] [default/my-claim] Adding the finalizer, thus preventing the actual deletion.
[2019-05-31 10:42:12,038] kopf.reactor.handlin [DEBUG ] [default/my-claim] Patching with: {'metadata': {'finalizers': ['KopfFinalizerMarker']}}
Expand Down
2 changes: 1 addition & 1 deletion examples/12-embedded/README.md
Expand Up @@ -21,7 +21,7 @@ Starting the main app.
[DEBUG ] Pykube is configured via kubeconfig file.
[DEBUG ] Client is configured via kubeconfig file.
[WARNING ] Default peering object not found, falling back to the standalone mode.
[WARNING ] Default peering object is not found, falling back to the standalone mode.
[WARNING ] OS signals are ignored: running not in the main thread.
Do the main app activity here. Step 1/3.
Expand Down
45 changes: 24 additions & 21 deletions kopf/cli.py
Expand Up @@ -64,11 +64,11 @@ def main() -> None:
@main.command()
@logging_options
@click.option('-n', '--namespace', default=None)
@click.option('--standalone', is_flag=True, default=False)
@click.option('--standalone', is_flag=True, default=None)
@click.option('--dev', 'priority', type=int, is_flag=True, flag_value=666)
@click.option('-L', '--liveness', 'liveness_endpoint', type=str)
@click.option('-P', '--peering', 'peering_name', type=str, default=None, envvar='KOPF_RUN_PEERING')
@click.option('-p', '--priority', type=int, default=0)
@click.option('-P', '--peering', 'peering_name', type=str, envvar='KOPF_RUN_PEERING')
@click.option('-p', '--priority', type=int)
@click.option('-m', '--module', 'modules', multiple=True)
@click.argument('paths', nargs=-1)
@click.make_pass_decorator(CLIControls, ensure=True)
Expand All @@ -77,8 +77,8 @@ def run(
paths: List[str],
modules: List[str],
peering_name: Optional[str],
priority: int,
standalone: bool,
priority: Optional[int],
standalone: Optional[bool],
namespace: Optional[str],
liveness_endpoint: Optional[str],
) -> None:
Expand Down Expand Up @@ -108,7 +108,7 @@ def run(
@click.option('-n', '--namespace', default=None)
@click.option('-i', '--id', type=str, default=None)
@click.option('--dev', 'priority', flag_value=666)
@click.option('-P', '--peering', 'peering_name', type=str, required=True, envvar='KOPF_FREEZE_PEERING')
@click.option('-P', '--peering', 'peering_name', required=True, envvar='KOPF_FREEZE_PEERING')
@click.option('-p', '--priority', type=int, default=100, required=True)
@click.option('-t', '--lifetime', type=int, required=True)
@click.option('-m', '--message', type=str)
Expand All @@ -121,46 +121,49 @@ def freeze(
priority: int,
) -> None:
""" Freeze the resource handling in the cluster. """
ourserlves = peering.Peer(
id=id or peering.detect_own_id(manual=True),
name=peering_name,
namespace=namespace,
priority=priority,
lifetime=lifetime,
)
identity = peering.Identity(id) if id else peering.detect_own_id(manual=True)
registry = registries.SmartOperatorRegistry()
settings = configuration.OperatorSettings()
settings.peering.name = peering_name
settings.peering.priority = priority
vault = credentials.Vault()
auth.vault_var.set(vault)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait({
activities.authenticate(registry=registry, settings=settings, vault=vault),
ourserlves.keepalive(),
peering.touch(
identity=identity,
settings=settings,
namespace=namespace,
lifetime=lifetime,
),
}))


@main.command()
@logging_options
@click.option('-n', '--namespace', default=None)
@click.option('-i', '--id', type=str, default=None)
@click.option('-P', '--peering', 'peering_name', type=str, required=True, envvar='KOPF_RESUME_PEERING')
@click.option('-P', '--peering', 'peering_name', required=True, envvar='KOPF_RESUME_PEERING')
def resume(
id: Optional[str],
namespace: Optional[str],
peering_name: str,
) -> None:
""" Resume the resource handling in the cluster. """
ourselves = peering.Peer(
id=id or peering.detect_own_id(manual=True),
name=peering_name,
namespace=namespace,
)
identity = peering.Identity(id) if id else peering.detect_own_id(manual=True)
registry = registries.SmartOperatorRegistry()
settings = configuration.OperatorSettings()
settings.peering.name = peering_name
vault = credentials.Vault()
auth.vault_var.set(vault)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait({
activities.authenticate(registry=registry, settings=settings, vault=vault),
ourselves.disappear(),
peering.touch(
identity=identity,
settings=settings,
namespace=namespace,
lifetime=0,
),
}))
24 changes: 0 additions & 24 deletions kopf/clients/fetching.py
Expand Up @@ -15,30 +15,6 @@ class _UNSET(enum.Enum):
token = enum.auto()


@auth.reauthenticated_request
async def read_crd(
*,
resource: resources.Resource,
default: Union[_T, _UNSET] = _UNSET.token,
context: Optional[auth.APIContext] = None, # injected by the decorator
) -> Union[bodies.RawBody, _T]:
if context is None:
raise RuntimeError("API instance is not injected by the decorator.")

try:
response = await context.session.get(
url=CRD_CRD.get_url(server=context.server, name=resource.name),
)
await errors.check_response(response)
respdata = await response.json()
return cast(bodies.RawBody, respdata)

except aiohttp.ClientResponseError as e:
if e.status in [403, 404] and not isinstance(default, _UNSET):
return default
raise


@auth.reauthenticated_request
async def read_obj(
*,
Expand Down

0 comments on commit 8023ea4

Please sign in to comment.