Skip to content

Commit

Permalink
fixes #419 want to nni_aio_stop without blocking (#428)
Browse files Browse the repository at this point in the history
* fixes #419 want to nni_aio_stop without blocking

This actually introduces an nni_aio_close() API that causes
nni_aio_begin to return NNG_ECLOSED, while scheduling a callback
on the AIO to do an NNG_ECLOSED as well.  This should be called
in non-blocking close() contexts instead of nni_aio_stop(), and
the cases where we call nni_aio_fini() multiple times are updated
updated to add nni_aio_stop() calls on all "interlinked" aios before
finalizing them.

Furthermore, we call nni_aio_close() as soon as practical in the
close path.  This closes an annoying race condition where the
callback from a lower subsystem could wind up rescheduling an
operation that we wanted to abort.
  • Loading branch information
gdamore committed May 15, 2018
1 parent 16b4c40 commit 1d03348
Show file tree
Hide file tree
Showing 37 changed files with 495 additions and 278 deletions.
45 changes: 33 additions & 12 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ struct nng_aio {
nni_duration a_timeout; // Relative timeout

// These fields are private to the aio framework.
bool a_stop; // shutting down (no new operations)
bool a_sleep; // sleeping with no action
int a_sleeprv; // result when sleep wakes
int a_cancelrv; // if canceled between begin and schedule
bool a_stop; // shutting down (no new operations)
bool a_closed; // close called, but not fini (yet)
bool a_sleep; // sleeping with no action
int a_sleeprv; // result when sleep wakes
nni_task *a_task;

// Read/write operations.
Expand Down Expand Up @@ -204,6 +204,24 @@ nni_aio_stop(nni_aio *aio)
}
}

void
nni_aio_close(nni_aio *aio)
{
if (aio != NULL) {
nni_aio_cancelfn cancelfn;

nni_mtx_lock(&nni_aio_lk);
cancelfn = aio->a_prov_cancel;
aio->a_prov_cancel = NULL;
aio->a_closed = true;
nni_mtx_unlock(&nni_aio_lk);

if (cancelfn != NULL) {
cancelfn(aio, NNG_ECLOSED);
}
}
}

void
nni_aio_set_timeout(nni_aio *aio, nni_duration when)
{
Expand Down Expand Up @@ -306,19 +324,25 @@ nni_aio_begin(nni_aio *aio)
aio->a_count = 0;
aio->a_prov_cancel = NULL;
aio->a_prov_data = NULL;
aio->a_cancelrv = 0;
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
nni_task_prep(aio->a_task);
if (aio->a_closed) {
aio->a_result = NNG_ECLOSED;
aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
nni_mtx_unlock(&nni_aio_lk);
nni_task_dispatch(aio->a_task);
return (NNG_ECLOSED);
}
nni_mtx_unlock(&nni_aio_lk);
return (0);
}

