diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts
index 487d1a3aac..b7ad70447c 100644
--- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts
+++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts
@@ -738,6 +738,115 @@ describe("ProviderRuntimeIngestion", () => {
expect(message?.streaming).toBe(false);
});
+ it("projects a proposed plan from assistant message text tagged with proposed_plan", async () => {
+ const harness = await createHarness();
+ const now = new Date().toISOString();
+
+ harness.emit({
+ type: "content.delta",
+ eventId: asEventId("evt-message-delta-plan-1"),
+ provider: ProviderDriverKind.make("codex"),
+ createdAt: now,
+ threadId: asThreadId("thread-1"),
+ turnId: asTurnId("turn-assistant-plan"),
+ itemId: asItemId("item-assistant-plan"),
+ payload: {
+ streamKind: "assistant_text",
+ delta: "\n# Streamed fallback plan\n\n- one\n",
+ },
+ });
+ harness.emit({
+ type: "item.completed",
+ eventId: asEventId("evt-message-completed-plan"),
+ provider: ProviderDriverKind.make("codex"),
+ createdAt: now,
+ threadId: asThreadId("thread-1"),
+ turnId: asTurnId("turn-assistant-plan"),
+ itemId: asItemId("item-assistant-plan"),
+ payload: {
+ itemType: "assistant_message",
+ status: "completed",
+ },
+ });
+
+ const thread = await waitForThread(harness.engine, (entry) =>
+ entry.proposedPlans.some(
+ (proposedPlan: ProviderRuntimeTestProposedPlan) =>
+ proposedPlan.id === "plan:thread-1:turn:turn-assistant-plan",
+ ),
+ );
+
+ expect(
+ thread.messages.find(
+ (entry: ProviderRuntimeTestMessage) => entry.id === "assistant:item-assistant-plan",
+ ),
+ ).toMatchObject({
+ text: "",
+ streaming: false,
+ turnId: "turn-assistant-plan",
+ });
+ expect(
+ thread.proposedPlans.find(
+ (entry: ProviderRuntimeTestProposedPlan) =>
+ entry.id === "plan:thread-1:turn:turn-assistant-plan",
+ )?.planMarkdown,
+ ).toBe("# Streamed fallback plan\n\n- one");
+ });
+
+ it("projects a proposed plan from assistant message text tagged with proposed_plan on turn completion", async () => {
+ const harness = await createHarness();
+ const now = new Date().toISOString();
+
+ harness.emit({
+ type: "content.delta",
+ eventId: asEventId("evt-message-delta-plan-turn-complete-1"),
+ provider: ProviderDriverKind.make("codex"),
+ createdAt: now,
+ threadId: asThreadId("thread-1"),
+ turnId: asTurnId("turn-assistant-plan-turn-complete"),
+ itemId: asItemId("item-assistant-plan-turn-complete"),
+ payload: {
+ streamKind: "assistant_text",
+ delta: "\n# Turn completion fallback plan\n\n- one\n",
+ },
+ });
+ harness.emit({
+ type: "turn.completed",
+ eventId: asEventId("evt-turn-completed-plan-turn-complete"),
+ provider: ProviderDriverKind.make("codex"),
+ createdAt: now,
+ threadId: asThreadId("thread-1"),
+ turnId: asTurnId("turn-assistant-plan-turn-complete"),
+ payload: {
+ summary: "done",
+ },
+ });
+
+ const thread = await waitForThread(harness.engine, (entry) =>
+ entry.proposedPlans.some(
+ (proposedPlan: ProviderRuntimeTestProposedPlan) =>
+ proposedPlan.id === "plan:thread-1:turn:turn-assistant-plan-turn-complete",
+ ),
+ );
+
+ expect(
+ thread.messages.find(
+ (entry: ProviderRuntimeTestMessage) =>
+ entry.id === "assistant:item-assistant-plan-turn-complete",
+ ),
+ ).toMatchObject({
+ text: "",
+ streaming: false,
+ turnId: "turn-assistant-plan-turn-complete",
+ });
+ expect(
+ thread.proposedPlans.find(
+ (entry: ProviderRuntimeTestProposedPlan) =>
+ entry.id === "plan:thread-1:turn:turn-assistant-plan-turn-complete",
+ )?.planMarkdown,
+ ).toBe("# Turn completion fallback plan\n\n- one");
+ });
+
it("preserves completed tool metadata on projected tool activities", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
@@ -1607,6 +1716,77 @@ describe("ProviderRuntimeIngestion", () => {
expect(message?.streaming).toBe(false);
});
+ it("projects a proposed plan when an approval request opens after buffered tagged assistant text", async () => {
+ const harness = await createHarness();
+ const now = new Date().toISOString();
+
+ harness.emit({
+ type: "turn.started",
+ eventId: asEventId("evt-turn-started-buffered-request-plan-flush"),
+ provider: ProviderDriverKind.make("codex"),
+ createdAt: now,
+ threadId: asThreadId("thread-1"),
+ turnId: asTurnId("turn-buffered-request-plan-flush"),
+ });
+ await waitForThread(
+ harness.engine,
+ (thread) =>
+ thread.session?.status === "running" &&
+ thread.session?.activeTurnId === "turn-buffered-request-plan-flush",
+ );
+
+ harness.emit({
+ type: "content.delta",
+ eventId: asEventId("evt-message-delta-buffered-request-plan-flush"),
+ provider: ProviderDriverKind.make("codex"),
+ createdAt: now,
+ threadId: asThreadId("thread-1"),
+ turnId: asTurnId("turn-buffered-request-plan-flush"),
+ itemId: asItemId("item-buffered-request-plan-flush"),
+ payload: {
+ streamKind: "assistant_text",
+ delta: "\n# Approval boundary plan\n\n- one\n",
+ },
+ });
+ harness.emit({
+ type: "request.opened",
+ eventId: asEventId("evt-request-opened-buffered-request-plan-flush"),
+ provider: ProviderDriverKind.make("codex"),
+ createdAt: now,
+ threadId: asThreadId("thread-1"),
+ turnId: asTurnId("turn-buffered-request-plan-flush"),
+ requestId: ApprovalRequestId.make("req-buffered-request-plan-flush"),
+ payload: {
+ requestType: "command_execution_approval",
+ detail: "pwd",
+ },
+ });
+
+ const thread = await waitForThread(harness.engine, (entry) =>
+ entry.proposedPlans.some(
+ (proposedPlan: ProviderRuntimeTestProposedPlan) =>
+ proposedPlan.id === "plan:thread-1:turn:turn-buffered-request-plan-flush",
+ ),
+ );
+
+ expect(
+ thread.messages.find(
+ (entry: ProviderRuntimeTestMessage) =>
+ entry.id === "assistant:item-buffered-request-plan-flush",
+ ),
+ ).toMatchObject({
+ text: "",
+ streaming: false,
+ turnId: "turn-buffered-request-plan-flush",
+ });
+ expect(
+ thread.proposedPlans.find(
+ (entry: ProviderRuntimeTestProposedPlan) =>
+ entry.id === "plan:thread-1:turn:turn-buffered-request-plan-flush",
+ )?.planMarkdown,
+ ).toBe("# Approval boundary plan\n\n- one");
+ });
+
it("flushes and completes buffered assistant text when user input is requested", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
index b7a4c195a5..bc631a067b 100644
--- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
+++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
@@ -81,6 +81,13 @@ function truncateDetail(value: string, limit = 180): string {
return value.length > limit ? `${value.slice(0, limit - 3)}...` : value;
}
+const PROPOSED_PLAN_BLOCK_REGEX = /\s*[\s\S]*?\s*<\/proposed_plan>/g;
+const PROPOSED_PLAN_CAPTURE_REGEX = /\s*([\s\S]*?)\s*<\/proposed_plan>/;
+
+function extractProposedPlanMarkdown(text: string | undefined): string | undefined {
+ return text?.trim().match(PROPOSED_PLAN_CAPTURE_REGEX)?.[1]?.trim() || undefined;
+}
+
function normalizeProposedPlanMarkdown(planMarkdown: string | undefined): string | undefined {
const trimmed = planMarkdown?.trim();
if (!trimmed) {
@@ -762,6 +769,10 @@ const make = Effect.gen(function* () {
if (!hasRenderableAssistantText(bufferedText)) {
return false;
}
+ if (extractProposedPlanMarkdown(bufferedText)) {
+ yield* Cache.set(bufferedAssistantTextByMessageId, input.messageId, bufferedText);
+ return false;
+ }
yield* orchestrationEngine.dispatch({
type: "thread.message.assistant.delta",
@@ -821,12 +832,14 @@ const make = Effect.gen(function* () {
}) =>
Effect.gen(function* () {
const bufferedText = yield* takeBufferedAssistantText(input.messageId);
- const text =
+ const rawText =
bufferedText.length > 0
? bufferedText
: (input.fallbackText?.trim().length ?? 0) > 0
? input.fallbackText!
: "";
+ const hasProposedPlanBlock = PROPOSED_PLAN_CAPTURE_REGEX.test(rawText);
+ const text = hasProposedPlanBlock ? rawText.replace(PROPOSED_PLAN_BLOCK_REGEX, "") : rawText;
const hasRenderableText = hasRenderableAssistantText(text);
if (hasRenderableText) {
@@ -841,7 +854,7 @@ const make = Effect.gen(function* () {
});
}
- if (input.hasProjectedMessage || hasRenderableText) {
+ if (input.hasProjectedMessage || hasRenderableText || rawText.length > 0) {
yield* orchestrationEngine.dispatch({
type: "thread.message.assistant.complete",
commandId: providerCommandId(input.event, input.commandTag),
@@ -852,11 +865,18 @@ const make = Effect.gen(function* () {
});
}
yield* clearAssistantMessageState(input.messageId);
+ return rawText;
});
const finalizeActiveAssistantSegmentForTurn = (input: {
event: ProviderRuntimeEvent;
threadId: ThreadId;
+ threadProposedPlans: ReadonlyArray<{
+ id: string;
+ createdAt: string;
+ implementedAt: string | null;
+ implementationThreadId: ThreadId | null;
+ }>;
turnId: TurnId;
createdAt: string;
commandTag: string;
@@ -873,7 +893,7 @@ const make = Effect.gen(function* () {
return;
}
- yield* finalizeAssistantMessage({
+ const completedAssistantText = yield* finalizeAssistantMessage({
event: input.event,
threadId: input.threadId,
messageId: activeMessageId.value,
@@ -885,6 +905,18 @@ const make = Effect.gen(function* () {
input.hasProjectedMessage ||
(input.flushedMessageIds?.has(activeMessageId.value) ?? false),
});
+ const proposedPlanFromAssistantMessage = extractProposedPlanMarkdown(completedAssistantText);
+ if (proposedPlanFromAssistantMessage) {
+ yield* finalizeBufferedProposedPlan({
+ event: input.event,
+ threadId: input.threadId,
+ threadProposedPlans: input.threadProposedPlans,
+ planId: proposedPlanIdForTurn(input.threadId, input.turnId),
+ turnId: input.turnId,
+ fallbackMarkdown: proposedPlanFromAssistantMessage,
+ updatedAt: input.createdAt,
+ });
+ }
yield* forgetAssistantMessageId(input.threadId, input.turnId, activeMessageId.value);
const state = yield* getAssistantSegmentStateForTurn(input.threadId, input.turnId);
@@ -1297,6 +1329,7 @@ const make = Effect.gen(function* () {
yield* finalizeActiveAssistantSegmentForTurn({
event,
threadId: thread.id,
+ threadProposedPlans: thread.proposedPlans,
turnId: pauseForUserTurnId,
createdAt: now,
commandTag:
@@ -1368,7 +1401,7 @@ const make = Effect.gen(function* () {
yield* rememberAssistantMessageId(thread.id, turnId, assistantMessageId);
}
- yield* finalizeAssistantMessage({
+ const completedAssistantText = yield* finalizeAssistantMessage({
event,
threadId: thread.id,
messageId: assistantMessageId,
@@ -1382,6 +1415,20 @@ const make = Effect.gen(function* () {
: {}),
});
+ const proposedPlanFromAssistantMessage =
+ extractProposedPlanMarkdown(completedAssistantText);
+ if (proposedPlanFromAssistantMessage) {
+ yield* finalizeBufferedProposedPlan({
+ event,
+ threadId: thread.id,
+ threadProposedPlans: thread.proposedPlans,
+ planId: proposedPlanIdFromEvent(event, thread.id),
+ ...(turnId ? { turnId } : {}),
+ fallbackMarkdown: proposedPlanFromAssistantMessage,
+ updatedAt: now,
+ });
+ }
+
if (turnId) {
yield* forgetAssistantMessageId(thread.id, turnId, assistantMessageId);
}
@@ -1411,17 +1458,33 @@ const make = Effect.gen(function* () {
yield* Effect.forEach(
assistantMessageIds,
(assistantMessageId) =>
- finalizeAssistantMessage({
- event,
- threadId: thread.id,
- messageId: assistantMessageId,
- turnId,
- createdAt: now,
- commandTag: "assistant-complete-finalize",
- finalDeltaCommandTag: "assistant-delta-finalize-fallback",
- hasProjectedMessage: thread.messages.some(
- (entry) => entry.id === assistantMessageId,
- ),
+ Effect.gen(function* () {
+ const completedAssistantText = yield* finalizeAssistantMessage({
+ event,
+ threadId: thread.id,
+ messageId: assistantMessageId,
+ turnId,
+ createdAt: now,
+ commandTag: "assistant-complete-finalize",
+ finalDeltaCommandTag: "assistant-delta-finalize-fallback",
+ hasProjectedMessage: thread.messages.some(
+ (entry) => entry.id === assistantMessageId,
+ ),
+ });
+
+ const proposedPlanFromAssistantMessage =
+ extractProposedPlanMarkdown(completedAssistantText);
+ if (proposedPlanFromAssistantMessage) {
+ yield* finalizeBufferedProposedPlan({
+ event,
+ threadId: thread.id,
+ threadProposedPlans: thread.proposedPlans,
+ planId: proposedPlanIdForTurn(thread.id, turnId),
+ turnId,
+ fallbackMarkdown: proposedPlanFromAssistantMessage,
+ updatedAt: now,
+ });
+ }
}),
{ concurrency: 1 },
).pipe(Effect.asVoid);