Skip to content

Commit

Permalink
[fix] Do not rely on undocumented behavior
Browse files Browse the repository at this point in the history
Use the chunk returned by `socket.read()` to handle the buffered data
instead of relying on a `'data'` event emitted after the `'close'`
event.

Refs: nodejs/node#39639
  • Loading branch information
lpinca committed Aug 28, 2021
1 parent 4c1849a commit 76087fb
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 6 deletions.
17 changes: 13 additions & 4 deletions lib/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -1002,24 +1002,33 @@ function socketOnClose() {
const websocket = this[kWebSocket];

this.removeListener('close', socketOnClose);
this.removeListener('data', socketOnData);
this.removeListener('end', socketOnEnd);

websocket._readyState = WebSocket.CLOSING;

let chunk;

//
// The close frame might not have been received or the `'end'` event emitted,
// for example, if the socket was destroyed due to an error. Ensure that the
// `receiver` stream is closed after writing any remaining buffered data to
// it. If the readable side of the socket is in flowing mode then there is no
// buffered data as everything has been already written and `readable.read()`
// will return `null`. If instead, the socket is paused, any possible buffered
// data will be read as a single chunk and emitted synchronously in a single
// `'data'` event.
// data will be read as a single chunk.
//
websocket._socket.read();
if (
!this._readableState.endEmitted &&
!websocket._closeFrameReceived &&
!websocket._receiver._writableState.errorEmitted &&
(chunk = websocket._socket.read()) !== null
) {
websocket._receiver.write(chunk);
}

websocket._receiver.end();

this.removeListener('data', socketOnData);
this[kWebSocket] = undefined;

clearTimeout(websocket._closeTimer);
Expand Down
81 changes: 79 additions & 2 deletions test/websocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const tls = require('tls');
const fs = require('fs');
const { URL } = require('url');

const Sender = require('../lib/sender');
const WebSocket = require('..');
const { GUID, NOOP } = require('../lib/constants');

Expand Down Expand Up @@ -2735,15 +2736,21 @@ describe('WebSocket', () => {
});
});

it('consumes all received data when connection is closed abnormally', (done) => {
it('consumes all received data when connection is closed (1/2)', (done) => {
const wss = new WebSocket.Server(
{
perMessageDeflate: { threshold: 0 },
port: 0
},
() => {
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
const messages = [];
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

ws.on('open', () => {
ws._socket.on('close', () => {
assert.strictEqual(ws._receiver._state, 5);
});
});

ws.on('message', (message) => messages.push(message));
ws.on('close', (code) => {
Expand All @@ -2762,6 +2769,76 @@ describe('WebSocket', () => {
});
});

it('consumes all received data when connection is closed (2/2)', (done) => {
const payload1 = Buffer.alloc(15 * 1024);
const payload2 = Buffer.alloc(1);

const opts = {
fin: true,
opcode: 0x02,
mask: false,
readOnly: false
};

const list = [
...Sender.frame(payload1, { rsv1: false, ...opts }),
...Sender.frame(payload2, { rsv1: true, ...opts })
];

for (let i = 0; i < 399; i++) {
list.push(list[list.length - 2], list[list.length - 1]);
}

const data = Buffer.concat(list);

const wss = new WebSocket.Server(
{
perMessageDeflate: true,
port: 0
},
() => {
const messageLengths = [];
const ws = new WebSocket(`ws://localhost:${wss.address().port}`);

ws.on('open', () => {
ws._socket.prependListener('close', () => {
assert.strictEqual(ws._receiver._state, 5);
assert.strictEqual(ws._socket._readableState.length, 3);
});

const push = ws._socket.push;

ws._socket.push = (data) => {
ws._socket.push = push;
ws._socket.push(data);
ws.terminate();
};

// This hack is used because there is no guarantee that more than
// 16 KiB will be sent as a single TCP packet.
push.call(ws._socket, data);

wss.clients
.values()
.next()
.value.send(payload2, { compress: false });
});

ws.on('message', (message) => {
messageLengths.push(message.length);
});

ws.on('close', (code) => {
assert.strictEqual(code, 1006);
assert.strictEqual(messageLengths.length, 402);
assert.strictEqual(messageLengths[0], 15360);
assert.strictEqual(messageLengths[messageLengths.length - 1], 1);
wss.close(done);
});
}
);
});

it('handles a close frame received while compressing data', (done) => {
const wss = new WebSocket.Server(
{
Expand Down

0 comments on commit 76087fb

Please sign in to comment.