Skip to content

Commit

Permalink
Small code cleanup for readability
Browse files Browse the repository at this point in the history
  • Loading branch information
massie committed Jun 12, 2015
1 parent 7429a98 commit 4ea1712
Showing 1 changed file with 10 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,16 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
(address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
}

def unpackBlock(blockPair: (BlockId, Try[InputStream])) : (BlockId, InputStream) = {
val blockFetcherItr = new ShuffleBlockFetcherIterator(
context,
SparkEnv.get.blockManager.shuffleClient,
blockManager,
blocksByAddress,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)

// Make sure that fetch failures are wrapped inside a FetchFailedException for the scheduler
blockFetcherItr.map { blockPair =>
val blockId = blockPair._1
val blockOption = blockPair._2
blockOption match {
Expand All @@ -70,15 +79,5 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
}
}
}

val blockFetcherItr = new ShuffleBlockFetcherIterator(
context,
SparkEnv.get.blockManager.shuffleClient,
blockManager,
blocksByAddress,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)

blockFetcherItr.map(unpackBlock)
}
}

0 comments on commit 4ea1712

Please sign in to comment.