Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Non-public API for work-stealing from QueueExecutionContext #3863

Merged
merged 14 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package object async {
if (isMultithreadingEnabled)
Await.result(future, Duration.Inf)
else {
if (!isMultithreadingEnabled) runtime.loop()
concurrent.NativeExecutionContext.queueInternal.helpComplete()
future.value.get.get
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package scala.scalanative
package concurrent

import scala.concurrent.{ExecutionContextExecutor, ExecutionContext}
import scala.concurrent.duration._

object NativeExecutionContext {

/** Single-threaded computeQueue based execution context. Points to the same
* instance as `queue` but grants additional access to internal API.
*/
private[scalanative] val queueInternal: QueueExecutionContext =
new QueueExecutionContextImpl()

/** Single-threaded computeQueue based execution context. Each runable is
* executed sequentially after termination of the main method
*/
val queue: ExecutionContextExecutor = queueInternal

object Implicits {
implicit final def queue: ExecutionContext = NativeExecutionContext.queue
}

private[scalanative] trait QueueExecutionContext
extends ExecutionContextExecutor
with WorkStealing
with AutoCloseable {

/** Disallow scheduling any new tasks to the ExecutionContext */
def shutdown(): Unit

/** Checks if the ExecutionContext shutdown was started */
def inShutdown: Boolean

/** Await for gracefull termination of this ExecutionContext, by waiting
* until the pending tasks are finished until timeout reaches out.
* @return
* false if failed to finish the pending tasks before the timeout, true
* otherwise
*/
def awaitTermination(timeout: FiniteDuration): Boolean
}

private[scalanative] trait WorkStealing { self: ExecutionContextExecutor =>

/** Check if there are any tasks available for work stealing.
* @return
* true if there are tasks available, false otherwise
*/
def isWorkStealingPossible: Boolean

/** Apply work-stealing mechanism to help with completion of any tasks
* available for execution.Returns after work-stealing maximal number or
* tasks or there is no more tasks available for execution
* @param maxSteals
* maximal ammount of tasks that can be executed, if <= 0 then no tasks
* would be completed
*/
def stealWork(maxSteals: Int): Unit

/** Apply work-stealing mechanism to help with completion of any tasks
* available for execution. Returns when timeout passed out or there is no
* more tasks available for execution
* @param timeout
* maximal ammount of time for which execution of new tasks can be
* started
*/
def stealWork(timeout: FiniteDuration): Unit

/** Apply work-stealing mechanism to help with completion of available tasks
* available for execution. Returns when there is no more tasks available
* for execution
*/
def helpComplete(): Unit
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package scala.scalanative.concurrent

import scala.concurrent.{ExecutionContextExecutor, ExecutionContext}
import scala.concurrent.duration._
import scala.collection.mutable

import scala.scalanative.meta.LinktimeInfo.isMultithreadingEnabled
import scala.scalanative.concurrent.NativeExecutionContext._
import scala.scalanative.runtime.MainThreadShutdownContext

import java.util.{AbstractQueue, ArrayDeque, Comparator, Deque, PriorityQueue}
import java.util.concurrent.{ConcurrentLinkedQueue, RejectedExecutionException}

private[concurrent] class QueueExecutionContextImpl()
extends QueueExecutionContext {

private val computeQueue: Queue =
if (isMultithreadingEnabled) new Queue.Concurrent
else new Queue.SingleThreaded

private def nowMillis(): Long = System.currentTimeMillis()

// EventEventLoopExecutionContext
private var isClosed = false
override def inShutdown: Boolean = isClosed
override def shutdown(): Unit = isClosed = true
override def awaitTermination(timeout: FiniteDuration): Boolean = {
stealWork(timeout)
!isWorkStealingPossible
}

override def close(): Unit = shutdown()

// ExecutionContextExecutor
private def ensureNotClosed() = {
if (inShutdown)
throw new RejectedExecutionException(
"ExecutionContext was closed, queuing new tasks in not allowed"
)
}
override def execute(runnable: Runnable): Unit = {
ensureNotClosed()
computeQueue.enqueue(runnable)
if (isMultithreadingEnabled) {
MainThreadShutdownContext.onTaskEnqueued()
}
}

override def reportFailure(t: Throwable): Unit = t.printStackTrace()

//
// Work stealing
//
private[scalanative] def availableTasks: Int = computeQueue.size

override def isWorkStealingPossible: Boolean = computeQueue.nonEmpty

override def stealWork(maxSteals: Int): Unit = if (maxSteals > 0) {
var steals = 0
while (isWorkStealingPossible && steals < maxSteals) {
doStealWork()
steals += 1
}
}

override def stealWork(timeout: FiniteDuration): Unit =
if (timeout > Duration.Zero) {
var clock = nowMillis()
val deadline = clock + timeout.toMillis
while (isWorkStealingPossible && clock < deadline) {
doStealWork()
clock = nowMillis()
}
}

override def helpComplete(): Unit =
while (isWorkStealingPossible) stealWork(Int.MaxValue)

private def doStealWork(): Unit = computeQueue.dequeue() match {
case null => ()
case runnable =>
try runnable.run()
catch { case t: Throwable => reportFailure(t) }
}

private trait Queue {
def enqueue(task: Runnable): Unit
def dequeue(): Runnable
def size: Int
def isEmpty: Boolean
final def nonEmpty: Boolean = !isEmpty
}
private object Queue {
class Concurrent extends Queue {
private val tasks = new ConcurrentLinkedQueue[Runnable]()
override def enqueue(task: Runnable): Unit = tasks.offer(task)
override def dequeue(): Runnable = tasks.poll()
override def size: Int = tasks.size()
override def isEmpty: Boolean = tasks.isEmpty()
}
class SingleThreaded() extends Queue {
private val tasks = mutable.ListBuffer.empty[Runnable]
override def enqueue(runnable: Runnable) = tasks += runnable
override def dequeue(): Runnable =
if (tasks.nonEmpty) tasks.remove(0)
else null
override def size: Int = tasks.size
override def isEmpty: Boolean = tasks.isEmpty
}
}
}

This file was deleted.

19 changes: 15 additions & 4 deletions nativelib/src/main/scala/scala/scalanative/runtime/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,16 @@ package object runtime {
thread.isAlive()
}

def queue = NativeExecutionContext.QueueExecutionContext
def queue = concurrent.NativeExecutionContext.queueInternal
def shouldWaitForThreads =
if (isMultithreadingEnabled) gracefully && pollNonDaemonThreads.hasNext
else false
def shouldRunQueuedTasks = gracefully && queue.hasNextTask
def shouldRunQueuedTasks = gracefully && queue.isWorkStealingPossible

// Both runnable from the NativeExecutionContext.queue and the running threads can spawn new runnables
while ({
// drain the queue
queue.executeAvailableTasks()
queue.helpComplete()
// queue is empty, threads might be still running
if (isMultithreadingEnabled) {
if (shouldWaitForThreads) LockSupport.park()
Expand Down Expand Up @@ -147,8 +147,19 @@ package object runtime {
/** Run the runtime's event loop. The method is called from the generated
* C-style after the application's main method terminates.
*/
@deprecated(
"Usage in the users code is discouraged, public method would be removed in the future. Use `scala.scalanative` package private method `scala.scalanative.concurrent.NativeExecutionContext.queueInternal.helpComplete()) instead",
since = "0.5.0"
)
@noinline def loop(): Unit =
NativeExecutionContext.QueueExecutionContext.executeAvailableTasks()
concurrent.NativeExecutionContext.queueInternal.helpComplete()

// It should be val but we don't want any fields in runtime package object
@deprecated(
"Use `scala.scalanative.concurrent.NativeExecutionContext",
since = "0.5.0"
)
def ExecutionContext = concurrent.NativeExecutionContext

/** Called by the generated code in case of division by zero. */
@noinline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.util.concurrent.locks.LockSupport
import scala.scalanative.meta.LinktimeInfo.isMultithreadingEnabled

// Extracted fields from runtime package to ensure it does not require initialization
private[runtime] object MainThreadShutdownContext {
private[scalanative] object MainThreadShutdownContext {
@volatile var shutdownThread: Thread = _
var gracefully: Boolean = true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
- implicit lazy val global: ExecutionContext = impl.ExecutionContextImpl.fromExecutor(null: Executor)
+ implicit lazy val global: ExecutionContext = {
+ if(isMultithreadingEnabled) impl.ExecutionContextImpl.fromExecutor(null: Executor)
+ else scala.scalanative.runtime.NativeExecutionContext.queue
+ else scala.scalanative.concurrent.NativeExecutionContext.queue
+ }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
- final lazy val global: ExecutionContextExecutor = impl.ExecutionContextImpl.fromExecutor(null: Executor)
+ final lazy val global: ExecutionContextExecutor = {
+ if(isMultithreadingEnabled) impl.ExecutionContextImpl.fromExecutor(null: Executor)
+ else scala.scalanative.runtime.NativeExecutionContext.queue
+ else scala.scalanative.concurrent.NativeExecutionContext.queue
+ }

/**
Expand Down
34 changes: 32 additions & 2 deletions scripted-tests/run/execution-context/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@ scalaVersion := {

nativeConfig ~= { _.withMultithreading(false) }

lazy val runAndCheck = taskKey[Unit]("...")
import java.util.Locale
val osName = System
.getProperty("os.name", "unknown")
.toLowerCase(Locale.ROOT)
val isMac = osName.startsWith("mac")

runAndCheck := {
lazy val testQueueExecutionContext = taskKey[Unit]("...")
testQueueExecutionContext := {
import scala.sys.process._

val bin = (Compile / nativeLink).value
Expand All @@ -30,3 +35,28 @@ runAndCheck := {
)
)
}

lazy val testQueueExecutionContext2 = taskKey[Unit]("...")
testQueueExecutionContext2 := {
import java.util.concurrent.TimeUnit
val bin = (Compile / nativeLink).value
val proc = new ProcessBuilder(bin.getAbsolutePath).start()
val finished = proc.waitFor(1, TimeUnit.SECONDS)
if (!finished) proc.destroyForcibly()
assert(finished)
}

lazy val testEventLoop = taskKey[Unit]("...")
testEventLoop := Def.taskDyn {
// libuv is preintstalled only on MacOS GithubRunners
if (!isMac) Def.task { println("EvenLoop test skipped") }
WojciechMazur marked this conversation as resolved.
Show resolved Hide resolved
else
Def.task {
import java.util.concurrent.TimeUnit
val bin = (Compile / nativeLink).value
val proc = new ProcessBuilder(bin.getAbsolutePath).start()
val finished = proc.waitFor(1, TimeUnit.SECONDS)
if (!finished) proc.destroyForcibly()
assert(finished)
}
}.value
9 changes: 8 additions & 1 deletion scripted-tests/run/execution-context/test
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
> runAndCheck
$ copy-file variants/QueueExecutionContext.scala src/main/scala/Main.scala
> testQueueExecutionContext

$ copy-file variants/QueueExecutionContext2.scala src/main/scala/Main.scala
> testQueueExecutionContext2

$ copy-file variants/EventLoop.scala src/main/scala/Main.scala
> testEventLoop