Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
1. Add clear debug prints with queue size (one print every 1000 items in order not to hurt performance)
2. If main queue is empty keep running and do nothing
3. In case of a timeout from main queue restart queue
  • Loading branch information
Tom Kregenbild committed Jul 22, 2015
1 parent a7b5780 commit d159ec5
Showing 1 changed file with 17 additions and 11 deletions.
28 changes: 17 additions & 11 deletions beaver/run_queue.py
Expand Up @@ -31,21 +31,27 @@ def run_queue(queue, beaver_config, logger=None):
break

if int(time.time()) - last_update_time > queue_timeout:
logger.info('Queue timeout of "{0}" seconds exceeded, stopping queue'.format(queue_timeout))
logger.info('Main consumer queue timeout of "{0}" seconds exceeded, stopping queue'.format(queue_timeout))
break

try:
if queue.full():
logger.debug("Queue is full")
logger.error("Main consumer queue is full")

else:
logger.debug("Queue Size is: " + str(queue.qsize()))
command, data = queue.get(block=True, timeout=wait_timeout)
if command == "callback":
last_update_time = int(time.time())
logger.debug('Last update time now {0}'.format(last_update_time))
if count == 1000:
logger.debug("Main consumer queue Size is: " + str(queue.qsize()))
count = 0
command, data = queue.get(block=True, timeout=wait_timeout)
if command == "callback":
last_update_time = int(time.time())
logger.debug('Last update time now {0}'.format(last_update_time))
except Queue.Empty:
logger.debug('No data')
continue
if not queue.empty():
logger.error('Recieved timeout from main consumer queue - stopping queue')
break
else:
logger.debug('No data')

if command == 'callback':
if data.get('ignore_empty', False):
Expand All @@ -67,15 +73,15 @@ def run_queue(queue, beaver_config, logger=None):
try:
transport.callback(**data)
count += 1
logger.debug("Number of transports: " + str(count))
break
except TransportException:
except TransportException,e:
failure_count = failure_count + 1
if failure_count > beaver_config.get('max_failure'):
failure_count = beaver_config.get('max_failure')

sleep_time = beaver_config.get('respawn_delay') ** failure_count
logger.info('Caught transport exception, reconnecting in %d seconds' % sleep_time)
logger.debug(e)

try:
transport.invalidate()
Expand Down

0 comments on commit d159ec5

Please sign in to comment.