Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: twitter/ambrose
base: 5baf6ff629
...
head fork: twitter/ambrose
compare: 7376c42ddf
Checking mergeability… Don't worry, you can still create the pull request.
  • 4 commits
  • 19 files changed
  • 0 commit comments
  • 1 contributor
Showing with 733 additions and 137 deletions.
  1. +1 −0  common/src/main/java/com/twitter/ambrose/model/Event.java
  2. +27 −0 common/src/main/java/com/twitter/ambrose/model/PaginatedList.java
  3. +48 −0 common/src/main/java/com/twitter/ambrose/model/WorkflowId.java
  4. +75 −0 common/src/main/java/com/twitter/ambrose/model/WorkflowSummary.java
  5. +59 −31 common/src/main/java/com/twitter/ambrose/server/APIHandler.java
  6. +38 −31 common/src/main/java/com/twitter/ambrose/server/ScriptStatusServer.java
  7. +24 −0 common/src/main/java/com/twitter/ambrose/service/WorkflowIndexReadService.java
  8. +81 −43 common/src/main/java/com/twitter/ambrose/service/impl/InMemoryStatsService.java
  9. +8 −0 common/src/main/resources/web/css/ambrose.css
  10. +104 −0 common/src/main/resources/web/dashboard.html
  11. +22 −0 common/src/main/resources/web/data/workflows.json
  12. +2 −0  common/src/main/resources/web/js/ambrose.js
  13. +29 −0 common/src/main/resources/web/js/ambrose/client.js
  14. +1 −1  common/src/main/resources/web/js/ambrose/core.js
  15. +166 −0 common/src/main/resources/web/js/ambrose/dashboard.js
  16. +14 −0 common/src/main/resources/web/js/dashboard-main.js
  17. +1 −1  common/src/main/resources/web/js/{main.js → workflow-main.js}
  18. +6 −5 common/src/main/resources/web/workflow.html
  19. +27 −25 pig/src/main/java/com/twitter/ambrose/pig/EmbeddedAmbrosePigProgressNotificationListener.java
