From 66a7b0ce4fc8a7da02384ec7ef46352333e44f4f Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Fri, 21 Feb 2025 14:25:42 +0900 Subject: [PATCH 01/11] [Concurrency] Initial steps for startSynchronously for Task --- include/swift/Runtime/Concurrency.h | 3 + .../CompatibilityOverrideConcurrency.def | 5 + stdlib/public/Concurrency/Actor.cpp | 30 +- stdlib/public/Concurrency/Executor.swift | 1 - stdlib/public/Concurrency/Task.swift | 39 ++- .../Runtime/startSynchronously.swift | 257 ++++++++++++++++++ test/abi/macOS/arm64/concurrency.swift | 3 + .../CompatibilityOverrideConcurrency.cpp | 9 + 8 files changed, 339 insertions(+), 8 deletions(-) create mode 100644 test/Concurrency/Runtime/startSynchronously.swift diff --git a/include/swift/Runtime/Concurrency.h b/include/swift/Runtime/Concurrency.h index ee4e713c592b7..40f55be311c3b 100644 --- a/include/swift/Runtime/Concurrency.h +++ b/include/swift/Runtime/Concurrency.h @@ -1025,6 +1025,9 @@ JobPriority swift_task_getCurrentThreadPriority(void); SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) void swift_task_startOnMainActor(AsyncTask* job); +SWIFT_EXPORT_FROM(swift_Concurrency) SWIFT_CC(swift) +void swift_task_startSynchronously(AsyncTask* job); + /// Donate this thread to the global executor until either the /// given condition returns true or we've run out of cooperative /// tasks to run. diff --git a/stdlib/public/CompatibilityOverride/CompatibilityOverrideConcurrency.def b/stdlib/public/CompatibilityOverride/CompatibilityOverrideConcurrency.def index ba7bd75d9437a..40cd713e4f629 100644 --- a/stdlib/public/CompatibilityOverride/CompatibilityOverrideConcurrency.def +++ b/stdlib/public/CompatibilityOverride/CompatibilityOverrideConcurrency.def @@ -438,6 +438,11 @@ OVERRIDE_TASK(task_startOnMainActor, void, SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), swift::, (AsyncTask *task), (task)) +// In ACTOR since we need ExecutorTracking info +OVERRIDE_ACTOR(task_startSynchronously, void, + SWIFT_EXPORT_FROM(swift_Concurrency), SWIFT_CC(swift), + swift::, (AsyncTask *task), (task)) + #undef OVERRIDE #undef OVERRIDE_ACTOR #undef OVERRIDE_TASK diff --git a/stdlib/public/Concurrency/Actor.cpp b/stdlib/public/Concurrency/Actor.cpp index 5c7e1219d78b5..9b5b7d8963daa 100644 --- a/stdlib/public/Concurrency/Actor.cpp +++ b/stdlib/public/Concurrency/Actor.cpp @@ -1683,8 +1683,8 @@ static void defaultActorDrain(DefaultActorImpl *actor) { trackingInfo.setTaskExecutor(taskExecutor); } - // This thread is now going to follow the task on this actor. It may hop off - // the actor + // This thread is now going to follow the task on this actor. + // It may hop off the actor runJobInEstablishedExecutorContext(job); // We could have come back from the job on a generic executor and not as @@ -2027,7 +2027,9 @@ static void swift_job_runImpl(Job *job, SerialExecutorRef executor) { // run jobs. Actor executors won't expect us to switch off them // during this operation. But do allow switching if the executor // is generic. - if (!executor.isGeneric()) trackingInfo.disallowSwitching(); + if (!executor.isGeneric()) { + trackingInfo.disallowSwitching(); + } auto taskExecutor = executor.isGeneric() ? TaskExecutorRef::fromTaskExecutorPreference(job) @@ -2325,6 +2327,28 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex task->flagAsAndEnqueueOnExecutor(newExecutor); } +SWIFT_CC(swift) +static void swift_task_startSynchronouslyImpl(AsyncTask* task) { + swift_retain(task); + + auto currentTracking = ExecutorTrackingInfo::current(); + if (currentTracking) { + auto currentExecutor = currentTracking->getActiveExecutor(); + AsyncTask * originalTask = _swift_task_clearCurrent(); + + swift_job_run(task, currentExecutor); + _swift_task_setCurrent(originalTask); + } else { + // FIXME: this is not correct; we'll keep reusing this thread when we should not have been. See tests with actors. + SerialExecutorRef executor = SerialExecutorRef::generic(); // _task_task_getRunSynchronouslyExecutor(); + auto originalTask = ActiveTask::swap(task); + assert(!originalTask); + + swift_job_run(task, executor); + _swift_task_setCurrent(originalTask); + } +} + #if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS namespace { /// Job that allows to use executor API to schedule a block of task-less diff --git a/stdlib/public/Concurrency/Executor.swift b/stdlib/public/Concurrency/Executor.swift index be4e46c95d983..b12239f79ccc8 100644 --- a/stdlib/public/Concurrency/Executor.swift +++ b/stdlib/public/Concurrency/Executor.swift @@ -542,7 +542,6 @@ internal final class DispatchQueueShim: @unchecked Sendable, SerialExecutor { } #endif // SWIFT_CONCURRENCY_USES_DISPATCH - @available(SwiftStdlib 6.1, *) @_silgen_name("swift_task_deinitOnExecutor") @usableFromInline diff --git a/stdlib/public/Concurrency/Task.swift b/stdlib/public/Concurrency/Task.swift index 7ba55ae85d351..896bc7d33f389 100644 --- a/stdlib/public/Concurrency/Task.swift +++ b/stdlib/public/Concurrency/Task.swift @@ -273,6 +273,8 @@ extension Task: Equatable { } } +// ==== Starting tasks synchronously ------------------------------------------- + #if !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY && !SWIFT_CONCURRENCY_EMBEDDED @available(SwiftStdlib 5.9, *) extension Task where Failure == Error { @@ -282,14 +284,14 @@ extension Task where Failure == Error { @discardableResult public static func startOnMainActor( priority: TaskPriority? = nil, - @_inheritActorContext @_implicitSelfCapture _ work: __owned @Sendable @escaping @MainActor() async throws -> Success + @_inheritActorContext @_implicitSelfCapture _ operation: __owned @Sendable @escaping @MainActor() async throws -> Success ) -> Task { let flags = taskCreateFlags(priority: priority, isChildTask: false, copyTaskLocals: true, inheritContext: true, enqueueJob: false, addPendingGroupTaskUnconditionally: false, isDiscardingTask: false) - let (task, _) = Builtin.createAsyncTask(flags, work) + let (task, _) = Builtin.createAsyncTask(flags, operation) _startTaskOnMainActor(task) return Task(task) } @@ -305,20 +307,46 @@ extension Task where Failure == Never { @discardableResult public static func startOnMainActor( priority: TaskPriority? = nil, - @_inheritActorContext @_implicitSelfCapture _ work: __owned @Sendable @escaping @MainActor() async -> Success + @_inheritActorContext @_implicitSelfCapture _ operation: __owned @Sendable @escaping @MainActor() async -> Success ) -> Task { let flags = taskCreateFlags(priority: priority, isChildTask: false, copyTaskLocals: true, inheritContext: true, enqueueJob: false, addPendingGroupTaskUnconditionally: false, isDiscardingTask: false) - let (task, _) = Builtin.createAsyncTask(flags, work) + let (task, _) = Builtin.createAsyncTask(flags, operation) _startTaskOnMainActor(task) return Task(task) } } #endif +@available(SwiftStdlib 5.9, *) +extension Task where Failure == Never { + @available(SwiftStdlib 5.9, *) + @discardableResult + public static func startSynchronously( + name: String? = nil, + priority: TaskPriority? = nil, + @_inheritActorContext @_implicitSelfCapture operation: __owned @Sendable @escaping () async -> Success + ) -> Task { + let flags = taskCreateFlags( + priority: priority, + isChildTask: false, + copyTaskLocals: true, + inheritContext: true, + enqueueJob: false, // don't enqueue, we'll run it manually + addPendingGroupTaskUnconditionally: false, + isDiscardingTask: false + ) + + let (task, _) = Builtin.createAsyncTask(flags, operation) + _startTaskSynchronously(task) + return Task(task) + } +} + + // ==== Task Priority ---------------------------------------------------------- /// The priority of a task. @@ -1420,6 +1448,9 @@ public func _getCurrentAsyncTask() -> Builtin.NativeObject? @_silgen_name("swift_task_startOnMainActor") fileprivate func _startTaskOnMainActor(_ task: Builtin.NativeObject) +@_silgen_name("swift_task_startSynchronously") +fileprivate func _startTaskSynchronously(_ task: Builtin.NativeObject) + @available(SwiftStdlib 5.1, *) @_silgen_name("swift_task_getJobFlags") func getJobFlags(_ task: Builtin.NativeObject) -> JobFlags diff --git a/test/Concurrency/Runtime/startSynchronously.swift b/test/Concurrency/Runtime/startSynchronously.swift new file mode 100644 index 0000000000000..622684bc2a9d8 --- /dev/null +++ b/test/Concurrency/Runtime/startSynchronously.swift @@ -0,0 +1,257 @@ +// RUN: %empty-directory(%t) +// RUN: %target-build-swift -Xfrontend -disable-availability-checking %s -swift-version 6 -o %t/a.out +// RUN: %target-codesign %t/a.out +// RUN: %target-run %t/a.out | %FileCheck %s --dump-input=always + +// REQUIRES: executable_test +// REQUIRES: concurrency +// REQUIRES: concurrency_runtime +// REQUIRES: libdispatch + +// UNSUPPORTED: back_deployment_runtime +// UNSUPPORTED: back_deploy_concurrency +// UNSUPPORTED: use_os_stdlib +// UNSUPPORTED: freestanding + +@_spi(MainActorUtilities) import _Concurrency +import Dispatch + +enum CompareHow { + case equal + case notEqual + + func check(_ wasEqual: Bool) -> Bool { + switch self { + case .equal: wasEqual + case .notEqual: !wasEqual + } + } +} + +#if canImport(Darwin) +import Darwin +typealias ThreadID = pthread_t +func getCurrentThreadID() -> ThreadID { pthread_self() } +func compareThreadIDs(_ a: ThreadID, _ how: CompareHow, _ b: ThreadID) -> Bool { + how.check(pthread_equal(a, b) != 0) +} +#elseif canImport(Glibc) +import Glibc +typealias ThreadID = pthread_t +func getCurrentThreadID() -> ThreadID { pthread_self() } +func compareThreadIDs(_ a: ThreadID, _ how: CompareHow, _ b: ThreadID) -> Bool { + how.check(pthread_equal(a, b) != 0) +} +#elseif os(Windows) +import WinSDK +typealias ThreadID = UInt32 +func getCurrentThreadID() -> ThreadID { GetCurrentThreadId() } +func compareThreadIDs(_ a: ThreadID, _ how: CompareHow, _ b: ThreadID) -> Bool { + how.check(a == b) +} +#elseif os(WASI) +typealias ThreadID = UInt32 +func getCurrentThreadID() -> ThreadID { 0 } +func compareThreadIDs(_ a: ThreadID, _ how: CompareHow, _ b: ThreadID) -> Bool { + how.check(a == b) +} +#endif +extension ThreadID: @unchecked Sendable {} + +@globalActor +actor MyGlobalActor { + static let shared: MyGlobalActor = MyGlobalActor() +} + +func syncOnMyGlobalActor() -> [Task] { + MyGlobalActor.shared.preconditionIsolated("Should be executing on the global actor here") + print("Confirmed to be on @MyGlobalActor") + + // This task must be guaranteed to happen AFTER 'tt' because we are already on this actor + // so this enqueue must happen after we give up the actor. + print("schedule Task { @MyGlobalActor }, before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") + let t1 = Task { @MyGlobalActor in + print("inside Task { @MyGlobalActor }, after sleep [thread:\(getCurrentThreadID())] @ :\(#line)") + } + + print("before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") + let outerTID = getCurrentThreadID() + let tt = Task.startSynchronously { @MyGlobalActor in + let innerTID = getCurrentThreadID() + print("inside startSynchronously, outer thread = \(outerTID)") + print("inside startSynchronously, inner thread = \(innerTID)") + if (compareThreadIDs(outerTID, .notEqual, innerTID)) { + print("ERROR! Outer Thread ID must be equal Thread ID inside runSynchronously synchronous part!") + } + + print("inside startSynchronously, sleep now [thread:\(getCurrentThreadID())] @ :\(#line)") + try! await Task.sleep(for: .seconds(1)) + print("after sleep, inside startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") + } + + return [t1, tt] +} + +func syncOnNonTaskThread(synchronousTask behavior: SynchronousTaskBehavior) { + let sem1 = DispatchSemaphore(value: 0) + let sem2 = DispatchSemaphore(value: 0) + let queue = DispatchQueue(label: "CustomQueue") + + queue.async { + // This is in order so we don't have a "current task" nor any "current executor" + print("before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") + + let outerTID = getCurrentThreadID() + let tt = Task.startSynchronously { @MyGlobalActor in + let innerTID = getCurrentThreadID() + if compareThreadIDs(outerTID, .notEqual, innerTID) { + print("inside startSynchronously, outer thread = \(outerTID)") + print("inside startSynchronously, inner thread = \(innerTID)") + print("ERROR! Outer Thread ID must be equal Thread ID inside runSynchronously synchronous part!") + } + print("inside startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") + + switch behavior { + case .suspend: + // sleep until woken up by outer task; i.e. actually suspend + print("inside startSynchronously, before sleep [thread:\(getCurrentThreadID())] @ :\(#line)") + _ = try? await Task.sleep(for: .seconds(10)) + print("inside startSynchronously, after sleep [thread:\(getCurrentThreadID())] @ :\(#line)") + case .dontSuspend: + print("inside startSynchronously, done [thread:\(getCurrentThreadID())] @ :\(#line)") + () + } + sem1.signal() + } + print("after startSynchronously, outside; cancel (wakeup) the synchronous task! [thread:\(getCurrentThreadID())] @ :\(#line)") + tt.cancel() // wake up the sleep + + sem1.wait() + sem2.signal() + } + + sem2.wait() +} + +enum SynchronousTaskBehavior { + case suspend + case dontSuspend +} + +print("==== ------------------------------------------------------------------") +print("syncOnMyGlobalActor()") + +await Task { @MyGlobalActor in + MyGlobalActor.shared.preconditionIsolated("Should be executing on the global actor here") + for t in syncOnMyGlobalActor() { + await t.value + } +}.value + +// CHECK-LABEL: syncOnMyGlobalActor() +// CHECK: Confirmed to be on @MyGlobalActor +// CHECK: schedule Task { @MyGlobalActor }, before startSynchronously [thread:[[CALLING_THREAD:0x.*]]] +// CHECK: before startSynchronously [thread:[[CALLING_THREAD]]] +// CHECK: inside startSynchronously, sleep now +// CHECK: inside Task { @MyGlobalActor }, after sleep +// resume on some other thread +// CHECK: after sleep, inside startSynchronously + +print("==== ------------------------------------------------------------------") +var behavior: SynchronousTaskBehavior = .suspend +print("syncOnNonTaskThread(synchronousTask: \(behavior))") +syncOnNonTaskThread(synchronousTask: behavior) + +// CHECK-LABEL: syncOnNonTaskThread(synchronousTask: suspend) +// CHECK-NEXT: before startSynchronously [thread:[[CALLING_THREAD2:0x.*]]] +// CHECK: inside startSynchronously [thread:[[CALLING_THREAD2]]] +// CHECK-NEXT: inside startSynchronously, before sleep [thread:[[CALLING_THREAD2]]] +// CHECK-NEXT: after startSynchronously, outside; cancel (wakeup) the synchronous task! [thread:[[CALLING_THREAD2]]] +// CHECK-NEXT: inside startSynchronously, after sleep + +print("==== ------------------------------------------------------------------") +behavior = .dontSuspend +print("syncOnNonTaskThread(synchronousTask: \(behavior))") +syncOnNonTaskThread(synchronousTask: behavior) + +// CHECK-LABEL: syncOnNonTaskThread(synchronousTask: dontSuspend) +// CHECK-NEXT: before startSynchronously [thread:[[CALLING_THREAD3:0x.*]]] +// CHECK-NEXT: inside startSynchronously [thread:[[CALLING_THREAD3]]] +// CHECK-NEXT: inside startSynchronously, done [thread:[[CALLING_THREAD3]]] +// CHECK-NEXT: after startSynchronously, outside; cancel (wakeup) the synchronous task! [thread:[[CALLING_THREAD3]]] + +print("==== ------------------------------------------------------------------") +print("callActorFromStartSynchronousTask()") +callActorFromStartSynchronousTask() + +// FIXME: this continues using our task on the actor + +actor Recipient { + func sync(syncTaskThreadID: ThreadID) { + self.preconditionIsolated() + if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) { + print("ERROR! Sync start thread id = \(syncTaskThreadID)") + print("ERROR! Current actor thread id = \(getCurrentThreadID())") + print("ERROR! Actor must not run on the synchronous task's thread") + } + } + + func async(syncTaskThreadID: ThreadID) async { + self.preconditionIsolated() + if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) { + print("ERROR! Sync start thread id = \(syncTaskThreadID)") + print("ERROR! Current actor thread id = \(getCurrentThreadID())") + print("ERROR! Actor must not run on the synchronous task's thread") + } + + await Task { + self.preconditionIsolated() + }.value + } +} + +func callActorFromStartSynchronousTask() { + let sem1 = DispatchSemaphore(value: 0) + let sem2 = DispatchSemaphore(value: 0) + let queue = DispatchQueue(label: "CustomQueue") + + queue.async { + let outerTID = getCurrentThreadID() + let tt = Task.startSynchronously { + let innerTID = getCurrentThreadID() + precondition(compareThreadIDs(outerTID, .equal, innerTID), "Outer Thread ID must be equal Thread ID inside runSynchronously synchronous part!") + print("inside startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") + + let rec = Recipient() + + print("inside startSynchronously, call rec.sync() [thread:\(getCurrentThreadID())] @ :\(#line)") + await rec.sync(syncTaskThreadID: innerTID) + print("inside startSynchronously, call rec.sync() done [thread:\(getCurrentThreadID())] @ :\(#line)") + + // after suspension we are supposed to hop off to the global pool, + // thus the thread IDs cannot be the same anymore + if compareThreadIDs(innerTID, .equal, getCurrentThreadID()) { + print("ERROR! Inner thread id = \(innerTID)") + print("ERROR! Current thread id = \(getCurrentThreadID())") + print("ERROR! Task resumed on same thread as it entered the synchronous task!") + } + + print("inside startSynchronously, call rec.async() [thread:\(getCurrentThreadID())] @ :\(#line)") + await rec.async(syncTaskThreadID: innerTID) + print("inside startSynchronously, done [thread:\(getCurrentThreadID())] @ :\(#line)") + + if compareThreadIDs(innerTID, .equal, getCurrentThreadID()) { + print("ERROR! Inner thread id = \(innerTID)") + print("ERROR! Current thread id = \(getCurrentThreadID())") + print("ERROR! Task resumed on same thread as it entered the synchronous task!") + } + print("inside startSynchronously, done [thread:\(getCurrentThreadID())] @ :\(#line)") + } + + print("after startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") + } + +// sem2.wait() +} + +try? await Task.sleep(for: .seconds(3)) \ No newline at end of file diff --git a/test/abi/macOS/arm64/concurrency.swift b/test/abi/macOS/arm64/concurrency.swift index d7198e513ac90..02bec6ef47f11 100644 --- a/test/abi/macOS/arm64/concurrency.swift +++ b/test/abi/macOS/arm64/concurrency.swift @@ -383,3 +383,6 @@ Added: _$sSct16escalatePriority_2toySct_ScPtFZ Added: _$sScT16escalatePriority_2toyScTyxq_G_ScPtFZ Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolationxxyYaq_YKXE_yScPYbXEScA_pSgYitYaq_YKs5ErrorR_r0_lF Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolationxxyYaq_YKXE_yScPYbXEScA_pSgYitYaq_YKs5ErrorR_r0_lFTu + +// Task.startSynchronously +Added: _swift_task_startSynchronously \ No newline at end of file diff --git a/unittests/runtime/CompatibilityOverrideConcurrency.cpp b/unittests/runtime/CompatibilityOverrideConcurrency.cpp index e5dc8fe5e9cf6..265c96cffbe6c 100644 --- a/unittests/runtime/CompatibilityOverrideConcurrency.cpp +++ b/unittests/runtime/CompatibilityOverrideConcurrency.cpp @@ -109,6 +109,11 @@ static void swift_task_startOnMainActor_override(AsyncTask* task) { Ran = true; } +SWIFT_CC(swift) +static void swift_task_startSynchronously_override(AsyncTask* task) { + Ran = true; +} + #ifdef RUN_ASYNC_MAIN_DRAIN_QUEUE_TEST [[noreturn]] SWIFT_CC(swift) static void swift_task_asyncMainDrainQueue_override_fn( @@ -330,6 +335,10 @@ TEST_F(CompatibilityOverrideConcurrencyTest, test_swift_startOnMainActorImpl) { swift_task_startOnMainActor(nullptr); } +TEST_F(CompatibilityOverrideConcurrencyTest, test_swift_startSynchronously) { + swift_task_startSynchronously(nullptr); +} + TEST_F(CompatibilityOverrideConcurrencyTest, test_swift_task_isCurrentExecutorWithFlags) { swift_task_isCurrentExecutorWithFlags( From d6157fda83190c0ef0a254d4614b4fd93fbedf58 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Tue, 25 Feb 2025 23:35:41 +0900 Subject: [PATCH 02/11] [Concurrency] Rename to _startSynchronously while in development --- stdlib/public/Concurrency/Task.swift | 2 +- test/Concurrency/Runtime/startSynchronously.swift | 6 +++--- test/abi/macOS/arm64/concurrency.swift | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/stdlib/public/Concurrency/Task.swift b/stdlib/public/Concurrency/Task.swift index 896bc7d33f389..e09ad0bde00af 100644 --- a/stdlib/public/Concurrency/Task.swift +++ b/stdlib/public/Concurrency/Task.swift @@ -325,7 +325,7 @@ extension Task where Failure == Never { extension Task where Failure == Never { @available(SwiftStdlib 5.9, *) @discardableResult - public static func startSynchronously( + public static func _startSynchronously( name: String? = nil, priority: TaskPriority? = nil, @_inheritActorContext @_implicitSelfCapture operation: __owned @Sendable @escaping () async -> Success diff --git a/test/Concurrency/Runtime/startSynchronously.swift b/test/Concurrency/Runtime/startSynchronously.swift index 622684bc2a9d8..638939c7ffa6f 100644 --- a/test/Concurrency/Runtime/startSynchronously.swift +++ b/test/Concurrency/Runtime/startSynchronously.swift @@ -76,7 +76,7 @@ func syncOnMyGlobalActor() -> [Task] { print("before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") let outerTID = getCurrentThreadID() - let tt = Task.startSynchronously { @MyGlobalActor in + let tt = Task.Task._startSynchronously { @MyGlobalActor in let innerTID = getCurrentThreadID() print("inside startSynchronously, outer thread = \(outerTID)") print("inside startSynchronously, inner thread = \(innerTID)") @@ -102,7 +102,7 @@ func syncOnNonTaskThread(synchronousTask behavior: SynchronousTaskBehavior) { print("before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") let outerTID = getCurrentThreadID() - let tt = Task.startSynchronously { @MyGlobalActor in + let tt = Task._startSynchronously { @MyGlobalActor in let innerTID = getCurrentThreadID() if compareThreadIDs(outerTID, .notEqual, innerTID) { print("inside startSynchronously, outer thread = \(outerTID)") @@ -217,7 +217,7 @@ func callActorFromStartSynchronousTask() { queue.async { let outerTID = getCurrentThreadID() - let tt = Task.startSynchronously { + let tt = Task._startSynchronously { let innerTID = getCurrentThreadID() precondition(compareThreadIDs(outerTID, .equal, innerTID), "Outer Thread ID must be equal Thread ID inside runSynchronously synchronous part!") print("inside startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") diff --git a/test/abi/macOS/arm64/concurrency.swift b/test/abi/macOS/arm64/concurrency.swift index 02bec6ef47f11..9abb3251c130c 100644 --- a/test/abi/macOS/arm64/concurrency.swift +++ b/test/abi/macOS/arm64/concurrency.swift @@ -384,5 +384,5 @@ Added: _$sScT16escalatePriority_2toyScTyxq_G_ScPtFZ Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolationxxyYaq_YKXE_yScPYbXEScA_pSgYitYaq_YKs5ErrorR_r0_lF Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolationxxyYaq_YKXE_yScPYbXEScA_pSgYitYaq_YKs5ErrorR_r0_lFTu -// Task.startSynchronously +// Task._startSynchronously Added: _swift_task_startSynchronously \ No newline at end of file From b47a037880d8b8673bb1142664a75a8e76129500 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Wed, 26 Feb 2025 11:32:04 +0900 Subject: [PATCH 03/11] [Concurrency] StartSynchronously special executor to avoid switching --- include/swift/ABI/Executor.h | 14 ++- include/swift/ABI/MetadataValues.h | 7 +- stdlib/public/Concurrency/Actor.cpp | 43 ++++++-- .../Concurrency/DiscardingTaskGroup.swift | 24 ++-- .../Concurrency/Task+TaskExecutor.swift | 8 +- stdlib/public/Concurrency/Task.cpp | 3 +- stdlib/public/Concurrency/Task.swift | 34 +++--- .../Concurrency/TaskGroup+TaskExecutor.swift | 20 ++-- stdlib/public/Concurrency/TaskGroup.swift | 22 ++-- stdlib/public/Concurrency/TaskPrivate.h | 2 +- stdlib/public/Concurrency/TaskSleep.swift | 2 +- .../Concurrency/TaskSleepDuration.swift | 2 +- .../Runtime/startSynchronously.swift | 104 ++++++++++++------ test/abi/macOS/arm64/concurrency.swift | 4 +- .../Operations/DumpConcurrency.swift | 2 + 15 files changed, 197 insertions(+), 94 deletions(-) diff --git a/include/swift/ABI/Executor.h b/include/swift/ABI/Executor.h index f3f428b87e1ea..f1638887ba64b 100644 --- a/include/swift/ABI/Executor.h +++ b/include/swift/ABI/Executor.h @@ -77,6 +77,8 @@ class SerialExecutorRef { /// Executor that may need to participate in complex "same context" checks, /// by invoking `isSameExclusiveExecutionContext` when comparing execution contexts. ComplexEquality = 0b01, + /// + StartSynchronously = 0b10, }; static_assert(static_cast(ExecutorKind::Ordinary) == 0); @@ -101,6 +103,16 @@ class SerialExecutorRef { return SerialExecutorRef(actor, 0); } + static SerialExecutorRef forSynchronousStart() { + auto wtable = reinterpret_cast(nullptr) | + static_cast(ExecutorKind::StartSynchronously); + return SerialExecutorRef(nullptr, wtable); + } + bool isForSynchronousStart() const { + return getIdentity() == nullptr && + getExecutorKind() == ExecutorKind::StartSynchronously; + } + /// Given a pointer to a serial executor and its SerialExecutor /// conformance, return an executor reference for it. static SerialExecutorRef forOrdinary(HeapObject *identity, @@ -127,7 +139,7 @@ class SerialExecutorRef { const char* getIdentityDebugName() const { return isMainExecutor() ? " (MainActorExecutor)" - : isGeneric() ? " (GenericExecutor)" + : isGeneric() ? (isForSynchronousStart() ? " (GenericExecutor/SynchronousStart)" : " (GenericExecutor)") : ""; } diff --git a/include/swift/ABI/MetadataValues.h b/include/swift/ABI/MetadataValues.h index d514b103cd752..0c6d2023fff58 100644 --- a/include/swift/ABI/MetadataValues.h +++ b/include/swift/ABI/MetadataValues.h @@ -2668,13 +2668,13 @@ class TaskCreateFlags : public FlagSet { Task_EnqueueJob = 12, Task_AddPendingGroupTaskUnconditionally = 13, Task_IsDiscardingTask = 14, - /// The task function is consumed by calling it (@callee_owned). /// The context pointer should be treated as opaque and non-copyable; /// in particular, it should not be retained or released. /// /// Supported starting in Swift 6.1. - Task_IsTaskFunctionConsumed = 15, + Task_IsTaskFunctionConsumed = 15, + Task_IsStartSynchronouslyTask = 16, }; explicit constexpr TaskCreateFlags(size_t bits) : FlagSet(bits) {} @@ -2707,6 +2707,9 @@ class TaskCreateFlags : public FlagSet { FLAGSET_DEFINE_FLAG_ACCESSORS(Task_IsTaskFunctionConsumed, isTaskFunctionConsumed, setIsTaskFunctionConsumed) + FLAGSET_DEFINE_FLAG_ACCESSORS(Task_IsStartSynchronouslyTask, + isSynchronousStartTask, + setIsSYnchronousStartTask) }; /// Flags for schedulable jobs. diff --git a/stdlib/public/Concurrency/Actor.cpp b/stdlib/public/Concurrency/Actor.cpp index 9b5b7d8963daa..56524841844e2 100644 --- a/stdlib/public/Concurrency/Actor.cpp +++ b/stdlib/public/Concurrency/Actor.cpp @@ -134,6 +134,8 @@ class ExecutorTrackingInfo { /// is `generic`. TaskExecutorRef TaskExecutor = TaskExecutorRef::undefined(); + bool StartedSynchronouslySkipSwitchOnce = false; + /// Whether this context allows switching. Some contexts do not; /// for example, we do not allow switching from swift_job_run /// unless the passed-in executor is generic. @@ -177,7 +179,7 @@ class ExecutorTrackingInfo { } bool allowsSwitching() const { - return AllowsSwitching; + return AllowsSwitching && !StartedSynchronouslySkipSwitchOnce; } /// Disallow switching in this tracking context. This should only @@ -186,6 +188,16 @@ class ExecutorTrackingInfo { AllowsSwitching = false; } + void markSynchronousStart() { + StartedSynchronouslySkipSwitchOnce = true; + } + bool isSynchronousStart() const { + return StartedSynchronouslySkipSwitchOnce; + } + void withoutStartSynchronously() { + StartedSynchronouslySkipSwitchOnce = false; + } + static ExecutorTrackingInfo *current() { return ActiveInfoInThread.get(); } @@ -2151,13 +2163,23 @@ static bool canGiveUpThreadForSwitch(ExecutorTrackingInfo *trackingInfo, assert(trackingInfo || currentExecutor.isGeneric()); // Some contexts don't allow switching at all. - if (trackingInfo && !trackingInfo->allowsSwitching()) + if (trackingInfo && !trackingInfo->allowsSwitching()) { return false; + } // We can certainly "give up" a generic executor to try to run // a task for an actor. - if (currentExecutor.isGeneric()) + if (currentExecutor.isGeneric()) { + if (trackingInfo->isSynchronousStart()) { + return false; + } + + if (currentExecutor.isForSynchronousStart()) { + return false; + } + return true; + } // If the current executor is a default actor, we know how to make // it give up its thread. @@ -2278,7 +2300,8 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex : TaskExecutorRef::undefined()); auto newTaskExecutor = task->getPreferredTaskExecutor(); SWIFT_TASK_DEBUG_LOG("Task %p trying to switch executors: executor %p%s to " - "new serial executor: %p%s; task executor: from %p%s to %p%s", + "new serial executor: %p%s; task executor: from %p%s to %p%s" + "%s", task, currentExecutor.getIdentity(), currentExecutor.getIdentityDebugName(), @@ -2287,13 +2310,15 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex currentTaskExecutor.getIdentity(), currentTaskExecutor.isDefined() ? "" : " (undefined)", newTaskExecutor.getIdentity(), - newTaskExecutor.isDefined() ? "" : " (undefined)"); + newTaskExecutor.isDefined() ? "" : " (undefined)", + trackingInfo->isSynchronousStart() ? "[synchronous start]" : "[NOT SYNC START]"); // If the current executor is compatible with running the new executor, // we can just immediately continue running with the resume function // we were passed in. if (!mustSwitchToRun(currentExecutor, newExecutor, currentTaskExecutor, newTaskExecutor)) { + SWIFT_TASK_DEBUG_LOG("Task %p run inline", task); return resumeFunction(resumeContext); // 'return' forces tail call } @@ -2305,6 +2330,7 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex // If the current executor can give up its thread, and the new executor // can take over a thread, try to do so; but don't do this if we've // been asked to yield the thread. + SWIFT_TASK_DEBUG_LOG("Task %p can give up thread?", task); if (currentTaskExecutor.isUndefined() && canGiveUpThreadForSwitch(trackingInfo, currentExecutor) && !shouldYieldThread() && @@ -2339,8 +2365,11 @@ static void swift_task_startSynchronouslyImpl(AsyncTask* task) { swift_job_run(task, currentExecutor); _swift_task_setCurrent(originalTask); } else { - // FIXME: this is not correct; we'll keep reusing this thread when we should not have been. See tests with actors. - SerialExecutorRef executor = SerialExecutorRef::generic(); // _task_task_getRunSynchronouslyExecutor(); + SerialExecutorRef executor = SerialExecutorRef::forSynchronousStart(); + + ExecutorTrackingInfo trackingInfo; +// trackingInfo.markSynchronousStart(); + auto originalTask = ActiveTask::swap(task); assert(!originalTask); diff --git a/stdlib/public/Concurrency/DiscardingTaskGroup.swift b/stdlib/public/Concurrency/DiscardingTaskGroup.swift index 940757b014fda..554dfb8d06793 100644 --- a/stdlib/public/Concurrency/DiscardingTaskGroup.swift +++ b/stdlib/public/Concurrency/DiscardingTaskGroup.swift @@ -205,13 +205,15 @@ public struct DiscardingTaskGroup { let flags = taskCreateFlags( priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: false, - addPendingGroupTaskUnconditionally: true, isDiscardingTask: true + addPendingGroupTaskUnconditionally: true, isDiscardingTask: true, + isSynchronousStart: false ) #else let flags = taskCreateFlags( priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, - addPendingGroupTaskUnconditionally: true, isDiscardingTask: true + addPendingGroupTaskUnconditionally: true, isDiscardingTask: true, + isSynchronousStart: false ) #endif @@ -252,13 +254,15 @@ public struct DiscardingTaskGroup { let flags = taskCreateFlags( priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: false, - addPendingGroupTaskUnconditionally: false, isDiscardingTask: true + addPendingGroupTaskUnconditionally: false, isDiscardingTask: true, + isSynchronousStart: false ) #else let flags = taskCreateFlags( priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, - addPendingGroupTaskUnconditionally: false, isDiscardingTask: true + addPendingGroupTaskUnconditionally: false, isDiscardingTask: true, + isSynchronousStart: false ) #endif @@ -281,7 +285,8 @@ public struct DiscardingTaskGroup { let flags = taskCreateFlags( priority: nil, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, - addPendingGroupTaskUnconditionally: true, isDiscardingTask: true + addPendingGroupTaskUnconditionally: true, isDiscardingTask: true, + isSynchronousStart: false ) // Create the task in this group. @@ -317,7 +322,8 @@ public struct DiscardingTaskGroup { let flags = taskCreateFlags( priority: nil, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, - addPendingGroupTaskUnconditionally: false, isDiscardingTask: true + addPendingGroupTaskUnconditionally: false, isDiscardingTask: true, + isSynchronousStart: false ) // Create the task in this group. @@ -635,7 +641,8 @@ public struct ThrowingDiscardingTaskGroup { let flags = taskCreateFlags( priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, - addPendingGroupTaskUnconditionally: true, isDiscardingTask: true + addPendingGroupTaskUnconditionally: true, isDiscardingTask: true, + isSynchronousStart: false ) // Create the task in this group. @@ -666,7 +673,8 @@ public struct ThrowingDiscardingTaskGroup { let flags = taskCreateFlags( priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, - addPendingGroupTaskUnconditionally: false, isDiscardingTask: true + addPendingGroupTaskUnconditionally: false, isDiscardingTask: true, + isSynchronousStart: false ) // Create the task in this group. diff --git a/stdlib/public/Concurrency/Task+TaskExecutor.swift b/stdlib/public/Concurrency/Task+TaskExecutor.swift index f10493e13c1fe..ca01a6ac2433f 100644 --- a/stdlib/public/Concurrency/Task+TaskExecutor.swift +++ b/stdlib/public/Concurrency/Task+TaskExecutor.swift @@ -240,7 +240,7 @@ extension Task where Failure == Never { priority: priority, isChildTask: false, copyTaskLocals: true, inheritContext: true, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) #if $BuiltinCreateAsyncTaskOwnedTaskExecutor let (task, _) = Builtin.createTask( @@ -303,7 +303,7 @@ extension Task where Failure == Error { priority: priority, isChildTask: false, copyTaskLocals: true, inheritContext: true, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) #if $BuiltinCreateAsyncTaskOwnedTaskExecutor let (task, _) = Builtin.createTask( @@ -364,7 +364,7 @@ extension Task where Failure == Never { priority: priority, isChildTask: false, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) #if $BuiltinCreateAsyncTaskOwnedTaskExecutor let (task, _) = Builtin.createTask( @@ -425,7 +425,7 @@ extension Task where Failure == Error { priority: priority, isChildTask: false, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) #if $BuiltinCreateAsyncTaskOwnedTaskExecutor let (task, _) = Builtin.createTask( diff --git a/stdlib/public/Concurrency/Task.cpp b/stdlib/public/Concurrency/Task.cpp index efd730c84f5bc..c1cc3d7711d26 100644 --- a/stdlib/public/Concurrency/Task.cpp +++ b/stdlib/public/Concurrency/Task.cpp @@ -847,7 +847,8 @@ swift_task_create_commonImpl(size_t rawTaskCreateFlags, // basePriority should already be the right value } else if (taskIsUnstructured(taskCreateFlags, jobFlags)) { - SWIFT_TASK_DEBUG_LOG("Creating an unstructured task from %p", currentTask); + SWIFT_TASK_DEBUG_LOG("Creating an unstructured task from %p%s", currentTask, + taskCreateFlags.isSynchronousStartTask() ? " [start synchronously]" : ""); if (isUnspecified(basePriority)) { // Case 1: No priority specified diff --git a/stdlib/public/Concurrency/Task.swift b/stdlib/public/Concurrency/Task.swift index e09ad0bde00af..2db437e2dd14a 100644 --- a/stdlib/public/Concurrency/Task.swift +++ b/stdlib/public/Concurrency/Task.swift @@ -290,7 +290,7 @@ extension Task where Failure == Error { copyTaskLocals: true, inheritContext: true, enqueueJob: false, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) let (task, _) = Builtin.createAsyncTask(flags, operation) _startTaskOnMainActor(task) return Task(task) @@ -313,7 +313,7 @@ extension Task where Failure == Never { copyTaskLocals: true, inheritContext: true, enqueueJob: false, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) let (task, _) = Builtin.createAsyncTask(flags, operation) _startTaskOnMainActor(task) return Task(task) @@ -337,7 +337,8 @@ extension Task where Failure == Never { inheritContext: true, enqueueJob: false, // don't enqueue, we'll run it manually addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false + isDiscardingTask: false, + isSynchronousStart: true ) let (task, _) = Builtin.createAsyncTask(flags, operation) @@ -624,7 +625,8 @@ func taskCreateFlags( priority: TaskPriority?, isChildTask: Bool, copyTaskLocals: Bool, inheritContext: Bool, enqueueJob: Bool, addPendingGroupTaskUnconditionally: Bool, - isDiscardingTask: Bool + isDiscardingTask: Bool, + isSynchronousStart: Bool ) -> Int { var bits = 0 bits |= (bits & ~0xFF) | Int(priority?.rawValue ?? 0) @@ -646,6 +648,10 @@ func taskCreateFlags( if isDiscardingTask { bits |= 1 << 14 } + // 15 is used by 'IsTaskFunctionConsumed' + if isSynchronousStart { + bits |= 1 << 16 + } return bits } @@ -698,7 +704,7 @@ extension Task where Failure == Never { priority: priority, isChildTask: false, copyTaskLocals: true, inheritContext: true, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the asynchronous task. let builtinSerialExecutor = @@ -742,7 +748,7 @@ extension Task where Failure == Never { priority: priority, isChildTask: false, copyTaskLocals: true, inheritContext: true, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the asynchronous task. let (task, _) = Builtin.createAsyncTask(flags, operation) @@ -787,7 +793,7 @@ extension Task where Failure == Never { priority: priority, isChildTask: false, copyTaskLocals: true, inheritContext: true, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the asynchronous task. let builtinSerialExecutor = @@ -866,7 +872,7 @@ extension Task where Failure == Error { priority: priority, isChildTask: false, copyTaskLocals: true, inheritContext: true, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the asynchronous task future. let builtinSerialExecutor = @@ -911,7 +917,7 @@ extension Task where Failure == Error { priority: priority, isChildTask: false, copyTaskLocals: true, inheritContext: true, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the asynchronous task. let (task, _) = Builtin.createAsyncTask(flags, operation) @@ -956,7 +962,7 @@ self._task = task priority: priority, isChildTask: false, copyTaskLocals: true, inheritContext: true, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the asynchronous task. let builtinSerialExecutor = @@ -1034,7 +1040,7 @@ extension Task where Failure == Never { priority: priority, isChildTask: false, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the asynchronous task future. let builtinSerialExecutor = @@ -1097,7 +1103,7 @@ extension Task where Failure == Never { priority: priority, isChildTask: false, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the asynchronous task. let builtinSerialExecutor = @@ -1175,7 +1181,7 @@ extension Task where Failure == Error { priority: priority, isChildTask: false, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the asynchronous task future. let builtinSerialExecutor = @@ -1239,7 +1245,7 @@ extension Task where Failure == Error { priority: priority, isChildTask: false, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the asynchronous task future. let builtinSerialExecutor = diff --git a/stdlib/public/Concurrency/TaskGroup+TaskExecutor.swift b/stdlib/public/Concurrency/TaskGroup+TaskExecutor.swift index 531b1ee303bb0..77ac88aa44cc7 100644 --- a/stdlib/public/Concurrency/TaskGroup+TaskExecutor.swift +++ b/stdlib/public/Concurrency/TaskGroup+TaskExecutor.swift @@ -42,7 +42,7 @@ extension TaskGroup { priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: true, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) let builtinSerialExecutor = Builtin.extractFunctionIsolation(operation)?.unownedExecutor.executor @@ -98,7 +98,7 @@ extension TaskGroup { priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the task in this group with an executor preference. let builtinSerialExecutor = @@ -155,7 +155,7 @@ extension ThrowingTaskGroup { priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: true, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the task in this group with an executor preference. let builtinSerialExecutor = @@ -208,7 +208,7 @@ extension ThrowingTaskGroup { priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the task in this group with an executor preference. let builtinSerialExecutor = @@ -265,7 +265,8 @@ extension DiscardingTaskGroup { priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: true, - isDiscardingTask: true) + isDiscardingTask: true, + isSynchronousStart: false) // Create the task in this group with an executor preference. let builtinSerialExecutor = @@ -322,7 +323,8 @@ extension DiscardingTaskGroup { let flags = taskCreateFlags( priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, - addPendingGroupTaskUnconditionally: false, isDiscardingTask: true + addPendingGroupTaskUnconditionally: false, isDiscardingTask: true, + isSynchronousStart: false ) // Create the task in this group with an executor preference. @@ -380,7 +382,8 @@ extension ThrowingDiscardingTaskGroup { priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: true, - isDiscardingTask: true) + isDiscardingTask: true, + isSynchronousStart: false) // Create the task in this group with an executor preference. let builtinSerialExecutor = @@ -437,7 +440,8 @@ extension ThrowingDiscardingTaskGroup { let flags = taskCreateFlags( priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, - addPendingGroupTaskUnconditionally: false, isDiscardingTask: true + addPendingGroupTaskUnconditionally: false, isDiscardingTask: true, + isSynchronousStart: false ) // Create the task in this group with an executor preference. diff --git a/stdlib/public/Concurrency/TaskGroup.swift b/stdlib/public/Concurrency/TaskGroup.swift index a6fedaa3e2aae..ca2f5c7bb924e 100644 --- a/stdlib/public/Concurrency/TaskGroup.swift +++ b/stdlib/public/Concurrency/TaskGroup.swift @@ -349,14 +349,15 @@ public struct TaskGroup { priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: false, addPendingGroupTaskUnconditionally: true, - isDiscardingTask: false + isDiscardingTask: false, + isSynchronousStart: false ) #else let flags = taskCreateFlags( priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: true, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) #endif // Create the task in this group. @@ -393,13 +394,13 @@ public struct TaskGroup { priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: false, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) #else let flags = taskCreateFlags( priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) #endif // Create the task in this group. @@ -435,7 +436,7 @@ public struct TaskGroup { priority: nil, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: true, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the task in this group. let builtinSerialExecutor = @@ -476,7 +477,7 @@ public struct TaskGroup { priority: nil, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the task in this group. let builtinSerialExecutor = @@ -797,7 +798,8 @@ public struct ThrowingTaskGroup { priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: true, - isDiscardingTask: false + isDiscardingTask: false, + isSynchronousStart: false ) // Create the task in this group. @@ -837,7 +839,7 @@ public struct ThrowingTaskGroup { priority: priority, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the task in this group. let builtinSerialExecutor = @@ -874,7 +876,7 @@ public struct ThrowingTaskGroup { priority: nil, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: true, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the task in this group. _ = Builtin.createAsyncTaskInGroup(flags, _group, operation) @@ -913,7 +915,7 @@ public struct ThrowingTaskGroup { priority: nil, isChildTask: true, copyTaskLocals: false, inheritContext: false, enqueueJob: true, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) // Create the task in this group. _ = Builtin.createAsyncTaskInGroup(flags, _group, operation) diff --git a/stdlib/public/Concurrency/TaskPrivate.h b/stdlib/public/Concurrency/TaskPrivate.h index 361af35590ca3..05cb92bae6e43 100644 --- a/stdlib/public/Concurrency/TaskPrivate.h +++ b/stdlib/public/Concurrency/TaskPrivate.h @@ -44,7 +44,7 @@ namespace swift { #if 0 #define SWIFT_TASK_DEBUG_LOG_ENABLED 1 #define SWIFT_TASK_DEBUG_LOG(fmt, ...) \ - fprintf(stderr, "[%#lx] [%s:%d](%s) " fmt "\n", \ + fprintf(stdout, "[%#lx] [%s:%d](%s) " fmt "\n", \ (unsigned long)Thread::current().platformThreadId(), __FILE__, \ __LINE__, __FUNCTION__, __VA_ARGS__) #else diff --git a/stdlib/public/Concurrency/TaskSleep.swift b/stdlib/public/Concurrency/TaskSleep.swift index fcdd2df847d93..63094c36572c4 100644 --- a/stdlib/public/Concurrency/TaskSleep.swift +++ b/stdlib/public/Concurrency/TaskSleep.swift @@ -251,7 +251,7 @@ extension Task where Success == Never, Failure == Never { priority: nil, isChildTask: false, copyTaskLocals: false, inheritContext: false, enqueueJob: false, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) let (sleepTask, _) = Builtin.createAsyncTask(sleepTaskFlags) { onSleepWake(token) } diff --git a/stdlib/public/Concurrency/TaskSleepDuration.swift b/stdlib/public/Concurrency/TaskSleepDuration.swift index 2caa5bb12e053..648bc9554d5bf 100644 --- a/stdlib/public/Concurrency/TaskSleepDuration.swift +++ b/stdlib/public/Concurrency/TaskSleepDuration.swift @@ -49,7 +49,7 @@ extension Task where Success == Never, Failure == Never { priority: nil, isChildTask: false, copyTaskLocals: false, inheritContext: false, enqueueJob: false, addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false) + isDiscardingTask: false, isSynchronousStart: false) let (sleepTask, _) = Builtin.createAsyncTask(sleepTaskFlags) { onSleepWake(token) } diff --git a/test/Concurrency/Runtime/startSynchronously.swift b/test/Concurrency/Runtime/startSynchronously.swift index 638939c7ffa6f..16478e9d1e213 100644 --- a/test/Concurrency/Runtime/startSynchronously.swift +++ b/test/Concurrency/Runtime/startSynchronously.swift @@ -76,7 +76,7 @@ func syncOnMyGlobalActor() -> [Task] { print("before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") let outerTID = getCurrentThreadID() - let tt = Task.Task._startSynchronously { @MyGlobalActor in + let tt = Task._startSynchronously { @MyGlobalActor in let innerTID = getCurrentThreadID() print("inside startSynchronously, outer thread = \(outerTID)") print("inside startSynchronously, inner thread = \(innerTID)") @@ -85,7 +85,7 @@ func syncOnMyGlobalActor() -> [Task] { } print("inside startSynchronously, sleep now [thread:\(getCurrentThreadID())] @ :\(#line)") - try! await Task.sleep(for: .seconds(1)) + _ = try? await Task.sleep(for: .seconds(1)) print("after sleep, inside startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") } @@ -102,7 +102,9 @@ func syncOnNonTaskThread(synchronousTask behavior: SynchronousTaskBehavior) { print("before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") let outerTID = getCurrentThreadID() - let tt = Task._startSynchronously { @MyGlobalActor in + let tt = Task._startSynchronously { + dispatchPrecondition(condition: .onQueue(queue)) + let innerTID = getCurrentThreadID() if compareThreadIDs(outerTID, .notEqual, innerTID) { print("inside startSynchronously, outer thread = \(outerTID)") @@ -138,7 +140,7 @@ enum SynchronousTaskBehavior { case dontSuspend } -print("==== ------------------------------------------------------------------") +print("\n\n==== ------------------------------------------------------------------") print("syncOnMyGlobalActor()") await Task { @MyGlobalActor in @@ -152,57 +154,78 @@ await Task { @MyGlobalActor in // CHECK: Confirmed to be on @MyGlobalActor // CHECK: schedule Task { @MyGlobalActor }, before startSynchronously [thread:[[CALLING_THREAD:0x.*]]] // CHECK: before startSynchronously [thread:[[CALLING_THREAD]]] +// CHECK-NOT: ERROR! // CHECK: inside startSynchronously, sleep now // CHECK: inside Task { @MyGlobalActor }, after sleep // resume on some other thread // CHECK: after sleep, inside startSynchronously -print("==== ------------------------------------------------------------------") +print("\n\n==== ------------------------------------------------------------------") var behavior: SynchronousTaskBehavior = .suspend print("syncOnNonTaskThread(synchronousTask: \(behavior))") syncOnNonTaskThread(synchronousTask: behavior) // CHECK-LABEL: syncOnNonTaskThread(synchronousTask: suspend) +// No interleaving allowed between "before" and "inside": // CHECK-NEXT: before startSynchronously [thread:[[CALLING_THREAD2:0x.*]]] -// CHECK: inside startSynchronously [thread:[[CALLING_THREAD2]]] +// CHECK-NOT: ERROR! +// CHECK-NEXT: inside startSynchronously [thread:[[CALLING_THREAD2]]] // CHECK-NEXT: inside startSynchronously, before sleep [thread:[[CALLING_THREAD2]]] // CHECK-NEXT: after startSynchronously, outside; cancel (wakeup) the synchronous task! [thread:[[CALLING_THREAD2]]] // CHECK-NEXT: inside startSynchronously, after sleep -print("==== ------------------------------------------------------------------") +print("\n\n==== ------------------------------------------------------------------") behavior = .dontSuspend print("syncOnNonTaskThread(synchronousTask: \(behavior))") syncOnNonTaskThread(synchronousTask: behavior) // CHECK-LABEL: syncOnNonTaskThread(synchronousTask: dontSuspend) // CHECK-NEXT: before startSynchronously [thread:[[CALLING_THREAD3:0x.*]]] +// CHECK-NOT: ERROR! // CHECK-NEXT: inside startSynchronously [thread:[[CALLING_THREAD3]]] -// CHECK-NEXT: inside startSynchronously, done [thread:[[CALLING_THREAD3]]] -// CHECK-NEXT: after startSynchronously, outside; cancel (wakeup) the synchronous task! [thread:[[CALLING_THREAD3]]] +// CHECK: inside startSynchronously, done [thread:[[CALLING_THREAD3]]] +// CHECK: after startSynchronously, outside; cancel (wakeup) the synchronous task! [thread:[[CALLING_THREAD3]]] -print("==== ------------------------------------------------------------------") +print("\n\n==== ------------------------------------------------------------------") print("callActorFromStartSynchronousTask()") callActorFromStartSynchronousTask() -// FIXME: this continues using our task on the actor +// CHECK: ==== ------------------------------------------------------------------ +// CHECK: callActorFromStartSynchronousTask() +// No interleaving allowed between "before" and "inside": +// CHECK: before startSynchronously [thread:[[CALLING_THREAD4:0x.*]]] +// CHECK-NEXT: inside startSynchronously [thread:[[CALLING_THREAD4]]] + +// It is important that as we suspend on the actor call, the 'after' startSynchronously gets to run +// CHECK-NEXT: inside startSynchronously, call rec.sync() [thread:[[CALLING_THREAD4]]] +// CHECK: after startSynchronously +// CHECK-NOT: ERROR! +// CHECK: inside startSynchronously, call rec.sync() done + +// CHECK-NOT: ERROR! +// CHECK: inside startSynchronously, call rec.async() +// CHECK-NOT: ERROR! +// CHECK: inside startSynchronously, call rec.async() done +// CHECK-NOT: ERROR! +// CHECK: inside startSynchronously, done actor Recipient { func sync(syncTaskThreadID: ThreadID) { self.preconditionIsolated() - if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) { - print("ERROR! Sync start thread id = \(syncTaskThreadID)") - print("ERROR! Current actor thread id = \(getCurrentThreadID())") - print("ERROR! Actor must not run on the synchronous task's thread") - } +// if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) { +// print("ERROR! Sync start thread id = \(syncTaskThreadID) \(#fileID):\(#line)") +// print("ERROR! Current actor thread id = \(getCurrentThreadID()) \(#fileID):\(#line)") +// print("ERROR! Actor must not run on the synchronous task's thread \(#fileID):\(#line)") +// } } func async(syncTaskThreadID: ThreadID) async { self.preconditionIsolated() - if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) { - print("ERROR! Sync start thread id = \(syncTaskThreadID)") - print("ERROR! Current actor thread id = \(getCurrentThreadID())") - print("ERROR! Actor must not run on the synchronous task's thread") - } +// if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) { +// print("ERROR! Sync start thread id = \(syncTaskThreadID) \(#fileID):\(#line)") +// print("ERROR! Current actor thread id = \(getCurrentThreadID()) \(#fileID):\(#line)") +// print("ERROR! Actor must not run on the synchronous task's thread \(#fileID):\(#line)") +// } await Task { self.preconditionIsolated() @@ -217,41 +240,52 @@ func callActorFromStartSynchronousTask() { queue.async { let outerTID = getCurrentThreadID() + print("before startSynchronously [thread:\(outerTID)] @ :\(#line)") let tt = Task._startSynchronously { + dispatchPrecondition(condition: .onQueue(queue)) + let innerTID = getCurrentThreadID() precondition(compareThreadIDs(outerTID, .equal, innerTID), "Outer Thread ID must be equal Thread ID inside runSynchronously synchronous part!") print("inside startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") let rec = Recipient() + for i in 1..<10 { + queue.async { + print("ASYNC WORK ON QUEUE") + } + } + print("inside startSynchronously, call rec.sync() [thread:\(getCurrentThreadID())] @ :\(#line)") await rec.sync(syncTaskThreadID: innerTID) print("inside startSynchronously, call rec.sync() done [thread:\(getCurrentThreadID())] @ :\(#line)") // after suspension we are supposed to hop off to the global pool, // thus the thread IDs cannot be the same anymore - if compareThreadIDs(innerTID, .equal, getCurrentThreadID()) { - print("ERROR! Inner thread id = \(innerTID)") - print("ERROR! Current thread id = \(getCurrentThreadID())") - print("ERROR! Task resumed on same thread as it entered the synchronous task!") - } + print("Inner thread id = \(innerTID)") + print("Current thread id = \(getCurrentThreadID())") +// if compareThreadIDs(innerTID, .equal, getCurrentThreadID()) { +// print("ERROR! Task resumed on same thread as it entered the synchronous task!") +// } print("inside startSynchronously, call rec.async() [thread:\(getCurrentThreadID())] @ :\(#line)") await rec.async(syncTaskThreadID: innerTID) - print("inside startSynchronously, done [thread:\(getCurrentThreadID())] @ :\(#line)") + print("inside startSynchronously, call rec.async() done [thread:\(getCurrentThreadID())] @ :\(#line)") + + print("Inner thread id = \(innerTID)") + print("Current thread id = \(getCurrentThreadID())") +// if compareThreadIDs(innerTID, .equal, getCurrentThreadID()) { +// print("ERROR! Task resumed on same thread as it entered the synchronous task!") +// } - if compareThreadIDs(innerTID, .equal, getCurrentThreadID()) { - print("ERROR! Inner thread id = \(innerTID)") - print("ERROR! Current thread id = \(getCurrentThreadID())") - print("ERROR! Task resumed on same thread as it entered the synchronous task!") - } print("inside startSynchronously, done [thread:\(getCurrentThreadID())] @ :\(#line)") + sem1.signal() } print("after startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") + sem2.signal() } -// sem2.wait() + sem1.wait() + sem2.wait() } - -try? await Task.sleep(for: .seconds(3)) \ No newline at end of file diff --git a/test/abi/macOS/arm64/concurrency.swift b/test/abi/macOS/arm64/concurrency.swift index 9abb3251c130c..efda234d7464d 100644 --- a/test/abi/macOS/arm64/concurrency.swift +++ b/test/abi/macOS/arm64/concurrency.swift @@ -385,4 +385,6 @@ Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolatio Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolationxxyYaq_YKXE_yScPYbXEScA_pSgYitYaq_YKs5ErrorR_r0_lFTu // Task._startSynchronously -Added: _swift_task_startSynchronously \ No newline at end of file +Added: _swift_task_startSynchronously +// static (extension in Swift):Swift.Task< where B == Swift.Never>._startSynchronously(name: Swift.String?, priority: Swift.TaskPriority?, operation: __owned @Sendable () async -> A) -> Swift.Task +Added: _$sScTss5NeverORs_rlE19_startSynchronously4name8priority9operationScTyxABGSSSg_ScPSgxyYaYbcntFZ \ No newline at end of file diff --git a/tools/swift-inspect/Sources/swift-inspect/Operations/DumpConcurrency.swift b/tools/swift-inspect/Sources/swift-inspect/Operations/DumpConcurrency.swift index 99988c396d636..058912da4f7de 100644 --- a/tools/swift-inspect/Sources/swift-inspect/Operations/DumpConcurrency.swift +++ b/tools/swift-inspect/Sources/swift-inspect/Operations/DumpConcurrency.swift @@ -55,6 +55,7 @@ fileprivate class ConcurrencyDumper { var isStatusRecordLocked: Bool var isEscalated: Bool var hasIsRunning: Bool + var isSynchronousStartTask: Bool var isRunning: Bool var isEnqueued: Bool var threadPort: UInt32? @@ -275,6 +276,7 @@ fileprivate class ConcurrencyDumper { if info.isEscalated { flags.append("escalated") } if info.hasIsRunning && info.isRunning { flags.append("running") } if info.isEnqueued { flags.append("enqueued") } + if info.isSynchronousStart { flags.append("isSynchronousStartTask") } let flagsStr = flags.isEmpty ? "0" : flags.joined(separator: "|") return flagsStr From 54676a8667600db0616f8256bdd71b4c13fa54c7 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Wed, 26 Feb 2025 13:58:02 +0900 Subject: [PATCH 04/11] startSynchronously bring back more info output --- .../Runtime/startSynchronously.swift | 41 +++++++++++-------- 1 file changed, 24 insertions(+), 17 deletions(-) diff --git a/test/Concurrency/Runtime/startSynchronously.swift b/test/Concurrency/Runtime/startSynchronously.swift index 16478e9d1e213..8ca924a6843b6 100644 --- a/test/Concurrency/Runtime/startSynchronously.swift +++ b/test/Concurrency/Runtime/startSynchronously.swift @@ -212,20 +212,21 @@ callActorFromStartSynchronousTask() actor Recipient { func sync(syncTaskThreadID: ThreadID) { self.preconditionIsolated() -// if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) { -// print("ERROR! Sync start thread id = \(syncTaskThreadID) \(#fileID):\(#line)") -// print("ERROR! Current actor thread id = \(getCurrentThreadID()) \(#fileID):\(#line)") -// print("ERROR! Actor must not run on the synchronous task's thread \(#fileID):\(#line)") -// } + print("\(Recipient.self)/\(#function) Current actor thread id = \(getCurrentThreadID()) @ :\(#line)") + if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) { + print("NOTICE: Actor must not run on the synchronous task's thread :\(#line)") + } } func async(syncTaskThreadID: ThreadID) async { self.preconditionIsolated() -// if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) { -// print("ERROR! Sync start thread id = \(syncTaskThreadID) \(#fileID):\(#line)") -// print("ERROR! Current actor thread id = \(getCurrentThreadID()) \(#fileID):\(#line)") -// print("ERROR! Actor must not run on the synchronous task's thread \(#fileID):\(#line)") -// } + // Dispatch may end up reusing the thread used to service the queue so we + // cannot truly assert exact thread identity in such tests. + // Usually this will be on a different thread by now though. + print("\(Recipient.self)/\(#function) Current actor thread id = \(getCurrentThreadID()) @ :\(#line)") + if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) { + print("NOTICE: Actor must not run on the synchronous task's thread :\(#line)") + } await Task { self.preconditionIsolated() @@ -252,7 +253,7 @@ func callActorFromStartSynchronousTask() { for i in 1..<10 { queue.async { - print("ASYNC WORK ON QUEUE") + print("- async work on queue") } } @@ -264,9 +265,12 @@ func callActorFromStartSynchronousTask() { // thus the thread IDs cannot be the same anymore print("Inner thread id = \(innerTID)") print("Current thread id = \(getCurrentThreadID())") -// if compareThreadIDs(innerTID, .equal, getCurrentThreadID()) { -// print("ERROR! Task resumed on same thread as it entered the synchronous task!") -// } + // Dispatch may end up reusing the thread used to service the queue so we + // cannot truly assert exact thread identity in such tests. + // Usually this will be on a different thread by now though. + if compareThreadIDs(innerTID, .equal, getCurrentThreadID()) { + print("NOTICE: Task resumed on same thread as it entered the synchronous task!") + } print("inside startSynchronously, call rec.async() [thread:\(getCurrentThreadID())] @ :\(#line)") await rec.async(syncTaskThreadID: innerTID) @@ -274,9 +278,12 @@ func callActorFromStartSynchronousTask() { print("Inner thread id = \(innerTID)") print("Current thread id = \(getCurrentThreadID())") -// if compareThreadIDs(innerTID, .equal, getCurrentThreadID()) { -// print("ERROR! Task resumed on same thread as it entered the synchronous task!") -// } + // Dispatch may end up reusing the thread used to service the queue so we + // cannot truly assert exact thread identity in such tests. + // Usually this will be on a different thread by now though. + if compareThreadIDs(innerTID, .equal, getCurrentThreadID()) { + print("NOTICE: Task resumed on same thread as it entered the synchronous task!") + } print("inside startSynchronously, done [thread:\(getCurrentThreadID())] @ :\(#line)") sem1.signal() From 1637ed28ff952bb7a5668dd1ef4616c83295fb3a Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Wed, 26 Feb 2025 14:34:12 +0900 Subject: [PATCH 05/11] [Concurrency] startSynchronously with more custom executor tests --- stdlib/public/Concurrency/Actor.cpp | 6 +- .../Runtime/startSynchronously.swift | 129 +++++++++++++++++- 2 files changed, 123 insertions(+), 12 deletions(-) diff --git a/stdlib/public/Concurrency/Actor.cpp b/stdlib/public/Concurrency/Actor.cpp index 56524841844e2..0233c71550af4 100644 --- a/stdlib/public/Concurrency/Actor.cpp +++ b/stdlib/public/Concurrency/Actor.cpp @@ -2365,14 +2365,10 @@ static void swift_task_startSynchronouslyImpl(AsyncTask* task) { swift_job_run(task, currentExecutor); _swift_task_setCurrent(originalTask); } else { - SerialExecutorRef executor = SerialExecutorRef::forSynchronousStart(); - - ExecutorTrackingInfo trackingInfo; -// trackingInfo.markSynchronousStart(); - auto originalTask = ActiveTask::swap(task); assert(!originalTask); + SerialExecutorRef executor = SerialExecutorRef::forSynchronousStart(); swift_job_run(task, executor); _swift_task_setCurrent(originalTask); } diff --git a/test/Concurrency/Runtime/startSynchronously.swift b/test/Concurrency/Runtime/startSynchronously.swift index 8ca924a6843b6..78aa4857e2614 100644 --- a/test/Concurrency/Runtime/startSynchronously.swift +++ b/test/Concurrency/Runtime/startSynchronously.swift @@ -14,7 +14,9 @@ // UNSUPPORTED: freestanding @_spi(MainActorUtilities) import _Concurrency +#if canImport(Darwin) import Dispatch +#endif enum CompareHow { case equal @@ -63,6 +65,7 @@ actor MyGlobalActor { static let shared: MyGlobalActor = MyGlobalActor() } +// Test on all platforms func syncOnMyGlobalActor() -> [Task] { MyGlobalActor.shared.preconditionIsolated("Should be executing on the global actor here") print("Confirmed to be on @MyGlobalActor") @@ -92,6 +95,7 @@ func syncOnMyGlobalActor() -> [Task] { return [t1, tt] } +#if canImport(Darwin) // because Dispatch func syncOnNonTaskThread(synchronousTask behavior: SynchronousTaskBehavior) { let sem1 = DispatchSemaphore(value: 0) let sem2 = DispatchSemaphore(value: 0) @@ -134,6 +138,7 @@ func syncOnNonTaskThread(synchronousTask behavior: SynchronousTaskBehavior) { sem2.wait() } +#endif // Darwin enum SynchronousTaskBehavior { case suspend @@ -160,6 +165,7 @@ await Task { @MyGlobalActor in // resume on some other thread // CHECK: after sleep, inside startSynchronously +#if canImport(Darwin) // because Dispatch print("\n\n==== ------------------------------------------------------------------") var behavior: SynchronousTaskBehavior = .suspend print("syncOnNonTaskThread(synchronousTask: \(behavior))") @@ -173,7 +179,9 @@ syncOnNonTaskThread(synchronousTask: behavior) // CHECK-NEXT: inside startSynchronously, before sleep [thread:[[CALLING_THREAD2]]] // CHECK-NEXT: after startSynchronously, outside; cancel (wakeup) the synchronous task! [thread:[[CALLING_THREAD2]]] // CHECK-NEXT: inside startSynchronously, after sleep +#endif +#if canImport(Darwin) // because Dispatch print("\n\n==== ------------------------------------------------------------------") behavior = .dontSuspend print("syncOnNonTaskThread(synchronousTask: \(behavior))") @@ -185,12 +193,13 @@ syncOnNonTaskThread(synchronousTask: behavior) // CHECK-NEXT: inside startSynchronously [thread:[[CALLING_THREAD3]]] // CHECK: inside startSynchronously, done [thread:[[CALLING_THREAD3]]] // CHECK: after startSynchronously, outside; cancel (wakeup) the synchronous task! [thread:[[CALLING_THREAD3]]] +#endif +#if canImport(Darwin) // because Dispatch print("\n\n==== ------------------------------------------------------------------") print("callActorFromStartSynchronousTask()") -callActorFromStartSynchronousTask() +callActorFromStartSynchronousTask(recipient: .recipient(Recipient())) -// CHECK: ==== ------------------------------------------------------------------ // CHECK: callActorFromStartSynchronousTask() // No interleaving allowed between "before" and "inside": // CHECK: before startSynchronously [thread:[[CALLING_THREAD4:0x.*]]] @@ -206,12 +215,27 @@ callActorFromStartSynchronousTask() // CHECK: inside startSynchronously, call rec.async() // CHECK-NOT: ERROR! // CHECK: inside startSynchronously, call rec.async() done + // CHECK-NOT: ERROR! // CHECK: inside startSynchronously, done +/// Don't want to involve protocol calls to not confuse the test with additional details, +/// so we use concrete types here. +enum TargetActorToCall { + case recipient(Recipient) + case recipientOnQueue(RecipientOnQueue) +} + +protocol RecipientProtocol where Self: Actor { + func sync(syncTaskThreadID: ThreadID) async + func async(syncTaskThreadID: ThreadID) async +} + +// default actor, must not declare an 'unownedExecutor' actor Recipient { func sync(syncTaskThreadID: ThreadID) { self.preconditionIsolated() + print("\(Recipient.self)/\(#function) Current actor thread id = \(getCurrentThreadID()) @ :\(#line)") if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) { print("NOTICE: Actor must not run on the synchronous task's thread :\(#line)") @@ -220,6 +244,7 @@ actor Recipient { func async(syncTaskThreadID: ThreadID) async { self.preconditionIsolated() + // Dispatch may end up reusing the thread used to service the queue so we // cannot truly assert exact thread identity in such tests. // Usually this will be on a different thread by now though. @@ -234,7 +259,7 @@ actor Recipient { } } -func callActorFromStartSynchronousTask() { +func callActorFromStartSynchronousTask(recipient rec: TargetActorToCall) { let sem1 = DispatchSemaphore(value: 0) let sem2 = DispatchSemaphore(value: 0) let queue = DispatchQueue(label: "CustomQueue") @@ -249,8 +274,6 @@ func callActorFromStartSynchronousTask() { precondition(compareThreadIDs(outerTID, .equal, innerTID), "Outer Thread ID must be equal Thread ID inside runSynchronously synchronous part!") print("inside startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") - let rec = Recipient() - for i in 1..<10 { queue.async { print("- async work on queue") @@ -258,7 +281,10 @@ func callActorFromStartSynchronousTask() { } print("inside startSynchronously, call rec.sync() [thread:\(getCurrentThreadID())] @ :\(#line)") - await rec.sync(syncTaskThreadID: innerTID) + switch rec { + case .recipient(let recipient): await recipient.sync(syncTaskThreadID: innerTID) + case .recipientOnQueue(let recipient): await recipient.sync(syncTaskThreadID: innerTID) + } print("inside startSynchronously, call rec.sync() done [thread:\(getCurrentThreadID())] @ :\(#line)") // after suspension we are supposed to hop off to the global pool, @@ -273,7 +299,10 @@ func callActorFromStartSynchronousTask() { } print("inside startSynchronously, call rec.async() [thread:\(getCurrentThreadID())] @ :\(#line)") - await rec.async(syncTaskThreadID: innerTID) + switch rec { + case .recipient(let recipient): await recipient.async(syncTaskThreadID: innerTID) + case .recipientOnQueue(let recipient): await recipient.async(syncTaskThreadID: innerTID) + } print("inside startSynchronously, call rec.async() done [thread:\(getCurrentThreadID())] @ :\(#line)") print("Inner thread id = \(innerTID)") @@ -296,3 +325,89 @@ func callActorFromStartSynchronousTask() { sem1.wait() sem2.wait() } +#endif + +#if canImport(Darwin) // because Dispatch +print("\n\n==== ------------------------------------------------------------------") +print("callActorFromStartSynchronousTask() - actor in custom executor with its own queue") +let actorQueue = DispatchSerialQueue(label: "recipient-actor-queue") +callActorFromStartSynchronousTask(recipient: .recipientOnQueue(RecipientOnQueue(queue: actorQueue))) + +// CHECK-LABEL: callActorFromStartSynchronousTask() - actor in custom executor with its own queue +// No interleaving allowed between "before" and "inside": +// CHECK: before startSynchronously [thread:[[CALLING_THREAD4:0x.*]]] +// CHECK-NEXT: inside startSynchronously [thread:[[CALLING_THREAD4]]] + +// As we call into an actor, we must enqueue to its custom executor; +// Make sure the enqueue happens as expected and only then do we give up the calling thread +// allowing the 'after startSynchronously' to run. +// +// CHECK-NEXT: inside startSynchronously, call rec.sync() [thread:[[CALLING_THREAD4]]] +// CHECK: NaiveQueueExecutor(recipient-actor-queue).enqueue +// CHECK: after startSynchronously +// CHECK-NOT: ERROR! +// CHECK: inside startSynchronously, call rec.sync() done + +// CHECK-NOT: ERROR! +// CHECK: inside startSynchronously, call rec.async() +// CHECK: NaiveQueueExecutor(recipient-actor-queue).enqueue +// CHECK-NOT: ERROR! +// CHECK: inside startSynchronously, call rec.async() done + +// CHECK-NOT: ERROR! +// CHECK: inside startSynchronously, done + +final class NaiveQueueExecutor: SerialExecutor { + let queue: DispatchQueue + + init(queue: DispatchQueue) { + self.queue = queue + } + + public func enqueue(_ job: consuming ExecutorJob) { + let unowned = UnownedJob(job) + print("NaiveQueueExecutor(\(self.queue.label)).enqueue... [thread:\(getCurrentThreadID())]") + queue.async { + print("NaiveQueueExecutor(\(self.queue.label)).enqueue: run [thread:\(getCurrentThreadID())]") + unowned.runSynchronously(on: self.asUnownedSerialExecutor()) + } + } +} + +actor RecipientOnQueue { + let executor: NaiveQueueExecutor + nonisolated let unownedExecutor: UnownedSerialExecutor + + init(queue: DispatchSerialQueue) { + self.executor = NaiveQueueExecutor(queue: queue) + self.unownedExecutor = executor.asUnownedSerialExecutor() + } + + func sync(syncTaskThreadID: ThreadID) { + self.preconditionIsolated() + dispatchPrecondition(condition: .onQueue(self.executor.queue)) + + print("\(Recipient.self)/\(#function) Current actor thread id = \(getCurrentThreadID()) @ :\(#line)") + if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) { + print("NOTICE: Actor must not run on the synchronous task's thread :\(#line)") + } + } + + func async(syncTaskThreadID: ThreadID) async { + self.preconditionIsolated() + dispatchPrecondition(condition: .onQueue(self.executor.queue)) + + // Dispatch may end up reusing the thread used to service the queue so we + // cannot truly assert exact thread identity in such tests. + // Usually this will be on a different thread by now though. + print("\(Recipient.self)/\(#function) Current actor thread id = \(getCurrentThreadID()) @ :\(#line)") + if compareThreadIDs(syncTaskThreadID, .equal, getCurrentThreadID()) { + print("NOTICE: Actor must not run on the synchronous task's thread :\(#line)") + } + + await Task { + self.preconditionIsolated() + }.value + } +} +#endif \ No newline at end of file From 4fe774f78d39d0d4b421b65477ec2301d6770f2c Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Wed, 26 Feb 2025 14:35:48 +0900 Subject: [PATCH 06/11] add missing ABI additions to test for x86 --- test/abi/macOS/arm64/concurrency.swift | 2 +- test/abi/macOS/x86_64/concurrency.swift | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/test/abi/macOS/arm64/concurrency.swift b/test/abi/macOS/arm64/concurrency.swift index efda234d7464d..b5354ba8f3da0 100644 --- a/test/abi/macOS/arm64/concurrency.swift +++ b/test/abi/macOS/arm64/concurrency.swift @@ -387,4 +387,4 @@ Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolatio // Task._startSynchronously Added: _swift_task_startSynchronously // static (extension in Swift):Swift.Task< where B == Swift.Never>._startSynchronously(name: Swift.String?, priority: Swift.TaskPriority?, operation: __owned @Sendable () async -> A) -> Swift.Task -Added: _$sScTss5NeverORs_rlE19_startSynchronously4name8priority9operationScTyxABGSSSg_ScPSgxyYaYbcntFZ \ No newline at end of file +Added: _$sScTss5NeverORs_rlE19_startSynchronously4name8priority9operationScTyxABGSSSg_ScPSgxyYaYbcntFZ diff --git a/test/abi/macOS/x86_64/concurrency.swift b/test/abi/macOS/x86_64/concurrency.swift index 02b8852656961..b9b1f3182174c 100644 --- a/test/abi/macOS/x86_64/concurrency.swift +++ b/test/abi/macOS/x86_64/concurrency.swift @@ -383,3 +383,8 @@ Added: _$sSct16escalatePriority_2toySct_ScPtFZ Added: _$sScT16escalatePriority_2toyScTyxq_G_ScPtFZ Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolationxxyYaq_YKXE_yScPYbXEScA_pSgYitYaq_YKs5ErrorR_r0_lF Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolationxxyYaq_YKXE_yScPYbXEScA_pSgYitYaq_YKs5ErrorR_r0_lFTu + +// Task._startSynchronously +Added: _swift_task_startSynchronously +// static (extension in Swift):Swift.Task< where B == Swift.Never>._startSynchronously(name: Swift.String?, priority: Swift.TaskPriority?, operation: __owned @Sendable () async -> A) -> Swift.Task +Added: _$sScTss5NeverORs_rlE19_startSynchronously4name8priority9operationScTyxABGSSSg_ScPSgxyYaYbcntFZ From fa8b5f7aa068305bbb92ed203b2b1737249591ba Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Wed, 26 Feb 2025 17:01:07 +0900 Subject: [PATCH 07/11] [Concurrency] gyb generate _startSynchronously --- Runtimes/Core/Concurrency/CMakeLists.txt | 6 +- stdlib/public/Concurrency/CMakeLists.txt | 3 + .../Task+startSynchronously.swift.gyb | 118 ++++++++++++++++++ stdlib/public/Concurrency/Task.swift | 81 ------------ .../Runtime/startSynchronously.swift | 14 +-- test/abi/macOS/arm64/concurrency.swift | 2 + test/abi/macOS/x86_64/concurrency.swift | 2 + 7 files changed, 131 insertions(+), 95 deletions(-) create mode 100644 stdlib/public/Concurrency/Task+startSynchronously.swift.gyb diff --git a/Runtimes/Core/Concurrency/CMakeLists.txt b/Runtimes/Core/Concurrency/CMakeLists.txt index ab7af17f61f4d..bf16f8019b26e 100644 --- a/Runtimes/Core/Concurrency/CMakeLists.txt +++ b/Runtimes/Core/Concurrency/CMakeLists.txt @@ -1,5 +1,7 @@ add_subdirectory(InternalShims) +gyb_expand(Task+startSynchronously.swift.gyb Task+startSynchronously.swift) + add_library(swift_Concurrency Actor.cpp AsyncLet.cpp @@ -80,7 +82,9 @@ add_library(swift_Concurrency TaskGroup+TaskExecutor.swift TaskLocal.swift TaskSleep.swift - TaskSleepDuration.swift) + TaskSleepDuration.swift + "${CMAKE_CURRENT_BINARY_DIR}/Task+startSynchronously.swift") + include(${SwiftCore_CONCURRENCY_GLOBAL_EXECUTOR}.cmake) target_compile_definitions(swift_Concurrency PRIVATE $<$:-DSWIFT_TARGET_LIBRARY_NAME=swift_Concurrency> diff --git a/stdlib/public/Concurrency/CMakeLists.txt b/stdlib/public/Concurrency/CMakeLists.txt index a38f4230192ad..1fb17ceeb8f07 100644 --- a/stdlib/public/Concurrency/CMakeLists.txt +++ b/stdlib/public/Concurrency/CMakeLists.txt @@ -181,6 +181,9 @@ add_swift_target_library(swift_Concurrency ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} I ${SWIFT_RUNTIME_CONCURRENCY_EXECUTOR_SOURCES} ${SWIFT_RUNTIME_CONCURRENCY_SWIFT_SOURCES} + GYB_SOURCES + Task+startSynchronously.swift.gyb + SWIFT_MODULE_DEPENDS_ANDROID Android SWIFT_MODULE_DEPENDS_LINUX Glibc SWIFT_MODULE_DEPENDS_LINUX_STATIC Musl diff --git a/stdlib/public/Concurrency/Task+startSynchronously.swift.gyb b/stdlib/public/Concurrency/Task+startSynchronously.swift.gyb new file mode 100644 index 0000000000000..d7000f876fd23 --- /dev/null +++ b/stdlib/public/Concurrency/Task+startSynchronously.swift.gyb @@ -0,0 +1,118 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2020-2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +import Swift +@_implementationOnly import SwiftConcurrencyInternalShims + +% METHOD_VARIANTS = [ +% 'THROWING', +% 'NON_THROWING', +% ] +% OPERATION_PARAM = '@_inheritActorContext @_implicitSelfCapture _ operation: __owned @Sendable @escaping @MainActor () async throws -> Success' +% for METHOD_VARIANT in METHOD_VARIANTS: + +% IS_THROWING = METHOD_VARIANT == 'THROWING' +% if IS_THROWING: +% FAILURE_TYPE = 'Error' +% else: +% FAILURE_TYPE = 'Never' +% end + +// ==== Task.startSynchronously ------------------------------------------------ + +@available(SwiftStdlib 6.2, *) +extension Task where Failure == ${FAILURE_TYPE} { + + /// Create and immediately start running a new task in the context of the calling thread/task. + /// + /// This function _starts_ the created task on the calling context. + /// The task will continue executing on the caller's context until it suspends, + /// and after suspension will resume on the adequate executor. For a nonisolated + /// operation this means running on the global concurrent pool, and on an isolated + /// operation it means the appropriate executor of that isolation context. + /// + /// As indicated by the lack of `async` on this method, this method does _not_ + /// suspend, and instead takes over the calling task's (thread's) execution in + /// a synchronous manner. + /// + /// Other than the execution semantics discussed above, the created task + /// is semantically equivalent to its basic version which can be + /// created using ``Task/init``. + /// + /// - Parameters: + /// - priority: The priority of the task. + /// Pass `nil` to use the ``Task/basePriority`` of the current task (if there is one). + /// - operation: the operation to be run immediately upon entering the task. + /// - Returns: + @available(SwiftStdlib 6.2, *) + @discardableResult + public static func _startSynchronously( + priority: TaskPriority? = nil, + @_inheritActorContext @_implicitSelfCapture operation: __owned @Sendable @escaping () async -> Success + ) -> Task { + let flags = taskCreateFlags( + priority: priority, + isChildTask: false, + copyTaskLocals: true, + inheritContext: true, + enqueueJob: false, // don't enqueue, we'll run it manually + addPendingGroupTaskUnconditionally: false, + isDiscardingTask: false, + isSynchronousStart: true + ) + + let (task, _) = Builtin.createAsyncTask(flags, operation) + _startTaskSynchronously(task) + return Task(task) + } +} + +// ==== Legacy SPI ------------------------------------------------------------- +#if !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY && !SWIFT_CONCURRENCY_EMBEDDED +@available(SwiftStdlib 5.9, *) +extension Task where Failure == ${FAILURE_TYPE} { + + @_spi(MainActorUtilities) + @MainActor + @available(SwiftStdlib 5.9, *) + @discardableResult + public static func startOnMainActor( + priority: TaskPriority? = nil, + ${OPERATION_PARAM if IS_THROWING else OPERATION_PARAM.replace("throws", "")} + ) -> Task { + let flags = taskCreateFlags( + priority: priority, + isChildTask: false, + copyTaskLocals: true, + inheritContext: true, + enqueueJob: false, + addPendingGroupTaskUnconditionally: false, + isDiscardingTask: false, + isSynchronousStart: false + ) + + let (task, _) = Builtin.createAsyncTask(flags, operation) + _startTaskOnMainActor(task) + + return Task(task) + } +} +#endif +% end + +// Internal Runtime Functions -------------------------------------------------- + +@_silgen_name("swift_task_startOnMainActor") +fileprivate func _startTaskOnMainActor(_ task: Builtin.NativeObject) + +@_silgen_name("swift_task_startSynchronously") +fileprivate func _startTaskSynchronously(_ task: Builtin.NativeObject) \ No newline at end of file diff --git a/stdlib/public/Concurrency/Task.swift b/stdlib/public/Concurrency/Task.swift index 2db437e2dd14a..0f15991919d13 100644 --- a/stdlib/public/Concurrency/Task.swift +++ b/stdlib/public/Concurrency/Task.swift @@ -273,81 +273,6 @@ extension Task: Equatable { } } -// ==== Starting tasks synchronously ------------------------------------------- - -#if !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY && !SWIFT_CONCURRENCY_EMBEDDED -@available(SwiftStdlib 5.9, *) -extension Task where Failure == Error { - @_spi(MainActorUtilities) - @MainActor - @available(SwiftStdlib 5.9, *) - @discardableResult - public static func startOnMainActor( - priority: TaskPriority? = nil, - @_inheritActorContext @_implicitSelfCapture _ operation: __owned @Sendable @escaping @MainActor() async throws -> Success - ) -> Task { - let flags = taskCreateFlags(priority: priority, isChildTask: false, - copyTaskLocals: true, inheritContext: true, - enqueueJob: false, - addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false, isSynchronousStart: false) - let (task, _) = Builtin.createAsyncTask(flags, operation) - _startTaskOnMainActor(task) - return Task(task) - } -} -#endif - -#if !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY && !SWIFT_CONCURRENCY_EMBEDDED -@available(SwiftStdlib 5.9, *) -extension Task where Failure == Never { - @_spi(MainActorUtilities) - @MainActor - @available(SwiftStdlib 5.9, *) - @discardableResult - public static func startOnMainActor( - priority: TaskPriority? = nil, - @_inheritActorContext @_implicitSelfCapture _ operation: __owned @Sendable @escaping @MainActor() async -> Success - ) -> Task { - let flags = taskCreateFlags(priority: priority, isChildTask: false, - copyTaskLocals: true, inheritContext: true, - enqueueJob: false, - addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false, isSynchronousStart: false) - let (task, _) = Builtin.createAsyncTask(flags, operation) - _startTaskOnMainActor(task) - return Task(task) - } -} -#endif - -@available(SwiftStdlib 5.9, *) -extension Task where Failure == Never { - @available(SwiftStdlib 5.9, *) - @discardableResult - public static func _startSynchronously( - name: String? = nil, - priority: TaskPriority? = nil, - @_inheritActorContext @_implicitSelfCapture operation: __owned @Sendable @escaping () async -> Success - ) -> Task { - let flags = taskCreateFlags( - priority: priority, - isChildTask: false, - copyTaskLocals: true, - inheritContext: true, - enqueueJob: false, // don't enqueue, we'll run it manually - addPendingGroupTaskUnconditionally: false, - isDiscardingTask: false, - isSynchronousStart: true - ) - - let (task, _) = Builtin.createAsyncTask(flags, operation) - _startTaskSynchronously(task) - return Task(task) - } -} - - // ==== Task Priority ---------------------------------------------------------- /// The priority of a task. @@ -1451,12 +1376,6 @@ extension UnsafeCurrentTask: Equatable { @_silgen_name("swift_task_getCurrent") public func _getCurrentAsyncTask() -> Builtin.NativeObject? -@_silgen_name("swift_task_startOnMainActor") -fileprivate func _startTaskOnMainActor(_ task: Builtin.NativeObject) - -@_silgen_name("swift_task_startSynchronously") -fileprivate func _startTaskSynchronously(_ task: Builtin.NativeObject) - @available(SwiftStdlib 5.1, *) @_silgen_name("swift_task_getJobFlags") func getJobFlags(_ task: Builtin.NativeObject) -> JobFlags diff --git a/test/Concurrency/Runtime/startSynchronously.swift b/test/Concurrency/Runtime/startSynchronously.swift index 78aa4857e2614..4015544f8bf27 100644 --- a/test/Concurrency/Runtime/startSynchronously.swift +++ b/test/Concurrency/Runtime/startSynchronously.swift @@ -13,10 +13,8 @@ // UNSUPPORTED: use_os_stdlib // UNSUPPORTED: freestanding -@_spi(MainActorUtilities) import _Concurrency -#if canImport(Darwin) +import _Concurrency import Dispatch -#endif enum CompareHow { case equal @@ -95,7 +93,6 @@ func syncOnMyGlobalActor() -> [Task] { return [t1, tt] } -#if canImport(Darwin) // because Dispatch func syncOnNonTaskThread(synchronousTask behavior: SynchronousTaskBehavior) { let sem1 = DispatchSemaphore(value: 0) let sem2 = DispatchSemaphore(value: 0) @@ -138,7 +135,6 @@ func syncOnNonTaskThread(synchronousTask behavior: SynchronousTaskBehavior) { sem2.wait() } -#endif // Darwin enum SynchronousTaskBehavior { case suspend @@ -165,7 +161,6 @@ await Task { @MyGlobalActor in // resume on some other thread // CHECK: after sleep, inside startSynchronously -#if canImport(Darwin) // because Dispatch print("\n\n==== ------------------------------------------------------------------") var behavior: SynchronousTaskBehavior = .suspend print("syncOnNonTaskThread(synchronousTask: \(behavior))") @@ -179,9 +174,7 @@ syncOnNonTaskThread(synchronousTask: behavior) // CHECK-NEXT: inside startSynchronously, before sleep [thread:[[CALLING_THREAD2]]] // CHECK-NEXT: after startSynchronously, outside; cancel (wakeup) the synchronous task! [thread:[[CALLING_THREAD2]]] // CHECK-NEXT: inside startSynchronously, after sleep -#endif -#if canImport(Darwin) // because Dispatch print("\n\n==== ------------------------------------------------------------------") behavior = .dontSuspend print("syncOnNonTaskThread(synchronousTask: \(behavior))") @@ -193,9 +186,7 @@ syncOnNonTaskThread(synchronousTask: behavior) // CHECK-NEXT: inside startSynchronously [thread:[[CALLING_THREAD3]]] // CHECK: inside startSynchronously, done [thread:[[CALLING_THREAD3]]] // CHECK: after startSynchronously, outside; cancel (wakeup) the synchronous task! [thread:[[CALLING_THREAD3]]] -#endif -#if canImport(Darwin) // because Dispatch print("\n\n==== ------------------------------------------------------------------") print("callActorFromStartSynchronousTask()") callActorFromStartSynchronousTask(recipient: .recipient(Recipient())) @@ -325,9 +316,7 @@ func callActorFromStartSynchronousTask(recipient rec: TargetActorToCall) { sem1.wait() sem2.wait() } -#endif -#if canImport(Darwin) // because Dispatch print("\n\n==== ------------------------------------------------------------------") print("callActorFromStartSynchronousTask() - actor in custom executor with its own queue") let actorQueue = DispatchSerialQueue(label: "recipient-actor-queue") @@ -410,4 +399,3 @@ actor RecipientOnQueue { }.value } } -#endif \ No newline at end of file diff --git a/test/abi/macOS/arm64/concurrency.swift b/test/abi/macOS/arm64/concurrency.swift index b5354ba8f3da0..9669905ad7a20 100644 --- a/test/abi/macOS/arm64/concurrency.swift +++ b/test/abi/macOS/arm64/concurrency.swift @@ -388,3 +388,5 @@ Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolatio Added: _swift_task_startSynchronously // static (extension in Swift):Swift.Task< where B == Swift.Never>._startSynchronously(name: Swift.String?, priority: Swift.TaskPriority?, operation: __owned @Sendable () async -> A) -> Swift.Task Added: _$sScTss5NeverORs_rlE19_startSynchronously4name8priority9operationScTyxABGSSSg_ScPSgxyYaYbcntFZ +// static (extension in Swift):Swift.Task< where B == Swift.Error>._startSynchronously(name: Swift.String?, priority: Swift.TaskPriority?, operation: __owned @Sendable () async -> A) -> Swift.Task +Added: _$sScTss5Error_pRs_rlE19_startSynchronously4name8priority9operationScTyxsAA_pGSSSg_ScPSgxyYaYbcntFZ diff --git a/test/abi/macOS/x86_64/concurrency.swift b/test/abi/macOS/x86_64/concurrency.swift index b9b1f3182174c..29a72a42dd53a 100644 --- a/test/abi/macOS/x86_64/concurrency.swift +++ b/test/abi/macOS/x86_64/concurrency.swift @@ -388,3 +388,5 @@ Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolatio Added: _swift_task_startSynchronously // static (extension in Swift):Swift.Task< where B == Swift.Never>._startSynchronously(name: Swift.String?, priority: Swift.TaskPriority?, operation: __owned @Sendable () async -> A) -> Swift.Task Added: _$sScTss5NeverORs_rlE19_startSynchronously4name8priority9operationScTyxABGSSSg_ScPSgxyYaYbcntFZ +// static (extension in Swift):Swift.Task< where B == Swift.Error>._startSynchronously(name: Swift.String?, priority: Swift.TaskPriority?, operation: __owned @Sendable () async -> A) -> Swift.Task +Added: _$sScTss5Error_pRs_rlE19_startSynchronously4name8priority9operationScTyxsAA_pGSSSg_ScPSgxyYaYbcntFZ From 56f28eecb7526e9d96eb93ea60e9e232b843e7bf Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Wed, 26 Feb 2025 17:11:30 +0900 Subject: [PATCH 08/11] [Concurrency] %import dispatch for Linux startSynchronously test --- stdlib/public/Concurrency/Actor.cpp | 2 +- stdlib/public/Concurrency/Task+startSynchronously.swift.gyb | 2 +- test/Concurrency/Runtime/startSynchronously.swift | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/stdlib/public/Concurrency/Actor.cpp b/stdlib/public/Concurrency/Actor.cpp index 0233c71550af4..d99e01ba78ad8 100644 --- a/stdlib/public/Concurrency/Actor.cpp +++ b/stdlib/public/Concurrency/Actor.cpp @@ -2311,7 +2311,7 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex currentTaskExecutor.isDefined() ? "" : " (undefined)", newTaskExecutor.getIdentity(), newTaskExecutor.isDefined() ? "" : " (undefined)", - trackingInfo->isSynchronousStart() ? "[synchronous start]" : "[NOT SYNC START]"); + trackingInfo->isSynchronousStart() ? "[synchronous start]" : ""); // If the current executor is compatible with running the new executor, // we can just immediately continue running with the resume function diff --git a/stdlib/public/Concurrency/Task+startSynchronously.swift.gyb b/stdlib/public/Concurrency/Task+startSynchronously.swift.gyb index d7000f876fd23..2ad30dd4cfcbc 100644 --- a/stdlib/public/Concurrency/Task+startSynchronously.swift.gyb +++ b/stdlib/public/Concurrency/Task+startSynchronously.swift.gyb @@ -52,7 +52,7 @@ extension Task where Failure == ${FAILURE_TYPE} { /// - priority: The priority of the task. /// Pass `nil` to use the ``Task/basePriority`` of the current task (if there is one). /// - operation: the operation to be run immediately upon entering the task. - /// - Returns: + /// - Returns: A reference to the unstructured task which may be awaited on. @available(SwiftStdlib 6.2, *) @discardableResult public static func _startSynchronously( diff --git a/test/Concurrency/Runtime/startSynchronously.swift b/test/Concurrency/Runtime/startSynchronously.swift index 4015544f8bf27..b5dc3fb8e9638 100644 --- a/test/Concurrency/Runtime/startSynchronously.swift +++ b/test/Concurrency/Runtime/startSynchronously.swift @@ -1,5 +1,5 @@ // RUN: %empty-directory(%t) -// RUN: %target-build-swift -Xfrontend -disable-availability-checking %s -swift-version 6 -o %t/a.out +// RUN: %target-build-swift -Xfrontend -disable-availability-checking %s %import-libdispatch -swift-version 6 -o %t/a.out // RUN: %target-codesign %t/a.out // RUN: %target-run %t/a.out | %FileCheck %s --dump-input=always From 022ed7a204c3ab7f937f6ffffcea6b58f9bae18c Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Wed, 26 Feb 2025 18:11:14 +0900 Subject: [PATCH 09/11] [Concurrency] Add TaskGroup.startTaskSynchronously funcs --- .../Task+startSynchronously.swift.gyb | 125 +++++++++++++++++- .../Runtime/startSynchronously.swift | 14 +- .../startSynchronouslyIsolation.swift | 40 ++++++ test/abi/macOS/arm64/concurrency.swift | 26 +++- test/abi/macOS/x86_64/concurrency.swift | 26 +++- 5 files changed, 208 insertions(+), 23 deletions(-) create mode 100644 test/Concurrency/startSynchronouslyIsolation.swift diff --git a/stdlib/public/Concurrency/Task+startSynchronously.swift.gyb b/stdlib/public/Concurrency/Task+startSynchronously.swift.gyb index 2ad30dd4cfcbc..102aea7da5304 100644 --- a/stdlib/public/Concurrency/Task+startSynchronously.swift.gyb +++ b/stdlib/public/Concurrency/Task+startSynchronously.swift.gyb @@ -13,11 +13,13 @@ import Swift @_implementationOnly import SwiftConcurrencyInternalShims +// ==== Task.startSynchronously ------------------------------------------------ + % METHOD_VARIANTS = [ % 'THROWING', % 'NON_THROWING', % ] -% OPERATION_PARAM = '@_inheritActorContext @_implicitSelfCapture _ operation: __owned @Sendable @escaping @MainActor () async throws -> Success' +% OPERATION_PARAM = '@_inheritActorContext @_implicitSelfCapture _ operation: __owned @Sendable @escaping () async throws -> Success' % for METHOD_VARIANT in METHOD_VARIANTS: % IS_THROWING = METHOD_VARIANT == 'THROWING' @@ -27,8 +29,6 @@ import Swift % FAILURE_TYPE = 'Never' % end -// ==== Task.startSynchronously ------------------------------------------------ - @available(SwiftStdlib 6.2, *) extension Task where Failure == ${FAILURE_TYPE} { @@ -55,9 +55,9 @@ extension Task where Failure == ${FAILURE_TYPE} { /// - Returns: A reference to the unstructured task which may be awaited on. @available(SwiftStdlib 6.2, *) @discardableResult - public static func _startSynchronously( + public static func startSynchronously( priority: TaskPriority? = nil, - @_inheritActorContext @_implicitSelfCapture operation: __owned @Sendable @escaping () async -> Success + @_inheritActorContext @_implicitSelfCapture _ operation: __owned sending @escaping () async throws -> Success ) -> Task { let flags = taskCreateFlags( priority: priority, @@ -75,8 +75,121 @@ extension Task where Failure == ${FAILURE_TYPE} { return Task(task) } } +%end + +% OPERATION_PARAM = 'operation: sending @escaping () async throws -> ChildTaskResult' +% for (GROUP_TYPE, METHOD_NAMES) in [ +% ( +% 'TaskGroup', +% [ +% 'startTaskSynchronously', +% 'startTaskSynchronouslyUnlessCancelled' +% ] +% ), +% ( +% 'ThrowingTaskGroup', +% [ +% 'startTaskSynchronously', +% 'startTaskSynchronouslyUnlessCancelled' +% ] +% ), +% ( +% 'DiscardingTaskGroup', +% [ +% 'startTaskSynchronously', +% 'startTaskSynchronouslyUnlessCancelled' +% ] +% ), +% ( +% 'ThrowingDiscardingTaskGroup', +% [ +% 'startTaskSynchronously', +% 'startTaskSynchronouslyUnlessCancelled' +% ] +% ), +% ]: +% for METHOD_NAME in METHOD_NAMES: + +% IS_DISCARDING = 'Discarding' in GROUP_TYPE +% IS_THROWING = 'Throwing' in GROUP_TYPE + +% if not IS_THROWING: +% OPERATION_PARAM = OPERATION_PARAM.replace('throws', '') +% end + +% if IS_DISCARDING: +% OPERATION_PARAM = OPERATION_PARAM.replace('ChildTaskResult', 'Void') +% end + +% if IS_THROWING: +% FAILURE_TYPE = 'Error' +% else: +% FAILURE_TYPE = 'Never' +% end + +@available(SwiftStdlib 6.2, *) +extension ${GROUP_TYPE} { + + /// Create and immediately start running a new child task in the context of the calling thread/task. + /// + /// This function _starts_ the created task on the calling context. + /// The task will continue executing on the caller's context until it suspends, + /// and after suspension will resume on the adequate executor. For a nonisolated + /// operation this means running on the global concurrent pool, and on an isolated + /// operation it means the appropriate executor of that isolation context. + /// + /// As indicated by the lack of `async` on this method, this method does _not_ + /// suspend, and instead takes over the calling task's (thread's) execution in + /// a synchronous manner. + /// + /// Other than the execution semantics discussed above, the created task + /// is semantically equivalent to its basic version which can be + /// created using ``${GROUP_TYPE}/addTask``. + @available(SwiftStdlib 6.2, *) + public func ${METHOD_NAME}( // in ${GROUP_TYPE} + priority: TaskPriority? = nil, + ${OPERATION_PARAM} + ) { + let flags = taskCreateFlags( + priority: priority, + isChildTask: true, + copyTaskLocals: false, + inheritContext: false, + enqueueJob: false, // don't enqueue, we'll run it manually + addPendingGroupTaskUnconditionally: true, + isDiscardingTask: false, + isSynchronousStart: false + ) + + // Create the task in this group. + let (task, _) = Builtin.createTask( + flags: flags, + taskGroup: self._group, + operation: operation + ) + _startTaskSynchronously(task) + } +} +%end # METHOD_NAMES +%end # GROUP_TYPES // ==== Legacy SPI ------------------------------------------------------------- + +% METHOD_VARIANTS = [ +% 'THROWING', +% 'NON_THROWING', +% ] +% OPERATION_PARAM = '@_inheritActorContext @_implicitSelfCapture _ operation: __owned @Sendable @escaping @MainActor () async throws -> Success' +% for METHOD_VARIANT in METHOD_VARIANTS: + +% IS_THROWING = METHOD_VARIANT == 'THROWING' +% if IS_THROWING: +% FAILURE_TYPE = 'Error' +% else: +% FAILURE_TYPE = 'Never' +% OPERATION_PARAM = OPERATION_PARAM.replace('throws', '') +% end + #if !SWIFT_STDLIB_TASK_TO_THREAD_MODEL_CONCURRENCY && !SWIFT_CONCURRENCY_EMBEDDED @available(SwiftStdlib 5.9, *) extension Task where Failure == ${FAILURE_TYPE} { @@ -87,7 +200,7 @@ extension Task where Failure == ${FAILURE_TYPE} { @discardableResult public static func startOnMainActor( priority: TaskPriority? = nil, - ${OPERATION_PARAM if IS_THROWING else OPERATION_PARAM.replace("throws", "")} + ${OPERATION_PARAM} ) -> Task { let flags = taskCreateFlags( priority: priority, diff --git a/test/Concurrency/Runtime/startSynchronously.swift b/test/Concurrency/Runtime/startSynchronously.swift index b5dc3fb8e9638..08c1da4f27978 100644 --- a/test/Concurrency/Runtime/startSynchronously.swift +++ b/test/Concurrency/Runtime/startSynchronously.swift @@ -77,7 +77,7 @@ func syncOnMyGlobalActor() -> [Task] { print("before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") let outerTID = getCurrentThreadID() - let tt = Task._startSynchronously { @MyGlobalActor in + let tt = Task.startSynchronously { @MyGlobalActor in let innerTID = getCurrentThreadID() print("inside startSynchronously, outer thread = \(outerTID)") print("inside startSynchronously, inner thread = \(innerTID)") @@ -103,7 +103,7 @@ func syncOnNonTaskThread(synchronousTask behavior: SynchronousTaskBehavior) { print("before startSynchronously [thread:\(getCurrentThreadID())] @ :\(#line)") let outerTID = getCurrentThreadID() - let tt = Task._startSynchronously { + let tt = Task.startSynchronously { dispatchPrecondition(condition: .onQueue(queue)) let innerTID = getCurrentThreadID() @@ -258,7 +258,7 @@ func callActorFromStartSynchronousTask(recipient rec: TargetActorToCall) { queue.async { let outerTID = getCurrentThreadID() print("before startSynchronously [thread:\(outerTID)] @ :\(#line)") - let tt = Task._startSynchronously { + let tt = Task.startSynchronously { dispatchPrecondition(condition: .onQueue(queue)) let innerTID = getCurrentThreadID() @@ -332,14 +332,14 @@ callActorFromStartSynchronousTask(recipient: .recipientOnQueue(RecipientOnQueue( // allowing the 'after startSynchronously' to run. // // CHECK-NEXT: inside startSynchronously, call rec.sync() [thread:[[CALLING_THREAD4]]] -// CHECK: NaiveQueueExecutor(recipient-actor-queue).enqueue +// CHECK: NaiveQueueExecutor(recipient-actor-queue) enqueue // CHECK: after startSynchronously // CHECK-NOT: ERROR! // CHECK: inside startSynchronously, call rec.sync() done // CHECK-NOT: ERROR! // CHECK: inside startSynchronously, call rec.async() -// CHECK: NaiveQueueExecutor(recipient-actor-queue).enqueue +// CHECK: NaiveQueueExecutor(recipient-actor-queue) enqueue // CHECK-NOT: ERROR! // CHECK: inside startSynchronously, call rec.async() done @@ -355,9 +355,9 @@ final class NaiveQueueExecutor: SerialExecutor { public func enqueue(_ job: consuming ExecutorJob) { let unowned = UnownedJob(job) - print("NaiveQueueExecutor(\(self.queue.label)).enqueue... [thread:\(getCurrentThreadID())]") + print("NaiveQueueExecutor(\(self.queue.label)) enqueue... [thread:\(getCurrentThreadID())]") queue.async { - print("NaiveQueueExecutor(\(self.queue.label)).enqueue: run [thread:\(getCurrentThreadID())]") + print("NaiveQueueExecutor(\(self.queue.label)) enqueue: run [thread:\(getCurrentThreadID())]") unowned.runSynchronously(on: self.asUnownedSerialExecutor()) } } diff --git a/test/Concurrency/startSynchronouslyIsolation.swift b/test/Concurrency/startSynchronouslyIsolation.swift new file mode 100644 index 0000000000000..de5ef9f520c7f --- /dev/null +++ b/test/Concurrency/startSynchronouslyIsolation.swift @@ -0,0 +1,40 @@ +// RUN: %target-build-swift -swift-version 6 %s -strict-concurrency=complete -Xfrontend -verify + +// REQUIRES: concurrency + +@available(SwiftStdlib 6.2, *) +func sync() -> Task { + Task.startSynchronously { + return "" + } +} + +@available(SwiftStdlib 6.2, *) +func async() async throws { + let t1 = Task.startSynchronously { + return "" + } + let _: String = await t1.value + + let t2: Task = Task.startSynchronously { + throw CancellationError() + } + let _: String = try await t2.value + + await withTaskGroup(of: Int.self) { group in + group.startTaskSynchronously { 1 } + group.startTaskSynchronouslyUnlessCancelled { 2 } + } + await withThrowingTaskGroup(of: Int.self) { group in + group.startTaskSynchronously { 1 } + group.startTaskSynchronouslyUnlessCancelled { 2 } + } + await withDiscardingTaskGroup { group in + group.startTaskSynchronously { } + group.startTaskSynchronouslyUnlessCancelled { } + } + try await withThrowingDiscardingTaskGroup { group in + group.startTaskSynchronously { } + group.startTaskSynchronouslyUnlessCancelled { } + } +} \ No newline at end of file diff --git a/test/abi/macOS/arm64/concurrency.swift b/test/abi/macOS/arm64/concurrency.swift index 9669905ad7a20..07d3bd733304c 100644 --- a/test/abi/macOS/arm64/concurrency.swift +++ b/test/abi/macOS/arm64/concurrency.swift @@ -384,9 +384,25 @@ Added: _$sScT16escalatePriority_2toyScTyxq_G_ScPtFZ Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolationxxyYaq_YKXE_yScPYbXEScA_pSgYitYaq_YKs5ErrorR_r0_lF Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolationxxyYaq_YKXE_yScPYbXEScA_pSgYitYaq_YKs5ErrorR_r0_lFTu -// Task._startSynchronously +// Task.DispatchSerialQueue Added: _swift_task_startSynchronously -// static (extension in Swift):Swift.Task< where B == Swift.Never>._startSynchronously(name: Swift.String?, priority: Swift.TaskPriority?, operation: __owned @Sendable () async -> A) -> Swift.Task -Added: _$sScTss5NeverORs_rlE19_startSynchronously4name8priority9operationScTyxABGSSSg_ScPSgxyYaYbcntFZ -// static (extension in Swift):Swift.Task< where B == Swift.Error>._startSynchronously(name: Swift.String?, priority: Swift.TaskPriority?, operation: __owned @Sendable () async -> A) -> Swift.Task -Added: _$sScTss5Error_pRs_rlE19_startSynchronously4name8priority9operationScTyxsAA_pGSSSg_ScPSgxyYaYbcntFZ +// static (extension in Swift):Swift.Task< where B == Swift.Error>.startSynchronously(priority: Swift.TaskPriority?, _: __owned () async throws -> A) -> Swift.Task +Added: _$sScTss5Error_pRs_rlE18startSynchronously8priority_ScTyxsAA_pGScPSg_xyYaKcntFZ +// static (extension in Swift):Swift.Task< where B == Swift.Never>.startSynchronously(priority: Swift.TaskPriority?, _: __owned () async throws -> A) -> Swift.Task +Added: _$sScTss5NeverORs_rlE18startSynchronously8priority_ScTyxABGScPSg_xyYaKcntFZ +// Swift.TaskGroup.startTaskSynchronously(priority: Swift.TaskPriority?, operation: __owned () async -> A) -> () +Added: _$sScG22startTaskSynchronously8priority9operationyScPSg_xyYacntF +// Swift.TaskGroup.startTaskSynchronouslyUnlessCancelled(priority: Swift.TaskPriority?, operation: __owned () async -> A) -> () +Added: _$sScG37startTaskSynchronouslyUnlessCancelled8priority9operationyScPSg_xyYacntF +// Swift.ThrowingTaskGroup.startTaskSynchronously(priority: Swift.TaskPriority?, operation: __owned () async -> A) -> () +Added: _$sScg22startTaskSynchronously8priority9operationyScPSg_xyYacntF +// Swift.ThrowingTaskGroup.startTaskSynchronouslyUnlessCancelled(priority: Swift.TaskPriority?, operation: __owned () async -> A) -> () +Added: _$sScg37startTaskSynchronouslyUnlessCancelled8priority9operationyScPSg_xyYacntF +// Swift.DiscardingTaskGroup.startTaskSynchronously(priority: Swift.TaskPriority?, operation: __owned () async -> ()) -> () +Added: _$ss19DiscardingTaskGroupV05startB13Synchronously8priority9operationyScPSg_yyYacntF +// Swift.DiscardingTaskGroup.startTaskSynchronouslyUnlessCancelled(priority: Swift.TaskPriority?, operation: __owned () async -> ()) -> () +Added: _$ss19DiscardingTaskGroupV05startB28SynchronouslyUnlessCancelled8priority9operationyScPSg_yyYacntF +// Swift.ThrowingDiscardingTaskGroup.startTaskSynchronously(priority: Swift.TaskPriority?, operation: __owned () async -> ()) -> () +Added: _$ss27ThrowingDiscardingTaskGroupV05startC13Synchronously8priority9operationyScPSg_yyYacntF +// Swift.ThrowingDiscardingTaskGroup.startTaskSynchronouslyUnlessCancelled(priority: Swift.TaskPriority?, operation: __owned () async -> ()) -> () +Added: _$ss27ThrowingDiscardingTaskGroupV05startC28SynchronouslyUnlessCancelled8priority9operationyScPSg_yyYacntF diff --git a/test/abi/macOS/x86_64/concurrency.swift b/test/abi/macOS/x86_64/concurrency.swift index 29a72a42dd53a..3ef907536635b 100644 --- a/test/abi/macOS/x86_64/concurrency.swift +++ b/test/abi/macOS/x86_64/concurrency.swift @@ -384,9 +384,25 @@ Added: _$sScT16escalatePriority_2toyScTyxq_G_ScPtFZ Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolationxxyYaq_YKXE_yScPYbXEScA_pSgYitYaq_YKs5ErrorR_r0_lF Added: _$ss33withTaskPriorityEscalationHandler9operation02onC9Escalated9isolationxxyYaq_YKXE_yScPYbXEScA_pSgYitYaq_YKs5ErrorR_r0_lFTu -// Task._startSynchronously +// Task.DispatchSerialQueue Added: _swift_task_startSynchronously -// static (extension in Swift):Swift.Task< where B == Swift.Never>._startSynchronously(name: Swift.String?, priority: Swift.TaskPriority?, operation: __owned @Sendable () async -> A) -> Swift.Task -Added: _$sScTss5NeverORs_rlE19_startSynchronously4name8priority9operationScTyxABGSSSg_ScPSgxyYaYbcntFZ -// static (extension in Swift):Swift.Task< where B == Swift.Error>._startSynchronously(name: Swift.String?, priority: Swift.TaskPriority?, operation: __owned @Sendable () async -> A) -> Swift.Task -Added: _$sScTss5Error_pRs_rlE19_startSynchronously4name8priority9operationScTyxsAA_pGSSSg_ScPSgxyYaYbcntFZ +// static (extension in Swift):Swift.Task< where B == Swift.Error>.startSynchronously(priority: Swift.TaskPriority?, _: __owned () async throws -> A) -> Swift.Task +Added: _$sScTss5Error_pRs_rlE18startSynchronously8priority_ScTyxsAA_pGScPSg_xyYaKcntFZ +// static (extension in Swift):Swift.Task< where B == Swift.Never>.startSynchronously(priority: Swift.TaskPriority?, _: __owned () async throws -> A) -> Swift.Task +Added: _$sScTss5NeverORs_rlE18startSynchronously8priority_ScTyxABGScPSg_xyYaKcntFZ +// Swift.TaskGroup.startTaskSynchronously(priority: Swift.TaskPriority?, operation: __owned () async -> A) -> () +Added: _$sScG22startTaskSynchronously8priority9operationyScPSg_xyYacntF +// Swift.TaskGroup.startTaskSynchronouslyUnlessCancelled(priority: Swift.TaskPriority?, operation: __owned () async -> A) -> () +Added: _$sScG37startTaskSynchronouslyUnlessCancelled8priority9operationyScPSg_xyYacntF +// Swift.ThrowingTaskGroup.startTaskSynchronously(priority: Swift.TaskPriority?, operation: __owned () async -> A) -> () +Added: _$sScg22startTaskSynchronously8priority9operationyScPSg_xyYacntF +// Swift.ThrowingTaskGroup.startTaskSynchronouslyUnlessCancelled(priority: Swift.TaskPriority?, operation: __owned () async -> A) -> () +Added: _$sScg37startTaskSynchronouslyUnlessCancelled8priority9operationyScPSg_xyYacntF +// Swift.DiscardingTaskGroup.startTaskSynchronously(priority: Swift.TaskPriority?, operation: __owned () async -> ()) -> () +Added: _$ss19DiscardingTaskGroupV05startB13Synchronously8priority9operationyScPSg_yyYacntF +// Swift.DiscardingTaskGroup.startTaskSynchronouslyUnlessCancelled(priority: Swift.TaskPriority?, operation: __owned () async -> ()) -> () +Added: _$ss19DiscardingTaskGroupV05startB28SynchronouslyUnlessCancelled8priority9operationyScPSg_yyYacntF +// Swift.ThrowingDiscardingTaskGroup.startTaskSynchronously(priority: Swift.TaskPriority?, operation: __owned () async -> ()) -> () +Added: _$ss27ThrowingDiscardingTaskGroupV05startC13Synchronously8priority9operationyScPSg_yyYacntF +// Swift.ThrowingDiscardingTaskGroup.startTaskSynchronouslyUnlessCancelled(priority: Swift.TaskPriority?, operation: __owned () async -> ()) -> () +Added: _$ss27ThrowingDiscardingTaskGroupV05startC28SynchronouslyUnlessCancelled8priority9operationyScPSg_yyYacntF From 65cd32ed0d672b67975d4023dc572e7d9441c12f Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Wed, 26 Feb 2025 21:53:03 +0900 Subject: [PATCH 10/11] [Concurrency] DispatchSerialQueue does not exist on linux still --- include/swift/ABI/Executor.h | 4 +++- stdlib/public/Concurrency/Actor.cpp | 18 +----------------- stdlib/public/Concurrency/TaskPrivate.h | 2 +- .../Runtime/startSynchronously.swift | 4 ++-- 4 files changed, 7 insertions(+), 21 deletions(-) diff --git a/include/swift/ABI/Executor.h b/include/swift/ABI/Executor.h index f1638887ba64b..1257673826434 100644 --- a/include/swift/ABI/Executor.h +++ b/include/swift/ABI/Executor.h @@ -77,7 +77,9 @@ class SerialExecutorRef { /// Executor that may need to participate in complex "same context" checks, /// by invoking `isSameExclusiveExecutionContext` when comparing execution contexts. ComplexEquality = 0b01, - /// + /// Mark this executor as the one used by `Task.startSynchronously`, + /// It cannot participate in switching. + // TODO: Perhaps make this a generic "cannot switch" rather than start synchronously specific. StartSynchronously = 0b10, }; diff --git a/stdlib/public/Concurrency/Actor.cpp b/stdlib/public/Concurrency/Actor.cpp index d99e01ba78ad8..bdf9d825a0826 100644 --- a/stdlib/public/Concurrency/Actor.cpp +++ b/stdlib/public/Concurrency/Actor.cpp @@ -134,8 +134,6 @@ class ExecutorTrackingInfo { /// is `generic`. TaskExecutorRef TaskExecutor = TaskExecutorRef::undefined(); - bool StartedSynchronouslySkipSwitchOnce = false; - /// Whether this context allows switching. Some contexts do not; /// for example, we do not allow switching from swift_job_run /// unless the passed-in executor is generic. @@ -179,7 +177,7 @@ class ExecutorTrackingInfo { } bool allowsSwitching() const { - return AllowsSwitching && !StartedSynchronouslySkipSwitchOnce; + return AllowsSwitching; } /// Disallow switching in this tracking context. This should only @@ -188,16 +186,6 @@ class ExecutorTrackingInfo { AllowsSwitching = false; } - void markSynchronousStart() { - StartedSynchronouslySkipSwitchOnce = true; - } - bool isSynchronousStart() const { - return StartedSynchronouslySkipSwitchOnce; - } - void withoutStartSynchronously() { - StartedSynchronouslySkipSwitchOnce = false; - } - static ExecutorTrackingInfo *current() { return ActiveInfoInThread.get(); } @@ -2170,10 +2158,6 @@ static bool canGiveUpThreadForSwitch(ExecutorTrackingInfo *trackingInfo, // We can certainly "give up" a generic executor to try to run // a task for an actor. if (currentExecutor.isGeneric()) { - if (trackingInfo->isSynchronousStart()) { - return false; - } - if (currentExecutor.isForSynchronousStart()) { return false; } diff --git a/stdlib/public/Concurrency/TaskPrivate.h b/stdlib/public/Concurrency/TaskPrivate.h index 05cb92bae6e43..361af35590ca3 100644 --- a/stdlib/public/Concurrency/TaskPrivate.h +++ b/stdlib/public/Concurrency/TaskPrivate.h @@ -44,7 +44,7 @@ namespace swift { #if 0 #define SWIFT_TASK_DEBUG_LOG_ENABLED 1 #define SWIFT_TASK_DEBUG_LOG(fmt, ...) \ - fprintf(stdout, "[%#lx] [%s:%d](%s) " fmt "\n", \ + fprintf(stderr, "[%#lx] [%s:%d](%s) " fmt "\n", \ (unsigned long)Thread::current().platformThreadId(), __FILE__, \ __LINE__, __FUNCTION__, __VA_ARGS__) #else diff --git a/test/Concurrency/Runtime/startSynchronously.swift b/test/Concurrency/Runtime/startSynchronously.swift index 08c1da4f27978..f9117c3263b87 100644 --- a/test/Concurrency/Runtime/startSynchronously.swift +++ b/test/Concurrency/Runtime/startSynchronously.swift @@ -319,7 +319,7 @@ func callActorFromStartSynchronousTask(recipient rec: TargetActorToCall) { print("\n\n==== ------------------------------------------------------------------") print("callActorFromStartSynchronousTask() - actor in custom executor with its own queue") -let actorQueue = DispatchSerialQueue(label: "recipient-actor-queue") +let actorQueue = DispatchQueue(label: "recipient-actor-queue") callActorFromStartSynchronousTask(recipient: .recipientOnQueue(RecipientOnQueue(queue: actorQueue))) // CHECK-LABEL: callActorFromStartSynchronousTask() - actor in custom executor with its own queue @@ -367,7 +367,7 @@ actor RecipientOnQueue { let executor: NaiveQueueExecutor nonisolated let unownedExecutor: UnownedSerialExecutor - init(queue: DispatchSerialQueue) { + init(queue: DispatchQueue) { self.executor = NaiveQueueExecutor(queue: queue) self.unownedExecutor = executor.asUnownedSerialExecutor() } From 2fd991f62ab78ace326398b2c671b5e11822dae1 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Thu, 27 Feb 2025 07:40:06 +0900 Subject: [PATCH 11/11] [Concurrency] Fix test on linux because of thread id formatting --- test/Concurrency/Runtime/startSynchronously.swift | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/Concurrency/Runtime/startSynchronously.swift b/test/Concurrency/Runtime/startSynchronously.swift index f9117c3263b87..0f9282cd85e1c 100644 --- a/test/Concurrency/Runtime/startSynchronously.swift +++ b/test/Concurrency/Runtime/startSynchronously.swift @@ -153,7 +153,7 @@ await Task { @MyGlobalActor in // CHECK-LABEL: syncOnMyGlobalActor() // CHECK: Confirmed to be on @MyGlobalActor -// CHECK: schedule Task { @MyGlobalActor }, before startSynchronously [thread:[[CALLING_THREAD:0x.*]]] +// CHECK: schedule Task { @MyGlobalActor }, before startSynchronously [thread:[[CALLING_THREAD:.*]]] // CHECK: before startSynchronously [thread:[[CALLING_THREAD]]] // CHECK-NOT: ERROR! // CHECK: inside startSynchronously, sleep now @@ -168,7 +168,7 @@ syncOnNonTaskThread(synchronousTask: behavior) // CHECK-LABEL: syncOnNonTaskThread(synchronousTask: suspend) // No interleaving allowed between "before" and "inside": -// CHECK-NEXT: before startSynchronously [thread:[[CALLING_THREAD2:0x.*]]] +// CHECK-NEXT: before startSynchronously [thread:[[CALLING_THREAD2:.*]]] // CHECK-NOT: ERROR! // CHECK-NEXT: inside startSynchronously [thread:[[CALLING_THREAD2]]] // CHECK-NEXT: inside startSynchronously, before sleep [thread:[[CALLING_THREAD2]]] @@ -181,7 +181,7 @@ print("syncOnNonTaskThread(synchronousTask: \(behavior))") syncOnNonTaskThread(synchronousTask: behavior) // CHECK-LABEL: syncOnNonTaskThread(synchronousTask: dontSuspend) -// CHECK-NEXT: before startSynchronously [thread:[[CALLING_THREAD3:0x.*]]] +// CHECK-NEXT: before startSynchronously [thread:[[CALLING_THREAD3:.*]]] // CHECK-NOT: ERROR! // CHECK-NEXT: inside startSynchronously [thread:[[CALLING_THREAD3]]] // CHECK: inside startSynchronously, done [thread:[[CALLING_THREAD3]]] @@ -193,7 +193,7 @@ callActorFromStartSynchronousTask(recipient: .recipient(Recipient())) // CHECK: callActorFromStartSynchronousTask() // No interleaving allowed between "before" and "inside": -// CHECK: before startSynchronously [thread:[[CALLING_THREAD4:0x.*]]] +// CHECK: before startSynchronously [thread:[[CALLING_THREAD4:.*]]] // CHECK-NEXT: inside startSynchronously [thread:[[CALLING_THREAD4]]] // It is important that as we suspend on the actor call, the 'after' startSynchronously gets to run @@ -324,7 +324,7 @@ callActorFromStartSynchronousTask(recipient: .recipientOnQueue(RecipientOnQueue( // CHECK-LABEL: callActorFromStartSynchronousTask() - actor in custom executor with its own queue // No interleaving allowed between "before" and "inside": -// CHECK: before startSynchronously [thread:[[CALLING_THREAD4:0x.*]]] +// CHECK: before startSynchronously [thread:[[CALLING_THREAD4:.*]]] // CHECK-NEXT: inside startSynchronously [thread:[[CALLING_THREAD4]]] // As we call into an actor, we must enqueue to its custom executor;