Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gdamore/win io lock #1831

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions src/core/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ struct nni_socket {

bool s_closing; // Socket is closing
bool s_closed; // Socket closed, protected by global lock
bool s_ctxwait; // Waiting for contexts to close.

nni_mtx s_pipe_cbs_mtx;
nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];
Expand Down Expand Up @@ -734,7 +733,6 @@ nni_sock_shutdown(nni_sock *sock)
// a chance to do so gracefully.

while (!nni_list_empty(&sock->s_ctxs)) {
sock->s_ctxwait = true;
nni_cv_wait(&sock->s_close_cv);
}
nni_mtx_unlock(&sock_lk);
Expand Down Expand Up @@ -798,7 +796,6 @@ nni_sock_close(nni_sock *s)

// Wait for all other references to drop. Note that we
// have a reference already (from our caller).
s->s_ctxwait = true;
while ((s->s_ref > 1) || (!nni_list_empty(&s->s_ctxs))) {
nni_cv_wait(&s->s_close_cv);
}
Expand Down Expand Up @@ -1307,9 +1304,7 @@ nni_ctx_rele(nni_ctx *ctx)
// tries to avoid ID reuse.
nni_id_remove(&ctx_ids, ctx->c_id);
nni_list_remove(&sock->s_ctxs, ctx);
if (sock->s_closed || sock->s_ctxwait) {
nni_cv_wake(&sock->s_close_cv);
}
nni_cv_wake(&sock->s_close_cv);
nni_mtx_unlock(&sock_lk);

nni_ctx_destroy(ctx);
Expand Down Expand Up @@ -1791,9 +1786,7 @@ nni_pipe_remove(nni_pipe *p)
d->d_pipe = NULL;
dialer_timer_start_locked(d); // Kick the timer to redial.
}
if (s->s_closing) {
nni_cv_wake(&s->s_cv);
}
nni_cv_wake(&s->s_cv);
nni_mtx_unlock(&s->s_mx);
}

Expand Down
1 change: 0 additions & 1 deletion src/platform/windows/win_clock.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ nni_clock(void)
int
nni_time_get(uint64_t *seconds, uint32_t *nanoseconds)
{
int rv;
struct timespec ts;
if (timespec_get(&ts, TIME_UTC) == TIME_UTC) {
*seconds = ts.tv_sec;
Expand Down
4 changes: 2 additions & 2 deletions src/platform/windows/win_impl.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2021 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
Expand Down Expand Up @@ -122,7 +122,7 @@ extern void nni_win_udp_sysfini(void);
extern int nni_win_resolv_sysinit(void);
extern void nni_win_resolv_sysfini(void);

extern int nni_win_io_init(nni_win_io *, nni_win_io_cb, void *);
extern int nni_win_io_init(nni_win_io *, HANDLE, nni_win_io_cb, void *);
extern void nni_win_io_fini(nni_win_io *);

extern int nni_win_io_register(HANDLE);
Expand Down
50 changes: 38 additions & 12 deletions src/platform/windows/win_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ static int win_io_nthr = 0;
static HANDLE win_io_h = NULL;
static nni_thr *win_io_thrs;

static SRWLOCK win_io_lock = SRWLOCK_INIT;

static void
win_io_handler(void *arg)
{
Expand All @@ -45,9 +47,15 @@ win_io_handler(void *arg)
break;
}

// The following is done under the win_io_lock to prevent
// a use after free race against nni_win_io_fini.
AcquireSRWLockShared(&win_io_lock);
item = CONTAINING_RECORD(olpd, nni_win_io, olpd);
rv = ok ? 0 : nni_win_error(GetLastError());
item->cb(item, rv, (size_t) cnt);
if (item->cb != NULL) {
rv = ok ? 0 : nni_win_error(GetLastError());
item->cb(item, rv, (size_t) cnt);
}
ReleaseSRWLockShared(&win_io_lock);
}
}

Expand All @@ -61,26 +69,44 @@ nni_win_io_register(HANDLE h)
}

