Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions core/services/nodes/managers_distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ func (r BackendOpResult) Err() error {
// deletes the row and reports "success". For non-healthy nodes the status
// is "queued" — no attempt is made right now, reconciler will pick it up
// when the node returns.
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, apply func(node BackendNode) error) (BackendOpResult, error) {
// targetNodeIDs is an optional allowlist: when non-nil, only nodes whose ID is
// in the set are visited. Used by UpgradeBackend to avoid asking nodes that
// never had the backend installed to "upgrade" it — such requests fail at the
// gallery (no platform variant) and would otherwise leave a forever-retrying
// pending_backend_ops row. nil means "fan out to every node" (Install/Delete).
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) {
allNodes, err := d.registry.List(ctx)
if err != nil {
return BackendOpResult{}, err
Expand All @@ -133,6 +138,9 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
if node.NodeType != "" && node.NodeType != NodeTypeBackend {
continue
}
if targetNodeIDs != nil && !targetNodeIDs[node.ID] {
continue
}
if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil {
xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err)
result.Nodes = append(result.Nodes, NodeOpStatus{
Expand Down Expand Up @@ -218,7 +226,7 @@ func (d *DistributedBackendManager) DeleteBackend(name string) error {
}

ctx := context.Background()
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error {
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
reply, err := d.adapter.DeleteBackend(node.ID, name)
if err != nil {
return err
Expand All @@ -241,7 +249,7 @@ func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, n
if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) {
return BackendOpResult{}, err
}
return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, func(node BackendNode) error {
return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
reply, err := d.adapter.DeleteBackend(node.ID, name)
if err != nil {
return err
Expand Down Expand Up @@ -327,7 +335,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
galleriesJSON, _ := json.Marshal(op.Galleries)
backendName := op.GalleryElementName

result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, func(node BackendNode) error {
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, nil, func(node BackendNode) error {
// Admin-driven backend install: not tied to a specific replica slot.
// Pass replica 0 — the worker's processKey is "backend#0" when no
// modelID is supplied, matching pre-PR4 behavior.
Expand All @@ -347,11 +355,28 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
}

// UpgradeBackend reuses the install NATS subject (the worker re-downloads
// from the gallery). Same queue semantics as Install/Delete.
// from the gallery). Unlike Install/Delete, upgrade only targets the nodes
// that already report this backend as installed — fanning out to every node
// would ask workers to "upgrade" something they never had, which fails at
// the gallery (e.g. a darwin/arm64 worker has no platform variant for a
// linux-only backend) and leaves a forever-retrying pending_backend_ops row.
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
galleriesJSON, _ := json.Marshal(d.backendGalleries)

result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, func(node BackendNode) error {
installed, err := d.ListBackends()
if err != nil {
return fmt.Errorf("failed to list cluster backends: %w", err)
}
entry, ok := installed[name]
if !ok || len(entry.Nodes) == 0 {
return fmt.Errorf("backend %q is not installed on any node", name)
}
targetNodeIDs := make(map[string]bool, len(entry.Nodes))
for _, n := range entry.Nodes {
targetNodeIDs[n.NodeID] = true
}

result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON), "", "", "", 0)
if err != nil {
return err
Expand Down
66 changes: 66 additions & 0 deletions core/services/nodes/managers_distributed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,30 @@ var _ = Describe("DistributedBackendManager", func() {
})

Describe("UpgradeBackend", func() {
// scriptInstalled tells the worker(s) named in `nodeIDs` to claim
// `backend` is installed when DistributedBackendManager.ListBackends()
// fans out backend.list. Anything not scripted defaults to an empty
// reply, which means "this node has no backends installed" and so
// upgrade should skip it.
scriptInstalled := func(backend string, nodeIDs ...string) {
for _, id := range nodeIDs {
mc.scriptReply(messaging.SubjectNodeBackendList(id),
messaging.BackendListReply{Backends: []messaging.NodeBackendInfo{{Name: backend}}})
}
}
scriptNoBackends := func(nodeIDs ...string) {
for _, id := range nodeIDs {
mc.scriptReply(messaging.SubjectNodeBackendList(id),
messaging.BackendListReply{Backends: nil})
}
}

Context("when every node fails to upgrade", func() {
It("returns an aggregated error", func() {
n1 := registerHealthyBackend("worker-a", "10.0.0.1:50051")
n2 := registerHealthyBackend("worker-b", "10.0.0.2:50051")

scriptInstalled("vllm-development", n1.ID, n2.ID)
mc.scriptReply(messaging.SubjectNodeBackendInstall(n1.ID),
messaging.BackendInstallReply{Success: false, Error: "image manifest not found"})
mc.scriptReply(messaging.SubjectNodeBackendInstall(n2.ID),
Expand All @@ -262,11 +281,58 @@ var _ = Describe("DistributedBackendManager", func() {
Context("when every node succeeds", func() {
It("returns nil", func() {
n1 := registerHealthyBackend("worker-a", "10.0.0.1:50051")
scriptInstalled("vllm-development", n1.ID)
mc.scriptReply(messaging.SubjectNodeBackendInstall(n1.ID),
messaging.BackendInstallReply{Success: true})
Expect(mgr.UpgradeBackend(ctx, "vllm-development", nil)).To(Succeed())
})
})

// Smart fan-out: only nodes that actually report the backend installed
// receive the upgrade NATS request. Reproduces the bug where the
// "Upgrade All" UI button asked a darwin/arm64 worker to upgrade a
// linux-only backend it never had, producing a "no child with platform
// darwin/arm64 in index" error and a stuck pending_backend_ops row.
Context("when only one of two healthy nodes has the backend installed", func() {
It("upgrades only on that node and skips the other entirely", func() {
has := registerHealthyBackend("linux-amd64-worker", "10.0.0.1:50051")
lacks := registerHealthyBackend("mac-mini-m4", "10.0.0.2:50051")

scriptInstalled("cpu-insightface-development", has.ID)
scriptNoBackends(lacks.ID)
mc.scriptReply(messaging.SubjectNodeBackendInstall(has.ID),
messaging.BackendInstallReply{Success: true})
// Deliberately don't script SubjectNodeBackendInstall for `lacks`:
// if the manager attempts it, the scripted-client default returns
// fakeNoRespondersErr and the assertion below fails loudly.

Expect(mgr.UpgradeBackend(ctx, "cpu-insightface-development", nil)).To(Succeed())

mc.mu.Lock()
defer mc.mu.Unlock()
for _, call := range mc.calls {
Expect(call.Subject).ToNot(Equal(messaging.SubjectNodeBackendInstall(lacks.ID)),
"upgrade leaked to %s which does not have the backend installed", lacks.Name)
}
})
})

Context("when no node has the backend installed", func() {
It("returns a clear error and never attempts an install request", func() {
n1 := registerHealthyBackend("worker-a", "10.0.0.1:50051")
scriptNoBackends(n1.ID)

err := mgr.UpgradeBackend(ctx, "vllm-development", nil)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("not installed on any node"))

mc.mu.Lock()
defer mc.mu.Unlock()
for _, call := range mc.calls {
Expect(call.Subject).ToNot(Equal(messaging.SubjectNodeBackendInstall(n1.ID)))
}
})
})
})

Describe("DeleteBackend", func() {
Expand Down
Loading