diff --git a/README.md b/README.md index ab0bc05..52fb227 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ AsyncSequences * [Assign](#Assign) * [Multicast](#Multicast) * [Share](#Share) +* [WithLatestFrom](#WithLatestFrom) * [EraseToAnyAsyncSequence](#EraseToAnyAsyncSequence) More operators and extensions are to come. Pull requests are of course welcome. @@ -485,6 +486,33 @@ Task { // Stream 1 received: ("Third", 61) ``` +### WithLatestFrom + +`withLatestFrom(_:)` merges two async sequences into a single one by combining each value +from self with the latest value from the other sequence, if any. + +``` +let seq1 = AsyncStreams.CurrentValue(1) +let seq2 = AsyncStreams.CurrentValue("1") + +let combinedSeq = seq1.withLatestFrom(seq2) + +Task { + for try await element in combinedSeq { + print(element) + } +} + +seq1.send(2) +seq2.send("2") +seq1.send(3) + +// will print: +(1, "1") +(2, "1") +(3, "2") +``` + ### EraseToAnyAsyncSequence `eraseToAnyAsyncSequence()` type-erases the async sequence into an AnyAsyncSequence. diff --git a/Sources/Operators/AsyncSequence+SwitchToLatest.swift b/Sources/Operators/AsyncSequence+SwitchToLatest.swift index 1488e2e..19568be 100644 --- a/Sources/Operators/AsyncSequence+SwitchToLatest.swift +++ b/Sources/Operators/AsyncSequence+SwitchToLatest.swift @@ -22,10 +22,11 @@ public extension AsyncSequence where Element: AsyncSequence { /// // will print: /// a3, b3 /// ``` + /// - parameter upstreamPriority: can be used to change the priority of the task that supports the iteration over the upstream sequence (nil by default) /// /// - Returns: The async sequence that republishes elements sent by the most recently received async sequence. - func switchToLatest() -> AsyncSwitchToLatestSequence { - AsyncSwitchToLatestSequence(self) + func switchToLatest(upstreamPriority: TaskPriority? = nil) -> AsyncSwitchToLatestSequence { + AsyncSwitchToLatestSequence(self, upstreamPriority: upstreamPriority) } } @@ -34,13 +35,21 @@ public struct AsyncSwitchToLatestSequence: public typealias AsyncIterator = Iterator let upstreamAsyncSequence: UpstreamAsyncSequence + let upstreamPriority: TaskPriority? - public init(_ upstreamAsyncSequence: UpstreamAsyncSequence) { + public init( + _ upstreamAsyncSequence: UpstreamAsyncSequence, + upstreamPriority: TaskPriority? + ) { self.upstreamAsyncSequence = upstreamAsyncSequence + self.upstreamPriority = upstreamPriority } public func makeAsyncIterator() -> AsyncIterator { - Iterator(upstreamIterator: self.upstreamAsyncSequence.makeAsyncIterator()) + Iterator( + upstreamIterator: self.upstreamAsyncSequence.makeAsyncIterator(), + upstreamPriority: self.upstreamPriority + ) } final class UpstreamIteratorManager { @@ -49,10 +58,15 @@ public struct AsyncSwitchToLatestSequence: var hasStarted = false var currentTask: Task? + let upstreamPriority: TaskPriority? let serialQueue = DispatchQueue(label: UUID().uuidString) - init(upstreamIterator: UpstreamAsyncSequence.AsyncIterator) { + init( + upstreamIterator: UpstreamAsyncSequence.AsyncIterator, + upstreamPriority: TaskPriority? + ) { self.upstreamIterator = upstreamIterator + self.upstreamPriority = upstreamPriority } func setCurrentTask(task: Task) { @@ -70,7 +84,7 @@ public struct AsyncSwitchToLatestSequence: } } - Task { [weak self] in + Task(priority: self.upstreamPriority) { [weak self] in while let nextChildSequence = try await self?.upstreamIterator.next() { self?.serialQueue.async { [weak self] in self?.childIterators.removeFirst() @@ -92,8 +106,14 @@ public struct AsyncSwitchToLatestSequence: public struct Iterator: AsyncIteratorProtocol { let upstreamIteratorManager: UpstreamIteratorManager - init(upstreamIterator: UpstreamAsyncSequence.AsyncIterator) { - self.upstreamIteratorManager = UpstreamIteratorManager(upstreamIterator: upstreamIterator) + init( + upstreamIterator: UpstreamAsyncSequence.AsyncIterator, + upstreamPriority: TaskPriority? + ) { + self.upstreamIteratorManager = UpstreamIteratorManager( + upstreamIterator: upstreamIterator, + upstreamPriority: upstreamPriority + ) } public mutating func next() async throws -> Element? { diff --git a/Sources/Operators/AsyncSequence+WithLatestFrom.swift b/Sources/Operators/AsyncSequence+WithLatestFrom.swift new file mode 100644 index 0000000..2a2a1bd --- /dev/null +++ b/Sources/Operators/AsyncSequence+WithLatestFrom.swift @@ -0,0 +1,143 @@ +// +// AsyncSequence+WithLatestFrom.swift +// +// +// Created by Thibault Wittemberg on 07/03/2022. +// + +public extension AsyncSequence { + /// Merges two AsyncSequences into a single one by combining each value + /// from self with the latest value from the other sequence, if any. + /// + /// ``` + /// let seq1 = AsyncStreams.CurrentValue(1) + /// let seq2 = AsyncStreams.CurrentValue("1") + /// + /// let combinedSeq = seq1.withLatestFrom(seq2) + /// + /// Task { + /// for try await element in combinedSeq { + /// print(element) + /// } + /// } + /// + /// seq1.send(2) + /// seq2.send("2") + /// seq1.send(3) + /// + /// // will print: + /// (1, "1") + /// (2, "1") + /// (3, "2") + /// ``` + /// + /// - parameter other: the other async sequence + /// + /// - returns: An async sequence emitting the result of combining each value of the self + /// with the latest value from the other sequence. If the other sequence finishes, the returned sequence + /// will finish with the next value from self. + /// + func withLatestFrom( + _ other: OtherAsyncSequence, + otherPriority: TaskPriority? = nil + ) -> AsyncWithLatestFromSequence { + AsyncWithLatestFromSequence( + self, + other: other, + otherPriority: otherPriority + ) + } +} + +public struct AsyncWithLatestFromSequence: AsyncSequence { + public typealias Element = (UpstreamAsyncSequence.Element, OtherAsyncSequence.Element) + public typealias AsyncIterator = Iterator + + let upstreamAsyncSequence: UpstreamAsyncSequence + let otherAsyncSequence: OtherAsyncSequence + let otherPriority: TaskPriority? + + public init( + _ upstreamAsyncSequence: UpstreamAsyncSequence, + other otherAsyncSequence: OtherAsyncSequence, + otherPriority: TaskPriority? = nil + ) { + self.upstreamAsyncSequence = upstreamAsyncSequence + self.otherAsyncSequence = otherAsyncSequence + self.otherPriority = otherPriority + } + + public func makeAsyncIterator() -> AsyncIterator { + Iterator( + self.upstreamAsyncSequence.makeAsyncIterator(), + other: self.otherAsyncSequence.makeAsyncIterator(), + otherPriority: self.otherPriority + ) + } + + final class OtherIteratorManager { + var otherElement: OtherAsyncSequence.Element? + var otherIterator: OtherAsyncSequence.AsyncIterator + var hasStarted = false + + let otherPriority: TaskPriority? + + init( + otherIterator: OtherAsyncSequence.AsyncIterator, + otherPriority: TaskPriority? + ) { + self.otherIterator = otherIterator + self.otherPriority = otherPriority + } + + /// iterates over the other sequence and track its current value + func startOtherIterator() async throws { + guard !self.hasStarted else { return } + self.hasStarted = true + + self.otherElement = try await self.otherIterator.next() + + Task(priority: self.otherPriority) { [weak self] in + while let element = try await self?.otherIterator.next() { + guard !Task.isCancelled else { break } + + self?.otherElement = element + } + self?.otherElement = nil + } + } + } + + public struct Iterator: AsyncIteratorProtocol { + var upstreamAsyncIterator: UpstreamAsyncSequence.AsyncIterator + let otherIteratorManager: OtherIteratorManager + + init( + _ upstreamAsyncIterator: UpstreamAsyncSequence.AsyncIterator, + other otherAsyncIterator: OtherAsyncSequence.AsyncIterator, + otherPriority: TaskPriority? + ) { + self.upstreamAsyncIterator = upstreamAsyncIterator + self.otherIteratorManager = OtherIteratorManager( + otherIterator: otherAsyncIterator, + otherPriority: otherPriority + ) + } + + public mutating func next() async throws -> Element? { + guard !Task.isCancelled else { return nil } + + try await self.otherIteratorManager.startOtherIterator() + + let upstreamElement = try await self.upstreamAsyncIterator.next() + let otherElement = self.otherIteratorManager.otherElement + + guard let nonNilUpstreamElement = upstreamElement, + let nonNilOtherElement = otherElement else { + return nil + } + + return (nonNilUpstreamElement, nonNilOtherElement) + } + } +} diff --git a/Tests/Operators/AsyncSequence+SwitchToLatestTests.swift b/Tests/Operators/AsyncSequence+SwitchToLatestTests.swift index c25811a..f563d5f 100644 --- a/Tests/Operators/AsyncSequence+SwitchToLatestTests.swift +++ b/Tests/Operators/AsyncSequence+SwitchToLatestTests.swift @@ -60,7 +60,7 @@ final class AsyncSequence_SwitchToLatestTests: XCTestCase { interval: .milliSeconds(30), onCancel: {}) - let sut = mainAsyncSequence.switchToLatest() + let sut = mainAsyncSequence.switchToLatest(upstreamPriority: .high) var receivedElements = [Int]() let expectedElements = [0, 4, 8, 9, 10, 11] @@ -81,7 +81,7 @@ final class AsyncSequence_SwitchToLatestTests: XCTestCase { LongAsyncSequence(elements: [1], onCancel: {}).eraseToAnyAsyncSequence(), AsyncSequences.Fail(error: expectedError).eraseToAnyAsyncSequence()].asyncElements - let sut = sourceSequence.switchToLatest() + let sut = sourceSequence.switchToLatest(upstreamPriority: .high) do { for try await _ in sut {} @@ -101,7 +101,7 @@ final class AsyncSequence_SwitchToLatestTests: XCTestCase { interval: .milliSeconds(50), onCancel: {} )} - let sut = mappedSequence.switchToLatest() + let sut = mappedSequence.switchToLatest(upstreamPriority: .high) let task = Task { var firstElement: Int? diff --git a/Tests/Operators/AsyncSequence+WithLatestFromTests.swift b/Tests/Operators/AsyncSequence+WithLatestFromTests.swift new file mode 100644 index 0000000..3b90d8d --- /dev/null +++ b/Tests/Operators/AsyncSequence+WithLatestFromTests.swift @@ -0,0 +1,128 @@ +// +// AsyncSequence+WithLatestFromTests.swift +// +// +// Created by Thibault Wittemberg on 07/03/2022. +// + +import AsyncExtensions +import XCTest + +final class AsyncSequence_WithLatestFromTests: XCTestCase { + func test_withLatestFrom_combines_values_and_finishes_if_other_finishes() { + // time: |--------------------------------------------------------- + // seq1: | 1 2 3 4 5 + // seq2: | "1" "2" finish + // expected: (1, "1") -> (2, "1") -> (3, "1") -> (4, "2") -> finished + let otherHasEmitted1Expectation = expectation(description: "the other sequence has produced '1'") + let otherHasEmitted2Expectation = expectation(description: "the other sequence has produced '2'") + let otherIsFinishedExpectation = expectation(description: "the other sequence is finished") + let sutIsFinishedExpectation = expectation(description: "the combined sequence is finished") + + let upstream = AsyncStreams.CurrentValue(1) + let other = AsyncStreams.Passthrough() + + // monitoring the other sequence to drive the timeline + Task(priority: .high) { + try await other.collect { element in + if element == "1" { + otherHasEmitted1Expectation.fulfill() + } + if element == "2" { + otherHasEmitted2Expectation.fulfill() + } + } + otherIsFinishedExpectation.fulfill() + } + + Task(priority: .low) { + let sut = upstream.withLatestFrom(other, otherPriority: .high) + var iterator = sut.makeAsyncIterator() + + other.send("1") + wait(for: [otherHasEmitted1Expectation], timeout: 2) + + let element1 = try await iterator.next() + XCTAssertEqual(element1?.0, 1) + XCTAssertEqual(element1?.1, "1") + + upstream.send(2) + + let element2 = try await iterator.next() + XCTAssertEqual(element2?.0, 2) + XCTAssertEqual(element2?.1, "1") + + upstream.send(3) + let element3 = try await iterator.next() + XCTAssertEqual(element3?.0, 3) + XCTAssertEqual(element3?.1, "1") + + other.send("2") + wait(for: [otherHasEmitted2Expectation], timeout: 2) + + upstream.send(4) + let element4 = try await iterator.next() + XCTAssertEqual(element4?.0, 4) + XCTAssertEqual(element4?.1, "2") + + other.send(termination: .finished) + wait(for: [otherIsFinishedExpectation], timeout: 2) + + upstream.send(5) + let element5 = try await iterator.next() + XCTAssertNil(element5) + + sutIsFinishedExpectation.fulfill() + } + + wait(for: [sutIsFinishedExpectation], timeout: 2) + } + + func test_withLatestFrom_finishes_if_upstream_finishes() async throws { + let upstream = AsyncStreams.CurrentValue(1) + let other = AsyncStreams.CurrentValue("1") + + let sut = upstream.withLatestFrom(other) + var iterator = sut.makeAsyncIterator() + + let element1 = try await iterator.next() + XCTAssertEqual(element1?.0, 1) + XCTAssertEqual(element1?.1, "1") + + upstream.send(termination: .finished) + + let element2 = try await iterator.next() + XCTAssertNil(element2) + } + + func test_withLatestFrom_finishes_when_task_is_cancelled() { + let canCancelExpectation = expectation(description: "The first element has been emitted") + let hasCancelExceptation = expectation(description: "The task has been cancelled") + let taskHasFinishedExpectation = expectation(description: "The task has finished") + + let upstream = AsyncStreams.CurrentValue(1) + let other = AsyncStreams.CurrentValue("1") + + let sut = upstream.withLatestFrom(other) + + let task = Task { + var firstElement: (Int, String)? + for try await element in sut { + firstElement = element + canCancelExpectation.fulfill() + wait(for: [hasCancelExceptation], timeout: 5) + } + XCTAssertEqual(firstElement?.0, 1) + XCTAssertEqual(firstElement?.1, "1") + taskHasFinishedExpectation.fulfill() + } + + wait(for: [canCancelExpectation], timeout: 5) // one element has been emitted, we can cancel the task + + task.cancel() + + hasCancelExceptation.fulfill() // we can release the lock in the for loop + + wait(for: [taskHasFinishedExpectation], timeout: 5) // task has been cancelled and has finished + } +}