forked from celery/celery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
purge.py
69 lines (60 loc) · 2.52 KB
/
purge.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""The ``celery purge`` program, used to delete messages from queues."""
import click
from celery.bin.base import (COMMA_SEPARATED_LIST, CeleryCommand,
CeleryOption, handle_preload_options)
from celery.utils import text
@click.command(cls=CeleryCommand)
@click.option('-f',
'--force',
cls=CeleryOption,
is_flag=True,
help_group='Purging Options',
help="Don't prompt for verification.")
@click.option('-Q',
'--queues',
cls=CeleryOption,
type=COMMA_SEPARATED_LIST,
help_group='Purging Options',
help="Comma separated list of queue names to purge.")
@click.option('-X',
'--exclude-queues',
cls=CeleryOption,
type=COMMA_SEPARATED_LIST,
help_group='Purging Options',
help="Comma separated list of queues names not to purge.")
@click.pass_context
@handle_preload_options
def purge(ctx, force, queues, exclude_queues):
"""Erase all messages from all known task queues.
Warning:
There's no undo operation for this command.
"""
app = ctx.obj.app
queues = set(queues or app.amqp.queues.keys())
exclude_queues = set(exclude_queues or [])
names = queues - exclude_queues
qnum = len(names)
if names:
queues_headline = text.pluralize(qnum, 'queue')
if not force:
queue_names = ', '.join(sorted(names))
click.confirm(f"{ctx.obj.style('WARNING', fg='red')}:"
"This will remove all tasks from "
f"{queues_headline}: {queue_names}.\n"
" There is no undo for this operation!\n\n"
"(to skip this prompt use the -f option)\n"
"Are you sure you want to delete all tasks?",
abort=True)
def _purge(conn, queue):
try:
return conn.default_channel.queue_purge(queue) or 0
except conn.channel_errors:
return 0
with app.connection_for_write() as conn:
messages = sum(_purge(conn, queue) for queue in names)
if messages:
messages_headline = text.pluralize(messages, 'message')
ctx.obj.echo(f"Purged {messages} {messages_headline} from "
f"{qnum} known task {queues_headline}.")
else:
ctx.obj.echo(f"No messages purged from {qnum} {queues_headline}.")