Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

570 lines (437 sloc) 15.91 kb
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */
package scala.collection.parallel
import java.util.concurrent.ThreadPoolExecutor
import scala.concurrent.forkjoin._
import scala.concurrent.ExecutionContext
import scala.util.control.Breaks._
import scala.annotation.unchecked.uncheckedVariance
/** A unit of parallel work: it can run its own `leaf` computation, be split
 *  into smaller tasks, and have the result of another task merged into it.
 *
 *  @tparam R   the type of the result produced by the task
 *  @tparam Tp  the concrete task type, as exposed through `repr`
 */
trait Task[R, +Tp] {
  type Result = R

  /** This task viewed as `Tp` (an unchecked cast). */
  def repr = this.asInstanceOf[Tp]

  /** Body of the task - non-divisible unit of work done by this task.
   *  Optionally is provided with the result from the previous completed task
   *  or `None` if there was no previous task (or the previous task is uncompleted or unknown).
   */
  def leaf(result: Option[R]): Unit

  /** A result that can be accessed once the task is completed. */
  var result: R

  /** Decides whether or not this task should be split further. */
  def shouldSplitFurther: Boolean

  /** Splits this task into a list of smaller tasks. */
  private[parallel] def split: Seq[Task[R, Tp]]

  /** Read of results of `that` task and merge them into results of this one.
   *  The default implementation does nothing.
   */
  private[parallel] def merge(that: Tp @uncheckedVariance): Unit = {}

  // exception handling mechanism

  /** The failure recorded for this task, or `null` if none has been recorded. */
  @volatile var throwable: Throwable = null

  /** Rethrows the recorded failure, if there is one. */
  def forwardThrowable(): Unit = if (throwable != null) throw throwable

  /** Runs `leaf`, recording any failure in `throwable` instead of letting it escape.
   *
   *  A `break` raised inside `leaf` only triggers `signalAbort`. Every other
   *  throwable - including `Error`s - is stored and later rethrown by
   *  `forwardThrowable`. Catching only `Exception` here would let an `Error`
   *  unwind through the caller and skip whatever the caller does after
   *  `tryLeaf` returns (such as releasing threads waiting on the task).
   *
   *  @param lastres  the result of the previously completed task, if known
   */
  private[parallel] def tryLeaf(lastres: Option[R]): Unit = {
    try {
      tryBreakable {
        leaf(lastres)
        result = result // ensure that effects of `leaf` are visible to readers of `result`
      } catchBreak {
        signalAbort()
      }
    } catch {
      case thr: Throwable =>
        result = result // ensure that effects of `leaf` are visible
        throwable = thr
        signalAbort()
    }
  }

  /** Merges `t` into this task unless either side has recorded a failure,
   *  then combines the recorded failures of both tasks.
   */
  private[parallel] def tryMerge(t: Tp @uncheckedVariance): Unit = {
    val that = t.asInstanceOf[Task[R, Tp]]
    val local = result // ensure that any effects of modifying `result` are detected
    // checkMerge(that)
    if (this.throwable == null && that.throwable == null) merge(t)
    mergeThrowables(that)
  }

  /** Debugging aid that prints suspicious merge situations; only referenced
   *  from the commented-out call in `tryMerge`.
   */
  private def checkMerge(that: Task[R, Tp] @uncheckedVariance): Unit = {
    if (this.throwable == null && that.throwable == null && (this.result == null || that.result == null)) {
      println("This: " + this + ", thr=" + this.throwable + "; merged with " + that + ", thr=" + that.throwable)
    } else if (this.throwable != null || that.throwable != null) {
      println("merging this: " + this + " with thr: " + this.throwable + " with " + that + ", thr=" + that.throwable)
    }
  }

  /** Folds the failure recorded by `that` into the one recorded by this task. */
  private[parallel] def mergeThrowables(that: Task[_, _]): Unit = {
    if (this.throwable != null && that.throwable != null) {
      // merge exceptions, since there were multiple exceptions
      this.throwable = this.throwable alongWith that.throwable
    } else if (that.throwable != null) this.throwable = that.throwable
    else this.throwable = this.throwable // NOTE(review): self-assignment of a volatile field - presumably kept for its write effect; confirm
  }

  // override in concrete task implementations to signal abort to other tasks
  private[parallel] def signalAbort(): Unit = {}
}
/** Declares the task execution capabilities that parallel collections
 *  rely on.
 */
