From ce7132b6de88a77b5bc403e7e00080330bfc4a22 Mon Sep 17 00:00:00 2001 From: tom doron Date: Thu, 1 Apr 2021 20:19:45 -0700 Subject: [PATCH] improve process state management motivation: reports on hang process (mostly tests) changes: * refactor Process to use a state machine to track the process execution state * replace use of DispatchQueue with Locks to protect state rdar://76087764 --- Sources/TSCBasic/Process.swift | 221 ++++++++++++++-------- Tests/TSCBasicTests/ProcessSetTests.swift | 2 +- 2 files changed, 141 insertions(+), 82 deletions(-) diff --git a/Sources/TSCBasic/Process.swift b/Sources/TSCBasic/Process.swift index ff4038d0..54113317 100644 --- a/Sources/TSCBasic/Process.swift +++ b/Sources/TSCBasic/Process.swift @@ -178,12 +178,20 @@ public final class Process: ObjectIdentifierProtocol { } } + // process execution mutable state + private enum State { + case idle + case readingOutput(stdout: Thread, stderr: Thread?) + case outputReady(stdout: Result<[UInt8], Swift.Error>, stderr: Result<[UInt8], Swift.Error>) + case complete(ProcessResult) + } + /// Typealias for process id type. - #if !os(Windows) + #if !os(Windows) public typealias ProcessID = pid_t - #else + #else public typealias ProcessID = DWORD - #endif + #endif /// Typealias for stdout/stderr output closure. public typealias OutputClosure = ([UInt8]) -> Void @@ -210,45 +218,47 @@ public final class Process: ObjectIdentifierProtocol { public let workingDirectory: AbsolutePath? /// The process id of the spawned process, available after the process is launched. - #if os(Windows) + #if os(Windows) private var _process: Foundation.Process? public var processID: ProcessID { return DWORD(_process?.processIdentifier ?? 0) } - #else + #else public private(set) var processID = ProcessID() - #endif + #endif + + // process execution mutable state + private var state: State = .idle - /// If the subprocess has launched. - /// Note: This property is not protected by the serial queue because it is only mutated in `launch()`, which will be - /// called only once. - public private(set) var launched = false + /// Lock to protect execution state + private let stateLock = Lock() /// The result of the process execution. Available after process is terminated. + /// This will block while the process is running, as such equivalent to `waitUntilExit` + @available(*, deprecated, message: "use waitUntilExit instead") public var result: ProcessResult? { - return self.serialQueue.sync { - self._result + return self.stateLock.withLock { + switch self.state { + case .complete(let result): + return result + default: + return nil + } } } - /// How process redirects its output. - public let outputRedirection: OutputRedirection + // ideally we would use the state for this, but we need to access it while the waitForExit is locking state + private var _launched = false + private let launchedLock = Lock() - /// The result of the process execution. Available after process is terminated. - private var _result: ProcessResult? - - /// If redirected, stdout result and reference to the thread reading the output. - private var stdout: (result: Result<[UInt8], Swift.Error>, thread: Thread?) = (.success([]), nil) - - /// If redirected, stderr result and reference to the thread reading the output. - private var stderr: (result: Result<[UInt8], Swift.Error>, thread: Thread?) = (.success([]), nil) - - /// Queue to protect concurrent reads. - private let serialQueue = DispatchQueue(label: "org.swift.swiftpm.process") + public var launched: Bool { + return self.launchedLock.withLock { + return self._launched + } + } - /// Queue to protect reading/writing on map of validated executables. - private static let executablesQueue = DispatchQueue( - label: "org.swift.swiftpm.process.findExecutable") + /// How process redirects its output. + public let outputRedirection: OutputRedirection /// Indicates if a new progress group is created for the child process. private let startNewProcessGroup: Bool @@ -257,7 +267,10 @@ public final class Process: ObjectIdentifierProtocol { /// /// Key: Executable name or path. /// Value: Path to the executable, if found. - static private var validatedExecutablesMap = [String: AbsolutePath?]() + private static var validatedExecutablesMap = [String: AbsolutePath?]() + + // Lock to protect reading/writing on validatedExecutablesMap. + private static let validatedExecutablesMapLock = Lock() /// Create a new process instance. /// @@ -348,7 +361,7 @@ public final class Process: ObjectIdentifierProtocol { } // This should cover the most common cases, i.e. when the cache is most helpful. if workingDirectory == localFileSystem.currentWorkingDirectory { - return Process.executablesQueue.sync { + return Process.validatedExecutablesMapLock.withLock { if let value = Process.validatedExecutablesMap[program] { return value } @@ -364,10 +377,11 @@ public final class Process: ObjectIdentifierProtocol { /// Launch the subprocess. public func launch() throws { precondition(arguments.count > 0 && !arguments[0].isEmpty, "Need at least one argument to launch the process.") - precondition(!launched, "It is not allowed to launch the same process object again.") - // Set the launch bool to true. - launched = true + self.launchedLock.withLock { + precondition(!self._launched, "It is not allowed to launch the same process object again.") + self._launched = true + } // Print the arguments if we are verbose. if self.verbose { @@ -381,7 +395,7 @@ public final class Process: ObjectIdentifierProtocol { throw Process.Error.missingExecutableProgram(program: executable) } - #if os(Windows) + #if os(Windows) _process = Foundation.Process() _process?.arguments = Array(arguments.dropFirst()) // Avoid including the executable URL twice. _process?.executableURL = executablePath.asURL @@ -409,13 +423,13 @@ public final class Process: ObjectIdentifierProtocol { } try _process?.run() - #else + #else // Initialize the spawn attributes. - #if canImport(Darwin) || os(Android) + #if canImport(Darwin) || os(Android) var attributes: posix_spawnattr_t? = nil - #else + #else var attributes = posix_spawnattr_t() - #endif + #endif posix_spawnattr_init(&attributes) defer { posix_spawnattr_destroy(&attributes) } @@ -425,13 +439,13 @@ public final class Process: ObjectIdentifierProtocol { posix_spawnattr_setsigmask(&attributes, &noSignals) // Reset all signals to default behavior. - #if os(macOS) + #if os(macOS) var mostSignals = sigset_t() sigfillset(&mostSignals) sigdelset(&mostSignals, SIGKILL) sigdelset(&mostSignals, SIGSTOP) posix_spawnattr_setsigdefault(&attributes, &mostSignals) - #else + #else // On Linux, this can only be used to reset signals that are legal to // modify, so we have to take care about the set we use. var mostSignals = sigset_t() @@ -443,7 +457,7 @@ public final class Process: ObjectIdentifierProtocol { sigaddset(&mostSignals, i) } posix_spawnattr_setsigdefault(&attributes, &mostSignals) - #endif + #endif // Set the attribute flags. var flags = POSIX_SPAWN_SETSIGMASK | POSIX_SPAWN_SETSIGDEF @@ -456,31 +470,31 @@ public final class Process: ObjectIdentifierProtocol { posix_spawnattr_setflags(&attributes, Int16(flags)) // Setup the file actions. - #if canImport(Darwin) || os(Android) + #if canImport(Darwin) || os(Android) var fileActions: posix_spawn_file_actions_t? = nil - #else + #else var fileActions = posix_spawn_file_actions_t() - #endif + #endif posix_spawn_file_actions_init(&fileActions) defer { posix_spawn_file_actions_destroy(&fileActions) } if let workingDirectory = workingDirectory?.pathString { - #if os(macOS) + #if os(macOS) // The only way to set a workingDirectory is using an availability-gated initializer, so we don't need // to handle the case where the posix_spawn_file_actions_addchdir_np method is unavailable. This check only // exists here to make the compiler happy. if #available(macOS 10.15, *) { posix_spawn_file_actions_addchdir_np(&fileActions, workingDirectory) } - #elseif os(Linux) + #elseif os(Linux) guard SPM_posix_spawn_file_actions_addchdir_np_supported() else { throw Process.Error.workingDirectoryNotSupported } SPM_posix_spawn_file_actions_addchdir_np(&fileActions, workingDirectory) - #else + #else throw Process.Error.workingDirectoryNotSupported - #endif + #endif } // Workaround for https://sourceware.org/git/gitweb.cgi?p=glibc.git;h=89e435f3559c53084498e9baad22172b64429362 @@ -534,43 +548,84 @@ public final class Process: ObjectIdentifierProtocol { throw SystemError.posix_spawn(rv, arguments) } - if outputRedirection.redirectsOutput { + if !outputRedirection.redirectsOutput { + // no stdout or stderr in this case + self.stateLock.withLock { + self.state = .outputReady(stdout: .success([]), stderr: .success([])) + } + } else { + var outputResult: (stdout: Result<[UInt8], Swift.Error>?, stderr: Result<[UInt8], Swift.Error>?) + let outputResultLock = Lock() + let outputClosures = outputRedirection.outputClosures // Close the write end of the output pipe. try close(fd: &outputPipe[1]) // Create a thread and start reading the output on it. - var thread = Thread { [weak self] in + let stdoutThread = Thread { [weak self] in if let readResult = self?.readOutput(onFD: outputPipe[0], outputClosure: outputClosures?.stdoutClosure) { - self?.stdout.result = readResult + outputResultLock.withLock { + if let stderrResult = outputResult.stderr { + self?.stateLock.withLock { + self?.state = .outputReady(stdout: readResult, stderr: stderrResult) + } + } else { + outputResult.stdout = readResult + } + } + } else if let stderrResult = (outputResultLock.withLock { outputResult.stderr }) { + // TODO: this is more of an error + self?.stateLock.withLock { + self?.state = .outputReady(stdout: .success([]), stderr: stderrResult) + } } } - thread.start() - self.stdout.thread = thread // Only schedule a thread for stderr if no redirect was requested. + var stderrThread: Thread? = nil if !outputRedirection.redirectStderr { // Close the write end of the stderr pipe. try close(fd: &stderrPipe[1]) // Create a thread and start reading the stderr output on it. - thread = Thread { [weak self] in + stderrThread = Thread { [weak self] in if let readResult = self?.readOutput(onFD: stderrPipe[0], outputClosure: outputClosures?.stderrClosure) { - self?.stderr.result = readResult + outputResultLock.withLock { + if let stdoutResult = outputResult.stdout { + self?.stateLock.withLock { + self?.state = .outputReady(stdout: stdoutResult, stderr: readResult) + } + } else { + outputResult.stderr = readResult + } + } + } else if let stdoutResult = (outputResultLock.withLock { outputResult.stdout }) { + // TODO: this is more of an error + self?.stateLock.withLock { + self?.state = .outputReady(stdout: stdoutResult, stderr: .success([])) + } } } - thread.start() - self.stderr.thread = thread + } else { + outputResultLock.withLock { + outputResult.stderr = .success([]) // no stderr in this case + } + } + // first set state then start reading threads + self.stateLock.withLock { + self.state = .readingOutput(stdout: stdoutThread, stderr: stderrThread) } + stdoutThread.start() + stderrThread?.start() } - #endif // POSIX implementation + #endif // POSIX implementation } /// Blocks the calling process until the subprocess finishes execution. @discardableResult public func waitUntilExit() throws -> ProcessResult { - #if os(Windows) + #if os(Windows) precondition(_process != nil, "The process is not yet launched.") let p = _process! p.waitUntilExit() @@ -585,19 +640,23 @@ public final class Process: ObjectIdentifierProtocol { stderrOutput: stderr.result ) return executionResult - #else - return try serialQueue.sync { - precondition(launched, "The process is not yet launched.") - - // If the process has already finsihed, return it. - if let existingResult = _result { - return existingResult - } - + #else + self.stateLock.lock() + switch self.state { + case .idle: + defer { self.stateLock.unlock() } + preconditionFailure("The process is not yet launched.") + case .complete(let result): + defer { self.stateLock.unlock() } + return result + case .readingOutput(let stdoutThread, let stderrThread): + self.stateLock.unlock() // unlock early since output read thread need to change state // If we're reading output, make sure that is finished. - stdout.thread?.join() - stderr.thread?.join() - + stdoutThread.join() + stderrThread?.join() + return try self.waitUntilExit() + case .outputReady(let stdoutResult, let stderrResult): + defer { self.stateLock.unlock() } // Wait until process finishes execution. var exitStatusCode: Int32 = 0 var result = waitpid(processID, &exitStatusCode, 0) @@ -613,13 +672,13 @@ public final class Process: ObjectIdentifierProtocol { arguments: arguments, environment: environment, exitStatusCode: exitStatusCode, - output: stdout.result, - stderrOutput: stderr.result + output: stdoutResult, + stderrOutput: stderrResult ) - self._result = executionResult + self.state = .complete(executionResult) return executionResult } - #endif + #endif } #if !os(Windows) @@ -671,16 +730,16 @@ public final class Process: ObjectIdentifierProtocol { /// /// Note: This will signal all processes in the process group. public func signal(_ signal: Int32) { - #if os(Windows) + #if os(Windows) if signal == SIGINT { - _process?.interrupt() + _process?.interrupt() } else { - _process?.terminate() + _process?.terminate() } - #else - assert(launched, "The process is not yet launched.") + #else + assert(self.launched, "The process is not yet launched.") _ = TSCLibc.kill(startNewProcessGroup ? -processID : processID, signal) - #endif + #endif } } diff --git a/Tests/TSCBasicTests/ProcessSetTests.swift b/Tests/TSCBasicTests/ProcessSetTests.swift index 59d499af..e6f98c11 100644 --- a/Tests/TSCBasicTests/ProcessSetTests.swift +++ b/Tests/TSCBasicTests/ProcessSetTests.swift @@ -55,7 +55,7 @@ class ProcessSetTests: XCTestCase { threadStartCondition.signal() } let result = try process.waitUntilExit() - // Ensure we did termiated due to signal. + // Ensure we did terminated due to signal. switch result.exitStatus { case .signalled: break default: XCTFail("Expected to exit via signal")