Skip to content

Commit

Permalink
feat: prepend a header to each WebTransport chunk
Browse files Browse the repository at this point in the history
WebTransport is a stream-based protocol, so chunking boundaries are not
always preserved.

That's why we will now prepend a 4-bytes header to each chunk:

- first bit indicates whether the payload is plain text (0) or binary (1)
- next 31 bits indicate the length of the payload

See also: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#format
  • Loading branch information
darrachequesne committed Jul 31, 2023
1 parent 2d0b755 commit 6142324
Show file tree
Hide file tree
Showing 4 changed files with 365 additions and 205 deletions.
108 changes: 95 additions & 13 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { encodePacket, encodePacketToBinary } from "./encodePacket.js";
import { decodePacket } from "./decodePacket.js";
import { Packet, PacketType, RawData, BinaryType } from "./commons.js";
import {
Packet,
PacketType,
RawData,
BinaryType,
ERROR_PACKET
} from "./commons.js";

const SEPARATOR = String.fromCharCode(30); // see https://en.wikipedia.org/wiki/Delimiter#ASCII_delimited_text

Expand Down Expand Up @@ -40,30 +46,106 @@ const decodePayload = (
return packets;
};

const HEADER_LENGTH = 4;

export function createPacketEncoderStream() {
return new TransformStream({
transform(packet: Packet, controller) {
encodePacketToBinary(packet, encodedPacket => {
const header = new Uint8Array(HEADER_LENGTH);
// last 31 bits indicate the length of the payload
new DataView(header.buffer).setUint32(0, encodedPacket.length);
// first bit indicates whether the payload is plain text (0) or binary (1)
if (packet.data && typeof packet.data !== "string") {
header[0] |= 0x80;
}
controller.enqueue(header);
controller.enqueue(encodedPacket);
});
}
});
}

let TEXT_DECODER;

