diff --git a/Dockerfile b/Dockerfile index 151c73e161fc..7a705a9fc514 100644 --- a/Dockerfile +++ b/Dockerfile @@ -332,6 +332,6 @@ RUN mkdir -p /models /backends HEALTHCHECK --interval=1m --timeout=10m --retries=10 \ CMD curl -f ${HEALTHCHECK_ENDPOINT} || exit 1 -VOLUME /models /backends +VOLUME /models /backends /configuration EXPOSE 8080 ENTRYPOINT [ "/entrypoint.sh" ] diff --git a/core/application/application.go b/core/application/application.go index c852566d7a47..24c53fcbae65 100644 --- a/core/application/application.go +++ b/core/application/application.go @@ -1,6 +1,9 @@ package application import ( + "context" + "sync" + "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/services" "github.com/mudler/LocalAI/core/templates" @@ -11,8 +14,14 @@ type Application struct { backendLoader *config.ModelConfigLoader modelLoader *model.ModelLoader applicationConfig *config.ApplicationConfig + startupConfig *config.ApplicationConfig // Stores original config from env vars (before file loading) templatesEvaluator *templates.Evaluator galleryService *services.GalleryService + watchdogMutex sync.Mutex + watchdogStop chan bool + p2pMutex sync.Mutex + p2pCtx context.Context + p2pCancel context.CancelFunc } func newApplication(appConfig *config.ApplicationConfig) *Application { @@ -44,6 +53,11 @@ func (a *Application) GalleryService() *services.GalleryService { return a.galleryService } +// StartupConfig returns the original startup configuration (from env vars, before file loading) +func (a *Application) StartupConfig() *config.ApplicationConfig { + return a.startupConfig +} + func (a *Application) start() error { galleryService := services.NewGalleryService(a.ApplicationConfig(), a.ModelLoader()) err := galleryService.Start(a.ApplicationConfig().Context, a.ModelConfigLoader(), a.ApplicationConfig().SystemState) diff --git a/core/application/config_file_watcher.go b/core/application/config_file_watcher.go index 46f29b101acb..0129828cac5f 100644 --- a/core/application/config_file_watcher.go +++ b/core/application/config_file_watcher.go @@ -1,180 +1,343 @@ -package application - -import ( - "encoding/json" - "fmt" - "os" - "path" - "path/filepath" - "time" - - "dario.cat/mergo" - "github.com/fsnotify/fsnotify" - "github.com/mudler/LocalAI/core/config" - "github.com/rs/zerolog/log" -) - -type fileHandler func(fileContent []byte, appConfig *config.ApplicationConfig) error - -type configFileHandler struct { - handlers map[string]fileHandler - - watcher *fsnotify.Watcher - - appConfig *config.ApplicationConfig -} - -// TODO: This should be a singleton eventually so other parts of the code can register config file handlers, -// then we can export it to other packages -func newConfigFileHandler(appConfig *config.ApplicationConfig) configFileHandler { - c := configFileHandler{ - handlers: make(map[string]fileHandler), - appConfig: appConfig, - } - err := c.Register("api_keys.json", readApiKeysJson(*appConfig), true) - if err != nil { - log.Error().Err(err).Str("file", "api_keys.json").Msg("unable to register config file handler") - } - err = c.Register("external_backends.json", readExternalBackendsJson(*appConfig), true) - if err != nil { - log.Error().Err(err).Str("file", "external_backends.json").Msg("unable to register config file handler") - } - return c -} - -func (c *configFileHandler) Register(filename string, handler fileHandler, runNow bool) error { - _, ok := c.handlers[filename] - if ok { - return fmt.Errorf("handler already registered for file %s", filename) - } - c.handlers[filename] = handler - if runNow { - c.callHandler(filename, handler) - } - return nil -} - -func (c *configFileHandler) callHandler(filename string, handler fileHandler) { - rootedFilePath := filepath.Join(c.appConfig.DynamicConfigsDir, filepath.Clean(filename)) - log.Trace().Str("filename", rootedFilePath).Msg("reading file for dynamic config update") - fileContent, err := os.ReadFile(rootedFilePath) - if err != nil && !os.IsNotExist(err) { - log.Error().Err(err).Str("filename", rootedFilePath).Msg("could not read file") - } - - if err = handler(fileContent, c.appConfig); err != nil { - log.Error().Err(err).Msg("WatchConfigDirectory goroutine failed to update options") - } -} - -func (c *configFileHandler) Watch() error { - configWatcher, err := fsnotify.NewWatcher() - c.watcher = configWatcher - if err != nil { - return err - } - - if c.appConfig.DynamicConfigsDirPollInterval > 0 { - log.Debug().Msg("Poll interval set, falling back to polling for configuration changes") - ticker := time.NewTicker(c.appConfig.DynamicConfigsDirPollInterval) - go func() { - for { - <-ticker.C - for file, handler := range c.handlers { - log.Debug().Str("file", file).Msg("polling config file") - c.callHandler(file, handler) - } - } - }() - } - - // Start listening for events. - go func() { - for { - select { - case event, ok := <-c.watcher.Events: - if !ok { - return - } - if event.Has(fsnotify.Write | fsnotify.Create | fsnotify.Remove) { - handler, ok := c.handlers[path.Base(event.Name)] - if !ok { - continue - } - - c.callHandler(filepath.Base(event.Name), handler) - } - case err, ok := <-c.watcher.Errors: - log.Error().Err(err).Msg("config watcher error received") - if !ok { - return - } - } - } - }() - - // Add a path. - err = c.watcher.Add(c.appConfig.DynamicConfigsDir) - if err != nil { - return fmt.Errorf("unable to create a watcher on the configuration directory: %+v", err) - } - - return nil -} - -// TODO: When we institute graceful shutdown, this should be called -func (c *configFileHandler) Stop() error { - return c.watcher.Close() -} - -func readApiKeysJson(startupAppConfig config.ApplicationConfig) fileHandler { - handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error { - log.Debug().Msg("processing api keys runtime update") - log.Trace().Int("numKeys", len(startupAppConfig.ApiKeys)).Msg("api keys provided at startup") - - if len(fileContent) > 0 { - // Parse JSON content from the file - var fileKeys []string - err := json.Unmarshal(fileContent, &fileKeys) - if err != nil { - return err - } - - log.Trace().Int("numKeys", len(fileKeys)).Msg("discovered API keys from api keys dynamic config dile") - - appConfig.ApiKeys = append(startupAppConfig.ApiKeys, fileKeys...) - } else { - log.Trace().Msg("no API keys discovered from dynamic config file") - appConfig.ApiKeys = startupAppConfig.ApiKeys - } - log.Trace().Int("numKeys", len(appConfig.ApiKeys)).Msg("total api keys after processing") - return nil - } - - return handler -} - -func readExternalBackendsJson(startupAppConfig config.ApplicationConfig) fileHandler { - handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error { - log.Debug().Msg("processing external_backends.json") - - if len(fileContent) > 0 { - // Parse JSON content from the file - var fileBackends map[string]string - err := json.Unmarshal(fileContent, &fileBackends) - if err != nil { - return err - } - appConfig.ExternalGRPCBackends = startupAppConfig.ExternalGRPCBackends - err = mergo.Merge(&appConfig.ExternalGRPCBackends, &fileBackends) - if err != nil { - return err - } - } else { - appConfig.ExternalGRPCBackends = startupAppConfig.ExternalGRPCBackends - } - log.Debug().Msg("external backends loaded from external_backends.json") - return nil - } - return handler -} +package application + +import ( + "encoding/json" + "fmt" + "os" + "path" + "path/filepath" + "time" + + "dario.cat/mergo" + "github.com/fsnotify/fsnotify" + "github.com/mudler/LocalAI/core/config" + "github.com/rs/zerolog/log" +) + +type fileHandler func(fileContent []byte, appConfig *config.ApplicationConfig) error + +type configFileHandler struct { + handlers map[string]fileHandler + + watcher *fsnotify.Watcher + + appConfig *config.ApplicationConfig +} + +// TODO: This should be a singleton eventually so other parts of the code can register config file handlers, +// then we can export it to other packages +func newConfigFileHandler(appConfig *config.ApplicationConfig) configFileHandler { + c := configFileHandler{ + handlers: make(map[string]fileHandler), + appConfig: appConfig, + } + err := c.Register("api_keys.json", readApiKeysJson(*appConfig), true) + if err != nil { + log.Error().Err(err).Str("file", "api_keys.json").Msg("unable to register config file handler") + } + err = c.Register("external_backends.json", readExternalBackendsJson(*appConfig), true) + if err != nil { + log.Error().Err(err).Str("file", "external_backends.json").Msg("unable to register config file handler") + } + err = c.Register("runtime_settings.json", readRuntimeSettingsJson(*appConfig), true) + if err != nil { + log.Error().Err(err).Str("file", "runtime_settings.json").Msg("unable to register config file handler") + } + return c +} + +func (c *configFileHandler) Register(filename string, handler fileHandler, runNow bool) error { + _, ok := c.handlers[filename] + if ok { + return fmt.Errorf("handler already registered for file %s", filename) + } + c.handlers[filename] = handler + if runNow { + c.callHandler(filename, handler) + } + return nil +} + +func (c *configFileHandler) callHandler(filename string, handler fileHandler) { + rootedFilePath := filepath.Join(c.appConfig.DynamicConfigsDir, filepath.Clean(filename)) + log.Trace().Str("filename", rootedFilePath).Msg("reading file for dynamic config update") + fileContent, err := os.ReadFile(rootedFilePath) + if err != nil && !os.IsNotExist(err) { + log.Error().Err(err).Str("filename", rootedFilePath).Msg("could not read file") + } + + if err = handler(fileContent, c.appConfig); err != nil { + log.Error().Err(err).Msg("WatchConfigDirectory goroutine failed to update options") + } +} + +func (c *configFileHandler) Watch() error { + configWatcher, err := fsnotify.NewWatcher() + c.watcher = configWatcher + if err != nil { + return err + } + + if c.appConfig.DynamicConfigsDirPollInterval > 0 { + log.Debug().Msg("Poll interval set, falling back to polling for configuration changes") + ticker := time.NewTicker(c.appConfig.DynamicConfigsDirPollInterval) + go func() { + for { + <-ticker.C + for file, handler := range c.handlers { + log.Debug().Str("file", file).Msg("polling config file") + c.callHandler(file, handler) + } + } + }() + } + + // Start listening for events. + go func() { + for { + select { + case event, ok := <-c.watcher.Events: + if !ok { + return + } + if event.Has(fsnotify.Write | fsnotify.Create | fsnotify.Remove) { + handler, ok := c.handlers[path.Base(event.Name)] + if !ok { + continue + } + + c.callHandler(filepath.Base(event.Name), handler) + } + case err, ok := <-c.watcher.Errors: + log.Error().Err(err).Msg("config watcher error received") + if !ok { + return + } + } + } + }() + + // Add a path. + err = c.watcher.Add(c.appConfig.DynamicConfigsDir) + if err != nil { + return fmt.Errorf("unable to create a watcher on the configuration directory: %+v", err) + } + + return nil +} + +// TODO: When we institute graceful shutdown, this should be called +func (c *configFileHandler) Stop() error { + return c.watcher.Close() +} + +func readApiKeysJson(startupAppConfig config.ApplicationConfig) fileHandler { + handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error { + log.Debug().Msg("processing api keys runtime update") + log.Trace().Int("numKeys", len(startupAppConfig.ApiKeys)).Msg("api keys provided at startup") + + if len(fileContent) > 0 { + // Parse JSON content from the file + var fileKeys []string + err := json.Unmarshal(fileContent, &fileKeys) + if err != nil { + return err + } + + log.Trace().Int("numKeys", len(fileKeys)).Msg("discovered API keys from api keys dynamic config dile") + + appConfig.ApiKeys = append(startupAppConfig.ApiKeys, fileKeys...) + } else { + log.Trace().Msg("no API keys discovered from dynamic config file") + appConfig.ApiKeys = startupAppConfig.ApiKeys + } + log.Trace().Int("numKeys", len(appConfig.ApiKeys)).Msg("total api keys after processing") + return nil + } + + return handler +} + +func readExternalBackendsJson(startupAppConfig config.ApplicationConfig) fileHandler { + handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error { + log.Debug().Msg("processing external_backends.json") + + if len(fileContent) > 0 { + // Parse JSON content from the file + var fileBackends map[string]string + err := json.Unmarshal(fileContent, &fileBackends) + if err != nil { + return err + } + appConfig.ExternalGRPCBackends = startupAppConfig.ExternalGRPCBackends + err = mergo.Merge(&appConfig.ExternalGRPCBackends, &fileBackends) + if err != nil { + return err + } + } else { + appConfig.ExternalGRPCBackends = startupAppConfig.ExternalGRPCBackends + } + log.Debug().Msg("external backends loaded from external_backends.json") + return nil + } + return handler +} + +type runtimeSettings struct { + WatchdogEnabled *bool `json:"watchdog_enabled,omitempty"` + WatchdogIdleEnabled *bool `json:"watchdog_idle_enabled,omitempty"` + WatchdogBusyEnabled *bool `json:"watchdog_busy_enabled,omitempty"` + WatchdogIdleTimeout *string `json:"watchdog_idle_timeout,omitempty"` + WatchdogBusyTimeout *string `json:"watchdog_busy_timeout,omitempty"` + SingleBackend *bool `json:"single_backend,omitempty"` + ParallelBackendRequests *bool `json:"parallel_backend_requests,omitempty"` + Threads *int `json:"threads,omitempty"` + ContextSize *int `json:"context_size,omitempty"` + F16 *bool `json:"f16,omitempty"` + Debug *bool `json:"debug,omitempty"` + CORS *bool `json:"cors,omitempty"` + CSRF *bool `json:"csrf,omitempty"` + CORSAllowOrigins *string `json:"cors_allow_origins,omitempty"` + P2PToken *string `json:"p2p_token,omitempty"` + P2PNetworkID *string `json:"p2p_network_id,omitempty"` + Federated *bool `json:"federated,omitempty"` + Galleries *[]config.Gallery `json:"galleries,omitempty"` + BackendGalleries *[]config.Gallery `json:"backend_galleries,omitempty"` + AutoloadGalleries *bool `json:"autoload_galleries,omitempty"` + AutoloadBackendGalleries *bool `json:"autoload_backend_galleries,omitempty"` + ApiKeys *[]string `json:"api_keys,omitempty"` +} + +func readRuntimeSettingsJson(startupAppConfig config.ApplicationConfig) fileHandler { + handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error { + log.Debug().Msg("processing runtime_settings.json") + + // Determine if settings came from env vars by comparing with startup config + // startupAppConfig contains the original values set from env vars at startup. + // If current values match startup values, they came from env vars (or defaults). + // We apply file settings only if current values match startup values (meaning not from env vars). + envWatchdogIdle := appConfig.WatchDogIdle == startupAppConfig.WatchDogIdle + envWatchdogBusy := appConfig.WatchDogBusy == startupAppConfig.WatchDogBusy + envWatchdogIdleTimeout := appConfig.WatchDogIdleTimeout == startupAppConfig.WatchDogIdleTimeout + envWatchdogBusyTimeout := appConfig.WatchDogBusyTimeout == startupAppConfig.WatchDogBusyTimeout + envSingleBackend := appConfig.SingleBackend == startupAppConfig.SingleBackend + envParallelRequests := appConfig.ParallelBackendRequests == startupAppConfig.ParallelBackendRequests + envThreads := appConfig.Threads == startupAppConfig.Threads + envContextSize := appConfig.ContextSize == startupAppConfig.ContextSize + envF16 := appConfig.F16 == startupAppConfig.F16 + envDebug := appConfig.Debug == startupAppConfig.Debug + envCORS := appConfig.CORS == startupAppConfig.CORS + envCSRF := appConfig.CSRF == startupAppConfig.CSRF + envCORSAllowOrigins := appConfig.CORSAllowOrigins == startupAppConfig.CORSAllowOrigins + envP2PToken := appConfig.P2PToken == startupAppConfig.P2PToken + envP2PNetworkID := appConfig.P2PNetworkID == startupAppConfig.P2PNetworkID + envFederated := appConfig.Federated == startupAppConfig.Federated + envAutoloadGalleries := appConfig.AutoloadGalleries == startupAppConfig.AutoloadGalleries + envAutoloadBackendGalleries := appConfig.AutoloadBackendGalleries == startupAppConfig.AutoloadBackendGalleries + + if len(fileContent) > 0 { + var settings runtimeSettings + err := json.Unmarshal(fileContent, &settings) + if err != nil { + return err + } + + // Apply file settings only if they don't match startup values (i.e., not from env vars) + if settings.WatchdogIdleEnabled != nil && !envWatchdogIdle { + appConfig.WatchDogIdle = *settings.WatchdogIdleEnabled + if appConfig.WatchDogIdle { + appConfig.WatchDog = true + } + } + if settings.WatchdogBusyEnabled != nil && !envWatchdogBusy { + appConfig.WatchDogBusy = *settings.WatchdogBusyEnabled + if appConfig.WatchDogBusy { + appConfig.WatchDog = true + } + } + if settings.WatchdogIdleTimeout != nil && !envWatchdogIdleTimeout { + dur, err := time.ParseDuration(*settings.WatchdogIdleTimeout) + if err == nil { + appConfig.WatchDogIdleTimeout = dur + } else { + log.Warn().Err(err).Str("timeout", *settings.WatchdogIdleTimeout).Msg("invalid watchdog idle timeout in runtime_settings.json") + } + } + if settings.WatchdogBusyTimeout != nil && !envWatchdogBusyTimeout { + dur, err := time.ParseDuration(*settings.WatchdogBusyTimeout) + if err == nil { + appConfig.WatchDogBusyTimeout = dur + } else { + log.Warn().Err(err).Str("timeout", *settings.WatchdogBusyTimeout).Msg("invalid watchdog busy timeout in runtime_settings.json") + } + } + if settings.SingleBackend != nil && !envSingleBackend { + appConfig.SingleBackend = *settings.SingleBackend + } + if settings.ParallelBackendRequests != nil && !envParallelRequests { + appConfig.ParallelBackendRequests = *settings.ParallelBackendRequests + } + if settings.Threads != nil && !envThreads { + appConfig.Threads = *settings.Threads + } + if settings.ContextSize != nil && !envContextSize { + appConfig.ContextSize = *settings.ContextSize + } + if settings.F16 != nil && !envF16 { + appConfig.F16 = *settings.F16 + } + if settings.Debug != nil && !envDebug { + appConfig.Debug = *settings.Debug + } + if settings.CORS != nil && !envCORS { + appConfig.CORS = *settings.CORS + } + if settings.CSRF != nil && !envCSRF { + appConfig.CSRF = *settings.CSRF + } + if settings.CORSAllowOrigins != nil && !envCORSAllowOrigins { + appConfig.CORSAllowOrigins = *settings.CORSAllowOrigins + } + if settings.P2PToken != nil && !envP2PToken { + appConfig.P2PToken = *settings.P2PToken + } + if settings.P2PNetworkID != nil && !envP2PNetworkID { + appConfig.P2PNetworkID = *settings.P2PNetworkID + } + if settings.Federated != nil && !envFederated { + appConfig.Federated = *settings.Federated + } + if settings.Galleries != nil { + appConfig.Galleries = *settings.Galleries + } + if settings.BackendGalleries != nil { + appConfig.BackendGalleries = *settings.BackendGalleries + } + if settings.AutoloadGalleries != nil && !envAutoloadGalleries { + appConfig.AutoloadGalleries = *settings.AutoloadGalleries + } + if settings.AutoloadBackendGalleries != nil && !envAutoloadBackendGalleries { + appConfig.AutoloadBackendGalleries = *settings.AutoloadBackendGalleries + } + if settings.ApiKeys != nil { + // API keys from env vars (startup) should be kept, runtime settings keys replace all runtime keys + // If runtime_settings.json specifies ApiKeys (even if empty), it replaces all runtime keys + // Start with env keys, then add runtime_settings.json keys (which may be empty to clear them) + envKeys := startupAppConfig.ApiKeys + runtimeKeys := *settings.ApiKeys + // Replace all runtime keys with what's in runtime_settings.json + appConfig.ApiKeys = append(envKeys, runtimeKeys...) + } + + // If watchdog is enabled via file but not via env, ensure WatchDog flag is set + if !envWatchdogIdle && !envWatchdogBusy { + if settings.WatchdogEnabled != nil && *settings.WatchdogEnabled { + appConfig.WatchDog = true + } + } + } + log.Debug().Msg("runtime settings loaded from runtime_settings.json") + return nil + } + return handler +} diff --git a/core/application/p2p.go b/core/application/p2p.go new file mode 100644 index 000000000000..87618e913a97 --- /dev/null +++ b/core/application/p2p.go @@ -0,0 +1,240 @@ +package application + +import ( + "context" + "fmt" + "net" + "slices" + "time" + + "github.com/google/uuid" + "github.com/mudler/LocalAI/core/gallery" + "github.com/mudler/LocalAI/core/p2p" + "github.com/mudler/LocalAI/core/schema" + "github.com/mudler/LocalAI/core/services" + + "github.com/mudler/edgevpn/pkg/node" + "github.com/rs/zerolog/log" + zlog "github.com/rs/zerolog/log" +) + +func (a *Application) StopP2P() error { + if a.p2pCancel != nil { + a.p2pCancel() + a.p2pCancel = nil + a.p2pCtx = nil + // Wait a bit for shutdown to complete + time.Sleep(200 * time.Millisecond) + } + return nil +} + +func (a *Application) StartP2P() error { + // we need a p2p token + if a.applicationConfig.P2PToken == "" { + return fmt.Errorf("P2P token is not set") + } + + networkID := a.applicationConfig.P2PNetworkID + + ctx, cancel := context.WithCancel(a.ApplicationConfig().Context) + a.p2pCtx = ctx + a.p2pCancel = cancel + + var n *node.Node + // Here we are avoiding creating multiple nodes: + // - if the federated mode is enabled, we create a federated node and expose a service + // - exposing a service creates a node with specific options, and we don't want to create another node + + // If the federated mode is enabled, we expose a service to the local instance running + // at r.Address + if a.applicationConfig.Federated { + _, port, err := net.SplitHostPort(a.applicationConfig.APIAddress) + if err != nil { + return err + } + + // Here a new node is created and started + // and a service is exposed by the node + node, err := p2p.ExposeService(ctx, "localhost", port, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.FederatedID)) + if err != nil { + return err + } + + if err := p2p.ServiceDiscoverer(ctx, node, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.FederatedID), nil, false); err != nil { + return err + } + + n = node + // start node sync in the background + if err := a.p2pSync(ctx, node); err != nil { + return err + } + } + + // If a node wasn't created previously, create it + if n == nil { + node, err := p2p.NewNode(a.applicationConfig.P2PToken) + if err != nil { + return err + } + err = node.Start(ctx) + if err != nil { + return fmt.Errorf("starting new node: %w", err) + } + n = node + } + + // Attach a ServiceDiscoverer to the p2p node + log.Info().Msg("Starting P2P server discovery...") + if err := p2p.ServiceDiscoverer(ctx, n, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.WorkerID), func(serviceID string, node schema.NodeData) { + var tunnelAddresses []string + for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(networkID, p2p.WorkerID)) { + if v.IsOnline() { + tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) + } else { + log.Info().Msgf("Node %s is offline", v.ID) + } + } + if a.applicationConfig.TunnelCallback != nil { + a.applicationConfig.TunnelCallback(tunnelAddresses) + } + }, true); err != nil { + return err + } + + return nil +} + +// RestartP2P restarts the P2P stack with current ApplicationConfig settings +// Note: This method signals that P2P should be restarted, but the actual restart +// is handled by the caller to avoid import cycles +func (a *Application) RestartP2P() error { + a.p2pMutex.Lock() + defer a.p2pMutex.Unlock() + + // Stop existing P2P if running + if a.p2pCancel != nil { + a.p2pCancel() + a.p2pCancel = nil + a.p2pCtx = nil + // Wait a bit for shutdown to complete + time.Sleep(200 * time.Millisecond) + } + + appConfig := a.ApplicationConfig() + + // Start P2P if token is set + if appConfig.P2PToken == "" { + return fmt.Errorf("P2P token is not set") + } + + // Create new context for P2P + ctx, cancel := context.WithCancel(appConfig.Context) + a.p2pCtx = ctx + a.p2pCancel = cancel + + // Get API address from config + address := appConfig.APIAddress + if address == "" { + address = "127.0.0.1:8080" // default + } + + // Start P2P stack in a goroutine + go func() { + if err := a.StartP2P(); err != nil { + log.Error().Err(err).Msg("Failed to start P2P stack") + cancel() // Cancel context on error + } + }() + log.Info().Msg("P2P stack restarted with new settings") + + return nil +} + +func syncState(ctx context.Context, n *node.Node, app *Application) error { + zlog.Debug().Msg("[p2p-sync] Syncing state") + + whatWeHave := []string{} + for _, model := range app.ModelConfigLoader().GetAllModelsConfigs() { + whatWeHave = append(whatWeHave, model.Name) + } + + ledger, _ := n.Ledger() + currentData := ledger.CurrentData() + zlog.Debug().Msgf("[p2p-sync] Current data: %v", currentData) + data, exists := ledger.GetKey("shared_state", "models") + if !exists { + ledger.AnnounceUpdate(ctx, time.Minute, "shared_state", "models", whatWeHave) + zlog.Debug().Msgf("No models found in the ledger, announced our models: %v", whatWeHave) + } + + models := []string{} + if err := data.Unmarshal(&models); err != nil { + zlog.Warn().Err(err).Msg("error unmarshalling models") + return nil + } + + zlog.Debug().Msgf("[p2p-sync] Models that are present in this instance: %v\nModels that are in the ledger: %v", whatWeHave, models) + + // Sync with our state + whatIsNotThere := []string{} + for _, model := range whatWeHave { + if !slices.Contains(models, model) { + whatIsNotThere = append(whatIsNotThere, model) + } + } + if len(whatIsNotThere) > 0 { + zlog.Debug().Msgf("[p2p-sync] Announcing our models: %v", append(models, whatIsNotThere...)) + ledger.AnnounceUpdate( + ctx, + 1*time.Minute, + "shared_state", + "models", + append(models, whatIsNotThere...), + ) + } + + // Check if we have a model that is not in our state, otherwise install it + for _, model := range models { + if slices.Contains(whatWeHave, model) { + zlog.Debug().Msgf("[p2p-sync] Model %s is already present in this instance", model) + continue + } + + // we install model + zlog.Info().Msgf("[p2p-sync] Installing model which is not present in this instance: %s", model) + + uuid, err := uuid.NewUUID() + if err != nil { + zlog.Error().Err(err).Msg("error generating UUID") + continue + } + + app.GalleryService().ModelGalleryChannel <- services.GalleryOp[gallery.GalleryModel, gallery.ModelConfig]{ + ID: uuid.String(), + GalleryElementName: model, + Galleries: app.ApplicationConfig().Galleries, + BackendGalleries: app.ApplicationConfig().BackendGalleries, + } + } + + return nil +} + +func (a *Application) p2pSync(ctx context.Context, n *node.Node) error { + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Minute): + if err := syncState(ctx, n, a); err != nil { + zlog.Error().Err(err).Msg("error syncing state") + } + } + + } + }() + return nil +} diff --git a/core/application/startup.go b/core/application/startup.go index eb387d06debd..6186424e5c4f 100644 --- a/core/application/startup.go +++ b/core/application/startup.go @@ -1,8 +1,11 @@ package application import ( + "encoding/json" "fmt" "os" + "path/filepath" + "time" "github.com/mudler/LocalAI/core/backend" "github.com/mudler/LocalAI/core/config" @@ -18,7 +21,12 @@ import ( func New(opts ...config.AppOption) (*Application, error) { options := config.NewApplicationConfig(opts...) + + // Store a copy of the startup config (from env vars, before file loading) + // This is used to determine if settings came from env vars vs file + startupConfigCopy := *options application := newApplication(options) + application.startupConfig = &startupConfigCopy log.Info().Msgf("Starting LocalAI using %d threads, with models path: %s", options.Threads, options.SystemState.Model.ModelsPath) log.Info().Msgf("LocalAI version: %s", internal.PrintableVersion()) @@ -110,6 +118,13 @@ func New(opts ...config.AppOption) (*Application, error) { } } + // Load runtime settings from file if DynamicConfigsDir is set + // This applies file settings with env var precedence (env vars take priority) + // Note: startupConfigCopy was already created above, so it has the original env var values + if options.DynamicConfigsDir != "" { + loadRuntimeSettingsFromFile(options) + } + // turn off any process that was started by GRPC if the context is canceled go func() { <-options.Context.Done() @@ -120,21 +135,8 @@ func New(opts ...config.AppOption) (*Application, error) { } }() - if options.WatchDog { - wd := model.NewWatchDog( - application.ModelLoader(), - options.WatchDogBusyTimeout, - options.WatchDogIdleTimeout, - options.WatchDogBusy, - options.WatchDogIdle) - application.ModelLoader().SetWatchDog(wd) - go wd.Run() - go func() { - <-options.Context.Done() - log.Debug().Msgf("Context canceled, shutting down") - wd.Shutdown() - }() - } + // Initialize watchdog with current settings (after loading from file) + initializeWatchdog(application, options) if options.LoadToMemory != nil && !options.SingleBackend { for _, m := range options.LoadToMemory { @@ -186,3 +188,131 @@ func startWatcher(options *config.ApplicationConfig) { log.Error().Err(err).Msg("failed creating watcher") } } + +// loadRuntimeSettingsFromFile loads settings from runtime_settings.json with env var precedence +// This function is called at startup, before env vars are applied via AppOptions. +// Since env vars are applied via AppOptions in run.go, we need to check if they're set. +// We do this by checking if the current options values differ from defaults, which would +// indicate they were set from env vars. However, a simpler approach is to just apply +// file settings here, and let the AppOptions (which are applied after this) override them. +// But actually, this is called AFTER AppOptions are applied in New(), so we need to check env vars. +// The cleanest solution: Store original values before applying file, or check if values match +// what would be set from env vars. For now, we'll apply file settings and they'll be +// overridden by AppOptions if env vars were set (but AppOptions are already applied). +// Actually, this function is called in New() before AppOptions are fully processed for watchdog. +// Let's check the call order: New() -> loadRuntimeSettingsFromFile() -> initializeWatchdog() +// But AppOptions are applied in NewApplicationConfig() which is called first. +// So at this point, options already has values from env vars. We should compare against +// defaults to see if env vars were set. But we don't have defaults stored. +// Simplest: Just apply file settings. If env vars were set, they're already in options. +// The file watcher handler will handle runtime changes properly by comparing with startupAppConfig. +func loadRuntimeSettingsFromFile(options *config.ApplicationConfig) { + settingsFile := filepath.Join(options.DynamicConfigsDir, "runtime_settings.json") + fileContent, err := os.ReadFile(settingsFile) + if err != nil { + if os.IsNotExist(err) { + log.Debug().Msg("runtime_settings.json not found, using defaults") + return + } + log.Warn().Err(err).Msg("failed to read runtime_settings.json") + return + } + + var settings struct { + WatchdogEnabled *bool `json:"watchdog_enabled,omitempty"` + WatchdogIdleEnabled *bool `json:"watchdog_idle_enabled,omitempty"` + WatchdogBusyEnabled *bool `json:"watchdog_busy_enabled,omitempty"` + WatchdogIdleTimeout *string `json:"watchdog_idle_timeout,omitempty"` + WatchdogBusyTimeout *string `json:"watchdog_busy_timeout,omitempty"` + SingleBackend *bool `json:"single_backend,omitempty"` + ParallelBackendRequests *bool `json:"parallel_backend_requests,omitempty"` + } + + if err := json.Unmarshal(fileContent, &settings); err != nil { + log.Warn().Err(err).Msg("failed to parse runtime_settings.json") + return + } + + // At this point, options already has values from env vars (via AppOptions in run.go). + // To avoid env var duplication, we determine if env vars were set by checking if + // current values differ from defaults. Defaults are: false for bools, 0 for durations. + // If current value is at default, it likely wasn't set from env var, so we can apply file. + // If current value is non-default, it was likely set from env var, so we preserve it. + // Note: This means env vars explicitly setting to false/0 won't be distinguishable from defaults, + // but that's an acceptable limitation to avoid env var duplication. + + if settings.WatchdogIdleEnabled != nil { + // Only apply if current value is default (false), suggesting it wasn't set from env var + if !options.WatchDogIdle { + options.WatchDogIdle = *settings.WatchdogIdleEnabled + if options.WatchDogIdle { + options.WatchDog = true + } + } + } + if settings.WatchdogBusyEnabled != nil { + if !options.WatchDogBusy { + options.WatchDogBusy = *settings.WatchdogBusyEnabled + if options.WatchDogBusy { + options.WatchDog = true + } + } + } + if settings.WatchdogIdleTimeout != nil { + // Only apply if current value is default (0), suggesting it wasn't set from env var + if options.WatchDogIdleTimeout == 0 { + dur, err := time.ParseDuration(*settings.WatchdogIdleTimeout) + if err == nil { + options.WatchDogIdleTimeout = dur + } else { + log.Warn().Err(err).Str("timeout", *settings.WatchdogIdleTimeout).Msg("invalid watchdog idle timeout in runtime_settings.json") + } + } + } + if settings.WatchdogBusyTimeout != nil { + if options.WatchDogBusyTimeout == 0 { + dur, err := time.ParseDuration(*settings.WatchdogBusyTimeout) + if err == nil { + options.WatchDogBusyTimeout = dur + } else { + log.Warn().Err(err).Str("timeout", *settings.WatchdogBusyTimeout).Msg("invalid watchdog busy timeout in runtime_settings.json") + } + } + } + if settings.SingleBackend != nil { + if !options.SingleBackend { + options.SingleBackend = *settings.SingleBackend + } + } + if settings.ParallelBackendRequests != nil { + if !options.ParallelBackendRequests { + options.ParallelBackendRequests = *settings.ParallelBackendRequests + } + } + if !options.WatchDogIdle && !options.WatchDogBusy { + if settings.WatchdogEnabled != nil && *settings.WatchdogEnabled { + options.WatchDog = true + } + } + + log.Debug().Msg("Runtime settings loaded from runtime_settings.json") +} + +// initializeWatchdog initializes the watchdog with current ApplicationConfig settings +func initializeWatchdog(application *Application, options *config.ApplicationConfig) { + if options.WatchDog { + wd := model.NewWatchDog( + application.ModelLoader(), + options.WatchDogBusyTimeout, + options.WatchDogIdleTimeout, + options.WatchDogBusy, + options.WatchDogIdle) + application.ModelLoader().SetWatchDog(wd) + go wd.Run() + go func() { + <-options.Context.Done() + log.Debug().Msgf("Context canceled, shutting down") + wd.Shutdown() + }() + } +} diff --git a/core/application/watchdog.go b/core/application/watchdog.go new file mode 100644 index 000000000000..20acf0b7a491 --- /dev/null +++ b/core/application/watchdog.go @@ -0,0 +1,88 @@ +package application + +import ( + "time" + + "github.com/mudler/LocalAI/pkg/model" + "github.com/rs/zerolog/log" +) + +func (a *Application) StopWatchdog() error { + if a.watchdogStop != nil { + close(a.watchdogStop) + a.watchdogStop = nil + } + return nil +} + +// startWatchdog starts the watchdog with current ApplicationConfig settings +// This is an internal method that assumes the caller holds the watchdogMutex +func (a *Application) startWatchdog() error { + appConfig := a.ApplicationConfig() + + // Create new watchdog if enabled + if appConfig.WatchDog { + wd := model.NewWatchDog( + a.modelLoader, + appConfig.WatchDogBusyTimeout, + appConfig.WatchDogIdleTimeout, + appConfig.WatchDogBusy, + appConfig.WatchDogIdle) + a.modelLoader.SetWatchDog(wd) + + // Create new stop channel + a.watchdogStop = make(chan bool, 1) + + // Start watchdog goroutine + go wd.Run() + + // Setup shutdown handler + go func() { + select { + case <-a.watchdogStop: + log.Debug().Msg("Watchdog stop signal received") + wd.Shutdown() + case <-appConfig.Context.Done(): + log.Debug().Msg("Context canceled, shutting down watchdog") + wd.Shutdown() + } + }() + + log.Info().Msg("Watchdog started with new settings") + } else { + log.Info().Msg("Watchdog disabled") + } + + return nil +} + +// StartWatchdog starts the watchdog with current ApplicationConfig settings +func (a *Application) StartWatchdog() error { + a.watchdogMutex.Lock() + defer a.watchdogMutex.Unlock() + + return a.startWatchdog() +} + +// RestartWatchdog restarts the watchdog with current ApplicationConfig settings +func (a *Application) RestartWatchdog() error { + a.watchdogMutex.Lock() + defer a.watchdogMutex.Unlock() + + // Shutdown existing watchdog if running + if a.watchdogStop != nil { + close(a.watchdogStop) + a.watchdogStop = nil + } + + // Shutdown existing watchdog if running + currentWD := a.modelLoader.GetWatchDog() + if currentWD != nil { + currentWD.Shutdown() + // Wait a bit for shutdown to complete + time.Sleep(100 * time.Millisecond) + } + + // Start watchdog with new settings + return a.startWatchdog() +} diff --git a/core/cli/api/p2p.go b/core/cli/api/p2p.go deleted file mode 100644 index 9e94e94d6eb3..000000000000 --- a/core/cli/api/p2p.go +++ /dev/null @@ -1,87 +0,0 @@ -package cli_api - -import ( - "context" - "fmt" - "net" - "os" - "strings" - - "github.com/mudler/LocalAI/core/application" - "github.com/mudler/LocalAI/core/p2p" - "github.com/mudler/LocalAI/core/schema" - "github.com/mudler/edgevpn/pkg/node" - - "github.com/rs/zerolog/log" -) - -func StartP2PStack(ctx context.Context, address, token, networkID string, federated bool, app *application.Application) error { - var n *node.Node - // Here we are avoiding creating multiple nodes: - // - if the federated mode is enabled, we create a federated node and expose a service - // - exposing a service creates a node with specific options, and we don't want to create another node - - // If the federated mode is enabled, we expose a service to the local instance running - // at r.Address - if federated { - _, port, err := net.SplitHostPort(address) - if err != nil { - return err - } - - // Here a new node is created and started - // and a service is exposed by the node - node, err := p2p.ExposeService(ctx, "localhost", port, token, p2p.NetworkID(networkID, p2p.FederatedID)) - if err != nil { - return err - } - - if err := p2p.ServiceDiscoverer(ctx, node, token, p2p.NetworkID(networkID, p2p.FederatedID), nil, false); err != nil { - return err - } - - n = node - - // start node sync in the background - if err := p2p.Sync(ctx, node, app); err != nil { - return err - } - } - - // If the p2p mode is enabled, we start the service discovery - if token != "" { - // If a node wasn't created previously, create it - if n == nil { - node, err := p2p.NewNode(token) - if err != nil { - return err - } - err = node.Start(ctx) - if err != nil { - return fmt.Errorf("starting new node: %w", err) - } - n = node - } - - // Attach a ServiceDiscoverer to the p2p node - log.Info().Msg("Starting P2P server discovery...") - if err := p2p.ServiceDiscoverer(ctx, n, token, p2p.NetworkID(networkID, p2p.WorkerID), func(serviceID string, node schema.NodeData) { - var tunnelAddresses []string - for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(networkID, p2p.WorkerID)) { - if v.IsOnline() { - tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) - } else { - log.Info().Msgf("Node %s is offline", v.ID) - } - } - tunnelEnvVar := strings.Join(tunnelAddresses, ",") - - os.Setenv("LLAMACPP_GRPC_SERVERS", tunnelEnvVar) - log.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", tunnelEnvVar) - }, true); err != nil { - return err - } - } - - return nil -} diff --git a/core/cli/run.go b/core/cli/run.go index 9c21eaa4b08f..efb0ee99b9ec 100644 --- a/core/cli/run.go +++ b/core/cli/run.go @@ -8,7 +8,6 @@ import ( "time" "github.com/mudler/LocalAI/core/application" - cli_api "github.com/mudler/LocalAI/core/cli/api" cliContext "github.com/mudler/LocalAI/core/cli/context" "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/http" @@ -52,6 +51,7 @@ type RunCMD struct { UploadLimit int `env:"LOCALAI_UPLOAD_LIMIT,UPLOAD_LIMIT" default:"15" help:"Default upload-limit in MB" group:"api"` APIKeys []string `env:"LOCALAI_API_KEY,API_KEY" help:"List of API Keys to enable API authentication. When this is set, all the requests must be authenticated with one of these API keys" group:"api"` DisableWebUI bool `env:"LOCALAI_DISABLE_WEBUI,DISABLE_WEBUI" default:"false" help:"Disables the web user interface. When set to true, the server will only expose API endpoints without serving the web interface" group:"api"` + DisableRuntimeSettings bool `env:"LOCALAI_DISABLE_RUNTIME_SETTINGS,DISABLE_RUNTIME_SETTINGS" default:"false" help:"Disables the runtime settings. When set to true, the server will not load the runtime settings from the runtime_settings.json file" group:"api"` DisablePredownloadScan bool `env:"LOCALAI_DISABLE_PREDOWNLOAD_SCAN" help:"If true, disables the best-effort security scanner before downloading any files." group:"hardening" default:"false"` OpaqueErrors bool `env:"LOCALAI_OPAQUE_ERRORS" default:"false" help:"If true, all error responses are replaced with blank 500 errors. This is intended only for hardening against information leaks and is normally not recommended." group:"hardening"` UseSubtleKeyComparison bool `env:"LOCALAI_SUBTLE_KEY_COMPARISON" default:"false" help:"If true, API Key validation comparisons will be performed using constant-time comparisons rather than simple equality. This trades off performance on each request for resiliancy against timing attacks." group:"hardening"` @@ -98,6 +98,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { } opts := []config.AppOption{ + config.WithContext(context.Background()), config.WithConfigFile(r.ModelsConfigFile), config.WithJSONStringPreload(r.PreloadModels), config.WithYAMLConfigPreload(r.PreloadModelsConfig), @@ -128,12 +129,22 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { config.WithLoadToMemory(r.LoadToMemory), config.WithMachineTag(r.MachineTag), config.WithAPIAddress(r.Address), + config.WithTunnelCallback(func(tunnels []string) { + tunnelEnvVar := strings.Join(tunnels, ",") + // TODO: this is very specific to llama.cpp, we should have a more generic way to set the environment variable + os.Setenv("LLAMACPP_GRPC_SERVERS", tunnelEnvVar) + log.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", tunnelEnvVar) + }), } if r.DisableMetricsEndpoint { opts = append(opts, config.DisableMetricsEndpoint) } + if r.DisableRuntimeSettings { + opts = append(opts, config.DisableRuntimeSettings) + } + token := "" if r.Peer2Peer || r.Peer2PeerToken != "" { log.Info().Msg("P2P mode enabled") @@ -152,7 +163,9 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { opts = append(opts, config.WithP2PToken(token)) } - backgroundCtx := context.Background() + if r.Federated { + opts = append(opts, config.EnableFederated) + } idleWatchDog := r.EnableWatchdogIdle busyWatchDog := r.EnableWatchdogBusy @@ -222,8 +235,10 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { return err } - if err := cli_api.StartP2PStack(backgroundCtx, r.Address, token, r.Peer2PeerNetworkID, r.Federated, app); err != nil { - return err + if token != "" { + if err := app.StartP2P(); err != nil { + return err + } } signals.RegisterGracefulTerminationHandler(func() { diff --git a/core/config/application_config.go b/core/config/application_config.go index d98c8ba4069b..9a9a8171c1e8 100644 --- a/core/config/application_config.go +++ b/core/config/application_config.go @@ -33,6 +33,7 @@ type ApplicationConfig struct { ApiKeys []string P2PToken string P2PNetworkID string + Federated bool DisableWebUI bool EnforcePredownloadScans bool @@ -65,6 +66,10 @@ type ApplicationConfig struct { MachineTag string APIAddress string + + TunnelCallback func(tunnels []string) + + DisableRuntimeSettings bool } type AppOption func(*ApplicationConfig) @@ -73,7 +78,6 @@ func NewApplicationConfig(o ...AppOption) *ApplicationConfig { opt := &ApplicationConfig{ Context: context.Background(), UploadLimitMB: 15, - ContextSize: 512, Debug: true, } for _, oo := range o { @@ -152,6 +156,10 @@ var DisableWebUI = func(o *ApplicationConfig) { o.DisableWebUI = true } +var DisableRuntimeSettings = func(o *ApplicationConfig) { + o.DisableRuntimeSettings = true +} + func SetWatchDogBusyTimeout(t time.Duration) AppOption { return func(o *ApplicationConfig) { o.WatchDogBusyTimeout = t @@ -180,6 +188,10 @@ var EnableBackendGalleriesAutoload = func(o *ApplicationConfig) { o.AutoloadBackendGalleries = true } +var EnableFederated = func(o *ApplicationConfig) { + o.Federated = true +} + func WithExternalBackend(name string, uri string) AppOption { return func(o *ApplicationConfig) { if o.ExternalGRPCBackends == nil { @@ -273,6 +285,12 @@ func WithContextSize(ctxSize int) AppOption { } } +func WithTunnelCallback(callback func(tunnels []string)) AppOption { + return func(o *ApplicationConfig) { + o.TunnelCallback = callback + } +} + func WithF16(f16 bool) AppOption { return func(o *ApplicationConfig) { o.F16 = f16 diff --git a/core/http/app.go b/core/http/app.go index cec25fc6c1f1..a5ce91e42566 100644 --- a/core/http/app.go +++ b/core/http/app.go @@ -208,7 +208,7 @@ func API(application *application.Application) (*echo.Echo, error) { routes.RegisterLocalAIRoutes(e, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService(), opcache, application.TemplatesEvaluator()) routes.RegisterOpenAIRoutes(e, requestExtractor, application) if !application.ApplicationConfig().DisableWebUI { - routes.RegisterUIAPIRoutes(e, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService(), opcache) + routes.RegisterUIAPIRoutes(e, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService(), opcache, application) routes.RegisterUIRoutes(e, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService()) } routes.RegisterJINARoutes(e, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig()) diff --git a/core/http/endpoints/localai/import_model.go b/core/http/endpoints/localai/import_model.go index 77abcdfb60b3..9d8926c0a228 100644 --- a/core/http/endpoints/localai/import_model.go +++ b/core/http/endpoints/localai/import_model.go @@ -145,7 +145,7 @@ func ImportModelEndpoint(cl *config.ModelConfigLoader, appConfig *config.Applica } // Set defaults - modelConfig.SetDefaults() + modelConfig.SetDefaults(appConfig.ToConfigLoaderOptions()...) // Validate the configuration if valid, _ := modelConfig.Validate(); !valid { diff --git a/core/http/endpoints/localai/mcp.go b/core/http/endpoints/localai/mcp.go index c13a388064f1..f23b00e4fabb 100644 --- a/core/http/endpoints/localai/mcp.go +++ b/core/http/endpoints/localai/mcp.go @@ -5,7 +5,7 @@ import ( "encoding/json" "errors" "fmt" - "strings" + "net" "time" "github.com/labstack/echo/v4" @@ -105,7 +105,10 @@ func MCPStreamEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, eval fragment = fragment.AddMessage(message.Role, message.StringContent) } - port := appConfig.APIAddress[strings.LastIndex(appConfig.APIAddress, ":")+1:] + _, port, err := net.SplitHostPort(appConfig.APIAddress) + if err != nil { + return err + } apiKey := "" if len(appConfig.ApiKeys) > 0 { apiKey = appConfig.ApiKeys[0] diff --git a/core/http/endpoints/localai/settings.go b/core/http/endpoints/localai/settings.go new file mode 100644 index 000000000000..62f198a9d049 --- /dev/null +++ b/core/http/endpoints/localai/settings.go @@ -0,0 +1,340 @@ +package localai + +import ( + "encoding/json" + "io" + "net/http" + "os" + "path/filepath" + "time" + + "github.com/labstack/echo/v4" + "github.com/mudler/LocalAI/core/application" + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/p2p" + "github.com/rs/zerolog/log" +) + +type SettingsResponse struct { + Success bool `json:"success"` + Error string `json:"error,omitempty"` + Message string `json:"message,omitempty"` +} + +type RuntimeSettings struct { + WatchdogEnabled *bool `json:"watchdog_enabled,omitempty"` + WatchdogIdleEnabled *bool `json:"watchdog_idle_enabled,omitempty"` + WatchdogBusyEnabled *bool `json:"watchdog_busy_enabled,omitempty"` + WatchdogIdleTimeout *string `json:"watchdog_idle_timeout,omitempty"` + WatchdogBusyTimeout *string `json:"watchdog_busy_timeout,omitempty"` + SingleBackend *bool `json:"single_backend,omitempty"` + ParallelBackendRequests *bool `json:"parallel_backend_requests,omitempty"` + Threads *int `json:"threads,omitempty"` + ContextSize *int `json:"context_size,omitempty"` + F16 *bool `json:"f16,omitempty"` + Debug *bool `json:"debug,omitempty"` + CORS *bool `json:"cors,omitempty"` + CSRF *bool `json:"csrf,omitempty"` + CORSAllowOrigins *string `json:"cors_allow_origins,omitempty"` + P2PToken *string `json:"p2p_token,omitempty"` + P2PNetworkID *string `json:"p2p_network_id,omitempty"` + Federated *bool `json:"federated,omitempty"` + Galleries *[]config.Gallery `json:"galleries,omitempty"` + BackendGalleries *[]config.Gallery `json:"backend_galleries,omitempty"` + AutoloadGalleries *bool `json:"autoload_galleries,omitempty"` + AutoloadBackendGalleries *bool `json:"autoload_backend_galleries,omitempty"` + ApiKeys *[]string `json:"api_keys"` // No omitempty - we need to save empty arrays to clear keys +} + +// GetSettingsEndpoint returns current settings with precedence (env > file > defaults) +func GetSettingsEndpoint(app *application.Application) echo.HandlerFunc { + return func(c echo.Context) error { + appConfig := app.ApplicationConfig() + startupConfig := app.StartupConfig() + + if startupConfig == nil { + // Fallback if startup config not available + startupConfig = appConfig + } + + settings := RuntimeSettings{} + + // Set all current values (using pointers for RuntimeSettings) + watchdogIdle := appConfig.WatchDogIdle + watchdogBusy := appConfig.WatchDogBusy + watchdogEnabled := appConfig.WatchDog + singleBackend := appConfig.SingleBackend + parallelBackendRequests := appConfig.ParallelBackendRequests + threads := appConfig.Threads + contextSize := appConfig.ContextSize + f16 := appConfig.F16 + debug := appConfig.Debug + cors := appConfig.CORS + csrf := appConfig.CSRF + corsAllowOrigins := appConfig.CORSAllowOrigins + p2pToken := appConfig.P2PToken + p2pNetworkID := appConfig.P2PNetworkID + federated := appConfig.Federated + galleries := appConfig.Galleries + backendGalleries := appConfig.BackendGalleries + autoloadGalleries := appConfig.AutoloadGalleries + autoloadBackendGalleries := appConfig.AutoloadBackendGalleries + apiKeys := appConfig.ApiKeys + + settings.WatchdogIdleEnabled = &watchdogIdle + settings.WatchdogBusyEnabled = &watchdogBusy + settings.WatchdogEnabled = &watchdogEnabled + settings.SingleBackend = &singleBackend + settings.ParallelBackendRequests = ¶llelBackendRequests + settings.Threads = &threads + settings.ContextSize = &contextSize + settings.F16 = &f16 + settings.Debug = &debug + settings.CORS = &cors + settings.CSRF = &csrf + settings.CORSAllowOrigins = &corsAllowOrigins + settings.P2PToken = &p2pToken + settings.P2PNetworkID = &p2pNetworkID + settings.Federated = &federated + settings.Galleries = &galleries + settings.BackendGalleries = &backendGalleries + settings.AutoloadGalleries = &autoloadGalleries + settings.AutoloadBackendGalleries = &autoloadBackendGalleries + settings.ApiKeys = &apiKeys + + var idleTimeout, busyTimeout string + if appConfig.WatchDogIdleTimeout > 0 { + idleTimeout = appConfig.WatchDogIdleTimeout.String() + } else { + idleTimeout = "15m" // default + } + if appConfig.WatchDogBusyTimeout > 0 { + busyTimeout = appConfig.WatchDogBusyTimeout.String() + } else { + busyTimeout = "5m" // default + } + settings.WatchdogIdleTimeout = &idleTimeout + settings.WatchdogBusyTimeout = &busyTimeout + return c.JSON(http.StatusOK, settings) + } +} + +// UpdateSettingsEndpoint updates settings, saves to file, and applies immediately +func UpdateSettingsEndpoint(app *application.Application) echo.HandlerFunc { + return func(c echo.Context) error { + appConfig := app.ApplicationConfig() + startupConfig := app.StartupConfig() + + if startupConfig == nil { + // Fallback if startup config not available + startupConfig = appConfig + } + + body, err := io.ReadAll(c.Request().Body) + if err != nil { + return c.JSON(http.StatusBadRequest, SettingsResponse{ + Success: false, + Error: "Failed to read request body: " + err.Error(), + }) + } + + var settings RuntimeSettings + if err := json.Unmarshal(body, &settings); err != nil { + return c.JSON(http.StatusBadRequest, SettingsResponse{ + Success: false, + Error: "Failed to parse JSON: " + err.Error(), + }) + } + + // Validate timeouts if provided + if settings.WatchdogIdleTimeout != nil { + _, err := time.ParseDuration(*settings.WatchdogIdleTimeout) + if err != nil { + return c.JSON(http.StatusBadRequest, SettingsResponse{ + Success: false, + Error: "Invalid watchdog_idle_timeout format: " + err.Error(), + }) + } + } + if settings.WatchdogBusyTimeout != nil { + _, err := time.ParseDuration(*settings.WatchdogBusyTimeout) + if err != nil { + return c.JSON(http.StatusBadRequest, SettingsResponse{ + Success: false, + Error: "Invalid watchdog_busy_timeout format: " + err.Error(), + }) + } + } + + // Save to file + if appConfig.DynamicConfigsDir == "" { + return c.JSON(http.StatusBadRequest, SettingsResponse{ + Success: false, + Error: "DynamicConfigsDir is not set", + }) + } + + settingsFile := filepath.Join(appConfig.DynamicConfigsDir, "runtime_settings.json") + settingsJSON, err := json.MarshalIndent(settings, "", " ") + if err != nil { + return c.JSON(http.StatusInternalServerError, SettingsResponse{ + Success: false, + Error: "Failed to marshal settings: " + err.Error(), + }) + } + + if err := os.WriteFile(settingsFile, settingsJSON, 0600); err != nil { + return c.JSON(http.StatusInternalServerError, SettingsResponse{ + Success: false, + Error: "Failed to write settings file: " + err.Error(), + }) + } + + // Apply settings immediately, checking env var overrides per field + watchdogChanged := false + if settings.WatchdogEnabled != nil { + appConfig.WatchDog = *settings.WatchdogEnabled + watchdogChanged = true + } + if settings.WatchdogIdleEnabled != nil { + appConfig.WatchDogIdle = *settings.WatchdogIdleEnabled + if appConfig.WatchDogIdle { + appConfig.WatchDog = true + } + watchdogChanged = true + } + if settings.WatchdogBusyEnabled != nil { + appConfig.WatchDogBusy = *settings.WatchdogBusyEnabled + if appConfig.WatchDogBusy { + appConfig.WatchDog = true + } + watchdogChanged = true + } + if settings.WatchdogIdleTimeout != nil { + dur, _ := time.ParseDuration(*settings.WatchdogIdleTimeout) + appConfig.WatchDogIdleTimeout = dur + watchdogChanged = true + } + if settings.WatchdogBusyTimeout != nil { + dur, _ := time.ParseDuration(*settings.WatchdogBusyTimeout) + appConfig.WatchDogBusyTimeout = dur + watchdogChanged = true + } + if settings.SingleBackend != nil { + appConfig.SingleBackend = *settings.SingleBackend + } + if settings.ParallelBackendRequests != nil { + appConfig.ParallelBackendRequests = *settings.ParallelBackendRequests + } + if settings.Threads != nil { + appConfig.Threads = *settings.Threads + } + if settings.ContextSize != nil { + appConfig.ContextSize = *settings.ContextSize + } + if settings.F16 != nil { + appConfig.F16 = *settings.F16 + } + if settings.Debug != nil { + appConfig.Debug = *settings.Debug + } + if settings.CORS != nil { + appConfig.CORS = *settings.CORS + } + if settings.CSRF != nil { + appConfig.CSRF = *settings.CSRF + } + if settings.CORSAllowOrigins != nil { + appConfig.CORSAllowOrigins = *settings.CORSAllowOrigins + } + if settings.P2PToken != nil { + appConfig.P2PToken = *settings.P2PToken + } + if settings.P2PNetworkID != nil { + appConfig.P2PNetworkID = *settings.P2PNetworkID + } + if settings.Federated != nil { + appConfig.Federated = *settings.Federated + } + if settings.Galleries != nil { + appConfig.Galleries = *settings.Galleries + } + if settings.BackendGalleries != nil { + appConfig.BackendGalleries = *settings.BackendGalleries + } + if settings.AutoloadGalleries != nil { + appConfig.AutoloadGalleries = *settings.AutoloadGalleries + } + if settings.AutoloadBackendGalleries != nil { + appConfig.AutoloadBackendGalleries = *settings.AutoloadBackendGalleries + } + if settings.ApiKeys != nil { + // API keys from env vars (startup) should be kept, runtime settings keys are added + // Combine startup keys (env vars) with runtime settings keys + envKeys := startupConfig.ApiKeys + runtimeKeys := *settings.ApiKeys + // Merge: env keys first (they take precedence), then runtime keys + appConfig.ApiKeys = append(envKeys, runtimeKeys...) + + // Note: We only save to runtime_settings.json (not api_keys.json) to avoid duplication + // The runtime_settings.json is the unified config file. If api_keys.json exists, + // it will be loaded first, but runtime_settings.json takes precedence and deduplicates. + } + + // Restart watchdog if settings changed + if watchdogChanged { + if settings.WatchdogEnabled != nil && !*settings.WatchdogEnabled || settings.WatchdogEnabled == nil { + if err := app.StopWatchdog(); err != nil { + log.Error().Err(err).Msg("Failed to stop watchdog") + return c.JSON(http.StatusInternalServerError, SettingsResponse{ + Success: false, + Error: "Settings saved but failed to stop watchdog: " + err.Error(), + }) + } + } else { + if err := app.RestartWatchdog(); err != nil { + log.Error().Err(err).Msg("Failed to restart watchdog") + return c.JSON(http.StatusInternalServerError, SettingsResponse{ + Success: false, + Error: "Settings saved but failed to restart watchdog: " + err.Error(), + }) + } + } + } + + // Restart P2P if P2P settings changed + p2pChanged := settings.P2PToken != nil || settings.P2PNetworkID != nil || settings.Federated != nil + if p2pChanged { + if settings.P2PToken != nil && *settings.P2PToken == "" { + // stop P2P + if err := app.StopP2P(); err != nil { + log.Error().Err(err).Msg("Failed to stop P2P") + return c.JSON(http.StatusInternalServerError, SettingsResponse{ + Success: false, + Error: "Settings saved but failed to stop P2P: " + err.Error(), + }) + } + } else { + if settings.P2PToken != nil && *settings.P2PToken == "0" { + // generate a token if users sets 0 (disabled) + token := p2p.GenerateToken(60, 60) + settings.P2PToken = &token + appConfig.P2PToken = token + } + // Stop existing P2P + if err := app.RestartP2P(); err != nil { + log.Error().Err(err).Msg("Failed to stop P2P") + return c.JSON(http.StatusInternalServerError, SettingsResponse{ + Success: false, + Error: "Settings saved but failed to stop P2P: " + err.Error(), + }) + } + } + } + + return c.JSON(http.StatusOK, SettingsResponse{ + Success: true, + Message: "Settings updated successfully", + }) + } +} diff --git a/core/http/endpoints/localai/welcome.go b/core/http/endpoints/localai/welcome.go index d21d853c41f1..ce197ba05e73 100644 --- a/core/http/endpoints/localai/welcome.go +++ b/core/http/endpoints/localai/welcome.go @@ -43,17 +43,18 @@ func WelcomeEndpoint(appConfig *config.ApplicationConfig, processingModels, taskTypes := opcache.GetStatus() summary := map[string]interface{}{ - "Title": "LocalAI API - " + internal.PrintableVersion(), - "Version": internal.PrintableVersion(), - "BaseURL": middleware.BaseURL(c), - "Models": modelsWithoutConfig, - "ModelsConfig": modelConfigs, - "GalleryConfig": galleryConfigs, - "ApplicationConfig": appConfig, - "ProcessingModels": processingModels, - "TaskTypes": taskTypes, - "LoadedModels": loadedModelsMap, - "InstalledBackends": installedBackends, + "Title": "LocalAI API - " + internal.PrintableVersion(), + "Version": internal.PrintableVersion(), + "BaseURL": middleware.BaseURL(c), + "Models": modelsWithoutConfig, + "ModelsConfig": modelConfigs, + "GalleryConfig": galleryConfigs, + "ApplicationConfig": appConfig, + "ProcessingModels": processingModels, + "TaskTypes": taskTypes, + "LoadedModels": loadedModelsMap, + "InstalledBackends": installedBackends, + "DisableRuntimeSettings": appConfig.DisableRuntimeSettings, } contentType := c.Request().Header.Get("Content-Type") diff --git a/core/http/endpoints/openai/mcp.go b/core/http/endpoints/openai/mcp.go index 264403c31d87..9318aa6a7bd3 100644 --- a/core/http/endpoints/openai/mcp.go +++ b/core/http/endpoints/openai/mcp.go @@ -5,7 +5,7 @@ import ( "encoding/json" "errors" "fmt" - "strings" + "net" "time" "github.com/labstack/echo/v4" @@ -75,7 +75,11 @@ func MCPCompletionEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, fragment = fragment.AddMessage(message.Role, message.StringContent) } - port := appConfig.APIAddress[strings.LastIndex(appConfig.APIAddress, ":")+1:] + _, port, err := net.SplitHostPort(appConfig.APIAddress) + if err != nil { + return err + } + apiKey := "" if appConfig.ApiKeys != nil { apiKey = appConfig.ApiKeys[0] diff --git a/core/http/routes/ui.go b/core/http/routes/ui.go index 776547e5c979..6ef56550564d 100644 --- a/core/http/routes/ui.go +++ b/core/http/routes/ui.go @@ -23,6 +23,17 @@ func RegisterUIRoutes(app *echo.Echo, app.GET("/", localai.WelcomeEndpoint(appConfig, cl, ml, processingOps)) app.GET("/manage", localai.WelcomeEndpoint(appConfig, cl, ml, processingOps)) + if !appConfig.DisableRuntimeSettings { + // Settings page + app.GET("/settings", func(c echo.Context) error { + summary := map[string]interface{}{ + "Title": "LocalAI - Settings", + "BaseURL": middleware.BaseURL(c), + } + return c.Render(200, "views/settings", summary) + }) + } + // P2P app.GET("/p2p/", func(c echo.Context) error { summary := map[string]interface{}{ diff --git a/core/http/routes/ui_api.go b/core/http/routes/ui_api.go index b6c8c67cc7ea..95b20410f4b7 100644 --- a/core/http/routes/ui_api.go +++ b/core/http/routes/ui_api.go @@ -12,8 +12,10 @@ import ( "github.com/google/uuid" "github.com/labstack/echo/v4" + "github.com/mudler/LocalAI/core/application" "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/gallery" + "github.com/mudler/LocalAI/core/http/endpoints/localai" "github.com/mudler/LocalAI/core/p2p" "github.com/mudler/LocalAI/core/services" "github.com/mudler/LocalAI/pkg/model" @@ -21,7 +23,7 @@ import ( ) // RegisterUIAPIRoutes registers JSON API routes for the web UI -func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig *config.ApplicationConfig, galleryService *services.GalleryService, opcache *services.OpCache) { +func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig *config.ApplicationConfig, galleryService *services.GalleryService, opcache *services.OpCache, applicationInstance *application.Application) { // Operations API - Get all current operations (models + backends) app.GET("/api/operations", func(c echo.Context) error { @@ -264,17 +266,17 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model installedModelsCount := len(modelConfigs) + len(modelsWithoutConfig) return c.JSON(200, map[string]interface{}{ - "models": modelsJSON, - "repositories": appConfig.Galleries, - "allTags": tags, - "processingModels": processingModelsData, - "taskTypes": taskTypes, - "availableModels": totalModels, - "installedModels": installedModelsCount, - "currentPage": pageNum, - "totalPages": totalPages, - "prevPage": prevPage, - "nextPage": nextPage, + "models": modelsJSON, + "repositories": appConfig.Galleries, + "allTags": tags, + "processingModels": processingModelsData, + "taskTypes": taskTypes, + "availableModels": totalModels, + "installedModels": installedModelsCount, + "currentPage": pageNum, + "totalPages": totalPages, + "prevPage": prevPage, + "nextPage": nextPage, }) }) @@ -802,4 +804,10 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model }, }) }) + + if !appConfig.DisableRuntimeSettings { + // Settings API + app.GET("/api/settings", localai.GetSettingsEndpoint(applicationInstance)) + app.POST("/api/settings", localai.UpdateSettingsEndpoint(applicationInstance)) + } } diff --git a/core/http/static/chat.js b/core/http/static/chat.js index fea4b1efac95..62270ddb6682 100644 --- a/core/http/static/chat.js +++ b/core/http/static/chat.js @@ -1382,6 +1382,12 @@ document.addEventListener('DOMContentLoaded', function() { if (chatData) { try { const data = JSON.parse(chatData); + + // Set MCP mode if provided + if (data.mcpMode === true && Alpine.store("chat")) { + Alpine.store("chat").mcpMode = true; + } + const input = document.getElementById('input'); if (input && data.message) { @@ -1417,6 +1423,9 @@ document.addEventListener('DOMContentLoaded', function() { processAndSendMessage(input.value); } }, 500); + } else { + // No message, but might have mcpMode - clear localStorage + localStorage.removeItem('localai_index_chat_data'); } } catch (error) { console.error('Error processing chat data from index:', error); diff --git a/core/http/views/chat.html b/core/http/views/chat.html index dd917612adc0..fadf57d85950 100644 --- a/core/http/views/chat.html +++ b/core/http/views/chat.html @@ -44,8 +44,25 @@ // Function to initialize store function __initChatStore() { if (!window.Alpine) return; + + // Check for MCP mode from localStorage (set by index page) + // Note: We don't clear localStorage here - chat.js will handle that after reading all data + let initialMcpMode = false; + try { + const chatData = localStorage.getItem('localai_index_chat_data'); + if (chatData) { + const parsed = JSON.parse(chatData); + if (parsed.mcpMode === true) { + initialMcpMode = true; + } + } + } catch (e) { + console.error('Error reading MCP mode from localStorage:', e); + } + if (Alpine.store("chat")) { Alpine.store("chat").contextSize = __chatContextSize; + Alpine.store("chat").mcpMode = initialMcpMode; return; } @@ -53,7 +70,7 @@ history: [], languages: [undefined], systemPrompt: "", - mcpMode: false, + mcpMode: initialMcpMode, contextSize: __chatContextSize, tokenUsage: { promptTokens: 0, diff --git a/core/http/views/index.html b/core/http/views/index.html index 460ff322fe90..d4f1b87fafef 100644 --- a/core/http/views/index.html +++ b/core/http/views/index.html @@ -128,6 +128,9 @@
Configure watchdog and backend request settings
+