Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 108 additions & 1 deletion apps/web/src/environments/runtime/service.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { describe, expect, it } from "vitest";

import { shouldApplyTerminalEvent } from "./service";
import {
shouldApplyProjectionEvent,
shouldApplyProjectionSnapshot,
shouldApplyTerminalEvent,
} from "./service";

describe("shouldApplyTerminalEvent", () => {
it("applies terminal events for draft-only threads", () => {
Expand Down Expand Up @@ -39,3 +43,106 @@ describe("shouldApplyTerminalEvent", () => {
).toBe(true);
});
});

describe("shouldApplyProjectionSnapshot", () => {
it("accepts the first snapshot for an environment", () => {
expect(
shouldApplyProjectionSnapshot({
current: null,
next: {
snapshotSequence: 1,
updatedAt: "2026-04-22T10:00:00.000Z",
},
}),
).toBe(true);
});

it("drops snapshots with an older sequence", () => {
expect(
shouldApplyProjectionSnapshot({
current: {
sequence: 5,
updatedAt: "2026-04-22T10:05:00.000Z",
},
next: {
snapshotSequence: 4,
updatedAt: "2026-04-22T10:06:00.000Z",
},
}),
).toBe(false);
});

it("drops snapshots with the same sequence and older timestamp", () => {
expect(
shouldApplyProjectionSnapshot({
current: {
sequence: 5,
updatedAt: "2026-04-22T10:05:00.000Z",
},
next: {
snapshotSequence: 5,
updatedAt: "2026-04-22T10:04:59.000Z",
},
}),
).toBe(false);
});

it("accepts snapshots with the same sequence and a newer timestamp", () => {
expect(
shouldApplyProjectionSnapshot({
current: {
sequence: 5,
updatedAt: "2026-04-22T10:05:00.000Z",
},
next: {
snapshotSequence: 5,
updatedAt: "2026-04-22T10:05:01.000Z",
},
}),
).toBe(true);
});
});

describe("shouldApplyProjectionEvent", () => {
it("accepts the first event for an environment", () => {
expect(
shouldApplyProjectionEvent({
current: null,
sequence: 1,
}),
).toBe(true);
});

it("drops stale or duplicate events", () => {
expect(
shouldApplyProjectionEvent({
current: {
sequence: 5,
updatedAt: "2026-04-22T10:05:00.000Z",
},
sequence: 5,
}),
).toBe(false);
expect(
shouldApplyProjectionEvent({
current: {
sequence: 5,
updatedAt: "2026-04-22T10:05:00.000Z",
},
sequence: 4,
}),
).toBe(false);
});

it("accepts newer events", () => {
expect(
shouldApplyProjectionEvent({
current: {
sequence: 5,
updatedAt: "2026-04-22T10:05:00.000Z",
},
sequence: 6,
}),
).toBe(true);
});
});
121 changes: 121 additions & 0 deletions apps/web/src/environments/runtime/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ type ThreadDetailSubscriptionEntry = {
const environmentConnections = new Map<EnvironmentId, EnvironmentConnection>();
const environmentConnectionListeners = new Set<() => void>();
const threadDetailSubscriptions = new Map<string, ThreadDetailSubscriptionEntry>();
const lastAppliedProjectionVersionByEnvironment = new Map<
EnvironmentId,
{
readonly sequence: number;
readonly updatedAt: string | null;
}
>();

let activeService: EnvironmentServiceState | null = null;
let needsProviderInvalidation = false;
Expand All @@ -102,6 +109,98 @@ const THREAD_DETAIL_SUBSCRIPTION_IDLE_EVICTION_MS = 15 * 60 * 1000;
const MAX_CACHED_THREAD_DETAIL_SUBSCRIPTIONS = 32;
const NOOP = () => undefined;

function compareAppliedProjectionVersion(
left: { readonly sequence: number; readonly updatedAt: string | null },
right: { readonly sequence: number; readonly updatedAt: string | null },
): number {
if (left.sequence !== right.sequence) {
return left.sequence - right.sequence;
}

const leftUpdatedAt = left.updatedAt ?? "";
const rightUpdatedAt = right.updatedAt ?? "";
if (leftUpdatedAt === rightUpdatedAt) {
return 0;
}

return leftUpdatedAt < rightUpdatedAt ? -1 : 1;
}

function toAppliedProjectionVersion(
snapshot: Pick<OrchestrationShellSnapshot, "snapshotSequence" | "updatedAt">,
): {
readonly sequence: number;
readonly updatedAt: string;
} {
return {
sequence: snapshot.snapshotSequence,
updatedAt: snapshot.updatedAt,
};
}

export function shouldApplyProjectionSnapshot(input: {
readonly current: {
readonly sequence: number;
readonly updatedAt: string | null;
} | null;
readonly next: Pick<OrchestrationShellSnapshot, "snapshotSequence" | "updatedAt">;
}): boolean {
if (input.current === null) {
return true;
}

return compareAppliedProjectionVersion(input.current, toAppliedProjectionVersion(input.next)) < 0;
}

export function shouldApplyProjectionEvent(input: {
readonly current: {
readonly sequence: number;
readonly updatedAt: string | null;
} | null;
readonly sequence: number;
}): boolean {
if (input.current === null) {
return true;
}

return input.sequence > input.current.sequence;
}

function readLastAppliedProjectionVersion(environmentId: EnvironmentId): {
readonly sequence: number;
readonly updatedAt: string | null;
} | null {
return lastAppliedProjectionVersionByEnvironment.get(environmentId) ?? null;
}

function markAppliedProjectionSnapshot(
environmentId: EnvironmentId,
snapshot: Pick<OrchestrationShellSnapshot, "snapshotSequence" | "updatedAt">,
): void {
const nextVersion = toAppliedProjectionVersion(snapshot);
const currentVersion = readLastAppliedProjectionVersion(environmentId);
if (
currentVersion !== null &&
compareAppliedProjectionVersion(currentVersion, nextVersion) >= 0
) {
return;
}

lastAppliedProjectionVersionByEnvironment.set(environmentId, nextVersion);
}

function markAppliedProjectionEvent(environmentId: EnvironmentId, sequence: number): void {
const currentVersion = readLastAppliedProjectionVersion(environmentId);
if (currentVersion !== null && sequence <= currentVersion.sequence) {
return;
}

lastAppliedProjectionVersionByEnvironment.set(environmentId, {
sequence,
updatedAt: currentVersion?.updatedAt ?? null,
});
}

function getThreadDetailSubscriptionKey(environmentId: EnvironmentId, threadId: ThreadId): string {
return scopedThreadKey(scopeThreadRef(environmentId, threadId));
}
Expand Down Expand Up @@ -600,6 +699,15 @@ export function applyEnvironmentThreadDetailEvent(
}

function applyShellEvent(event: OrchestrationShellStreamEvent, environmentId: EnvironmentId) {
if (
!shouldApplyProjectionEvent({
current: readLastAppliedProjectionVersion(environmentId),
sequence: event.sequence,
})
) {
return;
}

const threadId =
event.kind === "thread-upserted"
? event.thread.id
Expand All @@ -610,6 +718,7 @@ function applyShellEvent(event: OrchestrationShellStreamEvent, environmentId: En
const previousThread = threadRef ? selectThreadByRef(useStore.getState(), threadRef) : undefined;

useStore.getState().applyShellEvent(event, environmentId);
markAppliedProjectionEvent(environmentId, event.sequence);

switch (event.kind) {
case "project-upserted":
Expand Down Expand Up @@ -643,7 +752,17 @@ function createEnvironmentConnectionHandlers() {
return {
applyShellEvent,
syncShellSnapshot: (snapshot: OrchestrationShellSnapshot, environmentId: EnvironmentId) => {
if (
!shouldApplyProjectionSnapshot({
current: readLastAppliedProjectionVersion(environmentId),
next: snapshot,
})
) {
return;
}

useStore.getState().syncServerShellSnapshot(snapshot, environmentId);
markAppliedProjectionSnapshot(environmentId, snapshot);
reconcileThreadDetailSubscriptionsForEnvironment(
environmentId,
snapshot.threads.map((thread) => thread.id),
Expand Down Expand Up @@ -758,6 +877,7 @@ async function removeConnection(environmentId: EnvironmentId): Promise<boolean>
}

disposeThreadDetailSubscriptionsForEnvironment(environmentId);
lastAppliedProjectionVersionByEnvironment.delete(environmentId);
environmentConnections.delete(environmentId);
emitEnvironmentConnectionRegistryChange();
await connection.dispose();
Expand Down Expand Up @@ -1086,6 +1206,7 @@ export function startEnvironmentConnectionService(queryClient: QueryClient): ()

export async function resetEnvironmentServiceForTests(): Promise<void> {
stopActiveService();
lastAppliedProjectionVersionByEnvironment.clear();
for (const key of Array.from(threadDetailSubscriptions.keys())) {
disposeThreadDetailSubscriptionByKey(key);
}
Expand Down
Loading