Skip to content

Commit

Permalink
Bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Sep 26, 2014
1 parent 89f91a0 commit 012afa3
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ private[spark] class BlockManager(
MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
private val broadcastCleaner = new MetadataCleaner(
MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
private val cachedPeers = new mutable.HashSet[BlockManagerId]
@volatile private var cachedPeers: Seq[BlockManagerId] = _
private val peerFetchLock = new Object
private var lastPeerFetchTime = 0L

initialize()
Expand Down Expand Up @@ -792,18 +793,17 @@ private[spark] class BlockManager(
/**
* Get peer block managers in the system.
*/
private def getPeers(forceFetch: Boolean): mutable.HashSet[BlockManagerId] = {
cachedPeers.synchronized {
private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
peerFetchLock.synchronized {
val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
if (cachedPeers.isEmpty || forceFetch || timeout) {
cachedPeers.clear()
cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
if (cachedPeers == null || forceFetch || timeout) {
cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
lastPeerFetchTime = System.currentTimeMillis
logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
}
cachedPeers
}
cachedPeers
}

/**
Expand Down

0 comments on commit 012afa3

Please sign in to comment.