From 5c10fb14ae702814dc85eafb84fa491c8620686d Mon Sep 17 00:00:00 2001
From: Ettore Di Giacinto
Date: Mon, 4 May 2026 16:53:06 +0000
Subject: [PATCH 1/3] fix(nodes/health): skip stale-marking already-offline
 nodes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The health monitor re-emitted "Node heartbeat stale" + "Marking stale
node offline" + MarkOffline on every cycle for nodes that were already
in the offline (or unhealthy) state. For an operator-stopped node this
flooded the logs with the same WARN+INFO pair every check interval.

Skip the staleness branch when the node is already StatusOffline /
StatusUnhealthy — the state is already what we'd write, so neither the
log lines nor the DB update carry information.

Signed-off-by: Ettore Di Giacinto
---
 core/services/nodes/health.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/core/services/nodes/health.go b/core/services/nodes/health.go
index dd0e903f160a..ad570fd8192d 100644
--- a/core/services/nodes/health.go
+++ b/core/services/nodes/health.go
@@ -126,6 +126,12 @@ func (hm *HealthMonitor) doCheckAll(ctx context.Context) {
 	// Workers (both backend and agent) send HTTP heartbeats to the frontend.
 	// If the heartbeat is stale, the worker is presumed down.
 	if time.Since(node.LastHeartbeat) > hm.staleThreshold {
+		// Skip nodes already marked offline/unhealthy — re-marking them
+		// every cycle floods the log with the same WARN+INFO pair for
+		// nodes the operator has intentionally taken down.
+		if node.Status == StatusOffline || node.Status == StatusUnhealthy {
+			continue
+		}
 		xlog.Warn("Node heartbeat stale", "node", node.Name, "lastHeartbeat", node.LastHeartbeat)
 		if hm.autoOffline {
 			xlog.Info("Marking stale node offline", "node", node.Name)

From 97a8c748609f50b3581c9fe64429c0a5615e662d Mon Sep 17 00:00:00 2001
From: Ettore Di Giacinto
Date: Mon, 4 May 2026 16:53:21 +0000
Subject: [PATCH 2/3] fix(worker): wait for backend gRPC bind before replying
 to backend.install
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The backend supervisor used to wait up to 4s (20 × 200ms) for the
backend's gRPC server to answer a HealthCheck, then log a warning and
reply Success with the bind address anyway. On slower nodes (a Jetson
Orin doing first-boot CUDA init, large CGO library load) the gRPC
listener wasn't up yet, so the frontend's first LoadModel dial returned
"connect: connection refused" and the operator chased a phantom network
issue instead of a startup-timing one.

Two changes:

- Bump the readiness window to 30s. CUDA init on Orin/Thor first boot
  measures in seconds, not milliseconds.
- On deadline-exceeded, stop the half-started process, recycle the
  port, and return an error with the backend's stderr tail. The
  frontend now gets a real failure with diagnostic context instead of
  a misleading ECONNREFUSED on a downstream dial.

Process death during the wait window keeps its existing fast-fail path.

Signed-off-by: Ettore Di Giacinto
---
 core/cli/worker.go | 35 +++++++++++++++++++++++++++++------
 1 file changed, 29 insertions(+), 6 deletions(-)

diff --git a/core/cli/worker.go b/core/cli/worker.go
index 368143ae05ac..032e80a7448c 100644
--- a/core/cli/worker.go
+++ b/core/cli/worker.go
@@ -465,10 +465,20 @@ func (s *backendSupervisor) startBackend(backend, backendPath string) (string, e
 	bp := s.processes[backend]
 	s.mu.Unlock()
 
-	// Wait for the gRPC server to be ready
+	// Wait for the gRPC server to be ready before reporting success.
+	// Slow nodes (Jetson Orin doing first-boot CUDA init, large CGO libs)
+	// can take 10-15s before the gRPC port accepts connections; the previous
+	// 4s window made the worker reply Success on a not-yet-listening port,
+	// which manifested upstream as "connect: connection refused" on the
+	// frontend's first LoadModel dial.
 	client := grpc.NewClientWithToken(clientAddr, false, nil, false, s.cmd.RegistrationToken)
-	for range 20 {
-		time.Sleep(200 * time.Millisecond)
+	const (
+		readinessPollInterval = 200 * time.Millisecond
+		readinessTimeout      = 30 * time.Second
+	)
+	deadline := time.Now().Add(readinessTimeout)
+	for time.Now().Before(deadline) {
+		time.Sleep(readinessPollInterval)
 		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
 		if ok, _ := client.HealthCheck(ctx); ok {
 			cancel()
@@ -496,10 +506,23 @@ func (s *backendSupervisor) startBackend(backend, backendPath string) (string, e
 		}
 	}
 
-	// Log stderr to help diagnose why the backend isn't responding
+	// Readiness deadline exceeded. Returning success here would leave the
+	// frontend with an unbound address (it dials, gets ECONNREFUSED, and
+	// the operator sees a misleading "connection refused" instead of the
+	// real cause). Stop the half-started process, recycle the port, and
+	// surface the failure to the caller with the backend's stderr tail.
 	stderrTail := readLastLinesFromFile(proc.StderrPath(), 20)
-	xlog.Warn("Backend gRPC server not ready after waiting, proceeding anyway", "backend", backend, "addr", clientAddr, "stderr", stderrTail)
-	return clientAddr, nil
+	xlog.Error("Backend gRPC server not ready before deadline; aborting install", "backend", backend, "addr", clientAddr, "timeout", readinessTimeout, "stderr", stderrTail)
+	if killErr := proc.Stop(); killErr != nil {
+		xlog.Warn("Failed to stop unready backend process", "backend", backend, "error", killErr)
+	}
+	s.mu.Lock()
+	if cur, ok := s.processes[backend]; ok && cur == bp {
+		delete(s.processes, backend)
+		s.freePorts = append(s.freePorts, port)
+	}
+	s.mu.Unlock()
+	return "", fmt.Errorf("backend %s did not become ready within %s. Last stderr:\n%s", backend, readinessTimeout, stderrTail)
 }
 
 // resolveProcessKeys turns a caller-supplied identifier into the set of

From a39c0b62ac41d4b43a83fe8fccb23c1f6a028993 Mon Sep 17 00:00:00 2001
From: Ettore Di Giacinto
Date: Mon, 4 May 2026 16:53:39 +0000
Subject: [PATCH 3/3] fix(distributed): route auto-upgrade through
 BackendManager + bump LocalAGI/LocalRecall
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Two distributed-mode bugs that surfaced together in the orchestrator
logs:

1. Auto-upgrade always failed with "backend not found". UpgradeChecker
   correctly routed CheckUpgrades through the active BackendManager (so
   the frontend aggregates worker state), but the auto-upgrade branch
   right below called gallery.UpgradeBackend directly with the
   frontend's SystemState. In distributed mode the frontend has no
   backends installed locally, so ListSystemBackends returned empty and
   Get(name) failed for every reported upgrade. Auto-upgrade now also
   goes through BackendManager.UpgradeBackend, which fans out to
   workers via NATS.

2. Embedding-load failure on a remote node crashed the orchestrator.
   When RAG init lazily called NewPersistentPostgresCollection and the
   remote embedding worker was unreachable, LocalRecall called
   os.Exit(1) inside the constructor, killing the orchestrator pod.
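
   The shape of the change, as a compilable sketch (condensed,
   hypothetical signatures; the real LocalRecall constructor takes
   more arguments and a richer collection type):

      package sketch

      import (
          "fmt"
          "os"
      )

      type Collection struct{ addr string }

      // dialEmbedder stands in for the real embedding-backend dial.
      func dialEmbedder(addr string) (*Collection, error) {
          if addr == "" {
              return nil, fmt.Errorf("embedding worker unreachable")
          }
          return &Collection{addr: addr}, nil
      }

      // Before: library code decided the process must die, so a dial
      // failure inside the constructor killed the embedding host
      // (here: the orchestrator).
      func newCollectionOld(addr string) *Collection {
          c, err := dialEmbedder(addr)
          if err != nil {
              fmt.Fprintln(os.Stderr, err)
              os.Exit(1)
          }
          return c
      }

      // After: the error travels up and the caller decides what a
      // dial failure means.
      func newCollection(addr string) (*Collection, error) {
          c, err := dialEmbedder(addr)
          if err != nil {
              return nil, fmt.Errorf("embedding backend: %w", err)
          }
          return c, nil
      }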
   LocalRecall now returns errors instead, LocalAGI surfaces them as a
   nil collection, and the existing RAGProviderFromState path returns
   (nil, nil, false) — the same code path the agent pool already takes
   when no RAG is configured. The orchestrator stays up; chat requests
   degrade to "no RAG available" until the embedding worker recovers.

Bumps:
  github.com/mudler/LocalAGI → e83bf515d010
  github.com/mudler/localrecall → 6138c1f535ab

Signed-off-by: Ettore Di Giacinto
---
 core/application/upgrade_checker.go | 32 +++++++++++++++++++++++-----
 go.mod                              |  4 ++--
 go.sum                              |  8 ++++----
 3 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/core/application/upgrade_checker.go b/core/application/upgrade_checker.go
index 3b2d94544088..6b8d69a2d9fa 100644
--- a/core/application/upgrade_checker.go
+++ b/core/application/upgrade_checker.go
@@ -199,13 +199,27 @@ func (uc *UpgradeChecker) runCheck(ctx context.Context) {
 		}
 	}
 
-	// Auto-upgrade if enabled
+	// Auto-upgrade if enabled. Route through the active BackendManager so
+	// distributed-mode upgrades fan out to workers via NATS — calling
+	// gallery.UpgradeBackend directly would look up the backend on the
+	// frontend filesystem, which is empty in distributed mode and produces
+	// "backend not found" while the cluster still reports an upgrade.
 	if uc.appConfig.AutoUpgradeBackends {
+		var bm galleryop.BackendManager
+		if uc.backendManagerFn != nil {
+			bm = uc.backendManagerFn()
+		}
 		for name, info := range upgrades {
 			xlog.Info("Auto-upgrading backend", "backend", name,
 				"from", info.InstalledVersion, "to", info.AvailableVersion)
-			if err := gallery.UpgradeBackend(ctx, uc.systemState, uc.modelLoader,
-				uc.galleries, name, nil); err != nil {
+			var err error
+			if bm != nil {
+				err = bm.UpgradeBackend(ctx, name, nil)
+			} else {
+				err = gallery.UpgradeBackend(ctx, uc.systemState, uc.modelLoader,
+					uc.galleries, name, nil)
+			}
+			if err != nil {
 				xlog.Error("Failed to auto-upgrade backend", "backend", name,
 					"error", err)
 			} else {
@@ -213,8 +227,16 @@
 				"version", info.AvailableVersion)
 			}
 		}
-		// Re-check to update cache after upgrades
-		if freshUpgrades, err := gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState); err == nil {
+		// Re-check to update cache after upgrades. Route through the same
+		// BackendManager so distributed mode reflects the worker view.
+ var freshUpgrades map[string]gallery.UpgradeInfo + var freshErr error + if bm != nil { + freshUpgrades, freshErr = bm.CheckUpgrades(ctx) + } else { + freshUpgrades, freshErr = gallery.CheckBackendUpgrades(ctx, uc.galleries, uc.systemState) + } + if freshErr == nil { uc.mu.Lock() uc.lastUpgrades = freshUpgrades uc.mu.Unlock() diff --git a/go.mod b/go.mod index 132b9fa915a8..62edd83a4b83 100644 --- a/go.mod +++ b/go.mod @@ -167,8 +167,8 @@ require ( github.com/kevinburke/ssh_config v1.2.0 // indirect github.com/labstack/gommon v0.4.2 // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/mudler/LocalAGI v0.0.0-20260415165142-3369136c7380 - github.com/mudler/localrecall v0.5.9-0.20260415164846-8ad831f840fc // indirect + github.com/mudler/LocalAGI v0.0.0-20260504165100-e83bf515d010 + github.com/mudler/localrecall v0.5.10-0.20260504162944-6138c1f535ab // indirect github.com/mudler/skillserver v0.0.6 github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/oxffaa/gopher-parse-sitemap v0.0.0-20191021113419-005d2eb1def4 // indirect diff --git a/go.sum b/go.sum index 53f9f2961eab..46ccc4170f31 100644 --- a/go.sum +++ b/go.sum @@ -703,8 +703,8 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= -github.com/mudler/LocalAGI v0.0.0-20260415165142-3369136c7380 h1:gSS535c1MO3IRSUIWJT1xzZjT4lZBsqtHpptXvrEsmw= -github.com/mudler/LocalAGI v0.0.0-20260415165142-3369136c7380/go.mod h1:rD7G70wl+5zlpvNF13iZBpAuat8LsiJFn678z3Kxleo= +github.com/mudler/LocalAGI v0.0.0-20260504165100-e83bf515d010 h1:b5MBD3gq+H/tN2dVFqkFI6CvSrBUnmvdGPl6ivtSrSc= +github.com/mudler/LocalAGI v0.0.0-20260504165100-e83bf515d010/go.mod h1:QOB+zg2jARzslqhy2c/59CW2Kcp0JEHOiNIDeCRFP2s= github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b h1:A74T2Lauvg61KodYqsjTYDY05kPLcW+efVZjd23dghU= github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b/go.mod h1:6sfja3lcu2nWRzEc0wwqGNu/eCG3EWgij+8s7xyUeQ4= github.com/mudler/edgevpn v0.31.1 h1:7qegiDWd0kAg6ljhNHxqvp8hbo/6BbzSdbb7/2WZfiY= @@ -713,8 +713,8 @@ github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc h1:RxwneJl1VgvikiX github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc/go.mod h1:O7SwdSWMilAWhBZMK9N9Y/oBDyMMzshE3ju8Xkexwig= github.com/mudler/go-processmanager v0.1.1 h1:c/1NRZOZpW8HuFv9RhBG57nQu1oDMRomEHedwBFMlrw= github.com/mudler/go-processmanager v0.1.1/go.mod h1:h6kmHUZeafr+k5hRYpGLMzJFH4hItHffgpRo2QIkP+o= -github.com/mudler/localrecall v0.5.9-0.20260415164846-8ad831f840fc h1:p1ucQ2rbU4mhG2Xl1Emg5Q6QCYCjI+fvMF9KTek/+sY= -github.com/mudler/localrecall v0.5.9-0.20260415164846-8ad831f840fc/go.mod h1:xuPtgL9zUyiQLmspYzO3kaboYrGbWmwi8BQPt1aCAcs= +github.com/mudler/localrecall v0.5.10-0.20260504162944-6138c1f535ab h1:U6MWVv9Xgb56JTIL4DfsZftSig/LeJA+yizlyw8fq24= +github.com/mudler/localrecall v0.5.10-0.20260504162944-6138c1f535ab/go.mod h1:xuPtgL9zUyiQLmspYzO3kaboYrGbWmwi8BQPt1aCAcs= github.com/mudler/memory v0.0.0-20260406210934-424c1ecf2cf8 h1:Ry8RiWy8fZ6Ff4E7dPmjRsBrnHOnPeOOj2LhCgyjQu0= github.com/mudler/memory v0.0.0-20260406210934-424c1ecf2cf8/go.mod h1:EA8Ashhd56o32qN7ouPKFSRUs/Z+LrRCF4v6R2Oarm8= github.com/mudler/skillserver v0.0.6 h1:ixz6wUekLdTmbnpAavCkTydDF6UdXAG3ncYufSPK9G0=
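
Reviewer note, not applied by git am: the patch 1 skip logic in
isolation, as a self-contained sketch. Node, Status, and checkOnce
here are stand-ins invented for the sketch; the real definitions live
in core/services/nodes and differ.

    package main

    import (
        "fmt"
        "time"
    )

    type Status int

    const (
        StatusOnline Status = iota
        StatusUnhealthy
        StatusOffline
    )

    type Node struct {
        Name          string
        Status        Status
        LastHeartbeat time.Time
    }

    // checkOnce mirrors the staleness branch after patch 1: nodes already
    // offline or unhealthy are skipped, so the WARN+INFO pair fires once
    // per transition instead of once per monitor cycle.
    func checkOnce(n *Node, staleThreshold time.Duration) {
        if time.Since(n.LastHeartbeat) <= staleThreshold {
            return
        }
        if n.Status == StatusOffline || n.Status == StatusUnhealthy {
            return // state already matches what we would write
        }
        fmt.Println("WARN Node heartbeat stale:", n.Name)
        fmt.Println("INFO Marking stale node offline:", n.Name)
        n.Status = StatusOffline
    }

    func main() {
        n := &Node{
            Name:          "worker-1",
            Status:        StatusOnline,
            LastHeartbeat: time.Now().Add(-10 * time.Minute),
        }
        for range 3 {
            checkOnce(n, time.Minute) // logs on the first pass only, then stays quiet
        }
    }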