From a5b704c1823517fbdef9fc3b0fcfefa9a6961280 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 19 Nov 2024 12:02:20 +0700 Subject: [PATCH 01/12] feat: add file watcher service --- engine/main.cc | 6 + engine/services/file_watcher_service.h | 191 +++++++++++++++++++++++++ engine/services/model_service.cc | 45 +++++- engine/services/model_service.h | 9 +- 4 files changed, 246 insertions(+), 5 deletions(-) create mode 100644 engine/services/file_watcher_service.h diff --git a/engine/main.cc b/engine/main.cc index 5fdd69d6c..a60fcc8b1 100644 --- a/engine/main.cc +++ b/engine/main.cc @@ -10,6 +10,7 @@ #include "controllers/server.h" #include "cortex-common/cortexpythoni.h" #include "services/config_service.h" +#include "services/file_watcher_service.h" #include "services/model_service.h" #include "utils/archive_utils.h" #include "utils/cortex_utils.h" @@ -106,6 +107,7 @@ void RunServer(std::optional port, bool ignore_cout) { auto event_queue_ptr = std::make_shared(); cortex::event::EventProcessor event_processor(event_queue_ptr); + auto model_dir_path = file_manager_utils::GetModelsContainerPath(); auto config_service = std::make_shared(); auto download_service = std::make_shared(event_queue_ptr, config_service); @@ -115,6 +117,10 @@ void RunServer(std::optional port, bool ignore_cout) { auto model_service = std::make_shared( download_service, inference_svc, engine_service); + auto file_watcher_srv = std::make_shared( + model_dir_path.string(), model_service); + file_watcher_srv->start(); + // initialize custom controllers auto engine_ctl = std::make_shared(engine_service); auto model_ctl = std::make_shared(model_service, engine_service); diff --git a/engine/services/file_watcher_service.h b/engine/services/file_watcher_service.h new file mode 100644 index 000000000..334059e22 --- /dev/null +++ b/engine/services/file_watcher_service.h @@ -0,0 +1,191 @@ +#include +#include +#include +#include +#include +#include "services/model_service.h" +#include "utils/logging_utils.h" + +#ifdef __APPLE__ +#include +#include + +#elif defined(_WIN32) +#include + +#else // Linux +#include +#include +#include +#endif + +class FileWatcherService { + public: + FileWatcherService(const std::string& path, + std::shared_ptr model_service) + : watchPath{path}, running{false} { + CTL_INF("FileWatcherService created: " + path); + } + + ~FileWatcherService() { stop(); } + + void start() { + if (running) + return; + running = true; + watchThread = std::thread(&FileWatcherService::watcherThread, this); + } + + void stop() { + CTL_INF("FileWatcherService stop"); + running = false; + if (watchThread.joinable()) { + watchThread.join(); + } + } + + private: + std::string watchPath; + std::atomic running; + std::thread watchThread; + std::shared_ptr model_service_; + +#ifdef __APPLE__ + static void callback(ConstFSEventStreamRef streamRef, + void* clientCallBackInfo, size_t numEvents, + void* eventPaths, + const FSEventStreamEventFlags eventFlags[], + const FSEventStreamEventId eventIds[]) { + char** paths = (char**)eventPaths; + // model_service->ForceIndexingModelList(); + FileWatcherService* watcher = + static_cast(clientCallBackInfo); + watcher->model_service_->ForceIndexingModelList(); + for (size_t i = 0; i < numEvents; i++) { + if (eventFlags[i] & kFSEventStreamEventFlagItemRemoved) { + std::cout << "File deleted: " << paths[i] << std::endl; + } + } + } + + void watcherThread() { + FSEventStreamContext context = {0, this, nullptr, nullptr, nullptr}; + CFStringRef pathRef = CFStringCreateWithCString(nullptr, watchPath.c_str(), + kCFStringEncodingUTF8); + CFArrayRef pathsToWatch = + CFArrayCreate(nullptr, (const void**)&pathRef, 1, nullptr); + + FSEventStreamRef stream = + FSEventStreamCreate(nullptr, &callback, &context, pathsToWatch, + kFSEventStreamEventIdSinceNow, + 0.5, // 500ms latency + kFSEventStreamCreateFlagFileEvents); + + FSEventStreamScheduleWithRunLoop(stream, CFRunLoopGetCurrent(), + kCFRunLoopDefaultMode); + FSEventStreamStart(stream); + + CTL_INF("NamH start loop"); + while (running) { + CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1.0, false); + } + + FSEventStreamStop(stream); + FSEventStreamUnscheduleFromRunLoop(stream, CFRunLoopGetCurrent(), + kCFRunLoopDefaultMode); + FSEventStreamInvalidate(stream); + FSEventStreamRelease(stream); + CFRelease(pathsToWatch); + CFRelease(pathRef); + } + +#elif defined(_WIN32) + void watcherThread() { + HANDLE hDir = + CreateFileA(watchPath.c_str(), FILE_LIST_DIRECTORY, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + nullptr, OPEN_EXISTING, + FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, nullptr); + + if (hDir == INVALID_HANDLE_VALUE) { + std::cerr << "Failed to open directory" << std::endl; + return; + } + + char buffer[4096]; + DWORD bytesReturned; + OVERLAPPED overlapped = {0}; + overlapped.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr); + + while (running) { + ReadDirectoryChangesW( + hDir, buffer, sizeof(buffer), TRUE, + FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME, + &bytesReturned, &overlapped, nullptr); + + WaitForSingleObject(overlapped.hEvent, 1000); + + FILE_NOTIFY_INFORMATION* event = (FILE_NOTIFY_INFORMATION*)buffer; + do { + if (event->Action == FILE_ACTION_REMOVED) { + wchar_t fileName[MAX_PATH]; + memcpy(fileName, event->FileName, event->FileNameLength); + fileName[event->FileNameLength / 2] = L'\0'; + std::wcout << L"File deleted: " << fileName << std::endl; + } + + if (event->NextEntryOffset == 0) + break; + event = (FILE_NOTIFY_INFORMATION*)((uint8_t*)event + + event->NextEntryOffset); + } while (true); + + ResetEvent(overlapped.hEvent); + } + + CloseHandle(overlapped.hEvent); + CloseHandle(hDir); + } + +#else // Linux + void watcherThread() { + int fd = inotify_init(); + if (fd < 0) { + std::cerr << "Failed to initialize inotify" << std::endl; + return; + } + + int wd = inotify_add_watch(fd, watchPath.c_str(), IN_DELETE); + if (wd < 0) { + std::cerr << "Failed to add watch" << std::endl; + close(fd); + return; + } + + const size_t event_size = sizeof(struct inotify_event); + const size_t buf_len = 1024 * (event_size + 16); + char buffer[buf_len]; + + while (running) { + int length = read(fd, buffer, buf_len); + if (length < 0) { + if (errno == EINTR) + continue; + break; + } + + int i = 0; + while (i < length) { + struct inotify_event* event = (struct inotify_event*)&buffer[i]; + if (event->len && (event->mask & IN_DELETE)) { + std::cout << "File deleted: " << event->name << std::endl; + } + i += event_size + event->len; + } + } + + inotify_rm_watch(fd, wd); + close(fd); + } +#endif +}; diff --git a/engine/services/model_service.cc b/engine/services/model_service.cc index 3a8507c22..13be77811 100644 --- a/engine/services/model_service.cc +++ b/engine/services/model_service.cc @@ -8,7 +8,6 @@ #include "database/models.h" #include "hardware_service.h" #include "httplib.h" -#include "services/engine_service.h" #include "utils/cli_selection_utils.h" #include "utils/engine_constants.h" #include "utils/file_manager_utils.h" @@ -114,6 +113,47 @@ cpp::result GetDownloadTask( } } // namespace +void ModelService::ForceIndexingModelList() { + CTL_INF("Force indexing model list"); + + // bad code, refactor later on + cortex::db::Models modellist_handler; + config::YamlHandler yaml_handler; + + auto list_entry = modellist_handler.LoadModelList(); + if (list_entry.has_error()) { + CTL_ERR("Failed to load model list: " << list_entry.error()); + return; + } + + namespace fs = std::filesystem; + namespace fmu = file_manager_utils; + + CTL_DBG("Database model size: " + std::to_string(list_entry.value().size())); + for (const auto& model_entry : list_entry.value()) { + try { + yaml_handler.ModelConfigFromFile( + fmu::ToAbsoluteCortexDataPath( + fs::path(model_entry.path_to_model_yaml)) + .string()); + auto model_config = yaml_handler.GetModelConfig(); + Json::Value obj = model_config.ToJson(); + yaml_handler.Reset(); + } catch (const std::exception& e) { + LOG_ERROR << "Failed to load yaml file for model: " + << model_entry.path_to_model_yaml << ", error: " << e.what(); + // remove in db + auto remove_result = + modellist_handler.DeleteModelEntry(model_entry.model); + if (remove_result.has_error()) { + LOG_ERROR << "Failed to remove model in db: " << remove_result.error(); + } else { + LOG_INFO << "Removed model in db: " << model_entry.model; + } + } + } +} + cpp::result ModelService::DownloadModel( const std::string& input) { if (input.empty()) { @@ -745,7 +785,8 @@ cpp::result ModelService::StartModel( return cpp::fail( "Not enough VRAM - required: " + std::to_string(vram_needed_MiB) + " MiB, available: " + std::to_string(free_vram_MiB) + - " MiB - Should adjust ngl to " + std::to_string(free_vram_MiB / (vram_needed_MiB / ngl) - 1)); + " MiB - Should adjust ngl to " + + std::to_string(free_vram_MiB / (vram_needed_MiB / ngl) - 1)); } if (ram_needed_MiB > free_ram_MiB) { diff --git a/engine/services/model_service.h b/engine/services/model_service.h index 47d61c154..dff5003eb 100644 --- a/engine/services/model_service.h +++ b/engine/services/model_service.h @@ -3,10 +3,11 @@ #include #include #include +#include "common/engine_servicei.h" #include "config/model_config.h" +#include "database/models.h" #include "services/download_service.h" #include "services/inference_service.h" -#include "common/engine_servicei.h" struct ModelPullInfo { std::string id; @@ -30,12 +31,14 @@ struct StartParameterOverride { }; struct StartModelResult { - bool success; - std::optional warning; + bool success; + std::optional warning; }; class ModelService { public: + void ForceIndexingModelList(); + explicit ModelService(std::shared_ptr download_service) : download_service_{download_service} {}; From 339459ed7d930e452844ae831c9a6a995924030f Mon Sep 17 00:00:00 2001 From: James Date: Tue, 19 Nov 2024 15:54:22 +0700 Subject: [PATCH 02/12] some improvement --- engine/services/file_watcher_service.h | 198 +++++++++++++++---------- 1 file changed, 116 insertions(+), 82 deletions(-) diff --git a/engine/services/file_watcher_service.h b/engine/services/file_watcher_service.h index 334059e22..94eacef9b 100644 --- a/engine/services/file_watcher_service.h +++ b/engine/services/file_watcher_service.h @@ -20,28 +20,49 @@ #endif class FileWatcherService { + private: +#if defined(_WIN32) + HANDLE dirHandle; +#elif defined(__APPLE__) + FSEventStreamRef event_stream; +#else // Linux + std::unordered_map watchDescriptors; +#endif + public: FileWatcherService(const std::string& path, std::shared_ptr model_service) : watchPath{path}, running{false} { + if (!std::filesystem::exists(path)) { + throw std::runtime_error("Path does not exist: " + path); + } CTL_INF("FileWatcherService created: " + path); } ~FileWatcherService() { stop(); } void start() { - if (running) + if (running) { return; + } + running = true; watchThread = std::thread(&FileWatcherService::watcherThread, this); } void stop() { - CTL_INF("FileWatcherService stop"); +#ifdef _WIN32 + CloseHandle(dirHandle); +#endif + +#ifdef Linux + cleanupWatches(); +#endif running = false; if (watchThread.joinable()) { watchThread.join(); } + CTL_INF("FileWatcherService stopped!"); } private: @@ -51,140 +72,153 @@ class FileWatcherService { std::shared_ptr model_service_; #ifdef __APPLE__ + static void callback(ConstFSEventStreamRef streamRef, void* clientCallBackInfo, size_t numEvents, void* eventPaths, const FSEventStreamEventFlags eventFlags[], const FSEventStreamEventId eventIds[]) { - char** paths = (char**)eventPaths; - // model_service->ForceIndexingModelList(); - FileWatcherService* watcher = - static_cast(clientCallBackInfo); - watcher->model_service_->ForceIndexingModelList(); + auto** paths = (char**)eventPaths; + auto* watcher = static_cast(clientCallBackInfo); + for (size_t i = 0; i < numEvents; i++) { if (eventFlags[i] & kFSEventStreamEventFlagItemRemoved) { - std::cout << "File deleted: " << paths[i] << std::endl; + std::cout << "File deleted: " << paths[i] + << std::endl; // todo: remove after debug finished + watcher->model_service_->ForceIndexingModelList(); } } } void watcherThread() { - FSEventStreamContext context = {0, this, nullptr, nullptr, nullptr}; - CFStringRef pathRef = CFStringCreateWithCString(nullptr, watchPath.c_str(), - kCFStringEncodingUTF8); - CFArrayRef pathsToWatch = - CFArrayCreate(nullptr, (const void**)&pathRef, 1, nullptr); - - FSEventStreamRef stream = - FSEventStreamCreate(nullptr, &callback, &context, pathsToWatch, - kFSEventStreamEventIdSinceNow, + // macOS implementation + auto mypath = CFStringCreateWithCString(NULL, watchPath.c_str(), + kCFStringEncodingUTF8); + auto path_to_watch = CFArrayCreate(NULL, (const void**)&mypath, 1, NULL); + + FSEventStreamContext context = {0, this, NULL, NULL, NULL}; + + event_stream = + FSEventStreamCreate(NULL, &FileWatcherService::callback, &context, + path_to_watch, kFSEventStreamEventIdSinceNow, 0.5, // 500ms latency kFSEventStreamCreateFlagFileEvents); - FSEventStreamScheduleWithRunLoop(stream, CFRunLoopGetCurrent(), - kCFRunLoopDefaultMode); - FSEventStreamStart(stream); + dispatch_queue_t queue = dispatch_get_main_queue(); + FSEventStreamSetDispatchQueue(event_stream, queue); + FSEventStreamStart(event_stream); - CTL_INF("NamH start loop"); while (running) { CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1.0, false); } - FSEventStreamStop(stream); - FSEventStreamUnscheduleFromRunLoop(stream, CFRunLoopGetCurrent(), - kCFRunLoopDefaultMode); - FSEventStreamInvalidate(stream); - FSEventStreamRelease(stream); - CFRelease(pathsToWatch); - CFRelease(pathRef); + FSEventStreamStop(event_stream); + FSEventStreamInvalidate(event_stream); + FSEventStreamRelease(event_stream); + CFRelease(path_to_watch); + CFRelease(mypath); } #elif defined(_WIN32) void watcherThread() { - HANDLE hDir = - CreateFileA(watchPath.c_str(), FILE_LIST_DIRECTORY, - FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, - nullptr, OPEN_EXISTING, - FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, nullptr); - - if (hDir == INVALID_HANDLE_VALUE) { - std::cerr << "Failed to open directory" << std::endl; - return; + dirHandle = CreateFileA( + path.c_str(), FILE_LIST_DIRECTORY, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL, + OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL); + + if (dirHandle == INVALID_HANDLE_VALUE) { + throw std::runtime_error("Failed to open directory"); } char buffer[4096]; DWORD bytesReturned; OVERLAPPED overlapped = {0}; - overlapped.hEvent = CreateEvent(nullptr, TRUE, FALSE, nullptr); while (running) { - ReadDirectoryChangesW( - hDir, buffer, sizeof(buffer), TRUE, - FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME, - &bytesReturned, &overlapped, nullptr); - - WaitForSingleObject(overlapped.hEvent, 1000); - - FILE_NOTIFY_INFORMATION* event = (FILE_NOTIFY_INFORMATION*)buffer; - do { - if (event->Action == FILE_ACTION_REMOVED) { - wchar_t fileName[MAX_PATH]; - memcpy(fileName, event->FileName, event->FileNameLength); - fileName[event->FileNameLength / 2] = L'\0'; - std::wcout << L"File deleted: " << fileName << std::endl; - } + if (ReadDirectoryChangesW( + dirHandle, buffer, sizeof(buffer), + TRUE, // Watch subtree + FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME, + &bytesReturned, &overlapped, NULL)) { + FILE_NOTIFY_INFORMATION* event = (FILE_NOTIFY_INFORMATION*)buffer; + do { + if (event->Action == FILE_ACTION_REMOVED) { + wchar_t fileName[MAX_PATH]; + wcsncpy_s(fileName, event->FileName, + event->FileNameLength / sizeof(wchar_t)); + fileName[event->FileNameLength / sizeof(wchar_t)] = '\0'; + std::wcout << L"Deleted: " << fileName << std::endl; + model_service_->ForceIndexingModelList(); + } + + if (event->NextEntryOffset == 0) { + break; + } + event = (FILE_NOTIFY_INFORMATION*)((uint8_t*)event + + event->NextEntryOffset); + } while (true); + } + } + } - if (event->NextEntryOffset == 0) - break; - event = (FILE_NOTIFY_INFORMATION*)((uint8_t*)event + - event->NextEntryOffset); - } while (true); +#else // Linux - ResetEvent(overlapped.hEvent); + void addWatch(const std::string& dirPath) { + wd = inotify_add_watch(fd, dirPath.c_str(), + IN_DELETE | IN_CREATE | IN_DELETE_SELF); + if (wd < 0) { + throw std::runtime_error("Failed to add watch on: " + dirPath); } + watchDescriptors[wd] = dirPath; - CloseHandle(overlapped.hEvent); - CloseHandle(hDir); + // Recursively add watches to subdirectories + for (const auto& entry : + std::filesystem::recursive_directory_iterator(dirPath)) { + if (std::filesystem::is_directory(entry)) { + addWatch(entry.path().string()); + } + } } -#else // Linux - void watcherThread() { - int fd = inotify_init(); - if (fd < 0) { - std::cerr << "Failed to initialize inotify" << std::endl; - return; + void cleanupWatches() { + for (const auto& [wd, path] : watchDescriptors) { + inotify_rm_watch(fd, wd); } + watchDescriptors.clear(); - int wd = inotify_add_watch(fd, watchPath.c_str(), IN_DELETE); - if (wd < 0) { - std::cerr << "Failed to add watch" << std::endl; + if (fd >= 0) { close(fd); - return; + fd = -1; } + } - const size_t event_size = sizeof(struct inotify_event); - const size_t buf_len = 1024 * (event_size + 16); - char buffer[buf_len]; + void watcherThread() { + fd = inotify_init(); + if (fd < 0) { + throw std::runtime_error("Failed to initialize inotify"); + } + // Add initial watch on the main directory + addWatch(path); + + char buffer[4096]; while (running) { - int length = read(fd, buffer, buf_len); + int length = read(fd, buffer, sizeof(buffer)); if (length < 0) { - if (errno == EINTR) - continue; - break; + continue; } int i = 0; while (i < length) { struct inotify_event* event = (struct inotify_event*)&buffer[i]; - if (event->len && (event->mask & IN_DELETE)) { - std::cout << "File deleted: " << event->name << std::endl; + if (event->mask & IN_DELETE) { + auto deletedPath = watchDescriptors[event->wd] + "/" + event->name; + std::cout << "Deleted: " << deletedPath << std::endl; } - i += event_size + event->len; + i += sizeof(struct inotify_event) + event->len; } } - inotify_rm_watch(fd, wd); close(fd); } #endif From bb69b4eedc3598ac1a2b59bc8b802b7e81aca78d Mon Sep 17 00:00:00 2001 From: James Date: Tue, 19 Nov 2024 22:13:40 +0700 Subject: [PATCH 03/12] fix: windows Signed-off-by: James --- engine/cli/commands/engine_install_cmd.cc | 2 - engine/cli/commands/model_pull_cmd.cc | 2 - engine/services/file_watcher_service.h | 81 ++++++++++++++--------- engine/services/model_service.cc | 9 +-- 4 files changed, 51 insertions(+), 43 deletions(-) diff --git a/engine/cli/commands/engine_install_cmd.cc b/engine/cli/commands/engine_install_cmd.cc index 1f712d10c..f37de2e77 100644 --- a/engine/cli/commands/engine_install_cmd.cc +++ b/engine/cli/commands/engine_install_cmd.cc @@ -183,8 +183,6 @@ bool EngineInstallCmd::Exec(const std::string& engine, return false; } - CLI_LOG("Validating download items, please wait..") - if (!dp_res.get()) return false; diff --git a/engine/cli/commands/model_pull_cmd.cc b/engine/cli/commands/model_pull_cmd.cc index d769b667a..376943fd1 100644 --- a/engine/cli/commands/model_pull_cmd.cc +++ b/engine/cli/commands/model_pull_cmd.cc @@ -96,8 +96,6 @@ std::optional ModelPullCmd::Exec(const std::string& host, int port, CTL_INF("model: " << model << ", model_id: " << model_id); - // Send request download model to server - CLI_LOG("Validating download items, please wait..") Json::Value json_data; json_data["model"] = model; auto data_str = json_data.toStyledString(); diff --git a/engine/services/file_watcher_service.h b/engine/services/file_watcher_service.h index 94eacef9b..ddf406e1e 100644 --- a/engine/services/file_watcher_service.h +++ b/engine/services/file_watcher_service.h @@ -22,10 +22,12 @@ class FileWatcherService { private: #if defined(_WIN32) - HANDLE dirHandle; + HANDLE dir_handle; #elif defined(__APPLE__) FSEventStreamRef event_stream; #else // Linux + int fd; + int wd; std::unordered_map watchDescriptors; #endif @@ -52,11 +54,11 @@ class FileWatcherService { void stop() { #ifdef _WIN32 - CloseHandle(dirHandle); + CloseHandle(dir_handle); #endif #ifdef Linux - cleanupWatches(); + CleanupWatches(); #endif running = false; if (watchThread.joinable()) { @@ -121,49 +123,66 @@ class FileWatcherService { #elif defined(_WIN32) void watcherThread() { - dirHandle = CreateFileA( - path.c_str(), FILE_LIST_DIRECTORY, - FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, NULL, - OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL); + dir_handle = + CreateFileA(watchPath.c_str(), FILE_LIST_DIRECTORY, + FILE_SHARE_READ | FILE_SHARE_DELETE, NULL, OPEN_EXISTING, + FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL); - if (dirHandle == INVALID_HANDLE_VALUE) { + if (dir_handle == INVALID_HANDLE_VALUE) { throw std::runtime_error("Failed to open directory"); } char buffer[4096]; - DWORD bytesReturned; OVERLAPPED overlapped = {0}; + overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + DWORD bytesReturned; while (running) { - if (ReadDirectoryChangesW( - dirHandle, buffer, sizeof(buffer), - TRUE, // Watch subtree + if (!ReadDirectoryChangesW( + dir_handle, buffer, sizeof(buffer), TRUE, FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME, &bytesReturned, &overlapped, NULL)) { - FILE_NOTIFY_INFORMATION* event = (FILE_NOTIFY_INFORMATION*)buffer; - do { - if (event->Action == FILE_ACTION_REMOVED) { - wchar_t fileName[MAX_PATH]; - wcsncpy_s(fileName, event->FileName, - event->FileNameLength / sizeof(wchar_t)); - fileName[event->FileNameLength / sizeof(wchar_t)] = '\0'; - std::wcout << L"Deleted: " << fileName << std::endl; + break; + } + + if (WaitForSingleObject(overlapped.hEvent, INFINITE) != WAIT_OBJECT_0) { + break; + } + + if (!GetOverlappedResult(dir_handle, &overlapped, &bytesReturned, + FALSE)) { + break; + } + + FILE_NOTIFY_INFORMATION* event = (FILE_NOTIFY_INFORMATION*)buffer; + do { + if (event->Action == FILE_ACTION_REMOVED) { + std::wstring fileName(event->FileName, + event->FileNameLength / sizeof(wchar_t)); + + std::string file_name_str(fileName.begin(), fileName.end()); + if (file_name_str.find(".yaml") != std::string::npos || + file_name_str.find(".yml") != std::string::npos) { model_service_->ForceIndexingModelList(); } + } - if (event->NextEntryOffset == 0) { - break; - } - event = (FILE_NOTIFY_INFORMATION*)((uint8_t*)event + - event->NextEntryOffset); - } while (true); - } + if (event->NextEntryOffset == 0) + break; + event = (FILE_NOTIFY_INFORMATION*)((uint8_t*)event + + event->NextEntryOffset); + } while (true); + + ResetEvent(overlapped.hEvent); } + + CloseHandle(overlapped.hEvent); + CloseHandle(dir_handle); } #else // Linux - void addWatch(const std::string& dirPath) { + void AddWatch(const std::string& dirPath) { wd = inotify_add_watch(fd, dirPath.c_str(), IN_DELETE | IN_CREATE | IN_DELETE_SELF); if (wd < 0) { @@ -175,12 +194,12 @@ class FileWatcherService { for (const auto& entry : std::filesystem::recursive_directory_iterator(dirPath)) { if (std::filesystem::is_directory(entry)) { - addWatch(entry.path().string()); + AddWatch(entry.path().string()); } } } - void cleanupWatches() { + void CleanupWatches() { for (const auto& [wd, path] : watchDescriptors) { inotify_rm_watch(fd, wd); } @@ -199,7 +218,7 @@ class FileWatcherService { } // Add initial watch on the main directory - addWatch(path); + AddWatch(path); char buffer[4096]; while (running) { diff --git a/engine/services/model_service.cc b/engine/services/model_service.cc index 13be77811..54c457a89 100644 --- a/engine/services/model_service.cc +++ b/engine/services/model_service.cc @@ -116,7 +116,6 @@ cpp::result GetDownloadTask( void ModelService::ForceIndexingModelList() { CTL_INF("Force indexing model list"); - // bad code, refactor later on cortex::db::Models modellist_handler; config::YamlHandler yaml_handler; @@ -140,16 +139,10 @@ void ModelService::ForceIndexingModelList() { Json::Value obj = model_config.ToJson(); yaml_handler.Reset(); } catch (const std::exception& e) { - LOG_ERROR << "Failed to load yaml file for model: " - << model_entry.path_to_model_yaml << ", error: " << e.what(); // remove in db auto remove_result = modellist_handler.DeleteModelEntry(model_entry.model); - if (remove_result.has_error()) { - LOG_ERROR << "Failed to remove model in db: " << remove_result.error(); - } else { - LOG_INFO << "Removed model in db: " << model_entry.model; - } + // silently ignore result } } } From 33fea02d9ec3961176775943b314c91995a473aa Mon Sep 17 00:00:00 2001 From: James Date: Tue, 19 Nov 2024 22:33:13 +0700 Subject: [PATCH 04/12] fix: linux Signed-off-by: James --- engine/services/file_watcher_service.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/services/file_watcher_service.h b/engine/services/file_watcher_service.h index ddf406e1e..1e495eb5d 100644 --- a/engine/services/file_watcher_service.h +++ b/engine/services/file_watcher_service.h @@ -218,7 +218,7 @@ class FileWatcherService { } // Add initial watch on the main directory - AddWatch(path); + AddWatch(watchPath); char buffer[4096]; while (running) { From 32cc07269987dd8a6e88bd77d50ddd3741924b0a Mon Sep 17 00:00:00 2001 From: James Date: Tue, 19 Nov 2024 23:11:31 +0700 Subject: [PATCH 05/12] fix: linux Signed-off-by: James --- engine/controllers/models.cc | 2 +- engine/services/file_watcher_service.h | 9 ++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/engine/controllers/models.cc b/engine/controllers/models.cc index 796f70d16..ee7b01bd7 100644 --- a/engine/controllers/models.cc +++ b/engine/controllers/models.cc @@ -153,7 +153,7 @@ void Models::ListModel( Json::Value ret; ret["object"] = "list"; Json::Value data(Json::arrayValue); - + model_service_->ForceIndexingModelList(); // Iterate through directory cortex::db::Models modellist_handler; diff --git a/engine/services/file_watcher_service.h b/engine/services/file_watcher_service.h index 1e495eb5d..f0c58a40d 100644 --- a/engine/services/file_watcher_service.h +++ b/engine/services/file_watcher_service.h @@ -85,8 +85,6 @@ class FileWatcherService { for (size_t i = 0; i < numEvents; i++) { if (eventFlags[i] & kFSEventStreamEventFlagItemRemoved) { - std::cout << "File deleted: " << paths[i] - << std::endl; // todo: remove after debug finished watcher->model_service_->ForceIndexingModelList(); } } @@ -161,10 +159,7 @@ class FileWatcherService { event->FileNameLength / sizeof(wchar_t)); std::string file_name_str(fileName.begin(), fileName.end()); - if (file_name_str.find(".yaml") != std::string::npos || - file_name_str.find(".yml") != std::string::npos) { - model_service_->ForceIndexingModelList(); - } + model_service_->ForceIndexingModelList(); } if (event->NextEntryOffset == 0) @@ -232,7 +227,7 @@ class FileWatcherService { struct inotify_event* event = (struct inotify_event*)&buffer[i]; if (event->mask & IN_DELETE) { auto deletedPath = watchDescriptors[event->wd] + "/" + event->name; - std::cout << "Deleted: " << deletedPath << std::endl; + model_service_->ForceIndexingModelList(); } i += sizeof(struct inotify_event) + event->len; } From f253c0bfd331726827a504b8c02350f93b7275e1 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 20 Nov 2024 10:25:57 +0700 Subject: [PATCH 06/12] update --- engine/services/file_watcher_service.h | 44 +++++++++++++------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/engine/services/file_watcher_service.h b/engine/services/file_watcher_service.h index f0c58a40d..66ad08efe 100644 --- a/engine/services/file_watcher_service.h +++ b/engine/services/file_watcher_service.h @@ -1,5 +1,4 @@ #include -#include #include #include #include @@ -28,13 +27,13 @@ class FileWatcherService { #else // Linux int fd; int wd; - std::unordered_map watchDescriptors; + std::unordered_map watch_descriptors; #endif public: FileWatcherService(const std::string& path, std::shared_ptr model_service) - : watchPath{path}, running{false} { + : watch_path_{path}, running_{false} { if (!std::filesystem::exists(path)) { throw std::runtime_error("Path does not exist: " + path); } @@ -44,15 +43,20 @@ class FileWatcherService { ~FileWatcherService() { stop(); } void start() { - if (running) { + if (running_) { return; } - running = true; - watchThread = std::thread(&FileWatcherService::watcherThread, this); + running_ = true; + watch_thread_ = std::thread(&FileWatcherService::WatcherThread, this); } void stop() { + running_ = false; + if (watch_thread_.joinable()) { + watch_thread_.join(); + } + #ifdef _WIN32 CloseHandle(dir_handle); #endif @@ -60,17 +64,13 @@ class FileWatcherService { #ifdef Linux CleanupWatches(); #endif - running = false; - if (watchThread.joinable()) { - watchThread.join(); - } CTL_INF("FileWatcherService stopped!"); } private: - std::string watchPath; - std::atomic running; - std::thread watchThread; + std::string watch_path_; + std::atomic running_; + std::thread watch_thread_; std::shared_ptr model_service_; #ifdef __APPLE__ @@ -90,9 +90,9 @@ class FileWatcherService { } } - void watcherThread() { + void WatcherThread() { // macOS implementation - auto mypath = CFStringCreateWithCString(NULL, watchPath.c_str(), + auto mypath = CFStringCreateWithCString(NULL, watch_path_.c_str(), kCFStringEncodingUTF8); auto path_to_watch = CFArrayCreate(NULL, (const void**)&mypath, 1, NULL); @@ -108,7 +108,7 @@ class FileWatcherService { FSEventStreamSetDispatchQueue(event_stream, queue); FSEventStreamStart(event_stream); - while (running) { + while (running_) { CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1.0, false); } @@ -120,7 +120,7 @@ class FileWatcherService { } #elif defined(_WIN32) - void watcherThread() { + void WatcherThread() { dir_handle = CreateFileA(watchPath.c_str(), FILE_LIST_DIRECTORY, FILE_SHARE_READ | FILE_SHARE_DELETE, NULL, OPEN_EXISTING, @@ -183,7 +183,7 @@ class FileWatcherService { if (wd < 0) { throw std::runtime_error("Failed to add watch on: " + dirPath); } - watchDescriptors[wd] = dirPath; + watch_descriptors[wd] = dirPath; // Recursively add watches to subdirectories for (const auto& entry : @@ -195,10 +195,10 @@ class FileWatcherService { } void CleanupWatches() { - for (const auto& [wd, path] : watchDescriptors) { + for (const auto& [wd, path] : watch_descriptors) { inotify_rm_watch(fd, wd); } - watchDescriptors.clear(); + watch_descriptors.clear(); if (fd >= 0) { close(fd); @@ -206,7 +206,7 @@ class FileWatcherService { } } - void watcherThread() { + void WatcherThread() { fd = inotify_init(); if (fd < 0) { throw std::runtime_error("Failed to initialize inotify"); @@ -226,7 +226,7 @@ class FileWatcherService { while (i < length) { struct inotify_event* event = (struct inotify_event*)&buffer[i]; if (event->mask & IN_DELETE) { - auto deletedPath = watchDescriptors[event->wd] + "/" + event->name; + auto deletedPath = watch_descriptors[event->wd] + "/" + event->name; model_service_->ForceIndexingModelList(); } i += sizeof(struct inotify_event) + event->len; From 53e2d5101a909105b0962e319863818313f3cd6d Mon Sep 17 00:00:00 2001 From: James Date: Wed, 20 Nov 2024 11:23:24 +0700 Subject: [PATCH 07/12] fix build Signed-off-by: James --- engine/services/file_watcher_service.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/services/file_watcher_service.h b/engine/services/file_watcher_service.h index 66ad08efe..cc677e15d 100644 --- a/engine/services/file_watcher_service.h +++ b/engine/services/file_watcher_service.h @@ -213,10 +213,10 @@ class FileWatcherService { } // Add initial watch on the main directory - AddWatch(watchPath); + AddWatch(watch_path_); char buffer[4096]; - while (running) { + while (running_) { int length = read(fd, buffer, sizeof(buffer)); if (length < 0) { continue; From f4e6e505d6f2c2fff125913858a4968d437c9041 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 20 Nov 2024 11:51:14 +0700 Subject: [PATCH 08/12] fix build windows Signed-off-by: James --- engine/services/file_watcher_service.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/services/file_watcher_service.h b/engine/services/file_watcher_service.h index cc677e15d..72b618ef5 100644 --- a/engine/services/file_watcher_service.h +++ b/engine/services/file_watcher_service.h @@ -122,7 +122,7 @@ class FileWatcherService { #elif defined(_WIN32) void WatcherThread() { dir_handle = - CreateFileA(watchPath.c_str(), FILE_LIST_DIRECTORY, + CreateFileA(watch_path_.c_str(), FILE_LIST_DIRECTORY, FILE_SHARE_READ | FILE_SHARE_DELETE, NULL, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, NULL); @@ -135,7 +135,7 @@ class FileWatcherService { overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); DWORD bytesReturned; - while (running) { + while (running_) { if (!ReadDirectoryChangesW( dir_handle, buffer, sizeof(buffer), TRUE, FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME, From 63e6a29a57eb2e316519f97aa47375034d9402df Mon Sep 17 00:00:00 2001 From: James Date: Wed, 20 Nov 2024 14:09:54 +0700 Subject: [PATCH 09/12] fix ci Signed-off-by: James --- engine/services/file_watcher_service.h | 54 ++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 8 deletions(-) diff --git a/engine/services/file_watcher_service.h b/engine/services/file_watcher_service.h index 72b618ef5..a83624811 100644 --- a/engine/services/file_watcher_service.h +++ b/engine/services/file_watcher_service.h @@ -22,6 +22,7 @@ class FileWatcherService { private: #if defined(_WIN32) HANDLE dir_handle; + HANDLE stop_event; #elif defined(__APPLE__) FSEventStreamRef event_stream; #else // Linux @@ -37,10 +38,16 @@ class FileWatcherService { if (!std::filesystem::exists(path)) { throw std::runtime_error("Path does not exist: " + path); } +#ifdef _WIN32 + stop_event = CreateEvent(NULL, TRUE, FALSE, NULL); +#endif CTL_INF("FileWatcherService created: " + path); } - ~FileWatcherService() { stop(); } + ~FileWatcherService() { + CTL_INF("FileWatcherService destructor"); + stop(); + } void start() { if (running_) { @@ -52,16 +59,44 @@ class FileWatcherService { } void stop() { + if (!running_) { + return; + } + running_ = false; + +#ifdef _WIN32 + // Signal the stop event + SetEvent(stop_event); +#elif defined(__APPLE__) + if (event_stream) { + FSEventStreamStop(event_stream); + FSEventStreamInvalidate(event_stream); + } +#else // Linux + // For Linux, closing the fd will interrupt the read() call + if (fd >= 0) { + close(fd); + } +#endif + + // Add timeout to avoid infinite waiting if (watch_thread_.joinable()) { watch_thread_.join(); } #ifdef _WIN32 - CloseHandle(dir_handle); -#endif - -#ifdef Linux + if (stop_event != NULL) { + CloseHandle(stop_event); + } + if (dir_handle != INVALID_HANDLE_VALUE) { + CloseHandle(dir_handle); + } +#elif defined(__APPLE__) + if (event_stream) { + FSEventStreamRelease(event_stream); + } +#else // Linux CleanupWatches(); #endif CTL_INF("FileWatcherService stopped!"); @@ -134,7 +169,7 @@ class FileWatcherService { OVERLAPPED overlapped = {0}; overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); DWORD bytesReturned; - + HANDLE events[] = {overlapped.hEvent, stop_event}; while (running_) { if (!ReadDirectoryChangesW( dir_handle, buffer, sizeof(buffer), TRUE, @@ -143,11 +178,14 @@ class FileWatcherService { break; } - if (WaitForSingleObject(overlapped.hEvent, INFINITE) != WAIT_OBJECT_0) { + // Wait for either file change event or stop event + DWORD result = WaitForMultipleObjects(2, events, FALSE, INFINITE); + if (result == WAIT_OBJECT_0 + 1) { // stop_event was signaled break; } - if (!GetOverlappedResult(dir_handle, &overlapped, &bytesReturned, + if (result != WAIT_OBJECT_0 || + !GetOverlappedResult(dir_handle, &overlapped, &bytesReturned, FALSE)) { break; } From a1a7ce2db148982822260969bf84c8c38e0ad216 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 20 Nov 2024 15:45:02 +0700 Subject: [PATCH 10/12] update --- engine/database/models.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/engine/database/models.cc b/engine/database/models.cc index d0bee405c..a452ca1c5 100644 --- a/engine/database/models.cc +++ b/engine/database/models.cc @@ -262,6 +262,11 @@ cpp::result Models::UpdateModelAlias( cpp::result Models::DeleteModelEntry( const std::string& identifier) { try { + // delete only if its there + if (!HasModel(identifier)) { + return true; + } + SQLite::Statement del( db_, "DELETE from models WHERE model_id = ? OR model_alias = ?"); del.bind(1, identifier); From 8c4b04617f869d81f0115264ee10acaed2cf470a Mon Sep 17 00:00:00 2001 From: James Date: Wed, 20 Nov 2024 16:22:44 +0700 Subject: [PATCH 11/12] fix linux Signed-off-by: James --- engine/services/file_watcher_service.h | 122 +++++++++++++++++++------ 1 file changed, 96 insertions(+), 26 deletions(-) diff --git a/engine/services/file_watcher_service.h b/engine/services/file_watcher_service.h index a83624811..5ed152523 100644 --- a/engine/services/file_watcher_service.h +++ b/engine/services/file_watcher_service.h @@ -13,6 +13,7 @@ #include #else // Linux +#include #include #include #include @@ -21,7 +22,7 @@ class FileWatcherService { private: #if defined(_WIN32) - HANDLE dir_handle; + HANDLE dir_handle = INVALID_HANDLE_VALUE HANDLE stop_event; #elif defined(__APPLE__) FSEventStreamRef event_stream; @@ -34,7 +35,7 @@ class FileWatcherService { public: FileWatcherService(const std::string& path, std::shared_ptr model_service) - : watch_path_{path}, running_{false} { + : watch_path_{path}, running_{false}, model_service_{model_service} { if (!std::filesystem::exists(path)) { throw std::runtime_error("Path does not exist: " + path); } @@ -75,11 +76,12 @@ class FileWatcherService { } #else // Linux // For Linux, closing the fd will interrupt the read() call + CTL_INF("before close fd!"); if (fd >= 0) { close(fd); } #endif - + CTL_INF("before join!"); // Add timeout to avoid infinite waiting if (watch_thread_.joinable()) { watch_thread_.join(); @@ -216,23 +218,36 @@ class FileWatcherService { #else // Linux void AddWatch(const std::string& dirPath) { - wd = inotify_add_watch(fd, dirPath.c_str(), - IN_DELETE | IN_CREATE | IN_DELETE_SELF); + const int watch_flags = IN_DELETE | IN_DELETE_SELF | IN_CREATE; + wd = inotify_add_watch(fd, dirPath.c_str(), watch_flags); if (wd < 0) { - throw std::runtime_error("Failed to add watch on: " + dirPath); + throw std::runtime_error("Failed to add watch on " + dirPath + + ": " + std::string(strerror(errno))); } watch_descriptors[wd] = dirPath; - // Recursively add watches to subdirectories - for (const auto& entry : - std::filesystem::recursive_directory_iterator(dirPath)) { - if (std::filesystem::is_directory(entry)) { - AddWatch(entry.path().string()); + // Add watches for subdirectories + try { + for (const auto& entry : + std::filesystem::recursive_directory_iterator(dirPath)) { + if (std::filesystem::is_directory(entry)) { + int subwd = inotify_add_watch(fd, entry.path().c_str(), watch_flags); + if (subwd >= 0) { + watch_descriptors[subwd] = entry.path().string(); + } else { + CTL_ERR("Failed to add watch for subdirectory " + + entry.path().string() + ": " + + std::string(strerror(errno))); + } + } } + } catch (const std::filesystem::filesystem_error& e) { + CTL_ERR("Error walking directory tree: " + std::string(e.what())); } } void CleanupWatches() { + CTL_INF("Cleanup Watches"); for (const auto& [wd, path] : watch_descriptors) { inotify_rm_watch(fd, wd); } @@ -245,33 +260,88 @@ class FileWatcherService { } void WatcherThread() { - fd = inotify_init(); + fd = inotify_init1(IN_NONBLOCK); if (fd < 0) { - throw std::runtime_error("Failed to initialize inotify"); + CTL_ERR("Failed to initialize inotify: " + std::string(strerror(errno))); + return; } - // Add initial watch on the main directory - AddWatch(watch_path_); + try { + AddWatch(watch_path_); + } catch (const std::exception& e) { + CTL_ERR("Failed to add watch: " + std::string(e.what())); + close(fd); + return; + } + const int POLL_TIMEOUT_MS = 1000; // 1 second timeout char buffer[4096]; + struct pollfd pfd = { + .fd = fd, + .events = POLLIN, + .revents = 0 + }; + while (running_) { - int length = read(fd, buffer, sizeof(buffer)); - if (length < 0) { + // Poll will sleep until either: + // 1. Events are available (POLLIN) + // 2. POLL_TIMEOUT_MS milliseconds have elapsed + // 3. An error occurs + int poll_result = poll(&pfd, 1, POLL_TIMEOUT_MS); + + if (poll_result < 0) { + if (errno == EINTR) { + // System call was interrupted, just retry + continue; + } + CTL_ERR("Poll failed: " + std::string(strerror(errno))); + break; + } + + if (poll_result == 0) { // Timeout - no events + // No need to sleep - poll() already waited continue; } - int i = 0; - while (i < length) { - struct inotify_event* event = (struct inotify_event*)&buffer[i]; - if (event->mask & IN_DELETE) { - auto deletedPath = watch_descriptors[event->wd] + "/" + event->name; - model_service_->ForceIndexingModelList(); + if (pfd.revents & POLLERR || pfd.revents & POLLNVAL) { + CTL_ERR("Poll error on fd"); + break; + } + + // Read all pending events + while (running_) { + int length = read(fd, buffer, sizeof(buffer)); + if (length < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // No more events to read + break; + } + CTL_ERR("Read error: " + std::string(strerror(errno))); + break; + } + + if (length == 0) { + break; + } + + // Process events + size_t i = 0; + while (i < static_cast(length)) { + struct inotify_event* event = + reinterpret_cast(&buffer[i]); + + if (event->mask & (IN_DELETE | IN_DELETE_SELF)) { + try { + model_service_->ForceIndexingModelList(); + } catch (const std::exception& e) { + CTL_ERR("Error processing delete event: " + std::string(e.what())); + } + } + + i += sizeof(struct inotify_event) + event->len; } - i += sizeof(struct inotify_event) + event->len; } } - - close(fd); } #endif }; From e77d439a294c2a718fd58265d1f82fd2e578917e Mon Sep 17 00:00:00 2001 From: James Date: Wed, 20 Nov 2024 19:41:14 +0700 Subject: [PATCH 12/12] fix windows Signed-off-by: James --- engine/services/file_watcher_service.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/services/file_watcher_service.h b/engine/services/file_watcher_service.h index 5ed152523..f1a0780c7 100644 --- a/engine/services/file_watcher_service.h +++ b/engine/services/file_watcher_service.h @@ -22,7 +22,7 @@ class FileWatcherService { private: #if defined(_WIN32) - HANDLE dir_handle = INVALID_HANDLE_VALUE + HANDLE dir_handle = INVALID_HANDLE_VALUE; HANDLE stop_event; #elif defined(__APPLE__) FSEventStreamRef event_stream;