From 6bdaea539f8d112f672fa8a18b4c6813b1e44690 Mon Sep 17 00:00:00 2001 From: Martin Nowak Date: Mon, 28 Jul 2014 04:01:49 +0200 Subject: [PATCH] implement inotify directory watcher --- source/vibe/core/drivers/libevent2.d | 176 +++++++++++++++++++++++++-- tests/dirwatcher/.gitignore | 5 + tests/dirwatcher/dub.json | 8 ++ tests/dirwatcher/source/app.d | 78 ++++++++++++ 4 files changed, 255 insertions(+), 12 deletions(-) create mode 100644 tests/dirwatcher/.gitignore create mode 100644 tests/dirwatcher/dub.json create mode 100644 tests/dirwatcher/source/app.d diff --git a/source/vibe/core/drivers/libevent2.d b/source/vibe/core/drivers/libevent2.d index b104fa8719..649234b61b 100644 --- a/source/vibe/core/drivers/libevent2.d +++ b/source/vibe/core/drivers/libevent2.d @@ -115,7 +115,7 @@ final class Libevent2Driver : EventDriver { s_eventLoop = m_eventLoop; logDiagnostic("libevent is using %s for events.", to!string(event_base_get_method(m_eventLoop))); evthread_make_base_notifiable(m_eventLoop); - + m_dnsBase = evdns_base_new(m_eventLoop, 1); if( !m_dnsBase ) logError("Failed to initialize DNS lookup."); @@ -210,6 +210,7 @@ final class Libevent2Driver : EventDriver { DirectoryWatcher watchDirectory(Path path, bool recursive) { + version (linux) return new InotifyDirectoryWatcher(m_core, path, recursive); assert(false, "watchDirectory is not yet implemented in the libevent driver."); } @@ -255,7 +256,7 @@ final class Libevent2Driver : EventDriver { Libevent2TCPConnection connectTCP(NetworkAddress addr) { - + auto sockfd_raw = socket(addr.family, SOCK_STREAM, 0); // on Win64 socket() returns a 64-bit value but libevent expects an int static if (typeof(sockfd_raw).max > int.max) assert(sockfd_raw <= int.max || sockfd_raw == ~0); @@ -269,10 +270,10 @@ final class Libevent2Driver : EventDriver { socketEnforce(bind(sockfd, bind_addr.sockAddr, bind_addr.sockAddrLen) == 0, "Failed to bind socket."); socklen_t balen = bind_addr.sockAddrLen; socketEnforce(getsockname(sockfd, bind_addr.sockAddr, &balen) == 0, "getsockname failed."); - + if( evutil_make_socket_nonblocking(sockfd) ) throw new Exception("Failed to make socket non-blocking."); - + auto buf_event = bufferevent_socket_new(m_eventLoop, sockfd, bufferevent_options.BEV_OPT_CLOSE_ON_FREE); if( !buf_event ) throw new Exception("Failed to create buffer event for socket."); @@ -287,7 +288,7 @@ final class Libevent2Driver : EventDriver { assert(cctx.exception is null); socketEnforce(bufferevent_socket_connect(buf_event, addr.sockAddr, addr.sockAddrLen) == 0, "Failed to connect to " ~ addr.toString()); - + try { cctx.checkForException(); @@ -298,10 +299,10 @@ final class Libevent2Driver : EventDriver { } catch (Exception e) { throw new Exception(format("Failed to connect to %s: %s", addr.toString(), e.msg)); } - + logTrace("Connect result status: %d", cctx.status); enforce(cctx.status == BEV_EVENT_CONNECTED, format("Failed to connect to host %s: %s", addr.toString(), cctx.status)); - + return new Libevent2TCPConnection(cctx); } @@ -315,7 +316,7 @@ final class Libevent2Driver : EventDriver { static if (typeof(listenfd_raw).max > int.max) assert(listenfd_raw <= int.max || listenfd_raw == ~0); auto listenfd = cast(int)listenfd_raw; socketEnforce(listenfd != -1, "Error creating listening socket"); - int tmp_reuse = 1; + int tmp_reuse = 1; socketEnforce(setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) == 0, "Error enabling socket address reuse on listening socket"); socketEnforce(bind(listenfd, bind_addr.sockAddr, bind_addr.sockAddrLen) == 0, @@ -830,7 +831,7 @@ final class Libevent2UDPConnection : UDPConnection { static if (typeof(sockfd_raw).max > int.max) assert(sockfd_raw <= int.max || sockfd_raw == ~0); auto sockfd = cast(int)sockfd_raw; socketEnforce(sockfd != -1, "Failed to create socket."); - + enforce(evutil_make_socket_nonblocking(sockfd) == 0, "Failed to make socket non-blocking."); int tmp_reuse = 1; @@ -842,7 +843,7 @@ final class Libevent2UDPConnection : UDPConnection { // read back the actual bind address socklen_t balen = bind_addr.sockAddrLen; socketEnforce(getsockname(sockfd, bind_addr.sockAddr, &balen) == 0, "getsockname failed."); - + // generate the bind address string m_bindAddress = bind_addr; char buf[64]; @@ -908,7 +909,7 @@ final class Libevent2UDPConnection : UDPConnection { addr.port = port; connect(addr); } - + void connect(NetworkAddress addr) { enforce(.connect(m_ctx.socketfd, addr.sockAddr, addr.sockAddrLen) == 0, "Failed to connect UDP socket."~to!string(getLastSocketError())); @@ -996,6 +997,157 @@ final class Libevent2UDPConnection : UDPConnection { } } +/******************************************************************************/ +/* InotifyDirectoryWatcher */ +/******************************************************************************/ + +version (linux) +final class InotifyDirectoryWatcher : DirectoryWatcher { + import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify; + import std.file; + + private { + Path m_path; + string[int] m_watches; + bool m_recursive; + int m_handle; + DriverCore m_core; + Task m_owner; + } + + this(DriverCore core, Path path, bool recursive) + { + m_core = core; + m_recursive = recursive; + m_path = path; + + enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect + m_handle = inotify_init1(IN_NONBLOCK); + errnoEnforce(m_handle != -1, "Failed to initialize inotify."); + + auto spath = m_path.toString(); + addWatch(spath); + if (recursive && spath.isDir) + { + foreach (de; spath.dirEntries(SpanMode.shallow)) + if (de.isDir) addWatch(de.name); + } + } + + ~this() + { + errnoEnforce(close(m_handle) == 0); + } + + @property Path path() const { return m_path; } + @property bool recursive() const { return m_recursive; } + + void release() + { + assert(m_owner == Task.getThis(), "Releasing DirectoyWatcher that is not owned by the calling task."); + m_owner = Task(); + } + + void acquire() + { + assert(m_owner == Task(), "Acquiring DirectoyWatcher that is already owned."); + m_owner = Task.getThis(); + } + + bool amOwner() + { + return m_owner == Task.getThis(); + } + + bool readChanges(ref DirectoryChange[] dst, Duration timeout) + { + import core.stdc.stdio : FILENAME_MAX; + import core.stdc.string : strlen; + + acquire(); + scope(exit) release(); + + ubyte[inotify_event.sizeof + FILENAME_MAX + 1] buf = void; + auto nread = read(m_handle, buf.ptr, buf.sizeof); + + if (nread == -1 && errno == EAGAIN) + { + if (!waitReadable(m_handle, timeout)) + return false; + nread = read(m_handle, buf.ptr, buf.sizeof); + } + errnoEnforce(nread != -1, "Error while reading inotify handle."); + assert(nread > 0); + + dst.length = 0; + do + { + for (auto p = buf.ptr; p < buf.ptr + nread; ) + { + auto ev = cast(inotify_event*)p; + + DirectoryChangeType type; + if (ev.mask & (IN_CREATE|IN_MOVED_TO)) + type = DirectoryChangeType.added; + else if (ev.mask & (IN_DELETE|IN_DELETE_SELF|IN_MOVE_SELF|IN_MOVED_FROM)) + type = DirectoryChangeType.removed; + else if (ev.mask & IN_MODIFY) + type = DirectoryChangeType.modified; + + import std.path : buildPath; + auto name = ev.name.ptr[0 .. ev.name.ptr.strlen]; + auto path = Path(buildPath(m_watches[ev.wd], name)); + + dst ~= DirectoryChange(type, path); + + p += inotify_event.sizeof + ev.len; + } + nread = read(m_handle, buf.ptr, buf.sizeof); + errnoEnforce(nread != -1 || errno == EAGAIN, "Error while reading inotify handle."); + } while (nread > 0); + return true; + } + + private bool waitReadable(int fd, Duration timeout) + { + static struct Args { InotifyDirectoryWatcher watcher; bool readable, timeout; } + + static extern(System) void cb(int fd, short what, void* p) { + with (cast(Args*)p) { + if (what & EV_READ) readable = true; + if (what & EV_TIMEOUT) timeout = true; + if (watcher.m_owner) + watcher.m_core.resumeTask(watcher.m_owner); + } + } + + auto loop = getThreadLibeventEventLoop(); + auto args = Args(this); + auto ev = event_new(loop, fd, EV_READ, &cb, &args); + scope(exit) event_free(ev); + + if (!timeout.isNegative) { + auto tv = timeout.toTimeVal(); + event_add(ev, &tv); + } else { + event_add(ev, null); + } + while (!args.readable && !args.timeout) + m_core.yieldForEvent(); + return args.readable; + } + + private void addWatch(string path) + { + enum EVENTS = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY | + IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO; + immutable wd = inotify_add_watch(m_handle, path.toStringz, EVENTS); + errnoEnforce(wd != -1, "Failed to add inotify watch."); + m_watches[wd] = path; + } +} + + private { event_base* s_eventLoop; // TLS __gshared DriverCore s_driverCore; @@ -1131,7 +1283,7 @@ private nothrow extern(C) (*pl)++; } auto mtx = cast(LevMutex*)lock; - + assert(mtx !is null, "null lock"); assert(mtx.mutex !is null || mtx.rwmutex !is null, "lock contains no mutex"); if( mode & EVTHREAD_WRITE ){ diff --git a/tests/dirwatcher/.gitignore b/tests/dirwatcher/.gitignore new file mode 100644 index 0000000000..433d26664a --- /dev/null +++ b/tests/dirwatcher/.gitignore @@ -0,0 +1,5 @@ +.dub +docs.json +__dummy.html +*.o +*.obj diff --git a/tests/dirwatcher/dub.json b/tests/dirwatcher/dub.json new file mode 100644 index 0000000000..7c5b3a8af7 --- /dev/null +++ b/tests/dirwatcher/dub.json @@ -0,0 +1,8 @@ +{ + "name": "tests", + "description": "Test for vibe.d's DirectoryWatcher.", + "dependencies": { + "vibe-d": {"version": "~master", "path": "../../"} + }, + "versions": ["VibeCustomMain"] +} diff --git a/tests/dirwatcher/source/app.d b/tests/dirwatcher/source/app.d new file mode 100644 index 0000000000..557076f204 --- /dev/null +++ b/tests/dirwatcher/source/app.d @@ -0,0 +1,78 @@ +import vibe.vibe; +import std.file, std.process; + +void runTest() +{ + auto dir = buildPath(tempDir, format("dirwatcher_test_%d", thisProcessID())); + mkdir(dir); + scope(exit) rmdirRecurse(dir); + + DirectoryWatcher watcher; + try watcher = Path(dir).watchDirectory(No.recursive); + catch (AssertError e) { + logInfo("DirectoryWatcher not yet implemented. Skipping test."); + return; + } + DirectoryChange[] changes; + assert(!watcher.readChanges(changes, 500.msecs)); + + auto foo = dir.buildPath("foo"); + + alias Type = DirectoryChangeType; + static DirectoryChange dc(Type t, string p) { return DirectoryChange(t, Path(p)); } + + void check(DirectoryChange[] expected) + { + assert(watcher.readChanges(changes, 0.seconds)); + assert(changes == expected); + assert(!watcher.readChanges(changes, 100.msecs)); + } + + write(foo, null); + check([dc(Type.added, foo)]); + write(foo, [0, 1]); + check([dc(Type.modified, foo)]); + remove(foo); + check([dc(Type.removed, foo)]); + write(foo, null); + write(foo, [0, 1]); + remove(foo); + check([dc(Type.added, foo), dc(Type.modified, foo), dc(Type.removed, foo)]); + + auto subdir = dir.buildPath("subdir"); + mkdir(subdir); + check([dc(Type.added, subdir)]); + auto bar = subdir.buildPath("bar"); + write(bar, null); + assert(!watcher.readChanges(changes, 100.msecs)); + remove(bar); + + watcher = Path(dir).watchDirectory(Yes.recursive); + write(foo, null); + write(foo, [0, 1]); + remove(foo); + write(bar, null); + write(bar, [0, 1]); + remove(bar); + check([dc(Type.added, foo), dc(Type.modified, foo), dc(Type.removed, foo), + dc(Type.added, bar), dc(Type.modified, bar), dc(Type.removed, bar)]); + + write(foo, null); + rename(foo, bar); + remove(bar); + check([dc(Type.added, foo), dc(Type.removed, foo), dc(Type.added, bar), dc(Type.removed, bar)]); +} + +int main() +{ + int ret = 0; + runTask({ + try runTest(); + catch (Throwable th) { + logError("Test failed: %s", th.toString()); + ret = 1; + } finally exitEventLoop(true); + }); + runEventLoop(); + return ret; +}