Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions Sources/AsyncSequences/AsyncSequences+From.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,14 @@ public struct AsyncFromSequence<BaseSequence: Sequence>: AsyncSequence {
var baseIterator: BaseSequence.Iterator
var interval: AsyncSequences.Interval

public mutating func next() async -> BaseSequence.Element? {
public mutating func next() async throws -> BaseSequence.Element? {
guard !Task.isCancelled else { return nil }

if self.interval != .immediate {
do {
try await Task.sleep(nanoseconds: self.interval.value)
} catch {}
try await Task.sleep(nanoseconds: self.interval.value)
}

let next = self.baseIterator.next()
return next
return self.baseIterator.next()
}
}
}
163 changes: 79 additions & 84 deletions Sources/AsyncSequences/AsyncSequences+Merge.swift
Original file line number Diff line number Diff line change
Expand Up @@ -66,112 +66,50 @@ public struct AsyncMergeSequence<UpstreamAsyncSequence: AsyncSequence>: AsyncSeq
return Iterator(upstreamIterators: self.upstreamAsyncSequences.map { $0.makeAsyncIterator() })
}

actor UpstreamAsyncIteratorState {
var busy = false
var finished = false

func setBusy(_ value: Bool) {
self.busy = value
}

func setFinished() {
self.finished = true
self.busy = false
}

func isAvailable() -> Bool {
!self.busy && !self.finished
}
enum UpstreamElement {
case element(Element)
case finished
}

final class UpstreamAsyncIterator<BaseAsyncIterator: AsyncIteratorProtocol>: AsyncIteratorProtocol {
public typealias Element = BaseAsyncIterator.Element
var iterator: BaseAsyncIterator?
let state = UpstreamAsyncIteratorState()
actor ElementCounter {
var counter = 0

init(iterator: BaseAsyncIterator?) {
self.iterator = iterator
func increaseCounter() {
self.counter += 1
}

public func next() async throws -> BaseAsyncIterator.Element? {
guard !Task.isCancelled else { return nil }

await self.state.setBusy(true)
let next = try await self.iterator?.next()
if next == nil {
await self.state.setFinished()
}
await self.state.setBusy(false)
return next
func decreaseCounter() {
guard self.counter > 0 else { return }
self.counter -= 1
}

public func isAvailable() async -> Bool {
await self.state.isAvailable()
func hasElement() -> Bool {
self.counter > 0
}
}

enum UpstreamElement {
case element(Element)
case finished
}

public struct Iterator: AsyncIteratorProtocol {
let passthrough = AsyncStreams.Passthrough<UpstreamElement>()
var passthroughIterator: AsyncStreams.Passthrough<UpstreamElement>.AsyncIterator
let upstreamIterators: [UpstreamAsyncIterator<UpstreamAsyncSequence.AsyncIterator>]
let sink = AsyncStreams.Passthrough<UpstreamElement>()
var sinkIterator: AsyncStreams.Passthrough<UpstreamElement>.AsyncIterator
let upstreamIterators: [SharedAsyncIterator<UpstreamAsyncSequence.AsyncIterator>]
let elementCounter = ElementCounter()
var numberOfFinished = 0

public init(upstreamIterators: [UpstreamAsyncSequence.AsyncIterator]) {
self.upstreamIterators = upstreamIterators.map { UpstreamAsyncIterator(iterator: $0) }
self.passthroughIterator = self.passthrough.makeAsyncIterator()
self.upstreamIterators = upstreamIterators.map { SharedAsyncIterator(iterator: $0) }
self.sinkIterator = self.sink.makeAsyncIterator()
}

// swiftlint:disable:next cyclomatic_complexity
public mutating func next() async throws -> Element? {
guard !Task.isCancelled else { return nil }

let localPassthrough = self.passthrough

// iterating over the upstream iterators to ask for their next element. Only
// the available iterators are requested (not already being computing the next
// element from the previous iteration)
for upstreamIterator in self.upstreamIterators {
guard !Task.isCancelled else { break }

let localUpstreamIterator = upstreamIterator

// isAvailable() means is not busy and not finished
if await localUpstreamIterator.isAvailable() {
Task {
do {
let nextElement = try await localUpstreamIterator.next()

// if the next element is nil, it means one if the upstream iterator
// is finished ... its does not mean the zipped async sequence is finished
guard let nonNilNextElement = nextElement else {
localPassthrough.send(.finished)
return
}

localPassthrough.send(.element(nonNilNextElement))
} catch is CancellationError {
localPassthrough.send(.finished)
} catch {
localPassthrough.send(termination: .failure(error))
}
}
}
}

mutating func nextElementFromSink() async throws -> Element? {
var noValue = true
var value: Element?

// we now have to eliminate the intermediate ".finished" values until the next
// true value is found.
// if every upstream iterator has finished, then the zipped async sequence is also finished
while noValue {
guard let nextChildElement = try await self.passthroughIterator.next() else {
// the passthrough is finished, so is the zipped async sequence
guard let nextChildElement = try await self.sinkIterator.next() else {
// the sink stream is finished, so is the zipped async sequence
noValue = false
value = nil
break
Expand All @@ -187,13 +125,70 @@ public struct AsyncMergeSequence<UpstreamAsyncSequence: AsyncSequence>: AsyncSeq
break
}
case let .element(element):
// nominal case: a net element is available
// nominal case: a next element is available
noValue = false
value = element
await self.elementCounter.decreaseCounter()
}
}

return value
}

public mutating func next() async throws -> Element? {
guard !Task.isCancelled else { return nil }

// before requesting elements from the upstream iterators, we should reauest the next element from the sink iterator
// if it has some stacked values

// for now we leave it commented as I'm not sure it is not counterproductive.
// This "early" drain might prevent from requesting the next available upstream iterators as soon as possible
// since the sink iterator might deliver a value and the next will return right away

// guard await !self.elementCounter.hasElement() else {
// return try await self.nextElementFromSink()
// }

let localSink = self.sink
let localElementCounter = self.elementCounter

// iterating over the upstream iterators to ask for their next element. Only
// the available iterators are requested (not already being computing the next
// element from the previous iteration and not already finished)
for upstreamIterator in self.upstreamIterators {
guard !Task.isCancelled else { break }

let localUpstreamIterator = upstreamIterator
guard await !localUpstreamIterator.isFinished() else { continue }
Task {
do {
let nextSharedElement = try await localUpstreamIterator.next()

// if the next element is nil, it means one of the upstream iterator
// is finished ... its does not mean the zipped async sequence is finished (all upstream iterators have to be finished)
guard let nextNonNilSharedElement = nextSharedElement else {
await localSink.send(.finished)
return
}

guard case let .value(nextElement) = nextNonNilSharedElement else {
// the upstream iterator was not available ... see you at the next iteration
return
}

// we have a next element from an upstream iterator, pushing it in the sink stream
await localSink.send(.element(nextElement))
await localElementCounter.increaseCounter()
} catch is CancellationError {
await localSink.send(.finished)
} catch {
await localSink.send(termination: .failure(error))
}
}
}

// we wait for the sink iterator to deliver the next element
return try await self.nextElementFromSink()
}
}
}
48 changes: 24 additions & 24 deletions Sources/AsyncSequences/AsyncSequences+Zip.swift
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ public struct AsyncZip2Sequence<UpstreamAsyncSequenceA: AsyncSequence, UpstreamA

public func makeAsyncIterator() -> AsyncIterator {
return Iterator(
upstreamIteratorA: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceA.makeAsyncIterator()),
upstreamIteratorB: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceB.makeAsyncIterator())
upstreamIteratorA: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceA.makeAsyncIterator()),
upstreamIteratorB: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceB.makeAsyncIterator())
)
}

public struct Iterator: AsyncIteratorProtocol {
let upstreamIteratorA: SharedAsyncIterator<UpstreamAsyncSequenceA.AsyncIterator>
let upstreamIteratorB: SharedAsyncIterator<UpstreamAsyncSequenceB.AsyncIterator>
let upstreamIteratorA: AsyncIteratorByRef<UpstreamAsyncSequenceA.AsyncIterator>
let upstreamIteratorB: AsyncIteratorByRef<UpstreamAsyncSequenceB.AsyncIterator>

public mutating func next() async throws -> Element? {
guard !Task.isCancelled else { return nil }
Expand Down Expand Up @@ -136,9 +136,9 @@ public struct AsyncZip2Sequence<UpstreamAsyncSequenceA: AsyncSequence, UpstreamA
}
}

public struct AsyncZip3Sequence < UpstreamAsyncSequenceA: AsyncSequence,
UpstreamAsyncSequenceB: AsyncSequence,
UpstreamAsyncSequenceC: AsyncSequence>: AsyncSequence {
public struct AsyncZip3Sequence <UpstreamAsyncSequenceA: AsyncSequence,
UpstreamAsyncSequenceB: AsyncSequence,
UpstreamAsyncSequenceC: AsyncSequence>: AsyncSequence {
public typealias Element = (UpstreamAsyncSequenceA.Element, UpstreamAsyncSequenceB.Element, UpstreamAsyncSequenceC.Element)
public typealias AsyncIterator = Iterator

Expand All @@ -156,16 +156,16 @@ public struct AsyncZip3Sequence < UpstreamAsyncSequenceA: AsyncSequence,

public func makeAsyncIterator() -> AsyncIterator {
return Iterator(
upstreamIteratorA: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceA.makeAsyncIterator()),
upstreamIteratorB: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceB.makeAsyncIterator()),
upstreamIteratorC: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceC.makeAsyncIterator())
upstreamIteratorA: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceA.makeAsyncIterator()),
upstreamIteratorB: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceB.makeAsyncIterator()),
upstreamIteratorC: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceC.makeAsyncIterator())
)
}

