Skip to content

Commit

Permalink
Reintroduce support for custom preload options (celery#6516)
Browse files Browse the repository at this point in the history
* Restore preload options.

Fixes celery#6307.

* Document breaking changes for preload options in 5.0.

Fixes celery#6379.
  • Loading branch information
thedrow authored and jeyrce committed Aug 25, 2021
1 parent 942204d commit a490523
Show file tree
Hide file tree
Showing 18 changed files with 74 additions and 28 deletions.
3 changes: 3 additions & 0 deletions celery/bin/amqp.py
Expand Up @@ -8,6 +8,8 @@

__all__ = ('amqp',)

from celery.bin.base import handle_preload_options


def dump_message(message):
if message is None:
Expand Down Expand Up @@ -54,6 +56,7 @@ def reconnect(self):

@click.group(invoke_without_command=True)
@click.pass_context
@handle_preload_options
def amqp(ctx):
"""AMQP Administration Shell.
Expand Down
21 changes: 21 additions & 0 deletions celery/bin/base.py
@@ -1,13 +1,15 @@
"""Click customizations for Celery."""
import json
from collections import OrderedDict
from functools import update_wrapper
from pprint import pformat

import click
from click import ParamType
from kombu.utils.objects import cached_property

from celery._state import get_current_app
from celery.signals import user_preload_options
from celery.utils import text
from celery.utils.log import mlevel
from celery.utils.time import maybe_iso8601
Expand Down Expand Up @@ -113,6 +115,25 @@ def say_chat(self, direction, title, body='', show_body=False):
self.echo(body)


def handle_preload_options(f):
def caller(ctx, *args, **kwargs):
app = ctx.obj.app

preload_options = [o.name for o in app.user_options.get('preload', [])]

if preload_options:
user_options = {
preload_option: kwargs[preload_option]
for preload_option in preload_options
}

user_preload_options.send(sender=f, app=app, options=user_options)

return f(ctx, *args, **kwargs)

return update_wrapper(caller, f)


class CeleryOption(click.Option):
"""Customized option for Celery."""

Expand Down
4 changes: 3 additions & 1 deletion celery/bin/beat.py
Expand Up @@ -3,7 +3,8 @@

import click

from celery.bin.base import LOG_LEVEL, CeleryDaemonCommand, CeleryOption
from celery.bin.base import (LOG_LEVEL, CeleryDaemonCommand, CeleryOption,
handle_preload_options)
from celery.platforms import detached, maybe_drop_privileges


Expand Down Expand Up @@ -43,6 +44,7 @@
help_group="Beat Options",
help="Logging level.")
@click.pass_context
@handle_preload_options
def beat(ctx, detach=False, logfile=None, pidfile=None, uid=None,
gid=None, umask=None, workdir=None, **kwargs):
"""Start the beat periodic task scheduler."""
Expand Down
5 changes: 3 additions & 2 deletions celery/bin/call.py
Expand Up @@ -2,9 +2,10 @@
import click

from celery.bin.base import (ISO8601, ISO8601_OR_FLOAT, JSON, CeleryCommand,
CeleryOption)
CeleryOption, handle_preload_options)


@click.command(cls=CeleryCommand)
@click.argument('name')
@click.option('-a',
'--args',
Expand Down Expand Up @@ -52,8 +53,8 @@
cls=CeleryOption,
help_group="Routing Options",
help="custom routing key.")
@click.command(cls=CeleryCommand)
@click.pass_context
@handle_preload_options
def call(ctx, name, args, kwargs, eta, countdown, expires, serializer, queue, exchange, routing_key):
"""Call a task by name."""
task_id = ctx.obj.app.send_task(
Expand Down
3 changes: 3 additions & 0 deletions celery/bin/celery.py
Expand Up @@ -145,6 +145,9 @@ def celery(ctx, app, broker, result_backend, loader, config, workdir,
beat.params.extend(ctx.obj.app.user_options.get('beat', []))
events.params.extend(ctx.obj.app.user_options.get('events', []))

for command in celery.commands.values():
command.params.extend(ctx.obj.app.user_options.get('preload', []))


@celery.command(cls=CeleryCommand)
@click.pass_context
Expand Down
6 changes: 5 additions & 1 deletion celery/bin/control.py
Expand Up @@ -4,7 +4,8 @@
import click
from kombu.utils.json import dumps

from celery.bin.base import COMMA_SEPARATED_LIST, CeleryCommand, CeleryOption
from celery.bin.base import (COMMA_SEPARATED_LIST, CeleryCommand,
CeleryOption, handle_preload_options)
from celery.platforms import EX_UNAVAILABLE
from celery.utils import text
from celery.worker.control import Panel
Expand Down Expand Up @@ -71,6 +72,7 @@ def _compile_arguments(action, args):
help_group='Remote Control Options',
help='Use json as output format.')
@click.pass_context
@handle_preload_options
def status(ctx, timeout, destination, json, **kwargs):
"""Show list of workers that are online."""
callback = None if json else partial(_say_remote_command_reply, ctx)
Expand Down Expand Up @@ -115,6 +117,7 @@ def status(ctx, timeout, destination, json, **kwargs):
help_group='Remote Control Options',
help='Use json as output format.')
@click.pass_context
@handle_preload_options
def inspect(ctx, action, timeout, destination, json, **kwargs):
"""Inspect the worker at runtime.
Expand Down Expand Up @@ -164,6 +167,7 @@ def inspect(ctx, action, timeout, destination, json, **kwargs):
help_group='Remote Control Options',
help='Use json as output format.')
@click.pass_context
@handle_preload_options
def control(ctx, action, timeout, destination, json):
"""Workers remote control.
Expand Down
5 changes: 4 additions & 1 deletion celery/bin/events.py
Expand Up @@ -4,7 +4,8 @@

import click

from celery.bin.base import LOG_LEVEL, CeleryDaemonCommand, CeleryOption
from celery.bin.base import (LOG_LEVEL, CeleryDaemonCommand, CeleryOption,
handle_preload_options)
from celery.platforms import detached, set_process_title, strargv


Expand Down Expand Up @@ -47,6 +48,7 @@ def _run_evtop(app):
raise click.UsageError("The curses module is required for this command.")


@handle_preload_options
@click.command(cls=CeleryDaemonCommand)
@click.option('-d',
'--dump',
Expand Down Expand Up @@ -78,6 +80,7 @@ def _run_evtop(app):
help_group="Snapshot",
help="Logging level.")
@click.pass_context
@handle_preload_options
def events(ctx, dump, camera, detach, frequency, maxrate, loglevel, **kwargs):
"""Event-stream utilities."""
app = ctx.obj.app
Expand Down
3 changes: 2 additions & 1 deletion celery/bin/graph.py
Expand Up @@ -4,11 +4,12 @@

import click

from celery.bin.base import CeleryCommand
from celery.bin.base import CeleryCommand, handle_preload_options
from celery.utils.graph import DependencyGraph, GraphFormatter


@click.group()
@handle_preload_options
def graph():
"""The ``celery graph`` command."""

Expand Down
3 changes: 2 additions & 1 deletion celery/bin/list.py
@@ -1,10 +1,11 @@
"""The ``celery list bindings`` command, used to inspect queue bindings."""
import click

from celery.bin.base import CeleryCommand
from celery.bin.base import CeleryCommand, handle_preload_options


@click.group(name="list")
@handle_preload_options
def list_():
"""Get info from broker.
Expand Down
3 changes: 2 additions & 1 deletion celery/bin/logtool.py
Expand Up @@ -5,7 +5,7 @@

