[SPARK-3454] separate json endpoints for data in the UI
Exposes the data available in the UI as JSON over HTTP. Key points:

* New endpoints, handled independently of the existing XyzPage classes. The root entry point is `JsonRootResource` (a usage sketch follows this list).
* Uses Jersey + Jackson for routing and for converting POJOs into JSON.
* Tests against known results in `HistoryServerSuite`.
* Also fixes some minor issues with the UI: synchronizing access to `StorageListener` and `StorageStatusListener`, and fixing some inconsistencies in how retained jobs and stages are handled.
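A minimal sketch of consuming the new endpoints, assuming a local history server on its default port 18080 with the JSON servlet mounted at /api/v1 (the base URL and mount point are assumptions, not verified against this commit):

import scala.io.Source

object JsonApiExample {
  def main(args: Array[String]): Unit = {
    // Assumed base URL: a history server on localhost:18080 serving the
    // JSON servlet produced by JsonRootResource.getJsonServlet.
    val base = "http://localhost:18080/api/v1"
    // Fetch the list of applications the UI knows about, as raw JSON.
    val apps = Source.fromURL(s"$base/applications").mkString
    println(apps)
  }
}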

Author: Imran Rashid <irashid@cloudera.com>

Closes apache#5940 from squito/SPARK-3454_better_test_files and squashes the following commits:

1a72ed6 [Imran Rashid] rats
85fdb3e [Imran Rashid] Merge branch 'no_php' into SPARK-3454
1fc65b0 [Imran Rashid] Revert "Revert "[SPARK-3454] separate json endpoints for data in the UI""
1276900 [Imran Rashid] get rid of giant event file, replace w/ smaller one; check both shuffle read & shuffle write
4e12013 [Imran Rashid] just use test case name for expectation file name
863ef64 [Imran Rashid] rename json files to avoid strange file names and not look like php
squito authored and pwendell committed May 8, 2015
1 parent ebff732 commit c796be7
Showing 100 changed files with 8,608 additions and 172 deletions.
7 changes: 7 additions & 0 deletions .rat-excludes
@@ -74,5 +74,12 @@ logs
.*scalastyle-output.xml
.*dependency-reduced-pom.xml
known_translations
+json_expectation
+local-1422981759269/*
+local-1422981780767/*
+local-1425081759269/*
+local-1426533911241/*
+local-1426633911242/*
+local-1430917381534/*
DESCRIPTION
NAMESPACE
8 changes: 8 additions & 0 deletions core/pom.xml
@@ -228,6 +228,14 @@
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
<version>3.2.10</version>
</dependency>
+<dependency>
+<groupId>com.sun.jersey</groupId>
+<artifactId>jersey-server</artifactId>
+</dependency>
+<dependency>
+<groupId>com.sun.jersey</groupId>
+<artifactId>jersey-core</artifactId>
+</dependency>
<dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
8 changes: 7 additions & 1 deletion core/src/main/java/org/apache/spark/JobExecutionStatus.java
@@ -17,9 +17,15 @@

package org.apache.spark;

+import org.apache.spark.util.EnumUtil;

public enum JobExecutionStatus {
RUNNING,
SUCCEEDED,
FAILED,
-UNKNOWN
+UNKNOWN;
+
+public static JobExecutionStatus fromString(String str) {
+return EnumUtil.parseIgnoreCase(JobExecutionStatus.class, str);
+}
}
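As a quick illustration of the new parser (a hypothetical snippet, not part of the commit):

import org.apache.spark.JobExecutionStatus

object StatusParseExample {
  def main(args: Array[String]): Unit = {
    // Parsing is case-insensitive, delegated to EnumUtil.parseIgnoreCase;
    // unknown names throw IllegalArgumentException listing the legal values.
    assert(JobExecutionStatus.fromString("succeeded") == JobExecutionStatus.SUCCEEDED)
  }
}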
30 changes: 30 additions & 0 deletions core/src/main/java/org/apache/spark/status/api/v1/ApplicationStatus.java
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.api.v1;

import org.apache.spark.util.EnumUtil;

public enum ApplicationStatus {
COMPLETED,
RUNNING;

public static ApplicationStatus fromString(String str) {
return EnumUtil.parseIgnoreCase(ApplicationStatus.class, str);
}

}
31 changes: 31 additions & 0 deletions core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.api.v1;

import org.apache.spark.util.EnumUtil;

public enum StageStatus {
ACTIVE,
COMPLETE,
FAILED,
PENDING;

public static StageStatus fromString(String str) {
return EnumUtil.parseIgnoreCase(StageStatus.class, str);
}
}
48 changes: 48 additions & 0 deletions core/src/main/java/org/apache/spark/status/api/v1/TaskSorting.java
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.status.api.v1;

import org.apache.spark.util.EnumUtil;

import java.util.HashSet;
import java.util.Set;

public enum TaskSorting {
ID,
INCREASING_RUNTIME("runtime"),
DECREASING_RUNTIME("-runtime");

private final Set<String> alternateNames;
private TaskSorting(String... names) {
alternateNames = new HashSet<String>();
for (String n: names) {
alternateNames.add(n);
}
}

public static TaskSorting fromString(String str) {
String lower = str.toLowerCase();
for (TaskSorting t: values()) {
if (t.alternateNames.contains(lower)) {
return t;
}
}
return EnumUtil.parseIgnoreCase(TaskSorting.class, str);
}

}
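fromString first checks each constant's lower-cased alternate names, then falls back to the case-insensitive enum-name match. A hypothetical check of that behavior (not part of the commit):

import org.apache.spark.status.api.v1.TaskSorting

object TaskSortingExample {
  def main(args: Array[String]): Unit = {
    // "-runtime" matches an alternate name registered in the constructor.
    assert(TaskSorting.fromString("-runtime") == TaskSorting.DECREASING_RUNTIME)
    // "id" has no alternate name, so it falls through to EnumUtil.
    assert(TaskSorting.fromString("id") == TaskSorting.ID)
  }
}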
38 changes: 38 additions & 0 deletions core/src/main/java/org/apache/spark/util/EnumUtil.java
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util;

import com.google.common.base.Joiner;
import org.apache.spark.annotation.Private;

@Private
public class EnumUtil {
public static <E extends Enum<E>> E parseIgnoreCase(Class<E> clz, String str) {
E[] constants = clz.getEnumConstants();
if (str == null) {
return null;
}
for (E e : constants) {
if (e.name().equalsIgnoreCase(str)) {
return e;
}
}
throw new IllegalArgumentException(
String.format("Illegal type='%s'. Supported type values: %s",
str, Joiner.on(", ").join(constants)));
}
}
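A small sketch of the contract (hypothetical usage, not in the commit): matching ignores case, a null input passes through as null, and anything unrecognized fails with the supported values listed in the message.

import org.apache.spark.status.api.v1.StageStatus
import org.apache.spark.util.EnumUtil

object EnumUtilExample {
  def main(args: Array[String]): Unit = {
    assert(EnumUtil.parseIgnoreCase(classOf[StageStatus], "active") == StageStatus.ACTIVE)
    assert(EnumUtil.parseIgnoreCase(classOf[StageStatus], null) == null)
  }
}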
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -430,7 +430,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
-_env.securityManager,appName))
+_env.securityManager,appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -19,15 +19,15 @@ package org.apache.spark.deploy.history

import org.apache.spark.ui.SparkUI

-private[history] case class ApplicationAttemptInfo(
+private[spark] case class ApplicationAttemptInfo(
attemptId: Option[String],
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String,
completed: Boolean = false)

-private[history] case class ApplicationHistoryInfo(
+private[spark] case class ApplicationHistoryInfo(
id: String,
name: String,
attempts: List[ApplicationAttemptInfo])
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,23 +17,21 @@

package org.apache.spark.deploy.history

-import java.io.{IOException, BufferedInputStream, FileNotFoundException, InputStream}
+import java.io.{BufferedInputStream, FileNotFoundException, IOException, InputStream}
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

import scala.collection.mutable
import scala.concurrent.duration.Duration

-import com.google.common.util.concurrent.ThreadFactoryBuilder
-
-import com.google.common.util.concurrent.MoreExecutors
-import org.apache.hadoop.fs.permission.AccessControlException
+import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.permission.AccessControlException

+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}

/**
* A class that provides application history from event logs stored in the file system.
@@ -151,7 +149,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
-HistoryServer.getAttemptURI(appId, attempt.attemptId))
+HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
// Do not call ui.bind() to avoid creating a new server for each application
}

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -25,6 +25,7 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.status.api.v1.{ApplicationInfo, ApplicationsListResource, JsonRootResource, UIRoot}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{SignalLogger, Utils}
@@ -45,7 +46,7 @@ class HistoryServer(
provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
port: Int)
-extends WebUI(securityManager, port, conf) with Logging {
+extends WebUI(securityManager, port, conf) with Logging with UIRoot {

// How many applications to retain
private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
@@ -56,7 +57,7 @@
require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
val ui = provider
.getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None)
-.getOrElse(throw new NoSuchElementException())
+.getOrElse(throw new NoSuchElementException(s"no app with key $key"))
attachSparkUI(ui)
ui
}
@@ -113,6 +114,10 @@ }
}
}

+def getSparkUI(appKey: String): Option[SparkUI] = {
+Option(appCache.get(appKey))
+}

initialize()

/**
@@ -123,6 +128,9 @@
*/
def initialize() {
attachPage(new HistoryPage(this))

+attachHandler(JsonRootResource.getJsonServlet(this))

attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))

val contextHandler = new ServletContextHandler
@@ -160,7 +168,13 @@
*
* @return List of all known applications.
*/
-def getApplicationList(): Iterable[ApplicationHistoryInfo] = provider.getListing()
+def getApplicationList(): Iterable[ApplicationHistoryInfo] = {
+provider.getListing()
+}
+
+def getApplicationInfoList: Iterator[ApplicationInfo] = {
+getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
+}

/**
* Returns the provider configuration to show in the listing page.
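HistoryServer now mixes in the new UIRoot trait, which is what JsonRootResource uses to resolve applications. Inferred from the methods implemented above, the trait plausibly looks like this (a sketch, not the verbatim source):

import org.apache.spark.status.api.v1.ApplicationInfo
import org.apache.spark.ui.SparkUI

trait UIRoot {
  // Resolve a (possibly attempt-qualified) application key to its UI.
  def getSparkUI(appKey: String): Option[SparkUI]
  // Enumerate applications in the public v1 API representation.
  def getApplicationInfoList: Iterator[ApplicationInfo]
}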
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.ApplicationDescription
import org.apache.spark.util.Utils

-private[deploy] class ApplicationInfo(
+private[spark] class ApplicationInfo(
val startTime: Long,
val id: String,
val desc: ApplicationDescription,
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -754,17 +754,17 @@ private[master] class Master(

/**
* Rebuild a new SparkUI from the given application's event logs.
-* Return whether this is successful.
+* Return the UI if successful, else None
*/
-private def rebuildSparkUI(app: ApplicationInfo): Boolean = {
+private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
val appName = app.desc.name
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
try {
val eventLogDir = app.desc.eventLogDir
.getOrElse {
// Event logging is not enabled for this application
app.desc.appUiUrl = notFoundBasePath
-return false
+return None
}

val eventLogFilePrefix = EventLoggingListener.getLogPath(
@@ -787,7 +787,7 @@
val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
-appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
+appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime)
val maybeTruncated = eventLogFile.endsWith(EventLoggingListener.IN_PROGRESS)
try {
replayBus.replay(logInput, eventLogFile, maybeTruncated)
Expand All @@ -798,7 +798,7 @@ private[master] class Master(
webUi.attachSparkUI(ui)
// Application UI is successfully rebuilt, so link the Master UI to it
app.desc.appUiUrl = ui.basePath
-true
+Some(ui)
} catch {
case fnf: FileNotFoundException =>
// Event logging is enabled for this application, but no event logs are found
@@ -808,7 +808,7 @@
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
-false
+None
case e: Exception =>
// Relay exception message to application UI page
val title = s"Application history load error (${app.id})"
Expand All @@ -817,7 +817,7 @@ private[master] class Master(
logError(msg, e)
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title"
-false
+None
}
}

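rebuildSparkUI now returns Option[SparkUI] instead of Boolean, so callers can act on the rebuilt UI directly rather than re-fetching it. A toy, self-contained sketch of the pattern (all names illustrative, not from the commit):

object OptionResultExample {
  // Stand-in for rebuildSparkUI: Some(basePath) on success, None otherwise.
  def rebuild(eventLogsPresent: Boolean): Option[String] =
    if (eventLogsPresent) Some("/history/app-1") else None

  def main(args: Array[String]): Unit = {
    rebuild(eventLogsPresent = true) match {
      case Some(path) => println(s"application UI attached at $path")
      case None => println("no UI rebuilt; event logging may be disabled")
    }
  }
}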
