Skip to content

Commit

Permalink
Fix some cache issue (#444)
Browse files Browse the repository at this point in the history
* use a global fetch because it seems next patched fetch even ignore internals sometimes

* fix SocketError: other side closed

* Always await cache set and queue send before returning

* remove unnecessary debug

* never reject detached promises

* fix lint

* Create small-rivers-taste.md

* fix writeTags using the not patched fetch
  • Loading branch information
conico974 committed Jun 18, 2024
1 parent f469fcc commit 5fc48d0
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 122 deletions.
5 changes: 5 additions & 0 deletions .changeset/small-rivers-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"open-next": patch
---

Fix some cache issue
176 changes: 98 additions & 78 deletions packages/open-next/src/adapters/cache.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { DetachedPromise } from "utils/promise.js";

import { IncrementalCache } from "../cache/incremental/types.js";
import { TagCache } from "../cache/tag/types.js";
import { isBinaryContentType } from "./binary.js";
Expand Down Expand Up @@ -144,7 +146,8 @@ export default class S3Cache {
value: value,
} as CacheHandlerValue;
} catch (e) {
error("Failed to get fetch cache", e);
// We can usually ignore errors here as they are usually due to cache not being found
debug("Failed to get fetch cache", e);
return null;
}
}
Expand All @@ -166,7 +169,7 @@ export default class S3Cache {
// If some tags are stale we need to force revalidation
return null;
}
const requestId = globalThis.__als.getStore() ?? "";
const requestId = globalThis.__als.getStore()?.requestId ?? "";
globalThis.lastModified[requestId] = _lastModified;
if (cacheData?.type === "route") {
return {
Expand Down Expand Up @@ -208,7 +211,8 @@ export default class S3Cache {
return null;
}
} catch (e) {
error("Failed to get body cache", e);
// We can usually ignore errors here as they are usually due to cache not being found
debug("Failed to get body cache", e);
return null;
}
}
Expand All @@ -221,99 +225,115 @@ export default class S3Cache {
if (globalThis.disableIncrementalCache) {
return;
}
if (data?.kind === "ROUTE") {
const { body, status, headers } = data;
await globalThis.incrementalCache.set(
key,
{
type: "route",
body: body.toString(
isBinaryContentType(String(headers["content-type"]))
? "base64"
: "utf8",
),
meta: {
status,
headers,
},
},
false,
);
} else if (data?.kind === "PAGE") {
const { html, pageData } = data;
const isAppPath = typeof pageData === "string";
if (isAppPath) {
globalThis.incrementalCache.set(
const detachedPromise = new DetachedPromise<void>();
globalThis.__als.getStore()?.pendingPromises.push(detachedPromise);
try {
if (data?.kind === "ROUTE") {
const { body, status, headers } = data;
await globalThis.incrementalCache.set(
key,
{
type: "app",
html,
rsc: pageData,
type: "route",
body: body.toString(
isBinaryContentType(String(headers["content-type"]))
? "base64"
: "utf8",
),
meta: {
status,
headers,
},
},
false,
);
} else {
globalThis.incrementalCache.set(
} else if (data?.kind === "PAGE") {
const { html, pageData } = data;
const isAppPath = typeof pageData === "string";
if (isAppPath) {
globalThis.incrementalCache.set(
key,
{
type: "app",
html,
rsc: pageData,
},
false,
);
} else {
globalThis.incrementalCache.set(
key,
{
type: "page",
html,
json: pageData,
},
false,
);
}
} else if (data?.kind === "FETCH") {
await globalThis.incrementalCache.set<true>(key, data, true);
} else if (data?.kind === "REDIRECT") {
await globalThis.incrementalCache.set(
key,
{
type: "page",
html,
json: pageData,
type: "redirect",
props: data.props,
},
false,
);
} else if (data === null || data === undefined) {

Check warning on line 284 in packages/open-next/src/adapters/cache.ts

View workflow job for this annotation

GitHub Actions / Release

Add the missing "else" clause
await globalThis.incrementalCache.delete(key);
}
} else if (data?.kind === "FETCH") {
await globalThis.incrementalCache.set<true>(key, data, true);
} else if (data?.kind === "REDIRECT") {
await globalThis.incrementalCache.set(
key,
{
type: "redirect",
props: data.props,
},
false,
);
} else if (data === null || data === undefined) {
await globalThis.incrementalCache.delete(key);
}
// Write derivedTags to dynamodb
// If we use an in house version of getDerivedTags in build we should use it here instead of next's one
const derivedTags: string[] =
data?.kind === "FETCH"
? ctx?.tags ?? data?.data?.tags ?? [] // before version 14 next.js used data?.data?.tags so we keep it for backward compatibility
: data?.kind === "PAGE"
? data.headers?.["x-next-cache-tags"]?.split(",") ?? []
: [];
debug("derivedTags", derivedTags);
// Get all tags stored in dynamodb for the given key
// If any of the derived tags are not stored in dynamodb for the given key, write them
const storedTags = await globalThis.tagCache.getByPath(key);
const tagsToWrite = derivedTags.filter((tag) => !storedTags.includes(tag));
if (tagsToWrite.length > 0) {
await globalThis.tagCache.writeTags(
tagsToWrite.map((tag) => ({
path: key,
tag: tag,
})),
// Write derivedTags to dynamodb
// If we use an in house version of getDerivedTags in build we should use it here instead of next's one
const derivedTags: string[] =
data?.kind === "FETCH"
? ctx?.tags ?? data?.data?.tags ?? [] // before version 14 next.js used data?.data?.tags so we keep it for backward compatibility
: data?.kind === "PAGE"
? data.headers?.["x-next-cache-tags"]?.split(",") ?? []
: [];
debug("derivedTags", derivedTags);
// Get all tags stored in dynamodb for the given key
// If any of the derived tags are not stored in dynamodb for the given key, write them
const storedTags = await globalThis.tagCache.getByPath(key);
const tagsToWrite = derivedTags.filter(
(tag) => !storedTags.includes(tag),
);
if (tagsToWrite.length > 0) {
await globalThis.tagCache.writeTags(
tagsToWrite.map((tag) => ({
path: key,
tag: tag,
})),
);
}
debug("Finished setting cache");
} catch (e) {
error("Failed to set cache", e);
} finally {
// We need to resolve the promise even if there was an error
detachedPromise.resolve();
}
}

public async revalidateTag(tag: string) {
if (globalThis.disableDynamoDBCache || globalThis.disableIncrementalCache) {
return;
}
debug("revalidateTag", tag);
// Find all keys with the given tag
const paths = await globalThis.tagCache.getByTag(tag);
debug("Items", paths);
// Update all keys with the given tag with revalidatedAt set to now
await globalThis.tagCache.writeTags(
paths?.map((path) => ({
path: path,
tag: tag,
})) ?? [],
);
try {
debug("revalidateTag", tag);
// Find all keys with the given tag
const paths = await globalThis.tagCache.getByTag(tag);
debug("Items", paths);
// Update all keys with the given tag with revalidatedAt set to now
await globalThis.tagCache.writeTags(
paths?.map((path) => ({
path: path,
tag: tag,
})) ?? [],
);
} catch (e) {
error("Failed to revalidate tag", e);
}
}
}
6 changes: 6 additions & 0 deletions packages/open-next/src/adapters/server-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ setNodeEnv();
setBuildIdEnv();
setNextjsServerWorkingDirectory();

// Because next is messing with fetch, we have to make sure that we use an untouched version of fetch
declare global {
var internalFetch: typeof fetch;
}
globalThis.internalFetch = fetch;

/////////////
// Handler //
/////////////
Expand Down
2 changes: 1 addition & 1 deletion packages/open-next/src/cache/tag/dynamodb-lite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ const tagCache: TagCache = {
for (const paramsChunk of toInsert) {
await Promise.all(
paramsChunk.map(async (params) => {
const response = await awsClient.fetch(
const response = await awsFetch(
`https://dynamodb.${CACHE_BUCKET_REGION}.amazonaws.com`,
{
method: "POST",
Expand Down
6 changes: 5 additions & 1 deletion packages/open-next/src/core/createMainHandler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { AsyncLocalStorage } from "node:async_hooks";

import type { OpenNextConfig } from "types/open-next";
import { DetachedPromise } from "utils/promise";

import { debug } from "../adapters/logger";
import { generateUniqueId } from "../adapters/util";
Expand All @@ -20,7 +21,10 @@ declare global {
var incrementalCache: IncrementalCache;
var fnName: string | undefined;
var serverId: string;
var __als: AsyncLocalStorage<string>;
var __als: AsyncLocalStorage<{
requestId: string;
pendingPromises: DetachedPromise<void>[];
}>;
}

export async function createMainHandler() {
Expand Down
77 changes: 45 additions & 32 deletions packages/open-next/src/core/requestHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@ import {
StreamCreator,
} from "http/index.js";
import { InternalEvent, InternalResult } from "types/open-next";
import { DetachedPromise } from "utils/promise";

import { debug, error, warn } from "../adapters/logger";
import { convertRes, createServerResponse, proxyRequest } from "./routing/util";
import routingHandler, { MiddlewareOutputEvent } from "./routingHandler";
import { requestHandler, setNextjsPrebundledReact } from "./util";

// This is used to identify requests in the cache
globalThis.__als = new AsyncLocalStorage<string>();
globalThis.__als = new AsyncLocalStorage<{
requestId: string;
pendingPromises: DetachedPromise<any>[];
}>();

export async function openNextHandler(
internalEvent: InternalEvent,
Expand Down Expand Up @@ -81,37 +85,46 @@ export async function openNextHandler(
remoteAddress: preprocessedEvent.remoteAddress,
};
const requestId = Math.random().toString(36);
const internalResult = await globalThis.__als.run(requestId, async () => {
const preprocessedResult = preprocessResult as MiddlewareOutputEvent;
const req = new IncomingMessage(reqProps);
const res = createServerResponse(
preprocessedEvent,
overwrittenResponseHeaders,
responseStreaming,
);

await processRequest(
req,
res,
preprocessedEvent,
preprocessedResult.isExternalRewrite,
);

const { statusCode, headers, isBase64Encoded, body } = convertRes(res);

const internalResult = {
type: internalEvent.type,
statusCode,
headers,
body,
isBase64Encoded,
};

// reset lastModified. We need to do this to avoid memory leaks
delete globalThis.lastModified[requestId];

return internalResult;
});
const pendingPromises: DetachedPromise<void>[] = [];
const internalResult = await globalThis.__als.run(
{ requestId, pendingPromises },
async () => {
const preprocessedResult = preprocessResult as MiddlewareOutputEvent;
const req = new IncomingMessage(reqProps);
const res = createServerResponse(
preprocessedEvent,
overwrittenResponseHeaders,
responseStreaming,
);

await processRequest(
req,
res,
preprocessedEvent,
preprocessedResult.isExternalRewrite,
);

const { statusCode, headers, isBase64Encoded, body } = convertRes(res);

const internalResult = {
type: internalEvent.type,
statusCode,
headers,
body,
isBase64Encoded,
};

// reset lastModified. We need to do this to avoid memory leaks
delete globalThis.lastModified[requestId];

// Wait for all promises to resolve
// We are not catching errors here, because they are catched before
// This may need to change in the future
await Promise.all(pendingPromises.map((p) => p.promise));

return internalResult;
},
);
return internalResult;
}
}
Expand Down
Loading

0 comments on commit 5fc48d0

Please sign in to comment.