From 90456a292ae772c17e470c5bcc5a9594f289b3dc Mon Sep 17 00:00:00 2001 From: Bryce Wilson Date: Sun, 16 Nov 2025 15:11:46 -0800 Subject: [PATCH] [Concurrency] Set thread base priority when running escalated Tasks --- include/swift/ABI/Task.h | 24 +++++++++++++- include/swift/Runtime/Concurrency.h | 7 ---- include/swift/Runtime/DispatchShims.h | 21 ++++++++++++ stdlib/public/Concurrency/Actor.cpp | 7 +++- stdlib/public/Concurrency/Task.cpp | 22 +++++++++---- stdlib/public/Concurrency/TaskGroup.cpp | 7 +++- stdlib/public/Concurrency/TaskPrivate.h | 37 +++++++++++++++------- test/Concurrency/async_task_priority.swift | 37 ++++++++++++++++++++++ 8 files changed, 134 insertions(+), 28 deletions(-) diff --git a/include/swift/ABI/Task.h b/include/swift/ABI/Task.h index e80fd3b889559..a587747658283 100644 --- a/include/swift/ABI/Task.h +++ b/include/swift/ABI/Task.h @@ -29,6 +29,13 @@ #include "bitset" #include "queue" // TODO: remove and replace with our own mpsc +// Does the runtime integrate with libdispatch? +#if defined(SWIFT_CONCURRENCY_USES_DISPATCH) +#define SWIFT_CONCURRENCY_ENABLE_DISPATCH SWIFT_CONCURRENCY_USES_DISPATCH +#else +#define SWIFT_CONCURRENCY_ENABLE_DISPATCH 0 +#endif + // Does the runtime provide priority escalation support? #ifndef SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION #if SWIFT_CONCURRENCY_ENABLE_DISPATCH && \ @@ -422,7 +429,22 @@ class AsyncTask : public Job { /// /// Generally this should be done immediately after updating /// ActiveTask. - void flagAsRunning(); + /// + /// When Dispatch is used for the default executor: + /// * If the return value is non-zero, it must be passed + /// to swift_dispatch_thread_reset_override_self + /// before returning to the executor. + /// * If the return value is zero, it may be ignored or passed to + /// the aforementioned function (which will ignore values of zero). + /// The current implementation will always return zero + /// if you call flagAsRunning again before calling + /// swift_dispatch_thread_reset_override_self with the + /// initial value. This supports suspending and immediately + /// resuming a Task without returning up the callstack. + /// + /// For all other default executors, flagAsRunning + /// will return zero which may be ignored. + uint32_t flagAsRunning(); /// Flag that this task is now suspended with information about what it is /// waiting on. diff --git a/include/swift/Runtime/Concurrency.h b/include/swift/Runtime/Concurrency.h index e9a801873bd17..3e091c3976aca 100644 --- a/include/swift/Runtime/Concurrency.h +++ b/include/swift/Runtime/Concurrency.h @@ -38,13 +38,6 @@ #define SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL 0 #endif -// Does the runtime integrate with libdispatch? -#if defined(SWIFT_CONCURRENCY_USES_DISPATCH) -#define SWIFT_CONCURRENCY_ENABLE_DISPATCH SWIFT_CONCURRENCY_USES_DISPATCH -#else -#define SWIFT_CONCURRENCY_ENABLE_DISPATCH 0 -#endif - namespace swift { class DefaultActor; class TaskOptionRecord; diff --git a/include/swift/Runtime/DispatchShims.h b/include/swift/Runtime/DispatchShims.h index 71756882b0ea6..81020cd338c6f 100644 --- a/include/swift/Runtime/DispatchShims.h +++ b/include/swift/Runtime/DispatchShims.h @@ -48,6 +48,27 @@ swift_dispatch_thread_override_self(qos_class_t override_qos) { return 0; } +static inline uint32_t +swift_dispatch_thread_override_self_with_base(qos_class_t override_qos, qos_class_t base_qos) { + + if (__builtin_available(macOS 27.0, iOS 27.0, tvOS 27.0, watchOS 27.0, *)) { + return dispatch_thread_override_self_with_base(override_qos, base_qos); + } else if (__builtin_available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)) { + // If we don't have the ability to set our base qos correctly, at least set the override + // We want to return 0 here because we have nothing to reset in this case + (void) dispatch_thread_override_self(override_qos); + } + + return 0; +} + +static inline void +swift_dispatch_thread_reset_override_self(uint32_t opaque) { + if (__builtin_available(macOS 27.0, iOS 27.0, tvOS 27.0, watchOS 27.0, *)) { + dispatch_thread_reset_override_self(opaque); + } +} + static inline int swift_dispatch_lock_override_start_with_debounce(dispatch_lock_t *lock_addr, dispatch_tid_t expected_thread, qos_class_t override_to_apply) { diff --git a/stdlib/public/Concurrency/Actor.cpp b/stdlib/public/Concurrency/Actor.cpp index 66f93cf57b6aa..915a64d273a17 100644 --- a/stdlib/public/Concurrency/Actor.cpp +++ b/stdlib/public/Concurrency/Actor.cpp @@ -237,12 +237,17 @@ void swift::runJobInEstablishedExecutorContext(Job *job) { // current thread. If the task suspends somewhere, it should // update the task status appropriately; we don't need to update // it afterwards. - task->flagAsRunning(); + [[maybe_unused]] + uint32_t dispatchOpaquePriority = task->flagAsRunning(); auto traceHandle = concurrency::trace::job_run_begin(job); task->runInFullyEstablishedContext(); concurrency::trace::job_run_end(traceHandle); +#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION + swift_dispatch_thread_reset_override_self(dispatchOpaquePriority); +#endif + assert(ActiveTask::get() == nullptr && "active task wasn't cleared before suspending?"); if (oldTask) ActiveTask::set(oldTask); diff --git a/stdlib/public/Concurrency/Task.cpp b/stdlib/public/Concurrency/Task.cpp index 740f2664e564b..8011df6247211 100644 --- a/stdlib/public/Concurrency/Task.cpp +++ b/stdlib/public/Concurrency/Task.cpp @@ -124,7 +124,7 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask, // NOTE: this acquire synchronizes with `completeFuture`. auto queueHead = fragment->waitQueue.load(std::memory_order_acquire); - bool contextInitialized = false; + bool suspendedWaiter = false; while (true) { switch (queueHead.getStatus()) { case Status::Error: @@ -132,7 +132,14 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask, SWIFT_TASK_DEBUG_LOG("task %p waiting on task %p, completed immediately", waitingTask, this); _swift_tsan_acquire(static_cast(this)); - if (contextInitialized) waitingTask->flagAsRunning(); + if (suspendedWaiter) { + // This will always return zero because we were just + // running this Task so its BasePriority (which is + // immutable) should've already been set on the thread. + [[maybe_unused]] + uint32_t opaque = waitingTask->flagAsRunning(); + assert(opaque == 0); + } // The task is done; we don't need to wait. return queueHead.getStatus(); @@ -146,8 +153,8 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask, break; } - if (!contextInitialized) { - contextInitialized = true; + if (!suspendedWaiter) { + suspendedWaiter = true; auto context = reinterpret_cast(waitingTaskContext); context->errorResult = nullptr; @@ -1659,8 +1666,11 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) { // we try to tail-call. } while (false); #else - // Restore the running state of the task and resume it. - task->flagAsRunning(); + // This will always return zero because we were just running this Task so its + // BasePriority (which is immutable) should've already been set on the thread. + [[maybe_unused]] + uint32_t opaque = task->flagAsRunning(); + assert(opaque == 0); #endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */ if (context->isExecutorSwitchForced()) diff --git a/stdlib/public/Concurrency/TaskGroup.cpp b/stdlib/public/Concurrency/TaskGroup.cpp index 0d1d2be13d156..2989ffcf89968 100644 --- a/stdlib/public/Concurrency/TaskGroup.cpp +++ b/stdlib/public/Concurrency/TaskGroup.cpp @@ -1822,7 +1822,12 @@ reevaluate_if_taskgroup_has_results:; // We're going back to running the task, so if we suspended before, // we need to flag it as running again. if (hasSuspended) { - waitingTask->flagAsRunning(); + // This will always return zero because we were just + // running this Task so its BasePriority (which is + // immutable) should've already been set on the thread. + [[maybe_unused]] + uint32_t opaque = waitingTask->flagAsRunning(); + assert(opaque == 0); } // Success! We are allowed to poll. diff --git a/stdlib/public/Concurrency/TaskPrivate.h b/stdlib/public/Concurrency/TaskPrivate.h index 50580b915eb07..57fc2e0662291 100644 --- a/stdlib/public/Concurrency/TaskPrivate.h +++ b/stdlib/public/Concurrency/TaskPrivate.h @@ -935,32 +935,40 @@ inline bool AsyncTask::isCancelled() const { .isCancelled(); } -inline void AsyncTask::flagAsRunning() { +inline uint32_t AsyncTask::flagAsRunning() { #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION dispatch_thread_override_info_s threadOverrideInfo; threadOverrideInfo = swift_dispatch_thread_get_current_override_qos_floor(); qos_class_t overrideFloor = threadOverrideInfo.override_qos_floor; + qos_class_t basePriorityCeil = overrideFloor; + qos_class_t taskBasePriority = (qos_class_t) _private().BasePriority; #endif auto oldStatus = _private()._status().load(std::memory_order_relaxed); assert(!oldStatus.isRunning()); assert(!oldStatus.isComplete()); + uint32_t dispatchOpaquePriority = 0; if (!oldStatus.hasTaskDependency()) { SWIFT_TASK_DEBUG_LOG("%p->flagAsRunning() with no task dependency", this); assert(_private().dependencyRecord == nullptr); while (true) { #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION - // Task's priority is greater than the thread's - do a self escalation + // If the base priority is not equal to the current override floor then + // dispqatch may need to apply the base priority to the thread. If the + // current priority is higher than the override floor, then dispatch may + // need to apply a self-override. In either case, call into dispatch to + // do this. qos_class_t maxTaskPriority = (qos_class_t) oldStatus.getStoredPriority(); - if (threadOverrideInfo.can_override && (maxTaskPriority > overrideFloor)) { - SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x", - overrideFloor, this, maxTaskPriority); + if (threadOverrideInfo.can_override && (taskBasePriority != basePriorityCeil || maxTaskPriority > overrideFloor)) { + SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x and base priority %#x", + overrideFloor, this, maxTaskPriority, taskBasePriority); - (void) swift_dispatch_thread_override_self(maxTaskPriority); + dispatchOpaquePriority = swift_dispatch_thread_override_self_with_base(maxTaskPriority, taskBasePriority); overrideFloor = maxTaskPriority; + basePriorityCeil = taskBasePriority; } #endif // Set self as executor and remove escalation bit if any - the task's @@ -989,14 +997,19 @@ inline void AsyncTask::flagAsRunning() { ActiveTaskStatus& newStatus) { #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION - // Task's priority is greater than the thread's - do a self escalation + // If the base priority is not equal to the current override floor then + // dispqatch may need to apply the base priority to the thread. If the + // current priority is higher than the override floor, then dispatch may + // need to apply a self-override. In either case, call into dispatch to + // do this. qos_class_t maxTaskPriority = (qos_class_t) oldStatus.getStoredPriority(); - if (threadOverrideInfo.can_override && (maxTaskPriority > overrideFloor)) { - SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x", - overrideFloor, this, maxTaskPriority); + if (threadOverrideInfo.can_override && (taskBasePriority != basePriorityCeil || maxTaskPriority > overrideFloor)) { + SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x and base priority %#x", + overrideFloor, this, maxTaskPriority, taskBasePriority); - (void) swift_dispatch_thread_override_self(maxTaskPriority); + dispatchOpaquePriority = swift_dispatch_thread_override_self_with_base(maxTaskPriority, taskBasePriority); overrideFloor = maxTaskPriority; + basePriorityCeil = taskBasePriority; } #endif // Set self as executor and remove escalation bit if any - the task's @@ -1012,7 +1025,7 @@ inline void AsyncTask::flagAsRunning() { swift_task_enterThreadLocalContext( (char *)&_private().ExclusivityAccessSet[0]); } - + return dispatchOpaquePriority; } /// TODO (rokhinip): We need the handoff of the thread to the next executor to diff --git a/test/Concurrency/async_task_priority.swift b/test/Concurrency/async_task_priority.swift index 7d3ed8765bdec..b45b259f050e1 100644 --- a/test/Concurrency/async_task_priority.swift +++ b/test/Concurrency/async_task_priority.swift @@ -322,6 +322,43 @@ actor Test { await task2.value // Escalate task2 which should be queued behind task1 on the actor } + // This test will only work properly on 27.0+ + if #available(macOS 27.0, iOS 27.0, tvOS 27.0, watchOS 27.0, *) { + tests.test("Task escalation doesn't impact qos_class_self") { + let task = Task(priority: .utility) { + let initialQos = DispatchQoS( + qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!, + relativePriority: 0) + expectEqual(initialQos, DispatchQoS.utility) + let childTask = Task { + let qosBeforeEscalate = DispatchQoS( + qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!, + relativePriority: 0) + // Unstructured task should inherit utility priority + expectEqual(qosBeforeEscalate, DispatchQoS.utility) + // Escalate priority override, not base QoS + withUnsafeCurrentTask { + $0!.escalatePriority(to: .userInitiated) + } + let qosAfterEscalate = DispatchQoS( + qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!, + relativePriority: 0) + // qos_class_self should remain utility after escalation + expectEqual(qosAfterEscalate, DispatchQoS.utility) + await Task.yield() + let qosAfterYield = DispatchQoS( + qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!, + relativePriority: 0) + // qos_class_self should remain utility after yield + expectEqual(qosAfterYield, DispatchQoS.utility) + } + + await childTask.value + } + + await task.value + } + } } await runAllTestsAsync() }