diff --git a/docs/ConcurrencyRuntime.md b/docs/ConcurrencyRuntime.md new file mode 100644 index 0000000000000..0db155b52e2fa --- /dev/null +++ b/docs/ConcurrencyRuntime.md @@ -0,0 +1,27 @@ +## Concurrency Runtime + +This is a general guide to help you get started understanding the Swift concurrency runtime. +For high level semantics refer to Swift Evolution proposals, this document is aimed at developers working in the concurrency runtime, and discusses features which are outside of the scope of Swift evolution, such as runtime optimizations and implementation techniques. + +### Actors and Executors + +Swift's concurrency model is largely based around actors (and tasks). + +**Actors** are the primary way of ensuring "isolation". + +Actors execute at most one task at any given time. However a task may be suspended while "on" an actor. When this happens, another task may execute on the actor -- this is called "actor reentrancy. + +*** + +### Executor Switching + +Swift performs so called "switching" when executing a task and it has to "hop" between different executor (e.g. actors). +If Swift naively enqueued a job every single time we called some async function, the overhead of async functions or actors +would be prohibitive, therefore Swift employs a technique called (actor) switching. + +Switching is a runtime optimization, and has no effect on semantic isolation of a program. + +For this discussion the two kinds of `Executor` sub-types that matter are: +- `SerialExecutor` which ensures isolation, i.e. it can be used to guarantee the isolation an actor provides - this is why it is "serial" +- `TaskExecutor` which may be used to ensure a task prefers to run on a specific task executor which may be using a specific thread or group of threads. Task executors do not provide isolation. + diff --git a/include/swift/ABI/Executor.h b/include/swift/ABI/Executor.h index 4f4fcf8927253..764ce01786b31 100644 --- a/include/swift/ABI/Executor.h +++ b/include/swift/ABI/Executor.h @@ -140,9 +140,19 @@ class SerialExecutorRef { } const char* getIdentityDebugName() const { - return isMainExecutor() ? " (MainActorExecutor)" - : isGeneric() ? (isForSynchronousStart() ? " (GenericExecutor/SynchronousStart)" : " (GenericExecutor)") - : ""; + if (isMainExecutor()) { + return " (MainActorExecutor)"; + } + if (isDefaultActor()) { + return " (DefaultActor)"; + } + if (isGeneric()) { + if (isForSynchronousStart()) { + return " (GenericExecutor/SynchronousStart)"; + } + return " (GenericExecutor)"; + } + return ""; } /// Is this the generic executor reference? @@ -186,12 +196,6 @@ class SerialExecutorRef { return reinterpret_cast(table); } - /// Do we have to do any work to start running as the requested - /// executor? - bool mustSwitchToRun(SerialExecutorRef newExecutor) const { - return Identity != newExecutor.Identity; - } - /// Is this executor the main executor? bool isMainExecutor() const; diff --git a/stdlib/public/Concurrency/Actor.cpp b/stdlib/public/Concurrency/Actor.cpp index 66f93cf57b6aa..0702341336b1b 100644 --- a/stdlib/public/Concurrency/Actor.cpp +++ b/stdlib/public/Concurrency/Actor.cpp @@ -17,6 +17,7 @@ #include "swift/Runtime/Concurrency.h" #include +#include #include #if __has_feature(ptrauth_calls) #include @@ -178,12 +179,14 @@ class ExecutorTrackingInfo { } bool allowsSwitching() const { + // fprintf(stdout, "[swift][%p] allows switching? = %d\n", this, AllowsSwitching); return AllowsSwitching; } /// Disallow switching in this tracking context. This should only /// be set on a new tracking info, before any jobs are run in it. - void disallowSwitching() { + void disallowSwitching() { // TODO: does this make sense still or not? + // fprintf(stdout, "[swift][%p] disallow switching!\n", this); AllowsSwitching = false; } @@ -499,6 +502,8 @@ SWIFT_CC(swift) static bool swift_task_isCurrentExecutorWithFlagsImpl( SerialExecutorRef expectedExecutor, swift_task_is_current_executor_flag flags) { + + SWIFT_TASK_DEBUG_LOG("executor checking: expected is main executor & current thread is main thread => pass", nullptr); auto current = ExecutorTrackingInfo::current(); auto options = SwiftTaskIsCurrentExecutorOptions(flags); @@ -520,7 +525,7 @@ static bool swift_task_isCurrentExecutorWithFlagsImpl( // and we do not have a 'current' executor here. // Invoke the 'isIsolatingCurrentContext', if "undecided" (i.e. nil), we need to make further calls - SWIFT_TASK_DEBUG_LOG("executor checking, invoke (%p).isIsolatingCurrentContext", + SWIFT_TASK_DEBUG_LOG("executor checking: invoke (%p).isIsolatingCurrentContext", expectedExecutor.getIdentity()); // The executor has the most recent 'isIsolatingCurrentContext' API // so available so we prefer calling that to 'checkIsolated'. @@ -528,7 +533,7 @@ static bool swift_task_isCurrentExecutorWithFlagsImpl( getIsIsolatingCurrentContextDecisionFromInt( swift_task_isIsolatingCurrentContext(expectedExecutor)); - SWIFT_TASK_DEBUG_LOG("executor checking mode option: UseIsIsolatingCurrentContext; invoke (%p).isIsolatingCurrentContext => %s", + SWIFT_TASK_DEBUG_LOG("executor checking: mode option: UseIsIsolatingCurrentContext; invoke (%p).isIsolatingCurrentContext => %s", expectedExecutor.getIdentity(), getIsIsolatingCurrentContextDecisionNameStr(isIsolatingCurrentContextDecision)); switch (isIsolatingCurrentContextDecision) { case IsIsolatingCurrentContextDecision::Isolated: @@ -546,7 +551,7 @@ static bool swift_task_isCurrentExecutorWithFlagsImpl( // Otherwise, as last resort, let the expected executor check using // external means, as it may "know" this thread is managed by it etc. if (options.contains(swift_task_is_current_executor_flag::Assert)) { - SWIFT_TASK_DEBUG_LOG("executor checking mode option: Assert; invoke (%p).expectedExecutor", + SWIFT_TASK_DEBUG_LOG("executor checking: mode option: Assert; invoke (%p).expectedExecutor", expectedExecutor.getIdentity()); swift_task_checkIsolated(expectedExecutor); // will crash if not same context @@ -754,11 +759,14 @@ static unsigned unexpectedExecutorLogLevel = : 2; // new apps will only crash upon concurrency violations, and will call into `checkIsolated` static void checkUnexpectedExecutorLogLevel(void *context) { + SWIFT_TASK_DEBUG_LOG("executor checking: checkUnexpectedExecutorLogLevel %s", "checking..."); + #if SWIFT_STDLIB_HAS_ENVIRON const char *levelStr = getenv("SWIFT_UNEXPECTED_EXECUTOR_LOG_LEVEL"); if (!levelStr) return; + SWIFT_TASK_DEBUG_LOG("executor checking: SWIFT_UNEXPECTED_EXECUTOR_LOG_LEVEL is set: %s", levelStr); long level = strtol(levelStr, nullptr, 0); if (level >= 0 && level < 3) { auto options = SwiftTaskIsCurrentExecutorOptions( @@ -2206,12 +2214,16 @@ static void swift_job_runImpl(Job *job, SerialExecutorRef executor) { // during this operation. But do allow switching if the executor // is generic. if (!executor.isGeneric()) { + SWIFT_TASK_DEBUG_LOG("Disable switching for job %p, executor %p%s is not generic. Disable in trackingInfo.", + job, executor.getIdentity(), executor.getIdentityDebugName()); trackingInfo.disallowSwitching(); } + // FIXME: why were we "hiding" the executor, we can just pass it? auto taskExecutor = executor.isGeneric() ? TaskExecutorRef::fromTaskExecutorPreference(job) : TaskExecutorRef::undefined(); + // auto taskExecutor = TaskExecutorRef::fromTaskExecutorPreference(job); trackingInfo.enterAndShadow(executor, taskExecutor); @@ -2237,6 +2249,8 @@ static void swift_job_run_on_serial_and_task_executorImpl(Job *job, SWIFT_TASK_DEBUG_LOG("Run job %p on serial executor %p task executor %p", job, serialExecutor.getIdentity(), taskExecutor.getIdentity()); + // We DO allow switching! + // TODO: we don't allow switching trackingInfo.disallowSwitching(); trackingInfo.enterAndShadow(serialExecutor, taskExecutor); @@ -2323,31 +2337,36 @@ namespace swift { /****************************** ACTOR SWITCHING ******************************/ /*****************************************************************************/ -/// Can the current executor give up its thread? +/// Determine if the source executor CAN give up its thread static bool canGiveUpThreadForSwitch(ExecutorTrackingInfo *trackingInfo, - SerialExecutorRef currentExecutor) { - assert(trackingInfo || currentExecutor.isGeneric()); + SerialExecutorRef currentSerialExecutor) { + assert(trackingInfo || currentSerialExecutor.isGeneric()); // Some contexts don't allow switching at all. - if (trackingInfo && !trackingInfo->allowsSwitching()) { + if (trackingInfo && !trackingInfo->allowsSwitching()) { // TODO: remove that and just base it on + SWIFT_TASK_DEBUG_LOG("task_switch: Can not give up thread; TrackingInfo %p does not allow switching (from serial executor %p)", trackingInfo, currentSerialExecutor); return false; } - // We can certainly "give up" a generic executor to try to run - // a task for an actor. - if (currentExecutor.isGeneric()) { - if (currentExecutor.isForSynchronousStart()) { - return false; - } + // If this is a Task.immediate start, we must not give up the calling thread. + if (currentSerialExecutor.isForSynchronousStart()) { + SWIFT_TASK_DEBUG_LOG("task_switch: Can not give up thread; synchronous start (immediate) serial executor %p", currentSerialExecutor.getIdentity()); + return false; + } + // We certainly can "give up" a generic executor to try to run a task for an actor. + if (currentSerialExecutor.isGeneric()) { + SWIFT_TASK_DEBUG_LOG("task_switch: Can give up thread; source executor is generic %p", currentSerialExecutor.getIdentity()); return true; } - // If the current executor is a default actor, we know how to make - // it give up its thread. - if (currentExecutor.isDefaultActor()) + // If the current executor is a default actor, we know how to make it give up its thread. + if (currentSerialExecutor.isDefaultActor()) { + SWIFT_TASK_DEBUG_LOG("task_switch: Can give up thread, source executor is default actor %p", currentSerialExecutor.getIdentity()); return true; + } + SWIFT_TASK_DEBUG_LOG("task_switch: Can not give up thread %d", 0); return false; } @@ -2358,12 +2377,16 @@ static bool canGiveUpThreadForSwitch(ExecutorTrackingInfo *trackingInfo, /// do that in runOnAssumedThread. static void giveUpThreadForSwitch(SerialExecutorRef currentExecutor) { if (currentExecutor.isGeneric()) { - SWIFT_TASK_DEBUG_LOG("Giving up current generic executor %p", + SWIFT_TASK_DEBUG_LOG("task_switch: Giving up thread of generic executor is no-op, ok. Generic executor: %p", currentExecutor.getIdentity()); return; } - asImpl(currentExecutor.getDefaultActor())->unlock(true); + if (currentExecutor.isDefaultActor()) { + SWIFT_TASK_DEBUG_LOG("task_switch: Giving up thread default actor executor: %p, unlock default actor", + currentExecutor.getIdentity()); + asImpl(currentExecutor.getDefaultActor())->unlock(true); + } } /// Try to assume control of the current thread for the given executor @@ -2388,39 +2411,66 @@ static bool tryAssumeThreadForSwitch(SerialExecutorRef newExecutor, return false; } -static bool mustSwitchToRun(SerialExecutorRef currentSerialExecutor, - SerialExecutorRef newSerialExecutor, - TaskExecutorRef currentTaskExecutor, - TaskExecutorRef newTaskExecutor) { +struct AnyExecutorRef { +private: + std::variant Ref; + +public: + AnyExecutorRef(SerialExecutorRef ref): Ref(std::move(ref)) {} + AnyExecutorRef(TaskExecutorRef ref) : Ref(std::move(ref)) {} + + HeapObject *getIdentity() const { + return std::visit([](auto& ref) { + return ref.getIdentity(); + }, Ref); + } + + bool isSerialExecutorRef() const { + return std::holds_alternative(Ref); + } + + bool isTaskExecutorRef() const { + return std::holds_alternative(Ref); + } +}; + +// FIXME: remove, this is not used in new impl +static bool canRunFastpathInline(SerialExecutorRef currentSerialExecutor, + SerialExecutorRef newSerialExecutor, + TaskExecutorRef currentTaskExecutor, + TaskExecutorRef newTaskExecutor) { if (currentSerialExecutor.getIdentity() != newSerialExecutor.getIdentity()) { - return true; // must switch, new isolation context + return false; // must switch, new isolation context } // else, we may have to switch if the preferred task executor is different if (currentTaskExecutor.getIdentity() == newTaskExecutor.getIdentity()) - return false; + return true; if (currentTaskExecutor.isUndefined()) currentTaskExecutor = swift_getDefaultExecutor(); if (newTaskExecutor.isUndefined()) newTaskExecutor = swift_getDefaultExecutor(); - return currentTaskExecutor.getIdentity() != newTaskExecutor.getIdentity(); + return currentTaskExecutor.getIdentity() == newTaskExecutor.getIdentity(); } /// Given that we've assumed control of an executor on this thread, /// continue to run the given task on it. SWIFT_CC(swiftasync) -static void runOnAssumedThread(AsyncTask *task, SerialExecutorRef executor, +static void runOnAssumedThread(AsyncTask *task, + SerialExecutorRef serialExecutor, + TaskExecutorRef taskExecutor, ExecutorTrackingInfo *oldTracking) { // Note that this doesn't change the active task and so doesn't // need to either update ActiveTask or flagAsRunning/flagAsSuspended. + assert(task->getPreferredTaskExecutor().getIdentity() == taskExecutor.getIdentity()); // If there's already tracking info set up, just change the executor // there and tail-call the task. We don't want these frames to // potentially accumulate linearly. if (oldTracking) { - oldTracking->setActiveExecutor(executor); + oldTracking->setActiveExecutor(serialExecutor); oldTracking->setTaskExecutor(task->getPreferredTaskExecutor()); return task->runInFullyEstablishedContext(); // 'return' forces tail call @@ -2428,7 +2478,7 @@ static void runOnAssumedThread(AsyncTask *task, SerialExecutorRef executor, // Otherwise, set up tracking info. ExecutorTrackingInfo trackingInfo; - trackingInfo.enterAndShadow(executor, task->getPreferredTaskExecutor()); + trackingInfo.enterAndShadow(serialExecutor, task->getPreferredTaskExecutor()); // Run the new task. task->runInFullyEstablishedContext(); @@ -2439,18 +2489,25 @@ static void runOnAssumedThread(AsyncTask *task, SerialExecutorRef executor, // In principle, we could execute more tasks from the actor here, but // that's probably not a reasonable thing to do in an assumed context // rather than a dedicated actor-processing job. - executor = trackingInfo.getActiveExecutor(); + serialExecutor = trackingInfo.getActiveExecutor(); trackingInfo.leave(); - SWIFT_TASK_DEBUG_LOG("leaving assumed thread, current executor is %p", - executor.getIdentity()); + SWIFT_TASK_DEBUG_LOG("leaving assumed thread, current serialExecutor is %p", + serialExecutor.getIdentity()); - if (executor.isDefaultActor()) - asImpl(executor.getDefaultActor())->unlock(true); + if (serialExecutor.isDefaultActor()) + asImpl(serialExecutor.getDefaultActor())->unlock(true); } + + + + + + +// FIXME: remove this OLD switch implementation, only keeping it here as a reference while moving to the new one during development SWIFT_CC(swiftasync) -static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContext, +static void swift_task_switchImpl_OLD(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContext, TaskContinuationFunction *resumeFunction, SerialExecutorRef newExecutor) { auto task = swift_task_getCurrent(); @@ -2478,8 +2535,8 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex // If the current executor is compatible with running the new executor, // we can just immediately continue running with the resume function // we were passed in. - if (!mustSwitchToRun(currentExecutor, newExecutor, currentTaskExecutor, - newTaskExecutor)) { + if (canRunFastpathInline(currentExecutor, newExecutor, + currentTaskExecutor, newTaskExecutor)) { SWIFT_TASK_DEBUG_LOG("Task %p run inline", task); return resumeFunction(resumeContext); // 'return' forces tail call } @@ -2515,6 +2572,212 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex task->flagAsAndEnqueueOnExecutor(newExecutor); } + +/// Implements revised "task switching" logic, which takes into account task and global executors. +SWIFT_CC(swiftasync) +static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContext, + TaskContinuationFunction *resumeFunction, + SerialExecutorRef newSerialExecutor) { + auto task = swift_task_getCurrent(); + assert(task && "no current task!"); + + + auto trackingInfo = ExecutorTrackingInfo::current(); + SerialExecutorRef currentSerialExecutor = trackingInfo ? + trackingInfo->getActiveExecutor() : + SerialExecutorRef::generic(); + + if (!trackingInfo) { + SWIFT_TASK_DEBUG_LOG("Missing executor tracking, assume currentSerialExecutor = %p %s.", SerialExecutorRef::generic().getIdentity(), "generic"); + } + + // If the physical executors are different, you have to suspend. + // If the logical executors are the same, the switch is trivial. + // Otherwise, the serial executors should either be switchable or equal to the current task executor. If + // the context lets you switch (including the yield check), and + // acquiring the new serial executor succeeds (it always succeeds if it's the current task executor), then + // you release the current serial executor (if it's not the current task executor). + // Otherwise you suspend. + // A serial executor is switchable if it's default or nil + + TaskExecutorRef currentTaskExecutor = (trackingInfo ? trackingInfo->getTaskExecutor() + : TaskExecutorRef::undefined()); + if (!trackingInfo) SWIFT_TASK_DEBUG_LOG("Missing executor tracking, assume currentTaskExecutor = %s.", "undefined"); + TaskExecutorRef newTaskExecutor = task->getPreferredTaskExecutor(); + + // The physical executor is the serial executor, if it exists and is non-default, + // and otherwise the task executor. + auto currentPhysicalExecutor = + (currentSerialExecutor.isDefaultActor() || currentSerialExecutor.isGeneric()) ? + AnyExecutorRef(currentTaskExecutor) : + AnyExecutorRef(currentSerialExecutor); + + auto newPhysicalExecutor = + (newSerialExecutor.isDefaultActor() || newSerialExecutor.isGeneric()) ? + AnyExecutorRef(newTaskExecutor) : + AnyExecutorRef(newSerialExecutor); + + // The logical executor is the serial executor (e.g. an actor's executor), if it exists (regardless of whether it's default), + // and otherwise the task executor. + auto currentLogicalExecutor = !currentSerialExecutor.isGeneric() ? + AnyExecutorRef(currentSerialExecutor) : + AnyExecutorRef(currentTaskExecutor); + + auto newLogicalExecutor = newSerialExecutor.isGeneric() ? + AnyExecutorRef(newSerialExecutor) : + AnyExecutorRef(newTaskExecutor); + + SWIFT_TASK_DEBUG_LOG("task_switch: Task %p trying to switch executors: serial executor: %p%s->%p%s; task executor: %p%s->%p%s; (physical:%p->%p, logical:%p->%p)", + task, + currentSerialExecutor.getIdentity(), currentSerialExecutor.getIdentityDebugName(), + newSerialExecutor.getIdentity(), newSerialExecutor.getIdentityDebugName(), + currentTaskExecutor.getIdentity(), currentTaskExecutor.isDefined() ? "" : " (undefined)", + newTaskExecutor.getIdentity(), newTaskExecutor.isDefined() ? "" : " (undefined)", + currentPhysicalExecutor.getIdentity(), newPhysicalExecutor.getIdentity(), + currentLogicalExecutor.getIdentity(), newLogicalExecutor.getIdentity()); + + // === Decisions that will influence if and how to switch the task + + // - Decision 1: Can we fastpath execute 'inline' immediately? + bool canFastpathExecuteInline = false; + + // Determine if the current and new "physical" executor, which provides the thread we run on, + // are the same; If so, we can just run inline immediately. + + // If the physical executors are different, we have to suspend. + // if (currentPhysicalExecutor.getIdentity() != newPhysicalExecutor.getIdentity()) { + if (currentPhysicalExecutor.getIdentity() == newPhysicalExecutor.getIdentity() && + newSerialExecutor.isGeneric()) { + SWIFT_TASK_DEBUG_LOG("task_switch: Task %p can run inline, same physical executor executors", task); + canFastpathExecuteInline = true; + } else if (currentSerialExecutor.getIdentity() != newSerialExecutor.getIdentity()) { + SWIFT_TASK_DEBUG_LOG("task_switch: Task %p cannot run inline, different serial executors", task); + canFastpathExecuteInline = false; + } else if (currentTaskExecutor.isDefined() && newTaskExecutor.isUndefined()) { + // If we're leaving a task executor, we definitely must enqueue to avoid overhanging on it. + SWIFT_TASK_DEBUG_LOG("task_switch: Task %p cannot run inline, leaving task executor %p to undefined task executor %p", task, currentTaskExecutor.getIdentity(), newTaskExecutor.getIdentity()); + canFastpathExecuteInline = false; + } else if (newTaskExecutor.isDefined() && + currentTaskExecutor.getIdentity() == newTaskExecutor.getIdentity() && + currentLogicalExecutor.getIdentity() == newLogicalExecutor.getIdentity()) { + // 2. If the logical executors are the same, the switch is trivial. + SWIFT_TASK_DEBUG_LOG("task_switch: Same logical executor (%p)", newLogicalExecutor.getIdentity()); + // if (newSerialExecutor.isGeneric() && currentTaskExecutor.isUndefined() && currentTaskExecutor.isDefined()) { + // SWIFT_TASK_DEBUG_LOG("Task %p cannot run inline, logical executors are the same %p but moving to generic serial executor with defined task executor, must hop to it.", task, currentLogicalExecutor.getIdentity()); + // executeInline = false; + // } else { + SWIFT_TASK_DEBUG_LOG("task_switch: Task %p CAN run inline, logical executors are the same %p", task, currentLogicalExecutor.getIdentity()); + canFastpathExecuteInline = true; + // } + } + + // Last attempt, perhaps we are on the same actor isolation, and even though the task executors are not defined, + // we _are_ effectively on the same physical executor, because we're going to enqueue on the default global executor. + if (!canFastpathExecuteInline && currentSerialExecutor.getIdentity() == newSerialExecutor.getIdentity()) { + // The consider task executors, however if any of them is not defined, + // fallback to the global executor as the source of threads. + auto currentEffectiveTaskExecutor = currentTaskExecutor.isDefined() ? + currentTaskExecutor : swift_getDefaultExecutor(); + auto newEffectiveTaskExecutor = newTaskExecutor.isDefined() ? + newTaskExecutor : swift_getDefaultExecutor(); + + canFastpathExecuteInline = currentEffectiveTaskExecutor.getIdentity() == newEffectiveTaskExecutor.getIdentity(); + SWIFT_TASK_DEBUG_LOG("task_switch: Task %p %s run inline. Current 'effective' task executor %p, new 'effective' task executor %p, default global task executor %p", + task, canFastpathExecuteInline ? "can" : "cannot", currentEffectiveTaskExecutor, newEffectiveTaskExecutor, swift_getDefaultExecutor()); + // canFastpathExecuteInline = false; + } + + // If the current executor is compatible with running the new executor, + // we can just immediately continue running with the resume function + // we were passed in. + if (canFastpathExecuteInline) { + SWIFT_TASK_DEBUG_LOG("task_switch: Task %p can fastpath, run inline.", task); + + return resumeFunction(resumeContext); // 'return' forces tail call + } + + // --- We could not fast-path, let's consider our alternative options. + + // - Decision 2: Do we need to perform any actor locking? + + // // Since we may need to switch actor contexts, we may need to either unlock the actor we're leaving (if there is one), + // // and/or lock the target actor (if there is one). This can only be done for default actors. + // bool mustUnlockSourceDefaultActor = currentSerialExecutor.isDefaultActor(); + // bool mustLockTargetDefaultActor = newSerialExecutor.isDefaultActor(); + + // SWIFT_TASK_DEBUG_LOG("task_switch: Task %p must unlock source default actor = %s", + // task, mustUnlockSourceDefaultActor ? "yes" : "no"); + // SWIFT_TASK_DEBUG_LOG("task_switch: Task %p must lock target default actor = %s", + // task, mustLockTargetDefaultActor ? "yes" : "no"); + + // Park the task for simplicity instead of trying to thread the + // initial resumption information into everything below. + task->ResumeContext = resumeContext; + task->ResumeTask = resumeFunction; + + // Otherwise, the serial executors should either be switchable or equal to the current task executor. + // - If the context lets you switch (including the yield check), + // - and acquiring the new serial executor succeeds (it always succeeds if it's the current task executor), + // >>> then you release the current serial executor (if it's not the current task executor). + // Otherwise you suspend. + + // --- Decision: Can the source serial executor "give up" its thread? + auto sourceCanGiveUpThread = [&]() -> bool { + if (trackingInfo && !trackingInfo->allowsSwitching()) { + // Some contexts don't allow switching at all. + SWIFT_TASK_DEBUG_LOG("task_switch: Can not give up thread; TrackingInfo %p does not allow switching (from serial executor %p)", trackingInfo, currentSerialExecutor); + return false; + } + + if (currentSerialExecutor.isForSynchronousStart()) { + // A synchronous start promises that it will not enqueue, and therefore must not give up the thread. + SWIFT_TASK_DEBUG_LOG("task_switch: Can not give up thread; synchronous start (immediate) serial executor %p", currentSerialExecutor.getIdentity()); + return false; + } + + if (currentSerialExecutor.isGeneric()) { + // We certainly can "give up" a generic executor to try to run a task for an actor. + SWIFT_TASK_DEBUG_LOG("task_switch: Can give up thread; source executor is generic %p", currentSerialExecutor.getIdentity()); + return true; + } + + if (currentSerialExecutor.isDefaultActor()) { + // If the current executor is a default actor, we know how to make it give up its thread. + SWIFT_TASK_DEBUG_LOG("task_switch: Can give up thread, source executor is default actor %p", currentSerialExecutor.getIdentity()); + assert(mustUnlockSourceDefaultActor); + return true; + } + + return false; + }; + + // ===== Attempt to take over the thread + // If the current executor can give up its thread, and the new executor + // can take over a thread, try to do so; but don't do this if we've + // been asked to yield the thread. + SWIFT_TASK_DEBUG_LOG("task_switch: Can task %p give up thread?", task); + if ( + // currentTaskExecutor.isUndefined() && + canGiveUpThreadForSwitch(trackingInfo, currentSerialExecutor) && + sourceCanGiveUpThread() && + !shouldYieldThread() && + tryAssumeThreadForSwitch(newSerialExecutor, newTaskExecutor)) { + SWIFT_TASK_DEBUG_LOG("task_switch: switch succeeded, task %p assumed thread for executor %p", + task, newSerialExecutor.getIdentity()); + giveUpThreadForSwitch(currentSerialExecutor); + // 'return' forces tail call + return runOnAssumedThread(task, newSerialExecutor, newTaskExecutor, trackingInfo); + } + + // [ENQUEUE] Otherwise, just asynchronously enqueue the task on the given executor. + SWIFT_TASK_DEBUG_LOG( + "task_switch: switch failed, task %p enqueue on serial executor %p (task executor: %p)", + task, newSerialExecutor.getIdentity(), newTaskExecutor.getIdentity()); + + _swift_task_clearCurrent(); + task->flagAsAndEnqueueOnExecutor(newSerialExecutor); +} + SWIFT_CC(swift) static void swift_task_immediateImpl(AsyncTask *task, diff --git a/stdlib/public/Concurrency/CheckedContinuation.swift b/stdlib/public/Concurrency/CheckedContinuation.swift index a4f0f1553d050..5bc7338af1259 100644 --- a/stdlib/public/Concurrency/CheckedContinuation.swift +++ b/stdlib/public/Concurrency/CheckedContinuation.swift @@ -44,7 +44,7 @@ internal final class CheckedContinuationCanary: @unchecked Sendable { let tailPtr = unsafe UnsafeMutableRawPointer( Builtin.projectTailElems(self, (UnsafeRawPointer?, String).self)) - let functionPtr = unsafe tailPtr + let functionPtr = unsafe tailPtr + MemoryLayout<(UnsafeRawPointer?, String)>.offset(of: \(UnsafeRawPointer?, String).1)! return unsafe functionPtr.assumingMemoryBound(to: String.self) @@ -125,7 +125,7 @@ internal final class CheckedContinuationCanary: @unchecked Sendable { @available(SwiftStdlib 5.1, *) public struct CheckedContinuation: Sendable { private let canary: CheckedContinuationCanary - + /// Creates a checked continuation from an unsafe continuation. /// /// Instead of calling this initializer, @@ -148,7 +148,7 @@ public struct CheckedContinuation: Sendable { continuation: continuation, function: function) } - + /// Resume the task awaiting the continuation by having it return normally /// from its suspension point. /// @@ -172,7 +172,7 @@ public struct CheckedContinuation: Sendable { #endif } } - + /// Resume the task awaiting the continuation by having it throw an error /// from its suspension point. /// @@ -290,12 +290,35 @@ extension CheckedContinuation { /// - SeeAlso: `withCheckedThrowingContinuation(function:_:)` /// - SeeAlso: `withUnsafeContinuation(function:_:)` /// - SeeAlso: `withUnsafeThrowingContinuation(function:_:)` +@inlinable +@_alwaysEmitIntoClient +@available(SwiftStdlib 5.1, *) +// ABI Note: We need to use @abi here because the ABI of this function otherwise conflicts with the legacy +// @_unsafeInheritExecutor declaration, as none of them have (or mangle) the implicit +@abi( + nonisolated(nonsending) func withCheckedContinuationNonisolatedNonsending( + function: String, + _ body: (CheckedContinuation) -> Void + ) async -> sending T +) +public nonisolated(nonsending) func withCheckedContinuation( + function: String = #function, +_ body: (CheckedContinuation) -> Void +) async -> sending T { + return await Builtin.withUnsafeContinuation { + let unsafeContinuation = unsafe UnsafeContinuation($0) + return body(unsafe CheckedContinuation(continuation: unsafeContinuation, + function: function)) + } +} + @inlinable @available(SwiftStdlib 5.1, *) #if !$Embedded @backDeployed(before: SwiftStdlib 6.0) #endif -public func withCheckedContinuation( +@available(*, deprecated, message: "Replaced by nonisolated(nonsending) overload") +func withCheckedContinuation( isolation: isolated (any Actor)? = #isolation, function: String = #function, _ body: (CheckedContinuation) -> Void @@ -354,6 +377,28 @@ public func _unsafeInheritExecutor_withCheckedContinuation( /// - SeeAlso: `withCheckedContinuation(function:_:)` /// - SeeAlso: `withUnsafeContinuation(function:_:)` /// - SeeAlso: `withUnsafeThrowingContinuation(function:_:)` +@inlinable +@_alwaysEmitIntoClient +@available(SwiftStdlib 5.1, *) +// ABI Note: We need to use @abi here because the ABI of this function otherwise conflicts with the legacy +// @_unsafeInheritExecutor declaration, as none of them have (or mangle) the implicit +@abi( + nonisolated(nonsending) func withCheckedThrowingContinuationNonisolatedNonsending( + function: String, + _ body: (CheckedContinuation) -> Void + ) async throws -> sending T +) +public nonisolated(nonsending) func withCheckedThrowingContinuation( + function: String = #function, + _ body: (CheckedContinuation) -> Void +) async throws -> sending T { + return try await Builtin.withUnsafeThrowingContinuation { + let unsafeContinuation = unsafe UnsafeContinuation($0) + return body(unsafe CheckedContinuation(continuation: unsafeContinuation, + function: function)) + } +} + @inlinable @available(SwiftStdlib 5.1, *) #if !$Embedded @@ -435,4 +480,4 @@ internal func _resumeCheckedThrowingContinuationWithError( continuation.resume(throwing: error) } -#endif +#endif \ No newline at end of file diff --git a/stdlib/public/Concurrency/CooperativeExecutor.swift b/stdlib/public/Concurrency/CooperativeExecutor.swift index 654db0b65d66a..4d95dbfa586de 100644 --- a/stdlib/public/Concurrency/CooperativeExecutor.swift +++ b/stdlib/public/Concurrency/CooperativeExecutor.swift @@ -304,6 +304,7 @@ extension CooperativeExecutor: RunLoopExecutor { // Now run any queued jobs var runQ = runQueue.take() while let job = runQ.pop() { + // FIXME: this seems to be using the wrong run method, missing to pass the task executor. unsafe ExecutorJob(job).runSynchronously( on: self.asUnownedSerialExecutor() ) diff --git a/stdlib/public/Concurrency/PartialAsyncTask.swift b/stdlib/public/Concurrency/PartialAsyncTask.swift index a255d34d3454a..ae3e724d45dc1 100644 --- a/stdlib/public/Concurrency/PartialAsyncTask.swift +++ b/stdlib/public/Concurrency/PartialAsyncTask.swift @@ -889,8 +889,7 @@ internal func _resumeUnsafeThrowingContinuationWithError( @available(SwiftStdlib 5.1, *) @_alwaysEmitIntoClient @unsafe -public func withUnsafeContinuation( - isolation: isolated (any Actor)? = #isolation, +public nonisolated(nonsending) func withUnsafeContinuation( _ fn: (UnsafeContinuation) -> Void ) async -> sending T { return await Builtin.withUnsafeContinuation { @@ -926,8 +925,7 @@ public func withUnsafeContinuation( @available(SwiftStdlib 5.1, *) @_alwaysEmitIntoClient @unsafe -public func withUnsafeThrowingContinuation( - isolation: isolated (any Actor)? = #isolation, +public nonisolated(nonsending) func withUnsafeThrowingContinuation( _ fn: (UnsafeContinuation) -> Void ) async throws -> sending T { return try await Builtin.withUnsafeThrowingContinuation { @@ -984,4 +982,4 @@ public func _swift_createJobForTestingOnly( job.priority = JobPriority(priority) return job } -#endif +#endif \ No newline at end of file diff --git a/stdlib/public/Concurrency/Task+TaskExecutor.swift b/stdlib/public/Concurrency/Task+TaskExecutor.swift index ca04d74c63afe..04e6e740e3ef4 100644 --- a/stdlib/public/Concurrency/Task+TaskExecutor.swift +++ b/stdlib/public/Concurrency/Task+TaskExecutor.swift @@ -146,7 +146,7 @@ public func withTaskExecutorPreference( } let taskExecutorBuiltin: Builtin.Executor = - unsafe taskExecutor.asUnownedTaskExecutor().executor + taskExecutor.asUnownedTaskExecutor().executor let record = unsafe _pushTaskExecutorPreference(taskExecutorBuiltin) defer { diff --git a/stdlib/public/Concurrency/TaskGroup.cpp b/stdlib/public/Concurrency/TaskGroup.cpp index 0d1d2be13d156..ffd3d28e9fb85 100644 --- a/stdlib/public/Concurrency/TaskGroup.cpp +++ b/stdlib/public/Concurrency/TaskGroup.cpp @@ -1817,7 +1817,7 @@ reevaluate_if_taskgroup_has_results:; assumed = TaskGroupStatus{assumedStatus}; continue; // We raced with something, try again. } - SWIFT_TASK_DEBUG_LOG("poll, after CAS: %s", status.to_string().c_str()); + // SWIFT_TASK_DEBUG_LOG("poll, after CAS: %s", status.to_string().c_str()); // We're going back to running the task, so if we suspended before, // we need to flag it as running again. diff --git a/stdlib/public/Concurrency/VoucherSupport.h b/stdlib/public/Concurrency/VoucherSupport.h index f4be545d2462c..cedf0e772b65c 100644 --- a/stdlib/public/Concurrency/VoucherSupport.h +++ b/stdlib/public/Concurrency/VoucherSupport.h @@ -33,7 +33,7 @@ class VoucherManager { public: VoucherManager() { - SWIFT_TASK_DEBUG_LOG("[%p] Constructing VoucherManager", this); + // SWIFT_TASK_DEBUG_LOG("[%p] Constructing VoucherManager", this); } /// Clean up after completing async work, restoring the original voucher on @@ -42,8 +42,8 @@ class VoucherManager { /// places to restore the original voucher and reset the VoucherManager. void leave() { if (OriginalVoucher) { - SWIFT_TASK_DEBUG_LOG("[%p] Restoring original voucher %p", this, - *OriginalVoucher); + // SWIFT_TASK_DEBUG_LOG("[%p] Restoring original voucher %p", this, + // *OriginalVoucher); if (swift_voucher_needs_adopt(*OriginalVoucher)) { auto previous = voucher_adopt(*OriginalVoucher); swift_voucher_release(previous); @@ -51,8 +51,9 @@ class VoucherManager { swift_voucher_release(*OriginalVoucher); } OriginalVoucher = std::nullopt; - } else - SWIFT_TASK_DEBUG_LOG("[%p] Leaving empty VoucherManager", this); + } else { + // SWIFT_TASK_DEBUG_LOG("[%p] Leaving empty VoucherManager", this); + } } ~VoucherManager() { assert(!OriginalVoucher); } @@ -62,22 +63,22 @@ class VoucherManager { /// this is permanent. For Tasks, the voucher must be restored using /// restoreVoucher if the task suspends. void swapToJob(Job *job) { - SWIFT_TASK_DEBUG_LOG("[%p] Swapping jobs to %p", this, job); + // SWIFT_TASK_DEBUG_LOG("[%p] Swapping jobs to %p", this, job); assert(job); assert(job->Voucher != SWIFT_DEAD_VOUCHER); voucher_t previous; if (swift_voucher_needs_adopt(job->Voucher)) { // If we need to adopt the voucher, do so, and grab the old one. - SWIFT_TASK_DEBUG_LOG("[%p] Swapping jobs to %p, adopting voucher %p", - this, job, job->Voucher); + // SWIFT_TASK_DEBUG_LOG("[%p] Swapping jobs to %p, adopting voucher %p", + // this, job, job->Voucher); previous = voucher_adopt(job->Voucher); } else { // If we don't need to adopt the voucher, take the voucher out of Job // directly. - SWIFT_TASK_DEBUG_LOG( - "[%p] Swapping jobs to to %p, voucher %p does not need adoption", - this, job, job->Voucher); + // SWIFT_TASK_DEBUG_LOG( + // "[%p] Swapping jobs to to %p, voucher %p does not need adoption", + // this, job, job->Voucher); previous = job->Voucher; } @@ -88,7 +89,7 @@ class VoucherManager { // If we don't yet have an original voucher, then save the one we grabbed // above to restore later. OriginalVoucher = previous; - SWIFT_TASK_DEBUG_LOG("[%p] Saved original voucher %p", this, previous); + // SWIFT_TASK_DEBUG_LOG("[%p] Saved original voucher %p", this, previous); } else { // We already have an original voucher. The one we grabbed above is not // needed. We own it, so destroy it here. @@ -99,8 +100,8 @@ class VoucherManager { // Take the current thread's adopted voucher and place it back into the task // that previously owned it, re-adopting the thread's original voucher. void restoreVoucher(AsyncTask *task) { - SWIFT_TASK_DEBUG_LOG("[%p] Restoring %svoucher on task %p", this, - OriginalVoucher ? "" : "missing ", task); + // SWIFT_TASK_DEBUG_LOG("[%p] Restoring %svoucher on task %p", this, + // OriginalVoucher ? "" : "missing ", task); assert(OriginalVoucher); assert(task->Voucher == SWIFT_DEAD_VOUCHER); diff --git a/stdlib/public/Observation/Sources/Observation/Observations.swift b/stdlib/public/Observation/Sources/Observation/Observations.swift index f87cc24eaf72f..7ba7af4f0bf50 100644 --- a/stdlib/public/Observation/Sources/Observation/Observations.swift +++ b/stdlib/public/Observation/Sources/Observation/Observations.swift @@ -114,7 +114,7 @@ public struct Observations: AsyncSequence, Se // install a willChange continuation into the set of continuations // this must take a locally unique id (to the active calls of next) static func willChange(isolation iterationIsolation: isolated (any Actor)? = #isolation, state: _ManagedCriticalState, id: Int) async { - return await withUnsafeContinuation(isolation: iterationIsolation) { continuation in + return await withUnsafeContinuation { continuation in state.withCriticalRegion { state in defer { state.dirty = false } switch state.continuations[id] { diff --git a/stdlib/public/runtime/EnvironmentVariables.cpp b/stdlib/public/runtime/EnvironmentVariables.cpp index 45dd45af138a5..aedad1c9c1b99 100644 --- a/stdlib/public/runtime/EnvironmentVariables.cpp +++ b/stdlib/public/runtime/EnvironmentVariables.cpp @@ -317,5 +317,7 @@ SWIFT_RUNTIME_STDLIB_SPI bool concurrencyValidateUncheckedContinuations() { } SWIFT_RUNTIME_STDLIB_SPI const char *concurrencyIsCurrentExecutorLegacyModeOverride() { - return runtime::environment::SWIFT_IS_CURRENT_EXECUTOR_LEGACY_MODE_OVERRIDE(); + auto value = runtime::environment::SWIFT_IS_CURRENT_EXECUTOR_LEGACY_MODE_OVERRIDE(); + // fprintf(stdout, "[swift] SWIFT_IS_CURRENT_EXECUTOR_LEGACY_MODE_OVERRIDE = %s\n", value);; + return value; } diff --git a/stdlib/toolchain/Compatibility56/include/Concurrency/Executor.h b/stdlib/toolchain/Compatibility56/include/Concurrency/Executor.h index 013f013c82251..512fc1118ab5d 100644 --- a/stdlib/toolchain/Compatibility56/include/Concurrency/Executor.h +++ b/stdlib/toolchain/Compatibility56/include/Concurrency/Executor.h @@ -116,12 +116,6 @@ class ExecutorRef { return reinterpret_cast(table); } - /// Do we have to do any work to start running as the requested - /// executor? - bool mustSwitchToRun(ExecutorRef newExecutor) const { - return Identity != newExecutor.Identity; - } - /// Is this executor the main executor? bool isMainExecutor() const; diff --git a/test/Concurrency/Runtime/actor_assume_executor.swift b/test/Concurrency/Runtime/actor_assume_executor.swift index 0536a7a0df5bf..55b125797c24a 100644 --- a/test/Concurrency/Runtime/actor_assume_executor.swift +++ b/test/Concurrency/Runtime/actor_assume_executor.swift @@ -140,6 +140,7 @@ final class MainActorEcho { #endif tests.test("assumeOnActorExecutor: assume someone's executor, from SomeoneOnDefaultExecutor") { + print("Before: someone.callCheckSomeone()") await someone.callCheckSomeone() } diff --git a/test/Concurrency/Runtime/actor_counters.swift b/test/Concurrency/Runtime/actor_counters.swift index 26c19a1143839..231aae20dae3e 100644 --- a/test/Concurrency/Runtime/actor_counters.swift +++ b/test/Concurrency/Runtime/actor_counters.swift @@ -24,7 +24,7 @@ actor Counter { let current = value // Make sure we haven't produced this value before - assert(scratchBuffer[current] == 0) + assert(scratchBuffer[current] == 0, "Expected scratchBuffer[current] to be zero, was: \(scratchBuffer[current])") scratchBuffer[current] = 1 value = value + 1 @@ -33,7 +33,7 @@ actor Counter { deinit { for i in 0..= 2 ? Int(args[1])! : 10 let workers = args.count >= 3 ? Int(args[2])! : 100 - let iterations = args.count >= 4 ? Int(args[3])! : 1000 + let iterations = args.count >= 4 ? Int(args[3])! : 100 print("counters: \(counters), workers: \(workers), iterations: \(iterations)") await runTest(numCounters: counters, numWorkers: workers, numIterations: iterations) } diff --git a/test/Concurrency/Runtime/actor_detach.swift b/test/Concurrency/Runtime/actor_detach.swift index 22b65d6e33b7d..10e0cb4b3f46b 100644 --- a/test/Concurrency/Runtime/actor_detach.swift +++ b/test/Concurrency/Runtime/actor_detach.swift @@ -28,20 +28,25 @@ actor Manager { @available(SwiftStdlib 5.1, *) -func test() { - detach { +func test() async -> [Task<(), Never>] { + let t1 = Task.detached { let x = await Manager.shared.manage() print(x) } - detach { + let t2 = Task.detached { let x = await Manager.shared.other() print(x) } + + return [t1, t2] } if #available(SwiftStdlib 5.1, *) { - test() - sleep(30) + let ts = await test() + + for t in ts { + await t.value + } } else { print("manage") print("0") diff --git a/test/Concurrency/Runtime/async_task_executor_and_serial_executor_nonisolated_async_func_legacy.swift b/test/Concurrency/Runtime/async_task_executor_and_serial_executor_nonisolated_async_func_legacy.swift index bcbf905b80dba..c7eb50ed36d53 100644 --- a/test/Concurrency/Runtime/async_task_executor_and_serial_executor_nonisolated_async_func_legacy.swift +++ b/test/Concurrency/Runtime/async_task_executor_and_serial_executor_nonisolated_async_func_legacy.swift @@ -30,9 +30,9 @@ final class NaiveQueueExecutor: TaskExecutor, SerialExecutor { } } - public func checkIsolated() { - print("\(Self.self).\(#function)") + public func isIsolatingCurrentContext() -> Bool? { dispatchPrecondition(condition: .onQueue(self.queue)) + return true } @inlinable @@ -47,7 +47,13 @@ final class NaiveQueueExecutor: TaskExecutor, SerialExecutor { } nonisolated func nonisolatedFunc(expectedExecutor: NaiveQueueExecutor) async { + // we are on the serial queue dispatchPrecondition(condition: .onQueue(expectedExecutor.queue)) + + // This will trigger isIsolatingCurrentContext on the serial executor. + // + // In this specific test we are in 'legacy' mode which means "can not crash", + // therefore checkIsolated cannot be invoked. expectedExecutor.preconditionIsolated() } diff --git a/test/Concurrency/Runtime/async_task_executor_withExecutor.swift b/test/Concurrency/Runtime/async_task_executor_withExecutor.swift index 51bbc08ea1b8d..e9c6c40a07e71 100644 --- a/test/Concurrency/Runtime/async_task_executor_withExecutor.swift +++ b/test/Concurrency/Runtime/async_task_executor_withExecutor.swift @@ -12,6 +12,10 @@ import Dispatch @_spi(ConcurrencyExecutors) import _Concurrency +func p(_ message: String, file: String = #fileID, line: Int = #line) { + print("[\(file):\(line)] \(message)") +} + final class MyTaskExecutor: TaskExecutor, @unchecked Sendable, CustomStringConvertible { let queue: DispatchQueue @@ -43,7 +47,7 @@ func testNestingWithExecutorMainActor(_ firstExecutor: MyTaskExecutor, await withTaskExecutorPreference(firstExecutor) { // the block immediately hops to the expected executor dispatchPrecondition(condition: .onQueue(firstExecutor.queue)) - print("OK: withTaskExecutor body") + p("OK: withTaskExecutor body") await nonisolatedAsyncMethod(expectedOn: firstExecutor) } @@ -52,33 +56,51 @@ func testNestingWithExecutorMainActor(_ firstExecutor: MyTaskExecutor, // the block immediately hops to the expected executor dispatchPrecondition(condition: .notOnQueue(firstExecutor.queue)) dispatchPrecondition(condition: .onQueue(secondExecutor.queue)) - print("OK: withTaskExecutor { withTaskExecutor { ... } }") + p("OK: withTaskExecutor { withTaskExecutor { ... } }") await nonisolatedAsyncMethod(expectedOn: secondExecutor) } } + p("withTaskExecutorPreference(first) before") await withTaskExecutorPreference(firstExecutor) { + p("withTaskExecutorPreference(first) inside") dispatchPrecondition(condition: .onQueue(firstExecutor.queue)) dispatchPrecondition(condition: .notOnQueue(secondExecutor.queue)) + p("withTaskExecutorPreference(first) - withTaskExecutorPreference(second) before") await withTaskExecutorPreference(secondExecutor) { + p("withTaskExecutorPreference(first) - withTaskExecutorPreference(second) inside") dispatchPrecondition(condition: .notOnQueue(firstExecutor.queue)) dispatchPrecondition(condition: .onQueue(secondExecutor.queue)) + p("withTaskExecutorPreference(first) - withTaskExecutorPreference(second) - withTaskExecutorPreference(first) before") await withTaskExecutorPreference(firstExecutor) { + p("withTaskExecutorPreference(first) - withTaskExecutorPreference(second) - withTaskExecutorPreference(first) inside") // the block immediately hops to the expected executor dispatchPrecondition(condition: .onQueue(firstExecutor.queue)) dispatchPrecondition(condition: .notOnQueue(secondExecutor.queue)) - print("OK: withTaskExecutor { withTaskExecutor withTaskExecutor { { ... } } }") + p("OK: withTaskExecutor { withTaskExecutor withTaskExecutor { { ... } } }") await nonisolatedAsyncMethod(expectedOn: firstExecutor) + p("withTaskExecutorPreference(first) - withTaskExecutorPreference(second) - withTaskExecutorPreference(first) leave ...") } + p("withTaskExecutorPreference(first) - withTaskExecutorPreference(second) - withTaskExecutorPreference(first) after") + p("OK: withTaskExecutor { withTaskExecutor { ... } }") dispatchPrecondition(condition: .notOnQueue(firstExecutor.queue)) + p("OK ...") dispatchPrecondition(condition: .onQueue(secondExecutor.queue)) + p("withTaskExecutorPreference(first) - withTaskExecutorPreference(second) leave ...") } + p("withTaskExecutorPreference(first) - withTaskExecutorPreference(second) after") dispatchPrecondition(condition: .onQueue(firstExecutor.queue)) + p("OK ...") dispatchPrecondition(condition: .notOnQueue(secondExecutor.queue)) + p("withTaskExecutorPreference(first) leave ...") } + p("withTaskExecutorPreference(first) after") + p("Overhang!") dispatchPrecondition(condition: .notOnQueue(firstExecutor.queue)) + p("OK ...") dispatchPrecondition(condition: .notOnQueue(secondExecutor.queue)) + p("OK ...") } func testNestingWithExecutorNonisolated(_ firstExecutor: MyTaskExecutor, @@ -86,7 +108,7 @@ func testNestingWithExecutorNonisolated(_ firstExecutor: MyTaskExecutor, await withTaskExecutorPreference(firstExecutor) { // the block immediately hops to the expected executor dispatchPrecondition(condition: .onQueue(firstExecutor.queue)) - print("OK: withTaskExecutor body") + p("OK: withTaskExecutor body") await nonisolatedAsyncMethod(expectedOn: firstExecutor) } @@ -95,7 +117,7 @@ func testNestingWithExecutorNonisolated(_ firstExecutor: MyTaskExecutor, // the block immediately hops to the expected executor dispatchPrecondition(condition: .notOnQueue(firstExecutor.queue)) dispatchPrecondition(condition: .onQueue(secondExecutor.queue)) - print("OK: withTaskExecutor { withTaskExecutor { ... } }") + p("OK: withTaskExecutor { withTaskExecutor { ... } }") await nonisolatedAsyncMethod(expectedOn: secondExecutor) } } @@ -110,39 +132,57 @@ func testNestingWithExecutorNonisolated(_ firstExecutor: MyTaskExecutor, // the block immediately hops to the expected executor dispatchPrecondition(condition: .onQueue(firstExecutor.queue)) dispatchPrecondition(condition: .notOnQueue(secondExecutor.queue)) - print("OK: withTaskExecutor { withTaskExecutor withTaskExecutor { { ... } } }") + p("OK: withTaskExecutor { withTaskExecutor withTaskExecutor { { ... } } }") await nonisolatedAsyncMethod(expectedOn: firstExecutor) } // on first + p("OK ...") dispatchPrecondition(condition: .notOnQueue(firstExecutor.queue)) + p("OK ...") dispatchPrecondition(condition: .onQueue(secondExecutor.queue)) + p("OK ...") } // on second + p("OK ...") dispatchPrecondition(condition: .onQueue(firstExecutor.queue)) + p("OK ...") dispatchPrecondition(condition: .notOnQueue(secondExecutor.queue)) + p("OK ...") } // on first + p("Disabled checks, we're overhanging now ...") dispatchPrecondition(condition: .notOnQueue(firstExecutor.queue)) dispatchPrecondition(condition: .notOnQueue(secondExecutor.queue)) } func testDisablingTaskExecutorPreference(_ firstExecutor: MyTaskExecutor, _ secondExecutor: MyTaskExecutor) async { + // FIXME: overhang!!!! + p("Overhang...!!!") dispatchPrecondition(condition: .notOnQueue(firstExecutor.queue)) + p("OK ...") dispatchPrecondition(condition: .notOnQueue(secondExecutor.queue)) await withTaskExecutorPreference(firstExecutor) { + p("OK ...") dispatchPrecondition(condition: .onQueue(firstExecutor.queue)) + p("OK ...") dispatchPrecondition(condition: .notOnQueue(secondExecutor.queue)) await withTaskExecutorPreference(globalConcurrentExecutor) { + p("OK ...") dispatchPrecondition(condition: .notOnQueue(firstExecutor.queue)) + p("OK ...") dispatchPrecondition(condition: .notOnQueue(secondExecutor.queue)) - print("OK: withTaskExecutorPreference(globalConcurrentExecutor) { ... }") + p("OK: withTaskExecutorPreference(globalConcurrentExecutor) { ... }") } // on second await withTaskExecutorPreference(nil) { // no specific preference == okey to inherit + p("OK ...") dispatchPrecondition(condition: .onQueue(firstExecutor.queue)) + p("OK ...") dispatchPrecondition(condition: .notOnQueue(secondExecutor.queue)) - print("OK: withTaskExecutorPreference(nil) { ... }") + p("OK: withTaskExecutorPreference(nil) { ... }") } // on second + p("OK ...") dispatchPrecondition(condition: .onQueue(firstExecutor.queue)) + p("OK ...") dispatchPrecondition(condition: .notOnQueue(secondExecutor.queue)) } // on first } @@ -165,8 +205,9 @@ func testGetCurrentTaskExecutor(_ firstExecutor: MyTaskExecutor, fatalError("Expected to have task executor") } // Test that we can compare UnownedExecutors: + p("OK ...") precondition(currentTaskExecutor == firstExecutor.asUnownedTaskExecutor()) - print("OK: currentTaskExecutor == firstExecutor.asUnownedTaskExecutor()") + p("OK: currentTaskExecutor == firstExecutor.asUnownedTaskExecutor()") } } } diff --git a/test/Concurrency/Runtime/custom_task_executor_fast_path.swift b/test/Concurrency/Runtime/custom_task_executor_fast_path.swift index 8e13b8f95e42b..d6362a53ef385 100644 --- a/test/Concurrency/Runtime/custom_task_executor_fast_path.swift +++ b/test/Concurrency/Runtime/custom_task_executor_fast_path.swift @@ -96,7 +96,7 @@ final class SimpleTaskExecutor: TaskExecutor, @unchecked Sendable { await withTaskGroup { group in for _ in 0..<3 { group.addTask() { - for _ in 0..<100 { + for _ in 0..<20 { await withUnsafeContinuation { cont in cont.resume() } @@ -108,7 +108,7 @@ final class SimpleTaskExecutor: TaskExecutor, @unchecked Sendable { } } -// If we're on the fast path, we'll only enqueue four times (once per group) +// If we're on the fast path, we'll only enqueue four times (once per group task) // CHECK: Enqueued // CHECK: Enqueued diff --git a/test/Concurrency/Runtime/must_not_hop_SerialAndTaskExecutor_async_func.swift b/test/Concurrency/Runtime/must_not_hop_SerialAndTaskExecutor_async_func.swift new file mode 100644 index 0000000000000..a83926277bec4 --- /dev/null +++ b/test/Concurrency/Runtime/must_not_hop_SerialAndTaskExecutor_async_func.swift @@ -0,0 +1,238 @@ +// RUN: %target-run-simple-swift( -target %target-swift-5.1-abi-triple %import-libdispatch) | %FileCheck %s --dump-input=always + +// REQUIRES: concurrency +// REQUIRES: executable_test + +// REQUIRES: concurrency_runtime +// UNSUPPORTED: back_deployment_runtime +// UNSUPPORTED: freestanding +// REQUIRES: libdispatch +// REQUIRES: synchronization + +import Synchronization +import Dispatch + +if #available(SwiftStdlib 6.3, *) { + await Task.detached { + print("\n\n\n=====================================================================================") + print("=== MyActorSerial.call() async -- sync closure") + print("------------------------------------------------------------------------------------") + // CHECK: === MyActorSerial.call() async -- sync closure + // CHECK: [executor][MySerialExecutor] Enqueue (1) + // CHECK: [MyActorSerial] inside actor; before f() + + // This enqueue is bad + // CHECK-NOT: [executor][MySerialExecutor] Enqueue (2) + + // CHECK: [f] inside async func (isolation = Optional(main.MyActorSerial)) + // CHECK: [MyActorSerial] inside f - inside closure + // CHECK: [MyActorSerial] inside actor; after f() + // No more enqueues; + // CHECK-NOT: [executor][MySerialExecutor] Enqueue + let actorSerial = MyActorSerial() + _ = await actorSerial.call(x: 10) + dispatchPrecondition(condition: .notOnQueue(actorSerial.executor.queue)) + + + // print("\n\n\n=====================================================================================") + // print("=== MyActorSerialAndTask.call() async -- sync closure") + // print("------------------------------------------------------------------------------------") + // let actorSerialAndTaskExecutor = MyActorSerialAndTask() + // _ = await actorSerialAndTaskExecutor.call(x: 10) + + // print("\n\n\n=====================================================================================") + // print("=====================================================================================") + // print("=====================================================================================") + + // print("==== call() async -- sync closure") + // print("------------------------------------------------------------------------------------") + // _ = await call(x: 10) + + print("\n\n\n=====================================================================================") + print("==== call() async - sync closure -- withTaskExecutor") + print("Before withTaskExecutorPreference") + let serialAndTaskExecutor = SerialAndTaskExecutor(name: "SerialAndTaskExecutor") + await withTaskExecutorPreference(serialAndTaskExecutor) { + print("Inside withTaskExecutorPreference") + _ = await call(x: 10) + } + print("After withTaskExecutorPreference") + dispatchPrecondition(condition: .notOnQueue(serialAndTaskExecutor.queue)) + // CHECK: ==== call() async - sync closure -- withTaskExecutor + // CHECK: Before withTaskExecutorPreference + // CHECK: [executor][SerialAndTaskExecutor] Enqueue (1) + // CHECK: Inside withTaskExecutorPreference + // CHECK-NOT: [executor][SerialAndTaskExecutor] Enqueue + // CHECK: [func call() async] inside 'call' async func; before f() + // CHECK-NOT: [executor][SerialAndTaskExecutor] Enqueue + // CHECK: [f] inside async func (isolation = nil) + // CHECK: [func call() async] --- inside 'f' - inside closure + // CHECK-NOT: [executor][SerialAndTaskExecutor] Enqueue + // CHECK: [func call() async] inside 'call' async func; after f() + // CHECK-NOT: [executor][SerialAndTaskExecutor] Enqueue + // CHECK: After withTaskExecutorPreference + + // print("\n\n\n=====================================================================================") + // print("==== foo() async - sync closure -- withTaskExecutor ") + // print("Before withTaskExecutorPreference") + // await withTaskExecutorPreference(SerialAndTaskExecutor(name: "SerialAndTaskExecutor")) { + // print("Inside withTaskExecutorPreference") + // _ = await call(x: 10) + // } + // print("After withTaskExecutorPreference") + + // print("\n\n\n=====================================================================================") + // print("==== foo() async - sync closure -- withTaskExecutor ") + // print("Before withTaskExecutorPreference") + // await withTaskExecutorPreference(SerialAndTaskExecutor(name: "SerialAndTaskExecutor")) { + // print("Inside withTaskExecutorPreference") + // _ = await call(x: 10) + // } + // print("After withTaskExecutorPreference") + + print("\n\n\n=====================================================================================") + print("=====================================================================================") + print("=====================================================================================") + + // print("==== defaultActor.call() async -- sync closure") + // print("------------------------------------------------------------------------------------") + // await MyDefaultActor().call(x: 10) + + // print("\n\n\n=====================================================================================") + // print("==== defaultActor.call() async - sync closure -- withTaskExecutor ") + // print("Before withTaskExecutorPreference") + // await withTaskExecutorPreference(SerialAndTaskExecutor(name: "SerialAndTaskExecutor")) { + // print("Inside withTaskExecutorPreference") + // await MyDefaultActor().call(x: 10) + // } + print("After withTaskExecutorPreference") + }.value +} + +@available(SwiftStdlib 6.3, *) +func call(x: Int) async -> Int { + print("[func call() async] inside 'call' async func; before f()") + let x2 = await f() { + print("[func call() async] --- inside 'f' - inside closure") + return x + 1 + } + print("[func call() async] inside 'call' async func; after f()") + return x2 +} + + + +// NOTES: But that's the absolute happy case: + +@available(SwiftStdlib 6.3, *) +func f(_ iso: isolated (any Actor)? = #isolation, closure: () -> Int) async -> Int { + print("[f] inside async func (isolation = \(iso))") + return closure() +} + +@available(SwiftStdlib 6.3, *) +actor MyDefaultActor { + func call(x: Int) async -> Int { + print("[\(Self.self)] inside actor; before f()") + let x2 = await f() { + print("[\(Self.self)] inside f - inside closure") + self.preconditionIsolated("Must be on the actor.") + return x + 1 + } + print("[\(Self.self)] inside actor; after f()") + return x2 + } +} + +@available(SwiftStdlib 6.3, *) +actor MyActorSerial { + + let executor = JustSerialExecutor(name: "MySerialExecutor") + + nonisolated var unownedExecutor: UnownedSerialExecutor { + executor.asUnownedSerialExecutor() + } + + func call(x: Int) async -> Int { + print("[\(Self.self)] inside actor; before f()") + let x2 = await f() { + print("[\(Self.self)] inside f - inside closure") + return x + 1 + } + print("[\(Self.self)] inside actor; after f()") + return x2 + } +} + +@available(SwiftStdlib 6.3, *) +actor MyActorSerialAndTask { + + let executor: SerialAndTaskExecutor + + nonisolated var unownedExecutor: UnownedSerialExecutor { + executor.asUnownedSerialExecutor() + } + + init() { + self.executor = SerialAndTaskExecutor(name: "SerialAndTaskExecutor") + } + + func call(x: Int) async -> Int { + print("[\(Self.self)] inside actor; before f()") + let x2 = await f() { + print("[\(Self.self)] inside f - inside closure") + return x + 1 + } + print("[\(Self.self)] inside actor; after f()") + return x2 + } +} + +@available(SwiftStdlib 6.3, *) +final class JustSerialExecutor: SerialExecutor { + let enqueueCount: Atomic + let queue: DispatchSerialQueue + let name: String + + init(name: String) { + self.enqueueCount = .init(0) + self.queue = DispatchSerialQueue(label: name) + self.name = name + } + + public func enqueue(_ job: consuming ExecutorJob) { + let job = UnownedJob(job) + self.queue.async { + let newEnqueueValue = self.enqueueCount.add(1, ordering: .relaxed).newValue + print("[executor][\(self.name)] Enqueue (\(newEnqueueValue))") + job.runSynchronously(on: self.asUnownedSerialExecutor()) + } + } +} + +@available(SwiftStdlib 6.3, *) +final class SerialAndTaskExecutor: TaskExecutor, SerialExecutor { + let enqueueCount: Atomic + let queue: DispatchSerialQueue + let name: String + + init(name: String) { + self.enqueueCount = .init(0) + self.queue = DispatchSerialQueue(label: name) + self.name = name + } + + public func enqueue(_ job: consuming ExecutorJob) { + let job = UnownedJob(job) + self.queue.async { + let newEnqueueValue = self.enqueueCount.add(1, ordering: .relaxed).newValue + print("[executor][\(self.name)] Enqueue (\(newEnqueueValue))") + job.runSynchronously( + isolatedTo: self.asUnownedSerialExecutor(), + taskExecutor: self.asUnownedTaskExecutor()) + } + } + +} + +print("done") // CHECK: done \ No newline at end of file diff --git a/test/Concurrency/Runtime/must_not_hop_withContinuation.swift b/test/Concurrency/Runtime/must_not_hop_withContinuation.swift new file mode 100644 index 0000000000000..edadc203b881c --- /dev/null +++ b/test/Concurrency/Runtime/must_not_hop_withContinuation.swift @@ -0,0 +1,210 @@ +// RUN: %target-run-simple-swift( -target %target-swift-5.1-abi-triple %import-libdispatch) | %FileCheck %s --dump-input=always +// REQUIRES: concurrency +// REQUIRES: executable_test + +// REQUIRES: concurrency_runtime +// UNSUPPORTED: back_deployment_runtime +// UNSUPPORTED: freestanding +// REQUIRES: libdispatch +// REQUIRES: synchronization + +import Synchronization +import Dispatch + + +if #available(SwiftStdlib 6.0, *) { + print("=== foo() async") + print("---------------------------------------") + await foo() +} + +// CHECK: === foo() async +// CHECK-NEXT: --------------------------------------- + +// We hop to the task executor: +// CHECK: foo - withTaskExecutorPreference before +// CHECK: [executor][task-executor] Enqueue (1) +// CHECK: foo - withTaskExecutorPreference + +// CHECK: foo - withTaskExecutorPreference - withCheckedContinuation before +// CHECK-NOT: [executor][task-executor] Enqueue +// CHECK: foo - withTaskExecutorPreference - withCheckedContinuation +// CHECK: foo - withTaskExecutorPreference - withCheckedContinuation done + +// CHECK: foo - withTaskExecutorPreference - withUnsafeContinuation before +// CHECK-NOT: [executor][task-executor] Enqueue +// CHECK: foo - withTaskExecutorPreference - withUnsafeContinuation +// CHECK: foo - withTaskExecutorPreference - withUnsafeContinuation done + +// CHECK: foo - withTaskExecutorPreference - withCheckedThrowingContinuation before +// CHECK-NOT: [executor][task-executor] Enqueue +// CHECK: foo - withTaskExecutorPreference - withCheckedThrowingContinuation +// CHECK: foo - withTaskExecutorPreference - withCheckedThrowingContinuation done + +// CHECK: foo - withTaskExecutorPreference - withUnsafeThrowingContinuation before +// CHECK-NOT: [executor][task-executor] Enqueue +// CHECK: foo - withTaskExecutorPreference - withUnsafeThrowingContinuation done + +// By checking that this is the second enqueue here, +// we check that there was no stray enqueues between with... invocations: +// NOT: [executor][task-executor] Enqueue (2) + +// CHECK: foo - withTaskExecutorPreference done + +// CHECK-NEXT: == Make: actor Foo +// CHECK-NEXT: --------------------------------------- +// CHECK: [executor][actor-executor] Enqueue (1) +// CHECK: actor.foo + +// CHECK: actor.foo - withCheckedContinuation before +// CHECK-NOT: [executor][task-executor] Enqueue +// CHECK: actor.foo - withCheckedContinuation +// CHECK: actor.foo - withCheckedContinuation done + +// CHECK: actor.foo - withUnsafeContinuation before +// CHECK-NOT: [executor][task-executor] Enqueue +// CHECK: actor.foo - withUnsafeContinuation +// CHECK: actor.foo - withUnsafeContinuation done + +// CHECK: actor.foo - withCheckedThrowingContinuation before +// CHECK-NOT: [executor][task-executor] Enqueue +// CHECK: actor.foo - withCheckedThrowingContinuation +// CHECK: actor.foo - withCheckedThrowingContinuation done + +// CHECK: actor.foo - withUnsafeThrowingContinuation before +// CHECK-NOT: [executor][task-executor] Enqueue +// CHECK: actor.foo - withUnsafeThrowingContinuation +// CHECK: actor.foo - withUnsafeThrowingContinuation done + +// CHECK-NOT: [executor][task-executor] Enqueue +// CHECK: actor.foo done + +// No more enqueues are expected afterwards +// CHECK-NOT: [executor] + +@available(SwiftStdlib 6.0, *) +@concurrent +func foo() async { + print("foo - withTaskExecutorPreference before") + await withTaskExecutorPreference(AssertExactEnqueueCountExecutor(name: "task-executor")) { + print("foo - withTaskExecutorPreference") + + // --- + + print("foo - withTaskExecutorPreference - withCheckedContinuation before") + await withCheckedContinuation { cont in + print("foo - withTaskExecutorPreference - withCheckedContinuation") + cont.resume() + } + print("foo - withTaskExecutorPreference - withCheckedContinuation done") + + // --- + + print("foo - withTaskExecutorPreference - withUnsafeContinuation before") + await withUnsafeContinuation { cont in + print("foo - withTaskExecutorPreference - withUnsafeContinuation") + cont.resume() + } + print("foo - withTaskExecutorPreference - withUnsafeContinuation done") + + // --- + + print("foo - withTaskExecutorPreference - withCheckedThrowingContinuation before") + try! await withCheckedThrowingContinuation { cont in + print("foo - withTaskExecutorPreference - withCheckedThrowingContinuation") + cont.resume() + } + print("foo - withTaskExecutorPreference - withCheckedThrowingContinuation done") + + // --- + + print("foo - withTaskExecutorPreference - withUnsafeThrowingContinuation before") + try! await withUnsafeThrowingContinuation { cont in + print("foo - withTaskExecutorPreference - withUnsafeThrowingContinuation") + cont.resume() + } + print("foo - withTaskExecutorPreference - withUnsafeThrowingContinuation done") + } + print("foo - withTaskExecutorPreference done") + + print("== Make: actor Foo") + print("---------------------------------------") + await Foo().foo() +} + +@available(SwiftStdlib 6.0, *) +actor Foo { + let exec = AssertExactEnqueueCountExecutor(name: "actor-executor") + + nonisolated var unownedExecutor: UnownedSerialExecutor { + self.exec.asUnownedSerialExecutor() + } + + func foo() async { + print("actor.foo") + + print("actor.foo - withCheckedContinuation before") + await withCheckedContinuation { cont in + print("actor.foo - withCheckedContinuation") + cont.resume() + } + print("actor.foo - withCheckedContinuation done") + + // --- + + print("actor.foo - withUnsafeContinuation before") + await withUnsafeContinuation { cont in + print("actor.foo - withUnsafeContinuation") + cont.resume() + } + print("actor.foo - withUnsafeContinuation done") + + // --- + + print("actor.foo - withCheckedThrowingContinuation before") + try! await withCheckedThrowingContinuation { cont in + print("actor.foo - withCheckedThrowingContinuation") + cont.resume() + } + print("actor.foo - withCheckedThrowingContinuation done") + + // --- + + print("actor.foo - withUnsafeThrowingContinuation before") + try! await withUnsafeThrowingContinuation { cont in + print("actor.foo - withUnsafeThrowingContinuation") + cont.resume() + } + print("actor.foo - withUnsafeThrowingContinuation done") + + print("actor.foo done") + } +} + +@available(SwiftStdlib 6.0, *) +final class AssertExactEnqueueCountExecutor: TaskExecutor, SerialExecutor { + let enqueueCount: Atomic + let queue: DispatchSerialQueue + let name: String + + init(name: String) { + self.enqueueCount = .init(0) + self.queue = DispatchSerialQueue(label: name) + self.name = name + } + + public func enqueue(_ job: consuming ExecutorJob) { + let job = UnownedJob(job) + queue.async { + let newEnqueueValue = self.enqueueCount.add(1, ordering: .relaxed).newValue + print("[executor][\(self.name)] Enqueue (\(newEnqueueValue))") + job.runSynchronously(on: self.asUnownedSerialExecutor()) + } + } + + public func asUnownedSerialExecutor() -> UnownedSerialExecutor { + UnownedSerialExecutor(ordinary: self) + } +} + +print("done") // CHECK: done \ No newline at end of file