From 824e8fb6ff336b6558b948adea2a51051b3b5b82 Mon Sep 17 00:00:00 2001 From: matthew-pilot Date: Thu, 28 May 2026 06:22:13 +0000 Subject: [PATCH] fix(policy): release pr.mu during executeEvictWhere peer evaluation (PILOT-104) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit executeEvictWhere previously held pr.mu (exclusive write lock) while iterating over every peer and calling EvaluatePeerExpr for each one. Each peer expression evaluation has a 100 ms timeout. With 1 000 peers worst-case the lock was held for ~100 seconds, blocking reconcileMembership, applyMembershipDiff, and every other mu-requiring path in the daemon-side policy engine. Fix: snapshot peer pointers under pr.mu.RLock(), release, evaluate policies outside the lock, then re-acquire pr.mu.Lock() briefly to apply evictions. Concurrent peer removals between the snapshot and re-acquire are harmless — delete on a non-existent map key is a no-op. The write-critical section now performs no policy evaluation; its cost scales only with the number of evicted peers, not with the total peer count. Closes PILOT-104 --- runner.go | 33 ++++++++++++++++++++++++++------- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/runner.go b/runner.go index a8950a5..1b89c62 100644 --- a/runner.go +++ b/runner.go @@ -350,11 +350,22 @@ func (pr *PolicyRunner) executeEvict(ctx map[string]interface{}) { } func (pr *PolicyRunner) executeEvictWhere(d Directive, actionIdx int) { - pr.mu.Lock() - defer pr.mu.Unlock() + // Snapshot the peer list under read lock. Per-peer policy evaluation + // (via EvaluatePeerExpr → runProgram) has a 100 ms timeout per peer. + // Holding the write lock across O(N) evaluations blocks every other + // pr.mu acquirer (reconcileMembership, applyMembershipDiff, entire + // runCycle body). For a network with 1 000 peers worst case the lock + // was held for ~100 s. Snapshot + release + re-acquire briefly keeps + // the write-critical section proportional to evictions, not evaluations. 
+ pr.mu.RLock() + snapshot := make([]*managedPeer, 0, len(pr.peers)) + for _, p := range pr.peers { + snapshot = append(snapshot, p) + } + pr.mu.RUnlock() var toEvict []uint32 - for _, p := range pr.peers { + for _, p := range snapshot { peerCtx := map[string]interface{}{ "peer_id": int(p.NodeID), "peer_tags": mergeTags(p.RegistryTags, p.tags()), @@ -371,6 +382,14 @@ func (pr *PolicyRunner) executeEvictWhere(d Directive, actionIdx int) { } } + if len(toEvict) == 0 { + return + } + + // Re-acquire the write lock briefly to apply evictions. Peers that + // were concurrently removed between the snapshot and this lock are + // harmless — delete on a non-existent key is a no-op. + pr.mu.Lock() now := time.Now() for _, id := range toEvict { delete(pr.peers, id) @@ -378,10 +397,10 @@ func (pr *PolicyRunner) executeEvictWhere(d Directive, actionIdx int) { pr.recentlyEvicted[id] = now } } - if len(toEvict) > 0 { - pr.cycleEvicted += len(toEvict) - slog.Info("policy: evicted peers", "network_id", pr.netID, "count", len(toEvict), "rule", d.Rule) - } + pr.cycleEvicted += len(toEvict) + pr.mu.Unlock() + + slog.Info("policy: evicted peers", "network_id", pr.netID, "count", len(toEvict), "rule", d.Rule) } // evictCooldown bounds how long an evicted peer stays out of pr.peers