Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Take into account the data queued in the sender #971

Merged
merged 1 commit into from
Jan 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 49 additions & 26 deletions lib/Sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ class Sender {
*/
constructor (socket, extensions) {
this.perMessageDeflate = (extensions || {})[PerMessageDeflate.extensionName];
this._socket = socket;

this.firstFragment = true;
this.processing = false;
this.compress = false;
this._socket = socket;
this.onerror = null;

this.processing = false;
this.bufferedBytes = 0;
this.queue = [];

this.onerror = null;
}

/**
Expand Down Expand Up @@ -86,10 +90,23 @@ class Sender {
* @public
*/
ping (data, mask) {
var readOnly = true;

if (data && !Buffer.isBuffer(data)) {
if (data instanceof ArrayBuffer) {
data = Buffer.from(data);
} else if (ArrayBuffer.isView(data)) {
data = viewToBuffer(data);
} else {
data = Buffer.from(data);
readOnly = false;
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd move this block of code to a separate function, like

makeBuffer (data) {
  var readOnly = true;

  if (data && !Buffer.isBuffer(data)) {
    if (data instanceof ArrayBuffer) {
      data = Buffer.from(data);
    } else if (ArrayBuffer.isView(data)) {
      data = viewToBuffer(data);
    } else {
      data = Buffer.from(data);
      readOnly = false;
    }
  }

  return [data, readOnly];
}

Usage:
[data, readOnly] = makeBuffer(data);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't like having this code duplication. The reason for not using a separate function is to avoid creating an array.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Array can make additional overhead... ok :) Speed is better than duplicates :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be benchmarked to see if it actually makes any difference :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option would be to add the readOnly flag to the returned buffer. For example:

const kReadOnly = Symbol('read-only');

function toBuffer(data) {
  var readOnly = true;

  if (!Buffer.isBuffer(data)) {
    if (data instanceof ArrayBuffer) {
      data = Buffer.from(data);
    } else if (ArrayBuffer.isView(data)) {
      data = viewToBuffer(data);
    } else {
      data = Buffer.from(data);
      readOnly = false;
    }
  }

  data[kReadOnly] = readOnly;
  return data;
}

but I'm still not happy with it as that changes the buffer hidden class.

if (this.perMessageDeflate) {
this.enqueue([this.doPing, data, mask]);
this.enqueue([this.doPing, data, mask, readOnly]);
} else {
this.doPing(data, mask);
this.doPing(data, mask, readOnly);
}
}

Expand All @@ -98,14 +115,15 @@ class Sender {
*
* @param {*} data The message to send
* @param {Boolean} mask Specifies whether or not to mask `data`
* @param {Boolean} readOnly Specifies whether `data` can be modified
* @private
*/
doPing (data, mask) {
doPing (data, mask, readOnly) {
this.frameAndSend(data, {
readOnly: true,
opcode: 0x09,
rsv1: false,
fin: true,
readOnly,
mask
});

Expand All @@ -120,10 +138,23 @@ class Sender {
* @public
*/
pong (data, mask) {
var readOnly = true;

if (data && !Buffer.isBuffer(data)) {
if (data instanceof ArrayBuffer) {
data = Buffer.from(data);
} else if (ArrayBuffer.isView(data)) {
data = viewToBuffer(data);
} else {
data = Buffer.from(data);
readOnly = false;
}
}

if (this.perMessageDeflate) {
this.enqueue([this.doPong, data, mask]);
this.enqueue([this.doPong, data, mask, readOnly]);
} else {
this.doPong(data, mask);
this.doPong(data, mask, readOnly);
}
}

Expand All @@ -132,14 +163,15 @@ class Sender {
*
* @param {*} data The message to send
* @param {Boolean} mask Specifies whether or not to mask `data`
* @param {Boolean} readOnly Specifies whether `data` can be modified
* @private
*/
doPong (data, mask) {
doPong (data, mask, readOnly) {
this.frameAndSend(data, {
readOnly: true,
opcode: 0x0a,
rsv1: false,
fin: true,
readOnly,
mask
});

Expand Down Expand Up @@ -243,7 +275,7 @@ class Sender {
/**
* Frames and sends a piece of data according to the HyBi WebSocket protocol.
*
* @param {*} data The data to send
* @param {Buffer} data The data to send
* @param {Object} options Options object
* @param {Number} options.opcode The opcode
* @param {Boolean} options.readOnly Specifies whether `data` can be modified
Expand All @@ -267,17 +299,6 @@ class Sender {
return;
}

if (!Buffer.isBuffer(data)) {
if (data instanceof ArrayBuffer) {
data = Buffer.from(data);
} else if (ArrayBuffer.isView(data)) {
data = viewToBuffer(data);
} else {
data = Buffer.from(data);
options.readOnly = false;
}
}

const mergeBuffers = data.length < 1024 || options.mask && options.readOnly;
var dataOffset = options.mask ? 6 : 2;
var payloadLength = data.length;
Expand Down Expand Up @@ -334,12 +355,13 @@ class Sender {
dequeue () {
if (this.processing) return;

const handler = this.queue.shift();
if (!handler) return;
const params = this.queue.shift();
if (!params) return;

if (params[1]) this.bufferedBytes -= params[1].length;
this.processing = true;

handler[0].apply(this, handler.slice(1));
params[0].apply(this, params.slice(1));
}

/**
Expand All @@ -361,6 +383,7 @@ class Sender {
* @private
*/
enqueue (params) {
if (params[1]) this.bufferedBytes += params[1].length;
this.queue.push(params);
this.dequeue();
}
Expand Down
4 changes: 3 additions & 1 deletion lib/WebSocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ class WebSocket extends EventEmitter {
get bufferedAmount () {
var amount = 0;

if (this._socket) amount = this._socket.bufferSize || 0;
if (this._socket) {
amount = this._socket.bufferSize + this._sender.bufferedBytes;
}
return amount;
}

Expand Down
18 changes: 18 additions & 0 deletions test/Sender.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,24 @@ describe('Sender', function () {
});
});

describe('#pong', function () {
it('works with multiple types of data', function (done) {
let count = 0;
const sender = new Sender({
write: (data) => {
assert.ok(data.equals(Buffer.from([0x8a, 0x02, 0x68, 0x69])));
if (++count === 3) done();
}
});

const array = new Uint8Array([0x68, 0x69]);

sender.pong(array.buffer, false);
sender.pong(array, false);
sender.pong('hi', false);
});
});

describe('#send', function () {
it('compresses data if compress option is enabled', function (done) {
const perMessageDeflate = new PerMessageDeflate({ threshold: 0 });
Expand Down
30 changes: 24 additions & 6 deletions test/WebSocket.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,34 @@ describe('WebSocket', function () {
});

it('defaults to zero upon "open"', function (done) {
server.createServer(++port, (srv) => {
const wss = new WebSocketServer({ port: ++port }, () => {
const ws = new WebSocket(`ws://localhost:${port}`);

ws.onopen = () => {
assert.strictEqual(ws.bufferedAmount, 0);

ws.on('close', () => srv.close(done));
ws.close();
wss.close(done);
};
});
});

it('stress kernel write buffer', function (done) {
it('takes into account the data in the sender queue', function (done) {
const wss = new WebSocketServer({ port: ++port }, () => {
const ws = new WebSocket(`ws://localhost:${port}`);

ws.on('open', () => {
ws.send('foo');
ws.send('bar', (err) => {
assert.ifError(err);
assert.strictEqual(ws.bufferedAmount, 0);
wss.close(done);
});

assert.strictEqual(ws.bufferedAmount, 3);
});
});
});

it('takes into account the data in the socket queue', function (done) {
const wss = new WebSocketServer({ port: ++port }, () => {
const ws = new WebSocket(`ws://localhost:${port}`, {
perMessageDeflate: false
Expand All @@ -152,7 +167,10 @@ describe('WebSocket', function () {

wss.on('connection', (ws) => {
while (true) {
if (ws.bufferedAmount > 0) break;
if (ws._socket.bufferSize > 0) {
assert.strictEqual(ws.bufferedAmount, ws._socket.bufferSize);
break;
}
ws.send('hello'.repeat(1e4));
}
wss.close(done);
Expand Down