Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gdamore/win io lock #1831

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions src/core/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ struct nni_socket {

bool s_closing; // Socket is closing
bool s_closed; // Socket closed, protected by global lock
bool s_ctxwait; // Waiting for contexts to close.

nni_mtx s_pipe_cbs_mtx;
nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];
Expand Down Expand Up @@ -734,7 +733,6 @@ nni_sock_shutdown(nni_sock *sock)
// a chance to do so gracefully.

while (!nni_list_empty(&sock->s_ctxs)) {
sock->s_ctxwait = true;
nni_cv_wait(&sock->s_close_cv);
}
nni_mtx_unlock(&sock_lk);
Expand Down Expand Up @@ -798,7 +796,6 @@ nni_sock_close(nni_sock *s)

// Wait for all other references to drop. Note that we
// have a reference already (from our caller).
s->s_ctxwait = true;
while ((s->s_ref > 1) || (!nni_list_empty(&s->s_ctxs))) {
nni_cv_wait(&s->s_close_cv);
}
Expand Down Expand Up @@ -1307,9 +1304,7 @@ nni_ctx_rele(nni_ctx *ctx)
// tries to avoid ID reuse.
nni_id_remove(&ctx_ids, ctx->c_id);
nni_list_remove(&sock->s_ctxs, ctx);
if (sock->s_closed || sock->s_ctxwait) {
nni_cv_wake(&sock->s_close_cv);
}
nni_cv_wake(&sock->s_close_cv);
nni_mtx_unlock(&sock_lk);

nni_ctx_destroy(ctx);
Expand Down Expand Up @@ -1791,9 +1786,7 @@ nni_pipe_remove(nni_pipe *p)
d->d_pipe = NULL;
dialer_timer_start_locked(d); // Kick the timer to redial.
}
if (s->s_closing) {
nni_cv_wake(&s->s_cv);
}
nni_cv_wake(&s->s_cv);
nni_mtx_unlock(&s->s_mx);
}

Expand Down
50 changes: 38 additions & 12 deletions src/platform/windows/win_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ static int win_io_nthr = 0;
static HANDLE win_io_h = NULL;
static nni_thr *win_io_thrs;

static SRWLock win_io_lock = SRWLOCK_INIT;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static SRWLock win_io_lock = SRWLOCK_INIT;
static SRWLOCK win_io_lock = SRWLOCK_INIT;


static void
win_io_handler(void *arg)
{
Expand All @@ -45,9 +47,15 @@ win_io_handler(void *arg)
break;
}

// The following is done under the win_io_lock to prevent
// a use after free race against nni_win_io_fini.
AcquireSRWLockShared(&win_io_lock);
item = CONTAINING_RECORD(olpd, nni_win_io, olpd);
rv = ok ? 0 : nni_win_error(GetLastError());
item->cb(item, rv, (size_t) cnt);
if (item->cb != NULL) {
rv = ok ? 0 : nni_win_error(GetLastError());
item->cb(item, rv, (size_t) cnt);
}
ReleaseSRWLockShared(&win_io_lock);
}
}

Expand All @@ -61,26 +69,44 @@ nni_win_io_register(HANDLE h)
}

