Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions engine/sdks/typescript/runner/src/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
}

#actors: Map<string, ActorInstance> = new Map();
#actorWebSockets: Map<string, Set<WebSocketTunnelAdapter>> = new Map();

Check warning on line 107 in engine/sdks/typescript/runner/src/mod.ts

View workflow job for this annotation

GitHub Actions / quality

lint/correctness/noUnusedPrivateClassMembers

This private class member is defined but never used.

// WebSocket
#pegboardWebSocket?: WebSocket;
Expand Down Expand Up @@ -172,6 +172,13 @@

// MARK: Manage actors
sleepActor(actorId: string, generation?: number) {
if (this.#shutdown) {
this.log?.warn({
msg: "runner is shut down, cannot sleep actor",
});
return;
}

const actor = this.getActor(actorId, generation);
if (!actor) return;

Expand Down Expand Up @@ -860,6 +867,7 @@
intentType: "sleep" | "stop",
) {
if (this.#shutdown) {
console.trace("send actor intent", actorId, intentType);
this.log?.warn({
msg: "Runner is shut down, cannot send actor intent",
});
Expand Down
15 changes: 13 additions & 2 deletions rivetkit-typescript/packages/rivetkit/src/actor/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1920,6 +1920,9 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
#resetSleepTimer() {
if (this.#config.options.noSleep || !this.#sleepingSupported) return;

// Don't sleep if already stopping
if (this.#stopCalled) return;

const canSleep = this.#canSleep();

this.#rLog.debug({
Expand Down Expand Up @@ -1979,11 +1982,20 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {
* 4. Engine runner will publish EventActorStateUpdate with ActorStateSTop
**/
_startSleep() {
if (this.#stopCalled) {
this.#rLog.debug({
msg: "cannot call _startSleep if actor already stopping",
});
return;
}

// IMPORTANT: #sleepCalled should have no effect on the actor's
// behavior aside from preventing calling _startSleep twice. Wait for
// `_onStop` before putting in a stopping state.
if (this.#sleepCalled) {
this.#rLog.warn({ msg: "already sleeping actor" });
this.#rLog.warn({
msg: "cannot call _startSleep twice, actor already sleeping",
});
return;
}
this.#sleepCalled = true;
Expand Down Expand Up @@ -2069,7 +2081,6 @@ export class ActorInstance<S, CP, CS, V, I, DB extends AnyDatabaseProvider> {

// Clear timeouts
if (this.#pendingSaveTimeout) clearTimeout(this.#pendingSaveTimeout);
if (this.#sleepTimeout) clearTimeout(this.#sleepTimeout);
if (this.#checkConnLivenessInterval)
clearInterval(this.#checkConnLivenessInterval);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ export class EngineActorDriver implements ActorDriver {
}

async shutdownRunner(immediate: boolean): Promise<void> {
logger().info({ msg: "stopping engine actor driver" });
logger().info({ msg: "stopping engine actor driver", immediate });

// Clear the ack flush interval
if (this.#wsAckFlushInterval) {
Expand Down
Loading