diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8e5378ecc08de..66e07d10132b8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -344,6 +344,8 @@ class SparkContext(config: SparkConf) extends Logging { // The metrics system for Driver need to be set spark.app.id to app ID. // So it should start after we get app ID from the task scheduler and set spark.app.id. metricsSystem.start() + // Attach the driver metrics servlet handler to the web ui after the metrics system is started. + metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler))) // Optionally log Spark events private[spark] val eventLogger: Option[EventLoggingListener] = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7b32c505def9b..1f9f35d32059d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -129,6 +129,10 @@ private[spark] class Master( masterMetricsSystem.registerSource(masterSource) masterMetricsSystem.start() applicationMetricsSystem.start() + // Attach the master and app metrics servlet handler to the web ui after the metrics systems are + // started. + masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) + applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match { case "ZOOKEEPER" => diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index d86ec1e03e45c..73400c5affb5d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -41,8 +41,6 @@ class MasterWebUI(val master: Master, requestedPort: Int) attachPage(new HistoryNotFoundPage(this)) attachPage(new MasterPage(this)) attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static")) - master.masterMetricsSystem.getServletHandlers.foreach(attachHandler) - master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler) } /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */ diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index eb11163538b20..6863b625514c6 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -163,6 +163,8 @@ private[spark] class Worker( metricsSystem.registerSource(workerSource) metricsSystem.start() + // Attach the worker metrics servlet handler to the web ui after the metrics system is started. + metricsSystem.getServletHandlers.foreach(webUi.attachHandler) } def changeMaster(url: String, uiUrl: String) { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index b07942a9ca729..7ac81a2d87efd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -50,7 +50,6 @@ class WorkerWebUI( attachHandler(createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static")) attachHandler(createServletHandler("/log", (request: HttpServletRequest) => logPage.renderLog(request), worker.securityMgr)) - worker.metricsSystem.getServletHandlers.foreach(attachHandler) } } diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 5dd67b0cbf683..45633e3de01dd 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -76,22 +76,36 @@ private[spark] class MetricsSystem private ( private val sources = new mutable.ArrayBuffer[Source] private val registry = new MetricRegistry() + private var running: Boolean = false + // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui private var metricsServlet: Option[MetricsServlet] = None - /** Get any UI handlers used by this metrics system. */ - def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array()) + /** + * Get any UI handlers used by this metrics system; can only be called after start(). + */ + def getServletHandlers = { + require(running, "Can only call getServletHandlers on a running MetricsSystem") + metricsServlet.map(_.getHandlers).getOrElse(Array()) + } metricsConfig.initialize() def start() { + require(!running, "Attempting to start a MetricsSystem that is already running") + running = true registerSources() registerSinks() sinks.foreach(_.start) } def stop() { - sinks.foreach(_.stop) + if (running) { + sinks.foreach(_.stop) + } else { + logWarning("Stopping a MetricsSystem that is not running") + } + running = false } def report() { @@ -107,7 +121,7 @@ private[spark] class MetricsSystem private ( * @return An unique metric name for each combination of * application, executor/driver and metric source. */ - def buildRegistryName(source: Source): String = { + private[spark] def buildRegistryName(source: Source): String = { val appId = conf.getOption("spark.app.id") val executorId = conf.getOption("spark.executor.id") val defaultName = MetricRegistry.name(source.sourceName) @@ -144,7 +158,7 @@ private[spark] class MetricsSystem private ( }) } - def registerSources() { + private def registerSources() { val instConfig = metricsConfig.getInstance(instance) val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX) @@ -160,7 +174,7 @@ private[spark] class MetricsSystem private ( } } - def registerSinks() { + private def registerSinks() { val instConfig = metricsConfig.getInstance(instance) val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 176907dffa46a..0c24ad2760e08 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -57,8 +57,6 @@ private[spark] class SparkUI private ( attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath)) attachHandler( createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest)) - // If the UI is live, then serve - sc.foreach { _.env.metricsSystem.getServletHandlers.foreach(attachHandler) } } initialize()