Skip to content

Commit

Permalink
add @hellp changes in storm.py - integration tests are passing.
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoo committed Mar 13, 2012
1 parent 9fad0f2 commit 72f9248
Showing 1 changed file with 14 additions and 15 deletions.
29 changes: 14 additions & 15 deletions src/multilang/py/storm.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
import traceback

try:
import cjson
json_encode = cjson.encode
json_decode = lambda x: cjson.decode(x, all_unicode=True)
import simplejson as json
except ImportError:
import json
json_encode = lambda x: json.dumps(x, ensure_ascii=False)
json_decode = lambda x: json.loads(unicode(x))

json_encode = lambda x: json.dumps(x)
json_decode = lambda x: json.loads(x)

def readStringMsg():
msg = ""
Expand All @@ -30,7 +29,7 @@ def sendToParent(s):
print s
print "end"
sys.stdout.flush()

def sync():
print "sync"
sys.stdout.flush()
Expand All @@ -39,7 +38,7 @@ def sendpid(heartbeatdir):
pid = os.getpid()
print pid
sys.stdout.flush()
open(heartbeatdir + "/" + str(pid), "w").close()
open(heartbeatdir + "/" + str(pid), "w").close()

def sendMsgToParent(amap):
sendToParent(json_encode(amap))
Expand All @@ -56,12 +55,12 @@ def emittuple(tup, stream=None, anchors = [], directTask=None):
m["task"] = directTask
m["tuple"] = tup
sendMsgToParent(m)

def emit(tup, stream=None, anchors = []):
emittuple(tup, stream=stream, anchors=anchors)
#read back task ids
return readMsg()

def emitDirect(task, tup, stream=None, anchors = []):
emittuple(tup, stream=stream, anchors=anchors, directTask=task)

Expand Down Expand Up @@ -89,7 +88,7 @@ def initbolt():
sendpid(heartbeatdir)
return readenv()

class Tuple:
class Tuple:
def __init__(self, id, component, stream, task, values):
self.id = id
self.component = component
Expand All @@ -105,10 +104,10 @@ def __repr__(self):
class Bolt:
def initialize(self, stormconf, context):
pass

def process(self, tuple):
pass

def run(self):
conf, context = initbolt()
self.initialize(conf, context)
Expand All @@ -118,15 +117,15 @@ def run(self):
self.process(tup)
sync()
except Exception, e:
log(traceback.format_exc(e))
log(traceback.format_exc(e))

class BasicBolt:
def initialize(self, stormconf, context):
pass

def process(self, tuple):
pass

def run(self):
global ANCHOR_TUPLE
conf, context = initbolt()
Expand Down

0 comments on commit 72f9248

Please sign in to comment.