Skip to content

Commit

Permalink
Test broadcast cleanup in ContextCleanerSuite + remove BoundedHashMap
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Mar 27, 2014
1 parent e95479c commit f201a8d
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 125 deletions.
67 changes: 0 additions & 67 deletions core/src/main/scala/org/apache/spark/util/BoundedHashMap.scala

This file was deleted.

147 changes: 94 additions & 53 deletions core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark

import java.lang.ref.WeakReference

import scala.collection.mutable.{ArrayBuffer, HashSet, SynchronizedSet}
import scala.collection.mutable.{HashSet, SynchronizedSet}
import scala.util.Random

import org.scalatest.{BeforeAndAfter, FunSuite}
Expand All @@ -29,7 +29,7 @@ import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId}
import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId}

class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {

Expand All @@ -46,9 +46,9 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo

// Explicit cleanup
cleaner.cleanupRDD(rdd)
tester.assertCleanup
tester.assertCleanup()

// verify that RDDs can be re-executed after cleaning up
// Verify that RDDs can be re-executed after cleaning up
assert(rdd.collect().toList === collected)
}

Expand All @@ -59,87 +59,101 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo

// Explicit cleanup
shuffleDeps.foreach(s => cleaner.cleanupShuffle(s))
tester.assertCleanup
tester.assertCleanup()

// Verify that shuffles can be re-executed after cleaning up
assert(rdd.collect().toList === collected)
}

test("cleanup broadcast") {
val broadcast = newBroadcast
val tester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))

// Explicit cleanup
cleaner.cleanupBroadcast(broadcast)
tester.assertCleanup()
}

test("automatically cleanup RDD") {
var rdd = newRDD.persist()
rdd.count()

// test that GC does not cause RDD cleanup due to a strong reference
// Test that GC does not cause RDD cleanup due to a strong reference
val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
runGC()
intercept[Exception] {
preGCTester.assertCleanup(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1000 millis))
}

// test that GC causes RDD cleanup after dereferencing the RDD
// Test that GC causes RDD cleanup after dereferencing the RDD
val postGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
rdd = null // make RDD out of scope
rdd = null // Make RDD out of scope
runGC()
postGCTester.assertCleanup
postGCTester.assertCleanup()
}

test("automatically cleanup shuffle") {
var rdd = newShuffleRDD
rdd.count()

// test that GC does not cause shuffle cleanup due to a strong reference
val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
// Test that GC does not cause shuffle cleanup due to a strong reference
val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
runGC()
intercept[Exception] {
preGCTester.assertCleanup(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1000 millis))
}

// test that GC causes shuffle cleanup after dereferencing the RDD
// Test that GC causes shuffle cleanup after dereferencing the RDD
val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
rdd = null // make RDD out of scope, so that corresponding shuffle goes out of scope
rdd = null // Make RDD out of scope, so that corresponding shuffle goes out of scope
runGC()
postGCTester.assertCleanup
postGCTester.assertCleanup()
}

test("automatically cleanup RDD + shuffle") {
test("automatically cleanup broadcast") {
var broadcast = newBroadcast

def randomRDD: RDD[_] = {
val rdd: RDD[_] = Random.nextInt(3) match {
case 0 => newRDD
case 1 => newShuffleRDD
case 2 => newPairRDD.join(newPairRDD)
}
if (Random.nextBoolean()) rdd.persist()
rdd.count()
rdd
// Test that GC does not cause broadcast cleanup due to a strong reference
val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
}

val buffer = new ArrayBuffer[RDD[_]]
for (i <- 1 to 500) {
buffer += randomRDD
}
// Test that GC causes broadcast cleanup after dereferencing the broadcast variable
val postGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
broadcast = null // Make broadcast variable out of scope
runGC()
postGCTester.assertCleanup()
}

test("automatically cleanup RDD + shuffle + broadcast") {
val numRdds = 100
val numBroadcasts = 4 // Broadcasts are more costly
val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast).toBuffer
val rddIds = sc.persistentRdds.keys.toSeq
val shuffleIds = 0 until sc.newShuffleId
val broadcastIds = 0L until numBroadcasts

val preGCTester = new CleanerTester(sc, rddIds, shuffleIds)
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
runGC()
intercept[Exception] {
preGCTester.assertCleanup(timeout(1000 millis))
preGCTester.assertCleanup()(timeout(1000 millis))
}
// test that GC causes shuffle cleanup after dereferencing the RDD
val postGCTester = new CleanerTester(sc, rddIds, shuffleIds)
buffer.clear()

// Test that GC triggers the cleanup of all variables after the dereferencing them
val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
broadcastBuffer.clear()
rddBuffer.clear()
runGC()
postGCTester.assertCleanup
postGCTester.assertCleanup()
}

def newRDD = sc.makeRDD(1 to 10)

def newPairRDD = newRDD.map(_ -> 1)

def newShuffleRDD = newPairRDD.reduceByKey(_ + _)

