Skip to content

Commit

Permalink
feat(SenderLink): support splitting messages into multiple frames
Browse files Browse the repository at this point in the history
If a user attempts to send a message greater than the agreed upon
maximum frame size, then the message should be split up into
multiple transfer frames. Tests are included.
  • Loading branch information
mbroadst committed Nov 4, 2015
1 parent d7cb30e commit 0135c3a
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 23 deletions.
3 changes: 3 additions & 0 deletions lib/frames/transfer_frame.js
Expand Up @@ -219,6 +219,9 @@ TransferFrame.prototype.EncodeOrdering = [
'receiverSettleMode', 'state', 'resume', 'aborted', 'batchable'
];

// used to determine if a message should be split to multiple frames
TransferFrame.FRAME_OVERHEAD = 29;

TransferFrame.prototype._getPerformative = function() {
var self = this;
return new DescribedType(TransferFrame, {
Expand Down
29 changes: 26 additions & 3 deletions lib/sender_link.js
@@ -1,13 +1,15 @@
'use strict';

var _ = require('lodash'),
Builder = require('buffer-builder'),
Promise = require('bluebird'),
util = require('util'),
debug = require('debug')('amqp10:link:sender'),

errors = require('./errors'),
constants = require('./constants'),
putils = require('./policies/policy_utilities'),
codec = require('./codec'),

TransferFrame = require('./frames/transfer_frame'),

Expand Down Expand Up @@ -141,17 +143,38 @@ SenderLink.prototype._sendMessage = function(message, options) {
}
}

// create frame and send
// create frame(s)
var transferOptions = _.defaults(options, {
channel: this.session.channel,
handle: this.handle,
deliveryId: messageId,
settled: this.session._sessionParams.senderSettleMode === constants.senderSettleMode.settled,
message: message
});

// pre-encode message to determine if multiple frames are required
var messageBuilder = new Builder();
codec.encode(message, messageBuilder);
var messageBuffer = messageBuilder.get();

// send the frame(s)
var maxFrameSize = this.session.connection._params.maxFrameSize;
var approximateFrameSize =
messageBuffer.length + TransferFrame.FRAME_OVERHEAD + options.deliveryTag.length;
if (approximateFrameSize >= maxFrameSize) {
var messageCount = Math.ceil(approximateFrameSize / maxFrameSize);
var bufferIdx = 0;
for (var i = 0; i < messageCount; ++i) {
transferOptions.more = (i !== messageCount - 1) ? true : false;
transferOptions.message = messageBuffer.slice(bufferIdx, bufferIdx + maxFrameSize);

This comment has been minimized.

Copy link
@noodlefrenzy

noodlefrenzy Nov 4, 2015

Owner

Nope, too large. Should be bufferIdx + (maxFrameSize - (options.deliveryTag.length + TransferFrame.FRAME_OVERHEAD))

this.session.connection.sendFrame(new TransferFrame(transferOptions));
bufferIdx += maxFrameSize;

This comment has been minimized.

Copy link
@noodlefrenzy

noodlefrenzy Nov 4, 2015

Owner

Ditto.

}
} else {
transferOptions.message = messageBuffer;
this.session.connection.sendFrame(new TransferFrame(transferOptions));
}

this.linkCredit--;
this.session.connection.sendFrame(new TransferFrame(transferOptions));
return messageId;
};

Expand Down
1 change: 1 addition & 0 deletions test/integration/qpid/client.test.js
Expand Up @@ -31,6 +31,7 @@ describe('Client', function() {
.spread(function(receiver, sender) {
receiver.on('message', function(message) {
expect(message).to.exist;
expect(message.body).to.equal('test');
done();
});

Expand Down
19 changes: 19 additions & 0 deletions test/integration/qpid/sender_link.test.js
Expand Up @@ -88,5 +88,24 @@ describe('SenderLink', function() {
});
});

it('should send and receive multi-frame messages', function(done) {
var messageData = new Array(2048).join('0');
return test.client.connect(config.address)
.then(function() {
return Promise.all([
test.client.createReceiver(config.defaultLink),
test.client.createSender(config.defaultLink)
]);
})
.spread(function(receiver, sender) {
receiver.on('message', function(message) {
expect(message.body).to.eql(messageData);
done();
});

return sender.send(messageData);
});
});

});
});
29 changes: 19 additions & 10 deletions test/unit/client.test.js
@@ -1,6 +1,7 @@
'use strict';

