Skip to content

Commit

Permalink
Merge f841078 into 2aa0405
Browse files Browse the repository at this point in the history
  • Loading branch information
lpinca committed Apr 16, 2024
2 parents 2aa0405 + f841078 commit 02cac72
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 103 deletions.
8 changes: 4 additions & 4 deletions doc/ws.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ This class represents a WebSocket server. It extends the `EventEmitter`.
in response to a ping. Defaults to `true`.
- `allowSynchronousEvents` {Boolean} Specifies whether any of the `'message'`,
`'ping'`, and `'pong'` events can be emitted multiple times in the same
tick. To improve compatibility with the WHATWG standard, the default value
is `false`. Setting it to `true` improves performance slightly.
tick. Defaults to `true`. Setting it to `false` improves compatibility with
the WHATWG standardbut may negatively impact performance.
- `backlog` {Number} The maximum length of the queue of pending connections.
- `clientTracking` {Boolean} Specifies whether or not to track clients.
- `handleProtocols` {Function} A function which can be used to handle the
Expand Down Expand Up @@ -302,8 +302,8 @@ This class represents a WebSocket. It extends the `EventEmitter`.
in response to a ping. Defaults to `true`.
- `allowSynchronousEvents` {Boolean} Specifies whether any of the `'message'`,
`'ping'`, and `'pong'` events can be emitted multiple times in the same
tick. To improve compatibility with the WHATWG standard, the default value
is `false`. Setting it to `true` improves performance slightly.
tick. Defaults to `true`. Setting it to `false` improves compatibility with
the WHATWG standardbut may negatively impact performance.
- `finishRequest` {Function} A function which can be used to customize the
headers of each HTTP request before it is sent. See description below.
- `followRedirects` {Boolean} Whether or not to follow redirects. Defaults to
Expand Down
49 changes: 8 additions & 41 deletions lib/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@ const { concat, toArrayBuffer, unmask } = require('./buffer-util');
const { isValidStatusCode, isValidUTF8 } = require('./validation');

const FastBuffer = Buffer[Symbol.species];
const promise = Promise.resolve();

//
// `queueMicrotask()` is not available in Node.js < 11.
//
const queueTask =
typeof queueMicrotask === 'function' ? queueMicrotask : queueMicrotaskShim;

