Skip to content

Commit

Permalink
feat(auto-settle): send disposition frames for auto-settle
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst committed Jun 24, 2015
1 parent 70f9b4c commit 1a3473d
Showing 1 changed file with 19 additions and 13 deletions.
32 changes: 19 additions & 13 deletions lib/link.js
Expand Up @@ -11,14 +11,15 @@ var _ = require('lodash'),

constants = require('./constants'),
errors = require('./errors'),
u = require('./utilities'),

AttachFrame = require('./frames/attach_frame'),
DetachFrame = require('./frames/detach_frame'),
FlowFrame = require('./frames/flow_frame'),
TransferFrame = require('./frames/transfer_frame'),
DispositionFrame = require('./frames/disposition_frame'),

DeliveryStates = require('./types/delivery_states'),
DeliveryState = require('./types/delivery_state'),
Session = require('./session');

function Link(session, handle, linkPolicy) {
Expand Down Expand Up @@ -164,23 +165,19 @@ Link.prototype.addCredits = function(credits, flowOptions) {
};

Link.prototype.accept = function(message) {
// @todo: handle accepting an array of messages
this._sendDisposition({
first: message._deliveryId,
last: message._deliveryId,
var range = u.dispositionRange(message);
this._sendDisposition(_.defaults(range, {
settled: true,
state: new DeliveryStates.Accepted()
});
state: new DeliveryState.Accepted()
}));
};

Link.prototype.reject = function(message, reason) {
// @todo: handle accepting an array of messages
this._sendDisposition({
first: message._deliveryId,
last: message._deliveryId,
var range = u.dispositionRange(message);
this._sendDisposition(_.defaults(range, {
settled: true,
state: new DeliveryStates.Rejected({ error: reason })
});
state: new DeliveryState.Rejected({ error: reason })
}));
};

// private api
Expand Down Expand Up @@ -228,6 +225,14 @@ Link.prototype._messageReceived = function(transferFrame) {
this.linkCredit--;
debug('Rx message ' + transferFrame.deliveryId + ' on ' + this.name + ', ' + this.linkCredit + ' credit, ' + this.session._sessionParams.incomingWindow + ' window left.');
// @todo Bump link credit based on strategy

// store deliveryId for later use
transferFrame.message._deliveryId = transferFrame.deliveryId;

// respect settle mode in policy
if (this.policy.options.receiverSettleMode === constants.receiverSettleMode.autoSettle)
this.accept(transferFrame.message);

this.emit(Link.MessageReceived, transferFrame.message);
this._checkCredit();
};
Expand All @@ -236,6 +241,7 @@ Link.prototype._sendMessage = function(messageId, message, transferOptions) {
if (this.linkCredit <= 0) {
throw new errors.OverCapacityError('Cannot send if no link credit.');
}

var opts = transferOptions || {};
opts.handle = this.handle;
opts.deliveryId = messageId;
Expand Down

0 comments on commit 1a3473d

Please sign in to comment.