From 675b11dd5706e24705d69dfb6cdc4a7068a042e1 Mon Sep 17 00:00:00 2001 From: mtj0928 Date: Thu, 16 Oct 2025 10:02:01 +0900 Subject: [PATCH] Fixed a performance bug Fixed a bug where the iteration is not run enough concurrently when the number of elements is greater than the numbnerOfConcurrentTasks. --- README.md | 2 - .../AsyncSequence+AsyncForEach.swift | 2 +- .../Documentation.docc/Documentation.md | 2 - .../Sequence/InternalForEach.swift | 28 +++++++++---- .../Sequence/Sequence+AsyncCompactMap.swift | 2 +- .../Sequence/Sequence+AsyncFilter.swift | 2 +- .../Sequence/Sequence+AsyncFlatMap.swift | 2 +- .../Sequence/Sequence+AsyncForEach.swift | 2 +- .../Sequence/Sequence+AsyncMap.swift | 2 +- .../AsyncSequenceAsyncForEachTests.swift | 8 +++- .../Sequence/SequenceAsyncForEachTests.swift | 40 +++++++++++++++++-- .../Sequence/SequenceAsyncMapTests.swift | 2 +- .../Utils/ConcurrentTaskEvent.swift | 4 ++ 13 files changed, 74 insertions(+), 24 deletions(-) create mode 100644 Tests/AsyncOperationsTests/Utils/ConcurrentTaskEvent.swift diff --git a/README.md b/README.md index 9cbdc91..c655db0 100644 --- a/README.md +++ b/README.md @@ -171,8 +171,6 @@ let results = await withOrderedTaskGroup(of: Int.self) { group in print(results) // 😁 [0, 2, 4, 6, 8, 10] ``` -They are also used for async functions of `Sequence`. - ## Requirements Swift 5.10 or later. diff --git a/Sources/AsyncOperations/AsyncSequence/AsyncSequence+AsyncForEach.swift b/Sources/AsyncOperations/AsyncSequence/AsyncSequence+AsyncForEach.swift index cb3e09f..d6ac24d 100644 --- a/Sources/AsyncOperations/AsyncSequence/AsyncSequence+AsyncForEach.swift +++ b/Sources/AsyncOperations/AsyncSequence/AsyncSequence+AsyncForEach.swift @@ -32,7 +32,7 @@ extension AsyncSequence where Element: Sendable { priority: TaskPriority? = nil, _ body: @escaping @Sendable (Element) async throws -> Void ) async rethrows { - try await withThrowingOrderedTaskGroup(of: Void.self) { group in + try await withThrowingTaskGroup(of: Void.self) { group in var counter = 0 var asyncIterator = self.makeAsyncIterator() while let element = try await asyncIterator.next() { diff --git a/Sources/AsyncOperations/Documentation.docc/Documentation.md b/Sources/AsyncOperations/Documentation.docc/Documentation.md index 90d93ea..4c9b8b0 100644 --- a/Sources/AsyncOperations/Documentation.docc/Documentation.md +++ b/Sources/AsyncOperations/Documentation.docc/Documentation.md @@ -129,5 +129,3 @@ let results = await withOrderedTaskGroup(of: Int.self) { group in print(results) // 😁 [0, 2, 4, 6, 8, 10] ``` -They are also used for async functions of `Sequence`. - diff --git a/Sources/AsyncOperations/Sequence/InternalForEach.swift b/Sources/AsyncOperations/Sequence/InternalForEach.swift index 426a2ab..aab924c 100644 --- a/Sources/AsyncOperations/Sequence/InternalForEach.swift +++ b/Sources/AsyncOperations/Sequence/InternalForEach.swift @@ -1,24 +1,38 @@ extension Sequence where Element: Sendable { func internalForEach( - group: inout ThrowingOrderedTaskGroup, + group: inout ThrowingTaskGroup<(T, Int), any Error>, numberOfConcurrentTasks: UInt, priority: TaskPriority?, taskOperation: @escaping @Sendable (Element) async throws -> T, nextOperation: (T) -> () ) async throws { + var currentIndex = 0 + var results: [Int: T] = [:] + + func doNextOperationIfNeeded() { + while let result = results[currentIndex] { + let index = currentIndex + nextOperation(result) + currentIndex += 1 + results.removeValue(forKey: index) + } + } + for (index, element) in self.enumerated() { - if index >= numberOfConcurrentTasks { - if let value = try await group.next() { - nextOperation(value) + if numberOfConcurrentTasks <= index { + if let (value, index) = try await group.next() { + results[index] = value + doNextOperationIfNeeded() } } group.addTask(priority: priority) { - try await taskOperation(element) + try await (taskOperation(element), index) } } - for try await value in group { - nextOperation(value) + for try await (value, index) in group { + results[index] = value + doNextOperationIfNeeded() } } } diff --git a/Sources/AsyncOperations/Sequence/Sequence+AsyncCompactMap.swift b/Sources/AsyncOperations/Sequence/Sequence+AsyncCompactMap.swift index 53f5ef1..0cb5c08 100644 --- a/Sources/AsyncOperations/Sequence/Sequence+AsyncCompactMap.swift +++ b/Sources/AsyncOperations/Sequence/Sequence+AsyncCompactMap.swift @@ -10,7 +10,7 @@ extension Sequence where Element: Sendable { priority: TaskPriority? = nil, _ transform: @escaping @Sendable (Element) async throws -> T? ) async rethrows -> [T] { - try await withThrowingOrderedTaskGroup(of: T?.self) { group in + try await withThrowingTaskGroup(of: (T?, Int).self) { group in var values: [T] = [] try await internalForEach( diff --git a/Sources/AsyncOperations/Sequence/Sequence+AsyncFilter.swift b/Sources/AsyncOperations/Sequence/Sequence+AsyncFilter.swift index bd78a68..5ae3dca 100644 --- a/Sources/AsyncOperations/Sequence/Sequence+AsyncFilter.swift +++ b/Sources/AsyncOperations/Sequence/Sequence+AsyncFilter.swift @@ -10,7 +10,7 @@ extension Sequence where Element: Sendable { priority: TaskPriority? = nil, _ isIncluded: @escaping @Sendable (Element) async throws -> Bool ) async rethrows -> [Element] { - try await withThrowingOrderedTaskGroup(of: Element?.self) { group in + try await withThrowingTaskGroup(of: (Element?, Int).self) { group in var values: [Element] = [] try await internalForEach( diff --git a/Sources/AsyncOperations/Sequence/Sequence+AsyncFlatMap.swift b/Sources/AsyncOperations/Sequence/Sequence+AsyncFlatMap.swift index f5fdb5f..d081eb3 100644 --- a/Sources/AsyncOperations/Sequence/Sequence+AsyncFlatMap.swift +++ b/Sources/AsyncOperations/Sequence/Sequence+AsyncFlatMap.swift @@ -10,7 +10,7 @@ extension Sequence where Element: Sendable { priority: TaskPriority? = nil, _ transform: @escaping @Sendable (Element) async throws -> [T] ) async rethrows -> [T] { - try await withThrowingOrderedTaskGroup(of: [T].self) { group in + try await withThrowingTaskGroup(of: ([T], Int).self) { group in var values: [T] = [] try await internalForEach( diff --git a/Sources/AsyncOperations/Sequence/Sequence+AsyncForEach.swift b/Sources/AsyncOperations/Sequence/Sequence+AsyncForEach.swift index d04f8db..85accc6 100644 --- a/Sources/AsyncOperations/Sequence/Sequence+AsyncForEach.swift +++ b/Sources/AsyncOperations/Sequence/Sequence+AsyncForEach.swift @@ -9,7 +9,7 @@ extension Sequence where Element: Sendable { priority: TaskPriority? = nil, _ body: @escaping @Sendable (Element) async throws -> Void ) async rethrows { - try await withThrowingOrderedTaskGroup(of: Void.self) { group in + try await withThrowingTaskGroup(of: (Void, Int).self) { group in try await internalForEach( group: &group, numberOfConcurrentTasks: numberOfConcurrentTasks, diff --git a/Sources/AsyncOperations/Sequence/Sequence+AsyncMap.swift b/Sources/AsyncOperations/Sequence/Sequence+AsyncMap.swift index 2948adb..55844c1 100644 --- a/Sources/AsyncOperations/Sequence/Sequence+AsyncMap.swift +++ b/Sources/AsyncOperations/Sequence/Sequence+AsyncMap.swift @@ -10,7 +10,7 @@ extension Sequence where Element: Sendable { priority: TaskPriority? = nil, _ transform: @escaping @Sendable (Element) async throws -> T ) async rethrows -> [T] { - try await withThrowingOrderedTaskGroup(of: T.self) { group in + try await withThrowingTaskGroup(of: (T, Int).self) { group in var values: [T] = [] try await internalForEach( diff --git a/Tests/AsyncOperationsTests/AsyncSequence/AsyncSequenceAsyncForEachTests.swift b/Tests/AsyncOperationsTests/AsyncSequence/AsyncSequenceAsyncForEachTests.swift index 1796f66..d74ebc4 100644 --- a/Tests/AsyncOperationsTests/AsyncSequence/AsyncSequenceAsyncForEachTests.swift +++ b/Tests/AsyncOperationsTests/AsyncSequence/AsyncSequenceAsyncForEachTests.swift @@ -5,17 +5,21 @@ final class AsyncSequenceAsyncForEachTests: XCTestCase { @MainActor func testAsyncForEach() async throws { var results: [Int] = [] + var events: [ConcurrentTaskEvent] = [] let asyncSequence = AsyncStream { c in (0..<5).forEach { c.yield($0) } c.finish() } - await asyncSequence.asyncForEach(numberOfConcurrentTasks: 3) { @MainActor number in - await Task.yield() + try await asyncSequence.asyncForEach(numberOfConcurrentTasks: 3) { @MainActor number in + events.append(.start) + try await Task.sleep(for: .milliseconds(100 * (5 - number))) + events.append(.end) results.append(number) } XCTAssertEqual(results.count, 5) XCTAssertEqual(Set(results), [0, 1, 2, 3, 4]) + XCTAssertEqual(events, [.start, .start, .start, .end, .start, .end, .start, .end, .end, .end]) } } diff --git a/Tests/AsyncOperationsTests/Sequence/SequenceAsyncForEachTests.swift b/Tests/AsyncOperationsTests/Sequence/SequenceAsyncForEachTests.swift index 1303376..57616c8 100644 --- a/Tests/AsyncOperationsTests/Sequence/SequenceAsyncForEachTests.swift +++ b/Tests/AsyncOperationsTests/Sequence/SequenceAsyncForEachTests.swift @@ -5,11 +5,43 @@ final class SequenceAsyncForEachTests: XCTestCase { @MainActor func testAsyncForEach() async throws { var results: [Int] = [] - await [0, 1, 2, 3, 4].asyncForEach { @MainActor number in - await Task.yield() + try await [0, 1, 2, 3, 4].asyncForEach { @MainActor number in + try await Task.sleep(for: .milliseconds(100 * (5 - number))) results.append(number) } - XCTAssertEqual(results.count, 5) - XCTAssertEqual(Set(results), [0, 1, 2, 3, 4]) + XCTAssertEqual(results, [0, 1, 2, 3, 4]) + } + + @MainActor + func testAsyncForEachConcurrently() async throws { + var events: [ConcurrentTaskEvent] = [] + let numberOfElements = 10 + let numberOfConcurrentTasks = 3 + // A random offset to stagger iteration timings. + let randomOffsets: [Double] = [ + 470, + 150, + 420, + 290, + 670, + 810, + 930, + 120, + 540, + 760, + ] + try await (0..