diff --git a/engine/cli/commands/engine_install_cmd.cc b/engine/cli/commands/engine_install_cmd.cc index 1f712d10c..f37de2e77 100644 --- a/engine/cli/commands/engine_install_cmd.cc +++ b/engine/cli/commands/engine_install_cmd.cc @@ -183,8 +183,6 @@ bool EngineInstallCmd::Exec(const std::string& engine, return false; } - CLI_LOG("Validating download items, please wait..") - if (!dp_res.get()) return false; diff --git a/engine/cli/commands/model_pull_cmd.cc b/engine/cli/commands/model_pull_cmd.cc index d769b667a..376943fd1 100644 --- a/engine/cli/commands/model_pull_cmd.cc +++ b/engine/cli/commands/model_pull_cmd.cc @@ -96,8 +96,6 @@ std::optional ModelPullCmd::Exec(const std::string& host, int port, CTL_INF("model: " << model << ", model_id: " << model_id); - // Send request download model to server - CLI_LOG("Validating download items, please wait..") Json::Value json_data; json_data["model"] = model; auto data_str = json_data.toStyledString(); diff --git a/engine/controllers/models.cc b/engine/controllers/models.cc index 9e4ba1e9f..888983d7c 100644 --- a/engine/controllers/models.cc +++ b/engine/controllers/models.cc @@ -153,7 +153,7 @@ void Models::ListModel( Json::Value ret; ret["object"] = "list"; Json::Value data(Json::arrayValue); - + model_service_->ForceIndexingModelList(); // Iterate through directory cortex::db::Models modellist_handler; diff --git a/engine/database/models.cc b/engine/database/models.cc index d0bee405c..a452ca1c5 100644 --- a/engine/database/models.cc +++ b/engine/database/models.cc @@ -262,6 +262,11 @@ cpp::result Models::UpdateModelAlias( cpp::result Models::DeleteModelEntry( const std::string& identifier) { try { + // delete only if its there + if (!HasModel(identifier)) { + return true; + } + SQLite::Statement del( db_, "DELETE from models WHERE model_id = ? OR model_alias = ?"); del.bind(1, identifier); diff --git a/engine/main.cc b/engine/main.cc index 56f66154e..70fcdff3f 100644 --- a/engine/main.cc +++ b/engine/main.cc @@ -10,6 +10,7 @@ #include "controllers/server.h" #include "cortex-common/cortexpythoni.h" #include "services/config_service.h" +#include "services/file_watcher_service.h" #include "services/model_service.h" #include "utils/archive_utils.h" #include "utils/cortex_utils.h" @@ -107,6 +108,7 @@ void RunServer(std::optional port, bool ignore_cout) { auto event_queue_ptr = std::make_shared(); cortex::event::EventProcessor event_processor(event_queue_ptr); + auto model_dir_path = file_manager_utils::GetModelsContainerPath(); auto config_service = std::make_shared(); auto download_service = std::make_shared(event_queue_ptr, config_service); @@ -116,6 +118,10 @@ void RunServer(std::optional port, bool ignore_cout) { auto model_service = std::make_shared( download_service, inference_svc, engine_service); + auto file_watcher_srv = std::make_shared( + model_dir_path.string(), model_service); + file_watcher_srv->start(); + // initialize custom controllers auto engine_ctl = std::make_shared(engine_service); auto model_ctl = std::make_shared(model_service, engine_service); diff --git a/engine/services/file_watcher_service.h b/engine/services/file_watcher_service.h new file mode 100644 index 000000000..f1a0780c7 --- /dev/null +++ b/engine/services/file_watcher_service.h @@ -0,0 +1,347 @@ +#include +#include +#include +#include +#include "services/model_service.h" +#include "utils/logging_utils.h" + +#ifdef __APPLE__ +#include +#include + +#elif defined(_WIN32) +#include + +#else // Linux +#include +#include +#include +#include +#endif + +class FileWatcherService { + private: +#if defined(_WIN32) + HANDLE dir_handle = INVALID_HANDLE_VALUE; + HANDLE stop_event; +#elif defined(__APPLE__) + FSEventStreamRef event_stream; +#else // Linux + int fd; + int wd; + std::unordered_map watch_descriptors; +#endif + + public: + FileWatcherService(const std::string& path, + std::shared_ptr model_service) + : watch_path_{path}, running_{false}, model_service_{model_service} { + if (!std::filesystem::exists(path)) { + throw std::runtime_error("Path does not exist: " + path); + } +#ifdef _WIN32 + stop_event = CreateEvent(NULL, TRUE, FALSE, NULL); +#endif + CTL_INF("FileWatcherService created: " + path); + } + + ~FileWatcherService() { + CTL_INF("FileWatcherService destructor"); + stop(); + } + + void start() { + if (running_) { + return; + } + + running_ = true; + watch_thread_ = std::thread(&FileWatcherService::WatcherThread, this); + } + + void stop() { + if (!running_) { + return; + } + + running_ = false; + +#ifdef _WIN32 + // Signal the stop event + SetEvent(stop_event); +#elif defined(__APPLE__) + if (event_stream) { + FSEventStreamStop(event_stream); + FSEventStreamInvalidate(event_stream); + } +#else // Linux + // For Linux, closing the fd will interrupt the read() call + CTL_INF("before close fd!"); + if (fd >= 0) { + close(fd); + } +#endif + CTL_INF("before join!"); + // Add timeout to avoid infinite waiting + if (watch_thread_.joinable()) { + watch_thread_.join(); + } + +#ifdef _WIN32 + if (stop_event != NULL) { + CloseHandle(stop_event); + } + if (dir_handle != INVALID_HANDLE_VALUE) { + CloseHandle(dir_handle); + } +#elif defined(__APPLE__) + if (event_stream) { + FSEventStreamRelease(event_stream); + } +#else // Linux + CleanupWatches(); +#endif + CTL_INF("FileWatcherService stopped!"); + } + + private: + std::string watch_path_; + std::atomic running_; + std::thread watch_thread_; + std::shared_ptr model_service_; + +#ifdef __APPLE__ + + static void callback(ConstFSEventStreamRef streamRef, + void* clientCallBackInfo, size_t numEvents, + void* eventPaths, + const FSEventStreamEventFlags eventFlags[], + const FSEventStreamEventId eventIds[]) { + auto** paths = (char**)eventPaths; + auto* watcher = static_cast(clientCallBackInfo); + + for (size_t i = 0; i < numEvents; i++) { + if (eventFlags[i] & kFSEventStreamEventFlagItemRemoved) { + watcher->model_service_->ForceIndexingModelList(); + } + } + } + + void WatcherThread() { + // macOS implementation + auto mypath = CFStringCreateWithCString(NULL, watch_path_.c_str(), + kCFStringEncodingUTF8); + auto path_to_watch = CFArrayCreate(NULL, (const void**)&mypath, 1, NULL); + + FSEventStreamContext context = {0, this, NULL, NULL, NULL}; + + event_stream = + FSEventStreamCreate(NULL, &FileWatcherService::callback, &context, + path_to_watch, kFSEventStreamEventIdSinceNow, + 0.5, // 500ms latency + kFSEventStreamCreateFlagFileEvents); + + dispatch_queue_t queue = dispatch_get_main_queue(); + FSEventStreamSetDispatchQueue(event_stream, queue); + FSEventStreamStart(event_stream); + + while (running_) { + CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1.0, false); + } + + FSEventStreamStop(event_stream); + FSEventStreamInvalidate(event_stream); + FSEventStreamRelease(event_stream); + CFRelease(path_to_watch); + CFRelease(mypath); + } + +#elif defined(_WIN32) + void WatcherThread() { + dir_handle = + CreateFileA(watch_path_.c_str(), FILE_LIST_DIRECTORY, + FILE_SHARE_READ | FILE_SHARE_DELETE, NULL, OPEN_EXISTING, + FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL); + + if (dir_handle == INVALID_HANDLE_VALUE) { + throw std::runtime_error("Failed to open directory"); + } + + char buffer[4096]; + OVERLAPPED overlapped = {0}; + overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + DWORD bytesReturned; + HANDLE events[] = {overlapped.hEvent, stop_event}; + while (running_) { + if (!ReadDirectoryChangesW( + dir_handle, buffer, sizeof(buffer), TRUE, + FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME, + &bytesReturned, &overlapped, NULL)) { + break; + } + + // Wait for either file change event or stop event + DWORD result = WaitForMultipleObjects(2, events, FALSE, INFINITE); + if (result == WAIT_OBJECT_0 + 1) { // stop_event was signaled + break; + } + + if (result != WAIT_OBJECT_0 || + !GetOverlappedResult(dir_handle, &overlapped, &bytesReturned, + FALSE)) { + break; + } + + FILE_NOTIFY_INFORMATION* event = (FILE_NOTIFY_INFORMATION*)buffer; + do { + if (event->Action == FILE_ACTION_REMOVED) { + std::wstring fileName(event->FileName, + event->FileNameLength / sizeof(wchar_t)); + + std::string file_name_str(fileName.begin(), fileName.end()); + model_service_->ForceIndexingModelList(); + } + + if (event->NextEntryOffset == 0) + break; + event = (FILE_NOTIFY_INFORMATION*)((uint8_t*)event + + event->NextEntryOffset); + } while (true); + + ResetEvent(overlapped.hEvent); + } + + CloseHandle(overlapped.hEvent); + CloseHandle(dir_handle); + } + +#else // Linux + + void AddWatch(const std::string& dirPath) { + const int watch_flags = IN_DELETE | IN_DELETE_SELF | IN_CREATE; + wd = inotify_add_watch(fd, dirPath.c_str(), watch_flags); + if (wd < 0) { + throw std::runtime_error("Failed to add watch on " + dirPath + + ": " + std::string(strerror(errno))); + } + watch_descriptors[wd] = dirPath; + + // Add watches for subdirectories + try { + for (const auto& entry : + std::filesystem::recursive_directory_iterator(dirPath)) { + if (std::filesystem::is_directory(entry)) { + int subwd = inotify_add_watch(fd, entry.path().c_str(), watch_flags); + if (subwd >= 0) { + watch_descriptors[subwd] = entry.path().string(); + } else { + CTL_ERR("Failed to add watch for subdirectory " + + entry.path().string() + ": " + + std::string(strerror(errno))); + } + } + } + } catch (const std::filesystem::filesystem_error& e) { + CTL_ERR("Error walking directory tree: " + std::string(e.what())); + } + } + + void CleanupWatches() { + CTL_INF("Cleanup Watches"); + for (const auto& [wd, path] : watch_descriptors) { + inotify_rm_watch(fd, wd); + } + watch_descriptors.clear(); + + if (fd >= 0) { + close(fd); + fd = -1; + } + } + + void WatcherThread() { + fd = inotify_init1(IN_NONBLOCK); + if (fd < 0) { + CTL_ERR("Failed to initialize inotify: " + std::string(strerror(errno))); + return; + } + + try { + AddWatch(watch_path_); + } catch (const std::exception& e) { + CTL_ERR("Failed to add watch: " + std::string(e.what())); + close(fd); + return; + } + + const int POLL_TIMEOUT_MS = 1000; // 1 second timeout + char buffer[4096]; + struct pollfd pfd = { + .fd = fd, + .events = POLLIN, + .revents = 0 + }; + + while (running_) { + // Poll will sleep until either: + // 1. Events are available (POLLIN) + // 2. POLL_TIMEOUT_MS milliseconds have elapsed + // 3. An error occurs + int poll_result = poll(&pfd, 1, POLL_TIMEOUT_MS); + + if (poll_result < 0) { + if (errno == EINTR) { + // System call was interrupted, just retry + continue; + } + CTL_ERR("Poll failed: " + std::string(strerror(errno))); + break; + } + + if (poll_result == 0) { // Timeout - no events + // No need to sleep - poll() already waited + continue; + } + + if (pfd.revents & POLLERR || pfd.revents & POLLNVAL) { + CTL_ERR("Poll error on fd"); + break; + } + + // Read all pending events + while (running_) { + int length = read(fd, buffer, sizeof(buffer)); + if (length < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // No more events to read + break; + } + CTL_ERR("Read error: " + std::string(strerror(errno))); + break; + } + + if (length == 0) { + break; + } + + // Process events + size_t i = 0; + while (i < static_cast(length)) { + struct inotify_event* event = + reinterpret_cast(&buffer[i]); + + if (event->mask & (IN_DELETE | IN_DELETE_SELF)) { + try { + model_service_->ForceIndexingModelList(); + } catch (const std::exception& e) { + CTL_ERR("Error processing delete event: " + std::string(e.what())); + } + } + + i += sizeof(struct inotify_event) + event->len; + } + } + } + } +#endif +}; diff --git a/engine/services/model_service.cc b/engine/services/model_service.cc index 793e8ecb5..80d02f4c7 100644 --- a/engine/services/model_service.cc +++ b/engine/services/model_service.cc @@ -8,7 +8,6 @@ #include "database/models.h" #include "hardware_service.h" #include "httplib.h" -#include "services/engine_service.h" #include "utils/cli_selection_utils.h" #include "utils/engine_constants.h" #include "utils/file_manager_utils.h" @@ -114,6 +113,40 @@ cpp::result GetDownloadTask( } } // namespace +void ModelService::ForceIndexingModelList() { + CTL_INF("Force indexing model list"); + + cortex::db::Models modellist_handler; + config::YamlHandler yaml_handler; + + auto list_entry = modellist_handler.LoadModelList(); + if (list_entry.has_error()) { + CTL_ERR("Failed to load model list: " << list_entry.error()); + return; + } + + namespace fs = std::filesystem; + namespace fmu = file_manager_utils; + + CTL_DBG("Database model size: " + std::to_string(list_entry.value().size())); + for (const auto& model_entry : list_entry.value()) { + try { + yaml_handler.ModelConfigFromFile( + fmu::ToAbsoluteCortexDataPath( + fs::path(model_entry.path_to_model_yaml)) + .string()); + auto model_config = yaml_handler.GetModelConfig(); + Json::Value obj = model_config.ToJson(); + yaml_handler.Reset(); + } catch (const std::exception& e) { + // remove in db + auto remove_result = + modellist_handler.DeleteModelEntry(model_entry.model); + // silently ignore result + } + } +} + cpp::result ModelService::DownloadModel( const std::string& input) { if (input.empty()) { diff --git a/engine/services/model_service.h b/engine/services/model_service.h index 7b6375e54..a29f092d4 100644 --- a/engine/services/model_service.h +++ b/engine/services/model_service.h @@ -5,6 +5,7 @@ #include #include "common/engine_servicei.h" #include "config/model_config.h" +#include "database/models.h" #include "services/download_service.h" #include "services/inference_service.h" @@ -39,6 +40,8 @@ struct StartModelResult { class ModelService { public: + void ForceIndexingModelList(); + explicit ModelService(std::shared_ptr download_service) : download_service_{download_service} {};