Skip to content
Merged
65 changes: 46 additions & 19 deletions core/cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ type WorkerCMD struct {
RegistrationToken string `env:"LOCALAI_REGISTRATION_TOKEN" help:"Token for authenticating with the frontend" group:"registration"`
HeartbeatInterval string `env:"LOCALAI_HEARTBEAT_INTERVAL" default:"10s" help:"Interval between heartbeats" group:"registration"`
NodeLabels string `env:"LOCALAI_NODE_LABELS" help:"Comma-separated key=value labels for this node (e.g. tier=fast,gpu=a100)" group:"registration"`
// MaxReplicasPerModel caps how many replicas of any one model can run on
// this worker concurrently. Default 1 = historical single-replica
// behavior. Set higher when a node has enough VRAM to host multiple
// copies of the same model (e.g. a fat 128 GiB box running 4× of a
// 24 GiB model for throughput). The auto-label `node.replica-slots=N`
// is published so model schedulers can target high-capacity nodes via
// the existing label selector.
MaxReplicasPerModel int `env:"LOCALAI_MAX_REPLICAS_PER_MODEL" default:"1" help:"Max replicas of any single model on this worker. Default 1 preserves single-replica behavior; set higher to allow stacking replicas on a fat node." group:"registration"`

// NATS (required)
NatsURL string `env:"LOCALAI_NATS_URL" required:"" help:"NATS server URL" group:"distributed"`
Expand Down Expand Up @@ -567,22 +575,35 @@ func (s *backendSupervisor) getAddr(backend string) string {
return ""
}

// buildProcessKey derives the supervisor-local identifier for one backend
// gRPC process. The model ID is preferred; when it is empty the backend name
// stands in. A "#<replica>" suffix keeps multiple replicas of the same model
// from colliding on one map slot or port — the suffix is internal to the
// worker and is never parsed by the controller.
func buildProcessKey(modelID, backend string, replicaIndex int) string {
	if modelID != "" {
		return fmt.Sprintf("%s#%d", modelID, replicaIndex)
	}
	return fmt.Sprintf("%s#%d", backend, replicaIndex)
}

// installBackend handles the backend.install flow:
// 1. If already running for this model, return existing address
// 1. If already running for this (model, replica) slot, return existing address
// 2. Install backend from gallery (if not already installed)
// 3. Find backend binary
// 4. Start gRPC process on a new port
// Returns the gRPC address of the backend process.
//
// ProcessKey includes the replica index so a worker with MaxReplicasPerModel>1
// can host multiple processes for the same model on distinct ports. Old
// controllers (no replica_index in the request) implicitly target replica 0,
// which preserves single-replica behavior.
func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest) (string, error) {
// Process key: use ModelID if provided (per-model process), else backend name
processKey := req.ModelID
if processKey == "" {
processKey = req.Backend
}
processKey := buildProcessKey(req.ModelID, req.Backend, int(req.ReplicaIndex))

// If already running for this model, return its address
// If already running for this model+replica, return its address
if addr := s.getAddr(processKey); addr != "" {
xlog.Info("Backend already running for model", "backend", req.Backend, "model", req.ModelID, "addr", addr)
xlog.Info("Backend already running for model replica", "backend", req.Backend, "model", req.ModelID, "replica", req.ReplicaIndex, "addr", addr)
return addr, nil
}

Expand Down Expand Up @@ -886,13 +907,18 @@ func (cmd *WorkerCMD) registrationBody() map[string]any {
totalVRAM, _ := xsysinfo.TotalAvailableVRAM()
gpuVendor, _ := xsysinfo.DetectGPUVendor()

maxReplicas := cmd.MaxReplicasPerModel
if maxReplicas < 1 {
maxReplicas = 1
}
body := map[string]any{
"name": nodeName,
"address": cmd.advertiseAddr(),
"http_address": cmd.advertiseHTTPAddr(),
"total_vram": totalVRAM,
"available_vram": totalVRAM, // initially all VRAM is available
"gpu_vendor": gpuVendor,
"name": nodeName,
"address": cmd.advertiseAddr(),
"http_address": cmd.advertiseHTTPAddr(),
"total_vram": totalVRAM,
"available_vram": totalVRAM, // initially all VRAM is available
"gpu_vendor": gpuVendor,
"max_replicas_per_model": maxReplicas,
}

// If no GPU detected, report system RAM so the scheduler/UI has capacity info
Expand All @@ -906,19 +932,20 @@ func (cmd *WorkerCMD) registrationBody() map[string]any {
body["token"] = cmd.RegistrationToken
}

// Parse and add static node labels
// Parse and add static node labels. Always include the auto-label
// `node.replica-slots=N` so AND-selectors in ModelSchedulingConfig can
// target high-capacity nodes (e.g. {"node.replica-slots":"4"}).
labels := make(map[string]string)
if cmd.NodeLabels != "" {
labels := make(map[string]string)
for _, pair := range strings.Split(cmd.NodeLabels, ",") {
pair = strings.TrimSpace(pair)
if k, v, ok := strings.Cut(pair, "="); ok {
labels[strings.TrimSpace(k)] = strings.TrimSpace(v)
}
}
if len(labels) > 0 {
body["labels"] = labels
}
}
labels["node.replica-slots"] = strconv.Itoa(maxReplicas)
body["labels"] = labels

return body
}
Expand Down
70 changes: 70 additions & 0 deletions core/cli/worker_replica_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package cli

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Worker per-replica process keying", func() {
	Describe("buildProcessKey", func() {
		// These entries pin the supervisor's keying contract: the same model
		// at different replica indexes must map to different process keys, so
		// one supervisor map can host several processes for a single model.
		// Removing the "#N" suffix would collapse them back onto one slot and
		// reintroduce the original one-model/one-slot churn.
		DescribeTable("produces stable, distinct keys",
			func(modelID, backend string, replica int, want string) {
				got := buildProcessKey(modelID, backend, replica)
				Expect(got).To(Equal(want))
			},
			Entry("modelID present, replica 0", "Qwen3-35B", "llama-cpp", 0, "Qwen3-35B#0"),
			Entry("modelID present, replica 1", "Qwen3-35B", "llama-cpp", 1, "Qwen3-35B#1"),
			Entry("falls back to backend when modelID empty", "", "llama-cpp", 0, "llama-cpp#0"),
			Entry("backend fallback with replica 2", "", "llama-cpp", 2, "llama-cpp#2"),
		)

		It("makes replicas distinguishable", func() {
			first := buildProcessKey("model-a", "llama-cpp", 0)
			second := buildProcessKey("model-a", "llama-cpp", 1)
			Expect(first).ToNot(Equal(second), "replicas of the same model must produce distinct keys")
		})
	})

	Describe("registrationBody", func() {
		It("includes max_replicas_per_model and the auto-label", func() {
			worker := &WorkerCMD{
				Addr:                "worker.example.com:50051",
				MaxReplicasPerModel: 4,
			}

			body := worker.registrationBody()
			Expect(body).To(HaveKey("max_replicas_per_model"))
			Expect(body["max_replicas_per_model"]).To(Equal(4))

			labels, ok := body["labels"].(map[string]string)
			Expect(ok).To(BeTrue(), "labels must be present so selectors can target the slot count")
			Expect(labels).To(HaveKeyWithValue("node.replica-slots", "4"))
		})

		It("coerces zero/unset MaxReplicasPerModel to 1", func() {
			worker := &WorkerCMD{Addr: "worker.example.com:50051"}

			body := worker.registrationBody()
			Expect(body["max_replicas_per_model"]).To(Equal(1),
				"unset must default to single-replica behavior, not capacity 0")

			labels := body["labels"].(map[string]string)
			Expect(labels).To(HaveKeyWithValue("node.replica-slots", "1"))
		})

		It("preserves user-provided labels alongside the auto-label", func() {
			worker := &WorkerCMD{
				Addr:                "worker.example.com:50051",
				MaxReplicasPerModel: 2,
				NodeLabels:          "tier=fast,gpu=a100",
			}

			labels := worker.registrationBody()["labels"].(map[string]string)
			Expect(labels).To(HaveKeyWithValue("tier", "fast"))
			Expect(labels).To(HaveKeyWithValue("gpu", "a100"))
			Expect(labels).To(HaveKeyWithValue("node.replica-slots", "2"))
		})
	})
})
116 changes: 102 additions & 14 deletions core/http/endpoints/localai/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ type RegisterNodeRequest struct {
AvailableRAM uint64 `json:"available_ram,omitempty"`
GPUVendor string `json:"gpu_vendor,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
// MaxReplicasPerModel is the per-node cap on replicas of any single model.
// Workers older than this field omit it; we coerce 0 → 1 below to preserve
// historical single-replica behavior.
MaxReplicasPerModel int `json:"max_replicas_per_model,omitempty"`
}

// RegisterNodeEndpoint registers a new backend node.
Expand Down Expand Up @@ -131,17 +135,26 @@ func RegisterNodeEndpoint(registry *nodes.NodeRegistry, expectedToken string, au
tokenHash = hex.EncodeToString(h[:])
}

// Coerce 0 → 1 for backward compat with workers that don't send the field.
// GORM's `default:1` only fires for a missing column; once Go zero-values
// reach the struct field they're written as 0 unless explicitly set here.
maxReplicasPerModel := req.MaxReplicasPerModel
if maxReplicasPerModel < 1 {
maxReplicasPerModel = 1
}

node := &nodes.BackendNode{
Name: req.Name,
NodeType: nodeType,
Address: req.Address,
HTTPAddress: req.HTTPAddress,
TokenHash: tokenHash,
TotalVRAM: req.TotalVRAM,
AvailableVRAM: req.AvailableVRAM,
TotalRAM: req.TotalRAM,
AvailableRAM: req.AvailableRAM,
GPUVendor: req.GPUVendor,
Name: req.Name,
NodeType: nodeType,
Address: req.Address,
HTTPAddress: req.HTTPAddress,
TokenHash: tokenHash,
TotalVRAM: req.TotalVRAM,
AvailableVRAM: req.AvailableVRAM,
TotalRAM: req.TotalRAM,
AvailableRAM: req.AvailableRAM,
GPUVendor: req.GPUVendor,
MaxReplicasPerModel: maxReplicasPerModel,
}

ctx := c.Request().Context()
Expand Down Expand Up @@ -386,7 +399,10 @@ func InstallBackendOnNodeEndpoint(unloader nodes.NodeCommandSender) echo.Handler
if req.Backend == "" && req.URI == "" {
return c.JSON(http.StatusBadRequest, nodeError(http.StatusBadRequest, "backend name or uri required"))
}
reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias)
// Admin-driven backend install: not tied to a specific replica slot
// (no model is being loaded). Pass replica 0 to match the worker's
// admin process-key convention (`backend#0`).
reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0)
if err != nil {
xlog.Error("Failed to install backend on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", err)
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to install backend on node"))
Expand Down Expand Up @@ -467,8 +483,8 @@ func UnloadModelOnNodeEndpoint(unloader nodes.NodeCommandSender, registry *nodes
xlog.Error("Failed to stop backend after model unload", "node", nodeID, "model", req.ModelName, "error", err)
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "model unloaded but backend stop failed"))
}
// Remove from registry
registry.RemoveNodeModel(c.Request().Context(), nodeID, req.ModelName)
// Remove every replica of this model on the node from the registry.
registry.RemoveAllNodeModelReplicas(c.Request().Context(), nodeID, req.ModelName)
return c.JSON(http.StatusOK, map[string]string{"message": "model unloaded"})
}
}
Expand All @@ -494,7 +510,7 @@ func DeleteModelOnNodeEndpoint(unloader nodes.NodeCommandSender, registry *nodes
// Non-fatal — backend process may not be running
xlog.Warn("StopBackend failed during model deletion (non-fatal)", "node", nodeID, "model", req.ModelName, "error", err)
}
registry.RemoveNodeModel(c.Request().Context(), nodeID, req.ModelName)
registry.RemoveAllNodeModelReplicas(c.Request().Context(), nodeID, req.ModelName)
return c.JSON(http.StatusOK, map[string]string{"message": "model deleted from node"})
}
}
Expand Down Expand Up @@ -669,6 +685,78 @@ func GetNodeLabelsEndpoint(registry *nodes.NodeRegistry) echo.HandlerFunc {
}
}

