Skip to content

Commit

Permalink
Merge branch 'feature/withLatestFrom'
Browse files Browse the repository at this point in the history
  • Loading branch information
Thibault Wittemberg committed Mar 12, 2022
2 parents af684d5 + a9cd8d0 commit 0aafcc4
Show file tree
Hide file tree
Showing 5 changed files with 330 additions and 11 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ AsyncSequences
* [Assign](#Assign)
* [Multicast](#Multicast)
* [Share](#Share)
* [WithLatestFrom](#WithLatestFrom)
* [EraseToAnyAsyncSequence](#EraseToAnyAsyncSequence)

More operators and extensions are to come. Pull requests are of course welcome.
Expand Down Expand Up @@ -485,6 +486,33 @@ Task {
// Stream 1 received: ("Third", 61)
```

### WithLatestFrom

`withLatestFrom(_:)` merges two async sequences into a single one by combining each value
from self with the latest value from the other sequence, if any.

```
let seq1 = AsyncStreams.CurrentValue<Int>(1)
let seq2 = AsyncStreams.CurrentValue<String>("1")
let combinedSeq = seq1.withLatestFrom(seq2)
Task {
for try await element in combinedSeq {
print(element)
}
}
seq1.send(2)
seq2.send("2")
seq1.send(3)
// will print:
(1, "1")
(2, "1")
(3, "2")
```

### EraseToAnyAsyncSequence

`eraseToAnyAsyncSequence()` type-erases the async sequence into an AnyAsyncSequence.
36 changes: 28 additions & 8 deletions Sources/Operators/AsyncSequence+SwitchToLatest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ public extension AsyncSequence where Element: AsyncSequence {
/// // will print:
/// a3, b3
/// ```
/// - parameter upstreamPriority: can be used to change the priority of the task that supports the iteration over the upstream sequence (nil by default)
///
/// - Returns: The async sequence that republishes elements sent by the most recently received async sequence.
func switchToLatest() -> AsyncSwitchToLatestSequence<Self> {
AsyncSwitchToLatestSequence<Self>(self)
func switchToLatest(upstreamPriority: TaskPriority? = nil) -> AsyncSwitchToLatestSequence<Self> {
AsyncSwitchToLatestSequence<Self>(self, upstreamPriority: upstreamPriority)
}
}

Expand All @@ -34,13 +35,21 @@ public struct AsyncSwitchToLatestSequence<UpstreamAsyncSequence: AsyncSequence>:
public typealias AsyncIterator = Iterator

let upstreamAsyncSequence: UpstreamAsyncSequence
let upstreamPriority: TaskPriority?

public init(_ upstreamAsyncSequence: UpstreamAsyncSequence) {
public init(
_ upstreamAsyncSequence: UpstreamAsyncSequence,
upstreamPriority: TaskPriority?
) {
self.upstreamAsyncSequence = upstreamAsyncSequence
self.upstreamPriority = upstreamPriority
}

public func makeAsyncIterator() -> AsyncIterator {
Iterator(upstreamIterator: self.upstreamAsyncSequence.makeAsyncIterator())
Iterator(
upstreamIterator: self.upstreamAsyncSequence.makeAsyncIterator(),
upstreamPriority: self.upstreamPriority
)
}

final class UpstreamIteratorManager {
Expand All @@ -49,10 +58,15 @@ public struct AsyncSwitchToLatestSequence<UpstreamAsyncSequence: AsyncSequence>:
var hasStarted = false
var currentTask: Task<Element?, Error>?

let upstreamPriority: TaskPriority?
let serialQueue = DispatchQueue(label: UUID().uuidString)

init(upstreamIterator: UpstreamAsyncSequence.AsyncIterator) {
init(
upstreamIterator: UpstreamAsyncSequence.AsyncIterator,
upstreamPriority: TaskPriority?
) {
self.upstreamIterator = upstreamIterator
self.upstreamPriority = upstreamPriority
}

func setCurrentTask(task: Task<Element?, Error>) {
Expand All @@ -70,7 +84,7 @@ public struct AsyncSwitchToLatestSequence<UpstreamAsyncSequence: AsyncSequence>:
}
}

Task { [weak self] in
Task(priority: self.upstreamPriority) { [weak self] in
while let nextChildSequence = try await self?.upstreamIterator.next() {
self?.serialQueue.async { [weak self] in
self?.childIterators.removeFirst()
Expand All @@ -92,8 +106,14 @@ public struct AsyncSwitchToLatestSequence<UpstreamAsyncSequence: AsyncSequence>:
public struct Iterator: AsyncIteratorProtocol {
let upstreamIteratorManager: UpstreamIteratorManager

init(upstreamIterator: UpstreamAsyncSequence.AsyncIterator) {
self.upstreamIteratorManager = UpstreamIteratorManager(upstreamIterator: upstreamIterator)
init(
upstreamIterator: UpstreamAsyncSequence.AsyncIterator,
upstreamPriority: TaskPriority?
) {
self.upstreamIteratorManager = UpstreamIteratorManager(
upstreamIterator: upstreamIterator,
upstreamPriority: upstreamPriority
)
}

public mutating func next() async throws -> Element? {
Expand Down
143 changes: 143 additions & 0 deletions Sources/Operators/AsyncSequence+WithLatestFrom.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
//
// AsyncSequence+WithLatestFrom.swift
//
//
// Created by Thibault Wittemberg on 07/03/2022.
//

public extension AsyncSequence {
/// Merges two AsyncSequences into a single one by combining each value
/// from self with the latest value from the other sequence, if any.
///
/// ```
/// let seq1 = AsyncStreams.CurrentValue<Int>(1)
/// let seq2 = AsyncStreams.CurrentValue<String>("1")
///
/// let combinedSeq = seq1.withLatestFrom(seq2)
///
/// Task {
/// for try await element in combinedSeq {
/// print(element)
/// }
/// }
///
/// seq1.send(2)
/// seq2.send("2")
/// seq1.send(3)
///
/// // will print:
/// (1, "1")
/// (2, "1")
/// (3, "2")
/// ```
///
/// - parameter other: the other async sequence
///
/// - returns: An async sequence emitting the result of combining each value of the self
/// with the latest value from the other sequence. If the other sequence finishes, the returned sequence
/// will finish with the next value from self.
///
func withLatestFrom<OtherAsyncSequence: AsyncSequence>(
_ other: OtherAsyncSequence,
otherPriority: TaskPriority? = nil
) -> AsyncWithLatestFromSequence<Self, OtherAsyncSequence> {
AsyncWithLatestFromSequence(
self,
other: other,
otherPriority: otherPriority
)
}
}

public struct AsyncWithLatestFromSequence<UpstreamAsyncSequence: AsyncSequence, OtherAsyncSequence: AsyncSequence>: AsyncSequence {
public typealias Element = (UpstreamAsyncSequence.Element, OtherAsyncSequence.Element)
public typealias AsyncIterator = Iterator

let upstreamAsyncSequence: UpstreamAsyncSequence
let otherAsyncSequence: OtherAsyncSequence
let otherPriority: TaskPriority?

public init(
_ upstreamAsyncSequence: UpstreamAsyncSequence,
other otherAsyncSequence: OtherAsyncSequence,
otherPriority: TaskPriority? = nil
) {
self.upstreamAsyncSequence = upstreamAsyncSequence
self.otherAsyncSequence = otherAsyncSequence
self.otherPriority = otherPriority
}

public func makeAsyncIterator() -> AsyncIterator {
Iterator(
self.upstreamAsyncSequence.makeAsyncIterator(),
other: self.otherAsyncSequence.makeAsyncIterator(),
otherPriority: self.otherPriority
)
}

final class OtherIteratorManager {
var otherElement: OtherAsyncSequence.Element?
var otherIterator: OtherAsyncSequence.AsyncIterator
var hasStarted = false

let otherPriority: TaskPriority?

init(
otherIterator: OtherAsyncSequence.AsyncIterator,
otherPriority: TaskPriority?
) {
self.otherIterator = otherIterator
self.otherPriority = otherPriority
}

/// iterates over the other sequence and track its current value
func startOtherIterator() async throws {
guard !self.hasStarted else { return }
self.hasStarted = true

self.otherElement = try await self.otherIterator.next()

Task(priority: self.otherPriority) { [weak self] in
while let element = try await self?.otherIterator.next() {
guard !Task.isCancelled else { break }

self?.otherElement = element
}
self?.otherElement = nil
}
}
}

public struct Iterator: AsyncIteratorProtocol {
var upstreamAsyncIterator: UpstreamAsyncSequence.AsyncIterator
let otherIteratorManager: OtherIteratorManager

init(
_ upstreamAsyncIterator: UpstreamAsyncSequence.AsyncIterator,
other otherAsyncIterator: OtherAsyncSequence.AsyncIterator,
otherPriority: TaskPriority?
) {
self.upstreamAsyncIterator = upstreamAsyncIterator
self.otherIteratorManager = OtherIteratorManager(
otherIterator: otherAsyncIterator,
otherPriority: otherPriority
)
}

public mutating func next() async throws -> Element? {
guard !Task.isCancelled else { return nil }

try await self.otherIteratorManager.startOtherIterator()

let upstreamElement = try await self.upstreamAsyncIterator.next()
let otherElement = self.otherIteratorManager.otherElement

guard let nonNilUpstreamElement = upstreamElement,
let nonNilOtherElement = otherElement else {
return nil
}

return (nonNilUpstreamElement, nonNilOtherElement)
}
}
}
6 changes: 3 additions & 3 deletions Tests/Operators/AsyncSequence+SwitchToLatestTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ final class AsyncSequence_SwitchToLatestTests: XCTestCase {
interval: .milliSeconds(30),
onCancel: {})

let sut = mainAsyncSequence.switchToLatest()
let sut = mainAsyncSequence.switchToLatest(upstreamPriority: .high)

var receivedElements = [Int]()
let expectedElements = [0, 4, 8, 9, 10, 11]
Expand All @@ -81,7 +81,7 @@ final class AsyncSequence_SwitchToLatestTests: XCTestCase {
LongAsyncSequence(elements: [1], onCancel: {}).eraseToAnyAsyncSequence(),
AsyncSequences.Fail<Int>(error: expectedError).eraseToAnyAsyncSequence()].asyncElements

let sut = sourceSequence.switchToLatest()
let sut = sourceSequence.switchToLatest(upstreamPriority: .high)

do {
for try await _ in sut {}
Expand All @@ -101,7 +101,7 @@ final class AsyncSequence_SwitchToLatestTests: XCTestCase {
interval: .milliSeconds(50),
onCancel: {}
)}
let sut = mappedSequence.switchToLatest()
let sut = mappedSequence.switchToLatest(upstreamPriority: .high)

let task = Task {
var firstElement: Int?
Expand Down

0 comments on commit 0aafcc4

Please sign in to comment.