diff --git a/Sources/AsyncSequences/AsyncSequences+From.swift b/Sources/AsyncSequences/AsyncSequences+From.swift index 901391b..d2b9110 100644 --- a/Sources/AsyncSequences/AsyncSequences+From.swift +++ b/Sources/AsyncSequences/AsyncSequences+From.swift @@ -57,17 +57,14 @@ public struct AsyncFromSequence: AsyncSequence { var baseIterator: BaseSequence.Iterator var interval: AsyncSequences.Interval - public mutating func next() async -> BaseSequence.Element? { + public mutating func next() async throws -> BaseSequence.Element? { guard !Task.isCancelled else { return nil } if self.interval != .immediate { - do { - try await Task.sleep(nanoseconds: self.interval.value) - } catch {} + try await Task.sleep(nanoseconds: self.interval.value) } - let next = self.baseIterator.next() - return next + return self.baseIterator.next() } } } diff --git a/Sources/AsyncSequences/AsyncSequences+Merge.swift b/Sources/AsyncSequences/AsyncSequences+Merge.swift index fcd2cda..8508adf 100644 --- a/Sources/AsyncSequences/AsyncSequences+Merge.swift +++ b/Sources/AsyncSequences/AsyncSequences+Merge.swift @@ -66,103 +66,41 @@ public struct AsyncMergeSequence: AsyncSeq return Iterator(upstreamIterators: self.upstreamAsyncSequences.map { $0.makeAsyncIterator() }) } - actor UpstreamAsyncIteratorState { - var busy = false - var finished = false - - func setBusy(_ value: Bool) { - self.busy = value - } - - func setFinished() { - self.finished = true - self.busy = false - } - - func isAvailable() -> Bool { - !self.busy && !self.finished - } + enum UpstreamElement { + case element(Element) + case finished } - final class UpstreamAsyncIterator: AsyncIteratorProtocol { - public typealias Element = BaseAsyncIterator.Element - var iterator: BaseAsyncIterator? - let state = UpstreamAsyncIteratorState() + actor ElementCounter { + var counter = 0 - init(iterator: BaseAsyncIterator?) { - self.iterator = iterator + func increaseCounter() { + self.counter += 1 } - public func next() async throws -> BaseAsyncIterator.Element? { - guard !Task.isCancelled else { return nil } - - await self.state.setBusy(true) - let next = try await self.iterator?.next() - if next == nil { - await self.state.setFinished() - } - await self.state.setBusy(false) - return next + func decreaseCounter() { + guard self.counter > 0 else { return } + self.counter -= 1 } - public func isAvailable() async -> Bool { - await self.state.isAvailable() + func hasElement() -> Bool { + self.counter > 0 } } - enum UpstreamElement { - case element(Element) - case finished - } - public struct Iterator: AsyncIteratorProtocol { - let passthrough = AsyncStreams.Passthrough() - var passthroughIterator: AsyncStreams.Passthrough.AsyncIterator - let upstreamIterators: [UpstreamAsyncIterator] + let sink = AsyncStreams.Passthrough() + var sinkIterator: AsyncStreams.Passthrough.AsyncIterator + let upstreamIterators: [SharedAsyncIterator] + let elementCounter = ElementCounter() var numberOfFinished = 0 public init(upstreamIterators: [UpstreamAsyncSequence.AsyncIterator]) { - self.upstreamIterators = upstreamIterators.map { UpstreamAsyncIterator(iterator: $0) } - self.passthroughIterator = self.passthrough.makeAsyncIterator() + self.upstreamIterators = upstreamIterators.map { SharedAsyncIterator(iterator: $0) } + self.sinkIterator = self.sink.makeAsyncIterator() } - // swiftlint:disable:next cyclomatic_complexity - public mutating func next() async throws -> Element? { - guard !Task.isCancelled else { return nil } - - let localPassthrough = self.passthrough - - // iterating over the upstream iterators to ask for their next element. Only - // the available iterators are requested (not already being computing the next - // element from the previous iteration) - for upstreamIterator in self.upstreamIterators { - guard !Task.isCancelled else { break } - - let localUpstreamIterator = upstreamIterator - - // isAvailable() means is not busy and not finished - if await localUpstreamIterator.isAvailable() { - Task { - do { - let nextElement = try await localUpstreamIterator.next() - - // if the next element is nil, it means one if the upstream iterator - // is finished ... its does not mean the zipped async sequence is finished - guard let nonNilNextElement = nextElement else { - localPassthrough.send(.finished) - return - } - - localPassthrough.send(.element(nonNilNextElement)) - } catch is CancellationError { - localPassthrough.send(.finished) - } catch { - localPassthrough.send(termination: .failure(error)) - } - } - } - } - + mutating func nextElementFromSink() async throws -> Element? { var noValue = true var value: Element? @@ -170,8 +108,8 @@ public struct AsyncMergeSequence: AsyncSeq // true value is found. // if every upstream iterator has finished, then the zipped async sequence is also finished while noValue { - guard let nextChildElement = try await self.passthroughIterator.next() else { - // the passthrough is finished, so is the zipped async sequence + guard let nextChildElement = try await self.sinkIterator.next() else { + // the sink stream is finished, so is the zipped async sequence noValue = false value = nil break @@ -187,13 +125,70 @@ public struct AsyncMergeSequence: AsyncSeq break } case let .element(element): - // nominal case: a net element is available + // nominal case: a next element is available noValue = false value = element + await self.elementCounter.decreaseCounter() } } return value } + + public mutating func next() async throws -> Element? { + guard !Task.isCancelled else { return nil } + + // before requesting elements from the upstream iterators, we should reauest the next element from the sink iterator + // if it has some stacked values + + // for now we leave it commented as I'm not sure it is not counterproductive. + // This "early" drain might prevent from requesting the next available upstream iterators as soon as possible + // since the sink iterator might deliver a value and the next will return right away + +// guard await !self.elementCounter.hasElement() else { +// return try await self.nextElementFromSink() +// } + + let localSink = self.sink + let localElementCounter = self.elementCounter + + // iterating over the upstream iterators to ask for their next element. Only + // the available iterators are requested (not already being computing the next + // element from the previous iteration and not already finished) + for upstreamIterator in self.upstreamIterators { + guard !Task.isCancelled else { break } + + let localUpstreamIterator = upstreamIterator + guard await !localUpstreamIterator.isFinished() else { continue } + Task { + do { + let nextSharedElement = try await localUpstreamIterator.next() + + // if the next element is nil, it means one of the upstream iterator + // is finished ... its does not mean the zipped async sequence is finished (all upstream iterators have to be finished) + guard let nextNonNilSharedElement = nextSharedElement else { + await localSink.send(.finished) + return + } + + guard case let .value(nextElement) = nextNonNilSharedElement else { + // the upstream iterator was not available ... see you at the next iteration + return + } + + // we have a next element from an upstream iterator, pushing it in the sink stream + await localSink.send(.element(nextElement)) + await localElementCounter.increaseCounter() + } catch is CancellationError { + await localSink.send(.finished) + } catch { + await localSink.send(termination: .failure(error)) + } + } + } + + // we wait for the sink iterator to deliver the next element + return try await self.nextElementFromSink() + } } } diff --git a/Sources/AsyncSequences/AsyncSequences+Zip.swift b/Sources/AsyncSequences/AsyncSequences+Zip.swift index 1e121eb..42f69b5 100644 --- a/Sources/AsyncSequences/AsyncSequences+Zip.swift +++ b/Sources/AsyncSequences/AsyncSequences+Zip.swift @@ -80,14 +80,14 @@ public struct AsyncZip2Sequence AsyncIterator { return Iterator( - upstreamIteratorA: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceA.makeAsyncIterator()), - upstreamIteratorB: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceB.makeAsyncIterator()) + upstreamIteratorA: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceA.makeAsyncIterator()), + upstreamIteratorB: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceB.makeAsyncIterator()) ) } public struct Iterator: AsyncIteratorProtocol { - let upstreamIteratorA: SharedAsyncIterator - let upstreamIteratorB: SharedAsyncIterator + let upstreamIteratorA: AsyncIteratorByRef + let upstreamIteratorB: AsyncIteratorByRef public mutating func next() async throws -> Element? { guard !Task.isCancelled else { return nil } @@ -136,9 +136,9 @@ public struct AsyncZip2Sequence: AsyncSequence { +public struct AsyncZip3Sequence : AsyncSequence { public typealias Element = (UpstreamAsyncSequenceA.Element, UpstreamAsyncSequenceB.Element, UpstreamAsyncSequenceC.Element) public typealias AsyncIterator = Iterator @@ -156,16 +156,16 @@ public struct AsyncZip3Sequence < UpstreamAsyncSequenceA: AsyncSequence, public func makeAsyncIterator() -> AsyncIterator { return Iterator( - upstreamIteratorA: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceA.makeAsyncIterator()), - upstreamIteratorB: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceB.makeAsyncIterator()), - upstreamIteratorC: SharedAsyncIterator(iterator: self.upstreamAsyncSequenceC.makeAsyncIterator()) + upstreamIteratorA: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceA.makeAsyncIterator()), + upstreamIteratorB: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceB.makeAsyncIterator()), + upstreamIteratorC: AsyncIteratorByRef(iterator: self.upstreamAsyncSequenceC.makeAsyncIterator()) ) } public struct Iterator: AsyncIteratorProtocol { - let upstreamIteratorA: SharedAsyncIterator - let upstreamIteratorB: SharedAsyncIterator - let upstreamIteratorC: SharedAsyncIterator + let upstreamIteratorA: AsyncIteratorByRef + let upstreamIteratorB: AsyncIteratorByRef + let upstreamIteratorC: AsyncIteratorByRef public mutating func next() async throws -> Element? { guard !Task.isCancelled else { return nil } @@ -242,20 +242,11 @@ public struct AsyncZipSequence: AsyncSeque } public func makeAsyncIterator() -> AsyncIterator { - return Iterator(upstreamIterators: self.upstreamAsyncSequences.map { SharedAsyncIterator(iterator: $0.makeAsyncIterator()) }) - } - - actor SequenceIndexGenerator { - var index: Int = 0 - - func nextIndex() -> Int { - self.index += 1 - return index - } + return Iterator(upstreamIterators: self.upstreamAsyncSequences.map { AsyncIteratorByRef(iterator: $0.makeAsyncIterator()) }) } public struct Iterator: AsyncIteratorProtocol { - let upstreamIterators: [SharedAsyncIterator] + let upstreamIterators: [AsyncIteratorByRef] public mutating func next() async throws -> Element? { guard !Task.isCancelled else { return nil } @@ -302,3 +293,12 @@ public struct AsyncZipSequence: AsyncSeque } } } + +actor SequenceIndexGenerator { + var index: Int = 0 + + func nextIndex() -> Int { + self.index += 1 + return index + } +} diff --git a/Sources/AsyncStreams/AsyncStreams+CurrentValue.swift b/Sources/AsyncStreams/AsyncStreams+CurrentValue.swift index adebcee..5fdd9db 100644 --- a/Sources/AsyncStreams/AsyncStreams+CurrentValue.swift +++ b/Sources/AsyncStreams/AsyncStreams+CurrentValue.swift @@ -29,26 +29,37 @@ public extension AsyncStreams { /// /// .. later in the application flow /// - /// currentValue.send(2) + /// await currentValue.send(2) /// /// print(currentValue.element) // will print 2 /// ``` typealias CurrentValue = AsyncCurrentValueStream } -public final class AsyncCurrentValueStream: Stream, AsyncSequence, Sendable { - public typealias AsyncIterator = AsyncStreams.Iterator +public final class AsyncCurrentValueStream: Stream, Sendable { + actor Storage { + var element: Element + init(_ element: Element) { + self.element = element + } + + func update(_ element: Element) { + self.element = element + } + + func retrieve() -> Element { + self.element + } + } + + public typealias AsyncIterator = AsyncStreams.Iterator let continuations = AsyncStreams.Continuations() let storage: Storage public var element: Element { - get { - self.storage.retrieve() - } - - set { - self.send(newValue) + get async { + await self.storage.retrieve() } } @@ -58,22 +69,26 @@ public final class AsyncCurrentValueStream: Stream, AsyncSequence, Send /// Sends a value to all underlying async sequences /// - Parameter element: the value to send - public func send(_ element: Element) { - self.storage.update(element) - self.continuations.send(element) + public func send(_ element: Element) async { + await self.storage.update(element) + await self.continuations.send(element) } /// Finishes the async sequences with either a normal ending or an error. /// - Parameter termination: The termination to finish the async sequence. - public func send(termination: Termination) { - self.continuations.send(termination) + public func send(termination: Termination) async { + await self.continuations.send(termination) } func makeStream(forClientId clientId: UUID) -> AsyncThrowingStream { return AsyncThrowingStream(Element.self, bufferingPolicy: .unbounded) { [continuations, storage] continuation in - continuations.register(continuation: continuation, forId: clientId) - let element = storage.retrieve() - continuation.yield(element) + Task { + // registration is async because the continuations are managed by an actor (to avoid race conditions on its internal storage). + // registering a continuation is possible only when the actor is available. + let element = await storage.retrieve() + continuation.yield(element) + await continuations.register(continuation: continuation, forId: clientId) + } } } @@ -86,25 +101,4 @@ public final class AsyncCurrentValueStream: Stream, AsyncSequence, Send continuations: self.continuations ) } - - final class Storage: @unchecked Sendable { - let queue = DispatchQueue(label: UUID().uuidString) - var element: Element - - init(_ element: Element) { - self.element = element - } - - func update(_ element: Element) { - self.queue.sync { [weak self] in - self?.element = element - } - } - - func retrieve() -> Element { - self.queue.sync { [element] in - element - } - } - } } diff --git a/Sources/AsyncStreams/AsyncStreams+Passthrough.swift b/Sources/AsyncStreams/AsyncStreams+Passthrough.swift index 38b8c38..2d398cc 100644 --- a/Sources/AsyncStreams/AsyncStreams+Passthrough.swift +++ b/Sources/AsyncStreams/AsyncStreams+Passthrough.swift @@ -25,15 +25,15 @@ public extension AsyncStreams { /// } /// } /// - /// .. later in the application flow + /// ... later in the application flow /// - /// passthrough.send(1) - /// passthrough.send(2) + /// await passthrough.send(1) + /// await passthrough.send(2) /// ``` typealias Passthrough = AsyncPassthroughStream } -public final class AsyncPassthroughStream: AsyncSequence, Stream, Sendable { +public final class AsyncPassthroughStream: Stream, Sendable { public typealias AsyncIterator = AsyncStreams.Iterator let continuations = AsyncStreams.Continuations() @@ -42,19 +42,23 @@ public final class AsyncPassthroughStream: AsyncSequence, Stream, Senda /// Sends a value to all underlying async sequences /// - Parameter element: the value to send - public func send(_ element: Element) { - self.continuations.send(element) + public func send(_ element: Element) async { + await self.continuations.send(element) } /// Finishes the async sequences with either a normal ending or an error. /// - Parameter termination: The termination to finish the async sequence. - public func send(termination: Termination) { - self.continuations.send(termination) + public func send(termination: Termination) async { + await self.continuations.send(termination) } func makeStream(forClientId clientId: UUID) -> AsyncThrowingStream { return AsyncThrowingStream(Element.self, bufferingPolicy: .unbounded) { [continuations] continuation in - continuations.register(continuation: continuation, forId: clientId) + Task { + // registration is async because the continuations are managed by an actor (to avoid race conditions on its internal storage). + // registering a continuation is possible only when the actor is available. + await continuations.register(continuation: continuation, forId: clientId) + } } } diff --git a/Sources/AsyncStreams/AsyncStreams+Replay.swift b/Sources/AsyncStreams/AsyncStreams+Replay.swift index 0a0840a..ce6e80e 100644 --- a/Sources/AsyncStreams/AsyncStreams+Replay.swift +++ b/Sources/AsyncStreams/AsyncStreams+Replay.swift @@ -15,7 +15,7 @@ public extension AsyncStreams { /// ``` /// let replay = AsyncStreams.Replay(bufferSize: 3) /// - /// (1...5).forEach { replay.send($0) } + /// for i in (1...5) { await replay.send(i) } /// /// for try await element in replay { /// print(element) // will print 3, 4, 5 @@ -24,7 +24,36 @@ public extension AsyncStreams { typealias Replay = AsyncReplayStream } -public final class AsyncReplayStream: Stream, AsyncSequence, Sendable { +public final class AsyncReplayStream: Stream, Sendable { + actor Storage { + var buffer = ContiguousArray() + let bufferSize: Int + + var elements: ContiguousArray { + return self.buffer + } + + var size: Int { + return self.bufferSize + } + + init(bufferSize: Int) { + precondition(bufferSize >= 0, "The bufferSize cannot be negative.") + self.bufferSize = bufferSize + } + + func push(_ element: Element) { + self.buffer.append(element) + if self.buffer.count > self.bufferSize { + self.buffer.removeFirst() + } + } + + func clear() { + self.buffer.removeAll() + } + } + public typealias AsyncIterator = AsyncStreams.Iterator let continuations = AsyncStreams.Continuations() @@ -38,22 +67,24 @@ public final class AsyncReplayStream: Stream, AsyncSequence, Sendable { /// Sends a value to all underlying async sequences /// - Parameter element: the value to send - public func send(_ element: Element) { - self.storage.push(element) - self.continuations.send(element) + public func send(_ element: Element) async { + await self.storage.push(element) + await self.continuations.send(element) } /// Finishes the async sequences with either a normal ending or an error. /// - Parameter completion: The termination to finish the async sequence. - public func send(termination: Termination) { - self.continuations.send(termination) - self.storage.clear() + public func send(termination: Termination) async { + await self.continuations.send(termination) + await self.storage.clear() } func makeStream(forClientId clientId: UUID) -> AsyncThrowingStream { return AsyncThrowingStream(Element.self, bufferingPolicy: .unbounded) { [continuations, storage] continuation in - continuations.register(continuation: continuation, forId: clientId) - storage.elements.forEach { continuation.yield($0) } + Task { + await continuations.register(continuation: continuation, forId: clientId) + await storage.elements.forEach { continuation.yield($0) } + } } } @@ -66,38 +97,4 @@ public final class AsyncReplayStream: Stream, AsyncSequence, Sendable { continuations: self.continuations ) } - - final class Storage: @unchecked Sendable { - let queue = DispatchQueue(label: UUID().uuidString) - var buffer = ContiguousArray() - let bufferSize: Int - - var elements: ContiguousArray { - return self.buffer - } - - var size: Int { - return self.bufferSize - } - - init(bufferSize: Int) { - precondition(bufferSize >= 0, "The bufferSize cannot be negative.") - self.bufferSize = bufferSize - } - - func push(_ element: Element) { - self.queue.sync { [weak self] in - self?.buffer.append(element) - if let bufferCount = self?.buffer.count, let bufferSize = self?.bufferSize, bufferCount > bufferSize { - self?.buffer.removeFirst() - } - } - } - - func clear() { - self.queue.sync { [weak self] in - self?.buffer.removeAll() - } - } - } } diff --git a/Sources/AsyncStreams/AsyncStreams.swift b/Sources/AsyncStreams/AsyncStreams.swift index cd95ab6..d4ad5e4 100644 --- a/Sources/AsyncStreams/AsyncStreams.swift +++ b/Sources/AsyncStreams/AsyncStreams.swift @@ -7,80 +7,83 @@ import Foundation -public protocol Stream { - associatedtype Element - +public protocol Stream: AnyObject, AsyncSequence { func send(_ element: Element) async func send(termination: Termination) async } +public extension Stream { + func nonBlockingSend(_ element: Element) { + Task { [weak self] in + await self?.send(element) + } + } + + func nonBlockingSend(termination: Termination) { + Task { [weak self] in + await self?.send(termination: termination) + } + } +} + public enum AsyncStreams {} extension AsyncStreams { - final class Continuations: @unchecked Sendable { + actor Continuations { var continuations = [AnyHashable: AsyncThrowingStream.Continuation]() - let queue = DispatchQueue(label: UUID().uuidString) - - func register(continuation: AsyncThrowingStream.Continuation, forId id: AnyHashable) { - self.queue.sync { - self.continuations[id] = continuation - } - } func send(_ element: Element) { - self.queue.sync { - self.continuations.values.forEach { $0.yield(element) } - } + self.continuations.values.forEach { $0.yield(element) } } func send(_ termination: Termination) { - self.queue.sync { - switch termination { - case .finished: self.continuations.values.forEach { $0.finish() } - case let .failure(error): self.continuations.values.forEach { $0.finish(throwing: error) } - } - self.continuations.removeAll() + switch termination { + case .finished: self.continuations.values.forEach { $0.finish() } + case let .failure(error): self.continuations.values.forEach { $0.finish(throwing: error) } } + self.continuations.removeAll() + } + + func register(continuation: AsyncThrowingStream.Continuation, forId id: AnyHashable) { + self.continuations[id] = continuation } func unregister(id: AnyHashable) { - self.queue.sync { - self.continuations[id] = nil - } + self.continuations[id] = nil } } public struct Iterator: AsyncIteratorProtocol { public typealias Element = Element - let clientId: UUID var baseIterator: AsyncThrowingStream.Iterator - let continuations: AsyncStreams.Continuations + let unregisterBlock: () async -> Void init( clientId: UUID, baseIterator: AsyncThrowingStream.Iterator, continuations: AsyncStreams.Continuations ) { - self.clientId = clientId self.baseIterator = baseIterator - self.continuations = continuations + self.unregisterBlock = { await continuations.unregister(id: clientId) } } public mutating func next() async throws -> Element? { guard !Task.isCancelled else { - self.continuations.unregister(id: self.clientId) + await self.unregisterBlock() return nil } - let localContinuations = self.continuations - let localClientId = self.clientId - - return try await withTaskCancellationHandler(operation: { - try await self.baseIterator.next() - }, onCancel: { - localContinuations.unregister(id: localClientId) - }) + do { + let next = try await self.baseIterator.next() + if next == nil { + await self.unregisterBlock() + } + return next + } catch { + await self.unregisterBlock() + throw error + } } } } diff --git a/Sources/Common/SharedAsyncIterator.swift b/Sources/Internal/AsyncIteratorByRef.swift similarity index 58% rename from Sources/Common/SharedAsyncIterator.swift rename to Sources/Internal/AsyncIteratorByRef.swift index 9c6c798..9816c46 100644 --- a/Sources/Common/SharedAsyncIterator.swift +++ b/Sources/Internal/AsyncIteratorByRef.swift @@ -1,11 +1,12 @@ // -// SharedAsyncIterator.swift +// AsyncIteratorByRef.swift // // -// Created by Thibault Wittemberg on 23/01/2022. +// Created by Thibault Wittemberg on 08/02/2022. // -public final class SharedAsyncIterator: AsyncIteratorProtocol { +/// Allows to store an iterator and mutate it, when in a non mutating environment. +final class AsyncIteratorByRef: AsyncIteratorProtocol { public typealias Element = BaseAsyncIterator.Element var iterator: BaseAsyncIterator? diff --git a/Sources/Internal/SharedAsyncIterator.swift b/Sources/Internal/SharedAsyncIterator.swift new file mode 100644 index 0000000..65317a7 --- /dev/null +++ b/Sources/Internal/SharedAsyncIterator.swift @@ -0,0 +1,81 @@ +// +// SharedAsyncIterator.swift +// +// +// Created by Thibault Wittemberg on 23/01/2022. +// + +actor NonBlockingGate { + var isLockable = true + + func lock() -> Bool { + defer { self.isLockable = false } + let isLockable = self.isLockable + return isLockable + } + + func unlock() { + self.isLockable = true + } +} + +actor FinishState { + var finished = false + + func isFinished() -> Bool { + self.finished + } + + func setFinished() { + self.finished = true + } +} + +enum SharedElement { + case value(Element) + case notAvailable +} + +extension SharedElement: Equatable where Element: Equatable {} + +/// Allows to request a next element in a concurrent context. If the iterator is still busy when requesting next, then it will respond +/// "notAvailable". +class SharedAsyncIterator: AsyncIteratorProtocol { + typealias Element = SharedElement + + var iterator: BaseAsyncIterator + let gate = NonBlockingGate() + let finishState = FinishState() + + init(iterator: BaseAsyncIterator) { + self.iterator = iterator + } + + func isFinished() async -> Bool { + await self.finishState.isFinished() + } + + func next() async throws -> Element? { + guard !Task.isCancelled else { return nil } + guard await !self.finishState.isFinished() else { return nil } + + guard await self.gate.lock() else { + // yield allows to promote other tasks to resume, giving a chance to free the lock + await Task.yield() + return .notAvailable + } + + let next = try await self.iterator.next() + + await self.gate.unlock() + // yield allows to promote other tasks to resume, giving a chance to not be notAvailable + await Task.yield() + + if let nonNilNext = next { + return .value(nonNilNext) + } + + await self.finishState.setFinished() + return nil + } +} diff --git a/Sources/Operators/AsyncSequence+HandleEvents.swift b/Sources/Operators/AsyncSequence+HandleEvents.swift index c03d5c1..ad6ff33 100644 --- a/Sources/Operators/AsyncSequence+HandleEvents.swift +++ b/Sources/Operators/AsyncSequence+HandleEvents.swift @@ -5,7 +5,7 @@ // Created by Thibault Wittemberg on 31/12/2021. // -import Combine +import Foundation public extension AsyncSequence { /// Performs the specified closures when async sequences events occur. @@ -119,24 +119,23 @@ public struct AsyncHandleEventsSequence: A self.onStartExecuted = true } - let localOnCancel = self.onCancel - do { - return try await withTaskCancellationHandler(operation: { - let nextElement = try await self.upstreamIterator.next() + let nextElement = try await self.upstreamIterator.next() - if let element = nextElement { - await self.onElement?(element) + if let element = nextElement { + await self.onElement?(element) + } else { + if Task.isCancelled { + await self.onCancel?() } else { await self.onFinish?(.finished) } + } - return nextElement - }, onCancel: { - Task { - await localOnCancel?() - } - }) + return nextElement + } catch let error as CancellationError { + await self.onCancel?() + throw error } catch { await self.onFinish?(.failure(error)) throw error diff --git a/Sources/Operators/AsyncSequence+SwitchToLatest.swift b/Sources/Operators/AsyncSequence+SwitchToLatest.swift index e9cd4c0..4750a19 100644 --- a/Sources/Operators/AsyncSequence+SwitchToLatest.swift +++ b/Sources/Operators/AsyncSequence+SwitchToLatest.swift @@ -43,7 +43,7 @@ public struct AsyncSwitchToLatestSequence: final class UpstreamIteratorManager { var upstreamIterator: UpstreamAsyncSequence.AsyncIterator - var currentChildIterator: UpstreamAsyncSequence.Element.AsyncIterator? + var childIterators = [AsyncIteratorByRef]() var hasStarted = false var currentTask: Task? @@ -60,21 +60,21 @@ public struct AsyncSwitchToLatestSequence: guard !self.hasStarted else { return } self.hasStarted = true - let firstChildSequence = try await self.upstreamIterator.next() - self.currentChildIterator = firstChildSequence?.makeAsyncIterator() + if let firstChildSequence = try await self.upstreamIterator.next() { + self.childIterators.append(AsyncIteratorByRef(iterator: firstChildSequence.makeAsyncIterator())) + } Task { while let nextChildSequence = try await self.upstreamIterator.next() { - self.currentChildIterator = nextChildSequence.makeAsyncIterator() + self.childIterators.removeFirst() + self.childIterators.append(AsyncIteratorByRef(iterator: nextChildSequence.makeAsyncIterator())) self.currentTask?.cancel() - self.currentTask = nil } } } func nextOnCurrentChildIterator() async throws -> Element? { - let nextElement = try await self.currentChildIterator?.next() - return nextElement + try await self.childIterators.last?.next() } } @@ -92,9 +92,12 @@ public struct AsyncSwitchToLatestSequence: var emittedElement: Element? var currentTask: Task + // starting the root iterator to be able to iterate in the first child iterator try await self.upstreamIteratorManager.startUpstreamIterator() let localUpstreamIteratorManager = self.upstreamIteratorManager + // if a task is cancelled while waiting with the next element (a new element arrived in the root iterator) + // we create a new task and wait for the elements from the new child iterator while noValueHasBeenEmitted { currentTask = Task { do { diff --git a/Tests/AsyncSequences/AsyncSequences+FailTests.swift b/Tests/AsyncSequences/AsyncSequences+FailTests.swift index 264df00..b1c846f 100644 --- a/Tests/AsyncSequences/AsyncSequences+FailTests.swift +++ b/Tests/AsyncSequences/AsyncSequences+FailTests.swift @@ -37,22 +37,29 @@ final class AsyncSequences_FailTests: XCTestCase { } func testFail_returns_an_asyncSequence_that_finishes_without_error_when_task_is_cancelled() { - let exp = expectation(description: "AsyncSequence finishes when task is cancelled") + let taskHasBeenCancelledExpectation = expectation(description: "The task has been cancelled") + let sequenceHasFinishedExpectation = expectation(description: "The async sequence has finished") let failSequence = AsyncSequences.Fail(error: MockError(code: 1)) - Task { + let task = Task { do { - for try await _ in failSequence { + var iterator = failSequence.makeAsyncIterator() + wait(for: [taskHasBeenCancelledExpectation], timeout: 1) + while let _ = try await iterator.next() { XCTFail("The AsyncSequence should not output elements") } - exp.fulfill() + sequenceHasFinishedExpectation.fulfill() } catch { XCTFail("The AsyncSequence should not throw an error") } - }.cancel() + } + + task.cancel() + + taskHasBeenCancelledExpectation.fulfill() - waitForExpectations(timeout: 1) + wait(for: [sequenceHasFinishedExpectation], timeout: 1) } } diff --git a/Tests/AsyncSequences/AsyncSequences+FromTests.swift b/Tests/AsyncSequences/AsyncSequences+FromTests.swift index 3bdcb5f..07d2066 100644 --- a/Tests/AsyncSequences/AsyncSequences+FromTests.swift +++ b/Tests/AsyncSequences/AsyncSequences+FromTests.swift @@ -16,14 +16,14 @@ final class AsyncSequences_FromTests: XCTestCase { let sut = AsyncSequences.From(sequence) - for await element in sut { + for try await element in sut { receivedResult.append(element) } XCTAssertEqual(receivedResult, sequence) } - func testFrom_returns_an_asyncSequence_that_finishes_when_task_is_cancelled() { + func testFrom_returns_an_asyncSequence_that_finishes_when_task_is_cancelled() throws { let canCancelExpectation = expectation(description: "The first element has been emitted") let hasCancelExceptation = expectation(description: "The task has been cancelled") @@ -33,7 +33,7 @@ final class AsyncSequences_FromTests: XCTestCase { let task = Task { var firstElement: Int? - for await element in sut { + for try await element in sut { firstElement = element canCancelExpectation.fulfill() wait(for: [hasCancelExceptation], timeout: 5) diff --git a/Tests/AsyncSequences/AsyncSequences+MergeTests.swift b/Tests/AsyncSequences/AsyncSequences+MergeTests.swift index b875300..6a98041 100644 --- a/Tests/AsyncSequences/AsyncSequences+MergeTests.swift +++ b/Tests/AsyncSequences/AsyncSequences+MergeTests.swift @@ -1,6 +1,6 @@ // // AsyncSequences+Merge.swift -// +// // // Created by Thibault Wittemberg on 01/01/2022. // @@ -13,22 +13,22 @@ private struct MockError: Error, Equatable { let code: Int } -private struct TimedAsyncSequence: AsyncSequence, AsyncIteratorProtocol { - typealias Element = Int +private struct TimedAsyncSequence: AsyncSequence, AsyncIteratorProtocol { + typealias Element = Element typealias AsyncIterator = TimedAsyncSequence private let intervalInMills: [UInt64] - private var iterator: Array.Iterator + private var iterator: Array.Iterator private var index = 0 private let indexOfError: Int? - init(intervalInMills: [UInt64], sequence: [Int], indexOfError: Int? = nil) { + init(intervalInMills: [UInt64], sequence: [Element], indexOfError: Int? = nil) { self.intervalInMills = intervalInMills self.iterator = sequence.makeIterator() self.indexOfError = indexOfError } - mutating func next() async throws -> Int? { + mutating func next() async throws -> Element? { if let indexOfError = self.indexOfError, self.index == indexOfError { throw MockError(code: 1) @@ -47,7 +47,31 @@ private struct TimedAsyncSequence: AsyncSequence, AsyncIteratorProtocol { } final class AsyncSequences_MergeTests: XCTestCase { - func testMerge_merges_sequences_according_to_the_timeline() { + func testMerge_merges_sequences_according_to_the_timeline_using_asyncSequences() async throws { + // -- 0 ------------------------------- 1200 --------------------------- + // ------- 300 ------------- 900 ------------------------------ 1800 --- + // --------------- 600 --------------------------- 1500 ---------------- + // -- a --- c ----- f ------- d --------- b -------- g ---------- e ---- + // + // output should be: a c f d b g e + let expectedElements = ["a", "c", "f", "d", "b", "g", "e"] + + let asyncSequence1 = TimedAsyncSequence(intervalInMills: [0, 1200], sequence: ["a", "b"]) + let asyncSequence2 = TimedAsyncSequence(intervalInMills: [300, 600, 900], sequence: ["c", "d", "e"]) + let asyncSequence3 = TimedAsyncSequence(intervalInMills: [600, 1100], sequence: ["f", "g"]) + + let sut = AsyncSequences.Merge(asyncSequence1, asyncSequence2, asyncSequence3) + + var receivedElements = [String]() + for try await element in sut { + try await Task.sleep(nanoseconds: 110_000_000) + receivedElements.append(element) + } + + XCTAssertEqual(receivedElements, expectedElements) + } + + func testMerge_merges_sequences_according_to_the_timeline_using_streams() { let readyToBeIteratedExpectation = expectation(description: "The merged sequence is ready to be iterated") let canSend2Expectation = expectation(description: "2 can be sent") let canSend3Expectation = expectation(description: "3 can be sent") @@ -92,29 +116,29 @@ final class AsyncSequences_MergeTests: XCTestCase { wait(for: [readyToBeIteratedExpectation], timeout: 1) - asyncSequence1.send(1) + asyncSequence1.nonBlockingSend(1) wait(for: [canSend2Expectation], timeout: 1) - asyncSequence2.send(2) + asyncSequence2.nonBlockingSend(2) wait(for: [canSend3Expectation], timeout: 1) - asyncSequence3.send(3) + asyncSequence3.nonBlockingSend(3) wait(for: [canSend4Expectation], timeout: 1) - asyncSequence3.send(4) + asyncSequence3.nonBlockingSend(4) wait(for: [canSend5Expectation], timeout: 1) - asyncSequence2.send(5) + asyncSequence2.nonBlockingSend(5) wait(for: [canSend6Expectation], timeout: 1) - asyncSequence1.send(6) - asyncSequence1.send(termination: .finished) - asyncSequence2.send(termination: .finished) - asyncSequence3.send(termination: .finished) + asyncSequence1.nonBlockingSend(6) + asyncSequence1.nonBlockingSend(termination: .finished) + asyncSequence2.nonBlockingSend(termination: .finished) + asyncSequence3.nonBlockingSend(termination: .finished) wait(for: [mergedSequenceIsFinisedExpectation], timeout: 1) } - + func testMerge_returns_empty_sequence_when_all_sequences_are_empty() async throws { var receivedResult = [Int]() @@ -181,13 +205,13 @@ final class AsyncSequences_MergeTests: XCTestCase { wait(for: [readyToBeIteratedExpectation], timeout: 1) - asyncSequence1.send(1) + asyncSequence1.nonBlockingSend(1) wait(for: [canSend2Expectation], timeout: 1) - asyncSequence2.send(2) + asyncSequence2.nonBlockingSend(2) wait(for: [canSend3Expectation], timeout: 1) - asyncSequence1.send(termination: .failure(MockError(code: 1))) + asyncSequence1.nonBlockingSend(termination: .failure(MockError(code: 1))) wait(for: [mergedSequenceIsFinisedExpectation], timeout: 1) } diff --git a/Tests/AsyncSequences/AsyncSequences+ZipTests.swift b/Tests/AsyncSequences/AsyncSequences+ZipTests.swift index 59e0e92..ce0b339 100644 --- a/Tests/AsyncSequences/AsyncSequences+ZipTests.swift +++ b/Tests/AsyncSequences/AsyncSequences+ZipTests.swift @@ -5,7 +5,7 @@ // Created by Thibault Wittemberg on 14/01/2022. // -import AsyncExtensions +@testable import AsyncExtensions import XCTest private struct TimedAsyncSequence: AsyncSequence, AsyncIteratorProtocol { @@ -288,3 +288,58 @@ final class AsyncSequences_ZipTests: XCTestCase { wait(for: [taskHasFinishedExpectation], timeout: 5) // task has been cancelled and has finished } } + +// MARK: unit tests for AsyncIteratorByRef +extension AsyncSequences_ZipTests { + func testAsyncIteratorByRef_forwards_elements_from_input_iterator() async throws { + let baseSequence = [1, 2, 3].asyncElements + let sut = AsyncIteratorByRef(iterator: baseSequence.makeAsyncIterator()) + + var receivedElements = [Int]() + while let element = try await sut.next() { + receivedElements.append(element) + } + XCTAssertEqual(receivedElements, [1, 2, 3]) + } + + func testAsyncIteratorByRef_forwards_errors_from_input_iterator() async throws { + let mockError = MockError(count: Int.random(in: 0...100)) + + let baseSequence = AsyncSequences.Fail(error: mockError) + let sut = AsyncIteratorByRef(iterator: baseSequence.makeAsyncIterator()) + + do { + while let _ = try await sut.next() {} + } catch { + XCTAssertEqual(error as? MockError, mockError) + } + } + + func testAsyncIteratorByRef_finishes_when_task_is_cancelled() { + let canCancelExpectation = expectation(description: "The first element has been emitted") + let hasCancelExceptation = expectation(description: "The task has been cancelled") + let taskHasFinishedExpectation = expectation(description: "The task has finished") + + let baseSequence = [1, 2, 3, 4, 5].asyncElements + let sut = AsyncIteratorByRef(iterator: baseSequence.makeAsyncIterator()) + + let task = Task { + var firstElement: Int? + while let element = try await sut.next() { + firstElement = element + canCancelExpectation.fulfill() + wait(for: [hasCancelExceptation], timeout: 5) + } + XCTAssertEqual(firstElement, 1) + taskHasFinishedExpectation.fulfill() + } + + wait(for: [canCancelExpectation], timeout: 5) // one element has been emitted, we can cancel the task + + task.cancel() + + hasCancelExceptation.fulfill() // we can release the lock in the for loop + + wait(for: [taskHasFinishedExpectation], timeout: 5) // task has been cancelled and has finished + } +} diff --git a/Tests/AsyncStreams/AsyncStreamTests.swift b/Tests/AsyncStreams/AsyncStreamTests.swift index c0c61df..94f9596 100644 --- a/Tests/AsyncStreams/AsyncStreamTests.swift +++ b/Tests/AsyncStreams/AsyncStreamTests.swift @@ -10,35 +10,40 @@ import XCTest // MARK: tests for AsyncStreams.Continuations final class AsyncStreamTests: XCTestCase { - func testContinuations_register_adds_the_continuation() { + func testContinuations_register_adds_the_continuation() async{ let continuationIsRegisteredExpectation = expectation(description: "Continuation is registered") let id = UUID() let sut = AsyncStreams.Continuations() _ = AsyncThrowingStream(String.self, bufferingPolicy: .unbounded) { continuation in - sut.register(continuation: continuation, forId: id) - continuationIsRegisteredExpectation.fulfill() + Task { + await sut.register(continuation: continuation, forId: id) + continuationIsRegisteredExpectation.fulfill() + } } - wait(for: [continuationIsRegisteredExpectation], timeout: 1) - let continuations = sut.continuations + await waitForExpectations(timeout: 1) + let continuations = await sut.continuations XCTAssertNotNil(continuations[id]) } - func testContinuations_send_yields_element_in_continuations() { + func testContinuations_send_yields_element_in_continuations() async{ let continuationIsRegisteredExpectation = expectation(description: "Continuation is registered") - let elementIsReceivedExpectation = expectation(description: "Element is received") let sut = AsyncStreams.Continuations() let asyncStream = AsyncThrowingStream(String.self, bufferingPolicy: .unbounded) { continuation in - sut.register(continuation: continuation, forId: UUID()) - continuationIsRegisteredExpectation.fulfill() + Task { + await sut.register(continuation: continuation, forId: UUID()) + continuationIsRegisteredExpectation.fulfill() + } } - wait(for: [continuationIsRegisteredExpectation], timeout: 1) + await waitForExpectations(timeout: 1) + + let elementIsReceivedExpectation = expectation(description: "Element is received") Task { var receivedElements = [String]() @@ -50,31 +55,34 @@ final class AsyncStreamTests: XCTestCase { elementIsReceivedExpectation.fulfill() } - sut.send("element") - sut.send(.finished) + await sut.send("element") + await sut.send(.finished) - wait(for: [elementIsReceivedExpectation], timeout: 1) + await waitForExpectations(timeout: 1) } - func testContinuations_unregister_removes_the_continuation() { + func testContinuations_unregister_removes_the_continuation() async { let continuationIsRegisteredExpectation = expectation(description: "Continuation is registered") let id = UUID() let sut = AsyncStreams.Continuations() _ = AsyncThrowingStream(String.self, bufferingPolicy: .unbounded) { continuation in - sut.register(continuation: continuation, forId: id) - continuationIsRegisteredExpectation.fulfill() + Task { + await sut.register(continuation: continuation, forId: id) + continuationIsRegisteredExpectation.fulfill() + } } - wait(for: [continuationIsRegisteredExpectation], timeout: 1) - let continuationsAfterRegister = sut.continuations + await waitForExpectations(timeout: 1) + + let continuationsAfterRegister = await sut.continuations XCTAssertNotNil(continuationsAfterRegister[id]) - sut.unregister(id: id) + await sut.unregister(id: id) - let continuationsAfterUnregister = sut.continuations + let continuationsAfterUnregister = await sut.continuations XCTAssertNil(continuationsAfterUnregister[id]) } @@ -84,17 +92,20 @@ final class AsyncStreamTests: XCTestCase { extension AsyncStreamTests { func testIterator_unregisters_continuation_when_cancelled() { let asyncStreamIsReadyToBeIteratedExpectation = expectation(description: "The AsyncStream can be iterated") - let taskCanBeCancelledExpectation = expectation(description: "The iterator task can be cancelled") - let taskHasBeenCancelledExpectation = expectation(description: "The iterator task has been cancelled") + let taskCanBeCancelledExpectation = expectation(description: "Task can be cancelled") + let taskHasBeenCancelledExpectation = expectation(description: "Task has been cancelled") + let continuationHasBeenUnregisteredContinuation = expectation(description: "Continuation has been unregistered") let clientId = UUID() let continuations = AsyncStreams.Continuations() let asyncStream = AsyncThrowingStream { continuation in - continuations.register(continuation: continuation, forId: clientId) - asyncStreamIsReadyToBeIteratedExpectation.fulfill() - (0...1000).forEach { element in - continuation.yield(element) + Task { + (0...100).forEach { element in + continuation.yield(element) + } + await continuations.register(continuation: continuation, forId: clientId) + asyncStreamIsReadyToBeIteratedExpectation.fulfill() } } @@ -102,6 +113,7 @@ extension AsyncStreamTests { let baseIterator = asyncStream.makeAsyncIterator() + let task = Task { var sut = AsyncStreams.Iterator( clientId: clientId, @@ -109,12 +121,12 @@ extension AsyncStreamTests { continuations: continuations ) - let count = continuations.continuations.count + let count = await continuations.continuations.count XCTAssertEqual(count, 1) try await withTaskCancellationHandler { while let element = try await sut.next() { - if element == 500 { + if element == 50 { taskCanBeCancelledExpectation.fulfill() } } @@ -129,8 +141,114 @@ extension AsyncStreamTests { wait(for: [taskHasBeenCancelledExpectation], timeout: 1) - let count = continuations.continuations.count + Task { + let count = await continuations.continuations.count + XCTAssertEqual(count, 0) + continuationHasBeenUnregisteredContinuation.fulfill() + } + + wait(for: [continuationHasBeenUnregisteredContinuation], timeout: 1) + } + + func testIterator_unregisters_continuation_when_cancelled_with_cancellationError() { + let asyncStreamIsReadyToBeIteratedExpectation = expectation(description: "The AsyncStream can be iterated") + let taskHasBeenCancelledExpectation = expectation(description: "Task has been cancelled") + let continuationHasBeenUnregisteredContinuation = expectation(description: "Continuation has been unregistered") + + let clientId = UUID() + let continuations = AsyncStreams.Continuations() + + let asyncStream = AsyncThrowingStream { continuation in + Task { + (0...100).forEach { element in + continuation.yield(element) + } + continuation.finish(throwing: CancellationError()) + await continuations.register(continuation: continuation, forId: clientId) + asyncStreamIsReadyToBeIteratedExpectation.fulfill() + } + } + + wait(for: [asyncStreamIsReadyToBeIteratedExpectation], timeout: 1) + + let baseIterator = asyncStream.makeAsyncIterator() + + + Task { + var sut = AsyncStreams.Iterator( + clientId: clientId, + baseIterator: baseIterator, + continuations: continuations + ) + + let count = await continuations.continuations.count + XCTAssertEqual(count, 1) + + do { + while let _ = try await sut.next() {} + } catch is CancellationError { + taskHasBeenCancelledExpectation.fulfill() + } catch {} + } + + wait(for: [taskHasBeenCancelledExpectation], timeout: 2) + + Task { + let count = await continuations.continuations.count + XCTAssertEqual(count, 0) + continuationHasBeenUnregisteredContinuation.fulfill() + } + + wait(for: [continuationHasBeenUnregisteredContinuation], timeout: 1) + } + + func testIterator_unregisters_continuation_when_cancelled_before_iterating() { + let asyncStreamIsReadyToBeIteratedExpectation = expectation(description: "The AsyncStream can be iterated") + let taskHasBeenCancelledExpectation = expectation(description: "Task has been cancelled") + let continuationHasBeenUnregisteredContinuation = expectation(description: "Continuation has been unregistered") + + let clientId = UUID() + let continuations = AsyncStreams.Continuations() + + let asyncStream = AsyncThrowingStream { continuation in + Task { + (0...100).forEach { element in + continuation.yield(element) + } + await continuations.register(continuation: continuation, forId: clientId) + asyncStreamIsReadyToBeIteratedExpectation.fulfill() + } + } + + wait(for: [asyncStreamIsReadyToBeIteratedExpectation], timeout: 1) + + let baseIterator = asyncStream.makeAsyncIterator() + + Task { + var sut = AsyncStreams.Iterator( + clientId: clientId, + baseIterator: baseIterator, + continuations: continuations + ) + + let count = await continuations.continuations.count + XCTAssertEqual(count, 1) + + try await withTaskCancellationHandler { + while let _ = try await sut.next() {} + } onCancel: { + taskHasBeenCancelledExpectation.fulfill() + } + }.cancel() + + wait(for: [taskHasBeenCancelledExpectation], timeout: 1) + + Task { + let count = await continuations.continuations.count + XCTAssertEqual(count, 0) + continuationHasBeenUnregisteredContinuation.fulfill() + } - XCTAssertEqual(count, 0) + wait(for: [continuationHasBeenUnregisteredContinuation], timeout: 1) } } diff --git a/Tests/AsyncStreams/AsyncStreams+CurrentValueTests.swift b/Tests/AsyncStreams/AsyncStreams+CurrentValueTests.swift index 26ef9dc..05a3cbd 100644 --- a/Tests/AsyncStreams/AsyncStreams+CurrentValueTests.swift +++ b/Tests/AsyncStreams/AsyncStreams+CurrentValueTests.swift @@ -1,6 +1,6 @@ // // AsyncStreams+CurrentValueTests.swift -// +// // // Created by Thibault Wittemberg on 10/01/2022. // @@ -16,13 +16,13 @@ final class AsyncStreams_CurrentValueTests: XCTestCase { func testCurrentValue_stores_element_when_init() async { let value = Int.random(in: 0...100) let sut = AsyncStreams.CurrentValue(value) - let element = sut.element + let element = await sut.element XCTAssertEqual(value, element) } func testCurrentValue_replays_element_when_new_loop() { - let exp = expectation(description: "Current value is replayed when new loop") - exp.expectedFulfillmentCount = 2 + let currentValueHasBeenReplayedExpectation = expectation(description: "Current value is replayed when new loop") + currentValueHasBeenReplayedExpectation.expectedFulfillmentCount = 2 let expectedResult = [1] @@ -35,7 +35,7 @@ final class AsyncStreams_CurrentValueTests: XCTestCase { receivedElements.append(element) if element == 1 { XCTAssertEqual(receivedElements, expectedResult) - exp.fulfill() + currentValueHasBeenReplayedExpectation.fulfill() } } } @@ -47,7 +47,7 @@ final class AsyncStreams_CurrentValueTests: XCTestCase { receivedElements.append(element) if element == 1 { XCTAssertEqual(receivedElements, expectedResult) - exp.fulfill() + currentValueHasBeenReplayedExpectation.fulfill() } } } @@ -98,58 +98,11 @@ final class AsyncStreams_CurrentValueTests: XCTestCase { wait(for: [hasReceivedOneElementExpectation], timeout: 1) - sut.send(2) - sut.send(3) - - wait(for: [hasReceivedSentElementsExpectation], timeout: 1) - } - - func testSend_pushes_values_in_the_asyncSequence_when_directly_setting_the_value() { - let hasReceivedOneElementExpectation = expectation(description: "One element has been iterated in the async sequence") - hasReceivedOneElementExpectation.expectedFulfillmentCount = 2 - - let hasReceivedSentElementsExpectation = expectation(description: "Send pushes elements in created AsyncSequences") - hasReceivedSentElementsExpectation.expectedFulfillmentCount = 2 - - let expectedResult = [1, 2, 3] - - let sut = AsyncStreams.CurrentValue(1) - Task { - var receivedElements = [Int]() - - for try await element in sut { - if element == 1 { - hasReceivedOneElementExpectation.fulfill() - } - receivedElements.append(element) - if element == 3 { - XCTAssertEqual(receivedElements, expectedResult) - hasReceivedSentElementsExpectation.fulfill() - } - } + await sut.send(2) + await sut.send(3) } - Task { - var receivedElements = [Int]() - - for try await element in sut { - if element == 1 { - hasReceivedOneElementExpectation.fulfill() - } - receivedElements.append(element) - if element == 3 { - XCTAssertEqual(receivedElements, expectedResult) - hasReceivedSentElementsExpectation.fulfill() - } - } - } - - wait(for: [hasReceivedOneElementExpectation], timeout: 1) - - sut.element = 2 - sut.element = 3 - wait(for: [hasReceivedSentElementsExpectation], timeout: 1) } @@ -182,7 +135,7 @@ final class AsyncStreams_CurrentValueTests: XCTestCase { wait(for: [hasReceivedOneElementExpectation], timeout: 1) - sut.send(termination: .finished) + sut.nonBlockingSend(termination: .finished) wait(for: [hasFinishedExpectation], timeout: 1) } @@ -226,7 +179,7 @@ final class AsyncStreams_CurrentValueTests: XCTestCase { wait(for: [hasReceivedOneElementExpectation], timeout: 1) - sut.send(termination: .failure(expectedError)) + sut.nonBlockingSend(termination: .failure(expectedError)) wait(for: [hasFinishedWithFailureExpectation], timeout: 1) } @@ -293,21 +246,23 @@ final class AsyncStreams_CurrentValueTests: XCTestCase { // concurrently push values in the sut 1 let task1 = Task { for index in (1...1000) { - sut.send(index) + await sut.send(index) } } // concurrently push values in the sut 2 let task2 = Task { for index in (1001...2000) { - sut.send(index) + await sut.send(index) } } await task1.value await task2.value - sut.send(termination: .finished) + Task { + await sut.send(termination: .finished) + } let receivedElementsA = try await taskA.value let receivedElementsB = try await taskB.value diff --git a/Tests/AsyncStreams/AsyncStreams+PassthroughTests.swift b/Tests/AsyncStreams/AsyncStreams+PassthroughTests.swift index 7160f3e..7bc3716 100644 --- a/Tests/AsyncStreams/AsyncStreams+PassthroughTests.swift +++ b/Tests/AsyncStreams/AsyncStreams+PassthroughTests.swift @@ -54,9 +54,11 @@ final class AsyncStreams_PassthroughTests: XCTestCase { wait(for: [isReadyToBeIteratedExpectation], timeout: 1) - sut.send(1) - sut.send(2) - sut.send(3) + Task { + await sut.send(1) + await sut.send(2) + await sut.send(3) + } wait(for: [hasReceivedSentElementsExpectation], timeout: 1) } @@ -97,11 +99,11 @@ final class AsyncStreams_PassthroughTests: XCTestCase { wait(for: [isReadyToBeIteratedExpectation], timeout: 1) - sut.send(1) + sut.nonBlockingSend(1) wait(for: [hasReceivedOneElementExpectation], timeout: 1) - sut.send(termination: .finished) + sut.nonBlockingSend(termination: .finished) wait(for: [hasFinishedExpectation], timeout: 1) } @@ -152,11 +154,11 @@ final class AsyncStreams_PassthroughTests: XCTestCase { wait(for: [isReadyToBeIteratedExpectation], timeout: 1) - sut.send(1) + sut.nonBlockingSend(1) wait(for: [hasReceivedOneElementExpectation], timeout: 1) - sut.send(termination: .failure(expectedError)) + sut.nonBlockingSend(termination: .failure(expectedError)) wait(for: [hasFinishedWithFailureExpectation], timeout: 1) } @@ -185,7 +187,7 @@ final class AsyncStreams_PassthroughTests: XCTestCase { wait(for: [isReadyToBeIteratedExpectation], timeout: 1) - sut.send(1) + sut.nonBlockingSend(1) wait(for: [canCancelExpectation], timeout: 5) // one element has been emitted, we can cancel the task @@ -231,21 +233,21 @@ final class AsyncStreams_PassthroughTests: XCTestCase { // concurrently push values in the sut 1 let task1 = Task { for index in (0...1000) { - sut.send(index) + await sut.send(index) } } // concurrently push values in the sut 2 let task2 = Task { for index in (1001...2000) { - sut.send(index) + await sut.send(index) } } await task1.value await task2.value - sut.send(termination: .finished) + await sut.send(termination: .finished) let receivedElementsA = try await taskA.value let receivedElementsB = try await taskB.value diff --git a/Tests/AsyncStreams/AsyncStreams+ReplayTests.swift b/Tests/AsyncStreams/AsyncStreams+ReplayTests.swift index dd59b53..138fa95 100644 --- a/Tests/AsyncStreams/AsyncStreams+ReplayTests.swift +++ b/Tests/AsyncStreams/AsyncStreams+ReplayTests.swift @@ -15,25 +15,25 @@ private struct MockError: Error, Equatable { final class AsyncStreams_ReplayTests: XCTestCase { func testInit_sets_storage() async { let sut = AsyncStreams.Replay(bufferSize: 10) - let buffer = sut.storage.elements - let bufferSize = sut.storage.size + let buffer = await sut.storage.elements + let bufferSize = await sut.storage.size XCTAssertEqual(bufferSize, 10) XCTAssertTrue(buffer.isEmpty) } - func testSend_pushes_element_in_storage() { + func testSend_pushes_element_in_storage() async { let exp = expectation(description: "Send has stacked elements in the replay the buffer") exp.expectedFulfillmentCount = 2 let expectedResult = [2, 3, 4, 5, 6] let sut = AsyncStreams.Replay(bufferSize: 5) - sut.send(1) - sut.send(2) - sut.send(3) - sut.send(4) - sut.send(5) - sut.send(6) + await sut.send(1) + await sut.send(2) + await sut.send(3) + await sut.send(4) + await sut.send(5) + await sut.send(6) Task { var receivedElements = [Int]() @@ -59,7 +59,7 @@ final class AsyncStreams_ReplayTests: XCTestCase { } } - waitForExpectations(timeout: 0.5) + await waitForExpectations(timeout: 0.5) } func testSend_pushes_element_in_the_streams() { @@ -73,7 +73,9 @@ final class AsyncStreams_ReplayTests: XCTestCase { let sut = AsyncStreams.Replay(bufferSize: 5) - sut.send(1) + Task { + await sut.send(1) + } Task { var receivedElements = [Int]() @@ -107,8 +109,10 @@ final class AsyncStreams_ReplayTests: XCTestCase { wait(for: [hasReceivedOneElementExpectation], timeout: 1) - sut.send(2) - sut.send(3) + Task { + await sut.send(2) + await sut.send(3) + } wait(for: [hasReceivedSentElementsExpectation], timeout: 1) } @@ -120,9 +124,11 @@ final class AsyncStreams_ReplayTests: XCTestCase { let hasFinishedExpectation = expectation(description: "Send(.finished) finishes all created AsyncSequences") hasFinishedExpectation.expectedFulfillmentCount = 2 + let hasClearedExpectation = expectation(description: "All internal resources have been cleared") + let sut = AsyncStreams.Replay(bufferSize: 1) - sut.send(1) + sut.nonBlockingSend(1) Task { for try await element in sut { @@ -144,15 +150,20 @@ final class AsyncStreams_ReplayTests: XCTestCase { wait(for: [hasReceivedOneElementExpectation], timeout: 1) - sut.send(termination: .finished) + sut.nonBlockingSend(termination: .finished) wait(for: [hasFinishedExpectation], timeout: 1) - let buffer = sut.storage.buffer - let continuations = sut.continuations.continuations + Task { + let buffer = await sut.storage.buffer + let continuations = await sut.continuations.continuations - XCTAssertTrue(buffer.isEmpty) - XCTAssertTrue(continuations.isEmpty) + XCTAssertTrue(buffer.isEmpty) + XCTAssertTrue(continuations.isEmpty) + hasClearedExpectation.fulfill() + } + + wait(for: [hasClearedExpectation], timeout: 1) } func testSendFailure_ends_the_streams_with_an_error_and_clear_the_storage() { @@ -162,11 +173,15 @@ final class AsyncStreams_ReplayTests: XCTestCase { let hasFinishedWithFailureExpectation = expectation(description: "Send(.failure) finishes all created AsyncSequences with error") hasFinishedWithFailureExpectation.expectedFulfillmentCount = 2 + let hasClearedExpectation = expectation(description: "All internal resources have been cleared") + let expectedError = MockError(code: Int.random(in: 0...100)) let sut = AsyncStreams.Replay(bufferSize: 1) - sut.send(1) + Task { + await sut.send(1) + } Task { do { @@ -196,15 +211,20 @@ final class AsyncStreams_ReplayTests: XCTestCase { wait(for: [hasReceivedOneElementExpectation], timeout: 1) - sut.send(termination: .failure(expectedError)) + sut.nonBlockingSend(termination: .failure(expectedError)) wait(for: [hasFinishedWithFailureExpectation], timeout: 1) - let buffer = sut.storage.buffer - let continuations = sut.continuations.continuations + Task { + let buffer = await sut.storage.buffer + let continuations = await sut.continuations.continuations - XCTAssertTrue(buffer.isEmpty) - XCTAssertTrue(continuations.isEmpty) + XCTAssertTrue(buffer.isEmpty) + XCTAssertTrue(continuations.isEmpty) + hasClearedExpectation.fulfill() + } + + wait(for: [hasClearedExpectation], timeout: 1) } func testReplay_finishes_when_task_is_cancelled() { @@ -213,7 +233,7 @@ final class AsyncStreams_ReplayTests: XCTestCase { let taskHasFinishedExpectation = expectation(description: "The task has finished") let sut = AsyncStreams.Replay(bufferSize: 1) - sut.send(1) + sut.nonBlockingSend(1) let task = Task { var firstElement: Int? @@ -270,21 +290,21 @@ final class AsyncStreams_ReplayTests: XCTestCase { // concurrently push values in the sut 1 let task1 = Task { for index in (0...1000) { - sut.send(index) + await sut.send(index) } } // concurrently push values in the sut 2 let task2 = Task { for index in (1001...2000) { - sut.send(index) + await sut.send(index) } } await task1.value await task2.value - sut.send(termination: .finished) + sut.nonBlockingSend(termination: .finished) let receivedElementsA = try await taskA.value let receivedElementsB = try await taskB.value diff --git a/Tests/Common/SharedAsyncIteratorTests.swift b/Tests/Internal/AsyncIteratorByRefTests.swift similarity index 75% rename from Tests/Common/SharedAsyncIteratorTests.swift rename to Tests/Internal/AsyncIteratorByRefTests.swift index 841739e..c1363e0 100644 --- a/Tests/Common/SharedAsyncIteratorTests.swift +++ b/Tests/Internal/AsyncIteratorByRefTests.swift @@ -1,5 +1,5 @@ // -// SharedAsyncIteratorTests.swift +// AsyncIteratorByRefTests.swift // // // Created by Thibault Wittemberg on 23/01/2022. @@ -12,10 +12,10 @@ private struct MockError: Error, Equatable { let code: Int } -final class SharedAsyncIteratorTests: XCTestCase { - func testSharedAsyncIterator_forwards_elements_from_input_iterator() async throws { +final class AsyncIteratorByRefTests: XCTestCase { + func testAsyncIteratorByRef_forwards_elements_from_input_iterator() async throws { let baseSequence = [1, 2, 3].asyncElements - let sut = SharedAsyncIterator(iterator: baseSequence.makeAsyncIterator()) + let sut = AsyncIteratorByRef(iterator: baseSequence.makeAsyncIterator()) var receivedElements = [Int]() while let element = try await sut.next() { @@ -24,11 +24,11 @@ final class SharedAsyncIteratorTests: XCTestCase { XCTAssertEqual(receivedElements, [1, 2, 3]) } - func testSharedAsyncIterator_forwards_errors_from_input_iterator() async throws { + func testAsyncIteratorByRef_forwards_errors_from_input_iterator() async throws { let mockError = MockError(code: Int.random(in: 0...100)) let baseSequence = AsyncSequences.Fail(error: mockError) - let sut = SharedAsyncIterator(iterator: baseSequence.makeAsyncIterator()) + let sut = AsyncIteratorByRef(iterator: baseSequence.makeAsyncIterator()) do { while let _ = try await sut.next() {} @@ -37,13 +37,13 @@ final class SharedAsyncIteratorTests: XCTestCase { } } - func testSharedAsyncIterator_finishes_when_task_is_cancelled() { + func testAsyncIteratorByRef_finishes_when_task_is_cancelled() { let canCancelExpectation = expectation(description: "The first element has been emitted") let hasCancelExceptation = expectation(description: "The task has been cancelled") let taskHasFinishedExpectation = expectation(description: "The task has finished") let baseSequence = [1, 2, 3, 4, 5].asyncElements - let sut = SharedAsyncIterator(iterator: baseSequence.makeAsyncIterator()) + let sut = AsyncIteratorByRef(iterator: baseSequence.makeAsyncIterator()) let task = Task { var firstElement: Int? diff --git a/Tests/Internal/SharedAsyncIteratorTests.swift b/Tests/Internal/SharedAsyncIteratorTests.swift new file mode 100644 index 0000000..ef52ab1 --- /dev/null +++ b/Tests/Internal/SharedAsyncIteratorTests.swift @@ -0,0 +1,157 @@ +// +// SharedAsyncIteratorTests.swift +// +// +// Created by Thibault Wittemberg on 08/02/2022. +// + +@testable import AsyncExtensions +import XCTest + +private actor Spy { + var storage = [Element]() + + func record(_ element: Element) { + self.storage.append(element) + } +} + +final class SharedAsyncIteratorTests: XCTestCase {} + +// MARK: tests for NonBlockingGate +extension SharedAsyncIteratorTests { + func testNonBlockingGate_inits_isLockable_to_true() async { + let sut = NonBlockingGate() + let isLockable = await sut.isLockable + XCTAssertTrue(isLockable) + } + + func testNonBlockingGate_lock_sets_isLockable_to_false_and_returns_true() async { + let sut = NonBlockingGate() + let wasLockable = await sut.lock() + let isLockable = await sut.isLockable + XCTAssertTrue(wasLockable) + XCTAssertFalse(isLockable) + } + + func testNonBlockingGate_unlock_sets_isLockable_to_true() async { + let sut = NonBlockingGate() + _ = await sut.lock() + let isLockableBefore = await sut.isLockable + XCTAssertFalse(isLockableBefore) + _ = await sut.unlock() + let isLockableAfter = await sut.isLockable + XCTAssertTrue(isLockableAfter) + } +} + +// MARK: tests for FinishState +extension SharedAsyncIteratorTests { + func testFinishState_inits_finished_to_false() async { + let sut = FinishState() + let isFinished = await sut.finished + XCTAssertFalse(isFinished) + } + + func testFinishState_setFinished_sets_finished_to_true() async { + let sut = FinishState() + await sut.setFinished() + let isFinished = await sut.finished + XCTAssertTrue(isFinished) + } + + func testFinishState_isFinished_gets_finished() async { + let sut = FinishState() + let isFinishedA = await sut.isFinished() + let isFinishedB = await sut.finished + XCTAssertEqual(isFinishedA, isFinishedB) + } +} + +// MARK: tests for SharedAsyncIterator +extension SharedAsyncIteratorTests { + func testSharedAsyncIterator_returns_notAvailable_when_already_processing_next() async { + let tasksHaveRecordedTheIteratorNextElementExpectation = expectation(description: "Tasks have received an element from shared iterator") + tasksHaveRecordedTheIteratorNextElementExpectation.expectedFulfillmentCount = 2 + + let asyncSequence = AsyncSequences.From([1], interval: .milliSeconds(500)) + let spy = Spy>() + + let sut = SharedAsyncIterator(iterator: asyncSequence.makeAsyncIterator()) + + Task { + let next = try await sut.next() + await spy.record(next!) + tasksHaveRecordedTheIteratorNextElementExpectation.fulfill() + } + + Task { + let next = try await sut.next() + await spy.record(next!) + tasksHaveRecordedTheIteratorNextElementExpectation.fulfill() + } + + await waitForExpectations(timeout: 1) + + let recorded = await spy.storage + XCTAssertEqual(recorded.count, 2) + + if case .notAvailable = recorded[0] { + XCTAssertEqual(recorded[1], .value(1)) + } + + if case .notAvailable = recorded[1] { + XCTAssertEqual(recorded[0], .value(1)) + } + } + + func testSharedAsyncIterator_propagates_finish() async throws { + let asyncSequence = AsyncSequences.Empty() + let sut = SharedAsyncIterator(iterator: asyncSequence.makeAsyncIterator()) + + _ = try await sut.next() + let isFinishedA = await sut.isFinished() + XCTAssertTrue(isFinishedA) + + _ = try await sut.next() + let isFinishedB = await sut.isFinished() + XCTAssertTrue(isFinishedB) + } + + func testSharedAsyncIterator_unlocks_gate_when_next_element() async throws { + let asyncSequence = AsyncSequences.Empty() + let sut = SharedAsyncIterator(iterator: asyncSequence.makeAsyncIterator()) + + _ = try await sut.next() + let isLockable = await sut.gate.isLockable + XCTAssertTrue(isLockable) + } + + func testSharedAsyncIterator_finishes_when_task_is_cancelled() { + let canCancelExpectation = expectation(description: "The first element has been emitted") + let hasCancelExceptation = expectation(description: "The task has been cancelled") + let taskHasFinishedExpectation = expectation(description: "The task has finished") + + let asyncSequence = AsyncSequences.From([1, 2]) + let sut = SharedAsyncIterator(iterator: asyncSequence.makeAsyncIterator()) + + let task = Task { + var firstElement: SharedElement? + while let element = try await sut.next() { + firstElement = element + canCancelExpectation.fulfill() + wait(for: [hasCancelExceptation], timeout: 5) + } + XCTAssertEqual(firstElement, .value(1)) + taskHasFinishedExpectation.fulfill() + } + + wait(for: [canCancelExpectation], timeout: 5) // one element has been emitted, we can cancel the task + + task.cancel() + + hasCancelExceptation.fulfill() // we can release the lock in the for loop + + wait(for: [taskHasFinishedExpectation], timeout: 5) // task has been cancelled and has finished + } +} diff --git a/Tests/Operators/AsyncSequence+FlatMapLatestTests.swift b/Tests/Operators/AsyncSequence+FlatMapLatestTests.swift index 1b9036d..a3fefa5 100644 --- a/Tests/Operators/AsyncSequence+FlatMapLatestTests.swift +++ b/Tests/Operators/AsyncSequence+FlatMapLatestTests.swift @@ -59,60 +59,84 @@ private struct LongAsyncSequence: AsyncSequence, AsyncIteratorProtocol { } } -final class AsyncSequence_FlatMapLatestTests: XCTestCase { - func testFlatMapLatest_switches_to_latest_asyncSequence_and_cancels_previous_ones() { - let firstChildSequenceHasEmittedTwoElementsExpectation = expectation(description: "The first child sequence has emitted two elements") - let secondChildSequenceHasEmittedOneElementsExpectation = expectation(description: "The second child sequence has emitted one element") - let thirdChildSequenceHasEmittedTwoElementsExpectation = expectation(description: "The third child sequence has emitted two elements") - - let firstChildSequenceIsCancelled = expectation(description: "First child sequence has been cancelled") - let secondChildSequenceIsCancelled = expectation(description: "Second child sequence has been cancelled") - - let baseSequence = AsyncStreams.CurrentValue(1) +private struct TimedAsyncSequence: AsyncSequence, AsyncIteratorProtocol { + typealias Element = Element + typealias AsyncIterator = TimedAsyncSequence + + private let intervalInMills: [UInt64] + private var iterator: Array.Iterator + private var index = 0 + private let indexOfError: Int? + + init(intervalInMills: [UInt64], sequence: [Element], indexOfError: Int? = nil) { + self.intervalInMills = intervalInMills + self.iterator = sequence.makeIterator() + self.indexOfError = indexOfError + } - let sut = baseSequence.flatMapLatest { element -> AnyAsyncSequence in - let childElement1 = (element * 10) + 1 - let childElement2 = (element * 10) + 2 - let childElement3 = (element * 10) + 3 + mutating func next() async throws -> Element? { - return AsyncSequences - .From([childElement1, childElement2, childElement3], interval: .milliSeconds(100)) - .eraseToAnyAsyncSequence() + if let indexOfError = self.indexOfError, self.index == indexOfError { + throw MockError(code: 1) } - Task { - var receivedElements = [Int]() - for try await element in sut { - receivedElements.append(element) - - if element == 12 { - firstChildSequenceHasEmittedTwoElementsExpectation.fulfill() - wait(for: [firstChildSequenceIsCancelled], timeout: 1) - } + if self.index < self.intervalInMills.count { + try await Task.sleep(nanoseconds: self.intervalInMills[index] * 1_000_000) + self.index += 1 + } + return self.iterator.next() + } - if element == 21 { - secondChildSequenceHasEmittedOneElementsExpectation.fulfill() - wait(for: [secondChildSequenceIsCancelled], timeout: 1) - } + func makeAsyncIterator() -> AsyncIterator { + self + } +} - if element == 32 { - XCTAssertEqual(receivedElements, [11, 12, 21, 31, 32]) - thirdChildSequenceHasEmittedTwoElementsExpectation.fulfill() - } +final class AsyncSequence_FlatMapLatestTests: XCTestCase { + func testFlatMapLatest_emits_elements_from_newest_sequence_and_cancels_previous_sequences() async throws { + // ---- 50 ------- 500 ---------- 1000 ------------------------------- + // -------- 70 ------------ 750 -------------------------------------- + // -------------------- 520 ---------------------------------- 1520 -- + // ------------------------------------ 1100 -- 1200 -- 1300 --------- + // ---- 1 -- a ---- 2 -- c ------- 3 --- e ----- f ------ g ---------- + // + // output should be: a c e f g + // childAsyncSequence1 and childAsyncSequence2 will be cancelled + let expectedElements = ["a", "c", "e", "f", "g"] + var childAsyncSequence1Cancelled = false + var childAsyncSequence2Cancelled = false + var childAsyncSequence3Cancelled = false + + let rootAsyncSequence = TimedAsyncSequence(intervalInMills: [50, 450, 500], sequence: [1, 2, 3]) + + let childAsyncSequence1 = TimedAsyncSequence(intervalInMills: [20, 680], sequence: ["a", "b"]) + .handleEvents(onCancel: { childAsyncSequence1Cancelled = true }) + + let childAsyncSequence2 = TimedAsyncSequence(intervalInMills: [20, 1500], sequence: ["c", "d"]) + .handleEvents(onCancel: { childAsyncSequence2Cancelled = true }) + + let childAsyncSequence3 = TimedAsyncSequence(intervalInMills: [100, 100, 100], sequence: ["e", "f", "g"]) + .handleEvents(onCancel: { childAsyncSequence3Cancelled = true }) + + let sut = rootAsyncSequence.flatMapLatest { element -> AsyncHandleEventsSequence> in + switch element { + case 1: return childAsyncSequence1 + case 2: return childAsyncSequence2 + default: return childAsyncSequence3 } } - wait(for: [firstChildSequenceHasEmittedTwoElementsExpectation], timeout: 1) - - baseSequence.send(2) - firstChildSequenceIsCancelled.fulfill() + var receivedElements = [String]() - wait(for: [secondChildSequenceHasEmittedOneElementsExpectation], timeout: 1) + for try await element in sut { + receivedElements.append(element) + } - baseSequence.send(3) - secondChildSequenceIsCancelled.fulfill() + XCTAssertEqual(receivedElements, expectedElements) + XCTAssertTrue(childAsyncSequence1Cancelled) + XCTAssertTrue(childAsyncSequence2Cancelled) + XCTAssertFalse(childAsyncSequence3Cancelled) - wait(for: [thirdChildSequenceHasEmittedTwoElementsExpectation], timeout: 1) } func testFlatMapLatest_propagates_errors_when_transform_function_fails() async { @@ -150,7 +174,7 @@ final class AsyncSequence_FlatMapLatestTests: XCTestCase { XCTAssertEqual(error as? MockError, expectedError) } } - + func testFlatMapLatest_finishes_when_task_is_cancelled_after_switched() { let canCancelExpectation = expectation(description: "The first element has been emitted") let hasCancelExceptation = expectation(description: "The task has been cancelled") diff --git a/Tests/Operators/AsyncSequence+HandleEventsTests.swift b/Tests/Operators/AsyncSequence+HandleEventsTests.swift index 6ece57a..2fcf6c3 100644 --- a/Tests/Operators/AsyncSequence+HandleEventsTests.swift +++ b/Tests/Operators/AsyncSequence+HandleEventsTests.swift @@ -1,6 +1,6 @@ // // AsyncSequence+HandleEventsTests.swift -// +// // // Created by Thibault Wittemberg on 01/01/2022. // diff --git a/Tests/Operators/AsyncSequence+SwitchToLatestTests.swift b/Tests/Operators/AsyncSequence+SwitchToLatestTests.swift index d0f939b..80a744a 100644 --- a/Tests/Operators/AsyncSequence+SwitchToLatestTests.swift +++ b/Tests/Operators/AsyncSequence+SwitchToLatestTests.swift @@ -58,7 +58,9 @@ final class AsyncSequence_SwitchToLatestTests: XCTestCase { let childAsyncSequence3 = LongAsyncSequence(elements: [9, 10, 11], interval: .milliSeconds(200), onCancel: { asyncSequence3IsCancelled = true }) .prepend(8) - let mainAsyncSequence = LongAsyncSequence(elements: [childAsyncSequence1, childAsyncSequence2, childAsyncSequence3], interval: .milliSeconds(30), onCancel: { print("Cancelled main") }) + let mainAsyncSequence = LongAsyncSequence(elements: [childAsyncSequence1, childAsyncSequence2, childAsyncSequence3], + interval: .milliSeconds(30), + onCancel: {}) let sut = mainAsyncSequence.switchToLatest()