Skip to content

Commit

Permalink
Track unique address counts using HyperLogLog.
Browse files Browse the repository at this point in the history
  • Loading branch information
jerith committed Mar 26, 2015
1 parent 3e579b3 commit bf039a6
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 7 deletions.
32 changes: 25 additions & 7 deletions vumi_message_store/batch_info_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,17 @@ def clear_batch(self, batch_id):
@Manager.calls_manager
def add_inbound_message(self, batch_id, msg):
    """
    Record an inbound message in the cache for the given batch_id.

    The message id is stored together with the converted message
    timestamp, after which the message's ``from_addr`` is added to the
    batch's from-address counter.
    """
    score = to_timestamp(msg["timestamp"])
    message_key = msg["message_id"]
    yield self.add_inbound_message_key(batch_id, message_key, score)
    # The address is only looked up once the key has been stored.
    sender = msg['from_addr']
    yield self.add_from_addr(batch_id, sender)

@Manager.calls_manager
def add_inbound_message_key(self, batch_id, message_key, timestamp):
"""
Add a message key, weighted with the timestamp to the batch_id
Add a message key, weighted with the timestamp to the batch_id.
"""
new_entry = yield self.redis.zadd(self.inbound_key(batch_id), **{
message_key.encode('utf-8'): timestamp,
Expand All @@ -178,14 +179,22 @@ def add_inbound_message_key(self, batch_id, message_key, timestamp):
yield self.redis.incr(self.inbound_count_key(batch_id))
yield self.truncate_inbound_message_keys(batch_id)

def add_from_addr(self, batch_id, from_addr):
    """
    Add a from address to the HyperLogLog counter for the batch.

    The address is UTF-8 encoded before being handed to redis, and the
    result of the ``pfadd`` call is returned to the caller unchanged.
    """
    hll_key = self.from_addr_key(batch_id)
    encoded_addr = from_addr.encode('utf-8')
    return self.redis.pfadd(hll_key, encoded_addr)

@Manager.calls_manager
def add_outbound_message(self, batch_id, msg):
    """
    Record an outbound message in the cache for the given batch_id.

    The message id is stored together with the converted message
    timestamp, after which the message's ``to_addr`` is added to the
    batch's to-address counter.
    """
    score = to_timestamp(msg['timestamp'])
    message_key = msg['message_id']
    yield self.add_outbound_message_key(batch_id, message_key, score)
    # The address is only looked up once the key has been stored.
    recipient = msg['to_addr']
    yield self.add_to_addr(batch_id, recipient)

@Manager.calls_manager
def add_outbound_message_key(self, batch_id, message_key, timestamp):
Expand All @@ -200,6 +209,13 @@ def add_outbound_message_key(self, batch_id, message_key, timestamp):
yield self.redis.incr(self.outbound_count_key(batch_id))
yield self.truncate_outbound_message_keys(batch_id)

def add_to_addr(self, batch_id, to_addr):
    """
    Add a to address to the HyperLogLog counter for the batch.

    The address is UTF-8 encoded before being handed to redis, and the
    result of the ``pfadd`` call is returned to the caller unchanged.
    """
    hll_key = self.to_addr_key(batch_id)
    encoded_addr = to_addr.encode('utf-8')
    return self.redis.pfadd(hll_key, encoded_addr)

@Manager.calls_manager
def add_event(self, batch_id, event):
"""
Expand Down Expand Up @@ -328,20 +344,22 @@ def rebuild_cache(self, batch_id, qms, page_size=None):
yield self.clear_batch(batch_id)
yield self.batch_start(batch_id)

inbound_page = yield qms.list_batch_inbound_keys_with_timestamps(
inbound_page = yield qms.list_batch_inbound_keys_with_addresses(
batch_id, max_results=page_size)
while inbound_page is not None:
for key, timestamp in inbound_page:
for key, timestamp, from_addr in inbound_page:
yield self.add_inbound_message_key(
batch_id, key, to_timestamp(timestamp))
yield self.add_from_addr(batch_id, from_addr)
inbound_page = yield inbound_page.next_page()

outbound_page = yield qms.list_batch_outbound_keys_with_timestamps(
outbound_page = yield qms.list_batch_outbound_keys_with_addresses(
batch_id, max_results=page_size)
while outbound_page is not None:
for key, timestamp in outbound_page:
for key, timestamp, to_addr in outbound_page:
yield self.add_outbound_message_key(
batch_id, key, to_timestamp(timestamp))
yield self.add_to_addr(batch_id, to_addr)
event_page = yield qms.list_message_event_keys_with_statuses(
key)
while event_page is not None:
Expand Down
75 changes: 75 additions & 0 deletions vumi_message_store/tests/test_batch_info_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ def assert_redis_zset(self, key, expected_value):
key, start=0, stop=-1, desc=False, withscores=True)
self.assertEqual(expected_value, value)

@inlineCallbacks
def assert_redis_pfcount(self, key, expected_value):
    """
    Assert that ``pfcount`` for the given redis key equals expected_value.
    """
    actual_count = yield self.redis.pfcount(key)
    self.assertEqual(expected_value, actual_count)

@inlineCallbacks
def test_batch_start(self):
"""
Expand Down Expand Up @@ -157,6 +162,7 @@ def test_add_inbound_message(self):
"batches:outbound_count:mybatch",
"batches:event_count:mybatch",
"batches:status:mybatch",
"batches:from_addr_hll:mybatch",
])

