Permalink
Browse files

Use a non-blocking interface to libvirt

Create a non-blocking interface to libvirt by running the libvirt
operations in a thread. The thread servicing Erlang and the thread
running libvirt operations communicate over a socket.

The Erlang end is non-blocking and will return immediately. If the libvirt
end is not reading quickly enough, the Erlang side should be returned
an error message (I'm assuming eagain, but I haven't tested this theory
yet!).

The Erlang side sends a struct that basically contains an ErlNifFunc.
This is used to call the actual blocking function that used to be exposed
via the NIF interface.

The libvirt thread stays in a loop, blocking on its socket. When data
is sent over the socket, the thread reads it and through the magic of
icky shared state, the libvirt wrapping function is called.

Tested by running the example scripts. Nothing segfaulted. I declare
success!
  • Loading branch information...
1 parent 01a10bf commit e636465361ba5bfa760d940b964b82ec37fce863 @msantos committed Apr 9, 2011
Showing with 290 additions and 147 deletions.
  1. +10 −8 README.md
  2. +198 −32 c_src/vert.c
  3. +7 −0 c_src/vert_util.c
  4. +1 −0 c_src/vert_util.h
  5. +74 −107 src/vert.erl
View
@@ -13,14 +13,16 @@ This version uses the libvirtd remote procotol over a Unix socket.
## WARNING
- The libvirt API is not safe.
-
- Aside from being needlessly huge and error prone, the API is
- inconsistent: some functions require memory to be freed for one
- type of resource but not for other resource types. Some functions
- are blocking and will block the Erlang scheduler. Inconsistencies
- between the same functions for different resources and the sheer
- size of the API mean that there will be mistakes.
+ The libvirt API is not safe. It is huge, inconsistent and error
+ prone.
+
+ The current implementation calls all libvirt functions in a thread
+ so the Erlang VM will not block. If libvirt blocks, the caller will
+ receive an error immediately ({error, eagain}). This means only one
+ call from a single Erlang VM into libvirt can be running at any time.
+
+ These bindings have not been heavily tested, are still under
+ development and will undergo many changes.
## HOW TO BUILD IT
View
@@ -37,10 +37,90 @@
#include "vert_network.h"
#include "vert_resource.h"
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <errno.h>
+
+
+#define VERT_READ 0
+#define VERT_WRITE 1
+
+#define MAX_ATOM_LEN 255
+
+typedef struct _vert_state {
+ ErlNifTid tid;
+ int fd[2];
+} VERT_STATE;
+
+typedef struct _vert_cast {
+ ErlNifPid *pid;
+ ERL_NIF_TERM (*fptr)(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
+ int argc;
+ void *argv;
+ char *name;
+} VERT_CAST;
+
+void *vert_loop(void *arg);
+
+static ErlNifFunc vert_funcs[] = {
+ /* connect */
+ {"connect_open", 2, vert_connect_open},
+ {"connect_close", 1, vert_connect_close},
+
+ {"connect_get", 2, vert_connect_get},
+ {"connect_get", 3, vert_connect_get},
+
+ {"connect_get_numactive", 2, vert_connect_numactive},
+ {"connect_get_numinactive", 2, vert_connect_numinactive},
+ {"connect_get_listactive", 3, vert_connect_listactive},
+ {"connect_get_listinactive", 3, vert_connect_listinactive},
+
+ /* domain */
+ {"domain_lookup", 3, vert_domain_lookup},
+
+ {"domain_get", 2, vert_domain_get},
+ {"domain_get", 3, vert_domain_get},
+
+ {"domain_save", 2, vert_domain_save},
+ {"domain_restore", 2, vert_domain_restore},
+ {"domain_shutdown", 1, vert_domain_shutdown},
+ {"domain_suspend", 1, vert_domain_suspend},
+ {"domain_resume", 1, vert_domain_resume},
+
+ {"domain_set_autostart", 2, vert_domain_autostart},
+
+ /* interface */
+ {"interface_lookup", 3, vert_interface_lookup},
+ {"interface_get", 2, vert_interface_get},
+
+ /* network */
+ {"network_get", 2, vert_network_get},
+ {"network_lookup", 3, vert_network_lookup},
+
+ /* all resource types */
+ {"resource_define", 3, vert_resource_define},
+ {"resource_undefine", 1, vert_resource_undefine},
+ {"resource_create", 2, vert_resource_create},
+ {"resource_destroy", 1, vert_resource_destroy},
+
+ {NULL, 0, NULL}
+};
+
static int
load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
{
+ VERT_STATE *state = NULL;
+ int flags = 0;
+
+
+ state = enif_alloc(sizeof(VERT_STATE));
+
+ if (state == NULL)
+ return -1;
+
atom_ok = enif_make_atom(env, "ok");
atom_error = enif_make_atom(env, "error");
atom_undefined = enif_make_atom(env, "undefined");
@@ -65,6 +145,22 @@ load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
* */
virSetErrorFunc(NULL, null_logger);
+ /* Create a thread for blocking libvirt operations */
+ if (socketpair(AF_UNIX, SOCK_STREAM, 0, state->fd) < 0)
+ return -1;
+
+ /* Set the write socket (request from Erlang) to non-blocking.
+ * The read end (read by the thread calling into libvirt) is
+ * blocking by default */
+ flags = fcntl(state->fd[VERT_WRITE], F_GETFL, 0);
+ if (fcntl(state->fd[VERT_WRITE], F_SETFL, flags | O_NONBLOCK) < 0)
+ return -1;
+
+ if (enif_thread_create("vert_loop", &state->tid, vert_loop, state, NULL) != 0)
+ return -1;
+
+ *priv_data = state;
+
return 0;
}
@@ -75,47 +171,117 @@ unload(ErlNifEnv *env, void *priv_data)
}
-static ErlNifFunc nif_funcs[] = {
- /* connect */
- {"connect_open", 2, vert_connect_open},
- {"connect_close", 1, vert_connect_close},
+ void *
+vert_loop(void *arg)
+{
+ VERT_STATE *state = arg;
+ VERT_CAST *cmd = NULL;
+ ErlNifEnv *env = NULL;
+ ERL_NIF_TERM res = {0};
- {"connect_get", 2, vert_connect_get},
- {"connect_get", 3, vert_connect_get},
+ fd_set rfds;
+ ssize_t n = 0;
- {"connect_get_numactive", 2, vert_connect_numactive},
- {"connect_get_numinactive", 2, vert_connect_numinactive},
- {"connect_get_listactive", 3, vert_connect_listactive},
- {"connect_get_listinactive", 3, vert_connect_listinactive},
- /* domain */
- {"domain_lookup", 3, vert_domain_lookup},
+ env = enif_alloc_env();
+ cmd = enif_alloc(sizeof(VERT_CAST));
- {"domain_get", 2, vert_domain_get},
- {"domain_get", 3, vert_domain_get},
+ if ( (env == NULL) || (cmd == NULL))
+ goto ERR;
- {"domain_save", 2, vert_domain_save},
- {"domain_restore", 2, vert_domain_restore},
- {"domain_shutdown", 1, vert_domain_shutdown},
- {"domain_suspend", 1, vert_domain_suspend},
- {"domain_resume", 1, vert_domain_resume},
+ for ( ; ; ) {
+ FD_ZERO(&rfds);
+ FD_SET(state->fd[VERT_READ], &rfds);
- {"domain_set_autostart", 2, vert_domain_autostart},
+ n = select(state->fd[VERT_READ]+1, &rfds, NULL, NULL, NULL);
- /* interface */
- {"interface_lookup", 3, vert_interface_lookup},
- {"interface_get", 2, vert_interface_get},
+ if (n < 0) {
+ switch (errno) {
+ case EAGAIN:
+ case EINTR:
+ continue;
+ default:
+ goto ERR;
+ }
+ }
- /* network */
- {"network_get", 2, vert_network_get},
- {"network_lookup", 3, vert_network_lookup},
+ if (read(state->fd[VERT_READ], cmd, sizeof(VERT_CAST)) < 0)
+ goto ERR;
- /* all resource types */
- {"resource_define", 3, vert_resource_define},
- {"resource_undefine", 1, vert_resource_undefine},
- {"resource_create", 2, vert_resource_create},
- {"resource_destroy", 1, vert_resource_destroy},
+ res = (*cmd->fptr)(env, cmd->argc, (ERL_NIF_TERM *)cmd->argv);
+
+ (void)enif_send(NULL, cmd->pid, env, res);
+
+ enif_free(cmd->pid);
+ enif_free(cmd->argv);
+
+ enif_clear_env(env);
+ }
+
+ERR:
+ enif_free(cmd);
+ enif_free_env(env);
+ return NULL;
+}
+
+
+ ERL_NIF_TERM
+vert_cast(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
+{
+ VERT_STATE *state = NULL;
+ VERT_CAST *cmd = NULL;
+ char buf[MAX_ATOM_LEN+1];
+ int i = 0;
+ int found = 0;
+ ErlNifPid *pid = NULL;
+
+
+ state = enif_priv_data(env);
+
+ if (!enif_get_atom(env, argv[0], buf, sizeof(buf), ERL_NIF_LATIN1)
+ || strcmp("cast", buf) == 0)
+ return enif_make_badarg(env);
+
+ for (i = 0; vert_funcs[i].name != NULL; i++) {
+ if ( (strcmp(vert_funcs[i].name, buf) == 0) &&
+ (vert_funcs[i].arity == argc-1)) {
+ found = 1;
+ break;
+ }
+ }
+
+ if (found == 0)
+ return enif_make_badarg(env);
+
+ cmd = enif_alloc(sizeof(VERT_CAST));
+ cmd->argv = enif_alloc(sizeof(ERL_NIF_TERM) * (argc-1));
+ pid = enif_alloc(sizeof(ErlNifPid));
+
+ if ( (cmd == NULL) || (cmd->argv == NULL) || (pid == NULL))
+ return error_tuple(env, atom_enomem);
+
+ (void)enif_self(env, pid);
+
+ cmd->pid = pid;
+ cmd->fptr = vert_funcs[i].fptr;
+ (void)memcpy(cmd->argv, &argv[1], sizeof(ERL_NIF_TERM) * (argc-1));
+ cmd->argc = argc-1;
+
+ cmd->name = (char *)vert_funcs[i].name;
+
+ if (write(state->fd[VERT_WRITE], cmd, sizeof(VERT_CAST)) < 0)
+ return error_errno(env, errno);
+
+ enif_free(cmd);
+
+ return atom_ok;
+}
+
+
+static ErlNifFunc nif_funcs[] = {
+ {"cast", 2, vert_cast},
+ {"cast", 3, vert_cast},
+ {"cast", 4, vert_cast},
};
ERL_NIF_INIT(vert, nif_funcs, load, NULL, NULL, unload)
-
View
@@ -31,6 +31,7 @@
*/
#include "vert.h"
#include "vert_util.h"
+#include "erl_driver.h"
ERL_NIF_TERM
@@ -54,6 +55,12 @@ error_string(ErlNifEnv *env, char *err)
(err ? enif_make_string(env, err, ERL_NIF_LATIN1) : atom_unsupported));
}
+ ERL_NIF_TERM
+error_errno(ErlNifEnv *env, int errnum)
+{
+ return error_tuple(env, enif_make_atom(env, erl_errno_id(errnum)));
+}
+
ERL_NIF_TERM
error_tuple(ErlNifEnv *env, ERL_NIF_TERM error)
{
View
@@ -31,6 +31,7 @@
*/
ERL_NIF_TERM verterr(ErlNifEnv *env);
ERL_NIF_TERM error_string(ErlNifEnv *env, char *err);
+ERL_NIF_TERM error_errno(ErlNifEnv *env, int errnum);
ERL_NIF_TERM error_tuple(ErlNifEnv *env, ERL_NIF_TERM error);
ERL_NIF_TERM vert_make_resource(ErlNifEnv *env, ERL_NIF_TERM type, ERL_NIF_TERM resource);
ERL_NIF_TERM bincopy(ErlNifEnv *env, void *src, size_t len);
Oops, something went wrong.

0 comments on commit e636465

Please sign in to comment.