Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/scope-listtasks-pretooluse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
'@colony/storage': patch
'@colony/hooks': patch
---

Stop scanning the full task table on every PreToolUse tool call

`protectedLiveClaimConflict` in the PreToolUse hook used `listTasks(1_000_000)` to find conflicting protected-branch claims and then linearly filtered the result by `repo_root` and `isProtectedBranch(branch)`. With the task table growing into the thousands across all agents, that scan dominated p95 latency on every editor tool call and violated the <150ms hook-handler budget.

`@colony/storage` now exposes `listProtectedBranchTasksByRepo(repoRoot)`, a single index-backed query against the existing `UNIQUE(repo_root, branch)` constraint. The PreToolUse hook calls this in place of the unbounded scan; defensive `resolve()` and `isProtectedBranch()` checks remain inside the loop so storage path inconsistencies still get filtered out. No new migration is needed — the unique index already covers the new query shape.
79 changes: 41 additions & 38 deletions packages/core/src/stranded-rescue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,46 +357,49 @@ export function bulkRescueStrandedSessions(
// subsequent deletes/writes are atomic across processes. Two concurrent
// rescue callers could otherwise both read the same held claims outside
// the transaction and then both attempt to release and audit them.
const audit_observation_id = store.storage.transaction(() => {
// Re-read claims inside the transaction so the set we release matches
// exactly what is visible under the write lock — guards against a
// concurrent caller having already released some of them between the
// outer read and this point.
const liveClaims = heldClaimsForCandidate(store, candidate);
if (liveClaims.length === 0) return -1;
for (const claim of liveClaims) {
store.storage.releaseClaim({
task_id: claim.task_id,
file_path: claim.file_path,
session_id,
});
}
const auditId = store.addObservation({
session_id,
kind: 'rescue-stranded',
content: `Bulk rescue released ${liveClaims.length} claim(s) for stranded session ${session_id}; audit history retained.`,
metadata: {
kind: 'rescue-stranded',
action: 'bulk-release-claims',
stranded_session_id: session_id,
agent: row.agent,
repo_root: row.repo_root,
branch: row.branch,
repo_roots: row.repo_roots,
branches: row.branches,
task_ids: row.task_ids,
last_activity: row.last_activity,
held_claim_count: liveClaims.length,
released_claims: liveClaims.map((claim) => ({
const audit_observation_id = store.storage.transaction(
() => {
// Re-read claims inside the transaction so the set we release matches
// exactly what is visible under the write lock — guards against a
// concurrent caller having already released some of them between the
// outer read and this point.
const liveClaims = heldClaimsForCandidate(store, candidate);
if (liveClaims.length === 0) return -1;
for (const claim of liveClaims) {
store.storage.releaseClaim({
task_id: claim.task_id,
file_path: claim.file_path,
claimed_at: claim.claimed_at,
})),
},
});
store.storage.endSession(session_id, now);
return auditId;
}, { immediate: true });
session_id,
});
}
const auditId = store.addObservation({
session_id,
kind: 'rescue-stranded',
content: `Bulk rescue released ${liveClaims.length} claim(s) for stranded session ${session_id}; audit history retained.`,
metadata: {
kind: 'rescue-stranded',
action: 'bulk-release-claims',
stranded_session_id: session_id,
agent: row.agent,
repo_root: row.repo_root,
branch: row.branch,
repo_roots: row.repo_roots,
branches: row.branches,
task_ids: row.task_ids,
last_activity: row.last_activity,
held_claim_count: liveClaims.length,
released_claims: liveClaims.map((claim) => ({
task_id: claim.task_id,
file_path: claim.file_path,
claimed_at: claim.claimed_at,
})),
},
});
store.storage.endSession(session_id, now);
return auditId;
},
{ immediate: true },
);

// -1 means another concurrent caller already released the claims before
// this transaction acquired the write lock — skip rather than double-count.
Expand Down
139 changes: 71 additions & 68 deletions packages/core/src/task-thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1751,81 +1751,84 @@ export class TaskThread {
// writes are serialized across processes. Without IMMEDIATE, two concurrent
// cleanup callers both read the same expired claims in DEFERRED mode, then
// both attempt to write — producing duplicate claim-weakened observations.
return this.store.storage.transaction(() => {
const claims = this.claims().filter((claim) => {
if (claim.state !== 'handoff_pending') return false;
if (typeof claim.expires_at !== 'number' || now < claim.expires_at) return false;
if (normalizedFilePath !== null && claim.file_path !== normalizedFilePath) return false;
if (
args.handoff_observation_id !== undefined &&
claim.handoff_observation_id !== args.handoff_observation_id
) {
return false;
}
return true;
});
const audit_observation_ids: number[] = [];
const seenBatons = new Set<number>();
for (const claim of claims) {
if (claim.handoff_observation_id !== null) seenBatons.add(claim.handoff_observation_id);
if (claim.handoff_observation_id !== null) {
this.store.storage.markClaimWeakExpired({
return this.store.storage.transaction(
() => {
const claims = this.claims().filter((claim) => {
if (claim.state !== 'handoff_pending') return false;
if (typeof claim.expires_at !== 'number' || now < claim.expires_at) return false;
if (normalizedFilePath !== null && claim.file_path !== normalizedFilePath) return false;
if (
args.handoff_observation_id !== undefined &&
claim.handoff_observation_id !== args.handoff_observation_id
) {
return false;
}
return true;
});
const audit_observation_ids: number[] = [];
const seenBatons = new Set<number>();
for (const claim of claims) {
if (claim.handoff_observation_id !== null) seenBatons.add(claim.handoff_observation_id);
if (claim.handoff_observation_id !== null) {
this.store.storage.markClaimWeakExpired({
task_id: this.task_id,
file_path: claim.file_path,
session_id: claim.session_id,
handoff_observation_id: claim.handoff_observation_id,
});
}
const auditObservationId = this.store.addObservation({
session_id: args.session_id,
kind: 'claim-weakened',
content: `claim ${claim.file_path} downgraded to weak_expired from quota-pending owner ${claim.session_id}`,
task_id: this.task_id,
file_path: claim.file_path,
session_id: claim.session_id,
handoff_observation_id: claim.handoff_observation_id,
reply_to: claim.handoff_observation_id,
metadata: {
kind: 'claim-weakened',
file_path: claim.file_path,
previous_session_id: claim.session_id,
ownership_strength: 'weak',
state: 'weak_expired',
reason: 'quota_pending_expired',
handoff_observation_id: claim.handoff_observation_id,
previous_claimed_at: claim.claimed_at,
expires_at: claim.expires_at,
},
});
audit_observation_ids.push(auditObservationId);
recordReflexion(this.store, {
session_id: args.session_id,
task_id: this.task_id,
kind: 'rollback',
action: 'quota claim released',
observation_summary: `quota claim rolled back to weak_expired for task ${this.task_id}`,
reflection: 'expired quota claims should be weakened before another agent proceeds',
source_kind: 'claim-weakened',
source_observation_id: auditObservationId,
idempotency_key: `quota-release:${this.task_id}:${claim.file_path}:${claim.session_id}:${claim.handoff_observation_id ?? 'none'}`,
reply_to: claim.handoff_observation_id,
now,
tags: ['quota', 'claim'],
});
}
const auditObservationId = this.store.addObservation({
session_id: args.session_id,
kind: 'claim-weakened',
content: `claim ${claim.file_path} downgraded to weak_expired from quota-pending owner ${claim.session_id}`,
for (const batonId of seenBatons) {
this.expireQuotaBatonIfPending(batonId, now);
}
this.store.storage.touchTask(this.task_id, now);
return {
status: 'released_expired',
task_id: this.task_id,
reply_to: claim.handoff_observation_id,
metadata: {
kind: 'claim-weakened',
released_claims: claims.map((claim) => ({
file_path: claim.file_path,
previous_session_id: claim.session_id,
ownership_strength: 'weak',
state: 'weak_expired',
reason: 'quota_pending_expired',
handoff_observation_id: claim.handoff_observation_id,
previous_claimed_at: claim.claimed_at,
expires_at: claim.expires_at,
},
});
audit_observation_ids.push(auditObservationId);
recordReflexion(this.store, {
session_id: args.session_id,
task_id: this.task_id,
kind: 'rollback',
action: 'quota claim released',
observation_summary: `quota claim rolled back to weak_expired for task ${this.task_id}`,
reflection: 'expired quota claims should be weakened before another agent proceeds',
source_kind: 'claim-weakened',
source_observation_id: auditObservationId,
idempotency_key: `quota-release:${this.task_id}:${claim.file_path}:${claim.session_id}:${claim.handoff_observation_id ?? 'none'}`,
reply_to: claim.handoff_observation_id,
now,
tags: ['quota', 'claim'],
});
}
for (const batonId of seenBatons) {
this.expireQuotaBatonIfPending(batonId, now);
}
this.store.storage.touchTask(this.task_id, now);
return {
status: 'released_expired',
task_id: this.task_id,
released_claims: claims.map((claim) => ({
file_path: claim.file_path,
previous_session_id: claim.session_id,
handoff_observation_id: claim.handoff_observation_id,
state: 'weak_expired' as const,
})),
audit_observation_ids,
};
}, { immediate: true });
state: 'weak_expired' as const,
})),
audit_observation_ids,
};
},
{ immediate: true },
);
}

private assertTaskExists(): void {
Expand Down
10 changes: 2 additions & 8 deletions packages/core/test/claim-lifecycle-concurrency.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,7 @@ describe('releaseExpiredQuotaClaims idempotency', () => {
handoff_observation_id?: number;
reason?: string;
};
return (
meta.reason === 'quota_pending_expired' &&
meta.handoff_observation_id === handoffId
);
return meta.reason === 'quota_pending_expired' && meta.handoff_observation_id === handoffId;
});
expect(weakenedAfterFirst).toHaveLength(1);

Expand All @@ -176,10 +173,7 @@ describe('releaseExpiredQuotaClaims idempotency', () => {
handoff_observation_id?: number;
reason?: string;
};
return (
meta.reason === 'quota_pending_expired' &&
meta.handoff_observation_id === handoffId
);
return meta.reason === 'quota_pending_expired' && meta.handoff_observation_id === handoffId;
});
expect(weakenedAfterSecond).toHaveLength(1);
});
Expand Down
3 changes: 1 addition & 2 deletions packages/hooks/src/handlers/pre-tool-use.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import { extractTouchedFiles, pathExtractionWarningsForToolUse } from './post-to

const CLAIM_WARNING_DEBOUNCE_MS = 60_000;
const CLAIM_BEFORE_EDIT_FALLBACK_SESSION_ID = 'colony-pre-tool-use-diagnostics';
const ALL_TASKS_LIMIT = 1_000_000;
const PROTECTED_BRANCHES = new Set(['main', 'dev', 'master', 'trunk']);
const claimWarningDebounceByStore = new WeakMap<MemoryStore, Map<string, number>>();

Expand Down Expand Up @@ -433,7 +432,7 @@ function protectedLiveClaimConflict(
const repoRoot = candidate?.repo_root ?? scope.repo_root;
if (!repoRoot) return null;
const normalizedRepoRoot = resolve(repoRoot);
const tasks = store.storage.listTasks(ALL_TASKS_LIMIT);
const tasks = store.storage.listProtectedBranchTasksByRepo(repoRoot);
const conflicts: ClaimConflictInfo[] = [];

for (const task of tasks) {
Expand Down
16 changes: 16 additions & 0 deletions packages/storage/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1566,6 +1566,22 @@ export class Storage {
.all(limit) as TaskRow[];
}

/**
 * Tasks rooted at `repoRoot` whose `branch` is one of `PROTECTED_BRANCH_NAMES`.
 * Backed by the existing `UNIQUE(repo_root, branch)` index on `tasks`.
 * Used by the PreToolUse hook to detect protected-branch claim conflicts
 * without scanning the full task table on every editor tool call.
 */
listProtectedBranchTasksByRepo(repoRoot: string): TaskRow[] {
  // Bind each protected branch name as its own parameter; the IN list is
  // built from the (fixed-size) set, so the SQL shape is stable per process.
  const branchNames = [...PROTECTED_BRANCH_NAMES];
  const inClause = branchNames.map(() => '?').join(', ');
  const sql = `SELECT * FROM tasks WHERE repo_root = ? AND branch IN (${inClause}) ORDER BY updated_at DESC`;
  return this.db.prepare(sql).all(repoRoot, ...branchNames) as TaskRow[];
}

/** Bump only a task's `updated_at` column to `ts` (defaults to the current time). */
touchTask(id: number, ts = Date.now()): void {
  const stmt = this.db.prepare('UPDATE tasks SET updated_at = ? WHERE id = ?');
  stmt.run(ts, id);
}
Expand Down
40 changes: 40 additions & 0 deletions packages/storage/test/tasks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,46 @@ describe('tasks', () => {

expect(storage.getClaim(task.id, 'src/x.ts')).toMatchObject({ state: 'weak_expired' });
});

it('listProtectedBranchTasksByRepo returns only tasks on protected branches at the given repo', () => {
  seedSessions('s-a');
  // Two tasks that must match: protected branches at /repo-a.
  const targetMain = storage.findOrCreateTask({
    title: 'main lane',
    repo_root: '/repo-a',
    branch: 'main',
    created_by: 's-a',
  });
  const targetDev = storage.findOrCreateTask({
    title: 'dev lane',
    repo_root: '/repo-a',
    branch: 'dev',
    created_by: 's-a',
  });
  // Decoys: non-protected branch at the same repo, protected branch at a
  // different repo, and a pile of agent-branch noise rows.
  storage.findOrCreateTask({
    title: 'agent lane same repo',
    repo_root: '/repo-a',
    branch: 'agent/claude/feature',
    created_by: 's-a',
  });
  storage.findOrCreateTask({
    title: 'main lane different repo',
    repo_root: '/repo-b',
    branch: 'main',
    created_by: 's-a',
  });
  Array.from({ length: 25 }, (_, i) =>
    storage.findOrCreateTask({
      title: `noise ${i}`,
      repo_root: '/repo-a',
      branch: `agent/claude/noise-${i}`,
      created_by: 's-a',
    }),
  );

  const matched = storage.listProtectedBranchTasksByRepo('/repo-a');
  const byId = (a: number, b: number) => a - b;
  const actualIds = matched.map((row) => row.id).sort(byId);
  const expectedIds = [targetMain.id, targetDev.id].sort(byId);
  expect(actualIds).toEqual(expectedIds);
});
});

function rewriteTaskClaimsAsOldSchema(dbPath: string): void {
Expand Down
Loading