Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/http/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func API(application *application.Application) (*echo.Echo, error) {
}
}
routes.RegisterNodeSelfServiceRoutes(e, registry, distCfg.RegistrationToken, distCfg.AutoApproveNodes, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret)
routes.RegisterNodeAdminRoutes(e, registry, remoteUnloader, adminMiddleware, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret, application.ApplicationConfig().Distributed.RegistrationToken)
routes.RegisterNodeAdminRoutes(e, registry, remoteUnloader, application.GalleryService(), opcache, application.ApplicationConfig(), adminMiddleware, application.AuthDB(), application.ApplicationConfig().Auth.APIKeyHMACSecret, application.ApplicationConfig().Distributed.RegistrationToken)

// Distributed SSE routes (job progress + agent events via NATS)
if d := application.Distributed(); d != nil {
Expand Down
89 changes: 71 additions & 18 deletions core/http/endpoints/localai/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ import (
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/http/auth"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/xlog"
"gorm.io/gorm"
Expand Down Expand Up @@ -381,14 +384,24 @@ func ResumeNodeEndpoint(registry *nodes.NodeRegistry) echo.HandlerFunc {
}
}

// InstallBackendOnNodeEndpoint triggers backend installation on a worker node via NATS.
// InstallBackendOnNodeEndpoint triggers backend installation on a worker node.
// Async: enqueues a ManagementOp on the gallery service channel and returns a
// jobID immediately. The gallery service worker goroutine drives the actual
// install via DistributedBackendManager.InstallBackend, which honors the op's
// TargetNodeID to scope the fan-out to one node. The UI polls /api/backends/job/:uid
// for progress, mirroring /api/backends/install/:id.
//
// Backend can be either a gallery ID (resolved against BackendGalleries) or a
// direct URI install (URI + Name + optional Alias) same shape as the
// direct URI install (URI + Name + optional Alias) - same shape as the
// standalone /api/backends/install-external path, just scoped to one node.
func InstallBackendOnNodeEndpoint(unloader nodes.NodeCommandSender) echo.HandlerFunc {
//
// The legacy unloader argument is retained for signature symmetry with
// DeleteBackendOnNodeEndpoint / ListBackendsOnNodeEndpoint but is no longer
// used here - the async path goes through galleryService.
func InstallBackendOnNodeEndpoint(_ nodes.NodeCommandSender, galleryService *galleryop.GalleryService, opcache *galleryop.OpCache, appConfig *config.ApplicationConfig) echo.HandlerFunc {
return func(c echo.Context) error {
if unloader == nil {
return c.JSON(http.StatusServiceUnavailable, nodeError(http.StatusServiceUnavailable, "NATS not configured"))
if galleryService == nil {
return c.JSON(http.StatusServiceUnavailable, nodeError(http.StatusServiceUnavailable, "gallery service not configured"))
}
nodeID := c.Param("id")
var req struct {
Expand All @@ -401,25 +414,65 @@ func InstallBackendOnNodeEndpoint(unloader nodes.NodeCommandSender) echo.Handler
if err := c.Bind(&req); err != nil {
return c.JSON(http.StatusBadRequest, nodeError(http.StatusBadRequest, "invalid request body"))
}
// Either a gallery backend name or a direct URI must be supplied.
if req.Backend == "" && req.URI == "" {
return c.JSON(http.StatusBadRequest, nodeError(http.StatusBadRequest, "backend name or uri required"))
}
// Admin-driven backend install: not tied to a specific replica slot
// (no model is being loaded). Pass replica 0 to match the worker's
// admin process-key convention (`backend#0`). The worker's fast path
// takes over if the backend is already running — upgrades go through
// the dedicated /api/backends/upgrade path on backend.upgrade.
reply, err := unloader.InstallBackend(nodeID, req.Backend, "", req.BackendGalleries, req.URI, req.Name, req.Alias, 0)

jobUUID, err := uuid.NewUUID()
if err != nil {
xlog.Error("Failed to install backend on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", err)
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to install backend on node"))
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "failed to generate job id"))
}
jobID := jobUUID.String()

// Cache key: for gallery installs, use the backend slug; for URI
// installs prefer the provided Name (falling back to URI). All keys
// are node-scoped so concurrent installs of the same backend on
// different nodes do not stomp each other in opcache.
backendKey := req.Backend
if backendKey == "" {
backendKey = req.Name
if backendKey == "" {
backendKey = req.URI
}
}
if !reply.Success {
xlog.Error("Backend install failed on node", "node", nodeID, "backend", req.Backend, "uri", req.URI, "error", reply.Error)
return c.JSON(http.StatusInternalServerError, nodeError(http.StatusInternalServerError, "backend installation failed"))
cacheKey := galleryop.NodeScopedKey(nodeID, backendKey)
opcache.SetBackend(cacheKey, jobID)

// Optional caller-supplied galleries override. Mirrors the standalone
// install path so an admin can point at a private gallery.
galleries := appConfig.BackendGalleries
if req.BackendGalleries != "" {
var custom []config.Gallery
if err := json.Unmarshal([]byte(req.BackendGalleries), &custom); err != nil {
xlog.Warn("Ignoring malformed backend_galleries override; falling back to configured galleries", "error", err, "nodeID", nodeID)
} else if len(custom) > 0 {
galleries = custom
}
}

ctx, cancelFunc := context.WithCancel(context.Background())
op := galleryop.ManagementOp[gallery.GalleryBackend, any]{
ID: jobID,
GalleryElementName: req.Backend,
Galleries: galleries,
TargetNodeID: nodeID,
ExternalURI: req.URI,
ExternalName: req.Name,
ExternalAlias: req.Alias,
Context: ctx,
CancelFunc: cancelFunc,
}
return c.JSON(http.StatusOK, map[string]string{"message": "backend installed"})
galleryService.StoreCancellation(jobID, cancelFunc)
go func() {
galleryService.BackendGalleryChannel <- op
}()

xlog.Info("Node-scoped backend install dispatched", "node", nodeID, "backend", req.Backend, "uri", req.URI, "jobID", jobID)
return c.JSON(http.StatusAccepted, map[string]string{
"jobID": jobID,
"statusUrl": "/api/backends/job/" + jobID,
"message": "backend installation started",
})
}
}

Expand Down
123 changes: 123 additions & 0 deletions core/http/endpoints/localai/nodes_install_async_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package localai_test

import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"

"github.com/labstack/echo/v4"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/http/endpoints/localai"
"github.com/mudler/LocalAI/core/services/galleryop"
)

// InstallBackendOnNodeEndpoint became async to stop blocking the browser on
// the 3-minute NATS reply timeout. These specs lock in the new contract:
// HTTP 202 with a jobID, a ManagementOp enqueued on the gallery channel, and
// an opcache entry keyed by NodeScopedKey so concurrent installs of the same
// backend on different nodes do not stomp each other.
var _ = Describe("InstallBackendOnNodeEndpoint async behavior", func() {
var (
e *echo.Echo
galleryService *galleryop.GalleryService
opcache *galleryop.OpCache
appCfg *config.ApplicationConfig
dispatched chan galleryop.ManagementOp[gallery.GalleryBackend, any]
done chan struct{}
drainExited chan struct{}
)

BeforeEach(func() {
e = echo.New()
appCfg = &config.ApplicationConfig{
BackendGalleries: []config.Gallery{{Name: "test-gallery", URL: "http://example.com"}},
}
galleryService = galleryop.NewGalleryService(appCfg, nil)
opcache = galleryop.NewOpCache(galleryService)
// Drain the gallery channel into a buffered side channel so the
// handler's `go func() { ch <- op }()` send does not block waiting
// for the real worker (which is not running in this unit test).
dispatched = make(chan galleryop.ManagementOp[gallery.GalleryBackend, any], 4)
done = make(chan struct{})
drainExited = make(chan struct{})
go func() {
defer close(drainExited)
for {
select {
case op := <-galleryService.BackendGalleryChannel:
dispatched <- op
case <-done:
return
}
}
}()
})

AfterEach(func() {
// Signal the drain goroutine to exit. We do NOT close
// BackendGalleryChannel: the handler's dispatch goroutine may still
// be pending (specs that don't Eventually-Receive), and a send on a
// closed channel panics. Signalling via `done` lets the drain
// goroutine return without touching the gallery channel.
close(done)
Eventually(drainExited, "2s").Should(BeClosed())
})

It("returns 202 with a jobID and dispatches a TargetNodeID-scoped op", func() {
body := `{"backend": "llama-cpp"}`
req := httptest.NewRequest(http.MethodPost, "/api/nodes/node-xyz/backends/install", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("node-xyz")

handler := localai.InstallBackendOnNodeEndpoint(nil, galleryService, opcache, appCfg)
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusAccepted))

var resp map[string]any
Expect(json.Unmarshal(rec.Body.Bytes(), &resp)).To(Succeed())
Expect(resp["jobID"]).To(BeAssignableToTypeOf(""))
Expect(resp["jobID"].(string)).ToNot(BeEmpty())
Expect(resp["message"]).To(Equal("backend installation started"))

Eventually(dispatched, "2s").Should(Receive())
Expect(opcache.Exists(galleryop.NodeScopedKey("node-xyz", "llama-cpp"))).To(BeTrue())
Expect(opcache.IsBackendOp(galleryop.NodeScopedKey("node-xyz", "llama-cpp"))).To(BeTrue())
})

It("returns 400 when neither backend nor uri is supplied", func() {
req := httptest.NewRequest(http.MethodPost, "/api/nodes/node-xyz/backends/install", bytes.NewBufferString(`{}`))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("node-xyz")

handler := localai.InstallBackendOnNodeEndpoint(nil, galleryService, opcache, appCfg)
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusBadRequest))
})

It("accepts a direct URI install and uses the name as the cache key", func() {
body := `{"uri": "oci://example.com/custom-backend:v1", "name": "custom"}`
req := httptest.NewRequest(http.MethodPost, "/api/nodes/node-xyz/backends/install", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)
c.SetParamNames("id")
c.SetParamValues("node-xyz")

handler := localai.InstallBackendOnNodeEndpoint(nil, galleryService, opcache, appCfg)
Expect(handler(c)).To(Succeed())
Expect(rec.Code).To(Equal(http.StatusAccepted))

Expect(opcache.Exists(galleryop.NodeScopedKey("node-xyz", "custom"))).To(BeTrue())
})
})
Loading
Loading