From 746b5f96e8a27c6fe616172937ab9b813e9a353f Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 22 Nov 2022 10:13:08 +0900 Subject: [PATCH] [Concurrency] Optimize Void task group, to not store completed tasks anymore --- .../BackDeployConcurrency/TaskGroup.cpp | 99 ++++++++++--- .../BackDeployConcurrency/TaskGroup.swift | 1 + stdlib/public/Concurrency/TaskGroup.cpp | 130 +++++++++++++++--- stdlib/public/Concurrency/TaskGroup.swift | 30 +++- ...c_taskgroup_void_neverConsumingTasks.swift | 76 ++++++++++ 5 files changed, 291 insertions(+), 45 deletions(-) create mode 100644 test/Concurrency/Runtime/async_taskgroup_void_neverConsumingTasks.swift diff --git a/stdlib/public/BackDeployConcurrency/TaskGroup.cpp b/stdlib/public/BackDeployConcurrency/TaskGroup.cpp index 359d2d8603dcb..acdc25e8529df 100644 --- a/stdlib/public/BackDeployConcurrency/TaskGroup.cpp +++ b/stdlib/public/BackDeployConcurrency/TaskGroup.cpp @@ -133,6 +133,15 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { /*task*/ asyncTask }; } + + static PollResult getVoid() { + return PollResult{ + /*status*/ PollStatus::Success, + /*storage*/ nullptr, + /*successType*/nullptr, // TODO: Void.self + /*task*/ nullptr + }; + } }; /// An item within the message queue of a group. @@ -555,13 +564,37 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context, } } +static void fillGroupNextVoidResult(TaskFutureWaitAsyncContext *context, + PollResult result) { + /// Fill in the result value + switch (result.status) { + case PollStatus::MustWait: + assert(false && "filling a waiting status?"); + return; + + case PollStatus::Error: { + assert(false && "this type of task group cannot throw"); + return; + } + + case PollStatus::Success: + case PollStatus::Empty: { + // "Success" type is guaranteed to be Void + // Initialize the result as a nil Optional. + const Metadata *successType = result.successType; + OpaqueValue *destPtr = context->successResultPointer; + successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1); + return; + } + } +} + void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) { assert(completedTask); assert(completedTask->isFuture()); assert(completedTask->hasChildFragment()); assert(completedTask->hasGroupChildFragment()); assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this)); - SWIFT_TASK_DEBUG_LOG("offer task %p to group %p", completedTask, this); mutex.lock(); // TODO: remove fragment lock, and use status for synchronization @@ -572,6 +605,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) { // W:n R:0 P:1 -> W:y R:1 P:1 // complete immediately // W:n R:0 P:1 -> W:y R:1 P:3 // complete immediately, 2 more pending tasks auto assumed = statusAddReadyAssumeAcquire(); + SWIFT_TASK_DEBUG_LOG("offer task %p to group %p, tasks pending = %d", completedTask, assumed.pendingTasks()); auto asyncContextPrefix = reinterpret_cast( reinterpret_cast(context) - sizeof(FutureAsyncContextPrefix)); @@ -607,7 +641,13 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) { static_cast( waitingTask->ResumeContext); - fillGroupNextResult(waitingContext, result); + if (this->eagerlyReleaseCompleteTasks) { + fprintf(stderr, "[%s:%d](%s) offer: eagerlyReleaseCompleteTasks\n", __FILE_NAME__, __LINE__, __FUNCTION__); + fillGroupNextResult(waitingContext, result); + } else { + fprintf(stderr, "[%s:%d](%s) offer: NOT\n", __FILE_NAME__, __LINE__, __FUNCTION__); + fillGroupNextResult(waitingContext, result); + } detachChild(result.retainedTask); _swift_tsan_acquire(static_cast(waitingTask)); @@ -627,20 +667,31 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) { // queue when a task polls during next() it will notice that we have a value // ready for it, and will process it immediately without suspending. assert(!waitQueue.load(std::memory_order_relaxed)); - SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p", - completedTask); - // Retain the task while it is in the queue; - // it must remain alive until the task group is alive. - swift_retain(completedTask); - - auto readyItem = ReadyQueueItem::get( - hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success, - completedTask - ); + if (!this->eagerlyReleaseCompleteTasks) { + SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p", + completedTask); + // Retain the task while it is in the queue; + // it must remain alive until the task group is alive. + swift_retain(completedTask); + + auto readyItem = ReadyQueueItem::get( + hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success, + completedTask + ); + + assert(completedTask == readyItem.getTask()); + assert(readyItem.getTask()->isFuture()); + readyQueue.enqueue(readyItem); + } else { + assert(this->eagerlyReleaseCompleteTasks); + // DO NOT retain the task; and do not store the value in the readyQueue at all (!) + // + // In the "eagerlyRelease" completed tasks mode, we are guaranteed that tasks are of Void type, + // and thus there is no necessity to store values, because we can always "make them up" when polled. + // From the user's perspective, it is indistinguishable if they received the "real value" or one we "made up", + // because Void is always the same, and cannot be examined in any way to determine if it was the "actual" Void or not. + } - assert(completedTask == readyItem.getTask()); - assert(readyItem.getTask()->isFuture()); - readyQueue.enqueue(readyItem); mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization return; } @@ -698,7 +749,7 @@ static void swift_taskGroup_wait_next_throwingImpl( PollResult polled = group->poll(waitingTask); switch (polled.status) { case PollStatus::MustWait: - SWIFT_TASK_DEBUG_LOG("poll group = %p, no ready tasks, waiting task = %p", + SWIFT_TASK_DEBUG_LOG("poll group = %p, tasks ready = 0, waiting task = %p", group, waitingTask); // The waiting task has been queued on the channel, // there were pending tasks so it will be woken up eventually. @@ -714,13 +765,17 @@ static void swift_taskGroup_wait_next_throwingImpl( case PollStatus::Success: SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p", group, waitingTask, polled.retainedTask); - fillGroupNextResult(context, polled); + if (this->eagerlyReleaseCompleteTasks) { + fillGroupNextVoidResult(context, polled); + } else { + fillGroupNextResult(context, polled); + } + if (auto completedTask = polled.retainedTask) { // it would be null for PollStatus::Empty, then we don't need to release - group->detachChild(polled.retainedTask); - swift_release(polled.retainedTask); + group->detachChild(completedTask); + swift_release(completedTask); } - return waitingTask->runInFullyEstablishedContext(); } } @@ -755,8 +810,8 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) { // ==== 2) Ready task was polled, return with it immediately ----------------- if (assumed.readyTasks()) { - SWIFT_TASK_DEBUG_LOG("poll group = %p, group has ready tasks = %d", - this, assumed.readyTasks()); + SWIFT_TASK_DEBUG_LOG("poll group = %p, tasks ready=%d, pending=%d", + this, assumed.readyTasks(), assumed.pendingTasks()); auto assumedStatus = assumed.status; auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus}; diff --git a/stdlib/public/BackDeployConcurrency/TaskGroup.swift b/stdlib/public/BackDeployConcurrency/TaskGroup.swift index a10a20571fe52..e52f3a149840c 100644 --- a/stdlib/public/BackDeployConcurrency/TaskGroup.swift +++ b/stdlib/public/BackDeployConcurrency/TaskGroup.swift @@ -462,6 +462,7 @@ public struct ThrowingTaskGroup { } } + // TODO(ktoso): doesn't seem to be used? @usableFromInline internal mutating func _waitForAll() async throws { while let _ = try await next() { } diff --git a/stdlib/public/Concurrency/TaskGroup.cpp b/stdlib/public/Concurrency/TaskGroup.cpp index bd4a3efffa606..c9d7da520be72 100644 --- a/stdlib/public/Concurrency/TaskGroup.cpp +++ b/stdlib/public/Concurrency/TaskGroup.cpp @@ -137,6 +137,15 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { /*task*/ asyncTask }; } + + static PollResult getVoid() { + return PollResult{ + /*status*/ PollStatus::Empty, + /*storage*/ nullptr, + /*successType*/nullptr, // TODO: Void.self + /*task*/ nullptr + }; + } }; /// An item within the message queue of a group. @@ -322,11 +331,14 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { friend class ::swift::AsyncTask; public: - explicit TaskGroupImpl(const Metadata *T) + const bool eagerlyReleaseCompleteTasks; + explicit TaskGroupImpl(const Metadata *T, bool eagerlyReleaseCompleteTasks) : TaskGroupTaskStatusRecord(), status(GroupStatus::initial().status), readyQueue(), - waitQueue(nullptr), successType(T) {} + waitQueue(nullptr), + successType(T), + eagerlyReleaseCompleteTasks(eagerlyReleaseCompleteTasks) {} TaskGroupTaskStatusRecord *getTaskRecord() { return reinterpret_cast(this); @@ -488,7 +500,7 @@ SWIFT_CC(swift) static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T) { SWIFT_TASK_DEBUG_LOG("creating task group = %p", group); - TaskGroupImpl *impl = ::new (group) TaskGroupImpl(T); + TaskGroupImpl *impl = ::new (group) TaskGroupImpl(T, /*eagerlyReleaseCompleteTasks=*/true); auto record = impl->getTaskRecord(); assert(impl == record && "the group IS the task record"); @@ -525,6 +537,12 @@ static void swift_taskGroup_destroyImpl(TaskGroup *group) { void TaskGroupImpl::destroy() { SWIFT_TASK_DEBUG_LOG("destroying task group = %p", this); + if (!this->isEmpty()) { + auto status = this->statusLoadRelaxed(); + SWIFT_TASK_DEBUG_LOG("destroying task group = %p, tasks .ready = %d, .pending = %d", + this, status.readyTasks(), status.pendingTasks()); + } + assert(this->isEmpty() && "Attempted to destroy non-empty task group!"); // First, remove the group from the task and deallocate the record removeStatusRecord(getTaskRecord()); @@ -583,20 +601,68 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context, } } +static void fillGroupNextVoidResult(TaskFutureWaitAsyncContext *context, + PollResult result) { + /// Fill in the result value + switch (result.status) { + case PollStatus::MustWait: + assert(false && "filling a waiting status?"); + return; + + case PollStatus::Error: { + assert(false && "cannot have errors"); + return; + } + + case PollStatus::Success: { + // Initialize the result as an Optional. + const Metadata *successType = result.successType; + OpaqueValue *destPtr = context->successResultPointer; + // TODO: figure out a way to try to optimistically take the + // value out of the finished task's future, if there are no + // remaining references to it. + successType->vw_initializeWithCopy(destPtr, result.storage); + successType->vw_storeEnumTagSinglePayload(destPtr, 0, 1); + return; + } + + case PollStatus::Empty: { + // Initialize the result as a nil Optional. + const Metadata *successType = result.successType; + OpaqueValue *destPtr = context->successResultPointer; + successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1); + return; + } + } +} + // TaskGroup is locked upon entry and exit void TaskGroupImpl::enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) { - // Retain the task while it is in the queue; - // it must remain alive until the task group is alive. - swift_retain(completedTask); - - auto readyItem = ReadyQueueItem::get( - hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success, - completedTask - ); - - assert(completedTask == readyItem.getTask()); - assert(readyItem.getTask()->isFuture()); - readyQueue.enqueue(readyItem); + if (this->eagerlyReleaseCompleteTasks) { + SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, eager release mode; release result task = %p", + completedTask); + // DO NOT RETAIN THE TASK. + // We know it is Void, so we don't need to store the result; + // By releasing tasks eagerly we're able to keep "infinite" task groups, + // running, that never consume their values. Even more-so, + return; + } + + SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p", + completedTask); + + // Retain the task while it is in the queue; + // it must remain alive until the task group is alive. + swift_retain(completedTask); + + auto readyItem = ReadyQueueItem::get( + hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success, + completedTask + ); + + assert(completedTask == readyItem.getTask()); + assert(readyItem.getTask()->isFuture()); + readyQueue.enqueue(readyItem); } void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) { @@ -617,6 +683,9 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) { // W:n R:0 P:1 -> W:y R:1 P:3 // complete immediately, 2 more pending tasks auto assumed = statusAddReadyAssumeAcquire(); + SWIFT_TASK_DEBUG_LOG("group %p, ready: %d, pending: %d", + this, assumed.readyTasks(), assumed.pendingTasks()); + auto asyncContextPrefix = reinterpret_cast( reinterpret_cast(context) - sizeof(FutureAsyncContextPrefix)); bool hadErrorResult = false; @@ -686,8 +755,6 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) { // ready for it, and will process it immediately without suspending. assert(!waitQueue.load(std::memory_order_relaxed)); - SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p", - completedTask); enqueueCompletedTask(completedTask, hadErrorResult); unlock(); // TODO: remove fragment lock, and use status for synchronization } @@ -764,7 +831,11 @@ static void swift_taskGroup_wait_next_throwingImpl( case PollStatus::Success: SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p", group, waitingTask, polled.retainedTask); - fillGroupNextResult(context, polled); + if (group->eagerlyReleaseCompleteTasks) { + fillGroupNextVoidResult(context, polled); + } else { + fillGroupNextResult(context, polled); + } if (auto completedTask = polled.retainedTask) { // it would be null for PollStatus::Empty, then we don't need to release group->detachChild(polled.retainedTask); @@ -811,8 +882,8 @@ reevaluate_if_taskgroup_has_results:; // ==== 2) Ready task was polled, return with it immediately ----------------- if (assumed.readyTasks()) { - SWIFT_TASK_DEBUG_LOG("poll group = %p, group has ready tasks = %d", - this, assumed.readyTasks()); + SWIFT_TASK_DEBUG_LOG("poll group = %p, tasks .ready = %d, .pending = %d", + this, assumed.readyTasks(), assumed.pendingTasks()); auto assumedStatus = assumed.status; auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus}; @@ -829,6 +900,17 @@ reevaluate_if_taskgroup_has_results:; // Success! We are allowed to poll. ReadyQueueItem item; + if (this->eagerlyReleaseCompleteTasks) { + SWIFT_TASK_DEBUG_LOG("poll group = %p; polled in eager-release mode; make up Void value to yield", + this, assumed.readyTasks(), assumed.pendingTasks()); + result.status = PollStatus::Success; + result.storage = nullptr; + result.retainedTask = nullptr; + result.successType = this->successType; + unlock(); // TODO: remove fragment lock, and use status for synchronization + return result; + } + bool taskDequeued = readyQueue.dequeue(item); assert(taskDequeued); (void) taskDequeued; @@ -956,10 +1038,14 @@ bool TaskGroupImpl::cancelAll() { // ============================================================================= // ==== addPending ------------------------------------------------------------- + SWIFT_CC(swift) static bool swift_taskGroup_addPendingImpl(TaskGroup *group, bool unconditionally) { - auto assumedStatus = asImpl(group)->statusAddPendingTaskRelaxed(unconditionally); - return !assumedStatus.isCancelled(); + auto assumed = asImpl(group)->statusAddPendingTaskRelaxed(unconditionally); + SWIFT_TASK_DEBUG_LOG("add pending %s to group %p, tasks pending = %d", + unconditionally ? "unconditionally" : "", + group, assumed.pendingTasks()); + return !assumed.isCancelled(); } #define OVERRIDE_TASK_GROUP COMPATIBILITY_OVERRIDE diff --git a/stdlib/public/Concurrency/TaskGroup.swift b/stdlib/public/Concurrency/TaskGroup.swift index 3067ded7ef414..2ca4b385dd5ec 100644 --- a/stdlib/public/Concurrency/TaskGroup.swift +++ b/stdlib/public/Concurrency/TaskGroup.swift @@ -10,8 +10,10 @@ // //===----------------------------------------------------------------------===// + import Swift @_implementationOnly import _SwiftConcurrencyShims +import Darwin // ==== TaskGroup -------------------------------------------------------------- @@ -90,6 +92,32 @@ public func withTaskGroup( #endif } +@available(SwiftStdlib 5.1, *) +@_unsafeInheritExecutor +@inlinable +public func withTaskGroupSuper( + of childTaskResultType: Void.Type = Void.self, + returning returnType: GroupResult.Type = GroupResult.self, + body: (inout TaskGroup) async -> GroupResult +) async -> GroupResult { + #if compiler(>=5.5) && $BuiltinTaskGroupWithArgument + + let _group = Builtin.createTaskGroup(Void.self) + var group = TaskGroup(group: _group) + + // Run the withTaskGroup body. + let result = await body(&group) + + await group.awaitAllRemainingTasks() + + Builtin.destroyTaskGroup(_group) + return result + + #else + fatalError("Swift compiler is incompatible with this SDK version") + #endif +} + /// Starts a new scope that can contain a dynamic number of throwing child tasks. /// /// A group waits for all of its child tasks @@ -481,7 +509,7 @@ public struct ThrowingTaskGroup { @usableFromInline internal mutating func _waitForAll() async throws { - while let _ = try await next() { } + while let _ = try? await next() { } } /// Wait for all of the group's remaining tasks to complete. diff --git a/test/Concurrency/Runtime/async_taskgroup_void_neverConsumingTasks.swift b/test/Concurrency/Runtime/async_taskgroup_void_neverConsumingTasks.swift new file mode 100644 index 0000000000000..987de6486c9fe --- /dev/null +++ b/test/Concurrency/Runtime/async_taskgroup_void_neverConsumingTasks.swift @@ -0,0 +1,76 @@ +// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) | %FileCheck %s --dump-input=always +// REQUIRES: executable_test +// REQUIRES: concurrency +// REQUIRES: concurrency_runtime +// UNSUPPORTED: back_deployment_runtime +// UNSUPPORTED: OS=linux-gnu +import Darwin + +actor Waiter { + let until: Int + var count: Int + + var cc: CheckedContinuation? + + init(until: Int) { + self.until = until + self.count = 0 + } + + func increment() { + self.count += 1 + fputs("> increment (\(self.count)/\(self.until))\n", stderr); + if self.until <= self.count { + if let cc = self.cc { + cc.resume(returning: self.count) + } + } + } + + func wait() async -> Int { + if self.until <= self.count { + fputs("> RETURN in Waiter\n", stderr); + return self.count + } + + return await withCheckedContinuation { cc in + fputs("> WAIT in Waiter\n", stderr); + self.cc = cc + } + } +} + +@available(SwiftStdlib 5.1, *) +func test_taskGroup_void_neverConsume() async { + let until = 100_000_000 + let waiter = Waiter(until: until) + + let allTasks = await withTaskGroupSuper(of: Void.self, returning: Int.self) { group in + for n in 1...until { + fputs("> enqueue: \(n)\n", stderr); + group.addTask { + fputs("> run: \(n)\n", stderr); + try? await Task.sleep(until: .now + .milliseconds(100), clock: .continuous) + await waiter.increment() + } + } + + let void = await next() + + // wait a little bit, so some tasks complete before we hit the implicit "wait at end of task group scope" + try? await Task.sleep(until: .now + .milliseconds(500), clock: .continuous) + + return until + } + + // CHECK: all tasks: 100 + print("all tasks: \(allTasks)") + print("actor: \(allTasks)") +} + +@available(SwiftStdlib 5.1, *) +@main struct Main { + static func main() async { + await test_taskGroup_void_neverConsume() + } +}