From 2d435bf69693c565e4a90240ef787db45d74ddb6 Mon Sep 17 00:00:00 2001 From: Yagiz Nizipli Date: Fri, 10 Oct 2025 09:39:58 -0400 Subject: [PATCH 1/2] improve composable-cache performance --- .changeset/stale-ducks-hide.md | 5 ++ .../src/adapters/composable-cache.ts | 68 ++++++++++--------- packages/open-next/src/utils/stream.ts | 10 +-- 3 files changed, 44 insertions(+), 39 deletions(-) create mode 100644 .changeset/stale-ducks-hide.md diff --git a/.changeset/stale-ducks-hide.md b/.changeset/stale-ducks-hide.md new file mode 100644 index 000000000..689dcea23 --- /dev/null +++ b/.changeset/stale-ducks-hide.md @@ -0,0 +1,5 @@ +--- +"@opennextjs/aws": patch +--- + +Improve composable-cache performance by avoiding unnecessary copies diff --git a/packages/open-next/src/adapters/composable-cache.ts b/packages/open-next/src/adapters/composable-cache.ts index 5ed51478e..cfdec136d 100644 --- a/packages/open-next/src/adapters/composable-cache.ts +++ b/packages/open-next/src/adapters/composable-cache.ts @@ -1,28 +1,18 @@ import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache"; -import type { CacheValue } from "types/overrides"; import { writeTags } from "utils/cache"; import { fromReadableStream, toReadableStream } from "utils/stream"; import { debug } from "./logger"; -const pendingWritePromiseMap = new Map< - string, - Promise> ->(); +const pendingWritePromiseMap = new Map>(); export default { async get(cacheKey: string) { try { // We first check if we have a pending write for this cache key // If we do, we return the pending promise instead of fetching the cache - if (pendingWritePromiseMap.has(cacheKey)) { - const stored = pendingWritePromiseMap.get(cacheKey); - if (stored) { - return stored.then((entry) => ({ - ...entry, - value: toReadableStream(entry.value), - })); - } - } + const stored = pendingWritePromiseMap.get(cacheKey); + if (stored) return stored; + const result = await globalThis.incrementalCache.get( cacheKey, "composable", @@ -69,28 +59,45 @@ export default { }, async set(cacheKey: string, pendingEntry: Promise) { - const promiseEntry = pendingEntry.then(async (entry) => ({ - ...entry, - value: await fromReadableStream(entry.value), - })); - pendingWritePromiseMap.set(cacheKey, promiseEntry); + const teedPromise = pendingEntry.then((entry) => { + // Optimization: We avoid consuming and stringifying the stream here, + // because it creates double copies just to be discarded when this function + // ends. This avoids unnecessary memory usage, and reduces GC pressure. + const [stream1, stream2] = entry.value.tee(); + return [ + { ...entry, value: stream1 }, + { ...entry, value: stream2 }, + ] as const; + }); + + pendingWritePromiseMap.set( + cacheKey, + teedPromise.then(([entry]) => entry), + ); - const entry = await promiseEntry.finally(() => { + const [, entryForStorage] = await teedPromise.finally(() => { pendingWritePromiseMap.delete(cacheKey); }); + await globalThis.incrementalCache.set( cacheKey, { - ...entry, - value: entry.value, + ...entryForStorage, + value: await fromReadableStream(entryForStorage.value), }, "composable", ); + if (globalThis.tagCache.mode === "original") { const storedTags = await globalThis.tagCache.getByPath(cacheKey); - const tagsToWrite = entry.tags.filter((tag) => !storedTags.includes(tag)); + const tagsToWrite = []; + for (const tag of entryForStorage.tags) { + if (!storedTags.includes(tag)) { + tagsToWrite.push({ tag, path: cacheKey }); + } + } if (tagsToWrite.length > 0) { - await writeTags(tagsToWrite.map((tag) => ({ tag, path: cacheKey }))); + await writeTags(tagsToWrite); } } }, @@ -125,17 +132,14 @@ export default { })); }), ); - // We need to deduplicate paths, we use a set for that - const setToWrite = new Set<{ path: string; tag: string }>(); + + const dedupeMap = new Map(); for (const entry of pathsToUpdate.flat()) { - setToWrite.add(entry); + dedupeMap.set(`${entry.path}|${entry.tag}`, entry); } - await writeTags(Array.from(setToWrite)); + await writeTags(Array.from(dedupeMap.values())); }, // This one is necessary for older versions of next - async receiveExpiredTags(...tags: string[]) { - // This function does absolutely nothing - return; - }, + async receiveExpiredTags() {}, } satisfies ComposableCacheHandler; diff --git a/packages/open-next/src/utils/stream.ts b/packages/open-next/src/utils/stream.ts index 52e834f63..a30f51f51 100644 --- a/packages/open-next/src/utils/stream.ts +++ b/packages/open-next/src/utils/stream.ts @@ -20,13 +20,9 @@ export async function fromReadableStream( return Buffer.from(chunks[0]).toString(base64 ? "base64" : "utf8"); } - // Pre-allocate buffer with exact size to avoid reallocation - const buffer = Buffer.alloc(totalLength); - let offset = 0; - for (const chunk of chunks) { - buffer.set(chunk, offset); - offset += chunk.length; - } + // Use Buffer.concat which is more efficient than manual allocation and copy + // It handles the allocation and copy in optimized native code + const buffer = Buffer.concat(chunks, totalLength); return buffer.toString(base64 ? "base64" : "utf8"); } From 0a946d302070856bb32cb97231217ef3677beca3 Mon Sep 17 00:00:00 2001 From: Yagiz Nizipli Date: Fri, 10 Oct 2025 13:29:13 -0400 Subject: [PATCH 2/2] store blobs rather than texts --- .../src/adapters/composable-cache.ts | 60 +++++++++------- packages/open-next/src/types/cache.ts | 4 +- packages/open-next/src/utils/stream.ts | 10 ++- .../tests/adapters/composable-cache.test.ts | 71 +++++++++++++------ 4 files changed, 90 insertions(+), 55 deletions(-) diff --git a/packages/open-next/src/adapters/composable-cache.ts b/packages/open-next/src/adapters/composable-cache.ts index cfdec136d..a8db6e969 100644 --- a/packages/open-next/src/adapters/composable-cache.ts +++ b/packages/open-next/src/adapters/composable-cache.ts @@ -1,9 +1,15 @@ -import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache"; +import type { + ComposableCacheEntry, + ComposableCacheHandler, + StoredComposableCacheEntry, +} from "types/cache"; import { writeTags } from "utils/cache"; -import { fromReadableStream, toReadableStream } from "utils/stream"; import { debug } from "./logger"; -const pendingWritePromiseMap = new Map>(); +const pendingWritePromiseMap = new Map< + string, + Promise +>(); export default { async get(cacheKey: string) { @@ -11,7 +17,12 @@ export default { // We first check if we have a pending write for this cache key // If we do, we return the pending promise instead of fetching the cache const stored = pendingWritePromiseMap.get(cacheKey); - if (stored) return stored; + if (stored) { + return stored.then((val) => ({ + ...val, + value: val.value.stream(), + })); + } const result = await globalThis.incrementalCache.get( cacheKey, @@ -50,7 +61,7 @@ export default { return { ...result.value, - value: toReadableStream(result.value.value), + value: result.value.value.stream(), }; } catch (e) { debug("Cannot read composable cache entry"); @@ -59,39 +70,36 @@ export default { }, async set(cacheKey: string, pendingEntry: Promise) { - const teedPromise = pendingEntry.then((entry) => { - // Optimization: We avoid consuming and stringifying the stream here, - // because it creates double copies just to be discarded when this function - // ends. This avoids unnecessary memory usage, and reduces GC pressure. - const [stream1, stream2] = entry.value.tee(); - return [ - { ...entry, value: stream1 }, - { ...entry, value: stream2 }, - ] as const; + // Convert ReadableStream to Blob first + const blobPromise = pendingEntry.then(async (entry) => { + const reader = entry.value.getReader(); + const chunks: Uint8Array[] = []; + let result: ReadableStreamReadResult; + while (!(result = await reader.read()).done) { + chunks.push(result.value); + } + reader.releaseLock(); + return { ...entry, value: new Blob(chunks) }; }); - pendingWritePromiseMap.set( - cacheKey, - teedPromise.then(([entry]) => entry), - ); + // Store a stream from the blob in the pending map for concurrent get() calls + pendingWritePromiseMap.set(cacheKey, blobPromise); - const [, entryForStorage] = await teedPromise.finally(() => { - pendingWritePromiseMap.delete(cacheKey); - }); + const entryWithBlob = await blobPromise; await globalThis.incrementalCache.set( cacheKey, - { - ...entryForStorage, - value: await fromReadableStream(entryForStorage.value), - }, + entryWithBlob, "composable", ); + // Delete from pending map only after the write is complete + pendingWritePromiseMap.delete(cacheKey); + if (globalThis.tagCache.mode === "original") { const storedTags = await globalThis.tagCache.getByPath(cacheKey); const tagsToWrite = []; - for (const tag of entryForStorage.tags) { + for (const tag of entryWithBlob.tags) { if (!storedTags.includes(tag)) { tagsToWrite.push({ tag, path: cacheKey }); } diff --git a/packages/open-next/src/types/cache.ts b/packages/open-next/src/types/cache.ts index 26b5b396e..79e8db29d 100644 --- a/packages/open-next/src/types/cache.ts +++ b/packages/open-next/src/types/cache.ts @@ -1,5 +1,3 @@ -import type { ReadableStream } from "node:stream/web"; - interface CachedFetchValue { kind: "FETCH"; data: { @@ -152,7 +150,7 @@ export interface ComposableCacheEntry { } export type StoredComposableCacheEntry = Omit & { - value: string; + value: Blob; }; export interface ComposableCacheHandler { diff --git a/packages/open-next/src/utils/stream.ts b/packages/open-next/src/utils/stream.ts index a30f51f51..52e834f63 100644 --- a/packages/open-next/src/utils/stream.ts +++ b/packages/open-next/src/utils/stream.ts @@ -20,9 +20,13 @@ export async function fromReadableStream( return Buffer.from(chunks[0]).toString(base64 ? "base64" : "utf8"); } - // Use Buffer.concat which is more efficient than manual allocation and copy - // It handles the allocation and copy in optimized native code - const buffer = Buffer.concat(chunks, totalLength); + // Pre-allocate buffer with exact size to avoid reallocation + const buffer = Buffer.alloc(totalLength); + let offset = 0; + for (const chunk of chunks) { + buffer.set(chunk, offset); + offset += chunk.length; + } return buffer.toString(base64 ? "base64" : "utf8"); } diff --git a/packages/tests-unit/tests/adapters/composable-cache.test.ts b/packages/tests-unit/tests/adapters/composable-cache.test.ts index 7d110ce0c..44d497ee3 100644 --- a/packages/tests-unit/tests/adapters/composable-cache.test.ts +++ b/packages/tests-unit/tests/adapters/composable-cache.test.ts @@ -1,8 +1,4 @@ import ComposableCache from "@opennextjs/aws/adapters/composable-cache"; -import { - fromReadableStream, - toReadableStream, -} from "@opennextjs/aws/utils/stream"; import { vi } from "vitest"; describe("Composable cache handler", () => { @@ -19,7 +15,7 @@ describe("Composable cache handler", () => { timestamp: Date.now(), expire: Date.now() + 1000, revalidate: 3600, - value: "test-value", + value: new Blob(["test-value"]), }, lastModified: Date.now(), }), @@ -132,7 +128,7 @@ describe("Composable cache handler", () => { type: "route", body: "{}", tags: [], - value: "test-value", + value: new Blob(["test-value"]), }, lastModified: Date.now(), }); @@ -185,7 +181,7 @@ describe("Composable cache handler", () => { it("should return pending write promise if available", async () => { const pendingEntry = Promise.resolve({ - value: toReadableStream("pending-value"), + value: new Blob(["pending-value"]).stream(), tags: ["tag1"], stale: 0, timestamp: Date.now(), @@ -214,8 +210,9 @@ describe("Composable cache handler", () => { it("should set cache entry and handle tags in original mode", async () => { tagCache.mode = "original"; + const blob = new Blob(["test-value"]); const entry = { - value: toReadableStream("test-value"), + value: blob.stream(), tags: ["tag1", "tag2"], stale: 0, timestamp: Date.now(), @@ -229,7 +226,7 @@ describe("Composable cache handler", () => { "test-key", expect.objectContaining({ tags: ["tag1", "tag2"], - value: "test-value", + value: expect.any(Blob), }), "composable", ); @@ -241,7 +238,7 @@ describe("Composable cache handler", () => { tagCache.getByPath.mockResolvedValueOnce(["tag1"]); const entry = { - value: toReadableStream("test-value"), + value: new Blob(["test-value"]).stream(), tags: ["tag1", "tag2", "tag3"], stale: 0, timestamp: Date.now(), @@ -262,7 +259,7 @@ describe("Composable cache handler", () => { tagCache.getByPath.mockResolvedValueOnce(["tag1", "tag2"]); const entry = { - value: toReadableStream("test-value"), + value: new Blob(["test-value"]).stream(), tags: ["tag1", "tag2"], stale: 0, timestamp: Date.now(), @@ -279,7 +276,7 @@ describe("Composable cache handler", () => { tagCache.mode = "nextMode"; const entry = { - value: toReadableStream("test-value"), + value: new Blob(["test-value"]).stream(), tags: ["tag1", "tag2"], stale: 0, timestamp: Date.now(), @@ -293,9 +290,10 @@ describe("Composable cache handler", () => { expect(tagCache.writeTags).not.toHaveBeenCalled(); }); - it("should convert ReadableStream to string", async () => { + it("should store Blob directly", async () => { + const blob = new Blob(["test-content"]); const entry = { - value: toReadableStream("test-content"), + value: blob.stream(), tags: ["tag1"], stale: 0, timestamp: Date.now(), @@ -308,7 +306,7 @@ describe("Composable cache handler", () => { expect(incrementalCache.set).toHaveBeenCalledWith( "test-key", expect.objectContaining({ - value: "test-content", + value: expect.any(Blob), }), "composable", ); @@ -437,8 +435,9 @@ describe("Composable cache handler", () => { describe("integration tests", () => { it("should handle complete cache lifecycle", async () => { // Set a cache entry + const blob = new Blob(["integration-test"]); const entry = { - value: toReadableStream("integration-test"), + value: blob.stream(), tags: ["integration-tag"], stale: 0, timestamp: Date.now(), @@ -452,7 +451,7 @@ describe("Composable cache handler", () => { expect(incrementalCache.set).toHaveBeenCalledWith( "integration-key", expect.objectContaining({ - value: "integration-test", + value: expect.any(Blob), tags: ["integration-tag"], }), "composable", @@ -462,7 +461,7 @@ describe("Composable cache handler", () => { incrementalCache.get.mockResolvedValueOnce({ value: { ...entry, - value: "integration-test", + value: blob, }, lastModified: Date.now(), }); @@ -474,13 +473,21 @@ describe("Composable cache handler", () => { expect(result?.tags).toEqual(["integration-tag"]); // Convert the stream back to verify content - const content = await fromReadableStream(result!.value); + const reader = result!.value.getReader(); + const chunks: Uint8Array[] = []; + let readResult: ReadableStreamReadResult; + while (!(readResult = await reader.read()).done) { + chunks.push(readResult.value); + } + const content = new TextDecoder().decode( + new Uint8Array(chunks.flatMap((c) => Array.from(c))), + ); expect(content).toBe("integration-test"); }); it("should handle concurrent get/set operations", async () => { const entry1 = { - value: toReadableStream("concurrent-1"), + value: new Blob(["concurrent-1"]).stream(), tags: ["tag1"], stale: 0, timestamp: Date.now(), @@ -489,7 +496,7 @@ describe("Composable cache handler", () => { }; const entry2 = { - value: toReadableStream("concurrent-2"), + value: new Blob(["concurrent-2"]).stream(), tags: ["tag2"], stale: 0, timestamp: Date.now(), @@ -513,10 +520,28 @@ describe("Composable cache handler", () => { expect(results[2]).toBeDefined(); expect(results[3]).toBeDefined(); - const content1 = await fromReadableStream(results[2]!.value); + // Convert stream 1 to text + const reader1 = results[2]!.value.getReader(); + const chunks1: Uint8Array[] = []; + let readResult1: ReadableStreamReadResult; + while (!(readResult1 = await reader1.read()).done) { + chunks1.push(readResult1.value); + } + const content1 = new TextDecoder().decode( + new Uint8Array(chunks1.flatMap((c) => Array.from(c))), + ); expect(content1).toBe("concurrent-1"); - const content2 = await fromReadableStream(results[3]!.value); + // Convert stream 2 to text + const reader2 = results[3]!.value.getReader(); + const chunks2: Uint8Array[] = []; + let readResult2: ReadableStreamReadResult; + while (!(readResult2 = await reader2.read()).done) { + chunks2.push(readResult2.value); + } + const content2 = new TextDecoder().decode( + new Uint8Array(chunks2.flatMap((c) => Array.from(c))), + ); expect(content2).toBe("concurrent-2"); }); });