int
nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
{
int rv;
if (!aio->a_sleep) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
Expand All @@ -339,19 +363,16 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
nni_mtx_unlock(&nni_aio_lk);
return (NNG_ECANCELED);
}
if ((rv = aio->a_cancelrv) != 0) {
if (aio->a_closed) {
nni_mtx_unlock(&nni_aio_lk);
return (rv);
return (NNG_ECLOSED);
}

// If cancellation occurred in between "begin" and "schedule",
// then cancel it right now.
aio->a_prov_cancel = cancelfn;
aio->a_prov_data = data;
if ((rv = aio->a_cancelrv) != 0) {
aio->a_expire = 0;
nni_aio_expire_add(aio);
} else if (aio->a_expire != NNI_TIME_NEVER) {
if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
nni_mtx_unlock(&nni_aio_lk);
Expand Down
6 changes: 6 additions & 0 deletions src/core/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ extern void nni_aio_fini(nni_aio *);
// use nni_aio_cancel instead.)
extern void nni_aio_stop(nni_aio *);

// nni_aio_close closes the aio for further activity. It aborts any in-progress
// transaction (if it can), and future calls nni_aio_begin or nni_aio_schedule
// with both result in NNG_ECLOSED. The expectation is that protocols call this
// for all their aios in a stop routine, before calling fini on any of them.
extern void nni_aio_close(nni_aio *);

// nni_aio_set_data sets user data. This should only be done by the
// consumer, initiating the I/O. The intention is to be able to store
// additional data for use when the operation callback is executed.
Expand Down
4 changes: 2 additions & 2 deletions src/core/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ typedef struct nni_ctx nni_ctx;
typedef struct nni_ep nni_ep;
typedef struct nni_pipe nni_pipe;
typedef struct nni_tran nni_tran;
typedef struct nni_tran_ep nni_tran_ep;
typedef struct nni_tran_ep_ops nni_tran_ep_ops;
typedef struct nni_tran_ep_option nni_tran_ep_option;
typedef struct nni_tran_pipe nni_tran_pipe;
typedef struct nni_tran_pipe_ops nni_tran_pipe_ops;
typedef struct nni_tran_pipe_option nni_tran_pipe_option;

typedef struct nni_proto_ctx_option nni_proto_ctx_option;
Expand Down
4 changes: 2 additions & 2 deletions src/core/device.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
if ((s1 == NULL) || (s2 == NULL)) {
return (NNG_EINVAL);
}
if ((nni_sock_peer(s1) != nni_sock_proto(s2)) ||
(nni_sock_peer(s2) != nni_sock_proto(s1))) {
if ((nni_sock_peer_id(s1) != nni_sock_proto_id(s2)) ||
(nni_sock_peer_id(s2) != nni_sock_proto_id(s1))) {
return (NNG_EINVAL);
}

Expand Down
64 changes: 32 additions & 32 deletions src/core/endpt.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,30 @@
#include <string.h>

struct nni_ep {
nni_tran_ep ep_ops; // transport ops
nni_tran * ep_tran; // transport pointer
void * ep_data; // transport private
uint64_t ep_id; // endpoint id
nni_list_node ep_node; // per socket list
nni_sock * ep_sock;
nni_url * ep_url;
int ep_mode;
int ep_started;
int ep_closed; // full shutdown
int ep_closing; // close pending (waiting on refcnt)
int ep_refcnt;
bool ep_tmo_run;
nni_mtx ep_mtx;
nni_cv ep_cv;
nni_list ep_pipes;
nni_aio * ep_acc_aio;
nni_aio * ep_con_aio;
nni_aio * ep_con_syn; // used for sync connect
nni_aio * ep_tmo_aio; // backoff timer
nni_duration ep_maxrtime; // maximum time for reconnect
nni_duration ep_currtime; // current time for reconnect
nni_duration ep_inirtime; // initial time for reconnect
nni_time ep_conntime; // time of last good connect
nni_tran_ep_ops ep_ops; // transport ops
nni_tran * ep_tran; // transport pointer
void * ep_data; // transport private
uint64_t ep_id; // endpoint id
nni_list_node ep_node; // per socket list
nni_sock * ep_sock;
nni_url * ep_url;
int ep_mode;
int ep_started;
int ep_closed; // full shutdown
int ep_closing; // close pending (waiting on refcnt)
int ep_refcnt;
bool ep_tmo_run;
nni_mtx ep_mtx;
nni_cv ep_cv;
nni_list ep_pipes;
nni_aio * ep_acc_aio;
nni_aio * ep_con_aio;
nni_aio * ep_con_syn; // used for sync connect
nni_aio * ep_tmo_aio; // backoff timer
nni_duration ep_maxrtime; // maximum time for reconnect
nni_duration ep_currtime; // current time for reconnect
nni_duration ep_inirtime; // initial time for reconnect
nni_time ep_conntime; // time of last good connect
};

// Functionality related to end points.
Expand Down Expand Up @@ -249,10 +249,10 @@ nni_ep_shutdown(nni_ep *ep)
nni_mtx_unlock(&ep->ep_mtx);

// Abort any remaining in-flight operations.
nni_aio_abort(ep->ep_acc_aio, NNG_ECLOSED);
nni_aio_abort(ep->ep_con_aio, NNG_ECLOSED);
nni_aio_abort(ep->ep_con_syn, NNG_ECLOSED);
nni_aio_abort(ep->ep_tmo_aio, NNG_ECLOSED);
nni_aio_close(ep->ep_acc_aio);
nni_aio_close(ep->ep_con_aio);
nni_aio_close(ep->ep_con_syn);
nni_aio_close(ep->ep_tmo_aio);

// Stop the underlying transport.
ep->ep_ops.ep_close(ep->ep_data);
Expand All @@ -276,10 +276,10 @@ nni_ep_close(nni_ep *ep)

nni_ep_shutdown(ep);

nni_aio_stop(ep->ep_acc_aio);
nni_aio_stop(ep->ep_con_aio);
nni_aio_stop(ep->ep_con_syn);
nni_aio_stop(ep->ep_tmo_aio);
nni_aio_close(ep->ep_acc_aio);
nni_aio_close(ep->ep_con_aio);
nni_aio_close(ep->ep_con_syn);
nni_aio_close(ep->ep_tmo_aio);

nni_mtx_lock(&ep->ep_mtx);
NNI_LIST_FOREACH (&ep->ep_pipes, p) {
Expand Down
Loading

0 comments on commit 1d03348

Please sign in to comment.