Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions Sources/AsyncSequenceReader/AsyncBufferedIterator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public struct AsyncBufferedIterator<BaseIterator: AsyncIteratorProtocol>: AsyncI
///
/// - Note:Ideally, this should be implemented using some sort of cyclical buffer like a Deque, but in practice, it will only ever have one entry.
@usableFromInline
var unconsumedBuffer: [BaseIterator.Element] = []
var unconsumedBuffer: [Result<BaseIterator.Element, any Error>] = []

@usableFromInline
init(_ baseIterator: BaseIterator) {
Expand All @@ -32,7 +32,7 @@ public struct AsyncBufferedIterator<BaseIterator: AsyncIteratorProtocol>: AsyncI
@inlinable
public mutating func next(isolation actor: isolated (any Actor)? = #isolation) async rethrows -> BaseIterator.Element? {
guard unconsumedBuffer.isEmpty else {
return unconsumedBuffer.removeFirst()
return try unconsumedBuffer.removeFirst().get()
}

return try await baseIterator._nextIsolated()
Expand All @@ -41,26 +41,37 @@ public struct AsyncBufferedIterator<BaseIterator: AsyncIteratorProtocol>: AsyncI
/// Read ahead, and store the value for later, or throw if the base iterator also throws.
/// - Returns: The read-ahead value.
@usableFromInline
mutating func nextUnconsumed(isolation actor: isolated (any Actor)? = #isolation) async rethrows -> BaseIterator.Element? {
let next = try await baseIterator._nextIsolated()
if let value = next {
unconsumedBuffer.append(value)
mutating func nextUnconsumed(isolation actor: isolated (any Actor)? = #isolation) async -> Result<BaseIterator.Element, any Error>? {
/// Prevent a compiler crash catching a re-throwing result by casting it first.
func _preventCompilerCrashCacthingError(_ actor: isolated (any Actor)? = #isolation) async throws -> Element? {
try await baseIterator._nextIsolated()
}

return next
do {
let next = try await _preventCompilerCrashCacthingError()
if let value = next {
unconsumedBuffer.append(.success(value))
return .success(value)
}

return nil
} catch {
unconsumedBuffer.append(.failure(error))
return .failure(error)
}
}

/// Returns if the iterator has more elements to consume.
///
/// If it does, the iterator saves the elements, and will deliver them immediately on the next call to `next()`
/// - Returns: A Bool indicating if there is more to consume or not.
@inlinable
public mutating func hasMoreData(isolation actor: isolated (any Actor)? = #isolation) async rethrows -> Bool {
public mutating func hasMoreData(isolation actor: isolated (any Actor)? = #isolation) async -> Bool {
guard unconsumedBuffer.isEmpty else {
return true
}

return try await nextUnconsumed() != nil
return await nextUnconsumed() != nil
}
}

Expand Down
4 changes: 2 additions & 2 deletions Sources/AsyncSequenceReader/AsyncIteratorMapSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ extension AsyncIteratorMapSequence: AsyncSequence {
/// This iterator calls `next()` on its (wrapped) base iterator, and stores the result; if this call returns `nil`, `next()` returns `nil`. Otherwise, `next()` returns the result of calling the transforming closure on the received element. If calling the transformation throws an error, the sequence ends and `next()` rethrows the error.
@inlinable
public mutating func next(isolation actor: isolated (any Actor)? = #isolation) async throws -> Transformed? {
guard !encounteredError, try await baseIterator.hasMoreData(isolation: actor) else {
guard !encounteredError, await baseIterator.hasMoreData(isolation: actor) else {
return nil
}
do {
Expand All @@ -220,7 +220,7 @@ extension AsyncIteratorMapSequence: AsyncSequence {
/// This iterator calls `next()` on its (wrapped) base iterator, and stores the result; if this call returns `nil`, `next()` returns `nil`. Otherwise, `next()` returns the result of calling the transforming closure on the received element.
@inlinable
public mutating func next(isolation actor: isolated (any Actor)? = #isolation) async rethrows -> Transformed? where TransformFailure == Never {
guard try await baseIterator.hasMoreData(isolation: actor) else {
guard await baseIterator.hasMoreData(isolation: actor) else {
return nil
}
#if compiler(>=6.2)
Expand Down
22 changes: 14 additions & 8 deletions Sources/AsyncSequenceReader/AsyncReadSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ extension AsyncIteratorProtocol {
/// - Parameter sequenceTransform: A transformation that accepts a sequence that can be read from, or stopped prematurely by returning `nil`. The receiving iterator will have moved forward by the same amount of items consumed within `sequenceTransform`.
/// - Parameter readSequenceFactory: A factory to create a suitable ``AsyncReadSequence`` that will determine the logical bounds of the transformation within the receiving iterator.
/// - Returns: A transformed value read from the iterator, or `nil` if there were no values left to read.
public mutating func transform<Transformed, ReadSequence: AsyncReadSequence>(
with sequenceTransform: sending (sending ReadSequence) async throws -> Transformed,
public mutating func transform<
Transformed, ReadSequence: AsyncReadSequence,
TransformFailure: Error
>(
with sequenceTransform: sending (sending ReadSequence) async throws(TransformFailure) -> Transformed,
readSequenceFactory: (inout AsyncBufferedIterator<Self>) -> ReadSequence
) async throws -> Transformed? where ReadSequence.BaseIterator == Self {
) async throws(TransformFailure) -> Transformed? where ReadSequence.BaseIterator == Self {
var results: Transformed? = nil
var wrappedIterator = AsyncBufferedIterator(self)
if try await wrappedIterator.hasMoreData() {
if await wrappedIterator.hasMoreData() {
nonisolated(unsafe) let readSequence = readSequenceFactory(&wrappedIterator)
results = try await sequenceTransform(readSequence)
wrappedIterator = readSequence.baseIterator
Expand All @@ -51,12 +54,15 @@ extension AsyncBufferedIterator {
/// - Parameter sequenceTransform: A transformation that accepts a sequence that can be read from, or stopped prematurely by returning `nil`. The receiving iterator will have moved forward by the same amount of items consumed within `sequenceTransform`.
/// - Parameter readSequenceFactory: A factory to create a suitable ``AsyncReadSequence`` that will determine the logical bounds of the transformation within the receiving iterator.
/// - Returns: A transformed value read from the iterator, or `nil` if there were no values left to read.
public mutating func transform<Transformed, ReadSequence: AsyncReadSequence>(
with sequenceTransform: sending (sending ReadSequence) async throws -> Transformed,
public mutating func transform<
Transformed, ReadSequence: AsyncReadSequence,
TransformFailure: Error
>(
with sequenceTransform: sending (sending ReadSequence) async throws(TransformFailure) -> Transformed,
readSequenceFactory: (inout Self) -> ReadSequence
) async throws -> Transformed? where ReadSequence.BaseIterator == BaseIterator {
) async throws(TransformFailure) -> Transformed? where ReadSequence.BaseIterator == BaseIterator {
var results: Transformed? = nil
if try await self.hasMoreData() {
if await self.hasMoreData() {
nonisolated(unsafe) let readSequence = readSequenceFactory(&self)
results = try await sequenceTransform(readSequence)
self = readSequence.baseIterator
Expand Down
23 changes: 13 additions & 10 deletions Sources/AsyncSequenceReader/AsyncReadUpToElementsSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,13 @@ extension AsyncIteratorProtocol {
/// - Parameter termination: The element marking the end of the sequence the `sequenceTransform` closure will have access to.
/// - Parameter sequenceTransform: A transformation that accepts a sequence containing elements up to the termination that can be read from, or stopped prematurely by returning early. The receiving iterator will have moved forward by the same amount of items consumed within `sequenceTransform`.
/// - Returns: A transformed value as returned by `sequenceTransform`, or `nil` if the sequence was already finished.
public mutating func collect<Transformed>(
public mutating func collect<
Transformed,
TransformFailure: Error
>(
upToIncluding termination: Element,
sequenceTransform: sending (AsyncReadUpToElementsSequence<Self, Array<Element>>) async throws -> Transformed
) async throws -> Transformed? where Element: Equatable {
sequenceTransform: sending (AsyncReadUpToElementsSequence<Self, Array<Element>>) async throws(TransformFailure) -> Transformed
) async throws(TransformFailure) -> Transformed? where Element: Equatable {
try await collect(upToIncluding: [termination], sequenceTransform: sequenceTransform)
}

Expand Down Expand Up @@ -221,17 +224,17 @@ extension AsyncIteratorProtocol {
/// - Returns: A transformed value as returned by `sequenceTransform`, or `nil` if the sequence was already finished.
public mutating func collect<
TerminationCollection: Collection<Element>,
Transformed
Transformed,
TransformFailure: Error
>(
upToIncluding termination: TerminationCollection,
sequenceTransform: sending (AsyncReadUpToElementsSequence<Self, TerminationCollection>) async throws -> Transformed
) async throws -> Transformed? where Element: Equatable {
sequenceTransform: sending (AsyncReadUpToElementsSequence<Self, TerminationCollection>) async throws(TransformFailure) -> Transformed
) async throws(TransformFailure) -> Transformed? where Element: Equatable {
try await transform(with: sequenceTransform) { .init($0, termination: termination) }
}
}

extension AsyncBufferedIterator {

extension AsyncBufferedIterator where Element: Equatable {
/// Collect elements into a sequence until the termination sequence is encountered, and transform it using the provided closure.
///
/// - Note: It is up to the caller to verify if the termination sequence was encountered or not, which can easily be done by checking `result.suffix(termination.count).elementsEqual(termination)`.
Expand Down Expand Up @@ -265,7 +268,7 @@ extension AsyncBufferedIterator {
public mutating func collect<Transformed>(
upToIncluding termination: Element,
sequenceTransform: sending (AsyncReadUpToElementsSequence<Self, Array<Element>>) async throws -> Transformed
) async throws -> Transformed? where Element: Equatable {
) async rethrows -> Transformed? {
try await collect(upToIncluding: [termination], sequenceTransform: sequenceTransform)
}

Expand Down Expand Up @@ -305,7 +308,7 @@ extension AsyncBufferedIterator {
>(
upToIncluding termination: TerminationCollection,
sequenceTransform: sending (AsyncReadUpToElementsSequence<BaseIterator, TerminationCollection>) async throws -> Transformed
) async throws -> Transformed? where Element: Equatable {
) async rethrows -> Transformed? {
try await transform(with: sequenceTransform) { .init($0, termination: termination) }
}
}
Expand Down
14 changes: 7 additions & 7 deletions Tests/AsyncSequenceReaderTests/AsyncBufferedIteratorTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,19 @@ import Testing
#expect(try await bufferedIterator.next() == 2)
#expect(try await bufferedIterator.next() == 3)
#expect(try await bufferedIterator.next() == 4)
#expect(try await bufferedIterator.hasMoreData() == true)
#expect(try await bufferedIterator.hasMoreData() == true)
#expect(try await bufferedIterator.hasMoreData() == true)
#expect(await bufferedIterator.hasMoreData() == true)
#expect(await bufferedIterator.hasMoreData() == true)
#expect(await bufferedIterator.hasMoreData() == true)
#expect(try await bufferedIterator.next() == 5)
#expect(try await bufferedIterator.nonIsolatedNext() == 6)
#expect(try await bufferedIterator.next() == 7)
#expect(try await bufferedIterator.next() == 8)
#expect(try await bufferedIterator.hasMoreData() == true)
#expect(await bufferedIterator.hasMoreData() == true)
#expect(try await bufferedIterator.next() == 9)
#expect(try await bufferedIterator.hasMoreData() == false)
#expect(await bufferedIterator.hasMoreData() == false)
#expect(try await bufferedIterator.next() == nil)
#expect(try await bufferedIterator.hasMoreData() == false)
#expect(try await bufferedIterator.hasMoreData() == false)
#expect(await bufferedIterator.hasMoreData() == false)
#expect(await bufferedIterator.hasMoreData() == false)
#expect(try await bufferedIterator.next() == nil)
}
}
Loading
Loading