diff --git a/core/services/nodes/managers_distributed.go b/core/services/nodes/managers_distributed.go index 61b3bb9666a6..2720cef603d3 100644 --- a/core/services/nodes/managers_distributed.go +++ b/core/services/nodes/managers_distributed.go @@ -114,7 +114,12 @@ func (r BackendOpResult) Err() error { // deletes the row and reports "success". For non-healthy nodes the status // is "queued" — no attempt is made right now, reconciler will pick it up // when the node returns. -func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, apply func(node BackendNode) error) (BackendOpResult, error) { +// targetNodeIDs is an optional allowlist: when non-nil, only nodes whose ID is +// in the set are visited. Used by UpgradeBackend to avoid asking nodes that +// never had the backend installed to "upgrade" it — such requests fail at the +// gallery (no platform variant) and would otherwise leave a forever-retrying +// pending_backend_ops row. nil means "fan out to every node" (Install/Delete). +func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) { allNodes, err := d.registry.List(ctx) if err != nil { return BackendOpResult{}, err @@ -133,6 +138,9 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context if node.NodeType != "" && node.NodeType != NodeTypeBackend { continue } + if targetNodeIDs != nil && !targetNodeIDs[node.ID] { + continue + } if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil { xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err) result.Nodes = append(result.Nodes, NodeOpStatus{ @@ -218,7 +226,7 @@ func (d *DistributedBackendManager) DeleteBackend(name string) error { } ctx := context.Background() - result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error { + result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error { reply, err := d.adapter.DeleteBackend(node.ID, name) if err != nil { return err @@ -241,7 +249,7 @@ func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, n if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) { return BackendOpResult{}, err } - return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error { + return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error { reply, err := d.adapter.DeleteBackend(node.ID, name) if err != nil { return err @@ -327,7 +335,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall galleriesJSON, _ := json.Marshal(op.Galleries) backendName := op.GalleryElementName - result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, func(node BackendNode) error { + result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, nil, func(node BackendNode) error { // Admin-driven backend install: not tied to a specific replica slot. // Pass replica 0 — the worker's processKey is "backend#0" when no // modelID is supplied, matching pre-PR4 behavior. @@ -347,11 +355,28 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall } // UpgradeBackend reuses the install NATS subject (the worker re-downloads -// from the gallery). Same queue semantics as Install/Delete. +// from the gallery). Unlike Install/Delete, upgrade only targets the nodes +// that already report this backend as installed — fanning out to every node +// would ask workers to "upgrade" something they never had, which fails at +// the gallery (e.g. a darwin/arm64 worker has no platform variant for a +// linux-only backend) and leaves a forever-retrying pending_backend_ops row. func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error { galleriesJSON, _ := json.Marshal(d.backendGalleries) - result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, func(node BackendNode) error { + installed, err := d.ListBackends() + if err != nil { + return fmt.Errorf("failed to list cluster backends: %w", err) + } + entry, ok := installed[name] + if !ok || len(entry.Nodes) == 0 { + return fmt.Errorf("backend %q is not installed on any node", name) + } + targetNodeIDs := make(map[string]bool, len(entry.Nodes)) + for _, n := range entry.Nodes { + targetNodeIDs[n.NodeID] = true + } + + result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error { reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON), "", "", "", 0) if err != nil { return err diff --git a/core/services/nodes/managers_distributed_test.go b/core/services/nodes/managers_distributed_test.go index db390aa6c616..77ac5580836c 100644 --- a/core/services/nodes/managers_distributed_test.go +++ b/core/services/nodes/managers_distributed_test.go @@ -240,11 +240,30 @@ var _ = Describe("DistributedBackendManager", func() { }) Describe("UpgradeBackend", func() { + // scriptInstalled tells the worker(s) named in `nodeIDs` to claim + // `backend` is installed when DistributedBackendManager.ListBackends() + // fans out backend.list. Anything not scripted defaults to an empty + // reply, which means "this node has no backends installed" and so + // upgrade should skip it. + scriptInstalled := func(backend string, nodeIDs ...string) { + for _, id := range nodeIDs { + mc.scriptReply(messaging.SubjectNodeBackendList(id), + messaging.BackendListReply{Backends: []messaging.NodeBackendInfo{{Name: backend}}}) + } + } + scriptNoBackends := func(nodeIDs ...string) { + for _, id := range nodeIDs { + mc.scriptReply(messaging.SubjectNodeBackendList(id), + messaging.BackendListReply{Backends: nil}) + } + } + Context("when every node fails to upgrade", func() { It("returns an aggregated error", func() { n1 := registerHealthyBackend("worker-a", "10.0.0.1:50051") n2 := registerHealthyBackend("worker-b", "10.0.0.2:50051") + scriptInstalled("vllm-development", n1.ID, n2.ID) mc.scriptReply(messaging.SubjectNodeBackendInstall(n1.ID), messaging.BackendInstallReply{Success: false, Error: "image manifest not found"}) mc.scriptReply(messaging.SubjectNodeBackendInstall(n2.ID), @@ -262,11 +281,58 @@ var _ = Describe("DistributedBackendManager", func() { Context("when every node succeeds", func() { It("returns nil", func() { n1 := registerHealthyBackend("worker-a", "10.0.0.1:50051") + scriptInstalled("vllm-development", n1.ID) mc.scriptReply(messaging.SubjectNodeBackendInstall(n1.ID), messaging.BackendInstallReply{Success: true}) Expect(mgr.UpgradeBackend(ctx, "vllm-development", nil)).To(Succeed()) }) }) + + // Smart fan-out: only nodes that actually report the backend installed + // receive the upgrade NATS request. Reproduces the bug where the + // "Upgrade All" UI button asked a darwin/arm64 worker to upgrade a + // linux-only backend it never had, producing a "no child with platform + // darwin/arm64 in index" error and a stuck pending_backend_ops row. + Context("when only one of two healthy nodes has the backend installed", func() { + It("upgrades only on that node and skips the other entirely", func() { + has := registerHealthyBackend("linux-amd64-worker", "10.0.0.1:50051") + lacks := registerHealthyBackend("mac-mini-m4", "10.0.0.2:50051") + + scriptInstalled("cpu-insightface-development", has.ID) + scriptNoBackends(lacks.ID) + mc.scriptReply(messaging.SubjectNodeBackendInstall(has.ID), + messaging.BackendInstallReply{Success: true}) + // Deliberately don't script SubjectNodeBackendInstall for `lacks`: + // if the manager attempts it, the scripted-client default returns + // fakeNoRespondersErr and the assertion below fails loudly. + + Expect(mgr.UpgradeBackend(ctx, "cpu-insightface-development", nil)).To(Succeed()) + + mc.mu.Lock() + defer mc.mu.Unlock() + for _, call := range mc.calls { + Expect(call.Subject).ToNot(Equal(messaging.SubjectNodeBackendInstall(lacks.ID)), + "upgrade leaked to %s which does not have the backend installed", lacks.Name) + } + }) + }) + + Context("when no node has the backend installed", func() { + It("returns a clear error and never attempts an install request", func() { + n1 := registerHealthyBackend("worker-a", "10.0.0.1:50051") + scriptNoBackends(n1.ID) + + err := mgr.UpgradeBackend(ctx, "vllm-development", nil) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("not installed on any node")) + + mc.mu.Lock() + defer mc.mu.Unlock() + for _, call := range mc.calls { + Expect(call.Subject).ToNot(Equal(messaging.SubjectNodeBackendInstall(n1.ID))) + } + }) + }) }) Describe("DeleteBackend", func() {