Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 58 additions & 13 deletions core/cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,10 +664,19 @@ func buildProcessKey(modelID, backend string, replicaIndex int) string {
}

// installBackend handles the backend.install flow:
// 1. If already running for this (model, replica) slot, return existing address
// 2. Install backend from gallery (if not already installed)
// 3. Find backend binary
// 4. Start gRPC process on a new port
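// 1. If already running for this (model, replica) slot AND req.Force is false,
// return existing address (the fast path used by routine load events that
// just want to know which port a backend already serves on). A cached entry
// whose process is no longer running is cleaned up first and the request
// falls through to the remaining steps instead of returning a dead address.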
// 2. If req.Force is true, stop any process(es) currently using this backend
// so the gallery install can replace the on-disk artifact and the freshly
// started process picks up the new binary. This is the upgrade path —
// without it, every backend.install we receive after the first hits the
// fast path and silently no-ops, leaving the cluster on a stale build.
// 3. Install backend from gallery (force=req.Force so existing artifacts get
// overwritten on upgrade).
// 4. Find backend binary
// 5. Start gRPC process on a new port
//
// Returns the gRPC address of the backend process.
//
// ProcessKey includes the replica index so a worker with MaxReplicasPerModel>1
Expand All @@ -677,10 +686,40 @@ func buildProcessKey(modelID, backend string, replicaIndex int) string {
func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest) (string, error) {
processKey := buildProcessKey(req.ModelID, req.Backend, int(req.ReplicaIndex))

// If already running for this model+replica, return its address
if addr := s.getAddr(processKey); addr != "" {
xlog.Info("Backend already running for model replica", "backend", req.Backend, "model", req.ModelID, "replica", req.ReplicaIndex, "addr", addr)
return addr, nil
if !req.Force {
// Fast path: already running for this model+replica → return existing
// address. Verify liveness before trusting the cached entry: a process
// that died without the supervisor noticing leaves a stale (key, addr)
// pair, and getAddr would otherwise hand the controller an address
// that immediately ECONNREFUSEDs. The reconciler then marks the
// replica failed, retries the install, the supervisor says "already
// running" again, and the cluster loops on a dead replica forever.
if addr := s.getAddr(processKey); addr != "" {
if s.isRunning(processKey) {
xlog.Info("Backend already running for model replica", "backend", req.Backend, "model", req.ModelID, "replica", req.ReplicaIndex, "addr", addr)
return addr, nil
}
xlog.Warn("Stale process entry for backend (dead process); cleaning up before reinstall",
"backend", req.Backend, "model", req.ModelID, "replica", req.ReplicaIndex, "addr", addr)
s.stopBackendExact(processKey)
}
} else {
// Upgrade path: stop every live process that shares this backend so the
// gallery install can overwrite the on-disk artifact and the restarted
// process picks up the new binary. resolveProcessKeys catches peer
// replicas of the same backend (whisper#0, whisper#1, ...) on workers
// configured with MaxReplicasPerModel>1. We also stop the exact
// processKey from the request tuple — keys created with an explicit
// modelID don't share the bare-name prefix the resolver matches, but
// they're still using the old binary and need to come down. Both calls
// are no-ops on missing keys.
toStop := s.resolveProcessKeys(req.Backend)
toStop = append(toStop, processKey)
for _, key := range toStop {
xlog.Info("Force install: stopping running backend before reinstall",
"backend", req.Backend, "processKey", key)
s.stopBackendExact(key)
}
}

// Parse galleries from request (override local config if provided)
Expand All @@ -692,20 +731,26 @@ func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest)
}
}

// Try to find the backend binary
backendPath := s.findBackend(req.Backend)
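// On upgrade, run the gallery install path even if the binary already
// exists on disk: findBackend would otherwise short-circuit and we'd
// restart the same stale binary. The force flag passed to
// InstallBackendFromGallery makes it overwrite the existing artifact.
// NOTE(review): the external-URI branch below calls InstallExternalBackend
// without a force argument, so overwrite-on-upgrade for URI installs
// depends on that function's own behavior — confirm.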
backendPath := ""
if !req.Force {
backendPath = s.findBackend(req.Backend)
}
if backendPath == "" {
if req.URI != "" {
xlog.Info("Backend not found locally, attempting external install", "backend", req.Backend, "uri", req.URI)
xlog.Info("Installing backend from external URI", "backend", req.Backend, "uri", req.URI, "force", req.Force)
if err := galleryop.InstallExternalBackend(
context.Background(), galleries, s.systemState, s.ml, nil, req.URI, req.Name, req.Alias,
); err != nil {
return "", fmt.Errorf("installing backend from gallery: %w", err)
}
} else {
xlog.Info("Backend not found locally, attempting gallery install", "backend", req.Backend)
xlog.Info("Installing backend from gallery", "backend", req.Backend, "force", req.Force)
if err := gallery.InstallBackendFromGallery(
context.Background(), galleries, s.systemState, s.ml, req.Backend, nil, false,
context.Background(), galleries, s.systemState, s.ml, req.Backend, nil, req.Force,
); err != nil {
return "", fmt.Errorf("installing backend from gallery: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions core/http/endpoints/localai/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,10 @@ func InstallBackendOnNodeEndpoint(unloader nodes.NodeCommandSender) echo.Handler
}
// Admin-driven backend install: not tied to a specific replica slot
// (no model is being loaded). Pass replica 0 to match the worker's
// admin process-key convention (`backend#0`).
reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0)
// admin process-key convention (`backend#0`). force=false so the
// worker's fast path takes over if the backend is already running —
// upgrades go through the dedicated /api/backends/upgrade path.
reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0, false)
if err != nil {
xlog.Error("Failed to install backend on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", err)
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to install backend on node"))
Expand Down
8 changes: 8 additions & 0 deletions core/services/messaging/subjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ type BackendInstallRequest struct {
// (single-replica behavior — no collision because the controller never
// asks for replica > 0 on a node whose MaxReplicasPerModel is 1).
ReplicaIndex int32 `json:"replica_index,omitempty"`
// Force skips the "already running" short-circuit and re-runs the gallery
// install. UpgradeBackend sets this so the worker actually stops the live
// process, re-downloads the artifact, and starts a fresh one — without it,
// the install handler's early return makes upgrades a silent no-op while
// the coordinator's drift detection keeps re-flagging the backend forever.
// Older workers that don't know this field treat it as false (current
// behavior preserved).
Force bool `json:"force,omitempty"`
}

// BackendInstallReply is the response from a backend.install NATS request.
Expand Down
10 changes: 8 additions & 2 deletions core/services/nodes/managers_distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
// Admin-driven backend install: not tied to a specific replica slot.
// Pass replica 0 — the worker's processKey is "backend#0" when no
// modelID is supplied, matching pre-PR4 behavior.
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0)
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, false)
if err != nil {
return err
}
Expand All @@ -360,6 +360,12 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
// would ask workers to "upgrade" something they never had, which fails at
// the gallery (e.g. a darwin/arm64 worker has no platform variant for a
// linux-only backend) and leaves a forever-retrying pending_backend_ops row.
//
// force=true on the install call is what distinguishes upgrade from install:
// the worker stops the live process for this backend, overwrites the on-disk
// artifact, and restarts. Without it, the worker's "already running" fast
// path turns every backend.install into a no-op and the gallery's drift
// detection never converges.
func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name string, progressCb galleryop.ProgressCallback) error {
galleriesJSON, _ := json.Marshal(d.backendGalleries)

Expand All @@ -377,7 +383,7 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str
}

result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON), "", "", "", 0)
reply, err := d.adapter.InstallBackend(node.ID, name, "", string(galleriesJSON), "", "", "", 0, true)
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions core/services/nodes/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,12 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
_, applyErr = rc.adapter.DeleteBackend(op.NodeID, op.Backend)
case OpBackendInstall, OpBackendUpgrade:
// Pending-op drain for admin install/upgrade — not a per-replica
// load. Replica 0 is the conventional admin slot.
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0)
// load. Replica 0 is the conventional admin slot. Upgrade ops set
// force=true so the worker reinstalls the artifact and restarts
// the live process; install ops keep the existing fast-path
// semantics for the case where the backend is already running.
force := op.Op == OpBackendUpgrade
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0, force)
if err != nil {
applyErr = err
} else if !reply.Success {
Expand Down
5 changes: 4 additions & 1 deletion core/services/nodes/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,10 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
return "", fmt.Errorf("no NATS connection for backend installation")
}

reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex)
// force=false: routine load, the worker's fast-path "already running →
// return current address" is correct here. Upgrades go through
// DistributedBackendManager.UpgradeBackend which sets force=true.
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, false)
if err != nil {
return "", err
}
Expand Down
5 changes: 3 additions & 2 deletions core/services/nodes/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,11 @@ type installCall struct {
backend string
modelID string
replica int
force bool
}

func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int) (*messaging.BackendInstallReply, error) {
f.installCalls = append(f.installCalls, installCall{nodeID, backend, modelID, replica})
func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int, force bool) (*messaging.BackendInstallReply, error) {
f.installCalls = append(f.installCalls, installCall{nodeID, backend, modelID, replica, force})
return f.installReply, f.installErr
}

Expand Down
17 changes: 14 additions & 3 deletions core/services/nodes/unloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ type backendStopRequest struct {

// NodeCommandSender abstracts NATS-based commands to worker nodes.
// Used by HTTP endpoint handlers to avoid coupling to the concrete RemoteUnloaderAdapter.
//
// The `force` parameter on InstallBackend is set by the upgrade path to make
// the worker re-run the gallery install (overwriting the on-disk artifact) and
// restart any live process for that backend. Routine installs and load events
// pass force=false so an already-running process short-circuits as before.
type NodeCommandSender interface {
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error)
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, force bool) (*messaging.BackendInstallReply, error)
DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
ListBackends(nodeID string) (*messaging.BackendListReply, error)
StopBackend(nodeID, backend string) error
Expand Down Expand Up @@ -77,10 +82,15 @@ func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
// process key — distinct slots run on distinct ports so multiple replicas of
// the same model can coexist on a fat node. Pass 0 for single-replica.
//
// force=true is the upgrade path: the worker stops any live process for this
// backend, overwrites the on-disk artifact via gallery install, and restarts.
// Routine installs and load events pass force=false to keep the existing
// "already running → return current address" fast path.
//
// Timeout: 5 minutes (gallery install can take a while).
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, force bool) (*messaging.BackendInstallReply, error) {
subject := messaging.SubjectNodeBackendInstall(nodeID)
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex)
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex, "force", force)

return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
Backend: backendType,
Expand All @@ -90,6 +100,7 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal
Name: name,
Alias: alias,
ReplicaIndex: int32(replicaIndex),
Force: force,
}, 5*time.Minute)
}

Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/distributed/node_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)

adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0)
installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0, false)
Expect(err).ToNot(HaveOccurred())
Expect(installReply.Success).To(BeTrue())
})
Expand All @@ -78,7 +78,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)

adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0)
installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0, false)
Expect(err).ToNot(HaveOccurred())
Expect(installReply.Success).To(BeFalse())
Expect(installReply.Error).To(ContainSubstring("backend not found"))
Expand Down
Loading