Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Fix remaining problems that occur infrequently when running the eunit tests #55

Merged
merged 1 commit into from

2 participants

@okeuday

Fix remaining problems that occur infrequently when running the eunit tests (problems due to race conditions). The two remaining eunit test problems that remain are related to https://zeromq.jira.com/browse/LIBZMQ-496 (which causes "Device or resource busy (mutex.hpp:92)" for most of the remaining failures) and https://zeromq.jira.com/browse/LIBZMQ-478 (which causes "Assertion failed: ok (mailbox.cpp:79)"). The changes include proper error handling of a receive within the erlzmq2 thread, but they mainly focus on safe mutex usage for all operations. The changes solve problems that previously occurred normally with socket destruction and context termination. All the changes were verified by running the eunit tests continuously for more than 4 days under CentOS.

@okeuday okeuday Fix remaining problems that occur infrequently when running the eunit…
… tests (problems due to race conditions). The two remaining eunit test problems that remain are related to https://zeromq.jira.com/browse/LIBZMQ-496 (which causes "Device or resource busy (mutex.hpp:92)" for most of the remaining failures) and https://zeromq.jira.com/browse/LIBZMQ-478 (which causes "Assertion failed: ok (mailbox.cpp:79)").  The changes include proper error handling of a receive within the erlzmq2 thread, but they mainly focus on safe mutex usage for all operations.  The changes solve problems that previously occurred normally with socket destruction and context termination.  All the changes were verified by running the eunit tests continuously for more than 4 days under CentOS.
56b4dc2
@yrashk yrashk commented on the diff
c_src/erlzmq_nif.c
@@ -204,9 +206,10 @@
sizeof(erlzmq_socket_t));
assert(socket);
socket->context = context;
- socket->socket_index = context->socket_index++;
+ socket->socket_index = context->socket_index;
@yrashk Owner
yrashk added a note

I am not sure I follow this bit

@okeuday
okeuday added a note

It moved to after the if statement for the error condition. There were other small issues with the NIF env having free/new instead of clear and missing the zmq msg close.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@okeuday

To provide more detail about the changes: many of the changes are to make sure that when pointers are deallocated, they are set to 0 (i.e., NULL), so that when they can be potentially accessed (due to concurrency and race conditions) the situation can be handled properly. The added validation relates to context->context_zmq for the ZeroMQ context instance and both socket->mutex and socket->socket_zmq for the ZeroMQ socket instance. The concurrency related race conditions appear during the eunit tests due to the rapid destruction of sockets and the ZeroMQ context. There are other minor bug fixes, but the most important bug fix is for receives within the erlzmq2 thread when the ZeroMQ socket has a timeout set (active receives in this case will ignore the ZeroMQ socket timeout, i.e., the EAGAIN error, but passive receives always return the error... before the code would break on an assert).

@yrashk yrashk merged commit 9b3a788 into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Mar 5, 2013
  1. @okeuday

    Fix remaining problems that occur infrequently when running the eunit…

    okeuday authored
    … tests (problems due to race conditions). The two remaining eunit test problems that remain are related to https://zeromq.jira.com/browse/LIBZMQ-496 (which causes "Device or resource busy (mutex.hpp:92)" for most of the remaining failures) and https://zeromq.jira.com/browse/LIBZMQ-478 (which causes "Assertion failed: ok (mailbox.cpp:79)").  The changes include proper error handling of a receive within the erlzmq2 thread, but they mainly focus on safe mutex usage for all operations.  The changes solve problems that previously occurred normally with socket destruction and context termination.  All the changes were verified by running the eunit tests continuously for more than 4 days under CentOS.
This page is out of date. Refresh to see the latest.
Showing with 439 additions and 140 deletions.
  1. +340 −118 c_src/erlzmq_nif.c
  2. +99 −22 test/erlzmq_test.erl
