Skip to content
This repository was archived by the owner on Jul 4, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/config/yaml_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ void YamlHandler::Reset() {
void YamlHandler::ReadYamlFile(const std::string& file_path) {
namespace fs = std::filesystem;
namespace fmu = file_manager_utils;

try {
yaml_node_ = YAML::LoadFile(file_path);
// incase of model.yml file, we don't have files yet, create them
Expand All @@ -41,7 +42,6 @@ void YamlHandler::ReadYamlFile(const std::string& file_path) {
yaml_node_["files"] = v;
}
} catch (const YAML::BadFile& e) {
std::cerr << "Failed to read file: " << e.what() << std::endl;
throw;
}
}
Expand Down
87 changes: 52 additions & 35 deletions engine/services/file_watcher_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
#include <windows.h>

#else // Linux
#include <poll.h>
#include <limits.h>
#include <poll.h>
#include <sys/inotify.h>
#include <unistd.h>
#endif
Expand Down Expand Up @@ -121,39 +121,59 @@ class FileWatcherService {
auto* watcher = static_cast<FileWatcherService*>(clientCallBackInfo);

for (size_t i = 0; i < numEvents; i++) {
if (eventFlags[i] & kFSEventStreamEventFlagItemRemoved) {
if (eventFlags[i] & (kFSEventStreamEventFlagItemRemoved |
kFSEventStreamEventFlagItemRenamed |
kFSEventStreamEventFlagItemModified)) {
CTL_INF("File removed: " + std::string(paths[i]));
CTL_INF("File event detected: " + std::string(paths[i]) +
" flags: " + std::to_string(eventFlags[i]));
watcher->model_service_->ForceIndexingModelList();
}
}
}

void WatcherThread() {
// macOS implementation
auto mypath = CFStringCreateWithCString(NULL, watch_path_.c_str(),
kCFStringEncodingUTF8);
auto path_to_watch = CFArrayCreate(NULL, (const void**)&mypath, 1, NULL);
CFRunLoopRef runLoop = CFRunLoopGetCurrent();

FSEventStreamContext context = {0, this, NULL, NULL, NULL};
auto path = CFStringCreateWithCString(nullptr, watch_path_.c_str(),
kCFStringEncodingUTF8);
auto path_to_watch =
CFArrayCreate(nullptr, (const void**)&path, 1, &kCFTypeArrayCallBacks);

event_stream =
FSEventStreamCreate(NULL, &FileWatcherService::callback, &context,
path_to_watch, kFSEventStreamEventIdSinceNow,
0.5, // 500ms latency
kFSEventStreamCreateFlagFileEvents);
FSEventStreamContext context = {0, this, nullptr, nullptr, nullptr};

dispatch_queue_t queue = dispatch_get_main_queue();
FSEventStreamSetDispatchQueue(event_stream, queue);
FSEventStreamStart(event_stream);
event_stream = FSEventStreamCreate(
nullptr, &FileWatcherService::callback, &context, path_to_watch,
kFSEventStreamEventIdSinceNow, 1, // each second
kFSEventStreamCreateFlagFileEvents | kFSEventStreamCreateFlagNoDefer |
kFSEventStreamCreateFlagWatchRoot);

if (!event_stream) {
CFRelease(path_to_watch);
CFRelease(path);
throw std::runtime_error("Failed to create FSEvent stream");
}

FSEventStreamScheduleWithRunLoop(event_stream, runLoop,
kCFRunLoopDefaultMode);

if (!FSEventStreamStart(event_stream)) {
FSEventStreamInvalidate(event_stream);
FSEventStreamRelease(event_stream);
CFRelease(path_to_watch);
CFRelease(path);
throw std::runtime_error("Failed to start FSEvent stream");
}

while (running_) {
CFRunLoopRunInMode(kCFRunLoopDefaultMode, 1.0, false);
CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.25, true);
}

FSEventStreamStop(event_stream);
FSEventStreamInvalidate(event_stream);
FSEventStreamRelease(event_stream);
CFRelease(path_to_watch);
CFRelease(mypath);
CFRelease(path);
}

#elif defined(_WIN32)
Expand Down Expand Up @@ -221,22 +241,22 @@ class FileWatcherService {
const int watch_flags = IN_DELETE | IN_DELETE_SELF | IN_CREATE;
wd = inotify_add_watch(fd, dirPath.c_str(), watch_flags);
if (wd < 0) {
throw std::runtime_error("Failed to add watch on " + dirPath +
": " + std::string(strerror(errno)));
throw std::runtime_error("Failed to add watch on " + dirPath + ": " +
std::string(strerror(errno)));
}
watch_descriptors[wd] = dirPath;

// Add watches for subdirectories
try {
for (const auto& entry :
for (const auto& entry :
std::filesystem::recursive_directory_iterator(dirPath)) {
if (std::filesystem::is_directory(entry)) {
int subwd = inotify_add_watch(fd, entry.path().c_str(), watch_flags);
if (subwd >= 0) {
watch_descriptors[subwd] = entry.path().string();
} else {
CTL_ERR("Failed to add watch for subdirectory " +
entry.path().string() + ": " +
CTL_ERR("Failed to add watch for subdirectory " +
entry.path().string() + ": " +
std::string(strerror(errno)));
}
}
Expand Down Expand Up @@ -274,21 +294,17 @@ class FileWatcherService {
return;
}

const int POLL_TIMEOUT_MS = 1000; // 1 second timeout
const int POLL_TIMEOUT_MS = 1000; // 1 second timeout
char buffer[4096];
struct pollfd pfd = {
.fd = fd,
.events = POLLIN,
.revents = 0
};

struct pollfd pfd = {.fd = fd, .events = POLLIN, .revents = 0};

while (running_) {
// Poll will sleep until either:
// 1. Events are available (POLLIN)
// 2. POLL_TIMEOUT_MS milliseconds have elapsed
// 3. An error occurs
int poll_result = poll(&pfd, 1, POLL_TIMEOUT_MS);

if (poll_result < 0) {
if (errno == EINTR) {
// System call was interrupted, just retry
Expand All @@ -297,7 +313,7 @@ class FileWatcherService {
CTL_ERR("Poll failed: " + std::string(strerror(errno)));
break;
}

if (poll_result == 0) { // Timeout - no events
// No need to sleep - poll() already waited
continue;
Expand Down Expand Up @@ -327,17 +343,18 @@ class FileWatcherService {
// Process events
size_t i = 0;
while (i < static_cast<size_t>(length)) {
struct inotify_event* event =
struct inotify_event* event =
reinterpret_cast<struct inotify_event*>(&buffer[i]);

if (event->mask & (IN_DELETE | IN_DELETE_SELF)) {
try {
model_service_->ForceIndexingModelList();
} catch (const std::exception& e) {
CTL_ERR("Error processing delete event: " + std::string(e.what()));
CTL_ERR("Error processing delete event: " +
std::string(e.what()));
}
}

i += sizeof(struct inotify_event) + event->len;
}
}
Expand Down