Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"dev": "npx turbo watch dev",
"build": "npx turbo build",
"test": "npx turbo test",
"test:watch": "npx turbo watch test",
"check-types": "npx turbo check-types",
"fmt": "yarn biome check --write .",
"dev-docs": "cd docs && yarn dlx mintlify@latest dev",
Expand Down
37 changes: 4 additions & 33 deletions packages/actor-core/src/actor/connection.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
import type {
ActorInstance,
AnyActorInstance,
ExtractActorConnParams,
ExtractActorConnState,
} from "./instance";
import type { ActorInstance } from "./instance";
import * as errors from "./errors";
import { generateSecureToken } from "./utils";
import { CachedSerializer, Encoding } from "./protocol/serde";
import { logger } from "./log";
import { ConnDriver } from "./driver";
import { CachedSerializer } from "./protocol/serde";
import type { ConnDriver } from "./driver";
import * as messageToClient from "@/actor/protocol/message/to-client";
import { Actions } from "./config";
import type { PersistedConn } from "./persisted";

export function generateConnId(): string {
return crypto.randomUUID();
Expand All @@ -22,29 +16,6 @@ export function generateConnToken(): string {

export type ConnId = string;

/** Object representing connection that gets persisted to storage. */
export interface PersistedConn<CP, CS> {
// ID
i: string;
// Token
t: string;
// Connection driver
d: string;
// Connection driver state
ds: unknown;
// Parameters
p: CP;
// State
s: CS;
// Subscriptions
su: PersistedSub[];
}

export interface PersistedSub {
// Event name
n: string;
}

export type AnyConn = Conn<any, any, any, any>;

/**
Expand Down
13 changes: 2 additions & 11 deletions packages/actor-core/src/actor/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,12 @@ import { AnyConn } from "./connection";

export type ConnDrivers = Record<string, ConnDriver>;

export type KvKey = unknown[];
export type KvValue = unknown;


export interface ActorDriver {
//load(): Promise<LoadOutput>;
getContext(actorId: string): unknown;

// HACK: Clean these up
kvGet(actorId: string, key: KvKey): Promise<KvValue | undefined>;
kvGetBatch(actorId: string, key: KvKey[]): Promise<(KvValue | undefined)[]>;
kvPut(actorId: string, key: KvKey, value: KvValue): Promise<void>;
kvPutBatch(actorId: string, key: [KvKey, KvValue][]): Promise<void>;
kvDelete(actorId: string, key: KvKey): Promise<void>;
kvDeleteBatch(actorId: string, key: KvKey[]): Promise<void>;
readPersistedData(actorId: string): Promise<unknown | undefined>;
writePersistedData(actorId: string, unknown: unknown): Promise<void>;

// Schedule
setAlarm(actor: AnyActorInstance, timestamp: number): Promise<void>;
Expand Down
155 changes: 128 additions & 27 deletions packages/actor-core/src/actor/instance.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import type { PersistedConn } from "./connection";
import type { Logger } from "@/common//log";
import { type ActorTags, isJsonSerializable, stringifyError } from "@/common//utils";
import {
type ActorTags,
isJsonSerializable,
stringifyError,
} from "@/common//utils";
import onChange from "on-change";
import type { ActorConfig } from "./config";
import { Conn, type ConnId } from "./connection";
Expand All @@ -9,15 +12,15 @@ import type { ConnDriver } from "./driver";
import * as errors from "./errors";
import { processMessage } from "./protocol/message/mod";
import { instanceLogger, logger } from "./log";
import { ActionContext } from "./action";
import type { ActionContext } from "./action";
import { Lock, deadline } from "./utils";
import { Schedule } from "./schedule";
import { KEYS } from "./keys";
import type * as wsToServer from "@/actor/protocol/message/to-server";
import { CachedSerializer } from "./protocol/serde";
import { ActorInspector } from "@/inspector/actor";
import { ActorContext } from "./context";
import invariant from "invariant";
import type { PersistedActor, PersistedConn, PersistedScheduleEvents } from "./persisted";

/**
* Options for the `_saveState` method.
Expand Down Expand Up @@ -72,14 +75,6 @@ export type ExtractActorConnState<A extends AnyActorInstance> =
? ConnState
: never;

/** State object that gets automatically persisted to storage. */
interface PersistedActor<S, CP, CS> {
// State
s: S;
// Connections
c: PersistedConn<CP, CS>[];
}

export class ActorInstance<S, CP, CS, V> {
// Shared actor context for this instance
actorContext: ActorContext<S, CP, CS, V>;
Expand Down Expand Up @@ -155,7 +150,7 @@ export class ActorInstance<S, CP, CS, V> {
this.#name = name;
this.#tags = tags;
this.#region = region;
this.#schedule = new Schedule(this, actorDriver);
this.#schedule = new Schedule(this);
this.inspector = new ActorInspector(this);

// Initialize server
Expand All @@ -171,7 +166,12 @@ export class ActorInstance<S, CP, CS, V> {
let vars: V | undefined = undefined;
if ("createVars" in this.#config) {
const dataOrPromise = this.#config.createVars(
this.actorContext as unknown as ActorContext<undefined, undefined, undefined, undefined>,
this.actorContext as unknown as ActorContext<
undefined,
undefined,
undefined,
undefined
>,
this.#actorDriver.getContext(this.#actorId),
);
if (dataOrPromise instanceof Promise) {
Expand Down Expand Up @@ -200,8 +200,101 @@ export class ActorInstance<S, CP, CS, V> {
this.#ready = true;
}

async scheduleEvent(
timestamp: number,
fn: string,
args: unknown[],
): Promise<void> {
// Build event
const eventId = crypto.randomUUID();
const newEvent: PersistedScheduleEvents = {
e: eventId,
t: timestamp,
a: fn,
ar: args,
};

this.actorContext.log.info("scheduling event", {
event: eventId,
timestamp,
action: fn
});

// Insert event in to index
const insertIndex = this.#persist.e.findIndex((x) => x.t > newEvent.t);
if (insertIndex === -1) {
this.#persist.e.push(newEvent);
} else {
this.#persist.e.splice(insertIndex, 0, newEvent);
}

// Update alarm if:
// - this is the newest event (i.e. at beginning of array) or
// - this is the only event (i.e. the only event in the array)
if (insertIndex === 0 || this.#persist.e.length === 1) {
this.actorContext.log.info("setting alarm", { timestamp });
await this.#actorDriver.setAlarm(this, newEvent.t);
}
}

async onAlarm() {
await this.#schedule.__onAlarm();
const now = Date.now();
this.actorContext.log.debug("alarm triggered", { now, events: this.#persist.e.length });

// Remove events from schedule that we're about to run
const runIndex = this.#persist.e.findIndex((x) => x.t <= now);
if (runIndex === -1) {
this.actorContext.log.debug("no events to run", { now });
return;
}
const scheduleEvents = this.#persist.e.splice(0, runIndex + 1);
this.actorContext.log.debug("running events", { count: scheduleEvents.length });

// Set alarm for next event
if (this.#persist.e.length > 0) {
await this.#actorDriver.setAlarm(this, this.#persist.e[0].t);
}

// Iterate by event key in order to ensure we call the events in order
for (const event of scheduleEvents) {
try {
this.actorContext.log.info("running action for event", {
event: event.e,
timestamp: event.t,
action: event.a,
args: event.ar
});

// Look up function
const fn: unknown = this.#config.actions[event.a];
if (!fn) throw new Error(`Missing action for alarm ${event.a}`);
if (typeof fn !== "function")
throw new Error(
`Alarm function lookup for ${event.a} returned ${typeof fn}`,
);

// Call function
try {
await fn.call(undefined, this.actorContext, ...event.ar);
} catch (error) {
this.actorContext.log.error("error while running event", {
error: stringifyError(error),
event: event.e,
timestamp: event.t,
action: event.a,
args: event.ar,
});
}
} catch (error) {
this.actorContext.log.error("internal error while running event", {
error: stringifyError(error),
event: event.e,
timestamp: event.t,
action: event.a,
args: event.ar,
});
}
}
}

get stateEnabled() {
Expand Down Expand Up @@ -268,9 +361,8 @@ export class ActorInstance<S, CP, CS, V> {
this.#persistChanged = false;

// Write to KV
await this.#actorDriver.kvPut(
await this.#actorDriver.writePersistedData(
this.#actorId,
KEYS.STATE.DATA,
this.#persistRaw,
);

Expand Down Expand Up @@ -359,12 +451,11 @@ export class ActorInstance<S, CP, CS, V> {

async #initialize() {
// Read initial state
const [initialized, persistData] = (await this.#actorDriver.kvGetBatch(
const persistData = (await this.#actorDriver.readPersistedData(
this.#actorId,
[KEYS.STATE.INITIALIZED, KEYS.STATE.DATA],
)) as [boolean, PersistedActor<S, CP, CS>];
)) as PersistedActor<S, CP, CS>;

if (initialized) {
if (persistData !== undefined) {
logger().info("actor restoring", {
connections: persistData.c.length,
});
Expand Down Expand Up @@ -406,7 +497,12 @@ export class ActorInstance<S, CP, CS, V> {

// Convert state to undefined since state is not defined yet here
stateData = await this.#config.createState(
this.actorContext as unknown as ActorContext<undefined, undefined, undefined, undefined>,
this.actorContext as unknown as ActorContext<
undefined,
undefined,
undefined,
undefined
>,
);
} else if ("state" in this.#config) {
stateData = structuredClone(this.#config.state);
Expand All @@ -420,14 +516,12 @@ export class ActorInstance<S, CP, CS, V> {
const persist: PersistedActor<S, CP, CS> = {
s: stateData as S,
c: [],
e: [],
};

// Update state
logger().debug("writing state");
await this.#actorDriver.kvPutBatch(this.#actorId, [
[KEYS.STATE.INITIALIZED, true],
[KEYS.STATE.DATA, persist],
]);
await this.#actorDriver.writePersistedData(this.#actorId, persist);

this.#setPersist(persist);
}
Expand Down Expand Up @@ -509,7 +603,12 @@ export class ActorInstance<S, CP, CS, V> {
if (this.#connStateEnabled) {
if ("createConnState" in this.#config) {
const dataOrPromise = this.#config.createConnState(
this.actorContext as unknown as ActorContext<undefined, undefined, undefined, undefined>,
this.actorContext as unknown as ActorContext<
undefined,
undefined,
undefined,
undefined
>,
onBeforeConnectOpts,
);
if (dataOrPromise instanceof Promise) {
Expand Down Expand Up @@ -723,6 +822,8 @@ export class ActorInstance<S, CP, CS, V> {
rpcName: string,
args: unknown[],
): Promise<unknown> {
invariant(this.#ready, "exucuting rpc before ready");

// Prevent calling private or reserved methods
if (!(rpcName in this.#config.actions)) {
logger().warn("rpc does not exist", { rpcName });
Expand Down
16 changes: 0 additions & 16 deletions packages/actor-core/src/actor/keys.ts

This file was deleted.

43 changes: 43 additions & 0 deletions packages/actor-core/src/actor/persisted.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/** State object that gets automatically persisted to storage. */
export interface PersistedActor<S, CP, CS> {
// State
s: S;
// Connections
c: PersistedConn<CP, CS>[];
// Scheduled events
e: PersistedScheduleEvents[];
}

/** Object representing connection that gets persisted to storage. */
export interface PersistedConn<CP, CS> {
// ID
i: string;
// Token
t: string;
// Connection driver
d: string;
// Connection driver state
ds: unknown;
// Parameters
p: CP;
// State
s: CS;
// Subscriptions
su: PersistedSubscription[];
}

export interface PersistedSubscription {
// Event name
n: string;
}

export interface PersistedScheduleEvents {
// Event ID
e: string;
// Timestamp
t: number;
// Action name
a: string;
// Arguments
ar: unknown[];
}
Loading
Loading