From c2d737d3cf0036ea862c84a708e8548237d90e87 Mon Sep 17 00:00:00 2001 From: Aleksandr Zinoviev Date: Mon, 23 Jan 2017 15:31:15 +0100 Subject: [PATCH 1/2] Take into account the data queued in the sender. --- lib/Sender.js | 73 ++++++++++++++++++++++++++++++------------ lib/WebSocket.js | 4 ++- test/Sender.test.js | 18 +++++++++++ test/WebSocket.test.js | 30 +++++++++++++---- 4 files changed, 98 insertions(+), 27 deletions(-) diff --git a/lib/Sender.js b/lib/Sender.js index 74f6e85d1..bd78f24f7 100644 --- a/lib/Sender.js +++ b/lib/Sender.js @@ -30,6 +30,7 @@ class Sender { this._socket = socket; this.onerror = null; this.queue = []; + this.bufferedBytes = 0; } /** @@ -86,8 +87,10 @@ class Sender { * @public */ ping (data, mask) { + var readOnly = isReadOnly(data); + data = makeBuffer(data); if (this.perMessageDeflate) { - this.enqueue([this.doPing, data, mask]); + this.enqueue([this.doPing, data, mask, readOnly]); } else { this.doPing(data, mask); } @@ -98,11 +101,12 @@ class Sender { * * @param {*} data The message to send * @param {Boolean} mask Specifies whether or not to mask `data` + * @param {Boolean} readOnly Specifies whether `data` can be modified * @private */ - doPing (data, mask) { + doPing (data, mask, readOnly) { this.frameAndSend(data, { - readOnly: true, + readOnly: readOnly, opcode: 0x09, rsv1: false, fin: true, @@ -120,8 +124,10 @@ class Sender { * @public */ pong (data, mask) { + var readOnly = isReadOnly(data); + data = makeBuffer(data); if (this.perMessageDeflate) { - this.enqueue([this.doPong, data, mask]); + this.enqueue([this.doPong, data, mask, readOnly]); } else { this.doPong(data, mask); } @@ -132,11 +138,12 @@ class Sender { * * @param {*} data The message to send * @param {Boolean} mask Specifies whether or not to mask `data` + * @param {Boolean} readOnly Specifies whether `data` can be modified * @private */ - doPong (data, mask) { + doPong (data, mask, readOnly) { this.frameAndSend(data, { - readOnly: true, + readOnly: readOnly, opcode: 0x0a, rsv1: false, fin: true, @@ -161,18 +168,9 @@ class Sender { send (data, options, cb) { var opcode = options.binary ? 2 : 1; var rsv1 = options.compress; - var readOnly = true; + var readOnly = isReadOnly(data); - if (data && !Buffer.isBuffer(data)) { - if (data instanceof ArrayBuffer) { - data = Buffer.from(data); - } else if (ArrayBuffer.isView(data)) { - data = viewToBuffer(data); - } else { - data = Buffer.from(data); - readOnly = false; - } - } + data = makeBuffer(data); if (this.firstFragment) { this.firstFragment = false; @@ -334,12 +332,13 @@ class Sender { dequeue () { if (this.processing) return; - const handler = this.queue.shift(); - if (!handler) return; + const params = this.queue.shift(); + if (!params) return; + if (params[1]) this.bufferedBytes -= params[1].length; this.processing = true; - handler[0].apply(this, handler.slice(1)); + params[0].apply(this, params.slice(1)); } /** @@ -361,6 +360,7 @@ class Sender { * @private */ enqueue (params) { + if (params[1]) this.bufferedBytes += params[1].length; this.queue.push(params); this.dequeue(); } @@ -412,3 +412,36 @@ function sendFramedData (sender, outputBuffer, data, cb) { sender._socket.write(outputBuffer, cb); } } + +/** +* @param {*} data The data to be converted to Buffer +* @return {Buffer} Converted data +* @private +*/ +function makeBuffer (data) { + if (data && !Buffer.isBuffer(data)) { + if (data instanceof ArrayBuffer) { + data = Buffer.from(data); + } else if (ArrayBuffer.isView(data)) { + data = viewToBuffer(data); + } else { + data = Buffer.from(data); + } + } + + return data; +} + +/** +* @param {*} data The data to be checked +* @return {Boolean} Specifies whether `data` can be modified +* @private +*/ +function isReadOnly (data) { + if (data && !Buffer.isBuffer(data) && + !(data instanceof ArrayBuffer) && + !ArrayBuffer.isView(data)) { + return false; + } + return true; +} diff --git a/lib/WebSocket.js b/lib/WebSocket.js index ea3513559..ec7490bd0 100644 --- a/lib/WebSocket.js +++ b/lib/WebSocket.js @@ -82,7 +82,9 @@ class WebSocket extends EventEmitter { get bufferedAmount () { var amount = 0; - if (this._socket) amount = this._socket.bufferSize || 0; + if (this._socket) { + amount = this._socket.bufferSize + this._sender.bufferedBytes; + } return amount; } diff --git a/test/Sender.test.js b/test/Sender.test.js index 48c126963..d6454b47a 100644 --- a/test/Sender.test.js +++ b/test/Sender.test.js @@ -73,6 +73,24 @@ describe('Sender', function () { }); }); + describe('#pong', function () { + it('works with multiple types of data', function (done) { + let count = 0; + const sender = new Sender({ + write: (data) => { + assert.ok(data.equals(Buffer.from([0x8a, 0x02, 0x68, 0x69]))); + if (++count === 3) done(); + } + }); + + const array = new Uint8Array([0x68, 0x69]); + + sender.pong(array.buffer, false); + sender.pong(array, false); + sender.pong('hi', false); + }); + }); + describe('#send', function () { it('compresses data if compress option is enabled', function (done) { const perMessageDeflate = new PerMessageDeflate({ threshold: 0 }); diff --git a/test/WebSocket.test.js b/test/WebSocket.test.js index 7ac77dd96..13b9d4bb4 100644 --- a/test/WebSocket.test.js +++ b/test/WebSocket.test.js @@ -131,19 +131,34 @@ describe('WebSocket', function () { }); it('defaults to zero upon "open"', function (done) { - server.createServer(++port, (srv) => { + const wss = new WebSocketServer({ port: ++port }, () => { const ws = new WebSocket(`ws://localhost:${port}`); ws.onopen = () => { assert.strictEqual(ws.bufferedAmount, 0); - - ws.on('close', () => srv.close(done)); - ws.close(); + wss.close(done); }; }); }); - it('stress kernel write buffer', function (done) { + it('takes into account the data in the sender queue', function (done) { + const wss = new WebSocketServer({ port: ++port }, () => { + const ws = new WebSocket(`ws://localhost:${port}`); + + ws.on('open', () => { + ws.send('foo'); + ws.send('bar', (err) => { + assert.ifError(err); + assert.strictEqual(ws.bufferedAmount, 0); + wss.close(done); + }); + + assert.strictEqual(ws.bufferedAmount, 3); + }); + }); + }); + + it('takes into account the data in the socket queue', function (done) { const wss = new WebSocketServer({ port: ++port }, () => { const ws = new WebSocket(`ws://localhost:${port}`, { perMessageDeflate: false @@ -152,7 +167,10 @@ describe('WebSocket', function () { wss.on('connection', (ws) => { while (true) { - if (ws.bufferedAmount > 0) break; + if (ws._socket.bufferSize > 0) { + assert.strictEqual(ws.bufferedAmount, ws._socket.bufferSize); + break; + } ws.send('hello'.repeat(1e4)); } wss.close(done); From 83aefc92a0996cc91bc5a7d13fa2fa8206f7915c Mon Sep 17 00:00:00 2001 From: Aleksandr Zinoviev Date: Mon, 23 Jan 2017 15:44:28 +0100 Subject: [PATCH 2/2] Remove obsolete converting in frameAndSend. --- lib/Sender.js | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/lib/Sender.js b/lib/Sender.js index bd78f24f7..fb1f89765 100644 --- a/lib/Sender.js +++ b/lib/Sender.js @@ -265,17 +265,6 @@ class Sender { return; } - if (!Buffer.isBuffer(data)) { - if (data instanceof ArrayBuffer) { - data = Buffer.from(data); - } else if (ArrayBuffer.isView(data)) { - data = viewToBuffer(data); - } else { - data = Buffer.from(data); - options.readOnly = false; - } - } - const mergeBuffers = data.length < 1024 || options.mask && options.readOnly; var dataOffset = options.mask ? 6 : 2; var payloadLength = data.length;