diff --git a/beaver/run_queue.py b/beaver/run_queue.py index 6c5f81b8..776039de 100644 --- a/beaver/run_queue.py +++ b/beaver/run_queue.py @@ -31,21 +31,27 @@ def run_queue(queue, beaver_config, logger=None): break if int(time.time()) - last_update_time > queue_timeout: - logger.info('Queue timeout of "{0}" seconds exceeded, stopping queue'.format(queue_timeout)) + logger.info('Main consumer queue timeout of "{0}" seconds exceeded, stopping queue'.format(queue_timeout)) break try: if queue.full(): - logger.debug("Queue is full") + logger.error("Main consumer queue is full") + else: - logger.debug("Queue Size is: " + str(queue.qsize())) - command, data = queue.get(block=True, timeout=wait_timeout) - if command == "callback": - last_update_time = int(time.time()) - logger.debug('Last update time now {0}'.format(last_update_time)) + if count == 1000: + logger.debug("Main consumer queue Size is: " + str(queue.qsize())) + count = 0 + command, data = queue.get(block=True, timeout=wait_timeout) + if command == "callback": + last_update_time = int(time.time()) + logger.debug('Last update time now {0}'.format(last_update_time)) except Queue.Empty: - logger.debug('No data') - continue + if not queue.empty(): + logger.error('Recieved timeout from main consumer queue - stopping queue') + break + else: + logger.debug('No data') if command == 'callback': if data.get('ignore_empty', False): @@ -67,15 +73,15 @@ def run_queue(queue, beaver_config, logger=None): try: transport.callback(**data) count += 1 - logger.debug("Number of transports: " + str(count)) break - except TransportException: + except TransportException,e: failure_count = failure_count + 1 if failure_count > beaver_config.get('max_failure'): failure_count = beaver_config.get('max_failure') sleep_time = beaver_config.get('respawn_delay') ** failure_count logger.info('Caught transport exception, reconnecting in %d seconds' % sleep_time) + logger.debug(e) try: transport.invalidate()