Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 117 additions & 59 deletions engine/sdks/typescript/runner/src/mod.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as protocol from "@rivetkit/engine-runner-protocol";
import type { Logger } from "pino";
import type WebSocket from "ws";
import { type ActorConfig, RunnerActor } from "./actor";
import { logger, setLogger } from "./log.js";
import { stringifyToClient, stringifyToServer } from "./stringify";
import { type HibernatingWebSocketMetadata, Tunnel } from "./tunnel";
Expand All @@ -10,7 +11,6 @@
unreachable,
} from "./utils";
import { importWebSocket } from "./websocket.js";
import { RunnerActor, type ActorConfig } from "./actor";

export type { HibernatingWebSocketMetadata };
export * as tunnelId from "./tunnel-id";
Expand All @@ -22,7 +22,7 @@

/** Warn once the backlog significantly exceeds the server's ack batch size. */
const EVENT_BACKLOG_WARN_THRESHOLD = 10_000;
const SIGNAL_HANDLERS: (() => void)[] = [];
const SIGNAL_HANDLERS: (() => void | Promise<void>)[] = [];

export interface RunnerConfig {
logger?: Logger;
Expand Down Expand Up @@ -199,9 +199,10 @@
runnerId?: string;
#lastCommandIdx: number = -1;
#pingLoop?: NodeJS.Timeout;
#nextEventIdx: bigint = 0n;

Check warning on line 202 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedPrivateClassMembers

This private class member is defined but never used.

Check warning on line 202 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedPrivateClassMembers

This private class member is defined but never used.

Check warning on line 202 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedPrivateClassMembers

This private class member is defined but never used.

Check warning on line 202 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedPrivateClassMembers

This private class member is defined but never used.

Check warning on line 202 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedPrivateClassMembers

This private class member is defined but never used.

Check warning on line 202 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedPrivateClassMembers

This private class member is defined but never used.
#started: boolean = false;
#shutdown: boolean = false;
#shuttingDown: boolean = false;
#reconnectAttempt: number = 0;
#reconnectTimeout?: NodeJS.Timeout;

Expand Down Expand Up @@ -260,13 +261,6 @@

// MARK: Manage actors
sleepActor(actorId: string, generation?: number) {
if (this.#shutdown) {
this.log?.warn({
msg: "runner is shut down, cannot sleep actor",
});
return;
}

const actor = this.getActor(actorId, generation);
if (!actor) return;

Expand Down Expand Up @@ -420,39 +414,50 @@

if (!this.#config.noAutoShutdown) {
if (!SIGNAL_HANDLERS.length) {
process.on("SIGTERM", () => {
process.on("SIGTERM", async () => {
this.log?.debug("received SIGTERM");

for (const handler of SIGNAL_HANDLERS) {
handler();
await handler();
}

process.exit(0);
// TODO: Add back
// process.exit(0);
});
process.on("SIGINT", () => {
process.on("SIGINT", async () => {
this.log?.debug("received SIGINT");

for (const handler of SIGNAL_HANDLERS) {
handler();
await handler();
}

process.exit(0);
// TODO: Add back
// process.exit(0);
});

this.log?.debug({
msg: "added SIGTERM listeners",
});
}

SIGNAL_HANDLERS.push(() => {
SIGNAL_HANDLERS.push(async () => {
const weak = new WeakRef(this);
weak.deref()?.shutdown(false, false);
await weak.deref()?.shutdown(false, false);
});
}
}

// MARK: Shutdown
async shutdown(immediate: boolean, exit: boolean = false) {
// Prevent concurrent shutdowns
if (this.#shuttingDown) {
this.log?.debug({
msg: "shutdown already in progress, ignoring",
});
return;
}
this.#shuttingDown = true;

this.log?.info({
msg: "starting shutdown",
immediate,
Expand Down Expand Up @@ -515,8 +520,10 @@
readyState: pegboardWebSocket.readyState,
});

// NOTE: We don't use #sendToServer here because that function checks if the runner is
// shut down
// Start stopping
//
// The runner workflow will send StopActor commands for all
// actors
this.__sendToServer({
tag: "ToServerStopping",
val: null,
Expand All @@ -536,7 +543,8 @@
});
});

// TODO: Wait for all actors to stop before closing ws
// Wait for all actors to stop before closing ws
await this.#waitForActorsToStop(pegboardWebSocket);

this.log?.info({
msg: "closing WebSocket",
Expand Down Expand Up @@ -571,9 +579,96 @@
this.#tunnel = undefined;
}

this.#config.onShutdown();

if (exit) process.exit(0);
}

this.#config.onShutdown();
/**
 * Hold shutdown until this runner no longer tracks any actors.
 *
 * The actor map is inspected once right away and then again on a 100ms
 * timer. The returned promise resolves (it never rejects on its own) as soon
 * as any one of the following is observed:
 * - the actor map is empty
 * - the WebSocket reports readyState 2 (CLOSING) or 3 (CLOSED)
 * - 120 seconds have passed since the wait started
 *
 * While still waiting, a progress message is logged at most once every
 * 5 seconds.
 *
 * @param ws - The WebSocket whose readyState is watched during the wait.
 */
async #waitForActorsToStop(ws: WebSocket): Promise<void> {
	const timeoutMs = 120_000;
	const pollMs = 100;
	const progressEveryMs = 5_000;
	const startedAt = Date.now();
	// Starting at zero makes the first unsuccessful check log progress right away.
	let lastProgressAt = 0;

	// Reports whether the wait should end, logging the reason when it does.
	const shouldStopWaiting = (): boolean => {
		const now = Date.now();
		const elapsed = now - startedAt;
		const socketGone = ws.readyState === 2 || ws.readyState === 3;

		if (this.#actors.size === 0) {
			this.log?.info({
				msg: "all actors stopped",
				elapsed,
			});
			return true;
		}

		if (socketGone) {
			this.log?.warn({
				msg: "websocket closed before all actors stopped",
				remainingActors: this.#actors.size,
				elapsed,
			});
			return true;
		}

		if (elapsed >= timeoutMs) {
			this.log?.warn({
				msg: "shutdown timeout reached, forcing close",
				remainingActors: this.#actors.size,
				elapsed,
			});
			return true;
		}

		// Still waiting: emit a throttled progress line.
		if (now - lastProgressAt >= progressEveryMs) {
			this.log?.info({
				msg: "waiting for actors to stop",
				remainingActors: this.#actors.size,
				elapsed,
			});
			lastProgressAt = now;
		}
		return false;
	};

	// Fast path: nothing to wait for, so no timer is ever created.
	if (shouldStopWaiting()) {
		this.log?.debug({
			msg: "actors check completed immediately",
		});
		return;
	}

	this.log?.debug({
		msg: "starting actor wait interval",
		checkInterval: pollMs,
	});

	await new Promise<void>((resolve) => {
		const timer = setInterval(() => {
			this.log?.debug({
				msg: "actor wait interval tick",
				actorCount: this.#actors.size,
			});
			if (!shouldStopWaiting()) return;

			this.log?.debug({
				msg: "actors check completed, clearing interval",
			});
			clearInterval(timer);
			resolve();
		}, pollMs);
	});
}

// MARK: Networking
Expand Down Expand Up @@ -1014,13 +1109,6 @@
generation: number,
intentType: "sleep" | "stop",
) {
if (this.#shutdown) {
console.trace("send actor intent", actorId, intentType);
this.log?.warn({
msg: "Runner is shut down, cannot send actor intent",
});
return;
}
let actorIntent: protocol.ActorIntent;

if (intentType === "sleep") {
Expand Down Expand Up @@ -1062,12 +1150,6 @@
generation: number,
stateType: "running" | "stopped",
) {
if (this.#shutdown) {
this.log?.warn({
msg: "Runner is shut down, cannot send actor state update",
});
return;
}
let actorState: protocol.ActorState;

if (stateType === "running") {
Expand Down Expand Up @@ -1108,13 +1190,6 @@
}

#sendCommandAcknowledgment() {
if (this.#shutdown) {
this.log?.warn({
msg: "Runner is shut down, cannot send command acknowledgment",
});
return;
}

if (this.#lastCommandIdx < 0) {
// No commands received yet, nothing to acknowledge
return;
Expand Down Expand Up @@ -1423,11 +1498,6 @@
const actor = this.getActor(actorId, generation);
if (!actor) return;

if (this.#shutdown) {
console.warn("Runner is shut down, cannot set alarm");
return;
}

const alarmEvent: protocol.EventActorSetAlarm = {
actorId,
generation: actor.generation,
Expand Down Expand Up @@ -1460,11 +1530,6 @@
requestData: protocol.KvRequestData,
): Promise<any> {
return new Promise((resolve, reject) => {
if (this.#shutdown) {
reject(new Error("Runner is shut down"));
return;
}

const requestId = this.#nextKvRequestId++;
const isConnected =
this.#pegboardWebSocket &&
Expand Down Expand Up @@ -1541,14 +1606,7 @@
: false;
}

__sendToServer(message: protocol.ToServer, allowShutdown: boolean = false) {
if (!allowShutdown && this.#shutdown) {
this.log?.warn({
msg: "Runner is shut down, cannot send message to server",
});
return;
}

__sendToServer(message: protocol.ToServer) {
this.log?.debug({
msg: "sending runner message",
data: stringifyToServer(message),
Expand Down
3 changes: 0 additions & 3 deletions examples/counter/scripts/connect.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 28 additions & 2 deletions rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ export class ActorConnRaw {
resolve,
reject,
});
logger().debug({
msg: "added action to in-flight map",
actionId,
actionName: opts.name,
inFlightCount: this.#actionsInFlight.size,
});

this.#sendMessage({
body: {
Expand Down Expand Up @@ -460,9 +466,11 @@ enc
} else if (response.body.tag === "ActionResponse") {
// Action response OK
const { id: actionId } = response.body.val;
logger().trace({
logger().debug({
msg: "received action response",
actionId,
actionId: Number(actionId),
inFlightCount: this.#actionsInFlight.size,
inFlightIds: Array.from(this.#actionsInFlight.keys()),
});

const inFlight = this.#takeActionInFlight(Number(actionId));
Expand Down Expand Up @@ -561,9 +569,27 @@ enc
/**
 * Remove the in-flight action registered under `id` and hand it back.
 *
 * On a miss, the current contents of the in-flight map are logged at error
 * level before throwing, so the unexpected id can be compared against what
 * is actually pending.
 *
 * @param id - Numeric action id to look up.
 * @returns The in-flight entry that was stored under `id`.
 * @throws errors.InternalError when no entry exists for `id`.
 */
#takeActionInFlight(id: number): ActionInFlight {
	const pending = this.#actionsInFlight;
	const entry = pending.get(id);

	if (entry) {
		pending.delete(id);
		// Size is read after the delete, so it reflects the remaining actions.
		logger().debug({
			msg: "removed action from in-flight map",
			actionId: id,
			actionName: entry.name,
			inFlightCount: pending.size,
		});
		return entry;
	}

	const pendingSummaries = Array.from(
		pending.entries(),
		([pendingId, action]) => ({
			id: pendingId,
			name: action.name,
		}),
	);
	logger().error({
		msg: "action not found in in-flight map",
		lookupId: id,
		inFlightCount: pending.size,
		inFlightIds: Array.from(pending.keys()),
		inFlightActions: pendingSummaries,
	});
	throw new errors.InternalError(`No in flight response for ${id}`);
}

Expand Down
Loading