def newBroadcast = sc.broadcast(1 to 100)
def newRDDWithShuffleDependencies: (RDD[_], Seq[ShuffleDependency[_, _]]) = {
def getAllDependencies(rdd: RDD[_]): Seq[Dependency[_]] = {
rdd.dependencies ++ rdd.dependencies.flatMap { dep =>
Expand All @@ -149,11 +163,27 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
val rdd = newShuffleRDD

// Get all the shuffle dependencies
val shuffleDeps = getAllDependencies(rdd).filter(_.isInstanceOf[ShuffleDependency[_, _]])
val shuffleDeps = getAllDependencies(rdd)
.filter(_.isInstanceOf[ShuffleDependency[_, _]])
.map(_.asInstanceOf[ShuffleDependency[_, _]])
(rdd, shuffleDeps)
}

def randomRdd = {
val rdd: RDD[_] = Random.nextInt(3) match {
case 0 => newRDD
case 1 => newShuffleRDD
case 2 => newPairRDD.join(newPairRDD)
}
if (Random.nextBoolean()) rdd.persist()
rdd.count()
rdd
}

def randomBroadcast = {
sc.broadcast(Random.nextInt(Int.MaxValue))
}

/** Run GC and make sure it actually has run */
def runGC() {
val weakRef = new WeakReference(new Object())
Expand Down Expand Up @@ -208,7 +238,7 @@ class CleanerTester(
sc.cleaner.attachListener(cleanerListener)

/** Assert that all the stuff has been cleaned up */
def assertCleanup(implicit waitTimeout: Eventually.Timeout) {
def assertCleanup()(implicit waitTimeout: Eventually.Timeout) {
try {
eventually(waitTimeout, interval(10 millis)) {
assert(isAllCleanedUp)
Expand All @@ -222,7 +252,7 @@ class CleanerTester(

/** Verify that RDDs, shuffles, etc. occupy resources */
private def preCleanupValidate() {
assert(rddIds.nonEmpty || shuffleIds.nonEmpty, "Nothing to cleanup")
assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty, "Nothing to cleanup")

// Verify the RDDs have been persisted and blocks are present
assert(rddIds.forall(sc.persistentRdds.contains),
Expand All @@ -233,8 +263,12 @@ class CleanerTester(
// Verify the shuffle ids are registered and blocks are present
assert(shuffleIds.forall(mapOutputTrackerMaster.containsShuffle),
"One or more shuffles have not been registered cannot start cleaner test")
assert(shuffleIds.forall(shuffleId => diskBlockManager.containsBlock(shuffleBlockId(shuffleId))),
assert(shuffleIds.forall(sid => diskBlockManager.containsBlock(shuffleBlockId(sid))),
"One or more shuffles' blocks cannot be found in disk manager, cannot start cleaner test")

// Verify that the broadcast is in the driver's block manager
assert(broadcastIds.forall(bid => blockManager.getLevel(broadcastBlockId(bid)).isDefined),
"One ore more broadcasts have not been persisted in the driver's block manager")
}

/**
Expand All @@ -247,14 +281,19 @@ class CleanerTester(
attempts += 1
logInfo("Attempt: " + attempts)
try {
// Verify all the RDDs have been unpersisted
// Verify all RDDs have been unpersisted
assert(rddIds.forall(!sc.persistentRdds.contains(_)))
assert(rddIds.forall(rddId => !blockManager.master.contains(rddBlockId(rddId))))

// Verify all the shuffle have been deregistered and cleaned up
// Verify all shuffles have been deregistered and cleaned up
assert(shuffleIds.forall(!mapOutputTrackerMaster.containsShuffle(_)))
assert(shuffleIds.forall(shuffleId =>
!diskBlockManager.containsBlock(shuffleBlockId(shuffleId))))
assert(shuffleIds.forall(sid => !diskBlockManager.containsBlock(shuffleBlockId(sid))))

// Verify all broadcasts have been unpersisted
assert(broadcastIds.forall { bid =>
blockManager.master.askForStorageLevels(broadcastBlockId(bid)).isEmpty
})

return
} catch {
case t: Throwable =>
Expand All @@ -271,18 +310,20 @@ class CleanerTester(
s"""
|\tRDDs = ${toBeCleanedRDDIds.mkString("[", ", ", "]")}
|\tShuffles = ${toBeCleanedShuffleIds.mkString("[", ", ", "]")}
|\tBroadcasts = ${toBeCleanedBroadcstIds.mkString("[", ", ", "]")}
""".stripMargin
}

private def isAllCleanedUp = toBeCleanedRDDIds.isEmpty && toBeCleanedShuffleIds.isEmpty

private def shuffleBlockId(shuffleId: Int) = ShuffleBlockId(shuffleId, 0, 0)
private def isAllCleanedUp =
toBeCleanedRDDIds.isEmpty &&
toBeCleanedShuffleIds.isEmpty &&
toBeCleanedBroadcstIds.isEmpty

private def rddBlockId(rddId: Int) = RDDBlockId(rddId, 0)
private def shuffleBlockId(shuffleId: Int) = ShuffleBlockId(shuffleId, 0, 0)
private def broadcastBlockId(broadcastId: Long) = BroadcastBlockId(broadcastId)

private def blockManager = sc.env.blockManager

private def diskBlockManager = blockManager.diskBlockManager

private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ class WrappedJavaHashMapSuite extends FunSuite {
// Test a simple WrappedJavaHashMap
testMap(new TestMap[String, String]())

// Test BoundedHashMap
testMap(new BoundedHashMap[String, String](100, true))

testMapThreadSafety(new BoundedHashMap[String, String](100, true))

// Test TimeStampedHashMap
testMap(new TimeStampedHashMap[String, String])

Expand Down

0 comments on commit f201a8d

Please sign in to comment.