import click

from celery.bin.base import CeleryCommand
from celery.bin.base import CeleryCommand, handle_preload_options

__all__ = ('logtool',)

Expand Down Expand Up @@ -111,6 +111,7 @@ def report(self):


@click.group()
@handle_preload_options
def logtool():
"""The ``celery logtool`` command."""

Expand Down
4 changes: 3 additions & 1 deletion celery/bin/migrate.py
Expand Up @@ -2,7 +2,8 @@
import click
from kombu import Connection

from celery.bin.base import CeleryCommand, CeleryOption
from celery.bin.base import (CeleryCommand, CeleryOption,
handle_preload_options)
from celery.contrib.migrate import migrate_tasks


Expand Down Expand Up @@ -44,6 +45,7 @@
help_group='Migration Options',
help='Continually migrate tasks until killed.')
@click.pass_context
@handle_preload_options
def migrate(ctx, source, destination, **kwargs):
"""Migrate tasks from one broker to another.
Expand Down
3 changes: 2 additions & 1 deletion celery/bin/multi.py
Expand Up @@ -108,7 +108,7 @@

from celery import VERSION_BANNER
from celery.apps.multi import Cluster, MultiParser, NamespacedOptionParser
from celery.bin.base import CeleryCommand
from celery.bin.base import CeleryCommand, handle_preload_options
from celery.platforms import EX_FAILURE, EX_OK, signals
from celery.utils import term
from celery.utils.text import pluralize
Expand Down Expand Up @@ -468,6 +468,7 @@ def DOWN(self):
}
)
@click.pass_context
@handle_preload_options
def multi(ctx):
"""Start multiple worker instances."""
cmd = MultiTool(quiet=ctx.obj.quiet, no_color=ctx.obj.no_color)
Expand Down
4 changes: 3 additions & 1 deletion celery/bin/purge.py
@@ -1,7 +1,8 @@
"""The ``celery purge`` program, used to delete messages from queues."""
import click

from celery.bin.base import COMMA_SEPARATED_LIST, CeleryCommand, CeleryOption
from celery.bin.base import (COMMA_SEPARATED_LIST, CeleryCommand,
CeleryOption, handle_preload_options)
from celery.utils import text


Expand All @@ -25,6 +26,7 @@
help_group='Purging Options',
help="Comma separated list of queues names not to purge.")
@click.pass_context
@handle_preload_options
def purge(ctx, force, queues, exclude_queues):
"""Erase all messages from all known task queues.
Expand Down
4 changes: 3 additions & 1 deletion celery/bin/result.py
@@ -1,7 +1,8 @@
"""The ``celery result`` program, used to inspect task results."""
import click

