diff --git a/common/src/main/java/com/twitter/ambrose/service/impl/InMemoryStatsService.java b/common/src/main/java/com/twitter/ambrose/service/impl/InMemoryStatsService.java index cf30b89..5a32991 100644 --- a/common/src/main/java/com/twitter/ambrose/service/impl/InMemoryStatsService.java +++ b/common/src/main/java/com/twitter/ambrose/service/impl/InMemoryStatsService.java @@ -64,6 +64,7 @@ public class InMemoryStatsService implements StatsReadService, StatsWriteService private Writer dagWriter = null; private Writer eventsWriter = null; + private boolean eventWritten = false; public InMemoryStatsService() { String dumpDagFileName = System.getProperty(DUMP_DAG_FILE_PARAM); @@ -87,8 +88,10 @@ public InMemoryStatsService() { } @Override - public synchronized void sendDagNodeNameMap(String workflowId, Map> dagNodeNameMap) { + public synchronized void sendDagNodeNameMap(String workflowId, + Map> dagNodeNameMap) throws IOException { this.dagNodeNameMap = dagNodeNameMap; + writeJsonDagNodenameMapToDisk(dagNodeNameMap); } @Override @@ -105,21 +108,33 @@ public synchronized Collection getEventsSinceId(String workflowId } @Override - public synchronized void pushEvent(String workflowId, WorkflowEvent event) { + public synchronized void pushEvent(String workflowId, WorkflowEvent event) throws IOException { eventMap.put(event.getEventId(), event); + writeJsonEventToDisk(event); } - public void writeJsonToDisk() throws IOException { - + private void writeJsonDagNodenameMapToDisk(Map> dagNodeNameMap) throws IOException { if (dagWriter != null && dagNodeNameMap != null) { - Collection> nodes = getDagNodeNameMap(null).values(); + Collection> nodes = dagNodeNameMap.values(); JSONUtil.writeJson(dagWriter, nodes.toArray(new DAGNode[dagNodeNameMap.size()])); - dagWriter.close(); } + } + + private void writeJsonEventToDisk(WorkflowEvent event) throws IOException { + if (eventsWriter != null && event != null) { + eventsWriter.append(!eventWritten ? "[ " : ", "); + JSONUtil.writeJson(eventsWriter, event); + eventsWriter.flush(); + eventWritten = true; + } + } + + public void flushJsonToDisk() throws IOException { + + if (dagWriter != null) { dagWriter.close(); } - if (eventsWriter != null && eventMap != null) { - Collection events = getEventsSinceId(null, -1); - JSONUtil.writeJson(eventsWriter, events.toArray(new WorkflowEvent[events.size()])); + if (eventsWriter != null) { + if (eventWritten) { eventsWriter.append("]\n"); } eventsWriter.close(); } } diff --git a/pig/src/main/java/com/twitter/ambrose/pig/EmbeddedAmbrosePigProgressNotificationListener.java b/pig/src/main/java/com/twitter/ambrose/pig/EmbeddedAmbrosePigProgressNotificationListener.java index 8a6faf2..4c8e79e 100644 --- a/pig/src/main/java/com/twitter/ambrose/pig/EmbeddedAmbrosePigProgressNotificationListener.java +++ b/pig/src/main/java/com/twitter/ambrose/pig/EmbeddedAmbrosePigProgressNotificationListener.java @@ -77,7 +77,7 @@ public void launchCompletedNotification(String scriptId, int numJobsSucceeded) { log.info("Job complete but sleeping for " + sleepTimeSeconds + " seconds to keep the PigStats REST server running. Hit ctrl-c to exit."); - service.writeJsonToDisk(); + service.flushJsonToDisk(); Thread.sleep(sleepTimeSeconds * 1000); server.stop();