From db10b3f667ef152e02f219762278822b1e7e64a0 Mon Sep 17 00:00:00 2001 From: Thibault Wittemberg Date: Sun, 13 Mar 2022 13:33:15 +0100 Subject: [PATCH] operators: handle errors in other for withLatestFrom --- .../AsyncSequence+SwitchToLatest.swift | 3 +- .../AsyncSequence+WithLatestFrom.swift | 23 +++++--- .../AsyncSequence+WithLatestFromTests.swift | 53 ++++++++++++++++++- 3 files changed, 71 insertions(+), 8 deletions(-) diff --git a/Sources/Operators/AsyncSequence+SwitchToLatest.swift b/Sources/Operators/AsyncSequence+SwitchToLatest.swift index 19568be..de282ec 100644 --- a/Sources/Operators/AsyncSequence+SwitchToLatest.swift +++ b/Sources/Operators/AsyncSequence+SwitchToLatest.swift @@ -22,7 +22,8 @@ public extension AsyncSequence where Element: AsyncSequence { /// // will print: /// a3, b3 /// ``` - /// - parameter upstreamPriority: can be used to change the priority of the task that supports the iteration over the upstream sequence (nil by default) + /// - parameter upstreamPriority: can be used to change the priority of the task that supports the iteration + /// over the upstream sequence (nil by default) /// /// - Returns: The async sequence that republishes elements sent by the most recently received async sequence. func switchToLatest(upstreamPriority: TaskPriority? = nil) -> AsyncSwitchToLatestSequence { diff --git a/Sources/Operators/AsyncSequence+WithLatestFrom.swift b/Sources/Operators/AsyncSequence+WithLatestFrom.swift index 2a2a1bd..a715e20 100644 --- a/Sources/Operators/AsyncSequence+WithLatestFrom.swift +++ b/Sources/Operators/AsyncSequence+WithLatestFrom.swift @@ -35,7 +35,8 @@ public extension AsyncSequence { /// /// - returns: An async sequence emitting the result of combining each value of the self /// with the latest value from the other sequence. If the other sequence finishes, the returned sequence - /// will finish with the next value from self. + /// will finish with the next value from self. if the other sequence fails, the returned sequence will fail + /// with the next value from self. /// func withLatestFrom( _ other: OtherAsyncSequence, @@ -77,6 +78,8 @@ public struct AsyncWithLatestFromSequence(1) let other = AsyncStreams.CurrentValue("1") @@ -125,4 +125,55 @@ final class AsyncSequence_WithLatestFromTests: XCTestCase { wait(for: [taskHasFinishedExpectation], timeout: 5) // task has been cancelled and has finished } + + func test_withLatestFrom_throws_when_other_throws() { + let canSendErrorInOtherSequenceExpectation = expectation(description: "The first element has been emitted by the upstrean sequence") + let secondElementSentInUpstreamExpectation = expectation(description: "2 has been sent in the upstream sequence") + let otherHasFailedExpectation = expectation(description: "The other sequence has failed") + let sutHasFailedExpectation = expectation(description: "The sut has failed") + + let upstream = AsyncStreams.CurrentValue(1) + let other = AsyncStreams.CurrentValue("1") + + let sut = upstream.withLatestFrom(other, otherPriority: .high) + + // monitoring the other sequence's error + Task(priority: .high) { + do { + try await other.collect { _ in } + } catch { + otherHasFailedExpectation.fulfill() + } + } + + Task(priority: .low) { + var firstElement: (Int, String)? + do { + for try await element in sut { + firstElement = element + if firstElement?.0 == 1 { + // first element is received, make other fail + canSendErrorInOtherSequenceExpectation.fulfill() + wait(for: [secondElementSentInUpstreamExpectation], timeout: 5) + } + } + } catch { + XCTAssertEqual(firstElement?.0, 1) + XCTAssertEqual(firstElement?.1, "1") + sutHasFailedExpectation.fulfill() + } + } + + wait(for: [canSendErrorInOtherSequenceExpectation], timeout: 5) // one element has been emitted, we can send error in the other seq + + other.send(termination: .failure(NSError(domain: "", code: 1))) + + wait(for: [otherHasFailedExpectation], timeout: 5) + + upstream.send(2) + + secondElementSentInUpstreamExpectation.fulfill() // we can release the lock + + wait(for: [sutHasFailedExpectation], timeout: 5) // task has been cancelled and has finished + } }