"""Base Bolt classes."""
from collections import defaultdict
import os
import signal
import sys
import threading
import time

from base import Component
from ipc import read_handshake, read_tuple, send_message, json, _stdout, Tuple


class Bolt(Component):
    """The base class for all streamparse bolts. For more information on
    bolts, consult Storm's `Concepts documentation
    <http://storm.incubator.apache.org/documentation/Concepts.html>`_.
    """

    def initialize(self, storm_conf, context):
        """Called immediately after the initial handshake with Storm and
        before the main run loop. A good place to initialize connections to
        data sources.

        :param storm_conf: the Storm configuration for this Bolt. This is the
                           configuration provided to the topology, merged in
                           with cluster configuration on the worker node.
        :type storm_conf: dict
        :param context: information about the component's place within the
                        topology such as: task IDs, inputs, outputs etc.
        :type context: dict
        """
        pass

    def process(self, tup):
        """Process a single :class:`Tuple` of input; should be overridden by
        subclasses. :class:`Tuple` objects contain metadata about which
        component, stream and task they came from. The actual values of the
        tuple can be accessed by calling ``tup.values``.

        :param tup: the tuple to be processed.
        :type tup: Tuple
        """
        raise NotImplementedError()

    def emit(self, tup, stream=None, anchors=[], direct_task=None):
        """Emit a new tuple to a stream.

        :param tup: the tuple payload to send to Storm; should contain only
                    JSON-serializable data.
        :type tup: list
        :param stream: the ID of the stream to emit this tuple to. Specify
                       ``None`` to emit to the default stream.
        :type stream: str
        :param anchors: IDs of the tuples the emitted tuple should be anchored
                        to.
        :type anchors: list
        :param direct_task: the task to send the tuple to.
        :type direct_task: int
        """
        if not isinstance(tup, list):
            raise TypeError('All tuples must be lists, received {!r} instead'
                            .format(type(tup)))

        msg = {'command': 'emit', 'tuple': tup}
        if stream is not None:
            msg['stream'] = stream
        msg['anchors'] = [x.id for x in anchors]
        if direct_task is not None:
            msg['task'] = direct_task

        send_message(msg)

    def emit_many(self, tuples, stream=None, anchors=[], direct_task=None):
        """A more efficient way to send many tuples; dumps out all tuples to
        STDOUT instead of writing one at a time.

        :param tuples: a ``list`` containing ``list``s of tuple payload data
                       to send to Storm. All tuples should contain only
                       JSON-serializable data.
        :type tuples: list
        :param stream: the ID of the stream to emit these tuples to. Specify
                       ``None`` to emit to the default stream.
        :type stream: str
        :param anchors: IDs of the tuples which the emitted tuples should be
                        anchored to.
        :type anchors: list
        :param direct_task: indicates the task to send the tuple to.
        :type direct_task: int
        """
        msg = {
            'command': 'emit',
            'anchors': [a.id for a in anchors],
        }
        if stream is not None:
            msg['stream'] = stream
        if direct_task is not None:
            msg['task'] = direct_task

        lines = []
        for tup in tuples:
            msg['tuple'] = tup
            lines.append(json.dumps(msg))
        _stdout.write("{}\nend\n".format("\nend\n".join(lines)))
        _stdout.flush()
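
    # Illustrative note, not in the original source: for two tuples emitted
    # via ``emit_many`` with no anchors on the default stream, the lines
    # written to STDOUT look roughly like this (JSON key order may vary):
    #
    #   {"command": "emit", "anchors": [], "tuple": ["word", 1]}
    #   end
    #   {"command": "emit", "anchors": [], "tuple": ["word", 2]}
    #   end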

    def ack(self, tup):
        """Indicate that processing of a tuple has succeeded.

        :param tup: the tuple to acknowledge.
        :type tup: str or Tuple
        """
        tup_id = tup.id if isinstance(tup, Tuple) else tup
        send_message({'command': 'ack', 'id': tup_id})

    def fail(self, tup):
        """Indicate that processing of a tuple has failed.

        :param tup: the tuple to fail.
        :type tup: str or Tuple
        """
        tup_id = tup.id if isinstance(tup, Tuple) else tup
        send_message({'command': 'fail', 'id': tup_id})

    def run(self):
        """Main run loop for all bolts. Performs the initial handshake with
        Storm and reads tuples, handing them off to subclasses. Any exceptions
        are caught and logged back to Storm before the Python process exits.

        Subclasses should not override this method.
        """
        storm_conf, context = read_handshake()
        tup = None
        try:
            self.initialize(storm_conf, context)
            while True:
                tup = read_tuple()
                self.process(tup)
        except Exception as e:
            self.raise_exception(e, tup)
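

# Illustrative sketch, not part of the original module: a minimal Bolt
# subclass that splits sentences into words. The field layout (a single
# sentence string per tuple) is an assumption made for the example.
class SentenceSplitterBolt(Bolt):

    def process(self, tup):
        sentence = tup.values[0]
        for word in sentence.split():
            # anchor each emitted word to the incoming tuple so Storm can
            # replay the sentence if downstream processing fails
            self.emit([word], anchors=[tup])
        # with the plain Bolt base class, acking is the subclass's job
        self.ack(tup)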


class BasicBolt(Bolt):
    """A bolt implementation that automatically acknowledges all tuples after
    :func:`process` completes."""

    def emit(self, tup, stream=None, anchors=[], direct_task=None):
        """Override to anchor to the current tuple if no anchors are
        specified."""
        anchors = anchors or [self.__current_tup]
        super(BasicBolt, self).emit(
            tup, stream=stream, anchors=anchors, direct_task=direct_task
        )

    def run(self):
        storm_conf, context = read_handshake()
        self.__current_tup = None  # used for auto-anchoring
        try:
            self.initialize(storm_conf, context)
            while True:
                self.__current_tup = read_tuple()
                self.process(self.__current_tup)
                self.ack(self.__current_tup)
        except Exception as e:
            self.raise_exception(e, self.__current_tup)
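

# Illustrative sketch, not part of the original module: with BasicBolt the
# run loop acks each tuple after ``process`` returns and ``emit`` anchors to
# the current tuple automatically, so the subclass only transforms values.
class WordCountBolt(BasicBolt):

    def initialize(self, storm_conf, context):
        self.counts = defaultdict(int)

    def process(self, tup):
        word = tup.values[0]
        self.counts[word] += 1
        # anchored to the current tuple and acked automatically afterwards
        self.emit([word, self.counts[word]])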


class BatchingBolt(Bolt):
    """A bolt which batches tuples for processing.

    Batching tuples is unexpectedly complex to do correctly. The main problem
    is that all bolts are single-threaded. The difficulty comes when the
    topology is shutting down because Storm stops feeding the bolt tuples. If
    the bolt is blocked waiting on stdin, then it can't process any waiting
    tuples, or even ack ones that were asynchronously written to a data store.

    This bolt helps with that by grouping tuples based on a time interval and
    then processing them on a worker thread. The bolt also handles ack'ing
    tuples after processing has finished, much like the BasicBolt.

    To use this class, you must implement ``process_batch``. ``group_key`` can
    optionally be implemented so that tuples are grouped before
    ``process_batch`` is even called.
    """

    SECS_BETWEEN_BATCHES = 2

    def __init__(self):
        self.exc_info = None
        signal.signal(signal.SIGINT, self._handle_worker_exception)

        self._batch = defaultdict(list)
        self._should_stop = threading.Event()
        # note: this assignment rebinds ``self._batcher`` from the method to
        # the thread object; the thread's target is the bound method
        self._batcher = threading.Thread(target=self._batcher)
        self._batch_lock = threading.Lock()
        self._batcher.daemon = True
        self._batcher.start()

    def process(self, tup):
        """Add a tuple to a specific batch by group key. Do not override this
        method in subclasses.
        """
        with self._batch_lock:
            group_key = self.group_key(tup)
            self._batch[group_key].append(tup)

    def group_key(self, tup):
        """Return the group key used to group tuples within a batch.

        By default, returns None, which puts all tuples in a single batch.
        Override this function to enable grouping.

        :param tup: the tuple used to extract a group key
        :type tup: Tuple
        :returns: Any ``hashable`` value (will be used as a ``dict`` key).
        """
        return None

    def process_batch(self, key, tups):
        """Process a batch of tuples. Should be overridden by subclasses.

        :param key: the group key for the list of batches.
        :type key: hashable
        :param tups: a ``list`` of :class:`ipc.Tuple` objects for the group.
        :type tups: list
        """
        raise NotImplementedError()

    def _batcher(self):
        """Entry point for the batcher thread."""
        try:
            while True:
                time.sleep(self.SECS_BETWEEN_BATCHES)
                with self._batch_lock:
                    if not self._batch:
                        # No tuples to save
                        continue
                    for key, tups in self._batch.iteritems():
                        self.process_batch(key, tups)
                        for tup in tups:
                            self.ack(tup)
                    self._batch = defaultdict(list)
        except Exception:
            self.exc_info = sys.exc_info()
            os.kill(os.getpid(), signal.SIGINT)  # interrupt stdin waiting

    def _handle_worker_exception(self, signum, frame):
        """Handle an exception raised in the worker thread.

        Exceptions in the _batcher thread cause it to send a SIGINT to the
        main thread, which we catch here and then re-raise in the main thread.
        """
        raise self.exc_info[1], None, self.exc_info[2]
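

# Illustrative sketch, not part of the original module: a BatchingBolt
# subclass that groups tuples by their first value and emits one summary
# tuple per group every SECS_BETWEEN_BATCHES seconds. The batcher thread
# acks every tuple in the batch after ``process_batch`` returns.
class BatchSummaryBolt(BatchingBolt):

    SECS_BETWEEN_BATCHES = 5

    def group_key(self, tup):
        # group tuples by their first value, e.g. a user ID
        return tup.values[0]

    def process_batch(self, key, tups):
        # emit one summary tuple per group, anchored to the whole batch
        self.emit([key, len(tups)], anchors=tups)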