From ca545507a9b147b17d691d803c666b162e5073f5 Mon Sep 17 00:00:00 2001 From: Johannes Weiss Date: Wed, 19 Jul 2023 15:23:31 +0100 Subject: [PATCH] add AsyncProcess, Concurreny for Foundation.Process --- Package.resolved | 8 +- Package.swift | 28 + Sources/AsyncProcess/ChunkSequence.swift | 63 ++ Sources/AsyncProcess/EOFSequence.swift | 27 + Sources/AsyncProcess/FileContentStream.swift | 447 ++++++++++ Sources/AsyncProcess/NIOAsyncPipeWriter.swift | 43 + .../ProcessExecutor+Convenience.swift | 269 ++++++ Sources/AsyncProcess/ProcessExecutor.swift | 401 +++++++++ Sources/AsyncProcess/ProcessExit.swift | 39 + .../AsyncByteBufferLineSequenceTests.swift | 77 ++ .../Helpers+LogRecorderHandler.swift | 98 +++ .../AsyncProcessTests/IntegrationTests.swift | 799 ++++++++++++++++++ .../ArchitectureMappingTests.swift | 0 .../EndToEndTests.swift | 0 14 files changed, 2295 insertions(+), 4 deletions(-) create mode 100644 Sources/AsyncProcess/ChunkSequence.swift create mode 100644 Sources/AsyncProcess/EOFSequence.swift create mode 100644 Sources/AsyncProcess/FileContentStream.swift create mode 100644 Sources/AsyncProcess/NIOAsyncPipeWriter.swift create mode 100644 Sources/AsyncProcess/ProcessExecutor+Convenience.swift create mode 100644 Sources/AsyncProcess/ProcessExecutor.swift create mode 100644 Sources/AsyncProcess/ProcessExit.swift create mode 100644 Tests/AsyncProcessTests/AsyncByteBufferLineSequenceTests.swift create mode 100644 Tests/AsyncProcessTests/Helpers+LogRecorderHandler.swift create mode 100644 Tests/AsyncProcessTests/IntegrationTests.swift rename Tests/{ => SwiftSDKGeneratorTests}/ArchitectureMappingTests.swift (100%) rename Tests/{ => SwiftSDKGeneratorTests}/EndToEndTests.swift (100%) diff --git a/Package.resolved b/Package.resolved index 2e7b638..e7b8a3e 100644 --- a/Package.resolved +++ b/Package.resolved @@ -32,8 +32,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/apple/swift-atomics.git", "state" : { - "revision" : "ff3d2212b6b093db7f177d0855adbc4ef9c5f036", - "version" : "1.0.3" + "revision" : "6c89474e62719ddcc1e9614989fff2f68208fe10", + "version" : "1.1.0" } }, { @@ -59,8 +59,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/apple/swift-nio.git", "state" : { - "revision" : "e0cc6dd6ffa8e6a6f565938acd858b24e47902d0", - "version" : "2.50.0" + "revision" : "5f542894dd8efbd766d8adf73ef2f947b0cd5e21", + "version" : "2.56.0" } }, { diff --git a/Package.swift b/Package.swift index daeaf59..34f5e4a 100644 --- a/Package.swift +++ b/Package.swift @@ -19,6 +19,11 @@ let package = Package( .package(url: "https://github.com/apple/swift-system", from: "1.2.1"), .package(url: "https://github.com/apple/swift-argument-parser", from: "1.2.2"), .package(url: "https://github.com/apple/swift-async-algorithms.git", from: "0.1.0"), + .package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.56.0"), + .package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.17.0"), + .package(url: "https://github.com/apple/swift-log.git", from: "1.5.2"), + .package(url: "https://github.com/apple/swift-collections.git", from: "1.0.0"), ], targets: [ // Targets are the basic building blocks of a package. A target can define a module or a test suite. @@ -45,5 +50,28 @@ let package = Package( .target(name: "SwiftSDKGenerator"), ] ), + .target( + name: "AsyncProcess", + dependencies: [ + .product(name: "Atomics", package: "swift-atomics"), + .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), + .product(name: "Logging", package: "swift-log"), + .product(name: "NIO", package: "swift-nio"), + .product(name: "NIOExtras", package: "swift-nio-extras"), + .product(name: "DequeModule", package: "swift-collections"), + .product(name: "SystemPackage", package: "swift-system"), + ] + ), + .testTarget( + name: "AsyncProcessTests", + dependencies: [ + "AsyncProcess", + .product(name: "Atomics", package: "swift-atomics"), + .product(name: "AsyncAlgorithms", package: "swift-async-algorithms"), + .product(name: "NIO", package: "swift-nio"), + .product(name: "NIOConcurrencyHelpers", package: "swift-nio"), + .product(name: "Logging", package: "swift-log"), + ] + ), ] ) diff --git a/Sources/AsyncProcess/ChunkSequence.swift b/Sources/AsyncProcess/ChunkSequence.swift new file mode 100644 index 0000000..f0f2a91 --- /dev/null +++ b/Sources/AsyncProcess/ChunkSequence.swift @@ -0,0 +1,63 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See http://swift.org/LICENSE.txt for license information +// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import NIO + +#if os(Linux) || os(Android) || os(Windows) +@preconcurrency import Foundation +#else +import Foundation +#endif + +public struct IllegalStreamConsumptionError: Error { + var description: String +} + +public struct ChunkSequence: AsyncSequence & Sendable { + private let fileHandle: FileHandle? + private let group: EventLoopGroup + + public init(takingOwnershipOfFileHandle fileHandle: FileHandle?, group: EventLoopGroup) { + self.group = group + self.fileHandle = fileHandle + } + + public func makeAsyncIterator() -> AsyncIterator { + // This will close the file handle. + AsyncIterator(try! self.fileHandle?.fileContentStream(eventLoop: self.group.any())) + } + + public typealias Element = ByteBuffer + public struct AsyncIterator: AsyncIteratorProtocol { + public typealias Element = ByteBuffer + internal typealias UnderlyingSequence = FileContentStream + + private var underlyingIterator: UnderlyingSequence.AsyncIterator? + + internal init(_ underlyingSequence: UnderlyingSequence?) { + self.underlyingIterator = underlyingSequence?.makeAsyncIterator() + } + + public mutating func next() async throws -> Element? { + if self.underlyingIterator != nil { + return try await self.underlyingIterator!.next() + } else { + throw IllegalStreamConsumptionError( + description: """ + Either `.discard`ed, `.inherit`ed or redirected this stream to a `.fileHandle`, + cannot also consume it. To consume, please `.stream` it. + """ + ) + } + } + } +} diff --git a/Sources/AsyncProcess/EOFSequence.swift b/Sources/AsyncProcess/EOFSequence.swift new file mode 100644 index 0000000..d36fed9 --- /dev/null +++ b/Sources/AsyncProcess/EOFSequence.swift @@ -0,0 +1,27 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See http://swift.org/LICENSE.txt for license information +// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +public struct EOFSequence: AsyncSequence & Sendable { + public typealias Element = Element + + public struct AsyncIterator: AsyncIteratorProtocol { + public mutating func next() async throws -> Element? { + nil + } + } + + public init(of type: Element.Type = Element.self) {} + + public func makeAsyncIterator() -> AsyncIterator { + AsyncIterator() + } +} diff --git a/Sources/AsyncProcess/FileContentStream.swift b/Sources/AsyncProcess/FileContentStream.swift new file mode 100644 index 0000000..347fd34 --- /dev/null +++ b/Sources/AsyncProcess/FileContentStream.swift @@ -0,0 +1,447 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See http://swift.org/LICENSE.txt for license information +// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import AsyncAlgorithms +import DequeModule +import Foundation +import NIO + +// ⚠️ IMPLEMENTATION WARNING +// - Known issues: +// - no tests +// - most configurations have never run +internal struct FileContentStream: AsyncSequence { + public typealias Element = ByteBuffer + typealias Underlying = AsyncThrowingChannel + + public func makeAsyncIterator() -> AsyncIterator { + AsyncIterator(underlying: self.asyncChannel.makeAsyncIterator()) + } + + public struct AsyncIterator: AsyncIteratorProtocol { + public typealias Element = ByteBuffer + + var underlying: Underlying.AsyncIterator + + public mutating func next() async throws -> ByteBuffer? { + try await self.underlying.next() + } + } + + public struct IOError: Error { + public var errnoValue: CInt + + public static func makeFromErrnoGlobal() -> IOError { + IOError(errnoValue: errno) + } + } + + private let asyncChannel: AsyncThrowingChannel + + public init( + fileDescriptor: CInt, + eventLoop: EventLoop, + blockingPool: NIOThreadPool? = nil + ) throws { + var statInfo: stat = .init() + let statError = fstat(fileDescriptor, &statInfo) + if statError != 0 { + throw IOError.makeFromErrnoGlobal() + } + + let dupedFD = dup(fileDescriptor) + let asyncChannel = AsyncThrowingChannel() + self.asyncChannel = asyncChannel + + switch statInfo.st_mode & S_IFMT { + case S_IFREG: + guard let blockingPool else { + throw IOError(errnoValue: EINVAL) + } + let fileHandle = NIOFileHandle(descriptor: dupedFD) + NonBlockingFileIO(threadPool: blockingPool) + .readChunked( + fileHandle: fileHandle, + byteCount: .max, + allocator: ByteBufferAllocator(), + eventLoop: eventLoop, + chunkHandler: { chunk in + eventLoop.makeFutureWithTask { + await asyncChannel.send(chunk) + } + } + ) + .whenComplete { result in + try! fileHandle.close() + switch result { + case let .failure(error): + asyncChannel.fail(error) + case .success: + asyncChannel.finish() + } + } + case S_IFSOCK: + _ = ClientBootstrap(group: eventLoop) + .channelInitializer { channel in + channel.pipeline.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel)) + } + .withConnectedSocket(dupedFD) + case S_IFIFO: + let deadPipe = Pipe() + NIOPipeBootstrap(group: eventLoop) + .channelInitializer { channel in + channel.pipeline.addHandler(ReadIntoAsyncChannelHandler(sink: asyncChannel)) + } + .withPipes( + inputDescriptor: dupedFD, + outputDescriptor: dup(deadPipe.fileHandleForWriting.fileDescriptor) + ) + .whenSuccess { channel in + channel.close(mode: .output, promise: nil) + } + try! deadPipe.fileHandleForReading.close() + try! deadPipe.fileHandleForWriting.close() + case S_IFDIR: + throw IOError(errnoValue: EISDIR) + case S_IFBLK, S_IFCHR, S_IFLNK: + throw IOError(errnoValue: EINVAL) + default: + // odd, but okay + throw IOError(errnoValue: EINVAL) + } + } +} + +private final class ReadIntoAsyncChannelHandler: ChannelDuplexHandler { + typealias InboundIn = ByteBuffer + typealias OutboundIn = Never + + private var heldUpRead = false + private let sink: AsyncThrowingChannel + private var state: State = .idle + + enum State { + case idle + case error(Error) + case sending(Deque) + + mutating func enqueue(_ data: ReceivedEvent) -> ReceivedEvent? { + switch self { + case .idle: + self = .sending([]) + return data + case .error: + return nil + case var .sending(queue): + queue.append(data) + self = .sending(queue) + return nil + } + } + + mutating func didSendOne() -> ReceivedEvent? { + switch self { + case .idle: + preconditionFailure("didSendOne during .idle") + case .error: + return nil + case var .sending(queue): + if queue.isEmpty { + self = .idle + return nil + } else { + let value = queue.removeFirst() + self = .sending(queue) + return value + } + } + } + + mutating func fail(_ error: Error) { + switch self { + case .idle, .sending: + self = .error(error) + case .error: + return + } + } + } + + enum ReceivedEvent { + case chunk(ByteBuffer) + case finish + } + + private var shouldRead: Bool { + switch self.state { + case .idle: + return true + case .error: + return false + case .sending: + return false + } + } + + init(sink: AsyncThrowingChannel) { + self.sink = sink + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let data = self.unwrapInboundIn(data) + if let itemToSend = self.state.enqueue(.chunk(data)) { + self.sendOneItem(itemToSend, context: context) + } + } + + private func sendOneItem(_ data: ReceivedEvent, context: ChannelHandlerContext) { + context.eventLoop.assertInEventLoop() + assert(self.shouldRead == false, "sendOneItem in unexpected state \(self.state)") + context.eventLoop.makeFutureWithTask { + switch data { + case let .chunk(data): + await self.sink.send(data) + case .finish: + self.sink.finish() + } + }.map { + if let moreToSend = self.state.didSendOne() { + self.sendOneItem(moreToSend, context: context) + } else { + if self.heldUpRead { + context.eventLoop.execute { + context.read() + } + } + } + }.whenFailure { error in + self.state.fail(error) + } + } + + func errorCaught(context: ChannelHandlerContext, error: Error) { + self.state.fail(error) + self.sink.fail(error) + context.close(promise: nil) + } + + func channelInactive(context: ChannelHandlerContext) { + if let itemToSend = self.state.enqueue(.finish) { + self.sendOneItem(itemToSend, context: context) + } + } + + func read(context: ChannelHandlerContext) { + if self.shouldRead { + context.read() + } else { + self.heldUpRead = true + } + } +} + +extension FileHandle { + func fileContentStream(eventLoop: EventLoop) throws -> FileContentStream { + let asyncBytes = try FileContentStream(fileDescriptor: self.fileDescriptor, eventLoop: eventLoop) + try self.close() + return asyncBytes + } +} + +extension FileContentStream { + var lines: AsyncByteBufferLineSequence { + AsyncByteBufferLineSequence( + self, + dropTerminator: true, + maximumAllowableBufferSize: 1024 * 1024, + dropLastChunkIfNoNewline: false + ) + } +} + +public extension AsyncSequence where Element == ByteBuffer { + func splitIntoLines( + dropTerminator: Bool = true, + maximumAllowableBufferSize: Int = 1024 * 1024, + dropLastChunkIfNoNewline: Bool = false + ) -> AsyncByteBufferLineSequence { + AsyncByteBufferLineSequence( + self, + dropTerminator: dropTerminator, + maximumAllowableBufferSize: maximumAllowableBufferSize, + dropLastChunkIfNoNewline: dropLastChunkIfNoNewline + ) + } + + var strings: AsyncMapSequence { + self.map { String(buffer: $0) } + } +} + +public struct AsyncByteBufferLineSequence: AsyncSequence & Sendable + where Base: AsyncSequence, Base.Element == ByteBuffer +{ + public typealias Element = ByteBuffer + private let underlying: Base + private let dropTerminator: Bool + private let maximumAllowableBufferSize: Int + private let dropLastChunkIfNoNewline: Bool + + public struct AsyncIterator: AsyncIteratorProtocol { + public typealias Element = ByteBuffer + private var underlying: Base.AsyncIterator + private let dropTerminator: Bool + private let maximumAllowableBufferSize: Int + private let dropLastChunkIfNoNewline: Bool + private var buffer = Buffer() + + struct Buffer { + private var buffer: [ByteBuffer] = [] + internal private(set) var byteCount: Int = 0 + + mutating func append(_ buffer: ByteBuffer) { + self.buffer.append(buffer) + self.byteCount += buffer.readableBytes + } + + func allButLast() -> ArraySlice { + self.buffer.dropLast() + } + + var byteCountButLast: Int { + self.byteCount - (self.buffer.last?.readableBytes ?? 0) + } + + var lastChunkView: ByteBufferView? { + self.buffer.last?.readableBytesView + } + + mutating func concatenateEverything(upToLastChunkLengthToConsume lastLength: Int) -> ByteBuffer { + var output = ByteBuffer() + output.reserveCapacity(lastLength + self.byteCountButLast) + + var writtenBytes = 0 + for buffer in self.buffer.dropLast() { + writtenBytes += output.writeImmutableBuffer(buffer) + } + writtenBytes += output.writeImmutableBuffer( + self.buffer[self.buffer.endIndex - 1].readSlice(length: lastLength)! + ) + if self.buffer.last!.readableBytes > 0 { + if self.buffer.count > 1 { + self.buffer.swapAt(0, self.buffer.endIndex - 1) + } + self.buffer.removeLast(self.buffer.count - 1) + } else { + self.buffer = [] + } + + self.byteCount -= writtenBytes + assert(self.byteCount >= 0) + return output + } + } + + internal init( + underlying: Base.AsyncIterator, + dropTerminator: Bool, + maximumAllowableBufferSize: Int, + dropLastChunkIfNoNewline: Bool + ) { + self.underlying = underlying + self.dropTerminator = dropTerminator + self.maximumAllowableBufferSize = maximumAllowableBufferSize + self.dropLastChunkIfNoNewline = dropLastChunkIfNoNewline + } + + private mutating func deliverUpTo( + view: ByteBufferView, + index: ByteBufferView.Index, + expectNewline: Bool + ) -> ByteBuffer { + let howMany = view.startIndex.distance(to: index) + (expectNewline ? 1 : 0) + + var output = self.buffer.concatenateEverything(upToLastChunkLengthToConsume: howMany) + if expectNewline { + assert(output.readableBytesView.last == UInt8(ascii: "\n")) + assert( + output.readableBytesView.firstIndex(of: UInt8(ascii: "\n")) + == output.readableBytesView.index(before: output.readableBytesView.endIndex) + ) + } else { + assert(output.readableBytesView.last != UInt8(ascii: "\n")) + assert(!output.readableBytesView.contains(UInt8(ascii: "\n"))) + } + if self.dropTerminator && expectNewline { + output.moveWriterIndex(to: output.writerIndex - 1) + } + + return output + } + + public mutating func next() async throws -> Element? { + while true { + if let view = self.buffer.lastChunkView { + if let newlineIndex = view.firstIndex(of: UInt8(ascii: "\n")) { + return self.deliverUpTo( + view: view, + index: newlineIndex, + expectNewline: true + ) + } + + if self.buffer.byteCount > self.maximumAllowableBufferSize { + return self.deliverUpTo( + view: view, + index: view.endIndex, + expectNewline: false + ) + } + } + + if let nextBuffer = try await self.underlying.next() { + self.buffer.append(nextBuffer) + } else { + if !self.dropLastChunkIfNoNewline, let view = self.buffer.lastChunkView, !view.isEmpty { + return self.deliverUpTo( + view: view, + index: view.endIndex, + expectNewline: false + ) + } else { + return nil + } + } + } + } + } + + public init( + _ underlying: Base, dropTerminator: Bool, + maximumAllowableBufferSize: Int, + dropLastChunkIfNoNewline: Bool + ) { + self.underlying = underlying + self.dropTerminator = dropTerminator + self.maximumAllowableBufferSize = maximumAllowableBufferSize + self.dropLastChunkIfNoNewline = dropLastChunkIfNoNewline + } + + public func makeAsyncIterator() -> AsyncIterator { + AsyncIterator( + underlying: self.underlying.makeAsyncIterator(), + dropTerminator: self.dropTerminator, + maximumAllowableBufferSize: self.maximumAllowableBufferSize, + dropLastChunkIfNoNewline: self.dropLastChunkIfNoNewline + ) + } +} diff --git a/Sources/AsyncProcess/NIOAsyncPipeWriter.swift b/Sources/AsyncProcess/NIOAsyncPipeWriter.swift new file mode 100644 index 0000000..557da61 --- /dev/null +++ b/Sources/AsyncProcess/NIOAsyncPipeWriter.swift @@ -0,0 +1,43 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See http://swift.org/LICENSE.txt for license information +// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import Foundation +import NIO +import NIOExtras + +struct NIOAsyncPipeWriter where Chunks.Element == ByteBuffer { + static func sinkSequenceInto( + _ chunks: Chunks, + fileDescriptor fd: CInt, + eventLoop: EventLoop + ) async throws { + // Just so we've got an input. + // (workaround for https://github.com/apple/swift-nio/issues/2444) + let deadPipe = Pipe() + let channel = try await NIOPipeBootstrap(group: eventLoop) + .channelOption(ChannelOptions.allowRemoteHalfClosure, value: true) + .channelOption(ChannelOptions.autoRead, value: false) + .withPipes( + inputDescriptor: dup(deadPipe.fileHandleForReading.fileDescriptor), + outputDescriptor: dup(fd) + ).get() + channel.close(mode: .input, promise: nil) + try! deadPipe.fileHandleForReading.close() + try! deadPipe.fileHandleForWriting.close() + defer { + channel.close(promise: nil) + } + for try await chunk in chunks { + try await channel.writeAndFlush(chunk).get() + } + } +} diff --git a/Sources/AsyncProcess/ProcessExecutor+Convenience.swift b/Sources/AsyncProcess/ProcessExecutor+Convenience.swift new file mode 100644 index 0000000..06b089b --- /dev/null +++ b/Sources/AsyncProcess/ProcessExecutor+Convenience.swift @@ -0,0 +1,269 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See http://swift.org/LICENSE.txt for license information +// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import AsyncAlgorithms +import Logging +import NIO + +public struct OutputLoggingSettings { + /// Where should the output line put to? + public enum WhereTo { + /// Put the output line into the logMessage itself. + case logMessage + + /// Put the output line into the `metadata` of the ``Logger``. + case metadata(logMessage: Logger.Message, key: Logger.Metadata.Key) + } + + /// Which ``Logger.Level`` to log the output at. + public var logLevel: Logger.Level + + public var to: WhereTo + + public init(logLevel: Logger.Level, to: OutputLoggingSettings.WhereTo) { + self.logLevel = logLevel + self.to = to + } + + internal func logMessage(line: String) -> Logger.Message { + switch self.to { + case .logMessage: + return "\(line)" + case .metadata(logMessage: let message, key: _): + return message + } + } + + internal func metadata(stream: ProcessOutputStream, line: String) -> Logger.Metadata { + switch self.to { + case .logMessage: + return ["stream": "\(stream.description)"] + case .metadata(logMessage: _, let key): + return [key: "\(line)"] + } + } +} + +public extension ProcessExecutor { + /// Run child process, discarding all its output. + static func run( + group: EventLoopGroup, + executable: String, + _ arguments: [String], + standardInput: StandardInput, + environment: [String: String] = [:], + logger: Logger + ) async throws -> ProcessExitReason { + let p = Self( + group: group, + executable: executable, + arguments, + environment: environment, + standardInput: standardInput, + standardOutput: .discard, + standardError: .discard, + logger: logger + ) + return try await p.run() + } + + /// Run child process, logging all its output line by line. + static func runLogOutput( + group: EventLoopGroup, + executable: String, + _ arguments: [String], + standardInput: StandardInput, + environment: [String: String] = [:], + logger: Logger, + logConfiguration: OutputLoggingSettings + ) async throws -> ProcessExitReason { + let exe = ProcessExecutor( + group: group, + executable: executable, + arguments, + environment: environment, + standardInput: standardInput, + standardOutput: .stream, + standardError: .stream, + logger: logger + ) + return try await withThrowingTaskGroup(of: ProcessExitReason?.self) { group in + group.addTask { + for try await (stream, line) in await merge( + exe.standardOutput.splitIntoLines().strings.map { (ProcessOutputStream.standardOutput, $0) }, + exe.standardError.splitIntoLines().strings.map { (ProcessOutputStream.standardError, $0) } + ) { + logger.log( + level: logConfiguration.logLevel, + logConfiguration.logMessage(line: line), + metadata: logConfiguration.metadata(stream: stream, line: line) + ) + } + return nil + } + + group.addTask { + try await exe.run() + } + + while let next = try await group.next() { + if let result = next { + return result + } + } + fatalError("the impossible happened, second task didn't return.") + } + } + + /// Run child process, process all its output (`stdout` and `stderr`) using a closure. + static func runProcessingOutput( + group: EventLoopGroup, + executable: String, + _ arguments: [String], + standardInput: StandardInput, + outputProcessor: @escaping @Sendable (ProcessOutputStream, ByteBuffer) async throws -> (), + splitOutputIntoLines: Bool = false, + environment: [String: String] = [:], + logger: Logger + ) async throws -> ProcessExitReason { + let exe = ProcessExecutor( + group: group, + executable: executable, + arguments, + environment: environment, + standardInput: standardInput, + standardOutput: .stream, + standardError: .stream, + logger: logger + ) + return try await withThrowingTaskGroup(of: ProcessExitReason?.self) { group in + group.addTask { + if splitOutputIntoLines { + for try await (stream, chunk) in await merge( + exe.standardOutput.splitIntoLines().map { (ProcessOutputStream.standardOutput, $0) }, + exe.standardError.splitIntoLines().map { (ProcessOutputStream.standardError, $0) } + ) { + try await outputProcessor(stream, chunk) + } + return nil + } else { + for try await (stream, chunk) in await merge( + exe.standardOutput.map { (ProcessOutputStream.standardOutput, $0) }, + exe.standardError.map { (ProcessOutputStream.standardError, $0) } + ) { + try await outputProcessor(stream, chunk) + } + return nil + } + } + + group.addTask { + try await exe.run() + } + + while let next = try await group.next() { + if let result = next { + return result + } + } + fatalError("the impossible happened, second task didn't return.") + } + } + + struct TooMuchProcessOutputError: Error, Sendable & Hashable { + public var stream: ProcessOutputStream + } + + struct ProcessExitReasonAndOutput: Sendable & Hashable { + public var exitReason: ProcessExitReason + public var standardOutput: ByteBuffer? + public var standardError: ByteBuffer? + } + + internal enum ProcessExitInformationPiece { + case exitReason(ProcessExitReason) + case standardOutput(ByteBuffer?) + case standardError(ByteBuffer?) + } + + static func runCollectingOutput( + group: EventLoopGroup, + executable: String, + _ arguments: [String], + standardInput: StandardInput, + collectStandardOutput: Bool, + collectStandardError: Bool, + perStreamCollectionLimitBytes: Int = 128 * 1024, + environment: [String: String] = [:], + logger: Logger + ) async throws -> ProcessExitReasonAndOutput { + let exe = ProcessExecutor( + group: group, + executable: executable, + arguments, + environment: environment, + standardInput: standardInput, + standardOutput: collectStandardOutput ? .stream : .discard, + standardError: collectStandardError ? .stream : .discard, + logger: logger + ) + + return try await withThrowingTaskGroup(of: ProcessExitInformationPiece.self) { group in + group.addTask { + if collectStandardOutput { + var output: ByteBuffer? + for try await chunk in await exe.standardOutput { + guard (output?.readableBytes ?? 0) + chunk.readableBytes <= perStreamCollectionLimitBytes else { + throw TooMuchProcessOutputError(stream: .standardOutput) + } + output.setOrWriteImmutableBuffer(chunk) + } + return .standardOutput(output ?? ByteBuffer()) + } else { + return .standardOutput(nil) + } + } + + group.addTask { + if collectStandardError { + var output: ByteBuffer? + for try await chunk in await exe.standardError { + guard (output?.readableBytes ?? 0) + chunk.readableBytes <= perStreamCollectionLimitBytes else { + throw TooMuchProcessOutputError(stream: .standardError) + } + output.setOrWriteImmutableBuffer(chunk) + } + return .standardError(output ?? ByteBuffer()) + } else { + return .standardError(nil) + } + } + + group.addTask { + .exitReason(try await exe.run()) + } + + var allInfo = ProcessExitReasonAndOutput(exitReason: .exit(-1), standardOutput: nil, standardError: nil) + while let next = try await group.next() { + switch next { + case let .exitReason(exitReason): + allInfo.exitReason = exitReason + case let .standardOutput(output): + allInfo.standardOutput = output + case let .standardError(output): + allInfo.standardError = output + } + } + return allInfo + } + } +} diff --git a/Sources/AsyncProcess/ProcessExecutor.swift b/Sources/AsyncProcess/ProcessExecutor.swift new file mode 100644 index 0000000..0685126 --- /dev/null +++ b/Sources/AsyncProcess/ProcessExecutor.swift @@ -0,0 +1,401 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See http://swift.org/LICENSE.txt for license information +// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import Atomics +import Foundation +import Logging +import NIO + +@_exported import struct SystemPackage.FileDescriptor + +public struct ProcessOutputStream: Sendable & Hashable & CustomStringConvertible { + internal enum Backing { + case standardOutput + case standardError + } + + internal var backing: Backing + + public static let standardOutput: Self = .init(backing: .standardOutput) + + public static let standardError: Self = .init(backing: .standardError) + + public var description: String { + switch self.backing { + case .standardOutput: + return "stdout" + case .standardError: + return "stderr" + } + } +} + +/// What to do with a given stream (`stdout`/`stderr`) in the spawned child process. +public struct ProcessOutput { + internal enum Backing { + case discard + case inherit + case fileDescriptor(FileDescriptor) + case stream + } + + internal var backing: Backing + + /// Discard the child process' output. + /// + /// This will set the process' stream to `/dev/null`. + public static let discard: Self = .init(backing: .discard) + + /// Inherit the same file description from the parent process (i.e. this process). + public static let inherit: Self = .init(backing: .inherit) + + /// Take ownership of `fd` and install that as the child process' file descriptor. + /// + /// - warning: After passing a `FileDescriptor` to this method you _must not_ perform _any_ other operations on it. + public static func fileDescriptor(takingOwnershipOf fd: FileDescriptor) -> Self { + .init(backing: .fileDescriptor(fd)) + } + + /// Stream this using the ``ProcessExecutor.standardOutput`` / ``ProcessExecutor.standardError`` ``AsyncStream``s. + /// + /// If you select `.stream`, you _must_ consume the stream. This is back-pressured into the child which means that + /// if you fail to consume the child might get blocked producing its output. + public static let stream: Self = .init(backing: .stream) +} + +private struct OutputConsumptionState: OptionSet { + typealias RawValue = UInt8 + + var rawValue: UInt8 + + init(rawValue: UInt8) { + self.rawValue = rawValue + } + + static let stdoutConsumed: Self = .init(rawValue: 0b0001) + static let stderrConsumed: Self = .init(rawValue: 0b0010) + static let stdoutNotStreamed: Self = .init(rawValue: 0b0100) + static let stderrNotStreamed: Self = .init(rawValue: 0b1000) + + var hasStandardOutputBeenConsumed: Bool { + self.contains([.stdoutConsumed]) + } + + var hasStandardErrorBeenConsumed: Bool { + self.contains([.stderrConsumed]) + } + + var isStandardOutputStremed: Bool { + !self.contains([.stdoutNotStreamed]) + } + + var isStandardErrorStremed: Bool { + !self.contains([.stderrNotStreamed]) + } +} + +/// Execute a sub-process. +/// +/// - warning: Currently, the default for `standardOutput` & `standardError` is ``ProcessOutput.stream`` which means +/// you _must_ consume ``ProcessExecutor.standardOutput`` & ``ProcessExecutor.standardError``. If you prefer +/// to not consume it, please set them to ``ProcessOutput.discard`` explicitly. +public actor ProcessExecutor where StandardInput.Element == ByteBuffer { + private let logger: Logger + private let group: EventLoopGroup + private let executable: String + private let arguments: [String] + private let environment: [String: String]? + private let standardInput: StandardInput + private let standardInputPipe: Pipe? + private let standardOutputWriteHandle: FileHandle? + private let standardErrorWriteHandle: FileHandle? + private let _standardOutput: ChunkSequence + private let _standardError: ChunkSequence + private let processIsRunningApproximation = ManagedAtomic(RunningStateApproximation.neverStarted.rawValue) + private let processOutputConsumptionApproximation = ManagedAtomic(UInt8(0)) + + public var standardOutput: ChunkSequence { + let afterValue = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( + with: OutputConsumptionState.stdoutConsumed.rawValue, + ordering: .relaxed + ) + precondition( + OutputConsumptionState(rawValue: afterValue).contains([.stdoutConsumed]), + "Double-consumption of stdandardOutput" + ) + return self._standardOutput + } + + public var standardError: ChunkSequence { + let afterValue = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( + with: OutputConsumptionState.stderrConsumed.rawValue, + ordering: .relaxed + ) + precondition( + OutputConsumptionState(rawValue: afterValue).contains([.stderrConsumed]), + "Double-consumption of stdandardEror" + ) + return self._standardError + } + + private enum RunningStateApproximation: Int { + case neverStarted = 1 + case running = 2 + case finishedExecuting = 3 + } + + public init( + group: EventLoopGroup, + executable: String, + _ arguments: [String], + environment: [String: String]? = nil, + standardInput: StandardInput, + standardOutput: ProcessOutput = .stream, + standardError: ProcessOutput = .stream, + logger: Logger + ) { + self.group = group + self.executable = executable + self.environment = environment + self.arguments = arguments + self.standardInput = standardInput + self.logger = logger + + self.standardInputPipe = StandardInput.self == EOFSequence.self ? nil : Pipe() + + switch standardOutput.backing { + case .discard: + _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( + with: OutputConsumptionState.stdoutNotStreamed.rawValue, + ordering: .relaxed + ) + self.standardOutputWriteHandle = FileHandle(forWritingAtPath: "/dev/null") + self._standardOutput = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + case let .fileDescriptor(fd): + _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( + with: OutputConsumptionState.stdoutNotStreamed.rawValue, + ordering: .relaxed + ) + self.standardOutputWriteHandle = FileHandle(fileDescriptor: fd.rawValue) + self._standardOutput = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + case .inherit: + _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( + with: OutputConsumptionState.stdoutNotStreamed.rawValue, + ordering: .relaxed + ) + self.standardOutputWriteHandle = nil + self._standardOutput = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + case .stream: + let (stdoutSequence, stdoutWriteHandle) = Self.makeWriteStream(group: group) + self._standardOutput = stdoutSequence + self.standardOutputWriteHandle = stdoutWriteHandle + } + + switch standardError.backing { + case .discard: + _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( + with: OutputConsumptionState.stderrNotStreamed.rawValue, + ordering: .relaxed + ) + self.standardErrorWriteHandle = FileHandle(forWritingAtPath: "/dev/null") + self._standardError = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + case let .fileDescriptor(fd): + _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( + with: OutputConsumptionState.stderrNotStreamed.rawValue, + ordering: .relaxed + ) + self.standardErrorWriteHandle = FileHandle(fileDescriptor: fd.rawValue) + self._standardError = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + case .inherit: + _ = self.processOutputConsumptionApproximation.bitwiseXorThenLoad( + with: OutputConsumptionState.stderrNotStreamed.rawValue, + ordering: .relaxed + ) + self.standardErrorWriteHandle = nil + self._standardError = ChunkSequence(takingOwnershipOfFileHandle: nil, group: group) + case .stream: + let (stdoutSequence, stdoutWriteHandle) = Self.makeWriteStream(group: group) + self._standardError = stdoutSequence + self.standardErrorWriteHandle = stdoutWriteHandle + } + } + + private static func makeWriteStream(group: EventLoopGroup) -> (ChunkSequence, FileHandle) { + let pipe = Pipe() + let chunkSequence = ChunkSequence( + takingOwnershipOfFileHandle: pipe.fileHandleForReading, + group: group + ) + let writeHandle = pipe.fileHandleForWriting + return (chunkSequence, writeHandle) + } + + deinit { + let runningState = self.processIsRunningApproximation.load(ordering: .relaxed) + assert( + runningState == RunningStateApproximation.finishedExecuting.rawValue, + """ + Did you create a ProcessExecutor without run()ning it? \ + That's currently illegal: \ + illegal running state \(runningState) in deinit + """ + ) + + let outputConsumptionState = OutputConsumptionState( + rawValue: self.processOutputConsumptionApproximation.load(ordering: .relaxed) + ) + assert( + { () -> Bool in + guard + outputConsumptionState.contains([.stdoutConsumed]) + || outputConsumptionState.contains([.stdoutNotStreamed]) + else { + return false + } + + guard + outputConsumptionState.contains([.stderrConsumed]) + || outputConsumptionState.contains([.stderrNotStreamed]) + else { + return false + } + return true + }(), + """ + Did you create a ProcessExecutor with standardOutput/standardError in `.stream.` mode without + then consuming it? \ + That's currently illegal. If you do not want to consume the output, consider `.discard`int it: \ + illegal output consumption state \(outputConsumptionState) in deinit + """ + ) + } + + public func run() async throws -> ProcessExitReason { + let p = Process() + #if canImport(Darwin) + if #available(macOS 13.0, *) { + p.executableURL = URL(filePath: self.executable) + } else { + p.launchPath = self.executable + } + #else + p.executableURL = URL(fileURLWithPath: self.executable) + #endif + p.arguments = self.arguments + p.environment = self.environment ?? [:] + p.standardInput = nil + + if let standardOutputWriteHandle = self.standardOutputWriteHandle { + // NOTE: Do _NOT_ remove this if. Setting this to `nil` is different to not setting it at all! + p.standardOutput = standardOutputWriteHandle + } + if let standardErrorWriteHandle = self.standardErrorWriteHandle { + // NOTE: Do _NOT_ remove this if. Setting this to `nil` is different to not setting it at all! + p.standardError = standardErrorWriteHandle + } + p.standardInput = self.standardInputPipe + + @Sendable + func go() async throws -> ProcessExitReason { + try await withCheckedThrowingContinuation { + (continuation: CheckedContinuation) in + p.terminationHandler = { p in + self.logger.debug( + "finished running command", + metadata: [ + "termination-reason": p.terminationReason == .uncaughtSignal ? "signal" : "exit", + "termination-status": "\(p.terminationStatus)", + "pid": "\(p.processIdentifier)", + ] + ) + let (worked, original) = self.processIsRunningApproximation.compareExchange( + expected: RunningStateApproximation.running.rawValue, + desired: RunningStateApproximation.finishedExecuting.rawValue, + ordering: .relaxed + ) + precondition(worked, "illegal running state \(original)") + + if p.terminationReason == .uncaughtSignal { + continuation.resume(returning: .signal(p.terminationStatus)) + } else { + continuation.resume(returning: .exit(p.terminationStatus)) + } + } + do { + let (worked, original) = self.processIsRunningApproximation.compareExchange( + expected: RunningStateApproximation.neverStarted.rawValue, + desired: RunningStateApproximation.running.rawValue, + ordering: .relaxed + ) + precondition(worked, "illegal running state \(original)") + try p.run() + self.logger.debug( + "running command", + metadata: [ + "executable": "\(self.executable)", + "arguments": "\(self.arguments)", + "pid": "\(p.processIdentifier)", + ] + ) + } catch { + continuation.resume(throwing: error) + } + + try! self.standardInputPipe?.fileHandleForReading.close() // Must work. + try! self.standardOutputWriteHandle?.close() // Must work. + try! self.standardErrorWriteHandle?.close() // Must work. + } + } + + @Sendable + func cancel() { + guard p.processIdentifier != 0 else { + self.logger.warning("leaking Process \(p) because it hasn't been started yet") + return + } + self.logger.info("terminating process", metadata: ["pid": "\(p.processIdentifier)"]) + #if os(Linux) + // workaround: https://github.com/apple/swift-corelibs-foundation/issues/4772 + if p.isRunning { + kill(p.processIdentifier, SIGKILL) + } + #else + p.terminate() + #endif + } + + return try await withThrowingTaskGroup(of: ProcessExitReason?.self, returning: ProcessExitReason.self) { + group in + group.addTask { + try await withTaskCancellationHandler(operation: go, onCancel: cancel) + } + group.addTask { + if let stdinPipe = self.standardInputPipe { + try await NIOAsyncPipeWriter.sinkSequenceInto( + self.standardInput, + fileDescriptor: stdinPipe.fileHandleForWriting.fileDescriptor, + eventLoop: self.group.any() + ) + } + return nil + } + + var exitReason: ProcessExitReason? + while let result = try await group.next() { + if let result { + exitReason = result + } + } + return exitReason! // must work because the real task will return a reason (or throw) + } + } +} diff --git a/Sources/AsyncProcess/ProcessExit.swift b/Sources/AsyncProcess/ProcessExit.swift new file mode 100644 index 0000000..5672430 --- /dev/null +++ b/Sources/AsyncProcess/ProcessExit.swift @@ -0,0 +1,39 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See http://swift.org/LICENSE.txt for license information +// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +public enum ProcessExitReason: Hashable & Sendable { + case exit(CInt) + case signal(CInt) + + public func throwIfNonZero() throws { + switch self { + case .exit(0): + return + default: + throw ProcessExecutionError(self) + } + } +} + +public struct ProcessExecutionError: Error & Hashable & Sendable { + public var exitReason: ProcessExitReason + + public init(_ exitResult: ProcessExitReason) { + self.exitReason = exitResult + } +} + +extension ProcessExecutionError: CustomStringConvertible { + public var description: String { + "process exited non-zero: \(self.exitReason)" + } +} diff --git a/Tests/AsyncProcessTests/AsyncByteBufferLineSequenceTests.swift b/Tests/AsyncProcessTests/AsyncByteBufferLineSequenceTests.swift new file mode 100644 index 0000000..8a999b8 --- /dev/null +++ b/Tests/AsyncProcessTests/AsyncByteBufferLineSequenceTests.swift @@ -0,0 +1,77 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See http://swift.org/LICENSE.txt for license information +// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import AsyncProcess +import Foundation +import NIO +import XCTest + +final class AsyncByteBufferLineSequenceTests: XCTestCase { + func testJustManyNewlines() async throws { + for n in 0..<100 { + let inputs: [ByteBuffer] = [ByteBuffer(repeating: UInt8(ascii: "\n"), count: n)] + let lines = try await Array(inputs.async.splitIntoLines().strings) + XCTAssertEqual(Array(repeating: "", count: n), lines) + } + } + + func testJustOneNewlineAtATime() async throws { + for n in 0..<100 { + let inputs: [ByteBuffer] = Array(repeating: ByteBuffer(integer: UInt8(ascii: "\n")), count: n) + let lines = try await Array(inputs.async.splitIntoLines().strings) + XCTAssertEqual(Array(repeating: "", count: n), lines) + } + } + + func testManyChunksNoNewlineDeliveringLastChunk() async throws { + for n in 1..<100 { + let inputs: [ByteBuffer] = [ByteBuffer(repeating: 0, count: n)] + let lines = try await Array(inputs.async.splitIntoLines().strings) + XCTAssertEqual([String(repeating: "\0", count: n)], lines) + } + } + + func testManyChunksNoNewlineNotDeliveringLastChunk() async throws { + for n in 0..<100 { + let inputs: [ByteBuffer] = [ByteBuffer(repeating: 0, count: n)] + let lines = try await Array(inputs.async.splitIntoLines(dropLastChunkIfNoNewline: true).strings) + XCTAssertEqual([], lines) + } + } + + func testOverlyLongLineIsSplit() async throws { + var inputs = Array(repeating: ByteBuffer(integer: UInt8(0)), count: 10) + inputs.append(ByteBuffer(integer: UInt8(ascii: "\n"))) + let lines = try await Array( + inputs.async.splitIntoLines( + maximumAllowableBufferSize: 3, + dropLastChunkIfNoNewline: true + ).strings + ) + XCTAssertEqual(["\0\0\0\0", "\0\0\0\0", "\0\0"], lines) + } + + func testOverlyLongLineIsSplitByDefault() async throws { + var inputs = [ByteBuffer(repeating: UInt8(0), count: 1024 * 1024 - 2)] // almost at the limit + inputs.append(ByteBuffer(integer: UInt8(ascii: "\0"))) + inputs.append(ByteBuffer(integer: UInt8(ascii: "\0"))) // hitting the limit + inputs.append(ByteBuffer(integer: UInt8(ascii: "\0"))) // over the limit + inputs.append(ByteBuffer(integer: UInt8(ascii: "\n"))) // too late + let lines = try await Array( + inputs.async.splitIntoLines( + dropTerminator: false, + dropLastChunkIfNoNewline: true + ).strings + ) + XCTAssertEqual([String(repeating: "\0", count: 1024 * 1024 + 1), "\n"], lines) + } +} diff --git a/Tests/AsyncProcessTests/Helpers+LogRecorderHandler.swift b/Tests/AsyncProcessTests/Helpers+LogRecorderHandler.swift new file mode 100644 index 0000000..606860a --- /dev/null +++ b/Tests/AsyncProcessTests/Helpers+LogRecorderHandler.swift @@ -0,0 +1,98 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See http://swift.org/LICENSE.txt for license information +// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import Logging +import NIOConcurrencyHelpers + +internal final class LogRecorderHandler: LogHandler { + internal let state = NIOLockedValueBox(State()) + + struct FullLogMessage: Equatable { + var level: Logger.Level + var message: Logger.Message + var metadata: Logger.Metadata + } + + struct State { + var metadata: [String: Logger.Metadata.Value] = [:] + var messages: [FullLogMessage] = [] + var logLevel: Logger.Level = .trace + } + + func makeLogger() -> Logger { + Logger(label: "LogRecorder for tests", factory: { _ in self }) + } + + func log( + level: Logger.Level, + message: Logger.Message, + metadata: Logger.Metadata?, + source: String, + file: String, + function: String, + line: UInt + ) { + let fullMessage = FullLogMessage( + level: level, + message: message, + metadata: self.metadata.merging(metadata ?? [:]) { _, r in r } + ) + self.state.withLockedValue { state in + state.messages.append(fullMessage) + } + } + + var recordedMessages: [FullLogMessage] { + self.state.withLockedValue { $0.messages } + } + + subscript(metadataKey key: String) -> Logging.Logger.Metadata.Value? { + get { + self.state.withLockedValue { + $0.metadata[key] + } + } + set { + self.state.withLockedValue { + $0.metadata[key] = newValue + } + } + } + + var metadata: Logging.Logger.Metadata { + get { + self.state.withLockedValue { + $0.metadata + } + } + + set { + self.state.withLockedValue { + $0.metadata = newValue + } + } + } + + var logLevel: Logging.Logger.Level { + get { + self.state.withLockedValue { + $0.logLevel + } + } + + set { + self.state.withLockedValue { + $0.logLevel = newValue + } + } + } +} diff --git a/Tests/AsyncProcessTests/IntegrationTests.swift b/Tests/AsyncProcessTests/IntegrationTests.swift new file mode 100644 index 0000000..465409d --- /dev/null +++ b/Tests/AsyncProcessTests/IntegrationTests.swift @@ -0,0 +1,799 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift open source project +// +// Copyright (c) 2022-2023 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See http://swift.org/LICENSE.txt for license information +// See http://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import AsyncAlgorithms +import AsyncProcess +import Atomics +import Logging +import NIO +import NIOConcurrencyHelpers +import XCTest + +#if canImport(Darwin) +import Darwin +#else +import Glibc +#endif + +final class IntegrationTests: XCTestCase { + private var group: EventLoopGroup! + private var logger: Logger! + + func testTheBasicsWork() async throws { + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", ["-c", "exit 0"], + standardInput: EOFSequence(), + logger: self.logger + ) + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + for try await chunk in await merge(exe.standardOutput, exe.standardError) { + XCTFail("unexpected output: \(chunk.debugDescription)") + } + } + let result = try await exe.run() + XCTAssertEqual(.exit(CInt(0)), result) + } + } + + func testExitCodesWork() async throws { + for exitCode in UInt8.min...UInt8.max { + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", ["-c", "exit \(exitCode)"], + standardInput: EOFSequence(), + logger: self.logger + ) + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + for try await chunk in await merge(exe.standardOutput, exe.standardError) { + XCTFail("unexpected output: \(chunk.debugDescription)") + } + } + + let result = try await exe.run() + XCTAssertEqual(.exit(CInt(exitCode)), result) + } + } + } + + func testSignalsWork() async throws { + #if os(Linux) + // workaround for https://github.com/apple/swift-corelibs-foundation/issues/4772 + let signalsToTest: [CInt] = [SIGKILL] + #else + let signalsToTest: [CInt] = [SIGKILL, SIGTERM, SIGINT] + #endif + for signal in signalsToTest { + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", ["-c", "kill -\(signal) $$"], + standardInput: EOFSequence(), + logger: self.logger + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + for try await chunk in await merge(exe.standardOutput, exe.standardError) { + XCTFail("unexpected output: \(chunk.debugDescription)") + } + } + + let result = try await exe.run() + XCTAssertEqual(.signal(CInt(signal)), result) + } + } + } + + func testStreamingInputAndOutputWorks() async throws { + let input = AsyncStream.justMakeIt(elementType: ByteBuffer.self) + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/cat", ["-nu"], // sh", ["-c", "while read -r line; do echo $line; done"], + standardInput: input.consumer, + logger: self.logger + ) + try await withThrowingTaskGroup(of: ProcessExitReason?.self) { group in + group.addTask { + var lastLine: String? + for try await line in await exe.standardOutput.splitIntoLines(dropTerminator: false) { + if line.readableBytes > 72 { + lastLine = String(buffer: line) + break + } + input.producer.yield(line) + } + XCTAssertEqual( + " 10\t 9\t 8\t 7\t 6\t 5\t 4\t 3\t 2\t 1\tGO\n", + lastLine + ) + return nil + } + + group.addTask { + for try await chunk in await exe.standardError { + XCTFail("unexpected stderr output: \(chunk.debugDescription)") + } + return nil + } + + group.addTask { + try await exe.run() + } + + input.producer.yield(ByteBuffer(string: "GO\n")) + + // The stdout-reading task will exit first (the others will only return when we explicitly cancel because + // the sub process would keep going forever). + let stdoutReturn = try await group.next() + var totalTasksReturned = 1 + XCTAssertEqual(.some(nil), stdoutReturn) + group.cancelAll() + + while let furtherReturn = try await group.next() { + totalTasksReturned += 1 + switch furtherReturn { + case let .some(result): + // the `exe.run()` task + #if os(Linux) + // Because of the workaround for https://github.com/apple/swift-corelibs-foundation/issues/4772 , + // we can't use `Process.terminate()` on Linux... + XCTAssertEqual(.signal(SIGKILL), result) + #else + XCTAssertEqual(.signal(SIGTERM), result) + #endif + case .none: + // stderr task + () + } + } + XCTAssertEqual(3, totalTasksReturned) + } + } + + func testAbsorbing1MBOfDevZeroWorks() async throws { + let totalAmountInBytes = 1024 * 1024 + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", + [ + "-c", + // spawn two `dd`s that output 1 MiB of zeros (but no diagnostics output). One bunch of zeroes will + // go to stdout, the other one to stderr. + "/bin/dd 2>/dev/null bs=\(totalAmountInBytes) count=1 if=/dev/zero; " + + "/bin/dd >&2 2>/dev/null bs=\(totalAmountInBytes) count=1 if=/dev/zero; ", + ], + standardInput: EOFSequence(), + logger: self.logger + ) + try await withThrowingTaskGroup(of: ByteBuffer.self) { group in + group.addTask { + var accumulation = ByteBuffer() + accumulation.reserveCapacity(totalAmountInBytes) + + for try await chunk in await exe.standardOutput { + accumulation.writeImmutableBuffer(chunk) + } + + return accumulation + } + + group.addTask { + var accumulation = ByteBuffer() + accumulation.reserveCapacity(totalAmountInBytes) + + for try await chunk in await exe.standardError { + accumulation.writeImmutableBuffer(chunk) + } + + return accumulation + } + + let result = try await exe.run() + + // once for stdout, once for stderr + let stream1 = try await group.next() + let stream2 = try await group.next() + XCTAssertEqual(ByteBuffer(repeating: 0, count: totalAmountInBytes), stream1) + XCTAssertEqual(ByteBuffer(repeating: 0, count: totalAmountInBytes), stream2) + + XCTAssertEqual(.exit(0), result) + } + } + + func testInteractiveShell() async throws { + let input = AsyncStream.justMakeIt(elementType: ByteBuffer.self) + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", [], + standardInput: input.consumer, + logger: self.logger + ) + + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + var allOutput: [String] = [] + for try await (stream, line) in merge( + await exe.standardOutput.splitIntoLines(dropTerminator: true).map { ("stdout", $0) }, + await exe.standardError.splitIntoLines(dropTerminator: true).map { ("stderr", $0) } + ) { + let formattedOutput = "\(String(buffer: line)) [\(stream)]" + allOutput.append(formattedOutput) + } + + XCTAssertEqual( + [ + "hello stderr [stderr]", + "hello stdout [stdout]", + ], + allOutput.sorted() + ) + } + + group.addTask { + let result = try await exe.run() + XCTAssertEqual(.exit(0), result) + } + + input.producer.yield(ByteBuffer(string: "echo hello stdout\n")) + input.producer.yield(ByteBuffer(string: "echo >&2 hello stderr\n")) + input.producer.yield(ByteBuffer(string: "exit 0\n")) + input.producer.finish() + + try await group.waitForAll() + } + } + + func testEnvironmentVariables() async throws { + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", + ["-c", "echo $MY_VAR"], + environment: ["MY_VAR": "value of my var"], + standardInput: EOFSequence(), + logger: self.logger + ) + let all = try await exe.runGetAllOutput() + XCTAssertEqual(.exit(0), all.exitReason) + XCTAssertEqual("value of my var\n", String(buffer: all.standardOutput)) + XCTAssertEqual("", String(buffer: all.standardError)) + } + + func testStressTestVeryLittleOutput() async throws { + for _ in 0..<128 { + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", + ["-c", "echo x; echo >&2 y;"], + standardInput: EOFSequence(), + logger: self.logger + ) + let all = try await exe.runGetAllOutput() + XCTAssertEqual(.exit(0), all.exitReason) + XCTAssertEqual("x\n", String(buffer: all.standardOutput)) + XCTAssertEqual("y\n", String(buffer: all.standardError)) + } + } + + func testOutputWithoutNewlinesThatIsSplitIntoLines() async throws { + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", + ["-c", "/bin/echo -n x; /bin/echo >&2 -n y"], + standardInput: EOFSequence(), + logger: self.logger + ) + try await withThrowingTaskGroup(of: (String, ByteBuffer)?.self) { group in + group.addTask { + try await exe.run().throwIfNonZero() + return nil + } + group.addTask { + var things: [ByteBuffer] = [] + for try await chunk in await exe.standardOutput.splitIntoLines() { + things.append(chunk) + } + XCTAssertEqual(1, things.count) + return ("stdout", things.first.flatMap { $0 } ?? ByteBuffer(string: "n/a")) + } + group.addTask { + var things: [ByteBuffer?] = [] + for try await chunk in await exe.standardError.splitIntoLines() { + things.append(chunk) + } + XCTAssertEqual(1, things.count) + return ("stderr", things.first.flatMap { $0 } ?? ByteBuffer(string: "n/a")) + } + + let everything = try await Array(group).sorted { l, r in + guard let l else { + return true + } + guard let r else { + return false + } + return l.0 < r.0 + } + XCTAssertEqual( + [nil, "stderr", "stdout"], + everything.map { $0?.0 } + ) + + XCTAssertEqual( + [nil, ByteBuffer(string: "y"), ByteBuffer(string: "x")], + everything.map { $0?.1 } + ) + } + } + + func testDiscardingStdoutWorks() async throws { + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/dd", + ["if=/dev/zero", "bs=\(1024 * 1024)", "count=1024", "status=none"], + standardInput: EOFSequence(), + standardOutput: .discard, + standardError: .stream, + logger: self.logger + ) + async let stderr = exe.standardError.pullAllOfIt() + try await exe.run().throwIfNonZero() + let stderrBytes = try await stderr + XCTAssertEqual(ByteBuffer(), stderrBytes) + } + + func testDiscardingStderrWorks() async throws { + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", + ["-c", "/bin/dd >&2 if=/dev/zero bs=\(1024 * 1024) count=1024 status=none; echo OK"], + standardInput: EOFSequence(), + standardOutput: .stream, + standardError: .discard, + logger: self.logger + ) + async let stdout = exe.standardOutput.pullAllOfIt() + try await exe.run().throwIfNonZero() + let stdoutBytes = try await stdout + XCTAssertEqual(ByteBuffer(string: "OK\n"), stdoutBytes) + } + + func testStdoutToFileWorks() async throws { + let tempDir = URL(fileURLWithPath: NSTemporaryDirectory()) + .appendingPathComponent("AsyncProcessTests-\(getpid())-\(UUID())") + try FileManager.default.createDirectory(at: tempDir, withIntermediateDirectories: false) + defer { + XCTAssertNoThrow(try FileManager.default.removeItem(at: tempDir)) + } + + let file = tempDir.appendingPathComponent("file") + + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/dd", + ["if=/dev/zero", "bs=\(1024 * 1024)", "count=3", "status=none"], + standardInput: EOFSequence(), + standardOutput: .fileDescriptor( + takingOwnershipOf: try .open( + .init(file.path.removingPercentEncoding!), + .writeOnly, + options: .create, + permissions: [.ownerRead, .ownerWrite] + ) + ), + standardError: .stream, + logger: self.logger + ) + async let stderr = exe.standardError.pullAllOfIt() + try await exe.run().throwIfNonZero() + let stderrBytes = try await stderr + XCTAssertEqual(Data(repeating: 0, count: 3 * 1024 * 1024), try Data(contentsOf: file)) + XCTAssertEqual(ByteBuffer(), stderrBytes) + } + + func testStderrToFileWorks() async throws { + let tempDir = URL(fileURLWithPath: NSTemporaryDirectory()) + .appendingPathComponent("AsyncProcessTests-\(getpid())-\(UUID())") + try FileManager.default.createDirectory(at: tempDir, withIntermediateDirectories: false) + defer { + XCTAssertNoThrow(try FileManager.default.removeItem(at: tempDir)) + } + + let file = tempDir.appendingPathComponent("file") + + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", + ["-c", "/bin/dd >&2 if=/dev/zero bs=\(1024 * 1024) count=3 status=none; echo OK"], + standardInput: EOFSequence(), + standardOutput: .stream, + standardError: .fileDescriptor( + takingOwnershipOf: try! .open( + .init(file.path.removingPercentEncoding!), + .writeOnly, + options: .create, + permissions: [.ownerRead, .ownerWrite] + ) + ), + logger: self.logger + ) + async let stdout = exe.standardOutput.pullAllOfIt() + try await exe.run().throwIfNonZero() + let stdoutBytes = try await stdout + XCTAssertEqual(ByteBuffer(string: "OK\n"), stdoutBytes) + XCTAssertEqual(Data(repeating: 0, count: 3 * 1024 * 1024), try Data(contentsOf: file)) + } + + func testInheritingStdoutAndStderrWork() async throws { + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", + ["-c", "echo this is stdout; echo >&2 this is stderr"], + standardInput: EOFSequence(), + standardOutput: .inherit, + standardError: .inherit, + logger: self.logger + ) + try await exe.run().throwIfNonZero() + } + + func testDiscardingAndConsumingOutputYieldsAnError() async throws { + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", + ["-c", "echo this is stdout; echo >&2 this is stderr"], + standardInput: EOFSequence(), + standardOutput: .discard, + standardError: .discard, + logger: self.logger + ) + try await exe.run().throwIfNonZero() + var stdoutIterator = await exe.standardOutput.makeAsyncIterator() + var stderrIterator = await exe.standardError.makeAsyncIterator() + do { + _ = try await stdoutIterator.next() + XCTFail("expected this to throw") + } catch is IllegalStreamConsumptionError { + // OK + } + do { + _ = try await stderrIterator.next() + XCTFail("expected this to throw") + } catch is IllegalStreamConsumptionError { + // OK + } + } + + func testStressTestDiscardingOutput() async throws { + for _ in 0..<128 { + let exe = ProcessExecutor( + group: self.group, + executable: "/bin/sh", + [ + "-c", + "/bin/dd if=/dev/zero bs=\(1024 * 1024) count=1; /bin/dd >&2 if=/dev/zero bs=\(1024 * 1024) count=1;", + ], + standardInput: EOFSequence(), + standardOutput: .discard, + standardError: .discard, + logger: self.logger + ) + try await exe.run().throwIfNonZero() + } + } + + func testLogOutputToMetadata() async throws { + let sharedRecorder = LogRecorderHandler() + var recordedLogger = Logger(label: "recorder", factory: { _ in sharedRecorder }) + recordedLogger.logLevel = .info // don't give us the normal messages + recordedLogger[metadataKey: "yo"] = "hey" + + try await ProcessExecutor.runLogOutput( + group: self.group, + executable: "/bin/sh", + ["-c", "echo 1; echo >&2 2; echo 3; echo >&2 4; echo 5; echo >&2 6; echo 7; echo >&2 8;"], + standardInput: EOFSequence(), + logger: recordedLogger, + logConfiguration: OutputLoggingSettings(logLevel: .critical, to: .metadata(logMessage: "msg", key: "key")) + ).throwIfNonZero() + XCTAssert(sharedRecorder.recordedMessages.allSatisfy { $0.level == .critical }) + XCTAssert(sharedRecorder.recordedMessages.allSatisfy { $0.message == "msg" }) + XCTAssert(sharedRecorder.recordedMessages.allSatisfy { $0.metadata["key"] != nil }) + XCTAssert(sharedRecorder.recordedMessages.allSatisfy { $0.metadata["yo"] == "hey" }) + let loggedLines = sharedRecorder.recordedMessages.compactMap { $0.metadata["key"]?.description }.sorted() + XCTAssertEqual(["1", "2", "3", "4", "5", "6", "7", "8"], loggedLines) + } + + func testLogOutputToMessage() async throws { + let sharedRecorder = LogRecorderHandler() + var recordedLogger = Logger(label: "recorder", factory: { _ in sharedRecorder }) + recordedLogger.logLevel = .info // don't give us the normal messages + recordedLogger[metadataKey: "yo"] = "hey" + + try await ProcessExecutor.runLogOutput( + group: self.group, + executable: "/bin/sh", + ["-c", "echo 1; echo >&2 2; echo 3; echo >&2 4; echo 5; echo >&2 6; echo 7; echo >&2 8;"], + standardInput: EOFSequence(), + logger: recordedLogger, + logConfiguration: OutputLoggingSettings(logLevel: .critical, to: .logMessage) + ).throwIfNonZero() + XCTAssert(sharedRecorder.recordedMessages.allSatisfy { $0.level == .critical }) + XCTAssert(sharedRecorder.recordedMessages.allSatisfy { $0.metadata["key"] == nil }) + XCTAssert(sharedRecorder.recordedMessages.allSatisfy { $0.metadata["yo"] == "hey" }) + let loggedLines = sharedRecorder.recordedMessages.map(\.message.description).sorted() + XCTAssertEqual(["1", "2", "3", "4", "5", "6", "7", "8"], loggedLines) + } + + func testProcessOutputByLine() async throws { + let collectedLines: NIOLockedValueBox<[(String, String)]> = NIOLockedValueBox([]) + try await ProcessExecutor.runProcessingOutput( + group: self.group, + executable: "/bin/sh", + [ + "-c", + """ + ( echo 1; echo >&2 2; echo 3; echo >&2 4; echo 5; echo >&2 6; echo 7; echo >&2 8; ) | \ + /bin/dd bs=1000 status=none + """, + ], + standardInput: EOFSequence(), + outputProcessor: { stream, line in + collectedLines.withLockedValue { collection in + collection.append((stream.description, String(buffer: line))) + } + }, + splitOutputIntoLines: true, + logger: self.logger + ).throwIfNonZero() + XCTAssertEqual( + ["1", "2", "3", "4", "5", "6", "7", "8"], + collectedLines.withLockedValue { $0.map(\.1) }.sorted() + ) + } + + func testProcessOutputInChunks() async throws { + let collectedBytes = ManagedAtomic(0) + try await ProcessExecutor.runProcessingOutput( + group: self.group, + executable: "/bin/dd", + ["if=/dev/zero", "bs=\(1024 * 1024)", "count=20", "status=none"], + standardInput: EOFSequence(), + outputProcessor: { stream, chunk in + XCTAssertEqual(stream, .standardOutput) + XCTAssert(chunk.withUnsafeReadableBytes { $0.allSatisfy { $0 == 0 } }) + collectedBytes.wrappingIncrement(by: chunk.readableBytes, ordering: .relaxed) + }, + splitOutputIntoLines: true, + logger: self.logger + ).throwIfNonZero() + XCTAssertEqual(20 * 1024 * 1024, collectedBytes.load(ordering: .relaxed)) + } + + func testBasicRunMethodWorks() async throws { + try await ProcessExecutor.run( + group: self.group, + executable: "/bin/dd", ["if=/dev/zero", "bs=\(1024 * 1024)", "count=100"], + standardInput: EOFSequence(), + logger: self.logger + ).throwIfNonZero() + } + + func testCollectJustStandardOutput() async throws { + let allInfo = try await ProcessExecutor.runCollectingOutput( + group: self.group, + executable: "/bin/dd", ["if=/dev/zero", "bs=\(1024 * 1024)", "count=1"], + standardInput: EOFSequence(), + collectStandardOutput: true, + collectStandardError: false, + perStreamCollectionLimitBytes: 1024 * 1024, + logger: self.logger + ) + XCTAssertNoThrow(try allInfo.exitReason.throwIfNonZero()) + XCTAssertNil(allInfo.standardError) + XCTAssertEqual(ByteBuffer(repeating: 0, count: 1024 * 1024), allInfo.standardOutput) + } + + func testCollectJustStandardError() async throws { + let allInfo = try await ProcessExecutor.runCollectingOutput( + group: self.group, + executable: "/bin/sh", ["-c", "/bin/dd >&2 if=/dev/zero bs=\(1024 * 1024) count=1 status=none"], + standardInput: EOFSequence(), + collectStandardOutput: false, + collectStandardError: true, + perStreamCollectionLimitBytes: 1024 * 1024, + logger: self.logger + ) + XCTAssertNoThrow(try allInfo.exitReason.throwIfNonZero()) + XCTAssertNil(allInfo.standardOutput) + XCTAssertEqual(ByteBuffer(repeating: 0, count: 1024 * 1024), allInfo.standardError) + } + + func testCollectNothing() async throws { + let allInfo = try await ProcessExecutor.runCollectingOutput( + group: self.group, + executable: "/bin/sh", ["-c", "/bin/dd >&2 if=/dev/zero bs=\(1024 * 1024) count=100 status=none"], + standardInput: EOFSequence(), + collectStandardOutput: false, + collectStandardError: false, + perStreamCollectionLimitBytes: 1024 * 1024, + logger: self.logger + ) + XCTAssertNoThrow(try allInfo.exitReason.throwIfNonZero()) + XCTAssertNil(allInfo.standardOutput) + XCTAssertNil(allInfo.standardError) + } + + func testCollectStdOutAndErr() async throws { + let allInfo = try await ProcessExecutor.runCollectingOutput( + group: self.group, + executable: "/bin/sh", + [ + "-c", + """ + /bin/dd >&2 if=/dev/zero bs=\(1024 * 1024) count=1 status=none; + /bin/dd if=/dev/zero bs=100 count=1 status=none; + """, + ], + standardInput: EOFSequence(), + collectStandardOutput: true, + collectStandardError: true, + perStreamCollectionLimitBytes: 1024 * 1024, + logger: self.logger + ) + XCTAssertNoThrow(try allInfo.exitReason.throwIfNonZero()) + XCTAssertEqual(ByteBuffer(repeating: 0, count: 1024 * 1024), allInfo.standardError) + XCTAssertEqual(ByteBuffer(repeating: 0, count: 100), allInfo.standardOutput) + } + + func testTooMuchToCollectStdout() async throws { + do { + let result = try await ProcessExecutor.runCollectingOutput( + group: self.group, + executable: "/bin/dd", ["if=/dev/zero", "bs=\(1024 * 1024)", "count=1"], + standardInput: EOFSequence(), + collectStandardOutput: true, + collectStandardError: false, + perStreamCollectionLimitBytes: 1024 * 1024 - 1, + logger: self.logger + ) + XCTFail("should've thrown but got result: \(result)") + } catch { + XCTAssertTrue(error is ProcessExecutor>.TooMuchProcessOutputError) + XCTAssertEqual( + ProcessOutputStream.standardOutput, + (error as? ProcessExecutor.TooMuchProcessOutputError)?.stream + ) + } + } + + func testTooMuchToCollectStderr() async throws { + do { + let result = try await ProcessExecutor.runCollectingOutput( + group: self.group, + executable: "/bin/dd", + ["if=/dev/zero", "bs=\(1024 * 1024)", "of=/dev/stderr", "count=1", "status=none"], + standardInput: EOFSequence(), + collectStandardOutput: false, + collectStandardError: true, + perStreamCollectionLimitBytes: 1024 * 1024 - 1, + logger: self.logger + ) + XCTFail("should've thrown but got result: \(result)") + } catch { + XCTAssertTrue(error is ProcessExecutor>.TooMuchProcessOutputError) + XCTAssertEqual( + ProcessOutputStream.standardError, + (error as? ProcessExecutor.TooMuchProcessOutputError)?.stream + ) + } + } + + func testCollectEmptyStringFromStdoutAndErr() async throws { + let allInfo = try await ProcessExecutor.runCollectingOutput( + group: self.group, + executable: "/bin/sh", + ["-c", ""], + standardInput: EOFSequence(), + collectStandardOutput: true, + collectStandardError: true, + perStreamCollectionLimitBytes: 1024 * 1024, + logger: self.logger + ) + XCTAssertNoThrow(try allInfo.exitReason.throwIfNonZero()) + XCTAssertEqual(ByteBuffer(), allInfo.standardError) + XCTAssertEqual(ByteBuffer(), allInfo.standardOutput) + } + + // MARK: - Setup/teardown + + override func setUp() async throws { + self.group = MultiThreadedEventLoopGroup(numberOfThreads: 3) + self.logger = Logger(label: "test", factory: { _ in SwiftLogNoOpLogHandler() }) + } + + override func tearDown() { + self.logger = nil + + XCTAssertNoThrow(try self.group.syncShutdownGracefully()) + self.group = nil + } +} + +extension AsyncStream { + static func justMakeIt(elementType: Element.Type = Element.self) -> ( + consumer: AsyncStream, producer: AsyncStream.Continuation + ) { + var _producer: AsyncStream.Continuation? + let stream = AsyncStream { producer in + _producer = producer + } + + return (stream, _producer!) + } +} + +extension AsyncSequence where Element == ByteBuffer { + func pullAllOfIt() async throws -> ByteBuffer { + var buffer: ByteBuffer? + for try await chunk in self { + buffer.setOrWriteImmutableBuffer(chunk) + } + return buffer ?? ByteBuffer() + } +} + +extension ProcessExecutor { + struct AllOfAProcess: Sendable { + var exitReason: ProcessExitReason + var standardOutput: ByteBuffer + var standardError: ByteBuffer + } + + private enum What { + case exit(ProcessExitReason) + case stdout(ByteBuffer) + case stderr(ByteBuffer) + } + + func runGetAllOutput() async throws -> AllOfAProcess { + try await withThrowingTaskGroup(of: What.self, returning: AllOfAProcess.self) { group in + group.addTask { + .exit(try await self.run()) + } + group.addTask { + .stdout(try await self.standardOutput.pullAllOfIt()) + } + group.addTask { + .stderr(try await self.standardError.pullAllOfIt()) + } + + var exitReason: ProcessExitReason? + var stdout: ByteBuffer? + var stderr: ByteBuffer? + + while let next = try await group.next() { + switch next { + case let .exit(value): + exitReason = value + case let .stderr(value): + stderr = value + case let .stdout(value): + stdout = value + } + } + + return AllOfAProcess(exitReason: exitReason!, standardOutput: stdout!, standardError: stderr!) + } + } +} diff --git a/Tests/ArchitectureMappingTests.swift b/Tests/SwiftSDKGeneratorTests/ArchitectureMappingTests.swift similarity index 100% rename from Tests/ArchitectureMappingTests.swift rename to Tests/SwiftSDKGeneratorTests/ArchitectureMappingTests.swift diff --git a/Tests/EndToEndTests.swift b/Tests/SwiftSDKGeneratorTests/EndToEndTests.swift similarity index 100% rename from Tests/EndToEndTests.swift rename to Tests/SwiftSDKGeneratorTests/EndToEndTests.swift