diff --git a/Sources/Command/AsyncResourceLimiter.swift b/Sources/Command/AsyncResourceLimiter.swift new file mode 100644 index 0000000..580104a --- /dev/null +++ b/Sources/Command/AsyncResourceLimiter.swift @@ -0,0 +1,92 @@ +import Foundation + +actor AsyncResourceLimiter { + private struct Waiter { + let id: UUID + let continuation: CheckedContinuation + } + + private let limitProvider: @Sendable () -> Int + private var limit: Int + private var activePermits = 0 + private var waiters: [Waiter] = [] + + init(limit: Int) { + precondition(limit > 0, "AsyncResourceLimiter limit must be greater than zero.") + self.init(limitProvider: { limit }) + } + + init(limitProvider: @escaping @Sendable () -> Int) { + let limit = limitProvider() + precondition(limit > 0, "AsyncResourceLimiter limit must be greater than zero.") + self.limitProvider = limitProvider + self.limit = limit + } + + func withPermit(_ operation: @Sendable () async throws -> T) async throws -> T { + try await acquire() + + do { + try Task.checkCancellation() + let value = try await operation() + release() + return value + } catch { + release() + throw error + } + } + + private func acquire() async throws { + try Task.checkCancellation() + refreshLimit() + + if activePermits < limit, waiters.isEmpty { + activePermits += 1 + return + } + + let waiterID = UUID() + let wasGrantedPermit = await withTaskCancellationHandler { + await withCheckedContinuation { continuation in + waiters.append(Waiter(id: waiterID, continuation: continuation)) + grantAvailablePermits() + } + } onCancel: { + Task { await self.cancelWaiter(id: waiterID) } + } + + guard wasGrantedPermit else { + throw CancellationError() + } + } + + private func release() { + refreshLimit() + activePermits -= 1 + precondition(activePermits >= 0, "AsyncResourceLimiter released more permits than it acquired.") + grantAvailablePermits() + } + + private func refreshLimit() { + let refreshedLimit = limitProvider() + precondition(refreshedLimit > 0, "AsyncResourceLimiter limit must be greater than zero.") + limit = refreshedLimit + } + + private func grantAvailablePermits() { + while activePermits < limit, !waiters.isEmpty { + let waiter = waiters.removeFirst() + activePermits += 1 + waiter.continuation.resume(returning: true) + } + } + + private func cancelWaiter(id: UUID) { + guard let index = waiters.firstIndex(where: { $0.id == id }) else { return } + let waiter = waiters.remove(at: index) + waiter.continuation.resume(returning: false) + refreshLimit() + grantAvailablePermits() + } +} diff --git a/Sources/Command/CommandRunner.swift b/Sources/Command/CommandRunner.swift index c44e251..a3e09cd 100644 --- a/Sources/Command/CommandRunner.swift +++ b/Sources/Command/CommandRunner.swift @@ -6,6 +6,24 @@ import Logging #endif import Path +#if canImport(Darwin) + import Darwin +#elseif canImport(Glibc) + import Glibc +#endif + +/// Controls how a subprocess's standard output and error are handled. +public enum OutputRedirection: Sendable { + /// Captures standard output and error through pipes and emits them as `CommandEvent`s. + case capture + /// Discards standard output and error by redirecting them to the null device. No pipes are + /// created, so the command holds no extra file descriptors and emits no events. + case discard + /// Lets the subprocess inherit the parent process's standard output and error. No pipes are + /// created and no events are emitted. + case inherit +} + /** `CommandRunning` is a protocol that declares the interface to run system processes. The main implementation of the protocol is `CommandRunner`. @@ -27,6 +45,21 @@ public protocol CommandRunning: Sendable { environment: [String: String], workingDirectory: Path.AbsolutePath? ) -> AsyncThrowingStream + + /// Runs a command in the system, controlling how its output is handled. + /// - Parameters: + /// - arguments: The command arguments where the first argument represents the executable. + /// - environment: The environment variables that will be passed to the process running the command. + /// - workingDirectory: The directory from where the command will be executed. + /// - output: How the subprocess's standard output and error are handled. Use ``OutputRedirection/discard`` or + /// ``OutputRedirection/inherit`` to avoid allocating output pipes when the output isn't needed. + /// - Returns: An async throwing stream to subscribe to the emitted events and completion of the underlying process. + func run( + arguments: [String], + environment: [String: String], + workingDirectory: Path.AbsolutePath?, + output: OutputRedirection + ) -> AsyncThrowingStream } extension CommandRunning { @@ -140,133 +173,228 @@ public enum CommandError: Error, CustomStringConvertible, LocalizedError, Sendab public struct CommandRunner: CommandRunning, Sendable { let logger: Logger? + /// Bounds the number of concurrently-running subprocesses so the pipe file descriptors they + /// hold can't exhaust the process's file-descriptor table. + private let processLimiter: AsyncResourceLimiter + public init(logger: Logger? = nil) { self.logger = logger + processLimiter = Self.sharedProcessLimiter + } + + /// Creates a runner with a custom cap on concurrently-running subprocesses. + init(logger: Logger? = nil, maximumConcurrentProcesses: Int) { + self.logger = logger + processLimiter = AsyncResourceLimiter(limit: maximumConcurrentProcesses) + } + + /// File descriptors kept in reserve for stdio and other process-wide handles. + private static let reservedFileDescriptors = 32 + /// Approximate number of file descriptors a running subprocess holds (two pipes, plus the + /// transient pipe used to locate the executable). + private static let fileDescriptorsPerProcess = 6 + /// Upper bound on concurrent subprocesses, regardless of how high the file-descriptor limit is. + private static let maximumConcurrentProcesses = 256 + /// Used when the file-descriptor limit can't be determined. + private static let fallbackMaximumConcurrentProcesses = 16 + + /// Shared limiter whose cap is derived from the current soft `RLIMIT_NOFILE`, so it adapts if + /// the limit is raised at startup. + private static let sharedProcessLimiter = AsyncResourceLimiter( + limitProvider: { systemMaximumConcurrentProcesses() } + ) + + private static func systemMaximumConcurrentProcesses() -> Int { + #if os(Windows) + return fallbackMaximumConcurrentProcesses + #else + var resourceLimit = rlimit() + #if canImport(Glibc) + let openFilesResource = Int32(RLIMIT_NOFILE.rawValue) + #else + let openFilesResource = RLIMIT_NOFILE + #endif + guard getrlimit(openFilesResource, &resourceLimit) == 0 else { + return fallbackMaximumConcurrentProcesses + } + let softLimit = Int(clamping: resourceLimit.rlim_cur) + let descriptorsAvailable = max(1, softLimit - reservedFileDescriptors) + let proportionalLimit = max(1, descriptorsAvailable / fileDescriptorsPerProcess) + return min(maximumConcurrentProcesses, proportionalLimit) + #endif } - // swiftlint:disable:next function_body_length public func run( arguments: [String], environment: [String: String] = ProcessInfo.processInfo.environment, workingDirectory: Path.AbsolutePath? = nil + ) -> AsyncThrowingStream { + run(arguments: arguments, environment: environment, workingDirectory: workingDirectory, output: .capture) + } + + // swiftlint:disable:next function_body_length + public func run( + arguments: [String], + environment: [String: String] = ProcessInfo.processInfo.environment, + workingDirectory: Path.AbsolutePath? = nil, + output: OutputRedirection = .capture ) -> AsyncThrowingStream { AsyncThrowingStream(CommandEvent.self, bufferingPolicy: .unbounded) { continuation in - Task.detached { + let runningProcess = ThreadSafe(nil) + + let task = Task.detached { do { - let loggerMetadata: Logger.Metadata = ["command": .string(arguments.joined(separator: " "))] - // Resolve the working directory if not passed. `getcwd` can transiently - // return an empty path under concurrent process launches, so fall back to - // letting the child inherit the process working directory instead of failing. - var workingDirectory = workingDirectory - if workingDirectory == nil { - let currentDirectoryPath = FileManager.default.currentDirectoryPath - if !currentDirectoryPath.isEmpty { - workingDirectory = try? .init(validating: currentDirectoryPath) + try await processLimiter.withPermit { + let loggerMetadata: Logger.Metadata = ["command": .string(arguments.joined(separator: " "))] + // Resolve the working directory if not passed. `getcwd` can transiently + // return an empty path under concurrent process launches, so fall back to + // letting the child inherit the process working directory instead of failing. + var workingDirectory = workingDirectory + if workingDirectory == nil { + let currentDirectoryPath = FileManager.default.currentDirectoryPath + if !currentDirectoryPath.isEmpty { + workingDirectory = try? .init(validating: currentDirectoryPath) + } } - } - let collectedStdErr: ThreadSafe = ThreadSafe("") - - // Process - let process = Process() - let stdoutPipe = Pipe() - let stderrPipe = Pipe() - - let stdoutTask = Task { - do { - for try await data in stdoutPipe.fileHandleForReading.byteStream() { - continuation.yield(.standardOutput([UInt8](data))) - if let output = String(data: data, encoding: .utf8) { - logger?.debug("\(output)", metadata: loggerMetadata) + let collectedStdErr: ThreadSafe = ThreadSafe("") + + // Process + let process = Process() + + // Only allocate output pipes when the caller wants to capture output. + // Discarding or inheriting avoids holding any extra file descriptors. + let stdoutPipe: Pipe? + let stderrPipe: Pipe? + let stdoutTask: Task? + let stderrTask: Task? + + switch output { + case .capture: + let outPipe = Pipe() + let errPipe = Pipe() + stdoutPipe = outPipe + stderrPipe = errPipe + process.standardOutput = outPipe + process.standardError = errPipe + stdoutTask = Task { + do { + for try await data in outPipe.fileHandleForReading.byteStream() { + continuation.yield(.standardOutput([UInt8](data))) + if let output = String(data: data, encoding: .utf8) { + logger?.debug("\(output)", metadata: loggerMetadata) + } + } + } catch { + logger?.error("Error reading stdout: \(error)", metadata: loggerMetadata) } } - } catch { - logger?.error("Error reading stdout: \(error)", metadata: loggerMetadata) - } - } - - let stderrTask = Task { - do { - for try await data in stderrPipe.fileHandleForReading.byteStream() { - continuation.yield(.standardError([UInt8](data))) - if let output = String(data: data, encoding: .utf8) { - collectedStdErr.mutate { $0.append(output) } - logger?.error("\(output)", metadata: loggerMetadata) + stderrTask = Task { + do { + for try await data in errPipe.fileHandleForReading.byteStream() { + continuation.yield(.standardError([UInt8](data))) + if let output = String(data: data, encoding: .utf8) { + collectedStdErr.mutate { $0.append(output) } + logger?.error("\(output)", metadata: loggerMetadata) + } + } + } catch { + logger?.error("Error reading stderr: \(error)", metadata: loggerMetadata) } } - } catch { - logger?.error("Error reading stderr: \(error)", metadata: loggerMetadata) + case .discard: + stdoutPipe = nil + stderrPipe = nil + stdoutTask = nil + stderrTask = nil + process.standardOutput = FileHandle.nullDevice + process.standardError = FileHandle.nullDevice + case .inherit: + stdoutPipe = nil + stderrPipe = nil + stdoutTask = nil + stderrTask = nil } - } - if let workingDirectory { - process.currentDirectoryURL = URL(fileURLWithPath: workingDirectory.pathString) - } - process.standardOutput = stdoutPipe - process.standardError = stderrPipe - process.standardInput = FileHandle.standardInput - process.environment = environment + if let workingDirectory { + process.currentDirectoryURL = URL(fileURLWithPath: workingDirectory.pathString) + } + process.standardInput = FileHandle.standardInput + process.environment = environment - let processArguments = Array(arguments.dropFirst()) - process.arguments = processArguments + let processArguments = Array(arguments.dropFirst()) + process.arguments = processArguments - let executable = try lookupExecutable(firstArgument: arguments.first) - process.executableURL = executable + let executable = try lookupExecutable(firstArgument: arguments.first) + process.executableURL = executable - logger?.debug("Running sub-process", metadata: loggerMetadata) + logger?.debug("Running sub-process", metadata: loggerMetadata) - let threadSafeProcess = ThreadSafe(process) + // Publish the process so the stream's termination handler (installed before + // the permit was awaited) can terminate it once it exists. + runningProcess.mutate { $0 = process } - continuation.onTermination = { termination in - switch termination { - case .cancelled: - if threadSafeProcess.value.isRunning { - threadSafeProcess.value.terminate() + try await withCheckedThrowingContinuation { (processCompletion: CheckedContinuation) in + process.terminationHandler = { _ in + processCompletion.resume() + } + do { + try process.run() + // Close the race where the stream is cancelled after the process is + // published but before it started running. + if Task.isCancelled { + process.terminate() + } + } catch { + process.terminationHandler = nil + processCompletion.resume(throwing: error) } - default: - break - } - } - - try await withCheckedThrowingContinuation { (processCompletion: CheckedContinuation) in - process.terminationHandler = { _ in - processCompletion.resume() - } - do { - try process.run() - } catch { - process.terminationHandler = nil - processCompletion.resume(throwing: error) } - } - await stdoutTask.value - await stderrTask.value + await stdoutTask?.value + await stderrTask?.value - try? stdoutPipe.fileHandleForReading.close() - try? stderrPipe.fileHandleForReading.close() + try? stdoutPipe?.fileHandleForReading.close() + try? stderrPipe?.fileHandleForReading.close() - switch process.terminationReason { - case .exit: - if process.terminationStatus != 0 { - throw CommandError.terminated( - process.terminationStatus, - stderr: collectedStdErr.value, - command: arguments - ) - } - case .uncaughtSignal: - if process.terminationStatus != 0 { - throw CommandError.signalled(process.terminationStatus, command: arguments) + switch process.terminationReason { + case .exit: + if process.terminationStatus != 0 { + throw CommandError.terminated( + process.terminationStatus, + stderr: collectedStdErr.value, + command: arguments + ) + } + case .uncaughtSignal: + if process.terminationStatus != 0 { + throw CommandError.signalled(process.terminationStatus, command: arguments) + } + @unknown default: + break } - @unknown default: - break } continuation.finish() } catch { continuation.finish(throwing: error) } } + + continuation.onTermination = { termination in + switch termination { + case .cancelled: + // Cancelling the producer task unwinds it whether it is still waiting for a + // permit or already running the subprocess. + task.cancel() + runningProcess.withValue { process in + if let process, process.isRunning { + process.terminate() + } + } + default: + break + } + } } } diff --git a/Tests/CommandTests/CommandRunnerRaceTests.swift b/Tests/CommandTests/CommandRunnerRaceTests.swift index 0438903..b5240a1 100644 --- a/Tests/CommandTests/CommandRunnerRaceTests.swift +++ b/Tests/CommandTests/CommandRunnerRaceTests.swift @@ -1,9 +1,104 @@ +import Foundation import Mockable import Testing @testable import Command #if !os(Linux) struct CommandRunnerRaceTests { + @Test func boundsConcurrentSubprocessLaunches() async throws { + #if os(macOS) + // Each running subprocess holds open pipe file descriptors, so an unbounded fan-out + // can exhaust the process's file-descriptor table (surfacing as EBADF on launch). + // The runner must cap how many subprocesses run at once; verify that cap holds. + let limit = 4 + let commandRunner = CommandRunner(maximumConcurrentProcesses: limit) + + let directory = FileManager.default.temporaryDirectory + .appendingPathComponent("command-limiter-\(UUID().uuidString)") + try FileManager.default.createDirectory(at: directory, withIntermediateDirectories: true) + defer { try? FileManager.default.removeItem(at: directory) } + + // Each command creates a marker file while it runs and removes it on exit, so the + // number of marker files present at any instant equals the number of subprocesses + // running concurrently. + let maxObserved = ThreadSafe(0) + let sampler = Task { + while !Task.isCancelled { + let count = (try? FileManager.default.contentsOfDirectory(atPath: directory.path))?.count ?? 0 + maxObserved.mutate { $0 = max($0, count) } + try? await Task.sleep(nanoseconds: 2_000_000) + } + } + + let script = "marker=\"\(directory.path)/$$\"; touch \"$marker\"; sleep 0.2; rm -f \"$marker\"" + await withTaskGroup(of: Void.self) { group in + for _ in 0 ..< 60 { + group.addTask { + do { + for try await _ in commandRunner.run(arguments: ["/bin/sh", "-c", script]) {} + } catch {} + } + } + await group.waitForAll() + } + sampler.cancel() + + #expect(maxObserved.value > 0, "Expected to observe running subprocesses") + #expect( + maxObserved.value <= limit, + "Observed \(maxObserved.value) concurrent subprocesses, expected at most \(limit)" + ) + #endif + } + + @Test func cancellingWhileWaitingForPermit_doesNotLaunchSubprocess() async throws { + #if os(macOS) + // With a single permit, a second command is parked inside the limiter waiting for the + // permit. Cancelling its stream while it waits must unwind the producer so it never + // launches a subprocess once the permit frees up. + let commandRunner = CommandRunner(maximumConcurrentProcesses: 1) + + let directory = FileManager.default.temporaryDirectory + .appendingPathComponent("command-cancel-\(UUID().uuidString)") + try FileManager.default.createDirectory(at: directory, withIntermediateDirectories: true) + defer { try? FileManager.default.removeItem(at: directory) } + + let holderMarker = directory.appendingPathComponent("holder-running") + let blockedSentinel = directory.appendingPathComponent("blocked-launched") + + // Holds the only permit: signals it is running, then stays alive long enough for us to + // enqueue and cancel a second command while the permit is taken. + let holderScript = "touch \"\(holderMarker.path)\"; sleep 2; rm -f \"\(holderMarker.path)\"" + let holder = Task { + for try await _ in commandRunner.run(arguments: ["/bin/sh", "-c", holderScript]) {} + } + + while !FileManager.default.fileExists(atPath: holderMarker.path) { + try await Task.sleep(nanoseconds: 5_000_000) + } + + // This can only start once the permit frees. If it ever launches it creates the + // sentinel; because we cancel it while it is still waiting, the sentinel must never appear. + let blockedScript = "touch \"\(blockedSentinel.path)\"" + let blocked = Task { + for try await _ in commandRunner.run(arguments: ["/bin/sh", "-c", blockedScript]) {} + } + + try await Task.sleep(nanoseconds: 100_000_000) + blocked.cancel() + _ = await blocked.result + + // Release the permit and give any incorrectly-orphaned producer a chance to launch. + _ = try? await holder.value + try await Task.sleep(nanoseconds: 300_000_000) + + #expect( + !FileManager.default.fileExists(atPath: blockedSentinel.path), + "A command cancelled while waiting for a permit must not launch a subprocess" + ) + #endif + } + @Test func runsManyConcurrent_successfully() async throws { #if os(Linux) || os(macOS) let commandRunner = CommandRunner() diff --git a/Tests/CommandTests/CommandRunnerTests.swift b/Tests/CommandTests/CommandRunnerTests.swift index 06e48c5..0a0b974 100644 --- a/Tests/CommandTests/CommandRunnerTests.swift +++ b/Tests/CommandTests/CommandRunnerTests.swift @@ -22,6 +22,32 @@ import Testing #endif } + @Test func discardOutput_emitsNoEventsButStillRuns() async throws { + #if !os(Windows) + let commandRunner = CommandRunner() + + // Capturing yields the command's output... + let captured = try await commandRunner.run(arguments: ["echo", "foo"], output: .capture) + .reduce(into: [String]()) { $0.append($1.string() ?? "") } + #expect(captured == ["foo\n"]) + + // ...while discarding allocates no pipes and emits nothing, yet still runs. + let discardedEventCount = try await commandRunner.run(arguments: ["echo", "foo"], output: .discard) + .reduce(into: 0) { count, _ in count += 1 } + #expect(discardedEventCount == 0) + #endif + } + + @Test func discardOutput_stillReportsNonZeroExit() async throws { + #if !os(Windows) + let commandRunner = CommandRunner() + + await #expect(throws: CommandError.self) { + for try await _ in commandRunner.run(arguments: ["/bin/sh", "-c", "exit 7"], output: .discard) {} + } + #endif + } + @Test func lookupExecutable_withAbsolutePath() throws { // Given let commandRunner = CommandRunner()