Skip to content

Commit

Permalink
Revert broken test and fix context termination issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
evax committed Apr 8, 2011
1 parent 9e2e314 commit b7d36bb
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
29 changes: 27 additions & 2 deletions c_src/erlzmq_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,10 @@ NIF(erlzmq_nif_send)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));

enif_mutex_lock(socket->context->mutex);
if (socket->context->thread_socket_name == NULL) {
enif_mutex_unlock(socket->context->mutex);
return return_zmq_errno(env, ETERM);
}
if (zmq_send(socket->context->thread_socket, &msg, 0)) {
enif_mutex_unlock(socket->context->mutex);

Expand Down Expand Up @@ -580,6 +584,10 @@ NIF(erlzmq_nif_recv)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));

enif_mutex_lock(socket->context->mutex);
if (socket->context->thread_socket_name == NULL) {
enif_mutex_unlock(socket->context->mutex);
return return_zmq_errno(env, ETERM);
}
if (zmq_send(socket->context->thread_socket, &msg, 0)) {
enif_mutex_unlock(socket->context->mutex);

Expand Down Expand Up @@ -636,6 +644,17 @@ NIF(erlzmq_nif_close)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));

enif_mutex_lock(socket->context->mutex);
if (socket->context->thread_socket_name == NULL) {
// context is gone
enif_mutex_lock(socket->mutex);
zmq_msg_close(&msg);
zmq_close(socket->socket_zmq);
enif_mutex_unlock(socket->mutex);
enif_mutex_destroy(socket->mutex);
enif_release_resource(socket);
enif_mutex_unlock(socket->context->mutex);
return enif_make_atom(env, "ok");
}
if (zmq_send(socket->context->thread_socket, &msg, 0)) {
enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
Expand Down Expand Up @@ -901,6 +920,11 @@ static void * polling_thread(void * handle)
zmq_msg_close(&msg);
}
else if (r->type == ERLZMQ_THREAD_REQUEST_TERM) {
enif_mutex_lock(context->mutex);
free(context->thread_socket_name);
// use this to flag context is over
context->thread_socket_name = NULL;
enif_mutex_unlock(context->mutex);
// cleanup pending requests
for (i = 1; i < vector_count(&requests); ++i) {
erlzmq_thread_request_t * r_old = vector_get(erlzmq_thread_request_t,
Expand All @@ -922,9 +946,10 @@ static void * polling_thread(void * handle)
zmq_close(thread_socket);
zmq_close(context->thread_socket);
enif_mutex_unlock(context->mutex);
enif_mutex_destroy(context->mutex);
free(context->thread_socket_name);
zmq_term(context->context_zmq);
enif_mutex_lock(context->mutex);
enif_mutex_unlock(context->mutex);
enif_mutex_destroy(context->mutex);
enif_release_resource(context);
// notify the waiting request
enif_send(NULL, &r->data.term.pid, r->data.term.env,
Expand Down
4 changes: 2 additions & 2 deletions test/erlzmq_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ shutdown_blocking_test() ->
shutdown_blocking_unblocking_test() ->
{ok, C} = erlzmq:context(),
{ok, S} = erlzmq:socket(C, [pub, {active, false}]),
erlzmq:close(S),
V = erlzmq:term(C, 0),
V = erlzmq:term(C, 500),
?assertMatch({error, {timeout, _}}, V),
{error, {timeout, Ref}} = V,
erlzmq:close(S),
receive
{Ref, ok} ->
ok
Expand Down

0 comments on commit b7d36bb

Please sign in to comment.