From 6142324fa61204393028f3f58f336d053030ea5f Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Mon, 31 Jul 2023 07:59:14 +0200 Subject: [PATCH] feat: prepend a header to each WebTransport chunk WebTransport is a stream-based protocol, so chunking boundaries are not always preserved. That's why we will now prepend a 4-bytes header to each chunk: - first bit indicates whether the payload is plain text (0) or binary (1) - next 31 bits indicate the length of the payload See also: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#format --- lib/index.ts | 108 +++++++++++++++++++++--- test/browser.ts | 125 ++++++--------------------- test/index.ts | 218 ++++++++++++++++++++++++++++++++++++++++++++++++ test/node.ts | 119 ++++++-------------------- 4 files changed, 365 insertions(+), 205 deletions(-) diff --git a/lib/index.ts b/lib/index.ts index 1b1b50e..51915a5 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,6 +1,12 @@ import { encodePacket, encodePacketToBinary } from "./encodePacket.js"; import { decodePacket } from "./decodePacket.js"; -import { Packet, PacketType, RawData, BinaryType } from "./commons.js"; +import { + Packet, + PacketType, + RawData, + BinaryType, + ERROR_PACKET +} from "./commons.js"; const SEPARATOR = String.fromCharCode(30); // see https://en.wikipedia.org/wiki/Delimiter#ASCII_delimited_text @@ -40,30 +46,106 @@ const decodePayload = ( return packets; }; +const HEADER_LENGTH = 4; + +export function createPacketEncoderStream() { + return new TransformStream({ + transform(packet: Packet, controller) { + encodePacketToBinary(packet, encodedPacket => { + const header = new Uint8Array(HEADER_LENGTH); + // last 31 bits indicate the length of the payload + new DataView(header.buffer).setUint32(0, encodedPacket.length); + // first bit indicates whether the payload is plain text (0) or binary (1) + if (packet.data && typeof packet.data !== "string") { + header[0] |= 0x80; + } + controller.enqueue(header); + controller.enqueue(encodedPacket); + }); + } + }); +} + let TEXT_DECODER; -export function decodePacketFromBinary( - data: Uint8Array, - isBinary: boolean, +function totalLength(chunks: Uint8Array[]) { + return chunks.reduce((acc, chunk) => acc + chunk.length, 0); +} + +function concatChunks(chunks: Uint8Array[], size: number) { + if (chunks[0].length === size) { + return chunks.shift(); + } + const buffer = new Uint8Array(size); + let j = 0; + for (let i = 0; i < size; i++) { + buffer[i] = chunks[0][j++]; + if (j === chunks[0].length) { + chunks.shift(); + j = 0; + } + } + if (chunks.length && j < chunks[0].length) { + chunks[0] = chunks[0].slice(j); + } + return buffer; +} + +export function createPacketDecoderStream( + maxPayload: number, binaryType: BinaryType ) { if (!TEXT_DECODER) { - // lazily created for compatibility with old browser platforms TEXT_DECODER = new TextDecoder(); } - // 48 === "0".charCodeAt(0) (OPEN packet type) - // 54 === "6".charCodeAt(0) (NOOP packet type) - const isPlainBinary = isBinary || data[0] < 48 || data[0] > 54; - return decodePacket( - isPlainBinary ? data : TEXT_DECODER.decode(data), - binaryType - ); + const chunks: Uint8Array[] = []; + let expectedSize = -1; + let isBinary = false; + + return new TransformStream({ + transform(chunk: Uint8Array, controller) { + chunks.push(chunk); + while (true) { + const expectHeader = expectedSize === -1; + if (expectHeader) { + if (totalLength(chunks) < HEADER_LENGTH) { + break; + } + const headerArray = concatChunks(chunks, HEADER_LENGTH); + const header = new DataView( + headerArray.buffer, + headerArray.byteOffset, + headerArray.length + ).getUint32(0); + + isBinary = header >> 31 === -1; + expectedSize = header & 0x7fffffff; + + if (expectedSize === 0 || expectedSize > maxPayload) { + controller.enqueue(ERROR_PACKET); + break; + } + } else { + if (totalLength(chunks) < expectedSize) { + break; + } + const data = concatChunks(chunks, expectedSize); + controller.enqueue( + decodePacket( + isBinary ? data : TEXT_DECODER.decode(data), + binaryType + ) + ); + expectedSize = -1; + } + } + } + }); } export const protocol = 4; export { encodePacket, - encodePacketToBinary, encodePayload, decodePacket, decodePayload, diff --git a/test/browser.ts b/test/browser.ts index a3eac0d..131f2b4 100644 --- a/test/browser.ts +++ b/test/browser.ts @@ -1,10 +1,10 @@ import { decodePacket, - decodePacketFromBinary, decodePayload, encodePacket, - encodePacketToBinary, encodePayload, + createPacketEncoderStream, + createPacketDecoderStream, Packet } from ".."; import * as expect from "expect.js"; @@ -114,112 +114,41 @@ describe("engine.io-parser (browser only)", () => { } }); - describe("single packet (to/from Uint8Array)", function() { - if (!withNativeArrayBuffer) { - // @ts-ignore - return this.skip(); - } - - it("should encode a plaintext packet", done => { - const packet: Packet = { - type: "message", - data: "1€" - }; - encodePacketToBinary(packet, encodedPacket => { - expect(encodedPacket).to.be.an(Uint8Array); - expect(encodedPacket).to.eql(Uint8Array.from([52, 49, 226, 130, 172])); - - const decoded = decodePacketFromBinary( - encodedPacket, - false, - "arraybuffer" - ); - expect(decoded).to.eql(packet); - done(); - }); - }); + if (typeof TextEncoder === "function") { + describe("createPacketEncoderStream", () => { + it("should encode a binary packet (Blob)", async () => { + const stream = createPacketEncoderStream(); - it("should encode a binary packet (Uint8Array)", done => { - const packet: Packet = { - type: "message", - data: Uint8Array.from([1, 2, 3]) - }; - encodePacketToBinary(packet, encodedPacket => { - expect(encodedPacket === packet.data).to.be(true); - done(); - }); - }); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); - it("should encode a binary packet (Blob)", done => { - const packet: Packet = { - type: "message", - data: new Blob([Uint8Array.from([1, 2, 3])]) - }; - encodePacketToBinary(packet, encodedPacket => { - expect(encodedPacket).to.be.an(Uint8Array); - expect(encodedPacket).to.eql(Uint8Array.from([1, 2, 3])); - done(); - }); - }); + writer.write({ + type: "message", + data: new Blob([Uint8Array.from([1, 2, 3])]) + }); - it("should encode a binary packet (ArrayBuffer)", done => { - const packet: Packet = { - type: "message", - data: Uint8Array.from([1, 2, 3]).buffer - }; - encodePacketToBinary(packet, encodedPacket => { - expect(encodedPacket).to.be.an(Uint8Array); - expect(encodedPacket).to.eql(Uint8Array.from([1, 2, 3])); - done(); - }); - }); + const header = await reader.read(); + expect(header.value).to.eql(Uint8Array.of(128, 0, 0, 3)); - it("should encode a binary packet (Uint16Array)", done => { - const packet: Packet = { - type: "message", - data: Uint16Array.from([1, 2, 257]) - }; - encodePacketToBinary(packet, encodedPacket => { - expect(encodedPacket).to.be.an(Uint8Array); - expect(encodedPacket).to.eql(Uint8Array.from([1, 0, 2, 0, 1, 1])); - done(); + const payload = await reader.read(); + expect(payload.value).to.eql(Uint8Array.of(1, 2, 3)); }); }); - it("should decode a binary packet (Blob)", () => { - const decoded = decodePacketFromBinary( - Uint8Array.from([1, 2, 3]), - false, - "blob" - ); + describe("createPacketDecoderStream", () => { + it("should decode a binary packet (Blob)", async () => { + const stream = createPacketDecoderStream(1e6, "blob"); - expect(decoded.type).to.eql("message"); - expect(decoded.data).to.be.a(Blob); - }); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); - it("should decode a binary packet (ArrayBuffer)", () => { - const decoded = decodePacketFromBinary( - Uint8Array.from([1, 2, 3]), - false, - "arraybuffer" - ); + writer.write(Uint8Array.of(128, 0, 0, 3, 1, 2, 3)); - expect(decoded.type).to.eql("message"); - expect(decoded.data).to.be.an(ArrayBuffer); - expect(areArraysEqual(decoded.data, Uint8Array.from([1, 2, 3]))); - }); + const { value } = await reader.read(); - it("should decode a binary packet (with binary header)", () => { - // 52 === "4".charCodeAt(0) - const decoded = decodePacketFromBinary( - Uint8Array.from([52]), - true, - "arraybuffer" - ); - - expect(decoded.type).to.eql("message"); - expect(decoded.data).to.be.an(ArrayBuffer); - expect(areArraysEqual(decoded.data, Uint8Array.from([52]))); + expect(value.type).to.eql("message"); + expect(value.data).to.be.a(Blob); + }); }); - }); + } }); diff --git a/test/index.ts b/test/index.ts index 0c343d2..d90a34f 100644 --- a/test/index.ts +++ b/test/index.ts @@ -1,4 +1,6 @@ import { + createPacketDecoderStream, + createPacketEncoderStream, decodePacket, decodePayload, encodePacket, @@ -6,6 +8,8 @@ import { Packet } from ".."; import * as expect from "expect.js"; +import { areArraysEqual } from "./util"; + import "./node"; // replaced by "./browser" for the tests in the browser (see "browser" field in the package.json file) describe("engine.io-parser", () => { @@ -59,4 +63,218 @@ describe("engine.io-parser", () => { ]); }); }); + + // note: `describe("", function() { this.skip() } );` was added in mocha@10, which has dropped support for Node.js 10 + if (typeof TransformStream === "function") { + describe("createPacketEncoderStream", () => { + it("should encode a plaintext packet", async () => { + const stream = createPacketEncoderStream(); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + writer.write({ + type: "message", + data: "1€" + }); + + const header = await reader.read(); + expect(header.value).to.eql(Uint8Array.of(0, 0, 0, 5)); + + const payload = await reader.read(); + expect(payload.value).to.eql(Uint8Array.of(52, 49, 226, 130, 172)); + }); + + it("should encode a binary packet (Uint8Array)", async () => { + const stream = createPacketEncoderStream(); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + const data = Uint8Array.of(1, 2, 3); + + writer.write({ + type: "message", + data + }); + + const header = await reader.read(); + expect(header.value).to.eql(Uint8Array.of(128, 0, 0, 3)); + + const payload = await reader.read(); + expect(payload.value === data).to.be(true); + }); + + it("should encode a binary packet (ArrayBuffer)", async () => { + const stream = createPacketEncoderStream(); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + writer.write({ + type: "message", + data: Uint8Array.of(1, 2, 3).buffer + }); + + const header = await reader.read(); + expect(header.value).to.eql(Uint8Array.of(128, 0, 0, 3)); + + const payload = await reader.read(); + expect(payload.value).to.eql(Uint8Array.of(1, 2, 3)); + }); + + it("should encode a binary packet (Uint16Array)", async () => { + const stream = createPacketEncoderStream(); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + writer.write({ + type: "message", + data: Uint16Array.from([1, 2, 257]) + }); + + const header = await reader.read(); + expect(header.value).to.eql(Uint8Array.of(128, 0, 0, 6)); + + const payload = await reader.read(); + expect(payload.value).to.eql(Uint8Array.of(1, 0, 2, 0, 1, 1)); + }); + }); + + describe("createPacketDecoderStream", () => { + it("should decode a plaintext packet", async () => { + const stream = createPacketDecoderStream(1e6, "arraybuffer"); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + writer.write(Uint8Array.of(0, 0, 0, 5)); + writer.write(Uint8Array.of(52, 49, 226, 130, 172)); + + const packet = await reader.read(); + expect(packet.value).to.eql({ + type: "message", + data: "1€" + }); + }); + + it("should decode a plaintext packet (bytes by bytes)", async () => { + const stream = createPacketDecoderStream(1e6, "arraybuffer"); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + writer.write(Uint8Array.of(0)); + writer.write(Uint8Array.of(0)); + writer.write(Uint8Array.of(0)); + writer.write(Uint8Array.of(5)); + writer.write(Uint8Array.of(52)); + writer.write(Uint8Array.of(49)); + writer.write(Uint8Array.of(226)); + writer.write(Uint8Array.of(130)); + writer.write(Uint8Array.of(172)); + + writer.write(Uint8Array.of(0)); + writer.write(Uint8Array.of(0)); + writer.write(Uint8Array.of(0)); + writer.write(Uint8Array.of(1)); + writer.write(Uint8Array.of(50)); + + writer.write(Uint8Array.of(0)); + writer.write(Uint8Array.of(0)); + writer.write(Uint8Array.of(0)); + writer.write(Uint8Array.of(1)); + writer.write(Uint8Array.of(51)); + + const { value } = await reader.read(); + expect(value).to.eql({ type: "message", data: "1€" }); + + const pingPacket = await reader.read(); + expect(pingPacket.value).to.eql({ type: "ping" }); + + const pongPacket = await reader.read(); + expect(pongPacket.value).to.eql({ type: "pong" }); + }); + + it("should decode a plaintext packet (all bytes at once)", async () => { + const stream = createPacketDecoderStream(1e6, "arraybuffer"); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + writer.write( + Uint8Array.of( + 0, + 0, + 0, + 5, + 52, + 49, + 226, + 130, + 172, + 0, + 0, + 0, + 1, + 50, + 0, + 0, + 0, + 1, + 51 + ) + ); + + const { value } = await reader.read(); + expect(value).to.eql({ type: "message", data: "1€" }); + + const pingPacket = await reader.read(); + expect(pingPacket.value).to.eql({ type: "ping" }); + + const pongPacket = await reader.read(); + expect(pongPacket.value).to.eql({ type: "pong" }); + }); + + it("should decode a binary packet (ArrayBuffer)", async () => { + const stream = createPacketDecoderStream(1e6, "arraybuffer"); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + writer.write(Uint8Array.of(128, 0, 0, 3, 1, 2, 3)); + + const { value } = await reader.read(); + + expect(value.type).to.eql("message"); + expect(value.data).to.be.an(ArrayBuffer); + expect(areArraysEqual(value.data, Uint8Array.of(1, 2, 3))); + }); + + it("should return an error packet if the length of the payload is too big", async () => { + const stream = createPacketDecoderStream(10, "arraybuffer"); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + writer.write(Uint8Array.of(0, 0, 1, 0)); + + const packet = await reader.read(); + expect(packet.value).to.eql({ type: "error", data: "parser error" }); + }); + + it("should return an error packet if the length of the payload is invalid", async () => { + const stream = createPacketDecoderStream(1e6, "arraybuffer"); + + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + + writer.write(Uint8Array.of(0, 0, 0, 0)); + + const packet = await reader.read(); + expect(packet.value).to.eql({ type: "error", data: "parser error" }); + }); + }); + } }); diff --git a/test/node.ts b/test/node.ts index 2d7a423..91d0462 100644 --- a/test/node.ts +++ b/test/node.ts @@ -1,11 +1,11 @@ import { decodePacket, - decodePacketFromBinary, decodePayload, encodePacket, - encodePacketToBinary, encodePayload, - Packet + Packet, + createPacketDecoderStream, + createPacketEncoderStream } from ".."; import * as expect from "expect.js"; import { areArraysEqual } from "./util"; @@ -110,109 +110,40 @@ describe("engine.io-parser (node.js only)", () => { }); if (typeof TextEncoder === "function") { - describe("single packet (to/from Uint8Array)", () => { - it("should encode/decode a plaintext packet", done => { - const packet: Packet = { - type: "message", - data: "1€" - }; - encodePacketToBinary(packet, encodedPacket => { - expect(encodedPacket).to.be.an(Uint8Array); - expect(encodedPacket).to.eql( - Uint8Array.from([52, 49, 226, 130, 172]) - ); - - const decoded = decodePacketFromBinary( - encodedPacket, - false, - "nodebuffer" - ); - expect(decoded).to.eql(packet); - done(); - }); - }); + describe("createPacketEncoderStream", () => { + it("should encode a binary packet (Buffer)", async () => { + const stream = createPacketEncoderStream(); - it("should encode a binary packet (Buffer)", done => { - const packet: Packet = { - type: "message", - data: Buffer.from([1, 2, 3]) - }; - encodePacketToBinary(packet, encodedPacket => { - expect(encodedPacket === packet.data).to.be(true); - done(); - }); - }); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); - it("should encode a binary packet (Uint8Array)", done => { - const packet: Packet = { + writer.write({ type: "message", - data: Uint8Array.from([1, 2, 3]) - }; - encodePacketToBinary(packet, encodedPacket => { - expect(encodedPacket === packet.data).to.be(true); - done(); + data: Buffer.of(1, 2, 3) }); - }); - it("should encode a binary packet (ArrayBuffer)", done => { - const packet: Packet = { - type: "message", - data: Uint8Array.from([1, 2, 3]).buffer - }; - encodePacketToBinary(packet, encodedPacket => { - expect(Buffer.isBuffer(encodedPacket)).to.be(true); - expect(encodedPacket).to.eql(Buffer.from([1, 2, 3])); - done(); - }); - }); + const header = await reader.read(); + expect(header.value).to.eql(Uint8Array.of(128, 0, 0, 3)); - it("should encode a binary packet (Uint16Array)", done => { - const packet: Packet = { - type: "message", - data: Uint16Array.from([1, 2, 257]) - }; - encodePacketToBinary(packet, encodedPacket => { - expect(Buffer.isBuffer(encodedPacket)).to.be(true); - expect(encodedPacket).to.eql(Buffer.from([1, 0, 2, 0, 1, 1])); - done(); - }); + const payload = await reader.read(); + expect(payload.value).to.eql(Uint8Array.of(1, 2, 3)); }); + }); - it("should decode a binary packet (Buffer)", () => { - const decoded = decodePacketFromBinary( - Uint8Array.from([1, 2, 3]), - false, - "nodebuffer" - ); + describe("createPacketDecoderStream", () => { + it("should decode a binary packet (Buffer)", async () => { + const stream = createPacketDecoderStream(1e6, "nodebuffer"); - expect(decoded.type).to.eql("message"); - expect(Buffer.isBuffer(decoded.data)).to.be(true); - expect(decoded.data).to.eql(Buffer.from([1, 2, 3])); - }); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); - it("should decode a binary packet (ArrayBuffer)", () => { - const decoded = decodePacketFromBinary( - Uint8Array.from([1, 2, 3]), - false, - "arraybuffer" - ); + writer.write(Uint8Array.of(128, 0, 0, 3, 1, 2, 3)); - expect(decoded.type).to.eql("message"); - expect(decoded.data).to.be.an(ArrayBuffer); - expect(areArraysEqual(decoded.data, Uint8Array.from([1, 2, 3]))); - }); + const { value } = await reader.read(); - it("should decode a binary packet (with binary header)", () => { - // 52 === "4".charCodeAt(0) - const decoded = decodePacketFromBinary( - Uint8Array.from([52]), - true, - "nodebuffer" - ); - - expect(decoded.type).to.eql("message"); - expect(Buffer.isBuffer(decoded.data)).to.be(true); - expect(decoded.data).to.eql(Buffer.from([52])); + expect(value.type).to.eql("message"); + expect(Buffer.isBuffer(value.data)).to.be(true); + expect(value.data).to.eql(Buffer.from([1, 2, 3])); }); }); }