timestamp = to_timestamp(msg["timestamp"])
Expand All @@ -174,6 +180,7 @@ def test_add_inbound_message(self):
"delivery_report.failed": "0",
"delivery_report.pending": "0",
})
yield self.assert_redis_pfcount("batches:from_addr_hll:mybatch", 1)

@inlineCallbacks
def test_add_inbound_message_key(self):
Expand Down Expand Up @@ -264,6 +271,35 @@ def test_add_inbound_message_key_truncates_zset(self):
yield self.assert_redis_zset("batches:inbound:batch", msgs[2:5])
yield self.assert_redis_string("batches:inbound_count:batch", "5")

@inlineCallbacks
def test_add_from_addrs(self):
    """
    Adding a from_addr updates the HyperLogLog counter for the batch.
    """
    cache = self.batch_info_cache
    hll_key = "batches:from_addr_hll:mybatch"

    yield cache.batch_start("mybatch")
    changed = yield cache.add_from_addr("mybatch", "from-1")
    self.assertEqual(changed, 1)

    # After the first address the HyperLogLog key sits next to the
    # other keys expected for the batch.
    yield self.assert_redis_keys([
        "batches",
        "batches:inbound_count:mybatch",
        "batches:outbound_count:mybatch",
        "batches:event_count:mybatch",
        "batches:status:mybatch",
        "batches:from_addr_hll:mybatch",
    ])
    yield self.assert_redis_pfcount(hll_key, 1)

    # (address, expected add result, expected unique count afterwards)
    followups = [
        # Adding a second address updates the counter.
        ("from-2", 1, 2),
        # Adding a previously-added address doesn't update the counter.
        ("from-1", 0, 2),
    ]
    for addr, expected_changed, expected_count in followups:
        changed = yield cache.add_from_addr("mybatch", addr)
        self.assertEqual(changed, expected_changed)
        yield self.assert_redis_pfcount(hll_key, expected_count)

