Permalink
Browse files

Defer to polling thread on blocking send

  • Loading branch information...
1 parent bb10850 commit c4598252d2235f2fc211335951d20720751d1103 @evax evax committed Mar 18, 2011
Showing with 143 additions and 78 deletions.
  1. +132 −77 c_src/erlzmq_nif.c
  2. +11 −1 src/erlzmq.erl
View
@@ -24,15 +24,17 @@ typedef struct _erlzmq_socket {
#define erlzmq_TERM 1211981
-typedef struct _erlzmq_recv {
+typedef struct _erlzmq_ipc_request {
ErlNifEnv * env;
ERL_NIF_TERM ref;
int flags;
+ int poll_flag;
+ zmq_msg_t msg;
ErlNifPid pid;
void * socket;
- TAILQ_ENTRY(_erlzmq_recv) recvs;
-} erlzmq_recv;
-TAILQ_HEAD(recvs_head, _erlzmq_recv);
+ TAILQ_ENTRY(_erlzmq_ipc_request) requests;
+} erlzmq_ipc_request;
+TAILQ_HEAD(requests_head, _erlzmq_ipc_request);
// Prototypes
#define NIF(name) ERL_NIF_TERM name(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
@@ -347,57 +349,92 @@ NIF(erlzmq_nif_getsockopt)
NIF(erlzmq_nif_send)
{
+ erlzmq_ipc_request req;
erlzmq_socket * socket;
- int _flags;
ErlNifBinary _bin;
- if (!enif_get_resource(env, argv[0], erlzmq_nif_resource_socket, (void **) &socket)) {
+ if (!enif_get_resource(env, argv[0], erlzmq_nif_resource_socket,
+ (void **) &socket)) {
return enif_make_badarg(env);
}
if (!enif_inspect_iolist_as_binary(env, argv[1], &_bin)) {
return enif_make_badarg(env);
}
- if (!enif_get_int(env, argv[2], &_flags)) {
+ if (!enif_get_int(env, argv[2], &req.flags)) {
return enif_make_badarg(env);
}
+ enif_self(env, &req.pid);
+
int error;
- zmq_msg_t msg;
- if ((error = zmq_msg_init_size(&msg, _bin.size))) {
- return enif_make_tuple2(env, enif_make_atom(env, "error"), enif_make_int(env, zmq_errno()));
+ if (zmq_msg_init_size(&req.msg, _bin.size)) {
+ goto errno_out;
}
- memcpy(zmq_msg_data(&msg), _bin.data, _bin.size);
+ memcpy(zmq_msg_data(&req.msg), _bin.data, _bin.size);
- if ((error = zmq_send(socket->socket, &msg, _flags))) {
- return enif_make_tuple2(env, enif_make_atom(env, "error"), enif_make_int(env, zmq_errno()));
+ if (zmq_send(socket->socket, &req.msg, req.flags|ZMQ_NOBLOCK)) {
+ error = zmq_errno();
+ if (error != EAGAIN || (error == EAGAIN && (req.flags & ZMQ_NOBLOCK))) {
+ zmq_msg_close(&req.msg);
+ goto out;
+ }
+ zmq_msg_t msg;
+ req.poll_flag = ZMQ_POLLOUT;
+ req.env = enif_alloc_env();
+ req.ref = enif_make_ref(req.env);
+ req.socket = socket->socket;
+
+ if (zmq_msg_init_size(&msg, sizeof(erlzmq_ipc_request))) {
+ goto q_err;
+ }
+
+ memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_ipc_request));
+
+ if (zmq_send(socket->context->ipc_socket, &msg, 0)) {
+ zmq_msg_close(&msg);
+ goto q_err;
+ }
+
+ zmq_msg_close(&msg);
+
+ return enif_make_copy(env, req.ref);
+q_err:
+ zmq_msg_close(&req.msg);
+ enif_free_env(req.env);
+ goto errno_out;
}
- zmq_msg_close(&msg);
+ zmq_msg_close(&req.msg);
return enif_make_atom(env, "ok");
+errno_out:
+ error = zmq_errno();
+out:
+ return enif_make_tuple2(env, enif_make_atom(env, "error"),
+ enif_make_int(env, error));
}
NIF(erlzmq_nif_recv)
{
- erlzmq_recv recv;
+ erlzmq_ipc_request req;
erlzmq_socket * socket;
if (!enif_get_resource(env, argv[0], erlzmq_nif_resource_socket,
(void **) &socket)) {
return enif_make_badarg(env);
}
- if (!enif_get_int(env, argv[1], &recv.flags)) {
+ if (!enif_get_int(env, argv[1], &req.flags)) {
return enif_make_badarg(env);
}
- enif_self(env, &recv.pid);
+ enif_self(env, &req.pid);
int error;
zmq_msg_t msg;
@@ -409,34 +446,31 @@ NIF(erlzmq_nif_recv)
// try recv with noblock
if (zmq_recv(socket->socket, &msg, ZMQ_NOBLOCK)) {
error = zmq_errno();
- if (error == EAGAIN) { // if nothing is there, hand it off to the receiver thread
- if (recv.flags & ZMQ_NOBLOCK) {
+ if (error != EAGAIN || (error == EAGAIN && (req.flags & ZMQ_NOBLOCK))) {
goto out;
- }
- recv.env = enif_alloc_env();
- recv.ref = enif_make_ref(recv.env);
- recv.socket = socket->socket;
+ }
+ req.poll_flag = ZMQ_POLLIN;
+ req.env = enif_alloc_env();
+ req.ref = enif_make_ref(req.env);
+ req.socket = socket->socket;
- if (zmq_msg_init_size(&msg, sizeof(erlzmq_recv))) {
- goto q_err;
- }
+ if (zmq_msg_init_size(&msg, sizeof(erlzmq_ipc_request))) {
+ goto q_err;
+ }
- memcpy(zmq_msg_data(&msg), &recv, sizeof(erlzmq_recv));
-
- if (zmq_send(socket->context->ipc_socket, &msg, 0)) {
- zmq_msg_close(&msg);
- goto q_err;
- }
+ memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_ipc_request));
+ if (zmq_send(socket->context->ipc_socket, &msg, 0)) {
zmq_msg_close(&msg);
+ goto q_err;
+ }
- return enif_make_copy(env, recv.ref);
+ zmq_msg_close(&msg);
+
+ return enif_make_copy(env, req.ref);
q_err:
- enif_free_env(recv.env);
- goto errno_out;
- } else {
- goto out;
- }
+ enif_free_env(req.env);
+ goto errno_out;
}
ErlNifBinary bin;
enif_alloc_binary(zmq_msg_size(&msg), &bin);
@@ -460,14 +494,14 @@ void * polling_thread(void * handle)
ERL_NIF_TERM final_ref;
ERL_NIF_TERM final_pid;
- struct recvs_head * recvs_queue;
- recvs_queue = malloc(sizeof(struct recvs_head));
- TAILQ_INIT(recvs_queue);
+ struct requests_head * requests_queue;
+ requests_queue = malloc(sizeof(struct requests_head));
+ TAILQ_INIT(requests_queue);
void *ipc_socket = zmq_socket(ctx->context, ZMQ_PULL);
zmq_connect(ipc_socket,ctx->ipc_socket_name);
- int nreaders = 1;
+ int nwatched = 1;
enif_mutex_lock(ctx->mutex);
ctx->running = 1;
enif_cond_signal(ctx->cond);
@@ -476,24 +510,24 @@ void * polling_thread(void * handle)
while (ctx->running) {
int i;
zmq_msg_t msg;
- erlzmq_recv *r, *rtmp;
+ erlzmq_ipc_request *r, *rtmp;
- zmq_pollitem_t *items = calloc(nreaders, sizeof(zmq_pollitem_t));
+ zmq_pollitem_t *items = calloc(nwatched, sizeof(zmq_pollitem_t));
items[0].socket = ipc_socket;
items[0].events = ZMQ_POLLIN;
- for (i = 1, r = recvs_queue->tqh_first;
- r != NULL; r = r->recvs.tqe_next, i++) {
+ for (i = 1, r = requests_queue->tqh_first;
+ r != NULL; r = r->requests.tqe_next, i++) {
items[i].socket = r->socket;
- items[i].events = ZMQ_POLLIN;
+ items[i].events = r->poll_flag;
}
- zmq_poll(items, nreaders, -1);
+ zmq_poll(items, nwatched, -1);
- for (i = 1, r = recvs_queue->tqh_first;
- r && ((rtmp = r->recvs.tqe_next), 1); r = rtmp, i++) {
+ for (i = 1, r = requests_queue->tqh_first;
+ r && ((rtmp = r->requests.tqe_next), 1); r = rtmp, i++) {
if (items[i].revents & ZMQ_POLLIN) {
- nreaders--;
+ nwatched--;
ErlNifBinary bin;
zmq_msg_init(&msg);
@@ -509,27 +543,46 @@ void * polling_thread(void * handle)
enif_make_copy(r->env, r->ref),
enif_make_binary(r->env, &bin)));
enif_free_env(r->env);
- TAILQ_REMOVE(recvs_queue, r, recvs);
+ TAILQ_REMOVE(requests_queue, r, requests);
+ free(r);
+ }
+ if (items[i].revents & ZMQ_POLLOUT) {
+ nwatched--;
+ if (zmq_send(r->socket, &r->msg, r->flags)) {
+ enif_send(NULL, &r->pid, r->env,
+ enif_make_tuple3(r->env,
+ enif_make_copy(r->env, r->ref),
+ enif_make_atom(r->env, "error"),
+ enif_make_int(r->env, zmq_errno())));
+ } else {
+ enif_send(NULL, &r->pid, r->env,
+ enif_make_tuple2(r->env,
+ enif_make_copy(r->env, r->ref),
+ enif_make_atom(r->env, "ok")));
+ }
+ zmq_msg_close(&r->msg);
+ enif_free_env(r->env);
+ TAILQ_REMOVE(requests_queue, r, requests);
free(r);
}
}
if (items[0].revents & ZMQ_POLLIN) {
zmq_msg_init(&msg);
if (!zmq_recv(items[0].socket, &msg, 0)) {
- erlzmq_recv * recv = (erlzmq_recv *) zmq_msg_data(&msg);
- if (recv->flags & erlzmq_TERM) {
+ erlzmq_ipc_request * req = (erlzmq_ipc_request *)zmq_msg_data(&msg);
+ if (req->flags & erlzmq_TERM) {
+
+ final_ref = enif_make_copy(final_env, req->ref);
+ final_pid = enif_make_pid(final_env, &req->pid);
- final_ref = enif_make_copy(final_env, recv->ref);
- final_pid = enif_make_pid(final_env, &recv->pid);
-
- enif_free_env(recv->env);
+ enif_free_env(req->env);
ctx->running = 0;
goto out;
}
- nreaders++;
- erlzmq_recv * r = malloc(sizeof(erlzmq_recv));
- memcpy(r, recv, sizeof(erlzmq_recv));
- TAILQ_INSERT_TAIL(recvs_queue, r, recvs);
+ nwatched++;
+ erlzmq_ipc_request * r = malloc(sizeof(erlzmq_ipc_request));
+ memcpy(r, req, sizeof(erlzmq_ipc_request));
+ TAILQ_INSERT_TAIL(requests_queue, r, requests);
}
out:
zmq_msg_close(&msg);
@@ -540,12 +593,12 @@ void * polling_thread(void * handle)
enif_mutex_unlock(ctx->mutex);
// cleanup reader's queue
- erlzmq_recv * r;
- while ((r = recvs_queue->tqh_first) != NULL) {
- TAILQ_REMOVE(recvs_queue, recvs_queue->tqh_first, recvs);
+ erlzmq_ipc_request * r;
+ while ((r = requests_queue->tqh_first) != NULL) {
+ TAILQ_REMOVE(requests_queue, requests_queue->tqh_first, requests);
free(r);
}
- free(recvs_queue);
+ free(requests_queue);
zmq_close(ipc_socket);
zmq_close(ctx->ipc_socket);
@@ -574,21 +627,23 @@ NIF(erlzmq_nif_close)
erlzmq_socket * socket;
- if (!enif_get_resource(env, argv[0], erlzmq_nif_resource_socket, (void **) &socket)) {
+ if (!enif_get_resource(env, argv[0], erlzmq_nif_resource_socket,
+ (void **) &socket)) {
return enif_make_badarg(env);
}
enif_release_resource(socket);
if (-1 == zmq_close(socket->socket)) {
- return enif_make_tuple2(env, enif_make_atom(env, "error"), enif_make_int(env, zmq_errno()));
+ return enif_make_tuple2(env, enif_make_atom(env, "error"),
+ enif_make_int(env, zmq_errno()));
} else {
return enif_make_atom(env, "ok");
}
}
-NIF(erlzmq_nif_term)
+NIF(erlzmq_nif_term)
{
erlzmq_context * ctx;
@@ -598,23 +653,23 @@ NIF(erlzmq_nif_term)
}
zmq_msg_t msg;
- erlzmq_recv recv;
+ erlzmq_ipc_request req;
- recv.flags = erlzmq_TERM;
- recv.env = enif_alloc_env();
- recv.ref = enif_make_ref(recv.env);
- enif_self(env, &recv.pid);
+ req.flags = erlzmq_TERM;
+ req.env = enif_alloc_env();
+ req.ref = enif_make_ref(req.env);
+ enif_self(env, &req.pid);
- zmq_msg_init_size(&msg, sizeof(erlzmq_recv));
- memcpy(zmq_msg_data(&msg), &recv, sizeof(erlzmq_recv));
+ zmq_msg_init_size(&msg, sizeof(erlzmq_ipc_request));
+ memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_ipc_request));
enif_mutex_lock(ctx->mutex);
zmq_send(ctx->ipc_socket, &msg, 0);
enif_mutex_unlock(ctx->mutex);
zmq_msg_close(&msg);
enif_release_resource(ctx);
- return enif_make_copy(env, recv.ref);
+ return enif_make_copy(env, req.ref);
}
static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
View
@@ -86,7 +86,17 @@ send(Socket, Binary) ->
-spec send(Socket :: erlzmq_socket(), Data :: erlzmq_data(), Flags :: erlzmq_send_recv_flags()) -> ok | erlzmq_error().
send(Socket, Binary, Flags) when is_list(Flags) ->
- erlzmq_result(erlzmq_nif:send(Socket, Binary, sendrecv_flags(Flags))).
+ case erlzmq_nif:send(Socket, Binary, sendrecv_flags(Flags)) of
+ Ref when is_reference(Ref) ->
+ receive
+ {Ref, ok} ->
+ ok;
+ {Ref, error, Error} ->
+ {error, Error}
+ end;
+ Result ->
+ erlzmq_result(Result)
+ end.
%% @equiv recv(Socket, 0)
%% @spec recv(erlzmq_socket()) -> {ok, erlzmq_data()} | erlzmq_error()

0 comments on commit c459825

Please sign in to comment.