Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/smart-bugs-play.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@opennextjs/cloudflare": patch
---

feat: durable object de-duping revalidation queue
2 changes: 1 addition & 1 deletion examples/e2e/app-router/wrangler.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"migrations": [
{
"tag": "v1",
"new_classes": ["DurableObjectQueueHandler"]
"new_sqlite_classes": ["DurableObjectQueueHandler"]
}
],
"kv_namespaces": [
Expand Down
11 changes: 11 additions & 0 deletions packages/cloudflare/src/api/cloudflare-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ declare global {
NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace<DurableObjectQueueHandler>;
// Asset binding
ASSETS?: Fetcher;

// Below are the potential environment variables that can be set by the user to configure the durable object queue handler
// The max number of revalidations that can be processed by the durable worker at the same time
MAX_REVALIDATION_BY_DURABLE_OBJECT?: string;
// The max time in milliseconds that a revalidation can take before being considered as failed
REVALIDATION_TIMEOUT_MS?: string;
// The amount of time after which a revalidation will be attempted again if it failed
// If it fails again, the delay backs off exponentially until the maximum number of attempts is reached
REVALIDATION_RETRY_INTERVAL_MS?: string;
// The maximum number of attempts that can be made to revalidate a path
MAX_REVALIDATION_ATTEMPTS?: string;
}
}

