// Patch summary (descriptive preamble placed ahead of the first "diff --git"
// header; patch tools are expected to skip leading text like this -- confirm
// with the tool in use).
//
// Purpose: teach the Windows event backend and the dispatch I/O path to handle
// sockets in addition to files and pipes.
//
// src/event/event_windows.c
//   * Adds completion keys DISPATCH_PORT_SOCKET_READ / DISPATCH_PORT_SOCKET_WRITE
//     and the handle type DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET.
//   * dispatch_muxnote_s gains dmn_network_events (mask last passed to
//     WSAEventSelect()) and dmn_threadpool_wait; dmn_event is now shared by the
//     pipe path and the socket path.
//   * _dispatch_muxnote_create(): FILE_TYPE_PIPE handles are classified with
//     _dispatch_handle_is_socket() instead of GetNamedPipeInfo().
//   * _dispatch_muxnote_stop(): disarms, drains and closes the threadpool wait,
//     then calls WSAEventSelect(sock, NULL, 0) for sockets. Closing dmn_event
//     moves to _dispatch_muxnote_dispose(), which now also crashes if a
//     threadpool wait is still set.
//   * _dispatch_socket_callback(): threadpool wait callback. It reads the
//     pending events with WSAEnumNetworkEvents() and posts one completion packet
//     per FD_READ / FD_WRITE, retaining the muxnote for each packet. The byte
//     count is 0 on FD_CLOSE (which also forces both FD_READ and FD_WRITE), the
//     FIONREAD result on FD_READ, and 1 otherwise. The wait is re-armed on exit
//     whether or not enumeration succeeded.
//   * _dispatch_io_trigger(), socket case: lazily creates the event and the
//     threadpool wait, builds a mask of FD_CLOSE plus FD_READ / FD_WRITE from
//     dmn_events, and calls WSAEventSelect() only when the mask differs from
//     dmn_network_events. For writers it probes with a zero-byte send() and
//     posts a write packet immediately when that returns 0.
//   * _dispatch_unote_register_muxed() / _dispatch_unote_unregister_muxed():
//     sockets share the pipe list handling.
//   * _dispatch_event_merge_socket(): clears DU_STATE_ARMED; a count > 0 merges
//     EV_ADD | EV_ENABLE | EV_DISPATCH, a count of 0 sets DU_STATE_NEEDS_DELETE
//     and merges EV_DELETE | EV_DISPATCH. The _read / _write wrappers walk the
//     reader / writer lists and drop the reference taken when the packet was
//     posted.
//   * _dispatch_event_loop_drain(): routes the two new completion keys.
//
// src/io.c
//   * _dispatch_operation_perform(): socket handles use recv() / send();
//     WSAEWOULDBLOCK is translated to EAGAIN before jumping to the error label.
//
// src/shims/generic_win_stubs.{c,h}
//   * Adds _dispatch_handle_is_socket(): true when GetFileType() reports
//     FILE_TYPE_PIPE and PeekNamedPipe() fails with ERROR_INVALID_FUNCTION.
//   * The header hunk also adds one #include; the header names are not present
//     in this copy of the patch.
//
// NOTE(review): points to confirm before merging
//   1. A byte count of 0 doubles as the "closed" signal. In
//      _dispatch_socket_callback() an FD_READ without FD_CLOSE for which
//      FIONREAD stores 0 would be merged as EV_DELETE, and when FD_READ and
//      FD_WRITE are reported together the write packet reuses the read count.
//      The ioctlsocket() return value is also ignored. Consider separate
//      counts that are forced to be non-zero unless FD_CLOSE was seen.
//   2. _dispatch_muxnote_stop() calls WSAEventSelect(..., NULL, 0) but leaves
//      dmn_network_events unchanged, so a later _dispatch_io_trigger() with the
//      same mask would skip re-registration. Confirm whether a muxnote can be
//      triggered again after stop.
//   3. _dispatch_socket_callback() re-arms the wait unconditionally; verify this
//      cannot re-arm after _dispatch_muxnote_stop() has disarmed it and is
//      waiting for outstanding callbacks.
//   4. In the pipe path the monitor thread is started before dmn_event is
//      created; presumably the thread does not touch dmn_event that early --
//      verify against _dispatch_pipe_monitor_thread().
//   5. recv() / send() take an int length; `len` is passed straight through, and
//      Winsock error codes other than WSAEWOULDBLOCK reach `err` untranslated.
//      Confirm both are acceptable for callers of the error path.
//   6. A zero-byte send() that fails for a reason other than the expected
//      would-block case is silently ignored; presumably FD_CLOSE reports the
//      failure later -- TODO confirm.
diff --git a/src/event/event_windows.c b/src/event/event_windows.c index 23e1eb78c..512d0b536 100644 --- a/src/event/event_windows.c +++ b/src/event/event_windows.c @@ -30,6 +30,8 @@ enum _dispatch_windows_port { DISPATCH_PORT_FILE_HANDLE, DISPATCH_PORT_PIPE_HANDLE_READ, DISPATCH_PORT_PIPE_HANDLE_WRITE, + DISPATCH_PORT_SOCKET_READ, + DISPATCH_PORT_SOCKET_WRITE, }; enum _dispatch_muxnote_events { @@ -59,13 +61,24 @@ typedef struct dispatch_muxnote_s { DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID, DISPATCH_MUXNOTE_HANDLE_TYPE_FILE, DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE, + DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET, } dmn_handle_type; enum _dispatch_muxnote_events dmn_events; - // Used by the pipe monitoring thread - HANDLE dmn_thread; + // For pipes, this event is used to synchronize the monitoring thread with + // I/O completion port processing. For sockets, this is the event used with + // WSAEventSelect(). HANDLE dmn_event; + + // Pipe monitoring thread control + HANDLE dmn_thread; os_atomic(bool) dmn_stop; + + // Socket events registered with WSAEventSelect() + long dmn_network_events; + + // Threadpool wait handle for socket events + PTP_WAIT dmn_threadpool_wait; } *dispatch_muxnote_t; static LIST_HEAD(dispatch_muxnote_bucket_s, dispatch_muxnote_s) @@ -146,17 +159,10 @@ _dispatch_muxnote_create(dispatch_unote_t du, case FILE_TYPE_PIPE: // The specified file is a socket, a named pipe, or an - // anonymous pipe. Use GetNamedPipeInfo() to distinguish between - // a pipe and a socket. Despite its name, it also succeeds for - // anonymous pipes. - if (!GetNamedPipeInfo(handle, NULL, NULL, NULL, NULL)) { - // We'll get ERROR_ACCESS_DENIED for outbound pipes. - if (GetLastError() != ERROR_ACCESS_DENIED) { - // The file is probably a socket. - WIN_PORT_ERROR(); - } - } - dmn->dmn_handle_type = DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE; + // anonymous pipe. + dmn->dmn_handle_type = _dispatch_handle_is_socket(handle) + ? 
DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET + : DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE; break; } @@ -183,18 +189,27 @@ _dispatch_muxnote_stop(dispatch_muxnote_t dmn) CloseHandle(dmn->dmn_thread); dmn->dmn_thread = NULL; } - if (dmn->dmn_event) { - CloseHandle(dmn->dmn_event); - dmn->dmn_event = NULL; + if (dmn->dmn_threadpool_wait) { + SetThreadpoolWait(dmn->dmn_threadpool_wait, NULL, NULL); + WaitForThreadpoolWaitCallbacks(dmn->dmn_threadpool_wait, + /* fCancelPendingCallbacks */ FALSE); + CloseThreadpoolWait(dmn->dmn_threadpool_wait); + dmn->dmn_threadpool_wait = NULL; + } + if (dmn->dmn_handle_type == DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET) { + WSAEventSelect((SOCKET)dmn->dmn_ident, NULL, 0); } } static void _dispatch_muxnote_dispose(dispatch_muxnote_t dmn) { - if (dmn->dmn_thread) { + if (dmn->dmn_thread || dmn->dmn_threadpool_wait) { DISPATCH_INTERNAL_CRASH(0, "disposed a muxnote with an active thread"); } + if (dmn->dmn_event) { + CloseHandle(dmn->dmn_event); + } free(dmn); } @@ -300,10 +315,51 @@ _dispatch_pipe_write_availability(HANDLE hPipe) return fpli.WriteQuotaAvailable; } +static VOID CALLBACK +_dispatch_socket_callback(PTP_CALLBACK_INSTANCE inst, void *context, + PTP_WAIT pwa, TP_WAIT_RESULT res) +{ + dispatch_muxnote_t dmn = (dispatch_muxnote_t)context; + SOCKET sock = (SOCKET)dmn->dmn_ident; + WSANETWORKEVENTS events; + if (WSAEnumNetworkEvents(sock, (WSAEVENT)dmn->dmn_event, &events) == 0) { + long lNetworkEvents = events.lNetworkEvents; + DWORD dwBytesAvailable = 1; + if (lNetworkEvents & FD_CLOSE) { + dwBytesAvailable = 0; + // Post to all registered read and write handlers + lNetworkEvents |= FD_READ | FD_WRITE; + } else if (lNetworkEvents & FD_READ) { + ioctlsocket(sock, FIONREAD, &dwBytesAvailable); + } + if (lNetworkEvents & FD_READ) { + _dispatch_muxnote_retain(dmn); + if (!PostQueuedCompletionStatus(hPort, dwBytesAvailable, + (ULONG_PTR)DISPATCH_PORT_SOCKET_READ, (LPOVERLAPPED)dmn)) { + DISPATCH_INTERNAL_CRASH(GetLastError(), + 
"PostQueuedCompletionStatus"); + } + } + if (lNetworkEvents & FD_WRITE) { + _dispatch_muxnote_retain(dmn); + if (!PostQueuedCompletionStatus(hPort, dwBytesAvailable, + (ULONG_PTR)DISPATCH_PORT_SOCKET_WRITE, (LPOVERLAPPED)dmn)) { + DISPATCH_INTERNAL_CRASH(GetLastError(), + "PostQueuedCompletionStatus"); + } + } + } else { + _dispatch_debug("socket[0x%llx]: WSAEnumNetworkEvents() failed (%d)", + (long long)sock, WSAGetLastError()); + } + SetThreadpoolWait(pwa, dmn->dmn_event, /* pftTimeout */ NULL); +} + static BOOL _dispatch_io_trigger(dispatch_muxnote_t dmn) { BOOL bSuccess; + long lNetworkEvents; switch (dmn->dmn_handle_type) { case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID: @@ -321,19 +377,17 @@ _dispatch_io_trigger(dispatch_muxnote_t dmn) case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE: if ((dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) && !dmn->dmn_thread) { - HANDLE hThread = (HANDLE)_beginthreadex(/* security */ NULL, + dmn->dmn_thread = (HANDLE)_beginthreadex(/* security */ NULL, /* stack_size */ 1, _dispatch_pipe_monitor_thread, (void *)dmn, /* initflag */ 0, /* thrdaddr */ NULL); - if (!hThread) { + if (!dmn->dmn_thread) { DISPATCH_INTERNAL_CRASH(errno, "_beginthread"); } - HANDLE hEvent = CreateEventW(NULL, /* bManualReset */ FALSE, + dmn->dmn_event = CreateEventW(NULL, /* bManualReset */ FALSE, /* bInitialState */ FALSE, NULL); - if (!hEvent) { + if (!dmn->dmn_event) { DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateEventW"); } - dmn->dmn_thread = hThread; - dmn->dmn_event = hEvent; } if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) { _dispatch_muxnote_retain(dmn); @@ -348,6 +402,59 @@ _dispatch_io_trigger(dispatch_muxnote_t dmn) } } break; + + case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET: + if (!dmn->dmn_event) { + dmn->dmn_event = CreateEventW(NULL, /* bManualReset */ FALSE, + /* bInitialState */ FALSE, NULL); + if (!dmn->dmn_event) { + DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateEventW"); + } + } + if (!dmn->dmn_threadpool_wait) { + dmn->dmn_threadpool_wait = 
CreateThreadpoolWait( + _dispatch_socket_callback, dmn, + /* PTP_CALLBACK_ENVIRON */ NULL); + if (!dmn->dmn_threadpool_wait) { + DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateThreadpoolWait"); + } + SetThreadpoolWait(dmn->dmn_threadpool_wait, dmn->dmn_event, + /* pftTimeout */ NULL); + } + lNetworkEvents = FD_CLOSE; + if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) { + lNetworkEvents |= FD_READ; + } + if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) { + lNetworkEvents |= FD_WRITE; + } + if (dmn->dmn_network_events != lNetworkEvents) { + if (WSAEventSelect((SOCKET)dmn->dmn_ident, (WSAEVENT)dmn->dmn_event, + lNetworkEvents) != 0) { + DISPATCH_INTERNAL_CRASH(WSAGetLastError(), "WSAEventSelect"); + } + dmn->dmn_network_events = lNetworkEvents; + } + if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) { + // FD_WRITE is edge-triggered, not level-triggered, so it will only + // be signaled if the socket becomes writable after a send() fails + // with WSAEWOULDBLOCK. We can work around this by performing a + // zero-byte send(). If the socket is writable, the send() will + // succeed and we can immediately post a packet, and if it isn't, it + // will fail with WSAEWOULDBLOCK and WSAEventSelect() will report + // the next time it becomes available. 
+ if (send((SOCKET)dmn->dmn_ident, "", 0, 0) == 0) { + _dispatch_muxnote_retain(dmn); + bSuccess = PostQueuedCompletionStatus(hPort, 1, + (ULONG_PTR)DISPATCH_PORT_SOCKET_WRITE, + (LPOVERLAPPED)dmn); + if (bSuccess == FALSE) { + DISPATCH_INTERNAL_CRASH(GetLastError(), + "PostQueuedCompletionStatus"); + } + } + } + break; } return TRUE; @@ -408,6 +515,7 @@ _dispatch_unote_register_muxed(dispatch_unote_t du) break; case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE: + case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET: if (events & DISPATCH_MUXNOTE_EVENT_READ) { LIST_INSERT_HEAD(&dmn->dmn_readers_head, dul, du_link); } else if (events & DISPATCH_MUXNOTE_EVENT_WRITE) { @@ -449,6 +557,7 @@ _dispatch_unote_unregister_muxed(dispatch_unote_t du) break; case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE: + case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET: LIST_REMOVE(dul, du_link); _LIST_TRASH_ENTRY(dul, du_link); break; @@ -532,6 +641,52 @@ _dispatch_event_merge_pipe_handle_write(dispatch_muxnote_t dmn, _dispatch_muxnote_release(dmn); } +static void +_dispatch_event_merge_socket(dispatch_unote_t du, DWORD dwBytesAvailable) +{ + // consumed by dux_merge_evt() + _dispatch_retain_unote_owner(du); + dispatch_unote_state_t du_state = _dispatch_unote_state(du); + du_state &= ~DU_STATE_ARMED; + uintptr_t data = dwBytesAvailable; + uint32_t flags; + if (dwBytesAvailable > 0) { + flags = EV_ADD | EV_ENABLE | EV_DISPATCH; + } else { + du_state |= DU_STATE_NEEDS_DELETE; + flags = EV_DELETE | EV_DISPATCH; + } + _dispatch_unote_state_set(du, du_state); + os_atomic_store2o(du._dr, ds_pending_data, ~data, relaxed); + dux_merge_evt(du._du, flags, data, 0); +} + +static void +_dispatch_event_merge_socket_read(dispatch_muxnote_t dmn, + DWORD dwBytesAvailable) +{ + dispatch_unote_linkage_t dul, dul_next; + LIST_FOREACH_SAFE(dul, &dmn->dmn_readers_head, du_link, dul_next) { + dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul); + _dispatch_event_merge_socket(du, dwBytesAvailable); + } + // Retained when posting the 
completion packet + _dispatch_muxnote_release(dmn); +} + +static void +_dispatch_event_merge_socket_write(dispatch_muxnote_t dmn, + DWORD dwBytesAvailable) +{ + dispatch_unote_linkage_t dul, dul_next; + LIST_FOREACH_SAFE(dul, &dmn->dmn_writers_head, du_link, dul_next) { + dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul); + _dispatch_event_merge_socket(du, dwBytesAvailable); + } + // Retained when posting the completion packet + _dispatch_muxnote_release(dmn); +} + #pragma mark timers typedef struct _dispatch_windows_timeout_s { @@ -716,6 +871,16 @@ _dispatch_event_loop_drain(uint32_t flags) dwNumberOfBytesTransferred); break; + case DISPATCH_PORT_SOCKET_READ: + _dispatch_event_merge_socket_read((dispatch_muxnote_t)pOV, + dwNumberOfBytesTransferred); + break; + + case DISPATCH_PORT_SOCKET_WRITE: + _dispatch_event_merge_socket_write((dispatch_muxnote_t)pOV, + dwNumberOfBytesTransferred); + break; + default: DISPATCH_INTERNAL_CRASH(ulCompletionKey, "unsupported completion key"); diff --git a/src/io.c b/src/io.c index 839367767..9377e7d25 100644 --- a/src/io.c +++ b/src/io.c @@ -2412,7 +2412,18 @@ _dispatch_operation_perform(dispatch_operation_t op) #if defined(_WIN32) HANDLE hFile = (HANDLE)op->fd_entry->fd; BOOL bSuccess; - if (GetFileType(hFile) == FILE_TYPE_PIPE) { + if (_dispatch_handle_is_socket(hFile)) { + processed = recv((SOCKET)hFile, buf, len, 0); + if (processed < 0) { + bSuccess = FALSE; + err = WSAGetLastError(); + if (err == WSAEWOULDBLOCK) { + err = EAGAIN; + } + goto error; + } + bSuccess = TRUE; + } else if (GetFileType(hFile) == FILE_TYPE_PIPE) { OVERLAPPED ovlOverlapped = {}; DWORD dwTotalBytesAvail; bSuccess = PeekNamedPipe(hFile, NULL, 0, NULL, @@ -2466,7 +2477,18 @@ _dispatch_operation_perform(dispatch_operation_t op) #if defined(_WIN32) HANDLE hFile = (HANDLE)op->fd_entry->fd; BOOL bSuccess; - if (GetFileType(hFile) == FILE_TYPE_PIPE) { + if (_dispatch_handle_is_socket(hFile)) { + processed = send((SOCKET)hFile, buf, len, 0); + if 
(processed < 0) { + bSuccess = FALSE; + err = WSAGetLastError(); + if (err == WSAEWOULDBLOCK) { + err = EAGAIN; + } + goto error; + } + bSuccess = TRUE; + } else if (GetFileType(hFile) == FILE_TYPE_PIPE) { // Unfortunately there isn't a good way to achieve O_NONBLOCK // semantics when writing to a pipe. SetNamedPipeHandleState() // can allow pipes to be switched into a "no wait" mode, but diff --git a/src/shims/generic_win_stubs.c b/src/shims/generic_win_stubs.c index b976075af..7781673a4 100644 --- a/src/shims/generic_win_stubs.c +++ b/src/shims/generic_win_stubs.c @@ -13,6 +13,19 @@ typedef NTSTATUS (NTAPI *_NtQueryInformationFile_fn_t)(HANDLE FileHandle, DISPATCH_STATIC_GLOBAL(dispatch_once_t _dispatch_ntdll_pred); DISPATCH_STATIC_GLOBAL(_NtQueryInformationFile_fn_t _dispatch_NtQueryInformationFile_ptr); +bool +_dispatch_handle_is_socket(HANDLE hFile) +{ + // GetFileType() returns FILE_TYPE_PIPE for both pipes and sockets. We can + // disambiguate by checking if PeekNamedPipe() fails with + // ERROR_INVALID_FUNCTION. + if (GetFileType(hFile) == FILE_TYPE_PIPE && + !PeekNamedPipe(hFile, NULL, 0, NULL, NULL, NULL)) { + return GetLastError() == ERROR_INVALID_FUNCTION; + } + return false; +} + static void _dispatch_init_precise_time(void *context DISPATCH_UNUSED) { diff --git a/src/shims/generic_win_stubs.h b/src/shims/generic_win_stubs.h index 1f7f4eaa3..985bbe30b 100644 --- a/src/shims/generic_win_stubs.h +++ b/src/shims/generic_win_stubs.h @@ -4,6 +4,7 @@ #include +#include #include #include #include @@ -39,6 +40,8 @@ typedef __typeof__(_Generic((__SIZE_TYPE__)0, \ #define strcasecmp _stricmp +bool _dispatch_handle_is_socket(HANDLE hFile); + /* * Wrappers for dynamically loaded Windows APIs */