diff --git a/.changeset/smart-bugs-play.md b/.changeset/smart-bugs-play.md new file mode 100644 index 000000000..544f9fa7d --- /dev/null +++ b/.changeset/smart-bugs-play.md @@ -0,0 +1,5 @@ +--- +"@opennextjs/cloudflare": patch +--- + +feat: durable object de-duping revalidation queue diff --git a/examples/e2e/app-router/wrangler.jsonc b/examples/e2e/app-router/wrangler.jsonc index 8440d8c5e..0884846ff 100644 --- a/examples/e2e/app-router/wrangler.jsonc +++ b/examples/e2e/app-router/wrangler.jsonc @@ -19,7 +19,7 @@ "migrations": [ { "tag": "v1", - "new_classes": ["DurableObjectQueueHandler"] + "new_sqlite_classes": ["DurableObjectQueueHandler"] } ], "kv_namespaces": [ diff --git a/packages/cloudflare/src/api/cloudflare-context.ts b/packages/cloudflare/src/api/cloudflare-context.ts index 51aa29606..506cd795f 100644 --- a/packages/cloudflare/src/api/cloudflare-context.ts +++ b/packages/cloudflare/src/api/cloudflare-context.ts @@ -18,6 +18,17 @@ declare global { NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace; // Asset binding ASSETS?: Fetcher; + + // Below are the potential environment variables that can be set by the user to configure the durable object queue handler + // The max number of revalidations that can be processed by the durable worker at the same time + MAX_REVALIDATION_BY_DURABLE_OBJECT?: string; + // The max time in milliseconds that a revalidation can take before being considered as failed + REVALIDATION_TIMEOUT_MS?: string; + // The amount of time after which a revalidation will be attempted again if it failed + // If it fails again it will exponentially back off until it reaches the max retry interval + REVALIDATION_RETRY_INTERVAL_MS?: string; + // The maximum number of attempts that can be made to revalidate a path + MAX_REVALIDATION_ATTEMPTS?: string; } } diff --git a/packages/cloudflare/src/api/durable-objects/queue.spec.ts b/packages/cloudflare/src/api/durable-objects/queue.spec.ts index e877d2f92..b4032e48a 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.spec.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.spec.ts @@ -26,6 +26,11 @@ const createDurableObjectQueue = ({ storage: { setAlarm: vi.fn(), getAlarm: vi.fn(), + sql: { + exec: vi.fn().mockImplementation(() => ({ + one: vi.fn(), + })), + }, }, }; // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -60,6 +65,7 @@ const createMessage = (dedupId: string, lastModified = Date.now()) => ({ describe("DurableObjectQueue", () => { describe("successful revalidation", () => { it("should process a single revalidation", async () => { + process.env.__NEXT_PREVIEW_MODE_ID = "test"; const queue = createDurableObjectQueue({ fetchDuration: 10 }); const firstRequest = await queue.revalidate(createMessage("id")); expect(firstRequest).toBeUndefined(); @@ -102,8 +108,9 @@ describe("DurableObjectQueue", () => { expect(queue.ongoingRevalidations.has("id6")).toBe(false); expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]); + // BlockConcurrencyWhile is called twice here, first time during creation of the object and second time when we try to revalidate // @ts-expect-error - expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(1); + expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(2); // Here we await the blocked request to ensure it's resolved await blockedReq; @@ -201,9 +208,10 @@ describe("DurableObjectQueue", () => { it("should add an alarm if there are failed states", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10 }); - queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 }); + const nextAlarmMs = Date.now() + 1000; + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs }); await queue.addAlarm(); - expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(1000); + expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(nextAlarmMs); }); it("should not add an alarm if there is already an alarm set", async () => { @@ -217,10 +225,16 @@ describe("DurableObjectQueue", () => { it("should set the alarm to the lowest nextAlarm", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10 }); - queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 }); - queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarmMs: 500 }); + const nextAlarmMs = Date.now() + 1000; + const firstAlarm = Date.now() + 500; + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs }); + queue.routeInFailedState.set("id2", { + msg: createMessage("id2"), + retryCount: 0, + nextAlarmMs: firstAlarm, + }); await queue.addAlarm(); - expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(500); + expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(firstAlarm); }); }); diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index ca6bb961a..ea95b0dfb 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -1,4 +1,4 @@ -import { error } from "@opennextjs/aws/adapters/logger.js"; +import { debug, error } from "@opennextjs/aws/adapters/logger.js"; import type { QueueMessage } from "@opennextjs/aws/types/overrides"; import { FatalError, @@ -8,11 +8,15 @@ import { } from "@opennextjs/aws/utils/error.js"; import { DurableObject } from "cloudflare:workers"; -const MAX_REVALIDATION_BY_DURABLE_OBJECT = 5; +const DEFAULT_MAX_REVALIDATION_BY_DURABLE_OBJECT = 5; const DEFAULT_REVALIDATION_TIMEOUT_MS = 10_000; +const DEFAULT_REVALIDATION_RETRY_INTERVAL_MS = 2_000; +const DEFAULT_MAX_REVALIDATION_ATTEMPTS = 6; -interface ExtendedQueueMessage extends QueueMessage { - previewModeId: string; +interface FailedState { + msg: QueueMessage; + retryCount: number; + nextAlarmMs: number; } export class DurableObjectQueueHandler extends DurableObject { @@ -21,37 +25,73 @@ export class DurableObjectQueueHandler extends DurableObject { // TODO: handle incremental cache with only eventual consistency (i.e. KV or R2/D1 with the optional cache layer on top) ongoingRevalidations = new Map>(); - // TODO: restore the state of the failed revalidations - Probably in the next PR where i'll add the storage - routeInFailedState = new Map< - string, - { msg: ExtendedQueueMessage; retryCount: number; nextAlarmMs: number } - >(); + sql: SqlStorage; + + routeInFailedState = new Map(); service: NonNullable; - // TODO: allow this to be configurable - How do we want todo that? env variable? passed down from the queue override ? - maxRevalidations = MAX_REVALIDATION_BY_DURABLE_OBJECT; + // Configurable params + readonly maxRevalidations: number; + readonly revalidationTimeout: number; + readonly revalidationRetryInterval: number; + readonly maxRevalidationAttempts: number; constructor(ctx: DurableObjectState, env: CloudflareEnv) { super(ctx, env); this.service = env.NEXT_CACHE_REVALIDATION_WORKER!; // If there is no service binding, we throw an error because we can't revalidate without it if (!this.service) throw new IgnorableError("No service binding for cache revalidation worker"); + this.sql = ctx.storage.sql; + + // We restore the state + ctx.blockConcurrencyWhile(async () => { + debug(`Restoring the state of the durable object`); + await this.initState(); + }); + + this.maxRevalidations = env.MAX_REVALIDATION_BY_DURABLE_OBJECT + ? parseInt(env.MAX_REVALIDATION_BY_DURABLE_OBJECT) + : DEFAULT_MAX_REVALIDATION_BY_DURABLE_OBJECT; + + this.revalidationTimeout = env.REVALIDATION_TIMEOUT_MS + ? parseInt(env.REVALIDATION_TIMEOUT_MS) + : DEFAULT_REVALIDATION_TIMEOUT_MS; + + this.revalidationRetryInterval = env.REVALIDATION_RETRY_INTERVAL_MS + ? parseInt(env.REVALIDATION_RETRY_INTERVAL_MS) + : DEFAULT_REVALIDATION_RETRY_INTERVAL_MS; + + this.maxRevalidationAttempts = env.MAX_REVALIDATION_ATTEMPTS + ? parseInt(env.MAX_REVALIDATION_ATTEMPTS) + : DEFAULT_MAX_REVALIDATION_ATTEMPTS; + + debug(`Durable object initialized`); } - async revalidate(msg: ExtendedQueueMessage) { + async revalidate(msg: QueueMessage) { // If there is already an ongoing revalidation, we don't need to revalidate again if (this.ongoingRevalidations.has(msg.MessageDeduplicationId)) return; // The route is already in a failed state, it will be retried later if (this.routeInFailedState.has(msg.MessageDeduplicationId)) return; - if (this.ongoingRevalidations.size >= MAX_REVALIDATION_BY_DURABLE_OBJECT) { + // If the last success is newer than the last modified, it's likely that the regional cache is out of date + // We don't need to revalidate in this case + if (this.checkSyncTable(msg)) return; + + if (this.ongoingRevalidations.size >= this.maxRevalidations) { + debug( + `The maximum number of revalidations (${this.maxRevalidations}) is reached. Blocking until one of the revalidations finishes.` + ); const ongoingRevalidations = this.ongoingRevalidations.values(); // When there is more than the max revalidations, we block concurrency until one of the revalidations finishes // We still await the promise to ensure the revalidation is completed // This is fine because the queue itself run inside a waitUntil - await this.ctx.blockConcurrencyWhile(() => Promise.race(ongoingRevalidations)); + await this.ctx.blockConcurrencyWhile(async () => { + debug(`Waiting for one of the revalidations to finish`); + await Promise.race(ongoingRevalidations); + }); } const revalidationPromise = this.executeRevalidation(msg); @@ -63,31 +103,33 @@ export class DurableObjectQueueHandler extends DurableObject { this.ctx.waitUntil(revalidationPromise); } - private async executeRevalidation(msg: ExtendedQueueMessage) { + private async executeRevalidation(msg: QueueMessage) { try { + debug(`Revalidating ${msg.MessageBody.host}${msg.MessageBody.url}`); const { MessageBody: { host, url }, - previewModeId, } = msg; const protocol = host.includes("localhost") ? "http" : "https"; - //TODO: handle the different types of errors that can occur during the fetch (i.e. timeout, network error, etc) const response = await this.service.fetch(`${protocol}://${host}${url}`, { method: "HEAD", headers: { - "x-prerender-revalidate": previewModeId, + // This is defined during build + "x-prerender-revalidate": process.env.__NEXT_PREVIEW_MODE_ID!, "x-isr": "1", }, - signal: AbortSignal.timeout(DEFAULT_REVALIDATION_TIMEOUT_MS), + signal: AbortSignal.timeout(this.revalidationTimeout), }); // Now we need to handle errors from the fetch if (response.status === 200 && response.headers.get("x-nextjs-cache") !== "REVALIDATED") { - // Something is very wrong here, it means that either the page is not ISR/SSG (and we shouldn't be here) or the `x-prerender-revalidate` header is not correct (and it should not happen either) + this.routeInFailedState.delete(msg.MessageDeduplicationId); throw new FatalError( `The revalidation for ${host}${url} cannot be done. This error should never happen.` ); } else if (response.status === 404) { // The page is not found, we should not revalidate it + // We remove the route from the failed state because it might be expected (i.e. a route that was deleted) + this.routeInFailedState.delete(msg.MessageDeduplicationId); throw new IgnorableError( `The revalidation for ${host}${url} cannot be done because the page is not found. It's either expected or an error in user code itself` ); @@ -100,8 +142,23 @@ export class DurableObjectQueueHandler extends DurableObject { } else if (response.status !== 200) { // TODO: check if we need to handle cloudflare specific status codes/errors // An unknown error occurred, most likely from something in user code like missing auth in the middleware + + // We probably want to retry in this case as well + await this.addToFailedState(msg); + throw new RecoverableError(`An unknown error occurred while revalidating ${host}${url}`); } + // Everything went well, we can update the sync table + // We use unixepoch here,it also works with Date.now()/1000, but not with Date.now() alone. + // TODO: This needs to be investigated + this.sql.exec( + "INSERT OR REPLACE INTO sync (id, lastSuccess, buildId) VALUES (?, unixepoch(), ?)", + // We cannot use the deduplication id because it's not unique per route - every time a route is revalidated, the deduplication id is different. + `${host}${url}`, + process.env.__NEXT_BUILD_ID + ); + // If everything went well, we can remove the route from the failed state + this.routeInFailedState.delete(msg.MessageDeduplicationId); } catch (e) { // Do we want to propagate the error to the calling worker? if (!isOpenNextError(e)) { @@ -125,16 +182,19 @@ export class DurableObjectQueueHandler extends DurableObject { ); const allEventsToRetry = nextEventToRetry ? [nextEventToRetry, ...expiredEvents] : expiredEvents; for (const event of allEventsToRetry) { + debug(`Retrying revalidation for ${event.msg.MessageBody.host}${event.msg.MessageBody.url}`); await this.executeRevalidation(event.msg); - this.routeInFailedState.delete(event.msg.MessageDeduplicationId); } } - async addToFailedState(msg: ExtendedQueueMessage) { + async addToFailedState(msg: QueueMessage) { + debug(`Adding ${msg.MessageBody.host}${msg.MessageBody.url} to the failed state`); const existingFailedState = this.routeInFailedState.get(msg.MessageDeduplicationId); + let updatedFailedState: FailedState; + if (existingFailedState) { - if (existingFailedState.retryCount >= 6) { + if (existingFailedState.retryCount >= this.maxRevalidationAttempts) { // We give up after 6 retries and log the error error( `The revalidation for ${msg.MessageBody.host}${msg.MessageBody.url} has failed after 6 retries. It will not be tried again, but subsequent ISR requests will retry.` @@ -142,19 +202,27 @@ export class DurableObjectQueueHandler extends DurableObject { this.routeInFailedState.delete(msg.MessageDeduplicationId); return; } - const nextAlarmMs = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000; - this.routeInFailedState.set(msg.MessageDeduplicationId, { + const nextAlarmMs = + Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * this.revalidationRetryInterval; + updatedFailedState = { ...existingFailedState, retryCount: existingFailedState.retryCount + 1, nextAlarmMs, - }); + }; } else { - this.routeInFailedState.set(msg.MessageDeduplicationId, { + updatedFailedState = { msg, retryCount: 1, nextAlarmMs: Date.now() + 2_000, - }); + }; } + this.routeInFailedState.set(msg.MessageDeduplicationId, updatedFailedState); + this.sql.exec( + "INSERT OR REPLACE INTO failed_state (id, data, buildId) VALUES (?, ?, ?)", + msg.MessageDeduplicationId, + JSON.stringify(updatedFailedState), + process.env.__NEXT_BUILD_ID + ); // We probably want to do something if routeInFailedState is becoming too big, at least log it await this.addAlarm(); } @@ -164,9 +232,60 @@ export class DurableObjectQueueHandler extends DurableObject { if (existingAlarm) return; if (this.routeInFailedState.size === 0) return; - const nextAlarmToSetup = Math.min( + let nextAlarmToSetup = Math.min( ...Array.from(this.routeInFailedState.values()).map(({ nextAlarmMs }) => nextAlarmMs) ); + if (nextAlarmToSetup < Date.now()) { + // We don't want to set an alarm in the past + nextAlarmToSetup = Date.now() + this.revalidationRetryInterval; + } await this.ctx.storage.setAlarm(nextAlarmToSetup); } + + // This function is used to restore the state of the durable object + // We don't restore the ongoing revalidations because we cannot know in which state they are + // We only restore the failed state and the alarm + async initState() { + // We store the failed state as a blob, we don't want to do anything with it anyway besides restoring + this.sql.exec("CREATE TABLE IF NOT EXISTS failed_state (id TEXT PRIMARY KEY, data TEXT, buildId TEXT)"); + + // We create the sync table to handle eventually consistent incremental cache + this.sql.exec("CREATE TABLE IF NOT EXISTS sync (id TEXT PRIMARY KEY, lastSuccess INTEGER, buildId TEXT)"); + + // Before doing anything else, we clear the DB for any potential old data + this.sql.exec("DELETE FROM failed_state WHERE buildId != ?", process.env.__NEXT_BUILD_ID); + this.sql.exec("DELETE FROM sync WHERE buildId != ?", process.env.__NEXT_BUILD_ID); + + const failedStateCursor = this.sql.exec<{ id: string; data: string }>("SELECT * FROM failed_state"); + for (const row of failedStateCursor) { + this.routeInFailedState.set(row.id, JSON.parse(row.data)); + } + + // Now that we have restored the failed state, we can restore the alarm as well + await this.addAlarm(); + } + + /** + * + * @param msg + * @returns `true` if the route has been revalidated since the lastModified from the message, `false` otherwise + */ + checkSyncTable(msg: QueueMessage) { + try { + const numNewer = this.sql + .exec<{ + numNewer: number; + }>( + "SELECT COUNT(*) as numNewer FROM sync WHERE id = ? AND lastSuccess > ? LIMIT 1", + `${msg.MessageBody.host}${msg.MessageBody.url}`, + Math.round(msg.MessageBody.lastModified / 1000) + ) + .one().numNewer; + + return numNewer > 0; + // eslint-disable-next-line @typescript-eslint/no-unused-vars + } catch (e: unknown) { + return false; + } + } } diff --git a/packages/cloudflare/src/api/durable-queue.ts b/packages/cloudflare/src/api/durable-queue.ts index 52dd3bdcc..fd3fa55fb 100644 --- a/packages/cloudflare/src/api/durable-queue.ts +++ b/packages/cloudflare/src/api/durable-queue.ts @@ -11,10 +11,8 @@ export default { const id = durableObject.idFromName(msg.MessageGroupId); const stub = durableObject.get(id); - const previewModeId = process.env.__NEXT_PREVIEW_MODE_ID!; await stub.revalidate({ ...msg, - previewModeId, }); }, } satisfies Queue; diff --git a/packages/cloudflare/src/cli/build/build.ts b/packages/cloudflare/src/cli/build/build.ts index 110f563b8..727621b34 100644 --- a/packages/cloudflare/src/cli/build/build.ts +++ b/packages/cloudflare/src/cli/build/build.ts @@ -14,6 +14,7 @@ import type { ProjectOptions } from "../project-options.js"; import { bundleServer } from "./bundle-server.js"; import { compileCacheAssetsManifestSqlFile } from "./open-next/compile-cache-assets-manifest.js"; import { compileEnvFiles } from "./open-next/compile-env-files.js"; +import { compileDurableObjects } from "./open-next/compileDurableObjects.js"; import { copyCacheAssets } from "./open-next/copyCacheAssets.js"; import { createServerBundle } from "./open-next/createServerBundle.js"; import { @@ -97,6 +98,8 @@ export async function build(projectOpts: ProjectOptions): Promise { await createServerBundle(options); + await compileDurableObjects(options); + await bundleServer(options); if (!projectOpts.skipWranglerConfigCheck) { diff --git a/packages/cloudflare/src/cli/build/bundle-server.ts b/packages/cloudflare/src/cli/build/bundle-server.ts index 425ee9e02..a0d8e0af2 100644 --- a/packages/cloudflare/src/cli/build/bundle-server.ts +++ b/packages/cloudflare/src/cli/build/bundle-server.ts @@ -58,14 +58,6 @@ export async function bundleServer(buildOpts: BuildOptions): Promise { const serverFiles = path.join(baseManifestPath, "required-server-files.json"); const nextConfig = JSON.parse(fs.readFileSync(serverFiles, "utf-8")).config; - // TODO: This is a temporary solution to get the previewModeId from the prerender-manifest.json - // We should find a better way to get this value, probably directly provided from aws - // probably in an env variable exactly as for BUILD_ID - const prerenderManifest = path.join(baseManifestPath, "prerender-manifest.json"); - const prerenderManifestContent = fs.readFileSync(prerenderManifest, "utf-8"); - const prerenderManifestJson = JSON.parse(prerenderManifestContent); - const previewModeId = prerenderManifestJson.preview.previewModeId; - console.log(`\x1b[35m⚙️ Bundling the OpenNext server...\n\x1b[0m`); await patchWebpackRuntime(buildOpts); @@ -153,8 +145,6 @@ export async function bundleServer(buildOpts: BuildOptions): Promise { "process.env.TURBOPACK": "false", // This define should be safe to use for Next 14.2+, earlier versions (13.5 and less) will cause trouble "process.env.__NEXT_EXPERIMENTAL_REACT": `${needsExperimentalReact(nextConfig)}`, - // Used for the durable object queue handler - "process.env.__NEXT_PREVIEW_MODE_ID": `"${previewModeId}"`, }, platform: "node", banner: { diff --git a/packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts b/packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts new file mode 100644 index 000000000..b129ec3a7 --- /dev/null +++ b/packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts @@ -0,0 +1,42 @@ +import { createRequire } from "node:module"; +import path from "node:path"; + +import { loadBuildId, loadPrerenderManifest } from "@opennextjs/aws/adapters/config/util.js"; +import { type BuildOptions, esbuildSync, getPackagePath } from "@opennextjs/aws/build/helper.js"; + +export function compileDurableObjects(buildOpts: BuildOptions) { + const _require = createRequire(import.meta.url); + const entryPoints = [_require.resolve("@opennextjs/cloudflare/durable-objects/queue")]; + + const { outputDir } = buildOpts; + + const baseManifestPath = path.join( + outputDir, + "server-functions/default", + getPackagePath(buildOpts), + ".next" + ); + + // We need to change the type in aws + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const prerenderManifest = loadPrerenderManifest(baseManifestPath) as any; + const previewModeId = prerenderManifest.preview.previewModeId; + + const BUILD_ID = loadBuildId(baseManifestPath); + + return esbuildSync( + { + entryPoints, + bundle: true, + platform: "node", + format: "esm", + outdir: path.join(buildOpts.buildDir, "durable-objects"), + external: ["cloudflare:workers"], + define: { + "process.env.__NEXT_PREVIEW_MODE_ID": `"${previewModeId}"`, + "process.env.__NEXT_BUILD_ID": `"${BUILD_ID}"`, + }, + }, + buildOpts + ); +} diff --git a/packages/cloudflare/src/cli/templates/worker.ts b/packages/cloudflare/src/cli/templates/worker.ts index 4e36ceb3e..585db93e8 100644 --- a/packages/cloudflare/src/cli/templates/worker.ts +++ b/packages/cloudflare/src/cli/templates/worker.ts @@ -18,7 +18,7 @@ Object.defineProperty(globalThis, Symbol.for("__cloudflare-context__"), { }); //@ts-expect-error: Will be resolved by wrangler build -export { DurableObjectQueueHandler } from "@opennextjs/cloudflare/durable-objects/queue"; +export { DurableObjectQueueHandler } from "./.build/durable-objects/queue.js"; // Populate process.env on the first request let processEnvPopulated = false;