forked from pgq/skytools-legacy
-
Notifications
You must be signed in to change notification settings - Fork 17
/
consumer.py
132 lines (93 loc) · 3.75 KB
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""PgQ consumer framework for Python.
"""
from pgq.baseconsumer import BaseConsumer, BaseBatchWalker
from pgq.event import Event
__all__ = ['Consumer']
# Event status codes
EV_UNTAGGED = -1
EV_RETRY = 0
EV_DONE = 1
class RetriableEvent(Event):
"""Event which can be retried
Consumer is supposed to tag them after processing.
"""
__slots__ = ('_status', )
def __init__(self, queue_name, row):
super(RetriableEvent, self).__init__(queue_name, row)
self._status = EV_DONE
def tag_done(self):
self._status = EV_DONE
def get_status(self):
return self._status
def tag_retry(self, retry_time = 60):
self._status = EV_RETRY
self.retry_time = retry_time
class RetriableWalkerEvent(RetriableEvent):
"""Redirects status flags to RetriableBatchWalker.
That way event data can be gc'd immediately and
tag_done() events don't need to be remembered.
"""
__slots__ = ('_walker', )
def __init__(self, walker, queue_name, row):
super(RetriableWalkerEvent, self).__init__(queue_name, row)
self._walker = walker
def tag_done(self):
self._walker.tag_event_done(self)
def get_status(self):
self._walker.get_status(self)
def tag_retry(self, retry_time = 60):
self._walker.tag_event_retry(self, retry_time)
class RetriableBatchWalker(BaseBatchWalker):
"""BatchWalker that returns RetriableEvents
"""
def __init__(self, curs, batch_id, queue_name, fetch_size = 300, consumer_filter = None):
super(RetriableBatchWalker, self).__init__(curs, batch_id, queue_name, fetch_size, consumer_filter)
self.status_map = {}
def _make_event(self, queue_name, row):
return RetriableWalkerEvent(self, queue_name, row)
def tag_event_done(self, event):
if event.id in self.status_map:
del self.status_map[event.id]
def tag_event_retry(self, event, retry_time):
self.status_map[event.id] = (EV_RETRY, retry_time)
def get_status(self, event):
return self.status_map.get(event.id, (EV_DONE, 0))[0]
def iter_status(self):
for res in self.status_map.iteritems():
yield res
class Consumer(BaseConsumer):
"""Normal consumer base class.
Can retry events
"""
_batch_walker_class = RetriableBatchWalker
def _make_event(self, queue_name, row):
return RetriableEvent(queue_name, row)
def _flush_retry(self, curs, batch_id, list):
"""Tag retry events."""
retry = 0
if self.pgq_lazy_fetch:
for ev_id, stat in list.iter_status():
if stat[0] == EV_RETRY:
self._tag_retry(curs, batch_id, ev_id, stat[1])
retry += 1
elif stat[0] != EV_DONE:
raise Exception("Untagged event: id=%d" % ev_id)
else:
for ev in list:
if ev._status == EV_RETRY:
self._tag_retry(curs, batch_id, ev.id, ev.retry_time)
retry += 1
elif ev._status != EV_DONE:
raise Exception("Untagged event: (id=%d, type=%s, data=%s, ex1=%s" % (
ev.id, ev.type, ev.data, ev.extra1))
# report weird events
if retry:
self.stat_increase('retry-events', retry)
def _finish_batch(self, curs, batch_id, list):
"""Tag events and notify that the batch is done."""
self._flush_retry(curs, batch_id, list)
super(Consumer, self)._finish_batch(curs, batch_id, list)
def _tag_retry(self, cx, batch_id, ev_id, retry_time):
"""Tag event for retry. (internal)"""
cx.execute("select pgq.event_retry(%s, %s, %s)",
[batch_id, ev_id, retry_time])