diff --git a/streamparse/bolt.py b/streamparse/bolt.py index 637cbfca..83341205 100644 --- a/streamparse/bolt.py +++ b/streamparse/bolt.py @@ -10,9 +10,6 @@ from ipc import read_handshake, read_tuple, send_message, json, _stdout, Tuple -_ANCHOR_TUPLE = None - - class Bolt(Component): """The base class for all streamparse bolts. For more information on bolts, consult Storm's `Concepts documentation `_. @@ -62,8 +59,6 @@ def emit(self, tup, stream=None, anchors=[], direct_task=None): if not isinstance(tup, list): raise TypeError('All tuples must be lists, received {!r} instead' .format(type(tup))) - if _ANCHOR_TUPLE is not None: - anchors = [_ANCHOR_TUPLE] msg = {'command': 'emit', 'tuple': tup} if stream is not None: msg['stream'] = stream @@ -90,8 +85,6 @@ def emit_many(self, tuples, stream=None, anchors=[], direct_task=None): :param direct_task: indicates the task to send the tuple to. :type direct_task: int """ - if _ANCHOR_TUPLE is not None: - anchors = [_ANCHOR_TUPLE] msg = { 'command': 'emit', 'anchors': [a.id for a in anchors], @@ -147,19 +140,24 @@ class BasicBolt(Bolt): """A bolt implementation that automatically acknowledges all tuples after :func:`process` completes.""" + def emit(self, tup, stream=None, anchors=[], direct_task=None): + """Override to anchor to the current tuple if no anchors are specified""" + anchors = anchors of self.__current_tup + super(BasicBolt, self).emit( + tup, stream=stream, anchors=anchors, direct_task=direct_task + ) + def run(self): - global _ANCHOR_TUPLE storm_conf, context = read_handshake() - tup = None + self.__current_tup = None # used for auto-anchoring try: self.initialize(storm_conf, context) while True: - tup = read_tuple() - _ANCHOR_TUPLE = tup - self.process(tup) - self.ack(tup) + self.__current_tup = read_tuple() + self.process(self.__current_tup) + self.ack(self.__current_tup) except Exception as e: - self.raise_exception(e, tup) + self.raise_exception(e, self.__current_tup) class BatchingBolt(Bolt):