[SPARK-13747][SQL] Fix concurrent query with fork-join pool
## What changes were proposed in this pull request?

Fix the following use case, which was already fixed by SPARK-10548 in 1.6 but was broken again in master by apache#9264:

```
(1 to 100).par.foreach { _ => sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count() }
```

This threw `IllegalArgumentException` consistently before this patch. For more detail, see the JIRA.

## How was this patch tested?

New test in `SQLExecutionSuite`.

Author: Andrew Or <andrew@databricks.com>

Closes apache#11586 from andrewor14/fix-concurrent-sql.
Andrew Or authored and roygao94 committed Mar 22, 2016
1 parent 363d879 commit d220ebd
Showing 2 changed files with 20 additions and 1 deletion.
```diff
@@ -613,7 +613,12 @@ class DAGScheduler(
       properties: Properties): Unit = {
     val start = System.nanoTime
     val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
-    Await.ready(waiter.completionFuture, atMost = Duration.Inf)
+    // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
+    // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
+    // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
+    // safe to pass in null here. For more detail, see SPARK-13747.
+    val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
+    waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
     waiter.completionFuture.value.get match {
       case scala.util.Success(_) =>
         logInfo("Job %d finished: %s, took %f s".format
```
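The workaround in the diff above can be exercised in isolation. This is a minimal standalone sketch (outside Spark) showing that `Awaitable.ready` can be called directly with a null `CanAwait` permit, bypassing `Await.ready` and therefore `scala.concurrent.blocking`; the permit exists only to gate the API at compile time and is never dereferenced at runtime:

```scala
import scala.concurrent.{CanAwait, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val f = Future { 21 * 2 }

// Await.ready(f, Duration.Inf) would wrap the wait in scala.concurrent.blocking.
// Calling Awaitable.ready directly skips that wrapper; the CanAwait permit is
// unused at runtime (as the DAGScheduler comment notes), so a null cast is safe.
val awaitPermission = null.asInstanceOf[CanAwait]
f.ready(Duration.Inf)(awaitPermission)

println(f.value) // Some(Success(42)) once ready has returned
```

On a fork-join pool, skipping the `blocking` wrapper avoids `ForkJoinPool`'s managed-block machinery, which is what broke concurrent SQL executions in this case.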
```diff
@@ -49,6 +49,20 @@ class SQLExecutionSuite extends SparkFunSuite {
     }
   }

+  test("concurrent query execution with fork-join pool (SPARK-13747)") {
+    val sc = new SparkContext("local[*]", "test")
+    val sqlContext = new SQLContext(sc)
+    import sqlContext.implicits._
+    try {
+      // Should not throw IllegalArgumentException
+      (1 to 100).par.foreach { _ =>
+        sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
+      }
+    } finally {
+      sc.stop()
+    }
+  }
+
   /**
    * Trigger SPARK-10548 by mocking a parent and its child thread executing queries concurrently.
    */
```