Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions frontend/src/app/data-providers/cloud-data-provider.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Clerk } from "@clerk/clerk-js";
import { type Rivet, RivetClient } from "@rivet-gg/cloud";
import { type FetchFunction, fetcher } from "@rivetkit/engine-api-full/core";
import { infiniteQueryOptions, QueryKey, QueryOptions, queryOptions, UseQueryOptions } from "@tanstack/react-query";
import { fetcher } from "@rivetkit/engine-api-full/core";
import { infiniteQueryOptions, queryOptions } from "@tanstack/react-query";
import { cloudEnv } from "@/lib/env";
import { queryClient } from "@/queries/global";
import { RECORDS_PER_PAGE } from "./default-data-provider";
Expand Down Expand Up @@ -339,19 +339,21 @@ export const createNamespaceContext = ({
} & ReturnType<typeof createProjectContext> &
ReturnType<typeof createOrganizationContext> &
ReturnType<typeof createGlobalContext>) => {
const token = async () => {
const response = await queryClient.fetchQuery(
parent.accessTokenQueryOptions({ namespace }),
);

return response.token;
};
return {
...createEngineNamespaceContext({
...parent,
namespace: engineNamespaceName,
namespaceId: engineNamespaceId,
engineToken: token,
client: createEngineClient(cloudEnv().VITE_APP_API_URL, {
token: async () => {
const response = await queryClient.fetchQuery(
parent.accessTokenQueryOptions({ namespace }),
);

return response.token;
},
token,
}),
}),
namespaceQueryOptions() {
Expand Down
1 change: 1 addition & 0 deletions frontend/src/app/data-providers/default-data-provider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ const defaultContext = {
destroyedAt: data.destroyedAt
? new Date(data.destroyedAt)
: null,
runner: data.runner ?? undefined,
sleepingAt: data.sleepingAt ? new Date(data.sleepingAt) : null,
startedAt: data.startedAt ? new Date(data.startedAt) : null,
}),
Expand Down
11 changes: 7 additions & 4 deletions frontend/src/app/data-providers/engine-data-provider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ export function createClient(
environment: "",
...opts,
fetcher: async (args) => {
Object.keys(args.headers).forEach((key) => {
Object.keys(args.headers || {}).forEach((key) => {
if (key.toLowerCase().startsWith("x-fern-")) {
delete args.headers[key];
delete args.headers?.[key];
}
});
return await fetcher(args);
Expand All @@ -60,13 +60,14 @@ export function createClient(
}

export const createGlobalContext = (opts: {
engineToken: (() => string) | string;
engineToken: (() => string) | string | (() => Promise<string>);
}) => {
const client = createClient(engineEnv().VITE_APP_API_URL, {
token: opts.engineToken,
});
return {
client,
...opts,
namespacesQueryOptions() {
return infiniteQueryOptions({
queryKey: ["namespaces"] as any,
Expand Down Expand Up @@ -138,6 +139,7 @@ export const createNamespaceContext = ({
const def = createDefaultGlobalContext();
const dataProvider = {
...def,
endpoint: engineEnv().VITE_APP_API_URL,
features: {
canCreateActors: true,
canDeleteActors: true,
Expand Down Expand Up @@ -190,7 +192,7 @@ export const createNamespaceContext = ({
return queryOptions({
...def.regionQueryOptions(regionId),
queryKey: [
{ namespace, namespaceId },
{ namespace },
...def.regionQueryOptions(regionId).queryKey,
],
queryFn: async ({ client }) => {
Expand Down Expand Up @@ -426,6 +428,7 @@ export const createNamespaceContext = ({

return {
engineNamespace: namespace,
engineToken: parent.engineToken,
...dataProvider,
runnersQueryOptions() {
return infiniteQueryOptions({
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/app/dialogs/connect-railway-frame.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ export function EnvVariablesStep() {
<p>Value</p>
</Label>
<RivetEndpointEnv />
<RivetNamespaceEnv />
<RivetTokenEnv />
<RivetNamespaceEnv />
<RivetRunnerEnv />
</div>
<div className="mt-2 flex justify-end">
Expand Down
12 changes: 4 additions & 8 deletions frontend/src/components/actors/actor-editable-state.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@ import {
JsonCode,
} from "@/components/code-mirror";
import { ActorStateChangeIndicator } from "./actor-state-change-indicator";
import {
type ActorId,
useActorStatePatchMutation,
useActorStateStream,
} from "./queries";
import { type ActorId, useActorStatePatchMutation } from "./queries";

const isValidJson = (json: string | null): json is string => {
if (!json) return false;
Expand Down Expand Up @@ -50,7 +46,7 @@ export function ActorEditableState({

const isValid = isValidJson(value) ? JSON.parse(value) : false;

const { mutate, isPending } = useActorStatePatchMutation(actorId);
const { mutateAsync, isPending } = useActorStatePatchMutation(actorId);

// useActorStateStream(actorId);

Expand Down Expand Up @@ -88,8 +84,8 @@ export function ActorEditableState({
variant="outline"
isLoading={isPending}
disabled={!isValid || !isEditing}
onClick={() => {
mutate(JSON.parse(value || ""));
onClick={async () => {
await mutateAsync(JSON.parse(value || ""));
setIsEditing(false);
setValue(null);
}}
Expand Down
11 changes: 7 additions & 4 deletions frontend/src/components/actors/queries/actor.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { fetchEventSource } from "@microsoft/fetch-event-source";
import { useMutation, useQueryClient } from "@tanstack/react-query";
import { applyPatch, compare } from "fast-json-patch";
import { compare } from "fast-json-patch";
import { useCallback, useEffect, useMemo } from "react";
import type { ActorId, Patch, RecordedRealtimeEvent } from "rivetkit/inspector";
import type { ActorId, RecordedRealtimeEvent } from "rivetkit/inspector";
import { useActor } from "../actor-queries-context";
import { useAsyncMemo } from "@/components/hooks/use-async-memo";

Expand Down Expand Up @@ -59,12 +59,15 @@ export const useActorStatePatchMutation = (
if (!response.ok) {
throw response;
}
return await response.json();
return (await response.json()) as {
state: unknown;
enabled: boolean;
};
},
onSuccess: (data) => {
queryClient.setQueryData(
queries.actorStateQueryOptions(actorId).queryKey,
() => ({ enabled: true, state: data }),
() => ({ enabled: true, state: data.state }),
);
},
...options,
Expand Down
54 changes: 42 additions & 12 deletions frontend/src/components/actors/worker/actor-repl.worker.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { fromJs } from "esast-util-from-js";
import { toJs } from "estree-util-to-js";
import { createClient } from "rivetkit/client";
import { createActorInspectorClient } from "rivetkit/inspector";
import {
createHighlighterCore,
createOnigurumaEngine,
type HighlighterCore,
} from "shiki";
import { match } from "ts-pattern";
import {
type InitMessage,
MessageSchema,
Expand Down Expand Up @@ -113,12 +114,7 @@ addEventListener("message", async (event) => {
}

if (data.type === "init") {
init = {
rpcs: data.rpcs ?? [],
endpoint: data.endpoint,
name: data.name,
id: data.id,
};
init = structuredClone(data);
respond({
type: "ready",
});
Expand Down Expand Up @@ -181,9 +177,43 @@ function respond(msg: Response) {
async function callAction({ name, args }: { name: string; args: unknown[] }) {
if (!init) throw new Error("Actor not initialized");

const client = createClient({
endpoint: init.endpoint,
token: init.engineToken,
}).getForId(init.name, init.id);
return await client.action({ name, args });
const url = new URL(`inspect`, init.endpoint).href;

// we need to build this from scratch because we don't have access to
// createInspectorActorContext in the worker
// and we want to avoid bundling the entire RivetKit here, issues with @react-refresh
const client = createActorInspectorClient(url, {
headers: {
Authorization: init.inspectorToken
? `Bearer ${init.inspectorToken}`
: "",
"x-rivet-target": "actor",
"x-rivet-actor": init.id,
"X-RivetKit-Query": JSON.stringify({
getForId: { actorId: init.id },
}),

...match(__APP_TYPE__)
.with("engine", () => {
return init?.engineToken
? { "X-Rivet-Token": init.engineToken }
: {};
})
.otherwise(() => ({})),
},
});

const response = await client.action.$post({
json: { name, params: args },
});

if (!response.ok) {
try {
return await response.json();
} catch {
return await response.text();
}
}

return (await response.json()).result;
}
50 changes: 26 additions & 24 deletions frontend/src/components/actors/worker/actor-worker-container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,42 +33,38 @@ export type ContainerState = {
commands: ReplCommand[];
};

interface Meta {
actorId: string;
rpcs: string[];
endpoint?: string;
name?: string;
engineToken?: string | (() => Promise<string>) | (() => string);
runnerName?: string;
namespace?: string;
inspectorToken?: string;
}

export class ActorWorkerContainer {
#state: ContainerState = {
status: { type: "unknown" },
commands: [],
};

#meta: {
actorId: string;
rpcs: string[];
endpoint?: string;
name?: string;
engineToken?: string;
} | null = null;
#meta: Meta | null = null;

#listeners: (() => void)[] = [];
#worker: Worker | undefined;

//
async init({
actorId,
signal,
rpcs = [],
endpoint,
name,
engineToken,
...meta
}: {
actorId: string;
signal: AbortSignal;
rpcs?: string[];
endpoint?: string;
name?: string;
engineToken?: string;
}) {
} & Meta) {
this.terminate();

this.#meta = { actorId, rpcs, endpoint, name, engineToken };
this.#meta = { ...meta };
this.#state.status = { type: "pending" };
this.#update();
try {
Expand All @@ -81,12 +77,12 @@ export class ActorWorkerContainer {

// If we reached this point, the actor is supported
// check if we still operate on the same actor
if (this.#meta.actorId !== actorId) {
if (this.#meta.actorId !== meta.actorId) {
// if not, we don't need to do anything
return null;
}

const worker = new ActorWorker({ name: `actor-${actorId}` });
const worker = new ActorWorker({ name: `actor-${meta.actorId}` });
signal.throwIfAborted();
// now worker needs to check if the actor is supported
this.#setupWorker(worker);
Expand Down Expand Up @@ -123,7 +119,7 @@ export class ActorWorkerContainer {
this.#update();
}

#setupWorker(worker: Worker) {
async #setupWorker(worker: Worker) {
this.#worker = worker;
this.#worker.addEventListener("message", (event) => {
try {
Expand All @@ -145,7 +141,13 @@ export class ActorWorkerContainer {
id: this.#meta?.actorId ?? "",
endpoint: this.#meta?.endpoint ?? "",
name: this.#meta?.name ?? "",
engineToken: this.#meta?.engineToken ?? "",
engineToken:
typeof this.#meta?.engineToken === "function"
? await this.#meta?.engineToken()
: this.#meta?.engineToken,
namespace: this.#meta?.namespace,
runnerName: this.#meta?.runnerName,
inspectorToken: this.#meta?.inspectorToken,
} satisfies InitMessage);
}

Expand Down Expand Up @@ -238,9 +240,9 @@ export class ActorWorkerContainer {
}

if (msg.type === "error") {
console.error("Actor Worker Error", msg);
if (!msg.id) {
this.#state.status = { type: "error", error: msg.data };
console.error("Actor Worker Error", msg.data);
this.#update();
return;
}
Expand Down
Loading
Loading