Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[W 171122 19:51:27 callback:157] Duplicate callback found for "1:Basic.Ack" #894

Closed
calJ2016 opened this issue Nov 22, 2017 · 3 comments
Closed

Comments

@calJ2016
Copy link

Hello, when I began publishing receive such warnings:
[W 171122 19:51:27 callback:157] Duplicate callback found for "1:Basic.Ack"
I am not sure what this means and is it bug or not? Any one can help solve it ?

tips:
the code i used comes from "Asynchronous publisher example"
pika version:0.11

@lukebakken
Copy link
Member

@calJ2016 without more information I can't help. Please provide a complete code sample that reproduces this and re-open this issue. Thanks.

@calJ2016
Copy link
Author

calJ2016 commented Nov 23, 2017

@lukebakken
hello, when I called the publish message interface of my tornado application, got a log as below:
2017-11-23 09:14:42,452 [INFO] Connection opened
2017-11-23 09:14:42,453 [INFO] Adding connection close callback
2017-11-23 09:14:42,453 [INFO] Creating a new channel
2017-11-23 09:14:42,455 [INFO] Channel opened
2017-11-23 09:14:42,457 [INFO] Adding channel close callback
2017-11-23 09:14:42,457 [INFO] Declaring exchange dad
2017-11-23 09:14:42,460 [INFO] Exchange declared
2017-11-23 09:14:42,460 [INFO] Declaring queue service
2017-11-23 09:14:42,463 [INFO] Binding dad to service with io
2017-11-23 09:14:42,464 [INFO] Queue bound
2017-11-23 09:14:45,963 [INFO] Issuing consumer related RPC commands
2017-11-23 09:14:45,963 [INFO] Published message # 1
2017-11-23 09:14:45,964 [INFO] Issuing Confirm.Select RPC command
[I 171123 09:14:45 web:2063] 200 POST /pub (::1) 3.06ms
2017-11-23 09:14:46,662 [INFO] Issuing consumer related RPC commands
2017-11-23 09:14:46,663 [INFO] Published message # 2
2017-11-23 09:14:46,663 [INFO] Issuing Confirm.Select RPC command
[W 171123 09:14:46 callback:157] Duplicate callback found for "1:Basic.Ack"
[W 171123 09:14:46 callback:157] Duplicate callback found for "1:Basic.Nack"

I am not sure what this "Duplicate callback found for " means and is it bug or not?

environment :
tornado4.5 +pika0.11

related code blocks:

handler of tornado

class publishDataHandler(tornado.web.RequestHandler):
    def post(self):
        rr = RestResponse()
        mq = self.application.mq
        r_data = json.loads(self.request.body.decode('utf-8'))
        mq.start_publishing(r_data['data'])   #call this function to publish msg.
        print(" [x] Sent {}".format(r_data['data']))
        rr.success("ok!")
        self.write(rr.get_json())

mq_processor.py :Used to send and receive rabbitmq messages asynchronously

import json
from pika import adapters
import pika
import logging