// UpdateMaxReplicasPerModelRequest is the body for the per-node replica cap
// endpoint (PUT /api/nodes/{id}/max-replicas-per-model).
type UpdateMaxReplicasPerModelRequest struct {
	// Value is the new per-model replica cap on this node. Must be >= 1;
	// the handler rejects smaller values with HTTP 400.
	Value int `json:"value"`
}

// UpdateMaxReplicasPerModelEndpoint sets the per-node cap on how many replicas
// of any one model can be loaded concurrently. The corresponding
// `node.replica-slots` auto-label is refreshed so existing AND-selectors keep
// matching, and any unsatisfiable scheduling cooldowns are cleared so the
// reconciler retries on the next tick.
//
// This is a transient admin override — a worker re-registration restores the
// value the worker was started with (--max-replicas-per-model). For permanent
// fleet changes, change the worker flag.
//
// NOTE(review): the paragraph above describes the override as transient across
// worker re-registration, but ResetMaxReplicasPerModelEndpoint's doc says a
// reset is required before re-registration may update the value again. Those
// two descriptions contradict each other — confirm which persistence semantics
// registry.UpdateMaxReplicasPerModel actually implements and align both docs.
//
// @Summary Update a node's max replicas per model
// @Tags Nodes
// @Param id path string true "Node ID"
// @Param request body UpdateMaxReplicasPerModelRequest true "New value"
// @Success 200 {object} map[string]int
// @Failure 400 {object} map[string]any "value must be >= 1"
// @Failure 404 {object} map[string]any "node not found"
// @Router /api/nodes/{id}/max-replicas-per-model [put]
func UpdateMaxReplicasPerModelEndpoint(registry *nodes.NodeRegistry) echo.HandlerFunc {
	return func(c echo.Context) error {
		ctx := c.Request().Context()
		nodeID := c.Param("id")
		// Existence check first: an unknown node is a 404 regardless of the body.
		if _, err := registry.Get(ctx, nodeID); err != nil {
			return c.JSON(http.StatusNotFound, nodeError(http.StatusNotFound, "node not found"))
		}
		var req UpdateMaxReplicasPerModelRequest
		if err := c.Bind(&req); err != nil {
			return c.JSON(http.StatusBadRequest, nodeError(http.StatusBadRequest, "invalid request body"))
		}
		// A cap below 1 would make the node unable to host any replica of any
		// model; reject it instead of silently coercing.
		if req.Value < 1 {
			return c.JSON(http.StatusBadRequest, nodeError(http.StatusBadRequest, "value must be >= 1"))
		}
		if err := registry.UpdateMaxReplicasPerModel(ctx, nodeID, req.Value); err != nil {
			xlog.Error("Failed to update max_replicas_per_model", "node", nodeID, "value", req.Value, "error", err)
			return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to update max replicas per model"))
		}
		return c.JSON(http.StatusOK, map[string]int{"max_replicas_per_model": req.Value})
	}
}

