Skip to content

Commit

Permalink
refactor(Session): add _handleDisposition, pass decision to links
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst committed Jun 26, 2015
1 parent 21fd06f commit 7fd6ab3
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 31 deletions.
4 changes: 4 additions & 0 deletions lib/receiver_link.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,8 @@ ReceiverLink.prototype._messageReceived = function(transferFrame) {
this._checkCredit();
};

ReceiverLink.prototype._dispositionReceived = function(details) {
debug('not yet processing "receiver" disposition frames');
};

module.exports = ReceiverLink;
49 changes: 18 additions & 31 deletions lib/session.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

var EventEmitter = require('events').EventEmitter,
var _ = require('lodash'),
EventEmitter = require('events').EventEmitter,
util = require('util'),

StateMachine = require('stately.js'),
Expand Down Expand Up @@ -330,36 +331,7 @@ Session.prototype._processFrame = function(frame) {
console.warn('received Transfer frame for unknown link(' + frame.handle + '): ' + JSON.stringify(frame));
}
} else if (frame instanceof DispositionFrame) {
//DispositionFrame: {"frameType":0,"channel":0,"role":true,"first":10000,"last":null,
// "settled":true,"state":{"descriptor":{"buffer":[0,0,0,0,0,0,0,36],"offset":0}},"batchable":null}
if (frame.role !== constants.linkRole.receiver) {
debug('not yet processing "sender" Disposition frames');
} else {
var first = frame.first;
var last = frame.last || first;
var settled = frame.settled;

if (settled) {
for (var messageId = first; messageId <= last; ++messageId) {
if (this._transfersInFlight[messageId]) {
var delta = Date.now() - this._transfersInFlight[messageId].sent;
delete this._transfersInFlight[messageId];
debug('message ' + messageId + ' settled in ' + delta + ' ms');
} else {
debug('invalid messageId: ' + messageId);
}
}
} else {
debug('not settled, first: ' + first + ', last: ' + last);
}

this.emit(Session.DispositionReceived, {
first: first,
last: last,
settled: settled,
state: frame.state
});
}
this._handleDisposition(frame);
} else {
debug('not yet processing frames of type: ' + frame.constructor.name);
}
Expand Down Expand Up @@ -433,4 +405,19 @@ Session.prototype._unmap = function() {
}
};

Session.prototype._handleDisposition = function(frame) {
var disposition = {
first: frame.first,
last: frame.last || frame.first,
settled: frame.settled,
state: frame.state
};

_.values(this._linksByName).forEach(function(link) {
link._dispositionReceived(disposition);
});

this.emit(Session.DispositionReceived, disposition);
};

module.exports = Session;

0 comments on commit 7fd6ab3

Please sign in to comment.