Skip to content

Commit

Permalink
Refactoring socket send/recv for thread-safety when calling `nn_close…
Browse files Browse the repository at this point in the history
…()` from a separate thread.
  • Loading branch information
JackDunaway committed Jan 7, 2015
1 parent 5881b8b commit d0ebdea
Showing 1 changed file with 66 additions and 51 deletions.
117 changes: 66 additions & 51 deletions src/core/sock.c
Expand Up @@ -543,13 +543,13 @@ int nn_sock_send (struct nn_sock *self, struct nn_msg *msg, int flags)
uint64_t now;
int timeout;

/* Some sockets types cannot be used for sending messages. */
/* Some socket types cannot be used for sending messages. */
if (nn_slow (self->socktype->flags & NN_SOCKTYPE_FLAG_NOSEND))
return -ENOTSUP;

nn_ctx_enter (&self->ctx);

/* Compute the deadline for SNDTIMEO timer. */
/* Compute the deadline for this operation. */
if (self->sndtimeo < 0) {
deadline = -1;
timeout = -1;
Expand All @@ -561,44 +561,50 @@ int nn_sock_send (struct nn_sock *self, struct nn_msg *msg, int flags)

while (1) {

/* If nn_term() was already called, return ETERM. */
if (nn_slow (self->state == NN_SOCK_STATE_ZOMBIE)) {
nn_ctx_leave (&self->ctx);
return -ETERM;
}
switch (self->state) {
case NN_SOCK_STATE_ACTIVE:
/* Attempt send operation without blocking. */
rc = self->sockbase->vfptr->send (self->sockbase, msg);

/* Try to send the message in a non-blocking way. */
rc = self->sockbase->vfptr->send (self->sockbase, msg);
if (nn_fast (rc == 0)) {
/* Release socket mutex to enable operations in other threads. */
nn_ctx_leave (&self->ctx);
return 0;
}
nn_assert (rc < 0);

/* Any unexpected error is forwarded to the caller. */
if (nn_slow (rc != -EAGAIN)) {
/* Immediate success. */
if (nn_fast (rc == 0))
return 0;

/* Any unexpected error is forwarded to the caller. */
if (nn_slow (rc != -EAGAIN))
return rc;

nn_assert (rc == -EAGAIN);

/* Return if call is configured to be non-blocking. */
if (nn_fast (flags & NN_DONTWAIT))
return -EAGAIN;

break;

case NN_SOCK_STATE_ZOMBIE:
nn_ctx_leave (&self->ctx);
return rc;
}
return -ETERM;

/* If the message cannot be sent at the moment and the send call
is non-blocking, return immediately. */
if (nn_fast (flags & NN_DONTWAIT)) {
default:
/* Unexpected state. */
nn_ctx_leave (&self->ctx);
return -EAGAIN;
nn_assert (0);
break;
}

/* While waiting until there is a pipe available for sending, release
context to enable operations on this socket from other threads. */
nn_ctx_leave (&self->ctx);
/* Wait until there is a pipe available for sending. */
rc = nn_efd_wait (&self->sndfd, timeout);
if (nn_slow (rc == -ETIMEDOUT))
return -EAGAIN;
if (nn_slow (rc == -EINTR))
return -EINTR;
errnum_assert (rc == 0, rc);

/* Within context again, ensure a pipe is still available for sending. */
/* Within context again, ensure a pipe is still available. */
nn_ctx_enter (&self->ctx);
if (!nn_efd_wait (&self->sndfd, 0)) {
self->flags |= NN_SOCK_FLAG_OUT;
Expand All @@ -612,6 +618,7 @@ int nn_sock_send (struct nn_sock *self, struct nn_msg *msg, int flags)
}
}

/* Impossible code path. */
nn_assert (0);
}

Expand All @@ -622,13 +629,13 @@ int nn_sock_recv (struct nn_sock *self, struct nn_msg *msg, int flags)
uint64_t now;
int timeout;

/* Some sockets types cannot be used for receiving messages. */
/* Some socket types cannot be used for receiving messages. */
if (nn_slow (self->socktype->flags & NN_SOCKTYPE_FLAG_NORECV))
return -ENOTSUP;

nn_ctx_enter (&self->ctx);

/* Compute the deadline for RCVTIMEO timer. */
/* Compute the deadline for this operation. */
if (self->rcvtimeo < 0) {
deadline = -1;
timeout = -1;
Expand All @@ -639,45 +646,51 @@ int nn_sock_recv (struct nn_sock *self, struct nn_msg *msg, int flags)
}

while (1) {

switch (self->state) {
case NN_SOCK_STATE_ACTIVE:
/* Attempt receive operation without blocking. */
rc = self->sockbase->vfptr->recv (self->sockbase, msg);

/* If nn_term() was already called, return ETERM. */
if (nn_slow (self->state == NN_SOCK_STATE_ZOMBIE)) {
/* Release socket mutex to enable operations in other threads. */
nn_ctx_leave (&self->ctx);
return -ETERM;
}

/* Try to receive the message in a non-blocking way. */
rc = self->sockbase->vfptr->recv (self->sockbase, msg);
if (nn_fast (rc == 0)) {
nn_ctx_leave (&self->ctx);
return 0;
}
nn_assert (rc < 0);
/* Immediate success. */
if (nn_fast (rc == 0))
return 0;

/* Any unexpected error is forwarded to the caller. */
if (nn_slow (rc != -EAGAIN))
return rc;

/* Any unexpected error is forwarded to the caller. */
if (nn_slow (rc != -EAGAIN)) {
nn_assert (rc == -EAGAIN);

/* Return if call is configured to be non-blocking. */
if (nn_fast (flags & NN_DONTWAIT))
return -EAGAIN;

break;

case NN_SOCK_STATE_ZOMBIE:
nn_ctx_leave (&self->ctx);
return rc;
}
return -ETERM;

/* If the message cannot be received at the moment and the recv call
is non-blocking, return immediately. */
if (nn_fast (flags & NN_DONTWAIT)) {
default:
/* Unexpected state. */
nn_ctx_leave (&self->ctx);
return -EAGAIN;
nn_assert (0);
break;
}

/* While waiting until there is a pipe available for recv, release
context to enable operations on this socket from other threads. */
nn_ctx_leave (&self->ctx);
/* Wait until there is a pipe available for receiving. */
rc = nn_efd_wait (&self->rcvfd, timeout);
if (nn_slow (rc == -ETIMEDOUT))
return -EAGAIN;
if (nn_slow (rc == -EINTR))
return -EINTR;
errnum_assert (rc == 0, rc);

/* Within context again, ensure a pipe is still available for sending. */
/* Within context again, ensure a pipe is still available. */
nn_ctx_enter (&self->ctx);
if (!nn_efd_wait (&self->rcvfd, 0)) {
self->flags |= NN_SOCK_FLAG_IN;
Expand Down Expand Up @@ -826,12 +839,13 @@ static void nn_sock_shutdown (struct nn_fsm *self, int src, int type,
nn_ep_stop (ep);
}
break;

case NN_SOCK_ACTION_STOPPED:
/* We get here when the deallocation of the socket was delayed by the
specific socket type. */
sock->state = NN_SOCK_STATE_STOPPED;

break;

default:
nn_fsm_bad_state (sock->state, src, type);
break;
Expand All @@ -847,6 +861,7 @@ static void nn_sock_shutdown (struct nn_fsm *self, int src, int type,
nn_ep_term (ep);
nn_free (ep);
break;

default:
nn_fsm_bad_state (sock->state, src, type);
break;
Expand Down

0 comments on commit d0ebdea

Please sign in to comment.