Skip to content

Commit

Permalink
feat(sender-link): add support for sending with no reply
Browse files Browse the repository at this point in the history
This allows users to send messages on a SenderLink (and the
corresponding SenderLinkStream) without requiring a Promise to be
returned. There are no guarantees regarding unsettled sends or
tracking for pending sends with this options set, the goal here is
purely for performance.
  • Loading branch information
mbroadst committed Nov 27, 2016
1 parent afb391e commit 5e159a9
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 86 deletions.
2 changes: 1 addition & 1 deletion lib/amqp_client.js
Expand Up @@ -256,7 +256,7 @@ AMQPClient.prototype.createSender = function(address, policyOverrides) {
*/
AMQPClient.prototype.createSenderStream = function(address, policyOverrides) {
return this.createSender(address, policyOverrides)
.then(function(link) { return new SenderStream(link); });
.then(function(link) { return new SenderStream(link, policyOverrides); });
};

/**
Expand Down
18 changes: 13 additions & 5 deletions lib/policies/policy_utilities.js
Expand Up @@ -57,14 +57,22 @@ var CreditPolicies = {

module.exports.CreditPolicies = CreditPolicies;

var SenderCallbackPolicies = {
// Only callback when settled Disposition received from recipient
/**
* Defines the behavior of the return value of `SenderLink.send`
* @enum
*/
var SenderCallbackPolicy = {
/** Callback immediately after sending, no promise is created */
None: 'none',
/** Only callback when settled Disposition received from recipient */
OnSettle: 'settled',
// Callback as soon as sent, will not call-back again if future disposition results in error.
OnSent: 'sent'
/** Callback as soon as sent, will not call-back again if future disposition
* results in error. */
OnSent: 'sent',
};

module.exports.SenderCallbackPolicies = SenderCallbackPolicies;
module.exports.SenderCallbackPolicies = SenderCallbackPolicy; // deprecated
module.exports.SenderCallbackPolicy = SenderCallbackPolicy;

function merge(newPolicy, base) {
return u.deepMerge(newPolicy, base);
Expand Down
66 changes: 43 additions & 23 deletions lib/sender_link.js
Expand Up @@ -53,13 +53,17 @@ SenderLink.prototype.canSend = function() {
* @inner @memberof SenderLink
* @param {object|string|array} msg Message to send. Will be encoded using sender link policy's encoder.
* @param {object} [options] An object of options to attach to the message including: annotations, properties,
and application properties
* and application properties
* @param {string} [options.callback] Determines when the send operation should callback. Possible
* options are: 'sent', 'settled' and 'none'. For the best performance
* choose 'none', which is essentially "send and forget" and notably will
* not return a promise.
* @param {object} [options.annotations] Annotations for the message, if any. See AMQP spec for details, and server for specific
* annotations that might be relevant (e.g. x-opt-partition-key on EventHub). If node-amqp-encoder'd
* map is given, it will be translated to appropriate internal types. Simple maps will be converted
* to AMQP Fields type as defined in the spec.
*
* @return {Promise}
* @return {Promise|null}
*/
SenderLink.prototype.send = function(msg, options) {
if (!this.session.connection || this.session.connection.connected === false) {
Expand All @@ -76,24 +80,44 @@ SenderLink.prototype.send = function(msg, options) {
options.properties.subject = this.policy.defaultSubject;
}

var self = this;
return new Promise(function(resolve, reject) {
var message;
if (u.isObject(msg) && msg.hasOwnProperty('body')) {
message = msg;
message.body = self.policy.encoder ? self.policy.encoder(message.body) : message.body;
u.merge(message, options);
var message;
var deliveryTag = this.session._deliveryTag++;
if (u.isObject(msg) && msg.hasOwnProperty('body')) {
message = msg;
message.body = this.policy.encoder ? this.policy.encoder(message.body) : message.body;
u.merge(message, options);
} else {
msg = this.policy.encoder ? this.policy.encoder(msg) : msg;
if (u.isObject(options)) {
options.body = msg;
message = options;
} else {
msg = self.policy.encoder ? self.policy.encoder(msg) : msg;
if (u.isObject(options)) {
options.body = msg;
message = options;
} else {
message = { body: msg };
}
message = { body: msg };
}
}

var deliveryTag = self.session._deliveryTag++;
var self = this,
cbPolicy = self.policy.callback;

if (cbPolicy === putils.SenderCallbackPolicies.None) {
var sendMessage = function(err) {
if (!!err) throw err;
debug('sending: ', msg);
return self._sendMessage(message, {
deliveryTag: new Buffer(deliveryTag.toString())
});
};

if (!this.canSend()) {
this._pendingSends.push(sendMessage);
return;
}

sendMessage();
return;
}

return new Promise(function(resolve, reject) {
var sendMessage = function(err) {
if (err) {
return reject(err);
Expand All @@ -104,14 +128,10 @@ SenderLink.prototype.send = function(msg, options) {
deliveryTag: new Buffer(deliveryTag.toString())
});

var cbPolicy = self.policy.callback;
if (cbPolicy === putils.SenderCallbackPolicies.OnSettle) {
var deferredSender = function(err, state) {
if (!!err) {
reject(err);
} else {
resolve(state);
}
if (!!err) return reject(err);
resolve(state);
};
self._unsettledSends[messageId] = deferredSender;
} else if (cbPolicy === putils.SenderCallbackPolicies.OnSent) {
Expand Down
11 changes: 9 additions & 2 deletions lib/streams/sender_stream.js
@@ -1,15 +1,22 @@
'use strict';
var Writable = require('stream').Writable,
var Promise = require('bluebird'),
Writable = require('stream').Writable,
util = require('util');

function SenderStream(link, options) {
Writable.call(this, { objectMode: true });

options = options || {};
this._noReply = options.hasOwnProperty('noReply') ? !!options.noReply : false;
this._link = link;
}
util.inherits(SenderStream, Writable);

SenderStream.prototype._write = function(chunk, encoding, callback) {
return this._link.send(chunk)
var promise = this._link.send(chunk, { noReply: this._noReply });
if (!(promise instanceof Promise)) return callback();

return promise
.then(function() { callback(); })
.error(function(err) { callback(err); });
};
Expand Down
9 changes: 5 additions & 4 deletions test/integration/qpid/streams.test.js
Expand Up @@ -29,13 +29,14 @@ describe('QPID', function() {

Promise.all([
test.client.createReceiverStream(config.defaultLink),
test.client.createSender(config.defaultLink)
test.client.createSender(config.defaultLink, { callback: 'none' })
])
.spread(function(stream, sender) {
var count = 0;
stream.on('data', function(data) {
expect(expected[count]).to.eql(data.body);
count++;

if (count === expected.length) done();
});

Expand All @@ -61,7 +62,7 @@ describe('QPID', function() {

Promise.all([
test.client.createReceiver(config.defaultLink),
test.client.createSenderStream(config.defaultLink)
test.client.createSenderStream(config.defaultLink, { callback: 'none' })
])
.spread(function(receiver, stream) {
var count = 0;
Expand Down Expand Up @@ -111,7 +112,7 @@ describe('QPID', function() {
Promise.all([
test.client.createReceiver(config.defaultLink),
test.client.createReceiverStream('test.streams.queue'),
test.client.createSenderStream(config.defaultLink),
test.client.createSenderStream(config.defaultLink, { callback: 'none' }),
])
.spread(function(receiver, receiverStream, senderStream) {
var count = 0;
Expand All @@ -122,7 +123,7 @@ describe('QPID', function() {
});

receiverStream.pipe(senderStream);
return test.client.createSender('test.streams.queue');
return test.client.createSender('test.streams.queue', { callback: 'none' });
})
.then(function(sender) {
return Promise.mapSeries(expected, function(v) { return sender.send(v); });
Expand Down
102 changes: 51 additions & 51 deletions test/integration/servicebus/eventhubs/streams.test.js
Expand Up @@ -31,68 +31,68 @@ function teardown() {

describe('ServiceBus', function() {

describe('Streams', function() {
describe('ReceiverStream', function() {
beforeEach(setup);
afterEach(teardown);
describe('Streams', function() {
describe('ReceiverStream', function() {
beforeEach(setup);
afterEach(teardown);

it('should let you create a receiver link as a readable stream', function(done) {
expect(config.partitionSenderLinkPrefix,
'Required env vars not found in ' + Object.keys(process.env)).to.exist;
it('should let you create a receiver link as a readable stream', function(done) {
expect(config.partitionSenderLinkPrefix,
'Required env vars not found in ' + Object.keys(process.env)).to.exist;

var dataString = u.uuidV4().replace(/-/g, ''),
expected = Array.apply(null, new Array(20))
.map(function(a) { return Math.floor(Math.random() * 100); });
var dataString = u.uuidV4().replace(/-/g, ''),
expected = Array.apply(null, new Array(20))
.map(function(a) { return Math.floor(Math.random() * 100); });

Promise.all([
test.client.createReceiverStream(config.receiverLinkPrefix + test.partition),
test.client.createSender(config.partitionSenderLinkPrefix + test.partition)
])
.spread(function(stream, sender) {
var count = 0;
stream.on('data', function(data) {
if (data.body.DataString !== dataString) return; // ignore previously run tests
expect(expected[count]).to.eql(data.body.DataValue);
count++;
if (count === expected.length) done();
});
Promise.all([
test.client.createReceiverStream(config.receiverLinkPrefix + test.partition),
test.client.createSender(config.partitionSenderLinkPrefix + test.partition, { callback: 'none' })
])
.spread(function(stream, sender) {
var count = 0;
stream.on('data', function(data) {
if (data.body.DataString !== dataString) return; // ignore previously run tests
expect(expected[count]).to.eql(data.body.DataValue);
count++;
if (count === expected.length) done();
});

return Promise.mapSeries(expected, function(v) {
return sender.send({ DataString: dataString, DataValue: v });
});
return Promise.mapSeries(expected, function(v) {
return sender.send({ DataString: dataString, DataValue: v });
});
});
}); // ReceiverStream

describe('SenderStream', function() {
beforeEach(setup);
afterEach(teardown);
});
}); // ReceiverStream

it('should let you create a sender link as a writable stream', function(done) {
var dataString = u.uuidV4().replace(/-/g, ''),
expected = Array.apply(null, new Array(20))
.map(function(a) { return Math.floor(Math.random() * 100); });
describe('SenderStream', function() {
beforeEach(setup);
afterEach(teardown);

Promise.all([
test.client.createReceiver(config.receiverLinkPrefix + test.partition),
test.client.createSenderStream(config.partitionSenderLinkPrefix + test.partition)
])
.spread(function(receiver, stream) {
var count = 0;
receiver.on('message', function(data) {
if (data.body.DataString !== dataString) return; // ignore previously run tests
expect(expected[count]).to.eql(data.body.DataValue);
count++;
if (count === expected.length) done();
});
it('should let you create a sender link as a writable stream', function(done) {
var dataString = u.uuidV4().replace(/-/g, ''),
expected = Array.apply(null, new Array(20))
.map(function(a) { return Math.floor(Math.random() * 100); });

for (var i = 0; i < expected.length; i++) {
stream.write({ DataString: dataString, DataValue: expected[i] });
}
Promise.all([
test.client.createReceiver(config.receiverLinkPrefix + test.partition),
test.client.createSenderStream(config.partitionSenderLinkPrefix + test.partition, { callback: 'none' })
])
.spread(function(receiver, stream) {
var count = 0;
receiver.on('message', function(data) {
if (data.body.DataString !== dataString) return; // ignore previously run tests
expect(expected[count]).to.eql(data.body.DataValue);
count++;
if (count === expected.length) done();
});

for (var i = 0; i < expected.length; i++) {
stream.write({ DataString: dataString, DataValue: expected[i] });
}
});
}); // SenderStream
});
}); // SenderStream

}); // Streams
}); // Streams

}); // ServiceBus

0 comments on commit 5e159a9

Please sign in to comment.