Persistent messages being lost #119

Open
ali-bugdayci opened this Issue · 3 comments

2 participants

@ali-bugdayci

Hi,

I am using durable queues and persistent messages ("delivery_mode" = 2). I check via:

"sudo rabbitmqctl list_queues"

to confirm that the messages persist after a RabbitMQ restart. However, when I subscribe and receive the first message from the queue, the other messages are lost.

It works fine when only the publisher is publishing, and the subscriber subscribes after about 3 messages and receives them one by one. I am implementing a long-polling server, hence I subscribe using ack:true and call queue.shift after unsubscribing.

Here is my code:

connection.queue(queue_name, {durable: true}, function (q) {
    console.log("Queue connected");
    // Catch all messages
    q.bind('#');
    var ctag;

    // Receive messages
    q.subscribe({ack: true}, function (message) {
        console.log("incoming message");
        console.log(message.data);
        response.writeHead(200, { "Content-Type": "text/plain" });
        response.end(message.data);

        console.log("unsubscribe");
        q.unsubscribe(ctag)
            .addCallback(function () { q.shift(); });
    })
    .addCallback(function (ok) { ctag = ok.consumerTag; });
});

My publisher is in Ruby:

AMQP.start do |connection|
  AMQP::Channel.new(connection) do |channel|
    queue = channel.queue(to_imei, {:auto_delete => true, :durable => true})

    channel.default_exchange.publish json, :routing_key => queue.name, :persistent => true
  end
end

My Configuration:

$ node -v
v0.8.5

$ npm list
amqp@0.1.3/home/bor/sunucu/node_sunucu/node_modules/amqp

$ rabbitmqctl status
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","2.8.4"},
{xmerl,"XML parser","1.2.10"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","2.8.4"},
{amqp_client,"RabbitMQ AMQP Client","2.8.4"},
{rabbit,"RabbitMQ","2.8.4"},
{os_mon,"CPO CXC 138 46","2.2.7"},
{sasl,"SASL CXC 138 11","2.1.10"},
{rabbitmq_mochiweb,"RabbitMQ Mochiweb Embedding","2.8.4"},
{webmachine,"webmachine","1.7.0-rmq2.8.4-hg"},
{mochiweb,"MochiMedia Web Server","1.3-rmq2.8.4-git"},
{inets,"INETS CXC 138 49","5.7.1"},
{mnesia,"MNESIA CXC 138 12","4.5"},
{stdlib,"ERTS CXC 138 10","1.17.5"},
{kernel,"ERTS CXC 138 10","2.14.5"}]},
{os,{unix,linux}},

@postwait
Owner

You can't use {ack: true} to do what you want.

@ali-bugdayci

Hi,

Thanks for the quick reply. First of all, I don't see why I can't use it. Doesn't RabbitMQ come back up after a reboot as if the queued messages had been sent before it crashed? If it doesn't, then I believe this should be reported to RabbitMQ so that ack works on persistent messages too.

I checked without ack:true. I receive all the messages, but afterwards I get the following exception on the node-amqp side:

events.js:66
throw arguments[1]; // Unhandled 'error' event
^
Error: PRECONDITION_FAILED - unknown delivery tag 4
at Queue._onMethod (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:1720:15)
at Queue.Channel._onChannelMethod (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:1365:14)
at Connection._onMethod (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:922:28)
at AMQPParser.self.addListener.parser.onMethod (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:797:12)
at AMQPParser._parseMethodFrame (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:442:10)
at frameEnd (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:187:16)
at frame (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:172:14)
at AMQPParser.header as parse
at AMQPParser.execute (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:231:21)
at Connection. (/home/bor/sunucu/node_sunucu/node_modules/amqp/amqp.js:837:12)
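
For context, PRECONDITION_FAILED - unknown delivery tag is what the broker returns when a channel acknowledges a delivery it is not tracking. That would happen if q.shift() is still called after subscribing without {ack: true}. A minimal sketch, assuming that is the cause here; makeHandler and the useAck flag are hypothetical names, not part of node-amqp:

```javascript
// Hypothetical helper: builds a message handler that only acknowledges
// when the subscription was made with {ack: true}.
function makeHandler(q, useAck, onMessage) {
  return function (message) {
    onMessage(message);
    if (useAck) {
      // q.shift() acknowledges the last delivered message; without
      // {ack: true} the broker has no delivery tag to match it against.
      q.shift();
    }
  };
}

// Usage, assuming `q` is an open node-amqp queue:
//   var useAck = false;
//   q.subscribe({ack: useAck}, makeHandler(q, useAck, function (m) {
//     console.log(m.data);
//   }));
```

Keeping the ack decision in one flag means the subscribe options and the shift call cannot drift apart.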

@ali-bugdayci

I found out that persistent messages don't work well with the default {autoDelete: true} option.

Correct me if I am wrong, but I believe they are two different things: autoDelete removes the queue if there are no messages in the queue, whereas if there is any persistent message then the queue will not be auto-deleted.

For the time being I am setting the queue's autoDelete to false, but would this lead to too many resources being consumed?

Thanks
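
For reference, in the AMQP 0-9-1 model an auto-delete queue is removed once its last consumer is cancelled, whether or not messages remain, which would explain messages disappearing right after unsubscribe. Below is a minimal sketch of queue options that turn autoDelete off and bound resource use with RabbitMQ's x-expires queue argument. It assumes the installed node-amqp version passes the arguments option through to the broker; queueOptions and the 30-minute value are illustrative only. The Ruby publisher would need to declare the queue with matching properties.

```javascript
// Hypothetical helper: options for a queue that survives unsubscribe
// but is dropped by the broker after sitting unused for `idleMs`.
function queueOptions(idleMs) {
  return {
    durable: true,      // queue definition survives a broker restart
    autoDelete: false,  // do not delete when the last consumer cancels
    // RabbitMQ extension: delete the queue after it has been unused
    // (no consumers, no redeclaration) for this many milliseconds.
    arguments: { 'x-expires': idleMs }
  };
}

// Usage, assuming `connection` is an open node-amqp connection:
//   connection.queue(queue_name, queueOptions(30 * 60 * 1000), function (q) {
//     q.bind('#');
//   });
```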

@postwait postwait closed this
@postwait postwait reopened this