Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/curly-beans-exercise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/aws": patch
---

Improve composable cache performance
101 changes: 45 additions & 56 deletions packages/open-next/src/adapters/composable-cache.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,24 @@
import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache";
import type { CacheValue } from "types/overrides";
import { writeTags } from "utils/cache";
import { fromReadableStream, toReadableStream } from "utils/stream";
import { debug } from "./logger";

const pendingWritePromiseMap = new Map<
string,
Promise<CacheValue<"composable">>
>();
const pendingWritePromiseMap = new Map<string, Promise<ComposableCacheEntry>>();

export default {
async get(cacheKey: string) {
try {
// We first check if we have a pending write for this cache key
// If we do, we return the pending promise instead of fetching the cache
if (pendingWritePromiseMap.has(cacheKey)) {
const stored = pendingWritePromiseMap.get(cacheKey);
if (stored) {
return stored.then((entry) => ({
...entry,
value: toReadableStream(entry.value),
}));
}
}
const stored = pendingWritePromiseMap.get(cacheKey);
if (stored) return stored;

const result = await globalThis.incrementalCache.get(
cacheKey,
"composable",
);
if (!result?.value?.value) {
return undefined;
}
if (!result?.value?.value) return undefined;

debug("composable cache result", result);

// We need to check if the tags associated with this entry has been revalidated
if (
globalThis.tagCache.mode === "nextMode" &&
result.value.tags.length > 0
Expand Down Expand Up @@ -69,73 +54,77 @@ export default {
},

async set(cacheKey: string, pendingEntry: Promise<ComposableCacheEntry>) {
const promiseEntry = pendingEntry.then(async (entry) => ({
...entry,
value: await fromReadableStream(entry.value),
}));
pendingWritePromiseMap.set(cacheKey, promiseEntry);
const teedPromise = pendingEntry.then((entry) => {
// Optimization: We avoid consuming and stringifying the stream here,
// because it creates double copies just to be discarded when this function
// ends. This avoids unnecessary memory usage, and reduces GC pressure.
const [stream1, stream2] = entry.value.tee();
return [
{ ...entry, value: stream1 },
{ ...entry, value: stream2 },
] as const;
});

const entry = await promiseEntry.finally(() => {
pendingWritePromiseMap.set(
cacheKey,
teedPromise.then(([entry]) => entry),
);

const [, entryForStorage] = await teedPromise.finally(() => {
pendingWritePromiseMap.delete(cacheKey);
});

await globalThis.incrementalCache.set(
cacheKey,
{
...entry,
value: entry.value,
...entryForStorage,
value: await fromReadableStream(entryForStorage.value),
},
"composable",
);

if (globalThis.tagCache.mode === "original") {
const storedTags = await globalThis.tagCache.getByPath(cacheKey);
const tagsToWrite = entry.tags.filter((tag) => !storedTags.includes(tag));
const tagsToWrite = [];
for (const tag of entryForStorage.tags) {
if (!storedTags.includes(tag)) {
tagsToWrite.push({ tag, path: cacheKey });
}
}
if (tagsToWrite.length > 0) {
await writeTags(tagsToWrite.map((tag) => ({ tag, path: cacheKey })));
await writeTags(tagsToWrite);
}
}
},

async refreshTags() {
// We don't do anything for now, do we want to do something here ???
return;
},
async refreshTags() {},

async getExpiration(...tags: string[]) {
if (globalThis.tagCache.mode === "nextMode") {
return globalThis.tagCache.getLastRevalidated(tags);
}
// We always return 0 here, original tag cache are handled directly in the get part
// TODO: We need to test this more, i'm not entirely sure that this is working as expected
return 0;
return globalThis.tagCache.mode === "nextMode"
? globalThis.tagCache.getLastRevalidated(tags)
: 0;
},

async expireTags(...tags: string[]) {
if (globalThis.tagCache.mode === "nextMode") {
return writeTags(tags);
}

const tagCache = globalThis.tagCache;
const revalidatedAt = Date.now();
// For the original mode, we have more work to do here.
// We need to find all paths linked to to these tags
const pathsToUpdate = await Promise.all(
tags.map(async (tag) => {
const paths = await tagCache.getByTag(tag);
return paths.map((path) => ({
path,
tag,
revalidatedAt,
}));
return paths.map((path) => ({ path, tag, revalidatedAt }));
}),
);
// We need to deduplicate paths, we use a set for that
const setToWrite = new Set<{ path: string; tag: string }>();

const dedupeMap = new Map();
for (const entry of pathsToUpdate.flat()) {
setToWrite.add(entry);
dedupeMap.set(`${entry.path}|${entry.tag}`, entry);
}
await writeTags(Array.from(setToWrite));
await writeTags(Array.from(dedupeMap.values()));
},

// This one is necessary for older versions of next
async receiveExpiredTags(...tags: string[]) {
// This function does absolutely nothing
return;
},
async receiveExpiredTags() {},
} satisfies ComposableCacheHandler;
5 changes: 3 additions & 2 deletions packages/open-next/src/core/requestHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ export async function openNextHandler(
overwrittenResponseHeaders[key] = value;
}
headers[key] = value;
delete headers[rawKey];
// @ts-expect-error Setting it to undefined is faster than deleting the attribute.
headers[rawKey] = undefined;
}

if (
Expand Down Expand Up @@ -238,7 +239,7 @@ async function processRequest(
// @ts-ignore
// Next.js doesn't parse body if the property exists
// https://github.com/dougmoscrop/serverless-http/issues/227
delete req.body;
req.body = undefined;

// Here we try to apply as much request metadata as possible
// We apply every metadata from `resolve-routes` https://github.com/vercel/next.js/blob/916f105b97211de50f8580f0b39c9e7c60de4886/packages/next/src/server/lib/router-utils/resolve-routes.ts
Expand Down
10 changes: 2 additions & 8 deletions packages/open-next/src/core/routing/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,9 @@ export function convertToQueryString(query: Record<string, string | string[]>) {
* @__PURE__
*/
export function convertToQuery(querystring: string) {
if (!querystring) return {};
const query = new URLSearchParams(querystring);
const queryObject: Record<string, string[] | string> = {};

for (const key of query.keys()) {
const queries = query.getAll(key);
queryObject[key] = queries.length > 1 ? queries : queries[0];
}

return queryObject;
return getQueryFromIterator(query.entries());
}

/**
Expand Down
3 changes: 2 additions & 1 deletion packages/open-next/src/core/routingHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ export default async function routingHandler(
key.startsWith(INTERNAL_HEADER_PREFIX) ||
key.startsWith(MIDDLEWARE_HEADER_PREFIX)
) {
delete event.headers[key];
// @ts-expect-error Assigning undefined is faster than deleting the attribute.
event.headers[key] = undefined;
}
}

Expand Down
6 changes: 1 addition & 5 deletions packages/open-next/src/http/openNextResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ export class OpenNextNodeResponse extends Transform implements ServerResponse {
if (key === SET_COOKIE_HEADER) {
this._cookies = [];
} else {
delete this.headers[key];
this.headers[key] = undefined;
}
return this;
}
Expand Down Expand Up @@ -188,10 +188,6 @@ export class OpenNextNodeResponse extends Transform implements ServerResponse {

const parsedHeaders = parseHeaders(this.headers);

// We need to remove the set-cookie header from the parsed headers because
// it does not handle multiple set-cookie headers properly
delete parsedHeaders[SET_COOKIE_HEADER];

if (this.streamCreator) {
this.responseStream = this.streamCreator?.writeHeaders({
statusCode: this.statusCode ?? 200,
Expand Down
8 changes: 7 additions & 1 deletion packages/open-next/src/http/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ export const parseHeaders = (
continue;
}
const keyLower = key.toLowerCase();
if (keyLower === "set-cookie") {
// We need to remove the set-cookie header from the parsed headers because
// it does not handle multiple set-cookie headers properly
continue;
}

/**
* Next can return an Array for the Location header when you return null from a get in the cacheHandler on a page that has a redirect()
* We dont want to merge that into a comma-separated string
Expand All @@ -22,7 +28,7 @@ export const parseHeaders = (
* See: https://github.com/opennextjs/opennextjs-cloudflare/issues/875#issuecomment-3258248276
* and https://github.com/opennextjs/opennextjs-aws/pull/977#issuecomment-3261763114
*/
if (keyLower === "location" && Array.isArray(value)) {
if (Array.isArray(value) && keyLower === "location") {
if (value[0] === value[1]) {
result[keyLower] = value[0];
} else {
Expand Down
6 changes: 4 additions & 2 deletions packages/open-next/src/overrides/converters/aws-apigw-v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ function normalizeAPIGatewayProxyEventV2Headers(
headers.cookie = cookies.join("; ");
}

for (const [key, value] of Object.entries(rawHeaders || {})) {
headers[key.toLowerCase()] = value!;
if (rawHeaders) {
for (const [key, value] of Object.entries(rawHeaders)) {
headers[key.toLowerCase()] = value!;
}
}

return headers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ function normalizeCloudFrontRequestEventHeaders(
const headers: Record<string, string> = {};

for (const [key, values] of Object.entries(rawHeaders)) {
const lowerKey = key.toLowerCase();
for (const { value } of values) {
if (value) {
headers[key.toLowerCase()] = value;
headers[lowerKey] = value;
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions packages/open-next/src/overrides/converters/edge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ const converter: Converter<InternalEvent, InternalResult | MiddlewareResult> = {

const searchParams = url.searchParams;
const query = getQueryFromSearchParams(searchParams);
// Transform body into Buffer
const body = await event.arrayBuffer();
const headers: Record<string, string> = {};
event.headers.forEach((value, key) => {
headers[key] = value;
Expand All @@ -34,6 +32,11 @@ const converter: Converter<InternalEvent, InternalResult | MiddlewareResult> = {
const method = event.method;
const shouldHaveBody = method !== "GET" && method !== "HEAD";

// Only read body for methods that should have one
const body = shouldHaveBody
? Buffer.from(await event.arrayBuffer())
: undefined;

const cookieHeader = event.headers.get("cookie");
const cookies = cookieHeader
? (cookieParser.parse(cookieHeader) as Record<string, string>)
Expand All @@ -44,7 +47,7 @@ const converter: Converter<InternalEvent, InternalResult | MiddlewareResult> = {
method,
rawPath,
url: event.url,
body: shouldHaveBody ? Buffer.from(body) : undefined,
body,
headers,
remoteAddress: event.headers.get("x-forwarded-for") ?? "::1",
query,
Expand Down
18 changes: 17 additions & 1 deletion packages/open-next/src/overrides/converters/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,28 @@ import { extractHostFromHeaders, getQueryFromSearchParams } from "./utils.js";
const converter: Converter = {
convertFrom: async (req: IncomingMessage & { protocol?: string }) => {
const body = await new Promise<Buffer>((resolve) => {
const contentLength = req.headers["content-length"];
const expectedLength = contentLength
? Number.parseInt(contentLength, 10)
: undefined;
const chunks: Uint8Array[] = [];
let receivedLength = 0;

req.on("data", (chunk) => {
chunks.push(chunk);
receivedLength += chunk.length;
});
req.on("end", () => {
resolve(Buffer.concat(chunks));
// Use pre-allocated buffer if we have content-length and it matches
if (
expectedLength &&
receivedLength === expectedLength &&
chunks.length === 1
) {
resolve(Buffer.from(chunks[0]));
} else {
resolve(Buffer.concat(chunks));
}
});
});

Expand Down
Loading
Loading