Publishing without closing channel #144

Closed
dbousamra opened this issue Mar 12, 2015 · 7 comments

@dbousamra

Hi all,

We are trying to publish without closing the channel (i.e. have a long running channel as suggested). When we publish, if we don't close the channel, messages aren't flushed from the buffer and sent.

How do we "flush" regularly, so messages are sent, while still keeping a channel open?

Dom

@deyles-zz

I agree - it'd be helpful if the calling scope was able to explicitly flush the channel buffer rather than depending on an implicit flush.

var _ = require('lodash');

var messages = getMyLongListOfMessages();
conn.createChannel().then(function(ch) {
  return ch.assertQueue('tasks').then(function() {
    _.each(messages, function(message) {
      ch.sendToQueue('tasks', new Buffer(JSON.stringify(message)));
    });
    return ch;   // pass the channel along so it can be flushed
  });
}).then(function(ch) {
  ch.flush();    // proposed API: explicitly flush the channel's write buffer
});

@squaremo
Collaborator

When we publish, if we don't close the channel, messages aren't flushed from the buffer and sent.

Do you mean that if you don't close the channel explicitly, exiting the program can leave messages in buffers? If not that, what problem are you seeing?

In general, messages are sent as soon as Node.JS can write to the socket. A caveat is that each channel is competing with other channels to write its messages to the socket, so it's more accurate to say that some channels' messages will be sent as soon as Node.JS can write to the socket. But certainly channels make progress without you having to tell them to.

With that in mind, I don't know what flush is supposed to be doing in the above snippet, unless it's synchronising on the channel having written everything that's buffered. Then it would need to return a promise, or accept a callback.

If it's messages reaching the server that you care about, I'd use confirmations and .waitForConfirms(). (I may not have documented that adequately -- er, at all -- yet)
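
For reference, a rough sketch of that with the promise API might look like this (the queue name and payload are made up):

var amqp = require('amqplib');

amqp.connect().then(function(conn) {
  return conn.createConfirmChannel().then(function(ch) {
    return ch.assertQueue('tasks').then(function() {
      ch.sendToQueue('tasks', new Buffer(JSON.stringify({ hello: 'world' })));
      // resolves once the server has confirmed everything published so far
      return ch.waitForConfirms();
    });
  }).then(function() {
    return conn.close();
  });
});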

@dbousamra
Author

The problem we are seeing is extremely low throughput when publishing. The RabbitMQ instance we are using is located in US-East (and we are publishing from Sydney).

Our code for publishing looked like this:

var amqp = require('amqplib');
var Promise = require('bluebird'); // provides Promise.delay used in reconnect()

var queueConnection = connect(config.queues.snapshots.host)
var queueName = config.queues.snapshots.queue_name

function connect(queue) {
  config.logger.info("Connecting to RabbitMQ at %s", queue)
  var connection = amqp.connect(queue);
  return connection.then(function(conn) {
    config.logger.info("Connected to RabbitMQ at %s", queue)
    conn.on('error', function(error) {
      config.logger.error(JSON.stringify({ message: error.message, stack: error.stack }))
      return reconnect(queue);
    });
    return conn
  }).catch(function(err) {
    config.logger.error(JSON.stringify({ message: err.message, stack: err.stack }))
    return reconnect(queue);
  })
};

function reconnect(queue) {
  return Promise.delay(config.reconnect_rate).then(function() {
    return connect(queue)
  })
}

var sendToSnapshotsQueue = function(message) {
  return queueConnection.then(function(conn) {
    return conn.createChannel()
  }).then(function(ch) {
    var queue = ch.assertQueue(queueName, { durable: false });
    return queue.then(function() {
      ch.sendToQueue(queueName, new Buffer(JSON.stringify(message)), { deliveryMode: true });
      return ch.close();
    });
  })
};

Notice how our sendToSnapshotsQueue function works. We reuse the same connection, but we create a channel, assert the queue, and send to it before closing the channel... each time we send a message. Sure, we could move the assertQueue into our connection code rather than doing it on every send.

But it seems as if opening a channel, sending a message, and closing the channel takes a good 750ms. If we reuse the same channel and send multiple messages before closing it, we can bump our throughput into the thousands of messages per second, rather than tens.
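
For illustration, a rough sketch of a single long-lived channel (reusing the queueConnection and queueName from the code above) might look like this:

var channelPromise = queueConnection.then(function(conn) {
  return conn.createChannel();
}).then(function(ch) {
  return ch.assertQueue(queueName, { durable: false }).then(function() {
    return ch;
  });
});

var sendToSnapshotsQueue = function(message) {
  return channelPromise.then(function(ch) {
    ch.sendToQueue(queueName, new Buffer(JSON.stringify(message)));
  });
};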

The issue that Dan pointed out is that if you don't close the channel, it doesn't seem to actually send the messages to the queue. Looking through the code, it seems that messages are queued up and the socket send is only performed once a buffer fills.

I couldn't seem to fill it with enough messages (I queued 10000 messages in a loop, to see if it would send without closing the channel, but it didn't).

So I then came up with the following code to try and batch message queuing under a single channel:

var async = require('async')
var _ = require('lodash')

var publishQueue = async.cargo((messages, cb) => {
  return queueConnection.then(function(conn) {
    return conn.createChannel()
  }).then(function(ch) {
    return ch.assertQueue(queueName, { durable: false }).then(function() {
      _.each(messages, function(message) {
        ch.sendToQueue(queueName, new Buffer(JSON.stringify(message)), { deliveryMode: true });
      })
      return ch.close();
    });
  }).then(function() {
    cb();   // signal the cargo that this batch is done
  }, cb);   // report any error so the cargo doesn't stall
}, 1000)

var sendToSnapshotsQueue = function(message) {
  publishQueue.push(message);
  return Promise.resolve(message);  
};

This is not the final code we used, but it illustrates the idea. We use async's cargo construct. When we call sendToSnapshotsQueue, we push the message onto the in-memory cargo queue, which flushes as often as possible, in batches of up to 1000 messages (reusing the same channel within each batch).

This increased our throughput massively, but is it the right solution? Any ideas?

@squaremo
Collaborator

But it seems as if opening a channel, sending a message, and closing the channel takes a good 750ms.

There are three round-trips in that, since opening the channel, declaring (asserting) the queue, and closing the channel are all RPCs (publishing is not). So that seems longish, but not out of the question if the latency on your connection is high.

The issue that Dan pointed out is that if you don't close the channel, it doesn't seem to actually send the messages to the queue.

It'll do the socket write straight-away if it can. But: when I say "straight-away", I mean "on the next event loop". If you have code that's publishing thousands of messages at a time, none of them will be written to the socket until your code returns control to the event loop.

The "node.js" way of dealing with this is to have a form of flow control, most notably in its stream module, which amqplib's channels mimic. If channel.publish() returns false, you stop publishing and register a handler for the 'drain' event. When the channel is ready to be written to again, it will emit that event and you can start again. (Sorry if this is all known to you already)
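
A minimal sketch of that pattern, assuming an already-open channel ch and a hypothetical helper named publishAll:

function publishAll(ch, queue, messages, done) {
  function write() {
    var ok = true;
    while (ok && messages.length) {
      var msg = messages.shift();
      ok = ch.sendToQueue(queue, new Buffer(JSON.stringify(msg)));
    }
    if (messages.length) {
      // the channel's write buffer is full; resume once it has drained
      ch.once('drain', write);
    } else {
      done();
    }
  }
  write();
}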

@wbyoung

wbyoung commented Apr 13, 2017

I was just hit by this as well. I have the following small app to publish just a single message for testing and it's not actually publishing w/o a few hacks:

const connection = await amqp.connect(config.rabbitmq.uri);

try {
  const channel = await connection.createChannel();
  const exchange = 'exchange-name';
  const message: Message = { hello: 'world' };
  const buffer = Buffer.from(JSON.stringify(message));

  await channel.assertExchange(exchange, 'topic', { durable: false });

  await channel.publish(exchange, 'routing.key', buffer, {
    contentType: 'application/json',
  });

  // explicit close required to flush the publish.
  await channel.close();
}
finally {
  await connection.close();
}

// also required to flush publish
await new Promise((resolve: () => void) => setTimeout(resolve));

The requirement to wait on a timeout is a bit concerning as well. I'm really not quite sure why that's required here. Shouldn't waiting on the connection close be sufficient?

Interestingly, I have more or less the same code w/o the channel.close in other places and it works fine, but in that case, the connection is pooled for reuse and more could have been done on that same channel before/after the individual publish.

@samuksilv

samuksilv commented Oct 23, 2019

Thanks man, you helped me! It works for me.

@cressie176
Collaborator

Wireshark shows that the message is published without the need to close the channel

const amqp = require('amqplib');

(async () => {
  const connection = await amqp.connect();
  const channel = await connection.createChannel();
  const message = { hello: 'world' };
  const buffer = Buffer.from(JSON.stringify(message));

  await channel.assertQueue('test');

  await channel.sendToQueue('test', buffer, {
    contentType: 'application/json',
  });
})();
15	0.615047	127.0.0.1	127.0.0.1	AMQP	569	Connection.Start 
17	0.617962	127.0.0.1	127.0.0.1	AMQP	378	Connection.Start-Ok 
19	0.619008	127.0.0.1	127.0.0.1	AMQP	76	Connection.Tune 
21	0.619408	127.0.0.1	127.0.0.1	AMQP	76	Connection.Tune-Ok 
23	0.619531	127.0.0.1	127.0.0.1	AMQP	72	Connection.Open vhost=/ 
25	0.620405	127.0.0.1	127.0.0.1	AMQP	69	Connection.Open-Ok 
27	0.623768	127.0.0.1	127.0.0.1	AMQP	69	Channel.Open 
29	0.625205	127.0.0.1	127.0.0.1	AMQP	72	Channel.Open-Ok 
31	0.626064	127.0.0.1	127.0.0.1	AMQP	80	Queue.Declare q=test 
33	0.627174	127.0.0.1	127.0.0.1	AMQP	81	Queue.Declare-Ok q=test 
35	0.628366	127.0.0.1	127.0.0.1	AMQP/JSON	145	Basic.Publish x= rk=test Content-Header type=application/json Content-Body , JavaScript Object Notation (application/json)
