From 26a089e414b87e5f81ad2b51154bcd3d9b7a55a4 Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Wed, 11 Jan 2023 12:59:50 +0000 Subject: [PATCH 1/2] Implement convenience factory methods for Async[Throwing]Stream This is the implementation for https://github.com/apple/swift-evolution/pull/1824 --- stdlib/public/Concurrency/AsyncStream.swift | 20 +++++++++ .../Concurrency/AsyncThrowingStream.swift | 22 ++++++++++ test/Concurrency/Runtime/async_stream.swift | 44 +++++++++++++------ 3 files changed, 72 insertions(+), 14 deletions(-) diff --git a/stdlib/public/Concurrency/AsyncStream.swift b/stdlib/public/Concurrency/AsyncStream.swift index 6e65cab5ca724..0687a8a918290 100644 --- a/stdlib/public/Concurrency/AsyncStream.swift +++ b/stdlib/public/Concurrency/AsyncStream.swift @@ -428,6 +428,26 @@ extension AsyncStream.Continuation { } } +@available(SwiftStdlib 5.1, *) +extension AsyncStream { + /// Initializes a new ``AsyncStream`` and an ``AsyncStream/Continuation``. + /// + /// - Parameters: + /// - elementType: The element type of the stream. + /// - limit: The buffering policy that the stream should use. + /// - Returns: A tuple containing the stream and its continuation. The continuation should be passed to the + /// producer while the stream should be passed to the consumer. + @backDeployed(before: SwiftStdlib 5.9) + public static func makeStream( + of elementType: Element.Type = Element.self, + bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded + ) -> (stream: AsyncStream, continuation: AsyncStream.Continuation) { + var continuation: AsyncStream.Continuation! + let stream = AsyncStream(bufferingPolicy: limit) { continuation = $0 } + return (stream: stream, continuation: continuation!) + } +} + @available(SwiftStdlib 5.1, *) extension AsyncStream: @unchecked Sendable where Element: Sendable { } #else diff --git a/stdlib/public/Concurrency/AsyncThrowingStream.swift b/stdlib/public/Concurrency/AsyncThrowingStream.swift index dfbfea5085790..2d25dbef88328 100644 --- a/stdlib/public/Concurrency/AsyncThrowingStream.swift +++ b/stdlib/public/Concurrency/AsyncThrowingStream.swift @@ -473,6 +473,28 @@ extension AsyncThrowingStream.Continuation { } } +@available(SwiftStdlib 5.1, *) +extension AsyncThrowingStream { + /// Initializes a new ``AsyncThrowingStream`` and an ``AsyncThrowingStream/Continuation``. + /// + /// - Parameters: + /// - elementType: The element type of the stream. + /// - failureType: The failure type of the stream. + /// - limit: The buffering policy that the stream should use. + /// - Returns: A tuple containing the stream and its continuation. The continuation should be passed to the + /// producer while the stream should be passed to the consumer. + @backDeployed(before: SwiftStdlib 5.9) + public static func makeStream( + of elementType: Element.Type = Element.self, + throwing failureType: Failure.Type = Failure.self, + bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded + ) -> (stream: AsyncThrowingStream, continuation: AsyncThrowingStream.Continuation) where Failure == Error { + var continuation: AsyncThrowingStream.Continuation! + let stream = AsyncThrowingStream(bufferingPolicy: limit) { continuation = $0 } + return (stream: stream, continuation: continuation!) + } +} + @available(SwiftStdlib 5.1, *) extension AsyncThrowingStream: @unchecked Sendable where Element: Sendable { } #else diff --git a/test/Concurrency/Runtime/async_stream.swift b/test/Concurrency/Runtime/async_stream.swift index ffdc42577dd0a..72e9cc0fed025 100644 --- a/test/Concurrency/Runtime/async_stream.swift +++ b/test/Concurrency/Runtime/async_stream.swift @@ -7,12 +7,8 @@ // rdar://78109470 // UNSUPPORTED: back_deployment_runtime -// Race condition -// REQUIRES: rdar78033828 - import _Concurrency import StdlibUnittest -import Dispatch struct SomeError: Error, Equatable { var value = Int.random(in: 0..<100) @@ -27,14 +23,34 @@ var tests = TestSuite("AsyncStream") var fulfilled = false } + tests.test("factory method") { + let (stream, continuation) = AsyncStream.makeStream(of: String.self) + continuation.yield("hello") + + var iterator = stream.makeAsyncIterator() + expectEqual(await iterator.next(), "hello") + } + + tests.test("throwing factory method") { + let (stream, continuation) = AsyncThrowingStream.makeStream(of: String.self, throwing: Error.self) + continuation.yield("hello") + + var iterator = stream.makeAsyncIterator() + do { + expectEqual(try await iterator.next(), "hello") + } catch { + expectUnreachable("unexpected error thrown") + } + } + tests.test("yield with no awaiting next") { - let series = AsyncStream(String.self) { continuation in + _ = AsyncStream(String.self) { continuation in continuation.yield("hello") } } tests.test("yield with no awaiting next throwing") { - let series = AsyncThrowingStream(String.self) { continuation in + _ = AsyncThrowingStream(String.self) { continuation in continuation.yield("hello") } } @@ -122,7 +138,7 @@ var tests = TestSuite("AsyncStream") do { expectEqual(try await iterator.next(), "hello") expectEqual(try await iterator.next(), "world") - try await iterator.next() + _ = try await iterator.next() expectUnreachable("expected thrown error") } catch { if let failure = error as? SomeError { @@ -134,7 +150,7 @@ var tests = TestSuite("AsyncStream") } tests.test("yield with no awaiting next detached") { - let series = AsyncStream(String.self) { continuation in + _ = AsyncStream(String.self) { continuation in detach { continuation.yield("hello") } @@ -142,7 +158,7 @@ var tests = TestSuite("AsyncStream") } tests.test("yield with no awaiting next detached throwing") { - let series = AsyncThrowingStream(String.self) { continuation in + _ = AsyncThrowingStream(String.self) { continuation in detach { continuation.yield("hello") } @@ -246,7 +262,7 @@ var tests = TestSuite("AsyncStream") do { expectEqual(try await iterator.next(), "hello") expectEqual(try await iterator.next(), "world") - try await iterator.next() + _ = try await iterator.next() expectUnreachable("expected thrown error") } catch { if let failure = error as? SomeError { @@ -337,7 +353,7 @@ var tests = TestSuite("AsyncStream") let expectation = Expectation() func scopedLifetime(_ expectation: Expectation) { - let series = AsyncStream(String.self) { continuation in + _ = AsyncStream(String.self) { continuation in continuation.onTermination = { @Sendable _ in expectation.fulfilled = true } } } @@ -351,7 +367,7 @@ var tests = TestSuite("AsyncStream") let expectation = Expectation() func scopedLifetime(_ expectation: Expectation) { - let series = AsyncStream(String.self) { continuation in + _ = AsyncStream(String.self) { continuation in continuation.onTermination = { @Sendable _ in expectation.fulfilled = true } continuation.finish() } @@ -366,7 +382,7 @@ var tests = TestSuite("AsyncStream") let expectation = Expectation() func scopedLifetime(_ expectation: Expectation) { - let series = AsyncStream(String.self) { continuation in + _ = AsyncStream(String.self) { continuation in continuation.onTermination = { @Sendable terminal in switch terminal { case .cancelled: @@ -386,7 +402,7 @@ var tests = TestSuite("AsyncStream") let expectation = Expectation() func scopedLifetime(_ expectation: Expectation) { - let series = AsyncThrowingStream(String.self) { continuation in + _ = AsyncThrowingStream(String.self) { continuation in continuation.onTermination = { @Sendable terminal in switch terminal { case .cancelled: From 3e0181817f9bdfb66c4ab540386594d640d96e22 Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Sat, 4 Mar 2023 15:17:48 +0000 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: Konrad `ktoso` Malawski --- stdlib/public/Concurrency/AsyncStream.swift | 1 + stdlib/public/Concurrency/AsyncThrowingStream.swift | 1 + 2 files changed, 2 insertions(+) diff --git a/stdlib/public/Concurrency/AsyncStream.swift b/stdlib/public/Concurrency/AsyncStream.swift index 0687a8a918290..eb2b5c56a57c0 100644 --- a/stdlib/public/Concurrency/AsyncStream.swift +++ b/stdlib/public/Concurrency/AsyncStream.swift @@ -437,6 +437,7 @@ extension AsyncStream { /// - limit: The buffering policy that the stream should use. /// - Returns: A tuple containing the stream and its continuation. The continuation should be passed to the /// producer while the stream should be passed to the consumer. + @available(SwiftStdlib 5.1, *) @backDeployed(before: SwiftStdlib 5.9) public static func makeStream( of elementType: Element.Type = Element.self, diff --git a/stdlib/public/Concurrency/AsyncThrowingStream.swift b/stdlib/public/Concurrency/AsyncThrowingStream.swift index 2d25dbef88328..c0c48870eb735 100644 --- a/stdlib/public/Concurrency/AsyncThrowingStream.swift +++ b/stdlib/public/Concurrency/AsyncThrowingStream.swift @@ -483,6 +483,7 @@ extension AsyncThrowingStream { /// - limit: The buffering policy that the stream should use. /// - Returns: A tuple containing the stream and its continuation. The continuation should be passed to the /// producer while the stream should be passed to the consumer. + @available(SwiftStdlib 5.1, *) @backDeployed(before: SwiftStdlib 5.9) public static func makeStream( of elementType: Element.Type = Element.self,