Skip to content

Commit

Permalink
Fix merging; now passes UnsafeShuffleSuite tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshRosen committed May 5, 2015
1 parent 133c8c9 commit 4f70141
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ private SpillInfo writeSpillFile() throws IOException {
final File file = spilledFileInfo._2();
final BlockId blockId = spilledFileInfo._1();
final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);
spills.add(spillInfo);

final SerializerInstance ser = new DummySerializerInstance();
writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, writeMetrics);
Expand Down Expand Up @@ -154,7 +153,11 @@ private SpillInfo writeSpillFile() throws IOException {

if (writer != null) {
writer.commitAndClose();
spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
// TODO: comment and explain why our handling of empty spills, etc.
if (currentPartition != -1) {
spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
spills.add(spillInfo);
}
}
return spillInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,14 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
final File outputFile = shuffleBlockManager.getDataFile(shuffleId, mapId);
final int numPartitions = partitioner.numPartitions();
final long[] partitionLengths = new long[numPartitions];

if (spills.length == 0) {
new FileOutputStream(outputFile).close();
return partitionLengths;
}

final FileChannel[] spillInputChannels = new FileChannel[spills.length];
final long[] spillInputChannelPositions = new long[spills.length];

// TODO: We need to add an option to bypass transferTo here since older Linux kernels are
// affected by a bug here that can lead to data truncation; see the comments Utils.scala,
Expand All @@ -173,24 +180,29 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {

final FileChannel mergedFileOutputChannel = new FileOutputStream(outputFile).getChannel();

for (int partition = 0; partition < numPartitions; partition++ ) {
for (int partition = 0; partition < numPartitions; partition++) {
for (int i = 0; i < spills.length; i++) {
final long bytesToTransfer = spills[i].partitionLengths[partition];
long bytesRemainingToBeTransferred = bytesToTransfer;
System.out.println("In partition " + partition + " and spill " + i );
final long partitionLengthInSpill = spills[i].partitionLengths[partition];
System.out.println("Partition length in spill is " + partitionLengthInSpill);
System.out.println("input channel position is " + spillInputChannels[i].position());
long bytesRemainingToBeTransferred = partitionLengthInSpill;
final FileChannel spillInputChannel = spillInputChannels[i];
long fromPosition = spillInputChannel.position();
while (bytesRemainingToBeTransferred > 0) {
bytesRemainingToBeTransferred -= spillInputChannel.transferTo(
fromPosition,
final long actualBytesTransferred = spillInputChannel.transferTo(
spillInputChannelPositions[i],
bytesRemainingToBeTransferred,
mergedFileOutputChannel);
spillInputChannelPositions[i] += actualBytesTransferred;
bytesRemainingToBeTransferred -= actualBytesTransferred;
}
partitionLengths[partition] += bytesToTransfer;
partitionLengths[partition] += partitionLengthInSpill;
}
}

// TODO: should this be in a finally block?
for (int i = 0; i < spills.length; i++) {
assert(spillInputChannelPositions[i] == spills[i].file.length());
spillInputChannels[i].close();
}
mergedFileOutputChannel.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,19 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {


writer.write(numbersToSort.iterator());
final MapStatus mapStatus = writer.stop(true).get();
final Option<MapStatus> mapStatus = writer.stop(true);
Assert.assertTrue(mapStatus.isDefined());

long sumOfPartitionSizes = 0;
for (long size: partitionSizes) {
sumOfPartitionSizes += size;
}
Assert.assertEquals(mergedOutputFile.length(), sumOfPartitionSizes);

// TODO: actually try to read the shuffle output?

// TODO: add a test that manually triggers spills in order to exercise the merging.

// TODO: test that the temporary spill files were cleaned up after the merge.
}

Expand Down

0 comments on commit 4f70141

Please sign in to comment.