Skip to content

Commit

Permalink
[SPARK-39104][SQL] InMemoryRelation#isCachedColumnBuffersLoaded shoul…
Browse files Browse the repository at this point in the history
…d be thread-safe

### What changes were proposed in this pull request?

Add `synchronized` on method `isCachedColumnBuffersLoaded`

### Why are the changes needed?

`isCachedColumnBuffersLoaded` should be wrapped in `synchronized`; otherwise it may throw an NPE when `_cachedColumnBuffers` is modified concurrently.

```
def isCachedColumnBuffersLoaded: Boolean = {
  _cachedColumnBuffers != null && isCachedRDDLoaded
}

def isCachedRDDLoaded: Boolean = {
    _cachedColumnBuffersAreLoaded || {
      val bmMaster = SparkEnv.get.blockManager.master
      val rddLoaded = _cachedColumnBuffers.partitions.forall { partition =>
        bmMaster.getBlockStatus(RDDBlockId(_cachedColumnBuffers.id, partition.index), false)
          .exists { case(_, blockStatus) => blockStatus.isCached }
      }
      if (rddLoaded) {
        _cachedColumnBuffersAreLoaded = rddLoaded
      }
      rddLoaded
  }
}
```

```
java.lang.NullPointerException
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241)
    at org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189)
    at org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176)
    at scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219)
    at org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176)
    at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220)
    at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3231)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UT.

Closes apache#36496 from pan3793/SPARK-39104.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Sean Owen <srowen@gmail.com>
(cherry picked from commit 3c8d8d7)
Signed-off-by: Sean Owen <srowen@gmail.com>
  • Loading branch information
pan3793 authored and sunchao committed May 23, 2022
1 parent 63b0d05 commit da41290
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 2 deletions.
Expand Up @@ -238,10 +238,15 @@ case class CachedRDDBuilder(
}

/**
 * Reports whether the cached column buffers exist and the RDD backing them is loaded.
 *
 * The first null check is an unsynchronized fast path. The check is repeated while
 * holding this object's monitor, and `isCachedRDDLoaded` is only evaluated after that
 * second check succeeds.
 *
 * @return true if `_cachedColumnBuffers` is non-null and `isCachedRDDLoaded` is true
 */
def isCachedColumnBuffersLoaded: Boolean = {
  if (_cachedColumnBuffers == null) {
    false
  } else {
    // NOTE(review): presumably the code that nulls out `_cachedColumnBuffers` holds the same
    // monitor; confirm against the rest of the class.
    synchronized {
      _cachedColumnBuffers != null && isCachedRDDLoaded
    }
  }
}

def isCachedRDDLoaded: Boolean = {
private def isCachedRDDLoaded: Boolean = {
_cachedColumnBuffersAreLoaded || {
val bmMaster = SparkEnv.get.blockManager.master
val rddLoaded = _cachedColumnBuffers.partitions.forall { partition =>
Expand Down
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.columnar

import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
Expand Down Expand Up @@ -563,4 +564,56 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSparkSession {
}
}
}

// Stress test: one thread repeatedly builds and clears the cache while another polls
// `isCachedColumnBuffersLoaded`. Any exception escaping a worker thread fails the test.
test("SPARK-39104: InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe") {
  val plan = spark.range(1).queryExecution.executedPlan
  val serializer = new TestCachedBatchSerializer(true, 1)
  val cachedRDDBuilder = CachedRDDBuilder(serializer, MEMORY_ONLY, plan, None)

  @volatile var isCachedColumnBuffersLoaded = false
  @volatile var stopped = false
  // First exception that escaped a worker thread, kept so the test can report the root cause.
  @volatile var firstFailure: Option[Throwable] = None

  // Writer: keeps materializing and clearing the cached buffers.
  val th1 = new Thread {
    override def run(): Unit = {
      while (!isCachedColumnBuffersLoaded && !stopped) {
        cachedRDDBuilder.cachedColumnBuffers
        cachedRDDBuilder.clearCache()
      }
    }
  }

  // Reader: keeps polling the method under test.
  val th2 = new Thread {
    override def run(): Unit = {
      while (!isCachedColumnBuffersLoaded && !stopped) {
        isCachedColumnBuffersLoaded = cachedRDDBuilder.isCachedColumnBuffersLoaded
      }
    }
  }

  // Timer: bounds the test at roughly three seconds.
  val th3 = new Thread {
    override def run(): Unit = {
      Thread.sleep(3000L)
      stopped = true
    }
  }

  val exceptionCnt = new AtomicInteger
  // Exceptions thrown from inside an UncaughtExceptionHandler are ignored by the JVM, so the
  // handler only records the failure; the test thread reports it after the joins below.
  val exceptionHandler: Thread.UncaughtExceptionHandler = (_: Thread, cause: Throwable) => {
    if (exceptionCnt.incrementAndGet() == 1) {
      firstFailure = Some(cause)
    }
  }

  th1.setUncaughtExceptionHandler(exceptionHandler)
  th2.setUncaughtExceptionHandler(exceptionHandler)
  th1.start()
  th2.start()
  th3.start()
  th1.join()
  th2.join()
  th3.join()

  cachedRDDBuilder.clearCache()

  // Re-raise on the test thread so the original stack trace appears in the test report.
  firstFailure.foreach { cause =>
    fail("Uncaught exception in worker thread", cause)
  }
  assert(exceptionCnt.get == 0)
}
}

0 comments on commit da41290

Please sign in to comment.