Skip to content

Commit

Permalink
Merge pull request #1303 from pika/pika-1255
Browse files Browse the repository at this point in the history
Fix tornado consumer example
  • Loading branch information
lukebakken committed Jan 27, 2021
2 parents aa609f3 + d4d90c7 commit 8f1fc9f
Showing 1 changed file with 27 additions and 15 deletions.
42 changes: 27 additions & 15 deletions docs/examples/tornado_consumer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ The following example implements a consumer using the :class:`Tornado adapter <p

consumer.py::

from pika import adapters
import pika
import logging
import pika
from pika import adapters
from pika.adapters.tornado_connection import TornadoConnection
from pika.exchange_type import ExchangeType

LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
Expand Down Expand Up @@ -54,8 +55,10 @@ consumer.py::

"""
LOGGER.info('Connecting to %s', self._url)
return adapters.tornado_connection.TornadoConnection(pika.URLParameters(self._url),
self.on_connection_open)
return TornadoConnection(
pika.URLParameters(self._url),
self.on_connection_open,
)

def close_connection(self):
"""This method closes the connection to RabbitMQ."""
Expand Down Expand Up @@ -85,7 +88,7 @@ consumer.py::
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reopening in 5 seconds: %s',
reason)
reason)
self._connection.ioloop.call_later(5, self.reconnect)

def on_connection_open(self, unused_connection):
Expand Down Expand Up @@ -155,9 +158,11 @@ consumer.py::

"""
LOGGER.info('Declaring exchange %s', exchange_name)
self._channel.exchange_declare(self.on_exchange_declareok,
exchange_name,
self.EXCHANGE_TYPE)
self._channel.exchange_declare(
callback=self.on_exchange_declareok,
exchange=exchange_name,
exchange_type=self.EXCHANGE_TYPE,
)

def on_exchange_declareok(self, unused_frame):
"""Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC
Expand All @@ -178,8 +183,10 @@ consumer.py::

"""
LOGGER.info('Declaring queue %s', queue_name)
self._channel.queue_declare(self.on_queue_declareok,
queue_name)
self._channel.queue_declare(
queue=queue_name,
callback=self.on_queue_declareok,
)

def on_queue_declareok(self, method_frame):
"""Method invoked by pika when the Queue.Declare RPC call made in
Expand All @@ -193,8 +200,12 @@ consumer.py::
"""
LOGGER.info('Binding %s to %s with %s',
self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
self._channel.queue_bind(self.on_bindok, self.QUEUE,
self.EXCHANGE, self.ROUTING_KEY)
self._channel.queue_bind(
queue=self.QUEUE,
exchange=self.EXCHANGE,
routing_key=self.ROUTING_KEY,
callback=self.on_bindok,
)

def add_on_cancel_callback(self):
"""Add a callback that will be invoked if RabbitMQ cancels the consumer
Expand Down Expand Up @@ -278,8 +289,10 @@ consumer.py::
"""
LOGGER.info('Issuing consumer related RPC commands')
self.add_on_cancel_callback()
self._consumer_tag = self._channel.basic_consume(self.on_message,
self.QUEUE)
self._consumer_tag = self._channel.basic_consume(
on_message_callback=self.on_message,
queue=self.QUEUE,
)

def on_bindok(self, unused_frame):
"""Invoked by pika when the Queue.Bind method has completed. At this
Expand Down Expand Up @@ -346,4 +359,3 @@ consumer.py::

if __name__ == '__main__':
main()

0 comments on commit 8f1fc9f

Please sign in to comment.