View
1  common/src/main/java/com/twitter/ambrose/model/Event.java
@@ -54,6 +54,7 @@
}
private int id;
+ @JsonIgnore
private Type type;
private long timestamp;
private T payload;
View
27 common/src/main/java/com/twitter/ambrose/model/PaginatedList.java
@@ -0,0 +1,27 @@
+package com.twitter.ambrose.model;
+
+import java.util.List;
+
+/**
+ * Holds a list of results, as well as pagination info.
+ */
+public class PaginatedList<T> {
+ private List<T> results;
+ private String nextPageStart;
+
+ public PaginatedList(List<T> results) {
+ this.results = results;
+ }
+
+ public List<T> getResults() {
+ return results;
+ }
+
+ public String getNextPageStart() {
+ return nextPageStart;
+ }
+
+ public void setNextPageStart(String startKey) {
+ this.nextPageStart = startKey;
+ }
+}
View
48 common/src/main/java/com/twitter/ambrose/model/WorkflowId.java
@@ -0,0 +1,48 @@
+package com.twitter.ambrose.model;
+
+/**
+ * WorkflowId sent to the client is a composite of a bunch of information required to later fetch
+ * the DAG and events.
+ */
+public final class WorkflowId {
+ public static WorkflowId parseString(String workflowId) {
+ String[] parts = workflowId.split(DELIM, 6);
+ return new WorkflowId(parts[0], parts[1], parts[2],
+ Long.parseLong(parts[3]), Long.parseLong(parts[4]), parts[5]);
+ }
+
+ private static final String DELIM = "!";
+ private static final String PATTERN = "%s!%s!%s!%d!%d!%s";
+ private final String cluster;
+ private final String userId;
+ private final String appId;
+ private long runId;
+ private long timestamp;
+ private String flowId;
+
+ private WorkflowId(String cluster, String userId, String appId,
+ long runId, long timestamp, String flowId) {
+ this.cluster = cluster;
+ this.userId = userId;
+ this.appId = appId;
+ this.runId = runId;
+ this.timestamp = timestamp;
+ this.flowId = flowId;
+ }
+
+ public String getCluster() {
+ return cluster;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public String getFlowId() {
+ return flowId;
+ }
+
+ public String toId() {
+ return String.format(PATTERN, cluster, userId, appId, runId, timestamp, flowId);
+ }
+}
View
75 common/src/main/java/com/twitter/ambrose/model/WorkflowSummary.java
@@ -0,0 +1,75 @@
+package com.twitter.ambrose.model;
+
+/**
+ * Holds WorkflowSummary info.
+ */
+public class WorkflowSummary {
+ /**
+ * Represents whether a workflow is running or completed.
+ */
+ public static enum Status {
+ RUNNING,
+ SUCCEEDED,
+ FAILED
+ }
+
+ private String id;
+ private String userId;
+ private String name;
+ private int progress;
+ private Status status;
+
+ /**
+ * Constructs a new WorkflowSummary.
+ */
+ public WorkflowSummary(String id, String userId, String name, Status status, int progress) {
+ this.id = id;
+ this.userId = userId;
+ this.name = name;
+ this.status = status;
+ this.progress = progress;
+ }
+
+ public WorkflowSummary() {
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getProgress() {
+ return progress;
+ }
+
+ public void setProgress(int progress) {
+ this.progress = progress;
+ }
+
+ public Status getStatus() {
+ return status;
+ }
+
+ public void setStatus(Status status) {
+ this.status = status;
+ }
+}
View
90 common/src/main/java/com/twitter/ambrose/server/APIHandler.java
@@ -2,18 +2,25 @@
import java.io.IOException;
import java.util.Collection;
+import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import com.twitter.ambrose.model.DAGNode;
-import com.twitter.ambrose.model.Event;
+import com.google.common.base.Charsets;
+
import org.mortbay.jetty.HttpConnection;
import org.mortbay.jetty.Request;
import org.mortbay.jetty.handler.AbstractHandler;
+import com.twitter.ambrose.model.DAGNode;
+import com.twitter.ambrose.model.Event;
+import com.twitter.ambrose.model.Job;
+import com.twitter.ambrose.model.PaginatedList;
+import com.twitter.ambrose.model.WorkflowSummary;
import com.twitter.ambrose.service.StatsReadService;
+import com.twitter.ambrose.service.WorkflowIndexReadService;
import com.twitter.ambrose.util.JSONUtil;
/**
@@ -22,60 +29,81 @@
* @author billg
*/
public class APIHandler extends AbstractHandler {
+ private static void sendJson(HttpServletRequest request,
+ HttpServletResponse response, Object object) throws IOException {
+ JSONUtil.writeJson(response.getWriter(), object);
+ response.getWriter().close();
+ setHandled(request);
+ }
+
+ private static void setHandled(HttpServletRequest request) {
+ Request base_request = (request instanceof Request) ?
+ (Request) request : HttpConnection.getCurrentConnection().getRequest();
+ base_request.setHandled(true);
+ }
+
+ private static final String QUERY_PARAM_CLUSTER = "cluster";
+ private static final String QUERY_PARAM_USER_ID = "userId";
+ private static final String QUERY_PARAM_STATUS = "status";
+ private static final String QUERY_PARAM_START_KEY = "startKey";
private static final String QUERY_PARAM_WORKFLOW_ID = "workflowId";
private static final String QUERY_PARAM_LAST_EVENT_ID = "lastEventId";
-
private static final String MIME_TYPE_HTML = "text/html";
private static final String MIME_TYPE_JSON = "application/json";
+ private WorkflowIndexReadService workflowIndexReadService;
+ private StatsReadService<Job> statsReadService;
- private StatsReadService statsReadService;
-
- public APIHandler(StatsReadService statsReadService) {
+ public APIHandler(WorkflowIndexReadService workflowIndexReadService,
+ StatsReadService<Job> statsReadService) {
+ this.workflowIndexReadService = workflowIndexReadService;
this.statsReadService = statsReadService;
}
@Override
public void handle(String target,
- HttpServletRequest request,
- HttpServletResponse response,
- int dispatch) throws IOException, ServletException {
+ HttpServletRequest request,
+ HttpServletResponse response,
+ int dispatch) throws IOException, ServletException {
- if (target.endsWith("/dag")) {
+ if (target.endsWith("/workflows")) {
+ response.setContentType(MIME_TYPE_JSON);
+ response.setStatus(HttpServletResponse.SC_OK);
+ String cluster = request.getParameter(QUERY_PARAM_CLUSTER);
+ String userId = request.getParameter(QUERY_PARAM_USER_ID);
+ String status = request.getParameter(QUERY_PARAM_STATUS);
+ String startKey = request.getParameter(QUERY_PARAM_START_KEY);
+
+ PaginatedList<WorkflowSummary> workflows =
+ workflowIndexReadService.getWorkflows(
+ cluster,
+ status != null ? WorkflowSummary.Status.valueOf(status) : null,
+ userId, 10,
+ startKey != null ? startKey.getBytes(Charsets.UTF_8) : null);
+ sendJson(request, response, workflows);
+
+ } else if (target.endsWith("/dag")) {
response.setContentType(MIME_TYPE_JSON);
response.setStatus(HttpServletResponse.SC_OK);
- Collection<DAGNode> nodes =
- statsReadService.getDagNodeNameMap(request.getParameter(QUERY_PARAM_WORKFLOW_ID)).values();
-
+ Map<String, DAGNode<Job>> dagNodeNameMap =
+ statsReadService.getDagNodeNameMap(request.getParameter(QUERY_PARAM_WORKFLOW_ID));
+ Collection<DAGNode<Job>> nodes = dagNodeNameMap.values();
sendJson(request, response, nodes.toArray(new DAGNode[nodes.size()]));
+
} else if (target.endsWith("/events")) {
response.setContentType(MIME_TYPE_JSON);
response.setStatus(HttpServletResponse.SC_OK);
Integer sinceId = request.getParameter(QUERY_PARAM_LAST_EVENT_ID) != null ?
- Integer.parseInt(request.getParameter(QUERY_PARAM_LAST_EVENT_ID)) : -1;
+ Integer.parseInt(request.getParameter(QUERY_PARAM_LAST_EVENT_ID)) : -1;
Collection<Event> events =
- statsReadService.getEventsSinceId(request.getParameter(QUERY_PARAM_WORKFLOW_ID), sinceId);
-
+ statsReadService.getEventsSinceId(request.getParameter(QUERY_PARAM_WORKFLOW_ID), sinceId);
sendJson(request, response, events.toArray(new Event[events.size()]));
- }
- else if (target.endsWith(".html")) {
+
+ } else if (target.endsWith(".html")) {
response.setContentType(MIME_TYPE_HTML);
// this is because the next handler will be picked up here and it doesn't seem to
// handle html well. This is jank.
}
}
-
- private static void sendJson(HttpServletRequest request,
- HttpServletResponse response, Object object) throws IOException {
- JSONUtil.writeJson(response.getWriter(), object);
- response.getWriter().close();
- setHandled(request);
- }
-
- private static void setHandled(HttpServletRequest request) {
- Request base_request = (request instanceof Request) ?
- (Request)request : HttpConnection.getCurrentConnection().getRequest();
- base_request.setHandled(true);
- }
}
View
69 common/src/main/java/com/twitter/ambrose/server/ScriptStatusServer.java
@@ -28,24 +28,39 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.twitter.ambrose.model.Job;
import com.twitter.ambrose.service.StatsReadService;
+import com.twitter.ambrose.service.WorkflowIndexReadService;
/**
* Light weight application server that serves both the JSON API and the Ambrose web pages powered
- * from the JSON. The port defaults to {@value #PORT_DEFAULT} but can be overridden with the
- * {@value #PORT_PARAM} system property. For a random port to be used, set {@value #PORT_PARAM} to
- * zero or {@value #PORT_RANDOM}.
+ * from the JSON. The port defaults to {@value #PORT_DEFAULT} but can be overridden with the {@value
+ * #PORT_PARAM} system property. For a random port to be used, set {@value #PORT_PARAM} to zero or
+ * {@value #PORT_RANDOM}.
* <p/>
* The JSON API supports the following URIs:
- * <ul>
- * <li><code>/jobs</code> returns the workflow jobs.</li>
- * <li><code>/events</code> returns all events since the start of the workflow. Optionally the
- * <code>sinceEventId</code> query parameter can be used to return only events after a given event.</li>
- * </ul>
- *
- * @author billg
+ * <pre>
+ * <ul>
+ * <li><code>/workflows</code> - Returns workflow summaries.</li>
+ * <li><code>/jobs</code> - Returns a workflow's jobs.</li>
+ * <li><code>/events</code> - Returns all workflow events.</li>
+ * </ul>
+ * </pre>
*/
public class ScriptStatusServer implements Runnable {
+ private static int getConfiguredPort() {
+ String port = System.getProperty(PORT_PARAM, PORT_DEFAULT);
+ if (PORT_RANDOM.equalsIgnoreCase(port)) {
+ port = "0";
+ }
+ try {
+ return Integer.parseInt(port);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(String.format(
+ "Parameter '%s' value '%s' is not a valid port number", PORT_PARAM, port), e);
+ }
+ }
+
/**
* Name of system property used to configure port on which to bind HTTP server.
*/
@@ -61,33 +76,22 @@
private static final Logger LOG = LoggerFactory.getLogger(ScriptStatusServer.class);
private static final String SLASH = "/";
private static final String ROOT_PATH = "web";
-
- private static int getConfiguredPort() {
- String port = System.getProperty(PORT_PARAM, PORT_DEFAULT);
- if (PORT_RANDOM.equalsIgnoreCase(port)) {
- port = "0";
- }
- try {
- return Integer.parseInt(port);
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException(String.format(
- "Parameter '%s' value '%s' is not a valid port number", PORT_PARAM, port), e);
- }
- }
-
- private final StatsReadService statsReadService;
+ private final WorkflowIndexReadService workflowIndexReadService;
+ private final StatsReadService<Job> statsReadService;
private final int port;
private Server server;
private Thread serverThread;
- public ScriptStatusServer(StatsReadService statsReadService) {
+ public ScriptStatusServer(WorkflowIndexReadService workflowIndexReadService,
+ StatsReadService<Job> statsReadService) {
+ this.workflowIndexReadService = workflowIndexReadService;
this.statsReadService = statsReadService;
this.port = getConfiguredPort();
}
public int getPort() {
return port;
- };
+ }
/**
* Starts the server in it's own daemon thread.
@@ -111,7 +115,8 @@ public void run() {
// override newServerSocket to log local port once bound
Connector connector = new SocketConnector() {
@Override
- protected ServerSocket newServerSocket(String host, int port, int backlog) throws IOException {
+ protected ServerSocket newServerSocket(String host, int port, int backlog)
+ throws IOException {
ServerSocket ss = super.newServerSocket(host, port, backlog);
int localPort = ss.getLocalPort();
LOG.info("Ambrose web server listening on port {}", localPort);
@@ -121,13 +126,15 @@ protected ServerSocket newServerSocket(String host, int port, int backlog) throw
};
connector.setPort(port);
server = new Server();
- server.setConnectors(new Connector[] { connector });
+ server.setConnectors(new Connector[]{connector});
// this needs to be loaded via the jar'ed resources, not the relative dir
URL resourcesUrl = this.getClass().getClassLoader().getResource(ROOT_PATH);
HandlerList handler = new HandlerList();
- handler.setHandlers(new Handler[] { new APIHandler(statsReadService),
- new WebAppContext(resourcesUrl.toExternalForm(), SLASH) });
+ handler.setHandlers(new Handler[]{
+ new APIHandler(workflowIndexReadService, statsReadService),
+ new WebAppContext(resourcesUrl.toExternalForm(), SLASH)
+ });
server.setHandler(handler);
server.setStopAtShutdown(false);
View
24 common/src/main/java/com/twitter/ambrose/service/WorkflowIndexReadService.java
@@ -0,0 +1,24 @@
+package com.twitter.ambrose.service;
+
+import java.io.IOException;
+
+import com.twitter.ambrose.model.PaginatedList;
+import com.twitter.ambrose.model.WorkflowSummary;
+
+/**
+ * Interface to fetch paginated lists of WorkflowSummaries.
+ */
+public interface WorkflowIndexReadService {
+ /**
+ * Returns workflow summaries for a given status and optional userId filter.
+ *
+ * @param cluster cluser to return results for.
+ * @param status workflow status.
+ * @param userId user to filter on, or null if all users requested.
+ * @param numResults how many results to return.
+ * @param startKey start key for the page of results to return.
+ * @return paginated list of workflow summaries.
+ */
+ PaginatedList<WorkflowSummary> getWorkflows(String cluster, WorkflowSummary.Status status,
+ String userId, int numResults, byte[] startKey) throws IOException;
+}
View
124 common/src/main/java/com/twitter/ambrose/service/impl/InMemoryStatsService.java
@@ -15,67 +15,74 @@
*/
package com.twitter.ambrose.service.impl;
-import com.twitter.ambrose.model.DAGNode;
-import com.twitter.ambrose.model.Event;
-import com.twitter.ambrose.model.Job;
-import com.twitter.ambrose.service.StatsReadService;
-import com.twitter.ambrose.service.StatsWriteService;
-import com.twitter.ambrose.util.JSONUtil;
-
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.twitter.ambrose.model.DAGNode;
+import com.twitter.ambrose.model.Event;
+import com.twitter.ambrose.model.Job;
+import com.twitter.ambrose.model.PaginatedList;
+import com.twitter.ambrose.model.WorkflowSummary;
+import com.twitter.ambrose.service.StatsReadService;
+import com.twitter.ambrose.service.StatsWriteService;
+import com.twitter.ambrose.service.WorkflowIndexReadService;
+import com.twitter.ambrose.util.JSONUtil;
+
/**
* In-memory implementation of both StatsReadService and StatsWriteService. Used when stats
- * collection and stats serving are happening within the same VM. This class is intended to run in
- * a VM that only handles a single workflow. Hence it ignores workflowId.
- * <P>
+ * collection and stats serving are happening within the same VM. This class is intended to run in a
+ * VM that only handles a single workflow. Hence it ignores workflowId.
+ * <p/>
* Upon job completion this class can optionally write all json data to disk. This is useful for
* debugging. The written files can also be replayed in the Ambrose UI without re-running the Job
- * via the <pre>bin/demo</pre> script. To write all json data to disk, set the following values
- * as system properties using <pre>-D</pre>:
- * <ul>
- * <li><pre>ambrose.write.dag.file</pre> file to write the dag data to</li>
- * <li><pre>ambrose.write.events.file</pre> file to write the events data to</li>
- * </ul>
- * </P>
- *
- * @author billg
+ * via the <code>bin/demo</code> script. To write all json data to disk, set the following values as
+ * system properties using <code>-D</code>:
+ * <pre>
+ * <ul>
+ * <li><code>{@value #DUMP_WORKFLOW_FILE_PARAM}</code> - file in which to write the workflow
+ * json.</li>
+ * <li><code>{@value #DUMP_EVENTS_FILE_PARAM}</code> - file in which to write the events
+ * json.</li>
+ * </ul>
+ * </pre>
*/
-public class InMemoryStatsService implements StatsReadService, StatsWriteService<Job> {
+public class InMemoryStatsService implements StatsReadService, StatsWriteService<Job>,
+ WorkflowIndexReadService {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryStatsService.class);
-
- private static final String DUMP_DAG_FILE_PARAM = "ambrose.write.dag.file";
+ private static final String DUMP_WORKFLOW_FILE_PARAM = "ambrose.write.dag.file";
private static final String DUMP_EVENTS_FILE_PARAM = "ambrose.write.events.file";
-
+ private final WorkflowSummary summary = new WorkflowSummary(null,
+ System.getProperty("user.name", "unknown"), "unknown", null, 0);
+ private final PaginatedList<WorkflowSummary> summaries =
+ new PaginatedList<WorkflowSummary>(ImmutableList.of(summary));
+ private boolean jobFailed = false;
private Map<String, DAGNode<Job>> dagNodeNameMap = Maps.newHashMap();
private SortedMap<Integer, Event> eventMap = new ConcurrentSkipListMap<Integer, Event>();
-
- private Writer dagWriter = null;
- private Writer eventsWriter = null;
+ private Writer workflowWriter;
+ private Writer eventsWriter;
private boolean eventWritten = false;
public InMemoryStatsService() {
- String dumpDagFileName = System.getProperty(DUMP_DAG_FILE_PARAM);
+ String dumpWorkflowFileName = System.getProperty(DUMP_WORKFLOW_FILE_PARAM);
String dumpEventsFileName = System.getProperty(DUMP_EVENTS_FILE_PARAM);
- if (dumpDagFileName != null) {
+ if (dumpWorkflowFileName != null) {
try {
- dagWriter = new PrintWriter(dumpDagFileName);
+ workflowWriter = new PrintWriter(dumpWorkflowFileName);
} catch (FileNotFoundException e) {
- LOG.error("Could not create dag PrintWriter at " + dumpDagFileName, e);
+ LOG.error("Could not create dag PrintWriter at " + dumpWorkflowFileName, e);
}
}
@@ -90,12 +97,39 @@ public InMemoryStatsService() {
@Override
public synchronized void sendDagNodeNameMap(String workflowId,
- Map<String, DAGNode<Job>> dagNodeNameMap) throws IOException {
+ Map<String, DAGNode<Job>> dagNodeNameMap) throws IOException {
+ this.summary.setId(workflowId);
+ this.summary.setStatus(WorkflowSummary.Status.RUNNING);
+ this.summary.setProgress(0);
this.dagNodeNameMap = dagNodeNameMap;
writeJsonDagNodenameMapToDisk(dagNodeNameMap);
}
@Override
+ public synchronized void pushEvent(String workflowId, Event event) throws IOException {
+ eventMap.put(event.getId(), event);
+ switch (event.getType()) {
+ case WORKFLOW_PROGRESS:
+ Event.WorkflowProgressEvent workflowProgressEvent = (Event.WorkflowProgressEvent) event;
+ String progressString =
+ workflowProgressEvent.getPayload().get(Event.WorkflowProgressField.workflowProgress);
+ int progress = Integer.parseInt(progressString);
+ summary.setProgress(progress);
+ if (progress == 100) {
+ summary.setStatus(jobFailed
+ ? WorkflowSummary.Status.FAILED
+ : WorkflowSummary.Status.SUCCEEDED);
+ }
+ break;
+ case JOB_FAILED:
+ jobFailed = true;
+ default:
+ // nothing
+ }
+ writeJsonEventToDisk(event);
+ }
+
+ @Override
public synchronized Map<String, DAGNode<Job>> getDagNodeNameMap(String workflowId) {
return dagNodeNameMap;
}
@@ -103,20 +137,20 @@ public synchronized void sendDagNodeNameMap(String workflowId,
@Override
public synchronized Collection<Event> getEventsSinceId(String workflowId, int sinceId) {
int minId = sinceId >= 0 ? sinceId + 1 : sinceId;
-
- SortedMap<Integer, Event> tailMap = eventMap.tailMap(minId);
- return tailMap.values();
+ return eventMap.tailMap(minId).values();
}
@Override
- public synchronized void pushEvent(String workflowId, Event event) throws IOException {
- eventMap.put(event.getId(), event);
- writeJsonEventToDisk(event);
+ public synchronized PaginatedList<WorkflowSummary> getWorkflows(String cluster,
+ WorkflowSummary.Status status, String userId, int numResults, byte[] startKey)
+ throws IOException {
+ return summaries;
}
- private void writeJsonDagNodenameMapToDisk(Map<String, DAGNode<Job>> dagNodeNameMap) throws IOException {
- if (dagWriter != null && dagNodeNameMap != null) {
- JSONUtil.writeJson(dagWriter, dagNodeNameMap.values());
+ private void writeJsonDagNodenameMapToDisk(Map<String, DAGNode<Job>> dagNodeNameMap)
+ throws IOException {
+ if (workflowWriter != null && dagNodeNameMap != null) {
+ JSONUtil.writeJson(workflowWriter, dagNodeNameMap.values());
}
}
@@ -130,9 +164,13 @@ private void writeJsonEventToDisk(Event event) throws IOException {
}
public void flushJsonToDisk() throws IOException {
- if (dagWriter != null) { dagWriter.close(); }
+ if (workflowWriter != null) {
+ workflowWriter.close();
+ }
if (eventsWriter != null) {
- if (eventWritten) { eventsWriter.write(" ]\n"); }
+ if (eventWritten) {
+ eventsWriter.write(" ]\n");
+ }
eventsWriter.close();
}
}
View
8 common/src/main/resources/web/css/ambrose.css
@@ -1,10 +1,18 @@
/* Page dimensions */
body { position: relative; padding-top: 40px; }
+@media (max-width: 979px) { body { padding-top: 0px; } }
#diagrams > div { height: 350px; margin: 0; }
#ambrose-view-progress-bar {}
#ambrose-view-chord {}
#ambrose-view-graph {}
+/* Typography */
+h2 { margin: 1em 0px; }
+
+/* Workflows table */
+.ambrose-dashboard .progress,
+.ambrose-dashboard .bar { margin: 0px; }
+
/* Progress Bar view */
.ambrose-view-progress-bar { position: relative; margin: 20px 0px; }
.ambrose-view-progress-bar .number { position: absolute; top: 0px; right: 10px; font-size: 50px; opacity: 0.1; }
View
104 common/src/main/resources/web/dashboard.html
@@ -0,0 +1,104 @@
+<!DOCTYPE html>
+<html lang="en">
+ <head>
+ <meta charset="utf-8">
+ <title>Ambrose Dashboard</title>
+ <meta name="viewport" content="width=device-width, initial-scale=1.0">
+ <meta name="description" content="Data workflow visualization and monitoring">
+ <meta name="author" content="Twitter">
+ <link href="css/bootstrap.css" rel="stylesheet">
+ <link href="css/ambrose.css" rel="stylesheet">
+
+ <!-- HTML5 shim, for IE6-8 support of HTML5 elements -->
+ <!--[if lt IE 9]>
+ <script src="js/html5shiv.js"></script>
+ <![endif]-->
+
+ <!-- fav and touch icons -->
+ <link rel="shortcut icon" href="ico/favicon.ico">
+ <link rel="apple-touch-icon-precomposed" sizes="114x114" href="ico/apple-touch-icon-114-precomposed.png">
+ <link rel="apple-touch-icon-precomposed" sizes="72x72" href="ico/apple-touch-icon-72-precomposed.png">
+ <link rel="apple-touch-icon-precomposed" href="ico/apple-touch-icon-57-precomposed.png">
+ </head>
+ <body>
+
+ <!-- navbar -->
+ <div class="navbar navbar-inverse navbar-fixed-top">
+ <div class="navbar-inner">
+ <div class="container-fluid">
+ <button type="button" class="btn btn-navbar" data-toggle="collapse" data-target=".nav-collapse">
+ <span class="icon-bar"></span>
+ <span class="icon-bar"></span>
+ <span class="icon-bar"></span>
+ </button>
+ <a class="brand" href="#">Ambrose</a>
+ <div class="nav-collapse collapse">
+ <ul class="nav">
+ <li class="active"><a href="dashboard.html">Dashboard</a></li>
+ <li class="dropdown">
+ <a id="cluster-dropdown" href="#" class="dropdown-toggle" data-toggle="dropdown">Cluster <b class="caret"></b></a>
+ <ul id="cluster-menu" class="dropdown-menu" role="menu" aria-labelledby="cluster-dropdown"></ul>
+ </li>
+ <li class="dropdown">
+ <a id="status-dropdown" href="#" class="dropdown-toggle" data-toggle="dropdown">Status <b class="caret"></b></a>
+ <ul id="status-menu" class="dropdown-menu" role="menu" aria-labelledby="status-dropdown"></ul>
+ </li>
+ </ul>
+ <form id="user-form" class="navbar-form pull-left">
+ <input id="user-field" type="text" class="span2" placeholder="User" />
+ <button id="user-form-btn" type="submit" class="btn">Filter</button>
+ </form>
+ <ul class="nav pull-right">
+ <li><a target="_blank" href="https://github.com/twitter/ambrose">About</a></li>
+ </ul>
+ </div>
+ </div>
+ </div>
+ </div>
+
+ <div class="container-fluid">
+ <div class="row-fluid">
+ <div class="span12">
+ <h2>Workflows</h2>
+ <table class="table table-hover ambrose-dashboard">
+ <thead>
+ <tr>
+ <th>#</th>
+ <th>User</th>
+ <th>Name</th>
+ <th>Status</th>
+ <th>Progress</th>
+ </tr>
+ </thead>
+ <tbody id="workflows"></tbody>
+ </table>
+ </div>
+ </div>
+
+ <div class="row-fluid">
+ <div class="span12">
+ <div class="pagination pagination-large pagination-centered">
+ <ul>
+ <li><a href="#" id="page-prev-link">&laquo;</a></li>
+ <li><a href="#" id="page-next-link">&raquo;</a></li>
+ </ul>
+ </div>
+ </div>
+ </div>
+
+ <hr>
+ <footer>
+ <p>&copy; Twitter 2013</p>
+ </footer>
+ </div>
+
+ <script type="text/javascript">
+ var require = {
+ waitSeconds: 15,
+ //urlArgs : "bust=" + new Date().getTime(),
+ };
+ </script>
+ <script data-main="js/dashboard-main" src="js/require-jquery.js"></script>
+
+ </body>
+</html>
View
22 common/src/main/resources/web/data/workflows.json
@@ -0,0 +1,22 @@
+{
+ "nextPageStart": 1,
+ "results": [ {
+ "id" : "foobar",
+ "userId" : "joe",
+ "name" : "unknown",
+ "progress" : 23,
+ "status" : "RUNNING"
+ }, {
+ "id" : "barfoo",
+ "userId" : "mary",
+ "name" : "unknown",
+ "progress" : 100,
+ "status" : "SUCCEEDED"
+ }, {
+ "id" : "foobar",
+ "userId" : "joe",
+ "name" : "unknown",
+ "progress" : 100,
+ "status" : "FAILED"
+ } ]
+}
View
2  common/src/main/resources/web/js/ambrose.js
@@ -19,7 +19,9 @@ limitations under the License.
*/
define([
'ambrose/core',
+ 'ambrose/graph',
'ambrose/client',
+ 'ambrose/dashboard',
'ambrose/workflow',
'ambrose/view',
], function(Ambrose) {
View
29 common/src/main/resources/web/js/ambrose/client.js
@@ -37,6 +37,7 @@ define(['jquery', 'uri', './core'], function($, URI, Ambrose) {
*/
init: function(baseUri) {
// default endpoint paths
+ var workflowsUri = 'workflows';
var jobsUri = 'dag';
var eventsUri = 'events';
@@ -45,6 +46,7 @@ define(['jquery', 'uri', './core'], function($, URI, Ambrose) {
var uri = new URI(window.location.href);
var params = uri.search(true);
if (params.localdata) {
+ workflowsUri = 'data/workflows.json';
if (params.localdata == 'small') {
jobsUri = 'data/small-dag.json';
eventsUri = 'data/small-events.json';
@@ -56,15 +58,42 @@ define(['jquery', 'uri', './core'], function($, URI, Ambrose) {
} else {
// resolve relative paths given base uri
var uri = new URI(baseUri);
+ workflowsUri = new URI(workflowsUri).absoluteTo(uri);
jobsUri = new URI(jobsUri).absoluteTo(uri);
eventsUri = new URI(eventsUri).absoluteTo(uri);
}
+ this.workflowsUri = new URI(workflowsUri);
this.jobsUri = new URI(jobsUri);
this.eventsUri = new URI(eventsUri);
},
/**
+ * Submits asynchronous request for workflow summaries from server.
+ *
+ * @param cluster
+ * @param user
+ * @param status
+ * @param startKey
+ * @return a jQuery Promise on which success and error callbacks may be registered.
+ */
+ getWorkflows: function(cluster, user, status, startKey) {
+ var self = this;
+ return $.getJSON(new URI(this.workflowsUri).addSearch({
+ cluster: cluster,
+ userId: user,
+ status: status,
+ startKey: startKey
+ }).unicode())
+ .error(function(jqXHR, textStatus, errorThrown) {
+ console.error('Failed to get workflows:', self, textStatus, errorThrown);
+ })
+ .success(function(data, textStatus, jqXHR) {
+ console.debug('Succeeded to get workflows:', textStatus, data);
+ });
+ },
+
+ /**
* Submits asynchronous request for workflow jobs from server.
*
* @param workflowId id of workflow for which to retrieve jobs.
View
2  common/src/main/resources/web/js/ambrose/core.js
@@ -20,7 +20,7 @@ limitations under the License.
define(['jquery'], function($) {
// capitalize string
String.prototype.capitalize = function() {
- return this.charAt(0).toUpperCase() + this.slice(1);
+ return this.charAt(0).toUpperCase() + this.slice(1).toLowerCase();
};
// pattern for separator char + alpha
View
166 common/src/main/resources/web/js/ambrose/dashboard.js
@@ -0,0 +1,166 @@
+/*
+Copyright 2012 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/**
+ * Ambrose dashboard module.
+ */
+define(['jquery', './core', './client'], function($, Ambrose, Client) {
+ var clusterMap = {
+ 'dw_smf1': 'dw@smf1',
+ 'test_smf1': 'test@smf1'
+ };
+
+ var statusSet = [
+ 'RUNNING',
+ 'SUCCEEDED',
+ 'FAILED'
+ ];
+
+ // Dashboard ctor
+ var Dashboard = Ambrose.Dashboard = function(client) {
+ return new Ambrose.Dashboard.fn.init(client);
+ };
+
+ /**
+ * Dashboard prototype.
+ */
+ Dashboard.fn = Dashboard.prototype = {
+ /**
+ * Constructs a new Dashboard.
+ *
+ * @param client client to use when requesting workflow summaries from server. If null, defaults
+ * to new client instance constructed with default arguments.
+ */
+ init: function(client) {
+ console.log('Initializing Ambrose Dashboard');
+ var self = this;
+ self.client = client || Client();
+ self.currentStartKey = '';
+ self.nextStartKey = '';
+ self.prevStartKeys = [];
+
+ // build cluster menu
+ $.each(clusterMap, function(id, name) {
+ $('<a>').appendTo($('<li>').appendTo($('#cluster-menu'))
+ .attr('id', 'cluster_' + id).addClass('cluster')).text(name)
+ .click(function() { self.setCluster(id); self.loadFlows(); });
+ });
+
+ // build status menu
+ $.each(statusSet, function(index, id) {
+ $('<a>').appendTo($('<li>').appendTo($('#status-menu'))
+ .attr('id', 'status_' + id).addClass('status')).text(id.capitalize())
+ .click(function() { self.setStatus(id); self.loadFlows(); });
+ });
+
+ // configure event handlers
+ $('#user-form').submit(function() {
+ self.setUser($('#user-field').val()); self.loadFlows(); return false;
+ });
+ $('#page-prev-link').click(function() { self.prevPage(); });
+ $('#page-next-link').click(function() { self.nextPage(); });
+
+ // set default values
+ self.setCluster('dw_smf1');
+ self.setStatus('RUNNING');
+ self.setUser('');
+ },
+
+ getClusterMap: function() {
+ return clusterMap;
+ },
+
+ getStatusSet: function() {
+ return statusSet;
+ },
+
+ setCluster: function(cluster) {
+ this.cluster = cluster;
+ $('.cluster').removeClass('active');
+ $('#cluster_' + cluster).addClass('active');
+ return this;
+ },
+
+ setStatus: function(status) {
+ this.status = status;
+ $('.status').removeClass('active');
+ $('#status_' + status).addClass('active');
+ return this;
+ },
+
+ setUser: function(user) {
+ this.user = user;
+ return this;
+ },
+
+ prevPage: function() {
+ this.currentStartKey = this.prevStartKeys.pop();
+ this.loadFlows();
+ return this;
+ },
+
+ nextPage: function() {
+ this.prevStartKeys.push(this.currentStartKey);
+ this.currentStartKey = this.nextStartKey;
+ this.loadFlows();
+ return this;
+ },
+
+ loadFlows: function() {
+ var self = this;
+ self.client.getWorkflows(clusterMap[self.cluster], self.user, self.status, self.currentStartKey)
+ .success(function(data) {
+ self.nextStartKey = data.nextPageStart;
+ self.renderFlows(data);
+ });
+ return this;
+ },
+
+ renderFlows: function(data) {
+ var self = this;
+ var workflows = data.results;
+ var $workflows = $('#workflows').empty();
+ var pageOffset = self.prevStartKeys.length * 10;
+ $.each(workflows, function(i, workflow) {
+ var $tr = $('<tr>').appendTo($workflows).attr('id', 'workflow_' + workflow.id)
+ .click(function() {
+ window.location.href = 'workflow.html?workflowId=' + encodeURIComponent(workflow.id);
+ });
+ $('<td>').text(pageOffset + i + 1).appendTo($tr);
+ $('<td>').text(workflow.userId).appendTo($tr);
+ $('<td>').text(workflow.name).appendTo($tr);
+ $('<td>').text(workflow.status).appendTo($tr);
+ $('<div class="bar">').width(workflow.progress + '%').appendTo(
+ $('<div class="progress">').appendTo($('<td>').appendTo($tr)));
+ });
+ if (self.nextStartKey != null && self.nextStartKey != '') {
+ $('#page-next-link').removeClass('disabled');
+ } else {
+ $('#page-next-link').addClass('disabled');
+ }
+ if (self.prevStartKeys.length > 0) {
+ $('#page-next-link').removeClass('disabled');
+ } else {
+ $('#page-next-link').addClass('disabled');
+ }
+ return this;
+ },
+ };
+
+ // bind prototype to ctor
+ Dashboard.fn.init.prototype = Dashboard.fn;
+ return Dashboard;
+});
View
14 common/src/main/resources/web/js/dashboard-main.js
@@ -0,0 +1,14 @@
+requirejs.config({
+ shim: {
+ 'bootstrap': { deps: ['jquery'], exports: 'bootstrap' },
+ 'uri': { deps: ['jquery'], exports: 'URI' },
+ 'd3': { deps: ['jquery'], exports: 'd3' },
+ 'colorbrewer': { deps: [], exports: 'colorbrewer' },
+ }
+});
+
+require(['jquery', 'ambrose', 'bootstrap'], function ($, Ambrose) {
+ $(document).ready(function() {
+ Ambrose.Dashboard().loadFlows();
+ });
+});
View
2  common/src/main/resources/web/js/main.js → common/src/main/resources/web/js/workflow-main.js
@@ -1,7 +1,7 @@
requirejs.config({
shim: {
- 'uri': { deps: ['jquery'], exports: 'URI' },
'bootstrap': { deps: ['jquery'], exports: 'bootstrap' },
+ 'uri': { deps: ['jquery'], exports: 'URI' },
'd3': { deps: ['jquery'], exports: 'd3' },
'colorbrewer': { deps: [], exports: 'colorbrewer' },
}
View
11 common/src/main/resources/web/workflow.html
@@ -2,13 +2,12 @@
<html lang="en">
<head>
<meta charset="utf-8">
- <title>Ambrose</title>
+ <title>Ambrose Workflow</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Data workflow visualization and monitoring">
<meta name="author" content="Twitter">
-
- <link href="css/ambrose.css" rel="stylesheet">
<link href="css/bootstrap.css" rel="stylesheet">
+ <link href="css/ambrose.css" rel="stylesheet">
<!-- HTML5 shim, for IE6-8 support of HTML5 elements -->
<!--[if lt IE 9]>
@@ -35,7 +34,9 @@
<a class="brand" href="#">Ambrose</a>
<div class="nav-collapse collapse">
<ul class="nav">
- <li><a href="/">Dashboard</a></li>
+ <li><a href="dashboard.html">Dashboard</a></li>
+ </ul>
+ <ul class="nav pull-right">
<li><a target="_blank" href="https://github.com/twitter/ambrose">About</a></li>
</ul>
</div>
@@ -68,7 +69,7 @@
urlArgs : "bust=" + new Date().getTime(),
};
</script>
- <script data-main="js/main" src="js/require-jquery.js"></script>
+ <script data-main="js/workflow-main" src="js/require-jquery.js"></script>
</body>
</html>
View
52 pig/src/main/java/com/twitter/ambrose/pig/EmbeddedAmbrosePigProgressNotificationListener.java
@@ -15,45 +15,46 @@
*/
package com.twitter.ambrose.pig;
-import com.twitter.ambrose.server.ScriptStatusServer;
-import com.twitter.ambrose.service.impl.InMemoryStatsService;
+import java.io.IOException;
+
import org.apache.pig.tools.pigstats.PigStatsUtil;
-import java.io.IOException;
+import com.twitter.ambrose.server.ScriptStatusServer;
+import com.twitter.ambrose.service.impl.InMemoryStatsService;
/**
- * Sublclass of AmbrosePigProgressNotificationListener that starts a ScriptStatusServer embedded in
+ * Subclass of AmbrosePigProgressNotificationListener that starts a ScriptStatusServer embedded in
* the running Pig client VM. Stats are collected using by this class via InMemoryStatsService,
* which is what serves stats to ScriptStatusServer.
- * <P>
+ * <p/>
* To use this class with pig, start pig as follows:
* <pre>
- * $ bin/pig \
- * -Dpig.notification.listener=com.twitter.ambrose.pig.EmbeddedAmbrosePigProgressNotificationListener \
- * -f path/to/script.pig
+ * $ pig \
+ * -Dpig.notification.listener=\
+ * com.twitter.ambrose.pig.EmbeddedAmbrosePigProgressNotificationListener \
+ * -f path/to/script.pig
+ * </pre>
+ * Additional {@code -D} options can be set as system as system properties. Note that these must be
+ * set via {@code PIG_OPTS}. For example, {@code export PIG_OPTS=-Dambrose.port=8188}.
+ * <pre>
+ * <ul>
+ * <li><code>{@value ScriptStatusServer#PORT_PARAM}</code> - Port for the ambrose server to
+ * listen on. Defaults to {@value ScriptStatusServer#PORT_DEFAULT}.</li>
+ * <li><code>{@value #POST_SCRIPT_SLEEP_SECS_PARAM}</code> - Number of seconds to keep the VM
+ * running after the script is complete.</li>
+ * </ul>
* </pre>
- * Additional <pre>-D</pre> options can be set as system as system properties. Note that these must
- * be set via <pre>PIG_OPTS</pre>. For example, <pre>export PIG_OPTS=-Dambrose.port.number=8188</pre>.
- * <ul>
- * <li><pre>ambrose.port.number</pre> (default=8080) port for the ambrose tool to listen on.</li>
- * <li><pre>ambrose.post.script.sleep.seconds</pre> number of seconds to keep the VM running after
- * the script is complete. This is useful to keep Ambrose up once the job is done.</li>
- * </ul>
- * </P>
- * @author billg
*/
public class EmbeddedAmbrosePigProgressNotificationListener
- extends AmbrosePigProgressNotificationListener {
-
+ extends AmbrosePigProgressNotificationListener {
+ private static final String POST_SCRIPT_SLEEP_SECS_PARAM = "ambrose.post.script.sleep.seconds";
private InMemoryStatsService service;
private ScriptStatusServer server;
- private static final String POST_SCRIPT_SLEEP_SECS_PARAM = "ambrose.post.script.sleep.seconds";
public EmbeddedAmbrosePigProgressNotificationListener() {
super(new InMemoryStatsService());
- this.service = (InMemoryStatsService)getStatsWriteService();
-
- this.server = new ScriptStatusServer(service);
+ this.service = (InMemoryStatsService) getStatsWriteService();
+ this.server = new ScriptStatusServer(service, service);
this.server.start();
}
@@ -76,13 +77,14 @@ public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
}
log.info("Job complete but sleeping for " + sleepTimeSeconds
- + " seconds to keep the PigStats REST server running. Hit ctrl-c to exit.");
+ + " seconds to keep the PigStats REST server running. Hit ctrl-c to exit.");
service.flushJsonToDisk();
Thread.sleep(sleepTimeSeconds * 1000);
server.stop();
} catch (NumberFormatException e) {
- log.warn(POST_SCRIPT_SLEEP_SECS_PARAM + " param is not a valid number, not sleeping: " + sleepTime);
+ log.warn(POST_SCRIPT_SLEEP_SECS_PARAM + " param is not a valid number, not sleeping: " +
+ sleepTime);
} catch (IOException e) {
log.warn("Couldn't write json to disk", e);
} catch (InterruptedException e) {

No commit comments for this range

Something went wrong with that request. Please try again.