trait Tasks {

  /** Buffer collecting the messages passed to `debuglog`. */
  private[parallel] val debugMessages = scala.collection.mutable.ArrayBuffer.empty[String]

  /** Appends `s` to `debugMessages` while holding this object's monitor. */
  private[parallel] def debuglog(s: String) = synchronized {
    debugMessages += s
  }

  /** Couples a task body with the operations needed to schedule it. */
  trait WrappedTask[R, +Tp] {

    /** the body of this task - what it executes, how it gets split and how results are merged. */
    val body: Task[R, Tp]

    /** Splits this wrapped task into smaller wrapped tasks. */
    def split: Seq[WrappedTask[R, Tp]]

    /** Code that gets called after the task gets started - it may spawn other tasks instead of calling `leaf`. */
    def compute(): Unit

    /** Start task. */
    def start(): Unit

    /** Wait for task to finish. */
    def sync(): Unit

    /** Try to cancel the task.
     *  @return `true` if cancellation is successful.
     */
    def tryCancel(): Boolean

    /** If the task has been cancelled successfully, those syncing on it may
     *  automatically be notified, depending on the implementation. If they
     *  aren't, this release method should be called after processing the
     *  cancelled task.
     *
     *  This method may be overridden; by default it does nothing.
     */
    def release(): Unit = {}
  }

  /* task control */

  /** The type of the environment is more specific in the implementations. */
  val environment: AnyRef

  /** Executes a task and returns a function yielding its result.
   *  Forwards an exception if some task threw it.
   */
  def execute[R, Tp](fjtask: Task[R, Tp]): () => R

  /** Executes a result task, waits for it to finish, then returns its result.
   *  Forwards an exception if some task threw it.
   */
  def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R

  /** Retrieves the parallelism level of the task execution environment. */
  def parallelismLevel: Int
}
/** This trait implements scheduling by employing
 * an adaptive work stealing technique.
 *
 * A wrapped task whose body should split further spawns a chain of subtasks
 * linked through `next` and runs the first leaf itself. It then walks the chain:
 * each subtask it manages to cancel is run in the current thread, every other
 * one is waited for, and each subtask's body is merged into this task's body.
 */
trait AdaptiveWorkStealingTasks extends Tasks {
trait WrappedTask[R, Tp] extends super.WrappedTask[R, Tp] {
// Next subtask in the chain built by `spawnSubtasks`; `null` marks the end of the chain.
@volatile var next: WrappedTask[R, Tp] = null
// NOTE(review): never read inside this trait - presumably consulted by implementations; confirm.
@volatile var shouldWaitFor = true
def split: Seq[WrappedTask[R, Tp]]
/** Runs this task: splits and processes subtasks when the body asks for it,
 * otherwise runs the body's leaf directly. `release()` is called in both cases.
 */
def compute() = if (body.shouldSplitFurther) {
internal()
release()
} else {
body.tryLeaf(None)
release()
}
/** Spawns the subtask chain, runs its first element in the current thread and
 * then processes the remaining elements in chain order, merging each into `body`.
 */
def internal() = {
var last = spawnSubtasks()
// The head of the chain was not started by `spawnSubtasks`, so it is run here.
last.body.tryLeaf(None)
last.release()
// Seed this task's body with the outcome of the first leaf.
body.result = last.body.result
body.throwable = last.body.throwable
while (last.next != null) {
// val lastresult = Option(last.body.result)
val beforelast = last
last = last.next
if (last.tryCancel()) {
// Cancellation succeeded: run the subtask here, passing along the result accumulated so far.
// println("Done with " + beforelast.body + ", next direct is " + last.body)
last.body.tryLeaf(Some(body.result))
last.release()
} else {
// Cancellation failed: wait for the subtask to finish.
// println("Done with " + beforelast.body + ", next sync is " + last.body)
last.sync()
}
// println("Merging " + body + " with " + last.body)
body.tryMerge(last.body.repr)
}
}
/** Repeatedly splits the first subtask until its body no longer wants to split.
 * At every level all subtasks except the first are started and linked through
 * `next`, in front of the subtasks linked at earlier levels.
 *
 * @return the final, unstarted first subtask, whose `next` points at the chain
 */
def spawnSubtasks() = {
var last: WrappedTask[R, Tp] = null
var head: WrappedTask[R, Tp] = this
do {
val subtasks = head.split
// NOTE(review): `subtasks.head` assumes `split` never returns an empty sequence - confirm.
head = subtasks.head
// Walking the tail in reverse leaves `last` at the first tail element,
// so following `next` visits the tail in its original order.
for (t <- subtasks.tail.reverse) {
t.next = last
last = t
t.start()
}
} while (head.body.shouldSplitFurther);
head.next = last
head
}
/** Debugging aid: prints this task followed by every task reachable through `next`. */
def printChain() = {
var curr = this
var chain = "chain: "
while (curr != null) {
chain += curr + " ---> "
curr = curr.next
}
println(chain)
}
}
// specialize ctor
protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp]
}
/** Task execution support built on top of the Java thread pooling API. */
trait ThreadPoolTasks extends Tasks {
  import java.util.concurrent._

