From 0ed702c06a50c37f909270ee9b9d737e0e23998e Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Wed, 19 Nov 2025 23:37:26 -0800 Subject: [PATCH] fix(rivetkit): fix graceful runner shutdown --- engine/sdks/typescript/runner/src/mod.ts | 176 ++++++++++++------ examples/counter/scripts/connect.ts | 3 - .../rivetkit/src/client/actor-conn.ts | 30 ++- 3 files changed, 145 insertions(+), 64 deletions(-) diff --git a/engine/sdks/typescript/runner/src/mod.ts b/engine/sdks/typescript/runner/src/mod.ts index 7e9eb1f640..22e0b3b9ac 100644 --- a/engine/sdks/typescript/runner/src/mod.ts +++ b/engine/sdks/typescript/runner/src/mod.ts @@ -1,6 +1,7 @@ import * as protocol from "@rivetkit/engine-runner-protocol"; import type { Logger } from "pino"; import type WebSocket from "ws"; +import { type ActorConfig, RunnerActor } from "./actor"; import { logger, setLogger } from "./log.js"; import { stringifyToClient, stringifyToServer } from "./stringify"; import { type HibernatingWebSocketMetadata, Tunnel } from "./tunnel"; @@ -10,7 +11,6 @@ import { unreachable, } from "./utils"; import { importWebSocket } from "./websocket.js"; -import { RunnerActor, type ActorConfig } from "./actor"; export type { HibernatingWebSocketMetadata }; export * as tunnelId from "./tunnel-id"; @@ -22,7 +22,7 @@ const RUNNER_PING_INTERVAL = 3_000; /** Warn once the backlog significantly exceeds the server's ack batch size. */ const EVENT_BACKLOG_WARN_THRESHOLD = 10_000; -const SIGNAL_HANDLERS: (() => void)[] = []; +const SIGNAL_HANDLERS: (() => void | Promise)[] = []; export interface RunnerConfig { logger?: Logger; @@ -202,6 +202,7 @@ export class Runner { #nextEventIdx: bigint = 0n; #started: boolean = false; #shutdown: boolean = false; + #shuttingDown: boolean = false; #reconnectAttempt: number = 0; #reconnectTimeout?: NodeJS.Timeout; @@ -260,13 +261,6 @@ export class Runner { // MARK: Manage actors sleepActor(actorId: string, generation?: number) { - if (this.#shutdown) { - this.log?.warn({ - msg: "runner is shut down, cannot sleep actor", - }); - return; - } - const actor = this.getActor(actorId, generation); if (!actor) return; @@ -420,23 +414,25 @@ export class Runner { if (!this.#config.noAutoShutdown) { if (!SIGNAL_HANDLERS.length) { - process.on("SIGTERM", () => { + process.on("SIGTERM", async () => { this.log?.debug("received SIGTERM"); for (const handler of SIGNAL_HANDLERS) { - handler(); + await handler(); } - process.exit(0); + // TODO: Add back + // process.exit(0); }); - process.on("SIGINT", () => { + process.on("SIGINT", async () => { this.log?.debug("received SIGINT"); for (const handler of SIGNAL_HANDLERS) { - handler(); + await handler(); } - process.exit(0); + // TODO: Add back + // process.exit(0); }); this.log?.debug({ @@ -444,15 +440,24 @@ export class Runner { }); } - SIGNAL_HANDLERS.push(() => { + SIGNAL_HANDLERS.push(async () => { const weak = new WeakRef(this); - weak.deref()?.shutdown(false, false); + await weak.deref()?.shutdown(false, false); }); } } // MARK: Shutdown async shutdown(immediate: boolean, exit: boolean = false) { + // Prevent concurrent shutdowns + if (this.#shuttingDown) { + this.log?.debug({ + msg: "shutdown already in progress, ignoring", + }); + return; + } + this.#shuttingDown = true; + this.log?.info({ msg: "starting shutdown", immediate, @@ -515,8 +520,10 @@ export class Runner { readyState: pegboardWebSocket.readyState, }); - // NOTE: We don't use #sendToServer here because that function checks if the runner is - // shut down + // Start stopping + // + // The runner workflow will send StopActor commands for all + // actors this.__sendToServer({ tag: "ToServerStopping", val: null, @@ -536,7 +543,8 @@ export class Runner { }); }); - // TODO: Wait for all actors to stop before closing ws + // Wait for all actors to stop before closing ws + await this.#waitForActorsToStop(pegboardWebSocket); this.log?.info({ msg: "closing WebSocket", @@ -571,9 +579,96 @@ export class Runner { this.#tunnel = undefined; } + this.#config.onShutdown(); + if (exit) process.exit(0); + } - this.#config.onShutdown(); + /** + * Wait for all actors to stop before proceeding with shutdown. + * + * This method polls every 100ms to check if all actors have been stopped. + * + * It will resolve early if: + * - All actors are stopped + * - The WebSocket connection is closed + * - The shutdown timeout is reached (120 seconds) + */ + async #waitForActorsToStop(ws: WebSocket): Promise { + const shutdownTimeout = 120_000; // 120 seconds + const shutdownCheckInterval = 100; // Check every 100ms + const progressLogInterval = 5_000; // Log progress every 5 seconds + const shutdownStartTs = Date.now(); + let lastProgressLogTs = 0; // Ensure first log happens immediately + + return new Promise((resolve) => { + const checkActors = () => { + const now = Date.now(); + const elapsed = now - shutdownStartTs; + const wsIsClosed = ws.readyState === 2 || ws.readyState === 3; + + if (this.#actors.size === 0) { + this.log?.info({ + msg: "all actors stopped", + elapsed, + }); + return true; + } else if (wsIsClosed) { + this.log?.warn({ + msg: "websocket closed before all actors stopped", + remainingActors: this.#actors.size, + elapsed, + }); + return true; + } else if (elapsed >= shutdownTimeout) { + this.log?.warn({ + msg: "shutdown timeout reached, forcing close", + remainingActors: this.#actors.size, + elapsed, + }); + return true; + } else { + // Log progress every 5 seconds + if (now - lastProgressLogTs >= progressLogInterval) { + this.log?.info({ + msg: "waiting for actors to stop", + remainingActors: this.#actors.size, + elapsed, + }); + lastProgressLogTs = now; + } + return false; + } + }; + + // Check immediately first + if (checkActors()) { + this.log?.debug({ + msg: "actors check completed immediately", + }); + resolve(); + return; + } + + this.log?.debug({ + msg: "starting actor wait interval", + checkInterval: shutdownCheckInterval, + }); + + const interval = setInterval(() => { + this.log?.debug({ + msg: "actor wait interval tick", + actorCount: this.#actors.size, + }); + if (checkActors()) { + this.log?.debug({ + msg: "actors check completed, clearing interval", + }); + clearInterval(interval); + resolve(); + } + }, shutdownCheckInterval); + }); } // MARK: Networking @@ -1014,13 +1109,6 @@ export class Runner { generation: number, intentType: "sleep" | "stop", ) { - if (this.#shutdown) { - console.trace("send actor intent", actorId, intentType); - this.log?.warn({ - msg: "Runner is shut down, cannot send actor intent", - }); - return; - } let actorIntent: protocol.ActorIntent; if (intentType === "sleep") { @@ -1062,12 +1150,6 @@ export class Runner { generation: number, stateType: "running" | "stopped", ) { - if (this.#shutdown) { - this.log?.warn({ - msg: "Runner is shut down, cannot send actor state update", - }); - return; - } let actorState: protocol.ActorState; if (stateType === "running") { @@ -1108,13 +1190,6 @@ export class Runner { } #sendCommandAcknowledgment() { - if (this.#shutdown) { - this.log?.warn({ - msg: "Runner is shut down, cannot send command acknowledgment", - }); - return; - } - if (this.#lastCommandIdx < 0) { // No commands received yet, nothing to acknowledge return; @@ -1423,11 +1498,6 @@ export class Runner { const actor = this.getActor(actorId, generation); if (!actor) return; - if (this.#shutdown) { - console.warn("Runner is shut down, cannot set alarm"); - return; - } - const alarmEvent: protocol.EventActorSetAlarm = { actorId, generation: actor.generation, @@ -1460,11 +1530,6 @@ export class Runner { requestData: protocol.KvRequestData, ): Promise { return new Promise((resolve, reject) => { - if (this.#shutdown) { - reject(new Error("Runner is shut down")); - return; - } - const requestId = this.#nextKvRequestId++; const isConnected = this.#pegboardWebSocket && @@ -1541,14 +1606,7 @@ export class Runner { : false; } - __sendToServer(message: protocol.ToServer, allowShutdown: boolean = false) { - if (!allowShutdown && this.#shutdown) { - this.log?.warn({ - msg: "Runner is shut down, cannot send message to server", - }); - return; - } - + __sendToServer(message: protocol.ToServer) { this.log?.debug({ msg: "sending runner message", data: stringifyToServer(message), diff --git a/examples/counter/scripts/connect.ts b/examples/counter/scripts/connect.ts index aa8d659547..42208adced 100644 --- a/examples/counter/scripts/connect.ts +++ b/examples/counter/scripts/connect.ts @@ -14,9 +14,6 @@ async function main() { await new Promise((resolve) => setTimeout(resolve, 1000)); } - - await new Promise((resolve) => setTimeout(resolve, 10000)); - await counter.dispose(); } main(); diff --git a/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts b/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts index 93041edd9b..91770e0cc5 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts @@ -197,6 +197,12 @@ export class ActorConnRaw { resolve, reject, }); + logger().debug({ + msg: "added action to in-flight map", + actionId, + actionName: opts.name, + inFlightCount: this.#actionsInFlight.size, + }); this.#sendMessage({ body: { @@ -460,9 +466,11 @@ enc } else if (response.body.tag === "ActionResponse") { // Action response OK const { id: actionId } = response.body.val; - logger().trace({ + logger().debug({ msg: "received action response", - actionId, + actionId: Number(actionId), + inFlightCount: this.#actionsInFlight.size, + inFlightIds: Array.from(this.#actionsInFlight.keys()), }); const inFlight = this.#takeActionInFlight(Number(actionId)); @@ -561,9 +569,27 @@ enc #takeActionInFlight(id: number): ActionInFlight { const inFlight = this.#actionsInFlight.get(id); if (!inFlight) { + logger().error({ + msg: "action not found in in-flight map", + lookupId: id, + inFlightCount: this.#actionsInFlight.size, + inFlightIds: Array.from(this.#actionsInFlight.keys()), + inFlightActions: Array.from( + this.#actionsInFlight.entries(), + ).map(([id, action]) => ({ + id, + name: action.name, + })), + }); throw new errors.InternalError(`No in flight response for ${id}`); } this.#actionsInFlight.delete(id); + logger().debug({ + msg: "removed action from in-flight map", + actionId: id, + actionName: inFlight.name, + inFlightCount: this.#actionsInFlight.size, + }); return inFlight; }