Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,147 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => {
}),
);

it.effect("normalizes targeted thread detail fields from projection rows", () =>
Effect.gen(function* () {
const snapshotQuery = yield* ProjectionSnapshotQuery;
const sql = yield* SqlClient.SqlClient;

yield* sql`DELETE FROM projection_projects`;
yield* sql`DELETE FROM projection_threads`;
yield* sql`DELETE FROM projection_thread_messages`;
yield* sql`DELETE FROM projection_thread_proposed_plans`;
yield* sql`DELETE FROM projection_thread_activities`;
yield* sql`DELETE FROM projection_thread_sessions`;
yield* sql`DELETE FROM projection_turns`;

yield* sql`
INSERT INTO projection_projects (
project_id,
title,
workspace_root,
default_model_selection_json,
scripts_json,
created_at,
updated_at,
deleted_at
)
VALUES (
'project-normalized',
'Normalized Project',
'/tmp/normalized-project',
'{"provider":"codex","model":"gpt-5-codex"}',
'[]',
'2026-03-03T00:00:00.000Z',
'2026-03-03T00:00:01.000Z',
NULL
)
`;

yield* sql`
INSERT INTO projection_threads (
thread_id,
project_id,
title,
model_selection_json,
runtime_mode,
interaction_mode,
branch,
worktree_path,
latest_turn_id,
latest_user_message_at,
pending_approval_count,
pending_user_input_count,
has_actionable_proposed_plan,
created_at,
updated_at,
archived_at,
deleted_at
)
VALUES (
'thread-normalized',
'project-normalized',
' Normalized Thread ',
'{"provider":"codex","model":"gpt-5-codex"}',
'full-access',
'default',
' feature/normalized ',
' /tmp/normalized-worktree ',
NULL,
NULL,
0,
0,
0,
'2026-03-03T00:00:02.000Z',
'2026-03-03T00:00:03.000Z',
NULL,
NULL
)
`;

yield* sql`
INSERT INTO projection_thread_activities (
activity_id,
thread_id,
turn_id,
tone,
kind,
summary,
payload_json,
created_at
)
VALUES (
'activity-normalized',
'thread-normalized',
NULL,
'info',
' task.progress ',
' Running checks ',
'{"detail":"still available"}',
'2026-03-03T00:00:04.000Z'
)
`;

yield* sql`
INSERT INTO projection_thread_sessions (
thread_id,
status,
provider_name,
provider_session_id,
provider_thread_id,
runtime_mode,
active_turn_id,
last_error,
updated_at
)
VALUES (
'thread-normalized',
'error',
' codex ',
'provider-session-normalized',
'provider-thread-normalized',
'full-access',
NULL,
' recoverable issue ',
'2026-03-03T00:00:05.000Z'
)
`;

const threadDetail = yield* snapshotQuery.getThreadDetailById(
ThreadId.make("thread-normalized"),
);
assert.equal(threadDetail._tag, "Some");
if (threadDetail._tag === "Some") {
assert.equal(threadDetail.value.title, "Normalized Thread");
assert.equal(threadDetail.value.branch, "feature/normalized");
assert.equal(threadDetail.value.worktreePath, "/tmp/normalized-worktree");
assert.equal(threadDetail.value.activities[0]?.kind, "task.progress");
assert.equal(threadDetail.value.activities[0]?.summary, "Running checks");
assert.equal(threadDetail.value.session?.providerName, "codex");
assert.equal(threadDetail.value.session?.lastError, "recoverable issue");
}
}),
);