int
nni_win_io_init(nni_win_io *io, nni_win_io_cb cb, void *ptr)
nni_win_io_init(nni_win_io *io, HANDLE h, nni_win_io_cb cb, void *ptr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to update the header file as well

{
ZeroMemory(&io->olpd, sizeof(io->olpd));

io->cb = cb;
io->ptr = ptr;
io->aio = NULL;
io->olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
if (io->olpd.hEvent == NULL) {
return (nni_win_error(GetLastError()));
}
io->f = h;
io->cb = cb;
io->ptr = ptr;
io->aio = NULL;
return (0);
}

void
nni_win_io_fini(nni_win_io *io)
{
if (io->olpd.hEvent != NULL) {
CloseHandle((HANDLE) io->olpd.hEvent);
DWORD num;
// Make absolutely sure there is no I/O running.
if (io->f != INVALID_HANDLE_VALUE) {
CancelIoEx(io->f, &io->olpd);
(void) GetOverlappedResult(io->f, &io->olpd, &num, TRUE);
}

// Now acquire the win_io_lock to make sure that no threads are
// running in this loop. Note that there is a subtle race here, where
// in theory its possible for the callback thread to be just about to
// to start the callback (entering this lock). This is very narrow,
// and it's unlikely to be problematic if it does, because we do not
// immedately destroy the data behind the overlapped I/O. Closing this
// race requires stopping all of the threads in the thread pool (which
// means waking them all up). This may be worth doing in the future.
// One problem with this approach is that we have to have a way to wake
// *all* of them, not just one.
//
// One approach to doing this would be to close the handle of the
// completion port itself, and generate a new one. This seems likely
// to be rather expensive?
AcquireSRWLockExclusive(&win_io_lock);
io->cb = NULL;
ReleaseSRWLockExclusive(&win_io_lock);
}

int
Expand Down
4 changes: 2 additions & 2 deletions src/platform/windows/win_ipcconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,8 @@ nni_win_ipc_init(
c->stream.s_get = ipc_get;
c->stream.s_set = ipc_set;

if (((rv = nni_win_io_init(&c->recv_io, ipc_recv_cb, c)) != 0) ||
((rv = nni_win_io_init(&c->send_io, ipc_send_cb, c)) != 0)) {
if (((rv = nni_win_io_init(&c->recv_io, p, ipc_recv_cb, c)) != 0) ||
((rv = nni_win_io_init(&c->send_io, p, ipc_send_cb, c)) != 0)) {
ipc_free(c);
return (rv);
}
Expand Down
11 changes: 6 additions & 5 deletions src/platform/windows/win_ipclisten.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

typedef struct {
nng_stream_listener sl;
char * path;
char *path;
bool started;
bool closed;
HANDLE f;
Expand All @@ -33,7 +33,7 @@ typedef struct {
static void
ipc_accept_done(ipc_listener *l, int rv)
{
nni_aio * aio;
nni_aio *aio;
HANDLE f;
nng_stream *c;

Expand Down Expand Up @@ -138,7 +138,7 @@ static int
ipc_listener_set_sec_desc(void *arg, const void *buf, size_t sz, nni_type t)
{
ipc_listener *l = arg;
void * desc;
void *desc;
int rv;

if ((rv = nni_copyin_ptr(&desc, buf, sz, t)) != 0) {
Expand Down Expand Up @@ -200,7 +200,7 @@ ipc_listener_listen(void *arg)
ipc_listener *l = arg;
int rv;
HANDLE f;
char * path;
char *path;

nni_mtx_lock(&l->mtx);
if (l->started) {
Expand Down Expand Up @@ -331,7 +331,8 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url)
if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
return (NNG_ENOMEM);
}
if ((rv = nni_win_io_init(&l->io, ipc_accept_cb, l)) != 0) {
if ((rv = nni_win_io_init(
&l->io, INVALID_HANDLE_VALUE, ipc_accept_cb, l)) != 0) {
NNI_FREE_STRUCT(l);
return (rv);
}
Expand Down
6 changes: 4 additions & 2 deletions src/platform/windows/win_tcpconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,10 @@ nni_win_tcp_init(nni_tcp_conn **connp, SOCKET s)
c->ops.s_get = tcp_get;
c->ops.s_set = tcp_set;

if (((rv = nni_win_io_init(&c->recv_io, tcp_recv_cb, c)) != 0) ||
((rv = nni_win_io_init(&c->send_io, tcp_send_cb, c)) != 0) ||
if (((rv = nni_win_io_init(&c->recv_io, (HANDLE) s, tcp_recv_cb, c)) !=
0) ||
((rv = nni_win_io_init(&c->send_io, (HANDLE) s, tcp_send_cb, c)) !=
0) ||
((rv = nni_win_io_register((HANDLE) s)) != 0)) {
tcp_free(c);
return (rv);
Expand Down
3 changes: 2 additions & 1 deletion src/platform/windows/win_tcpdial.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio)

c->peername = ss;

if ((rv = nni_win_io_init(&c->conn_io, tcp_dial_cb, c)) != 0) {
if ((rv = nni_win_io_init(&c->conn_io, (HANDLE) s, tcp_dial_cb, c)) !=
0) {
nni_aio_finish_error(aio, rv);
return;
}
Expand Down
13 changes: 7 additions & 6 deletions src/platform/windows/win_tcplisten.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ tcp_listener_funcs(nni_tcp_listener *l)
static void
tcp_accept_cb(nni_win_io *io, int rv, size_t cnt)
{
nni_tcp_conn * c = io->ptr;
nni_tcp_conn *c = io->ptr;
nni_tcp_listener *l = c->listener;
nni_aio * aio;
nni_aio *aio;
int len1;
int len2;
SOCKADDR * sa1;
SOCKADDR * sa2;
SOCKADDR *sa1;
SOCKADDR *sa2;
BOOL nd;
BOOL ka;

Expand Down Expand Up @@ -270,7 +270,7 @@ static void
tcp_accept_cancel(nni_aio *aio, void *arg, int rv)
{
nni_tcp_listener *l = arg;
nni_tcp_conn * c;
nni_tcp_conn *c;

nni_mtx_lock(&l->mtx);
if ((c = nni_aio_get_prov_data(aio)) != NULL) {
Expand Down Expand Up @@ -322,7 +322,8 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio)
c->listener = l;
c->conn_aio = aio;
nni_aio_set_prov_data(aio, c);
if (((rv = nni_win_io_init(&c->conn_io, tcp_accept_cb, c)) != 0) ||
if (((rv = nni_win_io_init(
&c->conn_io, (HANDLE) s, tcp_accept_cb, c)) != 0) ||
((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0)) {
nni_aio_set_prov_data(aio, NULL);
nni_mtx_unlock(&l->mtx);
Expand Down
17 changes: 9 additions & 8 deletions src/platform/windows/win_udp.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
Expand Down Expand Up @@ -38,7 +38,7 @@ static void udp_recv_start(nni_plat_udp *);
int
nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa)
{
nni_plat_udp * u;
nni_plat_udp *u;
SOCKADDR_STORAGE ss;
int sslen;
DWORD no;
Expand Down Expand Up @@ -67,7 +67,8 @@ nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa)
(void) setsockopt(
u->s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no));

if (((rv = nni_win_io_init(&u->rxio, udp_recv_cb, u)) != 0) ||
if (((rv = nni_win_io_init(&u->rxio, (HANDLE) u->s, udp_recv_cb, u)) !=
0) ||
((rv = nni_win_io_register((HANDLE) u->s)) != 0)) {
nni_plat_udp_close(u);
return (rv);
Expand Down Expand Up @@ -115,10 +116,10 @@ nni_plat_udp_send(nni_plat_udp *u, nni_aio *aio)
{
SOCKADDR_STORAGE to;
int tolen;
nng_sockaddr * sa;
nng_sockaddr *sa;
unsigned naiov;
nni_iov * aiov;
WSABUF * iov;
nni_iov *aiov;
WSABUF *iov;
int rv;
DWORD nsent;

Expand Down Expand Up @@ -191,7 +192,7 @@ udp_recv_cb(nni_win_io *io, int rv, size_t num)
{
nni_plat_udp *u = io->ptr;
nni_sockaddr *sa;
nni_aio * aio;
nni_aio *aio;

nni_mtx_lock(&u->lk);
if ((aio = nni_list_first(&u->rxq)) == NULL) {
Expand Down Expand Up @@ -226,7 +227,7 @@ udp_recv_start(nni_plat_udp *u)
DWORD flags;
nni_iov *aiov;
unsigned naiov;
WSABUF * iov;
WSABUF *iov;
nni_aio *aio;

if ((u->s == INVALID_SOCKET) || (u->closed)) {
Expand Down
Loading
Loading