Skip to content

Commit

Permalink
Flow: lazy chunking.
Browse files Browse the repository at this point in the history
  • Loading branch information
Gábor Molnár committed Aug 12, 2013
1 parent 92ed8b8 commit bd7886b
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 50 deletions.
17 changes: 8 additions & 9 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -242,26 +242,25 @@ Connection.prototype._send = function _send() {
this._log.trace('Starting forwarding frames from streams.');

// * Looping through the active streams in priority order and forwarding frames from streams
priority_loop:
stream_loop:
for (var i = 0; i < this._streamPriorities.length; i++) {
var stream = this._streamPriorities[i];
var id = this._getIdOf(stream);
var frame;
var unshiftRemainder = stream.upstream.unshift.bind(stream.upstream);
while (frame = stream.upstream.read()) {
if (!this.wouldForward(frame)) {
stream.upstream.unshift(frame);
continue priority_loop;
}

frame.stream = id;
if (frame.type === 'PUSH_PROMISE') {
frame.promised_stream.emit('promise_sent');
frame.promised_stream = this._getIdOf(frame.promised_stream);
}

var moreNeeded = this.push(frame);
if (!moreNeeded) {
break priority_loop;
var moreNeeded = this._push(frame, unshiftRemainder);

if (moreNeeded === null) {
continue stream_loop; // It's still possible that other streams have frames that don't consume flow control window
} else if (moreNeeded === false) {
break stream_loop;
}
}
}
Expand Down
66 changes: 43 additions & 23 deletions lib/flow.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
var assert = process.env.HTTP2_ASSERT ? require('assert') : function noop() {};
var logging = require('./logging');

// The Flow class
Expand Down Expand Up @@ -109,6 +110,8 @@ Flow.prototype.disableRemoteFlowControl = function disableRemoteFlowControl() {
// | |
// +----------------------------------------------------+

var MAX_HTTP_PAYLOAD_SIZE = 16383; // TODO: this is repeated in multiple files

// `_send` is called when more frames should be pushed to the output buffer.
Flow.prototype._send = function _send() {
throw new Error('The _send() method has to be overridden by the child class!');
Expand All @@ -131,36 +134,53 @@ Flow.prototype._read = function _read() {
Flow.prototype._onWindowIncrease = function _onWindowIncrease() {
var moreNeeded = true, frame;

var unshiftRemainder = this._queue.unshift.bind(this._queue);
while (moreNeeded && (frame = this._queue.shift())) {
if (this._forwardable(frame)) {
moreNeeded = this._forward(frame);
} else {
this._queue.unshift(frame);
moreNeeded = false;
}
moreNeeded = this._push(frame, unshiftRemainder);
}

this.read(0); // See http://nodejs.org/api/stream.html#stream_stream_read_0
};

// `forward(frame)` pushes `frame` into the output queue and decreases the flow control window size
Flow.prototype._forward = function _forward(frame) {
if ((frame !== null) && (frame.type === 'DATA')) {
this._log.trace({ window: this._window, by: frame.data.length }, 'Decreasing flow control window size.');
this._window -= frame.data.length;
// `_push(frame)` pushes `frame` into the output queue and decreases the flow control window size
Flow.prototype._push = function _push(frame, remainderCallback) {
var forwardable, remainder;
if ((frame === null) || (frame.type !== 'DATA') || (this._window >= frame.data.length)) {
forwardable = frame;
}
return Duplex.prototype.push.call(this, frame);
};

// A frame is `_forwardable` if there's enough window to push it in the output queue.
Flow.prototype._forwardable = function _forwardable(frame) {
return (frame === null) || (frame.type !== 'DATA') || (this._window >= frame.data.length);
};
else if (this._window <= 0) {
remainder = frame;
}

// A flow `wouldForward` a `frame` if it can push it into the output queue immediately (without
// putting it in the flow control queue first).
Flow.prototype.wouldForward = function wouldForward(frame) {
return (this._queue.length === 0) && (this._forwardable(frame));
else {
var chunkSize = Math.min(this._window, MAX_HTTP_PAYLOAD_SIZE);
forwardable = {
stream: frame.stream,
type: 'DATA',
data: frame.data.slice(0, chunkSize)
};

frame.data = frame.data.slice(chunkSize);
remainder = frame;
}

var moreNeeded = null;
if (forwardable !== undefined) {
if (forwardable && forwardable.type === 'DATA') {
this._log.trace({ window: this._window, by: forwardable.data.length },
'Decreasing flow control window size.');
this._window -= forwardable.data.length;
assert(this._window >= 0);
}
moreNeeded = Duplex.prototype.push.call(this, forwardable);
}

if (remainder !== undefined) {
remainderCallback(remainder);
}

return moreNeeded;
};

// Push `frame` into the flow control queue, or if it's empty, then directly into the output queue
Expand All @@ -173,8 +193,8 @@ Flow.prototype.push = function push(frame) {
this.emit('sending', frame);
}

if (this.wouldForward(frame)) {
return this._forward(frame);
if (this._queue.length === 0) {
return this._push(frame, this._queue.push.bind(this._queue));
} else {
this._queue.push(frame);
return false;
Expand Down
23 changes: 5 additions & 18 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,24 +194,11 @@ Stream.prototype._read = function _read() {

// The `write` method gets called when there's a write request from the user.
Stream.prototype._write = function _write(buffer, encoding, ready) {
// * The incoming buffer is cut into pieces that are no larger than `MAX_HTTP_PAYLOAD_SIZE` and
// the initial window size, but at least 100 bytes. Ideal chunk size is such that most chunks
// are of equal size. This is a mostly heuristic algorithm, and should be changed to a more
// fine-grained chunking method in the future.
var limit = Math.max(100, Math.min(MAX_HTTP_PAYLOAD_SIZE, this.upstream._initialWindow));
var idealChunkCount = Math.ceil(buffer.length / limit);
var chunkSize = Math.round(buffer.length / idealChunkCount);
var chunks = utils.cut(buffer, chunkSize);

// * Chunks are wrapped in DATA frames and sent out.
var moreNeeded = true;
while (chunks.length > 0) {
moreNeeded = this.upstream.push({
type: 'DATA',
flags: {},
data: chunks.shift()
});
}
// * Chunking is done by the upstream Flow.
var moreNeeded = this.upstream.push({
type: 'DATA',
data: buffer
});

// * Call ready when upstream is ready to receive more frames.
if (moreNeeded) {
Expand Down

0 comments on commit bd7886b

Please sign in to comment.