Skip to content

Commit

Permalink
operators: handle errors in other for withLatestFrom
Browse files Browse the repository at this point in the history
  • Loading branch information
Thibault Wittemberg committed Mar 13, 2022
1 parent 0aafcc4 commit 3c41837
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 7 deletions.
3 changes: 2 additions & 1 deletion Sources/Operators/AsyncSequence+SwitchToLatest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ public extension AsyncSequence where Element: AsyncSequence {
/// // will print:
/// a3, b3
/// ```
/// - parameter upstreamPriority: can be used to change the priority of the task that supports the iteration over the upstream sequence (nil by default)
/// - parameter upstreamPriority: can be used to change the priority of the task that supports the iteration
/// over the upstream sequence (nil by default)
///
/// - Returns: The async sequence that republishes elements sent by the most recently received async sequence.
func switchToLatest(upstreamPriority: TaskPriority? = nil) -> AsyncSwitchToLatestSequence<Self> {
Expand Down
24 changes: 18 additions & 6 deletions Sources/Operators/AsyncSequence+WithLatestFrom.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public extension AsyncSequence {
///
/// - returns: An async sequence emitting the result of combining each value of the self
/// with the latest value from the other sequence. If the other sequence finishes, the returned sequence
/// will finish with the next value from self.
/// will finish with the next value from self. if the other sequence fails, the returned sequence will fail
/// with the next value from self.
///
func withLatestFrom<OtherAsyncSequence: AsyncSequence>(
_ other: OtherAsyncSequence,
Expand Down Expand Up @@ -77,6 +78,8 @@ public struct AsyncWithLatestFromSequence<UpstreamAsyncSequence: AsyncSequence,

final class OtherIteratorManager {
var otherElement: OtherAsyncSequence.Element?
var otherError: Error?

var otherIterator: OtherAsyncSequence.AsyncIterator
var hasStarted = false

Expand All @@ -98,12 +101,16 @@ public struct AsyncWithLatestFromSequence<UpstreamAsyncSequence: AsyncSequence,
self.otherElement = try await self.otherIterator.next()

Task(priority: self.otherPriority) { [weak self] in
while let element = try await self?.otherIterator.next() {
guard !Task.isCancelled else { break }

self?.otherElement = element
do {
while let element = try await self?.otherIterator.next() {
guard !Task.isCancelled else { break }

self?.otherElement = element
}
self?.otherElement = nil
} catch {
self?.otherError = error
}
self?.otherElement = nil
}
}
}
Expand Down Expand Up @@ -132,6 +139,11 @@ public struct AsyncWithLatestFromSequence<UpstreamAsyncSequence: AsyncSequence,
let upstreamElement = try await self.upstreamAsyncIterator.next()
let otherElement = self.otherIteratorManager.otherElement

if let otherError = self.otherIteratorManager.otherError {
print("other is error")
throw otherError
}

guard let nonNilUpstreamElement = upstreamElement,
let nonNilOtherElement = otherElement else {
return nil
Expand Down
49 changes: 49 additions & 0 deletions Tests/Operators/AsyncSequence+WithLatestFromTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,53 @@ final class AsyncSequence_WithLatestFromTests: XCTestCase {

wait(for: [taskHasFinishedExpectation], timeout: 5) // task has been cancelled and has finished
}

func test_withLatestFrom_throws_when_other_throws() {
let canSendErrorInOtherExpectation = expectation(description: "The first element has been emitted by the upstrean sequence")
let secondElementSentInUpstreamExpectation = expectation(description: "2 has been sent in the upstream sequence")
let otherHasFailedExpectation = expectation(description: "the other sequence has failed")
let sutHasFailedExpectation = expectation(description: "The sut has failed")

let upstream = AsyncStreams.CurrentValue<Int>(1)
let other = AsyncStreams.CurrentValue<String>("1")

let sut = upstream.withLatestFrom(other)

// monitoring the other sequence's error
Task(priority: .high) {
do {
try await other.collect { _ in }
} catch {
otherHasFailedExpectation.fulfill()
}

}

Task {
var firstElement: (Int, String)?
do {
for try await element in sut {
firstElement = element
canSendErrorInOtherExpectation.fulfill()
wait(for: [secondElementSentInUpstreamExpectation], timeout: 5)
}
} catch {
XCTAssertEqual(firstElement?.0, 1)
XCTAssertEqual(firstElement?.1, "1")
sutHasFailedExpectation.fulfill()
}
}

wait(for: [canSendErrorInOtherExpectation], timeout: 5) // one element has been emitted, we can send error in the other seq

other.send(termination: .failure(NSError(domain: "", code: 1)))

wait(for: [otherHasFailedExpectation], timeout: 5)

upstream.send(2)

secondElementSentInUpstreamExpectation.fulfill() // we can release the lock in the for loop

wait(for: [sutHasFailedExpectation], timeout: 5) // task has been cancelled and has finished
}
}

0 comments on commit 3c41837

Please sign in to comment.