Permalink
Browse files

Fixes remaining issues with WorkflowIndexReadService impl

  • Loading branch information...
1 parent 1e55295 commit 20c2a9ca8747be21d83a4371f8f1c819b2e94271 @sagemintblue sagemintblue committed Apr 5, 2013
@@ -2,18 +2,25 @@
import java.io.IOException;
import java.util.Collection;
+import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import com.twitter.ambrose.model.DAGNode;
-import com.twitter.ambrose.model.Event;
+import com.google.common.base.Charsets;
+
import org.mortbay.jetty.HttpConnection;
import org.mortbay.jetty.Request;
import org.mortbay.jetty.handler.AbstractHandler;
+import com.twitter.ambrose.model.DAGNode;
+import com.twitter.ambrose.model.Event;
+import com.twitter.ambrose.model.Job;
+import com.twitter.ambrose.model.PaginatedList;
+import com.twitter.ambrose.model.WorkflowSummary;
import com.twitter.ambrose.service.StatsReadService;
+import com.twitter.ambrose.service.WorkflowIndexReadService;
import com.twitter.ambrose.util.JSONUtil;
/**
@@ -22,60 +29,81 @@
* @author billg
*/
public class APIHandler extends AbstractHandler {
+ private static void sendJson(HttpServletRequest request,
+ HttpServletResponse response, Object object) throws IOException {
+ JSONUtil.writeJson(response.getWriter(), object);
+ response.getWriter().close();
+ setHandled(request);
+ }
+
+ private static void setHandled(HttpServletRequest request) {
+ Request base_request = (request instanceof Request) ?
+ (Request) request : HttpConnection.getCurrentConnection().getRequest();
+ base_request.setHandled(true);
+ }
+
+ private static final String QUERY_PARAM_CLUSTER = "cluster";
+ private static final String QUERY_PARAM_USER_ID = "userId";
+ private static final String QUERY_PARAM_STATUS = "status";
+ private static final String QUERY_PARAM_START_KEY = "startKey";
private static final String QUERY_PARAM_WORKFLOW_ID = "workflowId";
private static final String QUERY_PARAM_LAST_EVENT_ID = "lastEventId";
-
private static final String MIME_TYPE_HTML = "text/html";
private static final String MIME_TYPE_JSON = "application/json";
+ private WorkflowIndexReadService workflowIndexReadService;
+ private StatsReadService<Job> statsReadService;
- private StatsReadService statsReadService;
-
- public APIHandler(StatsReadService statsReadService) {
+ public APIHandler(WorkflowIndexReadService workflowIndexReadService,
+ StatsReadService<Job> statsReadService) {
+ this.workflowIndexReadService = workflowIndexReadService;
this.statsReadService = statsReadService;
}
@Override
public void handle(String target,
- HttpServletRequest request,
- HttpServletResponse response,
- int dispatch) throws IOException, ServletException {
+ HttpServletRequest request,
+ HttpServletResponse response,
+ int dispatch) throws IOException, ServletException {
- if (target.endsWith("/dag")) {
+ if (target.endsWith("/workflows")) {
+ response.setContentType(MIME_TYPE_JSON);
+ response.setStatus(HttpServletResponse.SC_OK);
+ String cluster = request.getParameter(QUERY_PARAM_CLUSTER);
+ String userId = request.getParameter(QUERY_PARAM_USER_ID);
+ String status = request.getParameter(QUERY_PARAM_STATUS);
+ String startKey = request.getParameter(QUERY_PARAM_START_KEY);
+
+ PaginatedList<WorkflowSummary> workflows =
+ workflowIndexReadService.getWorkflows(
+ cluster,
+ status != null ? WorkflowSummary.Status.valueOf(status) : null,
+ userId, 10,
+ startKey != null ? startKey.getBytes(Charsets.UTF_8) : null);
+ sendJson(request, response, workflows);
+
+ } else if (target.endsWith("/dag")) {
response.setContentType(MIME_TYPE_JSON);
response.setStatus(HttpServletResponse.SC_OK);
- Collection<DAGNode> nodes =
- statsReadService.getDagNodeNameMap(request.getParameter(QUERY_PARAM_WORKFLOW_ID)).values();
-
+ Map<String, DAGNode<Job>> dagNodeNameMap =
+ statsReadService.getDagNodeNameMap(request.getParameter(QUERY_PARAM_WORKFLOW_ID));
+ Collection<DAGNode<Job>> nodes = dagNodeNameMap.values();
sendJson(request, response, nodes.toArray(new DAGNode[nodes.size()]));
+
} else if (target.endsWith("/events")) {
response.setContentType(MIME_TYPE_JSON);
response.setStatus(HttpServletResponse.SC_OK);
Integer sinceId = request.getParameter(QUERY_PARAM_LAST_EVENT_ID) != null ?
- Integer.parseInt(request.getParameter(QUERY_PARAM_LAST_EVENT_ID)) : -1;
+ Integer.parseInt(request.getParameter(QUERY_PARAM_LAST_EVENT_ID)) : -1;
Collection<Event> events =
- statsReadService.getEventsSinceId(request.getParameter(QUERY_PARAM_WORKFLOW_ID), sinceId);
-
+ statsReadService.getEventsSinceId(request.getParameter(QUERY_PARAM_WORKFLOW_ID), sinceId);
sendJson(request, response, events.toArray(new Event[events.size()]));
- }
- else if (target.endsWith(".html")) {
+
+ } else if (target.endsWith(".html")) {
response.setContentType(MIME_TYPE_HTML);
// this is because the next handler will be picked up here and it doesn't seem to
// handle html well. This is jank.
}
}
-
- private static void sendJson(HttpServletRequest request,
- HttpServletResponse response, Object object) throws IOException {
- JSONUtil.writeJson(response.getWriter(), object);
- response.getWriter().close();
- setHandled(request);
- }
-
- private static void setHandled(HttpServletRequest request) {
- Request base_request = (request instanceof Request) ?
- (Request)request : HttpConnection.getCurrentConnection().getRequest();
- base_request.setHandled(true);
- }
}
@@ -28,24 +28,39 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.twitter.ambrose.model.Job;
import com.twitter.ambrose.service.StatsReadService;
+import com.twitter.ambrose.service.WorkflowIndexReadService;
/**
* Light weight application server that serves both the JSON API and the Ambrose web pages powered
- * from the JSON. The port defaults to {@value #PORT_DEFAULT} but can be overridden with the
- * {@value #PORT_PARAM} system property. For a random port to be used, set {@value #PORT_PARAM} to
- * zero or {@value #PORT_RANDOM}.
+ * from the JSON. The port defaults to {@value #PORT_DEFAULT} but can be overridden with the {@value
+ * #PORT_PARAM} system property. For a random port to be used, set {@value #PORT_PARAM} to zero or
+ * {@value #PORT_RANDOM}.
* <p/>
* The JSON API supports the following URIs:
- * <ul>
- * <li><code>/jobs</code> returns the workflow jobs.</li>
- * <li><code>/events</code> returns all events since the start of the workflow. Optionally the
- * <code>sinceEventId</code> query parameter can be used to return only events after a given event.</li>
- * </ul>
- *
- * @author billg
+ * <pre>
+ * <ul>
+ * <li><code>/workflows</code> - Returns workflow summaries.</li>
+ * <li><code>/jobs</code> - Returns a workflow's jobs.</li>
+ * <li><code>/events</code> - Returns all workflow events.</li>
+ * </ul>
+ * </pre>
*/
public class ScriptStatusServer implements Runnable {
+ private static int getConfiguredPort() {
+ String port = System.getProperty(PORT_PARAM, PORT_DEFAULT);
+ if (PORT_RANDOM.equalsIgnoreCase(port)) {
+ port = "0";
+ }
+ try {
+ return Integer.parseInt(port);
+ } catch (NumberFormatException e) {
+ throw new IllegalArgumentException(String.format(
+ "Parameter '%s' value '%s' is not a valid port number", PORT_PARAM, port), e);
+ }
+ }
+
/**
* Name of system property used to configure port on which to bind HTTP server.
*/
@@ -61,33 +76,22 @@
private static final Logger LOG = LoggerFactory.getLogger(ScriptStatusServer.class);
private static final String SLASH = "/";
private static final String ROOT_PATH = "web";
-
- private static int getConfiguredPort() {
- String port = System.getProperty(PORT_PARAM, PORT_DEFAULT);
- if (PORT_RANDOM.equalsIgnoreCase(port)) {
- port = "0";
- }
- try {
- return Integer.parseInt(port);
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException(String.format(
- "Parameter '%s' value '%s' is not a valid port number", PORT_PARAM, port), e);
- }
- }
-
- private final StatsReadService statsReadService;
+ private final WorkflowIndexReadService workflowIndexReadService;
+ private final StatsReadService<Job> statsReadService;
private final int port;
private Server server;
private Thread serverThread;
- public ScriptStatusServer(StatsReadService statsReadService) {
+ public ScriptStatusServer(WorkflowIndexReadService workflowIndexReadService,
+ StatsReadService<Job> statsReadService) {
+ this.workflowIndexReadService = workflowIndexReadService;
this.statsReadService = statsReadService;
this.port = getConfiguredPort();
}
public int getPort() {
return port;
- };
+ }
/**
* Starts the server in it's own daemon thread.
@@ -111,7 +115,8 @@ public void run() {
// override newServerSocket to log local port once bound
Connector connector = new SocketConnector() {
@Override
- protected ServerSocket newServerSocket(String host, int port, int backlog) throws IOException {
+ protected ServerSocket newServerSocket(String host, int port, int backlog)
+ throws IOException {
ServerSocket ss = super.newServerSocket(host, port, backlog);
int localPort = ss.getLocalPort();
LOG.info("Ambrose web server listening on port {}", localPort);
@@ -121,13 +126,15 @@ protected ServerSocket newServerSocket(String host, int port, int backlog) throw
};
connector.setPort(port);
server = new Server();
- server.setConnectors(new Connector[] { connector });
+ server.setConnectors(new Connector[]{connector});
// this needs to be loaded via the jar'ed resources, not the relative dir
URL resourcesUrl = this.getClass().getClassLoader().getResource(ROOT_PATH);
HandlerList handler = new HandlerList();
- handler.setHandlers(new Handler[] { new APIHandler(statsReadService),
- new WebAppContext(resourcesUrl.toExternalForm(), SLASH) });
+ handler.setHandlers(new Handler[]{
+ new APIHandler(workflowIndexReadService, statsReadService),
+ new WebAppContext(resourcesUrl.toExternalForm(), SLASH)
+ });
server.setHandler(handler);
server.setStopAtShutdown(false);
@@ -67,6 +67,7 @@
System.getProperty("user.name", "unknown"), "unknown", null, 0);
private final PaginatedList<WorkflowSummary> summaries =
new PaginatedList<WorkflowSummary>(ImmutableList.of(summary));
+ private boolean jobFailed = false;
private Map<String, DAGNode<Job>> dagNodeNameMap = Maps.newHashMap();
private SortedMap<Integer, Event> eventMap = new ConcurrentSkipListMap<Integer, Event>();
private Writer workflowWriter;
@@ -107,11 +108,23 @@ public synchronized void sendDagNodeNameMap(String workflowId,
@Override
public synchronized void pushEvent(String workflowId, Event event) throws IOException {
eventMap.put(event.getId(), event);
- if (event.getType() == Event.Type.WORKFLOW_PROGRESS) {
- Event.WorkflowProgressEvent workflowProgressEvent = (Event.WorkflowProgressEvent) event;
- String progressString =
- workflowProgressEvent.getPayload().get(Event.WorkflowProgressField.workflowProgress);
- summary.setProgress(Integer.parseInt(progressString));
+ switch (event.getType()) {
+ case WORKFLOW_PROGRESS:
+ Event.WorkflowProgressEvent workflowProgressEvent = (Event.WorkflowProgressEvent) event;
+ String progressString =
+ workflowProgressEvent.getPayload().get(Event.WorkflowProgressField.workflowProgress);
+ int progress = Integer.parseInt(progressString);
+ summary.setProgress(progress);
+ if (progress == 100) {
+ summary.setStatus(jobFailed
+ ? WorkflowSummary.Status.FAILED
+ : WorkflowSummary.Status.SUCCEEDED);
+ }
+ break;
+ case JOB_FAILED:
+ jobFailed = true;
+ default:
+ // nothing
}
writeJsonEventToDisk(event);
}
@@ -1,10 +1,18 @@
/* Page dimensions */
body { position: relative; padding-top: 40px; }
+@media (max-width: 979px) { body { padding-top: 0px; } }
#diagrams > div { height: 350px; margin: 0; }
#ambrose-view-progress-bar {}
#ambrose-view-chord {}
#ambrose-view-graph {}
+/* Typography */
+h2 { margin: 1em 0px; }
+
+/* Workflows table */
+.ambrose-dashboard .progress,
+.ambrose-dashboard .bar { margin: 0px; }
+
/* Progress Bar view */
.ambrose-view-progress-bar { position: relative; margin: 20px 0px; }
.ambrose-view-progress-bar .number { position: absolute; top: 0px; right: 10px; font-size: 50px; opacity: 0.1; }
Oops, something went wrong.

0 comments on commit 20c2a9c

Please sign in to comment.