Skip to content

Commit

Permalink
[SPARK-13747][CORE] Add ThreadUtils.awaitReady and disallow Await.ready
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Add `ThreadUtils.awaitReady` similar to `ThreadUtils.awaitResult` and disallow `Await.ready`.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#17763 from zsxwing/awaitready.
  • Loading branch information
zsxwing authored and liyichao committed May 24, 2017
1 parent 94b7ff5 commit bd86d40
Show file tree
Hide file tree
Showing 11 changed files with 49 additions and 18 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/FutureAction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S

private def getImpl(timeout: Duration): T = {
// This will throw TimeoutException on timeout:
Await.ready(futureAction, timeout)
ThreadUtils.awaitReady(futureAction, timeout)
futureAction.value.get match {
case scala.util.Success(value) => converter(value)
case scala.util.Failure(exception) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,12 +618,7 @@ class DAGScheduler(
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
// Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
// which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
// due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
// safe to pass in null here. For more detail, see SPARK-13747.
val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.nio.channels.Channels

import scala.collection.mutable
import scala.collection.mutable.HashMap
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.util.Random
Expand Down Expand Up @@ -334,7 +334,7 @@ private[spark] class BlockManager(
val task = asyncReregisterTask
if (task != null) {
try {
Await.ready(task, Duration.Inf)
ThreadUtils.awaitReady(task, Duration.Inf)
} catch {
case NonFatal(t) =>
throw new Exception("Error occurred while waiting for async. reregistration", t)
Expand Down Expand Up @@ -916,7 +916,7 @@ private[spark] class BlockManager(
if (level.replication > 1) {
// Wait for asynchronous replication to finish
try {
Await.ready(replicationFuture, Duration.Inf)
ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
} catch {
case NonFatal(t) =>
throw new Exception("Error occurred while waiting for replication to finish", t)
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,25 @@ private[spark] object ThreadUtils {
}
}
// scalastyle:on awaitresult

// scalastyle:off awaitready
/**
* Preferred alternative to `Await.ready()`.
*
* @see [[awaitResult]]
*/
@throws(classOf[SparkException])
def awaitReady[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = {
try {
// `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
// See SPARK-13747.
val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
awaitable.ready(atMost)(awaitPermission)
} catch {
// TimeoutException is thrown in the current thread, so not need to warp the exception.
case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
throw new SparkException("Exception thrown in awaitResult: ", t)
}
}
// scalastyle:on awaitready
}
5 changes: 2 additions & 3 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

import scala.concurrent.duration._
import scala.concurrent.Await

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
Expand All @@ -35,7 +34,7 @@ import org.scalatest.concurrent.Eventually
import org.scalatest.Matchers._

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.util.Utils
import org.apache.spark.util.{ThreadUtils, Utils}


class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
Expand Down Expand Up @@ -315,7 +314,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
val future = sc.parallelize(Seq(0)).foreachAsync(_ => {Thread.sleep(1000L)})
sc.cancelJobGroup("nonExistGroupId")
Await.ready(future, Duration(2, TimeUnit.SECONDS))
ThreadUtils.awaitReady(future, Duration(2, TimeUnit.SECONDS))

// In SPARK-6414, sc.cancelJobGroup will cause NullPointerException and cause
// SparkContext to shutdown, so the following assertion will fail.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.nio._
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

import scala.concurrent.{Await, Promise}
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

Expand All @@ -36,6 +36,7 @@ import org.apache.spark.network.{BlockDataManager, BlockTransferService}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.storage.{BlockId, ShuffleBlockId}
import org.apache.spark.util.ThreadUtils

class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar with ShouldMatchers {
test("security default off") {
Expand Down Expand Up @@ -166,7 +167,7 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
}
})

Await.ready(promise.future, FiniteDuration(10, TimeUnit.SECONDS))
ThreadUtils.awaitReady(promise.future, FiniteDuration(10, TimeUnit.SECONDS))
promise.future.value.get
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.{TimeoutException, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.{Await, Future}
import scala.concurrent.Future
import scala.concurrent.duration.{Duration, SECONDS}
import scala.language.existentials
import scala.reflect.ClassTag
Expand Down Expand Up @@ -260,7 +260,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
*/
def awaitJobTermination(jobFuture: Future[_], duration: Duration): Unit = {
try {
Await.ready(jobFuture, duration)
ThreadUtils.awaitReady(jobFuture, duration)
} catch {
case te: TimeoutException if backendException.get() != null =>
val msg = raw"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
// one should acquire the write lock. The second thread should block until the winner of the
// write race releases its lock.
val winningFuture: Future[Boolean] =
Await.ready(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 1.seconds)
ThreadUtils.awaitReady(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 1.seconds)
assert(winningFuture.value.get.get)
val winningTID = blockInfoManager.get("block").get.writerTask
assert(winningTID === 1 || winningTID === 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ class KinesisCheckpointerSuite extends TestSuiteBase
ExecutionContext.global)

intercept[TimeoutException] {
// scalastyle:off awaitready
Await.ready(f, 50 millis)
// scalastyle:on awaitready
}

clock.advance(checkpointInterval.milliseconds / 2)
Expand Down
11 changes: 11 additions & 0 deletions scalastyle-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,17 @@ This file is divided into 3 sections:
]]></customMessage>
</check>

<check customId="awaitready" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
<parameters><parameter name="regex">Await\.ready</parameter></parameters>
<customMessage><![CDATA[
Are you sure that you want to use Await.ready? In most cases, you should use ThreadUtils.awaitReady instead.
If you must use Await.ready, wrap the code block with
// scalastyle:off awaitready
Await.ready(...)
// scalastyle:on awaitready
]]></customMessage>
</check>

<!-- As of SPARK-9613 JavaConversions should be replaced with JavaConverters -->
<check customId="javaconversions" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
<parameters><parameter name="regex">JavaConversions</parameter></parameters>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ private[streaming] class FileBasedWriteAheadLog(
val f = Future { deleteFile(logInfo) }(executionContext)
if (waitForCompletion) {
import scala.concurrent.duration._
// scalastyle:off awaitready
Await.ready(f, 1 second)
// scalastyle:on awaitready
}
} catch {
case e: RejectedExecutionException =>
Expand Down

0 comments on commit bd86d40

Please sign in to comment.