public struct Iterator: AsyncIteratorProtocol {
let upstreamIteratorA: SharedAsyncIterator<UpstreamAsyncSequenceA.AsyncIterator>
let upstreamIteratorB: SharedAsyncIterator<UpstreamAsyncSequenceB.AsyncIterator>
let upstreamIteratorC: SharedAsyncIterator<UpstreamAsyncSequenceC.AsyncIterator>
let upstreamIteratorA: AsyncIteratorByRef<UpstreamAsyncSequenceA.AsyncIterator>
let upstreamIteratorB: AsyncIteratorByRef<UpstreamAsyncSequenceB.AsyncIterator>
let upstreamIteratorC: AsyncIteratorByRef<UpstreamAsyncSequenceC.AsyncIterator>

public mutating func next() async throws -> Element? {
guard !Task.isCancelled else { return nil }
Expand Down Expand Up @@ -242,20 +242,11 @@ public struct AsyncZipSequence<UpstreamAsyncSequence: AsyncSequence>: AsyncSeque
}

public func makeAsyncIterator() -> AsyncIterator {
return Iterator(upstreamIterators: self.upstreamAsyncSequences.map { SharedAsyncIterator(iterator: $0.makeAsyncIterator()) })
}

actor SequenceIndexGenerator {
var index: Int = 0

func nextIndex() -> Int {
self.index += 1
return index
}
return Iterator(upstreamIterators: self.upstreamAsyncSequences.map { AsyncIteratorByRef(iterator: $0.makeAsyncIterator()) })
}

public struct Iterator: AsyncIteratorProtocol {
let upstreamIterators: [SharedAsyncIterator<UpstreamAsyncSequence.AsyncIterator>]
let upstreamIterators: [AsyncIteratorByRef<UpstreamAsyncSequence.AsyncIterator>]

public mutating func next() async throws -> Element? {
guard !Task.isCancelled else { return nil }
Expand Down Expand Up @@ -302,3 +293,12 @@ public struct AsyncZipSequence<UpstreamAsyncSequence: AsyncSequence>: AsyncSeque
}
}
}

actor SequenceIndexGenerator {
var index: Int = 0

func nextIndex() -> Int {
self.index += 1
return index
}
}
Loading