From 6667370ba7fdd083ce8a06cb114a5e6fc2a4635e Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Thu, 13 Mar 2025 12:24:28 +0100 Subject: [PATCH 01/12] restore failed state --- examples/e2e/app-router/wrangler.jsonc | 2 +- .../src/api/durable-objects/queue.spec.ts | 23 ++++++-- .../src/api/durable-objects/queue.ts | 57 +++++++++++++++---- 3 files changed, 64 insertions(+), 18 deletions(-) diff --git a/examples/e2e/app-router/wrangler.jsonc b/examples/e2e/app-router/wrangler.jsonc index 8440d8c5e..0884846ff 100644 --- a/examples/e2e/app-router/wrangler.jsonc +++ b/examples/e2e/app-router/wrangler.jsonc @@ -19,7 +19,7 @@ "migrations": [ { "tag": "v1", - "new_classes": ["DurableObjectQueueHandler"] + "new_sqlite_classes": ["DurableObjectQueueHandler"] } ], "kv_namespaces": [ diff --git a/packages/cloudflare/src/api/durable-objects/queue.spec.ts b/packages/cloudflare/src/api/durable-objects/queue.spec.ts index e877d2f92..2de7d0f2e 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.spec.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.spec.ts @@ -26,6 +26,9 @@ const createDurableObjectQueue = ({ storage: { setAlarm: vi.fn(), getAlarm: vi.fn(), + sql: { + exec: vi.fn(), + }, }, }; // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -102,8 +105,9 @@ describe("DurableObjectQueue", () => { expect(queue.ongoingRevalidations.has("id6")).toBe(false); expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]); + // BlockConcurrencyWhile is called twice here, first time during creation of the object and second time when we try to revalidate // @ts-expect-error - expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(1); + expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(2); // Here we await the blocked request to ensure it's resolved await blockedReq; @@ -201,9 +205,10 @@ describe("DurableObjectQueue", () => { it("should add an alarm if there are failed states", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10 }); - queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 }); + const nextAlarm = Date.now() + 1000; + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: nextAlarm }); await queue.addAlarm(); - expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(1000); + expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(nextAlarm); }); it("should not add an alarm if there is already an alarm set", async () => { @@ -217,10 +222,16 @@ describe("DurableObjectQueue", () => { it("should set the alarm to the lowest nextAlarm", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10 }); - queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 }); - queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarmMs: 500 }); + const nextAlarm = Date.now() + 1000; + const firstAlarm = Date.now() + 500; + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: nextAlarm }); + queue.routeInFailedState.set("id2", { + msg: createMessage("id2"), + retryCount: 0, + nextAlarmMs: firstAlarm, + }); await queue.addAlarm(); - expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(500); + expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(firstAlarm); }); }); diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index ca6bb961a..9b201a71a 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -15,17 +15,21 @@ interface ExtendedQueueMessage extends QueueMessage { previewModeId: string; } +interface FailedState { + msg: ExtendedQueueMessage; + retryCount: number; + nextAlarmMs: number; +} + export class DurableObjectQueueHandler extends DurableObject { // Ongoing revalidations are deduped by the deduplication id // Since this is running in waitUntil, we expect the durable object state to persist this during the duration of the revalidation // TODO: handle incremental cache with only eventual consistency (i.e. KV or R2/D1 with the optional cache layer on top) ongoingRevalidations = new Map>(); - // TODO: restore the state of the failed revalidations - Probably in the next PR where i'll add the storage - routeInFailedState = new Map< - string, - { msg: ExtendedQueueMessage; retryCount: number; nextAlarmMs: number } - >(); + sql: SqlStorage; + + routeInFailedState = new Map(); service: NonNullable; @@ -37,6 +41,10 @@ export class DurableObjectQueueHandler extends DurableObject { this.service = env.NEXT_CACHE_REVALIDATION_WORKER!; // If there is no service binding, we throw an error because we can't revalidate without it if (!this.service) throw new IgnorableError("No service binding for cache revalidation worker"); + this.sql = ctx.storage.sql; + + // We restore the state + ctx.blockConcurrencyWhile(() => this.initState()); } async revalidate(msg: ExtendedQueueMessage) { @@ -71,7 +79,6 @@ export class DurableObjectQueueHandler extends DurableObject { } = msg; const protocol = host.includes("localhost") ? "http" : "https"; - //TODO: handle the different types of errors that can occur during the fetch (i.e. timeout, network error, etc) const response = await this.service.fetch(`${protocol}://${host}${url}`, { method: "HEAD", headers: { @@ -133,6 +140,8 @@ export class DurableObjectQueueHandler extends DurableObject { async addToFailedState(msg: ExtendedQueueMessage) { const existingFailedState = this.routeInFailedState.get(msg.MessageDeduplicationId); + let updatedFailedState: FailedState; + if (existingFailedState) { if (existingFailedState.retryCount >= 6) { // We give up after 6 retries and log the error @@ -143,18 +152,24 @@ export class DurableObjectQueueHandler extends DurableObject { return; } const nextAlarmMs = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000; - this.routeInFailedState.set(msg.MessageDeduplicationId, { + updatedFailedState = { ...existingFailedState, retryCount: existingFailedState.retryCount + 1, nextAlarmMs, - }); + }; } else { - this.routeInFailedState.set(msg.MessageDeduplicationId, { + updatedFailedState = { msg, retryCount: 1, nextAlarmMs: Date.now() + 2_000, - }); + }; } + this.routeInFailedState.set(msg.MessageDeduplicationId, updatedFailedState); + this.sql.exec( + "INSERT OR REPLACE INTO failed_state (id, data) VALUES (?, ?)", + msg.MessageDeduplicationId, + JSON.stringify(updatedFailedState) + ); // We probably want to do something if routeInFailedState is becoming too big, at least log it await this.addAlarm(); } @@ -164,9 +179,29 @@ export class DurableObjectQueueHandler extends DurableObject { if (existingAlarm) return; if (this.routeInFailedState.size === 0) return; - const nextAlarmToSetup = Math.min( + let nextAlarmToSetup = Math.min( ...Array.from(this.routeInFailedState.values()).map(({ nextAlarmMs }) => nextAlarmMs) ); + if (nextAlarmToSetup < Date.now()) { + // We don't want to set an alarm in the past + nextAlarmToSetup = Date.now() + 2_000; + } await this.ctx.storage.setAlarm(nextAlarmToSetup); } + + // This function is used to restore the state of the durable object + // We don't restore the ongoing revalidations because we cannot know in which state they are + // We only restore the failed state and the alarm + async initState() { + // We store the failed state as a blob, we don't want to do anything with it anyway besides restoring + this.sql.exec("CREATE TABLE IF NOT EXISTS failed_state (id TEXT PRIMARY KEY, data TEXT)"); + + const failedStateCursor = this.sql.exec<{ id: string; data: string }>("SELECT * FROM failed_state"); + for (const row of failedStateCursor) { + this.routeInFailedState.set(row.id, JSON.parse(row.data)); + } + + // Now that we have restored the failed state, we can restore the alarm as well + await this.addAlarm(); + } } From da743ee7e37fa4bdc8a79f8e9b36e943336809b3 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Thu, 13 Mar 2025 14:44:35 +0100 Subject: [PATCH 02/12] Basic handling of eventual consistency --- .../src/api/durable-objects/queue.spec.ts | 4 ++- .../src/api/durable-objects/queue.ts | 36 ++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/packages/cloudflare/src/api/durable-objects/queue.spec.ts b/packages/cloudflare/src/api/durable-objects/queue.spec.ts index 2de7d0f2e..a8abcefaf 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.spec.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.spec.ts @@ -27,7 +27,9 @@ const createDurableObjectQueue = ({ setAlarm: vi.fn(), getAlarm: vi.fn(), sql: { - exec: vi.fn(), + exec: vi.fn().mockImplementation(() => ({ + one: vi.fn() + })), }, }, }; diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index 9b201a71a..898da5217 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -54,6 +54,10 @@ export class DurableObjectQueueHandler extends DurableObject { // The route is already in a failed state, it will be retried later if (this.routeInFailedState.has(msg.MessageDeduplicationId)) return; + // If the last success is newer than the last modified, it's likely that the regional cache is out of date + // We don't need to revalidate in this case + if (this.checkSyncTable(msg)) return; + if (this.ongoingRevalidations.size >= MAX_REVALIDATION_BY_DURABLE_OBJECT) { const ongoingRevalidations = this.ongoingRevalidations.values(); // When there is more than the max revalidations, we block concurrency until one of the revalidations finishes @@ -78,6 +82,7 @@ export class DurableObjectQueueHandler extends DurableObject { previewModeId, } = msg; const protocol = host.includes("localhost") ? "http" : "https"; + console.log('previewModeId', previewModeId); const response = await this.service.fetch(`${protocol}://${host}${url}`, { method: "HEAD", @@ -109,6 +114,13 @@ export class DurableObjectQueueHandler extends DurableObject { // An unknown error occurred, most likely from something in user code like missing auth in the middleware throw new RecoverableError(`An unknown error occurred while revalidating ${host}${url}`); } + // Everything went well, we can update the sync table + // We use unixepoch here because without IO the date doesn't change and it will make the e2e tests fail + this.sql.exec( + "INSERT OR REPLACE INTO sync (id, lastSuccess) VALUES (?, unixepoch())", + // We cannot use the deduplication id because it's not unique per route - every time a route is revalidated, the deduplication id is different. + `${host}${url}` + ); } catch (e) { // Do we want to propagate the error to the calling worker? if (!isOpenNextError(e)) { @@ -133,7 +145,6 @@ export class DurableObjectQueueHandler extends DurableObject { const allEventsToRetry = nextEventToRetry ? [nextEventToRetry, ...expiredEvents] : expiredEvents; for (const event of allEventsToRetry) { await this.executeRevalidation(event.msg); - this.routeInFailedState.delete(event.msg.MessageDeduplicationId); } } @@ -196,6 +207,9 @@ export class DurableObjectQueueHandler extends DurableObject { // We store the failed state as a blob, we don't want to do anything with it anyway besides restoring this.sql.exec("CREATE TABLE IF NOT EXISTS failed_state (id TEXT PRIMARY KEY, data TEXT)"); + // We create the sync table to handle eventually consistent incremental cache + this.sql.exec("CREATE TABLE IF NOT EXISTS sync (id TEXT PRIMARY KEY, lastSuccess INTEGER)"); + const failedStateCursor = this.sql.exec<{ id: string; data: string }>("SELECT * FROM failed_state"); for (const row of failedStateCursor) { this.routeInFailedState.set(row.id, JSON.parse(row.data)); @@ -204,4 +218,24 @@ export class DurableObjectQueueHandler extends DurableObject { // Now that we have restored the failed state, we can restore the alarm as well await this.addAlarm(); } + + /** + * + * @param msg + * @returns `true` if the route has been revalidated since the lastModified from the message, `false` otherwise + */ + checkSyncTable(msg: ExtendedQueueMessage) { + try { + const isNewer = this.sql.exec<{ isNewer: number }>( + "SELECT COUNT(*) as isNewer FROM sync WHERE id = ? AND lastSuccess > ?", + `${msg.MessageBody.host}${msg.MessageBody.url}`, + Math.round(msg.MessageBody.lastModified/1000) + ).one().isNewer; + + return isNewer > 0; + // eslint-disable-next-line @typescript-eslint/no-unused-vars + }catch(e: unknown){ + return false; + } + } } From 0f96f28d74c9f3bb778ee3ea30ec10d51915731d Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Thu, 13 Mar 2025 15:25:56 +0100 Subject: [PATCH 03/12] fix issue with restored state --- .../src/api/durable-objects/queue.ts | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index 898da5217..51887c87b 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -54,10 +54,6 @@ export class DurableObjectQueueHandler extends DurableObject { // The route is already in a failed state, it will be retried later if (this.routeInFailedState.has(msg.MessageDeduplicationId)) return; - // If the last success is newer than the last modified, it's likely that the regional cache is out of date - // We don't need to revalidate in this case - if (this.checkSyncTable(msg)) return; - if (this.ongoingRevalidations.size >= MAX_REVALIDATION_BY_DURABLE_OBJECT) { const ongoingRevalidations = this.ongoingRevalidations.values(); // When there is more than the max revalidations, we block concurrency until one of the revalidations finishes @@ -82,7 +78,6 @@ export class DurableObjectQueueHandler extends DurableObject { previewModeId, } = msg; const protocol = host.includes("localhost") ? "http" : "https"; - console.log('previewModeId', previewModeId); const response = await this.service.fetch(`${protocol}://${host}${url}`, { method: "HEAD", @@ -94,12 +89,15 @@ export class DurableObjectQueueHandler extends DurableObject { }); // Now we need to handle errors from the fetch if (response.status === 200 && response.headers.get("x-nextjs-cache") !== "REVALIDATED") { - // Something is very wrong here, it means that either the page is not ISR/SSG (and we shouldn't be here) or the `x-prerender-revalidate` header is not correct (and it should not happen either) + // TODO: when restoring from the failed state during a new deployment, previewModeId will be different and we'll be in this case. Figure out how to handle this. + this.routeInFailedState.delete(msg.MessageDeduplicationId); throw new FatalError( `The revalidation for ${host}${url} cannot be done. This error should never happen.` ); } else if (response.status === 404) { // The page is not found, we should not revalidate it + // We remove the route from the failed state because it might be expected (i.e. a route that was deleted) + this.routeInFailedState.delete(msg.MessageDeduplicationId); throw new IgnorableError( `The revalidation for ${host}${url} cannot be done because the page is not found. It's either expected or an error in user code itself` ); @@ -112,15 +110,14 @@ export class DurableObjectQueueHandler extends DurableObject { } else if (response.status !== 200) { // TODO: check if we need to handle cloudflare specific status codes/errors // An unknown error occurred, most likely from something in user code like missing auth in the middleware + + // We probably want to retry in this case as well + await this.addToFailedState(msg); + throw new RecoverableError(`An unknown error occurred while revalidating ${host}${url}`); } - // Everything went well, we can update the sync table - // We use unixepoch here because without IO the date doesn't change and it will make the e2e tests fail - this.sql.exec( - "INSERT OR REPLACE INTO sync (id, lastSuccess) VALUES (?, unixepoch())", - // We cannot use the deduplication id because it's not unique per route - every time a route is revalidated, the deduplication id is different. - `${host}${url}` - ); + // If everything went well, we can remove the route from the failed state + this.routeInFailedState.delete(msg.MessageDeduplicationId); } catch (e) { // Do we want to propagate the error to the calling worker? if (!isOpenNextError(e)) { @@ -145,6 +142,7 @@ export class DurableObjectQueueHandler extends DurableObject { const allEventsToRetry = nextEventToRetry ? [nextEventToRetry, ...expiredEvents] : expiredEvents; for (const event of allEventsToRetry) { await this.executeRevalidation(event.msg); + this.routeInFailedState.delete(event.msg.MessageDeduplicationId); } } From 5697e404377da6f5bdc42d6c64f53e4474142e71 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Thu, 13 Mar 2025 16:14:04 +0100 Subject: [PATCH 04/12] precompile the durable object --- .../src/api/durable-objects/queue.spec.ts | 3 +- .../src/api/durable-objects/queue.ts | 37 ++++++++--------- packages/cloudflare/src/api/durable-queue.ts | 2 - packages/cloudflare/src/cli/build/build.ts | 3 ++ .../cloudflare/src/cli/build/bundle-server.ts | 10 ----- .../build/open-next/compileDurableObjects.ts | 41 +++++++++++++++++++ .../cloudflare/src/cli/templates/worker.ts | 2 +- 7 files changed, 63 insertions(+), 35 deletions(-) create mode 100644 packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts diff --git a/packages/cloudflare/src/api/durable-objects/queue.spec.ts b/packages/cloudflare/src/api/durable-objects/queue.spec.ts index a8abcefaf..f1424826e 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.spec.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.spec.ts @@ -28,7 +28,7 @@ const createDurableObjectQueue = ({ getAlarm: vi.fn(), sql: { exec: vi.fn().mockImplementation(() => ({ - one: vi.fn() + one: vi.fn(), })), }, }, @@ -65,6 +65,7 @@ const createMessage = (dedupId: string, lastModified = Date.now()) => ({ describe("DurableObjectQueue", () => { describe("successful revalidation", () => { it("should process a single revalidation", async () => { + process.env.__NEXT_PREVIEW_MODE_ID = "test"; const queue = createDurableObjectQueue({ fetchDuration: 10 }); const firstRequest = await queue.revalidate(createMessage("id")); expect(firstRequest).toBeUndefined(); diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index 51887c87b..f40568574 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -11,12 +11,8 @@ import { DurableObject } from "cloudflare:workers"; const MAX_REVALIDATION_BY_DURABLE_OBJECT = 5; const DEFAULT_REVALIDATION_TIMEOUT_MS = 10_000; -interface ExtendedQueueMessage extends QueueMessage { - previewModeId: string; -} - interface FailedState { - msg: ExtendedQueueMessage; + msg: QueueMessage; retryCount: number; nextAlarmMs: number; } @@ -47,7 +43,7 @@ export class DurableObjectQueueHandler extends DurableObject { ctx.blockConcurrencyWhile(() => this.initState()); } - async revalidate(msg: ExtendedQueueMessage) { + async revalidate(msg: QueueMessage) { // If there is already an ongoing revalidation, we don't need to revalidate again if (this.ongoingRevalidations.has(msg.MessageDeduplicationId)) return; @@ -71,25 +67,24 @@ export class DurableObjectQueueHandler extends DurableObject { this.ctx.waitUntil(revalidationPromise); } - private async executeRevalidation(msg: ExtendedQueueMessage) { + private async executeRevalidation(msg: QueueMessage) { try { const { MessageBody: { host, url }, - previewModeId, } = msg; const protocol = host.includes("localhost") ? "http" : "https"; const response = await this.service.fetch(`${protocol}://${host}${url}`, { method: "HEAD", headers: { - "x-prerender-revalidate": previewModeId, + // This is defined during build + "x-prerender-revalidate": process.env.__NEXT_PREVIEW_MODE_ID!, "x-isr": "1", }, signal: AbortSignal.timeout(DEFAULT_REVALIDATION_TIMEOUT_MS), }); // Now we need to handle errors from the fetch if (response.status === 200 && response.headers.get("x-nextjs-cache") !== "REVALIDATED") { - // TODO: when restoring from the failed state during a new deployment, previewModeId will be different and we'll be in this case. Figure out how to handle this. this.routeInFailedState.delete(msg.MessageDeduplicationId); throw new FatalError( `The revalidation for ${host}${url} cannot be done. This error should never happen.` @@ -146,7 +141,7 @@ export class DurableObjectQueueHandler extends DurableObject { } } - async addToFailedState(msg: ExtendedQueueMessage) { + async addToFailedState(msg: QueueMessage) { const existingFailedState = this.routeInFailedState.get(msg.MessageDeduplicationId); let updatedFailedState: FailedState; @@ -218,21 +213,21 @@ export class DurableObjectQueueHandler extends DurableObject { } /** - * - * @param msg + * + * @param msg * @returns `true` if the route has been revalidated since the lastModified from the message, `false` otherwise */ - checkSyncTable(msg: ExtendedQueueMessage) { + checkSyncTable(msg: QueueMessage) { try { - const isNewer = this.sql.exec<{ isNewer: number }>( - "SELECT COUNT(*) as isNewer FROM sync WHERE id = ? AND lastSuccess > ?", - `${msg.MessageBody.host}${msg.MessageBody.url}`, - Math.round(msg.MessageBody.lastModified/1000) - ).one().isNewer; + const isNewer = this.sql + .exec<{ + isNewer: number; + }>("SELECT COUNT(*) as isNewer FROM sync WHERE id = ? AND lastSuccess > ?", `${msg.MessageBody.host}${msg.MessageBody.url}`, Math.round(msg.MessageBody.lastModified / 1000)) + .one().isNewer; return isNewer > 0; - // eslint-disable-next-line @typescript-eslint/no-unused-vars - }catch(e: unknown){ + // eslint-disable-next-line @typescript-eslint/no-unused-vars + } catch (e: unknown) { return false; } } diff --git a/packages/cloudflare/src/api/durable-queue.ts b/packages/cloudflare/src/api/durable-queue.ts index 52dd3bdcc..fd3fa55fb 100644 --- a/packages/cloudflare/src/api/durable-queue.ts +++ b/packages/cloudflare/src/api/durable-queue.ts @@ -11,10 +11,8 @@ export default { const id = durableObject.idFromName(msg.MessageGroupId); const stub = durableObject.get(id); - const previewModeId = process.env.__NEXT_PREVIEW_MODE_ID!; await stub.revalidate({ ...msg, - previewModeId, }); }, } satisfies Queue; diff --git a/packages/cloudflare/src/cli/build/build.ts b/packages/cloudflare/src/cli/build/build.ts index 110f563b8..727621b34 100644 --- a/packages/cloudflare/src/cli/build/build.ts +++ b/packages/cloudflare/src/cli/build/build.ts @@ -14,6 +14,7 @@ import type { ProjectOptions } from "../project-options.js"; import { bundleServer } from "./bundle-server.js"; import { compileCacheAssetsManifestSqlFile } from "./open-next/compile-cache-assets-manifest.js"; import { compileEnvFiles } from "./open-next/compile-env-files.js"; +import { compileDurableObjects } from "./open-next/compileDurableObjects.js"; import { copyCacheAssets } from "./open-next/copyCacheAssets.js"; import { createServerBundle } from "./open-next/createServerBundle.js"; import { @@ -97,6 +98,8 @@ export async function build(projectOpts: ProjectOptions): Promise { await createServerBundle(options); + await compileDurableObjects(options); + await bundleServer(options); if (!projectOpts.skipWranglerConfigCheck) { diff --git a/packages/cloudflare/src/cli/build/bundle-server.ts b/packages/cloudflare/src/cli/build/bundle-server.ts index 425ee9e02..a0d8e0af2 100644 --- a/packages/cloudflare/src/cli/build/bundle-server.ts +++ b/packages/cloudflare/src/cli/build/bundle-server.ts @@ -58,14 +58,6 @@ export async function bundleServer(buildOpts: BuildOptions): Promise { const serverFiles = path.join(baseManifestPath, "required-server-files.json"); const nextConfig = JSON.parse(fs.readFileSync(serverFiles, "utf-8")).config; - // TODO: This is a temporary solution to get the previewModeId from the prerender-manifest.json - // We should find a better way to get this value, probably directly provided from aws - // probably in an env variable exactly as for BUILD_ID - const prerenderManifest = path.join(baseManifestPath, "prerender-manifest.json"); - const prerenderManifestContent = fs.readFileSync(prerenderManifest, "utf-8"); - const prerenderManifestJson = JSON.parse(prerenderManifestContent); - const previewModeId = prerenderManifestJson.preview.previewModeId; - console.log(`\x1b[35m⚙️ Bundling the OpenNext server...\n\x1b[0m`); await patchWebpackRuntime(buildOpts); @@ -153,8 +145,6 @@ export async function bundleServer(buildOpts: BuildOptions): Promise { "process.env.TURBOPACK": "false", // This define should be safe to use for Next 14.2+, earlier versions (13.5 and less) will cause trouble "process.env.__NEXT_EXPERIMENTAL_REACT": `${needsExperimentalReact(nextConfig)}`, - // Used for the durable object queue handler - "process.env.__NEXT_PREVIEW_MODE_ID": `"${previewModeId}"`, }, platform: "node", banner: { diff --git a/packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts b/packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts new file mode 100644 index 000000000..e79317680 --- /dev/null +++ b/packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts @@ -0,0 +1,41 @@ +import fs from "node:fs"; +import { createRequire } from "node:module"; +import path from "node:path"; + +import { type BuildOptions, getPackagePath } from "@opennextjs/aws/build/helper.js"; +import { build } from "esbuild"; + +export function compileDurableObjects(buildOpts: BuildOptions) { + const _require = createRequire(import.meta.url); + const entryPoints = [_require.resolve("@opennextjs/cloudflare/durable-objects/queue")]; + + const { outputDir } = buildOpts; + + const baseManifestPath = path.join( + outputDir, + "server-functions/default", + getPackagePath(buildOpts), + ".next" + ); + + // TODO: Reuse the manifest + const prerenderManifest = path.join(baseManifestPath, "prerender-manifest.json"); + const prerenderManifestContent = fs.readFileSync(prerenderManifest, "utf-8"); + const prerenderManifestJson = JSON.parse(prerenderManifestContent); + const previewModeId = prerenderManifestJson.preview.previewModeId; + + const BUILD_ID = fs.readFileSync(path.join(baseManifestPath, "BUILD_ID"), "utf-8"); + + return build({ + entryPoints, + bundle: true, + platform: "node", + format: "esm", + outdir: path.join(buildOpts.buildDir, "durable-objects"), + external: ["cloudflare:workers"], + define: { + "process.env.__NEXT_PREVIEW_MODE_ID": `"${previewModeId}"`, + "process.env.__NEXT_BUILD_ID": `"${BUILD_ID}"`, + }, + }); +} diff --git a/packages/cloudflare/src/cli/templates/worker.ts b/packages/cloudflare/src/cli/templates/worker.ts index 4e36ceb3e..585db93e8 100644 --- a/packages/cloudflare/src/cli/templates/worker.ts +++ b/packages/cloudflare/src/cli/templates/worker.ts @@ -18,7 +18,7 @@ Object.defineProperty(globalThis, Symbol.for("__cloudflare-context__"), { }); //@ts-expect-error: Will be resolved by wrangler build -export { DurableObjectQueueHandler } from "@opennextjs/cloudflare/durable-objects/queue"; +export { DurableObjectQueueHandler } from "./.build/durable-objects/queue.js"; // Populate process.env on the first request let processEnvPopulated = false; From 99a137a022c23f1a3cbd55439c4142faa1a71aab Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Thu, 13 Mar 2025 18:07:04 +0100 Subject: [PATCH 05/12] remove old unused data --- .../src/api/durable-objects/queue.ts | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index f40568574..537184079 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -111,6 +111,15 @@ export class DurableObjectQueueHandler extends DurableObject { throw new RecoverableError(`An unknown error occurred while revalidating ${host}${url}`); } + // Everything went well, we can update the sync table + // We use unixepoch here,it also works with Date.now()/1000, but not with Date.now() alone. + // TODO: This needs to be investigated + this.sql.exec( + "INSERT OR REPLACE INTO sync (id, lastSuccess, buildId) VALUES (?, unixepoch(), ?)", + // We cannot use the deduplication id because it's not unique per route - every time a route is revalidated, the deduplication id is different. + `${host}${url}`, + process.env.__NEXT_BUILD_ID + ); // If everything went well, we can remove the route from the failed state this.routeInFailedState.delete(msg.MessageDeduplicationId); } catch (e) { @@ -170,9 +179,10 @@ export class DurableObjectQueueHandler extends DurableObject { } this.routeInFailedState.set(msg.MessageDeduplicationId, updatedFailedState); this.sql.exec( - "INSERT OR REPLACE INTO failed_state (id, data) VALUES (?, ?)", + "INSERT OR REPLACE INTO failed_state (id, data, buildId) VALUES (?, ?, ?)", msg.MessageDeduplicationId, - JSON.stringify(updatedFailedState) + JSON.stringify(updatedFailedState), + process.env.__NEXT_BUILD_ID ); // We probably want to do something if routeInFailedState is becoming too big, at least log it await this.addAlarm(); @@ -198,10 +208,14 @@ export class DurableObjectQueueHandler extends DurableObject { // We only restore the failed state and the alarm async initState() { // We store the failed state as a blob, we don't want to do anything with it anyway besides restoring - this.sql.exec("CREATE TABLE IF NOT EXISTS failed_state (id TEXT PRIMARY KEY, data TEXT)"); + this.sql.exec("CREATE TABLE IF NOT EXISTS failed_state (id TEXT PRIMARY KEY, data TEXT, buildId TEXT)"); // We create the sync table to handle eventually consistent incremental cache - this.sql.exec("CREATE TABLE IF NOT EXISTS sync (id TEXT PRIMARY KEY, lastSuccess INTEGER)"); + this.sql.exec("CREATE TABLE IF NOT EXISTS sync (id TEXT PRIMARY KEY, lastSuccess INTEGER, buildId TEXT)"); + + // Before doing anything else, we clear the DB for any potential old data + this.sql.exec("DELETE FROM failed_state WHERE buildId != ?", process.env.__NEXT_BUILD_ID); + this.sql.exec("DELETE FROM sync WHERE buildId != ?", process.env.__NEXT_BUILD_ID); const failedStateCursor = this.sql.exec<{ id: string; data: string }>("SELECT * FROM failed_state"); for (const row of failedStateCursor) { @@ -222,7 +236,11 @@ export class DurableObjectQueueHandler extends DurableObject { const isNewer = this.sql .exec<{ isNewer: number; - }>("SELECT COUNT(*) as isNewer FROM sync WHERE id = ? AND lastSuccess > ?", `${msg.MessageBody.host}${msg.MessageBody.url}`, Math.round(msg.MessageBody.lastModified / 1000)) + }>( + "SELECT COUNT(*) as isNewer FROM sync WHERE id = ? AND lastSuccess > ?", + `${msg.MessageBody.host}${msg.MessageBody.url}`, + Math.round(msg.MessageBody.lastModified / 1000) + ) .one().isNewer; return isNewer > 0; From 22abfe7ac9009b87d74da2af6470c768dba02d41 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Thu, 13 Mar 2025 18:41:36 +0100 Subject: [PATCH 06/12] Add some customization --- .../cloudflare/src/api/cloudflare-context.ts | 11 ++++++ .../src/api/durable-objects/queue.ts | 37 +++++++++++++++---- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/packages/cloudflare/src/api/cloudflare-context.ts b/packages/cloudflare/src/api/cloudflare-context.ts index 51aa29606..506cd795f 100644 --- a/packages/cloudflare/src/api/cloudflare-context.ts +++ b/packages/cloudflare/src/api/cloudflare-context.ts @@ -18,6 +18,17 @@ declare global { NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace; // Asset binding ASSETS?: Fetcher; + + // Below are the potential environment variables that can be set by the user to configure the durable object queue handler + // The max number of revalidations that can be processed by the durable worker at the same time + MAX_REVALIDATION_BY_DURABLE_OBJECT?: string; + // The max time in milliseconds that a revalidation can take before being considered as failed + REVALIDATION_TIMEOUT_MS?: string; + // The amount of time after which a revalidation will be attempted again if it failed + // If it fails again it will exponentially back off until it reaches the max retry interval + REVALIDATION_RETRY_INTERVAL_MS?: string; + // The maximum number of attempts that can be made to revalidate a path + MAX_REVALIDATION_ATTEMPTS?: string; } } diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index 537184079..32a222840 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -8,8 +8,10 @@ import { } from "@opennextjs/aws/utils/error.js"; import { DurableObject } from "cloudflare:workers"; -const MAX_REVALIDATION_BY_DURABLE_OBJECT = 5; +const DEFAULT_MAX_REVALIDATION_BY_DURABLE_OBJECT = 5; const DEFAULT_REVALIDATION_TIMEOUT_MS = 10_000; +const DEFAULT_REVALIDATION_RETRY_INTERVAL_MS = 2_000; +const DEFAULT_MAX_REVALIDATION_ATTEMPTS = 6; interface FailedState { msg: QueueMessage; @@ -29,8 +31,11 @@ export class DurableObjectQueueHandler extends DurableObject { service: NonNullable; - // TODO: allow this to be configurable - How do we want todo that? env variable? passed down from the queue override ? - maxRevalidations = MAX_REVALIDATION_BY_DURABLE_OBJECT; + // Configurable params + maxRevalidations = DEFAULT_MAX_REVALIDATION_BY_DURABLE_OBJECT; + revalidationTimeout = DEFAULT_REVALIDATION_TIMEOUT_MS; + revalidationRetryInterval = DEFAULT_REVALIDATION_RETRY_INTERVAL_MS; + maxRevalidationAttempts = DEFAULT_MAX_REVALIDATION_ATTEMPTS; constructor(ctx: DurableObjectState, env: CloudflareEnv) { super(ctx, env); @@ -41,6 +46,22 @@ export class DurableObjectQueueHandler extends DurableObject { // We restore the state ctx.blockConcurrencyWhile(() => this.initState()); + + this.maxRevalidations = env.MAX_REVALIDATION_BY_DURABLE_OBJECT + ? parseInt(env.MAX_REVALIDATION_BY_DURABLE_OBJECT) + : DEFAULT_MAX_REVALIDATION_BY_DURABLE_OBJECT; + + this.revalidationTimeout = env.REVALIDATION_TIMEOUT_MS + ? parseInt(env.REVALIDATION_TIMEOUT_MS) + : DEFAULT_REVALIDATION_TIMEOUT_MS; + + this.revalidationRetryInterval = env.REVALIDATION_RETRY_INTERVAL_MS + ? parseInt(env.REVALIDATION_RETRY_INTERVAL_MS) + : DEFAULT_REVALIDATION_RETRY_INTERVAL_MS; + + this.maxRevalidationAttempts = env.MAX_REVALIDATION_ATTEMPTS + ? parseInt(env.MAX_REVALIDATION_ATTEMPTS) + : DEFAULT_MAX_REVALIDATION_ATTEMPTS; } async revalidate(msg: QueueMessage) { @@ -50,7 +71,7 @@ export class DurableObjectQueueHandler extends DurableObject { // The route is already in a failed state, it will be retried later if (this.routeInFailedState.has(msg.MessageDeduplicationId)) return; - if (this.ongoingRevalidations.size >= MAX_REVALIDATION_BY_DURABLE_OBJECT) { + if (this.ongoingRevalidations.size >= this.maxRevalidations) { const ongoingRevalidations = this.ongoingRevalidations.values(); // When there is more than the max revalidations, we block concurrency until one of the revalidations finishes // We still await the promise to ensure the revalidation is completed @@ -81,7 +102,7 @@ export class DurableObjectQueueHandler extends DurableObject { "x-prerender-revalidate": process.env.__NEXT_PREVIEW_MODE_ID!, "x-isr": "1", }, - signal: AbortSignal.timeout(DEFAULT_REVALIDATION_TIMEOUT_MS), + signal: AbortSignal.timeout(this.revalidationTimeout), }); // Now we need to handle errors from the fetch if (response.status === 200 && response.headers.get("x-nextjs-cache") !== "REVALIDATED") { @@ -156,7 +177,7 @@ export class DurableObjectQueueHandler extends DurableObject { let updatedFailedState: FailedState; if (existingFailedState) { - if (existingFailedState.retryCount >= 6) { + if (existingFailedState.retryCount >= this.maxRevalidationAttempts) { // We give up after 6 retries and log the error error( `The revalidation for ${msg.MessageBody.host}${msg.MessageBody.url} has failed after 6 retries. It will not be tried again, but subsequent ISR requests will retry.` @@ -164,7 +185,7 @@ export class DurableObjectQueueHandler extends DurableObject { this.routeInFailedState.delete(msg.MessageDeduplicationId); return; } - const nextAlarmMs = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000; + const nextAlarmMs = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * this.revalidationRetryInterval; updatedFailedState = { ...existingFailedState, retryCount: existingFailedState.retryCount + 1, @@ -198,7 +219,7 @@ export class DurableObjectQueueHandler extends DurableObject { ); if (nextAlarmToSetup < Date.now()) { // We don't want to set an alarm in the past - nextAlarmToSetup = Date.now() + 2_000; + nextAlarmToSetup = Date.now() + this.revalidationRetryInterval; } await this.ctx.storage.setAlarm(nextAlarmToSetup); } From 0a12a38af2637ed83fe87da4369dee4ea5a614b8 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Fri, 14 Mar 2025 11:16:39 +0100 Subject: [PATCH 07/12] added debug info --- .../src/api/durable-objects/queue.ts | 20 +++++++++++-- .../build/open-next/compileDurableObjects.ts | 28 ++++++++++--------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index 32a222840..b20b4a0df 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -1,4 +1,4 @@ -import { error } from "@opennextjs/aws/adapters/logger.js"; +import { debug, error } from "@opennextjs/aws/adapters/logger.js"; import type { QueueMessage } from "@opennextjs/aws/types/overrides"; import { FatalError, @@ -45,7 +45,10 @@ export class DurableObjectQueueHandler extends DurableObject { this.sql = ctx.storage.sql; // We restore the state - ctx.blockConcurrencyWhile(() => this.initState()); + ctx.blockConcurrencyWhile(async () => { + debug(`Restoring the state of the durable object`); + await this.initState(); + }); this.maxRevalidations = env.MAX_REVALIDATION_BY_DURABLE_OBJECT ? parseInt(env.MAX_REVALIDATION_BY_DURABLE_OBJECT) @@ -62,6 +65,8 @@ export class DurableObjectQueueHandler extends DurableObject { this.maxRevalidationAttempts = env.MAX_REVALIDATION_ATTEMPTS ? parseInt(env.MAX_REVALIDATION_ATTEMPTS) : DEFAULT_MAX_REVALIDATION_ATTEMPTS; + + debug(`Durable object initialized`); } async revalidate(msg: QueueMessage) { @@ -72,11 +77,17 @@ export class DurableObjectQueueHandler extends DurableObject { if (this.routeInFailedState.has(msg.MessageDeduplicationId)) return; if (this.ongoingRevalidations.size >= this.maxRevalidations) { + debug( + `The maximum number of revalidations (${this.maxRevalidations}) is reached. Blocking until one of the revalidations finishes.` + ); const ongoingRevalidations = this.ongoingRevalidations.values(); // When there is more than the max revalidations, we block concurrency until one of the revalidations finishes // We still await the promise to ensure the revalidation is completed // This is fine because the queue itself run inside a waitUntil - await this.ctx.blockConcurrencyWhile(() => Promise.race(ongoingRevalidations)); + await this.ctx.blockConcurrencyWhile(async () => { + debug(`Waiting for one of the revalidations to finish`); + await Promise.race(ongoingRevalidations); + }); } const revalidationPromise = this.executeRevalidation(msg); @@ -90,6 +101,7 @@ export class DurableObjectQueueHandler extends DurableObject { private async executeRevalidation(msg: QueueMessage) { try { + debug(`Revalidating ${msg.MessageBody.host}${msg.MessageBody.url}`); const { MessageBody: { host, url }, } = msg; @@ -166,12 +178,14 @@ export class DurableObjectQueueHandler extends DurableObject { ); const allEventsToRetry = nextEventToRetry ? [nextEventToRetry, ...expiredEvents] : expiredEvents; for (const event of allEventsToRetry) { + debug(`Retrying revalidation for ${event.msg.MessageBody.host}${event.msg.MessageBody.url}`); await this.executeRevalidation(event.msg); this.routeInFailedState.delete(event.msg.MessageDeduplicationId); } } async addToFailedState(msg: QueueMessage) { + debug(`Adding ${msg.MessageBody.host}${msg.MessageBody.url} to the failed state`); const existingFailedState = this.routeInFailedState.get(msg.MessageDeduplicationId); let updatedFailedState: FailedState; diff --git a/packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts b/packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts index e79317680..b26c3d51a 100644 --- a/packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts +++ b/packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts @@ -2,8 +2,7 @@ import fs from "node:fs"; import { createRequire } from "node:module"; import path from "node:path"; -import { type BuildOptions, getPackagePath } from "@opennextjs/aws/build/helper.js"; -import { build } from "esbuild"; +import { type BuildOptions, esbuildSync, getPackagePath } from "@opennextjs/aws/build/helper.js"; export function compileDurableObjects(buildOpts: BuildOptions) { const _require = createRequire(import.meta.url); @@ -26,16 +25,19 @@ export function compileDurableObjects(buildOpts: BuildOptions) { const BUILD_ID = fs.readFileSync(path.join(baseManifestPath, "BUILD_ID"), "utf-8"); - return build({ - entryPoints, - bundle: true, - platform: "node", - format: "esm", - outdir: path.join(buildOpts.buildDir, "durable-objects"), - external: ["cloudflare:workers"], - define: { - "process.env.__NEXT_PREVIEW_MODE_ID": `"${previewModeId}"`, - "process.env.__NEXT_BUILD_ID": `"${BUILD_ID}"`, + return esbuildSync( + { + entryPoints, + bundle: true, + platform: "node", + format: "esm", + outdir: path.join(buildOpts.buildDir, "durable-objects"), + external: ["cloudflare:workers"], + define: { + "process.env.__NEXT_PREVIEW_MODE_ID": `"${previewModeId}"`, + "process.env.__NEXT_BUILD_ID": `"${BUILD_ID}"`, + }, }, - }); + buildOpts + ); } From 5d8b8bd8fb4482654f36a0fca1fb5d7a7abb1c3d Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Mon, 17 Mar 2025 13:05:11 +0100 Subject: [PATCH 08/12] fix rebase --- packages/cloudflare/src/api/durable-objects/queue.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index b20b4a0df..6f49fc24e 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -76,6 +76,10 @@ export class DurableObjectQueueHandler extends DurableObject { // The route is already in a failed state, it will be retried later if (this.routeInFailedState.has(msg.MessageDeduplicationId)) return; + // If the last success is newer than the last modified, it's likely that the regional cache is out of date + // We don't need to revalidate in this case + if (this.checkSyncTable(msg)) return; + if (this.ongoingRevalidations.size >= this.maxRevalidations) { debug( `The maximum number of revalidations (${this.maxRevalidations}) is reached. Blocking until one of the revalidations finishes.` @@ -180,7 +184,6 @@ export class DurableObjectQueueHandler extends DurableObject { for (const event of allEventsToRetry) { debug(`Retrying revalidation for ${event.msg.MessageBody.host}${event.msg.MessageBody.url}`); await this.executeRevalidation(event.msg); - this.routeInFailedState.delete(event.msg.MessageDeduplicationId); } } From 40636592d965479f70b464f282407a1103a7d509 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Tue, 18 Mar 2025 21:57:17 +0100 Subject: [PATCH 09/12] lint fix --- packages/cloudflare/src/api/durable-objects/queue.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index 6f49fc24e..97ec4f307 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -202,7 +202,8 @@ export class DurableObjectQueueHandler extends DurableObject { this.routeInFailedState.delete(msg.MessageDeduplicationId); return; } - const nextAlarmMs = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * this.revalidationRetryInterval; + const nextAlarmMs = + Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * this.revalidationRetryInterval; updatedFailedState = { ...existingFailedState, retryCount: existingFailedState.retryCount + 1, From 038e122f8afa501e6a90736cea7f7b9da2787d74 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Wed, 19 Mar 2025 10:51:27 +0100 Subject: [PATCH 10/12] changeset --- .changeset/smart-bugs-play.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/smart-bugs-play.md diff --git a/.changeset/smart-bugs-play.md b/.changeset/smart-bugs-play.md new file mode 100644 index 000000000..544f9fa7d --- /dev/null +++ b/.changeset/smart-bugs-play.md @@ -0,0 +1,5 @@ +--- +"@opennextjs/cloudflare": patch +--- + +feat: durable object de-duping revalidation queue From c37eb826c91c422bb5395020c95e9f6b6a20defc Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Wed, 19 Mar 2025 10:54:43 +0100 Subject: [PATCH 11/12] make params readonly --- packages/cloudflare/src/api/durable-objects/queue.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index 97ec4f307..a1fdc15e3 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -32,10 +32,10 @@ export class DurableObjectQueueHandler extends DurableObject { service: NonNullable; // Configurable params - maxRevalidations = DEFAULT_MAX_REVALIDATION_BY_DURABLE_OBJECT; - revalidationTimeout = DEFAULT_REVALIDATION_TIMEOUT_MS; - revalidationRetryInterval = DEFAULT_REVALIDATION_RETRY_INTERVAL_MS; - maxRevalidationAttempts = DEFAULT_MAX_REVALIDATION_ATTEMPTS; + readonly maxRevalidations: number; + readonly revalidationTimeout: number; + readonly revalidationRetryInterval: number; + readonly maxRevalidationAttempts: number; constructor(ctx: DurableObjectState, env: CloudflareEnv) { super(ctx, env); From d5f467a8011eee1e8a8c6aab6287ad05270e83c8 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Wed, 19 Mar 2025 12:25:31 +0100 Subject: [PATCH 12/12] review fix --- .../src/api/durable-objects/queue.spec.ts | 10 +++++----- .../cloudflare/src/api/durable-objects/queue.ts | 10 +++++----- .../cli/build/open-next/compileDurableObjects.ts | 13 ++++++------- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/packages/cloudflare/src/api/durable-objects/queue.spec.ts b/packages/cloudflare/src/api/durable-objects/queue.spec.ts index f1424826e..b4032e48a 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.spec.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.spec.ts @@ -208,10 +208,10 @@ describe("DurableObjectQueue", () => { it("should add an alarm if there are failed states", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10 }); - const nextAlarm = Date.now() + 1000; - queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: nextAlarm }); + const nextAlarmMs = Date.now() + 1000; + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs }); await queue.addAlarm(); - expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(nextAlarm); + expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(nextAlarmMs); }); it("should not add an alarm if there is already an alarm set", async () => { @@ -225,9 +225,9 @@ describe("DurableObjectQueue", () => { it("should set the alarm to the lowest nextAlarm", async () => { const queue = createDurableObjectQueue({ fetchDuration: 10 }); - const nextAlarm = Date.now() + 1000; + const nextAlarmMs = Date.now() + 1000; const firstAlarm = Date.now() + 500; - queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: nextAlarm }); + queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs }); queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, diff --git a/packages/cloudflare/src/api/durable-objects/queue.ts b/packages/cloudflare/src/api/durable-objects/queue.ts index a1fdc15e3..ea95b0dfb 100644 --- a/packages/cloudflare/src/api/durable-objects/queue.ts +++ b/packages/cloudflare/src/api/durable-objects/queue.ts @@ -272,17 +272,17 @@ export class DurableObjectQueueHandler extends DurableObject { */ checkSyncTable(msg: QueueMessage) { try { - const isNewer = this.sql + const numNewer = this.sql .exec<{ - isNewer: number; + numNewer: number; }>( - "SELECT COUNT(*) as isNewer FROM sync WHERE id = ? AND lastSuccess > ?", + "SELECT COUNT(*) as numNewer FROM sync WHERE id = ? AND lastSuccess > ? LIMIT 1", `${msg.MessageBody.host}${msg.MessageBody.url}`, Math.round(msg.MessageBody.lastModified / 1000) ) - .one().isNewer; + .one().numNewer; - return isNewer > 0; + return numNewer > 0; // eslint-disable-next-line @typescript-eslint/no-unused-vars } catch (e: unknown) { return false; diff --git a/packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts b/packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts index b26c3d51a..b129ec3a7 100644 --- a/packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts +++ b/packages/cloudflare/src/cli/build/open-next/compileDurableObjects.ts @@ -1,7 +1,7 @@ -import fs from "node:fs"; import { createRequire } from "node:module"; import path from "node:path"; +import { loadBuildId, loadPrerenderManifest } from "@opennextjs/aws/adapters/config/util.js"; import { type BuildOptions, esbuildSync, getPackagePath } from "@opennextjs/aws/build/helper.js"; export function compileDurableObjects(buildOpts: BuildOptions) { @@ -17,13 +17,12 @@ export function compileDurableObjects(buildOpts: BuildOptions) { ".next" ); - // TODO: Reuse the manifest - const prerenderManifest = path.join(baseManifestPath, "prerender-manifest.json"); - const prerenderManifestContent = fs.readFileSync(prerenderManifest, "utf-8"); - const prerenderManifestJson = JSON.parse(prerenderManifestContent); - const previewModeId = prerenderManifestJson.preview.previewModeId; + // We need to change the type in aws + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const prerenderManifest = loadPrerenderManifest(baseManifestPath) as any; + const previewModeId = prerenderManifest.preview.previewModeId; - const BUILD_ID = fs.readFileSync(path.join(baseManifestPath, "BUILD_ID"), "utf-8"); + const BUILD_ID = loadBuildId(baseManifestPath); return esbuildSync( {