Description
I first submitted this to Stack Overflow, where Volodymyr Glushak was not able to reproduce the bug on Mac OS or using Scastie. I'm hoping that someone else can reproduce this issue or figure out what in my current environment might be causing the problem.
I'm using Scala 2.12.5 and Java OpenJDK 1.8.0_161-b14 on a Linux 3.10.0 x86_64 kernel.
I want to make a parallel collection that uses a fixed number of threads. The standard advice for this is to set the parallel collection's tasksupport to a ForkJoinTaskSupport backed by a ForkJoinPool with a fixed number of threads. That works fine until the processing done inside the parallel collection itself uses a parallel collection. In that case, the thread limit set on the ForkJoinPool no longer appears to be enforced.
A simple test looks something like the following:
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

object InnerPar {
  val numTasks = 100
  val numThreads = 10

  /** return maximum number of simultaneous threads running when requesting numThreads */
  def forkJoinPoolMaxThreads(useInnerPar: Boolean): Int = {
    // every thread in the outer collection will increment
    // and decrement this counter as it starts and exits
    val threadCounter = new AtomicInteger(0)

    // this function increments and decrements threadCounter
    // on start and exit, optionally creates an inner parallel collection
    // and finally returns the thread count it found at startup
    def incrementAndCountThreads(idx: Int): Int = {
      val otherThreadsRunning: Int = threadCounter.getAndAdd(1)
      if (useInnerPar) {
        (0 until 20).toSeq.par.map { elem => elem + 1 }
      }
      Thread.sleep(10)
      threadCounter.getAndAdd(-1)
      otherThreadsRunning + 1
    }

    // create parallel collection using a ForkJoinPool with numThreads
    val parCollection = (0 until numTasks).toVector.par
    parCollection.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(numThreads))
    val threadCountLogList = parCollection.map { idx =>
      incrementAndCountThreads(idx)
    }

    // return the maximum number of threads running
    // simultaneously (as counted on each thread start)
    threadCountLogList.max
  }

  def main(args: Array[String]): Unit = {
    val testConfigs = Seq(true, false, true, false)
    testConfigs.foreach { useInnerPar =>
      val maxThreads = forkJoinPoolMaxThreads(useInnerPar)
      // the total number of threads running should not have exceeded
      // numThreads at any point
      val isSuccess = (maxThreads <= numThreads)
      println(f"useInnerPar $useInnerPar%6s, success is $isSuccess%6s (maxThreads $maxThreads%4d)")
    }
  }
}
In my environment this produces the following output, showing that more than numThreads (10 in this example) threads run simultaneously whenever a parallel collection is created inside incrementAndCountThreads().
useInnerPar true, success is false (maxThreads 22)
useInnerPar false, success is true (maxThreads 10)
useInnerPar true, success is false (maxThreads 24)
useInnerPar false, success is true (maxThreads 10)
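To help narrow down where the extra threads come from, a small probe along the following lines could be run next to the test above. This is only a sketch and assumes Scala 2.12 (where parallel collections are part of the standard library) on Java 8. The names ThreadNameProbe and outerThreadNames are hypothetical and not part of the original test. It records the name of every thread that executes an outer task body, so the names can be compared against the custom pool's worker names.

```scala
import java.util.concurrent.{ConcurrentHashMap, ForkJoinPool}
import scala.collection.JavaConverters._
import scala.collection.parallel.ForkJoinTaskSupport

// Hypothetical diagnostic helper, not part of the original test.
object ThreadNameProbe {
  /** Runs numTasks outer tasks on a ForkJoinPool created with numThreads and
    * returns the names of all threads that executed an outer task body. */
  def outerThreadNames(numTasks: Int, numThreads: Int, useInnerPar: Boolean): Set[String] = {
    // thread-safe set, since many tasks add to it concurrently
    val seen = ConcurrentHashMap.newKeySet[String]()
    val pool = new ForkJoinPool(numThreads)
    try {
      val parCollection = (0 until numTasks).toVector.par
      parCollection.tasksupport = new ForkJoinTaskSupport(pool)
      parCollection.foreach { _ =>
        seen.add(Thread.currentThread.getName)
        if (useInnerPar) {
          // same kind of inner parallel collection as in the test above
          (0 until 20).toVector.par.map { elem => elem + 1 }
        }
        Thread.sleep(10)
      }
    } finally {
      pool.shutdown()
    }
    seen.asScala.toSet
  }
}
```

Printing `ThreadNameProbe.outerThreadNames(100, 10, useInnerPar = true).toSeq.sorted` and comparing it with the `useInnerPar = false` run should indicate whether the additional threads carry the custom pool's worker names or belong to some other pool.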
Also note that giving the inner collection its own ForkJoinTaskSupport does not fix the problem. In other words, you get the same results if you use the following code for the inner collection:
if (useInnerPar) {
  val innerParCollection = (0 until 20).toVector.par
  innerParCollection.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(3))
  innerParCollection.map { elem => elem + 1 }
}
Am I missing something? If not, is there a way to work around this?
Thanks!