Browse files

Fix socket handling of send operations when the socket is in active mode.
  • Loading branch information...
1 parent 0139502 commit cadc359b835768c88a65a68998eec2b08d568f65 @okeuday okeuday committed Mar 28, 2011
Showing with 41 additions and 24 deletions.
  1. +41 −24 c_src/erlzmq_nif.c
View
65 c_src/erlzmq_nif.c
@@ -470,18 +470,26 @@ NIF(erlzmq_nif_send)
memcpy(zmq_msg_data(&req.data.send.msg), binary.data, binary.size);
- enif_mutex_lock(socket->mutex);
- if (zmq_send(socket->socket_zmq, &req.data.send.msg,
- req.data.send.flags | ZMQ_NOBLOCK)) {
- enif_mutex_unlock(socket->mutex);
-
- int const error = zmq_errno();
- if (error != EAGAIN ||
- (error == EAGAIN && (req.data.send.flags & ZMQ_NOBLOCK))) {
- zmq_msg_close(&req.data.send.msg);
- return return_zmq_errno(env, error);
+ int polling_thread_send = 1;
+ if (! socket->active) {
+ enif_mutex_lock(socket->mutex);
+ if (zmq_send(socket->socket_zmq, &req.data.send.msg,
+ req.data.send.flags | ZMQ_NOBLOCK)) {
+ enif_mutex_unlock(socket->mutex);
+ int const error = zmq_errno();
+ if (error != EAGAIN ||
+ (error == EAGAIN && (req.data.send.flags & ZMQ_NOBLOCK))) {
+ zmq_msg_close(&req.data.send.msg);
+ return return_zmq_errno(env, error);
+ }
}
-
+ else {
+ enif_mutex_unlock(socket->mutex);
+ polling_thread_send = 0;
+ }
+ }
+
+ if (polling_thread_send) {
req.type = ERLZMQ_THREAD_REQUEST_SEND;
req.data.send.env = enif_alloc_env();
req.data.send.ref = enif_make_ref(req.data.send.env);
@@ -497,22 +505,26 @@ NIF(erlzmq_nif_send)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
+ enif_mutex_lock(socket->context->mutex);
if (zmq_send(socket->context->thread_socket, &msg, 0)) {
+ enif_mutex_unlock(socket->context->mutex);
+
zmq_msg_close(&msg);
zmq_msg_close(&req.data.send.msg);
enif_free_env(req.data.send.env);
return return_zmq_errno(env, zmq_errno());
}
+ else {
+ enif_mutex_unlock(socket->context->mutex);
- zmq_msg_close(&msg);
- // each pointer to the socket in a request increments the reference
- enif_keep_resource(socket);
-
- return enif_make_copy(env, req.data.send.ref);
+ zmq_msg_close(&msg);
+ // each pointer to the socket in a request increments the reference
+ enif_keep_resource(socket);
+
+ return enif_make_copy(env, req.data.send.ref);
+ }
}
else {
- enif_mutex_unlock(socket->mutex);
-
zmq_msg_close(&req.data.send.msg);
return enif_make_atom(env, "ok");
@@ -554,7 +566,6 @@ NIF(erlzmq_nif_recv)
(error == EAGAIN && (req.data.recv.flags & ZMQ_NOBLOCK))) {
return return_zmq_errno(env, error);
}
-
req.type = ERLZMQ_THREAD_REQUEST_RECV;
req.data.recv.env = enif_alloc_env();
req.data.recv.ref = enif_make_ref(req.data.recv.env);
@@ -568,17 +579,23 @@ NIF(erlzmq_nif_recv)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
+ enif_mutex_lock(socket->context->mutex);
if (zmq_send(socket->context->thread_socket, &msg, 0)) {
+ enif_mutex_unlock(socket->context->mutex);
+
zmq_msg_close(&msg);
enif_free_env(req.data.recv.env);
return return_zmq_errno(env, zmq_errno());
}
+ else {
+ enif_mutex_unlock(socket->context->mutex);
- zmq_msg_close(&msg);
- // each pointer to the socket in a request increments the reference
- enif_keep_resource(socket);
-
- return enif_make_copy(env, req.data.recv.ref);
+ zmq_msg_close(&msg);
+ // each pointer to the socket in a request increments the reference
+ enif_keep_resource(socket);
+
+ return enif_make_copy(env, req.data.recv.ref);
+ }
}
else {
enif_mutex_unlock(socket->mutex);

0 comments on commit cadc359

Please sign in to comment.