forked from celery/celery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
logtool.py
156 lines (124 loc) · 4.14 KB
/
logtool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""The ``celery logtool`` command."""
import re
from collections import Counter
from fileinput import FileInput
import click
from celery.bin.base import CeleryCommand, handle_preload_options
__all__ = ('logtool',)
RE_LOG_START = re.compile(r'^\[\d\d\d\d\-\d\d-\d\d ')
RE_TASK_RECEIVED = re.compile(r'.+?\] Received')
RE_TASK_READY = re.compile(r'.+?\] Task')
RE_TASK_INFO = re.compile(r'.+?([\w\.]+)\[(.+?)\].+')
RE_TASK_RESULT = re.compile(r'.+?[\w\.]+\[.+?\] (.+)')
REPORT_FORMAT = """
Report
======
Task total: {task[total]}
Task errors: {task[errors]}
Task success: {task[succeeded]}
Task completed: {task[completed]}
Tasks
=====
{task[types].format}
"""
class _task_counts(list):
@property
def format(self):
return '\n'.join('{}: {}'.format(*i) for i in self)
def task_info(line):
m = RE_TASK_INFO.match(line)
return m.groups()
class Audit:
def __init__(self, on_task_error=None, on_trace=None, on_debug=None):
self.ids = set()
self.names = {}
self.results = {}
self.ready = set()
self.task_types = Counter()
self.task_errors = 0
self.on_task_error = on_task_error
self.on_trace = on_trace
self.on_debug = on_debug
self.prev_line = None
def run(self, files):
for line in FileInput(files):
self.feed(line)
return self
def task_received(self, line, task_name, task_id):
self.names[task_id] = task_name
self.ids.add(task_id)
self.task_types[task_name] += 1
def task_ready(self, line, task_name, task_id, result):
self.ready.add(task_id)
self.results[task_id] = result
if 'succeeded' not in result:
self.task_error(line, task_name, task_id, result)
def task_error(self, line, task_name, task_id, result):
self.task_errors += 1
if self.on_task_error:
self.on_task_error(line, task_name, task_id, result)
def feed(self, line):
if RE_LOG_START.match(line):
if RE_TASK_RECEIVED.match(line):
task_name, task_id = task_info(line)
self.task_received(line, task_name, task_id)
elif RE_TASK_READY.match(line):
task_name, task_id = task_info(line)
result = RE_TASK_RESULT.match(line)
if result:
result, = result.groups()
self.task_ready(line, task_name, task_id, result)
else:
if self.on_debug:
self.on_debug(line)
self.prev_line = line
else:
if self.on_trace:
self.on_trace('\n'.join(filter(None, [self.prev_line, line])))
self.prev_line = None
def incomplete_tasks(self):
return self.ids ^ self.ready
def report(self):
return {
'task': {
'types': _task_counts(self.task_types.most_common()),
'total': len(self.ids),
'errors': self.task_errors,
'completed': len(self.ready),
'succeeded': len(self.ready) - self.task_errors,
}
}
@click.group()
@handle_preload_options
def logtool():
"""The ``celery logtool`` command."""
@logtool.command(cls=CeleryCommand)
@click.argument('files', nargs=-1)
@click.pass_context
def stats(ctx, files):
ctx.obj.echo(REPORT_FORMAT.format(
**Audit().run(files).report()
))
@logtool.command(cls=CeleryCommand)
@click.argument('files', nargs=-1)
@click.pass_context
def traces(ctx, files):
Audit(on_trace=ctx.obj.echo).run(files)
@logtool.command(cls=CeleryCommand)
@click.argument('files', nargs=-1)
@click.pass_context
def errors(ctx, files):
Audit(on_task_error=lambda line, *_: ctx.obj.echo(line)).run(files)
@logtool.command(cls=CeleryCommand)
@click.argument('files', nargs=-1)
@click.pass_context
def incomplete(ctx, files):
audit = Audit()
audit.run(files)
for task_id in audit.incomplete_tasks():
ctx.obj.echo(f'Did not complete: {task_id}')
@logtool.command(cls=CeleryCommand)
@click.argument('files', nargs=-1)
@click.pass_context
def debug(ctx, files):
Audit(on_debug=ctx.obj.echo).run(files)