Skip to content

Commit

Permalink
[minor] Refactor PerMessageDeflate.prototype.cleanup()
Browse files Browse the repository at this point in the history
  • Loading branch information
lpinca committed Dec 24, 2018
1 parent e8ada8a commit 3d6692a
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 83 deletions.
58 changes: 25 additions & 33 deletions lib/permessage-deflate.js
Expand Up @@ -4,14 +4,12 @@ const Limiter = require('async-limiter');
const zlib = require('zlib');

const bufferUtil = require('./buffer-util');
const constants = require('./constants');
const { kStatusCode, NOOP } = require('./constants');

const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);
const EMPTY_BLOCK = Buffer.from([0x00]);

const kPerMessageDeflate = Symbol('permessage-deflate');
const kWriteInProgress = Symbol('write-in-progress');
const kPendingClose = Symbol('pending-close');
const kTotalLength = Symbol('total-length');
const kCallback = Symbol('callback');
const kBuffers = Symbol('buffers');
Expand Down Expand Up @@ -130,20 +128,13 @@ class PerMessageDeflate {
*/
cleanup() {
if (this._inflate) {
if (this._inflate[kWriteInProgress]) {
this._inflate[kPendingClose] = true;
} else {
this._inflate.close();
this._inflate = null;
}
this._inflate.close();
this._inflate = null;
}

if (this._deflate) {
if (this._deflate[kWriteInProgress]) {
this._deflate[kPendingClose] = true;
} else {
this._deflate.close();
this._deflate = null;
}
this._deflate.close();
this._deflate = null;
}
}

Expand Down Expand Up @@ -355,7 +346,6 @@ class PerMessageDeflate {
}

this._inflate[kCallback] = callback;
this._inflate[kWriteInProgress] = true;

this._inflate.write(data);
if (fin) this._inflate.write(TRAILER);
Expand All @@ -375,14 +365,10 @@ class PerMessageDeflate {
this._inflate[kTotalLength]
);

if (
(fin && this.params[`${endpoint}_no_context_takeover`]) ||
this._inflate[kPendingClose]
) {
if (fin && this.params[`${endpoint}_no_context_takeover`]) {
this._inflate.close();
this._inflate = null;
} else {
this._inflate[kWriteInProgress] = false;
this._inflate[kTotalLength] = 0;
this._inflate[kBuffers] = [];
}
Expand Down Expand Up @@ -422,32 +408,38 @@ class PerMessageDeflate {
this._deflate[kBuffers] = [];

//
// `zlib.DeflateRaw` emits an `'error'` event only when an attempt to use
// it is made after it has already been closed. This cannot happen here,
// so we only add a listener for the `'data'` event.
// An `'error'` event is emitted, only on Node.js < 10.0.0, if the
// `zlib.DeflateRaw` instance is closed while data is being processed.
// This can happen if `PerMessageDeflate#cleanup()` is called at the wrong
// time due to an abnormal WebSocket closure.
//
this._deflate.on('error', NOOP);
this._deflate.on('data', deflateOnData);
}

this._deflate[kWriteInProgress] = true;

this._deflate.write(data);
this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
if (!this._deflate) {
//
// This `if` statement is only needed for Node.js < 10.0.0 because as of
// commit https://github.com/nodejs/node/commit/5e3f5164, the flush
// callback is no longer called if the deflate stream is closed while
// data is being processed.
//
return;
}

var data = bufferUtil.concat(
this._deflate[kBuffers],
this._deflate[kTotalLength]
);

if (fin) data = data.slice(0, data.length - 4);

if (
(fin && this.params[`${endpoint}_no_context_takeover`]) ||
this._deflate[kPendingClose]
) {
if (fin && this.params[`${endpoint}_no_context_takeover`]) {
this._deflate.close();
this._deflate = null;
} else {
this._deflate[kWriteInProgress] = false;
this._deflate[kTotalLength] = 0;
this._deflate[kBuffers] = [];
}
Expand Down Expand Up @@ -488,7 +480,7 @@ function inflateOnData(chunk) {
}

this[kError] = new RangeError('Max payload size exceeded');
this[kError][constants.kStatusCode] = 1009;
this[kError][kStatusCode] = 1009;
this.removeListener('data', inflateOnData);
this.reset();
}
Expand All @@ -505,6 +497,6 @@ function inflateOnError(err) {
// closed when an error is emitted.
//
this[kPerMessageDeflate]._inflate = null;
err[constants.kStatusCode] = 1007;
err[kStatusCode] = 1007;
this[kCallback](err);
}
10 changes: 0 additions & 10 deletions lib/sender.js
Expand Up @@ -348,16 +348,6 @@ class Sender {
this._deflating = true;
perMessageDeflate.compress(data, options.fin, (_, buf) => {
this._deflating = false;

if (!this._socket.readable && !this._socket.writable) {
//
// The socket is closed. Clear the queue and bail out.
//
this._bufferedBytes = 0;
this._queue.length = 0;
return;
}

options.readOnly = false;
this.sendFrame(Sender.frame(buf, options), cb);
this.dequeue();
Expand Down
13 changes: 13 additions & 0 deletions test/permessage-deflate.test.js
Expand Up @@ -609,5 +609,18 @@ describe('PerMessageDeflate', function() {
});
});
});

it("doesn't call the callback if the deflate stream is closed prematurely", function(done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
const buf = Buffer.from('A'.repeat(50));

perMessageDeflate.accept([{}]);
perMessageDeflate.compress(buf, true, () => {
done(new Error('Unexpected callback invocation'));
});
perMessageDeflate._deflate.on('close', done);

process.nextTick(() => perMessageDeflate.cleanup());
});
});
});
40 changes: 0 additions & 40 deletions test/sender.test.js
Expand Up @@ -69,46 +69,6 @@ describe('Sender', function() {
sender.send('hi', options);
});

it('does not compress enqueued messages after socket closes', function(done) {
const mockSocket = new MockSocket({
write: () => done(new Error('Unexpected call to socket.write()'))
});

const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
perMessageDeflate.accept([{}]);

const compress = perMessageDeflate.compress;
const sender = new Sender(mockSocket, {
'permessage-deflate': perMessageDeflate
});

perMessageDeflate.compress = (data, fin, callback) => {
compress.call(perMessageDeflate, data, fin, (_, buf) => {
assert.strictEqual(sender._bufferedBytes, 198);
assert.strictEqual(sender._queue.length, 99);
assert.strictEqual(mockSocket.readable, false);
assert.strictEqual(mockSocket.writable, false);

process.nextTick(() => {
assert.strictEqual(sender._bufferedBytes, 0);
assert.strictEqual(sender._queue.length, 0);
done();
});

callback(_, buf);
});
};

const options = { compress: true, fin: true };

for (let i = 0; i < 100; i++) sender.send('hi', options);

process.nextTick(() => {
mockSocket.readable = false;
mockSocket.writable = false;
});
});

it('does not compress data for small payloads', function(done) {
const perMessageDeflate = new PerMessageDeflate();
const mockSocket = new MockSocket({
Expand Down

0 comments on commit 3d6692a

Please sign in to comment.