Using direct reply-to for RPC calls #259

Closed
facundoolano opened this Issue Jun 24, 2016 · 17 comments

facundoolano commented Jun 24, 2016

I’ve been trying to implement RPC in a project without creating a new connection, channel and reply queue for every RPC request, since that performed poorly. Most examples out there show one-off calls, so there isn’t much guidance on how to do it.

Reusing the connection is easy, but I tried several ways of reusing the channel and the reply-to queue (relying on a correlationId to distinguish consumers) and hit a different issue every time.

After reading about the direct reply-to feature, which uses a pseudo-queue, I settled on that since it had the best performance. To make it work with this lib I still had to use a new channel per request; otherwise I saw the error PRECONDITION_FAILED - reply consumer already set when trying to consume from the amq.rabbitmq.reply-to pseudo-queue.

I wonder whether I see that error because the proper way to use direct reply-to really is a separate channel per request, or because this lib is not prepared to handle the feature (or maybe I’m just using it wrong).

Thanks!

michaelklishin commented Jun 24, 2016

There is no need for client library support for direct reply-to. Please post your code, it's impossible to suggest much without it.

facundoolano commented Jun 24, 2016

Ok, here's the client code. I call createClient once per process to create the connection, and then use it in sendRPCMessage for every RPC request.

This is the version of the code that's working, where I create a new channel per request:

const amqp = require('amqplib');
const uuid = require('uuid');

// Called once per process; resolves to the shared connection.
const createClient = (settings) => amqp.connect(settings.url, settings.socketOptions);

// Opens a fresh channel for every request and closes it once the reply arrives.
const sendRPCMessage = (conn, message, rpcQueue) => conn.createChannel()
  .then((channel) => new Promise((resolve, reject) => {
    const replyToQueue = 'amq.rabbitmq.reply-to';
    // Give up after 10 s: close the channel and reject so the caller isn't left hanging.
    const timeout = setTimeout(() => {
      channel.close();
      reject(new Error('rpc request timed out'));
    }, 10000);

    const correlationId = uuid.v4();
    const msgProperties = {
      correlationId,
      replyTo: replyToQueue
    };

    function consumeAndReply (msg) {
      if (!msg) return reject(new Error('consumer cancelled by rabbitmq'));

      if (msg.properties.correlationId === correlationId) {
        resolve(msg.content);
        clearTimeout(timeout);
        channel.close();
      }
    }

    channel.consume(replyToQueue, consumeAndReply, {noAck: true})
      .then(() => channel.sendToQueue(rpcQueue, Buffer.from(message), msgProperties))
      .catch(reject);
  }));

The version of the code that's NOT working is when I try to use a single channel per process (the call fails with PRECONDITION_FAILED - reply consumer already set):

// Called once per process; resolves to the single shared channel.
const createClient = (settings) => amqp.connect(settings.url, settings.socketOptions)
  .then((conn) => conn.createChannel());

const sendRPCMessage = (channel, message, rpcQueue) => new Promise((resolve, reject) => {
    const replyToQueue = 'amq.rabbitmq.reply-to';
    const correlationId = uuid.v4();
    const msgProperties = {
      correlationId,
      replyTo: replyToQueue
    };

    function consumeAndReply (msg) {
      if (!msg) return reject(new Error('consumer cancelled by rabbitmq'));

      if (msg.properties.correlationId === correlationId) {
        resolve(msg.content);
      }
    }

    // consume() is called again on every request, on the same channel
    channel.consume(replyToQueue, consumeAndReply, {noAck: true})
      .then(() => channel.sendToQueue(rpcQueue, Buffer.from(message), msgProperties))
      .catch(reject);
  });

As I said, I'm not sure whether the second version is the wrong way to do direct reply-to (or RPC altogether). I just wanted to check before settling on a bunch of channels only because it worked.

Owner

squaremo commented Jul 3, 2016

Reading the description of the RabbitMQ feature, I don't think there's any reason it won't work with amqplib. I don't think there's any need to create a new channel each time.

Looking at the second example of code, I think the mistake is to consume each time you send an RPC.

What you want to do is consume once, and simply send a message for each RPC. If you are concerned that replies can come out of order, you should only send one RPC at a time on a channel -- or you can keep a queue of correlation IDs, and reorder when messages come in.

facundoolano commented Jul 3, 2016

Sending a single RPC at a time doesn't sound like an option for me, at least not with a single channel per client (I need the client to be able to handle multiple concurrent messages, otherwise I'd be introducing a bottleneck in my platform).

If I understand you correctly, to consume only once I'd need to introduce some sort of structure or maybe an event emitter, to be able to route each RPC response to the specific promise resolve that's expecting it. I'll give it some thought but I'll probably defer the added complexity until I have evidence that the current one channel per request approach is not good enough (btw please let me know if creating so many channels is a terrible idea for some reason).

Owner

squaremo commented Jul 3, 2016

> one channel per request approach is not good enough (btw please let me know if creating so many channels is a terrible idea for some reason).

Opening a channel per request is kind of an anti-pattern. You'd be better off having a channel per requesting thread-of-control -- i.e., for anything that needs answers to proceed -- or using a pool of channels. Extra complexity, I know.
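A pool of channels could look roughly like the sketch below. It is not part of amqplib: `createChannelPool`, `acquire` and the round-robin policy are hypothetical names and choices made for illustration, and `conn` is assumed to be an amqplib connection (anything exposing `createChannel()` would do).

```javascript
// Hypothetical helper: open `size` channels up front and hand them out round-robin.
// `conn` is assumed to be an amqplib connection, i.e. it has createChannel().
async function createChannelPool(conn, size) {
  if (size < 1) throw new RangeError('pool size must be at least 1');

  const channels = [];
  for (let i = 0; i < size; i++) {
    channels.push(await conn.createChannel());
  }

  let next = 0;
  return {
    // Return the next channel in rotation. Channels are shared, never closed here.
    acquire() {
      const channel = channels[next];
      next = (next + 1) % channels.length;
      return channel;
    },
    // Close every pooled channel, e.g. on shutdown.
    close() {
      return Promise.all(channels.map((channel) => channel.close()));
    },
  };
}
```

With direct reply-to, each pooled channel would still register its reply consumer only once, and requests sent on a channel would share that consumer.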

facundoolano commented Jul 3, 2016

I understand, thanks for your answers. I actually like how it's looking with a single consumer and an event emitter to route responses:

const amqp = require('amqplib');
const uuid = require('uuid');
const EventEmitter = require('events');

const REPLY_QUEUE = 'amq.rabbitmq.reply-to';

const createClient = (settings) => amqp.connect(settings.url, settings.socketOptions)
  .then((conn) => conn.createChannel())
  .then((channel) => {
    // create an event emitter where rpc responses will be published by correlationId
    channel.responseEmitter = new EventEmitter();
    channel.responseEmitter.setMaxListeners(0);
    // consume once per channel; msg is null if the broker cancels the consumer
    return channel.consume(REPLY_QUEUE,
      (msg) => msg && channel.responseEmitter.emit(msg.properties.correlationId, msg.content),
      {noAck: true})
      .then(() => channel);
  });

const sendRPCMessage = (channel, message, rpcQueue) => new Promise((resolve) => {
  const correlationId = uuid.v4();
  // listen for the content emitted on the correlationId event
  channel.responseEmitter.once(correlationId, resolve);
  channel.sendToQueue(rpcQueue, Buffer.from(message), { correlationId, replyTo: REPLY_QUEUE });
});

It seems to be working fine, I'll test it for a while and see how it goes.

Owner

squaremo commented Jul 3, 2016

Cool. For the sake of completeness: another way, since you're already using promises, would be to keep a map of correlationID->promise.
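A rough sketch of that map-based variant follows. It stores each pending request's `resolve`/`reject` callbacks rather than the promise itself, adds a timeout so entries do not pile up, and swaps `uuid.v4()` for Node's built-in `crypto.randomUUID()`. `createRpcClient` and `timeoutMs` are hypothetical names, and `channel` is assumed to be an open amqplib channel.

```javascript
const { randomUUID } = require('crypto');

const REPLY_QUEUE = 'amq.rabbitmq.reply-to';

// Hypothetical helper: consume from the pseudo-queue once, then route every
// reply through a Map of correlationId -> { resolve, reject, timer }.
async function createRpcClient(channel, timeoutMs = 10000) {
  const pending = new Map();

  await channel.consume(REPLY_QUEUE, (msg) => {
    if (!msg) {
      // Consumer cancelled by the broker: fail everything still waiting.
      for (const entry of pending.values()) {
        clearTimeout(entry.timer);
        entry.reject(new Error('consumer cancelled by RabbitMQ'));
      }
      pending.clear();
      return;
    }
    const entry = pending.get(msg.properties.correlationId);
    if (!entry) return; // unknown id, or the request already timed out
    clearTimeout(entry.timer);
    pending.delete(msg.properties.correlationId);
    entry.resolve(msg.content);
  }, { noAck: true });

  return function sendRPCMessage(message, rpcQueue) {
    return new Promise((resolve, reject) => {
      const correlationId = randomUUID();
      const timer = setTimeout(() => {
        pending.delete(correlationId);
        reject(new Error(`RPC ${correlationId} timed out after ${timeoutMs} ms`));
      }, timeoutMs);
      pending.set(correlationId, { resolve, reject, timer });
      channel.sendToQueue(rpcQueue, Buffer.from(message), { correlationId, replyTo: REPLY_QUEUE });
    });
  };
}
```

Because replies are looked up by correlationId, they can arrive in any order and several requests can be in flight on the same channel.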

eltoro commented Aug 25, 2016

Sorry if I'm not understanding correctly but is this async? If I'm sending multiple calls will this block the client?

Owner

squaremo commented Aug 25, 2016

@eltoro Yes, it's async; the responses will queue up for a consumer to collect.

Voles commented Nov 12, 2016

@facundoolano thanks for your example code!

Am I right that the worker still needs to send a response to the queue (amq.rabbitmq.reply-to)? E.g.

server.js

channel.sendToQueue(
  'amq.rabbitmq.reply-to',
  Buffer.from(''),
  {correlationId: message.properties.correlationId}
);

facundoolano commented Nov 12, 2016

@Voles that's right. But I think it's better to let the client tell the worker where to reply; that's why the initial message includes replyTo: REPLY_QUEUE along with the correlationId.
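On the worker side, that could be sketched as follows: read `replyTo` and `correlationId` from the incoming message and send the reply to that address instead of a hard-coded queue name. `startRpcWorker` and `handle` are hypothetical names, `channel` is assumed to be an open amqplib channel, and the error handling (dropping a failed request without requeueing) is only one possible choice.

```javascript
// Hypothetical worker: `handle` turns the request Buffer into a reply
// (string or Buffer) and may return a promise.
async function startRpcWorker(channel, rpcQueue, handle) {
  await channel.assertQueue(rpcQueue, { durable: false });

  return channel.consume(rpcQueue, async (msg) => {
    if (!msg) return; // consumer cancelled by the broker
    const { replyTo, correlationId } = msg.properties;
    try {
      const reply = await handle(msg.content);
      // Reply to whatever address the client put in replyTo and echo its
      // correlationId so the client can match the reply to the request.
      channel.sendToQueue(replyTo, Buffer.from(reply), { correlationId });
      channel.ack(msg);
    } catch (err) {
      // Assumption: drop the failed request rather than requeue it.
      channel.nack(msg, false, false);
    }
  });
}
```

In amqplib, `sendToQueue` publishes to the default exchange with the queue name as routing key, so the worker needs no knowledge of how the client receives its replies.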

Voles commented Nov 12, 2016

@facundoolano I agree. Thanks for your very quick response!

RP-3 commented Mar 31, 2017

@facundoolano If I had multiple clients, would the direct reply-to queue guarantee that the response will be sent to the client that sent the message in the first place? BTW I really like your event-emitter solution!

facundoolano commented Apr 1, 2017

> @facundoolano If I had multiple clients, would the direct reply-to queue guarantee that the response will be sent to the client that sent the message in the first place

Yes, that's what direct reply-to does.

BTW I'm closing this issue since it was solved by following @squaremo's suggestions.

boylove142 commented Nov 18, 2017

UPDATE: I checked, and it seems the problem was caused by my RabbitMQ version, which was 3.2.4. Does this method only work in version 3.4 or later?

Hi, I just implemented code like the one @facundoolano suggested and ran into this problem: NOT_FOUND - no queue 'amq.rabbitmq.reply-to' in vhost '/'

When I tried to create the queue amq.rabbitmq.reply-to manually using the management site, it told me that the queue's name contains the reserved prefix amq. Do you have any idea why this happens? It seems like the server treats this queue like a normal queue.

The code is exactly like this:

const amqp = require('amqplib');
const uuid = require('uuid');

const createClient = (settings) => amqp.connect(settings.url, settings.socketOptions);

const sendRPCMessage = (conn, message, rpcQueue) => conn.createChannel()
  .then((channel) => new Promise((resolve, reject) => {
    const replyToQueue = 'amq.rabbitmq.reply-to';
    // Give up after 10 s: close the channel and reject so the caller isn't left hanging.
    const timeout = setTimeout(() => {
      channel.close();
      reject(new Error('rpc request timed out'));
    }, 10000);

    const correlationId = uuid.v4();
    const msgProperties = {
      correlationId,
      replyTo: replyToQueue
    };

    function consumeAndReply (msg) {
      if (!msg) return reject(new Error('consumer cancelled by rabbitmq'));

      if (msg.properties.correlationId === correlationId) {
        resolve(msg.content);
        clearTimeout(timeout);
        channel.close();
      }
    }

    channel.consume(replyToQueue, consumeAndReply, {noAck: true})
      .then(() => channel.sendToQueue(rpcQueue, Buffer.from(message), msgProperties))
      .catch(reject);
  }));

Collaborator

cressie176 commented Nov 19, 2017

The following should answer your questions...

Queue names starting with "amq." are reserved for internal use by the broker. Attempts to declare a queue with a name that violates this rule will result in a channel-level exception with reply code 403 (ACCESS_REFUSED).

https://www.rabbitmq.com/tutorials/amqp-concepts.html

To use direct reply-to, an RPC client should:

  • Consume from the pseudo-queue amq.rabbitmq.reply-to in no-ack mode. There is no need to declare this "queue" first, although the client can do so if it wants.

  • Set the reply-to property in their request message to amq.rabbitmq.reply-to.

The RPC server will then see a reply-to property with a generated name. It should publish to the default exchange ("") with the routing key set to this value (i.e. just as if it were sending to a reply queue as usual). The message will then be sent straight to the client consumer.

https://www.rabbitmq.com/direct-reply-to.html

3.4.0 (21 Oct 2014): Fast RPC replies; Live plugin activation; Reconnecting .net client

https://www.rabbitmq.com/changelog.html
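Given the changelog entry above, a client could guard against older brokers with a small version check. This is only a sketch: `supportsDirectReplyTo` is a hypothetical helper, and it assumes you already have the broker's version string from somewhere (for example the management UI).

```javascript
// Hypothetical helper: true when a RabbitMQ version string such as "3.2.4"
// is at least 3.4.0, the release whose changelog lists "Fast RPC replies".
function supportsDirectReplyTo(version) {
  const [major = 0, minor = 0] = version
    .split('.')
    .map((part) => parseInt(part, 10) || 0);
  return major > 3 || (major === 3 && minor >= 4);
}

console.log(supportsDirectReplyTo('3.2.4')); // false
console.log(supportsDirectReplyTo('3.4.0')); // true
```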

mattqs commented Jul 28, 2018

I wrote an npm package amq.rabbitmq.reply-to.js that:

  • Uses direct reply-to - a feature that allows RPC (request/reply) clients with a design similar to that demonstrated in tutorial 6 (https://www.rabbitmq.com/direct-reply-to.html) to avoid declaring a response queue per request.

  • Creates an event emitter where rpc responses will be published by correlationId
    as suggested by #259 (comment)

Usage:

const rabbitmqreplyto = require('amq.rabbitmq.reply-to.js');

const serverCallbackTimesTen = (message, rpcServer) => {
    const n = parseInt(message);
    return Promise.resolve(`${n * 10}`);
};

let rpcServer;
let rpcClient;
Promise.resolve().then(() => {
    const serverOptions = new rabbitmqreplyto.RpcServerOptions(
    /* url */ undefined, 
    /* serverId */ undefined, 
    /* callback */ serverCallbackTimesTen);

    return rabbitmqreplyto.RpcServer.Create(serverOptions);
}).then((rpcServerP) => {
    rpcServer = rpcServerP;
    return rabbitmqreplyto.RpcClient.Create();
}).then((rpcClientP) => {
    rpcClient = rpcClientP;
    const promises = [];
    for (let i = 1; i <= 20; i++) {
        promises.push(rpcClient.sendRPCMessage(`${i}`));
    }
    return Promise.all(promises);
}).then((replies) => {
    console.log(replies);
    return Promise.all([rpcServer.Close(), rpcClient.Close()]);
});

//['10',
//  '20',
//  '30',
//  '40',
//  '50',
//  '60',
//  '70',
//  '80',
//  '90',
//  '100',
//  '110',
//  '120',
//  '130',
//  '140',
//  '150',
//  '160',
//  '170',
//  '180',
//  '190',
//  '200']