Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: nathanmarz/storm
...
head fork: nicoo/storm
Checking mergeability… Don’t worry, you can still create the pull request.
  • 7 commits
  • 3 files changed
  • 0 commit comments
  • 2 contributors
View
2  project.clj
@@ -1,4 +1,4 @@
-(defproject storm "0.6.2-SNAPSHOT"
+(defproject storm "0.6.2"
:source-path "src/clj"
:test-path "test/clj"
:java-source-path "src/jvm"
View
6 src/jvm/backtype/storm/task/ShellBolt.java
@@ -54,7 +54,7 @@
OutputCollector _collector;
Map<Long, Tuple> _inputs = new HashMap<Long, Tuple>();
String[] command;
-
+
public ShellBolt(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
}
@@ -183,7 +183,9 @@ public void cleanup() {
}
private void sendToSubprocess(String str) throws IOException {
- _processin.writeBytes(str + "\n");
+ byte[] strBytes = str.getBytes("UTF-8");
+ _processin.write(strBytes, 0, strBytes.length);
+ _processin.writeBytes("\n");
_processin.writeBytes("end\n");
_processin.flush();
}
View
29 src/multilang/py/storm.py
@@ -3,13 +3,12 @@
import traceback
try:
- import cjson
- json_encode = cjson.encode
- json_decode = lambda x: cjson.decode(x, all_unicode=True)
+ import simplejson as json
except ImportError:
import json
- json_encode = lambda x: json.dumps(x, ensure_ascii=False)
- json_decode = lambda x: json.loads(unicode(x))
+
+json_encode = lambda x: json.dumps(x)
+json_decode = lambda x: json.loads(x)
def readStringMsg():
msg = ""
@@ -30,7 +29,7 @@ def sendToParent(s):
print s
print "end"
sys.stdout.flush()
-
+
def sync():
print "sync"
sys.stdout.flush()
@@ -39,7 +38,7 @@ def sendpid(heartbeatdir):
pid = os.getpid()
print pid
sys.stdout.flush()
- open(heartbeatdir + "/" + str(pid), "w").close()
+ open(heartbeatdir + "/" + str(pid), "w").close()
def sendMsgToParent(amap):
sendToParent(json_encode(amap))
@@ -56,12 +55,12 @@ def emittuple(tup, stream=None, anchors = [], directTask=None):
m["task"] = directTask
m["tuple"] = tup
sendMsgToParent(m)
-
+
def emit(tup, stream=None, anchors = []):
emittuple(tup, stream=stream, anchors=anchors)
#read back task ids
return readMsg()
-
+
def emitDirect(task, tup, stream=None, anchors = []):
emittuple(tup, stream=stream, anchors=anchors, directTask=task)
@@ -89,7 +88,7 @@ def initbolt():
sendpid(heartbeatdir)
return readenv()
-class Tuple:
+class Tuple:
def __init__(self, id, component, stream, task, values):
self.id = id
self.component = component
@@ -105,10 +104,10 @@ def __repr__(self):
class Bolt:
def initialize(self, stormconf, context):
pass
-
+
def process(self, tuple):
pass
-
+
def run(self):
conf, context = initbolt()
self.initialize(conf, context)
@@ -118,15 +117,15 @@ def run(self):
self.process(tup)
sync()
except Exception, e:
- log(traceback.format_exc(e))
+ log(traceback.format_exc(e))
class BasicBolt:
def initialize(self, stormconf, context):
pass
-
+
def process(self, tuple):
pass
-
+
def run(self):
global ANCHOR_TUPLE
conf, context = initbolt()

No commit comments for this range

Something went wrong with that request. Please try again.