Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'model_refactor' of github.com:twitter/ambrose into hack…

…week
  • Loading branch information...
commit d6d1ee5751a31bb0899ad4e2731716b53310a94b 2 parents 756b6e5 + 5522b5e
@sagemintblue sagemintblue authored
View
33 common/src/main/java/com/twitter/ambrose/service/impl/InMemoryStatsService.java
@@ -64,6 +64,7 @@
private Writer dagWriter = null;
private Writer eventsWriter = null;
+ private boolean eventWritten = false;
public InMemoryStatsService() {
String dumpDagFileName = System.getProperty(DUMP_DAG_FILE_PARAM);
@@ -87,8 +88,10 @@ public InMemoryStatsService() {
}
@Override
- public synchronized void sendDagNodeNameMap(String workflowId, Map<String, DAGNode<Job>> dagNodeNameMap) {
+ public synchronized void sendDagNodeNameMap(String workflowId,
+ Map<String, DAGNode<Job>> dagNodeNameMap) throws IOException {
this.dagNodeNameMap = dagNodeNameMap;
+ writeJsonDagNodenameMapToDisk(dagNodeNameMap);
}
@Override
@@ -105,21 +108,33 @@ public synchronized void sendDagNodeNameMap(String workflowId, Map<String, DAGNo
}
@Override
- public synchronized void pushEvent(String workflowId, WorkflowEvent event) {
+ public synchronized void pushEvent(String workflowId, WorkflowEvent event) throws IOException {
eventMap.put(event.getEventId(), event);
+ writeJsonEventToDisk(event);
}
- public void writeJsonToDisk() throws IOException {
-
+ private void writeJsonDagNodenameMapToDisk(Map<String, DAGNode<Job>> dagNodeNameMap) throws IOException {
if (dagWriter != null && dagNodeNameMap != null) {
- Collection<DAGNode<Job>> nodes = getDagNodeNameMap(null).values();
+ Collection<DAGNode<Job>> nodes = dagNodeNameMap.values();
JSONUtil.writeJson(dagWriter, nodes.toArray(new DAGNode[dagNodeNameMap.size()]));
- dagWriter.close();
}
+ }
+
+ private void writeJsonEventToDisk(WorkflowEvent event) throws IOException {
+ if (eventsWriter != null && event != null) {
+ eventsWriter.append(!eventWritten ? "[ " : ", ");
+ JSONUtil.writeJson(eventsWriter, event);
+ eventsWriter.flush();
+ eventWritten = true;
+ }
+ }
+
+ public void flushJsonToDisk() throws IOException {
+
+ if (dagWriter != null) { dagWriter.close(); }
- if (eventsWriter != null && eventMap != null) {
- Collection<WorkflowEvent> events = getEventsSinceId(null, -1);
- JSONUtil.writeJson(eventsWriter, events.toArray(new WorkflowEvent[events.size()]));
+ if (eventsWriter != null) {
+ if (eventWritten) { eventsWriter.append("]\n"); }
eventsWriter.close();
}
}
View
2  pig/src/main/java/com/twitter/ambrose/pig/EmbeddedAmbrosePigProgressNotificationListener.java
@@ -77,7 +77,7 @@ public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
log.info("Job complete but sleeping for " + sleepTimeSeconds
+ " seconds to keep the PigStats REST server running. Hit ctrl-c to exit.");
- service.writeJsonToDisk();
+ service.flushJsonToDisk();
Thread.sleep(sleepTimeSeconds * 1000);
server.stop();
Please sign in to comment.
Something went wrong with that request. Please try again.