Skip to content

Commit

Permalink
Don't use globals like this.
Browse files Browse the repository at this point in the history
  • Loading branch information
kbourgoin committed Jun 17, 2014
1 parent a7d1c2d commit 2927a14
Showing 1 changed file with 12 additions and 14 deletions.
26 changes: 12 additions & 14 deletions streamparse/bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
from ipc import read_handshake, read_tuple, send_message, json, _stdout, Tuple


_ANCHOR_TUPLE = None


class Bolt(Component):
"""The base class for all streamparse bolts. For more information on bolts,
consult Storm's `Concepts documentation <http://storm.incubator.apache.org/documentation/Concepts.html>`_.
Expand Down Expand Up @@ -62,8 +59,6 @@ def emit(self, tup, stream=None, anchors=[], direct_task=None):
if not isinstance(tup, list):
raise TypeError('All tuples must be lists, received {!r} instead'
.format(type(tup)))
if _ANCHOR_TUPLE is not None:
anchors = [_ANCHOR_TUPLE]
msg = {'command': 'emit', 'tuple': tup}
if stream is not None:
msg['stream'] = stream
Expand All @@ -90,8 +85,6 @@ def emit_many(self, tuples, stream=None, anchors=[], direct_task=None):
:param direct_task: indicates the task to send the tuple to.
:type direct_task: int
"""
if _ANCHOR_TUPLE is not None:
anchors = [_ANCHOR_TUPLE]
msg = {
'command': 'emit',
'anchors': [a.id for a in anchors],
Expand Down Expand Up @@ -147,19 +140,24 @@ class BasicBolt(Bolt):
"""A bolt implementation that automatically acknowledges all tuples after
:func:`process` completes."""

def emit(self, tup, stream=None, anchors=[], direct_task=None):
"""Override to anchor to the current tuple if no anchors are specified"""
anchors = anchors of self.__current_tup
super(BasicBolt, self).emit(
tup, stream=stream, anchors=anchors, direct_task=direct_task
)

def run(self):
global _ANCHOR_TUPLE
storm_conf, context = read_handshake()
tup = None
self.__current_tup = None # used for auto-anchoring
try:
self.initialize(storm_conf, context)
while True:
tup = read_tuple()
_ANCHOR_TUPLE = tup
self.process(tup)
self.ack(tup)
self.__current_tup = read_tuple()
self.process(self.__current_tup)
self.ack(self.__current_tup)
except Exception as e:
self.raise_exception(e, tup)
self.raise_exception(e, self.__current_tup)


class BatchingBolt(Bolt):
Expand Down

0 comments on commit 2927a14

Please sign in to comment.