This repository has been archived by the owner on Jan 23, 2020. It is now read-only.
/
InMemoryStatsService.java
141 lines (118 loc) · 4.99 KB
/
InMemoryStatsService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/*
Copyright 2012 Twitter, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.ambrose.service.impl;
import com.twitter.ambrose.model.Job;
import com.twitter.ambrose.service.DAGNode;
import com.twitter.ambrose.service.StatsReadService;
import com.twitter.ambrose.service.StatsWriteService;
import com.twitter.ambrose.service.WorkflowEvent;
import com.twitter.ambrose.util.JSONUtil;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* In-memory implementation of both StatsReadService and StatsWriteService. Used when stats
* collection and stats serving are happening within the same VM. This class is intended to run in
* a VM that only handles a single workflow. Hence it ignores workflowId.
* <P>
* Upon job completion this class can optionally write all json data to disk. This is useful for
* debugging. The written files can also be replayed in the Ambrose UI without re-running the Job
* via the <pre>bin/demo</pre> script. To write all json data to disk, set the following values
* as system properties using <pre>-D</pre>:
* <ul>
* <li><pre>ambrose.write.dag.file</pre> file to write the dag data to</li>
* <li><pre>ambrose.write.events.file</pre> file to write the events data to</li>
* </ul>
* </P>
*
* @author billg
*/
public class InMemoryStatsService implements StatsReadService, StatsWriteService<Job> {
  private static final Logger LOG = LoggerFactory.getLogger(InMemoryStatsService.class);

  // System properties that, when set, cause json payloads to be dumped to disk for
  // debugging and for replay via the bin/demo script (see class javadoc).
  private static final String DUMP_DAG_FILE_PARAM = "ambrose.write.dag.file";
  private static final String DUMP_EVENTS_FILE_PARAM = "ambrose.write.events.file";

  // Node name -> DAG node for the single workflow this VM serves (workflowId is ignored).
  private Map<String, DAGNode<Job>> dagNodeNameMap = new HashMap<String, DAGNode<Job>>();

  // Sorted by event id so getEventsSinceId can take an efficient tail view; a concurrent
  // implementation so returned views remain safe to iterate while new events arrive.
  private final SortedMap<Integer, WorkflowEvent> eventMap =
      new ConcurrentSkipListMap<Integer, WorkflowEvent>();

  // Optional dump writers; null when the corresponding system property is not set.
  private Writer dagWriter = null;
  private Writer eventsWriter = null;

  // Tracks whether the opening "[" of the events json array has been emitted yet.
  private boolean eventWritten = false;

  /**
   * Constructs the service, optionally opening dump files named by the
   * {@value #DUMP_DAG_FILE_PARAM} and {@value #DUMP_EVENTS_FILE_PARAM} system properties.
   * Failure to open a dump file is logged and disables that dump; it does not fail startup.
   */
  public InMemoryStatsService() {
    String dumpDagFileName = System.getProperty(DUMP_DAG_FILE_PARAM);
    String dumpEventsFileName = System.getProperty(DUMP_EVENTS_FILE_PARAM);

    if (dumpDagFileName != null) {
      try {
        dagWriter = new PrintWriter(dumpDagFileName);
      } catch (FileNotFoundException e) {
        LOG.error("Could not create dag PrintWriter at " + dumpDagFileName, e);
      }
    }

    if (dumpEventsFileName != null) {
      try {
        eventsWriter = new PrintWriter(dumpEventsFileName);
      } catch (FileNotFoundException e) {
        LOG.error("Could not create events PrintWriter at " + dumpEventsFileName, e);
      }
    }
  }

  /**
   * Stores the DAG for this workflow, replacing any previous map, and writes it to the
   * dag dump file if one was configured.
   *
   * @param workflowId ignored; this implementation serves a single workflow
   * @param dagNodeNameMap node name to DAG node mapping for the workflow
   * @throws IOException if writing the dag dump file fails
   */
  @Override
  public synchronized void sendDagNodeNameMap(String workflowId,
      Map<String, DAGNode<Job>> dagNodeNameMap) throws IOException {
    this.dagNodeNameMap = dagNodeNameMap;
    writeJsonDagNodeNameMapToDisk(dagNodeNameMap);
  }

  /**
   * Returns the DAG for this workflow.
   *
   * @param workflowId ignored; this implementation serves a single workflow
   */
  @Override
  public synchronized Map<String, DAGNode<Job>> getDagNodeNameMap(String workflowId) {
    return dagNodeNameMap;
  }

  /**
   * Returns all events with an id strictly greater than {@code sinceId}. A negative
   * {@code sinceId} returns all events (no increment, so id 0 and up are included).
   *
   * @param workflowId ignored; this implementation serves a single workflow
   * @param sinceId exclusive lower bound on event ids, or negative for "from the start"
   */
  @Override
  public synchronized Collection<WorkflowEvent> getEventsSinceId(String workflowId, int sinceId) {
    int minId = sinceId >= 0 ? sinceId + 1 : sinceId;
    SortedMap<Integer, WorkflowEvent> tailMap = eventMap.tailMap(minId);
    return tailMap.values();
  }

  /**
   * Records a workflow event, keyed by its event id, and appends it to the events dump
   * file if one was configured.
   *
   * @param workflowId ignored; this implementation serves a single workflow
   * @param event the event to record
   * @throws IOException if writing the events dump file fails
   */
  @Override
  public synchronized void pushEvent(String workflowId, WorkflowEvent event) throws IOException {
    eventMap.put(event.getEventId(), event);
    writeJsonEventToDisk(event);
  }

  /** Writes the DAG nodes to the dag dump file as a json array; no-op if not configured. */
  private void writeJsonDagNodeNameMapToDisk(Map<String, DAGNode<Job>> dagNodeNameMap)
      throws IOException {
    if (dagWriter != null && dagNodeNameMap != null) {
      Collection<DAGNode<Job>> nodes = dagNodeNameMap.values();
      JSONUtil.writeJson(dagWriter, nodes.toArray(new DAGNode[dagNodeNameMap.size()]));
    }
  }

  /**
   * Appends one event to the events dump file, maintaining a json array incrementally:
   * "[ " before the first event, ", " before each subsequent one. The closing "]" is
   * written by {@link #flushJsonToDisk()}. Flushes after each event so the file is
   * usable even if the VM dies before a clean shutdown.
   */
  private void writeJsonEventToDisk(WorkflowEvent event) throws IOException {
    if (eventsWriter != null && event != null) {
      eventsWriter.append(!eventWritten ? "[ " : ", ");
      JSONUtil.writeJson(eventsWriter, event);
      eventsWriter.flush();
      eventWritten = true;
    }
  }

  /**
   * Finalizes and closes any configured dump files. Terminates the events json array;
   * if no events were ever written, emits an empty array ("[]") so the file is still
   * valid json rather than an empty, unparseable document.
   *
   * @throws IOException if closing a dump file fails
   */
  public void flushJsonToDisk() throws IOException {
    if (dagWriter != null) { dagWriter.close(); }

    if (eventsWriter != null) {
      eventsWriter.append(eventWritten ? "]\n" : "[]\n");
      eventsWriter.close();
    }
  }
}