Skip to content

Commit

Permalink
[minor] Change Sender.prototype.frameAndSend() signature
Browse files Browse the repository at this point in the history
  • Loading branch information
lpinca committed Dec 15, 2016
1 parent 76143e9 commit 2dd4156
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 65 deletions.
35 changes: 25 additions & 10 deletions bench/sender.benchmark.js
Expand Up @@ -17,20 +17,35 @@ const data3 = crypto.randomBytes(64 * 1024);
const data4 = crypto.randomBytes(200 * 1024);
const data5 = crypto.randomBytes(1024 * 1024);

const opts1 = {
readOnly: false,
mask: false,
rsv1: false,
opcode: 2,
fin: true
};
const opts2 = {
readOnly: true,
rsv1: false,
mask: true,
opcode: 2,
fin: true
};

const suite = new benchmark.Suite();
var sender = new Sender();
sender._socket = { write () {} };

suite.add('frameAndSend, unmasked (64 B)', () => sender.frameAndSend(0x2, data1, false, true, false));
suite.add('frameAndSend, masked (64 B)', () => sender.frameAndSend(0x2, data1, true, true, true));
suite.add('frameAndSend, unmasked (16 KiB)', () => sender.frameAndSend(0x2, data2, false, true, false));
suite.add('frameAndSend, masked (16 KiB)', () => sender.frameAndSend(0x2, data2, true, true, true));
suite.add('frameAndSend, unmasked (64 KiB)', () => sender.frameAndSend(0x2, data3, false, true, false));
suite.add('frameAndSend, masked (64 KiB)', () => sender.frameAndSend(0x2, data3, true, true, true));
suite.add('frameAndSend, unmasked (200 KiB)', () => sender.frameAndSend(0x2, data4, false, true, false));
suite.add('frameAndSend, masked (200 KiB)', () => sender.frameAndSend(0x2, data4, true, true, true));
suite.add('frameAndSend, unmasked (1 MiB)', () => sender.frameAndSend(0x2, data5, false, true, false));
suite.add('frameAndSend, masked (1 MiB)', () => sender.frameAndSend(0x2, data5, true, true, true));
suite.add('frameAndSend, unmasked (64 B)', () => sender.frameAndSend(data1, opts1));
suite.add('frameAndSend, masked (64 B)', () => sender.frameAndSend(data1, opts2));
suite.add('frameAndSend, unmasked (16 KiB)', () => sender.frameAndSend(data2, opts1));
suite.add('frameAndSend, masked (16 KiB)', () => sender.frameAndSend(data2, opts2));
suite.add('frameAndSend, unmasked (64 KiB)', () => sender.frameAndSend(data3, opts1));
suite.add('frameAndSend, masked (64 KiB)', () => sender.frameAndSend(data3, opts2));
suite.add('frameAndSend, unmasked (200 KiB)', () => sender.frameAndSend(data4, opts1));
suite.add('frameAndSend, masked (200 KiB)', () => sender.frameAndSend(data4, opts2));
suite.add('frameAndSend, unmasked (1 MiB)', () => sender.frameAndSend(data5, opts1));
suite.add('frameAndSend, masked (1 MiB)', () => sender.frameAndSend(data5, opts2));

suite.on('cycle', (e) => console.log(e.target.toString()));

Expand Down
137 changes: 85 additions & 52 deletions lib/Sender.js
Expand Up @@ -12,6 +12,8 @@ const PerMessageDeflate = require('./PerMessageDeflate');
const bufferUtil = require('./BufferUtil').BufferUtil;
const ErrorCodes = require('./ErrorCodes');

const noop = () => {};