View
458 c_src/erlzmq_nif.c
@@ -140,7 +140,8 @@ NIF(erlzmq_nif_context)
sizeof(erlzmq_context_t));
assert(context);
context->context_zmq = zmq_init(thread_count);
- if (!context->context_zmq) {
+ if (! context->context_zmq) {
+ enif_release_resource(context);
return return_zmq_errno(env, zmq_errno());
}
@@ -167,6 +168,7 @@ NIF(erlzmq_nif_context)
if (value_errno) {
free(context->thread_socket_name);
zmq_close(context->thread_socket);
+ enif_mutex_destroy(context->mutex);
zmq_term(context->context_zmq);
enif_release_resource(context);
return return_zmq_errno(env, value_errno);
@@ -204,9 +206,10 @@ NIF(erlzmq_nif_socket)
sizeof(erlzmq_socket_t));
assert(socket);
socket->context = context;
- socket->socket_index = context->socket_index++;
+ socket->socket_index = context->socket_index;
@yrashk Owner
yrashk added a note

I am not sure I follow this bit

@okeuday
okeuday added a note

It moved to after the if statement for the error condition. There were other small issues with the NIF env having free/new instead of clear and missing the zmq msg close.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
socket->socket_zmq = zmq_socket(context->context_zmq, socket_type);
- if (!socket->socket_zmq) {
+ if (! socket->socket_zmq) {
+ enif_release_resource(socket);
return return_zmq_errno(env, zmq_errno());
}
socket->active = active;
@@ -214,6 +217,7 @@ NIF(erlzmq_nif_socket)
socket->mutex = enif_mutex_create("erlzmq_socket_t_mutex");
assert(socket->mutex);
+ context->socket_index++;
return enif_make_tuple2(env, enif_make_atom(env, "ok"), enif_make_tuple2(env,
enif_make_uint64(env, socket->socket_index),
enif_make_resource(env, socket)));
@@ -239,21 +243,33 @@ NIF(erlzmq_nif_bind)
return enif_make_badarg(env);
}
+ if (! socket->mutex) {
+ free(endpoint);
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_bind(socket->socket_zmq, endpoint)) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ free(endpoint);
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_bind(socket->socket_zmq, endpoint)) {
enif_mutex_unlock(socket->mutex);
free(endpoint);
return return_zmq_errno(env, zmq_errno());
}
+ else if (socket->active == ERLZMQ_SOCKET_ACTIVE_PENDING) {
+ socket->active = ERLZMQ_SOCKET_ACTIVE_ON;
+ enif_mutex_unlock(socket->mutex);
+ free(endpoint);
+ return add_active_req(env, socket);
+ }
else {
enif_mutex_unlock(socket->mutex);
free(endpoint);
- if (socket->active == ERLZMQ_SOCKET_ACTIVE_PENDING) {
- return add_active_req(env, socket);
- }
- else {
- return enif_make_atom(env, "ok");
- }
+ return enif_make_atom(env, "ok");
}
}
@@ -277,21 +293,31 @@ NIF(erlzmq_nif_connect)
return enif_make_badarg(env);
}
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_connect(socket->socket_zmq, endpoint)) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_connect(socket->socket_zmq, endpoint)) {
enif_mutex_unlock(socket->mutex);
free(endpoint);
return return_zmq_errno(env, zmq_errno());
}
+ else if (socket->active == ERLZMQ_SOCKET_ACTIVE_PENDING) {
+ socket->active = ERLZMQ_SOCKET_ACTIVE_ON;
+ enif_mutex_unlock(socket->mutex);
+ free(endpoint);
+ return add_active_req(env, socket);
+ }
else {
enif_mutex_unlock(socket->mutex);
free(endpoint);
- if (socket->active == ERLZMQ_SOCKET_ACTIVE_PENDING) {
- return add_active_req(env, socket);
- }
- else {
- return enif_make_atom(env, "ok");
- }
+ return enif_make_atom(env, "ok");
}
}
@@ -368,9 +394,18 @@ NIF(erlzmq_nif_setsockopt)
return enif_make_badarg(env);
}
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_setsockopt(socket->socket_zmq, option_name,
- option_value, option_len)) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_setsockopt(socket->socket_zmq, option_name,
+ option_value, option_len)) {
enif_mutex_unlock(socket->mutex);
return return_zmq_errno(env, zmq_errno());
}
@@ -404,9 +439,18 @@ NIF(erlzmq_nif_getsockopt)
// int64_t
case ZMQ_MAXMSGSIZE:
option_len = sizeof(value_int64);
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_getsockopt(socket->socket_zmq, option_name,
- &value_int64, &option_len)) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_getsockopt(socket->socket_zmq, option_name,
+ &value_int64, &option_len)) {
enif_mutex_unlock(socket->mutex);
return return_zmq_errno(env, zmq_errno());
}
@@ -416,9 +460,18 @@ NIF(erlzmq_nif_getsockopt)
// uint64_t
case ZMQ_AFFINITY:
option_len = sizeof(value_uint64);
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_getsockopt(socket->socket_zmq, option_name,
- &value_uint64, &option_len)) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_getsockopt(socket->socket_zmq, option_name,
+ &value_uint64, &option_len)) {
enif_mutex_unlock(socket->mutex);
return return_zmq_errno(env, zmq_errno());
}
@@ -428,9 +481,18 @@ NIF(erlzmq_nif_getsockopt)
// binary
case ZMQ_IDENTITY:
option_len = sizeof(option_value);
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_getsockopt(socket->socket_zmq, option_name,
- option_value, &option_len)) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_getsockopt(socket->socket_zmq, option_name,
+ option_value, &option_len)) {
enif_mutex_unlock(socket->mutex);
return return_zmq_errno(env, zmq_errno());
}
@@ -459,9 +521,18 @@ NIF(erlzmq_nif_getsockopt)
case ZMQ_EVENTS:
case ZMQ_FD: // FIXME: ZMQ_FD returns SOCKET on Windows
option_len = sizeof(value_int);
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_getsockopt(socket->socket_zmq, option_name,
- &value_int, &option_len)) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_getsockopt(socket->socket_zmq, option_name,
+ &value_int, &option_len)) {
enif_mutex_unlock(socket->mutex);
return return_zmq_errno(env, zmq_errno());
}
@@ -500,9 +571,19 @@ NIF(erlzmq_nif_send)
int polling_thread_send = 1;
if (! socket->active) {
+ // try send
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_sendmsg(socket->socket_zmq, &req.data.send.msg,
- req.data.send.flags | ZMQ_DONTWAIT) == -1) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_sendmsg(socket->socket_zmq, &req.data.send.msg,
+ req.data.send.flags | ZMQ_DONTWAIT) == -1) {
enif_mutex_unlock(socket->mutex);
int const error = zmq_errno();
if (error != EAGAIN ||
@@ -510,6 +591,7 @@ NIF(erlzmq_nif_send)
zmq_msg_close(&req.data.send.msg);
return return_zmq_errno(env, error);
}
+ // if it fails, use the context thread poll for the send
}
else {
enif_mutex_unlock(socket->mutex);
@@ -533,12 +615,15 @@ NIF(erlzmq_nif_send)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
+ if (! socket->context->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->context->mutex);
- if (socket->context->thread_socket_name == NULL) {
+ if (! socket->context->thread_socket_name) {
enif_mutex_unlock(socket->context->mutex);
return return_zmq_errno(env, ETERM);
}
- if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
+ else if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
@@ -587,8 +672,20 @@ NIF(erlzmq_nif_recv)
return return_zmq_errno(env, zmq_errno());
}
// try recv with noblock
+ // if it fails, use the context thread poll for the recv
+ if (! socket->mutex) {
+ zmq_msg_close(&msg);
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_recvmsg(socket->socket_zmq, &msg, ZMQ_DONTWAIT) == -1) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ zmq_msg_close(&msg);
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_recvmsg(socket->socket_zmq, &msg, ZMQ_DONTWAIT) == -1) {
enif_mutex_unlock(socket->mutex);
int const error = zmq_errno();
zmq_msg_close(&msg);
@@ -611,25 +708,32 @@ NIF(erlzmq_nif_recv)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
+ if (! socket->context->mutex) {
+ zmq_msg_close(&msg);
+ enif_free_env(req.data.recv.env);
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->context->mutex);
- if (socket->context->thread_socket_name == NULL) {
- enif_mutex_unlock(socket->context->mutex);
+ if (! socket->context->thread_socket_name) {
+ if (socket->context->mutex) {
+ enif_mutex_unlock(socket->context->mutex);
+ }
+ zmq_msg_close(&msg);
+ enif_free_env(req.data.recv.env);
return return_zmq_errno(env, ETERM);
}
- if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
+ else if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(socket->context->mutex);
-
zmq_msg_close(&msg);
enif_free_env(req.data.recv.env);
return return_zmq_errno(env, zmq_errno());
}
else {
enif_mutex_unlock(socket->context->mutex);
-
zmq_msg_close(&msg);
+
// each pointer to the socket in a request increments the reference
enif_keep_resource(socket);
-
return enif_make_copy(env, req.data.recv.ref);
}
}
@@ -671,19 +775,36 @@ NIF(erlzmq_nif_close)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
+ if (! socket->context->mutex) {
+ zmq_msg_close(&msg);
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->context->mutex);
- if (socket->context->thread_socket_name == NULL) {
+ if (! socket->context->thread_socket_name) {
// context is gone
- enif_mutex_lock(socket->mutex);
+ if (socket->context->mutex) {
+ enif_mutex_unlock(socket->context->mutex);
+ }
zmq_msg_close(&msg);
+ enif_free_env(req.data.close.env);
+
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
+ enif_mutex_lock(socket->mutex);
+ if (! socket->socket_zmq) {
+ enif_mutex_unlock(socket->mutex);
+ return return_zmq_errno(env, ETERM);
+ }
zmq_close(socket->socket_zmq);
+ socket->socket_zmq = 0;
enif_mutex_unlock(socket->mutex);
enif_mutex_destroy(socket->mutex);
+ socket->mutex = 0;
enif_release_resource(socket);
- enif_mutex_unlock(socket->context->mutex);
return enif_make_atom(env, "ok");
}
- if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
+ else if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
enif_free_env(req.data.close.env);
@@ -702,8 +823,8 @@ NIF(erlzmq_nif_term)
{
erlzmq_context_t * context;
- if (!enif_get_resource(env, argv[0], erlzmq_nif_resource_context,
- (void **) &context)) {
+ if (! enif_get_resource(env, argv[0], erlzmq_nif_resource_context,
+ (void **) &context)) {
return enif_make_badarg(env);
}
@@ -721,8 +842,21 @@ NIF(erlzmq_nif_term)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
+ if (! context->mutex) {
+ zmq_msg_close(&msg);
+ enif_free_env(req.data.term.env);
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(context->mutex);
- if (zmq_sendmsg(context->thread_socket, &msg, 0) == -1) {
+ if (! context->thread_socket_name) {
+ if (context->mutex) {
+ enif_mutex_unlock(context->mutex);
+ }
+ zmq_msg_close(&msg);
+ enif_free_env(req.data.term.env);
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_sendmsg(context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(context->mutex);
zmq_msg_close(&msg);
enif_free_env(req.data.term.env);
@@ -781,28 +915,34 @@ static void * polling_thread(void * handle)
if (vector_get(zmq_pollitem_t, &items_zmq, 0)->revents & ZMQ_POLLIN) {
--count;
}
- for (i = 1; i < vector_count(&items_zmq); ++i) {
+ for (i = 1; i < vector_count(&items_zmq) && count > 0; ++i) {
zmq_pollitem_t * item = vector_get(zmq_pollitem_t, &items_zmq, i);
erlzmq_thread_request_t * r = vector_get(erlzmq_thread_request_t,
&requests, i);
if (item->revents & ZMQ_POLLIN) {
- size_t value_len = sizeof(int64_t);
- int64_t flag_value = 0;
-
assert(r->type == ERLZMQ_THREAD_REQUEST_RECV);
--count;
+ item->revents = 0;
zmq_msg_t msg;
- zmq_msg_init(&msg);
+ if (zmq_msg_init(&msg)) {
+ fprintf(stderr, "zmq_msg_init error: %s\n",
+ strerror(zmq_errno()));
+ assert(0);
+ }
+ int keep_socket = 0;
+ assert(r->data.recv.socket->mutex);
enif_mutex_lock(r->data.recv.socket->mutex);
if (zmq_recvmsg(r->data.recv.socket->socket_zmq, &msg,
- r->data.recv.flags) == -1 ||
- (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON &&
- zmq_getsockopt(r->data.recv.socket->socket_zmq,
- ZMQ_RCVMORE, &flag_value, &value_len)) )
- {
+ r->data.recv.flags) == -1) {
enif_mutex_unlock(r->data.recv.socket->mutex);
- if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
+ zmq_msg_close(&msg);
+ int const error = zmq_errno();
+ if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON &&
+ error == EAGAIN) {
+ keep_socket = 1;
+ }
+ else if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
enif_send(NULL, &r->data.recv.socket->active_pid, r->data.recv.env,
enif_make_tuple3(r->data.recv.env,
enif_make_atom(r->data.recv.env, "zmq"),
@@ -810,55 +950,66 @@ static void * polling_thread(void * handle)
enif_make_uint64(r->data.recv.env,
r->data.recv.socket->socket_index),
enif_make_resource(r->data.recv.env, r->data.recv.socket)),
- return_zmq_errno(r->data.recv.env, zmq_errno())));
- enif_free_env(r->data.recv.env);
- r->data.recv.env = enif_alloc_env();
- item->revents = 0;
+ return_zmq_errno(r->data.recv.env, error)));
}
else {
- assert(0);
+ // an EAGAIN error could occur if a timeout is set on the socket
+ enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
+ enif_make_tuple2(r->data.recv.env,
+ enif_make_copy(r->data.recv.env, r->data.recv.ref),
+ return_zmq_errno(r->data.recv.env, error)));
}
}
else {
enif_mutex_unlock(r->data.recv.socket->mutex);
- }
-
- ErlNifBinary binary;
- enif_alloc_binary(zmq_msg_size(&msg), &binary);
- memcpy(binary.data, zmq_msg_data(&msg), zmq_msg_size(&msg));
- zmq_msg_close(&msg);
-
- if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
- ERL_NIF_TERM flags_list;
- // Should we send the multipart flag
- if(flag_value == 1) {
- flags_list = enif_make_list1(r->data.recv.env,
- enif_make_atom(r->data.recv.env,
- "rcvmore"));
- } else {
- flags_list = enif_make_list(r->data.recv.env, 0);
+ ErlNifBinary binary;
+ enif_alloc_binary(zmq_msg_size(&msg), &binary);
+ memcpy(binary.data, zmq_msg_data(&msg), zmq_msg_size(&msg));
+ zmq_msg_close(&msg);
+
+ if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
+ ERL_NIF_TERM flags_list;
+
+ // Should we send the multipart flag
+ size_t value_len = sizeof(int64_t);
+ int64_t flag_value = 0;
+ if (zmq_getsockopt(r->data.recv.socket->socket_zmq,
+ ZMQ_RCVMORE, &flag_value, &value_len)) {
+ fprintf(stderr, "zmq_getsockopt error: %s\n",
+ strerror(zmq_errno()));
+ assert(0);
+ }
+ if(flag_value == 1) {
+ flags_list = enif_make_list1(r->data.recv.env,
+ enif_make_atom(r->data.recv.env,
+ "rcvmore"));
+ } else {
+ flags_list = enif_make_list(r->data.recv.env, 0);
+ }
+
+ enif_send(NULL, &r->data.recv.socket->active_pid, r->data.recv.env,
+ enif_make_tuple4(r->data.recv.env,
+ enif_make_atom(r->data.recv.env, "zmq"),
+ enif_make_tuple2(r->data.recv.env,
+ enif_make_uint64(r->data.recv.env,
+ r->data.recv.socket->socket_index),
+ enif_make_resource(r->data.recv.env, r->data.recv.socket)),
+ enif_make_binary(r->data.recv.env, &binary),
+ flags_list));
+ keep_socket = 1;
}
-
- enif_send(NULL, &r->data.recv.socket->active_pid, r->data.recv.env,
- enif_make_tuple4(r->data.recv.env,
- enif_make_atom(r->data.recv.env, "zmq"),
+ else {
+ enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
enif_make_tuple2(r->data.recv.env,
- enif_make_uint64(r->data.recv.env,
- r->data.recv.socket->socket_index),
- enif_make_resource(r->data.recv.env, r->data.recv.socket)),
- enif_make_binary(r->data.recv.env, &binary),
- flags_list));
- enif_free_env(r->data.recv.env);
- r->data.recv.env = enif_alloc_env();
- item->revents = 0;
+ enif_make_copy(r->data.recv.env, r->data.recv.ref),
+ enif_make_binary(r->data.recv.env, &binary)));
+ }
+ }
+ if (keep_socket) {
+ enif_clear_env(r->data.recv.env);
}
else {
- enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
- enif_make_tuple2(r->data.recv.env,
- enif_make_copy(r->data.recv.env, r->data.recv.ref),
- enif_make_binary(r->data.recv.env, &binary)));
-
enif_free_env(r->data.recv.env);
enif_release_resource(r->data.recv.socket);
@@ -872,10 +1023,12 @@ static void * polling_thread(void * handle)
else if (item->revents & ZMQ_POLLOUT) {
assert(r->type == ERLZMQ_THREAD_REQUEST_SEND);
--count;
+ item->revents = 0;
+ assert(r->data.send.socket->mutex);
enif_mutex_lock(r->data.send.socket->mutex);
if (zmq_sendmsg(r->data.send.socket->socket_zmq,
- &r->data.send.msg, r->data.send.flags) == -1) {
+ &r->data.send.msg, r->data.send.flags) == -1) {
enif_mutex_unlock(r->data.send.socket->mutex);
enif_send(NULL, &r->data.send.pid, r->data.send.env,
enif_make_tuple2(r->data.send.env,
@@ -898,12 +1051,22 @@ static void * polling_thread(void * handle)
assert(status == 0);
--i;
}
+ else {
+ assert(item->revents == 0);
+ }
}
+ // incoming requests to poll on
if (vector_get(zmq_pollitem_t, &items_zmq, 0)->revents & ZMQ_POLLIN) {
vector_get(zmq_pollitem_t, &items_zmq, 0)->revents = 0;
zmq_msg_t msg;
- zmq_msg_init(&msg);
+ if (zmq_msg_init(&msg)) {
+ fprintf(stderr, "zmq_msg_init error: %s\n",
+ strerror(zmq_errno()));
+ assert(0);
+ }
+
+ assert(context->mutex);
enif_mutex_lock(context->mutex);
status = zmq_recvmsg(thread_socket, &msg, 0);
enif_mutex_unlock(context->mutex);
@@ -913,6 +1076,7 @@ static void * polling_thread(void * handle)
erlzmq_thread_request_t * r =
(erlzmq_thread_request_t *) zmq_msg_data(&msg);
+ ErlNifMutex * mutex = 0;
if (r->type == ERLZMQ_THREAD_REQUEST_SEND) {
zmq_pollitem_t item_zmq = {r->data.send.socket->socket_zmq,
0, ZMQ_POLLOUT, 0};
@@ -932,6 +1096,19 @@ static void * polling_thread(void * handle)
zmq_msg_close(&msg);
}
else if (r->type == ERLZMQ_THREAD_REQUEST_CLOSE) {
+ if (! r->data.close.socket->mutex)
+ {
+ enif_send(NULL, &r->data.close.pid, r->data.close.env,
+ enif_make_tuple2(r->data.close.env,
+ enif_make_copy(r->data.close.env, r->data.close.ref),
+ return_zmq_errno(r->data.close.env, ETERM)));
+ enif_free_env(r->data.close.env);
+ zmq_msg_close(&msg);
+ continue;
+ }
+ enif_mutex_lock(r->data.close.socket->mutex);
+ assert(r->data.close.socket->socket_zmq);
+
// remove all entries with this socket
for (i = vector_count(&items_zmq) - 1; i > 0; --i) {
zmq_pollitem_t * item = vector_get(zmq_pollitem_t, &items_zmq, i);
@@ -939,11 +1116,7 @@ static void * polling_thread(void * handle)
erlzmq_thread_request_t * r_old =
vector_get(erlzmq_thread_request_t, &requests, i);
if (r_old->type == ERLZMQ_THREAD_REQUEST_RECV) {
- enif_clear_env(r_old->data.recv.env);
- // FIXME
- // causes crash on R14B01, works fine on R14B02
- // (repeated enif_send with active receive broken on R14B01)
- //enif_free_env(r_old->data.recv.env);
+ enif_free_env(r_old->data.recv.env);
enif_release_resource(r_old->data.recv.socket);
}
else if (r_old->type == ERLZMQ_THREAD_REQUEST_SEND) {
@@ -961,10 +1134,12 @@ static void * polling_thread(void * handle)
}
}
// close the socket
- enif_mutex_lock(r->data.close.socket->mutex);
zmq_close(r->data.close.socket->socket_zmq);
- enif_mutex_unlock(r->data.close.socket->mutex);
- enif_mutex_destroy(r->data.close.socket->mutex);
+ r->data.close.socket->socket_zmq = 0;
+ mutex = r->data.close.socket->mutex;
+ r->data.close.socket->mutex = 0;
+ enif_mutex_unlock(mutex);
+ enif_mutex_destroy(mutex);
enif_release_resource(r->data.close.socket);
// notify the waiting request
enif_send(NULL, &r->data.close.pid, r->data.close.env,
@@ -975,37 +1150,65 @@ static void * polling_thread(void * handle)
zmq_msg_close(&msg);
}
else if (r->type == ERLZMQ_THREAD_REQUEST_TERM) {
+ if (! context->mutex)
+ {
+ enif_send(NULL, &r->data.term.pid, r->data.term.env,
+ enif_make_tuple2(r->data.term.env,
+ enif_make_copy(r->data.term.env, r->data.term.ref),
+ return_zmq_errno(r->data.term.env, ETERM)));
+ enif_free_env(r->data.term.env);
+ zmq_msg_close(&msg);
+ continue;
+ }
enif_mutex_lock(context->mutex);
free(context->thread_socket_name);
// use this to flag context is over
- context->thread_socket_name = NULL;
- enif_mutex_unlock(context->mutex);
+ context->thread_socket_name = 0;
// cleanup pending requests
for (i = 1; i < vector_count(&requests); ++i) {
erlzmq_thread_request_t * r_old = vector_get(erlzmq_thread_request_t,
&requests, i);
- if (r_old->type == ERLZMQ_THREAD_REQUEST_RECV) {
+ if (r_old->type == ERLZMQ_THREAD_REQUEST_RECV)
+ {
enif_free_env(r_old->data.recv.env);
+ if (r_old->data.recv.socket->mutex) {
+ enif_mutex_lock(r_old->data.recv.socket->mutex);
+ assert(r_old->data.recv.socket->socket_zmq);
+ zmq_close(r_old->data.recv.socket->socket_zmq);
+ r_old->data.recv.socket->socket_zmq = 0;
+ mutex = r_old->data.recv.socket->mutex;
+ r_old->data.recv.socket->mutex = 0;
+ enif_mutex_unlock(mutex);
+ enif_mutex_destroy(mutex);
+ }
enif_release_resource(r_old->data.recv.socket);
- zmq_close(r_old->data.recv.socket->socket_zmq);
}
else if (r_old->type == ERLZMQ_THREAD_REQUEST_SEND) {
zmq_msg_close(&r_old->data.send.msg);
enif_free_env(r_old->data.send.env);
+ if (r_old->data.send.socket->mutex) {
+ enif_mutex_lock(r_old->data.send.socket->mutex);
+ assert(r_old->data.send.socket->socket_zmq);
+ zmq_close(r_old->data.send.socket->socket_zmq);
+ r_old->data.send.socket->socket_zmq = 0;
+ mutex = r_old->data.send.socket->mutex;
+ r_old->data.send.socket->mutex = 0;
+ enif_mutex_unlock(mutex);
+ enif_mutex_destroy(mutex);
+ }
enif_release_resource(r_old->data.send.socket);
- zmq_close(r_old->data.send.socket->socket_zmq);
}
}
// terminate the context
- enif_mutex_lock(context->mutex);
zmq_close(thread_socket);
zmq_close(context->thread_socket);
- enif_mutex_unlock(context->mutex);
- zmq_term(context->context_zmq);
- enif_mutex_lock(context->mutex);
- enif_mutex_unlock(context->mutex);
- enif_mutex_destroy(context->mutex);
+ mutex = context->mutex;
+ context->mutex = 0;
+ enif_mutex_unlock(mutex);
+ enif_mutex_destroy(mutex);
+ void * const context_term = context->context_zmq;
enif_release_resource(context);
+
// notify the waiting request
enif_send(NULL, &r->data.term.pid, r->data.term.env,
enif_make_tuple2(r->data.term.env,
@@ -1015,9 +1218,13 @@ static void * polling_thread(void * handle)
zmq_msg_close(&msg);
vector_destroy(&items_zmq);
vector_destroy(&requests);
+ // the thread will block here until all sockets
+ // within the context are closed
+ zmq_term(context_term);
return NULL;
}
else {
+ fprintf(stderr, "invalid request type: %d\n", r->type);
assert(0);
}
}
@@ -1027,8 +1234,6 @@ static void * polling_thread(void * handle)
static ERL_NIF_TERM add_active_req(ErlNifEnv* env, erlzmq_socket_t * socket)
{
- socket->active = ERLZMQ_SOCKET_ACTIVE_ON;
-
erlzmq_thread_request_t req;
req.type = ERLZMQ_THREAD_REQUEST_RECV;
req.data.recv.env = enif_alloc_env();
@@ -1038,18 +1243,35 @@ static ERL_NIF_TERM add_active_req(ErlNifEnv* env, erlzmq_socket_t * socket)
zmq_msg_t msg;
if (zmq_msg_init_size(&msg, sizeof(erlzmq_thread_request_t))) {
+ zmq_msg_close(&msg);
enif_free_env(req.data.recv.env);
return return_zmq_errno(env, zmq_errno());
}
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
- if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
+ if (! socket->context->mutex) {
+ zmq_msg_close(&msg);
+ enif_free_env(req.data.recv.env);
+ return return_zmq_errno(env, ETERM);
+ }
+ enif_mutex_lock(socket->context->mutex);
+ if (! socket->context->thread_socket_name) {
+ if (socket->context->mutex) {
+ enif_mutex_unlock(socket->context->mutex);
+ }
+ zmq_msg_close(&msg);
+ enif_free_env(req.data.recv.env);
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
+ enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
enif_free_env(req.data.recv.env);
return return_zmq_errno(env, zmq_errno());
}
else {
+ enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
// each pointer to the socket in a request increments the reference
enif_keep_resource(socket);
View
121 test/erlzmq_test.erl
@@ -2,7 +2,33 @@
-include_lib("eunit/include/eunit.hrl").
-export([worker/2]).
+% provides some context for failures only viewable within the C code
+%-define(PRINT_DEBUG, true).
+
+-ifdef(PRINT_DEBUG).
+% use stderr while bypassing the io server to avoid buffering
+-define(PRINT_START,
+ PRINT_PORT = open_port({fd, 0, 2}, [out, {line, 256}]),
+ port_command(PRINT_PORT,
+ io_lib:format("~w:~w start~n", [?MODULE, ?LINE]))).
+-define(PRINT_CHECK(ANY),
+ port_command(PRINT_PORT,
+ io_lib:format("~w:~w ~p~n", [?MODULE, ?LINE, ANY]))).
+-define(PRINT_END,
+ port_command(PRINT_PORT,
+ io_lib:format("~w:~w end~n", [?MODULE, ?LINE])),
+ port_close(PRINT_PORT),
+ ok).
+-else.
+-define(PRINT_START, ok).
+-define(PRINT_CHECK(_), ok).
+-define(PRINT_END, ok).
+-endif.
+
hwm_test() ->
+ ?PRINT_START,
+ ?PRINT_CHECK(lists:flatten(
+ io_lib:format("executing as os pid ~s", [os:getpid()]))),
{ok, C} = erlzmq:context(),
{ok, S1} = erlzmq:socket(C, [pull, {active, false}]),
{ok, S2} = erlzmq:socket(C, [push, {active, false}]),
@@ -27,7 +53,8 @@ hwm_test() ->
ok = erlzmq:close(S1),
ok = erlzmq:close(S2),
- ok = erlzmq:term(C).
+ ok = erlzmq:term(C),
+ ?PRINT_END.
hwm_loop(0, _S) ->
ok;
@@ -39,6 +66,7 @@ hwm_loop(N, S) ->
hwm_loop(N-1, S).
invalid_rep_test() ->
+ ?PRINT_START,
{ok, Ctx} = erlzmq:context(),
{ok, XrepSocket} = erlzmq:socket(Ctx, [xrep, {active, false}]),
@@ -71,21 +99,29 @@ invalid_rep_test() ->
%% Tear down the wiring.
ok = erlzmq:close(XrepSocket),
ok = erlzmq:close(ReqSocket),
- ok = erlzmq:term(Ctx).
+ ok = erlzmq:term(Ctx),
+ ?PRINT_END.
pair_inproc_test() ->
+ ?PRINT_START,
basic_tests("inproc://tester", pair, pair, active),
- basic_tests("inproc://tester", pair, pair, passive).
+ basic_tests("inproc://tester", pair, pair, passive),
+ ?PRINT_END.
pair_ipc_test() ->
+ ?PRINT_START,
basic_tests("ipc:///tmp/tester", pair, pair, active),
- basic_tests("ipc:///tmp/tester", pair, pair, passive).
+ basic_tests("ipc:///tmp/tester", pair, pair, passive),
+ ?PRINT_END.
pair_tcp_test() ->
+ ?PRINT_START,
basic_tests("tcp://127.0.0.1:5554", pair, pair, active),
- basic_tests("tcp://127.0.0.1:5555", pair, pair, passive).
+ basic_tests("tcp://127.0.0.1:5555", pair, pair, passive),
+ ?PRINT_END.
reqrep_device_test() ->
+ ?PRINT_START,
{ok, Ctx} = erlzmq:context(),
%% Create a req/rep device.
@@ -161,23 +197,31 @@ reqrep_device_test() ->
ok = erlzmq:close(Rep),
ok = erlzmq:close(Xrep),
ok = erlzmq:close(Xreq),
- ok = erlzmq:term(Ctx).
+ ok = erlzmq:term(Ctx),
+ ?PRINT_END.
reqrep_inproc_test() ->
+ ?PRINT_START,
basic_tests("inproc://test", req, rep, active),
- basic_tests("inproc://test", req, rep, passive).
+ basic_tests("inproc://test", req, rep, passive),
+ ?PRINT_END.
reqrep_ipc_test() ->
+ ?PRINT_START,
basic_tests("ipc:///tmp/tester", req, rep, active),
- basic_tests("ipc:///tmp/tester", req, rep, passive).
+ basic_tests("ipc:///tmp/tester", req, rep, passive),
+ ?PRINT_END.
reqrep_tcp_test() ->
+ ?PRINT_START,
basic_tests("tcp://127.0.0.1:5556", req, rep, active),
- basic_tests("tcp://127.0.0.1:5557", req, rep, passive).
+ basic_tests("tcp://127.0.0.1:5557", req, rep, passive),
+ ?PRINT_END.
sub_forward_test() ->
+ ?PRINT_START,
{ok, Ctx} = erlzmq:context(),
%% First, create an intermediate device.
@@ -225,9 +269,11 @@ sub_forward_test() ->
ok = erlzmq:close(Xsub),
ok = erlzmq:close(Pub),
ok = erlzmq:close(Sub),
- ok = erlzmq:term(Ctx).
+ ok = erlzmq:term(Ctx),
+ ?PRINT_END.
-timeo_test() ->
+timeo() ->
+ ?PRINT_START,
{ok, Ctx} = erlzmq:context(),
%% Create a disconnected socket.
{ok, Sb} = erlzmq:socket(Ctx, [pull, {active, false}]),
@@ -266,22 +312,40 @@ timeo_test() ->
Buff = <<"12345678ABCDEFGH12345678abcdefgh">>,
ok = erlzmq:send(Sc, Buff),
- {ok, Buff} = erlzmq:recv(Sb),
+ case erlzmq:recv(Sb) of
+ {ok, Buff} ->
+ ok;
+ {error, eagain} ->
+ timeout
+ end,
%% Clean-up.
ok = erlzmq:close(Sc),
ok = erlzmq:close(Sb),
- ok = erlzmq:term (Ctx).
+ ok = erlzmq:term (Ctx),
+ ok,
+ ?PRINT_END.
+timeo_test_() ->
+ % sometimes this test can timeout with the default timeout
+ {timeout, 10, [
+ ?_assert(timeo() =:= ok)
+ ]}.
bad_init_test() ->
- ?assertEqual({error, einval}, erlzmq:context(-1)).
+ ?PRINT_START,
+ ?assertEqual({error, einval}, erlzmq:context(-1)),
+ ?PRINT_END.
shutdown_stress_test() ->
- ?assertMatch(ok, shutdown_stress_loop(10)).
+ ?PRINT_START,
+ ?assertMatch(ok, shutdown_stress_loop(10)),
+ ?PRINT_END.
version_test() ->
+ ?PRINT_START,
{Major, Minor, Patch} = erlzmq:version(),
- ?assert(is_integer(Major) andalso is_integer(Minor) andalso is_integer(Patch)).
+ ?assert(is_integer(Major) andalso is_integer(Minor) andalso is_integer(Patch)),
+ ?PRINT_END.
shutdown_stress_loop(0) ->
ok;
@@ -295,27 +359,40 @@ shutdown_stress_loop(N) ->
shutdown_stress_loop(N-1).
shutdown_no_blocking_test() ->
+ ?PRINT_START,
{ok, C} = erlzmq:context(),
{ok, S} = erlzmq:socket(C, [pub, {active, false}]),
erlzmq:close(S),
- ?assertEqual(ok, erlzmq:term(C, 500)).
+ ?assertEqual(ok, erlzmq:term(C, 500)),
+ ?PRINT_END.
shutdown_blocking_test() ->
+ ?PRINT_START,
{ok, C} = erlzmq:context(),
{ok, _S} = erlzmq:socket(C, [pub, {active, false}]),
- ?assertMatch({error, {timeout, _}}, erlzmq:term(C, 0)).
+ case erlzmq:term(C, 0) of
+ {error, {timeout, _}} ->
+ % typical
+ ok;
+ ok ->
+ % very infrequent
+ ok
+ end,
+ ?PRINT_END.
shutdown_blocking_unblocking_test() ->
+ ?PRINT_START,
{ok, C} = erlzmq:context(),
- {ok, S} = erlzmq:socket(C, [pub, {active, false}]),
- V = erlzmq:term(C, 500),
+ {ok, _} = erlzmq:socket(C, [pub, {active, false}]),
+ V = erlzmq:term(C, 0),
?assertMatch({error, {timeout, _}}, V),
{error, {timeout, Ref}} = V,
- erlzmq:close(S),
+ % all remaining sockets are automatically closed by term (i.e., zmq_term)
receive
{Ref, ok} ->
ok
- end.
+ end,
+ ?PRINT_END.
join_procs(0) ->
ok;
Something went wrong with that request. Please try again.