  /** A wrapped task that is submitted to the thread pool as a `Runnable`.
   *
   *  Ownership of the task is claimed through `owned`: whoever flips it first
   *  (the pool thread in `run`, or another thread through `tryCancel`) is the
   *  one that processes the task.
   */
  trait WrappedTask[R, +Tp] extends Runnable with super.WrappedTask[R, Tp] {
    // set once some thread has claimed the task
    @volatile var owned = false
    // set by `release`, awaited by `sync`
    @volatile var completed = false

    /** Counts the task and submits it to the executor. */
    def start() = synchronized {
      executor.synchronized {
        incrTasks()
        executor.submit(this)
      }
    }

    /** Blocks until `release` marks the task as completed. Before waiting, the
     *  core pool size is increased by one if it is below the number of counted tasks.
     */
    def sync() = synchronized {
      executor.synchronized {
        val currentCoreSize = executor.getCorePoolSize
        if (currentCoreSize < totaltasks) {
          executor.setCorePoolSize(currentCoreSize + 1)
        }
      }
      while (!completed) this.wait()
    }

    /** Claims the task for the caller.
     *  @return `true` if the task had not been claimed yet
     */
    def tryCancel() = synchronized {
      val unclaimed = !owned
      if (unclaimed) owned = true
      unclaimed
    }

    /** Runs `compute()` unless the task has already been claimed by someone else. */
    def run() = {
      val isOkToRun = synchronized {
        val unclaimed = !owned
        if (unclaimed) owned = true
        unclaimed
      }
      if (isOkToRun) compute()
    }

    /** Marks the task as completed, uncounts it and wakes up all threads waiting in `sync`. */
    override def release() = synchronized {
      completed = true
      executor.synchronized {
        decrTasks()
      }
      this.notifyAll()
    }
  }

  protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp]

  val environment: ThreadPoolExecutor

  /** The thread pool executor backing this task support. */
  def executor = environment.asInstanceOf[ThreadPoolExecutor]

  /** The executor's work queue, cast to a `LinkedBlockingQueue`. */
  def queue = executor.getQueue.asInstanceOf[LinkedBlockingQueue[Runnable]]

  /** Number of tasks started through `start` and not yet released. */
  @volatile var totaltasks = 0

  private def incrTasks() = synchronized {
    totaltasks += 1
  }

  private def decrTasks() = synchronized {
    totaltasks -= 1
  }

  /** Starts the task and returns a function that waits for it, forwards a
   *  recorded failure and yields the result.
   */
  def execute[R, Tp](task: Task[R, Tp]): () => R = {
    val wrapped = newWrappedTask(task)
    wrapped.start()
    () => {
      wrapped.sync()
      wrapped.body.forwardThrowable()
      wrapped.body.result
    }
  }

  /** Starts the task, waits for it, forwards a recorded failure and returns the result. */
  def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
    val wrapped = newWrappedTask(task)
    wrapped.start()
    wrapped.sync()
    wrapped.body.forwardThrowable()
    wrapped.body.result
  }

  def parallelismLevel = ThreadPoolTasks.numCores
}
/** Defaults shared by thread-pool based task supports. */
object ThreadPoolTasks {
  import java.util.concurrent._

  /** Number of processors reported by the runtime when this object is initialized. */
  val numCores = Runtime.getRuntime.availableProcessors

  /** Counter used to number the threads created for `defaultThreadPool`. */
  val tcount = new atomic.AtomicLong(0L)

