/
TopologyContext.java
222 lines (200 loc) · 7.43 KB
/
TopologyContext.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package backtype.storm.task;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.hooks.ITaskHook;
import backtype.storm.state.ISubscribedState;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.NotImplementedException;
/**
 * A TopologyContext is given to bolts and spouts in their "prepare" and "open"
 * methods, respectively. This object provides information about the component's
 * place within the topology, such as task ids, inputs and outputs, etc.
 *
 * <p>The TopologyContext is also used to declare ISubscribedState objects to
 * synchronize state with StateSpouts this object is subscribed to.</p>
 */
public class TopologyContext extends GeneralTopologyContext {
    private final Integer _taskId;
    private final String _codeDir;
    private final String _pidDir;
    private Object _taskData = null;
    private final List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
    private final Integer _workerPort;
    private final List<Integer> _workerTasks;

    /**
     * Creates the context for a single task.
     *
     * @param topology the topology this task belongs to (passed to the superclass)
     * @param stormConf the topology configuration (passed to the superclass)
     * @param taskToComponent mapping of task id to component id (passed to the superclass)
     * @param stormId id of the running topology (passed to the superclass)
     * @param codeDir directory returned by {@link #getCodeDir()}
     * @param pidDir directory returned by {@link #getPIDDir()}; stored as its canonical path
     * @param taskId id of this task
     * @param workerPort port returned by {@link #getThisWorkerPort()}
     * @param workerTasks ids of all tasks in this worker; copied and sorted ascending
     * @throws RuntimeException if the canonical path of {@code pidDir} cannot be resolved
     */
    public TopologyContext(StormTopology topology, Map stormConf,
            Map<Integer, String> taskToComponent, String stormId,
            String codeDir, String pidDir, Integer taskId,
            Integer workerPort, List<Integer> workerTasks) {
        super(topology, stormConf, taskToComponent, stormId);
        _workerPort = workerPort;
        _taskId = taskId;
        _pidDir = canonicalPath(pidDir);
        _codeDir = codeDir;
        // Copy so the caller's list is neither aliased nor reordered by the sort.
        _workerTasks = new ArrayList<Integer>(workerTasks);
        Collections.sort(_workerTasks);
    }

    /**
     * Resolves {@code dir} to its canonical path, wrapping any I/O failure.
     *
     * @param dir the directory path to canonicalize
     * @return the canonical path of {@code dir}
     * @throws RuntimeException if the canonical path cannot be computed
     */
    private static String canonicalPath(String dir) {
        try {
            return new File(dir).getCanonicalPath();
        } catch (IOException e) {
            // Report the input path; the field being initialized has no value yet.
            throw new RuntimeException("Could not get canonical path for " + dir, e);
        }
    }

    /**
     * All state from all subscribed state spouts streams will be synced with
     * the provided object.
     *
     * <p>It is recommended that your ISubscribedState object is kept as an instance
     * variable of this object. The recommended usage of this method is as follows:</p>
     *
     * <pre>{@code
     * _myState = context.setAllSubscribedState(new MyState());
     * }</pre>
     *
     * <p>Not yet implemented: this method currently always throws
     * {@code NotImplementedException}.</p>
     *
     * @param obj Provided ISubscribedState implementation
     * @return Returns the ISubscribedState object provided
     */
    public <T extends ISubscribedState> T setAllSubscribedState(T obj) {
        //check that only subscribed to one component/stream for statespout
        //setsubscribedstate appropriately
        throw new NotImplementedException();
    }

    /**
     * Synchronizes the default stream from the specified state spout component
     * id with the provided ISubscribedState object.
     *
     * <p>The recommended usage of this method is as follows:</p>
     * <pre>{@code
     * _myState = context.setSubscribedState(componentId, new MyState());
     * }</pre>
     *
     * <p>Delegates to {@link #setSubscribedState(String, String, ISubscribedState)}
     * with {@code Utils.DEFAULT_STREAM_ID}.</p>
     *
     * @param componentId the id of the StateSpout component to subscribe to
     * @param obj Provided ISubscribedState implementation
     * @return Returns the ISubscribedState object provided
     */
    public <T extends ISubscribedState> T setSubscribedState(String componentId, T obj) {
        return setSubscribedState(componentId, Utils.DEFAULT_STREAM_ID, obj);
    }

    /**
     * Synchronizes the specified stream from the specified state spout component
     * id with the provided ISubscribedState object.
     *
     * <p>The recommended usage of this method is as follows:</p>
     * <pre>{@code
     * _myState = context.setSubscribedState(componentId, streamId, new MyState());
     * }</pre>
     *
     * <p>Not yet implemented: this method currently always throws
     * {@code NotImplementedException}.</p>
     *
     * @param componentId the id of the StateSpout component to subscribe to
     * @param streamId the stream to subscribe to
     * @param obj Provided ISubscribedState implementation
     * @return Returns the ISubscribedState object provided
     */
    public <T extends ISubscribedState> T setSubscribedState(String componentId, String streamId, T obj) {
        throw new NotImplementedException();
    }

