diff --git a/lib/web/websocket/sender.js b/lib/web/websocket/sender.js index b9fc7a72364..8305d158369 100644 --- a/lib/web/websocket/sender.js +++ b/lib/web/websocket/sender.js @@ -3,14 +3,34 @@ const { WebsocketFrameSend } = require('./frame') const { opcodes, sendHints } = require('./constants') -/** @type {Uint8Array} */ +/** @type {typeof Uint8Array} */ const FastBuffer = Buffer[Symbol.species] -class SendQueue { - #queued = new Set() - #size = 0 +/** + * @typedef {object} SendQueueNode + * @property {SendQueueNode | null} next + * @property {Promise | null} promise + * @property {((...args: any[]) => any)} callback + * @property {Buffer | null} frame + * @property {boolean} resolved + */ - /** @type {import('net').Socket} */ +class SendQueue { + /** + * @type {SendQueueNode | null} + */ + #head = null + /** + * @type {SendQueueNode | null} + */ + #tail = null + + /** + * @type {boolean} + */ + #running = false + + /** @type {import('node:net').Socket} */ #socket constructor (socket) { @@ -19,58 +39,79 @@ class SendQueue { add (item, cb, hint) { if (hint !== sendHints.blob) { - const data = clone(item, hint) - - if (this.#size === 0) { - this.#dispatch(data, cb, hint) + const frame = createFrame(item, hint) + if (!this.#running) { + // fast-path + this.#socket.write(frame, cb) } else { - this.#queued.add([data, cb, true, hint]) - this.#size++ - - this.#run() + /** @type {SendQueueNode} */ + const node = { + next: null, + promise: null, + callback: cb, + frame, + resolved: true + } + if (this.#tail !== null) { + this.#tail.next = node + } + this.#tail = node } - return } - const promise = item.arrayBuffer() - const queue = [null, cb, false, hint] - promise.then((ab) => { - queue[0] = clone(ab, hint) - queue[2] = true - - this.#run() - }) - - this.#queued.add(queue) - this.#size++ - } - - #run () { - for (const queued of this.#queued) { - const [data, cb, done, hint] = queued + /** @type {SendQueueNode} */ + const node = { + next: null, + promise: item.arrayBuffer().then((ab) => { + node.resolved = true + node.frame = createFrame(ab, hint) + }), + callback: cb, + frame: null, + resolved: false + } - if (!done) return + if (this.#tail === null) { + this.#tail = node + } - this.#queued.delete(queued) - this.#size-- + if (this.#head === null) { + this.#head = node + } - this.#dispatch(data, cb, hint) + if (!this.#running) { + this.#run() } } - #dispatch (data, cb, hint) { - const frame = new WebsocketFrameSend() - const opcode = hint === sendHints.string ? opcodes.TEXT : opcodes.BINARY - - frame.frameData = data - const buffer = frame.createFrame(opcode) - - this.#socket.write(buffer, cb) + async #run () { + this.#running = true + /** @type {SendQueueNode | null} */ + let node = this.#head + while (node !== null) { + // wait pending promise + if (!node.resolved) { + await node.promise + } + // write + this.#socket.write(node.frame, node.callback) + // cleanup + node.callback = node.frame = node.promise = null + // set next + node = node.next + } + this.#head = null + this.#tail = null + this.#running = false } } -function clone (data, hint) { +function createFrame (data, hint) { + return new WebsocketFrameSend(toBuffer(data, hint)).createFrame(hint === sendHints.string ? opcodes.TEXT : opcodes.BINARY) +} + +function toBuffer (data, hint) { switch (hint) { case sendHints.string: return Buffer.from(data) @@ -78,7 +119,7 @@ function clone (data, hint) { case sendHints.blob: return new FastBuffer(data) case sendHints.typedArray: - return Buffer.copyBytesFrom(data) + return new FastBuffer(data.buffer, data.byteOffset, data.byteLength) } }