Skip to content

Commit

Permalink
Merge pull request #743 from MartinNowak/inotify
Browse files Browse the repository at this point in the history
implement inotify directory watcher
  • Loading branch information
s-ludwig committed Jul 28, 2014
2 parents 14582fd + 6bdaea5 commit 58fd812
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 12 deletions.
176 changes: 164 additions & 12 deletions source/vibe/core/drivers/libevent2.d
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ final class Libevent2Driver : EventDriver {
s_eventLoop = m_eventLoop;
logDiagnostic("libevent is using %s for events.", to!string(event_base_get_method(m_eventLoop)));
evthread_make_base_notifiable(m_eventLoop);

m_dnsBase = evdns_base_new(m_eventLoop, 1);
if( !m_dnsBase ) logError("Failed to initialize DNS lookup.");

Expand Down Expand Up @@ -210,6 +210,7 @@ final class Libevent2Driver : EventDriver {

DirectoryWatcher watchDirectory(Path path, bool recursive)
{
version (linux) return new InotifyDirectoryWatcher(m_core, path, recursive);
assert(false, "watchDirectory is not yet implemented in the libevent driver.");
}

Expand Down Expand Up @@ -255,7 +256,7 @@ final class Libevent2Driver : EventDriver {

Libevent2TCPConnection connectTCP(NetworkAddress addr)
{

auto sockfd_raw = socket(addr.family, SOCK_STREAM, 0);
// on Win64 socket() returns a 64-bit value but libevent expects an int
static if (typeof(sockfd_raw).max > int.max) assert(sockfd_raw <= int.max || sockfd_raw == ~0);
Expand All @@ -269,10 +270,10 @@ final class Libevent2Driver : EventDriver {
socketEnforce(bind(sockfd, bind_addr.sockAddr, bind_addr.sockAddrLen) == 0, "Failed to bind socket.");
socklen_t balen = bind_addr.sockAddrLen;
socketEnforce(getsockname(sockfd, bind_addr.sockAddr, &balen) == 0, "getsockname failed.");

if( evutil_make_socket_nonblocking(sockfd) )
throw new Exception("Failed to make socket non-blocking.");

auto buf_event = bufferevent_socket_new(m_eventLoop, sockfd, bufferevent_options.BEV_OPT_CLOSE_ON_FREE);
if( !buf_event ) throw new Exception("Failed to create buffer event for socket.");

Expand All @@ -287,7 +288,7 @@ final class Libevent2Driver : EventDriver {
assert(cctx.exception is null);
socketEnforce(bufferevent_socket_connect(buf_event, addr.sockAddr, addr.sockAddrLen) == 0,
"Failed to connect to " ~ addr.toString());

try {
cctx.checkForException();

Expand All @@ -298,10 +299,10 @@ final class Libevent2Driver : EventDriver {
} catch (Exception e) {
throw new Exception(format("Failed to connect to %s: %s", addr.toString(), e.msg));
}

logTrace("Connect result status: %d", cctx.status);
enforce(cctx.status == BEV_EVENT_CONNECTED, format("Failed to connect to host %s: %s", addr.toString(), cctx.status));

return new Libevent2TCPConnection(cctx);
}

Expand All @@ -315,7 +316,7 @@ final class Libevent2Driver : EventDriver {
static if (typeof(listenfd_raw).max > int.max) assert(listenfd_raw <= int.max || listenfd_raw == ~0);
auto listenfd = cast(int)listenfd_raw;
socketEnforce(listenfd != -1, "Error creating listening socket");
int tmp_reuse = 1;
int tmp_reuse = 1;
socketEnforce(setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) == 0,
"Error enabling socket address reuse on listening socket");
socketEnforce(bind(listenfd, bind_addr.sockAddr, bind_addr.sockAddrLen) == 0,
Expand Down Expand Up @@ -830,7 +831,7 @@ final class Libevent2UDPConnection : UDPConnection {
static if (typeof(sockfd_raw).max > int.max) assert(sockfd_raw <= int.max || sockfd_raw == ~0);
auto sockfd = cast(int)sockfd_raw;
socketEnforce(sockfd != -1, "Failed to create socket.");

enforce(evutil_make_socket_nonblocking(sockfd) == 0, "Failed to make socket non-blocking.");

int tmp_reuse = 1;
Expand All @@ -842,7 +843,7 @@ final class Libevent2UDPConnection : UDPConnection {
// read back the actual bind address
socklen_t balen = bind_addr.sockAddrLen;
socketEnforce(getsockname(sockfd, bind_addr.sockAddr, &balen) == 0, "getsockname failed.");

// generate the bind address string
m_bindAddress = bind_addr;
char buf[64];
Expand Down Expand Up @@ -908,7 +909,7 @@ final class Libevent2UDPConnection : UDPConnection {
addr.port = port;
connect(addr);
}

void connect(NetworkAddress addr)
{
enforce(.connect(m_ctx.socketfd, addr.sockAddr, addr.sockAddrLen) == 0, "Failed to connect UDP socket."~to!string(getLastSocketError()));
Expand Down Expand Up @@ -996,6 +997,157 @@ final class Libevent2UDPConnection : UDPConnection {
}
}

/******************************************************************************/
/* InotifyDirectoryWatcher */
/******************************************************************************/

version (linux)
final class InotifyDirectoryWatcher : DirectoryWatcher {
import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify;
import std.file;

private {
Path m_path;
string[int] m_watches;
bool m_recursive;
int m_handle;
DriverCore m_core;
Task m_owner;
}

this(DriverCore core, Path path, bool recursive)
{
m_core = core;
m_recursive = recursive;
m_path = path;

enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect
m_handle = inotify_init1(IN_NONBLOCK);
errnoEnforce(m_handle != -1, "Failed to initialize inotify.");

auto spath = m_path.toString();
addWatch(spath);
if (recursive && spath.isDir)
{
foreach (de; spath.dirEntries(SpanMode.shallow))
if (de.isDir) addWatch(de.name);
}
}

~this()
{
errnoEnforce(close(m_handle) == 0);
}

@property Path path() const { return m_path; }
@property bool recursive() const { return m_recursive; }

void release()
{
assert(m_owner == Task.getThis(), "Releasing DirectoyWatcher that is not owned by the calling task.");
m_owner = Task();
}

void acquire()
{
assert(m_owner == Task(), "Acquiring DirectoyWatcher that is already owned.");
m_owner = Task.getThis();
}

bool amOwner()
{
return m_owner == Task.getThis();
}

bool readChanges(ref DirectoryChange[] dst, Duration timeout)
{
import core.stdc.stdio : FILENAME_MAX;
import core.stdc.string : strlen;

acquire();
scope(exit) release();

ubyte[inotify_event.sizeof + FILENAME_MAX + 1] buf = void;
auto nread = read(m_handle, buf.ptr, buf.sizeof);

if (nread == -1 && errno == EAGAIN)
{
if (!waitReadable(m_handle, timeout))
return false;
nread = read(m_handle, buf.ptr, buf.sizeof);
}
errnoEnforce(nread != -1, "Error while reading inotify handle.");
assert(nread > 0);

dst.length = 0;
do
{
for (auto p = buf.ptr; p < buf.ptr + nread; )
{
auto ev = cast(inotify_event*)p;

DirectoryChangeType type;
if (ev.mask & (IN_CREATE|IN_MOVED_TO))
type = DirectoryChangeType.added;
else if (ev.mask & (IN_DELETE|IN_DELETE_SELF|IN_MOVE_SELF|IN_MOVED_FROM))
type = DirectoryChangeType.removed;
else if (ev.mask & IN_MODIFY)
type = DirectoryChangeType.modified;

import std.path : buildPath;
auto name = ev.name.ptr[0 .. ev.name.ptr.strlen];
auto path = Path(buildPath(m_watches[ev.wd], name));

dst ~= DirectoryChange(type, path);

p += inotify_event.sizeof + ev.len;
}
nread = read(m_handle, buf.ptr, buf.sizeof);
errnoEnforce(nread != -1 || errno == EAGAIN, "Error while reading inotify handle.");
} while (nread > 0);
return true;
}

private bool waitReadable(int fd, Duration timeout)
{
static struct Args { InotifyDirectoryWatcher watcher; bool readable, timeout; }

static extern(System) void cb(int fd, short what, void* p) {
with (cast(Args*)p) {
if (what & EV_READ) readable = true;
if (what & EV_TIMEOUT) timeout = true;
if (watcher.m_owner)
watcher.m_core.resumeTask(watcher.m_owner);
}
}

auto loop = getThreadLibeventEventLoop();
auto args = Args(this);
auto ev = event_new(loop, fd, EV_READ, &cb, &args);
scope(exit) event_free(ev);

if (!timeout.isNegative) {
auto tv = timeout.toTimeVal();
event_add(ev, &tv);
} else {
event_add(ev, null);
}
while (!args.readable && !args.timeout)
m_core.yieldForEvent();
return args.readable;
}

private void addWatch(string path)
{
enum EVENTS = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY |
IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO;
immutable wd = inotify_add_watch(m_handle, path.toStringz, EVENTS);
errnoEnforce(wd != -1, "Failed to add inotify watch.");
m_watches[wd] = path;
}
}


private {
event_base* s_eventLoop; // TLS
__gshared DriverCore s_driverCore;
Expand Down Expand Up @@ -1131,7 +1283,7 @@ private nothrow extern(C)
(*pl)++;
}
auto mtx = cast(LevMutex*)lock;

assert(mtx !is null, "null lock");
assert(mtx.mutex !is null || mtx.rwmutex !is null, "lock contains no mutex");
if( mode & EVTHREAD_WRITE ){
Expand Down
5 changes: 5 additions & 0 deletions tests/dirwatcher/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.dub
docs.json
__dummy.html
*.o
*.obj
8 changes: 8 additions & 0 deletions tests/dirwatcher/dub.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"name": "tests",
"description": "Test for vibe.d's DirectoryWatcher.",
"dependencies": {
"vibe-d": {"version": "~master", "path": "../../"}
},
"versions": ["VibeCustomMain"]
}
78 changes: 78 additions & 0 deletions tests/dirwatcher/source/app.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import vibe.vibe;
import std.file, std.process;

void runTest()
{
auto dir = buildPath(tempDir, format("dirwatcher_test_%d", thisProcessID()));
mkdir(dir);
scope(exit) rmdirRecurse(dir);

DirectoryWatcher watcher;
try watcher = Path(dir).watchDirectory(No.recursive);
catch (AssertError e) {
logInfo("DirectoryWatcher not yet implemented. Skipping test.");
return;
}
DirectoryChange[] changes;
assert(!watcher.readChanges(changes, 500.msecs));

auto foo = dir.buildPath("foo");

alias Type = DirectoryChangeType;
static DirectoryChange dc(Type t, string p) { return DirectoryChange(t, Path(p)); }

void check(DirectoryChange[] expected)
{
assert(watcher.readChanges(changes, 0.seconds));
assert(changes == expected);
assert(!watcher.readChanges(changes, 100.msecs));
}

write(foo, null);
check([dc(Type.added, foo)]);
write(foo, [0, 1]);
check([dc(Type.modified, foo)]);
remove(foo);
check([dc(Type.removed, foo)]);
write(foo, null);
write(foo, [0, 1]);
remove(foo);
check([dc(Type.added, foo), dc(Type.modified, foo), dc(Type.removed, foo)]);

auto subdir = dir.buildPath("subdir");
mkdir(subdir);
check([dc(Type.added, subdir)]);
auto bar = subdir.buildPath("bar");
write(bar, null);
assert(!watcher.readChanges(changes, 100.msecs));
remove(bar);

watcher = Path(dir).watchDirectory(Yes.recursive);
write(foo, null);
write(foo, [0, 1]);
remove(foo);
write(bar, null);
write(bar, [0, 1]);
remove(bar);
check([dc(Type.added, foo), dc(Type.modified, foo), dc(Type.removed, foo),
dc(Type.added, bar), dc(Type.modified, bar), dc(Type.removed, bar)]);

write(foo, null);
rename(foo, bar);
remove(bar);
check([dc(Type.added, foo), dc(Type.removed, foo), dc(Type.added, bar), dc(Type.removed, bar)]);
}

int main()
{
int ret = 0;
runTask({
try runTest();
catch (Throwable th) {
logError("Test failed: %s", th.toString());
ret = 1;
} finally exitEventLoop(true);
});
runEventLoop();
return ret;
}

0 comments on commit 58fd812

Please sign in to comment.