remove some logic code for tachyonstore's replication
RongGu committed Apr 3, 2014
1 parent 51149e7 commit 7cd4600
Showing 1 changed file with 5 additions and 6 deletions.
core/src/main/scala/org/apache/spark/storage/BlockManager.scala (11 changes: 5 additions & 6 deletions)

@@ -653,12 +653,11 @@ private[spark] class BlockManager(
           res.droppedBlocks.foreach { block => updatedBlocks += block }
         } else if (level.useOffHeap) {
           // Save to Tachyon.
-          val askForBytes = level.replication > 1
           val res = data match {
             case IteratorValues(iterator) =>
-              tachyonStore.putValues(blockId, iterator, level, askForBytes)
+              tachyonStore.putValues(blockId, iterator, level, false)
             case ArrayBufferValues(array) =>
-              tachyonStore.putValues(blockId, array, level, askForBytes)
+              tachyonStore.putValues(blockId, array, level, false)
             case ByteBufferValues(bytes) => {
               bytes.rewind();
               tachyonStore.putBytes(blockId, bytes, level)
@@ -668,7 +667,7 @@ private[spark] class BlockManager(
           res.data match {
             case Right(newBytes) => bytesAfterPut = newBytes
             case _ =>
-          }
+          }
         } else {
           // Save directly to disk.
           // Don't get back the bytes unless we replicate them.
@@ -718,7 +717,7 @@ private[spark] class BlockManager(
 
     // Either we're storing bytes and we asynchronously started replication, or we're storing
     // values and need to serialize and replicate them now:
-    if (level.replication > 1) {
+    if (level.replication > 1 && !level.useOffHeap) {
       data match {
         case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
         case _ => {
@@ -740,7 +739,7 @@ private[spark] class BlockManager(
 
     BlockManager.dispose(bytesAfterPut)
 
-    if (level.replication > 1) {
+    if (level.replication > 1 && !level.useOffHeap) {
      logDebug("Put for block " + blockId + " with replication took " +
         Utils.getUsedTimeMs(startTimeMs))
     } else {
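
The net effect of this diff: the Tachyon branch no longer requests the stored bytes back for replication (the flag previously computed as askForBytes is now passed as false), and both replication checks are additionally gated on !level.useOffHeap, so blocks written off-heap skip driver-side replication entirely. Below is a minimal sketch of that gating, with hypothetical stand-ins (putToTachyon, replicateToPeers) for the real tachyonStore and replicate internals, not the actual BlockManager code.

// Sketch only: simplified stand-ins for the BlockManager put path.
case class StorageLevel(useOffHeap: Boolean, replication: Int)

object PutPathSketch {
  // Hypothetical helpers; the real code calls tachyonStore.putValues/putBytes
  // and replicates serialized bytes to peer BlockManagers.
  private def putToTachyon(blockId: String, bytes: Array[Byte]): Unit =
    println(s"$blockId stored off-heap (${bytes.length} bytes)")

  private def replicateToPeers(blockId: String, bytes: Array[Byte], level: StorageLevel): Unit =
    println(s"$blockId replicated to ${level.replication - 1} peer(s)")

  def put(blockId: String, bytes: Array[Byte], level: StorageLevel): Unit = {
    if (level.useOffHeap) {
      // After this commit the Tachyon branch never asks for the bytes back for
      // replication (the last argument to putValues is always false).
      putToTachyon(blockId, bytes)
    } else {
      // Memory/disk branches elided in this sketch.
    }

    // Replication is now additionally gated on the level not being off-heap,
    // mirroring `if (level.replication > 1 && !level.useOffHeap)` in the diff.
    if (level.replication > 1 && !level.useOffHeap) {
      replicateToPeers(blockId, bytes, level)
    }
  }
}

With this sketch, PutPathSketch.put("rdd_0_0", Array[Byte](1, 2, 3), StorageLevel(useOffHeap = true, replication = 2)) stores the block off-heap and performs no replication, whereas the same call with useOffHeap = false would replicate to one peer.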
