Skip to content

Commit

Permalink
Merge pull request #197 from aarondav/patrick-fix
Browse files Browse the repository at this point in the history
Fix 'timeWriting' stat for shuffle files

Due to concurrent git branches, changes from shuffle file consolidation patch
caused the shuffle write timing patch to no longer actually measure the time,
since it requires time be measured after the stream has been closed.
  • Loading branch information
rxin committed Nov 24, 2013
2 parents 718cc80 + ccea38b commit 972171b
Showing 1 changed file with 6 additions and 3 deletions.
Expand Up @@ -167,6 +167,7 @@ private[spark] class ShuffleMapTask(
var totalTime = 0L
val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
Expand All @@ -184,14 +185,16 @@ private[spark] class ShuffleMapTask(
} catch { case e: Exception =>
// If there is an exception from running the task, revert the partial writes
// and throw the exception upstream to Spark.
if (shuffle != null) {
shuffle.writers.foreach(_.revertPartialWrites())
if (shuffle != null && shuffle.writers != null) {
for (writer <- shuffle.writers) {
writer.revertPartialWrites()
writer.close()
}
}
throw e
} finally {
// Release the writers back to the shuffle block manager.
if (shuffle != null && shuffle.writers != null) {
shuffle.writers.foreach(_.close())
shuffle.releaseWriters(success)
}
// Execute the callbacks on task completion.
Expand Down

0 comments on commit 972171b

Please sign in to comment.