From 96184efe96e6468be30c032fa07d4bc8ab42a99d Mon Sep 17 00:00:00 2001 From: Aaron Dierking Date: Tue, 20 Aug 2019 11:47:29 -0700 Subject: [PATCH] event: implement source muxing on Windows Implement the cases in `_dispatch_unote_register_muxed()` and `_dispatch_unote_unregister_muxed()` for when multiple event sources are open on a handle and we need to combine them. The test suite doesn't hit these codepaths anywhere and we haven't run into issues with Foundation yet, so I added a dispatch_io_muxed test which opens multiple sources on a file/pipe/socket and checks that events fire correctly. --- src/event/event_windows.c | 108 +++++++++++---- tests/CMakeLists.txt | 2 + tests/dispatch_io_muxed.c | 275 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 359 insertions(+), 26 deletions(-) create mode 100644 tests/dispatch_io_muxed.c diff --git a/src/event/event_windows.c b/src/event/event_windows.c index 3576774b2..ce322258a 100644 --- a/src/event/event_windows.c +++ b/src/event/event_windows.c @@ -174,27 +174,62 @@ _dispatch_muxnote_create(dispatch_unote_t du, } static void -_dispatch_muxnote_stop(dispatch_muxnote_t dmn) +_dispatch_muxnote_disarm_events(dispatch_muxnote_t dmn, + enum _dispatch_muxnote_events events) { - if (dmn->dmn_thread) { - // Keep trying to cancel ReadFile() until the thread exits - os_atomic_store(&dmn->dmn_stop, true, relaxed); - SetEvent(dmn->dmn_event); - do { - CancelIoEx((HANDLE)dmn->dmn_ident, /* lpOverlapped */ NULL); - } while (WaitForSingleObject(dmn->dmn_thread, 1) == WAIT_TIMEOUT); - CloseHandle(dmn->dmn_thread); - dmn->dmn_thread = NULL; - } - if (dmn->dmn_threadpool_wait) { - SetThreadpoolWait(dmn->dmn_threadpool_wait, NULL, NULL); - WaitForThreadpoolWaitCallbacks(dmn->dmn_threadpool_wait, - /* fCancelPendingCallbacks */ FALSE); - CloseThreadpoolWait(dmn->dmn_threadpool_wait); - dmn->dmn_threadpool_wait = NULL; - } - if (dmn->dmn_handle_type == DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET) { - WSAEventSelect((SOCKET)dmn->dmn_ident, NULL, 0); + long lNetworkEvents; + dmn->dmn_events &= ~events; + switch (dmn->dmn_handle_type) { + case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID: + DISPATCH_INTERNAL_CRASH(0, "invalid handle"); + + case DISPATCH_MUXNOTE_HANDLE_TYPE_FILE: + break; + + case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE: + if ((events & DISPATCH_MUXNOTE_EVENT_READ) && dmn->dmn_thread) { + // Keep trying to cancel ReadFile() until the thread exits + os_atomic_store(&dmn->dmn_stop, true, relaxed); + SetEvent(dmn->dmn_event); + do { + CancelIoEx((HANDLE)dmn->dmn_ident, /* lpOverlapped */ NULL); + } while (WaitForSingleObject(dmn->dmn_thread, 1) == WAIT_TIMEOUT); + CloseHandle(dmn->dmn_thread); + dmn->dmn_thread = NULL; + } + break; + + case DISPATCH_MUXNOTE_HANDLE_TYPE_SOCKET: + lNetworkEvents = dmn->dmn_network_events; + if (events & DISPATCH_MUXNOTE_EVENT_READ) { + lNetworkEvents &= ~FD_READ; + } + if (events & DISPATCH_MUXNOTE_EVENT_WRITE) { + lNetworkEvents &= ~FD_WRITE; + } + if (lNetworkEvents == dmn->dmn_network_events) { + break; + } + int iResult; + if (lNetworkEvents & (FD_READ | FD_WRITE)) { + iResult = WSAEventSelect((SOCKET)dmn->dmn_ident, + (WSAEVENT)dmn->dmn_event, lNetworkEvents); + } else { + lNetworkEvents = 0; + iResult = WSAEventSelect((SOCKET)dmn->dmn_ident, NULL, 0); + } + if (iResult != 0) { + DISPATCH_INTERNAL_CRASH(WSAGetLastError(), "WSAEventSelect"); + } + dmn->dmn_network_events = lNetworkEvents; + if (!lNetworkEvents && dmn->dmn_threadpool_wait) { + SetThreadpoolWait(dmn->dmn_threadpool_wait, NULL, NULL); + WaitForThreadpoolWaitCallbacks(dmn->dmn_threadpool_wait, + /* fCancelPendingCallbacks */ FALSE); + CloseThreadpoolWait(dmn->dmn_threadpool_wait); + dmn->dmn_threadpool_wait = NULL; + } + break; } } @@ -389,8 +424,16 @@ _dispatch_io_trigger(dispatch_muxnote_t dmn) } if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) { _dispatch_muxnote_retain(dmn); - DWORD available = + DWORD available; + if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) { + // We can't query a pipe which has a read source open on it + // because the ReadFile() in the background thread might cause + // NtQueryInformationFile() to block + available = 1; + } else { + available = _dispatch_pipe_write_availability((HANDLE)dmn->dmn_ident); + } bSuccess = PostQueuedCompletionStatus(hPort, available, (ULONG_PTR)DISPATCH_PORT_PIPE_HANDLE_WRITE, (LPOVERLAPPED)dmn); @@ -487,8 +530,12 @@ _dispatch_unote_register_muxed(dispatch_unote_t du) dmn = _dispatch_unote_muxnote_find(dmb, du._du->du_ident, du._du->du_filter); if (dmn) { - WIN_PORT_ERROR(); - DISPATCH_INTERNAL_CRASH(0, "muxnote updating is not supported"); + if (events & ~dmn->dmn_events) { + dmn->dmn_events |= events; + if (_dispatch_io_trigger(dmn) == FALSE) { + return false; + } + } } else { dmn = _dispatch_muxnote_create(du, events); if (!dmn) { @@ -551,9 +598,18 @@ _dispatch_unote_unregister_muxed(dispatch_unote_t du) } dul->du_muxnote = NULL; - LIST_REMOVE(dmn, dmn_list); - _dispatch_muxnote_stop(dmn); - _dispatch_muxnote_release(dmn); + enum _dispatch_muxnote_events disarmed = 0; + if (LIST_EMPTY(&dmn->dmn_readers_head)) { + disarmed |= DISPATCH_MUXNOTE_EVENT_READ; + } + if (LIST_EMPTY(&dmn->dmn_writers_head)) { + disarmed |= DISPATCH_MUXNOTE_EVENT_WRITE; + } + _dispatch_muxnote_disarm_events(dmn, disarmed); + if (!dmn->dmn_events) { + LIST_REMOVE(dmn, dmn_list); + _dispatch_muxnote_release(dmn); + } _dispatch_unote_state_set(du, DU_STATE_UNREGISTERED); return true; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 7a956ab6e..11ab629a7 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -127,6 +127,7 @@ set(DISPATCH_C_TESTS timer_bit63 timer_set_time data + io_muxed io_net io_pipe io_pipe_close @@ -178,6 +179,7 @@ add_unit_test(dispatch_plusplus SOURCES dispatch_plusplus.cpp) # test-specific link options if(WIN32) + target_link_libraries(dispatch_io_muxed PRIVATE WS2_32) target_link_libraries(dispatch_io_net PRIVATE WS2_32) else() # When dispatch_group is reenabled above, remove this diff --git a/tests/dispatch_io_muxed.c b/tests/dispatch_io_muxed.c new file mode 100644 index 000000000..99b1d23d9 --- /dev/null +++ b/tests/dispatch_io_muxed.c @@ -0,0 +1,275 @@ +/* + * Copyright (c) 2019 Apple Inc. All rights reserved. + * + * @APPLE_APACHE_LICENSE_HEADER_START@ + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @APPLE_APACHE_LICENSE_HEADER_END@ + */ + +#include +#include +#include +#include +#if defined(__unix__) || (defined(__APPLE__) && defined(__MACH__)) +#include +#include +#include +#elif defined(_WIN32) +#include +#include +#include +#endif + +#include + +#include +#include "dispatch_test.h" + +#if !defined(_WIN32) +#define closesocket(x) close(x) +#endif + +static void +test_file_muxed(void) +{ + printf("\nFile Muxed\n"); + +#if defined(_WIN32) + char *temp_dir = getenv("TMP"); + if (!temp_dir) { + temp_dir = getenv("TEMP"); + } + if (!temp_dir) { + test_ptr_notnull("temporary directory", temp_dir); + test_stop(); + } + char *path = NULL; + asprintf(&path, "%s\\dispatchtest_io.XXXXXX", temp_dir); +#else + char path[] = "/tmp/dispatchtest_io.XXXXXX"; +#endif + dispatch_fd_t fd = mkstemp(path); + if (fd == -1) { + test_errno("mkstemp", errno, 0); + test_stop(); + } + if (unlink(path) == -1) { + test_errno("unlink", errno, 0); + test_stop(); + } +#if defined(_WIN32) + free(path); +#endif + dispatch_test_fd_write(fd, "test", 4); + dispatch_test_fd_lseek(fd, 0, SEEK_SET); + + dispatch_group_t g = dispatch_group_create(); + dispatch_group_enter(g); + dispatch_group_enter(g); + + dispatch_source_t reader = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, + (uintptr_t)fd, 0, dispatch_get_global_queue(0, 0)); + test_ptr_notnull("dispatch_source_create", reader); + assert(reader); + dispatch_source_set_event_handler(reader, ^{ + dispatch_source_cancel(reader); + }); + dispatch_source_set_cancel_handler(reader, ^{ + dispatch_release(reader); + dispatch_group_leave(g); + }); + + dispatch_source_t writer = dispatch_source_create( + DISPATCH_SOURCE_TYPE_WRITE, (uintptr_t)fd, 0, + dispatch_get_global_queue(0, 0)); + test_ptr_notnull("dispatch_source_create", writer); + assert(writer); + dispatch_source_set_event_handler(writer, ^{ + dispatch_source_cancel(writer); + }); + dispatch_source_set_cancel_handler(writer, ^{ + dispatch_release(writer); + dispatch_group_leave(g); + }); + + dispatch_resume(reader); + dispatch_resume(writer); + + test_group_wait(g); + dispatch_release(g); + dispatch_test_fd_close(fd); +} + +static void +test_stream_muxed(dispatch_fd_t serverfd, dispatch_fd_t clientfd) +{ + dispatch_group_t g = dispatch_group_create(); + dispatch_group_enter(g); + dispatch_group_enter(g); + + dispatch_source_t reader = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, + (uintptr_t)serverfd, 0, dispatch_get_global_queue(0, 0)); + test_ptr_notnull("dispatch_source_create", reader); + assert(reader); + dispatch_source_set_event_handler(reader, ^{ + dispatch_source_cancel(reader); + }); + dispatch_source_set_cancel_handler(reader, ^{ + dispatch_release(reader); + dispatch_group_leave(g); + }); + + dispatch_source_t writer = dispatch_source_create( + DISPATCH_SOURCE_TYPE_WRITE, (uintptr_t)serverfd, 0, + dispatch_get_global_queue(0, 0)); + test_ptr_notnull("dispatch_source_create", writer); + assert(writer); + dispatch_source_set_event_handler(writer, ^{ + dispatch_source_cancel(writer); + }); + dispatch_source_set_cancel_handler(writer, ^{ + dispatch_release(writer); + dispatch_group_leave(g); + }); + + dispatch_resume(reader); + dispatch_resume(writer); + + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC), + dispatch_get_global_queue(0, 0), ^{ + dispatch_group_enter(g); + char buf[512] = {0}; + ssize_t n = dispatch_test_fd_write(clientfd, buf, sizeof(buf)); + if (n < 0) { + test_errno("write error", errno, 0); + test_stop(); + } + test_sizet("num written", (size_t)n, sizeof(buf)); + dispatch_group_leave(g); + }); + + test_group_wait(g); + dispatch_release(g); +} + +static void +test_socket_muxed(void) +{ + printf("\nSocket Muxed\n"); + + int listenfd = -1, serverfd = -1, clientfd = -1; + struct sockaddr_in addr; + socklen_t addrlen; + + listenfd = socket(AF_INET, SOCK_STREAM, 0); + if (listenfd == -1) { + test_errno("socket()", errno, 0); + test_stop(); + } + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + addr.sin_port = 0; + addrlen = sizeof(addr); + if (bind(listenfd, (struct sockaddr *)&addr, addrlen) == -1) { + test_errno("bind()", errno, 0); + test_stop(); + } + if (listen(listenfd, 3) == -1) { + test_errno("listen()", errno, 0); + test_stop(); + } + if (getsockname(listenfd, (struct sockaddr *)&addr, &addrlen) == -1) { + test_errno("getsockname()", errno, 0); + test_stop(); + } + + clientfd = socket(AF_INET, SOCK_STREAM, 0); + if (clientfd == -1) { + test_errno("socket()", errno, 0); + test_stop(); + } + if (connect(clientfd, (struct sockaddr *)&addr, addrlen)) { + test_errno("connect()", errno, 0); + test_stop(); + } + + serverfd = accept(listenfd, (struct sockaddr *)&addr, &addrlen); + if (serverfd == -1) { + test_errno("accept()", errno, 0); + test_stop(); + } + + test_stream_muxed((dispatch_fd_t)serverfd, (dispatch_fd_t)clientfd); + + closesocket(clientfd); + closesocket(serverfd); + closesocket(listenfd); +} + +#if defined(_WIN32) +static void +test_pipe_muxed(void) +{ + printf("\nDuplex Pipe Muxed\n"); + + wchar_t name[64]; + swprintf(name, sizeof(name), L"\\\\.\\pipe\\dispatch_io_muxed_%lu", + GetCurrentProcessId()); + HANDLE server = CreateNamedPipeW(name, + PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE, PIPE_TYPE_BYTE, + /* nMaxInstances */ 1, /* nOutBufferSize */ 0x1000, + /* nInBufferSize */ 0x1000, /* nDefaultTimeOut */ 0, + /* lpSecurityAttributes */ NULL); + if (server == INVALID_HANDLE_VALUE) { + test_ptr_not("CreateNamedPipe", server, INVALID_HANDLE_VALUE); + test_stop(); + } + HANDLE client = CreateFileW(name, GENERIC_READ | GENERIC_WRITE, + /* dwShareMode */ 0, /* lpSecurityAttributes */ NULL, OPEN_EXISTING, + /* dwFlagsAndAttributes */ 0, /* hTemplateFile */ NULL); + if (client == INVALID_HANDLE_VALUE) { + test_ptr_not("CreateFile", client, INVALID_HANDLE_VALUE); + test_stop(); + } + + test_stream_muxed((dispatch_fd_t)server, (dispatch_fd_t)client); + + CloseHandle(client); + CloseHandle(server); +} +#endif + +int +main(void) +{ + dispatch_test_start("Dispatch IO Muxed"); +#if defined(_WIN32) + WSADATA wsa; + int err = WSAStartup(MAKEWORD(2, 2), &wsa); + if (err != 0) { + fprintf(stderr, "WSAStartup failed with %d\n", err); + test_stop(); + } +#endif + dispatch_async(dispatch_get_main_queue(), ^{ + test_file_muxed(); + test_socket_muxed(); +#if defined(_WIN32) + test_pipe_muxed(); +#endif + test_stop(); + }); + dispatch_main(); +}