Skip to content

Commit

Permalink
Merge pull request #14 from AsyncCommunity/improve/handle-error-withL…
Browse files Browse the repository at this point in the history
…atestFrom

operators: handle errors in other for withLatestFrom
  • Loading branch information
twittemb committed Mar 13, 2022
2 parents 0aafcc4 + db10b3f commit 6d3d520
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 8 deletions.
3 changes: 2 additions & 1 deletion Sources/Operators/AsyncSequence+SwitchToLatest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ public extension AsyncSequence where Element: AsyncSequence {
/// // will print:
/// a3, b3
/// ```
/// - parameter upstreamPriority: can be used to change the priority of the task that supports the iteration over the upstream sequence (nil by default)
/// - parameter upstreamPriority: can be used to change the priority of the task that supports the iteration
/// over the upstream sequence (nil by default)
///
/// - Returns: The async sequence that republishes elements sent by the most recently received async sequence.
func switchToLatest(upstreamPriority: TaskPriority? = nil) -> AsyncSwitchToLatestSequence<Self> {
Expand Down
23 changes: 17 additions & 6 deletions Sources/Operators/AsyncSequence+WithLatestFrom.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public extension AsyncSequence {
///
/// - returns: An async sequence emitting the result of combining each value of the self
/// with the latest value from the other sequence. If the other sequence finishes, the returned sequence
/// will finish with the next value from self.
/// will finish with the next value from self. if the other sequence fails, the returned sequence will fail
/// with the next value from self.
///
func withLatestFrom<OtherAsyncSequence: AsyncSequence>(
_ other: OtherAsyncSequence,
Expand Down Expand Up @@ -77,6 +78,8 @@ public struct AsyncWithLatestFromSequence<UpstreamAsyncSequence: AsyncSequence,

final class OtherIteratorManager {
var otherElement: OtherAsyncSequence.Element?
var otherError: Error?

var otherIterator: OtherAsyncSequence.AsyncIterator
var hasStarted = false

Expand All @@ -98,12 +101,16 @@ public struct AsyncWithLatestFromSequence<UpstreamAsyncSequence: AsyncSequence,
self.otherElement = try await self.otherIterator.next()

Task(priority: self.otherPriority) { [weak self] in
while let element = try await self?.otherIterator.next() {
guard !Task.isCancelled else { break }

self?.otherElement = element
do {
while let element = try await self?.otherIterator.next() {
guard !Task.isCancelled else { break }

self?.otherElement = element
}
self?.otherElement = nil
} catch {
self?.otherError = error
}
self?.otherElement = nil
}
}
}
Expand Down Expand Up @@ -132,6 +139,10 @@ public struct AsyncWithLatestFromSequence<UpstreamAsyncSequence: AsyncSequence,
let upstreamElement = try await self.upstreamAsyncIterator.next()
let otherElement = self.otherIteratorManager.otherElement

if let otherError = self.otherIteratorManager.otherError {
throw otherError
}

guard let nonNilUpstreamElement = upstreamElement,
let nonNilOtherElement = otherElement else {
return nil
Expand Down
53 changes: 52 additions & 1 deletion Tests/Operators/AsyncSequence+WithLatestFromTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ final class AsyncSequence_WithLatestFromTests: XCTestCase {
wait(for: [sutIsFinishedExpectation], timeout: 2)
}

func test_withLatestFrom_finishes_if_upstream_finishes() async throws {
func test_withLatestFrom_finishes_when_upstream_finishes() async throws {
let upstream = AsyncStreams.CurrentValue<Int>(1)
let other = AsyncStreams.CurrentValue<String>("1")

Expand Down Expand Up @@ -125,4 +125,55 @@ final class AsyncSequence_WithLatestFromTests: XCTestCase {

wait(for: [taskHasFinishedExpectation], timeout: 5) // task has been cancelled and has finished
}

func test_withLatestFrom_throws_when_other_throws() {
let canSendErrorInOtherSequenceExpectation = expectation(description: "The first element has been emitted by the upstrean sequence")
let secondElementSentInUpstreamExpectation = expectation(description: "2 has been sent in the upstream sequence")
let otherHasFailedExpectation = expectation(description: "The other sequence has failed")
let sutHasFailedExpectation = expectation(description: "The sut has failed")

let upstream = AsyncStreams.CurrentValue<Int>(1)
let other = AsyncStreams.CurrentValue<String>("1")

let sut = upstream.withLatestFrom(other, otherPriority: .high)

// monitoring the other sequence's error
Task(priority: .high) {
do {
try await other.collect { _ in }
} catch {
otherHasFailedExpectation.fulfill()
}
}

Task(priority: .low) {
var firstElement: (Int, String)?
do {
for try await element in sut {
firstElement = element
if firstElement?.0 == 1 {
// first element is received, make other fail
canSendErrorInOtherSequenceExpectation.fulfill()
wait(for: [secondElementSentInUpstreamExpectation], timeout: 5)
}
}
} catch {
XCTAssertEqual(firstElement?.0, 1)
XCTAssertEqual(firstElement?.1, "1")
sutHasFailedExpectation.fulfill()
}
}

wait(for: [canSendErrorInOtherSequenceExpectation], timeout: 5) // one element has been emitted, we can send error in the other seq

other.send(termination: .failure(NSError(domain: "", code: 1)))

wait(for: [otherHasFailedExpectation], timeout: 5)

upstream.send(2)

secondElementSentInUpstreamExpectation.fulfill() // we can release the lock

wait(for: [sutHasFailedExpectation], timeout: 5) // task has been cancelled and has finished
}
}

0 comments on commit 6d3d520

Please sign in to comment.