Skip to content

Commit

Permalink
[feature] Introduce the allowMultipleEventsPerMicrotask option
Browse files Browse the repository at this point in the history
The `allowMultipleEventsPerMicrotask` option allows the `'message'`,
`'ping'`, and `'pong'` events to be emitted more than once per
microtask.

Refs: #2160
  • Loading branch information
lpinca committed Dec 9, 2023
1 parent 603a039 commit 93e3552
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 11 deletions.
8 changes: 8 additions & 0 deletions doc/ws.md
Expand Up @@ -72,6 +72,10 @@ This class represents a WebSocket server. It extends the `EventEmitter`.
### new WebSocketServer(options[, callback])

- `options` {Object}
- `allowMultipleEventsPerMicrotask` {Boolean} Specifies whether or not to
process more than one of the `'message'`, `'ping'`, and `'pong'` events per
microtask. To improve compatibility with the WHATWG standard, the default
value is `false`. Setting it to `true` improves performance slightly.
- `backlog` {Number} The maximum length of the queue of pending connections.
- `clientTracking` {Boolean} Specifies whether or not to track clients.
- `handleProtocols` {Function} A function which can be used to handle the
Expand Down Expand Up @@ -292,6 +296,10 @@ This class represents a WebSocket. It extends the `EventEmitter`.
- `address` {String|url.URL} The URL to which to connect.
- `protocols` {String|Array} The list of subprotocols.
- `options` {Object}
- `allowMultipleEventsPerMicrotask` {Boolean} Specifies whether or not to
process more than one of the `'message'`, `'ping'`, and `'pong'` events per
microtask. To improve compatibility with the WHATWG standard, the default
value is `false`. Setting it to `true` improves performance slightly.
- `finishRequest` {Function} A function which can be used to customize the
headers of each http request before it is sent. See description below.
- `followRedirects` {Boolean} Whether or not to follow redirects. Defaults to
Expand Down
27 changes: 16 additions & 11 deletions lib/receiver.js
Expand Up @@ -39,6 +39,9 @@ class Receiver extends Writable {
* Creates a Receiver instance.
*
* @param {Object} [options] Options object
* @param {Boolean} [options.allowMultipleEventsPerMicrotask=false] Specifies
* whether or not to process more than one of the `'message'`, `'ping'`,
* and `'pong'` events per microtask
* @param {String} [options.binaryType=nodebuffer] The type for binary data
* @param {Object} [options.extensions] An object containing the negotiated
* extensions
Expand All @@ -51,6 +54,8 @@ class Receiver extends Writable {
constructor(options = {}) {
super();

this._allowMultipleEventsPerMicrotask =
!!options.allowMultipleEventsPerMicrotask;
this._binaryType = options.binaryType || BINARY_TYPES[0];
this._extensions = options.extensions || {};
this._isServer = !!options.isServer;
Expand Down Expand Up @@ -561,7 +566,9 @@ class Receiver extends Writable {
}
}

this._state = WAIT_MICROTASK;
this._state = this._allowMultipleEventsPerMicrotask
? GET_INFO
: WAIT_MICROTASK;
}

/**
Expand All @@ -578,8 +585,6 @@ class Receiver extends Writable {
if (data.length === 0) {
this.emit('conclude', 1005, EMPTY_BUFFER);
this.end();

this._state = GET_INFO;
} else {
const code = data.readUInt16BE(0);

Expand Down Expand Up @@ -611,16 +616,16 @@ class Receiver extends Writable {

this.emit('conclude', code, buf);
this.end();

this._state = GET_INFO;
}
} else if (this._opcode === 0x09) {
this.emit('ping', data);
this._state = WAIT_MICROTASK;
} else {
this.emit('pong', data);
this._state = WAIT_MICROTASK;

this._state = GET_INFO;
return;
}

this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data);
this._state = this._allowMultipleEventsPerMicrotask
? GET_INFO
: WAIT_MICROTASK;
}
}

Expand Down
6 changes: 6 additions & 0 deletions lib/websocket-server.js
Expand Up @@ -29,6 +29,9 @@ class WebSocketServer extends EventEmitter {
* Create a `WebSocketServer` instance.
*
* @param {Object} options Configuration options
* @param {Boolean} [options.allowMultipleEventsPerMicrotask=false] Specifies
* whether or not to process more than one of the `'message'`, `'ping'`,
* and `'pong'` events per microtask
* @param {Number} [options.backlog=511] The maximum length of the queue of
* pending connections
* @param {Boolean} [options.clientTracking=true] Specifies whether or not to
Expand All @@ -55,6 +58,7 @@ class WebSocketServer extends EventEmitter {
super();

options = {
allowMultipleEventsPerMicrotask: false,
maxPayload: 100 * 1024 * 1024,
skipUTF8Validation: false,
perMessageDeflate: false,
Expand Down Expand Up @@ -409,6 +413,8 @@ class WebSocketServer extends EventEmitter {
socket.removeListener('error', socketOnError);

ws.setSocket(socket, head, {
allowMultipleEventsPerMicrotask:
this.options.allowMultipleEventsPerMicrotask,
maxPayload: this.options.maxPayload,
skipUTF8Validation: this.options.skipUTF8Validation
});
Expand Down
9 changes: 9 additions & 0 deletions lib/websocket.js
Expand Up @@ -192,6 +192,9 @@ class WebSocket extends EventEmitter {
* @param {Duplex} socket The network socket between the server and client
* @param {Buffer} head The first packet of the upgraded stream
* @param {Object} options Options object
* @param {Boolean} [options.allowMultipleEventsPerMicrotask=false] Specifies
* whether or not to process more than one of the `'message'`, `'ping'`,
* and `'pong'` events per microtask
* @param {Function} [options.generateMask] The function used to generate the
* masking key
* @param {Number} [options.maxPayload=0] The maximum allowed message size
Expand All @@ -201,6 +204,7 @@ class WebSocket extends EventEmitter {
*/
setSocket(socket, head, options) {
const receiver = new Receiver({
allowMultipleEventsPerMicrotask: options.allowMultipleEventsPerMicrotask,
binaryType: this.binaryType,
extensions: this._extensions,
isServer: this._isServer,
Expand Down Expand Up @@ -618,6 +622,9 @@ module.exports = WebSocket;
* @param {(String|URL)} address The URL to which to connect
* @param {Array} protocols The subprotocols
* @param {Object} [options] Connection options
* @param {Boolean} [options.allowMultipleEventsPerMicrotask=false] Specifies
* whether or not to process more than one of the `'message'`, `'ping'`,
* and `'pong'` events per microtask
* @param {Function} [options.finishRequest] A function which can be used to
* customize the headers of each http request before it is sent
* @param {Boolean} [options.followRedirects=false] Whether or not to follow
Expand All @@ -642,6 +649,7 @@ module.exports = WebSocket;
*/
function initAsClient(websocket, address, protocols, options) {
const opts = {
allowMultipleEventsPerMicrotask: false,
protocolVersion: protocolVersions[1],
maxPayload: 100 * 1024 * 1024,
skipUTF8Validation: false,
Expand Down Expand Up @@ -993,6 +1001,7 @@ function initAsClient(websocket, address, protocols, options) {
}

websocket.setSocket(socket, head, {
allowMultipleEventsPerMicrotask: opts.allowMultipleEventsPerMicrotask,
generateMask: opts.generateMask,
maxPayload: opts.maxPayload,
skipUTF8Validation: opts.skipUTF8Validation
Expand Down
37 changes: 37 additions & 0 deletions test/receiver.test.js
Expand Up @@ -1150,4 +1150,41 @@ describe('Receiver', () => {

receiver.write(Buffer.from('82008200', 'hex'));
});

it('honors the `allowMultipleEventsPerMicrotask` option', (done) => {
const actual = [];
const expected = [
'1',
'2',
'3',
'4',
'microtask 1',
'microtask 2',
'microtask 3',
'microtask 4'
];

function listener(data) {
const message = data.toString();
actual.push(message);

// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
actual.push(`microtask ${message}`);

if (actual.length === 8) {
assert.deepStrictEqual(actual, expected);
done();
}
});
}

const receiver = new Receiver({ allowMultipleEventsPerMicrotask: true });

receiver.on('message', listener);
receiver.on('ping', listener);
receiver.on('pong', listener);

receiver.write(Buffer.from('8101318901328a0133810134', 'hex'));
});
});

0 comments on commit 93e3552

Please sign in to comment.