From 8dd3dd1ddff6e4d90592ff4c74ada7f11fc27fa0 Mon Sep 17 00:00:00 2001 From: Yagiz Nizipli Date: Wed, 8 Oct 2025 12:05:46 -0400 Subject: [PATCH 1/4] improve performance of streams --- .changeset/shaggy-walls-sniff.md | 5 ++ packages/open-next/src/core/routing/util.ts | 8 +-- packages/open-next/src/utils/binary.ts | 5 +- packages/open-next/src/utils/fetch.ts | 3 +- packages/open-next/src/utils/stream.ts | 75 ++++++++++++++------- 5 files changed, 65 insertions(+), 31 deletions(-) create mode 100644 .changeset/shaggy-walls-sniff.md diff --git a/.changeset/shaggy-walls-sniff.md b/.changeset/shaggy-walls-sniff.md new file mode 100644 index 000000000..2d1ef7579 --- /dev/null +++ b/.changeset/shaggy-walls-sniff.md @@ -0,0 +1,5 @@ +--- +"@opennextjs/aws": patch +--- + +Reduces allocations and copies of streams diff --git a/packages/open-next/src/core/routing/util.ts b/packages/open-next/src/core/routing/util.ts index abce39236..b6204f043 100644 --- a/packages/open-next/src/core/routing/util.ts +++ b/packages/open-next/src/core/routing/util.ts @@ -105,14 +105,15 @@ export function convertRes(res: OpenNextNodeResponse): InternalResult { const isBase64Encoded = isBinaryContentType(headers["content-type"]) || !!headers["content-encoding"]; + let index = 0; const body = new ReadableStream({ pull(controller) { - if (!res._chunks || res._chunks.length === 0) { + if (!res._chunks || index >= res._chunks.length) { controller.close(); return; } - controller.enqueue(res._chunks.shift()); + controller.enqueue(res._chunks[index++]); }, }); return { @@ -217,13 +218,12 @@ export function convertBodyToReadableStream( ) { if (method === "GET" || method === "HEAD") return undefined; if (!body) return undefined; - const readable = new ReadableStream({ + return new ReadableStream({ start(controller) { controller.enqueue(body); controller.close(); }, }); - return readable; } enum CommonHeaders { diff --git a/packages/open-next/src/utils/binary.ts b/packages/open-next/src/utils/binary.ts index 4ca324285..f439e1e23 100644 --- a/packages/open-next/src/utils/binary.ts +++ b/packages/open-next/src/utils/binary.ts @@ -60,8 +60,9 @@ const commonBinaryMimeTypes = new Set([ ]); export function isBinaryContentType(contentType?: string | null) { - if (!contentType) return false; + if (contentType == null) return false; - const value = contentType?.split(";")[0] ?? ""; + const value = contentType.split(";").at(0); + if (value == null) return false; return commonBinaryMimeTypes.has(value); } diff --git a/packages/open-next/src/utils/fetch.ts b/packages/open-next/src/utils/fetch.ts index 8378fafbf..853f853ae 100644 --- a/packages/open-next/src/utils/fetch.ts +++ b/packages/open-next/src/utils/fetch.ts @@ -18,7 +18,6 @@ export function customFetchClient(client: AwsClient) { * This is necessary otherwise we get some error : SocketError: other side closed * https://github.com/nodejs/undici/issues/583#issuecomment-855384858 */ - const clonedResponse = response.clone(); - return clonedResponse; + return response.clone(); }; } diff --git a/packages/open-next/src/utils/stream.ts b/packages/open-next/src/utils/stream.ts index 4361b5738..7d078bbfe 100644 --- a/packages/open-next/src/utils/stream.ts +++ b/packages/open-next/src/utils/stream.ts @@ -1,43 +1,72 @@ -import { Readable } from "node:stream"; -import type { ReadableStream } from "node:stream/web"; +import { ReadableStream } from "node:stream/web"; -export function fromReadableStream( +export async function fromReadableStream( stream: ReadableStream, base64?: boolean, ): Promise { const reader = stream.getReader(); const chunks: Uint8Array[] = []; + let totalLength = 0; - return new Promise((resolve, reject) => { - function pump() { - reader - .read() - .then(({ done, value }) => { - if (done) { - resolve(Buffer.concat(chunks).toString(base64 ? "base64" : "utf8")); - return; - } - chunks.push(value); - pump(); - }) - .catch(reject); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + chunks.push(value); + totalLength += value.length; } - pump(); - }); + + if (chunks.length === 0) { + return ""; + } + + if (chunks.length === 1) { + return Buffer.from(chunks[0]).toString(base64 ? "base64" : "utf8"); + } + + // Pre-allocate buffer with exact size to avoid reallocation + const buffer = Buffer.allocUnsafe(totalLength); + let offset = 0; + for (const chunk of chunks) { + buffer.set(chunk, offset); + offset += chunk.length; + } + + return buffer.toString(base64 ? "base64" : "utf8"); + } finally { + reader.releaseLock(); + } } export function toReadableStream( value: string, isBase64?: boolean, ): ReadableStream { - return Readable.toWeb( - Readable.from(Buffer.from(value, isBase64 ? "base64" : "utf8")), - ); + const buffer = Buffer.from(value, isBase64 ? "base64" : "utf8"); + + return new ReadableStream({ + start(controller) { + controller.enqueue(buffer); + controller.close(); + }, + }); } +let maybeSomethingBuffer: Buffer | undefined; + export function emptyReadableStream(): ReadableStream { if (process.env.OPEN_NEXT_FORCE_NON_EMPTY_RESPONSE === "true") { - return Readable.toWeb(Readable.from([Buffer.from("SOMETHING")])); + return new ReadableStream({ + start(controller) { + maybeSomethingBuffer ??= Buffer.from("SOMETHING"); + controller.enqueue(maybeSomethingBuffer); + controller.close(); + }, + }); } - return Readable.toWeb(Readable.from([])); + return new ReadableStream({ + start(controller) { + controller.close(); + }, + }); } From abbd288437dca6c570e5b72d0a3a7f167acf751a Mon Sep 17 00:00:00 2001 From: Yagiz Nizipli Date: Wed, 8 Oct 2025 12:52:01 -0400 Subject: [PATCH 2/4] Update binary.ts Co-authored-by: Victor Berchet --- packages/open-next/src/utils/binary.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/open-next/src/utils/binary.ts b/packages/open-next/src/utils/binary.ts index f439e1e23..c2f35ec98 100644 --- a/packages/open-next/src/utils/binary.ts +++ b/packages/open-next/src/utils/binary.ts @@ -60,7 +60,7 @@ const commonBinaryMimeTypes = new Set([ ]); export function isBinaryContentType(contentType?: string | null) { - if (contentType == null) return false; + if (!contentType) return false; const value = contentType.split(";").at(0); if (value == null) return false; From e140a73fabc61411660ec36a09a71c78f9c258d1 Mon Sep 17 00:00:00 2001 From: Yagiz Nizipli Date: Wed, 8 Oct 2025 14:37:49 -0400 Subject: [PATCH 3/4] Update binary.ts Co-authored-by: Victor Berchet --- packages/open-next/src/utils/binary.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/open-next/src/utils/binary.ts b/packages/open-next/src/utils/binary.ts index c2f35ec98..a938d34ad 100644 --- a/packages/open-next/src/utils/binary.ts +++ b/packages/open-next/src/utils/binary.ts @@ -63,6 +63,5 @@ export function isBinaryContentType(contentType?: string | null) { if (!contentType) return false; const value = contentType.split(";").at(0); - if (value == null) return false; return commonBinaryMimeTypes.has(value); } From ca06ec17e8cbe0dceafd1fc85d1b09e05b858d84 Mon Sep 17 00:00:00 2001 From: Yagiz Nizipli Date: Wed, 8 Oct 2025 14:51:30 -0400 Subject: [PATCH 4/4] address pr reviews --- packages/open-next/src/core/routing/util.ts | 5 +- packages/open-next/src/utils/binary.ts | 2 +- packages/open-next/src/utils/stream.ts | 74 ++++++++++----------- 3 files changed, 39 insertions(+), 42 deletions(-) diff --git a/packages/open-next/src/core/routing/util.ts b/packages/open-next/src/core/routing/util.ts index b6204f043..c27ba0ec8 100644 --- a/packages/open-next/src/core/routing/util.ts +++ b/packages/open-next/src/core/routing/util.ts @@ -105,15 +105,14 @@ export function convertRes(res: OpenNextNodeResponse): InternalResult { const isBase64Encoded = isBinaryContentType(headers["content-type"]) || !!headers["content-encoding"]; - let index = 0; const body = new ReadableStream({ pull(controller) { - if (!res._chunks || index >= res._chunks.length) { + if (!res._chunks || res._chunks.length === 0) { controller.close(); return; } - controller.enqueue(res._chunks[index++]); + controller.enqueue(res._chunks.shift()); }, }); return { diff --git a/packages/open-next/src/utils/binary.ts b/packages/open-next/src/utils/binary.ts index a938d34ad..bff113a2d 100644 --- a/packages/open-next/src/utils/binary.ts +++ b/packages/open-next/src/utils/binary.ts @@ -62,6 +62,6 @@ const commonBinaryMimeTypes = new Set([ export function isBinaryContentType(contentType?: string | null) { if (!contentType) return false; - const value = contentType.split(";").at(0); + const value = contentType.split(";")[0]; return commonBinaryMimeTypes.has(value); } diff --git a/packages/open-next/src/utils/stream.ts b/packages/open-next/src/utils/stream.ts index 7d078bbfe..52e834f63 100644 --- a/packages/open-next/src/utils/stream.ts +++ b/packages/open-next/src/utils/stream.ts @@ -4,65 +4,63 @@ export async function fromReadableStream( stream: ReadableStream, base64?: boolean, ): Promise { - const reader = stream.getReader(); const chunks: Uint8Array[] = []; let totalLength = 0; - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - chunks.push(value); - totalLength += value.length; - } - - if (chunks.length === 0) { - return ""; - } + for await (const chunk of stream) { + chunks.push(chunk); + totalLength += chunk.length; + } - if (chunks.length === 1) { - return Buffer.from(chunks[0]).toString(base64 ? "base64" : "utf8"); - } + if (chunks.length === 0) { + return ""; + } - // Pre-allocate buffer with exact size to avoid reallocation - const buffer = Buffer.allocUnsafe(totalLength); - let offset = 0; - for (const chunk of chunks) { - buffer.set(chunk, offset); - offset += chunk.length; - } + if (chunks.length === 1) { + return Buffer.from(chunks[0]).toString(base64 ? "base64" : "utf8"); + } - return buffer.toString(base64 ? "base64" : "utf8"); - } finally { - reader.releaseLock(); + // Pre-allocate buffer with exact size to avoid reallocation + const buffer = Buffer.alloc(totalLength); + let offset = 0; + for (const chunk of chunks) { + buffer.set(chunk, offset); + offset += chunk.length; } + + return buffer.toString(base64 ? "base64" : "utf8"); } export function toReadableStream( value: string, isBase64?: boolean, ): ReadableStream { - const buffer = Buffer.from(value, isBase64 ? "base64" : "utf8"); - - return new ReadableStream({ - start(controller) { - controller.enqueue(buffer); - controller.close(); + return new ReadableStream( + { + pull(controller) { + // Defer the Buffer.from conversion to when the stream is actually read. + controller.enqueue(Buffer.from(value, isBase64 ? "base64" : "utf8")); + controller.close(); + }, }, - }); + { highWaterMark: 0 }, + ); } let maybeSomethingBuffer: Buffer | undefined; export function emptyReadableStream(): ReadableStream { if (process.env.OPEN_NEXT_FORCE_NON_EMPTY_RESPONSE === "true") { - return new ReadableStream({ - start(controller) { - maybeSomethingBuffer ??= Buffer.from("SOMETHING"); - controller.enqueue(maybeSomethingBuffer); - controller.close(); + return new ReadableStream( + { + pull(controller) { + maybeSomethingBuffer ??= Buffer.from("SOMETHING"); + controller.enqueue(maybeSomethingBuffer); + controller.close(); + }, }, - }); + { highWaterMark: 0 }, + ); } return new ReadableStream({ start(controller) {