const GET_INFO = 0;
const GET_PAYLOAD_LENGTH_16 = 1;
Expand All @@ -39,7 +32,7 @@ class Receiver extends Writable {
* Creates a Receiver instance.
*
* @param {Object} [options] Options object
* @param {Boolean} [options.allowSynchronousEvents=false] Specifies whether
* @param {Boolean} [options.allowSynchronousEvents=true] Specifies whether
* any of the `'message'`, `'ping'`, and `'pong'` events can be emitted
* multiple times in the same tick
* @param {String} [options.binaryType=nodebuffer] The type for binary data
Expand All @@ -54,7 +47,10 @@ class Receiver extends Writable {
constructor(options = {}) {
super();

this._allowSynchronousEvents = !!options.allowSynchronousEvents;
this._allowSynchronousEvents =
options.allowSynchronousEvents !== undefined
? options.allowSynchronousEvents
: true;
this._binaryType = options.binaryType || BINARY_TYPES[0];
this._extensions = options.extensions || {};
this._isServer = !!options.isServer;
Expand Down Expand Up @@ -577,7 +573,7 @@ class Receiver extends Writable {
this._state = GET_INFO;
} else {
this._state = DEFER_EVENT;
queueTask(() => {
setImmediate(() => {
this.emit('message', data, true);
this._state = GET_INFO;
this.startLoop(cb);
Expand All @@ -604,7 +600,7 @@ class Receiver extends Writable {
this._state = GET_INFO;
} else {
this._state = DEFER_EVENT;
queueTask(() => {
setImmediate(() => {
this.emit('message', buf, false);
this._state = GET_INFO;
this.startLoop(cb);
Expand Down Expand Up @@ -675,7 +671,7 @@ class Receiver extends Writable {
this._state = GET_INFO;
} else {
this._state = DEFER_EVENT;
queueTask(() => {
setImmediate(() => {
this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data);
this._state = GET_INFO;
this.startLoop(cb);
Expand Down Expand Up @@ -711,32 +707,3 @@ class Receiver extends Writable {
}

module.exports = Receiver;

/**
* A shim for `queueMicrotask()`.
*
* @param {Function} cb Callback
*/
function queueMicrotaskShim(cb) {
promise.then(cb).catch(throwErrorNextTick);
}

/**
* Throws an error.
*
* @param {Error} err The error to throw
* @private
*/
function throwError(err) {
throw err;
}

/**
* Throws an error in the next tick.
*
* @param {Error} err The error to throw
* @private
*/
function throwErrorNextTick(err) {
process.nextTick(throwError, err);
}
4 changes: 2 additions & 2 deletions lib/websocket-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class WebSocketServer extends EventEmitter {
* Create a `WebSocketServer` instance.
*
* @param {Object} options Configuration options
* @param {Boolean} [options.allowSynchronousEvents=false] Specifies whether
* @param {Boolean} [options.allowSynchronousEvents=true] Specifies whether
* any of the `'message'`, `'ping'`, and `'pong'` events can be emitted
* multiple times in the same tick
* @param {Boolean} [options.autoPong=true] Specifies whether or not to
Expand Down Expand Up @@ -60,7 +60,7 @@ class WebSocketServer extends EventEmitter {
super();

options = {
allowSynchronousEvents: false,
allowSynchronousEvents: true,
autoPong: true,
maxPayload: 100 * 1024 * 1024,
skipUTF8Validation: false,
Expand Down
4 changes: 2 additions & 2 deletions lib/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ module.exports = WebSocket;
* @param {(String|URL)} address The URL to which to connect
* @param {Array} protocols The subprotocols
* @param {Object} [options] Connection options
* @param {Boolean} [options.allowSynchronousEvents=false] Specifies whether any
* @param {Boolean} [options.allowSynchronousEvents=true] Specifies whether any
* of the `'message'`, `'ping'`, and `'pong'` events can be emitted multiple
* times in the same tick
* @param {Boolean} [options.autoPong=true] Specifies whether or not to
Expand Down Expand Up @@ -652,7 +652,7 @@ module.exports = WebSocket;
*/
function initAsClient(websocket, address, protocols, options) {
const opts = {
allowSynchronousEvents: false,
allowSynchronousEvents: true,
autoPong: true,
protocolVersion: protocolVersions[1],
maxPayload: 100 * 1024 * 1024,
Expand Down
73 changes: 23 additions & 50 deletions test/receiver.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ describe('Receiver', () => {
buf[i + 1] = 0x00;
}

const receiver = new Receiver({ allowSynchronousEvents: true });
const receiver = new Receiver();
let counter = 0;

receiver.on('message', (data, isBinary) => {
Expand Down Expand Up @@ -1085,17 +1085,21 @@ describe('Receiver', () => {
receiver.write(Buffer.from([0x88, 0x03, 0x03, 0xe8, 0xf8]));
});

it('emits at most one event per microtask', (done) => {
it('honors the `allowSynchronousEvents` option', (done) => {
const actual = [];
const expected = [
'1',
'microtask 1',
'- 1',
'-- 1',
'2',
'microtask 2',
'- 2',
'-- 2',
'3',
'microtask 3',
'- 3',
'-- 3',
'4',
'microtask 4'
'- 4',
'-- 4'
];

function listener(data) {
Expand All @@ -1104,16 +1108,20 @@ describe('Receiver', () => {

// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
actual.push(`microtask ${message}`);
actual.push(`- ${message}`);

if (actual.length === 8) {
assert.deepStrictEqual(actual, expected);
done();
}
Promise.resolve().then(() => {
actual.push(`-- ${message}`);

if (actual.length === 12) {
assert.deepStrictEqual(actual, expected);
done();
}
});
});
}

const receiver = new Receiver();
const receiver = new Receiver({ allowSynchronousEvents: false });

receiver.on('message', listener);
receiver.on('ping', listener);
Expand Down Expand Up @@ -1148,43 +1156,8 @@ describe('Receiver', () => {
done();
});

receiver.write(Buffer.from('82008200', 'hex'));
});

it('honors the `allowSynchronousEvents` option', (done) => {
const actual = [];
const expected = [
'1',
'2',
'3',
'4',
'microtask 1',
'microtask 2',
'microtask 3',
'microtask 4'
];

function listener(data) {
const message = data.toString();
actual.push(message);

// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
actual.push(`microtask ${message}`);

if (actual.length === 8) {
assert.deepStrictEqual(actual, expected);
done();
}
});
}

const receiver = new Receiver({ allowSynchronousEvents: true });

receiver.on('message', listener);
receiver.on('ping', listener);
receiver.on('pong', listener);

receiver.write(Buffer.from('8101318901328a0133810134', 'hex'));
setImmediate(() => {
receiver.write(Buffer.from('82008200', 'hex'));
});
});
});
6 changes: 2 additions & 4 deletions test/websocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4234,8 +4234,7 @@ describe('WebSocket', () => {

if (messages.push(message.toString()) > 1) return;

// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
setImmediate(() => {
process.nextTick(() => {
assert.strictEqual(ws._receiver._state, 5);
ws.close(1000);
Expand Down Expand Up @@ -4485,8 +4484,7 @@ describe('WebSocket', () => {

if (messages.push(message.toString()) > 1) return;

// `queueMicrotask()` is not available in Node.js < 11.
Promise.resolve().then(() => {
setImmediate(() => {
process.nextTick(() => {
assert.strictEqual(ws._receiver._state, 5);
ws.terminate();
Expand Down

0 comments on commit 02cac72

Please sign in to comment.