-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
events.py
159 lines (122 loc) · 5.06 KB
/
events.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from __future__ import absolute_import
from __future__ import with_statement
import time
import shelve
import logging
import threading
import collections
from functools import partial
import celery
from pkg_resources import parse_version
from tornado.ioloop import PeriodicCallback
from tornado.ioloop import IOLoop
from celery.events import EventReceiver
from celery.events.state import State
from . import api
try:
from collections import Counter
except ImportError:
from .utils.backports.collections import Counter
from prometheus_client import Counter as PrometheusCounter, Histogram
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class PrometheusMetrics(object):
    """Container for the Prometheus collectors exported by this module."""

    # Event counter, labelled by worker, event type and task name.
    events = PrometheusCounter(
        'flower_events_total',
        "Number of events",
        ['worker', 'type', 'task'],
    )

    # Histogram of task runtimes, labelled by worker and task name.
    runtime = Histogram(
        'flower_task_runtime_seconds',
        "Task runtime",
        ['worker', 'task'],
    )
class EventsState(State):
    """Celery event state that also keeps per-worker counters and metrics.

    An EventsState object is created and accessed only from the ioloop
    thread.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base state plus the event counters and metrics."""
        super(EventsState, self).__init__(*args, **kwargs)
        # counter[worker name][event type] -> number of events seen
        self.counter = collections.defaultdict(Counter)
        self.metrics = PrometheusMetrics()

    def event(self, event):
        """Count the event, update metrics, notify subscribers and store it.

        ``event`` is a mapping that must carry ``'hostname'`` and ``'type'``;
        events whose type starts with ``'task-'`` must also carry ``'uuid'``.
        """
        hostname = event['hostname']
        kind = event['type']

        self.counter[hostname][kind] += 1

        if kind.startswith('task-'):
            uuid = event['uuid']
            name = event.get('name', '')
            # Fall back to the name recorded for a task we already know.
            if not name and uuid in self.tasks:
                name = self.tasks[uuid].name or ''
            self.metrics.events.labels(hostname, kind, name).inc()

            elapsed = event.get('runtime', 0)
            if elapsed:
                self.metrics.runtime.labels(hostname, name).observe(elapsed)

        # Send event to api subscribers (via websockets)
        handler = getattr(api.events, api.events.getClassName(kind), None)
        if handler:
            handler.send_message(event)

        # Save the event
        super(EventsState, self).event(event)
class Events(threading.Thread):
    """Daemon thread that captures Celery events into an EventsState.

    Events received in this thread are handed to ``self.state.event`` through
    ``io_loop.add_callback`` so the state is only touched from the ioloop
    thread. When ``persistent`` is true the state is loaded from the shelve
    file ``db`` on construction and written back by :meth:`stop`.
    """

    # Callback time handed to PeriodicCallback for re-enabling events
    # (Tornado documents this value as milliseconds).
    events_enable_interval = 5000

    def __init__(self, capp, db=None, persistent=False,
                 enable_events=True, io_loop=None, **kwargs):
        """Set up the thread, the (possibly restored) state and the timer.

        :param capp: Celery application used for connections and control.
        :param db: path of the shelve file used in persistent mode.
        :param persistent: load/save the state from/to ``db``.
        :param enable_events: periodically ask workers to enable events.
        :param io_loop: Tornado IOLoop; defaults to ``IOLoop.instance()``.
        :param kwargs: forwarded to ``EventsState`` when a new state is made.
        """
        threading.Thread.__init__(self)
        self.daemon = True

        self.io_loop = io_loop or IOLoop.instance()
        self.capp = capp

        self.db = db
        self.persistent = persistent
        self.enable_events = enable_events
        self.state = None

        if self.persistent and parse_version(celery.__version__) < parse_version("3.0.15"):
            logger.warning('Persistent mode is available with '
                           'Celery 3.0.15 and later')
            self.persistent = False

        if self.persistent:
            logger.debug("Loading state from '%s'...", self.db)
            state = shelve.open(self.db)
            try:
                if state:
                    self.state = state['events']
            finally:
                # Always release the shelf, even if the lookup or the
                # unpickling of the stored state raises.
                state.close()

        if not self.state:
            self.state = EventsState(**kwargs)

        self.timer = PeriodicCallback(self.on_enable_events,
                                      self.events_enable_interval)

    def start(self):
        """Start the capture thread and, if requested, the enable-events timer."""
        threading.Thread.start(self)
        # The timer is only started for Celery major versions above 2
        # (i.e. Celery 3 and later).
        if self.enable_events and celery.VERSION[0] > 2:
            self.timer.start()

    def stop(self):
        """Save the state to the shelve file when running in persistent mode."""
        if self.persistent:
            logger.debug("Saving state to '%s'...", self.db)
            state = shelve.open(self.db)
            try:
                state['events'] = self.state
            finally:
                # Close the shelf even if storing the state fails.
                state.close()

    def run(self):
        """Capture events forever, reconnecting with a growing delay on errors."""
        try_interval = 1
        while True:
            try:
                # Double the delay up front; it is reset to 1 once the
                # receiver has been created successfully.
                try_interval *= 2

                with self.capp.connection() as conn:
                    recv = EventReceiver(conn,
                                         handlers={"*": self.on_event},
                                         app=self.capp)
                    try_interval = 1
                    logger.debug("Capturing events...")
                    recv.capture(limit=None, timeout=None, wakeup=True)
            except (KeyboardInterrupt, SystemExit):
                # Forward the interrupt to the main thread.
                try:
                    import _thread as thread
                except ImportError:
                    import thread
                thread.interrupt_main()
            except Exception as e:
                logger.error("Failed to capture events: '%s', "
                             "trying again in %s seconds.",
                             e, try_interval)
                logger.debug(e, exc_info=True)
                time.sleep(try_interval)

    def on_enable_events(self):
        """Timer callback: ask workers to enable events (best effort)."""
        # Periodically enable events for workers
        # launched after flower
        try:
            logger.debug("Enabling events...")
            self.capp.control.enable_events()
        except Exception as e:
            logger.debug("Failed to enable events: '%s'", e)

    def on_event(self, event):
        """Receiver handler: schedule ``state.event(event)`` on the ioloop."""
        # Call EventsState.event in ioloop thread to avoid synchronization
        self.io_loop.add_callback(partial(self.state.event, event))