Skip to content

Commit

Permalink
Make batcher size/time limit configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Javier Collado committed Nov 28, 2016
1 parent ca73b6f commit bc42e43
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 38 deletions.
21 changes: 12 additions & 9 deletions rabbithole/batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ class Batcher(object):
"""

SIZE_LIMIT = 5
TIME_LIMIT = 15
DEFAULT_SIZE_LIMIT = 5
DEFAULT_TIME_LIMIT = 15

def __init__(self):
def __init__(self, size_limit, time_limit):
"""Initialize internal data structures."""
self.size_limit = size_limit or self.DEFAULT_SIZE_LIMIT
self.time_limit = time_limit or self.DEFAULT_TIME_LIMIT

self.batches = defaultdict(list)
self.locks = defaultdict(threading.Lock)
self.timers = {}
Expand Down Expand Up @@ -63,15 +66,15 @@ def message_received_cb(self, sender, exchange_name, payload):
'Message added to %r batch (size: %d, capacity: %d)',
exchange_name,
len(batch),
self.SIZE_LIMIT,
self.size_limit,
)

if len(batch) == 1:
self.start_timer(exchange_name)
elif len(batch) >= self.SIZE_LIMIT:
elif len(batch) >= self.size_limit:
LOGGER.debug(
'Size limit (%d) exceeded for %r',
self.SIZE_LIMIT,
self.size_limit,
exchange_name,
)
self.queue_batch(exchange_name)
Expand All @@ -91,7 +94,7 @@ def time_expired_cb(self, exchange_name):
with self.locks[exchange_name]:
LOGGER.debug(
'Time limit (%.2f) exceeded for %r',
self.TIME_LIMIT,
self.time_limit,
exchange_name,
)
self.queue_batch(exchange_name)
Expand Down Expand Up @@ -142,7 +145,7 @@ def start_timer(self, exchange_name):
LOGGER.warning('Timer already active for: %r', exchange_name)
return
timer = threading.Timer(
self.TIME_LIMIT,
self.time_limit,
self.time_expired_cb,
(exchange_name, ),
)
Expand All @@ -151,7 +154,7 @@ def start_timer(self, exchange_name):
timer.start()
LOGGER.debug(
'Timer thread started (%.2f) for %r: (%d, %s)',
self.TIME_LIMIT,
self.time_limit,
exchange_name,
timer.ident,
timer.name,
Expand Down
5 changes: 4 additions & 1 deletion rabbithole/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import os
import sys

from pprint import pformat

import pika
import sqlalchemy
import yaml
Expand All @@ -31,6 +33,7 @@ def main(argv=None):
args = parse_arguments(argv)
config = args.config
configure_logging(args.log_level)
logging.debug('Configuration:\n%s', pformat(config))

try:
consumer = Consumer(config['rabbitmq'], config['output'].keys())
Expand All @@ -44,7 +47,7 @@ def main(argv=None):
LOGGER.error(exception)
return 1

batcher = Batcher()
batcher = Batcher(config.get('size_limit'), config.get('time_limit'))
consumer.message_received.connect(batcher.message_received_cb)
batcher.batch_ready.connect(database.batch_ready_cb)

Expand Down
56 changes: 28 additions & 28 deletions tests/test_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,65 +17,67 @@ class TestBatcher(TestCase):

"""Batcher test cases."""

def setUp(self):
"""Create batcher object."""
size_limit = 5
time_limit = 15
self.batcher = Batcher(size_limit, time_limit)

def test_first_message_received(self):
"""Message appended to batch and timer started."""
exchange = 'exchange'
payload = 'payload'

batcher = Batcher()
batcher.batch_ready = Mock()
self.batcher.batch_ready = Mock()
with patch('rabbithole.batcher.threading') as threading:
batcher.message_received_cb('sender', exchange, payload)
self.batcher.message_received_cb('sender', exchange, payload)
threading.Timer.assert_called_once_with(
batcher.TIME_LIMIT,
batcher.time_expired_cb,
self.batcher.time_limit,
self.batcher.time_expired_cb,
(exchange, ),
)
self.assertListEqual(batcher.batches[exchange], [payload])
self.assertListEqual(self.batcher.batches[exchange], [payload])

def test_size_limit_exceeded(self):
"""Batch queued when size limit is exceed."""
exchange = 'exchange'
payload = 'payload'

batcher = Batcher()
batcher.batch_ready = Mock()
self.batcher.batch_ready = Mock()
with patch('rabbithole.batcher.threading'):
for _ in range(batcher.SIZE_LIMIT):
batcher.message_received_cb('sender', exchange, payload)
for _ in range(self.batcher.size_limit):
self.batcher.message_received_cb('sender', exchange, payload)

batcher.batch_ready.send.assert_called_with(
batcher,
self.batcher.batch_ready.send.assert_called_with(
self.batcher,
exchange_name=exchange,
batch=[payload] * batcher.SIZE_LIMIT,
batch=[payload] * self.batcher.size_limit,
)
self.assertListEqual(batcher.batches[exchange], [])
self.assertListEqual(self.batcher.batches[exchange], [])

def test_time_limit_exceeded(self):
"""Batch queued whem time limit is exceeded."""
exchange = 'exchange'
payload = 'payload'

batcher = Batcher()
batcher.batch_ready = Mock()
self.batcher.batch_ready = Mock()
with patch('rabbithole.batcher.threading'):
batcher.message_received_cb('sender', exchange, payload)
batcher.time_expired_cb(exchange)
self.batcher.message_received_cb('sender', exchange, payload)
self.batcher.time_expired_cb(exchange)

batcher.batch_ready.send.assert_called_with(
batcher,
self.batcher.batch_ready.send.assert_called_with(
self.batcher,
exchange_name=exchange,
batch=[payload],
)
self.assertListEqual(batcher.batches[exchange], [])
self.assertListEqual(self.batcher.batches[exchange], [])

def test_expired_timer_not_found(self):
"""Warning written to logs when expired timer is not found."""
exchange = 'exchange'

batcher = Batcher()
with patch('rabbithole.batcher.LOGGER') as logger:
batcher.time_expired_cb(exchange)
self.batcher.time_expired_cb(exchange)
logger.warning.assert_called_with(
'Timer not found for: %r',
exchange,
Expand All @@ -85,10 +87,9 @@ def test_timer_already_active(self):
"""Warning written to logs when timer is alreadya active."""
exchange = 'exchange'

batcher = Batcher()
batcher.timers[exchange] = Mock()
self.batcher.timers[exchange] = Mock()
with patch('rabbithole.batcher.LOGGER') as logger:
batcher.start_timer(exchange)
self.batcher.start_timer(exchange)
logger.warning.assert_called_with(
'Timer already active for: %r',
exchange,
Expand All @@ -98,9 +99,8 @@ def test_cancelled_timer_not_found(self):
"""Warning written to logs when cancelled timer is not found."""
exchange = 'exchange'

batcher = Batcher()
with patch('rabbithole.batcher.LOGGER') as logger:
batcher.cancel_timer(exchange)
self.batcher.cancel_timer(exchange)
logger.warning.assert_called_with(
'Timer not found for: %r',
exchange,
Expand Down

0 comments on commit bc42e43

Please sign in to comment.