  /** The default pool: `numCores` core threads, an unbounded queue, daemon
   *  threads named `pc-thread-N`, and a caller-runs rejection policy.
   */
  val defaultThreadPool = {
    val daemonThreadFactory = new ThreadFactory {
      def newThread(r: Runnable) = {
        val worker = new Thread(r)
        worker.setName("pc-thread-" + tcount.incrementAndGet)
        worker.setDaemon(true)
        worker
      }
    }
    new ThreadPoolExecutor(
      numCores,                   // core pool size
      Int.MaxValue,               // maximum pool size
      60L, TimeUnit.MILLISECONDS, // keep-alive time
      new LinkedBlockingQueue[Runnable],
      daemonThreadFactory,
      new ThreadPoolExecutor.CallerRunsPolicy
    )
  }
}
/** Task execution support built on the Java thread pooling API, where waiting
 *  for a task is done through the `Future` returned when it is submitted.
 */
@deprecated("This implementation is not used.", "2.10.0")
trait FutureThreadPoolTasks extends Tasks {
  import java.util.concurrent._

  /** A wrapped task that is submitted to the executor as a `Runnable`. */
  trait WrappedTask[R, +Tp] extends Runnable with super.WrappedTask[R, Tp] {
    // set by `start`, read by `sync`
    @volatile var future: Future[_] = null

    /** Submits the task and remembers the resulting future. */
    def start() = executor.synchronized {
      future = executor.submit(this)
    }

    /** Blocks on the future obtained in `start`. */
    def sync() = future.get

    /** Cancellation is never attempted by this implementation. */
    def tryCancel = false

    def run = compute()
  }

  protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp]

  val environment: AnyRef = FutureThreadPoolTasks.defaultThreadPool

  /** The environment, cast to a `ThreadPoolExecutor`. */
  def executor = environment.asInstanceOf[ThreadPoolExecutor]

  /** Starts the task and returns a function that waits for it, forwards a
   *  recorded failure and yields the result.
   */
  def execute[R, Tp](task: Task[R, Tp]): () => R = {
    val wrapped = newWrappedTask(task)
    wrapped.start
    () => {
      wrapped.sync
      wrapped.body.forwardThrowable
      wrapped.body.result
    }
  }

  /** Starts the task, waits for it, forwards a recorded failure and returns the result. */
  def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
    val wrapped = newWrappedTask(task)
    wrapped.start
    wrapped.sync
    wrapped.body.forwardThrowable
    wrapped.body.result
  }

  def parallelismLevel = FutureThreadPoolTasks.numCores
}
/** Defaults used by `FutureThreadPoolTasks`. */
object FutureThreadPoolTasks {
  import java.util.concurrent._

  /** Number of processors reported by the runtime when this object is initialized. */
  val numCores: Int = Runtime.getRuntime.availableProcessors

  /** A counter starting at zero; it is not used inside this object. */
  val tcount: atomic.AtomicLong = new atomic.AtomicLong(0L)

  /** The default pool, created with `Executors.newCachedThreadPool()`. */
  val defaultThreadPool: ExecutorService = Executors.newCachedThreadPool()
}
/** Implemented by objects that can hand out a fork/join pool. */
trait HavingForkJoinPool {

  /** The fork/join pool provided by this object. */
  def forkJoinPool: ForkJoinPool
}
/** Task execution support based on the fork/join framework.
 *
 *  @define fjdispatch
 *  If the current thread is a fork/join worker thread, the task's `fork` method will
 *  be invoked. Otherwise, the task will be executed on the fork/join pool.
 */
trait ForkJoinTasks extends Tasks with HavingForkJoinPool {

  /** A wrapped task that is itself a `RecursiveAction`; starting, waiting and
   *  cancelling map onto `fork`, `join` and `tryUnfork`.
   */
  trait WrappedTask[R, +Tp] extends RecursiveAction with super.WrappedTask[R, Tp] {
    def start() = fork
    def sync() = join
    def tryCancel = tryUnfork
  }

  // specialize ctor
  protected def newWrappedTask[R, Tp](b: Task[R, Tp]): WrappedTask[R, Tp]

  /** The fork/join pool of this collection.
   */
  def forkJoinPool: ForkJoinPool = environment.asInstanceOf[ForkJoinPool]

  val environment: ForkJoinPool