it.effect("reads single-thread checkpoint context without hydrating unrelated threads", () =>
Effect.gen(function* () {
const snapshotQuery = yield* ProjectionSnapshotQuery;
Expand Down
70 changes: 39 additions & 31 deletions apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import {
OrchestrationProposedPlanId,
OrchestrationReadModel,
OrchestrationShellSnapshot,
OrchestrationThread,
ProjectScript,
TrimmedNonEmptyString,
TurnId,
type OrchestrationCheckpointSummary,
type OrchestrationLatestTurn,
Expand All @@ -17,6 +17,7 @@ import {
type OrchestrationProposedPlan,
type OrchestrationProject,
type OrchestrationSession,
type OrchestrationThread,
type OrchestrationThreadActivity,
type OrchestrationThreadShell,
ModelSelection,
Expand Down Expand Up @@ -52,7 +53,6 @@ import {

const decodeReadModel = Schema.decodeUnknownEffect(OrchestrationReadModel);
const decodeShellSnapshot = Schema.decodeUnknownEffect(OrchestrationShellSnapshot);
const decodeThread = Schema.decodeUnknownEffect(OrchestrationThread);
const ProjectionProjectDbRowSchema = ProjectionProject.mapFields(
Struct.assign({
defaultModelSelection: Schema.NullOr(Schema.fromJsonString(ModelSelection)),
Expand All @@ -68,16 +68,26 @@ const ProjectionThreadMessageDbRowSchema = ProjectionThreadMessage.mapFields(
const ProjectionThreadProposedPlanDbRowSchema = ProjectionThreadProposedPlan;
const ProjectionThreadDbRowSchema = ProjectionThread.mapFields(
Struct.assign({
title: TrimmedNonEmptyString,
modelSelection: Schema.fromJsonString(ModelSelection),
branch: Schema.NullOr(TrimmedNonEmptyString),
worktreePath: Schema.NullOr(TrimmedNonEmptyString),
}),
);
const ProjectionThreadActivityDbRowSchema = ProjectionThreadActivity.mapFields(
Struct.assign({
kind: TrimmedNonEmptyString,
summary: TrimmedNonEmptyString,
payload: Schema.fromJsonString(Schema.Unknown),
sequence: Schema.NullOr(NonNegativeInt),
}),
);
const ProjectionThreadSessionDbRowSchema = ProjectionThreadSession;
const ProjectionThreadSessionDbRowSchema = ProjectionThreadSession.mapFields(
Struct.assign({
providerName: Schema.NullOr(TrimmedNonEmptyString),
lastError: Schema.NullOr(TrimmedNonEmptyString),
}),
);
const ProjectionCheckpointDbRowSchema = ProjectionCheckpoint.mapFields(
Struct.assign({
files: Schema.fromJsonString(Schema.Array(OrchestrationCheckpointFile)),
Expand Down Expand Up @@ -1355,7 +1365,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () {
updatedAt: threadRow.value.updatedAt,
archivedAt: threadRow.value.archivedAt,
deletedAt: null,
messages: messageRows.map((row) => {
messages: messageRows.map((row): OrchestrationMessage => {
const message = {
id: row.messageId,
role: row.role,
Expand All @@ -1370,16 +1380,18 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () {
}
return message;
}),
proposedPlans: proposedPlanRows.map((row) => ({
id: row.planId,
turnId: row.turnId,
planMarkdown: row.planMarkdown,
implementedAt: row.implementedAt,
implementationThreadId: row.implementationThreadId,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
})),
activities: activityRows.map((row) => {
proposedPlans: proposedPlanRows.map(
(row): OrchestrationProposedPlan => ({
id: row.planId,
turnId: row.turnId,
planMarkdown: row.planMarkdown,
implementedAt: row.implementedAt,
implementationThreadId: row.implementationThreadId,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
}),
),
activities: activityRows.map((row): OrchestrationThreadActivity => {
const activity = {
id: row.activityId,
tone: row.tone,
Expand All @@ -1394,25 +1406,21 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () {
}
return activity;
}),
checkpoints: checkpointRows.map((row) => ({
turnId: row.turnId,
checkpointTurnCount: row.checkpointTurnCount,
checkpointRef: row.checkpointRef,
status: row.status,
files: row.files,
assistantMessageId: row.assistantMessageId,
completedAt: row.completedAt,
})),
checkpoints: checkpointRows.map(
(row): OrchestrationCheckpointSummary => ({
turnId: row.turnId,
checkpointTurnCount: row.checkpointTurnCount,
checkpointRef: row.checkpointRef,
status: row.status,
files: row.files,
assistantMessageId: row.assistantMessageId,
completedAt: row.completedAt,
}),
),
session: Option.isSome(sessionRow) ? mapSessionRow(sessionRow.value) : null,
};
} satisfies OrchestrationThread;

return Option.some(
yield* decodeThread(thread).pipe(
Effect.mapError(
toPersistenceDecodeError("ProjectionSnapshotQuery.getThreadDetailById:decodeThread"),
),
),
);
return Option.some(thread);
});

return {
Expand Down
Loading