LOGGER = logging.getLogger('mq.service')
class MqProcessor(object):

    EXCHANGE = 'dad'
    EXCHANGE_TYPE = 'direct'
    QUEUE = 'service'
    ROUTING_KEY = 'io'
    PUBLISH_INTERVAL = 1

    def __init__(self, amqp_url):
        """Create a new instance of the consumer class, passing in the AMQP
        URL used to connect to RabbitMQ.

        :param str amqp_url: The AMQP url to connect with

        """
        self._connection = None
        self._channel = None
        self._closing = False
        self._consumer_tag = None
        self._url = amqp_url
        self._deliveries = None
        self._message_number = None

        self._deliveries = []
        self._acked = 0
        self._nacked = 0
        self._message_number = 0

    def connect(self):
        LOGGER.info('Connecting to %s', self._url)
        return adapters.TornadoConnection(pika.URLParameters(self._url),
                                          self.on_connection_open)

    def close_connection(self):
        """This method closes the connection to RabbitMQ."""
        LOGGER.info('Closing connection')
        self._connection.close()

    def add_on_connection_close_callback(self):
        LOGGER.info('Adding connection close callback')
        self._connection.add_on_close_callback(self.on_connection_closed)

    def on_connection_closed(self, connection, reply_code, reply_text):
        self._channel = None
        if self._closing:
            self._connection.ioloop.stop()
        else:
            LOGGER.warning('Connection closed, reopening in 5 seconds: (%s) %s',
                           reply_code, reply_text)
            self._connection.add_timeout(5, self.reconnect)

    def on_connection_open(self, unused_connection):
        LOGGER.info('Connection opened')
        self.add_on_connection_close_callback()
        self.open_channel()

    def reconnect(self):
        if not self._closing:
            # Create a new connection
            self._connection = self.connect()

    def add_on_channel_close_callback(self):

        LOGGER.info('Adding channel close callback')
        self._channel.add_on_close_callback(self.on_channel_closed)

    def on_channel_closed(self, channel, reply_code, reply_text):

        LOGGER.warning('Channel %i was closed: (%s) %s',
                       channel, reply_code, reply_text)
        self._connection.close()

    def on_channel_open(self, channel):
        LOGGER.info('Channel opened')
        self._channel = channel
        self.add_on_channel_close_callback()
        self.setup_exchange(self.EXCHANGE)

    def setup_exchange(self, exchange_name):
        LOGGER.info('Declaring exchange %s', exchange_name)
        self._channel.exchange_declare(self.on_exchange_declareok,
                                       exchange_name,
                                       self.EXCHANGE_TYPE,
                                       durable=True)

    def on_exchange_declareok(self, unused_frame):
        LOGGER.info('Exchange declared')
        self.setup_queue(self.QUEUE)

    def setup_queue(self, queue_name):
        LOGGER.info('Declaring queue %s', queue_name)
        self._channel.queue_declare(self.on_queue_declareok, queue_name, durable=True)

    def on_queue_declareok(self, method_frame):
        LOGGER.info('Binding %s to %s with %s',
                    self.EXCHANGE, self.QUEUE, self.ROUTING_KEY)
        self._channel.queue_bind(self.on_bindok, self.QUEUE,
                                 self.EXCHANGE, self.ROUTING_KEY)

    def add_on_cancel_callback(self):
        LOGGER.info('Adding consumer cancellation callback')
        self._channel.add_on_cancel_callback(self.on_consumer_cancelled)

    def on_consumer_cancelled(self, method_frame):
        LOGGER.info('Consumer was cancelled remotely, shutting down: %r',
                    method_frame)
        if self._channel:
            self._channel.close()

    def acknowledge_message(self, delivery_tag):
        LOGGER.info('Acknowledging message %s', delivery_tag)
        self._channel.basic_ack(delivery_tag)

    def on_message(self, unused_channel, basic_deliver, properties, body):
        LOGGER.info('Received message # %s from %s: %s',
                    basic_deliver.delivery_tag, properties.app_id, body)
        self.acknowledge_message(basic_deliver.delivery_tag)

    def on_cancelok(self, unused_frame):
        LOGGER.info('RabbitMQ acknowledged the cancellation of the consumer')
        self.close_channel()

    def stop_consuming(self):
        if self._channel:
            LOGGER.info('Sending a Basic.Cancel RPC command to RabbitMQ')
            self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)

    def start_consuming(self):
        LOGGER.info('Issuing consumer related RPC commands')
        self.add_on_cancel_callback()
        self._channel.basic_qos(prefetch_count=1)

        self._consumer_tag = self._channel.basic_consume(self.on_message,
                                                         self.QUEUE)

    def start_publishing(self, data):
        LOGGER.info('Issuing consumer related RPC commands')
        self.publish_message(data)
        self.enable_delivery_confirmations()
   
    def enable_delivery_confirmations(self):
        LOGGER.info('Issuing Confirm.Select RPC command')
        self._channel.confirm_delivery(self.on_delivery_confirmation)

    def on_delivery_confirmation(self, method_frame):
        confirmation_type = method_frame.method.NAME.split('.')[1].lower()
        LOGGER.info('Received %s for delivery tag: %i',
                    confirmation_type,
                    method_frame.method.delivery_tag)
        if confirmation_type == 'ack':
            self._acked += 1
        elif confirmation_type == 'nack':
            self._nacked += 1
        self._deliveries.remove(method_frame.method.delivery_tag)
        LOGGER.info('Published %i messages, %i have yet to be confirmed, '
                    '%i were acked and %i were nacked',
                    self._message_number, len(self._deliveries),
                    self._acked, self._nacked)

    def publish_message(self,data):
        if self._channel is None or not self._channel.is_open:
            return

        properties = pika.BasicProperties(app_id='example-publisher',
                                          content_type='application/json',
                                          delivery_mode=2)

        self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY,
                                    json.dumps(data, ensure_ascii=False),
                                    properties)
        self._message_number += 1
        self._deliveries.append(self._message_number)
        LOGGER.info('Published message # %i', self._message_number)
        # self.schedule_next_message()


    def schedule_next_message(self):
       
        LOGGER.info('Scheduling next message for %0.1f seconds',
                    self.PUBLISH_INTERVAL)
        self._connection.add_timeout(self.PUBLISH_INTERVAL,
                                     self.publish_message)

    def on_bindok(self, unused_frame):
      
        LOGGER.info('Queue bound')


    def close_channel(self):
       
        LOGGER.info('Closing the channel')
        self._channel.close()

    def open_channel(self):
       
        LOGGER.info('Creating a new channel')
        self._connection.channel(on_open_callback=self.on_channel_open)

    def run(self):
       
        self._connection = self.connect()
        self._connection.ioloop.start()

    def stop(self):
        LOGGER.info('Stopping')
        self._closing = True
        self.stop_consuming()
        self._connection.ioloop.start()
        LOGGER.info('Stopped')

tornado app.py

mqp = MqProcessor('amqp://admin:dTvnNfA8@localhost:5672/')
dad_application = tornado.web.Application(router.router_list, settings, db=mongodb)
dad_application.mq = mqp
def main():
    tornado.options.parse_command_line()
    config.dictConfig(yaml.load(open('logging.yaml', 'r')))
    http_server = tornado.httpserver.HTTPServer(dad_application, xheaders=True)
    http_server.listen(options.port)
    try:
        mqp.run()
    except KeyboardInterrupt:
        mqp.stop()

if __name__ == "__main__":
    main()

could you help me ? thanks!

@katajakasa
Copy link
Contributor

You are enabling delivery confirmations after every message publish. You only need to do it once, after creating the channel. Now you have a duplicate on_delivery_confirmation callback after the second publish call.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants