Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/rivetkit/scripts/dump-openapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { createFileSystemOrMemoryDriver } from "@/drivers/file-system/mod";
import type { ManagerDriver } from "@/manager/driver";
import { createManagerRouter } from "@/manager/router";
import { type RegistryConfig, RegistryConfigSchema, setup } from "@/mod";
import { type RunConfig, RunConfigSchema } from "@/registry/run-config";
import { type RunnerConfig, RunnerConfigSchema } from "@/registry/run-config";
import { VERSION } from "@/utils";

function main() {
Expand All @@ -13,7 +13,7 @@ function main() {
});
const registry = setup(registryConfig);

const driverConfig: RunConfig = RunConfigSchema.parse({
const driverConfig: RunnerConfig = RunnerConfigSchema.parse({
driver: createFileSystemOrMemoryDriver(false),
getUpgradeWebSocket: () => () => unimplemented(),
inspector: {
Expand Down
4 changes: 2 additions & 2 deletions packages/rivetkit/src/actor/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import type { Context as HonoContext } from "hono";
import type { AnyClient } from "@/client/client";
import type { ManagerDriver } from "@/manager/driver";
import type { RegistryConfig } from "@/registry/config";
import type { RunConfig } from "@/registry/run-config";
import type { RunnerConfig } from "@/registry/run-config";
import type { AnyActorInstance } from "./instance";

export type ActorDriverBuilder = (
registryConfig: RegistryConfig,
runConfig: RunConfig,
runConfig: RunnerConfig,
managerDriver: ManagerDriver,
inlineClient: AnyClient,
) => ActorDriver;
Expand Down
12 changes: 6 additions & 6 deletions packages/rivetkit/src/actor/router-endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import type { UpgradeWebSocketArgs } from "@/common/inline-websocket-adapter2";
import { deconstructError, stringifyError } from "@/common/utils";
import type { UniversalWebSocket } from "@/common/websocket-interface";
import { HonoWebSocketAdapter } from "@/manager/hono-websocket-adapter";
import type { RunConfig } from "@/registry/run-config";
import type { RunnerConfig } from "@/registry/run-config";
import type * as protocol from "@/schemas/client-protocol/mod";
import {
HTTP_ACTION_REQUEST_VERSIONED,
Expand Down Expand Up @@ -106,7 +106,7 @@ export interface WebSocketOpts {
*/
export async function handleWebSocketConnect(
req: Request | undefined,
runConfig: RunConfig,
runConfig: RunnerConfig,
actorDriver: ActorDriver,
actorId: string,
encoding: Encoding,
Expand Down Expand Up @@ -325,7 +325,7 @@ export async function handleWebSocketConnect(
*/
export async function handleSseConnect(
c: HonoContext,
_runConfig: RunConfig,
_runConfig: RunnerConfig,
actorDriver: ActorDriver,
actorId: string,
) {
Expand Down Expand Up @@ -437,7 +437,7 @@ export async function handleSseConnect(
*/
export async function handleAction(
c: HonoContext,
_runConfig: RunConfig,
_runConfig: RunnerConfig,
actorDriver: ActorDriver,
actionName: string,
actorId: string,
Expand Down Expand Up @@ -505,7 +505,7 @@ export async function handleAction(
*/
export async function handleConnectionMessage(
c: HonoContext,
_runConfig: RunConfig,
_runConfig: RunnerConfig,
actorDriver: ActorDriver,
connId: string,
connToken: string,
Expand Down Expand Up @@ -542,7 +542,7 @@ export async function handleConnectionMessage(

export async function handleConnectionClose(
c: HonoContext,
_runConfig: RunConfig,
_runConfig: RunnerConfig,
actorDriver: ActorDriver,
connId: string,
connToken: string,
Expand Down
4 changes: 2 additions & 2 deletions packages/rivetkit/src/actor/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import {
createActorInspectorRouter,
} from "@/inspector/actor";
import { isInspectorEnabled, secureInspector } from "@/inspector/utils";
import type { RunConfig } from "@/registry/run-config";
import type { RunnerConfig } from "@/registry/run-config";
import { ConnDriverKind } from "./conn-drivers";
import type { ActorDriver } from "./driver";
import { InternalError } from "./errors";
Expand All @@ -67,7 +67,7 @@ export type ActorRouter = Hono<{ Bindings: ActorRouterBindings }>;
* Creates a router that runs on the partitioned instance.
*/
export function createActorRouter(
runConfig: RunConfig,
runConfig: RunnerConfig,
actorDriver: ActorDriver,
isTest: boolean,
): ActorRouter {
Expand Down
44 changes: 17 additions & 27 deletions packages/rivetkit/src/client/config.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,32 @@
import z from "zod";
import { TransportSchema } from "@/actor/protocol/old";
import { EncodingSchema } from "@/actor/protocol/serde";
import { getEnvUniversal, type UpgradeWebSocket } from "@/utils";

export type GetUpgradeWebSocket = () => UpgradeWebSocket;
import { type GetUpgradeWebSocket, getEnvUniversal } from "@/utils";

export const ClientConfigSchema = z.object({
/** Configure serving the API */
api: z
.object({
host: z.string().default("127.0.0.1"),
port: z.number().default(6420),
})
.default({}),
/** Endpoint to connect to for Rivet Engine or RivetKit manager API. */
endpoint: z
.string()
.optional()
.transform(
(x) =>
x ??
getEnvUniversal("RIVET_ENGINE") ??
getEnvUniversal("RIVET_ENDPOINT"),
),

/** Token to use to authenticate with the API. */
token: z
.string()
.optional()
.transform((x) => x ?? getEnvUniversal("RIVET_TOKEN")),

headers: z.record(z.string()).optional().default({}),

/** Endpoint to connect to the Rivet engine. Can be configured via RIVET_ENGINE env var. */
endpoint: z
.string()
.nullable()
.default(
() =>
getEnvUniversal("RIVET_ENGINE") ??
getEnvUniversal("RIVET_ENDPOINT") ??
null,
),

/** Namespace to connect to. */
namespace: z
.string()
.default(() => getEnvUniversal("RIVET_NAMESPACE") ?? "default"),

/** Name of the runner. This is used to group together runners in to different pools. */
runnerName: z
.string()
.default(() => getEnvUniversal("RIVET_RUNNER") ?? "rivetkit"),
Expand All @@ -44,10 +35,9 @@ export const ClientConfigSchema = z.object({

transport: TransportSchema.default("websocket"),

// This is a function to allow for lazy configuration of upgradeWebSocket on the
// fly. This is required since the dependencies that upgradeWebSocket
// (specifically Node.js) can sometimes only be specified after the router is
// created or must be imported async using `await import(...)`
headers: z.record(z.string()).optional().default({}),

// See RunConfig.getUpgradeWebSocket
getUpgradeWebSocket: z.custom<GetUpgradeWebSocket>().optional(),
});

Expand Down
5 changes: 4 additions & 1 deletion packages/rivetkit/src/driver-helpers/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,8 @@ export type {
ManagerDisplayInformation,
ManagerDriver,
} from "@/manager/driver";
export { DriverConfigSchema, RunConfigSchema } from "@/registry/run-config";
export {
DriverConfigSchema,
RunnerConfigSchema as RunConfigSchema,
} from "@/registry/run-config";
export { serializeEmptyPersistData } from "./utils";
4 changes: 2 additions & 2 deletions packages/rivetkit/src/driver-test-suite/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { Transport } from "@/client/mod";
import { configureInspectorAccessToken } from "@/inspector/utils";
import { createManagerRouter } from "@/manager/router";
import type { DriverConfig, Registry, RunConfig } from "@/mod";
import { RunConfigSchema } from "@/registry/run-config";
import { RunnerConfigSchema } from "@/registry/run-config";
import { getPort } from "@/test/mod";
import { logger } from "./log";
import { runActionFeaturesTests } from "./tests/action-features";
Expand Down Expand Up @@ -199,7 +199,7 @@ export async function createTestRuntime(
// Build driver config
// biome-ignore lint/style/useConst: Assigned later
let upgradeWebSocket: any;
const config: RunConfig = RunConfigSchema.parse({
const config: RunConfig = RunnerConfigSchema.parse({
driver,
getUpgradeWebSocket: () => upgradeWebSocket!,
inspector: {
Expand Down
12 changes: 4 additions & 8 deletions packages/rivetkit/src/drivers/default.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import { UserError } from "@/actor/errors";
import { loggerWithoutContext } from "@/actor/log";
import { createEngineDriver } from "@/drivers/engine/mod";
import { createFileSystemOrMemoryDriver } from "@/drivers/file-system/mod";
import type { DriverConfig, RunConfig } from "@/registry/run-config";
import type { DriverConfig, RunnerConfig } from "@/registry/run-config";

/**
* Chooses the appropriate driver based on the run configuration.
*/
export function chooseDefaultDriver(runConfig: RunConfig): DriverConfig {
export function chooseDefaultDriver(runConfig: RunnerConfig): DriverConfig {
if (runConfig.endpoint && runConfig.driver) {
throw new UserError(
"Cannot specify both 'endpoint' and 'driver' in configuration",
Expand All @@ -24,16 +24,12 @@ export function chooseDefaultDriver(runConfig: RunConfig): DriverConfig {
return runConfig.driver;
}

if (runConfig.endpoint) {
if (runConfig.endpoint || runConfig.token) {
loggerWithoutContext().debug({
msg: "using rivet engine driver",
endpoint: runConfig.endpoint,
});
// TODO: Add all properties from config
return createEngineDriver({
endpoint: runConfig.endpoint,
token: runConfig.token,
});
return createEngineDriver(runConfig);
}

loggerWithoutContext().debug({ msg: "using default file system driver" });
Expand Down
22 changes: 11 additions & 11 deletions packages/rivetkit/src/drivers/engine/actor-driver.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type {
RunnerConfig as EngineRunnerConfig,
ActorConfig as RunnerActorConfig,
RunnerConfig,
} from "@rivetkit/engine-runner";
import { Runner } from "@rivetkit/engine-runner";
import * as cbor from "cbor-x";
Expand Down Expand Up @@ -34,13 +34,14 @@ import {
serializeEmptyPersistData,
} from "@/driver-helpers/mod";
import type { RegistryConfig } from "@/registry/config";
import type { RunConfig } from "@/registry/run-config";
import type { RunnerConfig } from "@/registry/run-config";
import { getEndpoint } from "@/remote-manager-driver/api-utils";
import {
type LongTimeoutHandle,
promiseWithResolvers,
setLongTimeout,
} from "@/utils";
import type { Config } from "./config";
import type { EngineConfig } from "./config";
import { KEYS } from "./kv";
import { logger } from "./log";

Expand All @@ -54,10 +55,10 @@ export type DriverContext = {};

export class EngineActorDriver implements ActorDriver {
#registryConfig: RegistryConfig;
#runConfig: RunConfig;
#runConfig: RunnerConfig;
#managerDriver: ManagerDriver;
#inlineClient: Client<any>;
#config: Config;
#config: EngineConfig;
#runner: Runner;
#actors: Map<string, ActorHandler> = new Map();
#actorRouter: ActorRouter;
Expand All @@ -69,10 +70,10 @@ export class EngineActorDriver implements ActorDriver {

constructor(
registryConfig: RegistryConfig,
runConfig: RunConfig,
runConfig: RunnerConfig,
managerDriver: ManagerDriver,
inlineClient: Client<any>,
config: Config,
config: EngineConfig,
) {
this.#registryConfig = registryConfig;
this.#runConfig = runConfig;
Expand All @@ -95,11 +96,10 @@ export class EngineActorDriver implements ActorDriver {

// Create runner configuration
let hasDisconnected = false;
const runnerConfig: RunnerConfig = {
const engineRunnerConfig: EngineRunnerConfig = {
version: this.#version,
endpoint: config.endpoint,
endpoint: getEndpoint(config),
token,
pegboardEndpoint: config.pegboardEndpoint,
namespace: runConfig.namespace ?? config.namespace,
totalSlots: runConfig.totalSlots ?? config.totalSlots,
runnerName: runConfig.runnerName ?? config.runnerName,
Expand Down Expand Up @@ -149,7 +149,7 @@ export class EngineActorDriver implements ActorDriver {
};

// Create and start runner
this.#runner = new Runner(runnerConfig);
this.#runner = new Runner(engineRunnerConfig);
this.#runner.start();
logger().debug({
msg: "engine runner started",
Expand Down
34 changes: 9 additions & 25 deletions packages/rivetkit/src/drivers/engine/config.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,22 @@
import type { Hono } from "hono";
import { z } from "zod";
import { ClientConfigSchema } from "@/client/config";
import { getEnvUniversal } from "@/utils";

export const ConfigSchema = z
export const EngingConfigSchema = z
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a typo in the schema name EngingConfigSchema which should be EngineConfigSchema. This needs to be corrected for consistency with how it's referenced elsewhere in the codebase.

Suggested change
export const EngingConfigSchema = z
export const EngineConfigSchema = z

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

.object({
app: z.custom<Hono>().optional(),
endpoint: z
.string()
.default(
() =>
getEnvUniversal("RIVET_ENGINE") ??
getEnvUniversal("RIVET_ENDPOINT") ??
"http://localhost:6420",
),
token: z
.string()
.optional()
.transform((val) => val ?? getEnvUniversal("RIVET_TOKEN")),
pegboardEndpoint: z.string().optional(),
namespace: z
.string()
.default(() => getEnvUniversal("RIVET_NAMESPACE") ?? "default"),
runnerName: z
.string()
.default(() => getEnvUniversal("RIVET_RUNNER") ?? "rivetkit"),
// TODO: Automatically attempt to determine key by common env vars (e.g. k8s pod name)
/** Unique key for this runner. Runners connecting a given key will replace any other runner connected with the same key. */
runnerKey: z
.string()
.default(
() => getEnvUniversal("RIVET_RUNNER_KEY") ?? crypto.randomUUID(),
),

/** How many actors this runner can run. */
totalSlots: z.number().default(100_000),
})
// We include the client config since this includes the common properties like endpoint, namespace, etc.
.merge(ClientConfigSchema)
.default({});

export type InputConfig = z.input<typeof ConfigSchema>;
export type Config = z.infer<typeof ConfigSchema>;
export type EngineConfig = z.infer<typeof EngingConfigSchema>;
export type EngineConfigInput = z.input<typeof EngingConfigSchema>;
18 changes: 12 additions & 6 deletions packages/rivetkit/src/drivers/engine/mod.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
import type { Client } from "@/client/client";
import type { ManagerDriver } from "@/manager/driver";
import type { RegistryConfig } from "@/registry/config";
import type { DriverConfig, RunConfig } from "@/registry/run-config";
import type { DriverConfig, RunnerConfig } from "@/registry/run-config";
import { RemoteManagerDriver } from "@/remote-manager-driver/mod";
import { EngineActorDriver } from "./actor-driver";
import { ConfigSchema, type InputConfig } from "./config";
import { type EngineConfigInput, EngingConfigSchema } from "./config";

export { EngineActorDriver } from "./actor-driver";
export { type Config, ConfigSchema, type InputConfig } from "./config";
export {
type EngineConfig as Config,
type EngineConfigInput as InputConfig,
EngingConfigSchema as ConfigSchema,
} from "./config";

export function createEngineDriver(inputConfig?: InputConfig): DriverConfig {
const config = ConfigSchema.parse(inputConfig);
export function createEngineDriver(
inputConfig?: EngineConfigInput,
): DriverConfig {
const config = EngingConfigSchema.parse(inputConfig);

return {
name: "engine",
Expand All @@ -19,7 +25,7 @@ export function createEngineDriver(inputConfig?: InputConfig): DriverConfig {
},
actor: (
registryConfig: RegistryConfig,
runConfig: RunConfig,
runConfig: RunnerConfig,
managerDriver: ManagerDriver,
inlineClient: Client<any>,
) => {
Expand Down
Loading
Loading