Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement inotify directory watcher #743

Merged
merged 1 commit into from
Jul 28, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 164 additions & 12 deletions source/vibe/core/drivers/libevent2.d
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ final class Libevent2Driver : EventDriver {
s_eventLoop = m_eventLoop;
logDiagnostic("libevent is using %s for events.", to!string(event_base_get_method(m_eventLoop)));
evthread_make_base_notifiable(m_eventLoop);

m_dnsBase = evdns_base_new(m_eventLoop, 1);
if( !m_dnsBase ) logError("Failed to initialize DNS lookup.");

Expand Down Expand Up @@ -210,6 +210,7 @@ final class Libevent2Driver : EventDriver {

DirectoryWatcher watchDirectory(Path path, bool recursive)
{
version (linux) return new InotifyDirectoryWatcher(m_core, path, recursive);
assert(false, "watchDirectory is not yet implemented in the libevent driver.");
}

Expand Down Expand Up @@ -255,7 +256,7 @@ final class Libevent2Driver : EventDriver {

Libevent2TCPConnection connectTCP(NetworkAddress addr)
{

auto sockfd_raw = socket(addr.family, SOCK_STREAM, 0);
// on Win64 socket() returns a 64-bit value but libevent expects an int
static if (typeof(sockfd_raw).max > int.max) assert(sockfd_raw <= int.max || sockfd_raw == ~0);
Expand All @@ -269,10 +270,10 @@ final class Libevent2Driver : EventDriver {
socketEnforce(bind(sockfd, bind_addr.sockAddr, bind_addr.sockAddrLen) == 0, "Failed to bind socket.");
socklen_t balen = bind_addr.sockAddrLen;
socketEnforce(getsockname(sockfd, bind_addr.sockAddr, &balen) == 0, "getsockname failed.");

if( evutil_make_socket_nonblocking(sockfd) )
throw new Exception("Failed to make socket non-blocking.");

auto buf_event = bufferevent_socket_new(m_eventLoop, sockfd, bufferevent_options.BEV_OPT_CLOSE_ON_FREE);
if( !buf_event ) throw new Exception("Failed to create buffer event for socket.");

Expand All @@ -287,7 +288,7 @@ final class Libevent2Driver : EventDriver {
assert(cctx.exception is null);
socketEnforce(bufferevent_socket_connect(buf_event, addr.sockAddr, addr.sockAddrLen) == 0,
"Failed to connect to " ~ addr.toString());

try {
cctx.checkForException();

Expand All @@ -298,10 +299,10 @@ final class Libevent2Driver : EventDriver {
} catch (Exception e) {
throw new Exception(format("Failed to connect to %s: %s", addr.toString(), e.msg));
}

logTrace("Connect result status: %d", cctx.status);
enforce(cctx.status == BEV_EVENT_CONNECTED, format("Failed to connect to host %s: %s", addr.toString(), cctx.status));

return new Libevent2TCPConnection(cctx);
}

Expand All @@ -315,7 +316,7 @@ final class Libevent2Driver : EventDriver {
static if (typeof(listenfd_raw).max > int.max) assert(listenfd_raw <= int.max || listenfd_raw == ~0);
auto listenfd = cast(int)listenfd_raw;
socketEnforce(listenfd != -1, "Error creating listening socket");
int tmp_reuse = 1;
int tmp_reuse = 1;
socketEnforce(setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof) == 0,
"Error enabling socket address reuse on listening socket");
socketEnforce(bind(listenfd, bind_addr.sockAddr, bind_addr.sockAddrLen) == 0,
Expand Down Expand Up @@ -830,7 +831,7 @@ final class Libevent2UDPConnection : UDPConnection {
static if (typeof(sockfd_raw).max > int.max) assert(sockfd_raw <= int.max || sockfd_raw == ~0);
auto sockfd = cast(int)sockfd_raw;
socketEnforce(sockfd != -1, "Failed to create socket.");

enforce(evutil_make_socket_nonblocking(sockfd) == 0, "Failed to make socket non-blocking.");

int tmp_reuse = 1;
Expand All @@ -842,7 +843,7 @@ final class Libevent2UDPConnection : UDPConnection {
// read back the actual bind address
socklen_t balen = bind_addr.sockAddrLen;
socketEnforce(getsockname(sockfd, bind_addr.sockAddr, &balen) == 0, "getsockname failed.");

// generate the bind address string
m_bindAddress = bind_addr;
char buf[64];
Expand Down Expand Up @@ -908,7 +909,7 @@ final class Libevent2UDPConnection : UDPConnection {
addr.port = port;
connect(addr);
}

void connect(NetworkAddress addr)
{
enforce(.connect(m_ctx.socketfd, addr.sockAddr, addr.sockAddrLen) == 0, "Failed to connect UDP socket."~to!string(getLastSocketError()));
Expand Down Expand Up @@ -996,6 +997,157 @@ final class Libevent2UDPConnection : UDPConnection {
}
}

/******************************************************************************/
/* InotifyDirectoryWatcher */
/******************************************************************************/

version (linux)
final class InotifyDirectoryWatcher : DirectoryWatcher {
import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify;
import std.file;

private {
Path m_path;
string[int] m_watches;
bool m_recursive;
int m_handle;
DriverCore m_core;
Task m_owner;
}

this(DriverCore core, Path path, bool recursive)
{
m_core = core;
m_recursive = recursive;
m_path = path;

enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect
m_handle = inotify_init1(IN_NONBLOCK);
errnoEnforce(m_handle != -1, "Failed to initialize inotify.");

auto spath = m_path.toString();
addWatch(spath);
if (recursive && spath.isDir)
{
foreach (de; spath.dirEntries(SpanMode.shallow))
if (de.isDir) addWatch(de.name);
}
}

~this()
{
errnoEnforce(close(m_handle) == 0);
}

@property Path path() const { return m_path; }
@property bool recursive() const { return m_recursive; }

void release()
{
assert(m_owner == Task.getThis(), "Releasing DirectoyWatcher that is not owned by the calling task.");
m_owner = Task();
}

void acquire()
{
assert(m_owner == Task(), "Acquiring DirectoyWatcher that is already owned.");
m_owner = Task.getThis();
}

bool amOwner()
{
return m_owner == Task.getThis();
}

bool readChanges(ref DirectoryChange[] dst, Duration timeout)
{
import core.stdc.stdio : FILENAME_MAX;
import core.stdc.string : strlen;

acquire();
scope(exit) release();

ubyte[inotify_event.sizeof + FILENAME_MAX + 1] buf = void;
auto nread = read(m_handle, buf.ptr, buf.sizeof);

if (nread == -1 && errno == EAGAIN)
{
if (!waitReadable(m_handle, timeout))
return false;
nread = read(m_handle, buf.ptr, buf.sizeof);
}
errnoEnforce(nread != -1, "Error while reading inotify handle.");
assert(nread > 0);

dst.length = 0;
do
{
for (auto p = buf.ptr; p < buf.ptr + nread; )
{
auto ev = cast(inotify_event*)p;

DirectoryChangeType type;
if (ev.mask & (IN_CREATE|IN_MOVED_TO))
type = DirectoryChangeType.added;
else if (ev.mask & (IN_DELETE|IN_DELETE_SELF|IN_MOVE_SELF|IN_MOVED_FROM))
type = DirectoryChangeType.removed;
else if (ev.mask & IN_MODIFY)
type = DirectoryChangeType.modified;

import std.path : buildPath;
auto name = ev.name.ptr[0 .. ev.name.ptr.strlen];
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I seem to be getting a SEGV here on my 32bit server, but not on my 64bit dev machine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmh, the inotify_event struct looks declaration seems to be correct. Can you print out nread?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... seem to have been a bug when built with the gdc-2.064 binaries. I've switched to gdc-2.065 and it's gone away. This might be difficult to track down.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it also works for me with -m32 on dmd.2.065.

auto path = Path(buildPath(m_watches[ev.wd], name));

dst ~= DirectoryChange(type, path);

p += inotify_event.sizeof + ev.len;
}
nread = read(m_handle, buf.ptr, buf.sizeof);
errnoEnforce(nread != -1 || errno == EAGAIN, "Error while reading inotify handle.");
} while (nread > 0);
return true;
}

private bool waitReadable(int fd, Duration timeout)
{
static struct Args { InotifyDirectoryWatcher watcher; bool readable, timeout; }

static extern(System) void cb(int fd, short what, void* p) {
with (cast(Args*)p) {
if (what & EV_READ) readable = true;
if (what & EV_TIMEOUT) timeout = true;
if (watcher.m_owner)
watcher.m_core.resumeTask(watcher.m_owner);
}
}

auto loop = getThreadLibeventEventLoop();
auto args = Args(this);
auto ev = event_new(loop, fd, EV_READ, &cb, &args);
scope(exit) event_free(ev);

if (!timeout.isNegative) {
auto tv = timeout.toTimeVal();
event_add(ev, &tv);
} else {
event_add(ev, null);
}
while (!args.readable && !args.timeout)
m_core.yieldForEvent();
return args.readable;
}

private void addWatch(string path)
{
enum EVENTS = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY |
IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO;
immutable wd = inotify_add_watch(m_handle, path.toStringz, EVENTS);
errnoEnforce(wd != -1, "Failed to add inotify watch.");
m_watches[wd] = path;
}
}


private {
event_base* s_eventLoop; // TLS
__gshared DriverCore s_driverCore;
Expand Down Expand Up @@ -1131,7 +1283,7 @@ private nothrow extern(C)
(*pl)++;
}
auto mtx = cast(LevMutex*)lock;

assert(mtx !is null, "null lock");
assert(mtx.mutex !is null || mtx.rwmutex !is null, "lock contains no mutex");
if( mode & EVTHREAD_WRITE ){
Expand Down
5 changes: 5 additions & 0 deletions tests/dirwatcher/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.dub
docs.json
__dummy.html
*.o
*.obj
8 changes: 8 additions & 0 deletions tests/dirwatcher/dub.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"name": "tests",
"description": "Test for vibe.d's DirectoryWatcher.",
"dependencies": {
"vibe-d": {"version": "~master", "path": "../../"}
},
"versions": ["VibeCustomMain"]
}
78 changes: 78 additions & 0 deletions tests/dirwatcher/source/app.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import vibe.vibe;
import std.file, std.process;

void runTest()
{
auto dir = buildPath(tempDir, format("dirwatcher_test_%d", thisProcessID()));
mkdir(dir);
scope(exit) rmdirRecurse(dir);

DirectoryWatcher watcher;
try watcher = Path(dir).watchDirectory(No.recursive);
catch (AssertError e) {
logInfo("DirectoryWatcher not yet implemented. Skipping test.");
return;
}
DirectoryChange[] changes;
assert(!watcher.readChanges(changes, 500.msecs));

auto foo = dir.buildPath("foo");

alias Type = DirectoryChangeType;
static DirectoryChange dc(Type t, string p) { return DirectoryChange(t, Path(p)); }

void check(DirectoryChange[] expected)
{
assert(watcher.readChanges(changes, 0.seconds));
assert(changes == expected);
assert(!watcher.readChanges(changes, 100.msecs));
}

write(foo, null);
check([dc(Type.added, foo)]);
write(foo, [0, 1]);
check([dc(Type.modified, foo)]);
remove(foo);
check([dc(Type.removed, foo)]);
write(foo, null);
write(foo, [0, 1]);
remove(foo);
check([dc(Type.added, foo), dc(Type.modified, foo), dc(Type.removed, foo)]);

auto subdir = dir.buildPath("subdir");
mkdir(subdir);
check([dc(Type.added, subdir)]);
auto bar = subdir.buildPath("bar");
write(bar, null);
assert(!watcher.readChanges(changes, 100.msecs));
remove(bar);

watcher = Path(dir).watchDirectory(Yes.recursive);
write(foo, null);
write(foo, [0, 1]);
remove(foo);
write(bar, null);
write(bar, [0, 1]);
remove(bar);
check([dc(Type.added, foo), dc(Type.modified, foo), dc(Type.removed, foo),
dc(Type.added, bar), dc(Type.modified, bar), dc(Type.removed, bar)]);

write(foo, null);
rename(foo, bar);
remove(bar);
check([dc(Type.added, foo), dc(Type.removed, foo), dc(Type.added, bar), dc(Type.removed, bar)]);
}

int main()
{
int ret = 0;
runTask({
try runTest();
catch (Throwable th) {
logError("Test failed: %s", th.toString());
ret = 1;
} finally exitEventLoop(true);
});
runEventLoop();
return ret;
}