Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,115 @@ describe("ProviderRuntimeIngestion", () => {
expect(message?.streaming).toBe(false);
});

it("projects a proposed plan from assistant message text tagged with proposed_plan", async () => {
const harness = await createHarness();
const now = new Date().toISOString();

harness.emit({
type: "content.delta",
eventId: asEventId("evt-message-delta-plan-1"),
provider: ProviderDriverKind.make("codex"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-assistant-plan"),
itemId: asItemId("item-assistant-plan"),
payload: {
streamKind: "assistant_text",
delta: "<proposed_plan>\n# Streamed fallback plan\n\n- one\n</proposed_plan>",
},
});
harness.emit({
type: "item.completed",
eventId: asEventId("evt-message-completed-plan"),
provider: ProviderDriverKind.make("codex"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-assistant-plan"),
itemId: asItemId("item-assistant-plan"),
payload: {
itemType: "assistant_message",
status: "completed",
},
});

const thread = await waitForThread(harness.engine, (entry) =>
entry.proposedPlans.some(
(proposedPlan: ProviderRuntimeTestProposedPlan) =>
proposedPlan.id === "plan:thread-1:turn:turn-assistant-plan",
),
);

expect(
thread.messages.find(
(entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-assistant-plan",
),
).toMatchObject({
text: "",
streaming: false,
turnId: "turn-assistant-plan",
});
expect(
thread.proposedPlans.find(
(entry: ProviderRuntimeTestProposedPlan) =>
entry.id === "plan:thread-1:turn:turn-assistant-plan",
)?.planMarkdown,
).toBe("# Streamed fallback plan\n\n- one");
});

it("projects a proposed plan from assistant message text tagged with proposed_plan on turn completion", async () => {
const harness = await createHarness();
const now = new Date().toISOString();

harness.emit({
type: "content.delta",
eventId: asEventId("evt-message-delta-plan-turn-complete-1"),
provider: ProviderDriverKind.make("codex"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-assistant-plan-turn-complete"),
itemId: asItemId("item-assistant-plan-turn-complete"),
payload: {
streamKind: "assistant_text",
delta: "<proposed_plan>\n# Turn completion fallback plan\n\n- one\n</proposed_plan>",
},
});
harness.emit({
type: "turn.completed",
eventId: asEventId("evt-turn-completed-plan-turn-complete"),
provider: ProviderDriverKind.make("codex"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-assistant-plan-turn-complete"),
payload: {
summary: "done",
},
});

const thread = await waitForThread(harness.engine, (entry) =>
entry.proposedPlans.some(
(proposedPlan: ProviderRuntimeTestProposedPlan) =>
proposedPlan.id === "plan:thread-1:turn:turn-assistant-plan-turn-complete",
),
);

expect(
thread.messages.find(
(entry: ProviderRuntimeTestMessage) =>
entry.id === "assistant:item-assistant-plan-turn-complete",
),
).toMatchObject({
text: "",
streaming: false,
turnId: "turn-assistant-plan-turn-complete",
});
expect(
thread.proposedPlans.find(
(entry: ProviderRuntimeTestProposedPlan) =>
entry.id === "plan:thread-1:turn:turn-assistant-plan-turn-complete",
)?.planMarkdown,
).toBe("# Turn completion fallback plan\n\n- one");
});

it("preserves completed tool metadata on projected tool activities", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
Expand Down Expand Up @@ -1607,6 +1716,77 @@ describe("ProviderRuntimeIngestion", () => {
expect(message?.streaming).toBe(false);
});

it("projects a proposed plan when an approval request opens after buffered tagged assistant text", async () => {
const harness = await createHarness();
const now = new Date().toISOString();

harness.emit({
type: "turn.started",
eventId: asEventId("evt-turn-started-buffered-request-plan-flush"),
provider: ProviderDriverKind.make("codex"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-buffered-request-plan-flush"),
});
await waitForThread(
harness.engine,
(thread) =>
thread.session?.status === "running" &&
thread.session?.activeTurnId === "turn-buffered-request-plan-flush",
);

harness.emit({
type: "content.delta",
eventId: asEventId("evt-message-delta-buffered-request-plan-flush"),
provider: ProviderDriverKind.make("codex"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-buffered-request-plan-flush"),
itemId: asItemId("item-buffered-request-plan-flush"),
payload: {
streamKind: "assistant_text",
delta: "<proposed_plan>\n# Approval boundary plan\n\n- one\n</proposed_plan>",
},
});
harness.emit({
type: "request.opened",
eventId: asEventId("evt-request-opened-buffered-request-plan-flush"),
provider: ProviderDriverKind.make("codex"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-buffered-request-plan-flush"),
requestId: ApprovalRequestId.make("req-buffered-request-plan-flush"),
payload: {
requestType: "command_execution_approval",
detail: "pwd",
},
});

const thread = await waitForThread(harness.engine, (entry) =>
entry.proposedPlans.some(
(proposedPlan: ProviderRuntimeTestProposedPlan) =>
proposedPlan.id === "plan:thread-1:turn:turn-buffered-request-plan-flush",
),
);

expect(
thread.messages.find(
(entry: ProviderRuntimeTestMessage) =>
entry.id === "assistant:item-buffered-request-plan-flush",
),
).toMatchObject({
text: "",
streaming: false,
turnId: "turn-buffered-request-plan-flush",
});
expect(
thread.proposedPlans.find(
(entry: ProviderRuntimeTestProposedPlan) =>
entry.id === "plan:thread-1:turn:turn-buffered-request-plan-flush",
)?.planMarkdown,
).toBe("# Approval boundary plan\n\n- one");
});

it("flushes and completes buffered assistant text when user input is requested", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
Expand Down
93 changes: 78 additions & 15 deletions apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
Comment thread
macroscopeapp[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ function truncateDetail(value: string, limit = 180): string {
return value.length > limit ? `${value.slice(0, limit - 3)}...` : value;
}

const PROPOSED_PLAN_BLOCK_REGEX = /<proposed_plan>\s*[\s\S]*?\s*<\/proposed_plan>/g;
const PROPOSED_PLAN_CAPTURE_REGEX = /<proposed_plan>\s*([\s\S]*?)\s*<\/proposed_plan>/;

function extractProposedPlanMarkdown(text: string | undefined): string | undefined {
return text?.trim().match(PROPOSED_PLAN_CAPTURE_REGEX)?.[1]?.trim() || undefined;
}

function normalizeProposedPlanMarkdown(planMarkdown: string | undefined): string | undefined {
const trimmed = planMarkdown?.trim();
if (!trimmed) {
Expand Down Expand Up @@ -762,6 +769,10 @@ const make = Effect.gen(function* () {
if (!hasRenderableAssistantText(bufferedText)) {
return false;
}
if (extractProposedPlanMarkdown(bufferedText)) {
yield* Cache.set(bufferedAssistantTextByMessageId, input.messageId, bufferedText);
return false;
}

yield* orchestrationEngine.dispatch({
type: "thread.message.assistant.delta",
Expand Down Expand Up @@ -821,12 +832,14 @@ const make = Effect.gen(function* () {
}) =>
Effect.gen(function* () {
const bufferedText = yield* takeBufferedAssistantText(input.messageId);
const text =
const rawText =
bufferedText.length > 0
? bufferedText
: (input.fallbackText?.trim().length ?? 0) > 0
? input.fallbackText!
: "";
const hasProposedPlanBlock = PROPOSED_PLAN_CAPTURE_REGEX.test(rawText);
const text = hasProposedPlanBlock ? rawText.replace(PROPOSED_PLAN_BLOCK_REGEX, "") : rawText;
Comment thread
cursor[bot] marked this conversation as resolved.
const hasRenderableText = hasRenderableAssistantText(text);

if (hasRenderableText) {
Expand All @@ -841,7 +854,7 @@ const make = Effect.gen(function* () {
});
}

if (input.hasProjectedMessage || hasRenderableText) {
if (input.hasProjectedMessage || hasRenderableText || rawText.length > 0) {
yield* orchestrationEngine.dispatch({
type: "thread.message.assistant.complete",
commandId: providerCommandId(input.event, input.commandTag),
Expand All @@ -852,11 +865,18 @@ const make = Effect.gen(function* () {
});
}
yield* clearAssistantMessageState(input.messageId);
return rawText;
});

const finalizeActiveAssistantSegmentForTurn = (input: {
event: ProviderRuntimeEvent;
threadId: ThreadId;
threadProposedPlans: ReadonlyArray<{
id: string;
createdAt: string;
implementedAt: string | null;
implementationThreadId: ThreadId | null;
}>;
turnId: TurnId;
createdAt: string;
commandTag: string;
Expand All @@ -873,7 +893,7 @@ const make = Effect.gen(function* () {
return;
}

yield* finalizeAssistantMessage({
const completedAssistantText = yield* finalizeAssistantMessage({
event: input.event,
threadId: input.threadId,
messageId: activeMessageId.value,
Expand All @@ -885,6 +905,18 @@ const make = Effect.gen(function* () {
input.hasProjectedMessage ||
(input.flushedMessageIds?.has(activeMessageId.value) ?? false),
});
const proposedPlanFromAssistantMessage = extractProposedPlanMarkdown(completedAssistantText);
if (proposedPlanFromAssistantMessage) {
yield* finalizeBufferedProposedPlan({
event: input.event,
threadId: input.threadId,
threadProposedPlans: input.threadProposedPlans,
planId: proposedPlanIdForTurn(input.threadId, input.turnId),
turnId: input.turnId,
fallbackMarkdown: proposedPlanFromAssistantMessage,
updatedAt: input.createdAt,
});
}
yield* forgetAssistantMessageId(input.threadId, input.turnId, activeMessageId.value);

const state = yield* getAssistantSegmentStateForTurn(input.threadId, input.turnId);
Expand Down Expand Up @@ -1297,6 +1329,7 @@ const make = Effect.gen(function* () {
yield* finalizeActiveAssistantSegmentForTurn({
event,
threadId: thread.id,
threadProposedPlans: thread.proposedPlans,
turnId: pauseForUserTurnId,
createdAt: now,
commandTag:
Expand Down Expand Up @@ -1368,7 +1401,7 @@ const make = Effect.gen(function* () {
yield* rememberAssistantMessageId(thread.id, turnId, assistantMessageId);
}

yield* finalizeAssistantMessage({
const completedAssistantText = yield* finalizeAssistantMessage({
event,
threadId: thread.id,
messageId: assistantMessageId,
Expand All @@ -1382,6 +1415,20 @@ const make = Effect.gen(function* () {
: {}),
});

const proposedPlanFromAssistantMessage =
extractProposedPlanMarkdown(completedAssistantText);
if (proposedPlanFromAssistantMessage) {
yield* finalizeBufferedProposedPlan({
event,
threadId: thread.id,
threadProposedPlans: thread.proposedPlans,
planId: proposedPlanIdFromEvent(event, thread.id),
...(turnId ? { turnId } : {}),
fallbackMarkdown: proposedPlanFromAssistantMessage,
updatedAt: now,
});
}

if (turnId) {
yield* forgetAssistantMessageId(thread.id, turnId, assistantMessageId);
}
Expand Down Expand Up @@ -1411,17 +1458,33 @@ const make = Effect.gen(function* () {
yield* Effect.forEach(
assistantMessageIds,
(assistantMessageId) =>
finalizeAssistantMessage({
event,
threadId: thread.id,
messageId: assistantMessageId,
turnId,
createdAt: now,
commandTag: "assistant-complete-finalize",
finalDeltaCommandTag: "assistant-delta-finalize-fallback",
hasProjectedMessage: thread.messages.some(
(entry) => entry.id === assistantMessageId,
),
Effect.gen(function* () {
const completedAssistantText = yield* finalizeAssistantMessage({
event,
threadId: thread.id,
messageId: assistantMessageId,
turnId,
createdAt: now,
commandTag: "assistant-complete-finalize",
finalDeltaCommandTag: "assistant-delta-finalize-fallback",
hasProjectedMessage: thread.messages.some(
(entry) => entry.id === assistantMessageId,
),
});

const proposedPlanFromAssistantMessage =
extractProposedPlanMarkdown(completedAssistantText);
if (proposedPlanFromAssistantMessage) {
yield* finalizeBufferedProposedPlan({
event,
threadId: thread.id,
threadProposedPlans: thread.proposedPlans,
planId: proposedPlanIdForTurn(thread.id, turnId),
turnId,
fallbackMarkdown: proposedPlanFromAssistantMessage,
updatedAt: now,
});
}
}),
{ concurrency: 1 },
).pipe(Effect.asVoid);
Expand Down
Loading