Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 53 additions & 31 deletions lib/Sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Sender {
this._socket = socket;
this.onerror = null;
this.queue = [];
this.bufferedBytes = 0;
}

/**
Expand Down Expand Up @@ -86,8 +87,10 @@ class Sender {
* @public
*/
ping (data, mask) {
var readOnly = isReadOnly(data);
data = makeBuffer(data);
if (this.perMessageDeflate) {
this.enqueue([this.doPing, data, mask]);
this.enqueue([this.doPing, data, mask, readOnly]);
} else {
this.doPing(data, mask);
}
Expand All @@ -98,11 +101,12 @@ class Sender {
*
* @param {*} data The message to send
* @param {Boolean} mask Specifies whether or not to mask `data`
* @param {Boolean} readOnly Specifies whether `data` can be modified
* @private
*/
doPing (data, mask) {
doPing (data, mask, readOnly) {
this.frameAndSend(data, {
readOnly: true,
readOnly: readOnly,
opcode: 0x09,
rsv1: false,
fin: true,
Expand All @@ -120,8 +124,10 @@ class Sender {
* @public
*/
pong (data, mask) {
var readOnly = isReadOnly(data);
data = makeBuffer(data);
if (this.perMessageDeflate) {
this.enqueue([this.doPong, data, mask]);
this.enqueue([this.doPong, data, mask, readOnly]);
} else {
this.doPong(data, mask);
}
Expand All @@ -132,11 +138,12 @@ class Sender {
*
* @param {*} data The message to send
* @param {Boolean} mask Specifies whether or not to mask `data`
* @param {Boolean} readOnly Specifies whether `data` can be modified
* @private
*/
doPong (data, mask) {
doPong (data, mask, readOnly) {
this.frameAndSend(data, {
readOnly: true,
readOnly: readOnly,
opcode: 0x0a,
rsv1: false,
fin: true,
Expand All @@ -161,18 +168,9 @@ class Sender {
send (data, options, cb) {
var opcode = options.binary ? 2 : 1;
var rsv1 = options.compress;
var readOnly = true;
var readOnly = isReadOnly(data);

if (data && !Buffer.isBuffer(data)) {
if (data instanceof ArrayBuffer) {
data = Buffer.from(data);
} else if (ArrayBuffer.isView(data)) {
data = viewToBuffer(data);
} else {
data = Buffer.from(data);
readOnly = false;
}
}
data = makeBuffer(data);

if (this.firstFragment) {
this.firstFragment = false;
Expand Down Expand Up @@ -267,17 +265,6 @@ class Sender {
return;
}

if (!Buffer.isBuffer(data)) {
if (data instanceof ArrayBuffer) {
data = Buffer.from(data);
} else if (ArrayBuffer.isView(data)) {
data = viewToBuffer(data);
} else {
data = Buffer.from(data);
options.readOnly = false;
}
}

const mergeBuffers = data.length < 1024 || options.mask && options.readOnly;
var dataOffset = options.mask ? 6 : 2;
var payloadLength = data.length;
Expand Down Expand Up @@ -334,12 +321,13 @@ class Sender {
dequeue () {
if (this.processing) return;

const handler = this.queue.shift();
if (!handler) return;
const params = this.queue.shift();
if (!params) return;

if (params[1]) this.bufferedBytes -= params[1].length;
this.processing = true;

handler[0].apply(this, handler.slice(1));
params[0].apply(this, params.slice(1));
}

/**
Expand All @@ -361,6 +349,7 @@ class Sender {
* @private
*/
enqueue (params) {
if (params[1]) this.bufferedBytes += params[1].length;
this.queue.push(params);
this.dequeue();
}
Expand Down Expand Up @@ -412,3 +401,36 @@ function sendFramedData (sender, outputBuffer, data, cb) {
sender._socket.write(outputBuffer, cb);
}
}

/**
* @param {*} data The data to be converted to Buffer
* @return {Buffer} Converted data
* @private
*/
function makeBuffer (data) {
if (data && !Buffer.isBuffer(data)) {
if (data instanceof ArrayBuffer) {
data = Buffer.from(data);
} else if (ArrayBuffer.isView(data)) {
data = viewToBuffer(data);
} else {
data = Buffer.from(data);
}
}

return data;
}

/**
* @param {*} data The data to be checked
* @return {Boolean} Specifies whether `data` can be modified
* @private
*/
function isReadOnly (data) {
if (data && !Buffer.isBuffer(data) &&
!(data instanceof ArrayBuffer) &&
!ArrayBuffer.isView(data)) {
return false;
}
return true;
}
4 changes: 3 additions & 1 deletion lib/WebSocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ class WebSocket extends EventEmitter {
get bufferedAmount () {
var amount = 0;

if (this._socket) amount = this._socket.bufferSize || 0;
if (this._socket) {
amount = this._socket.bufferSize + this._sender.bufferedBytes;
}
return amount;
}

Expand Down
18 changes: 18 additions & 0 deletions test/Sender.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,24 @@ describe('Sender', function () {
});
});

describe('#pong', function () {
it('works with multiple types of data', function (done) {
let count = 0;
const sender = new Sender({
write: (data) => {
assert.ok(data.equals(Buffer.from([0x8a, 0x02, 0x68, 0x69])));
if (++count === 3) done();
}
});

const array = new Uint8Array([0x68, 0x69]);

sender.pong(array.buffer, false);
sender.pong(array, false);
sender.pong('hi', false);
});
});

describe('#send', function () {
it('compresses data if compress option is enabled', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
Expand Down
30 changes: 24 additions & 6 deletions test/WebSocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,34 @@ describe('WebSocket', function () {
});

it('defaults to zero upon "open"', function (done) {
server.createServer(++port, (srv) => {
const wss = new WebSocketServer({ port: ++port }, () => {
const ws = new WebSocket(`ws://localhost:${port}`);

ws.onopen = () => {
assert.strictEqual(ws.bufferedAmount, 0);

ws.on('close', () => srv.close(done));
ws.close();
wss.close(done);
};
});
});

it('stress kernel write buffer', function (done) {
it('takes into account the data in the sender queue', function (done) {
const wss = new WebSocketServer({ port: ++port }, () => {
const ws = new WebSocket(`ws://localhost:${port}`);

ws.on('open', () => {
ws.send('foo');
ws.send('bar', (err) => {
assert.ifError(err);
assert.strictEqual(ws.bufferedAmount, 0);
wss.close(done);
});

assert.strictEqual(ws.bufferedAmount, 3);
});
});
});

it('takes into account the data in the socket queue', function (done) {
const wss = new WebSocketServer({ port: ++port }, () => {
const ws = new WebSocket(`ws://localhost:${port}`, {
perMessageDeflate: false
Expand All @@ -152,7 +167,10 @@ describe('WebSocket', function () {

wss.on('connection', (ws) => {
while (true) {
if (ws.bufferedAmount > 0) break;
if (ws._socket.bufferSize > 0) {
assert.strictEqual(ws.bufferedAmount, ws._socket.bufferSize);
break;
}
ws.send('hello'.repeat(1e4));
}
wss.close(done);
Expand Down