diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c index 1e985130b..3351b9ddb 100644 --- a/src/platform/windows/win_io.c +++ b/src/platform/windows/win_io.c @@ -23,6 +23,8 @@ static int win_io_nthr = 0; static HANDLE win_io_h = NULL; static nni_thr *win_io_thrs; +static SRWLock win_io_lock = SRWLOCK_INIT; + static void win_io_handler(void *arg) { @@ -45,9 +47,15 @@ win_io_handler(void *arg) break; } + // The following is done under the win_io_lock to prevent + // a use after free race against nni_win_io_fini. + AcquireSRWLockShared(&win_io_lock); item = CONTAINING_RECORD(olpd, nni_win_io, olpd); - rv = ok ? 0 : nni_win_error(GetLastError()); - item->cb(item, rv, (size_t) cnt); + if (item->cb != NULL) { + rv = ok ? 0 : nni_win_error(GetLastError()); + item->cb(item, rv, (size_t) cnt); + } + ReleaseSRWLockShared(&win_io_lock); } } @@ -61,26 +69,44 @@ nni_win_io_register(HANDLE h) } int -nni_win_io_init(nni_win_io *io, nni_win_io_cb cb, void *ptr) +nni_win_io_init(nni_win_io *io, HANDLE h, nni_win_io_cb cb, void *ptr) { ZeroMemory(&io->olpd, sizeof(io->olpd)); - io->cb = cb; - io->ptr = ptr; - io->aio = NULL; - io->olpd.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - if (io->olpd.hEvent == NULL) { - return (nni_win_error(GetLastError())); - } + io->f = h; + io->cb = cb; + io->ptr = ptr; + io->aio = NULL; return (0); } void nni_win_io_fini(nni_win_io *io) { - if (io->olpd.hEvent != NULL) { - CloseHandle((HANDLE) io->olpd.hEvent); + DWORD num; + // Make absolutely sure there is no I/O running. + if (io->f != INVALID_HANDLE) { + CancelIoEx(io->f, &o->olpd); + (void) GetOverlappedResult(io->f, &io->olpd, &num, TRUE); } + + // Now acquire the win_io_lock to make sure that no threads are + // running in this loop. Note that there is a subtle race here, where + // in theory its possible for the callback thread to be just about to + // to start the callback (entering this lock). This is very narrow, + // and it's unlikely to be problematic if it does, because we do not + // immedately destroy the data behind the overlapped I/O. Closing this + // race requires stopping all of the threads in the thread pool (which + // means waking them all up). This may be worth doing in the future. + // One problem with this approach is that we have to have a way to wake + // *all* of them, not just one. + // + // One approach to doing this would be to close the handle of the + // completion port itself, and generate a new one. This seems likely + // to be rather expensive? + AcquireSRWLockExclusive(&win_io_lock); + io->cb = NULL; + ReleaseSRWLockExclusive(&win_io_lock); } int diff --git a/src/platform/windows/win_ipcconn.c b/src/platform/windows/win_ipcconn.c index abe92604b..86dd842be 100644 --- a/src/platform/windows/win_ipcconn.c +++ b/src/platform/windows/win_ipcconn.c @@ -422,8 +422,8 @@ nni_win_ipc_init( c->stream.s_get = ipc_get; c->stream.s_set = ipc_set; - if (((rv = nni_win_io_init(&c->recv_io, ipc_recv_cb, c)) != 0) || - ((rv = nni_win_io_init(&c->send_io, ipc_send_cb, c)) != 0)) { + if (((rv = nni_win_io_init(&c->recv_io, p, ipc_recv_cb, c)) != 0) || + ((rv = nni_win_io_init(&c->send_io, p, ipc_send_cb, c)) != 0)) { ipc_free(c); return (rv); } diff --git a/src/platform/windows/win_ipclisten.c b/src/platform/windows/win_ipclisten.c index 07969e974..a8f60a190 100644 --- a/src/platform/windows/win_ipclisten.c +++ b/src/platform/windows/win_ipclisten.c @@ -17,7 +17,7 @@ typedef struct { nng_stream_listener sl; - char * path; + char *path; bool started; bool closed; HANDLE f; @@ -33,7 +33,7 @@ typedef struct { static void ipc_accept_done(ipc_listener *l, int rv) { - nni_aio * aio; + nni_aio *aio; HANDLE f; nng_stream *c; @@ -138,7 +138,7 @@ static int ipc_listener_set_sec_desc(void *arg, const void *buf, size_t sz, nni_type t) { ipc_listener *l = arg; - void * desc; + void *desc; int rv; if ((rv = nni_copyin_ptr(&desc, buf, sz, t)) != 0) { @@ -200,7 +200,7 @@ ipc_listener_listen(void *arg) ipc_listener *l = arg; int rv; HANDLE f; - char * path; + char *path; nni_mtx_lock(&l->mtx); if (l->started) { @@ -331,7 +331,8 @@ nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url) if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_win_io_init(&l->io, ipc_accept_cb, l)) != 0) { + if ((rv = nni_win_io_init(&l->io, INVALID_HANDLE, ipc_accept_cb, l)) != + 0) { NNI_FREE_STRUCT(l); return (rv); } diff --git a/src/platform/windows/win_tcpconn.c b/src/platform/windows/win_tcpconn.c index fad7c495b..19d17c393 100644 --- a/src/platform/windows/win_tcpconn.c +++ b/src/platform/windows/win_tcpconn.c @@ -448,8 +448,8 @@ nni_win_tcp_init(nni_tcp_conn **connp, SOCKET s) c->ops.s_get = tcp_get; c->ops.s_set = tcp_set; - if (((rv = nni_win_io_init(&c->recv_io, tcp_recv_cb, c)) != 0) || - ((rv = nni_win_io_init(&c->send_io, tcp_send_cb, c)) != 0) || + if (((rv = nni_win_io_init(&c->recv_io, s, tcp_recv_cb, c)) != 0) || + ((rv = nni_win_io_init(&c->send_io, s, tcp_send_cb, c)) != 0) || ((rv = nni_win_io_register((HANDLE) s)) != 0)) { tcp_free(c); return (rv); diff --git a/src/platform/windows/win_tcpdial.c b/src/platform/windows/win_tcpdial.c index b4a6d7459..479562389 100644 --- a/src/platform/windows/win_tcpdial.c +++ b/src/platform/windows/win_tcpdial.c @@ -208,7 +208,7 @@ nni_tcp_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) c->peername = ss; - if ((rv = nni_win_io_init(&c->conn_io, tcp_dial_cb, c)) != 0) { + if ((rv = nni_win_io_init(&c->conn_io, s, tcp_dial_cb, c)) != 0) { nni_aio_finish_error(aio, rv); return; } diff --git a/src/platform/windows/win_tcplisten.c b/src/platform/windows/win_tcplisten.c index 7cfef8d24..e1aeddad6 100644 --- a/src/platform/windows/win_tcplisten.c +++ b/src/platform/windows/win_tcplisten.c @@ -81,13 +81,13 @@ tcp_listener_funcs(nni_tcp_listener *l) static void tcp_accept_cb(nni_win_io *io, int rv, size_t cnt) { - nni_tcp_conn * c = io->ptr; + nni_tcp_conn *c = io->ptr; nni_tcp_listener *l = c->listener; - nni_aio * aio; + nni_aio *aio; int len1; int len2; - SOCKADDR * sa1; - SOCKADDR * sa2; + SOCKADDR *sa1; + SOCKADDR *sa2; BOOL nd; BOOL ka; @@ -270,7 +270,7 @@ static void tcp_accept_cancel(nni_aio *aio, void *arg, int rv) { nni_tcp_listener *l = arg; - nni_tcp_conn * c; + nni_tcp_conn *c; nni_mtx_lock(&l->mtx); if ((c = nni_aio_get_prov_data(aio)) != NULL) { @@ -322,7 +322,7 @@ nni_tcp_listener_accept(nni_tcp_listener *l, nni_aio *aio) c->listener = l; c->conn_aio = aio; nni_aio_set_prov_data(aio, c); - if (((rv = nni_win_io_init(&c->conn_io, tcp_accept_cb, c)) != 0) || + if (((rv = nni_win_io_init(&c->conn_io, s, tcp_accept_cb, c)) != 0) || ((rv = nni_aio_schedule(aio, tcp_accept_cancel, l)) != 0)) { nni_aio_set_prov_data(aio, NULL); nni_mtx_unlock(&l->mtx); diff --git a/src/platform/windows/win_udp.c b/src/platform/windows/win_udp.c index 88f2cd5b8..55148ceac 100644 --- a/src/platform/windows/win_udp.c +++ b/src/platform/windows/win_udp.c @@ -38,7 +38,7 @@ static void udp_recv_start(nni_plat_udp *); int nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa) { - nni_plat_udp * u; + nni_plat_udp *u; SOCKADDR_STORAGE ss; int sslen; DWORD no; @@ -67,7 +67,7 @@ nni_plat_udp_open(nni_plat_udp **udpp, nni_sockaddr *sa) (void) setsockopt( u->s, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &no, sizeof(no)); - if (((rv = nni_win_io_init(&u->rxio, udp_recv_cb, u)) != 0) || + if (((rv = nni_win_io_init(&u->rxio, u->s, udp_recv_cb, u)) != 0) || ((rv = nni_win_io_register((HANDLE) u->s)) != 0)) { nni_plat_udp_close(u); return (rv); @@ -115,10 +115,10 @@ nni_plat_udp_send(nni_plat_udp *u, nni_aio *aio) { SOCKADDR_STORAGE to; int tolen; - nng_sockaddr * sa; + nng_sockaddr *sa; unsigned naiov; - nni_iov * aiov; - WSABUF * iov; + nni_iov *aiov; + WSABUF *iov; int rv; DWORD nsent; @@ -191,7 +191,7 @@ udp_recv_cb(nni_win_io *io, int rv, size_t num) { nni_plat_udp *u = io->ptr; nni_sockaddr *sa; - nni_aio * aio; + nni_aio *aio; nni_mtx_lock(&u->lk); if ((aio = nni_list_first(&u->rxq)) == NULL) { @@ -226,7 +226,7 @@ udp_recv_start(nni_plat_udp *u) DWORD flags; nni_iov *aiov; unsigned naiov; - WSABUF * iov; + WSABUF *iov; nni_aio *aio; if ((u->s == INVALID_SOCKET) || (u->closed)) {