export function decodePacketFromBinary(
data: Uint8Array,
isBinary: boolean,
function totalLength(chunks: Uint8Array[]) {
return chunks.reduce((acc, chunk) => acc + chunk.length, 0);
}

function concatChunks(chunks: Uint8Array[], size: number) {
if (chunks[0].length === size) {
return chunks.shift();
}
const buffer = new Uint8Array(size);
let j = 0;
for (let i = 0; i < size; i++) {
buffer[i] = chunks[0][j++];
if (j === chunks[0].length) {
chunks.shift();
j = 0;
}
}
if (chunks.length && j < chunks[0].length) {
chunks[0] = chunks[0].slice(j);
}
return buffer;
}

export function createPacketDecoderStream(
maxPayload: number,
binaryType: BinaryType
) {
if (!TEXT_DECODER) {
// lazily created for compatibility with old browser platforms
TEXT_DECODER = new TextDecoder();
}
// 48 === "0".charCodeAt(0) (OPEN packet type)
// 54 === "6".charCodeAt(0) (NOOP packet type)
const isPlainBinary = isBinary || data[0] < 48 || data[0] > 54;
return decodePacket(
isPlainBinary ? data : TEXT_DECODER.decode(data),
binaryType
);
const chunks: Uint8Array[] = [];
let expectedSize = -1;
let isBinary = false;

return new TransformStream({
transform(chunk: Uint8Array, controller) {
chunks.push(chunk);
while (true) {
const expectHeader = expectedSize === -1;
if (expectHeader) {
if (totalLength(chunks) < HEADER_LENGTH) {
break;
}
const headerArray = concatChunks(chunks, HEADER_LENGTH);
const header = new DataView(
headerArray.buffer,
headerArray.byteOffset,
headerArray.length
).getUint32(0);

isBinary = header >> 31 === -1;
expectedSize = header & 0x7fffffff;

if (expectedSize === 0 || expectedSize > maxPayload) {
controller.enqueue(ERROR_PACKET);
break;
}
} else {
if (totalLength(chunks) < expectedSize) {
break;
}
const data = concatChunks(chunks, expectedSize);
controller.enqueue(
decodePacket(
isBinary ? data : TEXT_DECODER.decode(data),
binaryType
)
);
expectedSize = -1;
}
}
}
});
}

export const protocol = 4;
export {
encodePacket,
encodePacketToBinary,
encodePayload,
decodePacket,
decodePayload,
Expand Down
125 changes: 27 additions & 98 deletions test/browser.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {
decodePacket,
decodePacketFromBinary,
decodePayload,
encodePacket,
encodePacketToBinary,
encodePayload,
createPacketEncoderStream,
createPacketDecoderStream,
Packet
} from "..";
import * as expect from "expect.js";
Expand Down Expand Up @@ -114,112 +114,41 @@ describe("engine.io-parser (browser only)", () => {
}
});

describe("single packet (to/from Uint8Array)", function() {
if (!withNativeArrayBuffer) {
// @ts-ignore
return this.skip();
}

it("should encode a plaintext packet", done => {
const packet: Packet = {
type: "message",
data: "1€"
};
encodePacketToBinary(packet, encodedPacket => {
expect(encodedPacket).to.be.an(Uint8Array);
expect(encodedPacket).to.eql(Uint8Array.from([52, 49, 226, 130, 172]));

const decoded = decodePacketFromBinary(
encodedPacket,
false,
"arraybuffer"
);
expect(decoded).to.eql(packet);
done();
});
});
if (typeof TextEncoder === "function") {
describe("createPacketEncoderStream", () => {
it("should encode a binary packet (Blob)", async () => {
const stream = createPacketEncoderStream();

it("should encode a binary packet (Uint8Array)", done => {
const packet: Packet = {
type: "message",
data: Uint8Array.from([1, 2, 3])
};
encodePacketToBinary(packet, encodedPacket => {
expect(encodedPacket === packet.data).to.be(true);
done();
});
});
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();

it("should encode a binary packet (Blob)", done => {
const packet: Packet = {
type: "message",
data: new Blob([Uint8Array.from([1, 2, 3])])
};
encodePacketToBinary(packet, encodedPacket => {
expect(encodedPacket).to.be.an(Uint8Array);
expect(encodedPacket).to.eql(Uint8Array.from([1, 2, 3]));
done();
});
});
writer.write({
type: "message",
data: new Blob([Uint8Array.from([1, 2, 3])])
});

it("should encode a binary packet (ArrayBuffer)", done => {
const packet: Packet = {
type: "message",
data: Uint8Array.from([1, 2, 3]).buffer
};
encodePacketToBinary(packet, encodedPacket => {
expect(encodedPacket).to.be.an(Uint8Array);
expect(encodedPacket).to.eql(Uint8Array.from([1, 2, 3]));
done();
});
});
const header = await reader.read();
expect(header.value).to.eql(Uint8Array.of(128, 0, 0, 3));

it("should encode a binary packet (Uint16Array)", done => {
const packet: Packet = {
type: "message",
data: Uint16Array.from([1, 2, 257])
};
encodePacketToBinary(packet, encodedPacket => {
expect(encodedPacket).to.be.an(Uint8Array);
expect(encodedPacket).to.eql(Uint8Array.from([1, 0, 2, 0, 1, 1]));
done();
const payload = await reader.read();
expect(payload.value).to.eql(Uint8Array.of(1, 2, 3));
});
});

it("should decode a binary packet (Blob)", () => {
const decoded = decodePacketFromBinary(
Uint8Array.from([1, 2, 3]),
false,
"blob"
);
describe("createPacketDecoderStream", () => {
it("should decode a binary packet (Blob)", async () => {
const stream = createPacketDecoderStream(1e6, "blob");

expect(decoded.type).to.eql("message");
expect(decoded.data).to.be.a(Blob);
});
const writer = stream.writable.getWriter();
const reader = stream.readable.getReader();

it("should decode a binary packet (ArrayBuffer)", () => {
const decoded = decodePacketFromBinary(
Uint8Array.from([1, 2, 3]),
false,
"arraybuffer"
);
writer.write(Uint8Array.of(128, 0, 0, 3, 1, 2, 3));

expect(decoded.type).to.eql("message");
expect(decoded.data).to.be.an(ArrayBuffer);
expect(areArraysEqual(decoded.data, Uint8Array.from([1, 2, 3])));
});
const { value } = await reader.read();

it("should decode a binary packet (with binary header)", () => {
// 52 === "4".charCodeAt(0)
const decoded = decodePacketFromBinary(
Uint8Array.from([52]),
true,
"arraybuffer"
);

expect(decoded.type).to.eql("message");
expect(decoded.data).to.be.an(ArrayBuffer);
expect(areArraysEqual(decoded.data, Uint8Array.from([52])));
expect(value.type).to.eql("message");
expect(value.data).to.be.a(Blob);
});
});
});
}
});

0 comments on commit 6142324

Please sign in to comment.