Permalink
Browse files

Added a shuffle block manager so it is easier in the future to

consolidate shuffle output files.
  • Loading branch information...
1 parent ed4ddf4 commit 7007201201981c6fb002e3008d97a6d6248f4dba @rxin rxin committed Apr 30, 2013
@@ -86,8 +86,14 @@ private[spark] class ShuffleMapTask(
protected def this() = this(0, null, null, 0, null)
- // data locality is on a per host basis, not hyper specific to container (host:port). Unique on set of hosts.
- private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.map(loc => Utils.parseHostPort(loc)._1).toSet.toSeq
+ // Data locality is on a per host basis, not hyper specific to container (host:port).
+ // Unique on set of hosts.
+ // TODO(rxin): The above statement seems problematic. Even if partitions are on the same host,
+ // the worker would still need to serialize / deserialize those data when they are in
+ // different jvm processes. Often that is very costly ...
+ @transient
+ private val preferredLocs: Seq[String] =
+ if (locs == null) Nil else locs.map(loc => Utils.parseHostPort(loc)._1).toSet.toSeq
{
// DEBUG code
@@ -131,31 +137,32 @@ private[spark] class ShuffleMapTask(
val taskContext = new TaskContext(stageId, partition, attemptId)
metrics = Some(taskContext.taskMetrics)
+
+ val blockManager = SparkEnv.get.blockManager
+ var shuffle: ShuffleBlockManager#Shuffle = null
+ var buckets: ShuffleWriterGroup = null
+
try {
// Obtain all the block writers for shuffle blocks.
- val blockManager = SparkEnv.get.blockManager
- val buckets = Array.tabulate[BlockObjectWriter](numOutputSplits) { bucketId =>
- val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + bucketId
- blockManager.getDiskBlockWriter(blockId, Serializer.get(dep.serializerClass))
- }
+ val ser = Serializer.get(dep.serializerClass)
+ shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
+ buckets = shuffle.acquireWriters(partition)
// Write the map output to its associated buckets.
for (elem <- rdd.iterator(split, taskContext)) {
val pair = elem.asInstanceOf[(Any, Any)]
val bucketId = dep.partitioner.getPartition(pair._1)
- buckets(bucketId).write(pair)
+ buckets.writers(bucketId).write(pair)
}
- // Close the bucket writers and get the sizes of each block.
- val compressedSizes = new Array[Byte](numOutputSplits)
- var i = 0
+ // Commit the writes. Get the size of each bucket block (total block size).
var totalBytes = 0L
- while (i < numOutputSplits) {
- buckets(i).close()
- val size = buckets(i).size()
+ val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
+ writer.commit()
+ writer.close()
+ val size = writer.size()
totalBytes += size
- compressedSizes(i) = MapOutputTracker.compressSize(size)
- i += 1
+ MapOutputTracker.compressSize(size)
}
// Update shuffle metrics.
@@ -164,7 +171,18 @@ private[spark] class ShuffleMapTask(
metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
return new MapStatus(blockManager.blockManagerId, compressedSizes)
+ } catch { case e: Exception =>
+ // If there is an exception from running the task, revert the partial writes
+ // and throw the exception upstream to Spark.
+ if (buckets != null) {
+ buckets.writers.foreach(_.revertPartialWrites())
+ }
+ throw e
} finally {
+ // Release the writers back to the shuffle block manager.
+ if (shuffle != null && buckets != null) {
+ shuffle.releaseWriters(buckets)
+ }
// Execute the callbacks on task completion.
taskContext.executeOnCompleteCallbacks()
}
@@ -88,6 +88,8 @@ class BlockManager(
}
}
+ val shuffleBlockManager = new ShuffleBlockManager(this)
+
private val blockInfo = new TimeStampedHashMap[String, BlockInfo]
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
@@ -391,7 +393,7 @@ class BlockManager(
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
- if (blockId.startsWith("shuffle_")) {
+ if (ShuffleBlockManager.isShuffle(blockId)) {
return diskStore.getBytes(blockId) match {
case Some(bytes) =>
Some(bytes)
@@ -508,12 +510,12 @@ class BlockManager(
/**
* A short circuited method to get a block writer that can write data directly to disk.
- * This is currently used for writing shuffle files out.
+ * This is currently used for writing shuffle files out. Callers should handle error
+ * cases.
*/
def getDiskBlockWriter(blockId: String, serializer: Serializer): BlockObjectWriter = {
val writer = diskStore.getBlockWriter(blockId, serializer)
writer.registerCloseEventHandler(() => {
- // TODO(rxin): This doesn't handle error cases.
val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
blockInfo.put(blockId, myInfo)
myInfo.markReady(writer.size())
@@ -872,7 +874,7 @@ class BlockManager(
}
def shouldCompress(blockId: String): Boolean = {
- if (blockId.startsWith("shuffle_")) {
+ if (ShuffleBlockManager.isShuffle(blockId)) {
compressShuffle
} else if (blockId.startsWith("broadcast_")) {
compressBroadcast
@@ -887,7 +889,11 @@ class BlockManager(
* Wrap an output stream for compression if block compression is enabled for its block type
*/
def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
- if (shouldCompress(blockId)) new LZFOutputStream(s) else s
+ if (shouldCompress(blockId)) {
+ (new LZFOutputStream(s)).setFinishBlockOnFlush(true)
+ } else {
+ s
+ }
}
/**
@@ -3,25 +3,48 @@ package spark.storage
import java.nio.ByteBuffer
+/**
+ * An interface for writing JVM objects to some underlying storage. This interface allows
+ * appending data to an existing block, and can guarantee atomicity in the case of faults
+ * as it allows the caller to revert partial writes.
+ *
+ * This interface does not support concurrent writes.
+ */
abstract class BlockObjectWriter(val blockId: String) {
- // TODO(rxin): What if there is an exception when the block is being written out?
-
var closeEventHandler: () => Unit = _
- def registerCloseEventHandler(handler: () => Unit) {
- closeEventHandler = handler
+ def open(): BlockObjectWriter
+
+ def close() {
+ closeEventHandler()
}
- def write(value: Any)
+ def isOpen: Boolean
- def writeAll(value: Iterator[Any]) {
- value.foreach(write)
+ def registerCloseEventHandler(handler: () => Unit) {
+ closeEventHandler = handler
}
- def close() {
- closeEventHandler()
- }
+ /**
+ * Flush the partial writes and commit them as a single atomic block. Return the
+ * number of bytes written for this commit.
+ */
+ def commit(): Long
+
+ /**
+ * Reverts writes that haven't been flushed yet. Callers should invoke this function
+ * when there are runtime exceptions.
+ */
+ def revertPartialWrites()
+
+ /**
+ * Writes an object.
+ */
+ def write(value: Any)
+ /**
+ * Size of the valid writes, in bytes.
+ */
def size(): Long
}
@@ -12,7 +12,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import spark.Utils
import spark.executor.ExecutorExitCode
-import spark.serializer.Serializer
+import spark.serializer.{Serializer, SerializationStream}
/**
@@ -21,35 +21,58 @@ import spark.serializer.Serializer
private class DiskStore(blockManager: BlockManager, rootDirs: String)
extends BlockStore(blockManager) {
- private val mapMode = MapMode.READ_ONLY
- private var mapOpenMode = "r"
-
class DiskBlockObjectWriter(blockId: String, serializer: Serializer)
extends BlockObjectWriter(blockId) {
private val f: File = createFile(blockId /*, allowAppendExisting */)
- private val bs: OutputStream = blockManager.wrapForCompression(blockId,
- new FastBufferedOutputStream(new FileOutputStream(f)))
- private val objOut = serializer.newInstance().serializeStream(bs)
-
- private var _size: Long = -1L
- override def write(value: Any) {
- objOut.writeObject(value)
+ private var repositionableStream: FastBufferedOutputStream = null
+ private var bs: OutputStream = null
+ private var objOut: SerializationStream = null
+ private var validLength = 0L
+
+ override def open(): DiskBlockObjectWriter = {
+ println("------------------------------------------------- opening " + f)
+ repositionableStream = new FastBufferedOutputStream(new FileOutputStream(f))
+ bs = blockManager.wrapForCompression(blockId, repositionableStream)
+ objOut = serializer.newInstance().serializeStream(bs)
+ this
}
override def close() {
objOut.close()
bs.close()
+ objOut = null
+ bs = null
+ repositionableStream = null
+ // Invoke the close callback handler.
super.close()
}
- override def size(): Long = {
- if (_size < 0) {
- _size = f.length()
- }
- _size
+ override def isOpen: Boolean = objOut != null
+
+ // Flush the partial writes, and set valid length to be the length of the entire file.
+ // Return the number of bytes written for this commit.
+ override def commit(): Long = {
+ bs.flush()
+ repositionableStream.position()
+ }
+
+ override def revertPartialWrites() {
+ // Flush the outstanding writes and delete the file.
+ objOut.close()
+ bs.close()
+ objOut = null
+ bs = null
+ repositionableStream = null
+ f.delete()
+ }
+
+ override def write(value: Any) {
+ objOut.writeObject(value)
}
+
+ override def size(): Long = validLength
}
val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@@ -90,9 +113,9 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
private def getFileBytes(file: File): ByteBuffer = {
val length = file.length()
- val channel = new RandomAccessFile(file, mapOpenMode).getChannel()
+ val channel = new RandomAccessFile(file, "r").getChannel()
val buffer = try {
- channel.map(mapMode, 0, length)
+ channel.map(MapMode.READ_ONLY, 0, length)
} finally {
channel.close()
}
@@ -230,12 +253,14 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}
private def addShutdownHook() {
- localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir) )
+ localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
override def run() {
logDebug("Shutdown hook called")
try {
- localDirs.foreach(localDir => if (! Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir))
+ localDirs.foreach { localDir =>
+ if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
+ }
} catch {
case t: Throwable => logError("Exception while deleting local spark dirs", t)
}
@@ -0,0 +1,52 @@
+package spark.storage
+
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.ArrayBuffer
+
+import spark.serializer.Serializer
+
+
+private[spark]
+class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter])
+
+
+private[spark]
+class ShuffleBlockManager(blockManager: BlockManager) {
+
+ val shuffles = new ConcurrentHashMap[Int, Shuffle]
+
+ def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): Shuffle = {
+ new Shuffle(shuffleId, numBuckets, serializer)
+ }
+
+ class Shuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer) {
+
+ // Get a group of writers for a map task.
+ def acquireWriters(mapId: Int): ShuffleWriterGroup = {
+ val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
+ val blockId = ShuffleBlockManager.blockId(shuffleId, bucketId, mapId)
+ blockManager.getDiskBlockWriter(blockId, serializer).open()
+ }
+ new ShuffleWriterGroup(mapId, writers)
+ }
+
+ def releaseWriters(group: ShuffleWriterGroup) = {
+ // Nothing really to release here.
+ }
+ }
+}
+
+
+private[spark]
+object ShuffleBlockManager {
+
+ // Returns the block id for a given shuffle block.
+ def blockId(shuffleId: Int, bucketId: Int, groupId: Int): String = {
+ "shuffle_" + shuffleId + "_" + groupId + "_" + bucketId
+ }
+
+ // Returns true if the block is a shuffle block.
+ def isShuffle(blockId: String): Boolean = blockId.startsWith("shuffle_")
+}

0 comments on commit 7007201

Please sign in to comment.