Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions core/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ type Application struct {
// LocalAI Assistant in-process MCP server. nil when DisableLocalAIAssistant
// is set; otherwise initialised in start() after galleryService.
localAIAssistant *mcpTools.LocalAIAssistantHolder

shutdownOnce sync.Once
}

func newApplication(appConfig *config.ApplicationConfig) *Application {
Expand Down Expand Up @@ -320,6 +322,24 @@ func (a *Application) IsDistributed() bool {
return a.distributed != nil
}

// Shutdown stops backend gRPC processes and distributed services
// synchronously on the caller's stack. The context-cancel goroutine wired
// in New does the same work asynchronously, which races test-binary exit
// and CLI shutdown — orphaning spawned mock-backend / llama.cpp / etc.
// children to init. Callers that need a guarantee that cleanup has
// finished before they proceed (AfterSuite/AfterEach, signal handlers)
// must call this. Safe to call multiple times.
func (a *Application) Shutdown() error {
var err error
a.shutdownOnce.Do(func() {
a.distributed.Shutdown()
if a.modelLoader != nil {
err = a.modelLoader.StopAllGRPC()
}
})
return err
}

// waitForHealthyWorker blocks until at least one healthy backend worker is registered.
// This prevents the agent pool from failing during startup when workers haven't connected yet.
func (a *Application) waitForHealthyWorker() {
Expand Down
10 changes: 6 additions & 4 deletions core/application/startup.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,13 +449,15 @@ func New(opts ...config.AppOption) (*Application, error) {

application.ModelLoader().SetBackendLoggingEnabled(options.EnableBackendLogging)

// turn off any process that was started by GRPC if the context is canceled
// Safety-net cleanup if the application context is cancelled without
// the caller invoking Shutdown directly. This is fire-and-forget — it
// races binary exit and is unreliable in tests; the deterministic path
// is application.Shutdown(), which Shutdown's sync.Once dedupes with
// this goroutine.
go func() {
<-options.Context.Done()
xlog.Debug("Context canceled, shutting down")
application.distributed.Shutdown()
err := application.ModelLoader().StopAllGRPC()
if err != nil {
if err := application.Shutdown(); err != nil {
xlog.Error("error while stopping all grpc backends", "error", err)
}
}()
Expand Down
8 changes: 2 additions & 6 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,12 +577,8 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
}

signals.RegisterGracefulTerminationHandler(func() {
if err := app.ModelLoader().StopAllGRPC(); err != nil {
xlog.Error("error while stopping all grpc backends", "error", err)
}
// Clean up distributed services (idempotent — safe if already called)
if d := app.Distributed(); d != nil {
d.Shutdown()
if err := app.Shutdown(); err != nil {
xlog.Error("error while shutting down application", "error", err)
}
})

Expand Down
27 changes: 21 additions & 6 deletions core/http/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,11 @@ var _ = Describe("API test", func() {
var cancel context.CancelFunc
var tmpdir string
var modelDir string
// localAIApp captures the Application so AfterEach can synchronously
// stop the spawned gRPC backend processes. application.New cancels
// them asynchronously on context cancel, which races with test-binary
// exit and leaks mock-backend children to init.
var localAIApp *application.Application

commonOpts := []config.AppOption{
config.WithDebug(true),
Expand Down Expand Up @@ -736,14 +741,14 @@ parameters:
)
Expect(err).ToNot(HaveOccurred())

application, err := application.New(
localAIApp, err = application.New(
append(commonOpts,
config.WithContext(c),
config.WithSystemState(systemState),
)...)
Expect(err).ToNot(HaveOccurred())
application.ModelLoader().SetExternalBackend("mock-backend", mockBackendPath)
app, err = API(application)
localAIApp.ModelLoader().SetExternalBackend("mock-backend", mockBackendPath)
app, err = API(localAIApp)
Expect(err).ToNot(HaveOccurred())
go func() {
if err := app.Start("127.0.0.1:9090"); err != nil && err != http.ErrServerClosed {
Expand All @@ -765,6 +770,11 @@ parameters:
}, "2m").ShouldNot(HaveOccurred())
})
AfterEach(func() {
// Synchronous shutdown — context-cancel cleanup is async and races
// test-binary exit, orphaning mock-backend children to init.
if localAIApp != nil {
_ = localAIApp.Shutdown()
}
cancel()
if app != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down Expand Up @@ -976,15 +986,15 @@ parameters:
)
Expect(err).ToNot(HaveOccurred())

application, err := application.New(
localAIApp, err = application.New(
append(commonOpts,
config.WithContext(c),
config.WithSystemState(systemState),
config.WithConfigFile(configFile))...,
)
Expect(err).ToNot(HaveOccurred())
application.ModelLoader().SetExternalBackend("mock-backend", mockBackendPath)
app, err = API(application)
localAIApp.ModelLoader().SetExternalBackend("mock-backend", mockBackendPath)
app, err = API(localAIApp)
Expect(err).ToNot(HaveOccurred())

go func() {
Expand All @@ -1005,6 +1015,11 @@ parameters:
}, "2m").ShouldNot(HaveOccurred())
})
AfterEach(func() {
// Synchronous shutdown — context-cancel cleanup is async and races
// test-binary exit, orphaning mock-backend children to init.
if localAIApp != nil {
_ = localAIApp.Shutdown()
}
cancel()
if app != nil {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down
21 changes: 15 additions & 6 deletions tests/e2e/e2e_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"

"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/application"
localaiapp "github.com/mudler/LocalAI/core/application"
"github.com/mudler/LocalAI/core/config"
httpapi "github.com/mudler/LocalAI/core/http"
"github.com/mudler/LocalAI/pkg/system"
Expand Down Expand Up @@ -41,6 +41,7 @@ var (
cloudProxyPath string
mcpServerURL string
mcpServerShutdown func()
localAIApp *localaiapp.Application

// Cloud-proxy fake upstreams. Live for the whole suite so the four
// cloud-proxy model YAMLs can point at their URLs at startup time.
Expand Down Expand Up @@ -390,7 +391,7 @@ var _ = BeforeSuite(func() {
// Create application instance (GeneratedContentDir so sound-generation/TTS can write files the handler sends)
generatedDir := filepath.Join(tmpDir, "generated")
Expect(os.MkdirAll(generatedDir, 0750)).To(Succeed())
application, err := application.New(
localAIApp, err = localaiapp.New(
config.WithContext(appCtx),
config.WithSystemState(systemState),
config.WithDebug(true),
Expand All @@ -399,14 +400,14 @@ var _ = BeforeSuite(func() {
Expect(err).ToNot(HaveOccurred())

// Register mock backend (always available for non-realtime tests).
application.ModelLoader().SetExternalBackend("mock-backend", mockBackendPath)
application.ModelLoader().SetExternalBackend("opus", mockBackendPath)
localAIApp.ModelLoader().SetExternalBackend("mock-backend", mockBackendPath)
localAIApp.ModelLoader().SetExternalBackend("opus", mockBackendPath)
if cloudProxyPath != "" {
application.ModelLoader().SetExternalBackend("cloud-proxy", cloudProxyPath)
localAIApp.ModelLoader().SetExternalBackend("cloud-proxy", cloudProxyPath)
}

// Create HTTP app
app, err = httpapi.API(application)
app, err = httpapi.API(localAIApp)
Expect(err).ToNot(HaveOccurred())

// Get free port
Expand Down Expand Up @@ -436,6 +437,14 @@ var _ = BeforeSuite(func() {
})

var _ = AfterSuite(func() {
// Synchronous shutdown — the context-cancel goroutine in application.New
// runs the same cleanup asynchronously, which races test-binary exit and
// orphans spawned mock-backend children to init.
if localAIApp != nil {
if err := localAIApp.Shutdown(); err != nil {
xlog.Error("error shutting down application", "error", err)
}
}
if appCancel != nil {
appCancel()
}
Expand Down
Loading