Skip to content
Browse files

Add support for asynch notifications on a file-descriptor

This patch uses libev inside the NIF framework to provide a mechanism
for monitoring one or more file-descriptors for changes.

It introduces three more verbs to the procket namespace:

{ok, Handle} = procket:watcher_create(Fd, Flags, Cookie)
procket:watcher_arm(Handle)
procket:watcher_disarm(Handle)

The idea is that watcher_create takes an Fd, and a set of flags (0x1 for
READ, 0x2 for WRITE, 0x3 for READ/WRITE), and an arbitrary term (Cookie).

When the FD becomes ready for either a READ or WRITE, it will fire a msg
{procket_watcher, Flags, Cookie} back at the caller and disarm itself.

The user may subsequently re-arm the trigger with procket:watcher_arm()
and disarm it with procket:watcher_disarm().

Signed-off-by: Gregory Haskins <ghaskins@novell.com>
  • Loading branch information...
1 parent 4f1ae12 commit ca35538d1bc7dcbde17deb72cd2b2b47364f4b9c Gregory Haskins committed May 26, 2011
Showing with 236 additions and 2 deletions.
  1. +224 −1 c_src/procket.c
  2. +1 −1 rebar.config
  3. +11 −0 src/procket.erl
View
225 c_src/procket.c
@@ -33,6 +33,7 @@
#include "erl_driver.h"
#include "ancillary.h"
#include "procket.h"
+#include "ev.h"
#define BACKLOG 5
@@ -44,12 +45,28 @@ static ERL_NIF_TERM atom_error;
static ERL_NIF_TERM atom_eagain;
static ErlNifResourceType *PROCKET_ALLOC_RESOURCE;
+static ErlNifResourceType *PROCKET_WATCHER_RESOURCE;
+
+static struct event_loop {
+ ErlNifTid tid;
+ ErlNifMutex *mutex;
+ struct ev_loop *env;
+ ev_async kick;
+ ev_async die;
+} PROCKET_EVENT_LOOP;
typedef struct _alloc_state {
size_t size;
void *buf;
} ALLOC_STATE;
+typedef struct _watcher_state {
+ ev_io io;
+ ErlNifEnv *env;
+ ErlNifPid pid;
+ ERL_NIF_TERM term;
+} WATCHER_STATE;
+
/* Grow or shrink a binary.
*
* We increase the size of the binary to indicate to the
@@ -79,6 +96,81 @@ typedef struct _alloc_state {
} \
} while (0);
+/* libev processing
+ *
+ * We set up a separate thread to listen for libev registered events
+ *
+ */
+ static void
+kick_cb(struct ev_loop *loop, ev_async *w, int revents)
+{
+ // just used for the side effects..this callback forces the loop
+ // to re-eval the current set of watchers
+}
+
+ static void
+die_cb(struct ev_loop *loop, ev_async *w, int revents)
+{
+ ev_break(loop, EVBREAK_ALL);
+}
+
+ static void
+l_release(struct ev_loop *loop)
+{
+ struct event_loop *el = &PROCKET_EVENT_LOOP;
+ enif_mutex_unlock(el->mutex);
+}
+
+ static void
+l_acquire(struct ev_loop *loop)
+{
+ struct event_loop *el = &PROCKET_EVENT_LOOP;
+ enif_mutex_lock(el->mutex);
+}
+
+ void*
+event_loop(void *arg)
+{
+ struct event_loop *el = &PROCKET_EVENT_LOOP;
+
+ l_acquire (el->env);
+ ev_run (el->env, 0);
+ l_release (el->env);
+
+ return NULL;
+}
+
+ static void
+watcher_cb(struct ev_loop *loop, ev_io *w_, int revents)
+{
+ WATCHER_STATE *w = (WATCHER_STATE *)w_;
+ struct event_loop *el = &PROCKET_EVENT_LOOP;
+ ERL_NIF_TERM msg;
+
+ msg = enif_make_tuple3(w->env,
+ enif_make_atom(w->env, "procket_watcher"),
+ enif_make_int(w->env, revents),
+ w->term
+ );
+
+ enif_send(NULL, &w->pid, w->env, msg);
+
+ ev_io_stop(el->env, &w->io);
+}
+
+ void
+watcher_free(ErlNifEnv *env, void *obj)
+{
+ WATCHER_STATE *w = obj;
+ struct event_loop *el = &PROCKET_EVENT_LOOP;
+
+ l_acquire(el->env);
+ ev_io_stop(el->env, &w->io);
+ ev_async_send(el->env, &el->kick);
+ l_release(el->env);
+
+ enif_free_env(w->env);
+}
static int
load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
@@ -92,9 +184,50 @@ load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
ERL_NIF_RT_CREATE, NULL)) == NULL)
return -1;
+ if ( (PROCKET_WATCHER_RESOURCE = enif_open_resource_type(env, NULL,
+ "procket_watcher_resource", watcher_free,
+ ERL_NIF_RT_CREATE, NULL)) == NULL)
+ return -1;
+
+ struct event_loop *el = &PROCKET_EVENT_LOOP;
+
+ memset(el, 0, sizeof(*el));
+
+ el->mutex = enif_mutex_create("libev");
+ if (!el->mutex)
+ return -1;
+
+ el->env = ev_default_loop(0);
+
+ ev_async_init(&el->kick, kick_cb);
+ ev_async_start(el->env, &el->kick);
+
+ ev_async_init(&el->die, die_cb);
+ ev_async_start(el->env, &el->die);
+
+ ev_set_loop_release_cb(el->env, l_release, l_acquire);
+
+ if (enif_thread_create("libev", &el->tid, event_loop, NULL, NULL) < 0)
+ return -1;
+
return (0);
}
+ static void
+unload(ErlNifEnv* env, void *priv_data)
+{
+ struct event_loop *el = &PROCKET_EVENT_LOOP;
+
+ /* send an asynchronous ev_break */
+ l_acquire(el->env);
+ ev_async_send(el->env, &el->die);
+ l_release(el->env);
+
+ /* block here before cleaning up remaining resources */
+ enif_thread_join(el->tid, NULL);
+
+ enif_mutex_destroy(el->mutex);
+}
/* Retrieve the file descriptor from the forked privileged process */
/* 0: connected Unix socket */
@@ -629,6 +762,92 @@ nif_errno_id(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
return enif_make_atom(env, erl_errno_id(err));
}
+ static void
+watcher_arm(WATCHER_STATE *w)
+{
+ struct event_loop *el = &PROCKET_EVENT_LOOP;
+
+ l_acquire(el->env);
+ ev_io_start(el->env, &w->io);
+ ev_async_send(el->env, &el->kick);
+ l_release(el->env);
+}
+
+/* 0: fd, 1: flags, 2: term */
+ static ERL_NIF_TERM
+nif_watcher_create(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
+{
+ WATCHER_STATE *w = NULL;
+ ERL_NIF_TERM ret = enif_make_badarg(env);
+ int fd;
+ int flags;
+
+ if (!enif_get_int(env, argv[0], &fd))
+ goto out;
+
+ if (!enif_get_int(env, argv[1], &flags))
+ goto out;
+
+ if (flags & ~(EV_READ | EV_WRITE))
+ goto out;
+
+ w = enif_alloc_resource(PROCKET_WATCHER_RESOURCE, sizeof(WATCHER_STATE));
+ if (w == NULL)
+ return error_tuple(env, ENOMEM);
+
+ w->env = enif_alloc_env();
+ if (!w->env) {
+ ret = error_tuple(env, ENOMEM);
+ goto out;
+ }
+
+ enif_self(env, &w->pid);
+ w->term = enif_make_copy(w->env, argv[2]);
+
+ ev_io_init(&w->io, watcher_cb, fd, flags);
+ watcher_arm(w);
+
+ ret = enif_make_tuple2(env, atom_ok, enif_make_resource(env, w));
+
+ out:
+ if (w)
+ enif_release_resource(w);
+
+ return ret;
+
+}
+
+/* 0: watcher */
+ static ERL_NIF_TERM
+nif_watcher_arm(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
+{
+ WATCHER_STATE *w;
+
+ if(!enif_get_resource(env, argv[0], PROCKET_WATCHER_RESOURCE, (void**)&w))
+ return enif_make_badarg(env);
+
+ watcher_arm(w);
+
+ return atom_ok;
+}
+
+/* 0: watcher */
+ static ERL_NIF_TERM
+nif_watcher_disarm(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
+{
+ struct event_loop *el = &PROCKET_EVENT_LOOP;
+ WATCHER_STATE *w;
+
+ if(!enif_get_resource(env, argv[0], PROCKET_WATCHER_RESOURCE, (void**)&w))
+ return enif_make_badarg(env);
+
+ l_acquire(el->env);
+ ev_io_stop(el->env, &w->io);
+ ev_async_send(el->env, &el->kick);
+ l_release(el->env);
+
+ return atom_ok;
+}
static ERL_NIF_TERM
error_tuple(ErlNifEnv *env, int errnum)
@@ -674,7 +893,11 @@ static ErlNifFunc nif_funcs[] = {
{"memcpy", 2, nif_memcpy},
{"buf", 1, nif_buf},
+ {"watcher_create", 3, nif_watcher_create},
+ {"watcher_arm", 1, nif_watcher_arm},
+ {"watcher_disarm", 1, nif_watcher_disarm},
+
{"errno_id", 1, nif_errno_id}
};
-ERL_NIF_INIT(procket, nif_funcs, load, NULL, NULL, NULL)
+ERL_NIF_INIT(procket, nif_funcs, load, NULL, NULL, unload)
View
2 rebar.config
@@ -2,7 +2,7 @@
{port_sources, ["c_src/procket.c"]}.
{port_envs, [
- {"LDFLAGS", "$LDFLAGS -Lc_src -lancillary"}
+ {"LDFLAGS", "$LDFLAGS -Lc_src -lancillary -lev"}
]}.
{port_pre_script, {"make -C c_src -f Makefile.ancillary -f Makefile.cmd", ""}}.
View
11 src/procket.erl
@@ -51,6 +51,10 @@
buf/1,
memcpy/2,
+ watcher_create/3,
+ watcher_arm/1,
+ watcher_disarm/1,
+
errno_id/1
]).
-export([
@@ -125,6 +129,13 @@ buf(_) ->
memcpy(_,_) ->
erlang:error(not_implemented).
+watcher_create(_,_,_) ->
+ erlang:error(not_implemented).
+watcher_arm(_) ->
+ erlang:error(not_implemented).
+watcher_disarm(_) ->
+ erlang:error(not_implemented).
+
sendto(Socket, Buf) ->
sendto(Socket, Buf, 0, <<>>).
sendto(Socket, Buf, Flags) ->

0 comments on commit ca35538

Please sign in to comment.
Something went wrong with that request. Please try again.