
Allow specifying the shuffle write file buffer size.

The default buffer size is 8KB in FastBufferedOutputStream, which is too small
and would cause a lot of disk seeks.
Commit 1055785a836ab2361239f0937a1a22fee953e029 (1 parent: 7007201), committed by rxin on Apr 30, 2013
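
For context, the new setting is read as an ordinary Java system property, so it can be set before the job starts. A minimal usage sketch (the property name comes from the last hunk below; the value is illustrative):

    // Illustrative: raise the shuffle write buffer from the new 100 KB default to 1 MB.
    // The property is read in ShuffleBlockManager.acquireWriters (last hunk below).
    System.setProperty("spark.shuffle.file.buffer.kb", "1024")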
@@ -513,8 +513,9 @@ class BlockManager(
* This is currently used for writing shuffle files out. Callers should handle error
* cases.
*/
- def getDiskBlockWriter(blockId: String, serializer: Serializer): BlockObjectWriter = {
- val writer = diskStore.getBlockWriter(blockId, serializer)
+ def getDiskBlockWriter(blockId: String, serializer: Serializer, bufferSize: Int)
+ : BlockObjectWriter = {
+ val writer = diskStore.getBlockWriter(blockId, serializer, bufferSize)
writer.registerCloseEventHandler(() => {
val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
blockInfo.put(blockId, myInfo)
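
The hunk above only threads the new bufferSize parameter through to DiskStore; the existing close-handler wiring is unchanged. For readers unfamiliar with that wiring, a self-contained sketch of the registerCloseEventHandler pattern (hypothetical trait; a println stands in for the blockInfo update):

    trait CloseEvents {
      private var handler: () => Unit = () => ()
      def registerCloseEventHandler(h: () => Unit): Unit = { handler = h }
      def close(): Unit = handler()
    }

    val writer = new CloseEvents {}
    writer.registerCloseEventHandler(() => println("block recorded as DISK_ONLY"))
    writer.close()  // fires the handler, as closing the disk writer does above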
@@ -21,7 +21,7 @@ import spark.serializer.{Serializer, SerializationStream}
private class DiskStore(blockManager: BlockManager, rootDirs: String)
extends BlockStore(blockManager) {
- class DiskBlockObjectWriter(blockId: String, serializer: Serializer)
+ class DiskBlockObjectWriter(blockId: String, serializer: Serializer, bufferSize: Int)
extends BlockObjectWriter(blockId) {
private val f: File = createFile(blockId /*, allowAppendExisting */)
@@ -32,7 +32,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
private var validLength = 0L
override def open(): DiskBlockObjectWriter = {
- println("------------------------------------------------- opening " + f)
- repositionableStream = new FastBufferedOutputStream(new FileOutputStream(f))
+ repositionableStream = new FastBufferedOutputStream(new FileOutputStream(f), bufferSize)
bs = blockManager.wrapForCompression(blockId, repositionableStream)
objOut = serializer.newInstance().serializeStream(bs)
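
open() layers file stream -> buffered stream -> compression -> serialization, so every serialized shuffle record now passes through the enlarged buffer before reaching disk. A runnable sketch of the same layering, with java.io.BufferedOutputStream standing in for FastBufferedOutputStream (both take a buffer size in bytes; path and payload are hypothetical):

    import java.io.{BufferedOutputStream, File, FileOutputStream}

    val bufferSize = 100 * 1024  // the new default: 100 KB instead of 8 KB
    val out = new BufferedOutputStream(
      new FileOutputStream(new File("/tmp/shuffle-sketch")), bufferSize)
    out.write("record".getBytes)  // absorbed by the buffer, no per-record disk seek
    out.flush()                   // pushed to disk, e.g. at commit()
    out.close()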
@@ -55,7 +54,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
// Return the number of bytes written for this commit.
override def commit(): Long = {
bs.flush()
- repositionableStream.position()
+ validLength = repositionableStream.position()
+ validLength
}
override def revertPartialWrites() {
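
The commit() fix above stores the stream position in validLength rather than discarding it, which is what allows revertPartialWrites() to truncate the file back to the last committed byte. A minimal sketch of that commit/revert pattern over a plain FileOutputStream (hypothetical class, not the Spark code):

    import java.io.{File, FileOutputStream}

    class TrackedWriter(file: File) {
      private val out = new FileOutputStream(file)
      private var validLength = 0L

      def commit(): Long = {
        out.flush()
        validLength = out.getChannel.position()  // bytes known-good so far
        validLength
      }

      def revertPartialWrites(): Unit = {
        out.getChannel.truncate(validLength)     // drop uncommitted bytes
      }
    }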
@@ -86,8 +86,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
addShutdownHook()
- def getBlockWriter(blockId: String, serializer: Serializer): BlockObjectWriter = {
- new DiskBlockObjectWriter(blockId, serializer)
+ def getBlockWriter(blockId: String, serializer: Serializer, bufferSize: Int): BlockObjectWriter = {
+ new DiskBlockObjectWriter(blockId, serializer, bufferSize)
}
override def getSize(blockId: String): Long = {
@@ -25,9 +25,10 @@ class ShuffleBlockManager(blockManager: BlockManager) {
// Get a group of writers for a map task.
def acquireWriters(mapId: Int): ShuffleWriterGroup = {
+ val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockManager.blockId(shuffleId, bucketId, mapId)
- blockManager.getDiskBlockWriter(blockId, serializer).open()
+ blockManager.getDiskBlockWriter(blockId, serializer, bufferSize).open()
}
new ShuffleWriterGroup(mapId, writers)
}
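
One sizing note implied by the hunk above: acquireWriters opens one buffered writer per reduce bucket, so buffer memory scales with the bucket count. Rough, illustrative arithmetic only:

    val numBuckets = 1000                              // hypothetical reduce partition count
    val bufferSize = 100 * 1024                        // the new 100 KB default, in bytes
    val perTaskBufferMemory = numBuckets * bufferSize  // ~100 MB per in-flight map task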
