From 63ddb46a71895029bb9c34fbde8f6c7a0eae6215 Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Wed, 20 Aug 2025 13:39:59 -0700 Subject: [PATCH 1/4] Call spawn() on a background thread because it might block Introduce runOnBackgroundThread() to run closures on a background thread without blocking the Swift Concurrency thread pool --- Sources/Subprocess/CMakeLists.txt | 1 + Sources/Subprocess/Configuration.swift | 2 +- Sources/Subprocess/IO/AsyncIO+Windows.swift | 79 +++-- .../Platforms/Subprocess+Darwin.swift | 63 +++- .../Platforms/Subprocess+Unix.swift | 6 +- Sources/Subprocess/Thread.swift | 325 ++++++++++++++++++ .../ProcessMonitoringTests.swift | 4 +- 7 files changed, 428 insertions(+), 52 deletions(-) create mode 100644 Sources/Subprocess/Thread.swift diff --git a/Sources/Subprocess/CMakeLists.txt b/Sources/Subprocess/CMakeLists.txt index 53de37cf..b9c34090 100644 --- a/Sources/Subprocess/CMakeLists.txt +++ b/Sources/Subprocess/CMakeLists.txt @@ -14,6 +14,7 @@ add_library(Subprocess Buffer.swift Error.swift Teardown.swift + Thread.swift Result.swift IO/Output.swift IO/Input.swift diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index ca8c6a19..94d9cd97 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -84,7 +84,7 @@ public struct Configuration: Sendable { isolation: isolated (any Actor)? = #isolation, _ body: ((Execution, consuming IOChannel?, consuming IOChannel?, consuming IOChannel?) async throws -> Result) ) async throws -> ExecutionResult { - let spawnResults = try self.spawn( + let spawnResults = try await self.spawn( withInput: input, outputPipe: output, errorPipe: error diff --git a/Sources/Subprocess/IO/AsyncIO+Windows.swift b/Sources/Subprocess/IO/AsyncIO+Windows.swift index 4e794ee1..3120d3bb 100644 --- a/Sources/Subprocess/IO/AsyncIO+Windows.swift +++ b/Sources/Subprocess/IO/AsyncIO+Windows.swift @@ -43,7 +43,7 @@ final class AsyncIO: @unchecked Sendable { ) rethrows -> ResultType } - private final class MonitorThreadContext { + private struct MonitorThreadContext: @unchecked Sendable { let ioCompletionPort: HANDLE init(ioCompletionPort: HANDLE) { @@ -60,11 +60,9 @@ final class AsyncIO: @unchecked Sendable { internal init() { var maybeSetupError: SubprocessError? = nil // Create the the completion port - guard - let port = CreateIoCompletionPort( - INVALID_HANDLE_VALUE, nil, 0, 0 - ), port != INVALID_HANDLE_VALUE - else { + guard let ioCompletionPort = CreateIoCompletionPort( + INVALID_HANDLE_VALUE, nil, 0, 0 + ), ioCompletionPort != INVALID_HANDLE_VALUE else { let error = SubprocessError( code: .init(.asyncIOFailed("CreateIoCompletionPort failed")), underlyingError: .init(rawValue: GetLastError()) @@ -73,17 +71,12 @@ final class AsyncIO: @unchecked Sendable { self.monitorThread = .failure(error) return } - self.ioCompletionPort = .success(port) + self.ioCompletionPort = .success(ioCompletionPort) // Create monitor thread - let threadContext = MonitorThreadContext(ioCompletionPort: port) - let threadContextPtr = Unmanaged.passRetained(threadContext) - /// Microsoft documentation for `CreateThread` states: - /// > A thread in an executable that calls the C run-time library (CRT) - /// > should use the _beginthreadex and _endthreadex functions for - /// > thread management rather than CreateThread and ExitThread - let threadHandleValue = _beginthreadex( - nil, 0, - { args in + let context = MonitorThreadContext(ioCompletionPort: ioCompletionPort) + let threadHandle: HANDLE + do { + threadHandle = try begin_thread_x { func reportError(_ error: SubprocessError) { let continuations = _registration.withLock { store in return store.values @@ -93,15 +86,50 @@ final class AsyncIO: @unchecked Sendable { } } - let unmanaged = Unmanaged.fromOpaque(args!) - let context = unmanaged.takeRetainedValue() - + // Monitor loop + while true { + var bytesTransferred: DWORD = 0 + var targetFileDescriptor: UInt64 = 0 + var overlapped: LPOVERLAPPED? = nil // Monitor loop while true { var bytesTransferred: DWORD = 0 var targetFileDescriptor: UInt64 = 0 var overlapped: LPOVERLAPPED? = nil + let monitorResult = GetQueuedCompletionStatus( + context.ioCompletionPort, + &bytesTransferred, + &targetFileDescriptor, + &overlapped, + INFINITE + ) + if !monitorResult { + let lastError = GetLastError() + if lastError == ERROR_BROKEN_PIPE { + // We finished reading the handle. Signal EOF by + // finishing the stream. + // NOTE: here we deliberately leave now unused continuation + // in the store. Windows does not offer an API to remove a + // HANDLE from an IOCP port, therefore we leave the registration + // to signify the HANDLE has already been resisted. + let continuation = _registration.withLock { store -> SignalStream.Continuation? in + if let continuation = store[targetFileDescriptor] { + return continuation + } + return nil + } + continuation?.finish() + continue + } else { + let error = SubprocessError( + code: .init(.asyncIOFailed("GetQueuedCompletionStatus failed")), + underlyingError: .init(rawValue: lastError) + ) + reportError(error) + break + } + } let monitorResult = GetQueuedCompletionStatus( context.ioCompletionPort, &bytesTransferred, @@ -149,16 +177,13 @@ final class AsyncIO: @unchecked Sendable { } continuation?.yield(bytesTransferred) } + return 0 - }, threadContextPtr.toOpaque(), 0, nil) - guard threadHandleValue > 0, - let threadHandle = HANDLE(bitPattern: threadHandleValue) - else { - // _beginthreadex uses errno instead of GetLastError() - let capturedError = _subprocess_windows_get_errno() + } + } catch let underlyingError { let error = SubprocessError( - code: .init(.asyncIOFailed("_beginthreadex failed")), - underlyingError: .init(rawValue: capturedError) + code: .init(.asyncIOFailed("Failed to create monitor thread")), + underlyingError: underlyingError ) self.monitorThread = .failure(error) return diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index c1aa76c4..4d74c39b 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -153,11 +153,24 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible // MARK: - Spawn extension Configuration { + // @unchecked Sendable because we need to capture UnsafePointers + // to send to another thread. While UnsafePointers are not + // Sendable, we are not mutating them -- we only need these type + // for C interface. + internal struct SpawnContext: @unchecked Sendable { + let fileActions: posix_spawn_file_actions_t? + let spawnAttributes: posix_spawnattr_t? + let argv: [UnsafeMutablePointer?] + let env: [UnsafeMutablePointer?] + let uidPtr: UnsafeMutablePointer? + let gidPtr: UnsafeMutablePointer? + } + internal func spawn( withInput inputPipe: consuming CreatedPipe, outputPipe: consuming CreatedPipe, errorPipe: consuming CreatedPipe - ) throws -> SpawnResult { + ) async throws -> SpawnResult { // Instead of checking if every possible executable path // is valid, spawn each directly and catch ENOENT let possiblePaths = self.executable.possibleExecutablePaths( @@ -167,7 +180,7 @@ extension Configuration { var outputPipeBox: CreatedPipe? = consume outputPipe var errorPipeBox: CreatedPipe? = consume errorPipe - return try self.preSpawn { args throws -> SpawnResult in + return try await self.preSpawn { args throws -> SpawnResult in let (env, uidPtr, gidPtr, supplementaryGroups) = args var _inputPipe = inputPipeBox.take()! var _outputPipe = outputPipeBox.take()! @@ -181,8 +194,6 @@ extension Configuration { let errorWriteFileDescriptor: IODescriptor? = _errorPipe.writeFileDescriptor() for possibleExecutablePath in possiblePaths { - var pid: pid_t = 0 - // Setup Arguments let argv: [UnsafeMutablePointer?] = self.arguments.createArgs( withExecutablePath: possibleExecutablePath @@ -389,21 +400,35 @@ extension Configuration { } // Spawn - let spawnError: CInt = possibleExecutablePath.withCString { exePath in - return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in - return _subprocess_spawn( - &pid, - exePath, - &fileActions, - &spawnAttributes, - argv, - env, - uidPtr, - gidPtr, - Int32(supplementaryGroups?.count ?? 0), - sgroups?.baseAddress, - self.platformOptions.createSession ? 1 : 0 - ) + let spawnContext = SpawnContext( + fileActions: fileActions, + spawnAttributes: spawnAttributes, + argv: argv, + env: env, + uidPtr: uidPtr, + gidPtr: gidPtr + ) + let (spawnError, pid) = try await runOnBackgroundThread { + return possibleExecutablePath.withCString { exePath in + return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in + var pid: pid_t = 0 + var _fileActions = spawnContext.fileActions + var _spawnAttributes = spawnContext.spawnAttributes + let rc = _subprocess_spawn( + &pid, + exePath, + &_fileActions, + &_spawnAttributes, + spawnContext.argv, + spawnContext.env, + spawnContext.uidPtr, + spawnContext.gidPtr, + Int32(supplementaryGroups?.count ?? 0), + sgroups?.baseAddress, + self.platformOptions.createSession ? 1 : 0 + ) + return (rc, pid) + } } } // Spawn error diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 17d5b32b..c490f31d 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -350,8 +350,8 @@ extension Configuration { ) internal func preSpawn( - _ work: (PreSpawnArgs) throws -> Result - ) throws -> Result { + _ work: (PreSpawnArgs) async throws -> Result + ) async throws -> Result { // Prepare environment let env = self.environment.createEnv() defer { @@ -378,7 +378,7 @@ extension Configuration { if let groupsValue = self.platformOptions.supplementaryGroups { supplementaryGroups = groupsValue } - return try work( + return try await work( ( env: env, uidPtr: uidPtr, diff --git a/Sources/Subprocess/Thread.swift b/Sources/Subprocess/Thread.swift new file mode 100644 index 00000000..51321f36 --- /dev/null +++ b/Sources/Subprocess/Thread.swift @@ -0,0 +1,325 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +#if canImport(Darwin) +import Darwin +#elseif canImport(Glibc) +import Glibc +#elseif canImport(Bionic) +import Bionic +#elseif canImport(Musl) +import Musl +#elseif canImport(WinSDK) +import WinSDK +#endif + +internal import Dispatch +import _SubprocessCShims + +#if canImport(Synchronization) +import Synchronization +#endif + +#if canImport(Darwin) +internal func runOnBackgroundThread( + _ body: @Sendable @escaping () throws -> Result +) async throws -> Result { + let result = try await withCheckedThrowingContinuation { continuation in + // On Darwin, use DispatchQueue directly + DispatchQueue.global().async { + do { + let result = try body() + continuation.resume(returning: result) + } catch { + continuation.resume(throwing: error) + } + } + } + return result +} +#else + +#if canImport(WinSDK) +private typealias MutexType = CRITICAL_SECTION +private typealias ConditionType = CONDITION_VARIABLE +private typealias ThreadType = HANDLE +#else +private typealias MutexType = pthread_mutex_t +private typealias ConditionType = pthread_cond_t +private typealias ThreadType = pthread_t +#endif + +private struct BackgroundWorkItem { + private let work: @Sendable () -> Void + + init( + _ body: @Sendable @escaping () throws -> Result, + continuation: CheckedContinuation + ) { + self.work = { + do { + let result = try body() + continuation.resume(returning: result) + } catch { + continuation.resume(throwing: error) + } + } + } + + func run() { + self.work() + } +} + +// We can't use Mutex directly here because we need the underlying `pthread_mutex_t` to be +// exposed so we can use it with `pthread_cond_wait`. +private final class WorkQueue: Sendable { + private nonisolated(unsafe) var queue: [BackgroundWorkItem] + internal nonisolated(unsafe) let mutex: UnsafeMutablePointer + internal nonisolated(unsafe) let waitCondition: UnsafeMutablePointer + + init() { + self.queue = [] + self.mutex = UnsafeMutablePointer.allocate(capacity: 1) + self.waitCondition = UnsafeMutablePointer.allocate(capacity: 1) + #if canImport(WinSDK) + InitializeCriticalSection(self.mutex) + InitializeConditionVariable(self.waitCondition) + #else + pthread_mutex_init(self.mutex, nil) + pthread_cond_init(self.waitCondition, nil) + #endif + } + + func withLock(_ body: (inout [BackgroundWorkItem]) throws -> R) rethrows -> R { + try withUnsafeUnderlyingLock { _, queue in + try body(&queue) + } + } + + private func withUnsafeUnderlyingLock( + _ body: (UnsafeMutablePointer, inout [BackgroundWorkItem]) throws -> R + ) rethrows -> R { + #if canImport(WinSDK) + EnterCriticalSection(self.mutex) + defer { + LeaveCriticalSection(self.mutex); + } + #else + pthread_mutex_lock(self.mutex) + defer { + pthread_mutex_unlock(mutex) + } + #endif + return try body(mutex, &queue) + } + + // Only called in worker thread. Sleeps the thread if there's no more item + func dequeue() -> BackgroundWorkItem? { + return self.withUnsafeUnderlyingLock { mutex, queue in + if queue.isEmpty { + // Sleep the worker thread if there's no more work + #if canImport(WinSDK) + SleepConditionVariableCS(self.waitCondition, mutex, INFINITE) + #else + pthread_cond_wait(self.waitCondition, mutex) + #endif + } + guard !queue.isEmpty else { + return nil + } + return queue.removeFirst() + } + } + + // Only called in parent thread. Signals wait condition to wake up worker thread + func enqueue(_ workItem: BackgroundWorkItem) { + self.withLock { queue in + queue.append(workItem) + #if canImport(WinSDK) + WakeConditionVariable(self.waitCondition); + #else + pthread_cond_signal(self.waitCondition) + #endif + } + } + + func shutdown() { + self.withLock { queue in + queue.removeAll() + #if canImport(WinSDK) + WakeConditionVariable(self.waitCondition); + #else + pthread_cond_signal(self.waitCondition) + #endif + } + } +} + +private let _workQueue = WorkQueue() +private let _workQueueShutdownFlag: Atomic = Atomic(0) + +// Okay to be unlocked global mutable because this value is only set once like dispatch_once +private nonisolated(unsafe) var _workerThread: Result = .failure(SubprocessError(code: .init(.spawnFailed), underlyingError: nil)) + +private let setupWorkerThread: () = { + do { + #if canImport(WinSDK) + let workerThread = try begin_thread_x { + while true { + // This dequeue call will suspend the thread if there's no more work left + guard let workItem = _workQueue.dequeue() else { + break + } + workItem.run() + } + return 0 + } + #else + let workerThread = try pthread_create { + while true { + // This dequeue call will suspend the thread if there's no more work left + guard let workItem = _workQueue.dequeue() else { + break + } + workItem.run() + } + } + #endif + _workerThread = .success(workerThread) + } catch { + _workerThread = .failure(error) + } + + atexit { + _shutdownWorkerThread() + } +}() + +private func _shutdownWorkerThread() { + guard case .success(let thread) = _workerThread else { + return + } + guard _workQueueShutdownFlag.add(1, ordering: .sequentiallyConsistent).newValue == 1 else { + // We already shutdown this thread + return + } + _workQueue.shutdown() + #if canImport(WinSDK) + WaitForSingleObject(thread, INFINITE); + CloseHandle(thread); + DeleteCriticalSection(_workQueue.mutex) + // We do not need to destroy CONDITION_VARIABLE + #else + pthread_join(thread, nil) + pthread_mutex_destroy(_workQueue.mutex) + pthread_cond_destroy(_workQueue.waitCondition) + #endif + _workQueue.mutex.deallocate() + _workQueue.waitCondition.deallocate() +} + +internal func runOnBackgroundThread( + _ body: @Sendable @escaping () throws -> Result +) async throws -> Result { + // Only executed once + setupWorkerThread + + let result = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let workItem = BackgroundWorkItem(body, continuation: continuation) + _workQueue.enqueue(workItem) + } + return result +} + +#endif + +// MARK: - Thread Creation Primitives +#if canImport(Glibc) || canImport(Bionic) || canImport(Musl) +internal func pthread_create( + _ body: @Sendable @escaping () -> () +) throws(SubprocessError.UnderlyingError) -> pthread_t { + final class Context { + let body: @Sendable () -> () + init(body: @Sendable @escaping () -> Void) { + self.body = body + } + } + #if canImport(Glibc) || canImport(Musl) + func proc(_ context: UnsafeMutableRawPointer?) -> UnsafeMutableRawPointer? { + (Unmanaged.fromOpaque(context!).takeRetainedValue() as! Context).body() + return nil + } + #elseif canImport(Bionic) + func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer { + (Unmanaged.fromOpaque(context).takeRetainedValue() as! Context).body() + return context + } + #endif + #if canImport(Glibc) || canImport(Bionic) + var thread = pthread_t() + #else + var thread: pthread_t? + #endif + let rc = pthread_create( + &thread, + nil, + proc, + Unmanaged.passRetained(Context(body: body)).toOpaque() + ) + if rc != 0 { + throw SubprocessError.UnderlyingError(rawValue: rc) + } + #if canImport(Glibc) || canImport(Bionic) + return thread + #else + return thread! + #endif +} + +#elseif canImport(WinSDK) +/// Microsoft documentation for `CreateThread` states: +/// > A thread in an executable that calls the C run-time library (CRT) +/// > should use the _beginthreadex and _endthreadex functions for +/// > thread management rather than CreateThread and ExitThread +internal func begin_thread_x( + _ body: @Sendable @escaping () -> UInt32 +) throws(SubprocessError.UnderlyingError) -> HANDLE { + final class Context { + let body: @Sendable () -> UInt32 + init(body: @Sendable @escaping () -> UInt32) { + self.body = body + } + } + + func proc(_ context: UnsafeMutableRawPointer?) -> UInt32 { + return Unmanaged.fromOpaque(context!).takeRetainedValue().body() + } + + let threadHandleValue = _beginthreadex( + nil, + 0, + proc, + Unmanaged.passRetained(Context(body: body)).toOpaque(), + 0, + nil + ) + guard threadHandleValue != 0, + let threadHandle = HANDLE(bitPattern: threadHandleValue) else { + // _beginthreadex uses errno instead of GetLastError() + let capturedError = _subprocess_windows_get_errno() + throw SubprocessError.UnderlyingError(rawValue: DWORD(capturedError)) + } + + return threadHandle +} +#endif + diff --git a/Tests/SubprocessTests/ProcessMonitoringTests.swift b/Tests/SubprocessTests/ProcessMonitoringTests.swift index e0a869c2..1b743d46 100644 --- a/Tests/SubprocessTests/ProcessMonitoringTests.swift +++ b/Tests/SubprocessTests/ProcessMonitoringTests.swift @@ -103,7 +103,7 @@ struct SubprocessProcessMonitoringTests { config: Configuration, _ body: (Execution) async throws -> Void ) async throws { - let spawnResult = try config.spawn( + let spawnResult = try await config.spawn( withInput: self.devNullInputPipe(), outputPipe: self.devNullOutputPipe(), errorPipe: self.devNullOutputPipe() @@ -307,7 +307,7 @@ extension SubprocessProcessMonitoringTests { for _ in 0.. Date: Wed, 27 Aug 2025 17:34:55 -0700 Subject: [PATCH 2/4] Use pthread_create() instead of DispatchQueue for runOnBackgroundThread() on Darwin --- .../Platforms/Subprocess+Unix.swift | 66 ++++--- Sources/Subprocess/Thread.swift | 163 ++++++++++-------- 2 files changed, 135 insertions(+), 94 deletions(-) diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index c490f31d..8090945b 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -415,11 +415,24 @@ internal typealias PlatformFileDescriptor = CInt #if !canImport(Darwin) extension Configuration { + + // @unchecked Sendable because we need to capture UnsafePointers + // to send to another thread. While UnsafePointers are not + // Sendable, we are not mutating them -- we only need these type + // for C interface. + internal struct SpawnContext: @unchecked Sendable { + let argv: [UnsafeMutablePointer?] + let env: [UnsafeMutablePointer?] + let uidPtr: UnsafeMutablePointer? + let gidPtr: UnsafeMutablePointer? + let processGroupIDPtr: UnsafeMutablePointer? + } + internal func spawn( withInput inputPipe: consuming CreatedPipe, outputPipe: consuming CreatedPipe, errorPipe: consuming CreatedPipe - ) throws -> SpawnResult { + ) async throws -> SpawnResult { // Ensure the waiter thread is running. #if os(Linux) || os(Android) _setupMonitorSignalHandler() @@ -434,7 +447,7 @@ extension Configuration { var outputPipeBox: CreatedPipe? = consume outputPipe var errorPipeBox: CreatedPipe? = consume errorPipe - return try self.preSpawn { args throws -> SpawnResult in + return try await self.preSpawn { args throws -> SpawnResult in let (env, uidPtr, gidPtr, supplementaryGroups) = args var _inputPipe = inputPipeBox.take()! @@ -472,27 +485,34 @@ extension Configuration { ] // Spawn - var pid: pid_t = 0 - var processDescriptor: PlatformFileDescriptor = -1 - let spawnError: CInt = possibleExecutablePath.withCString { exePath in - return (self.workingDirectory?.string).withOptionalCString { workingDir in - return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in - return fileDescriptors.withUnsafeBufferPointer { fds in - return _subprocess_fork_exec( - &pid, - &processDescriptor, - exePath, - workingDir, - fds.baseAddress!, - argv, - env, - uidPtr, - gidPtr, - processGroupIDPtr, - CInt(supplementaryGroups?.count ?? 0), - sgroups?.baseAddress, - self.platformOptions.createSession ? 1 : 0 - ) + let spawnContext = SpawnContext( + argv: argv, env: env, uidPtr: uidPtr, gidPtr: gidPtr, processGroupIDPtr: processGroupIDPtr + ) + let (pid, processDescriptor, spawnError) = try await runOnBackgroundThread { + return possibleExecutablePath.withCString { exePath in + return (self.workingDirectory?.string).withOptionalCString { workingDir in + return supplementaryGroups.withOptionalUnsafeBufferPointer { sgroups in + return fileDescriptors.withUnsafeBufferPointer { fds in + var pid: pid_t = 0 + var processDescriptor: PlatformFileDescriptor = -1 + + let rc = _subprocess_fork_exec( + &pid, + &processDescriptor, + exePath, + workingDir, + fds.baseAddress!, + spawnContext.argv, + spawnContext.env, + spawnContext.uidPtr, + spawnContext.gidPtr, + spawnContext.processGroupIDPtr, + CInt(supplementaryGroups?.count ?? 0), + sgroups?.baseAddress, + self.platformOptions.createSession ? 1 : 0 + ) + return (pid, processDescriptor, rc) + } } } } diff --git a/Sources/Subprocess/Thread.swift b/Sources/Subprocess/Thread.swift index 51321f36..86f98d84 100644 --- a/Sources/Subprocess/Thread.swift +++ b/Sources/Subprocess/Thread.swift @@ -11,6 +11,7 @@ #if canImport(Darwin) import Darwin +import os #elseif canImport(Glibc) import Glibc #elseif canImport(Bionic) @@ -28,24 +29,6 @@ import _SubprocessCShims import Synchronization #endif -#if canImport(Darwin) -internal func runOnBackgroundThread( - _ body: @Sendable @escaping () throws -> Result -) async throws -> Result { - let result = try await withCheckedThrowingContinuation { continuation in - // On Darwin, use DispatchQueue directly - DispatchQueue.global().async { - do { - let result = try body() - continuation.resume(returning: result) - } catch { - continuation.resume(throwing: error) - } - } - } - return result -} -#else #if canImport(WinSDK) private typealias MutexType = CRITICAL_SECTION @@ -57,6 +40,19 @@ private typealias ConditionType = pthread_cond_t private typealias ThreadType = pthread_t #endif +internal func runOnBackgroundThread( + _ body: @Sendable @escaping () throws -> Result +) async throws -> Result { + // Only executed once + _setupWorkerThread + + let result = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let workItem = BackgroundWorkItem(body, continuation: continuation) + _workQueue.enqueue(workItem) + } + return result +} + private struct BackgroundWorkItem { private let work: @Sendable () -> Void @@ -165,12 +161,12 @@ private final class WorkQueue: Sendable { } private let _workQueue = WorkQueue() -private let _workQueueShutdownFlag: Atomic = Atomic(0) +private let _workQueueShutdownFlag = AtomicCounter() // Okay to be unlocked global mutable because this value is only set once like dispatch_once private nonisolated(unsafe) var _workerThread: Result = .failure(SubprocessError(code: .init(.spawnFailed), underlyingError: nil)) -private let setupWorkerThread: () = { +private let _setupWorkerThread: () = { do { #if canImport(WinSDK) let workerThread = try begin_thread_x { @@ -208,7 +204,7 @@ private func _shutdownWorkerThread() { guard case .success(let thread) = _workerThread else { return } - guard _workQueueShutdownFlag.add(1, ordering: .sequentiallyConsistent).newValue == 1 else { + guard _workQueueShutdownFlag.addOne() == 1 else { // We already shutdown this thread return } @@ -227,65 +223,41 @@ private func _shutdownWorkerThread() { _workQueue.waitCondition.deallocate() } -internal func runOnBackgroundThread( - _ body: @Sendable @escaping () throws -> Result -) async throws -> Result { - // Only executed once - setupWorkerThread +// MARK: - AtomicCounter - let result = try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - let workItem = BackgroundWorkItem(body, continuation: continuation) - _workQueue.enqueue(workItem) - } - return result -} +#if canImport(Darwin) +// Unfortunately on Darwin we can unconditionally use Atomic since it requires macOS 15 +internal struct AtomicCounter: ~Copyable { + private let storage: OSAllocatedUnfairLock -#endif + internal init() { + self.storage = .init(initialState: 0) + } -// MARK: - Thread Creation Primitives -#if canImport(Glibc) || canImport(Bionic) || canImport(Musl) -internal func pthread_create( - _ body: @Sendable @escaping () -> () -) throws(SubprocessError.UnderlyingError) -> pthread_t { - final class Context { - let body: @Sendable () -> () - init(body: @Sendable @escaping () -> Void) { - self.body = body + internal func addOne() -> UInt8 { + return self.storage.withLock { + $0 += 1 + return $0 } } - #if canImport(Glibc) || canImport(Musl) - func proc(_ context: UnsafeMutableRawPointer?) -> UnsafeMutableRawPointer? { - (Unmanaged.fromOpaque(context!).takeRetainedValue() as! Context).body() - return nil - } - #elseif canImport(Bionic) - func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer { - (Unmanaged.fromOpaque(context).takeRetainedValue() as! Context).body() - return context +} +#else +internal struct AtomicCounter: ~Copyable { + + private let storage: Atomic + + internal init() { + self.storage = Atomic(0) } - #endif - #if canImport(Glibc) || canImport(Bionic) - var thread = pthread_t() - #else - var thread: pthread_t? - #endif - let rc = pthread_create( - &thread, - nil, - proc, - Unmanaged.passRetained(Context(body: body)).toOpaque() - ) - if rc != 0 { - throw SubprocessError.UnderlyingError(rawValue: rc) + + internal func addOne() -> UInt8 { + return self.storage.add(1, ordering: .sequentiallyConsistent).newValue } - #if canImport(Glibc) || canImport(Bionic) - return thread - #else - return thread! - #endif } +#endif -#elseif canImport(WinSDK) +// MARK: - Thread Creation Primitives +#if canImport(WinSDK) /// Microsoft documentation for `CreateThread` states: /// > A thread in an executable that calls the C run-time library (CRT) /// > should use the _beginthreadex and _endthreadex functions for @@ -321,5 +293,54 @@ internal func begin_thread_x( return threadHandle } +#else + +internal func pthread_create( + _ body: @Sendable @escaping () -> () +) throws(SubprocessError.UnderlyingError) -> pthread_t { + final class Context { + let body: @Sendable () -> () + init(body: @Sendable @escaping () -> Void) { + self.body = body + } + } +#if canImport(Darwin) + func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer? { + (Unmanaged.fromOpaque(context).takeRetainedValue() as! Context).body() + return context + } +#elseif canImport(Glibc) || canImport(Musl) + func proc(_ context: UnsafeMutableRawPointer?) -> UnsafeMutableRawPointer? { + (Unmanaged.fromOpaque(context!).takeRetainedValue() as! Context).body() + return context + } +#elseif canImport(Bionic) + func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer { + (Unmanaged.fromOpaque(context).takeRetainedValue() as! Context).body() + return context + } +#endif + +#if canImport(Glibc) || canImport(Bionic) + var thread = pthread_t() +#else + var thread: pthread_t? #endif + let rc = pthread_create( + &thread, + nil, + proc, + Unmanaged.passRetained(Context(body: body)).toOpaque() + ) + if rc != 0 { + throw SubprocessError.UnderlyingError(rawValue: rc) + } +#if canImport(Glibc) || canImport(Bionic) + return thread +#else + return thread! +#endif +} + +#endif // canImport(WinSDK) From 58445d956b526d8543b23a745c1f2eee3bcd2e8e Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Wed, 27 Aug 2025 19:48:55 -0700 Subject: [PATCH 3/4] Enable runOnBackgroundThread for spawn() on Windows --- Sources/Subprocess/Configuration.swift | 4 + Sources/Subprocess/IO/AsyncIO+Windows.swift | 46 +--- .../Platforms/Subprocess+Unix.swift | 45 ---- .../Platforms/Subprocess+Windows.swift | 196 ++++++++++-------- Sources/Subprocess/Thread.swift | 25 ++- 5 files changed, 132 insertions(+), 184 deletions(-) diff --git a/Sources/Subprocess/Configuration.swift b/Sources/Subprocess/Configuration.swift index 94d9cd97..4e674298 100644 --- a/Sources/Subprocess/Configuration.swift +++ b/Sources/Subprocess/Configuration.swift @@ -660,7 +660,11 @@ internal struct IODescriptor: ~Copyable { #endif internal var closeWhenDone: Bool + #if canImport(WinSDK) + internal nonisolated(unsafe) let descriptor: Descriptor + #else internal let descriptor: Descriptor + #endif internal init( _ descriptor: Descriptor, diff --git a/Sources/Subprocess/IO/AsyncIO+Windows.swift b/Sources/Subprocess/IO/AsyncIO+Windows.swift index 3120d3bb..ad02d947 100644 --- a/Sources/Subprocess/IO/AsyncIO+Windows.swift +++ b/Sources/Subprocess/IO/AsyncIO+Windows.swift @@ -60,9 +60,11 @@ final class AsyncIO: @unchecked Sendable { internal init() { var maybeSetupError: SubprocessError? = nil // Create the the completion port - guard let ioCompletionPort = CreateIoCompletionPort( - INVALID_HANDLE_VALUE, nil, 0, 0 - ), ioCompletionPort != INVALID_HANDLE_VALUE else { + guard + let ioCompletionPort = CreateIoCompletionPort( + INVALID_HANDLE_VALUE, nil, 0, 0 + ), ioCompletionPort != INVALID_HANDLE_VALUE + else { let error = SubprocessError( code: .init(.asyncIOFailed("CreateIoCompletionPort failed")), underlyingError: .init(rawValue: GetLastError()) @@ -86,50 +88,12 @@ final class AsyncIO: @unchecked Sendable { } } - // Monitor loop - while true { - var bytesTransferred: DWORD = 0 - var targetFileDescriptor: UInt64 = 0 - var overlapped: LPOVERLAPPED? = nil // Monitor loop while true { var bytesTransferred: DWORD = 0 var targetFileDescriptor: UInt64 = 0 var overlapped: LPOVERLAPPED? = nil - let monitorResult = GetQueuedCompletionStatus( - context.ioCompletionPort, - &bytesTransferred, - &targetFileDescriptor, - &overlapped, - INFINITE - ) - if !monitorResult { - let lastError = GetLastError() - if lastError == ERROR_BROKEN_PIPE { - // We finished reading the handle. Signal EOF by - // finishing the stream. - // NOTE: here we deliberately leave now unused continuation - // in the store. Windows does not offer an API to remove a - // HANDLE from an IOCP port, therefore we leave the registration - // to signify the HANDLE has already been resisted. - let continuation = _registration.withLock { store -> SignalStream.Continuation? in - if let continuation = store[targetFileDescriptor] { - return continuation - } - return nil - } - continuation?.finish() - continue - } else { - let error = SubprocessError( - code: .init(.asyncIOFailed("GetQueuedCompletionStatus failed")), - underlyingError: .init(rawValue: lastError) - ) - reportError(error) - break - } - } let monitorResult = GetQueuedCompletionStatus( context.ioCompletionPort, &bytesTransferred, diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 8090945b..565ad788 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -674,51 +674,6 @@ extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible return self.description(withIndent: 0) } } - -// Special keys used in Error's user dictionary -extension String { - static let debugDescriptionErrorKey = "DebugDescription" -} - -internal func pthread_create(_ body: @Sendable @escaping () -> ()) throws(SubprocessError.UnderlyingError) -> pthread_t { - final class Context { - let body: @Sendable () -> () - init(body: @Sendable @escaping () -> Void) { - self.body = body - } - } - #if canImport(Glibc) || canImport(Musl) - func proc(_ context: UnsafeMutableRawPointer?) -> UnsafeMutableRawPointer? { - (Unmanaged.fromOpaque(context!).takeRetainedValue() as! Context).body() - return nil - } - #elseif canImport(Bionic) - func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer { - (Unmanaged.fromOpaque(context).takeRetainedValue() as! Context).body() - return context - } - #endif - #if (os(Linux) && canImport(Glibc)) || canImport(Bionic) - var thread = pthread_t() - #else - var thread: pthread_t? - #endif - let rc = pthread_create( - &thread, - nil, - proc, - Unmanaged.passRetained(Context(body: body)).toOpaque() - ) - if rc != 0 { - throw SubprocessError.UnderlyingError(rawValue: rc) - } - #if (os(Linux) && canImport(Glibc)) || canImport(Bionic) - return thread - #else - return thread! - #endif -} - #endif // !canImport(Darwin) extension ProcessIdentifier { diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index b3526989..6c70a127 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -23,21 +23,30 @@ import _SubprocessCShims // Windows specific implementation extension Configuration { + // @unchecked Sendable because we need to capture UnsafePointers + // to send to another thread. While UnsafePointers are not + // Sendable, we are not mutating them -- we only need these type + // for C interface. + internal struct SpawnContext: @unchecked Sendable { + let startupInfo: UnsafeMutablePointer + let createProcessFlags: DWORD + } + internal func spawn( withInput inputPipe: consuming CreatedPipe, outputPipe: consuming CreatedPipe, errorPipe: consuming CreatedPipe - ) throws -> SpawnResult { + ) async throws -> SpawnResult { // Spawn differently depending on whether // we need to spawn as a user guard let userCredentials = self.platformOptions.userCredentials else { - return try self.spawnDirect( + return try await self.spawnDirect( withInput: inputPipe, outputPipe: outputPipe, errorPipe: errorPipe ) } - return try self.spawnAsUser( + return try await self.spawnAsUser( withInput: inputPipe, outputPipe: outputPipe, errorPipe: errorPipe, @@ -49,7 +58,7 @@ extension Configuration { withInput inputPipe: consuming CreatedPipe, outputPipe: consuming CreatedPipe, errorPipe: consuming CreatedPipe - ) throws -> SpawnResult { + ) async throws -> SpawnResult { var inputReadFileDescriptor: IODescriptor? = inputPipe.readFileDescriptor() var inputWriteFileDescriptor: IODescriptor? = inputPipe.writeFileDescriptor() var outputReadFileDescriptor: IODescriptor? = outputPipe.readFileDescriptor() @@ -104,10 +113,9 @@ extension Configuration { throw error } - var processInfo: PROCESS_INFORMATION = PROCESS_INFORMATION() var createProcessFlags = self.generateCreateProcessFlag() - let created = try self.withStartupInfoEx( + let (created, processInfo, windowsError) = try await self.withStartupInfoEx( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -121,26 +129,34 @@ extension Configuration { } // Spawn! - return try applicationName.withOptionalNTPathRepresentation { applicationNameW in - try commandAndArgs.withCString( - encodedAs: UTF16.self - ) { commandAndArgsW in - try environment.withCString( + let spawnContext = SpawnContext( + startupInfo: startupInfo, + createProcessFlags: createProcessFlags + ) + return try await runOnBackgroundThread { + return try applicationName.withOptionalNTPathRepresentation { applicationNameW in + try commandAndArgs.withCString( encodedAs: UTF16.self - ) { environmentW in - try intendedWorkingDir.withOptionalNTPathRepresentation { intendedWorkingDirW in - CreateProcessW( - applicationNameW, - UnsafeMutablePointer(mutating: commandAndArgsW), - nil, // lpProcessAttributes - nil, // lpThreadAttributes - true, // bInheritHandles - createProcessFlags, - UnsafeMutableRawPointer(mutating: environmentW), - intendedWorkingDirW, - startupInfo.pointer(to: \.StartupInfo)!, - &processInfo - ) + ) { commandAndArgsW in + try environment.withCString( + encodedAs: UTF16.self + ) { environmentW in + try intendedWorkingDir.withOptionalNTPathRepresentation { intendedWorkingDirW in + var processInfo = PROCESS_INFORMATION() + let result = CreateProcessW( + applicationNameW, + UnsafeMutablePointer(mutating: commandAndArgsW), + nil, // lpProcessAttributes + nil, // lpThreadAttributes + true, // bInheritHandles + spawnContext.createProcessFlags, + UnsafeMutableRawPointer(mutating: environmentW), + intendedWorkingDirW, + spawnContext.startupInfo.pointer(to: \.StartupInfo)!, + &processInfo + ) + return (result, processInfo, GetLastError()) + } } } } @@ -148,7 +164,6 @@ extension Configuration { } guard created else { - let windowsError = GetLastError() if windowsError == ERROR_FILE_NOT_FOUND || windowsError == ERROR_PATH_NOT_FOUND { // This execution path is not it. Try the next one continue @@ -233,7 +248,7 @@ extension Configuration { outputPipe: consuming CreatedPipe, errorPipe: consuming CreatedPipe, userCredentials: PlatformOptions.UserCredentials - ) throws -> SpawnResult { + ) async throws -> SpawnResult { var inputPipeBox: CreatedPipe? = consume inputPipe var outputPipeBox: CreatedPipe? = consume outputPipe var errorPipeBox: CreatedPipe? = consume errorPipe @@ -297,10 +312,9 @@ extension Configuration { throw error } - var processInfo: PROCESS_INFORMATION = PROCESS_INFORMATION() var createProcessFlags = self.generateCreateProcessFlag() - let created = try self.withStartupInfoEx( + let (created, processInfo, windowsError) = try await self.withStartupInfoEx( inputRead: inputReadFileDescriptor, inputWrite: inputWriteFileDescriptor, outputRead: outputReadFileDescriptor, @@ -313,37 +327,45 @@ extension Configuration { try configurator(&createProcessFlags, &startupInfo.pointer(to: \.StartupInfo)!.pointee) } + let spawnContext = SpawnContext( + startupInfo: startupInfo, + createProcessFlags: createProcessFlags + ) // Spawn (featuring pyramid!) - return try userCredentials.username.withCString( - encodedAs: UTF16.self - ) { usernameW in - try userCredentials.password.withCString( + return try await runOnBackgroundThread { + return try userCredentials.username.withCString( encodedAs: UTF16.self - ) { passwordW in - try userCredentials.domain.withOptionalCString( + ) { usernameW in + try userCredentials.password.withCString( encodedAs: UTF16.self - ) { domainW in - try applicationName.withOptionalNTPathRepresentation { applicationNameW in - try commandAndArgs.withCString( - encodedAs: UTF16.self - ) { commandAndArgsW in - try environment.withCString( + ) { passwordW in + try userCredentials.domain.withOptionalCString( + encodedAs: UTF16.self + ) { domainW in + try applicationName.withOptionalNTPathRepresentation { applicationNameW in + try commandAndArgs.withCString( encodedAs: UTF16.self - ) { environmentW in - try intendedWorkingDir.withOptionalNTPathRepresentation { intendedWorkingDirW in - CreateProcessWithLogonW( - usernameW, - domainW, - passwordW, - DWORD(LOGON_WITH_PROFILE), - applicationNameW, - UnsafeMutablePointer(mutating: commandAndArgsW), - createProcessFlags, - UnsafeMutableRawPointer(mutating: environmentW), - intendedWorkingDirW, - startupInfo.pointer(to: \.StartupInfo)!, - &processInfo - ) + ) { commandAndArgsW in + try environment.withCString( + encodedAs: UTF16.self + ) { environmentW in + try intendedWorkingDir.withOptionalNTPathRepresentation { intendedWorkingDirW in + var processInfo = PROCESS_INFORMATION() + let created = CreateProcessWithLogonW( + usernameW, + domainW, + passwordW, + DWORD(LOGON_WITH_PROFILE), + applicationNameW, + UnsafeMutablePointer(mutating: commandAndArgsW), + spawnContext.createProcessFlags, + UnsafeMutableRawPointer(mutating: environmentW), + intendedWorkingDirW, + spawnContext.startupInfo.pointer(to: \.StartupInfo)!, + &processInfo + ) + return (created, processInfo, GetLastError()) + } } } } @@ -354,8 +376,6 @@ extension Configuration { } guard created else { - let windowsError = GetLastError() - if windowsError == ERROR_FILE_NOT_FOUND || windowsError == ERROR_PATH_NOT_FOUND { // This executable path is not it. Try the next one continue @@ -1045,8 +1065,8 @@ extension Configuration { outputWrite outputWriteFileDescriptor: borrowing IODescriptor?, errorRead errorReadFileDescriptor: borrowing IODescriptor?, errorWrite errorWriteFileDescriptor: borrowing IODescriptor?, - _ body: (UnsafeMutablePointer) throws -> Result - ) rethrows -> Result { + _ body: (UnsafeMutablePointer) async throws -> Result + ) async throws -> Result { var info: STARTUPINFOEXW = STARTUPINFOEXW() info.StartupInfo.cb = DWORD(MemoryLayout.size(ofValue: info)) info.StartupInfo.dwFlags |= DWORD(STARTF_USESTDHANDLES) @@ -1112,35 +1132,41 @@ extension Configuration { let alignment = 16 var attributeListByteCount = SIZE_T(0) _ = InitializeProcThreadAttributeList(nil, 1, 0, &attributeListByteCount) - return try withUnsafeTemporaryAllocation(byteCount: Int(attributeListByteCount), alignment: alignment) { attributeListPtr in - let attributeList = LPPROC_THREAD_ATTRIBUTE_LIST(attributeListPtr.baseAddress!) - guard InitializeProcThreadAttributeList(attributeList, 1, 0, &attributeListByteCount) else { - throw SubprocessError( - code: .init(.spawnFailed), - underlyingError: .init(rawValue: GetLastError()) - ) - } - defer { - DeleteProcThreadAttributeList(attributeList) - } + // We can't use withUnsafeTemporaryAllocation here because body is async + let attributeListPtr: UnsafeMutableRawBufferPointer = .allocate( + byteCount: Int(attributeListByteCount), + alignment: alignment + ) + defer { + attributeListPtr.deallocate() + } - var handles = Array(inheritedHandles) - return try handles.withUnsafeMutableBufferPointer { inheritedHandlesPtr in - _ = UpdateProcThreadAttribute( - attributeList, - 0, - _subprocess_PROC_THREAD_ATTRIBUTE_HANDLE_LIST(), - inheritedHandlesPtr.baseAddress!, - SIZE_T(MemoryLayout.stride * inheritedHandlesPtr.count), - nil, - nil - ) + let attributeList = LPPROC_THREAD_ATTRIBUTE_LIST(attributeListPtr.baseAddress!) + guard InitializeProcThreadAttributeList(attributeList, 1, 0, &attributeListByteCount) else { + throw SubprocessError( + code: .init(.spawnFailed), + underlyingError: .init(rawValue: GetLastError()) + ) + } + defer { + DeleteProcThreadAttributeList(attributeList) + } - info.lpAttributeList = attributeList + var handles = Array(inheritedHandles) + handles.withUnsafeMutableBufferPointer { inheritedHandlesPtr in + _ = UpdateProcThreadAttribute( + attributeList, + 0, + _subprocess_PROC_THREAD_ATTRIBUTE_HANDLE_LIST(), + inheritedHandlesPtr.baseAddress!, + SIZE_T(MemoryLayout.stride * inheritedHandlesPtr.count), + nil, + nil + ) - return try body(&info) - } + info.lpAttributeList = attributeList } + return try await body(&info) } private func generateWindowsCommandAndArguments( diff --git a/Sources/Subprocess/Thread.swift b/Sources/Subprocess/Thread.swift index 86f98d84..24ef9747 100644 --- a/Sources/Subprocess/Thread.swift +++ b/Sources/Subprocess/Thread.swift @@ -29,7 +29,6 @@ import _SubprocessCShims import Synchronization #endif - #if canImport(WinSDK) private typealias MutexType = CRITICAL_SECTION private typealias ConditionType = CONDITION_VARIABLE @@ -285,7 +284,8 @@ internal func begin_thread_x( nil ) guard threadHandleValue != 0, - let threadHandle = HANDLE(bitPattern: threadHandleValue) else { + let threadHandle = HANDLE(bitPattern: threadHandleValue) + else { // _beginthreadex uses errno instead of GetLastError() let capturedError = _subprocess_windows_get_errno() throw SubprocessError.UnderlyingError(rawValue: DWORD(capturedError)) @@ -304,28 +304,28 @@ internal func pthread_create( self.body = body } } -#if canImport(Darwin) + #if canImport(Darwin) func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer? { (Unmanaged.fromOpaque(context).takeRetainedValue() as! Context).body() return context } -#elseif canImport(Glibc) || canImport(Musl) + #elseif canImport(Glibc) || canImport(Musl) func proc(_ context: UnsafeMutableRawPointer?) -> UnsafeMutableRawPointer? { (Unmanaged.fromOpaque(context!).takeRetainedValue() as! Context).body() return context } -#elseif canImport(Bionic) + #elseif canImport(Bionic) func proc(_ context: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer { (Unmanaged.fromOpaque(context).takeRetainedValue() as! Context).body() return context } -#endif + #endif -#if canImport(Glibc) || canImport(Bionic) + #if canImport(Glibc) || canImport(Bionic) var thread = pthread_t() -#else + #else var thread: pthread_t? -#endif + #endif let rc = pthread_create( &thread, nil, @@ -335,12 +335,11 @@ internal func pthread_create( if rc != 0 { throw SubprocessError.UnderlyingError(rawValue: rc) } -#if canImport(Glibc) || canImport(Bionic) + #if canImport(Glibc) || canImport(Bionic) return thread -#else + #else return thread! -#endif + #endif } #endif // canImport(WinSDK) - From 0faf7233c605fa7232f36284cdbe45fd3056b855 Mon Sep 17 00:00:00 2001 From: Charles Hu Date: Wed, 3 Sep 2025 15:10:23 -0700 Subject: [PATCH 4/4] Disable testWriteToClosedPipe() and testReadFromClosedPipe() because we can't safely write to / read from a closed fd --- Sources/Subprocess/Platforms/Subprocess+Linux.swift | 6 +++--- Sources/Subprocess/Thread.swift | 12 ++++++------ Tests/SubprocessTests/AsyncIOTests.swift | 6 ++++-- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index aee34f87..bd828733 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -225,9 +225,9 @@ private func monitorThreadFunc(context: MonitorThreadContext) { repeating: epoll_event(events: 0, data: epoll_data(fd: 0)), count: 256 ) - var waitMask = sigset_t(); - sigemptyset(&waitMask); - sigaddset(&waitMask, SIGCHLD); + var waitMask = sigset_t() + sigemptyset(&waitMask) + sigaddset(&waitMask, SIGCHLD) // Enter the monitor loop monitorLoop: while true { let eventCount = epoll_pwait( diff --git a/Sources/Subprocess/Thread.swift b/Sources/Subprocess/Thread.swift index 24ef9747..a4e5c0ce 100644 --- a/Sources/Subprocess/Thread.swift +++ b/Sources/Subprocess/Thread.swift @@ -106,7 +106,7 @@ private final class WorkQueue: Sendable { #if canImport(WinSDK) EnterCriticalSection(self.mutex) defer { - LeaveCriticalSection(self.mutex); + LeaveCriticalSection(self.mutex) } #else pthread_mutex_lock(self.mutex) @@ -140,7 +140,7 @@ private final class WorkQueue: Sendable { self.withLock { queue in queue.append(workItem) #if canImport(WinSDK) - WakeConditionVariable(self.waitCondition); + WakeConditionVariable(self.waitCondition) #else pthread_cond_signal(self.waitCondition) #endif @@ -151,7 +151,7 @@ private final class WorkQueue: Sendable { self.withLock { queue in queue.removeAll() #if canImport(WinSDK) - WakeConditionVariable(self.waitCondition); + WakeConditionVariable(self.waitCondition) #else pthread_cond_signal(self.waitCondition) #endif @@ -209,8 +209,8 @@ private func _shutdownWorkerThread() { } _workQueue.shutdown() #if canImport(WinSDK) - WaitForSingleObject(thread, INFINITE); - CloseHandle(thread); + WaitForSingleObject(thread, INFINITE) + CloseHandle(thread) DeleteCriticalSection(_workQueue.mutex) // We do not need to destroy CONDITION_VARIABLE #else @@ -225,7 +225,7 @@ private func _shutdownWorkerThread() { // MARK: - AtomicCounter #if canImport(Darwin) -// Unfortunately on Darwin we can unconditionally use Atomic since it requires macOS 15 +// Unfortunately on Darwin we cannot unconditionally use Atomic since it requires macOS 15 internal struct AtomicCounter: ~Copyable { private let storage: OSAllocatedUnfairLock diff --git a/Tests/SubprocessTests/AsyncIOTests.swift b/Tests/SubprocessTests/AsyncIOTests.swift index a05f40ac..7b725b59 100644 --- a/Tests/SubprocessTests/AsyncIOTests.swift +++ b/Tests/SubprocessTests/AsyncIOTests.swift @@ -135,7 +135,8 @@ extension SubprocessAsyncIOTests { // MARK: - Error Handling Tests extension SubprocessAsyncIOTests { - @Test func testWriteToClosedPipe() async throws { + @Test(.disabled("Cannot safely write to a closed fd without risking it being reused")) + func testWriteToClosedPipe() async throws { var pipe = try CreatedPipe(closeWhenDone: true, purpose: .input) var writeChannel = try _require(pipe.writeFileDescriptor()).createIOChannel() var readChannel = try _require(pipe.readFileDescriptor()).createIOChannel() @@ -171,7 +172,8 @@ extension SubprocessAsyncIOTests { } - @Test func testReadFromClosedPipe() async throws { + @Test(.disabled("Cannot safely read from a closed fd without risking it being reused")) + func testReadFromClosedPipe() async throws { var pipe = try CreatedPipe(closeWhenDone: true, purpose: .input) var writeChannel = try _require(pipe.writeFileDescriptor()).createIOChannel() var readChannel = try _require(pipe.readFileDescriptor()).createIOChannel()