diff --git a/.changeset/scope-listtasks-pretooluse.md b/.changeset/scope-listtasks-pretooluse.md new file mode 100644 index 00000000..ebaa7d8f --- /dev/null +++ b/.changeset/scope-listtasks-pretooluse.md @@ -0,0 +1,10 @@ +--- +'@colony/storage': patch +'@colony/hooks': patch +--- + +Stop scanning the full task table on every PreToolUse tool call + +`protectedLiveClaimConflict` in the PreToolUse hook used `listTasks(1_000_000)` to find conflicting protected-branch claims and then linearly filtered the result by `repo_root` and `isProtectedBranch(branch)`. With the task table growing into the thousands across all agents, that scan dominated p95 latency on every editor tool call and violated the <150ms hook-handler budget. + +`@colony/storage` now exposes `listProtectedBranchTasksByRepo(repoRoot)`, a single index-backed query against the existing `UNIQUE(repo_root, branch)` constraint. The PreToolUse hook calls this in place of the unbounded scan; defensive `resolve()` and `isProtectedBranch()` checks remain inside the loop so storage path inconsistencies still get filtered out. Note that the query compares `repo_root` with a plain SQL `=` against the un-normalized `repoRoot` the hook passes in, so a task row stored under a different spelling of the same path is not returned by the query and never reaches those checks. No new migration is needed — the unique index already covers the new query shape. diff --git a/packages/core/src/stranded-rescue.ts b/packages/core/src/stranded-rescue.ts index 5d520ff5..0e596c8c 100644 --- a/packages/core/src/stranded-rescue.ts +++ b/packages/core/src/stranded-rescue.ts @@ -357,46 +357,49 @@ export function bulkRescueStrandedSessions( // subsequent deletes/writes are atomic across processes. Two concurrent // rescue callers could otherwise both read the same held claims outside // the transaction and then both attempt to release and audit them. - const audit_observation_id = store.storage.transaction(() => { - // Re-read claims inside the transaction so the set we release matches - // exactly what is visible under the write lock — guards against a - // concurrent caller having already released some of them between the - // outer read and this point. 
- const liveClaims = heldClaimsForCandidate(store, candidate); - if (liveClaims.length === 0) return -1; - for (const claim of liveClaims) { - store.storage.releaseClaim({ - task_id: claim.task_id, - file_path: claim.file_path, - session_id, - }); - } - const auditId = store.addObservation({ - session_id, - kind: 'rescue-stranded', - content: `Bulk rescue released ${liveClaims.length} claim(s) for stranded session ${session_id}; audit history retained.`, - metadata: { - kind: 'rescue-stranded', - action: 'bulk-release-claims', - stranded_session_id: session_id, - agent: row.agent, - repo_root: row.repo_root, - branch: row.branch, - repo_roots: row.repo_roots, - branches: row.branches, - task_ids: row.task_ids, - last_activity: row.last_activity, - held_claim_count: liveClaims.length, - released_claims: liveClaims.map((claim) => ({ + const audit_observation_id = store.storage.transaction( + () => { + // Re-read claims inside the transaction so the set we release matches + // exactly what is visible under the write lock — guards against a + // concurrent caller having already released some of them between the + // outer read and this point. 
+ const liveClaims = heldClaimsForCandidate(store, candidate); + if (liveClaims.length === 0) return -1; + for (const claim of liveClaims) { + store.storage.releaseClaim({ task_id: claim.task_id, file_path: claim.file_path, - claimed_at: claim.claimed_at, - })), - }, - }); - store.storage.endSession(session_id, now); - return auditId; - }, { immediate: true }); + session_id, + }); + } + const auditId = store.addObservation({ + session_id, + kind: 'rescue-stranded', + content: `Bulk rescue released ${liveClaims.length} claim(s) for stranded session ${session_id}; audit history retained.`, + metadata: { + kind: 'rescue-stranded', + action: 'bulk-release-claims', + stranded_session_id: session_id, + agent: row.agent, + repo_root: row.repo_root, + branch: row.branch, + repo_roots: row.repo_roots, + branches: row.branches, + task_ids: row.task_ids, + last_activity: row.last_activity, + held_claim_count: liveClaims.length, + released_claims: liveClaims.map((claim) => ({ + task_id: claim.task_id, + file_path: claim.file_path, + claimed_at: claim.claimed_at, + })), + }, + }); + store.storage.endSession(session_id, now); + return auditId; + }, + { immediate: true }, + ); // -1 means another concurrent caller already released the claims before // this transaction acquired the write lock — skip rather than double-count. diff --git a/packages/core/src/task-thread.ts b/packages/core/src/task-thread.ts index 435194aa..71b7ecc8 100644 --- a/packages/core/src/task-thread.ts +++ b/packages/core/src/task-thread.ts @@ -1751,81 +1751,84 @@ export class TaskThread { // writes are serialized across processes. Without IMMEDIATE, two concurrent // cleanup callers both read the same expired claims in DEFERRED mode, then // both attempt to write — producing duplicate claim-weakened observations. 
- return this.store.storage.transaction(() => { - const claims = this.claims().filter((claim) => { - if (claim.state !== 'handoff_pending') return false; - if (typeof claim.expires_at !== 'number' || now < claim.expires_at) return false; - if (normalizedFilePath !== null && claim.file_path !== normalizedFilePath) return false; - if ( - args.handoff_observation_id !== undefined && - claim.handoff_observation_id !== args.handoff_observation_id - ) { - return false; - } - return true; - }); - const audit_observation_ids: number[] = []; - const seenBatons = new Set(); - for (const claim of claims) { - if (claim.handoff_observation_id !== null) seenBatons.add(claim.handoff_observation_id); - if (claim.handoff_observation_id !== null) { - this.store.storage.markClaimWeakExpired({ + return this.store.storage.transaction( + () => { + const claims = this.claims().filter((claim) => { + if (claim.state !== 'handoff_pending') return false; + if (typeof claim.expires_at !== 'number' || now < claim.expires_at) return false; + if (normalizedFilePath !== null && claim.file_path !== normalizedFilePath) return false; + if ( + args.handoff_observation_id !== undefined && + claim.handoff_observation_id !== args.handoff_observation_id + ) { + return false; + } + return true; + }); + const audit_observation_ids: number[] = []; + const seenBatons = new Set(); + for (const claim of claims) { + if (claim.handoff_observation_id !== null) seenBatons.add(claim.handoff_observation_id); + if (claim.handoff_observation_id !== null) { + this.store.storage.markClaimWeakExpired({ + task_id: this.task_id, + file_path: claim.file_path, + session_id: claim.session_id, + handoff_observation_id: claim.handoff_observation_id, + }); + } + const auditObservationId = this.store.addObservation({ + session_id: args.session_id, + kind: 'claim-weakened', + content: `claim ${claim.file_path} downgraded to weak_expired from quota-pending owner ${claim.session_id}`, task_id: this.task_id, - file_path: 
claim.file_path, - session_id: claim.session_id, - handoff_observation_id: claim.handoff_observation_id, + reply_to: claim.handoff_observation_id, + metadata: { + kind: 'claim-weakened', + file_path: claim.file_path, + previous_session_id: claim.session_id, + ownership_strength: 'weak', + state: 'weak_expired', + reason: 'quota_pending_expired', + handoff_observation_id: claim.handoff_observation_id, + previous_claimed_at: claim.claimed_at, + expires_at: claim.expires_at, + }, + }); + audit_observation_ids.push(auditObservationId); + recordReflexion(this.store, { + session_id: args.session_id, + task_id: this.task_id, + kind: 'rollback', + action: 'quota claim released', + observation_summary: `quota claim rolled back to weak_expired for task ${this.task_id}`, + reflection: 'expired quota claims should be weakened before another agent proceeds', + source_kind: 'claim-weakened', + source_observation_id: auditObservationId, + idempotency_key: `quota-release:${this.task_id}:${claim.file_path}:${claim.session_id}:${claim.handoff_observation_id ?? 
'none'}`, + reply_to: claim.handoff_observation_id, + now, + tags: ['quota', 'claim'], }); } - const auditObservationId = this.store.addObservation({ - session_id: args.session_id, - kind: 'claim-weakened', - content: `claim ${claim.file_path} downgraded to weak_expired from quota-pending owner ${claim.session_id}`, + for (const batonId of seenBatons) { + this.expireQuotaBatonIfPending(batonId, now); + } + this.store.storage.touchTask(this.task_id, now); + return { + status: 'released_expired', task_id: this.task_id, - reply_to: claim.handoff_observation_id, - metadata: { - kind: 'claim-weakened', + released_claims: claims.map((claim) => ({ file_path: claim.file_path, previous_session_id: claim.session_id, - ownership_strength: 'weak', - state: 'weak_expired', - reason: 'quota_pending_expired', handoff_observation_id: claim.handoff_observation_id, - previous_claimed_at: claim.claimed_at, - expires_at: claim.expires_at, - }, - }); - audit_observation_ids.push(auditObservationId); - recordReflexion(this.store, { - session_id: args.session_id, - task_id: this.task_id, - kind: 'rollback', - action: 'quota claim released', - observation_summary: `quota claim rolled back to weak_expired for task ${this.task_id}`, - reflection: 'expired quota claims should be weakened before another agent proceeds', - source_kind: 'claim-weakened', - source_observation_id: auditObservationId, - idempotency_key: `quota-release:${this.task_id}:${claim.file_path}:${claim.session_id}:${claim.handoff_observation_id ?? 
'none'}`, - reply_to: claim.handoff_observation_id, - now, - tags: ['quota', 'claim'], - }); - } - for (const batonId of seenBatons) { - this.expireQuotaBatonIfPending(batonId, now); - } - this.store.storage.touchTask(this.task_id, now); - return { - status: 'released_expired', - task_id: this.task_id, - released_claims: claims.map((claim) => ({ - file_path: claim.file_path, - previous_session_id: claim.session_id, - handoff_observation_id: claim.handoff_observation_id, - state: 'weak_expired' as const, - })), - audit_observation_ids, - }; - }, { immediate: true }); + state: 'weak_expired' as const, + })), + audit_observation_ids, + }; + }, + { immediate: true }, + ); } private assertTaskExists(): void { diff --git a/packages/core/test/claim-lifecycle-concurrency.test.ts b/packages/core/test/claim-lifecycle-concurrency.test.ts index e773ea7e..d2abe9be 100644 --- a/packages/core/test/claim-lifecycle-concurrency.test.ts +++ b/packages/core/test/claim-lifecycle-concurrency.test.ts @@ -152,10 +152,7 @@ describe('releaseExpiredQuotaClaims idempotency', () => { handoff_observation_id?: number; reason?: string; }; - return ( - meta.reason === 'quota_pending_expired' && - meta.handoff_observation_id === handoffId - ); + return meta.reason === 'quota_pending_expired' && meta.handoff_observation_id === handoffId; }); expect(weakenedAfterFirst).toHaveLength(1); @@ -176,10 +173,7 @@ describe('releaseExpiredQuotaClaims idempotency', () => { handoff_observation_id?: number; reason?: string; }; - return ( - meta.reason === 'quota_pending_expired' && - meta.handoff_observation_id === handoffId - ); + return meta.reason === 'quota_pending_expired' && meta.handoff_observation_id === handoffId; }); expect(weakenedAfterSecond).toHaveLength(1); }); diff --git a/packages/hooks/src/handlers/pre-tool-use.ts b/packages/hooks/src/handlers/pre-tool-use.ts index 24b84b4e..9159b9d4 100644 --- a/packages/hooks/src/handlers/pre-tool-use.ts +++ b/packages/hooks/src/handlers/pre-tool-use.ts @@ 
-22,7 +22,6 @@ import { extractTouchedFiles, pathExtractionWarningsForToolUse } from './post-to const CLAIM_WARNING_DEBOUNCE_MS = 60_000; const CLAIM_BEFORE_EDIT_FALLBACK_SESSION_ID = 'colony-pre-tool-use-diagnostics'; -const ALL_TASKS_LIMIT = 1_000_000; const PROTECTED_BRANCHES = new Set(['main', 'dev', 'master', 'trunk']); const claimWarningDebounceByStore = new WeakMap>(); @@ -433,7 +432,7 @@ function protectedLiveClaimConflict( const repoRoot = candidate?.repo_root ?? scope.repo_root; if (!repoRoot) return null; const normalizedRepoRoot = resolve(repoRoot); - const tasks = store.storage.listTasks(ALL_TASKS_LIMIT); + const tasks = store.storage.listProtectedBranchTasksByRepo(repoRoot); const conflicts: ClaimConflictInfo[] = []; for (const task of tasks) { diff --git a/packages/storage/src/storage.ts b/packages/storage/src/storage.ts index aad10a3e..5186bc8e 100644 --- a/packages/storage/src/storage.ts +++ b/packages/storage/src/storage.ts @@ -1566,6 +1566,22 @@ export class Storage { .all(limit) as TaskRow[]; } + /** + * Tasks rooted at `repoRoot` whose `branch` is one of `PROTECTED_BRANCH_NAMES`. + * Backed by the existing `UNIQUE(repo_root, branch)` index on `tasks`. + * Used by the PreToolUse hook to detect protected-branch claim conflicts + * without scanning the full task table on every editor tool call. + */ + listProtectedBranchTasksByRepo(repoRoot: string): TaskRow[] { + const names = Array.from(PROTECTED_BRANCH_NAMES); + const placeholders = names.map(() => '?').join(', '); + return this.db + .prepare( + `SELECT * FROM tasks WHERE repo_root = ? AND branch IN (${placeholders}) ORDER BY updated_at DESC`, + ) + .all(repoRoot, ...names) as TaskRow[]; + } + touchTask(id: number, ts = Date.now()): void { this.db.prepare('UPDATE tasks SET updated_at = ? 
WHERE id = ?').run(ts, id); } diff --git a/packages/storage/test/tasks.test.ts b/packages/storage/test/tasks.test.ts index 9699bb1e..52e324d4 100644 --- a/packages/storage/test/tasks.test.ts +++ b/packages/storage/test/tasks.test.ts @@ -466,6 +466,46 @@ describe('tasks', () => { expect(storage.getClaim(task.id, 'src/x.ts')).toMatchObject({ state: 'weak_expired' }); }); + + it('listProtectedBranchTasksByRepo returns only tasks on protected branches at the given repo', () => { + seedSessions('s-a'); + const targetMain = storage.findOrCreateTask({ + title: 'main lane', + repo_root: '/repo-a', + branch: 'main', + created_by: 's-a', + }); + const targetDev = storage.findOrCreateTask({ + title: 'dev lane', + repo_root: '/repo-a', + branch: 'dev', + created_by: 's-a', + }); + storage.findOrCreateTask({ + title: 'agent lane same repo', + repo_root: '/repo-a', + branch: 'agent/claude/feature', + created_by: 's-a', + }); + storage.findOrCreateTask({ + title: 'main lane different repo', + repo_root: '/repo-b', + branch: 'main', + created_by: 's-a', + }); + for (let i = 0; i < 25; i++) { + storage.findOrCreateTask({ + title: `noise ${i}`, + repo_root: '/repo-a', + branch: `agent/claude/noise-${i}`, + created_by: 's-a', + }); + } + + const rows = storage.listProtectedBranchTasksByRepo('/repo-a'); + const ids = rows.map((row) => row.id).sort((a, b) => a - b); + expect(ids).toEqual([targetMain.id, targetDev.id].sort((a, b) => a - b)); + }); }); function rewriteTaskClaimsAsOldSchema(dbPath: string): void {