diff --git a/include/swift/ABI/MetadataValues.h b/include/swift/ABI/MetadataValues.h index 87f3235f41726..403b18ec99295 100644 --- a/include/swift/ABI/MetadataValues.h +++ b/include/swift/ABI/MetadataValues.h @@ -2383,6 +2383,24 @@ enum class TaskOptionRecordKind : uint8_t { RunInline = UINT8_MAX, }; +/// Flags for TaskGroup. +class TaskGroupFlags : public FlagSet { +public: + enum { + // 8 bits are reserved for future use + /// Request the TaskGroup to immediately release completed tasks, + /// and not store their results. This also effectively disables `next()`. + TaskGroup_DiscardResults = 8, + }; + + explicit TaskGroupFlags(uint32_t bits) : FlagSet(bits) {} + constexpr TaskGroupFlags() {} + + FLAGSET_DEFINE_FLAG_ACCESSORS(TaskGroup_DiscardResults, + isDiscardResults, + setIsDiscardResults) +}; + /// Flags for cancellation records. class TaskStatusRecordFlags : public FlagSet { public: diff --git a/include/swift/AST/Builtins.def b/include/swift/AST/Builtins.def index 8c001b736e682..76b70af4858b7 100644 --- a/include/swift/AST/Builtins.def +++ b/include/swift/AST/Builtins.def @@ -793,6 +793,10 @@ BUILTIN_MISC_OPERATION(ResumeThrowingContinuationThrowing, BUILTIN_MISC_OPERATION(CreateTaskGroup, "createTaskGroup", "", Special) +/// Create a task group, with options. +BUILTIN_MISC_OPERATION(CreateTaskGroupWithFlags, + "createTaskGroupWithFlags", "", Special) + /// Destroy a task group. BUILTIN_MISC_OPERATION(DestroyTaskGroup, "destroyTaskGroup", "", Special) diff --git a/include/swift/Runtime/Concurrency.h b/include/swift/Runtime/Concurrency.h index 141459ae62fe3..386b58aaebc77 100644 --- a/include/swift/Runtime/Concurrency.h +++ b/include/swift/Runtime/Concurrency.h @@ -185,13 +185,15 @@ void swift_task_future_wait_throwing( /// func swift_taskGroup_wait_next_throwing( /// waitingTask: Builtin.NativeObject, // current task /// group: Builtin.RawPointer -/// ) async -> T +/// ) async throws -> T /// \endcode SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swiftasync) void swift_taskGroup_wait_next_throwing( - OpaqueValue *resultPointer, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext, - TaskGroup *group, ThrowingTaskFutureWaitContinuationFunction *resumeFn, + OpaqueValue *resultPointer, + SWIFT_ASYNC_CONTEXT AsyncContext *callerContext, + TaskGroup *group, + ThrowingTaskFutureWaitContinuationFunction *resumeFn, AsyncContext *callContext); /// Initialize a `TaskGroup` in the passed `group` memory location. @@ -205,6 +207,17 @@ void swift_taskGroup_wait_next_throwing( SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) void swift_taskGroup_initialize(TaskGroup *group, const Metadata *T); +/// Initialize a `TaskGroup` in the passed `group` memory location. +/// The caller is responsible for retaining and managing the group's lifecycle. +/// +/// Its Swift signature is +/// +/// \code +/// func swift_taskGroup_initialize(flags: Int, group: Builtin.RawPointer) +/// \endcode +SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) +void swift_taskGroup_initializeWithFlags(size_t flags, TaskGroup *group, const Metadata *T); + /// Attach a child task to the parent task's task group record. /// /// This function MUST be called from the AsyncTask running the task group. @@ -276,6 +289,38 @@ void swift_taskGroup_cancelAll(TaskGroup *group); SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) bool swift_taskGroup_isCancelled(TaskGroup *group); +/// Check if the task group is discarding results or not. +/// +/// This can be called from any thread. Its Swift signature is +/// +/// \code +/// func swift_taskGroup_isDiscardingResults(group: Builtin.RawPointer) +/// \endcode +SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) +bool swift_taskGroup_isDiscardingResults(TaskGroup *group); + +/// Wait until all pending tasks from the task group have completed. +/// If this task group is accumulating results, this also discards all those results. +/// +/// This can be called from any thread. Its Swift signature is +/// +/// \code +/// func swift_taskGroup_waitAll( +/// waitingTask: Builtin.NativeObject, // current task +/// group: Builtin.RawPointer, +/// bodyError: Swift.Error? +/// ) async throws +/// \endcode + SWIFT_EXPORT_FROM(swift_Concurrency) + SWIFT_CC(swiftasync) + void swift_taskGroup_waitAll( + OpaqueValue *resultPointer, + SWIFT_ASYNC_CONTEXT AsyncContext *callerContext, + TaskGroup *group, + SwiftError *bodyError, + ThrowingTaskFutureWaitContinuationFunction *resumeFn, + AsyncContext *callContext); + /// Check the readyQueue of a task group, return true if it has no pending tasks. /// /// This can be called from any thread. Its Swift signature is diff --git a/include/swift/Runtime/RuntimeFunctions.def b/include/swift/Runtime/RuntimeFunctions.def index d9ad0eb62d8c0..8926f7befedab 100644 --- a/include/swift/Runtime/RuntimeFunctions.def +++ b/include/swift/Runtime/RuntimeFunctions.def @@ -2060,6 +2060,18 @@ FUNCTION(TaskGroupInitialize, ATTRS(NoUnwind), EFFECT(Concurrency)) +// void swift_taskGroup_initializeWithFlags(size_t flags, TaskGroup *group); +FUNCTION(TaskGroupInitializeWithFlags, + swift_taskGroup_initializeWithFlags, SwiftCC, + ConcurrencyAvailability, + RETURNS(VoidTy), + ARGS(SizeTy, // flags + Int8PtrTy, // group + TypeMetadataPtrTy // T.Type + ), + ATTRS(NoUnwind), + EFFECT(Concurrency)) + // void swift_taskGroup_destroy(TaskGroup *group); FUNCTION(TaskGroupDestroy, swift_taskGroup_destroy, SwiftCC, diff --git a/lib/AST/Builtins.cpp b/lib/AST/Builtins.cpp index f840a4c953f58..c2de8f1d7c979 100644 --- a/lib/AST/Builtins.cpp +++ b/lib/AST/Builtins.cpp @@ -1539,6 +1539,24 @@ static ValueDecl *getCreateTaskGroup(ASTContext &ctx, Identifier id) { _rawPointer); } +static ValueDecl *getCreateTaskGroupWithFlags(ASTContext &ctx, Identifier id) { + ModuleDecl *M = ctx.TheBuiltinModule; + DeclContext *DC = &M->getMainFile(FileUnitKind::Builtin); + SynthesisContext SC(ctx, DC); + + BuiltinFunctionBuilder builder(ctx); + + // int + builder.addParameter(makeConcrete(ctx.getIntType())); // 0 flags + + // T.self + builder.addParameter(makeMetatype(makeGenericParam(0))); // 1 ChildTaskResult.Type + + // -> Builtin.RawPointer + builder.setResult(makeConcrete(synthesizeType(SC, _rawPointer))); + return builder.build(id); +} + static ValueDecl *getDestroyTaskGroup(ASTContext &ctx, Identifier id) { return getBuiltinFunction(ctx, id, _thin, _parameters(_rawPointer), @@ -2908,6 +2926,8 @@ ValueDecl *swift::getBuiltinValueDecl(ASTContext &Context, Identifier Id) { case BuiltinValueKind::CreateTaskGroup: return getCreateTaskGroup(Context, Id); + case BuiltinValueKind::CreateTaskGroupWithFlags: + return getCreateTaskGroupWithFlags(Context, Id); case BuiltinValueKind::DestroyTaskGroup: return getDestroyTaskGroup(Context, Id); diff --git a/lib/IRGen/Callee.h b/lib/IRGen/Callee.h index 36dbbb4d2e1fd..aca4a802aa711 100644 --- a/lib/IRGen/Callee.h +++ b/lib/IRGen/Callee.h @@ -175,6 +175,7 @@ namespace irgen { AsyncLetGetThrowing, AsyncLetFinish, TaskGroupWaitNext, + TaskGroupWaitAll, DistributedExecuteTarget, }; @@ -247,6 +248,7 @@ namespace irgen { case SpecialKind::AsyncLetGetThrowing: case SpecialKind::AsyncLetFinish: case SpecialKind::TaskGroupWaitNext: + case SpecialKind::TaskGroupWaitAll: return true; case SpecialKind::DistributedExecuteTarget: return false; @@ -277,6 +279,7 @@ namespace irgen { case SpecialKind::AsyncLetGetThrowing: case SpecialKind::AsyncLetFinish: case SpecialKind::TaskGroupWaitNext: + case SpecialKind::TaskGroupWaitAll: return true; case SpecialKind::DistributedExecuteTarget: return false; diff --git a/lib/IRGen/GenBuiltin.cpp b/lib/IRGen/GenBuiltin.cpp index c4c1d0a3ec241..cfe29335d93da 100644 --- a/lib/IRGen/GenBuiltin.cpp +++ b/lib/IRGen/GenBuiltin.cpp @@ -277,9 +277,22 @@ void irgen::emitBuiltinCall(IRGenFunction &IGF, const BuiltinInfo &Builtin, } if (Builtin.ID == BuiltinValueKind::CreateTaskGroup) { + llvm::Value *groupFlags = nullptr; // Claim metadata pointer. (void)args.claimAll(); - out.add(emitCreateTaskGroup(IGF, substitutions)); + out.add(emitCreateTaskGroup(IGF, substitutions, groupFlags)); + return; + } + + if (Builtin.ID == BuiltinValueKind::CreateTaskGroupWithFlags) { + auto groupFlags = args.claimNext(); + // Claim the remaining metadata pointer. + if (args.size() == 1) { + (void)args.claimNext(); + } else if (args.size() > 1) { + llvm_unreachable("createTaskGroupWithFlags expects 1 or 2 arguments"); + } + out.add(emitCreateTaskGroup(IGF, substitutions, groupFlags)); return; } diff --git a/lib/IRGen/GenCall.cpp b/lib/IRGen/GenCall.cpp index 153b69431112c..34712f8fdd2d2 100644 --- a/lib/IRGen/GenCall.cpp +++ b/lib/IRGen/GenCall.cpp @@ -162,6 +162,7 @@ FunctionPointerKind::getStaticAsyncContextSize(IRGenModule &IGM) const { case SpecialKind::AsyncLetGetThrowing: case SpecialKind::AsyncLetFinish: case SpecialKind::TaskGroupWaitNext: + case SpecialKind::TaskGroupWaitAll: case SpecialKind::DistributedExecuteTarget: // The current guarantee for all of these functions is the same. // See TaskFutureWaitAsyncContext. diff --git a/lib/IRGen/GenConcurrency.cpp b/lib/IRGen/GenConcurrency.cpp index f47be01168baf..bfd4d69b27d3d 100644 --- a/lib/IRGen/GenConcurrency.cpp +++ b/lib/IRGen/GenConcurrency.cpp @@ -271,7 +271,8 @@ void irgen::emitEndAsyncLet(IRGenFunction &IGF, llvm::Value *alet) { } llvm::Value *irgen::emitCreateTaskGroup(IRGenFunction &IGF, - SubstitutionMap subs) { + SubstitutionMap subs, + llvm::Value *groupFlags) { auto ty = llvm::ArrayType::get(IGF.IGM.Int8PtrTy, NumWords_TaskGroup); auto address = IGF.createAlloca(ty, Alignment(Alignment_TaskGroup)); auto group = IGF.Builder.CreateBitCast(address.getAddress(), @@ -282,9 +283,14 @@ llvm::Value *irgen::emitCreateTaskGroup(IRGenFunction &IGF, auto resultType = subs.getReplacementTypes()[0]->getCanonicalType(); auto resultTypeMetadata = IGF.emitAbstractTypeMetadataRef(resultType); - auto *call = - IGF.Builder.CreateCall(IGF.IGM.getTaskGroupInitializeFunctionPointer(), - {group, resultTypeMetadata}); + llvm::CallInst *call; + if (groupFlags) { + call = IGF.Builder.CreateCall(IGF.IGM.getTaskGroupInitializeWithFlagsFunctionPointer(), + {groupFlags, group, resultTypeMetadata}); + } else { + call = IGF.Builder.CreateCall(IGF.IGM.getTaskGroupInitializeFunctionPointer(), + {group, resultTypeMetadata}); + } call->setDoesNotThrow(); call->setCallingConv(IGF.IGM.SwiftCC); diff --git a/lib/IRGen/GenConcurrency.h b/lib/IRGen/GenConcurrency.h index 6ee88c61c5397..37996d7eb3fdb 100644 --- a/lib/IRGen/GenConcurrency.h +++ b/lib/IRGen/GenConcurrency.h @@ -69,7 +69,8 @@ llvm::Value *emitBuiltinStartAsyncLet(IRGenFunction &IGF, void emitEndAsyncLet(IRGenFunction &IGF, llvm::Value *alet); /// Emit the createTaskGroup builtin. -llvm::Value *emitCreateTaskGroup(IRGenFunction &IGF, SubstitutionMap subs); +llvm::Value *emitCreateTaskGroup(IRGenFunction &IGF, SubstitutionMap subs, + llvm::Value *groupFlags); /// Emit the destroyTaskGroup builtin. void emitDestroyTaskGroup(IRGenFunction &IGF, llvm::Value *group); diff --git a/lib/IRGen/IRGenSIL.cpp b/lib/IRGen/IRGenSIL.cpp index 9d5b723f1cf1f..23b6451de9111 100644 --- a/lib/IRGen/IRGenSIL.cpp +++ b/lib/IRGen/IRGenSIL.cpp @@ -2658,6 +2658,9 @@ FunctionPointer::Kind irgen::classifyFunctionPointerKind(SILFunction *fn) { if (name.equals("swift_taskGroup_wait_next_throwing")) return SpecialKind::TaskGroupWaitNext; + if (name.equals("swift_taskGroup_waitAll")) + return SpecialKind::TaskGroupWaitAll; + if (name.equals("swift_distributed_execute_target")) return SpecialKind::DistributedExecuteTarget; } diff --git a/lib/SIL/IR/OperandOwnership.cpp b/lib/SIL/IR/OperandOwnership.cpp index d9af75486ef59..c74c91c1a072c 100644 --- a/lib/SIL/IR/OperandOwnership.cpp +++ b/lib/SIL/IR/OperandOwnership.cpp @@ -821,6 +821,7 @@ BUILTIN_OPERAND_OWNERSHIP(DestroyingConsume, EndAsyncLet) BUILTIN_OPERAND_OWNERSHIP(DestroyingConsume, StartAsyncLetWithLocalBuffer) BUILTIN_OPERAND_OWNERSHIP(DestroyingConsume, EndAsyncLetLifetime) BUILTIN_OPERAND_OWNERSHIP(InstantaneousUse, CreateTaskGroup) +BUILTIN_OPERAND_OWNERSHIP(InstantaneousUse, CreateTaskGroupWithFlags) BUILTIN_OPERAND_OWNERSHIP(InstantaneousUse, DestroyTaskGroup) BUILTIN_OPERAND_OWNERSHIP(ForwardingConsume, COWBufferForReading) diff --git a/lib/SIL/IR/ValueOwnership.cpp b/lib/SIL/IR/ValueOwnership.cpp index e90c49d0ef5a7..d5451e93f7b3b 100644 --- a/lib/SIL/IR/ValueOwnership.cpp +++ b/lib/SIL/IR/ValueOwnership.cpp @@ -559,6 +559,7 @@ CONSTANT_OWNERSHIP_BUILTIN(None, EndAsyncLet) CONSTANT_OWNERSHIP_BUILTIN(None, StartAsyncLetWithLocalBuffer) CONSTANT_OWNERSHIP_BUILTIN(None, EndAsyncLetLifetime) CONSTANT_OWNERSHIP_BUILTIN(None, CreateTaskGroup) +CONSTANT_OWNERSHIP_BUILTIN(None, CreateTaskGroupWithFlags) CONSTANT_OWNERSHIP_BUILTIN(None, DestroyTaskGroup) CONSTANT_OWNERSHIP_BUILTIN(None, TaskRunInline) CONSTANT_OWNERSHIP_BUILTIN(None, Copy) diff --git a/lib/SIL/Utils/MemAccessUtils.cpp b/lib/SIL/Utils/MemAccessUtils.cpp index 0f79f22796a1a..448bb534a65b0 100644 --- a/lib/SIL/Utils/MemAccessUtils.cpp +++ b/lib/SIL/Utils/MemAccessUtils.cpp @@ -2505,6 +2505,7 @@ static void visitBuiltinAddress(BuiltinInst *builtin, case BuiltinValueKind::EndAsyncLet: case BuiltinValueKind::EndAsyncLetLifetime: case BuiltinValueKind::CreateTaskGroup: + case BuiltinValueKind::CreateTaskGroupWithFlags: case BuiltinValueKind::DestroyTaskGroup: return; diff --git a/lib/SILOptimizer/Transforms/AccessEnforcementReleaseSinking.cpp b/lib/SILOptimizer/Transforms/AccessEnforcementReleaseSinking.cpp index 2c8ac128c60db..6e34118645335 100644 --- a/lib/SILOptimizer/Transforms/AccessEnforcementReleaseSinking.cpp +++ b/lib/SILOptimizer/Transforms/AccessEnforcementReleaseSinking.cpp @@ -150,6 +150,7 @@ static bool isBarrier(SILInstruction *inst) { case BuiltinValueKind::EndAsyncLet: case BuiltinValueKind::EndAsyncLetLifetime: case BuiltinValueKind::CreateTaskGroup: + case BuiltinValueKind::CreateTaskGroupWithFlags: case BuiltinValueKind::DestroyTaskGroup: case BuiltinValueKind::StackAlloc: case BuiltinValueKind::StackDealloc: diff --git a/stdlib/public/BackDeployConcurrency/CompatibilityOverrideConcurrency.def b/stdlib/public/BackDeployConcurrency/CompatibilityOverrideConcurrency.def index c194db12be8b6..2c85a0920b078 100644 --- a/stdlib/public/BackDeployConcurrency/CompatibilityOverrideConcurrency.def +++ b/stdlib/public/BackDeployConcurrency/CompatibilityOverrideConcurrency.def @@ -241,6 +241,10 @@ OVERRIDE_TASK_GROUP(taskGroup_initialize, void, SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), swift::, (TaskGroup *group, const Metadata *T), (group, T)) +OVERRIDE_TASK_GROUP(taskGroup_initializeWithFlags, void, + SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), + swift::, (size_t flags, TaskGroup *group, const Metadata *T), (flags, group, T)) + OVERRIDE_TASK_STATUS(taskGroup_attachChild, void, SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), swift::, (TaskGroup *group, AsyncTask *child), @@ -269,6 +273,21 @@ OVERRIDE_TASK_GROUP(taskGroup_isCancelled, bool, SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), swift::, (TaskGroup *group), (group)) +OVERRIDE_TASK_GROUP(taskGroup_isDiscardingResults, bool, + SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), + swift::, (TaskGroup *group), (group)) + +OVERRIDE_TASK_GROUP(taskGroup_wait_all, void, + SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swiftasync), + swift::, + (OpaqueValue *resultPointer, + SWIFT_ASYNC_CONTEXT AsyncContext *callerContext, + TaskGroup *_group, + ThrowingTaskFutureWaitContinuationFunction *resumeFn, + AsyncContext *callContext), + (resultPointer, callerContext, _group, resumeFn, + callContext)) + OVERRIDE_TASK_GROUP(taskGroup_cancelAll, void, SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), swift::, (TaskGroup *group), (group)) diff --git a/stdlib/public/BackDeployConcurrency/ConcurrencyRuntime.h b/stdlib/public/BackDeployConcurrency/ConcurrencyRuntime.h index ba6b78fa053a7..754436fe2ffa1 100644 --- a/stdlib/public/BackDeployConcurrency/ConcurrencyRuntime.h +++ b/stdlib/public/BackDeployConcurrency/ConcurrencyRuntime.h @@ -173,6 +173,9 @@ void swift_taskGroup_wait_next_throwing( SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) void swift_taskGroup_initialize(TaskGroup *group, const Metadata *T); +SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) +void swift_taskGroup_initializeWithFlags(size_t flags, TaskGroup *group, const Metadata *T); + /// Attach a child task to the parent task's task group record. /// /// This function MUST be called from the AsyncTask running the task group. diff --git a/stdlib/public/BackDeployConcurrency/TaskGroup.cpp b/stdlib/public/BackDeployConcurrency/TaskGroup.cpp index 359d2d8603dcb..64af22e396e9c 100644 --- a/stdlib/public/BackDeployConcurrency/TaskGroup.cpp +++ b/stdlib/public/BackDeployConcurrency/TaskGroup.cpp @@ -114,12 +114,6 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { /// until a next() call eventually picks it up. AsyncTask *retainedTask; - bool isStorageAccessible() { - return status == PollStatus::Success || - status == PollStatus::Error || - status == PollStatus::Empty; - } - static PollResult get(AsyncTask *asyncTask, bool hadErrorResult) { auto fragment = asyncTask->futureFragment(); return PollResult{ @@ -133,6 +127,15 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { /*task*/ asyncTask }; } + + static PollResult getEmpty(const Metadata *successType) { + return PollResult{ + /*status*/ PollStatus::Success, + /*storage*/ nullptr, + /*successType*/successType, + /*task*/ nullptr + }; + } }; /// An item within the message queue of a group. @@ -426,6 +429,17 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { /// or a `PollStatus::MustWait` result if there are tasks in flight /// and the waitingTask eventually be woken up by a completion. PollResult poll(AsyncTask *waitingTask); + + /// A `discardResults` TaskGroup is not able to wait on individual completions, + /// instead, it can only await on "all pending tasks have been processed". + /// + /// + /// If unable to complete the waiting task immediately (with an readily + /// available completed task), either returns an `PollStatus::Empty` + /// result if it is known that no pending tasks in the group, + /// or a `PollStatus::MustWait` result if there are tasks in flight + /// and the waitingTask eventually be woken up by a completion. + PollResult waitAll(AsyncTask *waitingTask); }; } // end anonymous namespace @@ -460,7 +474,7 @@ TaskGroupTaskStatusRecord * TaskGroup::getTaskRecord() { // Initializes into the preallocated _group an actual TaskGroupImpl. SWIFT_CC(swift) -static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T) { +static void swift_taskGroup_initializeWithFlagsImpl(size_t flags, TaskGroup *group, const Metadata *T) { SWIFT_TASK_DEBUG_LOG("creating task group = %p", group); TaskGroupImpl *impl = new (group) TaskGroupImpl(T); @@ -475,6 +489,12 @@ static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T) if (!notCancelled) impl->statusCancel(); } +// Initializes into the preallocated _group an actual TaskGroupImpl. +SWIFT_CC(swift) +static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T) { + swift_taskGroup_initializeWithFlagsImpl(0, group, T); +} + // ============================================================================= // ==== add / attachChild ------------------------------------------------------ @@ -546,7 +566,7 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context, } case PollStatus::Empty: { - // Initialize the result as a nil Optional. + // Initialize the result as a .none Optional. const Metadata *successType = result.successType; OpaqueValue *destPtr = context->successResultPointer; successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1); @@ -555,13 +575,19 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context, } } +static void fillGroupNextNilResult(TaskFutureWaitAsyncContext *context) { + /// Fill in the result value with 'nil' + const Metadata *successType = result.successType; + OpaqueValue *destPtr = context->successResultPointer; + successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1); +} + void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) { assert(completedTask); assert(completedTask->isFuture()); assert(completedTask->hasChildFragment()); assert(completedTask->hasGroupChildFragment()); assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this)); - SWIFT_TASK_DEBUG_LOG("offer task %p to group %p", completedTask, this); mutex.lock(); // TODO: remove fragment lock, and use status for synchronization @@ -572,6 +598,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) { // W:n R:0 P:1 -> W:y R:1 P:1 // complete immediately // W:n R:0 P:1 -> W:y R:1 P:3 // complete immediately, 2 more pending tasks auto assumed = statusAddReadyAssumeAcquire(); + SWIFT_TASK_DEBUG_LOG("offer task %p to group(%p), tasks pending = %d", completedTask, assumed.pendingTasks()); auto asyncContextPrefix = reinterpret_cast( reinterpret_cast(context) - sizeof(FutureAsyncContextPrefix)); @@ -641,6 +668,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) { assert(completedTask == readyItem.getTask()); assert(readyItem.getTask()->isFuture()); readyQueue.enqueue(readyItem); + mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization return; } @@ -685,6 +713,9 @@ static void swift_taskGroup_wait_next_throwingImpl( waitingTask->ResumeTask = task_group_wait_resume_adapter; waitingTask->ResumeContext = rawContext; + auto group = asImpl(_group); + assert(group && "swift_taskGroup_wait_next_throwing was passed context without group!"); + auto context = static_cast(rawContext); context->ResumeParent = reinterpret_cast(resumeFunction); @@ -692,13 +723,10 @@ static void swift_taskGroup_wait_next_throwingImpl( context->errorResult = nullptr; context->successResultPointer = resultPointer; - auto group = asImpl(_group); - assert(group && "swift_taskGroup_wait_next_throwing was passed context without group!"); - PollResult polled = group->poll(waitingTask); switch (polled.status) { case PollStatus::MustWait: - SWIFT_TASK_DEBUG_LOG("poll group = %p, no ready tasks, waiting task = %p", + SWIFT_TASK_DEBUG_LOG("poll group = %p, tasks ready = 0, waiting task = %p", group, waitingTask); // The waiting task has been queued on the channel, // there were pending tasks so it will be woken up eventually. @@ -715,12 +743,12 @@ static void swift_taskGroup_wait_next_throwingImpl( SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p", group, waitingTask, polled.retainedTask); fillGroupNextResult(context, polled); + if (auto completedTask = polled.retainedTask) { // it would be null for PollStatus::Empty, then we don't need to release - group->detachChild(polled.retainedTask); - swift_release(polled.retainedTask); + group->detachChild(completedTask); + swift_release(completedTask); } - return waitingTask->runInFullyEstablishedContext(); } } @@ -755,13 +783,13 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) { // ==== 2) Ready task was polled, return with it immediately ----------------- if (assumed.readyTasks()) { - SWIFT_TASK_DEBUG_LOG("poll group = %p, group has ready tasks = %d", - this, assumed.readyTasks()); + SWIFT_TASK_DEBUG_LOG("poll group = %p, tasks ready=%d, pending=%d", + this, assumed.readyTasks(), assumed.pendingTasks()); auto assumedStatus = assumed.status; auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus}; if (status.compare_exchange_strong( - assumedStatus, newStatus.completingPendingReadyWaiting().status, + assumedStatus, newStatus.completingPendingReadyWaiting(this).status, /*success*/ std::memory_order_relaxed, /*failure*/ std::memory_order_acquire)) { @@ -840,14 +868,13 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) { } // ============================================================================= -// ==== isEmpty ---------------------------------------------------------------- +// ==== Task Group status and flag checks ------------------------------------- + SWIFT_CC(swift) static bool swift_taskGroup_isEmptyImpl(TaskGroup *group) { return asImpl(group)->isEmpty(); } -// ============================================================================= -// ==== isCancelled ------------------------------------------------------------ SWIFT_CC(swift) static bool swift_taskGroup_isCancelledImpl(TaskGroup *group) { return asImpl(group)->isCancelled(); diff --git a/stdlib/public/BackDeployConcurrency/TaskGroup.swift b/stdlib/public/BackDeployConcurrency/TaskGroup.swift index 6955c5ee52552..55e9fca22f6a9 100644 --- a/stdlib/public/BackDeployConcurrency/TaskGroup.swift +++ b/stdlib/public/BackDeployConcurrency/TaskGroup.swift @@ -464,6 +464,7 @@ public struct ThrowingTaskGroup { } } + // TODO(ktoso): doesn't seem to be used? @usableFromInline internal mutating func _waitForAll() async throws { while let _ = try await next() { } diff --git a/stdlib/public/CompatibilityOverride/CompatibilityOverrideConcurrency.def b/stdlib/public/CompatibilityOverride/CompatibilityOverrideConcurrency.def index 877170f5927e9..ec8307ddd2de9 100644 --- a/stdlib/public/CompatibilityOverride/CompatibilityOverrideConcurrency.def +++ b/stdlib/public/CompatibilityOverride/CompatibilityOverrideConcurrency.def @@ -275,6 +275,10 @@ OVERRIDE_TASK_GROUP(taskGroup_initialize, void, SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), swift::, (TaskGroup *group, const Metadata *T), (group, T)) +OVERRIDE_TASK_GROUP(taskGroup_initializeWithFlags, void, + SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), + swift::, (size_t flags, TaskGroup *group, const Metadata *T), (flags, group, T)) + OVERRIDE_TASK_STATUS(taskGroup_attachChild, void, SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), swift::, (TaskGroup *group, AsyncTask *child), @@ -303,6 +307,10 @@ OVERRIDE_TASK_GROUP(taskGroup_isCancelled, bool, SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), swift::, (TaskGroup *group), (group)) +OVERRIDE_TASK_GROUP(taskGroup_isDiscardingResults, bool, + SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), + swift::, (TaskGroup *group), (group)) + OVERRIDE_TASK_GROUP(taskGroup_cancelAll, void, SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), swift::, (TaskGroup *group), (group)) @@ -312,6 +320,17 @@ OVERRIDE_TASK_GROUP(taskGroup_addPending, bool, swift::, (TaskGroup *group, bool unconditionally), (group, unconditionally)) +OVERRIDE_TASK_GROUP(taskGroup_waitAll, void, + SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swiftasync), + swift::, + (OpaqueValue *resultPointer, + SWIFT_ASYNC_CONTEXT AsyncContext *callerContext, + TaskGroup *_group, + SwiftError *bodyError, + ThrowingTaskFutureWaitContinuationFunction *resumeFn, + AsyncContext *callContext), + (resultPointer, callerContext, _group, bodyError, + resumeFn, callContext)) OVERRIDE_TASK_LOCAL(task_reportIllegalTaskLocalBindingWithinWithTaskGroup, void, SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), swift::, diff --git a/stdlib/public/Concurrency/CMakeLists.txt b/stdlib/public/Concurrency/CMakeLists.txt index 2d62eccc64838..6f198d0101ce0 100644 --- a/stdlib/public/Concurrency/CMakeLists.txt +++ b/stdlib/public/Concurrency/CMakeLists.txt @@ -114,6 +114,7 @@ add_swift_target_library(swift_Concurrency ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} I TaskStatus.cpp TaskGroup.cpp TaskGroup.swift + DiscardingTaskGroup.swift TaskLocal.cpp TaskLocal.swift TaskSleep.swift diff --git a/stdlib/public/Concurrency/DiscardingTaskGroup.swift b/stdlib/public/Concurrency/DiscardingTaskGroup.swift new file mode 100644 index 0000000000000..4f6f25493e7f8 --- /dev/null +++ b/stdlib/public/Concurrency/DiscardingTaskGroup.swift @@ -0,0 +1,558 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2020 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import Swift +@_implementationOnly import _SwiftConcurrencyShims + +// ==== DiscardingTaskGroup --------------------------------------------------- + +/// Starts a new scope that can contain a dynamic number of child tasks. +/// +/// Unlike a ``TaskGroup``, the child tasks as well as their results are +/// discarded as soon as the tasks complete. This prevents the discarding +/// task group from accumulating many results waiting to be consumed, and is +/// best applied in situations where the result of a child task is some form +/// of side-effect. +/// +/// A group waits for all of its child tasks +/// to complete before it returns. Even cancelled tasks must run until +/// completion before this function returns. +/// Cancelled child tasks cooperatively react to cancellation and attempt +/// to return as early as possible. +/// After this function returns, the task group is always empty. +/// +/// It is not possible to explicitly await completion of child-tasks, +/// however the group will automatically await *all* child task completions +/// before returning from this function: +/// +/// ``` +/// await withDiscardingTaskGroup { group in +/// group.addTask { /* slow-task */ } +/// // slow-task executes... +/// } +/// // guaranteed that slow-task has completed and the group is empty & destroyed +/// ``` +/// +/// Task Group Cancellation +/// ======================= +/// +/// You can cancel a task group and all of its child tasks +/// by calling the ``TaskGroup/cancelAll()`` method on the task group, +/// or by canceling the task in which the group is running. +/// +/// If you call `addTask(priority:operation:)` to create a new task in a canceled group, +/// that task is immediately canceled after creation. +/// Alternatively, you can call `asyncUnlessCancelled(priority:operation:)`, +/// which doesn't create the task if the group has already been canceled +/// Choosing between these two functions +/// lets you control how to react to cancellation within a group: +/// some child tasks need to run regardless of cancellation, +/// but other tasks are better not even being created +/// when you know they can't produce useful results. +/// +/// Because the tasks you add to a group with this method are nonthrowing, +/// those tasks can't respond to cancellation by throwing `CancellationError`. +/// The tasks must handle cancellation in some other way, +/// such as returning the work completed so far, returning an empty result, or returning `nil`. +/// For tasks that need to handle cancellation by throwing an error, +/// use the `withThrowingDiscardingTaskGroup(returning:body:)` method instead. +/// +/// - SeeAlso: ``withThrowingDiscardingTaskGroup(returning:body:) +@available(SwiftStdlib 5.8, *) +@inlinable +@_unsafeInheritExecutor +public func withDiscardingTaskGroup( + returning returnType: GroupResult.Type = GroupResult.self, + body: (inout DiscardingTaskGroup) async -> GroupResult +) async -> GroupResult { + #if compiler(>=5.5) && $BuiltinTaskGroupWithArgument + let flags = taskGroupCreateFlags( + discardResults: true + ) + + let _group = Builtin.createTaskGroupWithFlags(flags, GroupResult.self) + var group = DiscardingTaskGroup(group: _group) + defer { Builtin.destroyTaskGroup(_group) } + + let result = await body(&group) + + try! await group.awaitAllRemainingTasks() // try!-safe, cannot throw since this is a non throwing group + return result + #else + fatalError("Swift compiler is incompatible with this SDK version") + #endif +} + +/// A discarding group that contains dynamically created child tasks. +/// +/// To create a discarding task group, +/// call the ``withDiscardingTaskGroup(returning:body:)`` method. +/// +/// Don't use a task group from outside the task where you created it. +/// In most cases, +/// the Swift type system prevents a task group from escaping like that +/// because adding a child task to a task group is a mutating operation, +/// and mutation operations can't be performed +/// from a concurrent execution context like a child task. +/// +/// ### Task execution order +/// Tasks added to a task group execute concurrently, and may be scheduled in +/// any order. +/// +/// ### Discarding behavior +/// A discarding task group eagerly discards and releases its child tasks as +/// soon as they complete. This allows for the efficient releasing of memory used +/// by those tasks, which are not retained for future `next()` calls, as would +/// be the case with a ``TaskGroup``. +/// +/// ### Cancellation behavior +/// A task group becomes cancelled in one of two ways: when ``cancelAll()`` is +/// invoked on it, or when the ``Task`` running this task group is cancelled. +/// +/// Since a `TaskGroup` is a structured concurrency primitive, cancellation is +/// automatically propagated through all of its child-tasks (and their child +/// tasks). +/// +/// A cancelled task group can still keep adding tasks, however they will start +/// being immediately cancelled, and may act accordingly to this. To avoid adding +/// new tasks to an already cancelled task group, use ``addTaskUnlessCancelled(priority:body:)`` +/// rather than the plain ``addTask(priority:body:)`` which adds tasks unconditionally. +/// +/// For information about the language-level concurrency model that `DiscardingTaskGroup` is part of, +/// see [Concurrency][concurrency] in [The Swift Programming Language][tspl]. +/// +/// [concurrency]: https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html +/// [tspl]: https://docs.swift.org/swift-book/ +/// +/// - SeeAlso: ``TaskGroup`` +/// - SeeAlso: ``ThrowingTaskGroup`` +/// - SeeAlso: ``ThrowingDiscardingTaskGroup`` +@available(SwiftStdlib 5.8, *) +@frozen +public struct DiscardingTaskGroup { + + @usableFromInline + internal let _group: Builtin.RawPointer + + // No public initializers + @inlinable + init(group: Builtin.RawPointer) { + self._group = group + } + + /// Await all the remaining tasks on this group. + /// + /// - Throws: The first error that was encountered by this group. + @usableFromInline + internal mutating func awaitAllRemainingTasks() async throws { + let _: Void? = try await _taskGroupWaitAll(group: _group, bodyError: nil) + } + + @_alwaysEmitIntoClient + #if SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY + @available(*, unavailable, message: "Unavailable in task-to-thread concurrency model", renamed: "addTask(operation:)") + #endif + public mutating func addTask( + priority: TaskPriority? = nil, + operation: __owned @Sendable @escaping () async -> Void + ) { +#if SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY + let flags = taskCreateFlags( + priority: priority, isChildTask: true, copyTaskLocals: false, + inheritContext: false, enqueueJob: false, + addPendingGroupTaskUnconditionally: true + ) +#else + let flags = taskCreateFlags( + priority: priority, isChildTask: true, copyTaskLocals: false, + inheritContext: false, enqueueJob: true, + addPendingGroupTaskUnconditionally: true + ) +#endif + + // Create the task in this group. + _ = Builtin.createAsyncTaskInGroup(flags, _group, operation) + } + + @_alwaysEmitIntoClient + #if SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY + @available(*, unavailable, message: "Unavailable in task-to-thread concurrency model", renamed: "addTask(operation:)") + #endif + public mutating func addTaskUnlessCancelled( + priority: TaskPriority? = nil, + operation: __owned @Sendable @escaping () async -> Void + ) -> Bool { + let canAdd = _taskGroupAddPendingTask(group: _group, unconditionally: false) + + guard canAdd else { + // the group is cancelled and is not accepting any new work + return false + } +#if SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY + let flags = taskCreateFlags( + priority: priority, isChildTask: true, copyTaskLocals: false, + inheritContext: false, enqueueJob: false, + addPendingGroupTaskUnconditionally: false + ) +#else + let flags = taskCreateFlags( + priority: priority, isChildTask: true, copyTaskLocals: false, + inheritContext: false, enqueueJob: true, + addPendingGroupTaskUnconditionally: false + ) +#endif + + // Create the task in this group. + _ = Builtin.createAsyncTaskInGroup(flags, _group, operation) + + return true + } + + @_alwaysEmitIntoClient + public mutating func addTask( + operation: __owned @Sendable @escaping () async -> Void + ) { + let flags = taskCreateFlags( + priority: nil, isChildTask: true, copyTaskLocals: false, + inheritContext: false, enqueueJob: true, + addPendingGroupTaskUnconditionally: true + ) + + // Create the task in this group. + _ = Builtin.createAsyncTaskInGroup(flags, _group, operation) + } + +#if SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY + @available(*, unavailable, message: "Unavailable in task-to-thread concurrency model", renamed: "addTaskUnlessCancelled(operation:)") +#endif + @_alwaysEmitIntoClient + public mutating func addTaskUnlessCancelled( + operation: __owned @Sendable @escaping () async -> Void + ) -> Bool { +#if compiler(>=5.5) && $BuiltinCreateAsyncTaskInGroup + let canAdd = _taskGroupAddPendingTask(group: _group, unconditionally: false) + + guard canAdd else { + // the group is cancelled and is not accepting any new work + return false + } + + let flags = taskCreateFlags( + priority: nil, isChildTask: true, copyTaskLocals: false, + inheritContext: false, enqueueJob: true, + addPendingGroupTaskUnconditionally: false + ) + + // Create the task in this group. + _ = Builtin.createAsyncTaskInGroup(flags, _group, operation) + + return true +#else + fatalError("Unsupported Swift compiler") +#endif + } + + public var isEmpty: Bool { + _taskGroupIsEmpty(_group) + } + + public func cancelAll() { + _taskGroupCancelAll(group: _group) + } + + public var isCancelled: Bool { + return _taskGroupIsCancelled(group: _group) + } +} + +@available(SwiftStdlib 5.8, *) +@available(*, unavailable) +extension DiscardingTaskGroup: Sendable { } + +// ==== ThrowingDiscardingTaskGroup ------------------------------------------- + +/// Starts a new scope that can contain a dynamic number of child tasks. +/// +/// Unlike a ``ThrowingTaskGroup``, the child tasks as well as their results are +/// discarded as soon as the tasks complete. This prevents the discarding +/// task group from accumulating many results waiting to be consumed, and is +/// best applied in situations where the result of a child task is some form +/// of side-effect. +/// +/// A group waits for all of its child tasks +/// to complete before it returns. Even cancelled tasks must run until +/// completion before this function returns. +/// Cancelled child tasks cooperatively react to cancellation and attempt +/// to return as early as possible. +/// After this function returns, the task group is always empty. +/// +/// It is not possible to explicitly await completion of child-tasks, +/// however the group will automatically await *all* child task completions +/// before returning from this function: +/// +/// ``` +/// try await withThrowingDiscardingTaskGroup { group in +/// group.addTask { /* slow-task */ } +/// // slow-task executes... +/// } +/// // guaranteed that slow-task has completed and the group is empty & destroyed +/// ``` +/// +/// Task Group Cancellation +/// ======================= +/// +/// You can cancel a task group and all of its child tasks +/// by calling the ``TaskGroup/cancelAll()`` method on the task group, +/// or by canceling the task in which the group is running. +/// +/// If you call `addTask(priority:operation:)` to create a new task in a canceled group, +/// that task is immediately canceled after creation. +/// Alternatively, you can call `asyncUnlessCancelled(priority:operation:)`, +/// which doesn't create the task if the group has already been canceled +/// Choosing between these two functions +/// lets you control how to react to cancellation within a group: +/// some child tasks need to run regardless of cancellation, +/// but other tasks are better not even being created +/// when you know they can't produce useful results. +/// +/// Error Handling and Implicit Cancellation +/// ======================================== +/// +/// Since it is not possible to explicitly await individual task completions, +/// it is also not possible to "re-throw" an error thrown by one of the child +/// tasks using the same pattern as one would in a ``ThrowingTaskGroup``: +/// +/// ``` +/// // ThrowingTaskGroup, pattern not applicable to ThrowingDiscardingTaskGroup +/// try await withThrowingTaskGroup { group in +/// group.addTask { try boom() } +/// try await group.next() // re-throws "boom" +/// } +/// ``` +/// +/// Since discarding task groups don't have access to `next()`, this pattern +/// cannot be used. +/// Instead, +/// a *throwing discarding task group implicitly cancels itself whenever any +/// of its child tasks throws*. +/// +/// The *first error* thrown inside such task group +/// is then retained and thrown +/// out of the `withThrowingDiscardingTaskGroup` method when it returns. +/// +/// ``` +/// try await withThrowingDiscardingTaskGroup() { group in +/// group.addTask { try boom(1) } +/// group.addTask { try boom(2, after: .seconds(5)) } +/// group.addTask { try boom(3, after: .seconds(5)) } +/// } +/// ``` +/// +/// +/// +/// Generally, this suits the typical use-cases of a +/// discarding task group well, however, if you wanted to prevent specific +/// errors from cancelling the group +/// +/// +/// +/// +/// Throwing an error in one of the child tasks of a task group +/// doesn't immediately cancel the other tasks in that group. +/// However, +/// throwing out of the `body` of the `withThrowingTaskGroup` method does cancel +/// the group, and all of its child tasks. +@available(SwiftStdlib 5.8, *) +@inlinable +@_unsafeInheritExecutor +public func withThrowingDiscardingTaskGroup( + returning returnType: GroupResult.Type = GroupResult.self, + body: (inout ThrowingDiscardingTaskGroup) async throws -> GroupResult +) async throws -> GroupResult { + #if compiler(>=5.5) && $BuiltinTaskGroupWithArgument + let flags = taskGroupCreateFlags( + discardResults: true + ) + + let _group = Builtin.createTaskGroupWithFlags(flags, GroupResult.self) + var group = ThrowingDiscardingTaskGroup(group: _group) + defer { Builtin.destroyTaskGroup(_group) } + + let result: GroupResult + do { + result = try await body(&group) + } catch { + group.cancelAll() + + try await group.awaitAllRemainingTasks(bodyError: error) + + throw error + } + + try await group.awaitAllRemainingTasks(bodyError: nil) + + return result + #else + fatalError("Swift compiler is incompatible with this SDK version") + #endif +} + + +/// A throwing discarding group that contains dynamically created child tasks. +/// +/// To create a discarding task group, +/// call the ``withDiscardingTaskGroup(returning:body:)`` method. +/// +/// Don't use a task group from outside the task where you created it. +/// In most cases, +/// the Swift type system prevents a task group from escaping like that +/// because adding a child task to a task group is a mutating operation, +/// and mutation operations can't be performed +/// from a concurrent execution context like a child task. +/// +/// ### Task execution order +/// Tasks added to a task group execute concurrently, and may be scheduled in +/// any order. +/// +/// ### Discarding behavior +/// A discarding task group eagerly discards and releases its child tasks as +/// soon as they complete. This allows for the efficient releasing of memory used +/// by those tasks, which are not retained for future `next()` calls, as would +/// be the case with a ``TaskGroup``. +/// +/// ### Cancellation behavior +/// A task group becomes cancelled in one of two ways: when ``cancelAll()`` is +/// invoked on it, or when the ``Task`` running this task group is cancelled. +/// +/// Since a `TaskGroup` is a structured concurrency primitive, cancellation is +/// automatically propagated through all of its child-tasks (and their child +/// tasks). +/// +/// A cancelled task group can still keep adding tasks, however they will start +/// being immediately cancelled, and may act accordingly to this. To avoid adding +/// new tasks to an already cancelled task group, use ``addTaskUnlessCancelled(priority:body:)`` +/// rather than the plain ``addTask(priority:body:)`` which adds tasks unconditionally. +/// +/// For information about the language-level concurrency model that `DiscardingTaskGroup` is part of, +/// see [Concurrency][concurrency] in [The Swift Programming Language][tspl]. +/// +/// [concurrency]: https://docs.swift.org/swift-book/LanguageGuide/Concurrency.html +/// [tspl]: https://docs.swift.org/swift-book/ +/// +/// - SeeAlso: ``TaskGroup`` +/// - SeeAlso: ``ThrowingTaskGroup`` +/// - SeeAlso: ``DiscardingTaskGroup`` +@available(SwiftStdlib 5.8, *) +@frozen +public struct ThrowingDiscardingTaskGroup { + + @usableFromInline + internal let _group: Builtin.RawPointer + + // No public initializers + @inlinable + init(group: Builtin.RawPointer) { + self._group = group + } + + /// Await all the remaining tasks on this group. + @usableFromInline + internal mutating func awaitAllRemainingTasks(bodyError: Error?) async throws { + let _: Void? = try await _taskGroupWaitAll(group: _group, bodyError: bodyError) + } + +#if SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY + @available(*, unavailable, message: "Unavailable in task-to-thread concurrency model", renamed: "addTask(operation:)") +#endif + @_alwaysEmitIntoClient + public mutating func addTask( + priority: TaskPriority? = nil, + operation: __owned @Sendable @escaping () async throws -> Void + ) { +#if compiler(>=5.5) && $BuiltinCreateAsyncTaskInGroup + let flags = taskCreateFlags( + priority: priority, isChildTask: true, copyTaskLocals: false, + inheritContext: false, enqueueJob: true, + addPendingGroupTaskUnconditionally: true + ) + + // Create the task in this group. + _ = Builtin.createAsyncTaskInGroup(flags, _group, operation) +#else + fatalError("Unsupported Swift compiler") +#endif + } + +#if SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY + @available(*, unavailable, message: "Unavailable in task-to-thread concurrency model", renamed: "addTask(operation:)") +#endif + @_alwaysEmitIntoClient + public mutating func addTaskUnlessCancelled( + priority: TaskPriority? = nil, + operation: __owned @Sendable @escaping () async throws -> Void + ) -> Bool { +#if compiler(>=5.5) && $BuiltinCreateAsyncTaskInGroup + let canAdd = _taskGroupAddPendingTask(group: _group, unconditionally: false) + + guard canAdd else { + // the group is cancelled and is not accepting any new work + return false + } + + let flags = taskCreateFlags( + priority: priority, isChildTask: true, copyTaskLocals: false, + inheritContext: false, enqueueJob: true, + addPendingGroupTaskUnconditionally: false + ) + + // Create the task in this group. + _ = Builtin.createAsyncTaskInGroup(flags, _group, operation) + + return true +#else + fatalError("Unsupported Swift compiler") +#endif + } + + public var isEmpty: Bool { + _taskGroupIsEmpty(_group) + } + + public func cancelAll() { + _taskGroupCancelAll(group: _group) + } + + public var isCancelled: Bool { + return _taskGroupIsCancelled(group: _group) + } +} + +@available(SwiftStdlib 5.8, *) +@available(*, unavailable) +extension ThrowingDiscardingTaskGroup: Sendable { } + +// ==== ----------------------------------------------------------------------- +// MARK: Runtime functions + +/// Always returns `nil`. +@available(SwiftStdlib 5.8, *) +@usableFromInline +@discardableResult +@_silgen_name("swift_taskGroup_waitAll") +func _taskGroupWaitAll( + group: Builtin.RawPointer, + bodyError: Error? +) async throws -> T? + +@available(SwiftStdlib 5.8, *) // FIXME: remove +@_silgen_name("swift_taskGroup_isDiscardingResults") +func _taskGroupIsDiscardingResults(group: Builtin.RawPointer) -> Bool diff --git a/stdlib/public/Concurrency/TaskGroup.cpp b/stdlib/public/Concurrency/TaskGroup.cpp index d8782586ee31b..aed46eb62f210 100644 --- a/stdlib/public/Concurrency/TaskGroup.cpp +++ b/stdlib/public/Concurrency/TaskGroup.cpp @@ -54,6 +54,17 @@ using namespace swift; /*************************** TASK GROUP ***************************************/ /******************************************************************************/ +#if 0 +#define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) \ +fprintf(stderr, "[%#lx] [%s:%d](%s) group(%p%s) " fmt "\n", \ + (unsigned long)Thread::current().platformThreadId(), \ + __FILE__, __LINE__, __FUNCTION__, \ + group, group->isDiscardingResults() ? ",discardResults" : "", \ + __VA_ARGS__) +#else +#define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) (void)0 +#endif + using FutureFragment = AsyncTask::FutureFragment; namespace { @@ -69,8 +80,8 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { /// The storage is not accessible. Empty = 0b00, - // not used: 0b01; same value as the PollStatus MustWait, - // which does not make sense for the ReadyStatus + /// A raw SwiftError is stored in the item's storage, rather than a Task with an Error inside. + RawError = 0b01, /// The future has completed with result (of type \c resultType). Success = 0b10, @@ -118,12 +129,6 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { /// until a next() call eventually picks it up. AsyncTask *retainedTask; - bool isStorageAccessible() { - return status == PollStatus::Success || - status == PollStatus::Error || - status == PollStatus::Empty; - } - static PollResult get(AsyncTask *asyncTask, bool hadErrorResult) { auto fragment = asyncTask->futureFragment(); return PollResult{ @@ -137,6 +142,25 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { /*task*/ asyncTask }; } + + static PollResult getEmpty(const Metadata *successType) { + return PollResult{ + /*status*/PollStatus::Empty, + /*storage*/nullptr, + /*successType*/successType, + /*task*/nullptr + }; + } + + static PollResult getError(SwiftError *error) { + assert(error); + return PollResult{ + /*status*/PollStatus::Error, + /*storage*/reinterpret_cast(error), + /*successType*/nullptr, + /*task*/nullptr + }; + } }; /// An item within the message queue of a group. @@ -151,14 +175,25 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { } AsyncTask *getTask() const { + assert(getStatus() != ReadyStatus::RawError && "storage did contain raw error pointer, not task!"); return reinterpret_cast(storage & ~statusMask); } + SwiftError *getRawError() const { + assert(getStatus() == ReadyStatus::RawError && "storage did not contain raw error pointer!"); + return reinterpret_cast(storage & ~statusMask); + } + static ReadyQueueItem get(ReadyStatus status, AsyncTask *task) { assert(task == nullptr || task->isFuture()); return ReadyQueueItem{ reinterpret_cast(task) | static_cast(status)}; } + + static ReadyQueueItem getRawError(SwiftError *error) { + return ReadyQueueItem{ + reinterpret_cast(error) | static_cast(ReadyStatus::RawError)}; + } }; /// An item within the pending queue. @@ -183,9 +218,11 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { static const uint64_t maskReady = 0b0011111111111111111111111111111110000000000000000000000000000000; static const uint64_t oneReadyTask = 0b0000000000000000000000000000000010000000000000000000000000000000; - // 31 bits for pending tasks counter - static const uint64_t maskPending = 0b0000000000000000000000000000000001111111111111111111111111111111; - static const uint64_t onePendingTask = 0b0000000000000000000000000000000000000000000000000000000000000001; + // 31 bits for pending tasks counter, while accumulating results (default mode) + static const uint64_t maskAccumulatingPending = 0b0000000000000000000000000000000001111111111111111111111111111111; + // 62 bits for pending tasks counter, while discarding results (discardResults) + static const uint64_t maskDiscardingPending = 0b0011111111111111111111111111111111111111111111111111111111111111; + static const uint64_t onePendingTask = 0b0000000000000000000000000000000000000000000000000000000000000001; uint64_t status; @@ -197,50 +234,72 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { return (status & waiting) > 0; } - unsigned int readyTasks() { + unsigned int readyTasks(const TaskGroupImpl* _Nonnull group) { + assert(group->isAccumulatingResults() + && "attempted to check ready tasks on group that does not accumulate results!"); return (status & maskReady) >> 31; } - unsigned int pendingTasks() { - return (status & maskPending); + uint64_t pendingTasks(const TaskGroupImpl* _Nonnull group) { + if (group->isAccumulatingResults()) { + return (status & maskAccumulatingPending); + } else { + return (status & maskDiscardingPending); + } } - bool isEmpty() { - return pendingTasks() == 0; + bool isEmpty(const TaskGroupImpl *group) { + return pendingTasks(group) == 0; } /// Status value decrementing the Ready, Pending and Waiting counters by one. - GroupStatus completingPendingReadyWaiting() { - assert(pendingTasks() && + GroupStatus completingPendingReadyWaiting(const TaskGroupImpl* _Nonnull group) { + assert(pendingTasks(group) && "can only complete waiting task when pending tasks available"); - assert(readyTasks() && + assert(group->isDiscardingResults() || readyTasks(group) && "can only complete waiting task when ready tasks available"); assert(hasWaitingTask() && "can only complete waiting task when waiting task available"); - return GroupStatus{status - waiting - oneReadyTask - onePendingTask}; + uint64_t change = waiting + onePendingTask; + // only while accumulating results does the status contain "ready" bits; + // so if we're in "discard results" mode, we must not decrement the ready count, + // as there is no ready count in the status. + change += group->isAccumulatingResults() ? oneReadyTask : 0; + return GroupStatus{status - change}; } - GroupStatus completingPendingReady() { - assert(pendingTasks() && + GroupStatus completingPendingReady(const TaskGroupImpl* _Nonnull group) { + assert(pendingTasks(group) && "can only complete waiting task when pending tasks available"); - assert(readyTasks() && + assert(group->isDiscardingResults() || readyTasks(group) && "can only complete waiting task when ready tasks available"); - return GroupStatus{status - oneReadyTask - onePendingTask}; + auto change = onePendingTask; + change += group->isAccumulatingResults() ? oneReadyTask : 0; + return GroupStatus{status - change}; + } + + GroupStatus asCancelled(bool cancel) { + return GroupStatus{status | (cancel ? cancelled : 0)}; } /// Pretty prints the status, as follows: - /// GroupStatus{ P:{pending tasks} W:{waiting tasks} {binary repr} } - std::string to_string() { + /// If accumulating results: + /// GroupStatus{ C:{cancelled} W:{waiting task} R:{ready tasks} P:{pending tasks} {binary repr} } + /// If discarding results: + /// GroupStatus{ C:{cancelled} W:{waiting task} P:{pending tasks} {binary repr} } + std::string to_string(const TaskGroupImpl* _Nonnull group) { std::string str; str.append("GroupStatus{ "); str.append("C:"); // cancelled - str.append(isCancelled() ? "y " : "n "); - str.append("W:"); // has waiting task - str.append(hasWaitingTask() ? "y " : "n "); - str.append("R:"); // ready - str.append(std::to_string(readyTasks())); + str.append(isCancelled() ? "y" : "n"); + str.append(" W:"); // has waiting task + str.append(hasWaitingTask() ? "y" : "n"); + if (group && group->isAccumulatingResults()) { + str.append(" R:"); // ready + str.append(std::to_string(readyTasks(group))); + } str.append(" P:"); // pending - str.append(std::to_string(pendingTasks())); + str.append(std::to_string(pendingTasks(group))); str.append(" " + std::bitset<64>(status).to_string()); str.append(" }"); return str; @@ -278,6 +337,10 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { return true; } + bool isEmpty() const { + return queue.empty(); + } + void enqueue(const T item) { queue.push(item); } @@ -320,14 +383,23 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { const Metadata *successType; + /// If true, this group will never accumulate results, + /// and release tasks as soon as they complete. + const bool discardResults; + friend class ::swift::AsyncTask; public: - explicit TaskGroupImpl(const Metadata *T) + + explicit TaskGroupImpl(const Metadata *T, bool discardResults) : TaskGroupTaskStatusRecord(), status(GroupStatus::initial().status), readyQueue(), - waitQueue(nullptr), successType(T) {} + waitQueue(nullptr), + successType(T), + discardResults(discardResults) { + SWIFT_TASK_GROUP_DEBUG_LOG(this, "init discardResults=%d", discardResults); + } TaskGroupTaskStatusRecord *getTaskRecord() { return reinterpret_cast(this); @@ -336,12 +408,19 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { /// Destroy the storage associated with the group. void destroy(); - bool isEmpty() { + bool isAccumulatingResults() const { + return !isDiscardingResults(); + } + bool isDiscardingResults() const { + return this->discardResults; + } + + bool isEmpty() const { auto oldStatus = GroupStatus{status.load(std::memory_order_relaxed)}; - return oldStatus.pendingTasks() == 0; + return oldStatus.pendingTasks(this) == 0; } - bool isCancelled() { + bool isCancelled() const { auto oldStatus = GroupStatus{status.load(std::memory_order_relaxed)}; return oldStatus.isCancelled(); } @@ -363,19 +442,27 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { return GroupStatus{old | GroupStatus::waiting}; } - GroupStatus statusRemoveWaiting() { + GroupStatus statusRemoveWaitingRelease() { auto old = status.fetch_and(~GroupStatus::waiting, std::memory_order_release); return GroupStatus{old}; } - /// Returns *assumed* new status, including the just performed +1. - GroupStatus statusAddReadyAssumeAcquire() { - auto old = status.fetch_add(GroupStatus::oneReadyTask, - std::memory_order_acquire); - auto s = GroupStatus{old + GroupStatus::oneReadyTask}; - assert(s.readyTasks() <= s.pendingTasks()); - return s; + /// Returns *assumed* new status. + /// + /// If the group is not accumulating results, the "ready" count does not exist, + /// and this is just a plan load(). + GroupStatus statusAddReadyAssumeAcquire(const TaskGroupImpl *group) { + if (group->isAccumulatingResults()) { + auto old = status.fetch_add(GroupStatus::oneReadyTask, + std::memory_order_acquire); + auto s = GroupStatus{old + GroupStatus::oneReadyTask}; + assert(s.readyTasks(group) <= s.pendingTasks(group)); + return s; + } else { + assert(group->isDiscardingResults()); + return GroupStatus{status.load(std::memory_order_acquire)}; + } } /// Add a single pending task to the status counter. @@ -419,19 +506,30 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { /// This is used to atomically perform a waiting task completion. bool statusCompletePendingReadyWaiting(GroupStatus &old) { return status.compare_exchange_strong( - old.status, old.completingPendingReadyWaiting().status, + old.status, old.completingPendingReadyWaiting(this).status, /*success*/ std::memory_order_relaxed, /*failure*/ std::memory_order_relaxed); } - bool statusCompletePendingReady(GroupStatus &old) { - return status.compare_exchange_strong( - old.status, old.completingPendingReady().status, - /*success*/ std::memory_order_relaxed, - /*failure*/ std::memory_order_relaxed); +// bool statusCompletePendingReady(GroupStatus &old) { +// return status.compare_exchange_strong( +// old.status, old.completingPendingReady(this).status, +// /*success*/ std::memory_order_relaxed, +// /*failure*/ std::memory_order_relaxed); +// } + + /// Decrement the pending status count. + /// Returns the *assumed* new status, including the just performed -1. + GroupStatus statusCompletePendingAssumeRelease() { + assert(this->isDiscardingResults() + && "only a discardResults TaskGroup may use completePending, " + "since it avoids updating the ready count, which other groups need."); + auto old = status.fetch_sub(GroupStatus::onePendingTask, + std::memory_order_release); + assert(GroupStatus{old}.pendingTasks(this) > 0 && "attempted to decrement pending count when it was 0 already"); + return GroupStatus{old - GroupStatus::onePendingTask}; } - /// Offer result of a task into this task group. /// /// If possible, and an existing task is already waiting on next(), this will @@ -448,10 +546,28 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord { /// and the waitingTask eventually be woken up by a completion. PollResult poll(AsyncTask *waitingTask); + /// A `discardResults` TaskGroup is not able to wait on individual completions, + /// instead, it can only await on "all pending tasks have been processed". + /// + /// + /// If unable to complete the waiting task immediately (with an readily + /// available completed task), either returns an `PollStatus::Empty` + /// result if it is known that no pending tasks in the group, + /// or a `PollStatus::MustWait` result if there are tasks in flight + /// and the waitingTask eventually be woken up by a completion. + PollResult tryEnqueueWaitingTask(AsyncTask *waitingTask); + + bool offerBodyError(SwiftError* _Nonnull bodyError); + private: - // Enqueue the completed task onto ready queue if there are no waiting tasks - // yet + // Enqueue the completed task onto ready queue if there are no waiting tasks yet void enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult); + + /// Resume waiting task with result from `completedTask` + void resumeWaitingTask(AsyncTask *completedTask, GroupStatus &assumed, bool hadErrorResult); + + /// Resume waiting task with specified error + void resumeWaitingTaskWithError(SwiftError *error, GroupStatus &assumed); }; } // end anonymous namespace @@ -487,9 +603,18 @@ TaskGroupTaskStatusRecord * TaskGroup::getTaskRecord() { // Initializes into the preallocated _group an actual TaskGroupImpl. SWIFT_CC(swift) static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T) { + swift_taskGroup_initializeWithFlags(0, group, T); +} + +// Initializes into the preallocated _group an actual TaskGroupImpl. +SWIFT_CC(swift) +static void swift_taskGroup_initializeWithFlagsImpl(size_t rawGroupFlags, TaskGroup *group, const Metadata *T) { SWIFT_TASK_DEBUG_LOG("creating task group = %p", group); - TaskGroupImpl *impl = ::new (group) TaskGroupImpl(T); + TaskGroupFlags groupFlags(rawGroupFlags); + + TaskGroupImpl *impl = ::new(group) + TaskGroupImpl(T, groupFlags.isDiscardResults()); auto record = impl->getTaskRecord(); assert(impl == record && "the group IS the task record"); @@ -544,7 +669,16 @@ static void swift_taskGroup_destroyImpl(TaskGroup *group) { } void TaskGroupImpl::destroy() { - SWIFT_TASK_DEBUG_LOG("destroying task group = %p", this); +#if SWIFT_TASK_DEBUG_LOG_ENABLED + if (!this->isEmpty()) { + auto status = this->statusLoadRelaxed(); + SWIFT_TASK_GROUP_DEBUG_LOG(this, "destroy, tasks .ready = %d, .pending = %llu", + status.readyTasks(this), status.pendingTasks(this)); + } else { + SWIFT_TASK_DEBUG_LOG("destroying task group = %p", this); + } +#endif + assert(this->isEmpty() && "Attempted to destroy non-empty task group!"); // First, remove the group from the task and deallocate the record removeStatusRecord(getTaskRecord()); @@ -568,6 +702,11 @@ bool TaskGroup::isCancelled() { return asImpl(this)->isCancelled(); } +static void fillGroupNextErrorResult(TaskFutureWaitAsyncContext *context, + SwiftError *error) { + context->fillWithError(error); +} + static void fillGroupNextResult(TaskFutureWaitAsyncContext *context, PollResult result) { /// Fill in the result value @@ -577,7 +716,7 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context, return; case PollStatus::Error: { - context->fillWithError(reinterpret_cast(result.storage)); + fillGroupNextErrorResult(context, reinterpret_cast(result.storage)); return; } @@ -594,7 +733,7 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context, } case PollStatus::Empty: { - // Initialize the result as a nil Optional. + // Initialize the result as a .none Optional. const Metadata *successType = result.successType; OpaqueValue *destPtr = context->successResultPointer; successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1); @@ -603,8 +742,37 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context, } } +static void fillGroupNextNilResult(TaskFutureWaitAsyncContext *context, + PollResult result) { + // Initialize the result as a .none Optional. + const Metadata *successType = result.successType; + OpaqueValue *destPtr = context->successResultPointer; + successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1); +} + // TaskGroup is locked upon entry and exit void TaskGroupImpl::enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) { + if (discardResults) { + if (hadErrorResult) { + // we only store the FIRST error in discardResults mode + if (readyQueue.isEmpty()) { + SWIFT_TASK_GROUP_DEBUG_LOG(this, "store first error, completedTask:%p", completedTask); + // continue handling as usual, which will perform the enqueue + } else { + SWIFT_TASK_GROUP_DEBUG_LOG(this, "discard error result, we already have an error stored, completedTask:%p", completedTask); + // DO NOT RETAIN THE TASK. + return; + } + } else { + SWIFT_TASK_GROUP_DEBUG_LOG(this, "discard successful result, %p", completedTask); + // DO NOT RETAIN THE TASK. + // We know it is Void, so we don't need to store the result; + // By releasing tasks eagerly we're able to keep "infinite" task groups, + // running, that never consume their values. Even more-so, + return; + } + } + // Retain the task while it is in the queue; it must remain alive until // it is found by poll. This retain will balanced by the release in poll. swift_retain(completedTask); @@ -625,7 +793,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) { assert(completedTask->hasChildFragment()); assert(completedTask->hasGroupChildFragment()); assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this)); - SWIFT_TASK_DEBUG_LOG("offer task %p to group %p", completedTask, this); + SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, completedTask:%p , status:%s", completedTask, statusLoadRelaxed().to_string(this).c_str()); // The current ownership convention is that we are *not* given ownership // of a retain on completedTask; we're called from the task completion @@ -635,16 +803,19 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) { // This is wasteful, and the task completion function should be fixed to // transfer ownership of a retain into this function, in which case we // will need to release in the other path. - lock(); // TODO: remove fragment lock, and use status for synchronization // Immediately increment ready count and acquire the status + // + // NOTE: If the group is `discardResults` this becomes a plain load(), + // since there is no ready count to maintain. + // // Examples: // W:n R:0 P:3 -> W:n R:1 P:3 // no waiter, 2 more pending tasks // W:n R:0 P:1 -> W:n R:1 P:1 // no waiter, no more pending tasks // W:n R:0 P:1 -> W:y R:1 P:1 // complete immediately // W:n R:0 P:1 -> W:y R:1 P:3 // complete immediately, 2 more pending tasks - auto assumed = statusAddReadyAssumeAcquire(); + GroupStatus assumed = statusAddReadyAssumeAcquire(this); auto asyncContextPrefix = reinterpret_cast( reinterpret_cast(context) - sizeof(FutureAsyncContextPrefix)); @@ -655,75 +826,73 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) { hadErrorResult = true; } - // ==== a) has waiting task, so let us complete it right away - if (assumed.hasWaitingTask()) { - auto waitingTask = waitQueue.load(std::memory_order_acquire); - SWIFT_TASK_DEBUG_LOG("group has waiting task = %p, complete with = %p", - waitingTask, completedTask); - while (true) { - // ==== a) run waiting task directly ------------------------------------- - assert(assumed.hasWaitingTask()); - assert(assumed.pendingTasks() && "offered to group with no pending tasks!"); - // We are the "first" completed task to arrive, - // and since there is a task waiting we immediately claim and complete it. - if (waitQueue.compare_exchange_strong( - waitingTask, nullptr, - /*success*/ std::memory_order_release, - /*failure*/ std::memory_order_acquire)) { + if (isDiscardingResults()) { + /// If we're the last task we've been waiting for, and there is a waiting task on the group + bool lastPendingTaskAndWaitingTask = + assumed.pendingTasks(this) == 1 && assumed.hasWaitingTask(); + + // Immediately decrement the pending count. + // We can do this, since in this mode there is no ready count to keep track of, + // and we immediately discard the result. + SWIFT_TASK_GROUP_DEBUG_LOG(this, "discard result, hadError:%d, was pending:%llu", + hadErrorResult, assumed.pendingTasks(this)); + // If this was the last pending task, and there is a waiting task (from waitAll), + // we must resume the task; but not otherwise. There cannot be any waiters on next() + // while we're discarding results. + if (lastPendingTaskAndWaitingTask) { + ReadyQueueItem item; + SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, offered last pending task, resume waiting task:%p", + waitQueue.load(std::memory_order_relaxed)); + if (readyQueue.dequeue(item)) { + switch (item.getStatus()) { + case ReadyStatus::RawError: + resumeWaitingTaskWithError(item.getRawError(), assumed); + break; + case ReadyStatus::Error: + resumeWaitingTask(item.getTask(), assumed, /*hadErrorResult=*/true); + break; + default: + // FIXME: why can't we use llvm_unreachable here? + assert(false && "only errors can be stored by a discarding task group, yet it wasn't an error!"); + } + } else { + resumeWaitingTask(completedTask, assumed, /*hadErrorResult=*/hadErrorResult); + } + } else { + assert(!lastPendingTaskAndWaitingTask); + if (hadErrorResult && readyQueue.isEmpty()) { + // a discardResults throwing task group must retain the FIRST error it encounters. + SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer error, completedTask:%p", completedTask); + enqueueCompletedTask(completedTask, /*hadErrorResult=*/hadErrorResult); + } else { + // we just are going to discard it. + _swift_taskGroup_detachChild(asAbstract(this), completedTask); + } -#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL - // In the task-to-thread model, child tasks are always actually - // run synchronously on the parent task's thread. For task groups - // specifically, this means that poll() will pick a child task - // that was added to the group and run it to completion as a - // subroutine. Therefore, when we enter offer(), we know that - // the parent task is waiting and we can just return to it. - - // The task-to-thread logic in poll() currently expects the child - // task to enqueue itself instead of just filling in the result in - // the waiting task. This is a little wasteful; there's no reason - // we can't just have the parent task set itself up as a waiter. - // But since it's what we're doing, we basically take the same - // path as we would if there wasn't a waiter. - enqueueCompletedTask(completedTask, hadErrorResult); - unlock(); // TODO: remove fragment lock, and use status for synchronization - return; + auto afterComplete = statusCompletePendingAssumeRelease(); + SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, either more pending tasks, or no waiting task, status:%s", + afterComplete.to_string(this).c_str()); + } -#else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */ - if (statusCompletePendingReadyWaiting(assumed)) { - // Run the task. - auto result = PollResult::get(completedTask, hadErrorResult); + // Discarding results mode, immediately treats a child failure as group cancellation. + // "All for one, one for all!" - any task failing must cause the group and all sibling tasks to be cancelled, + // such that the discarding group can exit as soon as possible. + if (hadErrorResult) { + cancelAll(); + } - unlock(); // TODO: remove fragment lock, and use status for synchronization + unlock(); + return; + } // isDiscardingResults - // Remove the child from the task group's running tasks list. - // The parent task isn't currently running (we're about to wake - // it up), so we're still synchronous with it. We can safely - // acquire our parent's status record lock here (which would - // ordinarily run the risk of deadlock, since e.g. cancellation - // does a parent -> child traversal while recursively holding - // locks) because we know that the child task is completed and - // we can't be holding its locks ourselves. - _swift_taskGroup_detachChild(asAbstract(this), completedTask); - - auto waitingContext = - static_cast( - waitingTask->ResumeContext); - - fillGroupNextResult(waitingContext, result); - - _swift_tsan_acquire(static_cast(waitingTask)); - // TODO: allow the caller to suggest an executor - waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); - - // completedTask will be released by the remainder of its - // completion function. - return; - } // else, try again -#endif - } - } - llvm_unreachable("should have enqueued and returned."); + SWIFT_TASK_GROUP_DEBUG_LOG(this, "ready: %d, pending: %llu", + assumed.readyTasks(this), assumed.pendingTasks(this)); + + // ==== a) has waiting task, so let us complete it right away + if (assumed.hasWaitingTask()) { + resumeWaitingTask(completedTask, assumed, hadErrorResult); + unlock(); // TODO: remove fragment lock, and use status for synchronization + return; } else { // ==== b) enqueue completion ------------------------------------------------ // @@ -732,13 +901,136 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) { // ready for it, and will process it immediately without suspending. assert(!waitQueue.load(std::memory_order_relaxed)); - SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p", - completedTask); enqueueCompletedTask(completedTask, hadErrorResult); unlock(); // TODO: remove fragment lock, and use status for synchronization } +} - return; +/// Must be called while holding the TaskGroup lock. +void TaskGroupImpl::resumeWaitingTask( + AsyncTask *completedTask, + TaskGroupImpl::GroupStatus &assumed, + bool hadErrorResult) { + auto waitingTask = waitQueue.load(std::memory_order_acquire); + SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting task = %p, complete with = %p", + waitingTask, completedTask); + while (true) { + // ==== a) run waiting task directly ------------------------------------- + assert(assumed.hasWaitingTask()); + // assert(assumed.pendingTasks() && "offered to group with no pending tasks!"); + // We are the "first" completed task to arrive, + // and since there is a task waiting we immediately claim and complete it. + if (waitQueue.compare_exchange_strong( + waitingTask, nullptr, + /*success*/ std::memory_order_release, + /*failure*/ std::memory_order_acquire)) { + +#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL + // In the task-to-thread model, child tasks are always actually + // run synchronously on the parent task's thread. For task groups + // specifically, this means that poll() will pick a child task + // that was added to the group and run it to completion as a + // subroutine. Therefore, when we enter offer(), we know that + // the parent task is waiting and we can just return to it. + + // The task-to-thread logic in poll() currently expects the child + // task to enqueue itself instead of just filling in the result in + // the waiting task. This is a little wasteful; there's no reason + // we can't just have the parent task set itself up as a waiter. + // But since it's what we're doing, we basically take the same + // path as we would if there wasn't a waiter. + enqueueCompletedTask(completedTask, hadErrorResult); + return; + +#else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */ + if (statusCompletePendingReadyWaiting(assumed)) { + // Run the task. + auto result = PollResult::get(completedTask, hadErrorResult); + + // MOVED IT unlock(); // TODO: remove fragment lock, and use status for synchronization + + // Remove the child from the task group's running tasks list. + // The parent task isn't currently running (we're about to wake + // it up), so we're still synchronous with it. We can safely + // acquire our parent's status record lock here (which would + // ordinarily run the risk of deadlock, since e.g. cancellation + // does a parent -> child traversal while recursively holding + // locks) because we know that the child task is completed and + // we can't be holding its locks ourselves. + _swift_taskGroup_detachChild(asAbstract(this), completedTask); + + auto waitingContext = + static_cast( + waitingTask->ResumeContext); + + fillGroupNextResult(waitingContext, result); + + _swift_tsan_acquire(static_cast(waitingTask)); + // TODO: allow the caller to suggest an executor + waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); + return; + } // else, try again +#endif + } + } + llvm_unreachable("should have enqueued and returned."); +} + +/// Must be called while holding the TaskGroup lock. +void TaskGroupImpl::resumeWaitingTaskWithError( + SwiftError *error, + TaskGroupImpl::GroupStatus &assumed) { + auto waitingTask = waitQueue.load(std::memory_order_acquire); + SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting task = %p, with error = %p", + waitingTask, error); + while (true) { + // ==== a) run waiting task directly ------------------------------------- + assert(assumed.hasWaitingTask()); + // assert(assumed.pendingTasks() && "offered to group with no pending tasks!"); + // We are the "first" completed task to arrive, + // and since there is a task waiting we immediately claim and complete it. + if (waitQueue.compare_exchange_strong( + waitingTask, nullptr, + /*success*/ std::memory_order_release, + /*failure*/ std::memory_order_acquire)) { + +//#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL +// // In the task-to-thread model, child tasks are always actually +// // run synchronously on the parent task's thread. For task groups +// // specifically, this means that poll() will pick a child task +// // that was added to the group and run it to completion as a +// // subroutine. Therefore, when we enter offer(), we know that +// // the parent task is waiting and we can just return to it. +// +// // The task-to-thread logic in poll() currently expects the child +// // task to enqueue itself instead of just filling in the result in +// // the waiting task. This is a little wasteful; there's no reason +// // we can't just have the parent task set itself up as a waiter. +// // But since it's what we're doing, we basically take the same +// // path as we would if there wasn't a waiter. +// enqueueCompletedTask(completedTask, hadErrorResult); +// return; +// +//#else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */ + if (statusCompletePendingReadyWaiting(assumed)) { + // Run the task. + auto result = PollResult::getError(error); + + auto waitingContext = + static_cast( + waitingTask->ResumeContext); + + fillGroupNextResult(waitingContext, result); + + _swift_tsan_acquire(static_cast(waitingTask)); + // TODO: allow the caller to suggest an executor + waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic()); + return; + } // else, try again +//#endif + } + } + llvm_unreachable("should have enqueued and returned."); } SWIFT_CC(swiftasync) @@ -766,6 +1058,22 @@ SWIFT_CC(swiftasync) static void workaround_function_swift_taskGroup_wait_next_t ); return; } + +__attribute__((noinline)) +SWIFT_CC(swiftasync) static void workaround_function_swift_taskGroup_waitAllImpl( + OpaqueValue *result, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext, + TaskGroup *_group, + SwiftError *bodyError, + ThrowingTaskFutureWaitContinuationFunction resumeFunction, + AsyncContext *callContext) { + // Make sure we don't eliminate calls to this function. + asm volatile("" // Do nothing. + : // Output list, empty. + : "r"(result), "r"(callerContext), "r"(_group) // Input list. + : // Clobber list, empty. + ); + return; +} #endif // ============================================================================= @@ -808,8 +1116,8 @@ static void swift_taskGroup_wait_next_throwingImpl( case PollStatus::Empty: case PollStatus::Error: case PollStatus::Success: - SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p", - group, waitingTask, polled.retainedTask); + SWIFT_TASK_GROUP_DEBUG_LOG(group, "poll, task = %p, ready task available = %p", + waitingTask, polled.retainedTask); fillGroupNextResult(context, polled); if (auto completedTask = polled.retainedTask) { // Remove the child from the task group's running tasks list. @@ -824,8 +1132,17 @@ static void swift_taskGroup_wait_next_throwingImpl( } PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) { + SWIFT_TASK_GROUP_DEBUG_LOG(this, "poll, waitingTask:%p", waitingTask); + if (isDiscardingResults()) { + // while in "discard results" mode, the awaitable next method + // cannot be implemented, and we must immediately return `nil` + // on the swift side. + return PollResult::getEmpty(this->successType); + } + lock(); // TODO: remove group lock, and use status for synchronization - SWIFT_TASK_DEBUG_LOG("poll group = %p", this); + assert(isAccumulatingResults() && + "attempted to poll TaskGroup in discard-results mode!"); PollResult result; result.storage = nullptr; @@ -841,16 +1158,16 @@ reevaluate_if_taskgroup_has_results:; #endif auto assumed = statusMarkWaitingAssumeAcquire(); if (haveRunOneChildTaskInline) { - assert(assumed.readyTasks()); + assert(assumed.readyTasks(this)); } // ==== 1) bail out early if no tasks are pending ---------------------------- - if (assumed.isEmpty()) { + if (assumed.isEmpty(this)) { SWIFT_TASK_DEBUG_LOG("poll group = %p, group is empty, no pending tasks", this); // No tasks in flight, we know no tasks were submitted before this poll // was issued, and if we parked here we'd potentially never be woken up. // Bail out and return `nil` from `group.next()`. - statusRemoveWaiting(); + statusRemoveWaitingRelease(); result.status = PollStatus::Empty; result.successType = this->successType; unlock(); // TODO: remove group lock, and use status for synchronization @@ -860,14 +1177,14 @@ reevaluate_if_taskgroup_has_results:; auto waitHead = waitQueue.load(std::memory_order_acquire); // ==== 2) Ready task was polled, return with it immediately ----------------- - if (assumed.readyTasks()) { - SWIFT_TASK_DEBUG_LOG("poll group = %p, group has ready tasks = %d", - this, assumed.readyTasks()); + if (assumed.readyTasks(this)) { + SWIFT_TASK_DEBUG_LOG("poll group = %p, tasks .ready = %d, .pending = %llu", + this, assumed.readyTasks(this), assumed.pendingTasks(this)); auto assumedStatus = assumed.status; auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus}; if (status.compare_exchange_strong( - assumedStatus, newStatus.completingPendingReadyWaiting().status, + assumedStatus, newStatus.completingPendingReadyWaiting(this).status, /*success*/ std::memory_order_relaxed, /*failure*/ std::memory_order_acquire)) { @@ -882,13 +1199,14 @@ reevaluate_if_taskgroup_has_results:; bool taskDequeued = readyQueue.dequeue(item); assert(taskDequeued); (void) taskDequeued; - assert(item.getTask()->isFuture()); - auto futureFragment = item.getTask()->futureFragment(); + auto futureFragment = + item.getStatus() == ReadyStatus::RawError ? + nullptr : + item.getTask()->futureFragment(); // Store the task in the result, so after we're done processing it may // be swift_release'd; we kept it alive while it was in the readyQueue by // an additional retain issued as we enqueued it there. - result.retainedTask = item.getTask(); // Note that the task was detached from the task group when it // completed, so we don't need to do that bit of record-keeping here. @@ -899,6 +1217,7 @@ reevaluate_if_taskgroup_has_results:; result.status = PollStatus::Success; result.storage = futureFragment->getStoragePtr(); result.successType = futureFragment->getResultType(); + result.retainedTask = item.getTask(); assert(result.retainedTask && "polled a task, it must be not null"); _swift_tsan_acquire(static_cast(result.retainedTask)); unlock(); // TODO: remove fragment lock, and use status for synchronization @@ -910,11 +1229,23 @@ reevaluate_if_taskgroup_has_results:; result.storage = reinterpret_cast(futureFragment->getError()); result.successType = nullptr; + result.retainedTask = item.getTask(); assert(result.retainedTask && "polled a task, it must be not null"); _swift_tsan_acquire(static_cast(result.retainedTask)); unlock(); // TODO: remove fragment lock, and use status for synchronization return result; + case ReadyStatus::RawError: + // Immediately return the error stored + assert(isDiscardingResults() && "raw errors are only stored in discarding results mode"); + result.status = PollStatus::Error; + result.storage = + reinterpret_cast(item.getRawError()); + result.successType = nullptr; + result.retainedTask = nullptr; + unlock(); // TODO: remove fragment lock, and use status for synchronization + return result; + case ReadyStatus::Empty: result.status = PollStatus::Empty; result.storage = nullptr; @@ -928,7 +1259,7 @@ reevaluate_if_taskgroup_has_results:; } // ==== 3) Add to wait queue ------------------------------------------------- - assert(assumed.readyTasks() == 0); + assert(assumed.readyTasks(this) == 0); _swift_tsan_release(static_cast(waitingTask)); while (true) { if (!hasSuspended) { @@ -972,19 +1303,188 @@ reevaluate_if_taskgroup_has_results:; } // ============================================================================= -// ==== isEmpty ---------------------------------------------------------------- +// ==== _taskGroupWaitAll implementation --------------------------------------- + +SWIFT_CC(swiftasync) +static void swift_taskGroup_waitAllImpl( + OpaqueValue *resultPointer, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext, + TaskGroup *_group, + SwiftError *bodyError, + ThrowingTaskFutureWaitContinuationFunction *resumeFunction, + AsyncContext *rawContext) { + auto waitingTask = swift_task_getCurrent(); + waitingTask->ResumeTask = task_group_wait_resume_adapter; + waitingTask->ResumeContext = rawContext; + + auto context = static_cast(rawContext); + context->ResumeParent = + reinterpret_cast(resumeFunction); + context->Parent = callerContext; + context->errorResult = nullptr; + context->successResultPointer = resultPointer; + + auto group = asImpl(_group); + SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl, waiting task = %p, bodyError = %p, status:%s", + waitingTask, bodyError, group->statusLoadRelaxed().to_string(group).c_str()); + + PollResult polled = group->tryEnqueueWaitingTask(waitingTask); + switch (polled.status) { + case PollStatus::MustWait: + SWIFT_TASK_GROUP_DEBUG_LOG(group, "tryEnqueueWaitingTask MustWait, pending tasks exist, waiting task = %p", + waitingTask); + if (bodyError) { + bool storedBodyError = group->offerBodyError(bodyError); + if (storedBodyError) { + SWIFT_TASK_GROUP_DEBUG_LOG(group, + "tryEnqueueWaitingTask, stored error thrown by with...Group body, error = %p", + bodyError); + } + } + + // The waiting task has been queued on the channel, + // there were pending tasks so it will be woken up eventually. +#ifdef __ARM_ARCH_7K__ + return workaround_function_swift_taskGroup_waitAllImpl( + resultPointer, callerContext, _group, bodyError, resumeFunction, rawContext); +#else /* __ARM_ARCH_7K__ */ + return; +#endif /* __ARM_ARCH_7K__ */ + + case PollStatus::Error: + SWIFT_TASK_GROUP_DEBUG_LOG(group, "tryEnqueueWaitingTask found error, waiting task = %p, status:%s", + waitingTask, group->statusLoadRelaxed().to_string(group).c_str()); + fillGroupNextResult(context, polled); + if (auto completedTask = polled.retainedTask) { + // Remove the child from the task group's running tasks list. + _swift_taskGroup_detachChild(asAbstract(group), completedTask); + + // Balance the retain done by enqueueCompletedTask. + swift_release(completedTask); + } + + return waitingTask->runInFullyEstablishedContext(); + + case PollStatus::Empty: + case PollStatus::Success: + /// Anything else than a "MustWait" can be treated as a successful poll. + /// Only if there are in flight pending tasks do we need to wait after all. + SWIFT_TASK_GROUP_DEBUG_LOG(group, "tryEnqueueWaitingTask %s, waiting task = %p, status:%s", + polled.status == TaskGroupImpl::PollStatus::Empty ? "empty" : "success", + waitingTask, group->statusLoadRelaxed().to_string(group).c_str()); + + if (bodyError) { + // None of the inner tasks have thrown, so we have to "re throw" the body error: + fillGroupNextErrorResult(context, bodyError); + } else { + fillGroupNextNilResult(context, polled); + } + + return waitingTask->runInFullyEstablishedContext(); + } +} + +bool TaskGroupImpl::offerBodyError(SwiftError* _Nonnull bodyError) { + lock(); // TODO: remove group lock, and use status for synchronization + + if (!readyQueue.isEmpty()) { + // already other error stored, discard this one + unlock(); + return false; + } + + auto readyItem = ReadyQueueItem::getRawError(bodyError); + readyQueue.enqueue(readyItem); + unlock(); + + return true; +} + +PollResult TaskGroupImpl::tryEnqueueWaitingTask(AsyncTask *waitingTask) { + SWIFT_TASK_DEBUG_LOG("group(%p) tryEnqueueWaitingTask, status = %s", this, statusLoadRelaxed().to_string(this).c_str()); + PollResult result = PollResult::getEmpty(this->successType); + result.storage = nullptr; + result.retainedTask = nullptr; + + // Have we suspended the task? + bool hasSuspended = false; + bool haveRunOneChildTaskInline = false; + + reevaluate_if_TaskGroup_has_results:; + auto assumed = statusMarkWaitingAssumeAcquire(); + // ==== 1) bail out early if no tasks are pending ---------------------------- + if (assumed.isEmpty(this)) { + SWIFT_TASK_DEBUG_LOG("group(%p) waitAll, is empty, no pending tasks", this); + // No tasks in flight, we know no tasks were submitted before this poll + // was issued, and if we parked here we'd potentially never be woken up. + // Bail out and return `nil` from `group.next()`. + statusRemoveWaitingRelease(); + return result; + } + + lock(); // TODO: remove pool lock, and use status for synchronization + auto waitHead = waitQueue.load(std::memory_order_acquire); + + // ==== 2) Add to wait queue ------------------------------------------------- + _swift_tsan_release(static_cast(waitingTask)); + while (true) { + if (!hasSuspended) { + hasSuspended = true; + waitingTask->flagAsSuspended(); + } + // Put the waiting task at the beginning of the wait queue. + if (waitQueue.compare_exchange_strong( + waitHead, waitingTask, + /*success*/ std::memory_order_release, + /*failure*/ std::memory_order_acquire)) { + unlock(); // TODO: remove fragment lock, and use status for synchronization +#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL + // The logic here is paired with the logic in TaskGroupImpl::offer. Once + // we run the + auto oldTask = _swift_task_clearCurrent(); + assert(oldTask == waitingTask); + + auto childTask = getTaskRecord()->getFirstChild(); + assert(childTask != NULL); + + SWIFT_TASK_DEBUG_LOG("[RunInline] Switching away from running %p to now running %p", oldTask, childTask); + // Run the new task on the same thread now - this should run the new task to + // completion. All swift tasks in task-to-thread model run on generic + // executor + swift_job_run(childTask, ExecutorRef::generic()); + haveRunOneChildTaskInline = true; + + SWIFT_TASK_DEBUG_LOG("[RunInline] Switching back from running %p to now running %p", childTask, oldTask); + // We are back to being the parent task and now that we've run the child + // task, we should reevaluate parent task + _swift_task_setCurrent(oldTask); + goto reevaluate_if_TaskGroup_has_results; +#endif + // no ready tasks, so we must wait. + result.status = PollStatus::MustWait; + _swift_task_clearCurrent(); + return result; + } // else, try again + } +} + +// ============================================================================= +// ==== Task Group status and flag checks ------------------------------------- + SWIFT_CC(swift) static bool swift_taskGroup_isEmptyImpl(TaskGroup *group) { return asImpl(group)->isEmpty(); } -// ============================================================================= -// ==== isCancelled ------------------------------------------------------------ SWIFT_CC(swift) static bool swift_taskGroup_isCancelledImpl(TaskGroup *group) { return asImpl(group)->isCancelled(); } +SWIFT_CC(swift) +static bool swift_taskGroup_isDiscardingResultsImpl(TaskGroup *group) { + return asImpl(group)->isDiscardingResults(); +} + // ============================================================================= // ==== cancelAll -------------------------------------------------------------- @@ -1037,10 +1537,15 @@ void swift::_swift_taskGroup_cancelAllChildren(TaskGroup *group) { // ============================================================================= // ==== addPending ------------------------------------------------------------- + SWIFT_CC(swift) -static bool swift_taskGroup_addPendingImpl(TaskGroup *group, bool unconditionally) { - auto assumedStatus = asImpl(group)->statusAddPendingTaskRelaxed(unconditionally); - return !assumedStatus.isCancelled(); +static bool swift_taskGroup_addPendingImpl(TaskGroup *_group, bool unconditionally) { + auto group = asImpl(_group); + auto assumed = group->statusAddPendingTaskRelaxed(unconditionally); + SWIFT_TASK_DEBUG_LOG("add pending %s to group(%p), tasks pending = %d", + unconditionally ? "unconditionally" : "", + group, assumed.pendingTasks(group)); + return !assumed.isCancelled(); } #define OVERRIDE_TASK_GROUP COMPATIBILITY_OVERRIDE diff --git a/stdlib/public/Concurrency/TaskGroup.swift b/stdlib/public/Concurrency/TaskGroup.swift index 0802ad8b2e59b..3ba0a18045111 100644 --- a/stdlib/public/Concurrency/TaskGroup.swift +++ b/stdlib/public/Concurrency/TaskGroup.swift @@ -47,9 +47,9 @@ import Swift /// by calling the `cancelAll()` method on the task group, /// or by canceling the task in which the group is running. /// -/// If you call `async(priority:operation:)` to create a new task in a canceled group, +/// If you call `addTask(priority:operation:)` to create a new task in a canceled group, /// that task is immediately canceled after creation. -/// Alternatively, you can call `asyncUnlessCancelled(priority:operation:)`, +/// Alternatively, you can call `addTaskUnlessCancelled(priority:operation:)`, /// which doesn't create the task if the group has already been canceled /// Choosing between these two functions /// lets you control how to react to cancellation within a group: @@ -93,7 +93,10 @@ public func withTaskGroup( /// Starts a new scope that can contain a dynamic number of throwing child tasks. /// /// A group waits for all of its child tasks -/// to complete, throw an error, or be canceled before it returns. +/// to complete before it returns. Even cancelled tasks must run until +/// completion before this function returns. +/// Cancelled child tasks cooperatively react to cancellation and attempt +/// to return as early as possible. /// After this function returns, the task group is always empty. /// /// To collect the results of the group's child tasks, @@ -122,9 +125,9 @@ public func withTaskGroup( /// by calling the `cancelAll()` method on the task group, /// or by canceling the task in which the group is running. /// -/// If you call `async(priority:operation:)` to create a new task in a canceled group, +/// If you call `addTask(priority:operation:)` to create a new task in a canceled group, /// that task is immediately canceled after creation. -/// Alternatively, you can call `asyncUnlessCancelled(priority:operation:)`, +/// Alternatively, you can call `addTaskUnlessCancelled(priority:operation:)`, /// which doesn't create the task if the group has already been canceled /// Choosing between these two functions /// lets you control how to react to cancellation within a group: @@ -132,24 +135,30 @@ public func withTaskGroup( /// but other tasks are better not even being created /// when you know they can't produce useful results. /// -/// Throwing an error in one of the tasks of a task group +/// Error Handling +/// ============== +/// +/// Throwing an error in one of the child tasks of a task group /// doesn't immediately cancel the other tasks in that group. /// However, +/// throwing out of the `body` of the `withThrowingTaskGroup` method does cancel +/// the group, and all of its child tasks. +/// For example, /// if you call `next()` in the task group and propagate its error, /// all other tasks are canceled. /// For example, in the code below, /// nothing is canceled and the group doesn't throw an error: /// -/// withThrowingTaskGroup { group in +/// try await withThrowingTaskGroup { group in /// group.addTask { throw SomeError() } /// } /// /// In contrast, this example throws `SomeError` /// and cancels all of the tasks in the group: /// -/// withThrowingTaskGroup { group in +/// try await withThrowingTaskGroup { group in /// group.addTask { throw SomeError() } -/// try group.next() +/// try await group.next() /// } /// /// An individual task throws its error @@ -204,6 +213,30 @@ public func withThrowingTaskGroup( /// and mutation operations can't be performed /// from a concurrent execution context like a child task. /// +/// ### Task execution order +/// +/// Tasks added to a task group execute concurrently, and may be scheduled in +/// any order. +/// +/// ### Discarding behavior +/// A discarding task group eagerly discards and releases its child tasks as +/// soon as they complete. This allows for the efficient releasing of memory used +/// by those tasks, which are not retained for future `next()` calls, as would +/// be the case with a ``TaskGroup``. +/// +/// ### Cancellation behavior +/// A task group becomes cancelled in one of two ways: when ``cancelAll()`` is +/// invoked on it, or when the ``Task`` running this task group is cancelled. +/// +/// Since a `TaskGroup` is a structured concurrency primitive, cancellation is +/// automatically propagated through all of its child-tasks (and their child +/// tasks). +/// +/// A cancelled task group can still keep adding tasks, however they will start +/// being immediately cancelled, and may act accordingly to this. To avoid adding +/// new tasks to an already cancelled task group, use ``addTaskUnlessCancelled(priority:body:)`` +/// rather than the plain ``addTask(priority:body:)`` which adds tasks unconditionally. +/// /// For information about the language-level concurrency model that `TaskGroup` is part of, /// see [Concurrency][concurrency] in [The Swift Programming Language][tspl]. /// @@ -263,7 +296,7 @@ public struct TaskGroup { /// Adds a child task to the group, unless the group has been canceled. /// /// - Parameters: - /// - overridingPriority: The priority of the operation task. + /// - priority: The priority of the operation task. /// Omit this parameter or pass `.unspecified` /// to set the child task's priority to the priority of the group. /// - operation: The operation to execute as part of the task group. @@ -429,10 +462,10 @@ public struct TaskGroup { public mutating func next() async -> ChildTaskResult? { // try!-safe because this function only exists for Failure == Never, // and as such, it is impossible to spawn a throwing child task. - return try! await _taskGroupWaitNext(group: _group) + return try! await _taskGroupWaitNext(group: _group) // !-safe cannot throw, we're a non-throwing TaskGroup } - /// Await all of the remaining tasks on this group. + /// Await all of the pending tasks added this group. @usableFromInline internal mutating func awaitAllRemainingTasks() async { while let _ = await next() {} @@ -521,6 +554,29 @@ extension TaskGroup: Sendable { } /// and mutation operations can't be performed /// from concurrent execution contexts like a child task. /// +/// ### Task execution order +/// Tasks added to a task group execute concurrently, and may be scheduled in +/// any order. +/// +/// ### Discarding behavior +/// A discarding task group eagerly discards and releases its child tasks as +/// soon as they complete. This allows for the efficient releasing of memory used +/// by those tasks, which are not retained for future `next()` calls, as would +/// be the case with a ``TaskGroup``. +/// +/// ### Cancellation behavior +/// A task group becomes cancelled in one of two ways: when ``cancelAll()`` is +/// invoked on it, or when the ``Task`` running this task group is cancelled. +/// +/// Since a `TaskGroup` is a structured concurrency primitive, cancellation is +/// automatically propagated through all of its child-tasks (and their child +/// tasks). +/// +/// A cancelled task group can still keep adding tasks, however they will start +/// being immediately cancelled, and may act accordingly to this. To avoid adding +/// new tasks to an already cancelled task group, use ``addTaskUnlessCancelled(priority:body:)`` +/// rather than the plain ``addTask(priority:body:)`` which adds tasks unconditionally. +/// /// For information about the language-level concurrency model that `ThrowingTaskGroup` is part of, /// see [Concurrency][concurrency] in [The Swift Programming Language][tspl]. /// @@ -556,13 +612,15 @@ public struct ThrowingTaskGroup { @usableFromInline internal mutating func _waitForAll() async throws { - while let _ = try await next() { } + await self.awaitAllRemainingTasks() } /// Wait for all of the group's remaining tasks to complete. + /// + /// - Throws: only during @_alwaysEmitIntoClient public mutating func waitForAll() async throws { - while let _ = try await next() { } + await self.awaitAllRemainingTasks() } #if !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY @@ -1046,7 +1104,8 @@ extension ThrowingTaskGroup: AsyncSequence { } } -/// ==== ----------------------------------------------------------------------- +// ==== ----------------------------------------------------------------------- +// MARK: Runtime functions @available(SwiftStdlib 5.1, *) @_silgen_name("swift_taskGroup_destroy") @@ -1091,3 +1150,46 @@ enum PollStatus: Int { func _taskGroupIsEmpty( _ group: Builtin.RawPointer ) -> Bool + + +// ==== TaskGroup Flags -------------------------------------------------------------- + +/// Flags for task groups. +/// +/// This is a port of the C++ FlagSet. +@available(SwiftStdlib 5.8, *) +struct TaskGroupFlags { + /// The actual bit representation of these flags. + var bits: Int32 = 0 + + /// The priority given to the job. + var discardResults: Bool? { + get { + let value = (Int(bits) & 1 << 24) + + return value > 0 + } + + set { + if newValue == true { + bits = bits | 1 << 24 + } else { + bits = (bits & ~(1 << 23)) + } + } + } +} + +// ==== Task Creation Flags -------------------------------------------------- + +/// Form task creation flags for use with the createAsyncTask builtins. +@available(SwiftStdlib 5.8, *) +@_alwaysEmitIntoClient +func taskGroupCreateFlags( + discardResults: Bool) -> Int { + var bits = 0 + if discardResults { + bits |= 1 << 8 + } + return bits +} diff --git a/stdlib/public/Concurrency/TaskPrivate.h b/stdlib/public/Concurrency/TaskPrivate.h index 7de4046bb6bf0..07122ef100b4c 100644 --- a/stdlib/public/Concurrency/TaskPrivate.h +++ b/stdlib/public/Concurrency/TaskPrivate.h @@ -39,11 +39,13 @@ namespace swift { // Set to 1 to enable helpful debug spew to stderr // If this is enabled, tests with `swift_task_debug_log` requirement can run. #if 0 +#define SWIFT_TASK_DEBUG_LOG_ENABLED 1 #define SWIFT_TASK_DEBUG_LOG(fmt, ...) \ fprintf(stderr, "[%#lx] [%s:%d](%s) " fmt "\n", \ (unsigned long)Thread::current().platformThreadId(), __FILE__, \ __LINE__, __FUNCTION__, __VA_ARGS__) #else +#define SWIFT_TASK_DEBUG_LOG_ENABLED 0 #define SWIFT_TASK_DEBUG_LOG(fmt, ...) (void)0 #endif @@ -134,6 +136,12 @@ namespace { /// @_silgen_name("swift_taskGroup_wait_next_throwing") /// func _taskGroupWaitNext(group: Builtin.RawPointer) async throws -> T? /// +/// @_silgen_name("swift_taskGroup_waitAll") +/// func _taskGroupWaitAll( +/// group: Builtin.RawPointer, +/// bodyError: Swift.Error? +/// ) async throws -> T? +/// class TaskFutureWaitAsyncContext : public AsyncContext { public: // The ABI reserves three words of storage for these contexts, which diff --git a/stdlib/toolchain/Compatibility56/CompatibilityOverrideConcurrency.def b/stdlib/toolchain/Compatibility56/CompatibilityOverrideConcurrency.def index 74952605ff37e..3ec61dbc5596c 100644 --- a/stdlib/toolchain/Compatibility56/CompatibilityOverrideConcurrency.def +++ b/stdlib/toolchain/Compatibility56/CompatibilityOverrideConcurrency.def @@ -264,9 +264,9 @@ OVERRIDE_TASK_GROUP(taskGroup_isEmpty, bool, SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), swift::, (TaskGroup *group), (group)) -OVERRIDE_TASK_GROUP(taskGroup_isCancelled, bool, - SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), - swift::, (TaskGroup *group), (group)) +//OVERRIDE_TASK_GROUP(taskGroup_isCancelled, bool, +// SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), +// swift::, (TaskGroup *group), (group)) OVERRIDE_TASK_GROUP(taskGroup_cancelAll, void, SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), diff --git a/stdlib/toolchain/Compatibility56/include/Concurrency/TaskPrivate.h b/stdlib/toolchain/Compatibility56/include/Concurrency/TaskPrivate.h index fea2deb6f5d47..362246bead1ef 100644 --- a/stdlib/toolchain/Compatibility56/include/Concurrency/TaskPrivate.h +++ b/stdlib/toolchain/Compatibility56/include/Concurrency/TaskPrivate.h @@ -60,6 +60,9 @@ namespace { /// @_silgen_name("swift_taskGroup_wait_next_throwing") /// func _taskGroupWaitNext(group: Builtin.RawPointer) async throws -> T? /// +/// @_silgen_name("swift_taskGroup_wait_nextAll") +/// func _taskGroupWaitAll(group: Builtin.RawPointer) async throws -> T? +/// class TaskFutureWaitAsyncContext : public AsyncContext { public: SwiftError *errorResult; diff --git a/test/Concurrency/Runtime/async_taskgroup_throw_rethrow.swift b/test/Concurrency/Runtime/async_taskgroup_throw_rethrow.swift index 6ef47f6524116..8864fd29bcaf4 100644 --- a/test/Concurrency/Runtime/async_taskgroup_throw_rethrow.swift +++ b/test/Concurrency/Runtime/async_taskgroup_throw_rethrow.swift @@ -1,4 +1,4 @@ -// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) | %FileCheck %s +// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) | %FileCheck %s --dump-input=always // REQUIRES: executable_test // REQUIRES: concurrency @@ -8,28 +8,34 @@ // REQUIRES: concurrency_runtime // UNSUPPORTED: back_deployment_runtime -struct Boom: Error {} -struct IgnoredBoom: Error {} +struct Boom: Error { + let id: String -@available(SwiftStdlib 5.1, *) + init(file: String = #fileID, line: UInt = #line) { + self.id = "\(file):\(line)" + } + init(id: String) { + self.id = id + } +} + +struct IgnoredBoom: Error {} func echo(_ i: Int) async -> Int { i } -@available(SwiftStdlib 5.1, *) -func boom() async throws -> Int { throw Boom() } -@available(SwiftStdlib 5.1, *) func test_taskGroup_throws_rethrows() async { + print("==== \(#function) ------") // CHECK-LABEL: test_taskGroup_throws_rethrows do { let got = try await withThrowingTaskGroup(of: Int.self, returning: Int.self) { group in group.addTask { await echo(1) } group.addTask { await echo(2) } - group.addTask { try await boom() } + group.addTask { throw Boom() } do { while let r = try await group.next() { print("next: \(r)") } } catch { - // CHECK: error caught and rethrown in group: Boom() + // CHECK: error caught and rethrown in group: Boom( print("error caught and rethrown in group: \(error)") throw error } @@ -40,15 +46,145 @@ func test_taskGroup_throws_rethrows() async { print("Expected error to be thrown, but got: \(got)") } catch { - // CHECK: rethrown: Boom() + // CHECK: rethrown: Boom( + print("rethrown: \(error)") + } +} + +func test_taskGroup_noThrow_ifNotAwaitedThrowingTask() async { + print("==== \(#function) ------") // CHECK-LABEL: test_taskGroup_noThrow_ifNotAwaitedThrowingTask + let got = await withThrowingTaskGroup(of: Int.self, returning: Int.self) { group in + group.addTask { await echo(1) } + guard let r = try! await group.next() else { + return 0 + } + + group.addTask { throw Boom() } + // don't consume this task, so we're not throwing here + + return r + } + + print("Expected no error to be thrown, got: \(got)") // CHECK: Expected no error to be thrown, got: 1 +} + +func test_discardingTaskGroup_automaticallyRethrows() async { + print("==== \(#function) ------") // CHECK-LABEL: test_discardingTaskGroup_automaticallyRethrows + do { + let got = try await withThrowingDiscardingTaskGroup(returning: Int.self) { group in + group.addTask { await echo(1) } + group.addTask { throw Boom() } + // add a throwing task, but don't consume it explicitly + // since we're in discard results mode, all will be awaited and the first error it thrown + return 13 + } + + print("Expected error to be thrown, but got: \(got)") + } catch { + // CHECK: rethrown: Boom( + print("rethrown: \(error)") + } +} + +func test_discardingTaskGroup_automaticallyRethrowsOnlyFirst() async { + print("==== \(#function) ------") // CHECK-LABEL: test_discardingTaskGroup_automaticallyRethrowsOnlyFirst + do { + let got = try await withThrowingDiscardingTaskGroup(returning: Int.self) { group in + group.addTask { + await echo(1) + } + group.addTask { + let error = Boom(id: "first, isCancelled:\(Task.isCancelled)") + print("Throwing: \(error)") + throw error + } + group.addTask { + // we wait "forever" but since the group will get cancelled after + // the first error, this will be woken up and throw a cancellation + do { + try await Task.sleep(until: .now + .seconds(120), clock: .continuous) + } catch { + print("Awoken, throwing: \(error)") + throw error + } + } + return 4 + } + + print("Expected error to be thrown, but got: \(got)") + } catch { + // CHECK: Throwing: Boom(id: "first, isCancelled:false + // CHECK: Awoken, throwing: CancellationError() + // and only then the re-throw happens: + // CHECK: rethrown: Boom(id: "first + print("rethrown: \(error)") + } +} + +func test_discardingTaskGroup_automaticallyRethrows_first_withThrowingBodyFirst() async { + print("==== \(#function) ------") // CHECK-LABEL: test_discardingTaskGroup_automaticallyRethrows_first_withThrowingBodyFirst + do { + try await withThrowingDiscardingTaskGroup(returning: Int.self) { group in + group.addTask { + await echo(1) + } + group.addTask { + try? await Task.sleep(until: .now + .seconds(10), clock: .continuous) + let error = Boom(id: "task, second, isCancelled:\(Task.isCancelled)") + print("Throwing: \(error)") + throw error + } + + let bodyError = Boom(id: "body, first, isCancelled:\(group.isCancelled)") + print("Throwing: \(bodyError)") + throw bodyError + } + + print("Expected error to be thrown") + } catch { + // CHECK: Throwing: Boom(id: "body, first, isCancelled:false + // CHECK: Throwing: Boom(id: "task, second, isCancelled:true + // and only then the re-throw happens: + // CHECK: rethrown: Boom(id: "body, first print("rethrown: \(error)") } } +func test_discardingTaskGroup_automaticallyRethrows_first_withThrowingBodySecond() async { + print("==== \(#function) ------") // CHECK-LABEL: test_discardingTaskGroup_automaticallyRethrows_first_withThrowingBodySecond + do { + try await withThrowingDiscardingTaskGroup(returning: Int.self) { group in + group.addTask { + let error = Boom(id: "task, first, isCancelled:\(Task.isCancelled)") + print("Throwing: \(error)") + throw error + } + + try await Task.sleep(until: .now + .seconds(1), clock: .continuous) + + let bodyError = Boom(id: "body, second, isCancelled:\(group.isCancelled)") + print("Throwing: \(bodyError)") + throw bodyError + } + + print("Expected error to be thrown") + } catch { + // CHECK: Throwing: Boom(id: "task, first, isCancelled:false + // CHECK: Throwing: Boom(id: "body, second, isCancelled:true + // and only then the re-throw happens: + // CHECK: rethrown: Boom(id: "body, second + print("rethrown: \(error)") + } +} @available(SwiftStdlib 5.1, *) @main struct Main { static func main() async { await test_taskGroup_throws_rethrows() + await test_taskGroup_noThrow_ifNotAwaitedThrowingTask() + await test_discardingTaskGroup_automaticallyRethrows() + await test_discardingTaskGroup_automaticallyRethrowsOnlyFirst() + await test_discardingTaskGroup_automaticallyRethrows_first_withThrowingBodyFirst() + await test_discardingTaskGroup_automaticallyRethrows_first_withThrowingBodySecond() } } diff --git a/test/Concurrency/Runtime/async_taskgroup_void_neverConsumingTasks.swift b/test/Concurrency/Runtime/async_taskgroup_void_neverConsumingTasks.swift new file mode 100644 index 0000000000000..98edc646860e9 --- /dev/null +++ b/test/Concurrency/Runtime/async_taskgroup_void_neverConsumingTasks.swift @@ -0,0 +1,97 @@ +// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) | %FileCheck %s --dump-input=always +// REQUIRES: executable_test +// REQUIRES: concurrency +// REQUIRES: concurrency_runtime +// UNSUPPORTED: back_deployment_runtime +// UNSUPPORTED: OS=linux-gnu +import Darwin + +actor Waiter { + let until: Int + var count: Int + + var cc: CheckedContinuation? + + init(until: Int) { + self.until = until + self.count = 0 + } + + func increment() { + self.count += 1 + fputs("> increment (\(self.count)/\(self.until))\n", stderr); + if self.until <= self.count { + if let cc = self.cc { + cc.resume(returning: self.count) + } + } + } + + func wait() async -> Int { + if self.until <= self.count { + fputs("> RETURN in Waiter\n", stderr); + return self.count + } + + return await withCheckedContinuation { cc in + fputs("> WAIT in Waiter\n", stderr); + self.cc = cc + } + } +} + +func test_taskGroup_void_neverConsume() async { + print(">>> \(#function)") + let until = 100 + let waiter = Waiter(until: until) + + print("Start tasks: \(until)") + let allTasks = await withDiscardingTaskGroup() { group in + for n in 1...until { + fputs("> enqueue: \(n)\n", stderr); + group.addTask { + fputs("> run: \(n)\n", stderr); + try? await Task.sleep(until: .now + .milliseconds(100), clock: .continuous) + await waiter.increment() + } + } + + return until + } + + // CHECK: all tasks: 100 + print("all tasks: \(allTasks)") +} + +func test_taskGroup_void_neverConsume(sleepBeforeGroupWaitAll: Duration) async { + print(">>> \(#function)") + let until = 100 + let waiter = Waiter(until: until) + + print("Start tasks: \(until)") + let allTasks = await withDiscardingTaskGroup() { group in + for n in 1...until { + fputs("> enqueue: \(n)\n", stderr); + group.addTask { + fputs("> run: \(n)\n", stderr); + try? await Task.sleep(until: .now + .milliseconds(100), clock: .continuous) + await waiter.increment() + } + } + + // wait a little bit, so some tasks complete before we hit the implicit "wait at end of task group scope" + try? await Task.sleep(until: .now + sleepBeforeGroupWaitAll, clock: .continuous) + + return until + } + + // CHECK: all tasks: 100 + print("all tasks: \(allTasks)") +} + +@main struct Main { + static func main() async { + await test_taskGroup_void_neverConsume() + await test_taskGroup_void_neverConsume(sleepBeforeGroupWaitAll: .milliseconds(500)) + } +} diff --git a/unittests/runtime/CompatibilityOverrideConcurrency.cpp b/unittests/runtime/CompatibilityOverrideConcurrency.cpp index 8af3425c458bb..fe278c38dc3bb 100644 --- a/unittests/runtime/CompatibilityOverrideConcurrency.cpp +++ b/unittests/runtime/CompatibilityOverrideConcurrency.cpp @@ -185,6 +185,10 @@ TEST_F(CompatibilityOverrideConcurrencyTest, test_swift_taskGroup_initialize) { swift_taskGroup_initialize(nullptr, nullptr); } +TEST_F(CompatibilityOverrideConcurrencyTest, test_swift_taskGroup_initializeWithFlags) { + swift_taskGroup_initializeWithFlags(0, nullptr, nullptr); +} + TEST_F(CompatibilityOverrideConcurrencyTest, test_swift_taskGroup_attachChild) { swift_taskGroup_attachChild(nullptr, nullptr); } @@ -211,6 +215,14 @@ TEST_F(CompatibilityOverrideConcurrencyTest, test_swift_taskGroup_cancelAll) { swift_taskGroup_cancelAll(nullptr); } +TEST_F(CompatibilityOverrideConcurrencyTest, test_swift_taskGroup_waitAll) { + swift_taskGroup_waitAll(nullptr, nullptr, nullptr, nullptr, nullptr, nullptr); +} + +TEST_F(CompatibilityOverrideConcurrencyTest, test_swift_taskGroup_isDiscardingResults) { + swift_taskGroup_isDiscardingResults(nullptr); +} + TEST_F(CompatibilityOverrideConcurrencyTest, test_swift_taskGroup_addPending) { swift_taskGroup_addPending(nullptr, true); }