From 1a5244251379c3b2ea96b8faea93195d7228bd50 Mon Sep 17 00:00:00 2001 From: Luigi Pinca Date: Sat, 19 Aug 2023 16:03:40 +0200 Subject: [PATCH] [fix] Emit at most one event per microtask To improve compatibility with the WHATWG standard, emit at most one of `'message'`, `'ping'`, and `'pong'` events per tick. Fixes #2159 --- lib/receiver.js | 32 ++++++++++++--- test/receiver.test.js | 92 ++++++++++++++++++++++++++++++++++++++++++ test/websocket.test.js | 23 ++++++----- 3 files changed, 132 insertions(+), 15 deletions(-) diff --git a/lib/receiver.js b/lib/receiver.js index 96f572cb1..c7dcf8acd 100644 --- a/lib/receiver.js +++ b/lib/receiver.js @@ -19,6 +19,7 @@ const GET_PAYLOAD_LENGTH_64 = 2; const GET_MASK = 3; const GET_DATA = 4; const INFLATING = 5; +const WAIT_MICROTASK = 6; /** * HyBi Receiver implementation. @@ -157,10 +158,27 @@ class Receiver extends Writable { case GET_DATA: err = this.getData(cb); break; - default: - // `INFLATING` + case INFLATING: this._loop = false; return; + default: { + // `WAIT_MICROTASK`. + this._loop = false; + + const next = () => { + this._state = GET_INFO; + this.startLoop(cb); + }; + + if (typeof queueMicrotask === 'function') { + queueMicrotask(next); + } else { + // `queueMicrotask()` is not available in Node.js < 11. + Promise.resolve().then(next); + } + + return; + } } } while (this._loop); @@ -542,7 +560,7 @@ class Receiver extends Writable { } } - this._state = GET_INFO; + this._state = WAIT_MICROTASK; } /** @@ -559,6 +577,8 @@ class Receiver extends Writable { if (data.length === 0) { this.emit('conclude', 1005, EMPTY_BUFFER); this.end(); + + this._state = GET_INFO; } else { const code = data.readUInt16BE(0); @@ -590,14 +610,16 @@ class Receiver extends Writable { this.emit('conclude', code, buf); this.end(); + + this._state = GET_INFO; } } else if (this._opcode === 0x09) { this.emit('ping', data); + this._state = WAIT_MICROTASK; } else { this.emit('pong', data); + this._state = WAIT_MICROTASK; } - - this._state = GET_INFO; } } diff --git a/test/receiver.test.js b/test/receiver.test.js index 4ae279469..a4e1bb5ad 100644 --- a/test/receiver.test.js +++ b/test/receiver.test.js @@ -1083,4 +1083,96 @@ describe('Receiver', () => { receiver.write(Buffer.from([0x88, 0x03, 0x03, 0xe8, 0xf8])); }); + + it("waits a microtask after each 'message' event", (done) => { + const messages = []; + const receiver = new Receiver(); + + receiver.on('message', (data, isBinary) => { + assert.ok(!isBinary); + + const message = data.toString(); + messages.push(message); + + // `queueMicrotask()` is not available in Node.js < 11. + Promise.resolve().then(() => { + messages.push(`microtask ${message}`); + + if (messages.length === 6) { + assert.deepStrictEqual(messages, [ + '1', + 'microtask 1', + '2', + 'microtask 2', + '3', + 'microtask 3' + ]); + + done(); + } + }); + }); + + receiver.write(Buffer.from('810131810132810133', 'hex')); + }); + + it("waits a microtask after each 'ping' event", (done) => { + const actual = []; + const receiver = new Receiver(); + + receiver.on('ping', (data) => { + const message = data.toString(); + actual.push(message); + + // `queueMicrotask()` is not available in Node.js < 11. + Promise.resolve().then(() => { + actual.push(`microtask ${message}`); + + if (actual.length === 6) { + assert.deepStrictEqual(actual, [ + '1', + 'microtask 1', + '2', + 'microtask 2', + '3', + 'microtask 3' + ]); + + done(); + } + }); + }); + + receiver.write(Buffer.from('890131890132890133', 'hex')); + }); + + it("waits a microtask after each 'pong' event", (done) => { + const actual = []; + const receiver = new Receiver(); + + receiver.on('pong', (data) => { + const message = data.toString(); + actual.push(message); + + // `queueMicrotask()` is not available in Node.js < 11. + Promise.resolve().then(() => { + actual.push(`microtask ${message}`); + + if (actual.length === 6) { + assert.deepStrictEqual(actual, [ + '1', + 'microtask 1', + '2', + 'microtask 2', + '3', + 'microtask 3' + ]); + + done(); + } + }); + }); + + receiver.write(Buffer.from('8A01318A01328A0133', 'hex')); + }); }); diff --git a/test/websocket.test.js b/test/websocket.test.js index cb5b434c0..b90923460 100644 --- a/test/websocket.test.js +++ b/test/websocket.test.js @@ -4109,18 +4109,18 @@ describe('WebSocket', () => { const messages = []; const ws = new WebSocket(`ws://localhost:${wss.address().port}`); - ws.on('open', () => { - ws._socket.on('end', () => { - assert.strictEqual(ws._receiver._state, 5); - }); - }); - ws.on('message', (message, isBinary) => { assert.ok(!isBinary); if (messages.push(message.toString()) > 1) return; - ws.close(1000); + // `queueMicrotask()` is not available in Node.js < 11. + Promise.resolve().then(() => { + process.nextTick(() => { + assert.strictEqual(ws._receiver._state, 5); + ws.close(1000); + }); + }); }); ws.on('close', (code, reason) => { @@ -4365,9 +4365,12 @@ describe('WebSocket', () => { if (messages.push(message.toString()) > 1) return; - process.nextTick(() => { - assert.strictEqual(ws._receiver._state, 5); - ws.terminate(); + // `queueMicrotask()` is not available in Node.js < 11. + Promise.resolve().then(() => { + process.nextTick(() => { + assert.strictEqual(ws._receiver._state, 5); + ws.terminate(); + }); }); });