Skip to content

Commit

Permalink
show progress bar if no task finished in 500ms
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Nov 4, 2014
1 parent e4e7344 commit 6fd30ff
Showing 1 changed file with 72 additions and 64 deletions.
136 changes: 72 additions & 64 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.scheduler

import java.io.NotSerializableException
import java.util.Arrays
import java.util.{Arrays, Timer, TimerTask}

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
Expand Down Expand Up @@ -96,13 +96,6 @@ private[spark] class TaskSetManager(
var totalResultSize = 0L
var calculatedTasks = 0

// Console progress bar state.
// System.console() yields null when no console is attached; progress is then disabled
// regardless of the "spark.driver.showConsoleProgress" setting (which defaults to true).
private val console = System.console()
private val showProgress =
  if (console == null) false else conf.getBoolean("spark.driver.showConsoleProgress", true)
// Creation time, and the time of the most recent progress redraw (initially creation time).
private val startTime = clock.getTime()
private var lastUpdate = clock.getTime()

val runningTasksSet = new HashSet[Long]()
// The number of running tasks is simply the size of the running-task set.
override def runningTasks: Int = runningTasksSet.size

Expand Down Expand Up @@ -181,6 +174,77 @@ private[spark] class TaskSetManager(

var emittedTaskSizeWarning = false

// Console progress bar state.
// System.console() yields null when no console is attached; progress is then disabled
// regardless of the "spark.driver.showConsoleProgress" setting (which defaults to true).
private val console = System.console()
private val showProgress =
  if (console == null) false else conf.getBoolean("spark.driver.showConsoleProgress", true)
// Creation time, and the time of the most recent progress redraw (initially creation time).
private val startTime = clock.getTime()
private var lastUpdate = clock.getTime()

// One-shot daemon timer: 500 ms after construction, draw the progress bar if some tasks
// have still not succeeded. This makes the bar appear even before the first task finishes.
// NOTE(review): the task runs on the timer thread, so showProgressBar may run concurrently
// with calls from other threads; confirm whether callers rely on external locking.
private val timer = new Timer("show progress", true)
timer.schedule(new TimerTask {
  override def run(): Unit = {
    try {
      // showProgress is false whenever `console` is null, and showProgressBar dereferences
      // `console`, so the bar must never be drawn when progress display is disabled.
      if (showProgress && tasksSuccessful < numTasks) {
        showProgressBar(tasksSuccessful, numTasks)
      }
    } finally {
      // Nothing else is ever scheduled on this timer; cancel it so its thread can exit
      // instead of lingering until the Timer object is garbage collected.
      timer.cancel()
    }
  }
}, 500)

/**
 * Show progress in console (also in title). The progress bar is displayed in the next line
 * after your last output, and keeps overwriting itself to stay on one line. Logging output
 * follows the progress bar; the bar is then shown on the next line without overwriting logs.
 *
 * Redraws are throttled to one per 100 ms while `finished < total`; the final update
 * (`finished >= total`) is never throttled. The one-line bar and the final summary are only
 * printed when INFO logging is disabled.
 *
 * Callers must make sure `console` is non-null (i.e. `showProgress` is true) before calling.
 *
 * @param finished the number of finished tasks
 * @param total total number of tasks
 */
private def showProgressBar(finished: Int, total: Int): Unit = {
  val now = clock.getTime()
  val done = finished >= total
  // Only update once in 100 milliseconds, except for the final update.
  if (done || now - lastUpdate >= 100) {
    lastUpdate = now
    val terminal = Terminal.getTerminal
    lazy val elapsed = Utils.msDurationToString(now - startTime)

    // show progress in title
    if (terminal.isANSISupported) {
      // Unicode escapes instead of the deprecated octal forms "\033" (ESC) and "\007" (BEL).
      val Esc = "\u001b"
      val Bel = "\u0007"
      val title =
        if (done) {
          s"Spark Job: Finished in $elapsed"
        } else {
          s"Spark Job: $finished/$total Finished, $runningTasks are running"
        }
      // Pass the text as an argument, not as the format string, so a stray '%' cannot be
      // interpreted as a format specifier.
      console.printf("%s", s"$Esc]0; $title $Bel")
    }

    // show one line progress bar
    if (!log.isInfoEnabled) {
      if (!done) {
        val header = s"Stage $stageId: ["
        val tailer = s"] $finished+$runningTasks/$total - $elapsed"
        // Columns left for the bar itself; may be <= 0 on a narrow terminal (empty bar).
        val width = terminal.getTerminalWidth - header.size - tailer.size
        val percent = finished * width / total
        val bar = (0 until width).map { i =>
          if (i < percent) "=" else if (i == percent) ">" else " "
        }.mkString("")
        // Leading "\r" returns to column 0 so the bar overwrites its previous rendering.
        console.printf("%s", "\r" + header + bar + tailer)
      } else {
        // Per-task finish times relative to startTime, ascending.
        val sortedTimes = taskInfos.map(_._2.finishTime - startTime).toSeq.sorted
        val stats =
          if (sortedTimes.isEmpty) {
            // No timing data: omit the statistics rather than fail on min/max/division.
            ""
          } else {
            val avg = sortedTimes.sum / sortedTimes.size / 1000
            val min = sortedTimes.head / 1000
            val max = sortedTimes.last / 1000
            // Lower median; index (n - 1) / 2 is valid for every non-empty sequence.
            val med = sortedTimes((sortedTimes.size - 1) / 2) / 1000
            s" (min=$min/median=$med/avg=$avg/max=$max s)"
          }
        val summary = s"Stage $stageId: Finished in $elapsed with $total tasks$stats."
        // Pad to the terminal width so leftovers of the progress bar are wiped.
        val padding = " " * (terminal.getTerminalWidth - summary.size)
        console.printf("%s", "\r" + summary + padding + "\r\n")
      }
    }
  }
}

/**
* Add a task to all the pending-task lists that it should be on. If readding is set, we are
* re-adding the task so only include it in each list if it's not already there.
Expand Down Expand Up @@ -537,62 +601,6 @@ private[spark] class TaskSetManager(
sched.dagScheduler.taskGettingResult(info)
}

/**
 * Show progress in console (also in title). The progress bar is displayed in the next line
 * after your last output, and keeps overwriting itself to stay on one line. Logging output
 * follows the progress bar; the bar is then shown on the next line without overwriting logs.
 *
 * Redraws are throttled to one per 100 ms while `finished < total`; the final update
 * (`finished >= total`) is never throttled. The one-line bar and the final summary are only
 * printed when INFO logging is disabled.
 *
 * Callers must make sure `console` is non-null (i.e. `showProgress` is true) before calling.
 *
 * TODO(davies): the progress will not show until success of any tasks
 *
 * @param finished the number of finished tasks
 * @param total total number of tasks
 */
private def showProgressBar(finished: Int, total: Int): Unit = {
  val now = clock.getTime()
  val done = finished >= total
  // Only update once in 100 milliseconds, except for the final update.
  if (done || now - lastUpdate >= 100) {
    lastUpdate = now
    val terminal = Terminal.getTerminal
    lazy val elapsed = Utils.msDurationToString(now - startTime)

    // show progress in title
    if (terminal.isANSISupported) {
      // Unicode escapes instead of the deprecated octal forms "\033" (ESC) and "\007" (BEL).
      val Esc = "\u001b"
      val Bel = "\u0007"
      val title =
        if (done) {
          s"Spark Job: Finished in $elapsed"
        } else {
          s"Spark Job: $finished/$total Finished, $runningTasks are running"
        }
      // Pass the text as an argument, not as the format string, so a stray '%' cannot be
      // interpreted as a format specifier.
      console.printf("%s", s"$Esc]0; $title $Bel")
    }

    // show one line progress bar
    if (!log.isInfoEnabled) {
      if (!done) {
        val header = s"Stage $stageId: ["
        val tailer = s"] $finished+$runningTasks/$total - $elapsed"
        // Columns left for the bar itself; may be <= 0 on a narrow terminal (empty bar).
        val width = terminal.getTerminalWidth - header.size - tailer.size
        val percent = finished * width / total
        val bar = (0 until width).map { i =>
          if (i < percent) "=" else if (i == percent) ">" else " "
        }.mkString("")
        // Leading "\r" returns to column 0 so the bar overwrites its previous rendering.
        console.printf("%s", "\r" + header + bar + tailer)
      } else {
        // Per-task finish times relative to startTime, ascending.
        val sortedTimes = taskInfos.map(_._2.finishTime - startTime).toSeq.sorted
        val stats =
          if (sortedTimes.isEmpty) {
            // No timing data: omit the statistics rather than fail on min/max/division.
            ""
          } else {
            val avg = sortedTimes.sum / sortedTimes.size / 1000
            val min = sortedTimes.head / 1000
            val max = sortedTimes.last / 1000
            // Lower median; index (n - 1) / 2 is valid for every non-empty sequence.
            val med = sortedTimes((sortedTimes.size - 1) / 2) / 1000
            s" (min=$min/median=$med/avg=$avg/max=$max s)"
          }
        val summary = s"Stage $stageId: Finished in $elapsed with $total tasks$stats."
        // Pad to the terminal width so leftovers of the progress bar are wiped.
        val padding = " " * (terminal.getTerminalWidth - summary.size)
        console.printf("%s", "\r" + summary + padding + "\r\n")
      }
    }
  }
}

/*
* Check whether has enough quota to fetch the result with `size` bytes
*/
Expand Down

0 comments on commit 6fd30ff

Please sign in to comment.