Skip to content

Commit

Permalink
Track time spent closing / flushing files; split TimeTrackingOutputSt…
Browse files Browse the repository at this point in the history
…ream into a separate file.
  • Loading branch information
JoshRosen committed May 12, 2015
1 parent d5779c6 commit 5e189c6
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.memory.TaskMemoryManager;

Expand Down Expand Up @@ -301,7 +302,7 @@ private long[] mergeSpillsWithFileStream(
for (int partition = 0; partition < numPartitions; partition++) {
final long initialFileLength = outputFile.length();
mergedFileOutputStream =
new TimeTrackingFileOutputStream(writeMetrics, new FileOutputStream(outputFile, true));
new TimeTrackingOutputStream(writeMetrics, new FileOutputStream(outputFile, true));
if (compressionCodec != null) {
mergedFileOutputStream = compressionCodec.compressedOutputStream(mergedFileOutputStream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,23 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.unsafe;
package org.apache.spark.storage;

import org.apache.spark.executor.ShuffleWriteMetrics;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.spark.executor.ShuffleWriteMetrics;

/**
* Intercepts write calls and tracks total time spent writing.
* Intercepts write calls and tracks total time spent writing in order to update shuffle write
* metrics. Not thread safe.
*/
final class TimeTrackingFileOutputStream extends OutputStream {
public final class TimeTrackingOutputStream extends OutputStream {

private final ShuffleWriteMetrics writeMetrics;
private final FileOutputStream outputStream;
private final OutputStream outputStream;

public TimeTrackingFileOutputStream(
ShuffleWriteMetrics writeMetrics,
FileOutputStream outputStream) {
public TimeTrackingOutputStream(ShuffleWriteMetrics writeMetrics, OutputStream outputStream) {
this.writeMetrics = writeMetrics;
this.outputStream = outputStream;
}
Expand All @@ -49,7 +47,8 @@ public void write(int b) throws IOException {
public void write(byte[] b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime); }
writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
Expand All @@ -60,11 +59,15 @@ public void write(byte[] b, int off, int len) throws IOException {

/**
 * Flushes the wrapped stream, recording the elapsed wall-clock time in the
 * shuffle write metrics.
 */
@Override
public void flush() throws IOException {
  final long flushStart = System.nanoTime();
  outputStream.flush();
  final long elapsedNanos = System.nanoTime() - flushStart;
  writeMetrics.incShuffleWriteTime(elapsedNanos);
}

/**
 * Closes the wrapped stream, recording the elapsed wall-clock time in the
 * shuffle write metrics.
 */
@Override
public void close() throws IOException {
  final long closeStart = System.nanoTime();
  outputStream.close();
  final long elapsedNanos = System.nanoTime() - closeStart;
  writeMetrics.incShuffleWriteTime(elapsedNanos);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,6 @@ private[spark] class DiskBlockObjectWriter(
extends BlockObjectWriter(blockId)
with Logging
{
/** Intercepts write calls and tracks total time spent writing. Not thread safe. */
private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
  // Every write variant is timed via callWithTiming, which folds the elapsed
  // nanoseconds into the shuffle write metrics.
  override def write(i: Int): Unit = callWithTiming { out.write(i) }
  override def write(b: Array[Byte]): Unit = callWithTiming { out.write(b) }
  override def write(b: Array[Byte], off: Int, len: Int): Unit =
    callWithTiming { out.write(b, off, len) }
  // flush() and close() are delegated untimed, matching the original contract.
  override def flush(): Unit = out.flush()
  override def close(): Unit = out.close()
}

/** The file channel, used for repositioning / truncating the file. */
private var channel: FileChannel = null
Expand Down Expand Up @@ -136,7 +126,7 @@ private[spark] class DiskBlockObjectWriter(
throw new IllegalStateException("Writer already closed. Cannot be reopened.")
}
fos = new FileOutputStream(file, true)
ts = new TimeTrackingOutputStream(fos)
ts = new TimeTrackingOutputStream(writeMetrics, fos)
channel = fos.getChannel()
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
objOut = serializerInstance.serializeStream(bs)
Expand All @@ -150,9 +140,9 @@ private[spark] class DiskBlockObjectWriter(
if (syncWrites) {
// Force outstanding writes to disk and track how long it takes
objOut.flush()
callWithTiming {
fos.getFD.sync()
}
val start = System.nanoTime()
fos.getFD.sync()
writeMetrics.incShuffleWriteTime(System.nanoTime() - start)
}
} {
objOut.close()
Expand Down Expand Up @@ -251,12 +241,6 @@ private[spark] class DiskBlockObjectWriter(
reportedPosition = pos
}

/**
 * Evaluates the by-name argument `f` and adds the elapsed wall-clock
 * nanoseconds to the shuffle write time metric.
 */
private def callWithTiming(f: => Unit) = {
  val startTime = System.nanoTime()
  f
  val elapsedNanos = System.nanoTime() - startTime
  writeMetrics.incShuffleWriteTime(elapsedNanos)
}

// For testing
private[spark] override def flush() {
objOut.flush()
Expand Down

0 comments on commit 5e189c6

Please sign in to comment.