From 358a8e118fe73920fb7e0b72811f14c0daedeea2 Mon Sep 17 00:00:00 2001 From: Alex Plugaru Date: Wed, 26 Jul 2017 10:46:31 -0700 Subject: [PATCH] Made queues non-durable. Always retry after disconnect. (#120) --- socketio/kombu_manager.py | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/socketio/kombu_manager.py b/socketio/kombu_manager.py index 85bea986..9906673e 100644 --- a/socketio/kombu_manager.py +++ b/socketio/kombu_manager.py @@ -71,18 +71,33 @@ def _exchange(self): def _queue(self): queue_name = 'flask-socketio.' + str(uuid.uuid4()) return kombu.Queue(queue_name, self._exchange(), + durable=False, queue_arguments={'x-expires': 300000}) def _producer(self): return self._connection().Producer(exchange=self._exchange()) + def __error_callback(self, exception, interval): + self.server.logger.exception('Sleeping {}s'.format(interval)) + def _publish(self, data): - self.producer.publish(pickle.dumps(data)) + connection = self._connection() + publish = connection.ensure(self.producer, self.producer.publish, + errback=self.__error_callback) + publish(pickle.dumps(data)) def _listen(self): reader_queue = self._queue() - with self._connection().SimpleQueue(reader_queue) as queue: - while True: - message = queue.get(block=True) - message.ack() - yield message.payload + + while True: + connection = self._connection().ensure_connection( + errback=self.__error_callback) + try: + with connection.SimpleQueue(reader_queue) as queue: + while True: + message = queue.get(block=True) + message.ack() + yield message.payload + except connection.connection_errors: + self.server.logger.exception("Connection error " + "while reading from queue")