diff --git a/.changeset/stale-ducks-hide.md b/.changeset/stale-ducks-hide.md new file mode 100644 index 000000000..689dcea23 --- /dev/null +++ b/.changeset/stale-ducks-hide.md @@ -0,0 +1,5 @@ +--- +"@opennextjs/aws": patch +--- + +Improve composable-cache performance by avoiding unnecessary copies diff --git a/packages/open-next/src/adapters/composable-cache.ts b/packages/open-next/src/adapters/composable-cache.ts index 5ed51478e..a8db6e969 100644 --- a/packages/open-next/src/adapters/composable-cache.ts +++ b/packages/open-next/src/adapters/composable-cache.ts @@ -1,12 +1,14 @@ -import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache"; -import type { CacheValue } from "types/overrides"; +import type { + ComposableCacheEntry, + ComposableCacheHandler, + StoredComposableCacheEntry, +} from "types/cache"; import { writeTags } from "utils/cache"; -import { fromReadableStream, toReadableStream } from "utils/stream"; import { debug } from "./logger"; const pendingWritePromiseMap = new Map< string, - Promise> + Promise >(); export default { @@ -14,15 +16,14 @@ export default { try { // We first check if we have a pending write for this cache key // If we do, we return the pending promise instead of fetching the cache - if (pendingWritePromiseMap.has(cacheKey)) { - const stored = pendingWritePromiseMap.get(cacheKey); - if (stored) { - return stored.then((entry) => ({ - ...entry, - value: toReadableStream(entry.value), - })); - } + const stored = pendingWritePromiseMap.get(cacheKey); + if (stored) { + return stored.then((val) => ({ + ...val, + value: val.value.stream(), + })); } + const result = await globalThis.incrementalCache.get( cacheKey, "composable", @@ -60,7 +61,7 @@ export default { return { ...result.value, - value: toReadableStream(result.value.value), + value: result.value.value.stream(), }; } catch (e) { debug("Cannot read composable cache entry"); @@ -69,28 +70,42 @@ export default { }, async set(cacheKey: string, pendingEntry: Promise) { - const promiseEntry = pendingEntry.then(async (entry) => ({ - ...entry, - value: await fromReadableStream(entry.value), - })); - pendingWritePromiseMap.set(cacheKey, promiseEntry); - - const entry = await promiseEntry.finally(() => { - pendingWritePromiseMap.delete(cacheKey); + // Convert ReadableStream to Blob first + const blobPromise = pendingEntry.then(async (entry) => { + const reader = entry.value.getReader(); + const chunks: Uint8Array[] = []; + let result: ReadableStreamReadResult; + while (!(result = await reader.read()).done) { + chunks.push(result.value); + } + reader.releaseLock(); + return { ...entry, value: new Blob(chunks) }; }); + + // Store a stream from the blob in the pending map for concurrent get() calls + pendingWritePromiseMap.set(cacheKey, blobPromise); + + const entryWithBlob = await blobPromise; + await globalThis.incrementalCache.set( cacheKey, - { - ...entry, - value: entry.value, - }, + entryWithBlob, "composable", ); + + // Delete from pending map only after the write is complete + pendingWritePromiseMap.delete(cacheKey); + if (globalThis.tagCache.mode === "original") { const storedTags = await globalThis.tagCache.getByPath(cacheKey); - const tagsToWrite = entry.tags.filter((tag) => !storedTags.includes(tag)); + const tagsToWrite = []; + for (const tag of entryWithBlob.tags) { + if (!storedTags.includes(tag)) { + tagsToWrite.push({ tag, path: cacheKey }); + } + } if (tagsToWrite.length > 0) { - await writeTags(tagsToWrite.map((tag) => ({ tag, path: cacheKey }))); + await writeTags(tagsToWrite); } } }, @@ -125,17 +140,14 @@ export default { })); }), ); - // We need to deduplicate paths, we use a set for that - const setToWrite = new Set<{ path: string; tag: string }>(); + + const dedupeMap = new Map(); for (const entry of pathsToUpdate.flat()) { - setToWrite.add(entry); + dedupeMap.set(`${entry.path}|${entry.tag}`, entry); } - await writeTags(Array.from(setToWrite)); + await writeTags(Array.from(dedupeMap.values())); }, // This one is necessary for older versions of next - async receiveExpiredTags(...tags: string[]) { - // This function does absolutely nothing - return; - }, + async receiveExpiredTags() {}, } satisfies ComposableCacheHandler; diff --git a/packages/open-next/src/types/cache.ts b/packages/open-next/src/types/cache.ts index 26b5b396e..79e8db29d 100644 --- a/packages/open-next/src/types/cache.ts +++ b/packages/open-next/src/types/cache.ts @@ -1,5 +1,3 @@ -import type { ReadableStream } from "node:stream/web"; - interface CachedFetchValue { kind: "FETCH"; data: { @@ -152,7 +150,7 @@ export interface ComposableCacheEntry { } export type StoredComposableCacheEntry = Omit & { - value: string; + value: Blob; }; export interface ComposableCacheHandler { diff --git a/packages/tests-unit/tests/adapters/composable-cache.test.ts b/packages/tests-unit/tests/adapters/composable-cache.test.ts index 7d110ce0c..44d497ee3 100644 --- a/packages/tests-unit/tests/adapters/composable-cache.test.ts +++ b/packages/tests-unit/tests/adapters/composable-cache.test.ts @@ -1,8 +1,4 @@ import ComposableCache from "@opennextjs/aws/adapters/composable-cache"; -import { - fromReadableStream, - toReadableStream, -} from "@opennextjs/aws/utils/stream"; import { vi } from "vitest"; describe("Composable cache handler", () => { @@ -19,7 +15,7 @@ describe("Composable cache handler", () => { timestamp: Date.now(), expire: Date.now() + 1000, revalidate: 3600, - value: "test-value", + value: new Blob(["test-value"]), }, lastModified: Date.now(), }), @@ -132,7 +128,7 @@ describe("Composable cache handler", () => { type: "route", body: "{}", tags: [], - value: "test-value", + value: new Blob(["test-value"]), }, lastModified: Date.now(), }); @@ -185,7 +181,7 @@ describe("Composable cache handler", () => { it("should return pending write promise if available", async () => { const pendingEntry = Promise.resolve({ - value: toReadableStream("pending-value"), + value: new Blob(["pending-value"]).stream(), tags: ["tag1"], stale: 0, timestamp: Date.now(), @@ -214,8 +210,9 @@ describe("Composable cache handler", () => { it("should set cache entry and handle tags in original mode", async () => { tagCache.mode = "original"; + const blob = new Blob(["test-value"]); const entry = { - value: toReadableStream("test-value"), + value: blob.stream(), tags: ["tag1", "tag2"], stale: 0, timestamp: Date.now(), @@ -229,7 +226,7 @@ describe("Composable cache handler", () => { "test-key", expect.objectContaining({ tags: ["tag1", "tag2"], - value: "test-value", + value: expect.any(Blob), }), "composable", ); @@ -241,7 +238,7 @@ describe("Composable cache handler", () => { tagCache.getByPath.mockResolvedValueOnce(["tag1"]); const entry = { - value: toReadableStream("test-value"), + value: new Blob(["test-value"]).stream(), tags: ["tag1", "tag2", "tag3"], stale: 0, timestamp: Date.now(), @@ -262,7 +259,7 @@ describe("Composable cache handler", () => { tagCache.getByPath.mockResolvedValueOnce(["tag1", "tag2"]); const entry = { - value: toReadableStream("test-value"), + value: new Blob(["test-value"]).stream(), tags: ["tag1", "tag2"], stale: 0, timestamp: Date.now(), @@ -279,7 +276,7 @@ describe("Composable cache handler", () => { tagCache.mode = "nextMode"; const entry = { - value: toReadableStream("test-value"), + value: new Blob(["test-value"]).stream(), tags: ["tag1", "tag2"], stale: 0, timestamp: Date.now(), @@ -293,9 +290,10 @@ describe("Composable cache handler", () => { expect(tagCache.writeTags).not.toHaveBeenCalled(); }); - it("should convert ReadableStream to string", async () => { + it("should store Blob directly", async () => { + const blob = new Blob(["test-content"]); const entry = { - value: toReadableStream("test-content"), + value: blob.stream(), tags: ["tag1"], stale: 0, timestamp: Date.now(), @@ -308,7 +306,7 @@ describe("Composable cache handler", () => { expect(incrementalCache.set).toHaveBeenCalledWith( "test-key", expect.objectContaining({ - value: "test-content", + value: expect.any(Blob), }), "composable", ); @@ -437,8 +435,9 @@ describe("Composable cache handler", () => { describe("integration tests", () => { it("should handle complete cache lifecycle", async () => { // Set a cache entry + const blob = new Blob(["integration-test"]); const entry = { - value: toReadableStream("integration-test"), + value: blob.stream(), tags: ["integration-tag"], stale: 0, timestamp: Date.now(), @@ -452,7 +451,7 @@ describe("Composable cache handler", () => { expect(incrementalCache.set).toHaveBeenCalledWith( "integration-key", expect.objectContaining({ - value: "integration-test", + value: expect.any(Blob), tags: ["integration-tag"], }), "composable", @@ -462,7 +461,7 @@ describe("Composable cache handler", () => { incrementalCache.get.mockResolvedValueOnce({ value: { ...entry, - value: "integration-test", + value: blob, }, lastModified: Date.now(), }); @@ -474,13 +473,21 @@ describe("Composable cache handler", () => { expect(result?.tags).toEqual(["integration-tag"]); // Convert the stream back to verify content - const content = await fromReadableStream(result!.value); + const reader = result!.value.getReader(); + const chunks: Uint8Array[] = []; + let readResult: ReadableStreamReadResult; + while (!(readResult = await reader.read()).done) { + chunks.push(readResult.value); + } + const content = new TextDecoder().decode( + new Uint8Array(chunks.flatMap((c) => Array.from(c))), + ); expect(content).toBe("integration-test"); }); it("should handle concurrent get/set operations", async () => { const entry1 = { - value: toReadableStream("concurrent-1"), + value: new Blob(["concurrent-1"]).stream(), tags: ["tag1"], stale: 0, timestamp: Date.now(), @@ -489,7 +496,7 @@ describe("Composable cache handler", () => { }; const entry2 = { - value: toReadableStream("concurrent-2"), + value: new Blob(["concurrent-2"]).stream(), tags: ["tag2"], stale: 0, timestamp: Date.now(), @@ -513,10 +520,28 @@ describe("Composable cache handler", () => { expect(results[2]).toBeDefined(); expect(results[3]).toBeDefined(); - const content1 = await fromReadableStream(results[2]!.value); + // Convert stream 1 to text + const reader1 = results[2]!.value.getReader(); + const chunks1: Uint8Array[] = []; + let readResult1: ReadableStreamReadResult; + while (!(readResult1 = await reader1.read()).done) { + chunks1.push(readResult1.value); + } + const content1 = new TextDecoder().decode( + new Uint8Array(chunks1.flatMap((c) => Array.from(c))), + ); expect(content1).toBe("concurrent-1"); - const content2 = await fromReadableStream(results[3]!.value); + // Convert stream 2 to text + const reader2 = results[3]!.value.getReader(); + const chunks2: Uint8Array[] = []; + let readResult2: ReadableStreamReadResult; + while (!(readResult2 = await reader2.read()).done) { + chunks2.push(readResult2.value); + } + const content2 = new TextDecoder().decode( + new Uint8Array(chunks2.flatMap((c) => Array.from(c))), + ); expect(content2).toBe("concurrent-2"); }); });