From 1b2ad3eb2928baccafc989b9945f80beb814d294 Mon Sep 17 00:00:00 2001 From: pedrokpp Date: Sat, 2 May 2026 01:27:22 -0300 Subject: [PATCH 1/4] fix(server): handle proposed_plan fallback after mode switch --- .../Layers/ProviderRuntimeIngestion.test.ts | 55 +++++++++++++++++++ .../Layers/ProviderRuntimeIngestion.ts | 29 +++++++++- 2 files changed, 81 insertions(+), 3 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 487d1a3aac..412eeac479 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -738,6 +738,61 @@ describe("ProviderRuntimeIngestion", () => { expect(message?.streaming).toBe(false); }); + it("projects a proposed plan from assistant message text tagged with proposed_plan", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-plan-1"), + provider: ProviderDriverKind.make("codex"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-assistant-plan"), + itemId: asItemId("item-assistant-plan"), + payload: { + streamKind: "assistant_text", + delta: "\n# Streamed fallback plan\n\n- one\n", + }, + }); + harness.emit({ + type: "item.completed", + eventId: asEventId("evt-message-completed-plan"), + provider: ProviderDriverKind.make("codex"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-assistant-plan"), + itemId: asItemId("item-assistant-plan"), + payload: { + itemType: "assistant_message", + status: "completed", + }, + }); + + const thread = await waitForThread(harness.engine, (entry) => + entry.proposedPlans.some( + (proposedPlan: ProviderRuntimeTestProposedPlan) => + proposedPlan.id === "plan:thread-1:turn:turn-assistant-plan", + ), + ); + + expect( + thread.messages.find( + (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-assistant-plan", + ), + ).toMatchObject({ + text: "", + streaming: false, + turnId: "turn-assistant-plan", + }); + expect( + thread.proposedPlans.find( + (entry: ProviderRuntimeTestProposedPlan) => + entry.id === "plan:thread-1:turn:turn-assistant-plan", + )?.planMarkdown, + ).toBe("# Streamed fallback plan\n\n- one"); + }); + it("preserves completed tool metadata on projected tool activities", async () => { const harness = await createHarness(); const now = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index b7a4c195a5..5d7ecb8f91 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -81,6 +81,13 @@ function truncateDetail(value: string, limit = 180): string { return value.length > limit ? `${value.slice(0, limit - 3)}...` : value; } +const PROPOSED_PLAN_BLOCK_REGEX = /\s*[\s\S]*?\s*<\/proposed_plan>/g; +const PROPOSED_PLAN_CAPTURE_REGEX = /\s*([\s\S]*?)\s*<\/proposed_plan>/; + +function extractProposedPlanMarkdown(text: string | undefined): string | undefined { + return text?.trim().match(PROPOSED_PLAN_CAPTURE_REGEX)?.[1]?.trim() || undefined; +} + function normalizeProposedPlanMarkdown(planMarkdown: string | undefined): string | undefined { const trimmed = planMarkdown?.trim(); if (!trimmed) { @@ -821,12 +828,13 @@ const make = Effect.gen(function* () { }) => Effect.gen(function* () { const bufferedText = yield* takeBufferedAssistantText(input.messageId); - const text = + const rawText = bufferedText.length > 0 ? bufferedText : (input.fallbackText?.trim().length ?? 0) > 0 ? input.fallbackText! : ""; + const text = rawText.replace(PROPOSED_PLAN_BLOCK_REGEX, "").trim(); const hasRenderableText = hasRenderableAssistantText(text); if (hasRenderableText) { @@ -841,7 +849,7 @@ const make = Effect.gen(function* () { }); } - if (input.hasProjectedMessage || hasRenderableText) { + if (input.hasProjectedMessage || hasRenderableText || rawText.length > 0) { yield* orchestrationEngine.dispatch({ type: "thread.message.assistant.complete", commandId: providerCommandId(input.event, input.commandTag), @@ -852,6 +860,7 @@ const make = Effect.gen(function* () { }); } yield* clearAssistantMessageState(input.messageId); + return rawText; }); const finalizeActiveAssistantSegmentForTurn = (input: { @@ -1368,7 +1377,7 @@ const make = Effect.gen(function* () { yield* rememberAssistantMessageId(thread.id, turnId, assistantMessageId); } - yield* finalizeAssistantMessage({ + const completedAssistantText = yield* finalizeAssistantMessage({ event, threadId: thread.id, messageId: assistantMessageId, @@ -1382,6 +1391,20 @@ const make = Effect.gen(function* () { : {}), }); + const proposedPlanFromAssistantMessage = + extractProposedPlanMarkdown(completedAssistantText); + if (proposedPlanFromAssistantMessage) { + yield* finalizeBufferedProposedPlan({ + event, + threadId: thread.id, + threadProposedPlans: thread.proposedPlans, + planId: proposedPlanIdFromEvent(event, thread.id), + ...(turnId ? { turnId } : {}), + fallbackMarkdown: proposedPlanFromAssistantMessage, + updatedAt: now, + }); + } + if (turnId) { yield* forgetAssistantMessageId(thread.id, turnId, assistantMessageId); } From 29b428d972a2cf466639efc4e83e5693f2db04ff Mon Sep 17 00:00:00 2001 From: pedrokpp Date: Sat, 2 May 2026 01:38:07 -0300 Subject: [PATCH 2/4] fix(server): preserve assistant text outside proposed_plan fallback --- .../src/orchestration/Layers/ProviderRuntimeIngestion.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 5d7ecb8f91..3f0f6014cd 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -834,7 +834,8 @@ const make = Effect.gen(function* () { : (input.fallbackText?.trim().length ?? 0) > 0 ? input.fallbackText! : ""; - const text = rawText.replace(PROPOSED_PLAN_BLOCK_REGEX, "").trim(); + const hasProposedPlanBlock = PROPOSED_PLAN_CAPTURE_REGEX.test(rawText); + const text = hasProposedPlanBlock ? rawText.replace(PROPOSED_PLAN_BLOCK_REGEX, "") : rawText; const hasRenderableText = hasRenderableAssistantText(text); if (hasRenderableText) { From 850c5847b0f02c14c28053bed1858df2a3f51d33 Mon Sep 17 00:00:00 2001 From: pedrokpp Date: Sat, 2 May 2026 12:43:40 -0300 Subject: [PATCH 3/4] fix(server): handle proposed_plan fallback on turn completion --- .../Layers/ProviderRuntimeIngestion.test.ts | 54 +++++++++++++++++++ .../Layers/ProviderRuntimeIngestion.ts | 38 +++++++++---- 2 files changed, 81 insertions(+), 11 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 412eeac479..a1b8e5afda 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -793,6 +793,60 @@ describe("ProviderRuntimeIngestion", () => { ).toBe("# Streamed fallback plan\n\n- one"); }); + it("projects a proposed plan from assistant message text tagged with proposed_plan on turn completion", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-plan-turn-complete-1"), + provider: ProviderDriverKind.make("codex"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-assistant-plan-turn-complete"), + itemId: asItemId("item-assistant-plan-turn-complete"), + payload: { + streamKind: "assistant_text", + delta: "\n# Turn completion fallback plan\n\n- one\n", + }, + }); + harness.emit({ + type: "turn.completed", + eventId: asEventId("evt-turn-completed-plan-turn-complete"), + provider: ProviderDriverKind.make("codex"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-assistant-plan-turn-complete"), + payload: { + summary: "done", + }, + }); + + const thread = await waitForThread(harness.engine, (entry) => + entry.proposedPlans.some( + (proposedPlan: ProviderRuntimeTestProposedPlan) => + proposedPlan.id === "plan:thread-1:turn:turn-assistant-plan-turn-complete", + ), + ); + + expect( + thread.messages.find( + (entry: ProviderRuntimeTestMessage) => + entry.id === "assistant:item-assistant-plan-turn-complete", + ), + ).toMatchObject({ + text: "", + streaming: false, + turnId: "turn-assistant-plan-turn-complete", + }); + expect( + thread.proposedPlans.find( + (entry: ProviderRuntimeTestProposedPlan) => + entry.id === "plan:thread-1:turn:turn-assistant-plan-turn-complete", + )?.planMarkdown, + ).toBe("# Turn completion fallback plan\n\n- one"); + }); + it("preserves completed tool metadata on projected tool activities", async () => { const harness = await createHarness(); const now = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 3f0f6014cd..050fd01995 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -1435,17 +1435,33 @@ const make = Effect.gen(function* () { yield* Effect.forEach( assistantMessageIds, (assistantMessageId) => - finalizeAssistantMessage({ - event, - threadId: thread.id, - messageId: assistantMessageId, - turnId, - createdAt: now, - commandTag: "assistant-complete-finalize", - finalDeltaCommandTag: "assistant-delta-finalize-fallback", - hasProjectedMessage: thread.messages.some( - (entry) => entry.id === assistantMessageId, - ), + Effect.gen(function* () { + const completedAssistantText = yield* finalizeAssistantMessage({ + event, + threadId: thread.id, + messageId: assistantMessageId, + turnId, + createdAt: now, + commandTag: "assistant-complete-finalize", + finalDeltaCommandTag: "assistant-delta-finalize-fallback", + hasProjectedMessage: thread.messages.some( + (entry) => entry.id === assistantMessageId, + ), + }); + + const proposedPlanFromAssistantMessage = + extractProposedPlanMarkdown(completedAssistantText); + if (proposedPlanFromAssistantMessage) { + yield* finalizeBufferedProposedPlan({ + event, + threadId: thread.id, + threadProposedPlans: thread.proposedPlans, + planId: proposedPlanIdForTurn(thread.id, turnId), + turnId, + fallbackMarkdown: proposedPlanFromAssistantMessage, + updatedAt: now, + }); + } }), { concurrency: 1 }, ).pipe(Effect.asVoid); From bbc954c95d5971e7ca240c170ad8de1807c172e9 Mon Sep 17 00:00:00 2001 From: pedrokpp Date: Sat, 2 May 2026 13:06:39 -0300 Subject: [PATCH 4/4] fix(server): preserve proposed_plan fallback across approval boundaries --- .../Layers/ProviderRuntimeIngestion.test.ts | 71 +++++++++++++++++++ .../Layers/ProviderRuntimeIngestion.ts | 25 ++++++- 2 files changed, 95 insertions(+), 1 deletion(-) diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index a1b8e5afda..b7ad70447c 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -1716,6 +1716,77 @@ describe("ProviderRuntimeIngestion", () => { expect(message?.streaming).toBe(false); }); + it("projects a proposed plan when an approval request opens after buffered tagged assistant text", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-turn-started-buffered-request-plan-flush"), + provider: ProviderDriverKind.make("codex"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-buffered-request-plan-flush"), + }); + await waitForThread( + harness.engine, + (thread) => + thread.session?.status === "running" && + thread.session?.activeTurnId === "turn-buffered-request-plan-flush", + ); + + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-message-delta-buffered-request-plan-flush"), + provider: ProviderDriverKind.make("codex"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-buffered-request-plan-flush"), + itemId: asItemId("item-buffered-request-plan-flush"), + payload: { + streamKind: "assistant_text", + delta: "\n# Approval boundary plan\n\n- one\n", + }, + }); + harness.emit({ + type: "request.opened", + eventId: asEventId("evt-request-opened-buffered-request-plan-flush"), + provider: ProviderDriverKind.make("codex"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-buffered-request-plan-flush"), + requestId: ApprovalRequestId.make("req-buffered-request-plan-flush"), + payload: { + requestType: "command_execution_approval", + detail: "pwd", + }, + }); + + const thread = await waitForThread(harness.engine, (entry) => + entry.proposedPlans.some( + (proposedPlan: ProviderRuntimeTestProposedPlan) => + proposedPlan.id === "plan:thread-1:turn:turn-buffered-request-plan-flush", + ), + ); + + expect( + thread.messages.find( + (entry: ProviderRuntimeTestMessage) => + entry.id === "assistant:item-buffered-request-plan-flush", + ), + ).toMatchObject({ + text: "", + streaming: false, + turnId: "turn-buffered-request-plan-flush", + }); + expect( + thread.proposedPlans.find( + (entry: ProviderRuntimeTestProposedPlan) => + entry.id === "plan:thread-1:turn:turn-buffered-request-plan-flush", + )?.planMarkdown, + ).toBe("# Approval boundary plan\n\n- one"); + }); + it("flushes and completes buffered assistant text when user input is requested", async () => { const harness = await createHarness(); const now = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 050fd01995..bc631a067b 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -769,6 +769,10 @@ const make = Effect.gen(function* () { if (!hasRenderableAssistantText(bufferedText)) { return false; } + if (extractProposedPlanMarkdown(bufferedText)) { + yield* Cache.set(bufferedAssistantTextByMessageId, input.messageId, bufferedText); + return false; + } yield* orchestrationEngine.dispatch({ type: "thread.message.assistant.delta", @@ -867,6 +871,12 @@ const make = Effect.gen(function* () { const finalizeActiveAssistantSegmentForTurn = (input: { event: ProviderRuntimeEvent; threadId: ThreadId; + threadProposedPlans: ReadonlyArray<{ + id: string; + createdAt: string; + implementedAt: string | null; + implementationThreadId: ThreadId | null; + }>; turnId: TurnId; createdAt: string; commandTag: string; @@ -883,7 +893,7 @@ const make = Effect.gen(function* () { return; } - yield* finalizeAssistantMessage({ + const completedAssistantText = yield* finalizeAssistantMessage({ event: input.event, threadId: input.threadId, messageId: activeMessageId.value, @@ -895,6 +905,18 @@ const make = Effect.gen(function* () { input.hasProjectedMessage || (input.flushedMessageIds?.has(activeMessageId.value) ?? false), }); + const proposedPlanFromAssistantMessage = extractProposedPlanMarkdown(completedAssistantText); + if (proposedPlanFromAssistantMessage) { + yield* finalizeBufferedProposedPlan({ + event: input.event, + threadId: input.threadId, + threadProposedPlans: input.threadProposedPlans, + planId: proposedPlanIdForTurn(input.threadId, input.turnId), + turnId: input.turnId, + fallbackMarkdown: proposedPlanFromAssistantMessage, + updatedAt: input.createdAt, + }); + } yield* forgetAssistantMessageId(input.threadId, input.turnId, activeMessageId.value); const state = yield* getAssistantSegmentStateForTurn(input.threadId, input.turnId); @@ -1307,6 +1329,7 @@ const make = Effect.gen(function* () { yield* finalizeActiveAssistantSegmentForTurn({ event, threadId: thread.id, + threadProposedPlans: thread.proposedPlans, turnId: pauseForUserTurnId, createdAt: now, commandTag: