Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/stale-ducks-hide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/aws": patch
---

Improve composable-cache performance by avoiding unnecessary copies
82 changes: 47 additions & 35 deletions packages/open-next/src/adapters/composable-cache.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache";
import type { CacheValue } from "types/overrides";
import type {
ComposableCacheEntry,
ComposableCacheHandler,
StoredComposableCacheEntry,
} from "types/cache";
import { writeTags } from "utils/cache";
import { fromReadableStream, toReadableStream } from "utils/stream";
import { debug } from "./logger";

const pendingWritePromiseMap = new Map<
string,
Promise<CacheValue<"composable">>
Promise<StoredComposableCacheEntry>
>();

export default {
async get(cacheKey: string) {
try {
// We first check if we have a pending write for this cache key
// If we do, we return the pending promise instead of fetching the cache
if (pendingWritePromiseMap.has(cacheKey)) {
const stored = pendingWritePromiseMap.get(cacheKey);
if (stored) {
return stored.then((entry) => ({
...entry,
value: toReadableStream(entry.value),
}));
}
const stored = pendingWritePromiseMap.get(cacheKey);
if (stored) {
return stored.then((val) => ({
...val,
value: val.value.stream(),
}));
}

const result = await globalThis.incrementalCache.get(
cacheKey,
"composable",
Expand Down Expand Up @@ -60,7 +61,7 @@ export default {

return {
...result.value,
value: toReadableStream(result.value.value),
value: result.value.value.stream(),
};
} catch (e) {
debug("Cannot read composable cache entry");
Expand All @@ -69,28 +70,42 @@ export default {
},

async set(cacheKey: string, pendingEntry: Promise<ComposableCacheEntry>) {
const promiseEntry = pendingEntry.then(async (entry) => ({
...entry,
value: await fromReadableStream(entry.value),
}));
pendingWritePromiseMap.set(cacheKey, promiseEntry);

const entry = await promiseEntry.finally(() => {
pendingWritePromiseMap.delete(cacheKey);
// Convert ReadableStream to Blob first
const blobPromise = pendingEntry.then(async (entry) => {
const reader = entry.value.getReader();
const chunks: Uint8Array[] = [];
let result: ReadableStreamReadResult<Uint8Array>;
while (!(result = await reader.read()).done) {
chunks.push(result.value);
}
reader.releaseLock();
return { ...entry, value: new Blob(chunks) };
});

// Store a stream from the blob in the pending map for concurrent get() calls
pendingWritePromiseMap.set(cacheKey, blobPromise);

const entryWithBlob = await blobPromise;

await globalThis.incrementalCache.set(
cacheKey,
{
...entry,
value: entry.value,
},
entryWithBlob,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure that the incrementalCache here expects a string for entry.value in this set call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually changed the type, so we store a blob now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But that's not enough: if we wanted to do that, we would need to change every implementation here and the ones in Cloudflare.
And I'm not sure what we really gain from doing that — someone will serialize it in incrementalCache.set anyway.
We can use a Blob in the map, but not for the incremental cache.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, so that would be problematic: if we store it as a string, we would need to convert it into a stream every time we interact with it, since Next.js expects a ReadableStream. We want to avoid that unnecessary allocation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but that's a big breaking change (definitely major).
That is actually already kind of planned for the next major, but I think it requires some thought about how we want to store it.
The way I see it is an entire refactor of the incremental cache, splitting the data into the actual data (which would be a blob/stream) and the metadata, which would be small serializable data.

"composable",
);

// Delete from pending map only after the write is complete
pendingWritePromiseMap.delete(cacheKey);

if (globalThis.tagCache.mode === "original") {
const storedTags = await globalThis.tagCache.getByPath(cacheKey);
const tagsToWrite = entry.tags.filter((tag) => !storedTags.includes(tag));
const tagsToWrite = [];
for (const tag of entryWithBlob.tags) {
if (!storedTags.includes(tag)) {
tagsToWrite.push({ tag, path: cacheKey });
}
}
if (tagsToWrite.length > 0) {
await writeTags(tagsToWrite.map((tag) => ({ tag, path: cacheKey })));
await writeTags(tagsToWrite);
}
}
},
Expand Down Expand Up @@ -125,17 +140,14 @@ export default {
}));
}),
);
// We need to deduplicate paths, we use a set for that
const setToWrite = new Set<{ path: string; tag: string }>();

const dedupeMap = new Map();
for (const entry of pathsToUpdate.flat()) {
setToWrite.add(entry);
dedupeMap.set(`${entry.path}|${entry.tag}`, entry);
}
await writeTags(Array.from(setToWrite));
await writeTags(Array.from(dedupeMap.values()));
},

// This one is necessary for older versions of next
async receiveExpiredTags(...tags: string[]) {
// This function does absolutely nothing
return;
},
async receiveExpiredTags() {},
} satisfies ComposableCacheHandler;
4 changes: 1 addition & 3 deletions packages/open-next/src/types/cache.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import type { ReadableStream } from "node:stream/web";

interface CachedFetchValue {
kind: "FETCH";
data: {
Expand Down Expand Up @@ -152,7 +150,7 @@ export interface ComposableCacheEntry {
}

export type StoredComposableCacheEntry = Omit<ComposableCacheEntry, "value"> & {
value: string;
value: Blob;
};

export interface ComposableCacheHandler {
Expand Down
71 changes: 48 additions & 23 deletions packages/tests-unit/tests/adapters/composable-cache.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import ComposableCache from "@opennextjs/aws/adapters/composable-cache";
import {
fromReadableStream,
toReadableStream,
} from "@opennextjs/aws/utils/stream";
import { vi } from "vitest";

describe("Composable cache handler", () => {
Expand All @@ -19,7 +15,7 @@ describe("Composable cache handler", () => {
timestamp: Date.now(),
expire: Date.now() + 1000,
revalidate: 3600,
value: "test-value",
value: new Blob(["test-value"]),
},
lastModified: Date.now(),
}),
Expand Down Expand Up @@ -132,7 +128,7 @@ describe("Composable cache handler", () => {
type: "route",
body: "{}",
tags: [],
value: "test-value",
value: new Blob(["test-value"]),
},
lastModified: Date.now(),
});
Expand Down Expand Up @@ -185,7 +181,7 @@ describe("Composable cache handler", () => {

it("should return pending write promise if available", async () => {
const pendingEntry = Promise.resolve({
value: toReadableStream("pending-value"),
value: new Blob(["pending-value"]).stream(),
tags: ["tag1"],
stale: 0,
timestamp: Date.now(),
Expand Down Expand Up @@ -214,8 +210,9 @@ describe("Composable cache handler", () => {

it("should set cache entry and handle tags in original mode", async () => {
tagCache.mode = "original";
const blob = new Blob(["test-value"]);
const entry = {
value: toReadableStream("test-value"),
value: blob.stream(),
tags: ["tag1", "tag2"],
stale: 0,
timestamp: Date.now(),
Expand All @@ -229,7 +226,7 @@ describe("Composable cache handler", () => {
"test-key",
expect.objectContaining({
tags: ["tag1", "tag2"],
value: "test-value",
value: expect.any(Blob),
}),
"composable",
);
Expand All @@ -241,7 +238,7 @@ describe("Composable cache handler", () => {
tagCache.getByPath.mockResolvedValueOnce(["tag1"]);

const entry = {
value: toReadableStream("test-value"),
value: new Blob(["test-value"]).stream(),
tags: ["tag1", "tag2", "tag3"],
stale: 0,
timestamp: Date.now(),
Expand All @@ -262,7 +259,7 @@ describe("Composable cache handler", () => {
tagCache.getByPath.mockResolvedValueOnce(["tag1", "tag2"]);

const entry = {
value: toReadableStream("test-value"),
value: new Blob(["test-value"]).stream(),
tags: ["tag1", "tag2"],
stale: 0,
timestamp: Date.now(),
Expand All @@ -279,7 +276,7 @@ describe("Composable cache handler", () => {
tagCache.mode = "nextMode";

const entry = {
value: toReadableStream("test-value"),
value: new Blob(["test-value"]).stream(),
tags: ["tag1", "tag2"],
stale: 0,
timestamp: Date.now(),
Expand All @@ -293,9 +290,10 @@ describe("Composable cache handler", () => {
expect(tagCache.writeTags).not.toHaveBeenCalled();
});

it("should convert ReadableStream to string", async () => {
it("should store Blob directly", async () => {
const blob = new Blob(["test-content"]);
const entry = {
value: toReadableStream("test-content"),
value: blob.stream(),
tags: ["tag1"],
stale: 0,
timestamp: Date.now(),
Expand All @@ -308,7 +306,7 @@ describe("Composable cache handler", () => {
expect(incrementalCache.set).toHaveBeenCalledWith(
"test-key",
expect.objectContaining({
value: "test-content",
value: expect.any(Blob),
}),
"composable",
);
Expand Down Expand Up @@ -437,8 +435,9 @@ describe("Composable cache handler", () => {
describe("integration tests", () => {
it("should handle complete cache lifecycle", async () => {
// Set a cache entry
const blob = new Blob(["integration-test"]);
const entry = {
value: toReadableStream("integration-test"),
value: blob.stream(),
tags: ["integration-tag"],
stale: 0,
timestamp: Date.now(),
Expand All @@ -452,7 +451,7 @@ describe("Composable cache handler", () => {
expect(incrementalCache.set).toHaveBeenCalledWith(
"integration-key",
expect.objectContaining({
value: "integration-test",
value: expect.any(Blob),
tags: ["integration-tag"],
}),
"composable",
Expand All @@ -462,7 +461,7 @@ describe("Composable cache handler", () => {
incrementalCache.get.mockResolvedValueOnce({
value: {
...entry,
value: "integration-test",
value: blob,
},
lastModified: Date.now(),
});
Expand All @@ -474,13 +473,21 @@ describe("Composable cache handler", () => {
expect(result?.tags).toEqual(["integration-tag"]);

// Convert the stream back to verify content
const content = await fromReadableStream(result!.value);
const reader = result!.value.getReader();
const chunks: Uint8Array[] = [];
let readResult: ReadableStreamReadResult<Uint8Array>;
while (!(readResult = await reader.read()).done) {
chunks.push(readResult.value);
}
const content = new TextDecoder().decode(
new Uint8Array(chunks.flatMap((c) => Array.from(c))),
);
expect(content).toBe("integration-test");
});

it("should handle concurrent get/set operations", async () => {
const entry1 = {
value: toReadableStream("concurrent-1"),
value: new Blob(["concurrent-1"]).stream(),
tags: ["tag1"],
stale: 0,
timestamp: Date.now(),
Expand All @@ -489,7 +496,7 @@ describe("Composable cache handler", () => {
};

const entry2 = {
value: toReadableStream("concurrent-2"),
value: new Blob(["concurrent-2"]).stream(),
tags: ["tag2"],
stale: 0,
timestamp: Date.now(),
Expand All @@ -513,10 +520,28 @@ describe("Composable cache handler", () => {
expect(results[2]).toBeDefined();
expect(results[3]).toBeDefined();

const content1 = await fromReadableStream(results[2]!.value);
// Convert stream 1 to text
const reader1 = results[2]!.value.getReader();
const chunks1: Uint8Array[] = [];
let readResult1: ReadableStreamReadResult<Uint8Array>;
while (!(readResult1 = await reader1.read()).done) {
chunks1.push(readResult1.value);
}
const content1 = new TextDecoder().decode(
new Uint8Array(chunks1.flatMap((c) => Array.from(c))),
);
expect(content1).toBe("concurrent-1");

const content2 = await fromReadableStream(results[3]!.value);
// Convert stream 2 to text
const reader2 = results[3]!.value.getReader();
const chunks2: Uint8Array[] = [];
let readResult2: ReadableStreamReadResult<Uint8Array>;
while (!(readResult2 = await reader2.read()).done) {
chunks2.push(readResult2.value);
}
const content2 = new TextDecoder().decode(
new Uint8Array(chunks2.flatMap((c) => Array.from(c))),
);
expect(content2).toBe("concurrent-2");
});
});
Expand Down
Loading