    /**
     * Gets the task id of this task.
     *
     * @return the task id
     */
    public int getThisTaskId() {
        return _taskId;
    }

    /**
     * Gets the component id for this task. The component id maps
     * to a component id specified for a Spout or Bolt in the topology definition.
     *
     * @return the component id that this task's id maps to
     */
    public String getThisComponentId() {
        return getComponentId(_taskId);
    }

    /**
     * Gets all the task ids that are running in this worker process
     * (including this task), sorted in ascending order.
     *
     * <p>The returned list is this context's internal list; callers should not modify it.</p>
     *
     * @return the sorted task ids of this worker
     */
    public List<Integer> getThisWorkerTasks() {
        return _workerTasks;
    }

    /**
     * Gets the declared output fields for the specified stream id for the component
     * this task is a part of.
     *
     * @param streamId the stream whose output fields are requested
     * @return the declared output fields of that stream
     */
    public Fields getThisOutputFields(String streamId) {
        return getComponentOutputFields(getThisComponentId(), streamId);
    }

    /**
     * Gets the set of streams declared for the component of this task.
     *
     * @return the stream ids declared by this task's component
     */
    public Set<String> getThisStreams() {
        return getComponentStreams(getThisComponentId());
    }

    /**
     * Gets the index of this task id in getComponentTasks(getThisComponentId()),
     * after sorting those task ids in ascending order.
     * An example use case for this method is determining which task
     * accesses which resource in a distributed resource to ensure an even distribution.
     *
     * @return the zero-based position of this task among its component's sorted task ids
     * @throws RuntimeException if this task id is not among the component's tasks
     */
    public int getThisTaskIndex() {
        // Sort a copy so the index is independent of the order the superclass returns.
        List<Integer> tasks = new ArrayList<Integer>(getComponentTasks(getThisComponentId()));
        Collections.sort(tasks);
        int index = tasks.indexOf(getThisTaskId());
        if (index < 0) {
            throw new RuntimeException("Fatal: could not find this task id in this component");
        }
        return index;
    }

    /**
     * Gets the declared inputs to this component.
     *
     * @return A map from subscribed component/stream to the grouping subscribed with.
     */
    public Map<GlobalStreamId, Grouping> getThisSources() {
        return getSources(getThisComponentId());
    }

    /**
     * Gets information about who is consuming the outputs of this component, and how.
     *
     * @return Map from stream id to component id to the Grouping used.
     */
    public Map<String, Map<String, Grouping>> getThisTargets() {
        return getTargets(getThisComponentId());
    }

    /**
     * Gets the location of the external resources for this worker on the
     * local filesystem. These external resources typically include bolts implemented
     * in other languages, such as Ruby or Python.
     *
     * @return the code directory passed to the constructor
     */
    public String getCodeDir() {
        return _codeDir;
    }

    /**
     * If this task spawns any subprocesses, those subprocesses must immediately
     * write their PID to this directory on the local filesystem to ensure that
     * Storm properly destroys that process when the worker is shut down.
     *
     * @return the canonical path of the PID directory passed to the constructor
     */
    public String getPIDDir() {
        return _pidDir;
    }

    /**
     * Stores an arbitrary object on this context, replacing any previous value.
     *
     * @param data the object to store; may be null
     */
    public void setTaskData(Object data) {
        _taskData = data;
    }

    /**
     * Returns the object last passed to {@link #setTaskData(Object)}.
     *
     * @return the stored object, or null if none has been set
     */
    public Object getTaskData() {
        return _taskData;
    }

    /**
     * Gets the port of the worker this task runs in.
     *
     * @return the worker port passed to the constructor
     */
    public Integer getThisWorkerPort() {
        return _workerPort;
    }

    /**
     * Registers a task hook. The hook's {@code prepare} method is called with the
     * storm configuration and this context before the hook is added.
     *
     * @param hook the hook to prepare and register
     */
    public void addTaskHook(ITaskHook hook) {
        hook.prepare(_stormConf, this);
        _hooks.add(hook);
    }

    /**
     * Gets the hooks registered through {@link #addTaskHook(ITaskHook)}, in
     * registration order.
     *
     * <p>The returned collection is this context's internal list; callers should not
     * modify it.</p>
     *
     * @return the registered hooks
     */
    public Collection<ITaskHook> getHooks() {
        return _hooks;
    }
}