From 63462f2e63f6347452016b588dcf50ea6c9f4e7e Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Tue, 5 May 2026 21:34:56 +0000 Subject: [PATCH] fix(distributed): scope Upgrade All to nodes that have the backend installed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In distributed mode the React UI's "Upgrade All" button fanned every detected outdated backend out to every healthy backend node, including nodes that never had that backend installed. On heterogeneous clusters this surfaced as platform errors (e.g. mac-mini-m4 asked to upgrade cpu-insightface-development, which has no darwin/arm64 variant) and left forever-retrying pending_backend_ops rows. DistributedBackendManager.UpgradeBackend now queries ListBackends() first, builds the target node-ID set from SystemBackend.Nodes, and only fans out to those nodes — every per-node primitive (adapter.InstallBackend, the pending-ops queue, BackendOpResult) is unchanged. enqueueAndDrainBackendOp gains an optional targetNodeIDs allowlist; Install/Delete keep their fan-to-everyone semantics by passing nil. If no node reports the backend installed, UpgradeBackend now returns a clear "not installed on any node" error instead of producing a stuck queue. Adds Ginkgo coverage for the smart fan-out: backend on a subset of nodes goes only to those nodes; backend on no node returns the new error and never sends a NATS install request. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-7 [Claude Code] --- core/services/nodes/managers_distributed.go | 37 +++++++++-- .../nodes/managers_distributed_test.go | 66 +++++++++++++++++++ 2 files changed, 97 insertions(+), 6 deletions(-) diff --git a/core/services/nodes/managers_distributed.go b/core/services/nodes/managers_distributed.go index 61b3bb9666a6..2720cef603d3 100644 --- a/core/services/nodes/managers_distributed.go +++ b/core/services/nodes/managers_distributed.go @@ -114,7 +114,12 @@ func (r BackendOpResult) Err() error { // deletes the row and reports "success". For non-healthy nodes the status // is "queued" — no attempt is made right now, reconciler will pick it up // when the node returns. -func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, apply func(node BackendNode) error) (BackendOpResult, error) { +// targetNodeIDs is an optional allowlist: when non-nil, only nodes whose ID is +// in the set are visited. Used by UpgradeBackend to avoid asking nodes that +// never had the backend installed to "upgrade" it — such requests fail at the +// gallery (no platform variant) and would otherwise leave a forever-retrying +// pending_backend_ops row. nil means "fan out to every node" (Install/Delete). +func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) { allNodes, err := d.registry.List(ctx) if err != nil { return BackendOpResult{}, err @@ -133,6 +138,9 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context if node.NodeType != "" && node.NodeType != NodeTypeBackend { continue } + if targetNodeIDs != nil && !targetNodeIDs[node.ID] { + continue + } if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil { xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err) result.Nodes = append(result.Nodes, NodeOpStatus{ @@ -218,7 +226,7 @@ func (d *DistributedBackendManager) DeleteBackend(name string) error { } ctx := context.Background() - result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error { + result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error { reply, err := d.adapter.DeleteBackend(node.ID, name) if err != nil { return err @@ -241,7 +249,7 @@ func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, n if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) { return BackendOpResult{}, err } - return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error { + return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error { reply, err := d.adapter.DeleteBackend(node.ID, name) if err != nil { return err @@ -327,7 +335,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall galleriesJSON, _ := json.Marshal(op.Galleries) backendName := op.GalleryElementName - result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, func(node BackendNode) error { + result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, nil, func(node BackendNode) error { // Admin-driven backend install: not tied to a specific replica slot. // Pass replica 0 — the worker's processKey is "backend#0" when no // modelID is supplied, matching pre-PR4 behavior. @@ -347,11 +355,28 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall } // UpgradeBackend reuses the install NATS subject (the worker re-downloads -// from the gallery). Same queue semantics as Install/Delete. +// from the gallery). Unlike Install/Delete, upgrade only targets the nodes +// that already report this backend as installed — fanning out to every node +// would ask workers to "upgrade" something they never had, which fails at +// the gallery (e.g. a darwin/arm64 worker has no platform variant for a +// linux-only backend) and leaves a forever-retrying pending_backend_ops row. func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error { galleriesJSON, _ := json.Marshal(d.backendGalleries) - result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, func(node BackendNode) error { + installed, err := d.ListBackends() + if err != nil { + return fmt.Errorf("failed to list cluster backends: %w", err) + } + entry, ok := installed[name] + if !ok || len(entry.Nodes) == 0 { + return fmt.Errorf("backend %q is not installed on any node", name) + } + targetNodeIDs := make(map[string]bool, len(entry.Nodes)) + for _, n := range entry.Nodes { + targetNodeIDs[n.NodeID] = true + } + + result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error { reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON), "", "", "", 0) if err != nil { return err diff --git a/core/services/nodes/managers_distributed_test.go b/core/services/nodes/managers_distributed_test.go index db390aa6c616..77ac5580836c 100644 --- a/core/services/nodes/managers_distributed_test.go +++ b/core/services/nodes/managers_distributed_test.go @@ -240,11 +240,30 @@ var _ = Describe("DistributedBackendManager", func() { }) Describe("UpgradeBackend", func() { + // scriptInstalled tells the worker(s) named in `nodeIDs` to claim + // `backend` is installed when DistributedBackendManager.ListBackends() + // fans out backend.list. Anything not scripted defaults to an empty + // reply, which means "this node has no backends installed" and so + // upgrade should skip it. + scriptInstalled := func(backend string, nodeIDs ...string) { + for _, id := range nodeIDs { + mc.scriptReply(messaging.SubjectNodeBackendList(id), + messaging.BackendListReply{Backends: []messaging.NodeBackendInfo{{Name: backend}}}) + } + } + scriptNoBackends := func(nodeIDs ...string) { + for _, id := range nodeIDs { + mc.scriptReply(messaging.SubjectNodeBackendList(id), + messaging.BackendListReply{Backends: nil}) + } + } + Context("when every node fails to upgrade", func() { It("returns an aggregated error", func() { n1 := registerHealthyBackend("worker-a", "10.0.0.1:50051") n2 := registerHealthyBackend("worker-b", "10.0.0.2:50051") + scriptInstalled("vllm-development", n1.ID, n2.ID) mc.scriptReply(messaging.SubjectNodeBackendInstall(n1.ID), messaging.BackendInstallReply{Success: false, Error: "image manifest not found"}) mc.scriptReply(messaging.SubjectNodeBackendInstall(n2.ID), @@ -262,11 +281,58 @@ var _ = Describe("DistributedBackendManager", func() { Context("when every node succeeds", func() { It("returns nil", func() { n1 := registerHealthyBackend("worker-a", "10.0.0.1:50051") + scriptInstalled("vllm-development", n1.ID) mc.scriptReply(messaging.SubjectNodeBackendInstall(n1.ID), messaging.BackendInstallReply{Success: true}) Expect(mgr.UpgradeBackend(ctx, "vllm-development", nil)).To(Succeed()) }) }) + + // Smart fan-out: only nodes that actually report the backend installed + // receive the upgrade NATS request. Reproduces the bug where the + // "Upgrade All" UI button asked a darwin/arm64 worker to upgrade a + // linux-only backend it never had, producing a "no child with platform + // darwin/arm64 in index" error and a stuck pending_backend_ops row. + Context("when only one of two healthy nodes has the backend installed", func() { + It("upgrades only on that node and skips the other entirely", func() { + has := registerHealthyBackend("linux-amd64-worker", "10.0.0.1:50051") + lacks := registerHealthyBackend("mac-mini-m4", "10.0.0.2:50051") + + scriptInstalled("cpu-insightface-development", has.ID) + scriptNoBackends(lacks.ID) + mc.scriptReply(messaging.SubjectNodeBackendInstall(has.ID), + messaging.BackendInstallReply{Success: true}) + // Deliberately don't script SubjectNodeBackendInstall for `lacks`: + // if the manager attempts it, the scripted-client default returns + // fakeNoRespondersErr and the assertion below fails loudly. + + Expect(mgr.UpgradeBackend(ctx, "cpu-insightface-development", nil)).To(Succeed()) + + mc.mu.Lock() + defer mc.mu.Unlock() + for _, call := range mc.calls { + Expect(call.Subject).ToNot(Equal(messaging.SubjectNodeBackendInstall(lacks.ID)), + "upgrade leaked to %s which does not have the backend installed", lacks.Name) + } + }) + }) + + Context("when no node has the backend installed", func() { + It("returns a clear error and never attempts an install request", func() { + n1 := registerHealthyBackend("worker-a", "10.0.0.1:50051") + scriptNoBackends(n1.ID) + + err := mgr.UpgradeBackend(ctx, "vllm-development", nil) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("not installed on any node")) + + mc.mu.Lock() + defer mc.mu.Unlock() + for _, call := range mc.calls { + Expect(call.Subject).ToNot(Equal(messaging.SubjectNodeBackendInstall(n1.ID))) + } + }) + }) }) Describe("DeleteBackend", func() {