Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
323 changes: 323 additions & 0 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1505,6 +1505,329 @@ it.layer(BaseTestLayer)("OrchestrationProjectionPipeline", (it) => {
}),
);

it.effect("clears stale pending approvals from projected shell summaries", () =>
Effect.gen(function* () {
const projectionPipeline = yield* OrchestrationProjectionPipeline;
const eventStore = yield* OrchestrationEventStore;
const sql = yield* SqlClient.SqlClient;
const appendAndProject = (event: Parameters<typeof eventStore.append>[0]) =>
eventStore
.append(event)
.pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent)));

yield* appendAndProject({
type: "project.created",
eventId: EventId.make("evt-stale-approval-1"),
aggregateKind: "project",
aggregateId: ProjectId.make("project-stale-approval"),
occurredAt: "2026-02-26T12:30:00.000Z",
commandId: CommandId.make("cmd-stale-approval-1"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-stale-approval-1"),
metadata: {},
payload: {
projectId: ProjectId.make("project-stale-approval"),
title: "Project Stale Approval",
workspaceRoot: "/tmp/project-stale-approval",
defaultModelSelection: null,
scripts: [],
createdAt: "2026-02-26T12:30:00.000Z",
updatedAt: "2026-02-26T12:30:00.000Z",
},
});

yield* appendAndProject({
type: "thread.created",
eventId: EventId.make("evt-stale-approval-2"),
aggregateKind: "thread",
aggregateId: ThreadId.make("thread-stale-approval"),
occurredAt: "2026-02-26T12:30:01.000Z",
commandId: CommandId.make("cmd-stale-approval-2"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-stale-approval-2"),
metadata: {},
payload: {
threadId: ThreadId.make("thread-stale-approval"),
projectId: ProjectId.make("project-stale-approval"),
title: "Thread Stale Approval",
modelSelection: {
provider: "codex",
model: "gpt-5-codex",
},
runtimeMode: "approval-required",
interactionMode: "default",
branch: null,
worktreePath: null,
createdAt: "2026-02-26T12:30:01.000Z",
updatedAt: "2026-02-26T12:30:01.000Z",
},
});

yield* appendAndProject({
type: "thread.activity-appended",
eventId: EventId.make("evt-stale-approval-3"),
aggregateKind: "thread",
aggregateId: ThreadId.make("thread-stale-approval"),
occurredAt: "2026-02-26T12:30:02.000Z",
commandId: CommandId.make("cmd-stale-approval-3"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-stale-approval-3"),
metadata: {},
payload: {
threadId: ThreadId.make("thread-stale-approval"),
activity: {
id: EventId.make("activity-stale-approval-requested"),
tone: "approval",
kind: "approval.requested",
summary: "Command approval requested",
payload: {
requestId: "approval-request-stale-1",
requestKind: "command",
},
turnId: null,
createdAt: "2026-02-26T12:30:02.000Z",
},
},
});

yield* appendAndProject({
type: "thread.activity-appended",
eventId: EventId.make("evt-stale-approval-4"),
aggregateKind: "thread",
aggregateId: ThreadId.make("thread-stale-approval"),
occurredAt: "2026-02-26T12:30:03.000Z",
commandId: CommandId.make("cmd-stale-approval-4"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-stale-approval-4"),
metadata: {},
payload: {
threadId: ThreadId.make("thread-stale-approval"),
activity: {
id: EventId.make("activity-stale-approval-failed"),
tone: "error",
kind: "provider.approval.respond.failed",
summary: "Provider approval response failed",
payload: {
requestId: "approval-request-stale-1",
detail: "Unknown pending permission request: approval-request-stale-1",
},
turnId: null,
createdAt: "2026-02-26T12:30:03.000Z",
},
},
});

const approvalRows = yield* sql<{
readonly requestId: string;
readonly status: string;
readonly resolvedAt: string | null;
}>`
SELECT
request_id AS "requestId",
status,
resolved_at AS "resolvedAt"
FROM projection_pending_approvals
WHERE request_id = 'approval-request-stale-1'
`;
assert.deepEqual(approvalRows, [
{
requestId: "approval-request-stale-1",
status: "resolved",
resolvedAt: "2026-02-26T12:30:03.000Z",
},
]);

const threadRows = yield* sql<{
readonly pendingApprovalCount: number;
}>`
SELECT pending_approval_count AS "pendingApprovalCount"
FROM projection_threads
WHERE thread_id = 'thread-stale-approval'
`;
assert.deepEqual(threadRows, [{ pendingApprovalCount: 0 }]);
}),
);

it.effect("ignores non-stale provider approval response failures", () =>
Effect.gen(function* () {
const projectionPipeline = yield* OrchestrationProjectionPipeline;
const eventStore = yield* OrchestrationEventStore;
const sql = yield* SqlClient.SqlClient;
const appendAndProject = (event: Parameters<typeof eventStore.append>[0]) =>
eventStore
.append(event)
.pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent)));

yield* appendAndProject({
type: "project.created",
eventId: EventId.make("evt-nonstale-approval-1"),
aggregateKind: "project",
aggregateId: ProjectId.make("project-nonstale-approval"),
occurredAt: "2026-02-26T12:45:00.000Z",
commandId: CommandId.make("cmd-nonstale-approval-1"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-nonstale-approval-1"),
metadata: {},
payload: {
projectId: ProjectId.make("project-nonstale-approval"),
title: "Project Non-Stale Approval",
workspaceRoot: "/tmp/project-nonstale-approval",
defaultModelSelection: null,
scripts: [],
createdAt: "2026-02-26T12:45:00.000Z",
updatedAt: "2026-02-26T12:45:00.000Z",
},
});

yield* appendAndProject({
type: "thread.created",
eventId: EventId.make("evt-nonstale-approval-2"),
aggregateKind: "thread",
aggregateId: ThreadId.make("thread-nonstale-approval"),
occurredAt: "2026-02-26T12:45:01.000Z",
commandId: CommandId.make("cmd-nonstale-approval-2"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-nonstale-approval-2"),
metadata: {},
payload: {
threadId: ThreadId.make("thread-nonstale-approval"),
projectId: ProjectId.make("project-nonstale-approval"),
title: "Thread Non-Stale Approval",
modelSelection: {
provider: "codex",
model: "gpt-5-codex",
},
runtimeMode: "approval-required",
interactionMode: "default",
branch: null,
worktreePath: null,
createdAt: "2026-02-26T12:45:01.000Z",
updatedAt: "2026-02-26T12:45:01.000Z",
},
});

yield* appendAndProject({
type: "thread.activity-appended",
eventId: EventId.make("evt-nonstale-approval-3"),
aggregateKind: "thread",
aggregateId: ThreadId.make("thread-nonstale-approval"),
occurredAt: "2026-02-26T12:45:02.000Z",
commandId: CommandId.make("cmd-nonstale-approval-3"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-nonstale-approval-3"),
metadata: {},
payload: {
threadId: ThreadId.make("thread-nonstale-approval"),
activity: {
id: EventId.make("activity-nonstale-approval-requested"),
tone: "approval",
kind: "approval.requested",
summary: "Command approval requested",
payload: {
requestId: "approval-request-nonstale-existing",
requestKind: "command",
},
turnId: null,
createdAt: "2026-02-26T12:45:02.000Z",
},
},
});

yield* appendAndProject({
type: "thread.activity-appended",
eventId: EventId.make("evt-nonstale-approval-4"),
aggregateKind: "thread",
aggregateId: ThreadId.make("thread-nonstale-approval"),
occurredAt: "2026-02-26T12:45:03.000Z",
commandId: CommandId.make("cmd-nonstale-approval-4"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-nonstale-approval-4"),
metadata: {},
payload: {
threadId: ThreadId.make("thread-nonstale-approval"),
activity: {
id: EventId.make("activity-nonstale-approval-failed-existing"),
tone: "error",
kind: "provider.approval.respond.failed",
summary: "Provider approval response failed",
payload: {
requestId: "approval-request-nonstale-existing",
detail: "Provider timed out while responding to approval request",
},
turnId: TurnId.make("turn-nonstale-failure"),
createdAt: "2026-02-26T12:45:03.000Z",
},
},
});

yield* appendAndProject({
type: "thread.activity-appended",
eventId: EventId.make("evt-nonstale-approval-5"),
aggregateKind: "thread",
aggregateId: ThreadId.make("thread-nonstale-approval"),
occurredAt: "2026-02-26T12:45:04.000Z",
commandId: CommandId.make("cmd-nonstale-approval-5"),
causationEventId: null,
correlationId: CorrelationId.make("cmd-nonstale-approval-5"),
metadata: {},
payload: {
threadId: ThreadId.make("thread-nonstale-approval"),
activity: {
id: EventId.make("activity-nonstale-approval-failed-missing"),
tone: "error",
kind: "provider.approval.respond.failed",
summary: "Provider approval response failed",
payload: {
requestId: "approval-request-nonstale-missing",
detail: "Provider timed out while responding to approval request",
},
turnId: null,
createdAt: "2026-02-26T12:45:04.000Z",
},
},
});

const approvalRows = yield* sql<{
readonly requestId: string;
readonly status: string;
readonly turnId: string | null;
readonly createdAt: string;
readonly resolvedAt: string | null;
}>`
SELECT
request_id AS "requestId",
status,
turn_id AS "turnId",
created_at AS "createdAt",
resolved_at AS "resolvedAt"
FROM projection_pending_approvals
WHERE request_id IN (
'approval-request-nonstale-existing',
'approval-request-nonstale-missing'
)
ORDER BY request_id
`;
assert.deepEqual(approvalRows, [
{
requestId: "approval-request-nonstale-existing",
status: "pending",
turnId: null,
createdAt: "2026-02-26T12:45:02.000Z",
resolvedAt: null,
},
]);

const threadRows = yield* sql<{
readonly pendingApprovalCount: number;
}>`
SELECT pending_approval_count AS "pendingApprovalCount"
FROM projection_threads
WHERE thread_id = 'thread-nonstale-approval'
`;
assert.deepEqual(threadRows, [{ pendingApprovalCount: 1 }]);
}),
);

it.effect("does not fallback-retain messages whose turnId is removed by revert", () =>
Effect.gen(function* () {
const projectionPipeline = yield* OrchestrationProjectionPipeline;
Expand Down
39 changes: 39 additions & 0 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,17 @@ function extractActivityRequestId(payload: unknown): ApprovalRequestId | null {
return typeof requestId === "string" ? ApprovalRequestId.make(requestId) : null;
}

function isStalePendingApprovalFailureDetail(detail: string | null): boolean {
if (detail === null) {
return false;
}
return (
detail.includes("stale pending approval request") ||
detail.includes("unknown pending approval request") ||
detail.includes("unknown pending permission request")
);
}

function derivePendingUserInputCountFromActivities(
activities: ReadonlyArray<ProjectionThreadActivity>,
): number {
Expand Down Expand Up @@ -1245,6 +1256,34 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti
});
return;
}
if (event.payload.activity.kind === "provider.approval.respond.failed") {
const payload =
typeof event.payload.activity.payload === "object" &&
event.payload.activity.payload !== null
? (event.payload.activity.payload as Record<string, unknown>)
: null;
const detail =
typeof payload?.detail === "string" ? payload.detail.toLowerCase() : null;
if (isStalePendingApprovalFailureDetail(detail)) {
if (Option.isNone(existingRow)) {
return;
}
if (existingRow.value.status === "resolved") {
return;
}
yield* projectionPendingApprovalRepository.upsert({
requestId,
threadId: existingRow.value.threadId,
turnId: existingRow.value.turnId,
status: "resolved",
decision: null,
createdAt: existingRow.value.createdAt,
resolvedAt: event.payload.activity.createdAt,
});
return;
}
return;
}
if (Option.isSome(existingRow) && existingRow.value.status === "resolved") {
return;
}
Expand Down
2 changes: 2 additions & 0 deletions apps/server/src/persistence/Migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import Migration0020 from "./Migrations/020_AuthAccessManagement.ts";
import Migration0021 from "./Migrations/021_AuthSessionClientMetadata.ts";
import Migration0022 from "./Migrations/022_AuthSessionLastConnectedAt.ts";
import Migration0023 from "./Migrations/023_ProjectionThreadShellSummary.ts";
import Migration0024 from "./Migrations/024_BackfillProjectionThreadShellSummary.ts";

/**
* Migration loader with all migrations defined inline.
Expand Down Expand Up @@ -71,6 +72,7 @@ export const migrationEntries = [
[21, "AuthSessionClientMetadata", Migration0021],
[22, "AuthSessionLastConnectedAt", Migration0022],
[23, "ProjectionThreadShellSummary", Migration0023],
[24, "BackfillProjectionThreadShellSummary", Migration0024],
] as const;

export const makeMigrationLoader = (throughId?: number) =>
Expand Down
Loading
Loading