diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 6e8eb6c40eeda..4a10a6fb3e8cf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler import java.io.NotSerializableException -import java.util.Arrays +import java.util.{Arrays, Timer, TimerTask} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap @@ -96,13 +96,6 @@ private[spark] class TaskSetManager( var totalResultSize = 0L var calculatedTasks = 0 - // show progress bar in console - private val console = System.console() - private val showProgress = console != null && - conf.getBoolean("spark.driver.showConsoleProgress", true) - private val startTime = clock.getTime() - private var lastUpdate = clock.getTime() - val runningTasksSet = new HashSet[Long] override def runningTasks = runningTasksSet.size @@ -181,6 +174,77 @@ private[spark] class TaskSetManager( var emittedTaskSizeWarning = false + // show progress bar in console + private val console = System.console() + private val showProgress = console != null && + conf.getBoolean("spark.driver.showConsoleProgress", true) + private val startTime = clock.getTime() + private var lastUpdate = clock.getTime() + + // show the progress bar if no task succeeds in 500ms + private val timer = new Timer("show progress", true) + timer.schedule(new TimerTask{ + override def run() { + if (tasksSuccessful < numTasks) { + showProgressBar(tasksSuccessful, numTasks) + } + } + }, 500) + + /** + * Show progress in console (also in title). The progress bar is displayed in the next line + * after your last output, keeps overwriting itself to hold in one line. The logging will follow + * the progress bar, then progress bar will be showed in next line without overwrite logs. + * + * @param finished the number of finished tasks + * @param total total number of tasks + */ + private def showProgressBar(finished: Int, total: Int): Unit = { + val now = clock.getTime() + // Only update title once in 100 milliseconds + if (now - lastUpdate < 100 && finished < total) { + return + } + lastUpdate = now + + // show progress in title + if (Terminal.getTerminal.isANSISupported) { + val ESC = "\033" + val title = if (finished < total) { + s"Spark Job: $finished/$total Finished, $runningTasks are running" + } else { + s"Spark Job: Finished in ${Utils.msDurationToString(now - startTime)}" + } + console.printf(s"$ESC]0; $title \007") + } + + // show one line progress bar + if (!log.isInfoEnabled) { + if (finished < total) { + val header = s"Stage $stageId: [" + val tailer = s"] $finished+$runningTasks/$total - " + + s"${Utils.msDurationToString(now - startTime)}" + val width = Terminal.getTerminal.getTerminalWidth - header.size - tailer.size + val percent = finished * width / total; + val bar = (0 until width).map { i => + if (i < percent) "=" else if (i == percent) ">" else " " + }.mkString("") + console.printf("\r" + header + bar + tailer) + } else { + val finishTimes = taskInfos.map(_._2.finishTime - startTime) + val avg = finishTimes.sum / finishTimes.size / 1000 + val min = finishTimes.min / 1000 + val max = finishTimes.max / 1000 + val med = finishTimes.toSeq.sorted.slice(0, finishTimes.size / 2).last / 1000 + val nFailures = numFailures.sum + var summary = s"Stage $stageId: Finished in ${Utils.msDurationToString(now - startTime)} " + + s"with $total tasks (min=$min/median=$med/avg=$avg/max=${max} s)." + console.printf("\r" + summary + + " " * (Terminal.getTerminal.getTerminalWidth - summary.size) + "\r\n") + } + } + } + /** * Add a task to all the pending-task lists that it should be on. If readding is set, we are * re-adding the task so only include it in each list if it's not already there. @@ -537,62 +601,6 @@ private[spark] class TaskSetManager( sched.dagScheduler.taskGettingResult(info) } - /** - * Show progress in console (also in title). The progress bar is displayed in the next line - * after your last output, keeps overwriting itself to hold in one line. The logging will follow - * the progress bar, then progress bar will be showed in next line without overwrite logs. - * - * TODO(davies): the progress will not show until success of any tasks - * - * @param finished the number of finished tasks - * @param total total number of tasks - */ - private def showProgressBar(finished: Int, total: Int): Unit = { - val now = clock.getTime() - // Only update title once in 100 milliseconds - if (now - lastUpdate < 100 && finished < total) { - return - } - lastUpdate = now - - // show progress in title - if (Terminal.getTerminal.isANSISupported) { - val ESC = "\033" - val title = if (finished < total) { - s"Spark Job: $finished/$total Finished, $runningTasks are running" - } else { - s"Spark Job: Finished in ${Utils.msDurationToString(now - startTime)}" - } - console.printf(s"$ESC]0; $title \007") - } - - // show one line progress bar - if (!log.isInfoEnabled) { - if (finished < total) { - val header = s"Stage $stageId: [" - val tailer = s"] $finished+$runningTasks/$total - " + - s"${Utils.msDurationToString(now - startTime)}" - val width = Terminal.getTerminal.getTerminalWidth - header.size - tailer.size - val percent = finished * width / total; - val bar = (0 until width).map { i => - if (i < percent) "=" else if (i == percent) ">" else " " - }.mkString("") - console.printf("\r" + header + bar + tailer) - } else { - val finishTimes = taskInfos.map(_._2.finishTime - startTime) - val avg = finishTimes.sum / finishTimes.size / 1000 - val min = finishTimes.min / 1000 - val max = finishTimes.max / 1000 - val med = finishTimes.toSeq.sorted.slice(0, finishTimes.size / 2).last / 1000 - val nFailures = numFailures.sum - var summary = s"Stage $stageId: Finished in ${Utils.msDurationToString(now - startTime)} " + - s"with $total tasks (min=$min/median=$med/avg=$avg/max=${max} s)." - console.printf("\r" + summary + - " " * (Terminal.getTerminal.getTerminalWidth - summary.size) + "\r\n") - } - } - } - /* * Check whether has enough quota to fetch the result with `size` bytes */