Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Nov 1, 2014
1 parent e6bb189 commit bc53d99
Showing 1 changed file with 32 additions and 27 deletions.
59 changes: 32 additions & 27 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ import scala.collection.mutable.HashSet
import scala.math.max
import scala.math.min

import jline.ANSIBuffer.ANSICodes
import jline.Terminal

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.util.{Clock, SystemClock}
import org.apache.spark.util.{Clock, SystemClock, Utils}

/**
* Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
Expand All @@ -57,8 +56,10 @@ private[spark] class TaskSetManager(
extends Schedulable with Logging {

val conf = sched.sc.conf
val isInTerminal = Terminal.getTerminal.isANSISupported
val showProgress = isInTerminal && conf.getBoolean("spark.driver.showConsoleProgress", true)

// show progress bar in console
val console = System.console()
val showProgress = console != null && conf.getBoolean("spark.driver.showConsoleProgress", true)
val startTime = clock.getTime()
var lastUpdate = clock.getTime()

Expand Down Expand Up @@ -534,39 +535,43 @@ private[spark] class TaskSetManager(
if (now - lastUpdate < 100 && curr < total) {
return
}
val SetTitle = "\033]0;"
val EndTitle = "\007"
if (curr < total) {
System.err.print(s"${SetTitle} Spark Job: $curr/$total Finished, " +
s"$runningTasks are running ${EndTitle}")
lastUpdate = now

// show progress in title
if (Terminal.getTerminal.isANSISupported) {
val ESC = "\033"
val title = if (curr < total) {
s"Spark Job: $curr/$total Finished, $runningTasks are running"
} else {
s"Spark Job: Finished in ${Utils.msDurationToString(now - startTime)}"
}
console.printf(s"$ESC]0; $title \007")
}

if (!log.isInfoEnabled) {
val used = (now - startTime) / 1000
val header = s"Stage ${stageId}: ["
val tailer = s"] ${curr}+${runningTasks}/${total} - ${used}s"
// show one line progress bar
if (!log.isInfoEnabled) {
if (curr < total) {
val header = s"Stage $stageId: ["
val tailer = s"] $curr+$runningTasks/$total - ${Utils.msDurationToString(now - startTime)}"
val width = Terminal.getTerminal.getTerminalWidth - header.size - tailer.size
val percent = curr * width / total;
val bar = (0 until width).map { i =>
if (i < percent) "=" else if (i==percent) ">" else " "
}.mkString("")
System.err.print(header + bar + tailer + s"\n${ANSICodes.up(0)}")
}
} else {
System.err.print(s"${SetTitle} Spark Job: All Finished ${EndTitle}")
if (!log.isInfoEnabled) {
val used = (now - startTime) / 1000
console.printf("\r" + header + bar + tailer)
} else {
val finishTimes = taskInfos.map(_._2.finishTime - startTime)
val avg = finishTimes.sum / finishTimes.size / 1000
val min = finishTimes.min / 1000
val max = finishTimes.max / 1000
val med = finishTimes.toSeq.sorted.slice(0, finishTimes.size / 2).last / 1000
// erase current line
System.err.print(" " * Terminal.getTerminal.getTerminalWidth + "\n" + ANSICodes.up(0))
System.err.println(s"Stage ${stageId}: Finished in ${used}s with ${total} tasks " +
s"(min=${min}/median=${med}/avg=${avg}/max=${max}s).")
val nFailures = numFailures.sum
var summary = s"Stage $stageId: Finished in ${Utils.msDurationToString(now - startTime)} " +
s"with $total tasks (min=$min/median=$med/avg=$avg/max=${max} s)."
console.printf("\r" + summary +
" " * (Terminal.getTerminal.getTerminalWidth - summary.size) + "\r\n")
}
}
lastUpdate = now
}

/**
Expand All @@ -579,9 +584,6 @@ private[spark] class TaskSetManager(
removeRunningTask(tid)
if (!successful(index)) {
tasksSuccessful += 1
if (showProgress) {
progressBar(tasksSuccessful, numTasks)
}
logDebug("Finished task %s in stage %s (TID %d) in %.3fs on %s (%d/%d)".format(
info.id, taskSet.id, info.taskId, info.duration/1000.0, info.host,
tasksSuccessful, numTasks))
Expand All @@ -594,6 +596,9 @@ private[spark] class TaskSetManager(
logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
" because task " + index + " has already completed successfully")
}
if (showProgress) {
progressBar(tasksSuccessful, numTasks)
}
sched.dagScheduler.taskEnded(
tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
failedExecutors.remove(index)
Expand Down

0 comments on commit bc53d99

Please sign in to comment.