Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #55 from okeuday/3.x

Fix remaining problems that occur infrequently when running the eunit tests
  • Loading branch information...
commit 9b3a788e0c29693a70cdea6f6a3e35875a2182f3 2 parents 63ce4e8 + 56b4dc2
Yurii Rashkovskii yrashk authored
Showing with 439 additions and 140 deletions.
  1. +340 −118 c_src/erlzmq_nif.c
  2. +99 −22 test/erlzmq_test.erl
458 c_src/erlzmq_nif.c
View
@@ -140,7 +140,8 @@ NIF(erlzmq_nif_context)
sizeof(erlzmq_context_t));
assert(context);
context->context_zmq = zmq_init(thread_count);
- if (!context->context_zmq) {
+ if (! context->context_zmq) {
+ enif_release_resource(context);
return return_zmq_errno(env, zmq_errno());
}
@@ -167,6 +168,7 @@ NIF(erlzmq_nif_context)
if (value_errno) {
free(context->thread_socket_name);
zmq_close(context->thread_socket);
+ enif_mutex_destroy(context->mutex);
zmq_term(context->context_zmq);
enif_release_resource(context);
return return_zmq_errno(env, value_errno);
@@ -204,9 +206,10 @@ NIF(erlzmq_nif_socket)
sizeof(erlzmq_socket_t));
assert(socket);
socket->context = context;
- socket->socket_index = context->socket_index++;
+ socket->socket_index = context->socket_index;
socket->socket_zmq = zmq_socket(context->context_zmq, socket_type);
- if (!socket->socket_zmq) {
+ if (! socket->socket_zmq) {
+ enif_release_resource(socket);
return return_zmq_errno(env, zmq_errno());
}
socket->active = active;
@@ -214,6 +217,7 @@ NIF(erlzmq_nif_socket)
socket->mutex = enif_mutex_create("erlzmq_socket_t_mutex");
assert(socket->mutex);
+ context->socket_index++;
return enif_make_tuple2(env, enif_make_atom(env, "ok"), enif_make_tuple2(env,
enif_make_uint64(env, socket->socket_index),
enif_make_resource(env, socket)));
@@ -239,21 +243,33 @@ NIF(erlzmq_nif_bind)
return enif_make_badarg(env);
}
+ if (! socket->mutex) {
+ free(endpoint);
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_bind(socket->socket_zmq, endpoint)) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ free(endpoint);
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_bind(socket->socket_zmq, endpoint)) {
enif_mutex_unlock(socket->mutex);
free(endpoint);
return return_zmq_errno(env, zmq_errno());
}
+ else if (socket->active == ERLZMQ_SOCKET_ACTIVE_PENDING) {
+ socket->active = ERLZMQ_SOCKET_ACTIVE_ON;
+ enif_mutex_unlock(socket->mutex);
+ free(endpoint);
+ return add_active_req(env, socket);
+ }
else {
enif_mutex_unlock(socket->mutex);
free(endpoint);
- if (socket->active == ERLZMQ_SOCKET_ACTIVE_PENDING) {
- return add_active_req(env, socket);
- }
- else {
- return enif_make_atom(env, "ok");
- }
+ return enif_make_atom(env, "ok");
}
}
@@ -277,21 +293,31 @@ NIF(erlzmq_nif_connect)
return enif_make_badarg(env);
}
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_connect(socket->socket_zmq, endpoint)) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_connect(socket->socket_zmq, endpoint)) {
enif_mutex_unlock(socket->mutex);
free(endpoint);
return return_zmq_errno(env, zmq_errno());
}
+ else if (socket->active == ERLZMQ_SOCKET_ACTIVE_PENDING) {
+ socket->active = ERLZMQ_SOCKET_ACTIVE_ON;
+ enif_mutex_unlock(socket->mutex);
+ free(endpoint);
+ return add_active_req(env, socket);
+ }
else {
enif_mutex_unlock(socket->mutex);
free(endpoint);
- if (socket->active == ERLZMQ_SOCKET_ACTIVE_PENDING) {
- return add_active_req(env, socket);
- }
- else {
- return enif_make_atom(env, "ok");
- }
+ return enif_make_atom(env, "ok");
}
}
@@ -368,9 +394,18 @@ NIF(erlzmq_nif_setsockopt)
return enif_make_badarg(env);
}
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_setsockopt(socket->socket_zmq, option_name,
- option_value, option_len)) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_setsockopt(socket->socket_zmq, option_name,
+ option_value, option_len)) {
enif_mutex_unlock(socket->mutex);
return return_zmq_errno(env, zmq_errno());
}
@@ -404,9 +439,18 @@ NIF(erlzmq_nif_getsockopt)
// int64_t
case ZMQ_MAXMSGSIZE:
option_len = sizeof(value_int64);
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_getsockopt(socket->socket_zmq, option_name,
- &value_int64, &option_len)) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_getsockopt(socket->socket_zmq, option_name,
+ &value_int64, &option_len)) {
enif_mutex_unlock(socket->mutex);
return return_zmq_errno(env, zmq_errno());
}
@@ -416,9 +460,18 @@ NIF(erlzmq_nif_getsockopt)
// uint64_t
case ZMQ_AFFINITY:
option_len = sizeof(value_uint64);
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_getsockopt(socket->socket_zmq, option_name,
- &value_uint64, &option_len)) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_getsockopt(socket->socket_zmq, option_name,
+ &value_uint64, &option_len)) {
enif_mutex_unlock(socket->mutex);
return return_zmq_errno(env, zmq_errno());
}
@@ -428,9 +481,18 @@ NIF(erlzmq_nif_getsockopt)
// binary
case ZMQ_IDENTITY:
option_len = sizeof(option_value);
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_getsockopt(socket->socket_zmq, option_name,
- option_value, &option_len)) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_getsockopt(socket->socket_zmq, option_name,
+ option_value, &option_len)) {
enif_mutex_unlock(socket->mutex);
return return_zmq_errno(env, zmq_errno());
}
@@ -459,9 +521,18 @@ NIF(erlzmq_nif_getsockopt)
case ZMQ_EVENTS:
case ZMQ_FD: // FIXME: ZMQ_FD returns SOCKET on Windows
option_len = sizeof(value_int);
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_getsockopt(socket->socket_zmq, option_name,
- &value_int, &option_len)) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_getsockopt(socket->socket_zmq, option_name,
+ &value_int, &option_len)) {
enif_mutex_unlock(socket->mutex);
return return_zmq_errno(env, zmq_errno());
}
@@ -500,9 +571,19 @@ NIF(erlzmq_nif_send)
int polling_thread_send = 1;
if (! socket->active) {
+ // try send
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_sendmsg(socket->socket_zmq, &req.data.send.msg,
- req.data.send.flags | ZMQ_DONTWAIT) == -1) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_sendmsg(socket->socket_zmq, &req.data.send.msg,
+ req.data.send.flags | ZMQ_DONTWAIT) == -1) {
enif_mutex_unlock(socket->mutex);
int const error = zmq_errno();
if (error != EAGAIN ||
@@ -510,6 +591,7 @@ NIF(erlzmq_nif_send)
zmq_msg_close(&req.data.send.msg);
return return_zmq_errno(env, error);
}
+ // if it fails, use the context thread poll for the send
}
else {
enif_mutex_unlock(socket->mutex);
@@ -533,12 +615,15 @@ NIF(erlzmq_nif_send)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
+ if (! socket->context->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->context->mutex);
- if (socket->context->thread_socket_name == NULL) {
+ if (! socket->context->thread_socket_name) {
enif_mutex_unlock(socket->context->mutex);
return return_zmq_errno(env, ETERM);
}
- if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
+ else if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
@@ -587,8 +672,20 @@ NIF(erlzmq_nif_recv)
return return_zmq_errno(env, zmq_errno());
}
// try recv with noblock
+ // if it fails, use the context thread poll for the recv
+ if (! socket->mutex) {
+ zmq_msg_close(&msg);
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->mutex);
- if (zmq_recvmsg(socket->socket_zmq, &msg, ZMQ_DONTWAIT) == -1) {
+ if (! socket->socket_zmq) {
+ if (socket->mutex) {
+ enif_mutex_unlock(socket->mutex);
+ }
+ zmq_msg_close(&msg);
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_recvmsg(socket->socket_zmq, &msg, ZMQ_DONTWAIT) == -1) {
enif_mutex_unlock(socket->mutex);
int const error = zmq_errno();
zmq_msg_close(&msg);
@@ -611,25 +708,32 @@ NIF(erlzmq_nif_recv)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
+ if (! socket->context->mutex) {
+ zmq_msg_close(&msg);
+ enif_free_env(req.data.recv.env);
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->context->mutex);
- if (socket->context->thread_socket_name == NULL) {
- enif_mutex_unlock(socket->context->mutex);
+ if (! socket->context->thread_socket_name) {
+ if (socket->context->mutex) {
+ enif_mutex_unlock(socket->context->mutex);
+ }
+ zmq_msg_close(&msg);
+ enif_free_env(req.data.recv.env);
return return_zmq_errno(env, ETERM);
}
- if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
+ else if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(socket->context->mutex);
-
zmq_msg_close(&msg);
enif_free_env(req.data.recv.env);
return return_zmq_errno(env, zmq_errno());
}
else {
enif_mutex_unlock(socket->context->mutex);
-
zmq_msg_close(&msg);
+
// each pointer to the socket in a request increments the reference
enif_keep_resource(socket);
-
return enif_make_copy(env, req.data.recv.ref);
}
}
@@ -671,19 +775,36 @@ NIF(erlzmq_nif_close)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
+ if (! socket->context->mutex) {
+ zmq_msg_close(&msg);
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(socket->context->mutex);
- if (socket->context->thread_socket_name == NULL) {
+ if (! socket->context->thread_socket_name) {
// context is gone
- enif_mutex_lock(socket->mutex);
+ if (socket->context->mutex) {
+ enif_mutex_unlock(socket->context->mutex);
+ }
zmq_msg_close(&msg);
+ enif_free_env(req.data.close.env);
+
+ if (! socket->mutex) {
+ return return_zmq_errno(env, ETERM);
+ }
+ enif_mutex_lock(socket->mutex);
+ if (! socket->socket_zmq) {
+ enif_mutex_unlock(socket->mutex);
+ return return_zmq_errno(env, ETERM);
+ }
zmq_close(socket->socket_zmq);
+ socket->socket_zmq = 0;
enif_mutex_unlock(socket->mutex);
enif_mutex_destroy(socket->mutex);
+ socket->mutex = 0;
enif_release_resource(socket);
- enif_mutex_unlock(socket->context->mutex);
return enif_make_atom(env, "ok");
}
- if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
+ else if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
enif_free_env(req.data.close.env);
@@ -702,8 +823,8 @@ NIF(erlzmq_nif_term)
{
erlzmq_context_t * context;
- if (!enif_get_resource(env, argv[0], erlzmq_nif_resource_context,
- (void **) &context)) {
+ if (! enif_get_resource(env, argv[0], erlzmq_nif_resource_context,
+ (void **) &context)) {
return enif_make_badarg(env);
}
@@ -721,8 +842,21 @@ NIF(erlzmq_nif_term)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
+ if (! context->mutex) {
+ zmq_msg_close(&msg);
+ enif_free_env(req.data.term.env);
+ return return_zmq_errno(env, ETERM);
+ }
enif_mutex_lock(context->mutex);
- if (zmq_sendmsg(context->thread_socket, &msg, 0) == -1) {
+ if (! context->thread_socket_name) {
+ if (context->mutex) {
+ enif_mutex_unlock(context->mutex);
+ }
+ zmq_msg_close(&msg);
+ enif_free_env(req.data.term.env);
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_sendmsg(context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(context->mutex);
zmq_msg_close(&msg);
enif_free_env(req.data.term.env);
@@ -781,28 +915,34 @@ static void * polling_thread(void * handle)
if (vector_get(zmq_pollitem_t, &items_zmq, 0)->revents & ZMQ_POLLIN) {
--count;
}
- for (i = 1; i < vector_count(&items_zmq); ++i) {
+ for (i = 1; i < vector_count(&items_zmq) && count > 0; ++i) {
zmq_pollitem_t * item = vector_get(zmq_pollitem_t, &items_zmq, i);
erlzmq_thread_request_t * r = vector_get(erlzmq_thread_request_t,
&requests, i);
if (item->revents & ZMQ_POLLIN) {
- size_t value_len = sizeof(int64_t);
- int64_t flag_value = 0;
-
assert(r->type == ERLZMQ_THREAD_REQUEST_RECV);
--count;
+ item->revents = 0;
zmq_msg_t msg;
- zmq_msg_init(&msg);
+ if (zmq_msg_init(&msg)) {
+ fprintf(stderr, "zmq_msg_init error: %s\n",
+ strerror(zmq_errno()));
+ assert(0);
+ }
+ int keep_socket = 0;
+ assert(r->data.recv.socket->mutex);
enif_mutex_lock(r->data.recv.socket->mutex);
if (zmq_recvmsg(r->data.recv.socket->socket_zmq, &msg,
- r->data.recv.flags) == -1 ||
- (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON &&
- zmq_getsockopt(r->data.recv.socket->socket_zmq,
- ZMQ_RCVMORE, &flag_value, &value_len)) )
- {
+ r->data.recv.flags) == -1) {
enif_mutex_unlock(r->data.recv.socket->mutex);
- if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
+ zmq_msg_close(&msg);
+ int const error = zmq_errno();
+ if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON &&
+ error == EAGAIN) {
+ keep_socket = 1;
+ }
+ else if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
enif_send(NULL, &r->data.recv.socket->active_pid, r->data.recv.env,
enif_make_tuple3(r->data.recv.env,
enif_make_atom(r->data.recv.env, "zmq"),
@@ -810,55 +950,66 @@ static void * polling_thread(void * handle)
enif_make_uint64(r->data.recv.env,
r->data.recv.socket->socket_index),
enif_make_resource(r->data.recv.env, r->data.recv.socket)),
- return_zmq_errno(r->data.recv.env, zmq_errno())));
- enif_free_env(r->data.recv.env);
- r->data.recv.env = enif_alloc_env();
- item->revents = 0;
+ return_zmq_errno(r->data.recv.env, error)));
}
else {
- assert(0);
+ // an EAGAIN error could occur if a timeout is set on the socket
+ enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
+ enif_make_tuple2(r->data.recv.env,
+ enif_make_copy(r->data.recv.env, r->data.recv.ref),
+ return_zmq_errno(r->data.recv.env, error)));
}
}
else {
enif_mutex_unlock(r->data.recv.socket->mutex);
- }
-
- ErlNifBinary binary;
- enif_alloc_binary(zmq_msg_size(&msg), &binary);
- memcpy(binary.data, zmq_msg_data(&msg), zmq_msg_size(&msg));
- zmq_msg_close(&msg);
-
- if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
- ERL_NIF_TERM flags_list;
- // Should we send the multipart flag
- if(flag_value == 1) {
- flags_list = enif_make_list1(r->data.recv.env,
- enif_make_atom(r->data.recv.env,
- "rcvmore"));
- } else {
- flags_list = enif_make_list(r->data.recv.env, 0);
+ ErlNifBinary binary;
+ enif_alloc_binary(zmq_msg_size(&msg), &binary);
+ memcpy(binary.data, zmq_msg_data(&msg), zmq_msg_size(&msg));
+ zmq_msg_close(&msg);
+
+ if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
+ ERL_NIF_TERM flags_list;
+
+ // Should we send the multipart flag
+ size_t value_len = sizeof(int64_t);
+ int64_t flag_value = 0;
+ if (zmq_getsockopt(r->data.recv.socket->socket_zmq,
+ ZMQ_RCVMORE, &flag_value, &value_len)) {
+ fprintf(stderr, "zmq_getsockopt error: %s\n",
+ strerror(zmq_errno()));
+ assert(0);
+ }
+ if(flag_value == 1) {
+ flags_list = enif_make_list1(r->data.recv.env,
+ enif_make_atom(r->data.recv.env,
+ "rcvmore"));
+ } else {
+ flags_list = enif_make_list(r->data.recv.env, 0);
+ }
+
+ enif_send(NULL, &r->data.recv.socket->active_pid, r->data.recv.env,
+ enif_make_tuple4(r->data.recv.env,
+ enif_make_atom(r->data.recv.env, "zmq"),
+ enif_make_tuple2(r->data.recv.env,
+ enif_make_uint64(r->data.recv.env,
+ r->data.recv.socket->socket_index),
+ enif_make_resource(r->data.recv.env, r->data.recv.socket)),
+ enif_make_binary(r->data.recv.env, &binary),
+ flags_list));
+ keep_socket = 1;
}
-
- enif_send(NULL, &r->data.recv.socket->active_pid, r->data.recv.env,
- enif_make_tuple4(r->data.recv.env,
- enif_make_atom(r->data.recv.env, "zmq"),
+ else {
+ enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
enif_make_tuple2(r->data.recv.env,
- enif_make_uint64(r->data.recv.env,
- r->data.recv.socket->socket_index),
- enif_make_resource(r->data.recv.env, r->data.recv.socket)),
- enif_make_binary(r->data.recv.env, &binary),
- flags_list));
- enif_free_env(r->data.recv.env);
- r->data.recv.env = enif_alloc_env();
- item->revents = 0;
+ enif_make_copy(r->data.recv.env, r->data.recv.ref),
+ enif_make_binary(r->data.recv.env, &binary)));
+ }
+ }
+ if (keep_socket) {
+ enif_clear_env(r->data.recv.env);
}
else {
- enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
- enif_make_tuple2(r->data.recv.env,
- enif_make_copy(r->data.recv.env, r->data.recv.ref),
- enif_make_binary(r->data.recv.env, &binary)));
-
enif_free_env(r->data.recv.env);
enif_release_resource(r->data.recv.socket);
@@ -872,10 +1023,12 @@ static void * polling_thread(void * handle)
else if (item->revents & ZMQ_POLLOUT) {
assert(r->type == ERLZMQ_THREAD_REQUEST_SEND);
--count;
+ item->revents = 0;
+ assert(r->data.send.socket->mutex);
enif_mutex_lock(r->data.send.socket->mutex);
if (zmq_sendmsg(r->data.send.socket->socket_zmq,
- &r->data.send.msg, r->data.send.flags) == -1) {
+ &r->data.send.msg, r->data.send.flags) == -1) {
enif_mutex_unlock(r->data.send.socket->mutex);
enif_send(NULL, &r->data.send.pid, r->data.send.env,
enif_make_tuple2(r->data.send.env,
@@ -898,12 +1051,22 @@ static void * polling_thread(void * handle)
assert(status == 0);
--i;
}
+ else {
+ assert(item->revents == 0);
+ }
}
+ // incoming requests to poll on
if (vector_get(zmq_pollitem_t, &items_zmq, 0)->revents & ZMQ_POLLIN) {
vector_get(zmq_pollitem_t, &items_zmq, 0)->revents = 0;
zmq_msg_t msg;
- zmq_msg_init(&msg);
+ if (zmq_msg_init(&msg)) {
+ fprintf(stderr, "zmq_msg_init error: %s\n",
+ strerror(zmq_errno()));
+ assert(0);
+ }
+
+ assert(context->mutex);
enif_mutex_lock(context->mutex);
status = zmq_recvmsg(thread_socket, &msg, 0);
enif_mutex_unlock(context->mutex);
@@ -913,6 +1076,7 @@ static void * polling_thread(void * handle)
erlzmq_thread_request_t * r =
(erlzmq_thread_request_t *) zmq_msg_data(&msg);
+ ErlNifMutex * mutex = 0;
if (r->type == ERLZMQ_THREAD_REQUEST_SEND) {
zmq_pollitem_t item_zmq = {r->data.send.socket->socket_zmq,
0, ZMQ_POLLOUT, 0};
@@ -932,6 +1096,19 @@ static void * polling_thread(void * handle)
zmq_msg_close(&msg);
}
else if (r->type == ERLZMQ_THREAD_REQUEST_CLOSE) {
+ if (! r->data.close.socket->mutex)
+ {
+ enif_send(NULL, &r->data.close.pid, r->data.close.env,
+ enif_make_tuple2(r->data.close.env,
+ enif_make_copy(r->data.close.env, r->data.close.ref),
+ return_zmq_errno(r->data.close.env, ETERM)));
+ enif_free_env(r->data.close.env);
+ zmq_msg_close(&msg);
+ continue;
+ }
+ enif_mutex_lock(r->data.close.socket->mutex);
+ assert(r->data.close.socket->socket_zmq);
+
// remove all entries with this socket
for (i = vector_count(&items_zmq) - 1; i > 0; --i) {
zmq_pollitem_t * item = vector_get(zmq_pollitem_t, &items_zmq, i);
@@ -939,11 +1116,7 @@ static void * polling_thread(void * handle)
erlzmq_thread_request_t * r_old =
vector_get(erlzmq_thread_request_t, &requests, i);
if (r_old->type == ERLZMQ_THREAD_REQUEST_RECV) {
- enif_clear_env(r_old->data.recv.env);
- // FIXME
- // causes crash on R14B01, works fine on R14B02
- // (repeated enif_send with active receive broken on R14B01)
- //enif_free_env(r_old->data.recv.env);
+ enif_free_env(r_old->data.recv.env);
enif_release_resource(r_old->data.recv.socket);
}
else if (r_old->type == ERLZMQ_THREAD_REQUEST_SEND) {
@@ -961,10 +1134,12 @@ static void * polling_thread(void * handle)
}
}
// close the socket
- enif_mutex_lock(r->data.close.socket->mutex);
zmq_close(r->data.close.socket->socket_zmq);
- enif_mutex_unlock(r->data.close.socket->mutex);
- enif_mutex_destroy(r->data.close.socket->mutex);
+ r->data.close.socket->socket_zmq = 0;
+ mutex = r->data.close.socket->mutex;
+ r->data.close.socket->mutex = 0;
+ enif_mutex_unlock(mutex);
+ enif_mutex_destroy(mutex);
enif_release_resource(r->data.close.socket);
// notify the waiting request
enif_send(NULL, &r->data.close.pid, r->data.close.env,
@@ -975,37 +1150,65 @@ static void * polling_thread(void * handle)
zmq_msg_close(&msg);
}
else if (r->type == ERLZMQ_THREAD_REQUEST_TERM) {
+ if (! context->mutex)
+ {
+ enif_send(NULL, &r->data.term.pid, r->data.term.env,
+ enif_make_tuple2(r->data.term.env,
+ enif_make_copy(r->data.term.env, r->data.term.ref),
+ return_zmq_errno(r->data.term.env, ETERM)));
+ enif_free_env(r->data.term.env);
+ zmq_msg_close(&msg);
+ continue;
+ }
enif_mutex_lock(context->mutex);
free(context->thread_socket_name);
// use this to flag context is over
- context->thread_socket_name = NULL;
- enif_mutex_unlock(context->mutex);
+ context->thread_socket_name = 0;
// cleanup pending requests
for (i = 1; i < vector_count(&requests); ++i) {
erlzmq_thread_request_t * r_old = vector_get(erlzmq_thread_request_t,
&requests, i);
- if (r_old->type == ERLZMQ_THREAD_REQUEST_RECV) {
+ if (r_old->type == ERLZMQ_THREAD_REQUEST_RECV)
+ {
enif_free_env(r_old->data.recv.env);
+ if (r_old->data.recv.socket->mutex) {
+ enif_mutex_lock(r_old->data.recv.socket->mutex);
+ assert(r_old->data.recv.socket->socket_zmq);
+ zmq_close(r_old->data.recv.socket->socket_zmq);
+ r_old->data.recv.socket->socket_zmq = 0;
+ mutex = r_old->data.recv.socket->mutex;
+ r_old->data.recv.socket->mutex = 0;
+ enif_mutex_unlock(mutex);
+ enif_mutex_destroy(mutex);
+ }
enif_release_resource(r_old->data.recv.socket);
- zmq_close(r_old->data.recv.socket->socket_zmq);
}
else if (r_old->type == ERLZMQ_THREAD_REQUEST_SEND) {
zmq_msg_close(&r_old->data.send.msg);
enif_free_env(r_old->data.send.env);
+ if (r_old->data.send.socket->mutex) {
+ enif_mutex_lock(r_old->data.send.socket->mutex);
+ assert(r_old->data.send.socket->socket_zmq);
+ zmq_close(r_old->data.send.socket->socket_zmq);
+ r_old->data.send.socket->socket_zmq = 0;
+ mutex = r_old->data.send.socket->mutex;
+ r_old->data.send.socket->mutex = 0;
+ enif_mutex_unlock(mutex);
+ enif_mutex_destroy(mutex);
+ }
enif_release_resource(r_old->data.send.socket);
- zmq_close(r_old->data.send.socket->socket_zmq);
}
}
// terminate the context
- enif_mutex_lock(context->mutex);
zmq_close(thread_socket);
zmq_close(context->thread_socket);
- enif_mutex_unlock(context->mutex);
- zmq_term(context->context_zmq);
- enif_mutex_lock(context->mutex);
- enif_mutex_unlock(context->mutex);
- enif_mutex_destroy(context->mutex);
+ mutex = context->mutex;
+ context->mutex = 0;
+ enif_mutex_unlock(mutex);
+ enif_mutex_destroy(mutex);
+ void * const context_term = context->context_zmq;
enif_release_resource(context);
+
// notify the waiting request
enif_send(NULL, &r->data.term.pid, r->data.term.env,
enif_make_tuple2(r->data.term.env,
@@ -1015,9 +1218,13 @@ static void * polling_thread(void * handle)
zmq_msg_close(&msg);
vector_destroy(&items_zmq);
vector_destroy(&requests);
+ // the thread will block here until all sockets
+ // within the context are closed
+ zmq_term(context_term);
return NULL;
}
else {
+ fprintf(stderr, "invalid request type: %d\n", r->type);
assert(0);
}
}
@@ -1027,8 +1234,6 @@ static void * polling_thread(void * handle)
static ERL_NIF_TERM add_active_req(ErlNifEnv* env, erlzmq_socket_t * socket)
{
- socket->active = ERLZMQ_SOCKET_ACTIVE_ON;
-
erlzmq_thread_request_t req;
req.type = ERLZMQ_THREAD_REQUEST_RECV;
req.data.recv.env = enif_alloc_env();
@@ -1038,18 +1243,35 @@ static ERL_NIF_TERM add_active_req(ErlNifEnv* env, erlzmq_socket_t * socket)
zmq_msg_t msg;
if (zmq_msg_init_size(&msg, sizeof(erlzmq_thread_request_t))) {
+ zmq_msg_close(&msg);
enif_free_env(req.data.recv.env);
return return_zmq_errno(env, zmq_errno());
}
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
- if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
+ if (! socket->context->mutex) {
+ zmq_msg_close(&msg);
+ enif_free_env(req.data.recv.env);
+ return return_zmq_errno(env, ETERM);
+ }
+ enif_mutex_lock(socket->context->mutex);
+ if (! socket->context->thread_socket_name) {
+ if (socket->context->mutex) {
+ enif_mutex_unlock(socket->context->mutex);
+ }
+ zmq_msg_close(&msg);
+ enif_free_env(req.data.recv.env);
+ return return_zmq_errno(env, ETERM);
+ }
+ else if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
+ enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
enif_free_env(req.data.recv.env);
return return_zmq_errno(env, zmq_errno());
}
else {
+ enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
// each pointer to the socket in a request increments the reference
enif_keep_resource(socket);
121 test/erlzmq_test.erl
View
@@ -2,7 +2,33 @@
-include_lib("eunit/include/eunit.hrl").
-export([worker/2]).
+% provides some context for failures only viewable within the C code
+%-define(PRINT_DEBUG, true).
+
+-ifdef(PRINT_DEBUG).
+% use stderr while bypassing the io server to avoid buffering
+-define(PRINT_START,
+ PRINT_PORT = open_port({fd, 0, 2}, [out, {line, 256}]),
+ port_command(PRINT_PORT,
+ io_lib:format("~w:~w start~n", [?MODULE, ?LINE]))).
+-define(PRINT_CHECK(ANY),
+ port_command(PRINT_PORT,
+ io_lib:format("~w:~w ~p~n", [?MODULE, ?LINE, ANY]))).
+-define(PRINT_END,
+ port_command(PRINT_PORT,
+ io_lib:format("~w:~w end~n", [?MODULE, ?LINE])),
+ port_close(PRINT_PORT),
+ ok).
+-else.
+-define(PRINT_START, ok).
+-define(PRINT_CHECK(_), ok).
+-define(PRINT_END, ok).
+-endif.
+
hwm_test() ->
+ ?PRINT_START,
+ ?PRINT_CHECK(lists:flatten(
+ io_lib:format("executing as os pid ~s", [os:getpid()]))),
{ok, C} = erlzmq:context(),
{ok, S1} = erlzmq:socket(C, [pull, {active, false}]),
{ok, S2} = erlzmq:socket(C, [push, {active, false}]),
@@ -27,7 +53,8 @@ hwm_test() ->
ok = erlzmq:close(S1),
ok = erlzmq:close(S2),
- ok = erlzmq:term(C).
+ ok = erlzmq:term(C),
+ ?PRINT_END.
hwm_loop(0, _S) ->
ok;
@@ -39,6 +66,7 @@ hwm_loop(N, S) ->
hwm_loop(N-1, S).
invalid_rep_test() ->
+ ?PRINT_START,
{ok, Ctx} = erlzmq:context(),
{ok, XrepSocket} = erlzmq:socket(Ctx, [xrep, {active, false}]),
@@ -71,21 +99,29 @@ invalid_rep_test() ->
%% Tear down the wiring.
ok = erlzmq:close(XrepSocket),
ok = erlzmq:close(ReqSocket),
- ok = erlzmq:term(Ctx).
+ ok = erlzmq:term(Ctx),
+ ?PRINT_END.
pair_inproc_test() ->
+ ?PRINT_START,
basic_tests("inproc://tester", pair, pair, active),
- basic_tests("inproc://tester", pair, pair, passive).
+ basic_tests("inproc://tester", pair, pair, passive),
+ ?PRINT_END.
pair_ipc_test() ->
+ ?PRINT_START,
basic_tests("ipc:///tmp/tester", pair, pair, active),
- basic_tests("ipc:///tmp/tester", pair, pair, passive).
+ basic_tests("ipc:///tmp/tester", pair, pair, passive),
+ ?PRINT_END.
pair_tcp_test() ->
+ ?PRINT_START,
basic_tests("tcp://127.0.0.1:5554", pair, pair, active),
- basic_tests("tcp://127.0.0.1:5555", pair, pair, passive).
+ basic_tests("tcp://127.0.0.1:5555", pair, pair, passive),
+ ?PRINT_END.
reqrep_device_test() ->
+ ?PRINT_START,
{ok, Ctx} = erlzmq:context(),
%% Create a req/rep device.
@@ -161,23 +197,31 @@ reqrep_device_test() ->
ok = erlzmq:close(Rep),
ok = erlzmq:close(Xrep),
ok = erlzmq:close(Xreq),
- ok = erlzmq:term(Ctx).
+ ok = erlzmq:term(Ctx),
+ ?PRINT_END.
reqrep_inproc_test() ->
+ ?PRINT_START,
basic_tests("inproc://test", req, rep, active),
- basic_tests("inproc://test", req, rep, passive).
+ basic_tests("inproc://test", req, rep, passive),
+ ?PRINT_END.
reqrep_ipc_test() ->
+ ?PRINT_START,
basic_tests("ipc:///tmp/tester", req, rep, active),
- basic_tests("ipc:///tmp/tester", req, rep, passive).
+ basic_tests("ipc:///tmp/tester", req, rep, passive),
+ ?PRINT_END.
reqrep_tcp_test() ->
+ ?PRINT_START,
basic_tests("tcp://127.0.0.1:5556", req, rep, active),
- basic_tests("tcp://127.0.0.1:5557", req, rep, passive).
+ basic_tests("tcp://127.0.0.1:5557", req, rep, passive),
+ ?PRINT_END.
sub_forward_test() ->
+ ?PRINT_START,
{ok, Ctx} = erlzmq:context(),
%% First, create an intermediate device.
@@ -225,9 +269,11 @@ sub_forward_test() ->
ok = erlzmq:close(Xsub),
ok = erlzmq:close(Pub),
ok = erlzmq:close(Sub),
- ok = erlzmq:term(Ctx).
+ ok = erlzmq:term(Ctx),
+ ?PRINT_END.
-timeo_test() ->
+timeo() ->
+ ?PRINT_START,
{ok, Ctx} = erlzmq:context(),
%% Create a disconnected socket.
{ok, Sb} = erlzmq:socket(Ctx, [pull, {active, false}]),
@@ -266,22 +312,40 @@ timeo_test() ->
Buff = <<"12345678ABCDEFGH12345678abcdefgh">>,
ok = erlzmq:send(Sc, Buff),
- {ok, Buff} = erlzmq:recv(Sb),
+ case erlzmq:recv(Sb) of
+ {ok, Buff} ->
+ ok;
+ {error, eagain} ->
+ timeout
+ end,
%% Clean-up.
ok = erlzmq:close(Sc),
ok = erlzmq:close(Sb),
- ok = erlzmq:term (Ctx).
+ ok = erlzmq:term (Ctx),
+ ok,
+ ?PRINT_END.
+timeo_test_() ->
+ % sometimes this test can timeout with the default timeout
+ {timeout, 10, [
+ ?_assert(timeo() =:= ok)
+ ]}.
bad_init_test() ->
- ?assertEqual({error, einval}, erlzmq:context(-1)).
+ ?PRINT_START,
+ ?assertEqual({error, einval}, erlzmq:context(-1)),
+ ?PRINT_END.
shutdown_stress_test() ->
- ?assertMatch(ok, shutdown_stress_loop(10)).
+ ?PRINT_START,
+ ?assertMatch(ok, shutdown_stress_loop(10)),
+ ?PRINT_END.
version_test() ->
+ ?PRINT_START,
{Major, Minor, Patch} = erlzmq:version(),
- ?assert(is_integer(Major) andalso is_integer(Minor) andalso is_integer(Patch)).
+ ?assert(is_integer(Major) andalso is_integer(Minor) andalso is_integer(Patch)),
+ ?PRINT_END.
shutdown_stress_loop(0) ->
ok;
@@ -295,27 +359,40 @@ shutdown_stress_loop(N) ->
shutdown_stress_loop(N-1).
shutdown_no_blocking_test() ->
+ ?PRINT_START,
{ok, C} = erlzmq:context(),
{ok, S} = erlzmq:socket(C, [pub, {active, false}]),
erlzmq:close(S),
- ?assertEqual(ok, erlzmq:term(C, 500)).
+ ?assertEqual(ok, erlzmq:term(C, 500)),
+ ?PRINT_END.
shutdown_blocking_test() ->
+ ?PRINT_START,
{ok, C} = erlzmq:context(),
{ok, _S} = erlzmq:socket(C, [pub, {active, false}]),
- ?assertMatch({error, {timeout, _}}, erlzmq:term(C, 0)).
+ case erlzmq:term(C, 0) of
+ {error, {timeout, _}} ->
+ % typical
+ ok;
+ ok ->
+ % very infrequent
+ ok
+ end,
+ ?PRINT_END.
shutdown_blocking_unblocking_test() ->
+ ?PRINT_START,
{ok, C} = erlzmq:context(),
- {ok, S} = erlzmq:socket(C, [pub, {active, false}]),
- V = erlzmq:term(C, 500),
+ {ok, _} = erlzmq:socket(C, [pub, {active, false}]),
+ V = erlzmq:term(C, 0),
?assertMatch({error, {timeout, _}}, V),
{error, {timeout, Ref}} = V,
- erlzmq:close(S),
+ % all remaining sockets are automatically closed by term (i.e., zmq_term)
receive
{Ref, ok} ->
ok
- end.
+ end,
+ ?PRINT_END.
join_procs(0) ->
ok;
Please sign in to comment.
Something went wrong with that request. Please try again.