int
nni_win_io_init(nni_win_io *io, nni_win_io_cb cb, void *ptr)
nni_win_io_init(nni_win_io *io, HANDLE h, nni_win_io_cb cb, void *ptr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to update the header file as well

{
ZeroMemory(&io->olpd, sizeof(io->olpd));

io->cb = cb;
io->ptr = ptr;
io->aio = NULL;
io->olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
if (io->olpd.hEvent == NULL) {
return (nni_win_error(GetLastError()));
}
io->f = h;
io->cb = cb;
io->ptr = ptr;
io->aio = NULL;
return (0);
}

void
nni_win_io_fini(nni_win_io *io)
{
if (io->olpd.hEvent != NULL) {
CloseHandle((HANDLE) io->olpd.hEvent);
DWORD num;
// Make absolutely sure there is no I/O running.
if (io->f != INVALID_HANDLE) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (io->f != INVALID_HANDLE) {
if (io->f != INVALID_HANDLE_VALUE) {

CancelIoEx(io->f, &o->olpd);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
CancelIoEx(io->f, &o->olpd);
CancelIoEx(io->f, &io->olpd);

(void) GetOverlappedResult(io->f, &io->olpd, &num, TRUE);
}

// Now acquire the win_io_lock to make sure that no threads are
// running in this loop. Note that there is a subtle race here, where
// in theory its possible for the callback thread to be just about to
// to start the callback (entering this lock). This is very narrow,
// and it's unlikely to be problematic if it does, because we do not
// immedately destroy the data behind the overlapped I/O. Closing this
// race requires stopping all of the threads in the thread pool (which
// means waking them all up). This may be worth doing in the future.
// One problem with this approach is that we have to have a way to wake
// *all* of them, not just one.
//
// One approach to doing this would be to close the handle of the
// completion port itself, and generate a new one. This seems likely
// to be rather expensive?
AcquireSRWLockExclusive(&win_io_lock);
io->cb = NULL;
ReleaseSRWLockExclusive(&win_io_lock);
}

int
Expand Down
4 changes: 2 additions & 2 deletions src/platform/windows/win_ipcconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,8 @@ nni_win_ipc_init(
c->stream.s_get = ipc_get;
c->stream.s_set = ipc_set;

if (((rv = nni_win_io_init(&c->recv_io, ipc_recv_cb, c)) != 0) ||
((rv = nni_win_io_init(&c->send_io, ipc_send_cb, c)) != 0)) {
if (((rv = nni_win_io_init(&c->recv_io, p, ipc_recv_cb, c)) != 0) ||
((rv = nni_win_io_init(&c->send_io, p, ipc_send_cb, c)) != 0)) {
ipc_free(c);
return (rv);
}
Expand Down
11 changes: 6 additions & 5 deletions src/platform/windows/win_ipclisten.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

typedef struct {
nng_stream_listener sl;
char * path;
char *path;
bool started;
bool closed;
HANDLE f;
Expand All @@ -33,7 +33,7 @@ typedef struct {
static void
ipc_accept_done(ipc_listener *l, int rv)
{
nni_aio * aio;
nni_aio *aio;
HANDLE f;
nng_stream *c;

Expand Down Expand Up @@ -138,7 +138,7 @@ static int
ipc_listener_set_sec_desc(void *arg, const void *buf, size_t sz, nni_type t)
{
ipc_listener *l = arg;
void * desc;
void *desc;
int rv;

if ((rv = nni_copyin_ptr(&desc, buf, sz, t)) != 0) {
Expand Down Expand Up @@ -200,7 +200,7 @@ ipc_listener_listen(void *arg)
ipc_listener *l = arg;
int rv;
HANDLE f;
char * path;
char *path;

nni_mtx_lock(&l->mtx);
if (l->started) {
Expand Down Expand Up @@ -331,7 +331,8 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url)
if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
return (NNG_ENOMEM);
}
if ((rv = nni_win_io_init(&l->io, ipc_accept_cb, l)) != 0) {
if ((rv = nni_win_io_init(&l->io, INVALID_HANDLE, ipc_accept_cb, l)) !=
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ((rv = nni_win_io_init(&l->io, INVALID_HANDLE, ipc_accept_cb, l)) !=
if ((rv = nni_win_io_init(&l->io, INVALID_HANDLE_VALUE, ipc_accept_cb, l)) !=

0) {
NNI_FREE_STRUCT(l);
return (rv);
}
Expand Down
4 changes: 2 additions & 2 deletions src/platform/windows/win_tcpconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,8 @@ nni_win_tcp_init(nni_tcp_conn **connp, SOCKET s)
c->ops.s_get = tcp_get;
c->ops.s_set = tcp_set;

if (((rv = nni_win_io_init(&c->recv_io, tcp_recv_cb, c)) != 0) ||
((rv = nni_win_io_init(&c->send_io, tcp_send_cb, c)) != 0) ||
if (((rv = nni_win_io_init(&c->recv_io, s, tcp_recv_cb, c)) != 0) ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remember to update the function declaration in win_impl.h:125

((rv = nni_win_io_init(&c->send_io, s, tcp_send_cb, c)) != 0) ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (((rv = nni_win_io_init(&c->recv_io, s, tcp_recv_cb, c)) != 0) ||
((rv = nni_win_io_init(&c->send_io, s, tcp_send_cb, c)) != 0) ||
if (((rv = nni_win_io_init(&c->recv_io, (HANDLE) s, tcp_recv_cb, c)) != 0) ||
((rv = nni_win_io_init(&c->send_io, (HANDLE) s, tcp_send_cb, c)) != 0) ||

((rv = nni_win_io_register((HANDLE) s)) != 0)) {
tcp_free(c);
return (rv);
Expand Down
2 changes: 1 addition & 1 deletion src/platform/windows/win_tcpdial.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)

c->peername = ss;

if ((rv = nni_win_io_init(&c->conn_io, tcp_dial_cb, c)) != 0) {
if ((rv = nni_win_io_init(&c->conn_io, s, tcp_dial_cb, c)) != 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ((rv = nni_win_io_init(&c->conn_io, s, tcp_dial_cb, c)) != 0) {
if ((rv = nni_win_io_init(&c->conn_io, (HANDLE) s, tcp_dial_cb, c)) != 0) {

nni_aio_finish_error(aio, rv);
return;
}
Expand Down
12 changes: 6 additions & 6 deletions src/platform/windows/win_tcplisten.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ tcp_listener_funcs(nni_tcp_listener *l)
static void
tcp_accept_cb(nni_win_io *io, int rv, size_t cnt)
{
nni_tcp_conn * c = io->ptr;
nni_tcp_conn *c = io->ptr;
nni_tcp_listener *l = c->listener;
nni_aio * aio;
nni_aio *aio;
int len1;
int len2;
SOCKADDR * sa1;
SOCKADDR * sa2;
SOCKADDR *sa1;
SOCKADDR *sa2;
BOOL nd;
BOOL ka;

Expand Down Expand Up @@ -270,7 +270,7 @@ static void
tcp_accept_cancel(nni_aio *aio, void *arg, int rv)
{
nni_tcp_listener *l = arg;
nni_tcp_conn * c;
nni_tcp_conn *c;

nni_mtx_lock(&l->mtx);
if ((c = nni_aio_get_prov_data(aio)) != NULL) {
Expand Down Expand Up @@ -322,7 +322,7 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
c->listener = l;
c->conn_aio = aio;
nni_aio_set_prov_data(aio, c);
if (((rv = nni_win_io_init(&c->conn_io, tcp_accept_cb, c)) != 0) ||
if (((rv = nni_win_io_init(&c->conn_io, s, tcp_accept_cb, c)) != 0) ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (((rv = nni_win_io_init(&c->conn_io, s, tcp_accept_cb, c)) != 0) ||
if (((rv = nni_win_io_init(&c->conn_io, (HANDLE) s, tcp_accept_cb, c)) != 0) ||

((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0)) {
nni_aio_set_prov_data(aio, NULL);
nni_mtx_unlock(&l->mtx);
Expand Down
14 changes: 7 additions & 7 deletions src/platform/windows/win_udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ static void udp_recv_start(nni_plat_udp *);
int
nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa)
{
nni_plat_udp * u;
nni_plat_udp *u;
SOCKADDR_STORAGE ss;
int sslen;
DWORD no;
Expand Down Expand Up @@ -67,7 +67,7 @@ nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa)
(void) setsockopt(
u->s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no));

if (((rv = nni_win_io_init(&u->rxio, udp_recv_cb, u)) != 0) ||
if (((rv = nni_win_io_init(&u->rxio, u->s, udp_recv_cb, u)) != 0) ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (((rv = nni_win_io_init(&u->rxio, u->s, udp_recv_cb, u)) != 0) ||
if (((rv = nni_win_io_init(&u->rxio, (HANDLE) u->s, udp_recv_cb, u)) != 0) ||

((rv = nni_win_io_register((HANDLE) u->s)) != 0)) {
nni_plat_udp_close(u);
return (rv);
Expand Down Expand Up @@ -115,10 +115,10 @@ nni_plat_udp_send(nni_plat_udp *u, nni_aio *aio)
{
SOCKADDR_STORAGE to;
int tolen;
nng_sockaddr * sa;
nng_sockaddr *sa;
unsigned naiov;
nni_iov * aiov;
WSABUF * iov;
nni_iov *aiov;
WSABUF *iov;
int rv;
DWORD nsent;

Expand Down Expand Up @@ -191,7 +191,7 @@ udp_recv_cb(nni_win_io *io, int rv, size_t num)
{
nni_plat_udp *u = io->ptr;
nni_sockaddr *sa;
nni_aio * aio;
nni_aio *aio;

nni_mtx_lock(&u->lk);
if ((aio = nni_list_first(&u->rxq)) == NULL) {
Expand Down Expand Up @@ -226,7 +226,7 @@ udp_recv_start(nni_plat_udp *u)
DWORD flags;
nni_iov *aiov;
unsigned naiov;
WSABUF * iov;
WSABUF *iov;
nni_aio *aio;

if ((u->s == INVALID_SOCKET) || (u->closed)) {
Expand Down
22 changes: 10 additions & 12 deletions src/sp/transport/ipc/ipc.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
// Copyright 2019 Devolutions <info@devolutions.net>
//
Expand Down Expand Up @@ -65,7 +65,7 @@ struct ipc_ep {
nni_aio *time_aio;
nni_list busy_pipes; // busy pipes -- ones passed to socket
nni_list wait_pipes; // pipes waiting to match to socket
nni_list neg_pipes; // pipes busy negotiating
nni_list nego_pipes; // pipes busy negotiating
nni_reap_node reap;
#ifdef NNG_ENABLE_STATS
nni_stat_item st_rcv_max;
Expand All @@ -76,7 +76,7 @@ static void ipc_pipe_send_start(ipc_pipe *p);
static void ipc_pipe_recv_start(ipc_pipe *p);
static void ipc_pipe_send_cb(void *);
static void ipc_pipe_recv_cb(void *);
static void ipc_pipe_neg_cb(void *);
static void ipc_pipe_nego_cb(void *);
static void ipc_pipe_fini(void *);
static void ipc_ep_fini(void *);

Expand Down Expand Up @@ -165,9 +165,6 @@ static void
ipc_pipe_reap(ipc_pipe *p)
{
if (!nni_atomic_flag_test_and_set(&p->reaped)) {
if (p->conn != NULL) {
nng_stream_close(p->conn);
}
nni_reap(&ipc_pipe_reap_list, p);
}
}
Expand All @@ -183,7 +180,7 @@ ipc_pipe_alloc(ipc_pipe **pipe_p)
nni_mtx_init(&p->mtx);
nni_aio_init(&p->tx_aio, ipc_pipe_send_cb, p);
nni_aio_init(&p->rx_aio, ipc_pipe_recv_cb, p);
nni_aio_init(&p->neg_aio, ipc_pipe_neg_cb, p);
nni_aio_init(&p->neg_aio, ipc_pipe_nego_cb, p);
nni_aio_list_init(&p->send_q);
nni_aio_list_init(&p->recv_q);
nni_atomic_flag_reset(&p->reaped);
Expand All @@ -210,7 +207,7 @@ ipc_ep_match(ipc_ep *ep)
}

static void
ipc_pipe_neg_cb(void *arg)
ipc_pipe_nego_cb(void *arg)
{
ipc_pipe *p = arg;
ipc_ep *ep = p->ep;
Expand Down Expand Up @@ -261,7 +258,7 @@ ipc_pipe_neg_cb(void *arg)

// We are ready now. We put this in the wait list, and
// then try to run the matcher.
nni_list_remove(&ep->neg_pipes, p);
nni_list_remove(&ep->nego_pipes, p);
nni_list_append(&ep->wait_pipes, p);

ipc_ep_match(ep);
Expand All @@ -276,6 +273,7 @@ ipc_pipe_neg_cb(void *arg)
if (rv == NNG_ECLOSED) {
rv = NNG_ECONNSHUT;
}
nni_list_remove(&ep->nego_pipes, p);
nng_stream_close(p->conn);
// If we are waiting to negotiate on a client side, then a failure
// here has to be passed to the user app.
Expand Down Expand Up @@ -658,7 +656,7 @@ ipc_pipe_start(ipc_pipe *p, nng_stream *conn, ipc_ep *ep)
iov.iov_len = 8;
iov.iov_buf = &p->tx_head[0];
nni_aio_set_iov(&p->neg_aio, 1, &iov);
nni_list_append(&ep->neg_pipes, p);
nni_list_append(&ep->nego_pipes, p);

nni_aio_set_timeout(&p->neg_aio, 10000); // 10 sec timeout to negotiate
nng_stream_send(p->conn, &p->neg_aio);
Expand All @@ -679,7 +677,7 @@ ipc_ep_close(void *arg)
if (ep->listener != NULL) {
nng_stream_listener_close(ep->listener);
}
NNI_LIST_FOREACH (&ep->neg_pipes, p) {
NNI_LIST_FOREACH (&ep->nego_pipes, p) {
ipc_pipe_close(p);
}
NNI_LIST_FOREACH (&ep->wait_pipes, p) {
Expand Down Expand Up @@ -835,7 +833,7 @@ ipc_ep_init(ipc_ep **epp, nni_sock *sock)
nni_mtx_init(&ep->mtx);
NNI_LIST_INIT(&ep->busy_pipes, ipc_pipe, node);
NNI_LIST_INIT(&ep->wait_pipes, ipc_pipe, node);
NNI_LIST_INIT(&ep->neg_pipes, ipc_pipe, node);
NNI_LIST_INIT(&ep->nego_pipes, ipc_pipe, node);

ep->proto = nni_sock_proto_id(sock);

Expand Down
2 changes: 2 additions & 0 deletions src/sp/transport/tcp/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,9 @@
ep->useraio = NULL;
nni_aio_finish_error(uaio, rv);
}
nni_list_remove(&ep->negopipes, p);

Check warning on line 296 in src/sp/transport/tcp/tcp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/tcp/tcp.c#L296

Added line #L296 was not covered by tests
nni_mtx_unlock(&ep->mtx);

tcptran_pipe_reap(p);
}

Expand Down
Loading
Loading