Skip to content

Commit

Permalink
feat(ReceiverLink): emit transfer frame optionally on message
Browse files Browse the repository at this point in the history
This gives users the option of deeper knowledge of the internals of
the transfer frame used to extract the message body.
  • Loading branch information
mbroadst committed Oct 13, 2016
1 parent 450028e commit cd59fec
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 1 deletion.
2 changes: 1 addition & 1 deletion lib/receiver_link.js
Expand Up @@ -166,7 +166,7 @@ ReceiverLink.prototype._messageReceived = function(transferFrame) {
message.body = this.policy.decoder ? this.policy.decoder(message.body) : message.body;
debug('received from (' + this.name + '): ' + message.body);

this.emit(Link.MessageReceived, message);
this.emit(Link.MessageReceived, message, curFrame);
};

ReceiverLink.prototype._dispositionReceived = function(details) {
Expand Down
22 changes: 22 additions & 0 deletions test/integration/qpid/receiver_link.test.js
Expand Up @@ -75,5 +75,27 @@ describe('ReceiverLink', function() {
});
});

it('should emit the transfer frame, as well as message', function(done) {
var deliveryTag = new Buffer([0x00, 0x00, 0x00, 0x00]);
test.client.connect(config.address)
.then(function() {
return Promise.all([
test.client.createReceiver('amq.topic'),
test.client.createSender('amq.topic')
]);
})
.spread(function(receiver, sender) {
receiver.on('message', function(msg, frame) {
expect(msg).to.exist;
expect(msg.body).to.eql('test message');
expect(frame).to.exist;
expect(frame.deliveryTag).to.eql(deliveryTag);
done();
});

return sender.send('test message');
});
});

}); // ReceiverLink
}); // QPID
92 changes: 92 additions & 0 deletions test/unit/receiver_link.test.js
@@ -0,0 +1,92 @@
'use strict';

var expect = require('chai').expect,
Builder = require('buffer-builder'),
AMQPClient = require('../../lib').Client,
MockServer = require('./mocks').Server,

constants = require('../../lib/constants'),
frames = require('../../lib/frames'),

Policy = require('../../lib/policies/policy'),
AMQPError = require('../../lib/types/amqp_error'),
ErrorCondition = require('../../lib/types/error_condition'),
m = require('../../lib/types/message'),

test = require('./test-fixture');

var TestPolicy = new Policy({
connect: { options: { containerId: 'test' } },
reconnect: { retries: 0, forever: false }
});

function encodeMessagePayload(message) {
var tmpBuf = new Builder();
m.encodeMessage(message, tmpBuf);
return tmpBuf.get();
}

describe('ReceiverLink', function() {
beforeEach(function() {
if (!!test.server) test.server = undefined;
if (!!test.client) test.client = undefined;
test.client = new AMQPClient(TestPolicy);
test.server = new MockServer();
return test.server.setup();
});

afterEach(function() {
if (!test.server) return;
return test.server.teardown()
.then(function() {
test.server = undefined;
});
});

it('should emit optional transfer frames with `message` event', function(done) {
var message = { body: { test: 'testing' } };
var messageBuf = encodeMessagePayload(message);
test.server.setResponseSequence([
constants.amqpVersion,
new frames.OpenFrame(test.client.policy.connect.options),
new frames.BeginFrame({
remoteChannel: 1, nextOutgoingId: 0,
incomingWindow: 2147483647, outgoingWindow: 2147483647,
handleMax: 4294967295
}),
function(prev) {
var rxAttach = frames.readFrame(prev[prev.length-1]);
return new frames.AttachFrame({
name: rxAttach.name, handle: 1,
role: constants.linkRole.sender,
source: {}, target: {},
initialDeliveryCount: 0
});
},
function(prev) {
var txFrame = new frames.TransferFrame({ handle: 1, deliveryId: 1, deliveryTag: 'llamas' });
txFrame.payload = messageBuf;
return txFrame;
},
new frames.CloseFrame({
error: new AMQPError({ condition: ErrorCondition.ConnectionForced, description: 'test' })
})
]);

test.client.connect(test.server.address())
.then(function() { return test.client.createReceiver('testing'); })
.then(function(receiver) {
receiver.on('message', function(msg, frame) {
expect(msg.body).not.to.be.null;
expect(msg.body.test).to.eql('testing');
expect(frame).to.not.be.null;
expect(frame).to.be.an.instanceOf(frames.TransferFrame);
expect(frame.deliveryTag).to.eql(new Buffer('llamas'));

test.client.disconnect().then(function() {
done();
});
});
});
});
});

0 comments on commit cd59fec

Please sign in to comment.