From 26e693961a5a080cd1ce1c11378666c009f7ac99 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Mon, 15 Feb 2016 13:04:58 -0500 Subject: [PATCH] refactor(messages): completely rework how messages are processed --- lib/new_types/message.js | 44 -- lib/receiver_link.js | 7 +- lib/sender_link.js | 14 +- lib/types.js | 7 +- lib/types/message.js | 516 +++++++---------------- lib/utilities.js | 8 + test/integration/qpid/client.test.js | 57 +-- test/unit/client.test.js | 27 +- test/unit/test_message_types.js | 23 - test/unit/types/message_sections.test.js | 390 +++++++++++++++++ 10 files changed, 600 insertions(+), 493 deletions(-) delete mode 100644 lib/new_types/message.js delete mode 100644 test/unit/test_message_types.js create mode 100644 test/unit/types/message_sections.test.js diff --git a/lib/new_types/message.js b/lib/new_types/message.js deleted file mode 100644 index f94e0d8..0000000 --- a/lib/new_types/message.js +++ /dev/null @@ -1,44 +0,0 @@ -'use strict'; -var types = require('./index'), - Message = module.exports = {}; - -Message.Header = types.defineComposite({ - name: 'header', code: 0x70, - fields: [ - { name: 'durable', type: 'boolean', default: false }, - { name: 'priority', type: 'ubyte', default: 4 }, - { name: 'ttl', type: 'milliseconds' }, - { name: 'firstAcquirer', type: 'boolean', default: false }, - { name: 'deliveryCount', type: 'uint', default: 0 } - ] -}); - -Message.DeliveryAnnotations = {}; // map -Message.MessageAnnotations = {}; // map - -Message.Properties = types.defineComposite({ - name: 'properties', code: 0x73, - fields: [ - { name: 'messageId', type: '*' }, - { name: 'userId', type: 'binary' }, - { name: 'to', type: 'string' }, - { name: 'subject', type: 'string' }, - { name: 'replyTo', type: 'string' }, - { name: 'correlationId', type: '*' }, - { name: 'contentType', type: 'symbol' }, - { name: 'contentEncoding', type: 'symbol' }, - { name: 'absoluteExpiryTime', type: 'timestamp' }, - { name: 'creationTime', type: 'timestamp' }, - { name: 'groupId', type: 'string' }, - { name: 'groupSequence', type: 'sequence-no' }, - { name: 'replyToGroupId', type: 'string' } - ] -}); - -Message.ApplicationProperties = {}; // map -Message.Data = {}; // binary -Message.AMQPSequence = {}; // list -Message.AMQPValue = {}; // * -Message.Footer = {}; // annotations - - diff --git a/lib/receiver_link.js b/lib/receiver_link.js index 217978d..d11b2ab 100644 --- a/lib/receiver_link.js +++ b/lib/receiver_link.js @@ -161,9 +161,10 @@ ReceiverLink.prototype._messageReceived = function(transferFrame) { this._currentTransferFrame = null; } - var message = m.decodeMessagePayload(curFrame.payload); + var message = m.decodeMessage(curFrame.payload); // store deliveryId for later use - message._deliveryId = curFrame.deliveryId; + Object.defineProperty(message, '_deliveryId', { value: curFrame.deliveryId }); + this.linkCredit--; debug('Rx message ' + transferFrame.deliveryId + ' on ' + this.name + ', ' + this.linkCredit + ' credit, ' + this.session._sessionParams.incomingWindow + ' window left.'); // @todo Bump link credit based on strategy @@ -174,7 +175,7 @@ ReceiverLink.prototype._messageReceived = function(transferFrame) { } // optionally decode message based on policy - var payload = message.body[0] || message.body; + var payload = message.body; message.body = this.policy.decoder ? this.policy.decoder(payload) : payload; debug('received from (' + this.name + '): ' + message.body); diff --git a/lib/sender_link.js b/lib/sender_link.js index d42d7f3..f45d950 100644 --- a/lib/sender_link.js +++ b/lib/sender_link.js @@ -9,11 +9,10 @@ var Builder = require('buffer-builder'), errors = require('./errors'), constants = require('./constants'), putils = require('./policies/policy_utilities'), - codec = require('./codec'), + m = require('./types/message'), u = require('./utilities'), DeliveryState = require('./types/delivery_state'), - M = require('./types/message'), Link = require('./link'); function SenderLink(session, handle, linkPolicy) { @@ -74,10 +73,15 @@ SenderLink.prototype.send = function(msg, options) { return new Promise(function(resolve, reject) { var message; if (u.isObject(msg) && msg.hasOwnProperty('body')) { - message = new M.Message(msg); + message = msg; message.body = self.policy.encoder(message.body); } else { - message = new M.Message(options, self.policy.encoder(msg)); + if (u.isObject(options)) { + options.body = self.policy.encoder(msg); + message = options; + } else { + message = { body: self.policy.encoder(msg) }; + } } var deliveryTag = self.session._deliveryTag++; @@ -152,7 +156,7 @@ SenderLink.prototype._sendMessage = function(message, options) { // pre-encode message to determine if multiple frames are required var messageBuilder = new Builder(); - codec.encode(message, messageBuilder); + m.encodeMessage(message, messageBuilder); var messageBuffer = messageBuilder.get(); // send the frame(s) diff --git a/lib/types.js b/lib/types.js index 16611c9..cc2c402 100644 --- a/lib/types.js +++ b/lib/types.js @@ -516,6 +516,7 @@ registerType('uuid', { registerType('binary', { encoder: function(val, bufb) { + val = (val instanceof Buffer) ? val : new Buffer(val); var code = (val.length <= 0xFF) ? 0xA0 : 0xB0; bufb.appendUInt8(code); types[code].encode(val, bufb); @@ -692,9 +693,9 @@ function mapBuilderForKeyType(keyType) { }; } -registerType('fields', { - encoder: mapBuilderForKeyType('symbol') -}); +registerType('fields', { encoder: mapBuilderForKeyType('symbol') }); +registerType('milliseconds', 'uint'); +registerType('sequence-no', 'uint'); types.defineComposite = require('./types/composite_type'); diff --git a/lib/types/message.js b/lib/types/message.js index 50c198c..16fb4a5 100644 --- a/lib/types/message.js +++ b/lib/types/message.js @@ -1,371 +1,145 @@ 'use strict'; -var util = require('util'), - - AMQPFields = require('./amqp_composites').Fields, - DescribedType = require('./described_type'), +var DescribedType = require('./described_type'), ForcedType = require('./forced_type'), u = require('../utilities'), errors = require('../errors'), codec = require('../codec'); -/** - * - * @param options - * @constructor - */ -function Header(options) { - Header.super_.call(this, Header); +var sectionByDescriptor = {}, + sectionByProperty = {}; - u.assignDefined(this, options, { - durable: u.onUndef(options.durable, false), - priority: u.onUndef(options.priority, 4), - ttl: options.ttl, - firstAcquirer: u.onUndef(options.firstAcquirer, false), - deliveryCount: u.onUndef(options.deliveryCount, 0) - }); +function defineSection(descriptor, definition) { + sectionByDescriptor[descriptor.code] = definition; + sectionByDescriptor[descriptor.name] = definition; } -util.inherits(Header, DescribedType); - -Header.prototype.Descriptor = { code: 0x70, name: 'amqp:header:list' }; -Header.prototype.EncodeOrdering = [ - 'durable', 'priority', 'ttl', 'firstAcquirer', 'deliveryCount' -]; - -Header.fromDescribedType = function(describedType) { - var options = {}; - u.assignFromDescribedType(Header, describedType, options); - - return new Header(options); -}; - -Header.prototype.getValue = function() { - var self = this; - return { - durable: self.durable, - priority: new ForcedType('ubyte', self.priority), - ttl: new ForcedType('uint', self.ttl || null), - firstAcquirer: self.firstAcquirer, - deliveryCount: new ForcedType('uint', self.deliveryCount), - encodeOrdering: Header.prototype.EncodeOrdering - }; -}; - -module.exports.Header = Header; - - - -/** - * - * @param annotations - * @constructor - */ -function DeliveryAnnotations(annotations) { - DeliveryAnnotations.super_.call(this, DeliveryAnnotations, annotations); -} - -util.inherits(DeliveryAnnotations, DescribedType); - -DeliveryAnnotations.prototype.Descriptor = { code: 0x71, name: 'amqp:delivery-annotations:map' }; -DeliveryAnnotations.fromDescribedType = function(describedType) { - return new DeliveryAnnotations(describedType.value); -}; - -DeliveryAnnotations.prototype.getValue = function() { - return this.value instanceof AMQPFields ? this.value : new AMQPFields(this.value); -}; - -module.exports.DeliveryAnnotations = DeliveryAnnotations; - - - -/** - * - * @param annotations - * @constructor - */ -function Annotations(annotations) { - Annotations.super_.call(this, Annotations, annotations); -} - -util.inherits(Annotations, DescribedType); - -Annotations.prototype.Descriptor = { code: 0x72, name: 'amqp:message-annotations:map' }; -Annotations.fromDescribedType = function(describedType) { - return new Annotations(describedType.value); -}; - -Annotations.prototype.getValue = function() { - return this.value instanceof AMQPFields ? this.value : new AMQPFields(this.value); -}; - -module.exports.Annotations = Annotations; +var isData = function(x) { return x instanceof Buffer; }; +defineSection({ code: 0x75, name: 'amqp:data:binary' }, { + encode: null, + decode: function(message, described) { + if (!message.hasOwnProperty('body')) message.body = []; + else if (!Array.isArray(message.body) || !message.body.every(isData)) { + throw new errors.MalformedPayloadError('Attempt to put both Data and non-Data payloads in message body'); + } + message.body.push(described.value); + } +}); +var isSequence = function(x) { return Array.isArray(x); }; +defineSection({ code: 0x76, name: 'amqp:amqp-sequence:list' }, { + encode: null, + decode: function(message, described) { + if (!message.hasOwnProperty('body')) message.body = []; + else if (!Array.isArray(message.body) || !message.body.every(isSequence)) { + throw new errors.MalformedPayloadError('Attempt to put both AMQPSequence and non-AMQPSequence payloads in message body'); + } -/** - * - * @param options - * @constructor - */ -function Properties(options) { - Properties.super_.call(this, Properties); + message.body.push(described.value); + } +}); - u.assignDefined(this, options, { - messageId: options.messageId, - userId: u.coerce(options.userId, Buffer), - to: options.to, - subject: options.subject, - replyTo: options.replyTo, - correlationId: options.correlationId, - contentType: options.contentType, - contentEncoding: options.contentEncoding, - absoluteExpiryTime: options.absoluteExpiryTime, - creationTime: options.creationTime, - groupId: options.groupId, - groupSequence: options.groupSequence, - replyToGroupId: options.replyToGroupId - }); -} +defineSection({ code: 0x77, name: 'amqp:value:*' }, { + encode: null, + decode: function(message, described) { + if (message.body && Array.isArray(message.body)) { + throw new errors.MalformedPayloadError('Attempt to provide more than one AMQPValue for message body'); + } -util.inherits(Properties, DescribedType); + message.body = described.value; + } +}); -Properties.prototype.Descriptor = { code: 0x73, name: 'amqp:properties:list' }; -Properties.prototype.EncodeOrdering = [ - 'messageId', 'userId', 'to', 'subject', 'replyTo', 'correlationId', - 'contentType', 'contentEncoding', 'absoluteExpiryTime', 'creationTime', - 'groupId', 'groupSequence', 'replyToGroupId' -]; +function defineCompositeSection(definition) { + var sectionDefinition = { + descriptor: definition.code, + encode: function(value, buffer) { + var _len = definition.fields.length, result = []; + for (var i = 0; i < _len; ++i) { + var field = definition.fields[i]; + if (value && value.hasOwnProperty(field.name)) { + result[i] = field.type !== '*' ? new ForcedType(field.type, value[field.name]) : value[field.name]; + } else if (field.hasOwnProperty('default')) { + result[i] = field.type !== '*' ? new ForcedType(field.type, field.default) : field.default; + } else { + result[i] = null; + } + } -Properties.fromDescribedType = function(describedType) { - var options = {}; - u.assignFromDescribedType(Properties, describedType, options); - return new Properties(options); -}; + return result; + }, + decode: function(message, described) { + var _len = definition.fields.length, data = {}; + for (var i = 0; i < _len; ++i) { + var field = definition.fields[i]; + data[field.name] = described.value[i]; + } -Properties.prototype.getValue = function() { - var self = this; - return { - messageId: u.orNull(self.messageId), - userId: u.orNull(self.userId), - to: u.orNull(self.to), - subject: u.orNull(self.subject), - replyTo: u.orNull(self.replyTo), - correlationId: u.orNull(self.correlationId), - contentType: (self.contentType === undefined) ? null : self.contentType, - contentEncoding: (self.contentEncoding === undefined) ? null : self.contentEncoding, - absoluteExpiryTime: (self.absoluteExpiryTime === undefined) ? null : new ForcedType('timestamp', self.absoluteExpiryTime), - creationTime: (self.creationTime === undefined) ? null : new ForcedType('timestamp', self.creationTime), - groupId: u.orNull(self.groupId), - groupSequence: (self.groupSequence === undefined) ? null : new ForcedType('uint', self.groupSequence), - replyToGroupId: u.orNull(self.replyToGroupId), - encodeOrdering: Properties.prototype.EncodeOrdering + message[definition.name] = data; + } }; -}; - -module.exports.Properties = Properties; - - - -/** - * - * @param properties - * @constructor - */ -function ApplicationProperties(properties) { - ApplicationProperties.super_.call(this, ApplicationProperties, properties); - - if (!(properties instanceof Object)) - throw errors.MalformedPayloadError('invalid application properties: ', properties); - - var _keys = Object.keys(properties), _len = _keys.length; - for (var i = 0; i < _len; ++i) this[_keys[i]] = properties[_keys[i]]; -} - -util.inherits(ApplicationProperties, DescribedType); - -ApplicationProperties.prototype.Descriptor = { code: 0x74, name: 'amqp:application-properties:map' }; -ApplicationProperties.fromDescribedType = function(describedType) { - return new ApplicationProperties(describedType.value); -}; - -module.exports.ApplicationProperties = ApplicationProperties; - - -/** - * - * @param map - * @constructor - */ -function Footer(map) { - Footer.super_.call(this, Footer, map); + var symbolicDescriptor = 'amqp:' + definition.name + ':list'; + sectionByDescriptor[definition.code] = sectionDefinition; + sectionByDescriptor[symbolicDescriptor] = sectionDefinition; + sectionByProperty[definition.name] = sectionDefinition; } -util.inherits(Footer, DescribedType); - -Footer.prototype.Descriptor = { code: 0x78, name: 'amqp:footer:map' }; -Footer.fromDescribedType = function(describedType) { - return new Footer(describedType.value); -}; - -module.exports.Footer = Footer; - - - -/** - * - * @param data - * @constructor - */ -function Data(data) { - Data.super_.call(this, Data, data); -} - -util.inherits(Data, DescribedType); - -Data.prototype.Descriptor = { code: 0x75, name: 'amqp:data:binary' }; -Data.fromDescribedType = function(describedType) { - return new Data(describedType.value); -}; - -module.exports.Data = Data; - - - -/** - * - * @param values - * @constructor - */ -function AMQPSequence(values) { - AMQPSequence.super_.call(this, AMQPSequence, values); -} - -util.inherits(AMQPSequence, DescribedType); - -AMQPSequence.prototype.Descriptor = { code: 0x76, name: 'amqp:amqp-sequence:list' }; -AMQPSequence.fromDescribedType = function(describedType) { - return new AMQPSequence(describedType.value); -}; - -module.exports.AMQPSequence = AMQPSequence; - - - -/** - * - * @param value - * @constructor - */ -function AMQPValue(value) { - AMQPValue.super_.call(this, AMQPValue, value); -} - -util.inherits(AMQPValue, DescribedType); - -AMQPValue.prototype.Descriptor = { code: 0x77, name: 'amqp:amqp-value:*' }; -AMQPValue.fromDescribedType = function(describedType) { - return new AMQPValue(describedType.value); -}; - -module.exports.AMQPValue = AMQPValue; - +defineCompositeSection({ + name: 'header', code: 0x70, + fields: [ + { name: 'durable', type: 'boolean', default: false }, + { name: 'priority', type: 'ubyte', default: 4 }, + { name: 'ttl', type: 'milliseconds' }, + { name: 'firstAcquirer', type: 'boolean', default: false }, + { name: 'deliveryCount', type: 'uint', default: 0 } + ] +}); +defineCompositeSection({ + name: 'properties', code: 0x73, + fields: [ + { name: 'messageId', type: '*' }, + { name: 'userId', type: 'binary' }, + { name: 'to', type: 'string' }, + { name: 'subject', type: 'string' }, + { name: 'replyTo', type: 'string' }, + { name: 'correlationId', type: '*' }, + { name: 'contentType', type: 'symbol' }, + { name: 'contentEncoding', type: 'symbol' }, + { name: 'absoluteExpiryTime', type: 'timestamp' }, + { name: 'creationTime', type: 'timestamp' }, + { name: 'groupId', type: 'string' }, + { name: 'groupSequence', type: 'sequence-no' }, + { name: 'replyToGroupId', type: 'string' } + ] +}); -/** - * Actual AMQP Message, which as defined by the spec looks like: -
-                                                      Bare Message
-                                                            |
-                                      .---------------------+--------------------.
-                                      |                                          |
- +--------+-------------+-------------+------------+--------------+--------------+--------+
- | header | delivery-   | message-    | properties | application- | application- | footer |
- |        | annotations | annotations |            | properties   | data         |        |
- +--------+-------------+-------------+------------+--------------+--------------+--------+
- |                                                                                        |
- '-------------------------------------------+--------------------------------------------'
-                                             |
-                                      Annotated Message
- 
- * - * The message _may_ contain the sections above, and application data _may_ be repeated, as follows: - * - * * Zero or one {@link Header} sections. - * * Zero or one {@link DeliveryAnnotations} sections. - * * Zero or one {@link Annotations} sections. - * * Zero or one {@link Properties} sections. - * * Zero or one {@link ApplicationProperties} sections. - * * The body consists of either: one or more {@link Data} sections, one or more {@link AMQPSequence} sections, - * or a single {@link AMQPValue} section. - * * Zero or one {@link Footer} sections. - * - * @param contents - * @param body - * @constructor - */ -function Message(contents, body) { - contents = contents || {}; - u.assignDefined(this, contents, { - header: u.coerce(contents.header, Header), - deliveryAnnotations: u.coerce(contents.deliveryAnnotations, DeliveryAnnotations), - annotations: u.coerce(contents.annotations, Annotations), - properties: u.coerce(contents.properties, Properties), - applicationProperties: u.coerce(contents.applicationProperties, ApplicationProperties), - footer: u.coerce(contents.footer, Footer) - }); +function defineMapSection(definition) { + var property = u.camelCase(definition.name); + var sectionDefinition = { + descriptor: definition.code, source: !!definition.source ? definition.source : 'map', + decode: function(message, described) { + message[property] = described.value; + } + }; - this.body = contents.body || body; + var symbolicDescriptor = 'amqp:' + definition.name + ':map'; + sectionByDescriptor[definition.code] = sectionDefinition; + sectionByDescriptor[symbolicDescriptor] = sectionDefinition; + sectionByProperty[property] = sectionDefinition; } -Message.prototype.encode = function(codec, buf) { - if (this.header) codec.encode(this.header, buf); - if (this.deliveryAnnotations) codec.encode(this.deliveryAnnotations, buf); - if (this.annotations) codec.encode(this.annotations, buf); - if (this.properties) codec.encode(this.properties, buf); - if (this.applicationProperties) codec.encode(this.applicationProperties, buf); - if (this.body instanceof Buffer) { - codec.encode(new Data(this.body), buf); - } else if (Array.isArray(this.body)) { - codec.encode(new AMQPSequence(this.body), buf); - } else { - codec.encode(new AMQPValue(this.body), buf); - } - - if (this.footer) codec.encode(this.footer, buf); -}; - -module.exports.Message = Message; - -var _possibleFields = { - 'header': Header, - 'footer': Footer, - 'deliveryAnnotations': DeliveryAnnotations, - 'annotations': Annotations, - 'properties': Properties, - 'applicationProperties': ApplicationProperties -}; - -var sectionByDescriptor = {}; +defineMapSection({ name: 'delivery-annotations', code: 0x71, source: 'fields' }); +defineMapSection({ name: 'message-annotations', code: 0x72, source: 'fields' }); +defineMapSection({ name: 'application-properties', code: 0x74, source: 'map' }); +defineMapSection({ name: 'footer', code: 0x78 , source: 'map' }); -[ - Header, DeliveryAnnotations, Annotations, - Properties, ApplicationProperties, Footer, - Data, AMQPSequence, AMQPValue -].forEach(function(section) { - sectionByDescriptor[section.prototype.Descriptor.code] = section; - sectionByDescriptor[section.prototype.Descriptor.name] = section; -}); - -module.exports.decodeMessagePayload = function(buffer) { - var message = new Message(); - var body = []; +module.exports.decodeMessage = function(buffer) { + var message = {}; var curIdx = 0; - var isData = function(x) { return x instanceof Data; }; - var isSequence = function(x) { return x instanceof AMQPSequence; }; while (curIdx < buffer.length) { var decoded = codec.decode(buffer, curIdx); if (!decoded) { @@ -377,49 +151,43 @@ module.exports.decodeMessagePayload = function(buffer) { throw new errors.MalformedPayloadError('Unknown section: ', described); } - var section = sectionByDescriptor[described.descriptor].fromDescribedType(described); - + var sectionDefinition = sectionByDescriptor[described.descriptor]; curIdx += decoded[1]; - var matched = false; - for (var fieldName in _possibleFields) { - if (section instanceof _possibleFields[fieldName]) { - if (message.hasOwnProperty(fieldName)) { - throw new errors.MalformedPayloadError('Duplicate ' + fieldName + ' section in message'); - } - message[fieldName] = section; - matched = true; - break; - } - } + sectionDefinition.decode(message, described); + } - if (!matched) { - // Part of the body - if (section instanceof Data) { - if (body.length && !body.every(isData)) { - throw new errors.MalformedPayloadError('Attempt to put both Data and non-Data payloads in message body'); - } + if (Array.isArray(message.body) && message.body.length === 1) + message.body = message.body[0]; + return message; +}; - body.push(section); - } else if (section instanceof AMQPSequence) { - if (body.length && !body.every(isSequence)) { - throw new errors.MalformedPayloadError('Attempt to put both AMQPSequence and non-AMQPSequence payloads in message body'); - } +module.exports.encodeMessage = function(message, buffer) { + var _keys = Object.keys(sectionByProperty), _len = _keys.length; + for (var i = 0; i < _len; ++i) { + var property = _keys[i], definition = sectionByProperty[property]; + if (message.hasOwnProperty(property)) { + var section = message[property]; + if (definition.encode && typeof definition.encode === 'function') { + section = definition.encode(section); + } - body.push(section); - } else if (section instanceof AMQPValue) { - if (body.length) { - throw new errors.MalformedPayloadError('Attempt to provide more than one AMQPValue for message body'); - } + var value = definition.hasOwnProperty('source') ? + new ForcedType(definition.source, section) : section; + var described = new DescribedType(definition.descriptor, value); + codec.encode(described, buffer); + } + } - body.push(section); - } else { - throw new errors.MalformedPayloadError('Unknown message contents: ' + JSON.stringify(section)); - } + if (!!message.body) { + if (message.body instanceof Buffer) { + codec.encode(new DescribedType(0x75, message.body), buffer); // Data + } else if (Array.isArray(message.body)) { + codec.encode(new DescribedType(0x76, message.body), buffer); // AMQPSequence + } else { + codec.encode(new DescribedType(0x77, message.body), buffer); // AMQPValue } } - // Pull out the values. - message.body = body.map(function(x) { return x.getValue(); }); - return message; + return buffer; }; diff --git a/lib/utilities.js b/lib/utilities.js index bf989ed..da6cb10 100644 --- a/lib/utilities.js +++ b/lib/utilities.js @@ -277,3 +277,11 @@ utilities.assignFromDescribedType = function(Type, describedType, dest, defaults if (defaults !== undefined) _.defaults(dest, defaults); // NOTE: this mutates the dest object }; + +utilities.camelCase = function(name) { + return name.toLowerCase() + .replace(/[-_]+/g, ' ') + .replace(/[^\w\s]/g, '') + .replace(/ (.)/g, function($1) { return $1.toUpperCase(); }) + .replace(/ /g, '' ); +}; diff --git a/test/integration/qpid/client.test.js b/test/integration/qpid/client.test.js index c20db27..e4a02ff 100644 --- a/test/integration/qpid/client.test.js +++ b/test/integration/qpid/client.test.js @@ -1,7 +1,6 @@ 'use strict'; -var AMQPClient = require('../../..').Client, - Message = require('../../../lib/types/message'), - Promise = require('bluebird'), +var Promise = require('bluebird'), + AMQPClient = require('../../..').Client, config = require('./config'), expect = require('chai').expect; @@ -93,11 +92,11 @@ describe('Client', function() { describe('Messages', function() { [ { - option: 'properties', type: Message.Properties, - options: { + section: 'properties', + data: { properties: { messageId: 42, - userId: 'user', + userId: new Buffer('user'), to: 'mom', subject: 'hello!', replyTo: 'amq.topic', @@ -116,16 +115,16 @@ describe('Client', function() { } }, { - option: 'applicationProperties', type: Message.ApplicationProperties, - options: { + section: 'applicationProperties', + data: { applicationProperties: { something: "special" } } }, { - option: 'annotations', type: Message.Annotations, - options: { + section: 'messageAnnotations', + data: { annotations: { "x-foo" : 5, "x-bar" : "wibble" @@ -133,8 +132,8 @@ describe('Client', function() { } }, { - option: 'deliveryAnnotations', type: Message.DeliveryAnnotations, - options: { + section: 'deliveryAnnotations', + data: { deliveryAnnotations: { "x-foo" : 5, "x-bar" : "wibble" @@ -142,19 +141,21 @@ describe('Client', function() { } }, { - option: 'header', type: Message.Header, - options: { + section: 'header', + data: { header: { durable: true, priority: 2, ttl: 150, - firstAcquirer: true + firstAcquirer: true, + + deliveryCount: undefined // TODO: what is going on here? } } }, { - option: 'footer', type: Message.Footer, - options: { + section: 'footer', + data: { footer: { "x-foo" : 5, "x-bar" : "wibble" @@ -162,7 +163,7 @@ describe('Client', function() { } } ].forEach(function(testCase) { - it('should send and receive ' + testCase.option + ' options', function(done) { + it('should send and receive ' + testCase.section + ' sections', function(done) { test.client.connect(config.address) .then(function() { return Promise.all([ @@ -173,25 +174,27 @@ describe('Client', function() { .spread(function(receiver, sender) { receiver.on('message', function(message) { expect(message).to.exist; - var expected = new testCase.type(testCase.options[testCase.option]); - if (testCase.option === 'header') { + // console.log('received: ', message); + + var expected = testCase.data[testCase.section]; + if (testCase.section === 'header') { // NOTE: this is flakey because the TTL will be decremented by // the server. So, pull it out, check that its close and delete - expect(message[testCase.option].ttl).to.be.closeTo(149, 5); + expect(message[testCase.section].ttl).to.be.closeTo(149, 5); delete expected.ttl; - delete message[testCase.option].ttl; - } else if (testCase.option === 'properties') { - message[testCase.option].absoluteExpiryTime = - message[testCase.option].absoluteExpiryTime.getTime(); + delete message[testCase.section].ttl; + } else if (testCase.section === 'properties') { + message[testCase.section].absoluteExpiryTime = + message[testCase.section].absoluteExpiryTime.getTime(); expected.creationTime = new Date(expected.creationTime); } - expect(message[testCase.option]).to.eql(expected); + expect(message[testCase.section]).to.eql(expected); done(); }); - return sender.send('test-' + testCase.option, testCase.options); + return sender.send('test-' + testCase.section, testCase.data); }); }); }); diff --git a/test/unit/client.test.js b/test/unit/client.test.js index 5db11a7..47c278f 100644 --- a/test/unit/client.test.js +++ b/test/unit/client.test.js @@ -11,8 +11,7 @@ var _ = require('lodash'), DefaultPolicy = require('../../lib/policies/default_policy'), AMQPError = require('../../lib/types/amqp_error'), - M = require('../../lib/types/message'), - Codec = require('../../lib/codec'), + m = require('../../lib/types/message'), DeliveryState = require('../../lib/types/delivery_state'), test = require('./test-fixture'); @@ -21,6 +20,12 @@ DefaultPolicy.connect.options.containerId = 'test'; DefaultPolicy.reconnect.retries = 0; DefaultPolicy.reconnect.forever = false; +function encodeMessagePayload(message) { + var tmpBuf = new Builder(); + m.encodeMessage(message, tmpBuf); + return tmpBuf.get(); +} + describe('Client', function() { describe('#connect()', function() { beforeEach(function() { @@ -58,10 +63,8 @@ describe('Client', function() { }); it('should connect and receive', function(done) { - var message = new M.Message({}, { test: 'testing' }); - var tmpBuf = new Builder(); - Codec.encode(message, tmpBuf); - var messageBuf = tmpBuf.get(); + var message = { body: { test: 'testing' } }; + var messageBuf = encodeMessagePayload(message); test.server.setResponseSequence([ constants.amqpVersion, new frames.OpenFrame(DefaultPolicy.connect.options), @@ -105,10 +108,8 @@ describe('Client', function() { }); it('should receive multi-frame messages', function(done) { - var message = new M.Message({}, { test: 'Really long message' }); - var tmpBuf = new Builder(); - Codec.encode(message, tmpBuf); - var messageBuf = tmpBuf.get(); + var message = { body: { test: 'Really long message' } }; + var messageBuf = encodeMessagePayload(message); var buf1 = messageBuf.slice(0, 10); var buf2 = messageBuf.slice(10, 15); var buf3 = messageBuf.slice(15); @@ -209,10 +210,8 @@ describe('Client', function() { // build our expected buffer segments var messageData = new Array(2048).join('0'); - var message = new M.Message({ body: messageData }); - var codecBuffer = new Builder(); - Codec.encode(message, codecBuffer); - var messageBuffer = codecBuffer.get(); + var message = { body: messageData }; + var messageBuffer = encodeMessagePayload(message); // ensure expected frames are broken up the same way we break them up var deliveryTag = new Buffer(Number(1).toString()); diff --git a/test/unit/test_message_types.js b/test/unit/test_message_types.js deleted file mode 100644 index d907b54..0000000 --- a/test/unit/test_message_types.js +++ /dev/null @@ -1,23 +0,0 @@ -'use strict'; - -var expect = require('chai').expect, - Int64 = require('node-int64'), - M = require('../../lib/types/message'); - -describe('MessageTypes', function() { - describe('Properties', function() { - it('should encode user-id properly', function() { - var props = new M.Properties({ userId: new Buffer(123) }); - expect(props.userId, 'From Buffer').to.be.an.instanceof(Buffer); - - props = new M.Properties({ userId: 123 }); - expect(props.userId, 'From int').to.be.an.instanceof(Buffer); - - props = new M.Properties({ userId: "123" }); - expect(props.userId, 'From string').to.be.an.instanceof(Buffer); - - props = new M.Properties({ userId: new Int64(12, 34)}); - expect(props.userId, 'From int64').to.be.an.instanceof(Buffer); - }); - }); -}); diff --git a/test/unit/types/message_sections.test.js b/test/unit/types/message_sections.test.js new file mode 100644 index 0000000..6ec9b74 --- /dev/null +++ b/test/unit/types/message_sections.test.js @@ -0,0 +1,390 @@ +'use strict'; +var Builder = require('buffer-builder'), + m = require('../../../lib/types/message'), + tu = require('../testing_utils'), + expect = require('chai').expect; + +describe('Message Sections', function() { +describe('Header', function() { + it('should encode the section', function() { + var message = { + header: { + durable: true, + priority: 2, + ttl: 150, + firstAcquirer: true, + deliveryCount: 0 + } + }; + + var expected = tu.buildBuffer([ + 0x00, 0x53, 0x70, + 0xc0, 0x08, 0x05, + 0x41, + 0x50, 0x02, + 0x52, 0x96, + 0x41, + 0x43 + ]); + + var actual = new Builder(); + m.encodeMessage(message, actual); + expect(expected).to.eql(actual.get()); + }); + + it('should decode the section', function() { + var buffer = tu.newBuffer([ + 0x00, 0x53, 0x70, + 0xc0, 0x0c, 0x05, + 0x41, + 0x50, 0x02, + 0x52, 0x96, + 0x41, + 0x70, 0x00, 0x00, 0x00, 0x00 + ]); + + var message = m.decodeMessage(buffer); + expect(message).to.have.keys('header'); + expect(message.header).to.eql({ + durable: true, + priority: 2, + ttl: 150, + firstAcquirer: true, + deliveryCount: 0 + }); + }); +}); // Header + +describe('DeliveryAnnotations', function() { + it('should encode the section', function() { + var message = { + deliveryAnnotations: { + 'x-foo' : 5, + 'x-bar' : 'wibble' + } + }; + + var expected = tu.buildBuffer([ + 0x00, 0x53, 0x71, + 0xc1, 0x19, 0x04, + 0xa3, 0x05, 0x78, 0x2d, 0x66, 0x6f, 0x6f, + 0x54, 0x05, + 0xa3, 0x05, 0x78, 0x2d, 0x62, 0x61, 0x72, + 0xa1, 0x06, 0x77, 0x69, 0x62, 0x62, 0x6c, 0x65 + ]); + + var actual = new Builder(); + m.encodeMessage(message, actual); + expect(expected).to.eql(actual.get()); + }); + + it('should decode the section', function() { + var buffer = tu.newBuffer([ + 0x00, 0x53, 0x71, + 0xc1, 0x19, 0x04, + 0xa3, 0x05, 0x78, 0x2d, 0x66, 0x6f, 0x6f, + 0x54, 0x05, + 0xa3, 0x05, 0x78, 0x2d, 0x62, 0x61, 0x72, + 0xa1, 0x06, 0x77, 0x69, 0x62, 0x62, 0x6c, 0x65 + ]); + + var message = m.decodeMessage(buffer); + expect(message).to.have.keys('deliveryAnnotations'); + expect(message.deliveryAnnotations).to.eql({ + 'x-foo' : 5, + 'x-bar' : 'wibble' + }); + }); +}); // DeliveryAnnotations + +describe('MessageAnnotations', function() { + it('should encode the section', function() { + var message = { + messageAnnotations: { + 'x-foo' : 5, + 'x-bar' : 'wibble' + } + }; + + var expected = tu.buildBuffer([ + 0x00, 0x53, 0x72, + 0xc1, 0x19, 0x04, + 0xa3, 0x05, 0x78, 0x2d, 0x66, 0x6f, 0x6f, + 0x54, 0x05, + 0xa3, 0x05, 0x78, 0x2d, 0x62, 0x61, 0x72, + 0xa1, 0x06, 0x77, 0x69, 0x62, 0x62, 0x6c, 0x65 + ]); + + var actual = new Builder(); + m.encodeMessage(message, actual); + expect(expected).to.eql(actual.get()); + }); + + it('should decode the section', function() { + var buffer = tu.newBuffer([ + 0x00, 0x53, 0x72, + 0xc1, 0x19, 0x04, + 0xa3, 0x05, 0x78, 0x2d, 0x66, 0x6f, 0x6f, + 0x54, 0x05, + 0xa3, 0x05, 0x78, 0x2d, 0x62, 0x61, 0x72, + 0xa1, 0x06, 0x77, 0x69, 0x62, 0x62, 0x6c, 0x65 + ]); + + var message = m.decodeMessage(buffer); + expect(message).to.have.keys('messageAnnotations'); + expect(message.messageAnnotations).to.eql({ + 'x-foo' : 5, + 'x-bar' : 'wibble' + }); + }); +}); // MessageAnnotations + +describe('Properties', function() { + it('should encode the section', function() { + var message = { + properties: { + messageId: 42, + userId: new Buffer('user'), + to: 'mom', + subject: 'hello!', + replyTo: 'amq.topic', + correlationId: 'msg-001', + contentType: 'text/plain', + contentEncoding: 'UTF-8', + groupId: 'group-one', + groupSequence: 2, + replyToGroupId: 'group-two', + absoluteExpiryTime: new Date('1970-01-17T20:17:59.232Z'), + creationTime: new Date('2016-02-14T19:47:12.198Z') + } + }; + + var expected = tu.buildBuffer([ + 0x00, 0x53, 0x73, + 0xc0, 0x67, 0x0d, + 0x54, 0x2a, + 0xa0, 0x04, 0x75, 0x73, 0x65, 0x72, + 0xa1, 0x03, 0x6d, 0x6f, 0x6d, + 0xa1, 0x06, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x21, + 0xa1, 0x09, 0x61, 0x6d, 0x71, 0x2e, 0x74, 0x6f, 0x70, 0x69, 0x63, + 0xa1, 0x07, 0x6d, 0x73, 0x67, 0x2d, 0x30, 0x30, 0x31, + 0xa3, 0x0a, 0x74, 0x65, 0x78, 0x74, 0x2f, 0x70, 0x6c, 0x61, 0x69, 0x6e, + 0xa3, 0x05, 0x55, 0x54, 0x46, 0x2d, 0x38, + 0x83, 0x00, 0x00, 0x00, 0x00, 0x56, 0xc0, 0xd9, 0xc0, + 0x83, 0x00, 0x00, 0x01, 0x52, 0xe1, 0x52, 0x96, 0xc6, + 0xa1, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x2d, 0x6f, 0x6e, 0x65, + 0x52, 0x02, + 0xa1, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x2d, 0x74, 0x77, 0x6f + ]); + + var actual = new Builder(); + m.encodeMessage(message, actual); + expect(expected).to.eql(actual.get()); + }); + + it('should decode the section', function() { + var buffer = tu.newBuffer([ + 0x00, 0x53, 0x73, + 0xc0, 0x67, 0x0d, + 0x54, 0x2a, + 0xa0, 0x04, 0x75, 0x73, 0x65, 0x72, + 0xa1, 0x03, 0x6d, 0x6f, 0x6d, + 0xa1, 0x06, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x21, + 0xa1, 0x09, 0x61, 0x6d, 0x71, 0x2e, 0x74, 0x6f, 0x70, 0x69, 0x63, + 0xa1, 0x07, 0x6d, 0x73, 0x67, 0x2d, 0x30, 0x30, 0x31, + 0xa3, 0x0a, 0x74, 0x65, 0x78, 0x74, 0x2f, 0x70, 0x6c, 0x61, 0x69, 0x6e, + 0xa3, 0x05, 0x55, 0x54, 0x46, 0x2d, 0x38, + 0x83, 0x00, 0x00, 0x00, 0x00, 0x56, 0xc0, 0xd9, 0xc0, + 0x83, 0x00, 0x00, 0x01, 0x52, 0xe1, 0x52, 0x96, 0xc6, + 0xa1, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x2d, 0x6f, 0x6e, 0x65, + 0x52, 0x02, + 0xa1, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x2d, 0x74, 0x77, 0x6f + ]); + + var message = m.decodeMessage(buffer); + expect(message).to.have.keys('properties'); + expect(message.properties).to.eql({ + messageId: 42, + userId: new Buffer('user'), + to: 'mom', + subject: 'hello!', + replyTo: 'amq.topic', + correlationId: 'msg-001', + contentType: 'text/plain', + contentEncoding: 'UTF-8', + groupId: 'group-one', + groupSequence: 2, + replyToGroupId: 'group-two', + absoluteExpiryTime: new Date('1970-01-17T20:17:59.232Z'), + creationTime: new Date('2016-02-14T19:47:12.198Z') + }); + }); +}); // Properties + +describe('ApplicationProperties', function() { + it('should encode the section', function() { + var message = { + applicationProperties: { + something: 'special' + } + }; + + var expected = tu.buildBuffer([ + 0x00, 0x53, 0x74, + 0xc1, 0x15,0x02, + 0xa1, 0x09, 0x73, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x69, 0x6e, 0x67, + 0xa1, 0x07, 0x73, 0x70, 0x65, 0x63, 0x69, 0x61, 0x6c + ]); + + var actual = new Builder(); + m.encodeMessage(message, actual); + expect(expected).to.eql(actual.get()); + }); + + it('should decode the section', function() { + var buffer = tu.newBuffer([ + 0x00, 0x53, 0x74, + 0xc1, 0x15,0x02, + 0xa1, 0x09, 0x73, 0x6f, 0x6d, 0x65, 0x74, 0x68, 0x69, 0x6e, 0x67, + 0xa1, 0x07, 0x73, 0x70, 0x65, 0x63, 0x69, 0x61, 0x6c + ]); + + var message = m.decodeMessage(buffer); + expect(message).to.have.keys('applicationProperties'); + expect(message.applicationProperties).to.eql({ + something: 'special' + }); + }); +}); // ApplicationProperties + +describe('Footer', function() { + it('should encode the section', function() { + var message = { + footer: { + 'x-foo' : 5, + 'x-bar' : 'wibble' + } + }; + + var expected = tu.buildBuffer([ + 0x00, 0x53, 0x78, + 0xc1, 0x19, 0x04, + 0xa1, 0x05, 0x78, 0x2d, 0x66, 0x6f, 0x6f, + 0x54, 0x05, + 0xa1, 0x05, 0x78, 0x2d, 0x62, 0x61, 0x72, + 0xa1, 0x06, 0x77, 0x69, 0x62, 0x62, 0x6c, 0x65 + ]); + + var actual = new Builder(); + m.encodeMessage(message, actual); + expect(actual.get()).to.eql(expected); + }); + + it('should decode the section', function() { + var buffer = tu.newBuffer([ + 0x00, 0x53, 0x78, + 0xc1, 0x19, 0x04, + 0xa1, 0x05, 0x78, 0x2d, 0x66, 0x6f, 0x6f, + 0x54, 0x05, + 0xa1, 0x05, 0x78, 0x2d, 0x62, 0x61, 0x72, + 0xa1, 0x06, 0x77, 0x69, 0x62, 0x62, 0x6c, 0x65 + ]); + + var message = m.decodeMessage(buffer); + expect(message).to.have.keys('footer'); + expect(message.footer).to.eql({ + 'x-foo' : 5, + 'x-bar' : 'wibble' + }); + }); +}); // Footer + +describe('Data', function() { + it('should encode the section', function() { + var message = { + body: new Buffer('this is a test') + }; + + var expected = tu.buildBuffer([ + 0x00, 0x53, 0x75, + 0xa0, 0x0e, 0x74, 0x68, 0x69, 0x73, 0x20, 0x69, 0x73, 0x20, 0x61, 0x20, 0x74, 0x65, 0x73, 0x74 + ]); + + var actual = new Builder(); + m.encodeMessage(message, actual); + expect(actual.get()).to.eql(expected); + }); + + it('should decode the section', function() { + var buffer = tu.newBuffer([ + 0x00, 0x53, 0x75, + 0xa0, 0x0e, 0x74, 0x68, 0x69, 0x73, 0x20, 0x69, 0x73, 0x20, 0x61, 0x20, 0x74, 0x65, 0x73, 0x74 + ]); + + var message = m.decodeMessage(buffer); + expect(message).to.have.keys('body'); + expect(message.body).to.eql(new Buffer('this is a test')); + }); +}); // Data + +describe('AMQPSequence', function() { + it('should encode the section', function() { + var message = { + body: [123, 456] + }; + + var expected = tu.buildBuffer([ + 0x00, 0x53, 0x76, + 0xc0, 0x08, 0x02, + 0x54, 0x7b, + 0x71, 0x00, 0x00, 0x01, 0xc8 + ]); + + var actual = new Builder(); + m.encodeMessage(message, actual); + expect(actual.get()).to.eql(expected); + }); + + it('should decode the section', function() { + var buffer = tu.newBuffer([ + 0x00, 0x53, 0x76, + 0xc0, 0x0b, 0x02, + 0x71, 0x00, 0x00, 0x00, 0x7b, + 0x71, 0x00, 0x00, 0x01, 0xc8 + ]); + + var message = m.decodeMessage(buffer); + expect(message).to.have.keys('body'); + expect(message.body).to.eql([ 123, 456 ]); + }); +}); // AMQPSequence + +describe('AMQPValue', function() { + it('should encode the section', function() { + var message = { + body: 'this is a test' + }; + + var expected = tu.buildBuffer([ + 0x00, 0x53, 0x77, + 0xa1, 0x0e, 0x74, 0x68, 0x69, 0x73, 0x20, 0x69, 0x73, 0x20, 0x61, 0x20, 0x74, 0x65, 0x73, 0x74 + ]); + + var actual = new Builder(); + m.encodeMessage(message, actual); + expect(actual.get()).to.eql(expected); + }); + + it('should decode the section', function() { + var buffer = tu.newBuffer([ + 0x00, 0x53, 0x77, + 0xa1, 0x0e, 0x74, 0x68, 0x69, 0x73, 0x20, 0x69, 0x73, 0x20, 0x61, 0x20, 0x74, 0x65, 0x73, 0x74 + ]); + + var message = m.decodeMessage(buffer); + expect(message).to.have.keys('body'); + expect(message.body).to.eql('this is a test'); + }); +}); // AMQPValue + +}); // Message Sections