Expand Down
26 changes: 20 additions & 6 deletions packages/cloudflare/src/api/durable-objects/queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ const createDurableObjectQueue = ({
storage: {
setAlarm: vi.fn(),
getAlarm: vi.fn(),
sql: {
exec: vi.fn().mockImplementation(() => ({
one: vi.fn(),
})),
},
},
};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -60,6 +65,7 @@ const createMessage = (dedupId: string, lastModified = Date.now()) => ({
describe("DurableObjectQueue", () => {
describe("successful revalidation", () => {
it("should process a single revalidation", async () => {
process.env.__NEXT_PREVIEW_MODE_ID = "test";
const queue = createDurableObjectQueue({ fetchDuration: 10 });
const firstRequest = await queue.revalidate(createMessage("id"));
expect(firstRequest).toBeUndefined();
Expand Down Expand Up @@ -102,8 +108,9 @@ describe("DurableObjectQueue", () => {
expect(queue.ongoingRevalidations.has("id6")).toBe(false);
expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]);

// BlockConcurrencyWhile is called twice here, first time during creation of the object and second time when we try to revalidate
// @ts-expect-error
expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(1);
expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(2);

// Here we await the blocked request to ensure it's resolved
await blockedReq;
Expand Down Expand Up @@ -201,9 +208,10 @@ describe("DurableObjectQueue", () => {

it("should add an alarm if there are failed states", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
const nextAlarmMs = Date.now() + 1000;
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs });
await queue.addAlarm();
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(1000);
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(nextAlarmMs);
});

it("should not add an alarm if there is already an alarm set", async () => {
Expand All @@ -217,10 +225,16 @@ describe("DurableObjectQueue", () => {

it("should set the alarm to the lowest nextAlarm", async () => {
const queue = createDurableObjectQueue({ fetchDuration: 10 });
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarmMs: 500 });
const nextAlarmMs = Date.now() + 1000;
const firstAlarm = Date.now() + 500;
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs });
queue.routeInFailedState.set("id2", {
msg: createMessage("id2"),
retryCount: 0,
nextAlarmMs: firstAlarm,
});
await queue.addAlarm();
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(500);
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(firstAlarm);
});
});

Expand Down
177 changes: 148 additions & 29 deletions packages/cloudflare/src/api/durable-objects/queue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { error } from "@opennextjs/aws/adapters/logger.js";
import { debug, error } from "@opennextjs/aws/adapters/logger.js";
import type { QueueMessage } from "@opennextjs/aws/types/overrides";
import {
FatalError,
Expand All @@ -8,11 +8,15 @@ import {
} from "@opennextjs/aws/utils/error.js";
import { DurableObject } from "cloudflare:workers";

const MAX_REVALIDATION_BY_DURABLE_OBJECT = 5;
const DEFAULT_MAX_REVALIDATION_BY_DURABLE_OBJECT = 5;
const DEFAULT_REVALIDATION_TIMEOUT_MS = 10_000;
const DEFAULT_REVALIDATION_RETRY_INTERVAL_MS = 2_000;
const DEFAULT_MAX_REVALIDATION_ATTEMPTS = 6;

interface ExtendedQueueMessage extends QueueMessage {
previewModeId: string;
interface FailedState {
msg: QueueMessage;
retryCount: number;
nextAlarmMs: number;
}

export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
Expand All @@ -21,37 +25,73 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
// TODO: handle incremental cache with only eventual consistency (i.e. KV or R2/D1 with the optional cache layer on top)
ongoingRevalidations = new Map<string, Promise<void>>();

// TODO: restore the state of the failed revalidations - Probably in the next PR where i'll add the storage
routeInFailedState = new Map<
string,
{ msg: ExtendedQueueMessage; retryCount: number; nextAlarmMs: number }
>();
sql: SqlStorage;

routeInFailedState = new Map<string, FailedState>();

service: NonNullable<CloudflareEnv["NEXT_CACHE_REVALIDATION_WORKER"]>;

// TODO: allow this to be configurable - How do we want todo that? env variable? passed down from the queue override ?
maxRevalidations = MAX_REVALIDATION_BY_DURABLE_OBJECT;
// Configurable params
readonly maxRevalidations: number;
readonly revalidationTimeout: number;
readonly revalidationRetryInterval: number;
readonly maxRevalidationAttempts: number;

/**
 * Wires up the revalidation service binding and the SQL storage, reads the optional
 * tuning parameters from the environment, then schedules the restore of persisted state.
 *
 * @param ctx - Durable object state; provides `storage.sql` and `blockConcurrencyWhile`.
 * @param env - Bindings plus the optional, string-valued configuration variables.
 * @throws IgnorableError when the `NEXT_CACHE_REVALIDATION_WORKER` binding is missing.
 */
constructor(ctx: DurableObjectState, env: CloudflareEnv) {
  super(ctx, env);
  this.service = env.NEXT_CACHE_REVALIDATION_WORKER!;
  // If there is no service binding, we throw an error because we can't revalidate without it
  if (!this.service) throw new IgnorableError("No service binding for cache revalidation worker");
  this.sql = ctx.storage.sql;

  // Environment values are user-provided strings. An unparsable or out-of-range value
  // must not leak into the queue logic: NaN would silently disable the concurrency limit
  // (`size >= NaN` is always false), so we log and fall back to the default instead.
  const readIntSetting = (
    name: string,
    raw: string | undefined,
    fallback: number,
    min: number
  ): number => {
    if (!raw) return fallback;
    const parsed = Number.parseInt(raw, 10);
    if (Number.isNaN(parsed) || parsed < min) {
      error(`Invalid value "${raw}" for ${name}, falling back to ${fallback}`);
      return fallback;
    }
    return parsed;
  };

  // The configuration is resolved before the state is restored, because restoring the
  // state ends with `addAlarm()`, which reads `revalidationRetryInterval`.
  // At least one concurrent revalidation is required: with 0, `revalidate` would wait on
  // a race over an empty set of ongoing revalidations, which never settles.
  this.maxRevalidations = readIntSetting(
    "MAX_REVALIDATION_BY_DURABLE_OBJECT",
    env.MAX_REVALIDATION_BY_DURABLE_OBJECT,
    DEFAULT_MAX_REVALIDATION_BY_DURABLE_OBJECT,
    1
  );

  this.revalidationTimeout = readIntSetting(
    "REVALIDATION_TIMEOUT_MS",
    env.REVALIDATION_TIMEOUT_MS,
    DEFAULT_REVALIDATION_TIMEOUT_MS,
    0
  );

  this.revalidationRetryInterval = readIntSetting(
    "REVALIDATION_RETRY_INTERVAL_MS",
    env.REVALIDATION_RETRY_INTERVAL_MS,
    DEFAULT_REVALIDATION_RETRY_INTERVAL_MS,
    0
  );

  this.maxRevalidationAttempts = readIntSetting(
    "MAX_REVALIDATION_ATTEMPTS",
    env.MAX_REVALIDATION_ATTEMPTS,
    DEFAULT_MAX_REVALIDATION_ATTEMPTS,
    0
  );

  // We restore the state
  ctx.blockConcurrencyWhile(async () => {
    debug(`Restoring the state of the durable object`);
    await this.initState();
  });

  debug(`Durable object initialized`);
}

async revalidate(msg: ExtendedQueueMessage) {
async revalidate(msg: QueueMessage) {
// If there is already an ongoing revalidation, we don't need to revalidate again
if (this.ongoingRevalidations.has(msg.MessageDeduplicationId)) return;

// The route is already in a failed state, it will be retried later
if (this.routeInFailedState.has(msg.MessageDeduplicationId)) return;

if (this.ongoingRevalidations.size >= MAX_REVALIDATION_BY_DURABLE_OBJECT) {
// If the last success is newer than the last modified, it's likely that the regional cache is out of date
// We don't need to revalidate in this case
if (this.checkSyncTable(msg)) return;

if (this.ongoingRevalidations.size >= this.maxRevalidations) {
debug(
`The maximum number of revalidations (${this.maxRevalidations}) is reached. Blocking until one of the revalidations finishes.`
);
const ongoingRevalidations = this.ongoingRevalidations.values();
// When there is more than the max revalidations, we block concurrency until one of the revalidations finishes
// We still await the promise to ensure the revalidation is completed
// This is fine because the queue itself runs inside a waitUntil
await this.ctx.blockConcurrencyWhile(() => Promise.race(ongoingRevalidations));
await this.ctx.blockConcurrencyWhile(async () => {
debug(`Waiting for one of the revalidations to finish`);
await Promise.race(ongoingRevalidations);
});
}

const revalidationPromise = this.executeRevalidation(msg);
Expand All @@ -63,31 +103,33 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
this.ctx.waitUntil(revalidationPromise);
}

private async executeRevalidation(msg: ExtendedQueueMessage) {
private async executeRevalidation(msg: QueueMessage) {
try {
debug(`Revalidating ${msg.MessageBody.host}${msg.MessageBody.url}`);
const {
MessageBody: { host, url },
previewModeId,
} = msg;
const protocol = host.includes("localhost") ? "http" : "https";

//TODO: handle the different types of errors that can occur during the fetch (i.e. timeout, network error, etc)
const response = await this.service.fetch(`${protocol}://${host}${url}`, {
method: "HEAD",
headers: {
"x-prerender-revalidate": previewModeId,
// This is defined during build
"x-prerender-revalidate": process.env.__NEXT_PREVIEW_MODE_ID!,
"x-isr": "1",
},
signal: AbortSignal.timeout(DEFAULT_REVALIDATION_TIMEOUT_MS),
signal: AbortSignal.timeout(this.revalidationTimeout),
});
// Now we need to handle errors from the fetch
if (response.status === 200 && response.headers.get("x-nextjs-cache") !== "REVALIDATED") {
// Something is very wrong here, it means that either the page is not ISR/SSG (and we shouldn't be here) or the `x-prerender-revalidate` header is not correct (and it should not happen either)
this.routeInFailedState.delete(msg.MessageDeduplicationId);
throw new FatalError(
`The revalidation for ${host}${url} cannot be done. This error should never happen.`
);
} else if (response.status === 404) {
// The page is not found, we should not revalidate it
// We remove the route from the failed state because it might be expected (i.e. a route that was deleted)
this.routeInFailedState.delete(msg.MessageDeduplicationId);
throw new IgnorableError(
`The revalidation for ${host}${url} cannot be done because the page is not found. It's either expected or an error in user code itself`
);
Expand All @@ -100,8 +142,23 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
} else if (response.status !== 200) {
// TODO: check if we need to handle cloudflare specific status codes/errors
// An unknown error occurred, most likely from something in user code like missing auth in the middleware

// We probably want to retry in this case as well
await this.addToFailedState(msg);

throw new RecoverableError(`An unknown error occurred while revalidating ${host}${url}`);
}
// Everything went well, we can update the sync table
// We use unixepoch() here. It also works with Date.now() / 1000, but not with Date.now() alone.
// TODO: This needs to be investigated
this.sql.exec(
"INSERT OR REPLACE INTO sync (id, lastSuccess, buildId) VALUES (?, unixepoch(), ?)",
// We cannot use the deduplication id because it's not unique per route - every time a route is revalidated, the deduplication id is different.
`${host}${url}`,
process.env.__NEXT_BUILD_ID
);
// If everything went well, we can remove the route from the failed state
this.routeInFailedState.delete(msg.MessageDeduplicationId);
} catch (e) {
// Do we want to propagate the error to the calling worker?
if (!isOpenNextError(e)) {
Expand All @@ -125,36 +182,47 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
);
const allEventsToRetry = nextEventToRetry ? [nextEventToRetry, ...expiredEvents] : expiredEvents;
for (const event of allEventsToRetry) {
debug(`Retrying revalidation for ${event.msg.MessageBody.host}${event.msg.MessageBody.url}`);
await this.executeRevalidation(event.msg);
this.routeInFailedState.delete(event.msg.MessageDeduplicationId);
}
}

async addToFailedState(msg: ExtendedQueueMessage) {
async addToFailedState(msg: QueueMessage) {
debug(`Adding ${msg.MessageBody.host}${msg.MessageBody.url} to the failed state`);
const existingFailedState = this.routeInFailedState.get(msg.MessageDeduplicationId);

let updatedFailedState: FailedState;

if (existingFailedState) {
if (existingFailedState.retryCount >= 6) {
if (existingFailedState.retryCount >= this.maxRevalidationAttempts) {
// We give up once the configured maximum number of attempts is reached and log the error
error(
`The revalidation for ${msg.MessageBody.host}${msg.MessageBody.url} has failed after 6 retries. It will not be tried again, but subsequent ISR requests will retry.`
);
this.routeInFailedState.delete(msg.MessageDeduplicationId);
return;
}
const nextAlarmMs = Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * 2_000;
this.routeInFailedState.set(msg.MessageDeduplicationId, {
const nextAlarmMs =
Date.now() + Math.pow(2, existingFailedState.retryCount + 1) * this.revalidationRetryInterval;
updatedFailedState = {
...existingFailedState,
retryCount: existingFailedState.retryCount + 1,
nextAlarmMs,
});
};
} else {
this.routeInFailedState.set(msg.MessageDeduplicationId, {
updatedFailedState = {
msg,
retryCount: 1,
nextAlarmMs: Date.now() + 2_000,
});
};
}
this.routeInFailedState.set(msg.MessageDeduplicationId, updatedFailedState);
this.sql.exec(
"INSERT OR REPLACE INTO failed_state (id, data, buildId) VALUES (?, ?, ?)",
msg.MessageDeduplicationId,
JSON.stringify(updatedFailedState),
process.env.__NEXT_BUILD_ID
);
// We probably want to do something if routeInFailedState is becoming too big, at least log it
await this.addAlarm();
}
Expand All @@ -164,9 +232,60 @@ export class DurableObjectQueueHandler extends DurableObject<CloudflareEnv> {
if (existingAlarm) return;
if (this.routeInFailedState.size === 0) return;

const nextAlarmToSetup = Math.min(
let nextAlarmToSetup = Math.min(
...Array.from(this.routeInFailedState.values()).map(({ nextAlarmMs }) => nextAlarmMs)
);
if (nextAlarmToSetup < Date.now()) {
// We don't want to set an alarm in the past
nextAlarmToSetup = Date.now() + this.revalidationRetryInterval;
}
await this.ctx.storage.setAlarm(nextAlarmToSetup);
}

/**
 * Rebuilds the in-memory state of the durable object from its SQL storage.
 *
 * Ongoing revalidations are not restored since there is no way to know which state
 * they were in; only the failed routes and the retry alarm are brought back.
 */
async initState() {
  const currentBuildId = process.env.__NEXT_BUILD_ID;

  // Failed states are persisted as an opaque JSON blob: they are only ever read back as a whole
  this.sql.exec("CREATE TABLE IF NOT EXISTS failed_state (id TEXT PRIMARY KEY, data TEXT, buildId TEXT)");

  // The sync table is what lets us cope with an eventually consistent incremental cache
  this.sql.exec("CREATE TABLE IF NOT EXISTS sync (id TEXT PRIMARY KEY, lastSuccess INTEGER, buildId TEXT)");

  // Rows written by another build are stale, drop them before reading anything
  this.sql.exec("DELETE FROM failed_state WHERE buildId != ?", currentBuildId);
  this.sql.exec("DELETE FROM sync WHERE buildId != ?", currentBuildId);

  const persistedFailures = this.sql.exec<{ id: string; data: string }>("SELECT * FROM failed_state");
  for (const { id, data } of persistedFailures) {
    this.routeInFailedState.set(id, JSON.parse(data));
  }

  // With the failed routes back in memory, the retry alarm can be scheduled again
  await this.addAlarm();
}

/**
 * Looks up the sync table to find out whether the route targeted by `msg` was already
 * successfully revalidated after the `lastModified` carried by the message.
 *
 * Any failure while querying is treated as "not revalidated".
 *
 * @param msg The queue message describing the route to revalidate
 * @returns `true` if the route has been revalidated since the lastModified from the message, `false` otherwise
 */
checkSyncTable(msg: QueueMessage) {
  try {
    const { host, url, lastModified } = msg.MessageBody;
    // `lastSuccess` is compared against `lastModified` converted from milliseconds to seconds
    const lastModifiedSeconds = Math.round(lastModified / 1000);
    const cursor = this.sql.exec<{ numNewer: number }>(
      "SELECT COUNT(*) as numNewer FROM sync WHERE id = ? AND lastSuccess > ? LIMIT 1",
      `${host}${url}`,
      lastModifiedSeconds
    );
    const { numNewer } = cursor.one();
    return numNewer > 0;
  } catch {
    return false;
  }
}
}
2 changes: 0 additions & 2 deletions packages/cloudflare/src/api/durable-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ export default {

const id = durableObject.idFromName(msg.MessageGroupId);
const stub = durableObject.get(id);
const previewModeId = process.env.__NEXT_PREVIEW_MODE_ID!;
await stub.revalidate({
...msg,
previewModeId,
});
},
} satisfies Queue;
Loading