-
Notifications
You must be signed in to change notification settings - Fork 0
/
batch_info_cache.py
382 lines (325 loc) · 14.3 KB
/
batch_info_cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
# -*- test-case-name: vumi_message_store.tests.test_batch_info_cache -*-
# -*- coding: utf-8 -*-
from calendar import timegm
from datetime import datetime
from twisted.internet.defer import returnValue
from vumi.persist.redis_base import Manager
from vumi.message import TransportEvent, VUMI_DATE_FORMAT
from vumi.errors import VumiError
def to_timestamp(timestamp):
    """
    Return a UTC epoch-seconds timestamp for a datetime value.

    :param timestamp:
        Either a :class:`datetime.datetime` instance or a serialised
        timestamp string in ``VUMI_DATE_FORMAT``, which is parsed into a
        datetime first.
    :returns:
        Integer seconds since the UTC epoch. Sub-second precision is
        discarded by ``timetuple()``.
    """
    if not isinstance(timestamp, datetime):
        # Anything that isn't already a datetime is assumed to be a
        # serialised VUMI_DATE_FORMAT string. (Checking against datetime
        # rather than basestring keeps this working on both Python 2 and 3.)
        timestamp = datetime.strptime(timestamp, VUMI_DATE_FORMAT)
    # We can't use time.mktime(), because that takes local time and we have
    # UTC. The UTC equivalent, for some obscure reason, is in the calendar
    # module.
    return timegm(timestamp.timetuple())
class BatchInfoCacheException(VumiError):
    """Raised for errors specific to the batch info cache."""