var expect = require('chai').expect,
var _ = require('lodash'),
expect = require('chai').expect,
Builder = require('buffer-builder'),
AMQPClient = require('../../lib').Client,
MockServer = require('./mocks').Server,
Expand Down Expand Up @@ -162,7 +163,9 @@ describe('Client', function() {
it('should send multi-frame messages', function() {
test.server.setResponseSequence([
constants.amqpVersion,
new OpenFrame(DefaultPolicy.connect.options),
new OpenFrame(_.extend(DefaultPolicy.connect.options, {

This comment has been minimized.

Copy link
@noodlefrenzy

noodlefrenzy Nov 4, 2015

Owner

Can't you just pass in { maxFrameSize: 10 }? Won't it just fill in the rest with the policy defaults? If not, I think that's a bug.

This comment has been minimized.

Copy link
@mbroadst

mbroadst Nov 4, 2015

Author Collaborator

bug, I get a: AmqpArgumentError: must provide argument containerId

This comment has been minimized.

Copy link
@noodlefrenzy

noodlefrenzy Nov 4, 2015

Owner

Oh, ok, I get why. Users never actually call new OpenFrame, so it's not a bug. Sorry about that.

maxFrameSize: 10 // <-- the important part, though it will be overwritten with 512
})),
new BeginFrame({
remoteChannel: 1, nextOutgoingId: 0, incomingWindow: 100000,
outgoingWindow: 2147483647, handleMax: 4294967295
Expand All @@ -172,7 +175,7 @@ describe('Client', function() {
var rxAttach = FrameReader.read(prev[prev.length-1]);
return new AttachFrame({
name: rxAttach.name, handle: 1, role: constants.linkRole.receiver,
source: {}, target: {}, initialDeliveryCount: 0, maxMessageSize: 1 // <-- the important part
source: {}, target: {}, initialDeliveryCount: 0
});
},
new FlowFrame({
Expand All @@ -190,16 +193,18 @@ describe('Client', function() {
]);

// build our expected buffer segments
var message = new M.Message({ body: 'asupercalifragilisticexpialidocious' });
var messageData = new Array(2048).join('0');
var message = new M.Message({ body: messageData });
var codecBuffer = new Builder();
Codec.encode(message, codecBuffer);
var messageBuffer = codecBuffer.get();

var expected = [];
expected.push(messageBuffer.slice(0, 10));
expected.push(messageBuffer.slice(10, 20));
expected.push(messageBuffer.slice(20, 30));
expected.push(messageBuffer.slice(30, 40));
expected.push(messageBuffer.slice(0, 512));

This comment has been minimized.

Copy link
@noodlefrenzy

noodlefrenzy Nov 4, 2015

Owner

This may change based on my comments in SenderLink re: message slice sizes.

expected.push(messageBuffer.slice(512, 1024));
expected.push(messageBuffer.slice(1024, 1536));
expected.push(messageBuffer.slice(1536, 2048));
expected.push(messageBuffer.slice(2048, 2063));

// 1. It is an error if the delivery-id on a continuation transfer differs
// from the delivery-id on the first transfer of a delivery.
Expand All @@ -224,14 +229,18 @@ describe('Client', function() {
}),
new TransferFrame({
channel: 1, handle: 0, deliveryId: 1, settled: false, deliveryTag: deliveryTag,
message: expected[3], more: false,
message: expected[3], more: true,
}),
new TransferFrame({
channel: 1, handle: 0, deliveryId: 1, settled: false, deliveryTag: deliveryTag,
message: expected[4], more: false,
}),
false
]);

return test.client.connect(test.server.address())
.then(function() { return test.client.createSender('test.link'); })
.then(function(sender) { return sender.send('supercalifragilisticexpialidocious'); })
.then(function(sender) { return sender.send(messageData); })
.then(function() { return test.client.disconnect(); });
});

Expand Down
34 changes: 24 additions & 10 deletions test/unit/mocks/server.js
Expand Up @@ -54,16 +54,7 @@ MockServer.prototype.setup = function() {

c.on('data', function(d) {
debug('read: ', d.toString('hex'));
if (self._expectedFrames.length) {
var expectedFrame = self._expectedFrames.shift();
if (!!expectedFrame) {
debug('check: ' + expectedFrame.toString('hex'));
expect(d).to.eql(expectedFrame);
}
}

self._seenFrames.push(new BufferList(d));
self._sendNextResponse();
self._checkExpectations(d);
});
});

Expand All @@ -78,6 +69,29 @@ MockServer.prototype.setup = function() {
});
};

MockServer.prototype._checkExpectations = function(data) {
if (this._expectedFrames.length) {
var idx = 0;
var expectedFrame = this._expectedFrames.shift();
while (true) {
if (expectedFrame === false || expectedFrame === undefined) break;
var actualFrame = data.slice(idx, idx + expectedFrame.length);
debug('expected(', expectedFrame.length, '): ' + expectedFrame.toString('hex'));
debug(' actual(', actualFrame.length, '): ', actualFrame.toString('hex'));
expect(actualFrame).to.eql(expectedFrame);

if (this._expectedFrames[0] === false) break;
if (idx >= data.length) break;

idx += expectedFrame.length;
expectedFrame = this._expectedFrames.shift();
}
}

this._seenFrames.push(new BufferList(data));
this._sendNextResponse();
};

MockServer.prototype.teardown = function() {
var self = this;
return new Promise(function(resolve, reject) {
Expand Down

0 comments on commit 0135c3a

Please sign in to comment.