Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/slow-plums-turn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'multitars': patch
---

Reduce `ReadableStreamBlockReader` memory complexity
108 changes: 67 additions & 41 deletions src/__tests__/reader.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { describe, it, expect } from 'vitest';
import { ReadableStreamBlockReader, readUntilBoundary } from '../reader';
import {
bytesToSkipTable,
ReadableStreamBlockReader,
readUntilBoundary,
} from '../reader';
import {
utf8Encode,
iterableToStream,
Expand Down Expand Up @@ -48,9 +52,7 @@ describe(ReadableStreamBlockReader, () => {
await expect(reader.read()).resolves.toEqual(
new Uint8Array([8, 9, 10, 11])
);
await expect(reader.read()).resolves.toEqual(null);
await expect(reader.pull()).resolves.toEqual(new Uint8Array([12, 13, 14]));
await expect(reader.pull()).resolves.toEqual(null);
await expect(reader.read()).resolves.toEqual(new Uint8Array([12, 13, 14]));
});

it('allows block-wise reads from a byte stream emitting oversized chunks (even)', async () => {
Expand All @@ -73,9 +75,8 @@ describe(ReadableStreamBlockReader, () => {
const reader = new ReadableStreamBlockReader(stream, 3);
await expect(reader.read()).resolves.toEqual(new Uint8Array([0, 1, 2]));
await expect(reader.read()).resolves.toEqual(new Uint8Array([3, 4, 5]));
await expect(reader.read()).resolves.toEqual(new Uint8Array([6, 7]));
await expect(reader.read()).resolves.toEqual(null);
await expect(reader.pull()).resolves.toEqual(new Uint8Array([6, 7]));
await expect(reader.pull()).resolves.toEqual(null);
});

it('allows block-wise reads from a byte stream emitting multiply-oversized chunks (even)', async () => {
Expand All @@ -92,15 +93,15 @@ describe(ReadableStreamBlockReader, () => {
await expect(reader.pull()).resolves.toEqual(null);
});

it('allows partial final blocks to be returned when `true` is passed to read()', async () => {
it('allows partial final blocks to be returned', async () => {
const stream = iterableToStream(
streamChunks({ numChunks: 2, chunkSize: 4 })
);
const reader = new ReadableStreamBlockReader(stream, 3);
await expect(reader.read(true)).resolves.toEqual(new Uint8Array([0, 1, 2]));
await expect(reader.read(true)).resolves.toEqual(new Uint8Array([3, 4, 5]));
await expect(reader.read(true)).resolves.toEqual(new Uint8Array([6, 7]));
await expect(reader.read(true)).resolves.toEqual(null);
await expect(reader.read()).resolves.toEqual(new Uint8Array([0, 1, 2]));
await expect(reader.read()).resolves.toEqual(new Uint8Array([3, 4, 5]));
await expect(reader.read()).resolves.toEqual(new Uint8Array([6, 7]));
await expect(reader.read()).resolves.toEqual(null);
});

it('allows block-wise reads from a byte stream emitting multiply-oversized chunks (uneven)', async () => {
Expand All @@ -110,8 +111,7 @@ describe(ReadableStreamBlockReader, () => {
const reader = new ReadableStreamBlockReader(stream, 2);
await expect(reader.read()).resolves.toEqual(new Uint8Array([0, 1]));
await expect(reader.read()).resolves.toEqual(new Uint8Array([2, 3]));
await expect(reader.read()).resolves.toEqual(null);
await expect(reader.pull()).resolves.toEqual(new Uint8Array([4]));
await expect(reader.read()).resolves.toEqual(new Uint8Array([4]));
});

it('allows block-wise reads from a byte stream emitting multiply-oversized chunks (single)', async () => {
Expand All @@ -121,8 +121,8 @@ describe(ReadableStreamBlockReader, () => {
const reader = new ReadableStreamBlockReader(stream, 4);
await expect(reader.read()).resolves.toEqual(new Uint8Array([0, 1, 2, 3]));
await expect(reader.read()).resolves.toEqual(new Uint8Array([4, 5, 6, 7]));
await expect(reader.read()).resolves.toEqual(new Uint8Array([8, 9]));
await expect(reader.read()).resolves.toEqual(null);
await expect(reader.pull()).resolves.toEqual(new Uint8Array([8, 9]));
});

it('allows skipping bytes for undersized chunks at end of blocks', async () => {
Expand All @@ -132,8 +132,7 @@ describe(ReadableStreamBlockReader, () => {
const reader = new ReadableStreamBlockReader(stream, 4);
await expect(reader.read()).resolves.toEqual(new Uint8Array([0, 1, 2, 3]));
await expect(reader.skip(2)).resolves.toBe(0);
await expect(reader.read()).resolves.toEqual(null);
await expect(reader.pull()).resolves.toEqual(new Uint8Array([6, 7]));
await expect(reader.read()).resolves.toEqual(new Uint8Array([6, 7]));
});

it('allows skipping bytes for undersized chunks at beginning of blocks', async () => {
Expand All @@ -143,8 +142,7 @@ describe(ReadableStreamBlockReader, () => {
const reader = new ReadableStreamBlockReader(stream, 4);
await expect(reader.skip(2)).resolves.toBe(0);
await expect(reader.read()).resolves.toEqual(new Uint8Array([2, 3, 4, 5]));
await expect(reader.read()).resolves.toEqual(null);
await expect(reader.pull()).resolves.toEqual(new Uint8Array([6, 7]));
await expect(reader.read()).resolves.toEqual(new Uint8Array([6, 7]));
});

it('allows skipping bytes for oversized chunks at end of blocks', async () => {
Expand Down Expand Up @@ -223,32 +221,28 @@ describe(ReadableStreamBlockReader, () => {
await expect(reader.pull()).resolves.toEqual(null);
});

it('respects pushed back buffers', async () => {
it('respects rewound data in the middle of a chunk', async () => {
const stream = iterableToStream(
streamChunks({ numChunks: 1, chunkSize: 8 })
);
const reader = new ReadableStreamBlockReader(stream, 4);
let chunk: Uint8Array | null;
expect((chunk = await reader.read())).toEqual(new Uint8Array([0, 1, 2, 3]));
reader.pushback(chunk!);
await expect(reader.read()).resolves.toEqual(chunk);
await expect(reader.read()).resolves.toEqual(new Uint8Array([4, 5, 6, 7]));
expect(await reader.read()).toEqual(new Uint8Array([0, 1, 2, 3]));
reader.rewind(2);
await expect(reader.read()).resolves.toEqual(new Uint8Array([2, 3, 4, 5]));
await expect(reader.read()).resolves.toEqual(new Uint8Array([6, 7]));
await expect(reader.read()).resolves.toEqual(null);
await expect(reader.pull()).resolves.toEqual(null);
});

it('combines pushed back buffers with other buffers', async () => {
it('respects rewound data in a buffered block', async () => {
const stream = iterableToStream(
streamChunks({ numChunks: 1, chunkSize: 8 })
streamChunks({ numChunks: 4, chunkSize: 2 })
);
const reader = new ReadableStreamBlockReader(stream, 4);
let chunk: Uint8Array | null;
expect((chunk = await reader.read())).toEqual(new Uint8Array([0, 1, 2, 3]));
reader.pushback(new Uint8Array([1, 2, 3]));
reader.pushback(new Uint8Array([0]));
await expect(reader.read()).resolves.toEqual(chunk);
await expect(reader.read()).resolves.toEqual(new Uint8Array([4, 5, 6, 7]));
await expect(reader.pull()).resolves.toEqual(null);
expect(await reader.read()).toEqual(new Uint8Array([0, 1, 2, 3]));
reader.rewind(2);
await expect(reader.read()).resolves.toEqual(new Uint8Array([2, 3, 4, 5]));
await expect(reader.read()).resolves.toEqual(new Uint8Array([6, 7]));
await expect(reader.read()).resolves.toEqual(null);
});
});

Expand All @@ -259,9 +253,11 @@ describe(readUntilBoundary, () => {
await expect(async () => {
const stream = iterableToStream(streamText('', 1));
const reader = new ReadableStreamBlockReader(stream, 4);
const boundary = utf8Encode(BOUNDARY);
for await (const _chunk of readUntilBoundary(
reader,
utf8Encode(BOUNDARY)
boundary,
bytesToSkipTable(boundary)
)) {
// noop
}
Expand All @@ -277,7 +273,12 @@ describe(readUntilBoundary, () => {
// Reads data until a boundary across two chunks
let output = '';
const decoder = new TextDecoder();
for await (const chunk of readUntilBoundary(reader, utf8Encode(BOUNDARY))) {
const boundary = utf8Encode(BOUNDARY);
for await (const chunk of readUntilBoundary(
reader,
boundary,
bytesToSkipTable(boundary)
)) {
expect(chunk).not.toBe(null);
output += decoder.decode(chunk!);
}
Expand All @@ -302,7 +303,12 @@ describe(readUntilBoundary, () => {
// Reads data until a boundary across two chunks
let output = '';
const decoder = new TextDecoder();
for await (const chunk of readUntilBoundary(reader, utf8Encode(BOUNDARY))) {
const boundary = utf8Encode(BOUNDARY);
for await (const chunk of readUntilBoundary(
reader,
boundary,
bytesToSkipTable(boundary)
)) {
expect(chunk).not.toBe(null);
output += decoder.decode(chunk!);
}
Expand All @@ -320,8 +326,13 @@ describe(readUntilBoundary, () => {
streamText(BOUNDARY + 'test', BOUNDARY.length)
);
const reader = new ReadableStreamBlockReader(stream, BOUNDARY.length);
const boundary = utf8Encode(BOUNDARY);
let chunks = 0;
for await (const chunk of readUntilBoundary(reader, utf8Encode(BOUNDARY))) {
for await (const chunk of readUntilBoundary(
reader,
boundary,
bytesToSkipTable(boundary)
)) {
expect(chunk).toEqual(new Uint8Array([]));
chunks++;
}
Expand All @@ -332,8 +343,13 @@ describe(readUntilBoundary, () => {
it('aborts with null yield for EOF', async () => {
const stream = iterableToStream(streamText('some longer string', 4));
const reader = new ReadableStreamBlockReader(stream, 12);
const boundary = utf8Encode(BOUNDARY);
const chunks: (Uint8Array | null)[] = [];
for await (const chunk of readUntilBoundary(reader, utf8Encode(BOUNDARY))) {
for await (const chunk of readUntilBoundary(
reader,
boundary,
bytesToSkipTable(boundary)
)) {
if (chunk) {
const copy = new Uint8Array(chunk.byteLength);
copy.set(chunk);
Expand Down Expand Up @@ -376,8 +392,13 @@ describe(readUntilBoundary, () => {
streamText(`some longer string${BOUNDARY.slice(0, 4)}`, 4)
);
const reader = new ReadableStreamBlockReader(stream, 12);
const boundary = utf8Encode(BOUNDARY);
const chunks: (Uint8Array | null)[] = [];
for await (const chunk of readUntilBoundary(reader, utf8Encode(BOUNDARY))) {
for await (const chunk of readUntilBoundary(
reader,
boundary,
bytesToSkipTable(boundary)
)) {
if (chunk) {
const copy = new Uint8Array(chunk.byteLength);
copy.set(chunk);
Expand Down Expand Up @@ -427,10 +448,15 @@ describe(readUntilBoundary, () => {
streamText(`${before}${BOUNDARY}${after}`, 4)
);
const reader = new ReadableStreamBlockReader(stream, 12);
const boundary = utf8Encode(BOUNDARY);
// Reads data until a boundary across two chunks
let actual = '';
const decoder = new TextDecoder();
for await (const chunk of readUntilBoundary(reader, utf8Encode(BOUNDARY)))
for await (const chunk of readUntilBoundary(
reader,
boundary,
bytesToSkipTable(boundary)
))
actual += decoder.decode(chunk!);
expect(actual).toBe(before);

Expand Down
53 changes: 34 additions & 19 deletions src/multipartInput.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import { ReadableStreamBlockReader, readUntilBoundary } from './reader';
import {
ReadableStreamBlockReader,
bytesToSkipTable,
readUntilBoundary,
} from './reader';
import { decodeName } from './multipartEncoding';
import { ReadableStreamLike } from './conversions';
import { MultipartHeaders, MultipartPart } from './multipartShared';

const BLOCK_SIZE = 4_096; /*4KiB*/

const CRLF = new Uint8Array([13, 10]);
const CRLF_SKIP_TABLE = bytesToSkipTable(CRLF);
const MAX_PREAMBLE_SIZE = 16_000; /*16kB*/
const MAX_HEADER_SIZE = 16_000; /*16kB*/
const MAX_HEADERS_SIZE = 32_000; /*32kB*/
Expand Down Expand Up @@ -74,11 +79,13 @@ function parseContentDisposition(
interface Boundary {
/** Initial boundary at the beginning of the multipart stream after the preamble */
raw: Uint8Array;
rawSkipTable: Uint8Array;
/** Trailer boundary after every multipart part.
* @remarks
* After every multipart part that isn't the initial one we expect a leading CRLF sequence
*/
trailer: Uint8Array;
trailerSkipTable: Uint8Array;
}

/** Create boundary patterns from `contentType` boundary parameter.
Expand All @@ -95,9 +102,13 @@ function convertToBoundaryBytes(contentType: string): Boundary {
const boundaryHeader = contentType.match(boundaryHeaderRe);
const boundaryRaw = `--${boundaryHeader?.[1] || '-'}`;
const boundaryTrailer = `\r\n${boundaryRaw}`;
const raw = utf8Encode(boundaryRaw);
const trailer = utf8Encode(boundaryTrailer);
return {
raw: utf8Encode(boundaryRaw),
trailer: utf8Encode(boundaryTrailer),
raw,
rawSkipTable: bytesToSkipTable(raw),
trailer,
trailerSkipTable: bytesToSkipTable(trailer),
};
}

Expand All @@ -106,7 +117,11 @@ async function expectPreamble(
boundary: Boundary
): Promise<void> {
let byteLength = 0;
for await (const chunk of readUntilBoundary(reader, boundary.raw)) {
for await (const chunk of readUntilBoundary(
reader,
boundary.raw,
boundary.rawSkipTable
)) {
if (chunk == null) {
throw new Error('Invalid Multipart Preamble: Unexpected EOF');
} else if ((byteLength += chunk?.byteLength) > MAX_PREAMBLE_SIZE) {
Expand All @@ -121,11 +136,10 @@ async function expectTrailer(
reader: ReadableStreamBlockReader,
boundary: Boundary
): Promise<void> {
for await (const chunk of readUntilBoundary(reader, boundary.trailer)) {
if (chunk == null || chunk.byteLength !== 0) {
const chunk = await reader.pull(boundary.trailer.byteLength);
for (let idx = 0; idx < boundary.trailer.byteLength; idx++) {
if (chunk == null || chunk[idx] !== boundary.trailer[idx]) {
throw new Error('Invalid Multipart Part: Expected trailing boundary');
} else {
break;
}
}
}
Expand All @@ -140,7 +154,11 @@ async function decodeHeaders(
const headers: MultipartHeaders = Object.create(null);
while (byteLength < MAX_HEADERS_SIZE) {
let header = '';
for await (const chunk of readUntilBoundary(reader, CRLF)) {
for await (const chunk of readUntilBoundary(
reader,
CRLF,
CRLF_SKIP_TABLE
)) {
if (chunk == null) {
throw new Error('Invalid Multipart Headers: Unexpected EOF');
} else if (
Expand Down Expand Up @@ -188,13 +206,6 @@ async function decodeHeaders(
return headers;
}

// NOTE(@kitten): We don't really want to copy but something isn't applying backpressure correctly
function copyUint8Array(src: Uint8Array) {
const dst = new Uint8Array(src.byteLength);
dst.set(src);
return dst;
}

interface ParseMultipartParams {
/** The `Content-Type` header value */
contentType: string;
Expand Down Expand Up @@ -250,7 +261,7 @@ export async function* parseMultipart(
if (!buffer)
throw new Error('Invalid Multipart Part: Unexpected EOF');
remaining -= buffer.byteLength;
controller.enqueue(copyUint8Array(buffer));
controller.enqueue(buffer.slice());
}
if (!remaining) {
await expectTrailer(reader, boundary);
Expand All @@ -263,7 +274,11 @@ export async function* parseMultipart(
);
} else {
// Without a size, we instead output a stream that ends at the multipart boundary
const iterator = readUntilBoundary(reader, boundary.trailer);
const iterator = readUntilBoundary(
reader,
boundary.trailer,
boundary.trailerSkipTable
);
stream = new ReadableStream(
{
cancel: (cancel = async function cancel() {
Expand All @@ -282,7 +297,7 @@ export async function* parseMultipart(
} else if (!result.value) {
throw new Error('Invalid Multipart Part: Unexpected EOF');
} else {
controller.enqueue(copyUint8Array(result.value));
controller.enqueue(result.value.slice());
}
},
},
Expand Down
Loading