/**
* HyBi Sender implementation.
*/
Expand All @@ -23,12 +25,12 @@ class Sender {
* @param {Object} extensions An object containing the negotiated extensions
*/
constructor (socket, extensions) {
this.extensions = extensions || {};
this.perMessageDeflate = (extensions || {})[PerMessageDeflate.extensionName];
this.firstFragment = true;
this.processing = false;
this.compress = false;
this._socket = socket;
this.onerror = null;
this.onerror = noop;
this.queue = [];
}

Expand All @@ -51,8 +53,8 @@ class Sender {
buf.writeUInt16BE(code || 1000, 0, true);
if (buf.length > 2) buf.write(data, 2);

if (this.extensions[PerMessageDeflate.extensionName]) {
this.enqueue([this.doClose, [buf, mask, cb]]);
if (this.perMessageDeflate) {
this.enqueue([this.doClose, buf, mask, cb]);
} else {
this.doClose(buf, mask, cb);
}
Expand All @@ -67,10 +69,15 @@ class Sender {
* @private
*/
doClose (data, mask, cb) {
this.frameAndSend(0x08, data, false, true, mask, false, cb);
if (this.extensions[PerMessageDeflate.extensionName]) {
this.continue();
}
this.frameAndSend(data, {
readOnly: false,
opcode: 0x08,
rsv1: false,
fin: true,
mask
}, cb);

if (this.perMessageDeflate) this.continue();
}

/**
Expand All @@ -82,8 +89,8 @@ class Sender {
* @public
*/
ping (data, options) {
if (this.extensions[PerMessageDeflate.extensionName]) {
this.enqueue([this.doPing, [data, options.mask]]);
if (this.perMessageDeflate) {
this.enqueue([this.doPing, data, options.mask]);
} else {
this.doPing(data, options.mask);
}
Expand All @@ -97,10 +104,15 @@ class Sender {
* @private
*/
doPing (data, mask) {
this.frameAndSend(0x09, data, true, true, mask, false);
if (this.extensions[PerMessageDeflate.extensionName]) {
this.continue();
}
this.frameAndSend(data, {
readOnly: true,
opcode: 0x09,
rsv1: false,
fin: true,
mask
});

if (this.perMessageDeflate) this.continue();
}

/**
Expand All @@ -112,8 +124,8 @@ class Sender {
* @public
*/
pong (data, options) {
if (this.extensions[PerMessageDeflate.extensionName]) {
this.enqueue([this.doPong, [data, options.mask]]);
if (this.perMessageDeflate) {
this.enqueue([this.doPong, data, options.mask]);
} else {
this.doPong(data, options.mask);
}
Expand All @@ -127,10 +139,15 @@ class Sender {
* @private
*/
doPong (data, mask) {
this.frameAndSend(0x0a, data, true, true, mask, false);
if (this.extensions[PerMessageDeflate.extensionName]) {
this.continue();
}
this.frameAndSend(data, {
readOnly: true,
opcode: 0x0a,
rsv1: false,
fin: true,
mask
});

if (this.perMessageDeflate) this.continue();
}

/**
Expand All @@ -146,7 +163,6 @@ class Sender {
* @public
*/
send (data, options, cb) {
const pmd = this.extensions[PerMessageDeflate.extensionName];
var opcode = options.binary ? 2 : 1;
var rsv1 = options.compress;
var readOnly = true;
Expand All @@ -164,7 +180,9 @@ class Sender {

if (this.firstFragment) {
this.firstFragment = false;
if (rsv1 && data && pmd) rsv1 = data.length >= pmd.threshold;
if (rsv1 && data && this.perMessageDeflate) {
rsv1 = data.length >= this.perMessageDeflate.threshold;
}
this.compress = rsv1;
} else {
rsv1 = false;
Expand All @@ -173,63 +191,78 @@ class Sender {

if (options.fin) this.firstFragment = true;

if (pmd) {
const args = [opcode, data, readOnly, options.fin, options.mask, rsv1, cb];
this.enqueue([this.sendCompressed, args]);
if (this.perMessageDeflate) {
this.enqueue([this.sendCompressed, data, {
mask: options.mask,
fin: options.fin,
readOnly,
opcode,
rsv1
}, cb]);
} else {
this.frameAndSend(opcode, data, readOnly, options.fin, options.mask, false, cb);
this.frameAndSend(data, {
mask: options.mask,
fin: options.fin,
rsv1: false,
readOnly,
opcode
}, cb);
}
}

/**
* Compresses, frames and sends a data message.
*
* @param {Number} opcode The opcode
* @param {*} data The message to send
* @param {Boolean} readOnly Specifies whether `data` can be modified
* @param {Boolean} fin Specifies whether or not to set the FIN bit
* @param {Boolean} mask Specifies whether or not to mask `data`
* @param {Boolean} rsv1 Specifies whether or not to set the RSV1 bit
* @param {Buffer} data The message to send
* @param {Object} options Options object
* @param {Number} options.opcode The opcode
* @param {Boolean} options.readOnly Specifies whether `data` can be modified
* @param {Boolean} options.fin Specifies whether or not to set the FIN bit
* @param {Boolean} options.mask Specifies whether or not to mask `data`
* @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit
* @param {Function} cb Callback
* @private
*/
sendCompressed (opcode, data, readOnly, fin, mask, rsv1, cb) {
sendCompressed (data, options, cb) {
if (!this.compress) {
this.frameAndSend(opcode, data, readOnly, fin, mask, false, cb);
options.rsv1 = false;
this.frameAndSend(data, options, cb);
this.continue();
return;
}

this.extensions[PerMessageDeflate.extensionName].compress(data, fin, (err, buf) => {
this.perMessageDeflate.compress(data, options.fin, (err, buf) => {
if (err) {
if (cb) cb(err);
else this.onerror(err);
return;
}

this.frameAndSend(opcode, buf, false, fin, mask, rsv1, cb);
options.readOnly = false;
this.frameAndSend(buf, options, cb);
this.continue();
});
}

/**
* Frames and sends a piece of data according to the HyBi WebSocket protocol.
*
* @param {Number} opcode The opcode
* @param {*} data The data to send
* @param {Boolean} readOnly Specifies whether `data` can be modified
* @param {Boolean} fin Specifies whether or not to set the FIN bit
* @param {Boolean} maskData Specifies whether or not to mask `data`
* @param {Boolean} rsv1 Specifies whether or not to set the RSV1 bit
* @param {Object} options Options object
* @param {Number} options.opcode The opcode
* @param {Boolean} options.readOnly Specifies whether `data` can be modified
* @param {Boolean} options.fin Specifies whether or not to set the FIN bit
* @param {Boolean} options.mask Specifies whether or not to mask `data`
* @param {Boolean} options.rsv1 Specifies whether or not to set the RSV1 bit
* @param {Function} cb Callback
* @private
*/
frameAndSend (opcode, data, readOnly, fin, maskData, rsv1, cb) {
frameAndSend (data, options, cb) {
if (!data) {
const bytes = [opcode, 0];
const bytes = [options.opcode, 0];

if (fin) bytes[0] |= 0x80;
if (maskData) {
if (options.fin) bytes[0] |= 0x80;
if (options.mask) {
bytes[1] |= 0x80;
bytes.push(0, 0, 0, 0);
}
Expand All @@ -245,12 +278,12 @@ class Sender {
data = viewToBuffer(data);
} else {
data = Buffer.from(typeof data === 'number' ? data.toString() : data);
readOnly = false;
options.readOnly = false;
}
}

const mergeBuffers = data.length < 1024 || maskData && readOnly;
var dataOffset = maskData ? 6 : 2;
const mergeBuffers = data.length < 1024 || options.mask && options.readOnly;
var dataOffset = options.mask ? 6 : 2;
var payloadLength = data.length;

if (data.length >= 65536) {
Expand All @@ -265,8 +298,8 @@ class Sender {
mergeBuffers ? data.length + dataOffset : dataOffset
);

outputBuffer[0] = fin ? opcode | 0x80 : opcode;
if (rsv1) outputBuffer[0] |= 0x40;
outputBuffer[0] = options.fin ? options.opcode | 0x80 : options.opcode;
if (options.rsv1) outputBuffer[0] |= 0x40;

if (payloadLength === 126) {
outputBuffer.writeUInt16BE(data.length, 2, true);
Expand All @@ -275,7 +308,7 @@ class Sender {
outputBuffer.writeUInt32BE(data.length, 6, true);
}

if (maskData) {
if (options.mask) {
const mask = getRandomMask();

outputBuffer[1] = payloadLength | 0x80;
Expand Down Expand Up @@ -310,7 +343,7 @@ class Sender {

this.processing = true;

handler[0].apply(this, handler[1]);
handler[0].apply(this, handler.slice(1));
}

/**
Expand Down
24 changes: 21 additions & 3 deletions test/Sender.test.js
Expand Up @@ -17,7 +17,13 @@ describe('Sender', function () {
const sender = new Sender({ write: () => {} });
const buf = Buffer.from([1, 2, 3, 4, 5]);

sender.frameAndSend(2, buf, true, true, true);
sender.frameAndSend(buf, {
readOnly: true,
rsv1: false,
mask: true,
opcode: 2,
fin: true
});

assert.ok(buf.equals(Buffer.from([1, 2, 3, 4, 5])));
});
Expand All @@ -26,7 +32,13 @@ describe('Sender', function () {
const sender = new Sender({ write: () => {} });
const text = Buffer.from('hi there');

sender.frameAndSend(1, text, true, true, true);
sender.frameAndSend(text, {
readOnly: true,
rsv1: false,
mask: true,
opcode: 1,
fin: true
});

assert.ok(text.equals(Buffer.from('hi there')));
});
Expand All @@ -39,7 +51,13 @@ describe('Sender', function () {
}
});

sender.frameAndSend(1, Buffer.from('hi'), false, true, false, true);
sender.frameAndSend(Buffer.from('hi'), {
readOnly: false,
mask: false,
rsv1: true,
opcode: 1,
fin: true
});
});
});

Expand Down

0 comments on commit 2dd4156

Please sign in to comment.