from celery.bin.base import CeleryCommand, CeleryOption
from celery.bin.base import (CeleryCommand, CeleryOption,
handle_preload_options)


@click.command(cls=CeleryCommand)
Expand All @@ -17,6 +18,7 @@
help_group='Result Options',
help="Show traceback instead.")
@click.pass_context
@handle_preload_options
def result(ctx, task_id, task, traceback):
"""Print the return value for a given task id."""
app = ctx.obj.app
Expand Down
4 changes: 3 additions & 1 deletion celery/bin/shell.py
Expand Up @@ -6,7 +6,8 @@

import click

from celery.bin.base import CeleryCommand, CeleryOption
from celery.bin.base import (CeleryCommand, CeleryOption,
handle_preload_options)


def _invoke_fallback_shell(locals):
Expand Down Expand Up @@ -114,6 +115,7 @@ def _invoke_default_shell(locals):
help_group="Shell Options",
help="Use gevent.")
@click.pass_context
@handle_preload_options
def shell(ctx, ipython=False, bpython=False,
python=False, without_tasks=False, eventlet=False,
gevent=False):
Expand Down
4 changes: 3 additions & 1 deletion celery/bin/upgrade.py
Expand Up @@ -5,11 +5,13 @@
import click

from celery.app import defaults
from celery.bin.base import CeleryCommand, CeleryOption
from celery.bin.base import (CeleryCommand, CeleryOption,
handle_preload_options)
from celery.utils.functional import pass1


@click.group()
@handle_preload_options
def upgrade():
"""Perform upgrade between versions."""

Expand Down
4 changes: 3 additions & 1 deletion celery/bin/worker.py
Expand Up @@ -9,7 +9,8 @@

from celery import concurrency
from celery.bin.base import (COMMA_SEPARATED_LIST, LOG_LEVEL,
CeleryDaemonCommand, CeleryOption)
CeleryDaemonCommand, CeleryOption,
handle_preload_options)
from celery.platforms import (EX_FAILURE, EX_OK, detached,
maybe_drop_privileges)
from celery.utils.log import get_logger
Expand Down Expand Up @@ -273,6 +274,7 @@ def detach(path, argv, logfile=None, pidfile=None, uid=None,
cls=CeleryOption,
help_group="Embedded Beat Options")
@click.pass_context
@handle_preload_options
def worker(ctx, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
loglevel=None, logfile=None, pidfile=None, statedb=None,
**kwargs):
Expand Down
19 changes: 6 additions & 13 deletions docs/userguide/extending.rst
Expand Up @@ -769,29 +769,22 @@ Preload options
~~~~~~~~~~~~~~~

The :program:`celery` umbrella command supports the concept of 'preload
options'. These are special options passed to all sub-commands and parsed
outside of the main parsing step.
options'. These are special options passed to all sub-commands.

The list of default preload options can be found in the API reference:
:mod:`celery.bin.base`.

You can add new preload options too, for example to specify a configuration
You can add new preload options, for example to specify a configuration
template:

.. code-block:: python
from celery import Celery
from celery import signals
from celery.bin import Option
from click import Option
app = Celery()
def add_preload_options(parser):
parser.add_argument(
'-Z', '--template', default='default',
help='Configuration template to use.',
)
app.user_options['preload'].add(add_preload_options)
app.user_options['preload'].add(Option(('-Z', '--template'),
default='default',
help='Configuration template to use.'))
@signals.user_preload_options.connect
def on_preload_parsed(options, **kwargs):
Expand Down

0 comments on commit a490523

Please sign in to comment.