Skip to content

Commit

Permalink
Update to reflect upstream ShuffleBlockManager -> ShuffleBlockResolve…
Browse files Browse the repository at this point in the history
…r rename.
  • Loading branch information
JoshRosen committed May 9, 2015
1 parent e8718dd commit 1929a74
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.IndexShuffleBlockManager;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.BlockManager;
Expand All @@ -59,7 +59,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private static final ClassTag<Object> OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object();

private final BlockManager blockManager;
private final IndexShuffleBlockManager shuffleBlockManager;
private final IndexShuffleBlockResolver shuffleBlockResolver;
private final TaskMemoryManager memoryManager;
private final ShuffleMemoryManager shuffleMemoryManager;
private final SerializerInstance serializer;
Expand Down Expand Up @@ -87,15 +87,15 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {

public UnsafeShuffleWriter(
BlockManager blockManager,
IndexShuffleBlockManager shuffleBlockManager,
IndexShuffleBlockResolver shuffleBlockResolver,
TaskMemoryManager memoryManager,
ShuffleMemoryManager shuffleMemoryManager,
UnsafeShuffleHandle<K, V> handle,
int mapId,
TaskContext taskContext,
SparkConf sparkConf) {
this.blockManager = blockManager;
this.shuffleBlockManager = shuffleBlockManager;
this.shuffleBlockResolver = shuffleBlockResolver;
this.memoryManager = memoryManager;
this.shuffleMemoryManager = shuffleMemoryManager;
this.mapId = mapId;
Expand Down Expand Up @@ -162,7 +162,7 @@ void closeAndWriteOutput() throws IOException {
}
}
}
shuffleBlockManager.writeIndexFile(shuffleId, mapId, partitionLengths);
shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

Expand Down Expand Up @@ -192,7 +192,7 @@ void forceSorterToSpill() throws IOException {
}

private long[] mergeSpills(SpillInfo[] spills) throws IOException {
final File outputFile = shuffleBlockManager.getDataFile(shuffleId, mapId);
final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
try {
if (spills.length == 0) {
new FileOutputStream(outputFile).close(); // Create an empty file
Expand Down Expand Up @@ -323,7 +323,7 @@ public Option<MapStatus> stop(boolean success) {
return Option.apply(mapStatus);
} else {
// The map task failed, so delete our output data.
shuffleBlockManager.removeDataByMap(shuffleId, mapId);
shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
return Option.apply(null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private[spark] class UnsafeShuffleManager(conf: SparkConf) extends ShuffleManage
// TODO: do we need to do anything to register the shuffle here?
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockManager],
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
env.shuffleMemoryManager,
unsafeShuffleHandle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

import org.apache.spark.*;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.shuffle.IndexShuffleBlockManager;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.executor.TaskMetrics;
import org.apache.spark.serializer.SerializerInstance;
Expand All @@ -67,7 +67,7 @@ public class UnsafeShuffleWriterSuite {

@Mock(answer = RETURNS_SMART_NULLS) ShuffleMemoryManager shuffleMemoryManager;
@Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager;
@Mock(answer = RETURNS_SMART_NULLS) IndexShuffleBlockManager shuffleBlockManager;
@Mock(answer = RETURNS_SMART_NULLS) IndexShuffleBlockResolver shuffleBlockResolver;
@Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager;
@Mock(answer = RETURNS_SMART_NULLS) TaskContext taskContext;
@Mock(answer = RETURNS_SMART_NULLS) ShuffleDependency<Object, Object, Object> shuffleDep;
Expand Down Expand Up @@ -124,14 +124,14 @@ public DiskBlockObjectWriter answer(InvocationOnMock invocationOnMock) throws Th
when(blockManager.wrapForCompression(any(BlockId.class), any(InputStream.class)))
.then(returnsSecondArg());

when(shuffleBlockManager.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile);
when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
return null;
}
}).when(shuffleBlockManager).writeIndexFile(anyInt(), anyInt(), any(long[].class));
}).when(shuffleBlockResolver).writeIndexFile(anyInt(), anyInt(), any(long[].class));

when(diskBlockManager.createTempShuffleBlock()).thenAnswer(
new Answer<Tuple2<TempLocalBlockId, File>>() {
Expand All @@ -157,7 +157,7 @@ private UnsafeShuffleWriter<Object, Object> createWriter(boolean transferToEnabl
conf.set("spark.file.transferTo", String.valueOf(transferToEnabled));
return new UnsafeShuffleWriter<Object, Object>(
blockManager,
shuffleBlockManager,
shuffleBlockResolver,
taskMemoryManager,
shuffleMemoryManager,
new UnsafeShuffleHandle<Object, Object>(0, 1, shuffleDep),
Expand Down

0 comments on commit 1929a74

Please sign in to comment.