From 80fa7c942f8c12d6a4848247353f346a953cee36 Mon Sep 17 00:00:00 2001 From: Aaron Dierking Date: Tue, 18 Jun 2019 15:27:45 -0700 Subject: [PATCH] event: support pipe sources on Windows This implements full support for using pipe sources on Windows to trigger an event when a pipe becomes readable or writable. Due to differences between the Windows asynchronous I/O model and the Unix model, this is rather complex. On Windows, the standard mechanism to achieve asynchronous pipe I/O is to just read or write from the pipe and then get notified once the operation completes. Unlike the Unix `select()`/`poll()` model, there is no way to simply know when a pipe becomes readable or writable without actually running an operation. So we need to resort to several tricks in order to achieve the semantics that Dispatch wants here. To monitor a pipe for readability, we take advantage of the fact that a zero-byte `ReadFile()` on a pipe will block until data becomes available in the pipe. A muxnote which monitors a pipe for reading will spin up a lightweight thread which repeatedly calls `ReadFile()` (blocking) on the pipe and posts back to the I/O completion queue when it returns. To monitor pipes for writability, we use the `NtQueryInformationFile()` kernel API to get the amount of available space in the pipe's write buffer. There is no way to block here, so we have no choice but to continually disarm and rearm unotes until space becomes available. This is inefficient, but it generally seems to work OK. In order to test this, I implemented a new dispatch_io_pipe test which performs various read and write operations on pipes. On Windows, this will run the tests on most of the different pipe kinds (anonymous, named, inbound, outbound, overlapped). This caught a lot of issues in the Windows `_dispatch_operation_perform()` which I fixed along the way. The dispatch_io and dispatch_io_pipe_close tests pass as well with my other pull request applied. --- src/event/event_windows.c | 356 +++++++++++++++++++++++-- src/io.c | 90 ++++++- src/shims/generic_win_stubs.c | 31 +++ src/shims/generic_win_stubs.h | 24 ++ tests/CMakeLists.txt | 2 + tests/dispatch_io_pipe.c | 488 ++++++++++++++++++++++++++++++++++ tests/dispatch_test.c | 30 +++ 7 files changed, 990 insertions(+), 31 deletions(-) create mode 100644 tests/dispatch_io_pipe.c diff --git a/src/event/event_windows.c b/src/event/event_windows.c index 33a8bad73..23e1eb78c 100644 --- a/src/event/event_windows.c +++ b/src/event/event_windows.c @@ -28,18 +28,44 @@ enum _dispatch_windows_port { DISPATCH_PORT_TIMER_CLOCK_UPTIME, DISPATCH_PORT_TIMER_CLOCK_MONOTONIC, DISPATCH_PORT_FILE_HANDLE, + DISPATCH_PORT_PIPE_HANDLE_READ, + DISPATCH_PORT_PIPE_HANDLE_WRITE, +}; + +enum _dispatch_muxnote_events { + DISPATCH_MUXNOTE_EVENT_READ = 1 << 0, + DISPATCH_MUXNOTE_EVENT_WRITE = 1 << 1, }; #pragma mark dispatch_unote_t typedef struct dispatch_muxnote_s { LIST_ENTRY(dispatch_muxnote_s) dmn_list; + LIST_HEAD(, dispatch_unote_linkage_s) dmn_readers_head; + LIST_HEAD(, dispatch_unote_linkage_s) dmn_writers_head; + + // This refcount solves a race condition that can happen with I/O completion + // ports. When we enqueue packets with muxnote pointers associated with + // them, it's possible that those packets might not be processed until after + // the event has been unregistered. We increment this upon creating a + // muxnote or posting to a completion port, and we decrement it upon + // unregistering the event or processing a packet. When it hits zero, we + // dispose the muxnote. + os_atomic(uintptr_t) dmn_refcount; + dispatch_unote_ident_t dmn_ident; int8_t dmn_filter; enum _dispatch_muxnote_handle_type { DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID, DISPATCH_MUXNOTE_HANDLE_TYPE_FILE, + DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE, } dmn_handle_type; + enum _dispatch_muxnote_events dmn_events; + + // Used by the pipe monitoring thread + HANDLE dmn_thread; + HANDLE dmn_event; + os_atomic(bool) dmn_stop; } *dispatch_muxnote_t; static LIST_HEAD(dispatch_muxnote_bucket_s, dispatch_muxnote_s) @@ -71,7 +97,8 @@ _dispatch_unote_muxnote_find(struct dispatch_muxnote_bucket_s *dmb, } static dispatch_muxnote_t -_dispatch_muxnote_create(dispatch_unote_t du) +_dispatch_muxnote_create(dispatch_unote_t du, + enum _dispatch_muxnote_events events) { dispatch_muxnote_t dmn; int8_t filter = du._du->du_filter; @@ -81,12 +108,18 @@ _dispatch_muxnote_create(dispatch_unote_t du) if (dmn == NULL) { DISPATCH_INTERNAL_CRASH(0, "_dispatch_calloc"); } + os_atomic_store(&dmn->dmn_refcount, 1, relaxed); dmn->dmn_ident = (dispatch_unote_ident_t)handle; dmn->dmn_filter = filter; + dmn->dmn_events = events; + LIST_INIT(&dmn->dmn_readers_head); + LIST_INIT(&dmn->dmn_writers_head); switch (filter) { case EVFILT_SIGNAL: WIN_PORT_ERROR(); + free(dmn); + return NULL; case EVFILT_WRITE: case EVFILT_READ: @@ -103,17 +136,28 @@ _dispatch_muxnote_create(dispatch_unote_t du) // The specified file is a character file, typically a // LPT device or a console. WIN_PORT_ERROR(); + free(dmn); + return NULL; case FILE_TYPE_DISK: // The specified file is a disk file - dmn->dmn_handle_type = - DISPATCH_MUXNOTE_HANDLE_TYPE_FILE; + dmn->dmn_handle_type = DISPATCH_MUXNOTE_HANDLE_TYPE_FILE; break; case FILE_TYPE_PIPE: // The specified file is a socket, a named pipe, or an - // anonymous pipe. - WIN_PORT_ERROR(); + // anonymous pipe. Use GetNamedPipeInfo() to distinguish between + // a pipe and a socket. Despite its name, it also succeeds for + // anonymous pipes. + if (!GetNamedPipeInfo(handle, NULL, NULL, NULL, NULL)) { + // We'll get ERROR_ACCESS_DENIED for outbound pipes. + if (GetLastError() != ERROR_ACCESS_DENIED) { + // The file is probably a socket. + WIN_PORT_ERROR(); + } + } + dmn->dmn_handle_type = DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE; + break; } break; @@ -126,13 +170,136 @@ _dispatch_muxnote_create(dispatch_unote_t du) return dmn; } +static void +_dispatch_muxnote_stop(dispatch_muxnote_t dmn) +{ + if (dmn->dmn_thread) { + // Keep trying to cancel ReadFile() until the thread exits + os_atomic_store(&dmn->dmn_stop, true, relaxed); + SetEvent(dmn->dmn_event); + do { + CancelIoEx((HANDLE)dmn->dmn_ident, /* lpOverlapped */ NULL); + } while (WaitForSingleObject(dmn->dmn_thread, 1) == WAIT_TIMEOUT); + CloseHandle(dmn->dmn_thread); + dmn->dmn_thread = NULL; + } + if (dmn->dmn_event) { + CloseHandle(dmn->dmn_event); + dmn->dmn_event = NULL; + } +} + static void _dispatch_muxnote_dispose(dispatch_muxnote_t dmn) { + if (dmn->dmn_thread) { + DISPATCH_INTERNAL_CRASH(0, "disposed a muxnote with an active thread"); + } free(dmn); } -DISPATCH_ALWAYS_INLINE +static void +_dispatch_muxnote_retain(dispatch_muxnote_t dmn) +{ + uintptr_t refcount = os_atomic_inc(&dmn->dmn_refcount, relaxed); + if (refcount == 0) { + DISPATCH_INTERNAL_CRASH(0, "muxnote refcount overflow"); + } + if (refcount == 1) { + DISPATCH_INTERNAL_CRASH(0, "retained a disposing muxnote"); + } +} + +static void +_dispatch_muxnote_release(dispatch_muxnote_t dmn) +{ + uintptr_t refcount = os_atomic_dec(&dmn->dmn_refcount, relaxed); + if (refcount == 0) { + _dispatch_muxnote_dispose(dmn); + } else if (refcount == UINTPTR_MAX) { + DISPATCH_INTERNAL_CRASH(0, "muxnote refcount underflow"); + } +} + +static unsigned WINAPI +_dispatch_pipe_monitor_thread(void *context) +{ + dispatch_muxnote_t dmn = (dispatch_muxnote_t)context; + HANDLE hPipe = (HANDLE)dmn->dmn_ident; + do { + char cBuffer[1]; + DWORD dwNumberOfBytesTransferred; + OVERLAPPED ov = {0}; + BOOL bSuccess = ReadFile(hPipe, cBuffer, /* nNumberOfBytesToRead */ 0, + &dwNumberOfBytesTransferred, &ov); + DWORD dwBytesAvailable; + DWORD dwError = GetLastError(); + if (!bSuccess && dwError == ERROR_IO_PENDING) { + bSuccess = GetOverlappedResult(hPipe, &ov, + &dwNumberOfBytesTransferred, /* bWait */ TRUE); + dwError = GetLastError(); + } + if (bSuccess) { + bSuccess = PeekNamedPipe(hPipe, NULL, 0, NULL, &dwBytesAvailable, + NULL); + dwError = GetLastError(); + } + if (bSuccess) { + if (dwBytesAvailable == 0) { + // This can happen with a zero-byte write. Try again. + continue; + } + } else if (dwError == ERROR_NO_DATA) { + // The pipe is nonblocking. Try again. + Sleep(0); + continue; + } else { + _dispatch_debug("pipe[0x%llx]: GetLastError() returned %lu", + (long long)hPipe, dwError); + if (dwError == ERROR_OPERATION_ABORTED) { + continue; + } + os_atomic_store(&dmn->dmn_stop, true, relaxed); + dwBytesAvailable = 0; + } + + // Make sure the muxnote stays alive until the packet is dequeued + _dispatch_muxnote_retain(dmn); + + // The lpOverlapped parameter does not actually need to point to an + // OVERLAPPED struct. It's really just a pointer to pass back to + // GetQueuedCompletionStatus(). + bSuccess = PostQueuedCompletionStatus(hPort, + dwBytesAvailable, (ULONG_PTR)DISPATCH_PORT_PIPE_HANDLE_READ, + (LPOVERLAPPED)dmn); + if (!bSuccess) { + DISPATCH_INTERNAL_CRASH(GetLastError(), + "PostQueuedCompletionStatus"); + } + + // If data is written into the pipe and not read right away, ReadFile() + // will keep returning immediately and we'll flood the completion port. + // This event lets us synchronize with _dispatch_event_loop_drain() so + // that we only post events when it's ready for them. + WaitForSingleObject(dmn->dmn_event, INFINITE); + } while (!os_atomic_load(&dmn->dmn_stop, relaxed)); + _dispatch_debug("pipe[0x%llx]: monitor exiting", (long long)hPipe); + return 0; +} + +static DWORD +_dispatch_pipe_write_availability(HANDLE hPipe) +{ + IO_STATUS_BLOCK iosb; + FILE_PIPE_LOCAL_INFORMATION fpli; + NTSTATUS status = _dispatch_NtQueryInformationFile(hPipe, &iosb, &fpli, + sizeof(fpli), FilePipeLocalInformation); + if (!NT_SUCCESS(status)) { + return 1; + } + return fpli.WriteQuotaAvailable; +} + static BOOL _dispatch_io_trigger(dispatch_muxnote_t dmn) { @@ -150,9 +317,56 @@ _dispatch_io_trigger(dispatch_muxnote_t dmn) "PostQueuedCompletionStatus"); } break; + + case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE: + if ((dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_READ) && + !dmn->dmn_thread) { + HANDLE hThread = (HANDLE)_beginthreadex(/* security */ NULL, + /* stack_size */ 1, _dispatch_pipe_monitor_thread, + (void *)dmn, /* initflag */ 0, /* thrdaddr */ NULL); + if (!hThread) { + DISPATCH_INTERNAL_CRASH(errno, "_beginthread"); + } + HANDLE hEvent = CreateEventW(NULL, /* bManualReset */ FALSE, + /* bInitialState */ FALSE, NULL); + if (!hEvent) { + DISPATCH_INTERNAL_CRASH(GetLastError(), "CreateEventW"); + } + dmn->dmn_thread = hThread; + dmn->dmn_event = hEvent; + } + if (dmn->dmn_events & DISPATCH_MUXNOTE_EVENT_WRITE) { + _dispatch_muxnote_retain(dmn); + DWORD available = + _dispatch_pipe_write_availability((HANDLE)dmn->dmn_ident); + bSuccess = PostQueuedCompletionStatus(hPort, available, + (ULONG_PTR)DISPATCH_PORT_PIPE_HANDLE_WRITE, + (LPOVERLAPPED)dmn); + if (bSuccess == FALSE) { + DISPATCH_INTERNAL_CRASH(GetLastError(), + "PostQueuedCompletionStatus"); + } + } + break; } - return bSuccess; + return TRUE; +} + +DISPATCH_ALWAYS_INLINE +static inline enum _dispatch_muxnote_events +_dispatch_unote_required_events(dispatch_unote_t du) +{ + switch (du._du->du_filter) { + case DISPATCH_EVFILT_CUSTOM_ADD: + case DISPATCH_EVFILT_CUSTOM_OR: + case DISPATCH_EVFILT_CUSTOM_REPLACE: + return 0; + case EVFILT_WRITE: + return DISPATCH_MUXNOTE_EVENT_WRITE; + default: + return DISPATCH_MUXNOTE_EVENT_READ; + } } bool @@ -160,37 +374,52 @@ _dispatch_unote_register_muxed(dispatch_unote_t du) { struct dispatch_muxnote_bucket_s *dmb; dispatch_muxnote_t dmn; + enum _dispatch_muxnote_events events; + + events = _dispatch_unote_required_events(du); dmb = _dispatch_unote_muxnote_bucket(du._du->du_ident); dmn = _dispatch_unote_muxnote_find(dmb, du._du->du_ident, du._du->du_filter); if (dmn) { WIN_PORT_ERROR(); + DISPATCH_INTERNAL_CRASH(0, "muxnote updating is not supported"); } else { - dmn = _dispatch_muxnote_create(du); - if (dmn) { - if (_dispatch_io_trigger(dmn) == FALSE) { - _dispatch_muxnote_dispose(dmn); - dmn = NULL; - } else { - LIST_INSERT_HEAD(dmb, dmn, dmn_list); - } + dmn = _dispatch_muxnote_create(du, events); + if (!dmn) { + return false; } + if (_dispatch_io_trigger(dmn) == FALSE) { + _dispatch_muxnote_release(dmn); + return false; + } + LIST_INSERT_HEAD(dmb, dmn, dmn_list); } - if (dmn) { - dispatch_unote_linkage_t dul = _dispatch_unote_get_linkage(du); + dispatch_unote_linkage_t dul = _dispatch_unote_get_linkage(du); + switch (dmn->dmn_handle_type) { + case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID: + DISPATCH_INTERNAL_CRASH(0, "invalid handle"); + case DISPATCH_MUXNOTE_HANDLE_TYPE_FILE: AcquireSRWLockExclusive(&_dispatch_file_handles_lock); LIST_INSERT_HEAD(&_dispatch_file_handles, dul, du_link); ReleaseSRWLockExclusive(&_dispatch_file_handles_lock); + break; - dul->du_muxnote = dmn; - _dispatch_unote_state_set(du, DISPATCH_WLH_ANON, - DU_STATE_ARMED); + case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE: + if (events & DISPATCH_MUXNOTE_EVENT_READ) { + LIST_INSERT_HEAD(&dmn->dmn_readers_head, dul, du_link); + } else if (events & DISPATCH_MUXNOTE_EVENT_WRITE) { + LIST_INSERT_HEAD(&dmn->dmn_writers_head, dul, du_link); + } + break; } - return dmn != NULL; + dul->du_muxnote = dmn; + _dispatch_unote_state_set(du, DISPATCH_WLH_ANON, DU_STATE_ARMED); + + return true; } void @@ -208,21 +437,34 @@ _dispatch_unote_unregister_muxed(dispatch_unote_t du) dispatch_unote_linkage_t dul = _dispatch_unote_get_linkage(du); dispatch_muxnote_t dmn = dul->du_muxnote; - AcquireSRWLockExclusive(&_dispatch_file_handles_lock); - LIST_REMOVE(dul, du_link); - _LIST_TRASH_ENTRY(dul, du_link); - ReleaseSRWLockExclusive(&_dispatch_file_handles_lock); + switch (dmn->dmn_handle_type) { + case DISPATCH_MUXNOTE_HANDLE_TYPE_INVALID: + DISPATCH_INTERNAL_CRASH(0, "invalid handle"); + + case DISPATCH_MUXNOTE_HANDLE_TYPE_FILE: + AcquireSRWLockExclusive(&_dispatch_file_handles_lock); + LIST_REMOVE(dul, du_link); + _LIST_TRASH_ENTRY(dul, du_link); + ReleaseSRWLockExclusive(&_dispatch_file_handles_lock); + break; + + case DISPATCH_MUXNOTE_HANDLE_TYPE_PIPE: + LIST_REMOVE(dul, du_link); + _LIST_TRASH_ENTRY(dul, du_link); + break; + } dul->du_muxnote = NULL; LIST_REMOVE(dmn, dmn_list); - _dispatch_muxnote_dispose(dmn); + _dispatch_muxnote_stop(dmn); + _dispatch_muxnote_release(dmn); _dispatch_unote_state_set(du, DU_STATE_UNREGISTERED); return true; } static void -_dispatch_event_merge_file_handle() +_dispatch_event_merge_file_handle(void) { dispatch_unote_linkage_t dul, dul_next; @@ -240,6 +482,56 @@ _dispatch_event_merge_file_handle() ReleaseSRWLockExclusive(&_dispatch_file_handles_lock); } +static void +_dispatch_event_merge_pipe_handle_read(dispatch_muxnote_t dmn, + DWORD dwBytesAvailable) +{ + dispatch_unote_linkage_t dul, dul_next; + LIST_FOREACH_SAFE(dul, &dmn->dmn_readers_head, du_link, dul_next) { + dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul); + // consumed by dux_merge_evt() + _dispatch_retain_unote_owner(du); + dispatch_unote_state_t du_state = _dispatch_unote_state(du); + du_state &= ~DU_STATE_ARMED; + uintptr_t data = dwBytesAvailable; + uint32_t flags; + if (dwBytesAvailable > 0) { + flags = EV_ADD | EV_ENABLE | EV_DISPATCH; + } else { + du_state |= DU_STATE_NEEDS_DELETE; + flags = EV_DELETE | EV_DISPATCH; + } + _dispatch_unote_state_set(du, du_state); + os_atomic_store2o(du._dr, ds_pending_data, ~data, relaxed); + dux_merge_evt(du._du, flags, data, 0); + } + SetEvent(dmn->dmn_event); + // Retained when posting the completion packet + _dispatch_muxnote_release(dmn); +} + +static void +_dispatch_event_merge_pipe_handle_write(dispatch_muxnote_t dmn, + DWORD dwBytesAvailable) +{ + dispatch_unote_linkage_t dul, dul_next; + LIST_FOREACH_SAFE(dul, &dmn->dmn_writers_head, du_link, dul_next) { + dispatch_unote_t du = _dispatch_unote_linkage_get_unote(dul); + // consumed by dux_merge_evt() + _dispatch_retain_unote_owner(du); + _dispatch_unote_state_clear_bit(du, DU_STATE_ARMED); + uintptr_t data = dwBytesAvailable; + if (dwBytesAvailable > 0) { + os_atomic_store2o(du._dr, ds_pending_data, ~data, relaxed); + } else { + os_atomic_store2o(du._dr, ds_pending_data, 0, relaxed); + } + dux_merge_evt(du._du, EV_ADD | EV_ENABLE | EV_DISPATCH, data, 0); + } + // Retained when posting the completion packet + _dispatch_muxnote_release(dmn); +} + #pragma mark timers typedef struct _dispatch_windows_timeout_s { @@ -414,6 +706,16 @@ _dispatch_event_loop_drain(uint32_t flags) _dispatch_event_merge_file_handle(); break; + case DISPATCH_PORT_PIPE_HANDLE_READ: + _dispatch_event_merge_pipe_handle_read((dispatch_muxnote_t)pOV, + dwNumberOfBytesTransferred); + break; + + case DISPATCH_PORT_PIPE_HANDLE_WRITE: + _dispatch_event_merge_pipe_handle_write((dispatch_muxnote_t)pOV, + dwNumberOfBytesTransferred); + break; + default: DISPATCH_INTERNAL_CRASH(ulCompletionKey, "unsupported completion key"); diff --git a/src/io.c b/src/io.c index 6337c7877..839367767 100644 --- a/src/io.c +++ b/src/io.c @@ -2410,7 +2410,43 @@ _dispatch_operation_perform(dispatch_operation_t op) if (op->direction == DOP_DIR_READ) { if (op->params.type == DISPATCH_IO_STREAM) { #if defined(_WIN32) - ReadFile((HANDLE)op->fd_entry->fd, buf, (DWORD)len, (LPDWORD)&processed, NULL); + HANDLE hFile = (HANDLE)op->fd_entry->fd; + BOOL bSuccess; + if (GetFileType(hFile) == FILE_TYPE_PIPE) { + OVERLAPPED ovlOverlapped = {}; + DWORD dwTotalBytesAvail; + bSuccess = PeekNamedPipe(hFile, NULL, 0, NULL, + &dwTotalBytesAvail, NULL); + if (bSuccess) { + if (dwTotalBytesAvail == 0) { + err = EAGAIN; + goto error; + } + len = MIN(len, dwTotalBytesAvail); + bSuccess = ReadFile(hFile, buf, (DWORD)len, + (LPDWORD)&processed, &ovlOverlapped); + } + if (!bSuccess) { + DWORD dwError = GetLastError(); + if (dwError == ERROR_IO_PENDING) { + bSuccess = GetOverlappedResult(hFile, &ovlOverlapped, + (LPDWORD)&processed, /* bWait */ TRUE); + dwError = GetLastError(); + } + if (dwError == ERROR_BROKEN_PIPE || + dwError == ERROR_NO_DATA) { + bSuccess = TRUE; + processed = 0; + } + } + } else { + bSuccess = ReadFile(hFile, buf, (DWORD)len, + (LPDWORD)&processed, NULL); + } + if (!bSuccess) { + err = EIO; + goto error; + } #else processed = read(op->fd_entry->fd, buf, len); #endif @@ -2419,7 +2455,8 @@ _dispatch_operation_perform(dispatch_operation_t op) OVERLAPPED ovlOverlapped = {}; ovlOverlapped.Offset = off & 0xffffffff; ovlOverlapped.OffsetHigh = (off >> 32) & 0xffffffff; - ReadFile((HANDLE)op->fd_entry->fd, buf, (DWORD)len, (LPDWORD)&processed, &ovlOverlapped); + ReadFile((HANDLE)op->fd_entry->fd, buf, (DWORD)len, + (LPDWORD)&processed, &ovlOverlapped); #else processed = pread(op->fd_entry->fd, buf, len, off); #endif @@ -2427,7 +2464,51 @@ _dispatch_operation_perform(dispatch_operation_t op) } else if (op->direction == DOP_DIR_WRITE) { if (op->params.type == DISPATCH_IO_STREAM) { #if defined(_WIN32) - WriteFile((HANDLE)op->fd_entry->fd, buf, (DWORD)len, (LPDWORD)&processed, NULL); + HANDLE hFile = (HANDLE)op->fd_entry->fd; + BOOL bSuccess; + if (GetFileType(hFile) == FILE_TYPE_PIPE) { + // Unfortunately there isn't a good way to achieve O_NONBLOCK + // semantics when writing to a pipe. SetNamedPipeHandleState() + // can allow pipes to be switched into a "no wait" mode, but + // that doesn't work on most pipe handles because Windows + // doesn't consistently create pipes with FILE_WRITE_ATTRIBUTES + // access. The best we can do is to try to query the write quota + // and then write as much as we can. + IO_STATUS_BLOCK iosb; + FILE_PIPE_LOCAL_INFORMATION fpli; + NTSTATUS status = _dispatch_NtQueryInformationFile(hFile, &iosb, + &fpli, sizeof(fpli), FilePipeLocalInformation); + if (NT_SUCCESS(status)) { + if (fpli.WriteQuotaAvailable == 0) { + err = EAGAIN; + goto error; + } + len = MIN(len, fpli.WriteQuotaAvailable); + } + OVERLAPPED ovlOverlapped = {}; + bSuccess = WriteFile(hFile, buf, (DWORD)len, + (LPDWORD)&processed, &ovlOverlapped); + if (!bSuccess) { + DWORD dwError = GetLastError(); + if (dwError == ERROR_IO_PENDING) { + bSuccess = GetOverlappedResult(hFile, &ovlOverlapped, + (LPDWORD)&processed, /* bWait */ TRUE); + dwError = GetLastError(); + } + if (dwError == ERROR_BROKEN_PIPE || + dwError == ERROR_NO_DATA) { + bSuccess = TRUE; + processed = 0; + } + } + } else { + bSuccess = WriteFile(hFile, buf, (DWORD)len, + (LPDWORD)&processed, NULL); + } + if (!bSuccess) { + err = EIO; + goto error; + } #else processed = write(op->fd_entry->fd, buf, len); #endif @@ -2436,7 +2517,8 @@ _dispatch_operation_perform(dispatch_operation_t op) OVERLAPPED ovlOverlapped = {}; ovlOverlapped.Offset = off & 0xffffffff; ovlOverlapped.OffsetHigh = (off >> 32) & 0xffffffff; - WriteFile((HANDLE)op->fd_entry->fd, buf, (DWORD)len, (LPDWORD)&processed, &ovlOverlapped); + WriteFile((HANDLE)op->fd_entry->fd, buf, (DWORD)len, + (LPDWORD)&processed, &ovlOverlapped); #else processed = pwrite(op->fd_entry->fd, buf, len, off); #endif diff --git a/src/shims/generic_win_stubs.c b/src/shims/generic_win_stubs.c index c48eef66a..b976075af 100644 --- a/src/shims/generic_win_stubs.c +++ b/src/shims/generic_win_stubs.c @@ -6,6 +6,13 @@ DISPATCH_STATIC_GLOBAL(dispatch_once_t _dispatch_precise_time_pred); DISPATCH_STATIC_GLOBAL(_precise_time_fn_t _dispatch_QueryInterruptTimePrecise_ptr); DISPATCH_STATIC_GLOBAL(_precise_time_fn_t _dispatch_QueryUnbiasedInterruptTimePrecise_ptr); +typedef NTSTATUS (NTAPI *_NtQueryInformationFile_fn_t)(HANDLE FileHandle, + PIO_STATUS_BLOCK IoStatusBlock, PVOID FileInformation, ULONG Length, + FILE_INFORMATION_CLASS FileInformationClass); + +DISPATCH_STATIC_GLOBAL(dispatch_once_t _dispatch_ntdll_pred); +DISPATCH_STATIC_GLOBAL(_NtQueryInformationFile_fn_t _dispatch_NtQueryInformationFile_ptr); + static void _dispatch_init_precise_time(void *context DISPATCH_UNUSED) { @@ -38,3 +45,27 @@ _dispatch_QueryUnbiasedInterruptTimePrecise(PULONGLONG lpUnbiasedInterruptTimePr dispatch_once_f(&_dispatch_precise_time_pred, NULL, _dispatch_init_precise_time); return _dispatch_QueryUnbiasedInterruptTimePrecise_ptr(lpUnbiasedInterruptTimePrecise); } + +static void +_dispatch_init_ntdll(void *context DISPATCH_UNUSED) +{ + HMODULE ntdll = LoadLibraryW(L"ntdll.dll"); + if (!ntdll) { + // ntdll is not required. + return; + } + _dispatch_NtQueryInformationFile_ptr = (_NtQueryInformationFile_fn_t) + GetProcAddress(ntdll, "NtQueryInformationFile"); +} + +NTSTATUS _dispatch_NtQueryInformationFile(HANDLE FileHandle, + PIO_STATUS_BLOCK IoStatusBlock, PVOID FileInformation, ULONG Length, + FILE_INFORMATION_CLASS FileInformationClass) +{ + dispatch_once_f(&_dispatch_ntdll_pred, NULL, _dispatch_init_ntdll); + if (!_dispatch_NtQueryInformationFile_ptr) { + return STATUS_NOT_SUPPORTED; + } + return _dispatch_NtQueryInformationFile_ptr(FileHandle, IoStatusBlock, + FileInformation, Length, FileInformationClass); +} diff --git a/src/shims/generic_win_stubs.h b/src/shims/generic_win_stubs.h index 7d38adb29..1f7f4eaa3 100644 --- a/src/shims/generic_win_stubs.h +++ b/src/shims/generic_win_stubs.h @@ -6,7 +6,9 @@ #include #include +#include #include +#include #include #include @@ -40,7 +42,29 @@ typedef __typeof__(_Generic((__SIZE_TYPE__)0, \ /* * Wrappers for dynamically loaded Windows APIs */ + void _dispatch_QueryInterruptTimePrecise(PULONGLONG lpInterruptTimePrecise); void _dispatch_QueryUnbiasedInterruptTimePrecise(PULONGLONG lpUnbiasedInterruptTimePrecise); +enum { + FilePipeLocalInformation = 24, +}; + +typedef struct _FILE_PIPE_LOCAL_INFORMATION { + ULONG NamedPipeType; + ULONG NamedPipeConfiguration; + ULONG MaximumInstances; + ULONG CurrentInstances; + ULONG InboundQuota; + ULONG ReadDataAvailable; + ULONG OutboundQuota; + ULONG WriteQuotaAvailable; + ULONG NamedPipeState; + ULONG NamedPipeEnd; +} FILE_PIPE_LOCAL_INFORMATION, *PFILE_PIPE_LOCAL_INFORMATION; + +NTSTATUS _dispatch_NtQueryInformationFile(HANDLE FileHandle, + PIO_STATUS_BLOCK IoStatusBlock, PVOID FileInformation, ULONG Length, + FILE_INFORMATION_CLASS FileInformationClass); + #endif diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index c6aa30449..6cc82179a 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -128,6 +128,7 @@ set(DISPATCH_C_TESTS timer_set_time data io_net + io_pipe io_pipe_close select) @@ -171,6 +172,7 @@ foreach(test ${DISPATCH_C_TESTS}) dispatch_${test}.c) endforeach() +set_tests_properties(dispatch_io_pipe PROPERTIES TIMEOUT 15) set_tests_properties(dispatch_io_pipe_close PROPERTIES TIMEOUT 5) # test dispatch API for various C/CXX language variants diff --git a/tests/dispatch_io_pipe.c b/tests/dispatch_io_pipe.c new file mode 100644 index 000000000..f94438483 --- /dev/null +++ b/tests/dispatch_io_pipe.c @@ -0,0 +1,488 @@ +/* + * Copyright (c) 2019 Apple Inc. All rights reserved. + * + * @APPLE_APACHE_LICENSE_HEADER_START@ + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * @APPLE_APACHE_LICENSE_HEADER_END@ + */ + +#include +#include +#include +#include +#include +#include +#if defined(__unix__) || (defined(__APPLE__) && defined(__MACH__)) +#include +#endif + +#include + +#include +#include "dispatch_test.h" + +enum { + DISPATCH_PIPE_KIND_ANONYMOUS, +#if defined(_WIN32) + DISPATCH_PIPE_KIND_NAMED_INBOUND, + DISPATCH_PIPE_KIND_NAMED_OUTBOUND, + DISPATCH_PIPE_KIND_NAMED_INBOUND_OVERLAPPED, + DISPATCH_PIPE_KIND_NAMED_OUTBOUND_OVERLAPPED, +#endif + DISPATCH_PIPE_KIND_COUNT, +}; + +enum { + DISPATCH_TEST_IMMEDIATE, + DISPATCH_TEST_DELAYED, +}; + +static const char *const pipe_names[] = { + [DISPATCH_PIPE_KIND_ANONYMOUS] = "anonymous", +#if defined(_WIN32) + [DISPATCH_PIPE_KIND_NAMED_INBOUND] = "named, inbound", + [DISPATCH_PIPE_KIND_NAMED_OUTBOUND] = "named, outbound", + [DISPATCH_PIPE_KIND_NAMED_INBOUND_OVERLAPPED] = "named, inbound, overlapped", + [DISPATCH_PIPE_KIND_NAMED_OUTBOUND_OVERLAPPED] = "named, outbound, overlapped", +#endif +}; + +static const char *const delay_names[] = { + [DISPATCH_TEST_IMMEDIATE] = "Immediate", + [DISPATCH_TEST_DELAYED] = "Delayed", +}; + +#if defined(_WIN32) +enum { + NAMED_PIPE_BUFFER_SIZE = 0x1000, +}; +#endif + +static size_t +test_get_pipe_buffer_size(int kind) +{ +#if defined(_WIN32) + if (kind != DISPATCH_PIPE_KIND_ANONYMOUS) { + return NAMED_PIPE_BUFFER_SIZE; + } + static dispatch_once_t once; + static DWORD size; + dispatch_once(&once, ^{ + HANDLE read_handle, write_handle; + if (!CreatePipe(&read_handle, &write_handle, NULL, 0)) { + test_long("CreatePipe", GetLastError(), ERROR_SUCCESS); + test_stop(); + } + GetNamedPipeInfo(write_handle, NULL, &size, NULL, NULL); + CloseHandle(read_handle); + CloseHandle(write_handle); + }); + return size; +#else + (void)kind; + static dispatch_once_t once; + static size_t size; + dispatch_once(&once, ^{ + int fds[2]; + if (pipe(fds) < 0) { + test_errno("pipe", errno, 0); + test_stop(); + } + fcntl(fds[1], F_SETFL, O_NONBLOCK); + for (size = 0; write(fds[1], "", 1) > 0; size++) {} + close(fds[0]); + close(fds[1]); + }); + return size; +#endif +} + +#if defined(_WIN32) +static void +test_make_named_pipe(DWORD flags, dispatch_fd_t *readfd, dispatch_fd_t *writefd) +{ + wchar_t name[64]; + static int counter = 0; + swprintf(name, sizeof(name), L"\\\\.\\pipe\\dispatch_io_pipe_%lu_%d", + GetCurrentProcessId(), counter++); + HANDLE server = CreateNamedPipeW(name, + flags | FILE_FLAG_FIRST_PIPE_INSTANCE, PIPE_TYPE_BYTE, + /* nMaxInstances */ 1, NAMED_PIPE_BUFFER_SIZE, + NAMED_PIPE_BUFFER_SIZE, /* nDefaultTimeOut */ 0, + /* lpSecurityAttributes */ NULL); + if (server == INVALID_HANDLE_VALUE) { + test_ptr_not("CreateNamedPipe", server, INVALID_HANDLE_VALUE); + test_stop(); + } + HANDLE client = CreateFileW(name, + (flags & PIPE_ACCESS_INBOUND) ? GENERIC_WRITE : GENERIC_READ, + /* dwShareMode */ 0, /* lpSecurityAttributes */ NULL, OPEN_EXISTING, + flags & FILE_FLAG_OVERLAPPED, /* hTemplateFile */ NULL); + if (client == INVALID_HANDLE_VALUE) { + test_ptr_not("CreateFile", client, INVALID_HANDLE_VALUE); + test_stop(); + } + if (flags & PIPE_ACCESS_INBOUND) { + *readfd = (dispatch_fd_t)server; + *writefd = (dispatch_fd_t)client; + } else { + *readfd = (dispatch_fd_t)client; + *writefd = (dispatch_fd_t)server; + } +} +#endif + +static void +test_make_pipe(int kind, dispatch_fd_t *readfd, dispatch_fd_t *writefd) +{ +#if defined(_WIN32) + switch (kind) { + case DISPATCH_PIPE_KIND_ANONYMOUS: + if (!CreatePipe((PHANDLE)readfd, (PHANDLE)writefd, NULL, 0)) { + test_long("CreatePipe", GetLastError(), ERROR_SUCCESS); + test_stop(); + } + break; + case DISPATCH_PIPE_KIND_NAMED_INBOUND: + test_make_named_pipe(PIPE_ACCESS_INBOUND, readfd, writefd); + break; + case DISPATCH_PIPE_KIND_NAMED_OUTBOUND: + test_make_named_pipe(PIPE_ACCESS_OUTBOUND, readfd, writefd); + break; + case DISPATCH_PIPE_KIND_NAMED_INBOUND_OVERLAPPED: + test_make_named_pipe(PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, readfd, + writefd); + break; + case DISPATCH_PIPE_KIND_NAMED_OUTBOUND_OVERLAPPED: + test_make_named_pipe(PIPE_ACCESS_OUTBOUND | FILE_FLAG_OVERLAPPED, + readfd, writefd); + break; + } +#else + (void)kind; + int fds[2]; + if (pipe(fds) < 0) { + test_errno("pipe", errno, 0); + test_stop(); + } + *readfd = fds[0]; + *writefd = fds[1]; +#endif +} + +static void +test_source_read(int kind, int delay) +{ + printf("\nSource Read %s: %s\n", delay_names[delay], pipe_names[kind]); + + dispatch_fd_t readfd, writefd; + test_make_pipe(kind, &readfd, &writefd); + + dispatch_group_t g = dispatch_group_create(); + dispatch_group_enter(g); + + void (^write_block)(void) = ^{ + dispatch_group_enter(g); + char buf[512] = {0}; + ssize_t n = dispatch_test_fd_write(writefd, buf, sizeof(buf)); + if (n < 0) { + test_errno("write error", errno, 0); + test_stop(); + } + test_sizet("num written", (size_t)n, sizeof(buf)); + dispatch_group_leave(g); + }; + if (delay == DISPATCH_TEST_IMMEDIATE) { + write_block(); + } + + dispatch_source_t reader = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, + (uintptr_t)readfd, 0, dispatch_get_global_queue(0, 0)); + test_ptr_notnull("dispatch_source_create", reader); + assert(reader); + dispatch_source_set_event_handler(reader, ^{ + dispatch_group_enter(g); + char buf[512]; + size_t available = dispatch_source_get_data(reader); + test_sizet("num available", available, sizeof(buf)); + ssize_t n = dispatch_test_fd_read(readfd, buf, sizeof(buf)); + if (n >= 0) { + test_sizet("num read", (size_t)n, sizeof(buf)); + } else { + test_errno("read error", errno, 0); + } + dispatch_source_cancel(reader); + dispatch_group_leave(g); + }); + dispatch_source_set_cancel_handler(reader, ^{ + dispatch_release(reader); + dispatch_group_leave(g); + }); + dispatch_resume(reader); + + dispatch_source_t t = NULL; + if (delay == DISPATCH_TEST_DELAYED) { + t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, + dispatch_get_global_queue(0, 0)); + dispatch_source_set_event_handler(t, write_block); + dispatch_source_set_timer(t, + dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC), + DISPATCH_TIME_FOREVER, 0); + dispatch_resume(t); + } + + test_group_wait(g); + dispatch_release(g); + if (t) { + dispatch_source_cancel(t); + dispatch_release(t); + } + dispatch_test_fd_close(readfd); + dispatch_test_fd_close(writefd); +} + +static void +test_source_write(int kind, int delay) +{ + printf("\nSource Write %s: %s\n", delay_names[delay], pipe_names[kind]); + + dispatch_fd_t readfd, writefd; + test_make_pipe(kind, &readfd, &writefd); + + dispatch_group_t g = dispatch_group_create(); + dispatch_group_enter(g); + + const size_t bufsize = test_get_pipe_buffer_size(kind); + + void (^write_block)(void) = ^{ + char *buf = calloc(bufsize, 1); + assert(buf); + ssize_t nw = dispatch_test_fd_write(writefd, buf, bufsize); + free(buf); + if (nw < 0) { + test_errno("write error", errno, 0); + test_stop(); + } + test_sizet("num written", (size_t)nw, bufsize); + }; + write_block(); + + void (^read_block)(void) = ^{ + dispatch_group_enter(g); + char *buf = calloc(bufsize, 1); + assert(buf); + ssize_t nr = dispatch_test_fd_read(readfd, buf, bufsize); + free(buf); + if (nr < 0) { + test_errno("read error", errno, 0); + test_stop(); + } + test_sizet("num read", (size_t)nr, bufsize); + dispatch_group_leave(g); + }; + if (delay == DISPATCH_TEST_IMMEDIATE) { + read_block(); + } + + dispatch_source_t writer = dispatch_source_create( + DISPATCH_SOURCE_TYPE_WRITE, (uintptr_t)writefd, 0, + dispatch_get_global_queue(0, 0)); + test_ptr_notnull("dispatch_source_create", writer); + assert(writer); + dispatch_source_set_event_handler(writer, ^{ + dispatch_group_enter(g); + size_t available = dispatch_source_get_data(writer); + test_sizet_less_than("num available", 0, available); + write_block(); + read_block(); + dispatch_source_cancel(writer); + dispatch_group_leave(g); + }); + dispatch_source_set_cancel_handler(writer, ^{ + dispatch_release(writer); + dispatch_group_leave(g); + }); + dispatch_resume(writer); + + dispatch_source_t t = NULL; + if (delay == DISPATCH_TEST_DELAYED) { + t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, + dispatch_get_global_queue(0, 0)); + dispatch_source_set_event_handler(t, read_block); + dispatch_source_set_timer(t, + dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC), + DISPATCH_TIME_FOREVER, 0); + dispatch_resume(t); + } + + test_group_wait(g); + dispatch_release(g); + if (t) { + dispatch_source_cancel(t); + dispatch_release(t); + } + dispatch_test_fd_close(readfd); + dispatch_test_fd_close(writefd); +} + +static void +test_dispatch_read(int kind, int delay) +{ + printf("\nDispatch Read %s: %s\n", delay_names[delay], pipe_names[kind]); + + dispatch_fd_t readfd, writefd; + test_make_pipe(kind, &readfd, &writefd); + + dispatch_group_t g = dispatch_group_create(); + dispatch_group_enter(g); + + char writebuf[512] = {0}; + char *writebufp = writebuf; + void (^write_block)(void) = ^{ + dispatch_group_enter(g); + ssize_t n = + dispatch_test_fd_write(writefd, writebufp, sizeof(writebuf)); + if (n < 0) { + test_errno("write error", errno, 0); + test_stop(); + } + test_sizet("num written", (size_t)n, sizeof(writebuf)); + dispatch_group_leave(g); + }; + if (delay == DISPATCH_TEST_IMMEDIATE) { + write_block(); + } + + dispatch_read(readfd, sizeof(writebuf), dispatch_get_global_queue(0, 0), + ^(dispatch_data_t data, int err) { + test_errno("read error", err, 0); + test_sizet("num read", dispatch_data_get_size(data), sizeof(writebuf)); + dispatch_group_leave(g); + }); + + dispatch_source_t t = NULL; + if (delay == DISPATCH_TEST_DELAYED) { + t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, + dispatch_get_global_queue(0, 0)); + dispatch_source_set_event_handler(t, write_block); + dispatch_source_set_timer(t, + dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC), + DISPATCH_TIME_FOREVER, 0); + dispatch_resume(t); + } + + test_group_wait(g); + dispatch_release(g); + if (t) { + dispatch_source_cancel(t); + dispatch_release(t); + } + dispatch_test_fd_close(readfd); + dispatch_test_fd_close(writefd); +} + +static void +test_dispatch_write(int kind, int delay) +{ + printf("\nDispatch Write %s: %s\n", delay_names[delay], pipe_names[kind]); + + dispatch_fd_t readfd, writefd; + test_make_pipe(kind, &readfd, &writefd); + + dispatch_group_t g = dispatch_group_create(); + dispatch_group_enter(g); + + const size_t bufsize = test_get_pipe_buffer_size(kind); + + char *buf = calloc(bufsize, 1); + assert(buf); + ssize_t nw = dispatch_test_fd_write(writefd, buf, bufsize); + free(buf); + if (nw < 0) { + test_errno("write error", errno, 0); + test_stop(); + } + test_sizet("num written", (size_t)nw, bufsize); + + void (^read_block)(void) = ^{ + dispatch_group_enter(g); + char *readbuf = calloc(bufsize, 1); + assert(readbuf); + ssize_t nr = dispatch_test_fd_read(readfd, readbuf, bufsize); + free(readbuf); + if (nr < 0) { + test_errno("read error", errno, 0); + test_stop(); + } + test_sizet("num read", (size_t)nr, bufsize); + dispatch_group_leave(g); + }; + if (delay == DISPATCH_TEST_IMMEDIATE) { + read_block(); + } + + buf = calloc(bufsize, 1); + assert(buf); + dispatch_data_t wd = dispatch_data_create(buf, bufsize, + dispatch_get_global_queue(0, 0), DISPATCH_DATA_DESTRUCTOR_FREE); + dispatch_write(writefd, wd, dispatch_get_global_queue(0, 0), + ^(dispatch_data_t data, int err) { + test_errno("write error", err, 0); + test_ptr_null("data written", data); + read_block(); + dispatch_group_leave(g); + }); + + dispatch_source_t t = NULL; + if (delay == DISPATCH_TEST_DELAYED) { + t = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, + dispatch_get_global_queue(0, 0)); + dispatch_source_set_event_handler(t, read_block); + dispatch_source_set_timer(t, + dispatch_time(DISPATCH_TIME_NOW, 500 * NSEC_PER_MSEC), + DISPATCH_TIME_FOREVER, 0); + dispatch_resume(t); + } + + test_group_wait(g); + dispatch_release(g); + dispatch_release(wd); + if (t) { + dispatch_source_cancel(t); + dispatch_release(t); + } + dispatch_test_fd_close(readfd); + dispatch_test_fd_close(writefd); +} + +int +main(void) +{ + dispatch_test_start("Dispatch IO Pipe"); + dispatch_async(dispatch_get_main_queue(), ^{ + for (int kind = 0; kind < DISPATCH_PIPE_KIND_COUNT; kind++) { + test_source_read(kind, DISPATCH_TEST_IMMEDIATE); + test_source_read(kind, DISPATCH_TEST_DELAYED); + test_source_write(kind, DISPATCH_TEST_IMMEDIATE); + test_source_write(kind, DISPATCH_TEST_DELAYED); + test_dispatch_read(kind, DISPATCH_TEST_IMMEDIATE); + test_dispatch_read(kind, DISPATCH_TEST_DELAYED); + test_dispatch_write(kind, DISPATCH_TEST_IMMEDIATE); + test_dispatch_write(kind, DISPATCH_TEST_DELAYED); + } + test_stop(); + }); + dispatch_main(); +} diff --git a/tests/dispatch_test.c b/tests/dispatch_test.c index b0d028df3..d84a7b228 100644 --- a/tests/dispatch_test.c +++ b/tests/dispatch_test.c @@ -337,6 +337,21 @@ ssize_t dispatch_test_fd_read(dispatch_fd_t fd, void *buf, size_t count) { #if defined(_WIN32) + if (GetFileType((HANDLE)fd) == FILE_TYPE_PIPE) { + OVERLAPPED ov = {0}; + DWORD num_read; + BOOL success = ReadFile((HANDLE)fd, buf, count, &num_read, &ov); + if (!success && GetLastError() == ERROR_IO_PENDING) { + success = GetOverlappedResult((HANDLE)fd, &ov, &num_read, + /* bWait */ TRUE); + } + if (!success) { + print_winapi_error("ReadFile", GetLastError()); + errno = EIO; + return -1; + } + return (ssize_t)num_read; + } DWORD num_read; if (!ReadFile((HANDLE)fd, buf, count, &num_read, NULL)) { print_winapi_error("ReadFile", GetLastError()); @@ -353,6 +368,21 @@ ssize_t dispatch_test_fd_write(dispatch_fd_t fd, const void *buf, size_t count) { #if defined(_WIN32) + if (GetFileType((HANDLE)fd) == FILE_TYPE_PIPE) { + OVERLAPPED ov = {0}; + DWORD num_written; + BOOL success = WriteFile((HANDLE)fd, buf, count, &num_written, &ov); + if (!success && GetLastError() == ERROR_IO_PENDING) { + success = GetOverlappedResult((HANDLE)fd, &ov, &num_written, + /* bWait */ TRUE); + } + if (!success) { + print_winapi_error("WriteFile", GetLastError()); + errno = EIO; + return -1; + } + return (ssize_t)num_written; + } DWORD num_written; if (!WriteFile((HANDLE)fd, buf, count, &num_written, NULL)) { print_winapi_error("WriteFile", GetLastError());