-
Notifications
You must be signed in to change notification settings - Fork 8
fix(journal): serialize concurrent appends via lockfile #1002
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f339222
f5c737c
82ffccd
5177133
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,176 @@ | ||
| import crypto from 'node:crypto'; | ||
| import fs from 'node:fs'; | ||
| import path from 'node:path'; | ||
| import { debug, warn } from '../../infrastructure/logger.js'; | ||
|
|
||
| export const JOURNAL_FILENAME = 'changes.journal'; | ||
| const HEADER_PREFIX = '# codegraph-journal v1 '; | ||
| const LOCK_SUFFIX = '.lock'; | ||
| const LOCK_TIMEOUT_MS = 5_000; | ||
| const LOCK_STALE_MS = 30_000; | ||
| const LOCK_RETRY_MS = 25; | ||
|
|
||
| // Busy-spin sleep avoids blocking the Node.js event loop (unlike Atomics.wait, | ||
| // which freezes all I/O and timer callbacks). The retry interval is short | ||
| // (25ms), so the CPU cost is negligible while keeping unrelated callbacks | ||
| // responsive in watcher processes. | ||
| function sleepSync(ms: number): void { | ||
| const end = process.hrtime.bigint() + BigInt(ms) * 1_000_000n; | ||
| while (process.hrtime.bigint() < end) { | ||
| /* spin */ | ||
| } | ||
| } | ||
|
|
||
| function isPidAlive(pid: number): boolean { | ||
| if (!Number.isFinite(pid) || pid <= 0) return false; | ||
| try { | ||
| process.kill(pid, 0); | ||
| return true; | ||
| } catch (e) { | ||
| // EPERM means the process exists but we lack permission — still alive. | ||
| return (e as NodeJS.ErrnoException).code === 'EPERM'; | ||
| } | ||
| } | ||
|
|
||
| interface AcquiredLock { | ||
| fd: number; | ||
| nonce: string; | ||
| } | ||
|
|
||
| /** | ||
| * Steal a stale lockfile atomically via write-tmp + rename. | ||
| * | ||
| * Using rename (which is atomic on POSIX and Windows) avoids the TOCTOU race | ||
| * inherent to the unlink + openSync('wx') pattern: if two stealers both | ||
| * observed the same stale holder, one's unlink could cross the other's fresh | ||
| * acquisition, admitting two writers into the critical section. | ||
| * | ||
| * After rename, we re-read the lockfile to confirm our nonce — if another | ||
| * stealer's rename landed after ours, they own the lock and we retry. | ||
| */ | ||
| function trySteal(lockPath: string): AcquiredLock | null { | ||
| const nonce = `${process.pid}-${crypto.randomBytes(8).toString('hex')}`; | ||
| const tmpPath = `${lockPath}.${nonce}.tmp`; | ||
| try { | ||
| fs.writeFileSync(tmpPath, `${process.pid}\n${nonce}\n`, { flag: 'w' }); | ||
| } catch { | ||
| return null; | ||
| } | ||
|
|
||
| try { | ||
| // Atomic replace: overwrites the stale lockfile. | ||
| fs.renameSync(tmpPath, lockPath); | ||
| } catch { | ||
| try { | ||
| fs.unlinkSync(tmpPath); | ||
| } catch { | ||
| /* ignore */ | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| // Verify the nonce — another stealer's rename may have landed after ours. | ||
| let content: string; | ||
| try { | ||
| content = fs.readFileSync(lockPath, 'utf-8'); | ||
| } catch { | ||
| return null; | ||
| } | ||
| if (!content.includes(nonce)) { | ||
| // Lost the race to another stealer; do NOT unlink their live lockfile. | ||
| return null; | ||
| } | ||
|
|
||
| let fd: number; | ||
| try { | ||
| // Re-open r+ so we have a persistent fd the caller can close on release. | ||
| fd = fs.openSync(lockPath, 'r+'); | ||
| } catch { | ||
| return null; | ||
| } | ||
| return { fd, nonce }; | ||
| } | ||
|
|
||
| function acquireJournalLock(lockPath: string): AcquiredLock { | ||
| const start = Date.now(); | ||
| for (;;) { | ||
| const nonce = `${process.pid}-${crypto.randomBytes(8).toString('hex')}`; | ||
| try { | ||
| const fd = fs.openSync(lockPath, 'wx'); | ||
| try { | ||
| fs.writeSync(fd, `${process.pid}\n${nonce}\n`); | ||
| } catch { | ||
| /* PID stamp is advisory; fd is still exclusive */ | ||
| } | ||
| return { fd, nonce }; | ||
| } catch (e) { | ||
| if ((e as NodeJS.ErrnoException).code !== 'EEXIST') throw e; | ||
| } | ||
|
Comment on lines
+98
to
+108
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If
The comment "PID stamp is advisory; fd is still exclusive" only holds when there are no concurrent waiters, but a journal with a watcher + build process is exactly the concurrent scenario this lock is meant to protect. Fix: when the write fails, release the fd and unlink synchronously (we still hold the exclusive fd at that point) and retry, rather than proceeding into |
||
|
|
||
| let holderAlive = true; | ||
| try { | ||
| const pidContent = fs.readFileSync(lockPath, 'utf-8').split('\n')[0]!.trim(); | ||
| holderAlive = isPidAlive(Number(pidContent)); | ||
| } catch { | ||
| /* unreadable — fall through to age check */ | ||
| } | ||
|
|
||
| let shouldSteal = !holderAlive; | ||
| if (holderAlive) { | ||
| try { | ||
| const stat = fs.statSync(lockPath); | ||
| if (Date.now() - stat.mtimeMs > LOCK_STALE_MS) { | ||
| shouldSteal = true; | ||
| } | ||
| } catch { | ||
| /* stat failed — keep retrying */ | ||
| } | ||
| } | ||
|
|
||
| if (shouldSteal) { | ||
| const stolen = trySteal(lockPath); | ||
| if (stolen) return stolen; | ||
| // Steal failed or lost the race — fall through to timeout check & retry. | ||
| } | ||
|
|
||
| if (Date.now() - start > LOCK_TIMEOUT_MS) { | ||
| throw new Error(`Failed to acquire journal lock at ${lockPath} within ${LOCK_TIMEOUT_MS}ms`); | ||
| } | ||
| sleepSync(LOCK_RETRY_MS); | ||
| } | ||
| } | ||
|
|
||
| function releaseJournalLock(lockPath: string, lock: AcquiredLock): void { | ||
| try { | ||
| fs.closeSync(lock.fd); | ||
| } catch { | ||
| /* ignore */ | ||
| } | ||
| // Only unlink if the lockfile still carries our nonce — if another stealer | ||
| // decided we were stale and replaced it, we must not unlink their live lock. | ||
| try { | ||
| const content = fs.readFileSync(lockPath, 'utf-8'); | ||
| if (content.includes(lock.nonce)) { | ||
| fs.unlinkSync(lockPath); | ||
| } | ||
| } catch { | ||
| /* lockfile gone or unreadable — nothing to unlink */ | ||
| } | ||
| } | ||
|
|
||
| function withJournalLock<T>(rootDir: string, fn: () => T): T { | ||
| const dir = path.join(rootDir, '.codegraph'); | ||
| if (!fs.existsSync(dir)) { | ||
| fs.mkdirSync(dir, { recursive: true }); | ||
| } | ||
| const lockPath = path.join(dir, `${JOURNAL_FILENAME}${LOCK_SUFFIX}`); | ||
| const lock = acquireJournalLock(lockPath); | ||
| try { | ||
| return fn(); | ||
| } finally { | ||
| releaseJournalLock(lockPath, lock); | ||
| } | ||
| } | ||
|
|
||
| interface JournalResult { | ||
| valid: boolean; | ||
|
|
@@ -63,43 +230,37 @@ export function appendJournalEntries( | |
| rootDir: string, | ||
| entries: Array<{ file: string; deleted?: boolean }>, | ||
| ): void { | ||
| const dir = path.join(rootDir, '.codegraph'); | ||
| const journalPath = path.join(dir, JOURNAL_FILENAME); | ||
| withJournalLock(rootDir, () => { | ||
| const journalPath = path.join(rootDir, '.codegraph', JOURNAL_FILENAME); | ||
|
|
||
| if (!fs.existsSync(dir)) { | ||
| fs.mkdirSync(dir, { recursive: true }); | ||
| } | ||
| if (!fs.existsSync(journalPath)) { | ||
| fs.writeFileSync(journalPath, `${HEADER_PREFIX}0\n`); | ||
| } | ||
|
|
||
| if (!fs.existsSync(journalPath)) { | ||
| fs.writeFileSync(journalPath, `${HEADER_PREFIX}0\n`); | ||
| } | ||
| const lines = entries.map((e) => { | ||
| if (e.deleted) return `DELETED ${e.file}`; | ||
| return e.file; | ||
| }); | ||
|
|
||
| const lines = entries.map((e) => { | ||
| if (e.deleted) return `DELETED ${e.file}`; | ||
| return e.file; | ||
| fs.appendFileSync(journalPath, `${lines.join('\n')}\n`); | ||
| }); | ||
|
|
||
| fs.appendFileSync(journalPath, `${lines.join('\n')}\n`); | ||
| } | ||
|
|
||
| export function writeJournalHeader(rootDir: string, timestamp: number): void { | ||
| const dir = path.join(rootDir, '.codegraph'); | ||
| const journalPath = path.join(dir, JOURNAL_FILENAME); | ||
| const tmpPath = `${journalPath}.tmp`; | ||
| withJournalLock(rootDir, () => { | ||
| const journalPath = path.join(rootDir, '.codegraph', JOURNAL_FILENAME); | ||
| const tmpPath = `${journalPath}.tmp`; | ||
|
|
||
| if (!fs.existsSync(dir)) { | ||
| fs.mkdirSync(dir, { recursive: true }); | ||
| } | ||
|
|
||
| try { | ||
| fs.writeFileSync(tmpPath, `${HEADER_PREFIX}${timestamp}\n`); | ||
| fs.renameSync(tmpPath, journalPath); | ||
| } catch (err) { | ||
| warn(`Failed to write journal header: ${(err as Error).message}`); | ||
| try { | ||
| fs.unlinkSync(tmpPath); | ||
| } catch { | ||
| /* ignore */ | ||
| fs.writeFileSync(tmpPath, `${HEADER_PREFIX}${timestamp}\n`); | ||
| fs.renameSync(tmpPath, journalPath); | ||
| } catch (err) { | ||
| warn(`Failed to write journal header: ${(err as Error).message}`); | ||
| try { | ||
| fs.unlinkSync(tmpPath); | ||
| } catch { | ||
| /* ignore */ | ||
| } | ||
| } | ||
| } | ||
| }); | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Atomics.waitfreezes the Node.js event loop during lock contentionAtomics.waitis a synchronous, blocking call — it stops the entire V8 event loop for the fullmsduration. In a watcher process, every filesystem notification, timer, and pending I/O callback is silenced for each 25 ms retry. In the worst case (5 000 ms timeout, 200 retries), the watcher becomes completely unresponsive for up to 5 seconds before ever throwing.A lighter alternative that avoids blocking the event loop is a simple busy-spin with
process.hrtime.bigint():This keeps each retry short and doesn't starve unrelated callbacks (though it does keep the CPU busy, which is acceptable for the brief per-retry duration).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in f5c737c. Replaced Atomics.wait with a short process.hrtime.bigint busy-spin per your suggestion. The 25ms retry interval keeps CPU burn negligible while letting pending FS events, timers, and I/O callbacks in watcher processes keep firing during contention.