Skip to content

Commit

Permalink
Flow: basic flow control for incoming frames.
Browse files Browse the repository at this point in the history
  • Loading branch information
Gábor Molnár committed Aug 12, 2013
1 parent e2e1a05 commit aa0f2eb
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 3 deletions.
1 change: 0 additions & 1 deletion lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ Connection.prototype._receive = function _receive(frame, done) {
Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) {
// * Sending the initial settings.
this._log.info('Sending the first SETTINGS frame as part of the connection header.');
settings.SETTINGS_FLOW_CONTROL_OPTIONS = true; // Inbound flow control is not implemented yet
this.set(settings);

// * Checking that the first frame the other endpoint sends is SETTINGS
Expand Down
38 changes: 37 additions & 1 deletion lib/flow.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ function Flow(flowControlId) {
this._window = this._initialWindow = INITIAL_WINDOW_SIZE;
this._flowControlId = flowControlId;
this._queue = [];

this._ended = false;
this._received = 0;
this._remoteFlowControlDisabled = false;
}
Flow.prototype = Object.create(Duplex.prototype, { constructor: { value: Flow } });

Expand All @@ -77,16 +81,48 @@ Flow.prototype._receive = function _receive(frame, callback) {
Flow.prototype._write = function _write(frame, encoding, callback) {
this._log.trace({ frame: frame }, 'Receiving frame');
this.emit('receiving', frame);
this._receive(frame, callback);

if (frame.flags.END_STREAM) {
this._ended = true;
}

if ((frame.type === 'DATA') && (frame.data.length > 0) && !this._remoteFlowControlDisabled) {
this._receive(frame, function() {
this._received += frame.data.length;
if (!this._restoreWindowTimer) {
this._restoreWindowTimer = setImmediate(this._restoreWindow.bind(this));
}
callback();
}.bind(this));
}

else {
this._receive(frame, callback);
}

if ((frame.type === 'WINDOW_UPDATE') && (!this._flowControlId || (frame.stream === this._flowControlId))) {
this._updateWindow(frame);
}
};

// `_restoreWindow` basically acknowledges the DATA frames received since it's last call. It sends
// a WINDOW_UPDATE that restores the flow control window of the remote end.
Flow.prototype._restoreWindow = function _restoreWindow() {
delete this._restoreWindowTimer;
if (!this._ended && !this._remoteFlowControlDisabled && (this._received > 0)) {
this.push({
type: 'WINDOW_UPDATE',
stream: this._flowControlId,
window_size: this._received
});
this._received = 0;
}
};

// Remote flow control is currently disabled by default, but in the future, it may be turned off
// using the `disableRemoteFlowControl` method.
Flow.prototype.disableRemoteFlowControl = function disableRemoteFlowControl() {
this._remoteFlowControlDisabled = true;
this.push({
type: 'WINDOW_UPDATE',
stream: this._flowControlId,
Expand Down
1 change: 0 additions & 1 deletion lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ Stream.prototype._finishing = function _finishing() {
} else {
this.upstream.push(endFrame);
}
this.upstream.push(null);
};

// [Stream States](http://tools.ietf.org/id/draft-unicorn-httpbis-http2-01.html#StreamStates)
Expand Down

0 comments on commit aa0f2eb

Please sign in to comment.