Skip to content

Commit

Permalink
refactor(known-types): remove known types, move message decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst committed Feb 16, 2016
1 parent 6e68562 commit 276d6b6
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 98 deletions.
5 changes: 1 addition & 4 deletions lib/codec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ var Int64 = require('node-int64'),
AMQPArray = require('./types/amqp_composites').Array,
DescribedType = require('./types/described_type'),
ForcedType = require('./types/forced_type'),
knownTypes = require('./types/known_type_converter'),

types = require('./types'),
errors = require('./errors');
Expand Down Expand Up @@ -105,8 +104,7 @@ Codec.prototype._asMostSpecific = function(buf, forcedCode) {
var descriptor = this._asMostSpecific(buf[0]);
var value = this._asMostSpecific(buf[1]);
var describedType = new DescribedType(descriptor, value);
var asKnownType = knownTypes(describedType);
return asKnownType || describedType;
return describedType;
}

var code = forcedCode || buf[0];
Expand Down Expand Up @@ -238,7 +236,6 @@ Codec.prototype._encodeObject = function(value, buffer) {
return this.encode(value.toDescribedType() || [], buffer);
}


if (value instanceof Buffer) return types.binary.encode(value, buffer, this);
if (value instanceof AMQPArray) return types.array.encode(value, buffer, this);
if (value instanceof Array) return types.list.encode(value, buffer, this);
Expand Down
58 changes: 0 additions & 58 deletions lib/frames.js
Original file line number Diff line number Diff line change
Expand Up @@ -294,61 +294,3 @@ frames.HeartbeatFrame.prototype.toDescribedType = function() { return null; };

// used to determine if a message should be split to multiple frames
frames.TRANSFER_FRAME_OVERHEAD = 29;

// TransferFrame specific
var M = require('./types/message');
var _possibleFields = {
'header': M.Header,
'footer': M.Footer,
'deliveryAnnotations': M.DeliveryAnnotations,
'annotations': M.Annotations,
'properties': M.Properties,
'applicationProperties': M.ApplicationProperties
};

frames.decodeMessagePayload = function(buffer) {
var message = new M.Message();
var body = [];
var curIdx = 0;
var isData = function(x) { return x instanceof M.Data; };
var isSequence = function(x) { return x instanceof M.AMQPSequence; };
while (curIdx < buffer.length) {
var decoded = codec.decode(buffer, curIdx);
if (!decoded) {
throw new errors.MalformedPayloadError('Unable to decode bytes from message body: ' + buffer.slice(curIdx).toString('hex'));
}

curIdx += decoded[1];
var matched = false;
for (var fieldName in _possibleFields) {
if (decoded[0] instanceof _possibleFields[fieldName]) {
if (message.hasOwnProperty(fieldName)) throw new errors.MalformedPayloadError('Duplicate ' + fieldName + ' section in message');
message[fieldName] = decoded[0];
matched = true;
break;
}
}

if (!matched) {
// Part of the body
if (decoded[0] instanceof M.Data) {
if (body.length && !body.every(isData)) throw new errors.MalformedPayloadError(
'Attempt to put both Data and non-Data payloads in message body');
body.push(decoded[0]);
} else if (decoded[0] instanceof M.AMQPSequence) {
if (body.length && !body.every(isSequence)) throw new errors.MalformedPayloadError(
'Attempt to put both AMQPSequence and non-AMQPSequence payloads in message body');
body.push(decoded[0]);
} else if (decoded[0] instanceof M.AMQPValue) {
if (body.length) throw new errors.MalformedPayloadError('Attempt to provide more than one AMQPValue for message body');
body.push(decoded[0]);
} else {
throw new errors.MalformedPayloadError('Unknown message contents: ' + JSON.stringify(decoded[0]));
}
}
}

// Pull out the values.
message.body = body.map(function(x) { return x.getValue(); });
return message;
};
3 changes: 2 additions & 1 deletion lib/receiver_link.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ var util = require('util'),
constants = require('./constants'),
u = require('./utilities'),

m = require('./types/message'),
frames = require('./frames'),
DeliveryState = require('./types/delivery_state'),
Link = require('./link');
Expand Down Expand Up @@ -160,7 +161,7 @@ ReceiverLink.prototype._messageReceived = function(transferFrame) {
this._currentTransferFrame = null;
}

var message = frames.decodeMessagePayload(curFrame.payload);
var message = m.decodeMessagePayload(curFrame.payload);
// store deliveryId for later use
message._deliveryId = curFrame.deliveryId;
this.linkCredit--;
Expand Down
34 changes: 0 additions & 34 deletions lib/types/known_type_converter.js

This file was deleted.

87 changes: 86 additions & 1 deletion lib/types/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ var util = require('util'),
ForcedType = require('./forced_type'),

u = require('../utilities'),
errors = require('../errors');
errors = require('../errors'),
codec = require('../codec');

/**
*
Expand Down Expand Up @@ -338,3 +339,87 @@ Message.prototype.encode = function(codec, buf) {
};

module.exports.Message = Message;

var _possibleFields = {
'header': Header,
'footer': Footer,
'deliveryAnnotations': DeliveryAnnotations,
'annotations': Annotations,
'properties': Properties,
'applicationProperties': ApplicationProperties
};

var sectionByDescriptor = {};

[
Header, DeliveryAnnotations, Annotations,
Properties, ApplicationProperties, Footer,
Data, AMQPSequence, AMQPValue
].forEach(function(section) {
sectionByDescriptor[section.prototype.Descriptor.code] = section;
sectionByDescriptor[section.prototype.Descriptor.name] = section;
});

module.exports.decodeMessagePayload = function(buffer) {
var message = new Message();
var body = [];
var curIdx = 0;
var isData = function(x) { return x instanceof Data; };
var isSequence = function(x) { return x instanceof AMQPSequence; };
while (curIdx < buffer.length) {
var decoded = codec.decode(buffer, curIdx);
if (!decoded) {
throw new errors.MalformedPayloadError('Unable to decode bytes from message body: ' + buffer.slice(curIdx).toString('hex'));
}

var described = decoded[0];
if (!sectionByDescriptor.hasOwnProperty(described.descriptor)) {
throw new errors.MalformedPayloadError('Unknown section: ', described);
}

var section = sectionByDescriptor[described.descriptor].fromDescribedType(described);

curIdx += decoded[1];
var matched = false;
for (var fieldName in _possibleFields) {
if (section instanceof _possibleFields[fieldName]) {
if (message.hasOwnProperty(fieldName)) {
throw new errors.MalformedPayloadError('Duplicate ' + fieldName + ' section in message');
}

message[fieldName] = section;
matched = true;
break;
}
}

if (!matched) {
// Part of the body
if (section instanceof Data) {
if (body.length && !body.every(isData)) {
throw new errors.MalformedPayloadError('Attempt to put both Data and non-Data payloads in message body');
}

body.push(section);
} else if (section instanceof AMQPSequence) {
if (body.length && !body.every(isSequence)) {
throw new errors.MalformedPayloadError('Attempt to put both AMQPSequence and non-AMQPSequence payloads in message body');
}

body.push(section);
} else if (section instanceof AMQPValue) {
if (body.length) {
throw new errors.MalformedPayloadError('Attempt to provide more than one AMQPValue for message body');
}

body.push(section);
} else {
throw new errors.MalformedPayloadError('Unknown message contents: ' + JSON.stringify(section));
}
}
}

// Pull out the values.
message.body = body.map(function(x) { return x.getValue(); });
return message;
};

0 comments on commit 276d6b6

Please sign in to comment.