class BatchInfoCache(object):
    """
    Redis-based cache for assorted batch-related information that is expensive
    to acquire from Riak but useful to have low-latency access to.

    Per batch it tracks: sorted sets of recent inbound/outbound message keys
    and event keys (scored by timestamp and truncated to a bounded size),
    monotonically-increasing message/event counters, HyperLogLog counters of
    unique to/from addresses, and a hash of per-event-type status counters.
    """

    # Top-level namespace for all cache keys (also the set of tracked batches).
    BATCH_KEY = 'batches'
    # Zsets of recent message keys, scored by message timestamp.
    OUTBOUND_KEY = 'outbound'
    OUTBOUND_COUNT_KEY = 'outbound_count'
    INBOUND_KEY = 'inbound'
    INBOUND_COUNT_KEY = 'inbound_count'
    # HyperLogLog keys for approximate unique address counts.
    TO_ADDR_KEY = 'to_addr_hll'
    FROM_ADDR_KEY = 'from_addr_hll'
    # Zset of recent event keys plus a total event counter.
    EVENT_KEY = 'event'
    EVENT_COUNT_KEY = 'event_count'
    # Hash of event-status counters for a batch.
    STATUS_KEY = 'status'
    # Maximum number of members retained in each message/event key zset; the
    # *_count keys keep the true totals after truncation.
    TRUNCATE_MESSAGE_KEY_ZSET_AT = 2000

    def __init__(self, redis):
        # `redis` is expected to be a vumi redis manager (sync or Twisted
        # flavoured) — presumably RedisManager/TxRedisManager; confirm at
        # the call site.
        # Store redis as `manager` as well since @Manager.calls_manager
        # requires it to be named as such.
        self.redis = self.manager = redis

    def key(self, *args):
        # Join key components with the conventional redis ':' separator.
        return ':'.join([unicode(a) for a in args])

    def batch_key(self, *args):
        # All cache keys live under the BATCH_KEY namespace.
        return self.key(self.BATCH_KEY, *args)

    def outbound_key(self, batch_id):
        # Zset of recent outbound message keys for this batch.
        return self.batch_key(self.OUTBOUND_KEY, batch_id)

    def outbound_count_key(self, batch_id):
        # Counter of all outbound messages ever added to this batch.
        return self.batch_key(self.OUTBOUND_COUNT_KEY, batch_id)

    def inbound_key(self, batch_id):
        # Zset of recent inbound message keys for this batch.
        return self.batch_key(self.INBOUND_KEY, batch_id)

    def inbound_count_key(self, batch_id):
        # Counter of all inbound messages ever added to this batch.
        return self.batch_key(self.INBOUND_COUNT_KEY, batch_id)

    def to_addr_key(self, batch_id):
        # HyperLogLog of unique to-addresses for this batch.
        return self.batch_key(self.TO_ADDR_KEY, batch_id)

    def from_addr_key(self, batch_id):
        # HyperLogLog of unique from-addresses for this batch.
        return self.batch_key(self.FROM_ADDR_KEY, batch_id)

    def status_key(self, batch_id):
        # Hash of per-event-type counters for this batch.
        return self.batch_key(self.STATUS_KEY, batch_id)

    def event_key(self, batch_id):
        # Zset of recent event keys for this batch.
        return self.batch_key(self.EVENT_KEY, batch_id)

    def event_count_key(self, batch_id):
        # Counter of all events ever added to this batch.
        return self.batch_key(self.EVENT_COUNT_KEY, batch_id)

    def obsolete_keys(self, batch_id):
        """
        Return a list of obsolete keys that should be cleared.
        """
        # NOTE(review): these look like pre-HyperLogLog address-set keys
        # superseded by the *_hll keys above — confirm against history.
        return [
            self.batch_key("to_addr", batch_id),
            self.batch_key("from_addr", batch_id),
        ]

    @Manager.calls_manager
    def _truncate_keys(self, redis_key, truncate_at):
        """
        Trim a key zset to its newest `truncate_at` members.

        Members with the lowest scores (oldest timestamps) are removed.
        Returns the number of members removed.
        """
        truncate_at = (truncate_at or self.TRUNCATE_MESSAGE_KEY_ZSET_AT)
        # Ranks 0 .. -(truncate_at+1) are everything except the newest
        # `truncate_at` members.
        keys_removed = yield self.redis.zremrangebyrank(
            redis_key, 0, -truncate_at - 1)
        returnValue(keys_removed)

    def truncate_inbound_message_keys(self, batch_id, truncate_at=None):
        # Trim the inbound key zset; defaults to TRUNCATE_MESSAGE_KEY_ZSET_AT.
        return self._truncate_keys(self.inbound_key(batch_id), truncate_at)

    def truncate_outbound_message_keys(self, batch_id, truncate_at=None):
        # Trim the outbound key zset; defaults to TRUNCATE_MESSAGE_KEY_ZSET_AT.
        return self._truncate_keys(self.outbound_key(batch_id), truncate_at)

    def truncate_event_keys(self, batch_id, truncate_at=None):
        # Trim the event key zset; defaults to TRUNCATE_MESSAGE_KEY_ZSET_AT.
        return self._truncate_keys(self.event_key(batch_id), truncate_at)

    @Manager.calls_manager
    def batch_start(self, batch_id):
        """
        Create the counter keys and status hash for a batch and add the batch
        identifier to the set of tracked batches.

        A call to this isn't strictly necessary, but is good for general
        housekeeping.

        This operation is idempotent.
        """
        # TODO: Do we really want to keep a set full of batch identifiers?
        yield self.redis.sadd(self.batch_key(), batch_id)
        yield self.redis.set(self.inbound_count_key(batch_id), 0)
        yield self.redis.set(self.outbound_count_key(batch_id), 0)
        yield self.redis.set(self.event_count_key(batch_id), 0)
        # If the status hash already exists and has any keys in it, this will
        # not reset those keys to zero.
        events = (TransportEvent.EVENT_TYPES.keys() +
                  ['delivery_report.%s' % status
                   for status in TransportEvent.DELIVERY_STATUSES] +
                  ['sent'])
        for event in events:
            # hsetnx leaves existing counters untouched.
            yield self.redis.hsetnx(self.status_key(batch_id), event, 0)

    def batch_exists(self, batch_id):
        # True if batch_start() has been called (and not cleared) for this id.
        return self.redis.sismember(self.batch_key(), batch_id)

    @Manager.calls_manager
    def clear_batch(self, batch_id):
        """
        Removes all cached values for the given batch_id, useful before a
        reconciliation happens to ensure that we start from scratch.

        NOTE: This will reset all counters back to zero and will increment
              them as messages are received. If your UI depends on your
              cached values your UI values might be off while the
              reconciliation is taking place.
        """
        for key in self.obsolete_keys(batch_id):
            yield self.redis.delete(key)
        yield self.redis.delete(self.inbound_key(batch_id))
        yield self.redis.delete(self.inbound_count_key(batch_id))
        yield self.redis.delete(self.outbound_key(batch_id))
        yield self.redis.delete(self.outbound_count_key(batch_id))
        yield self.redis.delete(self.event_key(batch_id))
        yield self.redis.delete(self.event_count_key(batch_id))
        yield self.redis.delete(self.status_key(batch_id))
        yield self.redis.srem(self.batch_key(), batch_id)

    @Manager.calls_manager
    def add_inbound_message(self, batch_id, msg):
        """
        Add an inbound message to the cache for the given batch_id.

        Records the message key (weighted by timestamp) and folds the
        from_addr into the unique-address HyperLogLog.
        """
        timestamp = to_timestamp(msg["timestamp"])
        yield self.add_inbound_message_key(
            batch_id, msg["message_id"], timestamp)
        yield self.add_from_addr(batch_id, msg['from_addr'])

    @Manager.calls_manager
    def add_inbound_message_key(self, batch_id, message_key, timestamp):
        """
        Add a message key, weighted with the timestamp to the batch_id.
        """
        new_entry = yield self.redis.zadd(self.inbound_key(batch_id), **{
            message_key.encode('utf-8'): timestamp,
        })
        # zadd returns the number of *new* members, so re-adding an existing
        # key doesn't inflate the counter.
        if new_entry:
            yield self.redis.incr(self.inbound_count_key(batch_id))
            yield self.truncate_inbound_message_keys(batch_id)

    def add_from_addr(self, batch_id, from_addr):
        """
        Add a from address to the HyperLogLog counter for the batch.
        """
        return self.redis.pfadd(
            self.from_addr_key(batch_id), from_addr.encode('utf-8'))

    @Manager.calls_manager
    def add_outbound_message(self, batch_id, msg):
        """
        Add an outbound message to the cache for the given batch_id.

        Records the message key (weighted by timestamp) and folds the
        to_addr into the unique-address HyperLogLog.
        """
        timestamp = to_timestamp(msg['timestamp'])
        yield self.add_outbound_message_key(
            batch_id, msg['message_id'], timestamp)
        yield self.add_to_addr(batch_id, msg['to_addr'])

    @Manager.calls_manager
    def add_outbound_message_key(self, batch_id, message_key, timestamp):
        """
        Add a message key, weighted with the timestamp to the batch_id.
        """
        new_entry = yield self.redis.zadd(self.outbound_key(batch_id), **{
            message_key.encode('utf-8'): timestamp,
        })
        # Only genuinely new keys bump the 'sent' status and the counter.
        if new_entry:
            yield self.increment_event_status(batch_id, 'sent')
            yield self.redis.incr(self.outbound_count_key(batch_id))
            yield self.truncate_outbound_message_keys(batch_id)

    def add_to_addr(self, batch_id, to_addr):
        """
        Add a to address to the HyperLogLog counter for the batch.
        """
        return self.redis.pfadd(
            self.to_addr_key(batch_id), to_addr.encode('utf-8'))

    @Manager.calls_manager
    def add_event(self, batch_id, event):
        """
        Add an event to the cache for the given batch_id
        """
        event_id = event['event_id']
        timestamp = to_timestamp(event['timestamp'])
        event_type = event['event_type']
        # Delivery reports are tracked per delivery status, e.g.
        # 'delivery_report.delivered'.
        if event_type == 'delivery_report':
            event_type = "%s.%s" % (event_type, event['delivery_status'])
        yield self.add_event_key(batch_id, event_id, event_type, timestamp)

    @Manager.calls_manager
    def add_event_key(self, batch_id, event_key, event_type, timestamp):
        """
        Add the event key to the set of known event keys. If the event is a
        delivery report, event_type should include the delivery status.
        """
        new_entry = yield self.redis.zadd(self.event_key(batch_id), **{
            event_key.encode('utf-8'): timestamp,
        })
        # Duplicate event keys are ignored so counters stay accurate.
        if new_entry:
            yield self.redis.incr(self.event_count_key(batch_id))
            yield self.truncate_event_keys(batch_id)
            yield self.increment_event_status(batch_id, event_type)

    @Manager.calls_manager
    def increment_event_status(self, batch_id, event_type, count=1):
        """
        Increment the status for the given event_type for the given batch_id.
        If the event is a delivery report, event_type should include the
        delivery status.
        """
        status_key = self.status_key(batch_id)
        yield self.redis.hincrby(status_key, event_type, count)
        # Per-status delivery reports also roll up into the aggregate
        # 'delivery_report' counter.
        if event_type.startswith("delivery_report."):
            yield self.redis.hincrby(status_key, "delivery_report", count)

    @Manager.calls_manager
    def add_inbound_message_count(self, batch_id, count):
        """
        Add a count to all inbound message counters. (Used for recon.)
        """
        yield self.redis.incr(self.inbound_count_key(batch_id), count)

    @Manager.calls_manager
    def add_outbound_message_count(self, batch_id, count):
        """
        Add a count to all outbound message counters. (Used for recon.)
        """
        # Outbound messages also count as 'sent' in the status hash.
        yield self.increment_event_status(batch_id, 'sent', count)
        yield self.redis.incr(self.outbound_count_key(batch_id), count)

    @Manager.calls_manager
    def add_event_count(self, batch_id, status, count):
        """
        Add a count to all relevant event counters. (Used for recon.)
        """
        yield self.increment_event_status(batch_id, status, count)
        yield self.redis.incr(self.event_count_key(batch_id), count)

    @Manager.calls_manager
    def get_batch_status(self, batch_id):
        """
        Return a dictionary containing the latest event stats for the given
        batch_id.

        Values come back from redis as strings and are coerced to ints.
        """
        stats = yield self.redis.hgetall(self.status_key(batch_id))
        returnValue(dict([(k, int(v)) for k, v in stats.iteritems()]))

    def list_inbound_message_keys(self, batch_id, with_timestamp=False):
        """
        Return the list of recent inbound message keys in descending order by
        timestamp.
        """
        # Only up to TRUNCATE_MESSAGE_KEY_ZSET_AT keys are retained.
        return self.redis.zrange(
            self.inbound_key(batch_id), 0, -1, desc=True,
            withscores=with_timestamp)

    def list_outbound_message_keys(self, batch_id, with_timestamp=False):
        """
        Return the list of recent outbound message keys in descending order by
        timestamp.
        """
        # Only up to TRUNCATE_MESSAGE_KEY_ZSET_AT keys are retained.
        return self.redis.zrange(
            self.outbound_key(batch_id), 0, -1, desc=True,
            withscores=with_timestamp)

    def list_event_keys(self, batch_id, with_timestamp=False):
        """
        Return the list of recent event keys in descending order by timestamp.
        """
        # Only up to TRUNCATE_MESSAGE_KEY_ZSET_AT keys are retained.
        return self.redis.zrange(
            self.event_key(batch_id), 0, -1, desc=True,
            withscores=with_timestamp)

    @Manager.calls_manager
    def _get_counter_value(self, counter_key):
        # Missing counters read as zero rather than None.
        count = yield self.redis.get(counter_key)
        returnValue(0 if count is None else int(count))

    def get_inbound_message_count(self, batch_id):
        """
        Return the count of inbound messages.
        """
        return self._get_counter_value(self.inbound_count_key(batch_id))

    def get_outbound_message_count(self, batch_id):
        """
        Return the count of outbound messages.
        """
        return self._get_counter_value(self.outbound_count_key(batch_id))

    def get_event_count(self, batch_id):
        """
        Return the count of events.
        """
        return self._get_counter_value(self.event_count_key(batch_id))

    def get_from_addr_count(self, batch_id):
        """
        Return the count of from addresses.

        NOTE: HyperLogLog counts are approximate, not exact.
        """
        return self.redis.pfcount(self.from_addr_key(batch_id))

    def get_to_addr_count(self, batch_id):
        """
        Return the count of to addresses.

        NOTE: HyperLogLog counts are approximate, not exact.
        """
        return self.redis.pfcount(self.to_addr_key(batch_id))

    @Manager.calls_manager
    def rebuild_cache(self, batch_id, qms, page_size=None):
        """
        Rebuild the cache using the provided IQueryMessageStore implementation.
        """
        # TODO: Make this less naive.
        yield self.clear_batch(batch_id)
        yield self.batch_start(batch_id)
        # Walk all inbound messages page by page.
        inbound_page = yield qms.list_batch_inbound_keys_with_addresses(
            batch_id, max_results=page_size)
        while inbound_page is not None:
            for key, timestamp, from_addr in inbound_page:
                yield self.add_inbound_message_key(
                    batch_id, key, to_timestamp(timestamp))
                yield self.add_from_addr(batch_id, from_addr)
            inbound_page = yield inbound_page.next_page()
        # Walk all outbound messages; events are fetched per message key.
        outbound_page = yield qms.list_batch_outbound_keys_with_addresses(
            batch_id, max_results=page_size)
        while outbound_page is not None:
            for key, timestamp, to_addr in outbound_page:
                yield self.add_outbound_message_key(
                    batch_id, key, to_timestamp(timestamp))
                yield self.add_to_addr(batch_id, to_addr)
                event_page = yield qms.list_message_event_keys_with_statuses(
                    key)
                while event_page is not None:
                    for ekey, etimestamp, status in event_page:
                        yield self.add_event_key(
                            batch_id, ekey, status, to_timestamp(etimestamp))
                    event_page = yield event_page.next_page()
            outbound_page = yield outbound_page.next_page()