Controlling parallelism in nested parallel collection #10577

Closed
algobardo opened this issue Oct 30, 2017 · 3 comments


@algobardo

commented Oct 30, 2017

I have a really hard time understanding why the following behaviour of parallel collections in Scala should be the expected one.

Consider the following piece of code

    import java.util.concurrent.TimeUnit
    import scala.collection.parallel.ForkJoinTaskSupport
    
    object test extends App {
    
      val CHANGE_ME = false
    
      val externalParallelism = if (CHANGE_ME) 10 else 2
    
      val l = (1 until 100).par
      val l2 = List(1).par // par but with only one element
    
      l2.tasksupport = new ForkJoinTaskSupport(
        new scala.concurrent.forkjoin.ForkJoinPool(externalParallelism))
      l.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
    
      l2.map { j =>
        l.map { i =>
          println(s"STARTED $i")
          TimeUnit.SECONDS.sleep(10000) // blocking
          println(s"DONE $i")
        }
      }
    }

If the code above is run with CHANGE_ME = false, you will immediately observe

    STARTED 1
    STARTED 50

showing that the effective parallelism is 2, which is correct, since the inner parallel collection l is bound to a task support backed by a fork-join pool with parallelism 2.
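
(As a quick check, a sketch using the same collections: TaskSupport exposes the configured parallelism via parallelismLevel.)

    // Sketch: confirm the parallelism configured on each task support.
    println(l.tasksupport.parallelismLevel)   // expected: 2
    println(l2.tasksupport.parallelismLevel)  // expected: externalParallelism
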
If you now set CHANGE_ME = true, you will surprisingly observe

    STARTED 87
    STARTED 93
    STARTED 13
    STARTED 25
    STARTED 75
    STARTED 56
    STARTED 62
    STARTED 50
    STARTED 1

showing that the inner parallel collection is running on the threads of the pool bound to the outer parallel collection, and that its own parallelism bound of 2 is ignored.

I guess the reason is to be found in executeAndWaitResult:

    def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
      val fjtask = newWrappedTask(task)

      if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
        fjtask.fork
      } else {
        forkJoinPool.execute(fjtask)
      }

      fjtask.sync()
      // if (fjtask.body.throwable != null) println("throwing: " + fjtask.body.throwable + " at " + fjtask.body)
      fjtask.body.forwardThrowable()
      fjtask.body.result
    }

where it is the identity of the current thread that drives which pool the task is pushed to: if the calling thread is any ForkJoinWorkerThread, the task is forked into that thread's pool instead of being submitted to the pool of the collection's own task support.
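
A minimal way to observe this (a sketch, reusing l and l2 from the example above; getPool on a ForkJoinWorkerThread reports which pool the worker belongs to):

    import scala.concurrent.forkjoin.ForkJoinWorkerThread

    // Sketch: report which pool each inner task actually runs on.
    l2.map { _ =>
      l.map { i =>
        Thread.currentThread match {
          case w: ForkJoinWorkerThread => println(s"task $i runs in pool ${w.getPool}")
          case t                       => println(s"task $i runs on non-fork-join thread ${t.getName}")
        }
      }
    }

With CHANGE_ME = true, the pool printed for the inner tasks should be the one bound to l2 (its toString reports parallelism = 10), not the parallelism-2 pool bound to l.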

  • Is this the expected behaviour?
  • If so, how can the parallelism of l be bounded in situations where the parallel collection is created and operated on from a ForkJoinWorkerThread?
  • The example above is artificial, but in my case I have parallel Scala tests that cause "internal" parallel collections to ignore their parallelism bound.

JVM: java version "1.8.0_121"
Scala: 2.11.11

@som-snytt


commented Oct 31, 2017

It looks like similar behavior was subsequently fixed in the default executor.

    import scala.collection.parallel.{ForkJoinTaskSupport, ForkJoinTasks, Task}
    // Pool classes as used on Scala 2.11 (the reporter's version).
    import scala.concurrent.forkjoin.{ForkJoinPool, ForkJoinWorkerThread}

    class MyForkJoinTaskSupport(environment0: ForkJoinPool = ForkJoinTasks.defaultForkJoinPool)
        extends ForkJoinTaskSupport(environment0) {

      override def execute[R, Tp](task: Task[R, Tp]): () => R = {
        val fjtask = newWrappedTask(task)

        // Fork inline only if the current worker already belongs to *this* pool;
        // otherwise submit to our own pool so its parallelism bound is respected.
        Thread.currentThread match {
          case fjw: ForkJoinWorkerThread if fjw.getPool eq forkJoinPool => fjtask.fork()
          case _ => forkJoinPool.execute(fjtask)
        }
        () => {
          fjtask.sync()
          fjtask.body.forwardThrowable()
          fjtask.body.result
        }
      }

      override def executeAndWaitResult[R, Tp](task: Task[R, Tp]): R = {
        val fjtask = newWrappedTask(task)

        Thread.currentThread match {
          case fjw: ForkJoinWorkerThread if fjw.getPool eq forkJoinPool => fjtask.fork()
          case _ => forkJoinPool.execute(fjtask)
        }
        fjtask.sync()
        fjtask.body.forwardThrowable()
        fjtask.body.result
      }
    }
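
For the example in the report, the workaround would be wired in roughly like this (a sketch, assuming the MyForkJoinTaskSupport class above is in scope):

    // Bind the inner collection to the pool-aware task support with parallelism 2.
    // Its tasks are now forked inline only when the current worker already belongs
    // to this pool; otherwise they are submitted to the pool explicitly.
    l.tasksupport = new MyForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))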

@SethTisue


commented Nov 1, 2017

although 2.12.5 won't be released until 2018, note that we do publish nightly builds of Scala

oh, but I see you're on 2.11, actually. ah well

note that as of Scala 2.13, the parallel collections will be a separately versioned module with its own release schedule, so it will become possible to get bugfixes like this out in a more timely manner

@som-snytt


commented Nov 1, 2017

The custom tasksupport shown above is a pretty easy workaround. I didn't verify it on 2.11, but it's probably OK.

I was aware the parallel collections would get spun out, but I didn't know it had already happened. I wonder if it will have partests.

som-snytt added a commit to som-snytt/scala-parallel-collections that referenced this issue Nov 2, 2017

@som-snytt som-snytt added the has PR label Dec 8, 2017

@retronym retronym assigned retronym and som-snytt and unassigned retronym Jan 12, 2018

@retronym retronym added this to the 2.12.5 milestone Jan 12, 2018

@retronym retronym closed this Jan 12, 2018
