-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.py
49 lines (37 loc) · 2.34 KB
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from __future__ import absolute_import
from celery import current_app
from celery.events.state import state
import logging
class EventConsumer(object):
def __init__(self, logger, state=state):
self.logger = logger
self.app = current_app
self.state = state
self.connection = self.app.broker_connection()
self.receiver = self.app.events.Receiver(self.connection,
handlers={'task-failed': self.failed_tasks,
'task-succeeded' : self.succeeded_tasks,
'task-received' : self.received_tasks,
'task-started' : self.started_tasks,
'worker-heartbeat': self.workers_heartbeat,
'worker-offline': self.worker_down,
'worker-online': self.worker_up
})
def start(self):
self.receiver.capture(limit=None, timeout=None, wakeup=True)
def failed_tasks(self, event):
self.logger.info("Task FAILED: {0} {1} {2} {3}".format(event['uuid'], event['timestamp'], event['hostname'], event['exception']))
self.logger.debug("Failed task uuid & traceback: ".format(event['uuid'], event['traceback']))
def succeeded_tasks(self, event):
self.logger.info("Task SUCCEEDED: {0} {1} {2} {3}".format(event['uuid'], event['timestamp'], event['hostname'], event['runtime']))
def received_tasks(self, event):
self.logger.info("Task RECEIVED: {0} {1} {2} {3} {4}".format(event['uuid'], event['timestamp'], event['hostname'], event['name'], event['retries']))
def started_tasks(self, event):
self.logger.info("Task STARTED: {0} {1} {2}".format(event['uuid'], event['timestamp'], event['hostname']))
def worker_down(self, event):
self.logger.info("Worker DOWN: {0} {1}".format(event['hostname'], event['timestamp']))
def worker_up(self, event):
self.logger.info("Worker UP: {0}".format(event['hostname'], event['timestamp']))
def workers_heartbeat(self, event):
#Noisy bitch
self.logger.info("Worker HEARTBEAT: {0} {1} {2}".format(event['hostname'], event['timestamp'], event['freq']))