Skip to content

Commit

Permalink
[api] Add WebSocket#pause() and WebSocket#resume()
Browse files Browse the repository at this point in the history
Add ability to pause and resume a websocket.
  • Loading branch information
lpinca committed Sep 30, 2021
1 parent fef7942 commit d775113
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 24 deletions.
20 changes: 20 additions & 0 deletions doc/ws.md
Expand Up @@ -31,15 +31,18 @@
- [websocket.bufferedAmount](#websocketbufferedamount)
- [websocket.close([code[, reason]])](#websocketclosecode-reason)
- [websocket.extensions](#websocketextensions)
- [websocket.isPaused](#websocketispaused)
- [websocket.onclose](#websocketonclose)
- [websocket.onerror](#websocketonerror)
- [websocket.onmessage](#websocketonmessage)
- [websocket.onopen](#websocketonopen)
- [websocket.pause()](#websocketpause)
- [websocket.ping([data[, mask]][, callback])](#websocketpingdata-mask-callback)
- [websocket.pong([data[, mask]][, callback])](#websocketpongdata-mask-callback)
- [websocket.protocol](#websocketprotocol)
- [websocket.readyState](#websocketreadystate)
- [websocket.removeEventListener(type, listener)](#websocketremoveeventlistenertype-listener)
- [websocket.resume()](#websocketresume)
- [websocket.send(data[, options][, callback])](#websocketsenddata-options-callback)
- [websocket.terminate()](#websocketterminate)
- [websocket.url](#websocketurl)
Expand Down Expand Up @@ -409,6 +412,12 @@ following ways:

Initiate a closing handshake.

### websocket.isPaused

- {Boolean}

Indicates whether the websocket is paused.

### websocket.extensions

- {Object}
Expand Down Expand Up @@ -443,6 +452,12 @@ listener receives a `MessageEvent` named "message".
An event listener to be called when the connection is established. The listener
receives an `OpenEvent` named "open".

### websocket.pause()

Pause the websocket causing it to stop emitting events. Some events can still be
emitted after this is called, until all buffered data is consumed. This method
is a noop if the ready state is not `OPEN`.

### websocket.ping([data[, mask]][, callback])

- `data` {Array|Number|Object|String|ArrayBuffer|Buffer|DataView|TypedArray} The
Expand Down Expand Up @@ -473,6 +488,11 @@ Send a pong. This method throws an error if the ready state is `CONNECTING`.

The subprotocol selected by the server.

### websocket.resume()

Make a paused socket resume emitting events. This method is a noop if the ready
state is not `OPEN`.

### websocket.readyState

- {Number}
Expand Down
25 changes: 2 additions & 23 deletions lib/stream.js
Expand Up @@ -47,23 +47,8 @@ function duplexOnError(err) {
* @public
*/
function createWebSocketStream(ws, options) {
let resumeOnReceiverDrain = true;
let terminateOnDestroy = true;

function receiverOnDrain() {
if (resumeOnReceiverDrain) ws._socket.resume();
}

if (ws.readyState === ws.CONNECTING) {
ws.once('open', function open() {
ws._receiver.removeAllListeners('drain');
ws._receiver.on('drain', receiverOnDrain);
});
} else {
ws._receiver.removeAllListeners('drain');
ws._receiver.on('drain', receiverOnDrain);
}

const duplex = new Duplex({
...options,
autoDestroy: false,
Expand All @@ -76,10 +61,7 @@ function createWebSocketStream(ws, options) {
const data =
!isBinary && duplex._readableState.objectMode ? msg.toString() : msg;

if (!duplex.push(data)) {
resumeOnReceiverDrain = false;
ws._socket.pause();
}
if (!duplex.push(data)) ws.pause();
});

ws.once('error', function error(err) {
Expand Down Expand Up @@ -155,10 +137,7 @@ function createWebSocketStream(ws, options) {
};

duplex._read = function () {
if (ws.readyState === ws.OPEN && !resumeOnReceiverDrain) {
resumeOnReceiverDrain = true;
if (!ws._receiver._writableState.needDrain) ws._socket.resume();
}
if (ws.isPaused) ws.resume();
};

duplex._write = function (chunk, encoding, callback) {
Expand Down
37 changes: 36 additions & 1 deletion lib/websocket.js
Expand Up @@ -58,6 +58,7 @@ class WebSocket extends EventEmitter {
this._closeMessage = EMPTY_BUFFER;
this._closeTimer = null;
this._extensions = {};
this._paused = false;
this._protocol = '';
this._readyState = WebSocket.CONNECTING;
this._receiver = null;
Expand Down Expand Up @@ -124,6 +125,13 @@ class WebSocket extends EventEmitter {
return Object.keys(this._extensions).join();
}

/**
* @type {Boolean}
*/
get isPaused() {
return this._paused;
}

/**
* @type {Function}
*/
Expand Down Expand Up @@ -312,6 +320,18 @@ class WebSocket extends EventEmitter {
);
}

/**
* Pause the socket.
*
* @public
*/
pause() {
if (this.readyState !== WebSocket.OPEN) return;

this._paused = true;
this._socket.pause();
}

/**
* Send a ping.
*
Expand Down Expand Up @@ -376,6 +396,18 @@ class WebSocket extends EventEmitter {
this._sender.pong(data || EMPTY_BUFFER, mask, cb);
}

/**
* Resume the socket.
*
* @public
*/
resume() {
if (this.readyState !== WebSocket.OPEN) return;

this._paused = false;
if (!this._receiver._writableState.needDrain) this._socket.resume();
}

/**
* Send a data message.
*
Expand Down Expand Up @@ -518,6 +550,7 @@ Object.defineProperty(WebSocket.prototype, 'CLOSED', {
'binaryType',
'bufferedAmount',
'extensions',
'isPaused',
'protocol',
'readyState',
'url'
Expand Down Expand Up @@ -975,7 +1008,9 @@ function receiverOnConclude(code, reason) {
* @private
*/
function receiverOnDrain() {
this[kWebSocket]._socket.resume();
const websocket = this[kWebSocket];

if (!websocket.isPaused) websocket._socket.resume();
}

/**
Expand Down
132 changes: 132 additions & 0 deletions test/websocket.test.js
Expand Up @@ -359,6 +359,39 @@ describe('WebSocket', () => {
});
});

describe('`isPaused`', () => {
it('is enumerable and configurable', () => {
const descriptor = Object.getOwnPropertyDescriptor(
WebSocket.prototype,
'isPaused'
);

assert.strictEqual(descriptor.configurable, true);
assert.strictEqual(descriptor.enumerable, true);
assert.ok(descriptor.get !== undefined);
assert.ok(descriptor.set === undefined);
});

it('indicates whether the websocket is paused', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

ws.on('open', () => {
ws.pause();
assert.ok(ws.isPaused);

ws.resume();
assert.ok(!ws.isPaused);

ws.close();
wss.close(done);
});

assert.ok(!ws.isPaused);
});
});
});

describe('`protocol`', () => {
it('is enumerable and configurable', () => {
const descriptor = Object.getOwnPropertyDescriptor(
Expand Down Expand Up @@ -1062,6 +1095,55 @@ describe('WebSocket', () => {
});
});

describe('#pause', () => {
it('does nothing if `readyState` is not `OPEN`', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

assert.strictEqual(ws.readyState, WebSocket.CONNECTING);
assert.ok(!ws.isPaused);

ws.pause();
assert.ok(!ws.isPaused);

ws.on('open', () => {
ws.close();
assert.strictEqual(ws.readyState, WebSocket.CLOSING);

ws.pause();
assert.ok(!ws.isPaused);

ws.on('close', () => {
assert.strictEqual(ws.readyState, WebSocket.CLOSED);

ws.pause();
assert.ok(!ws.isPaused);

wss.close(done);
});
});
});
});

it('pauses the socket', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
});

wss.on('connection', (ws) => {
assert.ok(!ws.isPaused);
assert.ok(!ws._socket.isPaused());

ws.pause();
assert.ok(ws.isPaused);
assert.ok(ws._socket.isPaused());

ws.terminate();
wss.close(done);
});
});
});

describe('#ping', () => {
it('throws an error if `readyState` is `CONNECTING`', () => {
const ws = new WebSocket('ws://localhost', {
Expand Down Expand Up @@ -1400,6 +1482,56 @@ describe('WebSocket', () => {
});
});

describe('#resume', () => {
it('does nothing if `readyState` is not `OPEN`', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

ws.on('open', () => {
ws.pause();
assert.ok(ws.isPaused);

ws.terminate();
assert.strictEqual(ws.readyState, WebSocket.CLOSING);

ws.resume();
assert.ok(ws.isPaused);

ws.on('close', () => {
assert.strictEqual(ws.readyState, WebSocket.CLOSED);

ws.resume();
assert.ok(ws.isPaused);

wss.close(done);
});
});
});
});

it('resumes the socket', (done) => {
const wss = new WebSocket.Server({ port: 0 }, () => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
});

wss.on('connection', (ws) => {
assert.ok(!ws.isPaused);
assert.ok(!ws._socket.isPaused());

ws.pause();
assert.ok(ws.isPaused);
assert.ok(ws._socket.isPaused());

ws.resume();
assert.ok(!ws.isPaused);
assert.ok(!ws._socket.isPaused());

ws.close();
wss.close(done);
});
});
});

describe('#send', () => {
it('throws an error if `readyState` is `CONNECTING`', () => {
const ws = new WebSocket('ws://localhost', {
Expand Down

0 comments on commit d775113

Please sign in to comment.