// ResetMaxReplicasPerModelEndpoint removes the admin override from a node, so
// that the node's worker may update the value from its CLI flag on its next
// re-registration. Until that registration happens, the node keeps whatever
// value it currently has.
//
// @Summary Reset a node's max replicas per model to the worker default
// @Tags Nodes
// @Param id path string true "Node ID"
// @Success 200 {object} map[string]bool
// @Failure 404 {object} map[string]any "node not found"
// @Router /api/nodes/{id}/max-replicas-per-model [delete]
func ResetMaxReplicasPerModelEndpoint(registry *nodes.NodeRegistry) echo.HandlerFunc {
	return func(c echo.Context) error {
		var (
			ctx = c.Request().Context()
			id  = c.Param("id")
		)
		// Resetting an unknown node is a 404, matching the update endpoint.
		if _, err := registry.Get(ctx, id); err != nil {
			return c.JSON(http.StatusNotFound, nodeError(http.StatusNotFound, "node not found"))
		}
		err := registry.ResetMaxReplicasPerModel(ctx, id)
		if err != nil {
			xlog.Error("Failed to reset max_replicas_per_model override", "node", id, "error", err)
			return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to reset override"))
		}
		return c.JSON(http.StatusOK, map[string]bool{"reset": true})
	}
}

// SetNodeLabelsEndpoint replaces all labels for a node.
func SetNodeLabelsEndpoint(registry *nodes.NodeRegistry) echo.HandlerFunc {
return func(c echo.Context) error {
Expand Down
6 changes: 6 additions & 0 deletions core/http/react-ui/e2e/nodes-per-node-backend-actions.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ async function expandNodeAndWaitForBackends(page) {
// Click the row to expand it. The chevron toggle and the row both work,
// but clicking the name cell is the most user-like.
await page.getByText(NODE_NAME).first().click()
// Backends, Capacity and Labels live behind a "Manage" <details>
// disclosure (the drawer was distilled to keep at-a-glance content
// lean — see distill refactor in the multi-replica branch). Open it
// by clicking the summary inside the .node-manage scope so the
// per-node backend table is in the DOM before assertions run.
await page.locator('.node-manage > summary').first().click()
await expect(page.getByRole('cell', { name: BACKEND_NAME, exact: true })).toBeVisible({ timeout: 10_000 })
}

Expand Down
35 changes: 35 additions & 0 deletions core/http/react-ui/src/App.css
Original file line number Diff line number Diff line change
Expand Up @@ -1977,6 +1977,41 @@ select.input {
opacity: 0.8;
}

/* Small caps eyebrow inside the drawer's "Manage" disclosure. Replaces the
   h4 sub-headings that used to stack inside the drawer — at this depth, an
   eyebrow keeps the typographic hierarchy from feeling parallel to the
   page-level h1/h2 stack. */
.drawer-eyebrow {
  font-size: 0.6875rem;
  font-weight: var(--font-weight-semibold);
  letter-spacing: 0.06em;
  text-transform: uppercase;
  color: var(--color-text-muted);
  margin-bottom: var(--spacing-xs);
}

/* "Manage" disclosure inside the node drawer. The chevron rotates with the
   open state so the affordance reads as an accordion, not a link. */
.node-manage > summary {
  /* Repeated toggling shouldn't select the summary text. */
  user-select: none;
  /* Default outline suppressed; the :focus-visible rule below restores a
     visible ring for keyboard users, so this stays accessible. */
  outline: none;
}
/* Hide the native disclosure triangle (WebKit); the custom chevron span
   replaces it. */
.node-manage > summary::-webkit-details-marker {
  display: none;
}
/* Keyboard focus ring, shown only for keyboard/assistive navigation. */
.node-manage > summary:focus-visible {
  outline: 2px solid var(--color-primary);
  outline-offset: 2px;
  border-radius: var(--radius-sm);
}
.node-manage__chevron {
  font-size: 0.625rem;
  transition: transform var(--duration-fast) ease-out;
}
/* Rotate the chevron a quarter turn while the disclosure is open. */
.node-manage[open] > summary .node-manage__chevron {
  transform: rotate(90deg);
}

/* Node-status indicator — replaces the tiny bullet with a proper LED-style
dot next to a bold status label. Colors are applied inline from statusConfig
so one primitive handles healthy/unhealthy/draining/pending in one shape. */
Expand Down
Loading
Loading