Skip to content

Commit

Permalink
refactor(messages): completely rework how messages are processed
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst committed Feb 16, 2016
1 parent 276d6b6 commit 26e6939
Show file tree
Hide file tree
Showing 10 changed files with 600 additions and 493 deletions.
44 changes: 0 additions & 44 deletions lib/new_types/message.js

This file was deleted.

7 changes: 4 additions & 3 deletions lib/receiver_link.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,10 @@ ReceiverLink.prototype._messageReceived = function(transferFrame) {
this._currentTransferFrame = null;
}

var message = m.decodeMessagePayload(curFrame.payload);
var message = m.decodeMessage(curFrame.payload);
// store deliveryId for later use
message._deliveryId = curFrame.deliveryId;
Object.defineProperty(message, '_deliveryId', { value: curFrame.deliveryId });

this.linkCredit--;
debug('Rx message ' + transferFrame.deliveryId + ' on ' + this.name + ', ' + this.linkCredit + ' credit, ' + this.session._sessionParams.incomingWindow + ' window left.');
// @todo Bump link credit based on strategy
Expand All @@ -174,7 +175,7 @@ ReceiverLink.prototype._messageReceived = function(transferFrame) {
}

// optionally decode message based on policy
var payload = message.body[0] || message.body;
var payload = message.body;
message.body = this.policy.decoder ? this.policy.decoder(payload) : payload;
debug('received from (' + this.name + '): ' + message.body);

Expand Down
14 changes: 9 additions & 5 deletions lib/sender_link.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ var Builder = require('buffer-builder'),
errors = require('./errors'),
constants = require('./constants'),
putils = require('./policies/policy_utilities'),
codec = require('./codec'),
m = require('./types/message'),
u = require('./utilities'),

DeliveryState = require('./types/delivery_state'),
M = require('./types/message'),
Link = require('./link');

function SenderLink(session, handle, linkPolicy) {
Expand Down Expand Up @@ -74,10 +73,15 @@ SenderLink.prototype.send = function(msg, options) {
return new Promise(function(resolve, reject) {
var message;
if (u.isObject(msg) && msg.hasOwnProperty('body')) {
message = new M.Message(msg);
message = msg;
message.body = self.policy.encoder(message.body);
} else {
message = new M.Message(options, self.policy.encoder(msg));
if (u.isObject(options)) {
options.body = self.policy.encoder(msg);
message = options;
} else {
message = { body: self.policy.encoder(msg) };
}
}

var deliveryTag = self.session._deliveryTag++;
Expand Down Expand Up @@ -152,7 +156,7 @@ SenderLink.prototype._sendMessage = function(message, options) {

// pre-encode message to determine if multiple frames are required
var messageBuilder = new Builder();
codec.encode(message, messageBuilder);
m.encodeMessage(message, messageBuilder);
var messageBuffer = messageBuilder.get();

// send the frame(s)
Expand Down
7 changes: 4 additions & 3 deletions lib/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ registerType('uuid', {

registerType('binary', {
encoder: function(val, bufb) {
val = (val instanceof Buffer) ? val : new Buffer(val);
var code = (val.length <= 0xFF) ? 0xA0 : 0xB0;
bufb.appendUInt8(code);
types[code].encode(val, bufb);
Expand Down Expand Up @@ -692,9 +693,9 @@ function mapBuilderForKeyType(keyType) {
};
}

registerType('fields', {
encoder: mapBuilderForKeyType('symbol')
});
registerType('fields', { encoder: mapBuilderForKeyType('symbol') });
registerType('milliseconds', 'uint');
registerType('sequence-no', 'uint');

types.defineComposite = require('./types/composite_type');

0 comments on commit 26e6939

Please sign in to comment.