From 05f71067bac635b8f033cf6fba40aa6205d5410d Mon Sep 17 00:00:00 2001 From: Aria Arctic Date: Thu, 16 Apr 2026 01:54:06 +1000 Subject: [PATCH] Avoid redundant deep decode in targeted thread detail hydration Co-authored-by: OpenAI Codex --- .../Layers/ProjectionSnapshotQuery.test.ts | 141 ++++++++++++++++++ .../Layers/ProjectionSnapshotQuery.ts | 70 +++++---- 2 files changed, 180 insertions(+), 31 deletions(-) diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts index 9f0d63545f..8f08ada8cb 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts @@ -565,6 +565,147 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { }), ); + it.effect("normalizes targeted thread detail fields from projection rows", () => + Effect.gen(function* () { + const snapshotQuery = yield* ProjectionSnapshotQuery; + const sql = yield* SqlClient.SqlClient; + + yield* sql`DELETE FROM projection_projects`; + yield* sql`DELETE FROM projection_threads`; + yield* sql`DELETE FROM projection_thread_messages`; + yield* sql`DELETE FROM projection_thread_proposed_plans`; + yield* sql`DELETE FROM projection_thread_activities`; + yield* sql`DELETE FROM projection_thread_sessions`; + yield* sql`DELETE FROM projection_turns`; + + yield* sql` + INSERT INTO projection_projects ( + project_id, + title, + workspace_root, + default_model_selection_json, + scripts_json, + created_at, + updated_at, + deleted_at + ) + VALUES ( + 'project-normalized', + 'Normalized Project', + '/tmp/normalized-project', + '{"provider":"codex","model":"gpt-5-codex"}', + '[]', + '2026-03-03T00:00:00.000Z', + '2026-03-03T00:00:01.000Z', + NULL + ) + `; + + yield* sql` + INSERT INTO projection_threads ( + thread_id, + project_id, + title, + model_selection_json, + runtime_mode, + interaction_mode, + branch, + worktree_path, + latest_turn_id, + latest_user_message_at, + pending_approval_count, + pending_user_input_count, + has_actionable_proposed_plan, + created_at, + updated_at, + archived_at, + deleted_at + ) + VALUES ( + 'thread-normalized', + 'project-normalized', + ' Normalized Thread ', + '{"provider":"codex","model":"gpt-5-codex"}', + 'full-access', + 'default', + ' feature/normalized ', + ' /tmp/normalized-worktree ', + NULL, + NULL, + 0, + 0, + 0, + '2026-03-03T00:00:02.000Z', + '2026-03-03T00:00:03.000Z', + NULL, + NULL + ) + `; + + yield* sql` + INSERT INTO projection_thread_activities ( + activity_id, + thread_id, + turn_id, + tone, + kind, + summary, + payload_json, + created_at + ) + VALUES ( + 'activity-normalized', + 'thread-normalized', + NULL, + 'info', + ' task.progress ', + ' Running checks ', + '{"detail":"still available"}', + '2026-03-03T00:00:04.000Z' + ) + `; + + yield* sql` + INSERT INTO projection_thread_sessions ( + thread_id, + status, + provider_name, + provider_session_id, + provider_thread_id, + runtime_mode, + active_turn_id, + last_error, + updated_at + ) + VALUES ( + 'thread-normalized', + 'error', + ' codex ', + 'provider-session-normalized', + 'provider-thread-normalized', + 'full-access', + NULL, + ' recoverable issue ', + '2026-03-03T00:00:05.000Z' + ) + `; + + const threadDetail = yield* snapshotQuery.getThreadDetailById( + ThreadId.make("thread-normalized"), + ); + assert.equal(threadDetail._tag, "Some"); + if (threadDetail._tag === "Some") { + assert.equal(threadDetail.value.title, "Normalized Thread"); + assert.equal(threadDetail.value.branch, "feature/normalized"); + assert.equal(threadDetail.value.worktreePath, "/tmp/normalized-worktree"); + assert.equal(threadDetail.value.activities[0]?.kind, "task.progress"); + assert.equal(threadDetail.value.activities[0]?.summary, "Running checks"); + assert.equal(threadDetail.value.session?.providerName, "codex"); + assert.equal(threadDetail.value.session?.lastError, "recoverable issue"); + } + }), + ); + it.effect("reads single-thread checkpoint context without hydrating unrelated threads", () => Effect.gen(function* () { const snapshotQuery = yield* ProjectionSnapshotQuery; diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts index 07645571ba..5c763d7a1b 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts @@ -7,8 +7,8 @@ import { OrchestrationProposedPlanId, OrchestrationReadModel, OrchestrationShellSnapshot, - OrchestrationThread, ProjectScript, + TrimmedNonEmptyString, TurnId, type OrchestrationCheckpointSummary, type OrchestrationLatestTurn, @@ -17,6 +17,7 @@ import { type OrchestrationProposedPlan, type OrchestrationProject, type OrchestrationSession, + type OrchestrationThread, type OrchestrationThreadActivity, type OrchestrationThreadShell, ModelSelection, @@ -52,7 +53,6 @@ import { const decodeReadModel = Schema.decodeUnknownEffect(OrchestrationReadModel); const decodeShellSnapshot = Schema.decodeUnknownEffect(OrchestrationShellSnapshot); -const decodeThread = Schema.decodeUnknownEffect(OrchestrationThread); const ProjectionProjectDbRowSchema = ProjectionProject.mapFields( Struct.assign({ defaultModelSelection: Schema.NullOr(Schema.fromJsonString(ModelSelection)), @@ -68,16 +68,26 @@ const ProjectionThreadMessageDbRowSchema = ProjectionThreadMessage.mapFields( const ProjectionThreadProposedPlanDbRowSchema = ProjectionThreadProposedPlan; const ProjectionThreadDbRowSchema = ProjectionThread.mapFields( Struct.assign({ + title: TrimmedNonEmptyString, modelSelection: Schema.fromJsonString(ModelSelection), + branch: Schema.NullOr(TrimmedNonEmptyString), + worktreePath: Schema.NullOr(TrimmedNonEmptyString), }), ); const ProjectionThreadActivityDbRowSchema = ProjectionThreadActivity.mapFields( Struct.assign({ + kind: TrimmedNonEmptyString, + summary: TrimmedNonEmptyString, payload: Schema.fromJsonString(Schema.Unknown), sequence: Schema.NullOr(NonNegativeInt), }), ); -const ProjectionThreadSessionDbRowSchema = ProjectionThreadSession; +const ProjectionThreadSessionDbRowSchema = ProjectionThreadSession.mapFields( + Struct.assign({ + providerName: Schema.NullOr(TrimmedNonEmptyString), + lastError: Schema.NullOr(TrimmedNonEmptyString), + }), +); const ProjectionCheckpointDbRowSchema = ProjectionCheckpoint.mapFields( Struct.assign({ files: Schema.fromJsonString(Schema.Array(OrchestrationCheckpointFile)), @@ -1355,7 +1365,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { updatedAt: threadRow.value.updatedAt, archivedAt: threadRow.value.archivedAt, deletedAt: null, - messages: messageRows.map((row) => { + messages: messageRows.map((row): OrchestrationMessage => { const message = { id: row.messageId, role: row.role, @@ -1370,16 +1380,18 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { } return message; }), - proposedPlans: proposedPlanRows.map((row) => ({ - id: row.planId, - turnId: row.turnId, - planMarkdown: row.planMarkdown, - implementedAt: row.implementedAt, - implementationThreadId: row.implementationThreadId, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - })), - activities: activityRows.map((row) => { + proposedPlans: proposedPlanRows.map( + (row): OrchestrationProposedPlan => ({ + id: row.planId, + turnId: row.turnId, + planMarkdown: row.planMarkdown, + implementedAt: row.implementedAt, + implementationThreadId: row.implementationThreadId, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + }), + ), + activities: activityRows.map((row): OrchestrationThreadActivity => { const activity = { id: row.activityId, tone: row.tone, @@ -1394,25 +1406,21 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { } return activity; }), - checkpoints: checkpointRows.map((row) => ({ - turnId: row.turnId, - checkpointTurnCount: row.checkpointTurnCount, - checkpointRef: row.checkpointRef, - status: row.status, - files: row.files, - assistantMessageId: row.assistantMessageId, - completedAt: row.completedAt, - })), + checkpoints: checkpointRows.map( + (row): OrchestrationCheckpointSummary => ({ + turnId: row.turnId, + checkpointTurnCount: row.checkpointTurnCount, + checkpointRef: row.checkpointRef, + status: row.status, + files: row.files, + assistantMessageId: row.assistantMessageId, + completedAt: row.completedAt, + }), + ), session: Option.isSome(sessionRow) ? mapSessionRow(sessionRow.value) : null, - }; + } satisfies OrchestrationThread; - return Option.some( - yield* decodeThread(thread).pipe( - Effect.mapError( - toPersistenceDecodeError("ProjectionSnapshotQuery.getThreadDetailById:decodeThread"), - ), - ), - ); + return Option.some(thread); }); return {