Skip to content

Commit

Permalink
[SPARK-19956][CORE] Optimize a location order of blocks with topology…
Browse files Browse the repository at this point in the history
… information

## What changes were proposed in this pull request?

When call the method getLocations of BlockManager, we only compare the data block host. Random selection for non-local data blocks, this may cause the selected data block to be in a different rack. So in this patch to increase the sort of the rack.

## How was this patch tested?

New test case.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Xianyang Liu <xianyang.liu@intel.com>

Closes apache#17300 from ConeyLiu/blockmanager.
  • Loading branch information
ConeyLiu authored and liyichao committed May 24, 2017
1 parent 17fd9b1 commit abcdfb7
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 5 deletions.
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -612,12 +612,19 @@ private[spark] class BlockManager(

/**
* Return a list of locations for the given block, prioritizing the local machine since
* multiple block managers can share the same host.
* multiple block managers can share the same host, followed by hosts on the same rack.
*/
private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
val locs = Random.shuffle(master.getLocations(blockId))
val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
preferredLocs ++ otherLocs
blockManagerId.topologyInfo match {
case None => preferredLocs ++ otherLocs
case Some(_) =>
val (sameRackLocs, differentRackLocs) = otherLocs.partition {
loc => blockManagerId.topologyInfo == loc.topologyInfo
}
preferredLocs ++ sameRackLocs ++ differentRackLocs
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
}

test("optimize a location order of blocks") {
val localHost = Utils.localHostName()
test("optimize a location order of blocks without topology information") {
val localHost = "localhost"
val otherHost = "otherHost"
val bmMaster = mock(classOf[BlockManagerMaster])
val bmId1 = BlockManagerId("id1", localHost, 1)
Expand All @@ -508,7 +508,32 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val blockManager = makeBlockManager(128, "exec", bmMaster)
val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost))
assert(locations.map(_.host) === Seq(localHost, localHost, otherHost))
}

test("optimize a location order of blocks with topology information") {
val localHost = "localhost"
val otherHost = "otherHost"
val localRack = "localRack"
val otherRack = "otherRack"

val bmMaster = mock(classOf[BlockManagerMaster])
val bmId1 = BlockManagerId("id1", localHost, 1, Some(localRack))
val bmId2 = BlockManagerId("id2", localHost, 2, Some(localRack))
val bmId3 = BlockManagerId("id3", otherHost, 3, Some(otherRack))
val bmId4 = BlockManagerId("id4", otherHost, 4, Some(otherRack))
val bmId5 = BlockManagerId("id5", otherHost, 5, Some(localRack))
when(bmMaster.getLocations(mc.any[BlockId]))
.thenReturn(Seq(bmId1, bmId2, bmId5, bmId3, bmId4))

val blockManager = makeBlockManager(128, "exec", bmMaster)
blockManager.blockManagerId =
BlockManagerId(SparkContext.DRIVER_IDENTIFIER, localHost, 1, Some(localRack))
val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
assert(locations.map(_.host) === Seq(localHost, localHost, otherHost, otherHost, otherHost))
assert(locations.flatMap(_.topologyInfo)
=== Seq(localRack, localRack, localRack, otherRack, otherRack))
}

test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
Expand Down

0 comments on commit abcdfb7

Please sign in to comment.