Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion include/swift/ABI/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@
#include "bitset"
#include "queue" // TODO: remove and replace with our own mpsc

// Does the runtime integrate with libdispatch?
#if defined(SWIFT_CONCURRENCY_USES_DISPATCH)
#define SWIFT_CONCURRENCY_ENABLE_DISPATCH SWIFT_CONCURRENCY_USES_DISPATCH
#else
#define SWIFT_CONCURRENCY_ENABLE_DISPATCH 0
#endif

// Does the runtime provide priority escalation support?
#ifndef SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
#if SWIFT_CONCURRENCY_ENABLE_DISPATCH && \
Expand Down Expand Up @@ -422,7 +429,22 @@ class AsyncTask : public Job {
///
/// Generally this should be done immediately after updating
/// ActiveTask.
void flagAsRunning();
///
/// When Dispatch is used for the default executor:
/// * If the return value is non-zero, it must be passed
/// to swift_dispatch_thread_reset_override_self
/// before returning to the executor.
/// * If the return value is zero, it may be ignored or passed to
/// the aforementioned function (which will ignore values of zero).
/// The current implementation will always return zero
/// if you call flagAsRunning again before calling
/// swift_dispatch_thread_reset_override_self with the
/// initial value. This supports suspending and immediately
/// resuming a Task without returning up the callstack.
///
/// For all other default executors, flagAsRunning
/// will return zero which may be ignored.
uint32_t flagAsRunning();

/// Flag that this task is now suspended with information about what it is
/// waiting on.
Expand Down
7 changes: 0 additions & 7 deletions include/swift/Runtime/Concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@
#define SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL 0
#endif

// Does the runtime integrate with libdispatch?
#if defined(SWIFT_CONCURRENCY_USES_DISPATCH)
#define SWIFT_CONCURRENCY_ENABLE_DISPATCH SWIFT_CONCURRENCY_USES_DISPATCH
#else
#define SWIFT_CONCURRENCY_ENABLE_DISPATCH 0
#endif

namespace swift {
class DefaultActor;
class TaskOptionRecord;
Expand Down
21 changes: 21 additions & 0 deletions include/swift/Runtime/DispatchShims.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,27 @@ swift_dispatch_thread_override_self(qos_class_t override_qos) {
return 0;
}

static inline uint32_t
swift_dispatch_thread_override_self_with_base(qos_class_t override_qos, qos_class_t base_qos) {

if (__builtin_available(macOS 27.0, iOS 27.0, tvOS 27.0, watchOS 27.0, *)) {
return dispatch_thread_override_self_with_base(override_qos, base_qos);
} else if (__builtin_available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)) {
// If we don't have the ability to set our base qos correctly, at least set the override
// We want to return 0 here because we have nothing to reset in this case
(void) dispatch_thread_override_self(override_qos);
}

return 0;
}

static inline void
swift_dispatch_thread_reset_override_self(uint32_t opaque) {
if (__builtin_available(macOS 27.0, iOS 27.0, tvOS 27.0, watchOS 27.0, *)) {
dispatch_thread_reset_override_self(opaque);
}
}

static inline int
swift_dispatch_lock_override_start_with_debounce(dispatch_lock_t *lock_addr,
dispatch_tid_t expected_thread, qos_class_t override_to_apply) {
Expand Down
7 changes: 6 additions & 1 deletion stdlib/public/Concurrency/Actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,17 @@ void swift::runJobInEstablishedExecutorContext(Job *job) {
// current thread. If the task suspends somewhere, it should
// update the task status appropriately; we don't need to update
// it afterwards.
task->flagAsRunning();
[[maybe_unused]]
uint32_t dispatchOpaquePriority = task->flagAsRunning();

auto traceHandle = concurrency::trace::job_run_begin(job);
task->runInFullyEstablishedContext();
concurrency::trace::job_run_end(traceHandle);

#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
swift_dispatch_thread_reset_override_self(dispatchOpaquePriority);
#endif

assert(ActiveTask::get() == nullptr &&
"active task wasn't cleared before suspending?");
if (oldTask) ActiveTask::set(oldTask);
Expand Down
22 changes: 16 additions & 6 deletions stdlib/public/Concurrency/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,22 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,

// NOTE: this acquire synchronizes with `completeFuture`.
auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
bool contextInitialized = false;
bool suspendedWaiter = false;
while (true) {
switch (queueHead.getStatus()) {
case Status::Error:
case Status::Success:
SWIFT_TASK_DEBUG_LOG("task %p waiting on task %p, completed immediately",
waitingTask, this);
_swift_tsan_acquire(static_cast<Job *>(this));
if (contextInitialized) waitingTask->flagAsRunning();
if (suspendedWaiter) {
// This will always return zero because we were just
// running this Task so its BasePriority (which is
// immutable) should've already been set on the thread.
[[maybe_unused]]
uint32_t opaque = waitingTask->flagAsRunning();
assert(opaque == 0);
}
// The task is done; we don't need to wait.
return queueHead.getStatus();

Expand All @@ -146,8 +153,8 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
break;
}

if (!contextInitialized) {
contextInitialized = true;
if (!suspendedWaiter) {
suspendedWaiter = true;
auto context =
reinterpret_cast<TaskFutureWaitAsyncContext *>(waitingTaskContext);
context->errorResult = nullptr;
Expand Down Expand Up @@ -1659,8 +1666,11 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
// we try to tail-call.
} while (false);
#else
// Restore the running state of the task and resume it.
task->flagAsRunning();
// This will always return zero because we were just running this Task so its
// BasePriority (which is immutable) should've already been set on the thread.
[[maybe_unused]]
uint32_t opaque = task->flagAsRunning();
assert(opaque == 0);
#endif /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */

if (context->isExecutorSwitchForced())
Expand Down
7 changes: 6 additions & 1 deletion stdlib/public/Concurrency/TaskGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1822,7 +1822,12 @@ reevaluate_if_taskgroup_has_results:;
// We're going back to running the task, so if we suspended before,
// we need to flag it as running again.
if (hasSuspended) {
waitingTask->flagAsRunning();
// This will always return zero because we were just
// running this Task so its BasePriority (which is
// immutable) should've already been set on the thread.
[[maybe_unused]]
uint32_t opaque = waitingTask->flagAsRunning();
assert(opaque == 0);
}

// Success! We are allowed to poll.
Expand Down
37 changes: 25 additions & 12 deletions stdlib/public/Concurrency/TaskPrivate.h
Original file line number Diff line number Diff line change
Expand Up @@ -935,32 +935,40 @@ inline bool AsyncTask::isCancelled() const {
.isCancelled();
}

inline void AsyncTask::flagAsRunning() {
inline uint32_t AsyncTask::flagAsRunning() {

#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
dispatch_thread_override_info_s threadOverrideInfo;
threadOverrideInfo = swift_dispatch_thread_get_current_override_qos_floor();
qos_class_t overrideFloor = threadOverrideInfo.override_qos_floor;
qos_class_t basePriorityCeil = overrideFloor;
qos_class_t taskBasePriority = (qos_class_t) _private().BasePriority;
#endif

auto oldStatus = _private()._status().load(std::memory_order_relaxed);
assert(!oldStatus.isRunning());
assert(!oldStatus.isComplete());

uint32_t dispatchOpaquePriority = 0;
if (!oldStatus.hasTaskDependency()) {
SWIFT_TASK_DEBUG_LOG("%p->flagAsRunning() with no task dependency", this);
assert(_private().dependencyRecord == nullptr);

while (true) {
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
// Task's priority is greater than the thread's - do a self escalation
// If the base priority is not equal to the current override floor then
// dispqatch may need to apply the base priority to the thread. If the
// current priority is higher than the override floor, then dispatch may
// need to apply a self-override. In either case, call into dispatch to
// do this.
qos_class_t maxTaskPriority = (qos_class_t) oldStatus.getStoredPriority();
if (threadOverrideInfo.can_override && (maxTaskPriority > overrideFloor)) {
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x",
overrideFloor, this, maxTaskPriority);
if (threadOverrideInfo.can_override && (taskBasePriority != basePriorityCeil || maxTaskPriority > overrideFloor)) {
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x and base priority %#x",
overrideFloor, this, maxTaskPriority, taskBasePriority);

(void) swift_dispatch_thread_override_self(maxTaskPriority);
dispatchOpaquePriority = swift_dispatch_thread_override_self_with_base(maxTaskPriority, taskBasePriority);
overrideFloor = maxTaskPriority;
basePriorityCeil = taskBasePriority;
}
#endif
// Set self as executor and remove escalation bit if any - the task's
Expand Down Expand Up @@ -989,14 +997,19 @@ inline void AsyncTask::flagAsRunning() {
ActiveTaskStatus& newStatus) {

#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
// Task's priority is greater than the thread's - do a self escalation
// If the base priority is not equal to the current override floor then
// dispqatch may need to apply the base priority to the thread. If the
// current priority is higher than the override floor, then dispatch may
// need to apply a self-override. In either case, call into dispatch to
// do this.
qos_class_t maxTaskPriority = (qos_class_t) oldStatus.getStoredPriority();
if (threadOverrideInfo.can_override && (maxTaskPriority > overrideFloor)) {
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x",
overrideFloor, this, maxTaskPriority);
if (threadOverrideInfo.can_override && (taskBasePriority != basePriorityCeil || maxTaskPriority > overrideFloor)) {
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x and base priority %#x",
overrideFloor, this, maxTaskPriority, taskBasePriority);

(void) swift_dispatch_thread_override_self(maxTaskPriority);
dispatchOpaquePriority = swift_dispatch_thread_override_self_with_base(maxTaskPriority, taskBasePriority);
overrideFloor = maxTaskPriority;
basePriorityCeil = taskBasePriority;
}
#endif
// Set self as executor and remove escalation bit if any - the task's
Expand All @@ -1012,7 +1025,7 @@ inline void AsyncTask::flagAsRunning() {
swift_task_enterThreadLocalContext(
(char *)&_private().ExclusivityAccessSet[0]);
}

return dispatchOpaquePriority;
}

/// TODO (rokhinip): We need the handoff of the thread to the next executor to
Expand Down
37 changes: 37 additions & 0 deletions test/Concurrency/async_task_priority.swift
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,43 @@ actor Test {
await task2.value // Escalate task2 which should be queued behind task1 on the actor
}

// This test will only work properly on 27.0+
if #available(macOS 27.0, iOS 27.0, tvOS 27.0, watchOS 27.0, *) {
tests.test("Task escalation doesn't impact qos_class_self") {
let task = Task(priority: .utility) {
let initialQos = DispatchQoS(
qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!,
relativePriority: 0)
expectEqual(initialQos, DispatchQoS.utility)
let childTask = Task {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming nitpick, that's not a "child" task, we can fix in followup tho

let qosBeforeEscalate = DispatchQoS(
qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!,
relativePriority: 0)
// Unstructured task should inherit utility priority
expectEqual(qosBeforeEscalate, DispatchQoS.utility)
// Escalate priority override, not base QoS
withUnsafeCurrentTask {
$0!.escalatePriority(to: .userInitiated)
}
let qosAfterEscalate = DispatchQoS(
qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!,
relativePriority: 0)
// qos_class_self should remain utility after escalation
expectEqual(qosAfterEscalate, DispatchQoS.utility)
await Task.yield()
let qosAfterYield = DispatchQoS(
qosClass: DispatchQoS.QoSClass(rawValue: qos_class_self())!,
relativePriority: 0)
// qos_class_self should remain utility after yield
expectEqual(qosAfterYield, DispatchQoS.utility)
}

await childTask.value
}

await task.value
}
}
}
await runAllTestsAsync()
}
Expand Down