@inlineCallbacks
def test_add_outbound_message(self):
"""
Expand All @@ -281,6 +317,7 @@ def test_add_outbound_message(self):
"batches:outbound_count:mybatch",
"batches:event_count:mybatch",
"batches:status:mybatch",
"batches:to_addr_hll:mybatch",
])

timestamp = to_timestamp(msg["timestamp"])
Expand All @@ -298,6 +335,7 @@ def test_add_outbound_message(self):
"delivery_report.failed": "0",
"delivery_report.pending": "0",
})
yield self.assert_redis_pfcount("batches:to_addr_hll:mybatch", 1)

@inlineCallbacks
def test_add_outbound_message_key(self):
Expand Down Expand Up @@ -388,6 +426,35 @@ def test_add_outbound_message_key_truncates_zset(self):
yield self.assert_redis_zset("batches:outbound:batch", msgs[2:5])
yield self.assert_redis_string("batches:outbound_count:batch", "5")

@inlineCallbacks
def test_add_to_addrs(self):
    """
    Adding a to_addr updates the HyperLogLog counter for the batch.
    """
    cache = self.batch_info_cache
    hll_key = "batches:to_addr_hll:mybatch"

    # NOTE: the sample addresses are spelled "from-N" even though they
    # are used as to-addresses here; only their distinctness matters.
    yield cache.batch_start("mybatch")
    changed = yield cache.add_to_addr("mybatch", "from-1")
    self.assertEqual(changed, 1)

    # After the first address the HyperLogLog key sits next to the
    # other keys expected for the batch.
    yield self.assert_redis_keys([
        "batches",
        "batches:inbound_count:mybatch",
        "batches:outbound_count:mybatch",
        "batches:event_count:mybatch",
        "batches:status:mybatch",
        "batches:to_addr_hll:mybatch",
    ])
    yield self.assert_redis_pfcount(hll_key, 1)

    # (address, expected add result, expected unique count afterwards)
    followups = [
        # Adding a second address updates the counter.
        ("from-2", 1, 2),
        # Adding a previously-added address doesn't update the counter.
        ("from-1", 0, 2),
    ]
    for addr, expected_changed, expected_count in followups:
        changed = yield cache.add_to_addr("mybatch", addr)
        self.assertEqual(changed, expected_changed)
        yield self.assert_redis_pfcount(hll_key, expected_count)

@inlineCallbacks
def test_add_event_ack(self):
"""
Expand Down Expand Up @@ -990,6 +1057,8 @@ def test_rebuild_cache(self):
"batches:outbound_count:mybatch",
"batches:event_count:mybatch",
"batches:status:mybatch",
"batches:to_addr_hll:mybatch",
"batches:from_addr_hll:mybatch",
])
yield self.assert_redis_set("batches", ["mybatch"])
yield self.assert_redis_string("batches:inbound_count:mybatch", "5")
Expand All @@ -1007,6 +1076,8 @@ def test_rebuild_cache(self):
yield self.assert_redis_zset("batches:inbound:mybatch", inbound_keys)
yield self.assert_redis_zset("batches:outbound:mybatch", outbound_keys)
yield self.assert_redis_zset("batches:event:mybatch", event_keys)
yield self.assert_redis_pfcount("batches:to_addr_hll:mybatch", 1)
yield self.assert_redis_pfcount("batches:from_addr_hll:mybatch", 1)

@inlineCallbacks
def test_rebuild_cache_uncached_batch(self):
Expand Down Expand Up @@ -1064,6 +1135,8 @@ def test_rebuild_cache_uncached_batch(self):
"batches:outbound_count:mybatch",
"batches:event_count:mybatch",
"batches:status:mybatch",
"batches:to_addr_hll:mybatch",
"batches:from_addr_hll:mybatch",
])
yield self.assert_redis_set("batches", ["mybatch"])
yield self.assert_redis_string("batches:inbound_count:mybatch", "5")
Expand All @@ -1081,6 +1154,8 @@ def test_rebuild_cache_uncached_batch(self):
yield self.assert_redis_zset("batches:inbound:mybatch", inbound_keys)
yield self.assert_redis_zset("batches:outbound:mybatch", outbound_keys)
yield self.assert_redis_zset("batches:event:mybatch", event_keys)
yield self.assert_redis_pfcount("batches:to_addr_hll:mybatch", 1)
yield self.assert_redis_pfcount("batches:from_addr_hll:mybatch", 1)

@inlineCallbacks
def test_rebuild_cache_missing_batch(self):
Expand Down

0 comments on commit bf039a6

Please sign in to comment.