Permalink
Browse files

Changing so writing json to disk happens as the job is running, not o…

…nce it's complete.
  • Loading branch information...
Bill Graham Bill Graham
Bill Graham authored and Bill Graham committed Apr 3, 2013
1 parent 99e2ee3 commit 939cb6b77602ac43fbbb20f4617b4d602a2396b0
@@ -64,6 +64,7 @@
private Writer dagWriter = null;
private Writer eventsWriter = null;
+ private boolean eventWritten = false;
public InMemoryStatsService() {
String dumpDagFileName = System.getProperty(DUMP_DAG_FILE_PARAM);
@@ -87,8 +88,10 @@ public InMemoryStatsService() {
}
@Override
- public synchronized void sendDagNodeNameMap(String workflowId, Map<String, DAGNode<Job>> dagNodeNameMap) {
+ public synchronized void sendDagNodeNameMap(String workflowId,
+ Map<String, DAGNode<Job>> dagNodeNameMap) throws IOException {
this.dagNodeNameMap = dagNodeNameMap;
+ writeJsonDagNodenameMapToDisk(dagNodeNameMap);
}
@Override
@@ -105,21 +108,33 @@ public synchronized void sendDagNodeNameMap(String workflowId, Map<String, DAGNo
}
@Override
- public synchronized void pushEvent(String workflowId, WorkflowEvent event) {
+ public synchronized void pushEvent(String workflowId, WorkflowEvent event) throws IOException {
eventMap.put(event.getEventId(), event);
+ writeJsonEventToDisk(event);
}
- public void writeJsonToDisk() throws IOException {
-
+ private void writeJsonDagNodenameMapToDisk(Map<String, DAGNode<Job>> dagNodeNameMap) throws IOException {
if (dagWriter != null && dagNodeNameMap != null) {
- Collection<DAGNode<Job>> nodes = getDagNodeNameMap(null).values();
+ Collection<DAGNode<Job>> nodes = dagNodeNameMap.values();
JSONUtil.writeJson(dagWriter, nodes.toArray(new DAGNode[dagNodeNameMap.size()]));
- dagWriter.close();
}
+ }
+
+ private void writeJsonEventToDisk(WorkflowEvent event) throws IOException {
+ if (eventsWriter != null && event != null) {
+ eventsWriter.append(!eventWritten ? "[ " : ", ");
+ JSONUtil.writeJson(eventsWriter, event);
+ eventsWriter.flush();
+ eventWritten = true;
+ }
+ }
+
+ public void flushJsonToDisk() throws IOException {
+
+ if (dagWriter != null) { dagWriter.close(); }
- if (eventsWriter != null && eventMap != null) {
- Collection<WorkflowEvent> events = getEventsSinceId(null, -1);
- JSONUtil.writeJson(eventsWriter, events.toArray(new WorkflowEvent[events.size()]));
+ if (eventsWriter != null) {
+ if (eventWritten) { eventsWriter.append("\n]"); }
eventsWriter.close();
}
}
@@ -77,7 +77,7 @@ public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {
log.info("Job complete but sleeping for " + sleepTimeSeconds
+ " seconds to keep the PigStats REST server running. Hit ctrl-c to exit.");
- service.writeJsonToDisk();
+ service.flushJsonToDisk();
Thread.sleep(sleepTimeSeconds * 1000);
server.stop();

0 comments on commit 939cb6b

Please sign in to comment.