  /** Executes a task and does not wait for it to finish - instead returns a
   *  function that waits for the task, forwards a recorded failure and yields
   *  the result.
   *
   *  $fjdispatch
   */
  def execute[R, Tp](task: Task[R, Tp]): () => R = {
    val wrapped = newWrappedTask(task)
    Thread.currentThread match {
      case _: ForkJoinWorkerThread => wrapped.fork
      case _                       => forkJoinPool.execute(wrapped)
    }
    () => {
      wrapped.sync
      wrapped.body.forwardThrowable
      wrapped.body.result
    }
  }

  /** Executes a task on a fork/join pool and waits for it to finish.
   *  Returns its result when it does, after forwarding a recorded failure.
   *
   *  $fjdispatch
   *
   *  @return the result of the task
   */
  def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
    val wrapped = newWrappedTask(task)
    Thread.currentThread match {
      case _: ForkJoinWorkerThread => wrapped.fork
      case _                       => forkJoinPool.execute(wrapped)
    }
    wrapped.sync
    // if (wrapped.body.throwable != null) println("throwing: " + wrapped.body.throwable + " at " + wrapped.body)
    wrapped.body.forwardThrowable
    wrapped.body.result
  }

  def parallelismLevel = forkJoinPool.getParallelism
}
/** Defaults used by fork/join based task supports. */
object ForkJoinTasks {

  /** The default pool, created with the no-argument `ForkJoinPool` constructor. */
  val defaultForkJoinPool: ForkJoinPool =
    new ForkJoinPool() // scala.parallel.forkjoinpool

  // Explicit configuration is intentionally left disabled:
  // defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors)
  // defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors)
}
/* Some boilerplate due to no deep mixin composition. Not sure if it can be done differently without them.
*/
/** Combines fork/join execution with adaptive work stealing scheduling. */
trait AdaptiveWorkStealingForkJoinTasks extends ForkJoinTasks with AdaptiveWorkStealingTasks {

  /** Wrapped task mixing the fork/join and the work-stealing behaviour. */
  class WrappedTask[R, Tp](val body: Task[R, Tp])
  extends super[ForkJoinTasks].WrappedTask[R, Tp] with super[AdaptiveWorkStealingTasks].WrappedTask[R, Tp] {

    /** Splits the body and wraps every resulting task. */
    def split = for (piece <- body.split) yield newWrappedTask(piece)
  }

  /** Wraps `b` in this trait's `WrappedTask`. */
  def newWrappedTask[R, Tp](b: Task[R, Tp]) = new WrappedTask[R, Tp](b)
}
/** Combines thread-pool execution with adaptive work stealing scheduling. */
trait AdaptiveWorkStealingThreadPoolTasks extends ThreadPoolTasks with AdaptiveWorkStealingTasks {

  /** Wrapped task mixing the thread-pool and the work-stealing behaviour. */
  class WrappedTask[R, Tp](val body: Task[R, Tp])
  extends super[ThreadPoolTasks].WrappedTask[R, Tp] with super[AdaptiveWorkStealingTasks].WrappedTask[R, Tp] {

    /** Splits the body and wraps every resulting task. */
    def split = for (piece <- body.split) yield newWrappedTask(piece)
  }

  /** Wraps `b` in this trait's `WrappedTask`. */
  def newWrappedTask[R, Tp](b: Task[R, Tp]) = new WrappedTask[R, Tp](b)
}
/** Task execution support that delegates to a driver chosen from the
 *  executor behind an `ExecutionContext`.
 */
trait ExecutionContextTasks extends Tasks {

  /** The execution context, i.e. the `environment`. */
  def executionContext = environment

  val environment: ExecutionContext

  // this part is a hack which allows switching
  /** The task support all work is forwarded to: fork/join based for a
   *  `ForkJoinPool` executor, thread-pool based for a `ThreadPoolExecutor`.
   *  Any other execution context or executor hits `???`.
   */
  val driver: Tasks = executionContext match {
    case eci: scala.concurrent.impl.ExecutionContextImpl =>
      eci.executor match {
        case fjp: ForkJoinPool       => new ForkJoinTaskSupport(fjp)
        case tpe: ThreadPoolExecutor => new ThreadPoolTaskSupport(tpe)
        case _                       => ???
      }
    case _ => ???
  }

  def execute[R, Tp](task: Task[R, Tp]): () => R = driver.execute(task)

  def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = driver.executeAndWaitResult(task)

  def parallelismLevel = driver.parallelismLevel
}
Jump to Line
Something went wrong with that request. Please try again.