Fix string encoding in ShellBolt protocol (send UTF-8). #123

Closed
wants to merge 2 commits into from

4 participants

@nicoo

This patch should fix issue #119

@nathanmarz
Owner

Thanks. Can you fill in a contributor agreement so that I can merge this in? Details here:

https://github.com/nathanmarz/storm/wiki/Contributing-to-Storm

@nathanmarz
Owner

I played around with this; it's not this simple. In the tests, the Python bolt chokes on reading this format.

@nicoo

I did not test this patch with the Python bolt. My Perl bolt works fine with it. I'll investigate as soon as possible; maybe the Python JSON parser does not handle UTF-8.

@hellp

To make the Python bolts work with the patch submitted here, you have to simplify the encoding-related code in storm.py:

     import json
-    json_encode = lambda x: json.dumps(x, ensure_ascii=False)
-    json_decode = lambda x: json.loads(unicode(x))
+    json_encode = lambda x: json.dumps(x)
+    json_decode = lambda x: json.loads(x)

As x is now a properly encoded UTF-8 string, json.dumps and json.loads will swallow it without hiccups. I haven't checked how cjson behaves yet.
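For reference, here is a minimal sketch of that round trip. It is written for Python 3 with the stdlib json module, while the thread targets Python 2, so it illustrates the idea rather than the patched storm.py. The sample message and the variable names are hypothetical.

```python
import json

# Same shape as the simplified helpers proposed above.
json_encode = lambda x: json.dumps(x)
json_decode = lambda x: json.loads(x)

# Hypothetical message containing a non-ASCII character.
message = {"tuple": ["bertels人"]}

# With the default ensure_ascii=True, non-ASCII characters are written
# as \uXXXX escapes, so the encoded text is plain ASCII.
wire_text = json_encode(message)
print(wire_text)  # {"tuple": ["bertels\u4eba"]}

# Decoding restores the original character.
decoded = json_decode(wire_text)
print(decoded == message)  # True

# Raw UTF-8 input also decodes cleanly (bytes input needs Python 3.6+).
raw_utf8 = '{"tuple": ["bertels人"]}'.encode("utf-8")
decoded_from_bytes = json_decode(raw_utf8)
print(decoded_from_bytes == message)  # True
```

Because the default json.dumps output is pure ASCII, the outbound direction does not depend on the pipe's encoding. Only the inbound direction relies on the parent actually sending UTF-8.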

(I'd be very happy if that would be fixed in 0.7.0 :o)

@hellp

I just tried cjson: unfortunately it behaves a little differently. It doesn't crash, but it writes some other encoding to my output file. I didn't investigate further. Rather, I'd like to question the use of cjson in storm.py, given how old (and outdated?) it is.

I'd vote for the idiom mentioned in this Stack Overflow answer http://stackoverflow.com/a/712799/110987:

try:
    import simplejson as json
except ImportError:
    import json

json_encode = lambda x: json.dumps(x)
json_decode = lambda x: json.loads(x)

As mentioned in the SO thread, simplejson with its C speedups is almost as fast as cjson. And with the fallback to the Python stdlib json (which is an older incarnation of simplejson), most people should be covered.

@nicoo

Thanks Fabian, I didn't have time to investigate this last week.

@hellp

What's the status on this? @nicoo, can you include my findings in this pull request?

@nathanmarz
Owner

Still not working in my tests. Check out this branch: https://github.com/nathanmarz/storm/tree/fix-shell-encoding

In the integration test, the shell spout emits a unicode character which gets passed back into a shell bolt. From the logging, you can see that the string changed when it should have stayed the same.
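A rough sketch of what this check amounts to is given below, in Python 3. It is not the actual integration test: an in-memory io.BytesIO stands in for the subprocess pipe, and send_message and read_message are hypothetical helpers. Only the "end" terminator line is taken from the diff in this pull request.

```python
import io
import json


def send_message(stream, obj):
    """Write one JSON message as UTF-8, followed by the "end" terminator line."""
    # ensure_ascii=False keeps the raw character, so UTF-8 is really exercised.
    payload = json.dumps(obj, ensure_ascii=False)
    stream.write(payload.encode("utf-8"))
    stream.write(b"\nend\n")


def read_message(stream):
    """Read lines until the "end" terminator, then JSON-decode them."""
    lines = []
    while True:
        raw = stream.readline()
        if not raw:
            raise EOFError("stream closed before the 'end' line")
        line = raw.decode("utf-8").rstrip("\n")
        if line == "end":
            break
        lines.append(line)
    return json.loads("\n".join(lines))


# In-memory stand-in for the pipe between parent and shell component.
pipe = io.BytesIO()
send_message(pipe, {"tuple": ["bertels人"]})
pipe.seek(0)

received = read_message(pipe)
word = received["tuple"][0]
print(word == "bertels人")  # True
print(ord(word[-1]))        # 20154
```

If both ends agree on UTF-8, the word comes back unchanged and its last character has the same code point on either side.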

@hellp

Can you paste the part of the test log with the error? I can't find anything when just skimming through the logs.

@nicoo

I ran test-multilang-py from the Emacs Clojure REPL, and all UTF-8 messages seem to be correctly encoded.

@tomjack

fix-shell-encoding seems to work for me. Some difference in our Python environments, maybe?

I see stuff like:

Shell msg: SSS: bertels人 186
Emitting: 1 default ["bertels人"]
Shell msg: BBB: bertels人 20154
Emitting: 2 default ["bertels人lalala"]

@nathanmarz
Owner

Right, so you can see the problem in the log messages you printed out. The "SSS" and "BBB" lines print out the ordinal value of the last character in the word string (which should be identical in both the spout and bolt). For the spout, it's 186, but for the bolt, it's 20154. Which means the strings are somehow not the same...

(I threw in this check because running the tests at the console prints ? for unicode characters.)

@nicoo

Your test prints different values because of Python's unicode handling:

In the first case, the string is treated as an ASCII (byte) string, so ord returns the value of the last byte.
In the second case, the string is treated as a unicode string, so ord returns the value of the last character.

You can try to run the following snippet:

word = "bertles人"
print str(ord(word[-1])) # 186

word = u"bertles人"
print str(ord(word[-1])) # 20154
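
The snippet above is Python 2. A rough Python 3 equivalent is sketched below, on the assumption that the byte string in the first case holds the UTF-8 encoding of the word. The variable names are illustrative only.

```python
word = "bertels人"

# UTF-8 encodes "人" (U+4EBA) as the three bytes E4 BA BA.
encoded = word.encode("utf-8")

# Indexing bytes yields an int: the last byte, 0xBA.
last_byte = encoded[-1]
print(last_byte)  # 186

# Indexing str yields a character: ord gives its code point, 0x4EBA.
last_code_point = ord(word[-1])
print(last_code_point)  # 20154

# Both values describe the same text once the bytes are decoded.
print(encoded.decode("utf-8") == word)  # True
```

So 186 and 20154 do not indicate corrupted data by themselves. They are the last byte and the last character of the same string.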

@nathanmarz
Owner

This is now merged in. Thanks!

@nathanmarz nathanmarz closed this
Commits on Feb 1, 2012
  1. @nicoo
Commits on Mar 13, 2012
  1. @nicoo
Showing with 18 additions and 17 deletions.
  1. +4 −2 src/jvm/backtype/storm/task/ShellBolt.java
  2. +14 −15 src/multilang/py/storm.py
6 src/jvm/backtype/storm/task/ShellBolt.java
@@ -54,7 +54,7 @@
OutputCollector _collector;
Map<Long, Tuple> _inputs = new HashMap<Long, Tuple>();
String[] command;
-
+
public ShellBolt(ShellComponent component) {
this(component.get_execution_command(), component.get_script());
}
@@ -183,7 +183,9 @@ public void cleanup() {
     }
     private void sendToSubprocess(String str) throws IOException {
-        _processin.writeBytes(str + "\n");
+        byte[] strBytes = str.getBytes("UTF-8");
+        _processin.write(strBytes, 0, strBytes.length);
+        _processin.writeBytes("\n");
         _processin.writeBytes("end\n");
         _processin.flush();
     }
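
A note on why this hunk matters, assuming _processin is a java.io.DataOutputStream (the diff does not show its declaration): DataOutputStream.writeBytes writes one byte per char and discards the high eight bits, so non-ASCII characters are mangled before they reach the subprocess. The Python 3 sketch below mimics that behaviour for comparison. java_write_bytes is a hypothetical helper, and the mimicry only holds for characters in the Basic Multilingual Plane.

```python
def java_write_bytes(text):
    """Mimic DataOutputStream.writeBytes: keep the low 8 bits of each char.

    Assumption: every character is in the Basic Multilingual Plane, so one
    Python character corresponds to one Java char.
    """
    return bytes(ord(ch) & 0xFF for ch in text)


word = "bertels人"

# Old behaviour: "人" (U+4EBA) is truncated to the single byte 0xBA.
old_wire = java_write_bytes(word)
print(old_wire)  # b'bertels\xba'

# New behaviour: the string is encoded as UTF-8 before being written.
new_wire = word.encode("utf-8")
print(new_wire)  # b'bertels\xe4\xba\xba'

# The truncated bytes are not valid UTF-8, so the reader cannot recover them.
try:
    old_wire.decode("utf-8")
    old_is_valid_utf8 = True
except UnicodeDecodeError:
    old_is_valid_utf8 = False
print(old_is_valid_utf8)  # False

print(new_wire.decode("utf-8") == word)  # True
```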
29 src/multilang/py/storm.py
@@ -3,13 +3,12 @@
 import traceback
 try:
-    import cjson
-    json_encode = cjson.encode
-    json_decode = lambda x: cjson.decode(x, all_unicode=True)
+    import simplejson as json
 except ImportError:
     import json
-    json_encode = lambda x: json.dumps(x, ensure_ascii=False)
-    json_decode = lambda x: json.loads(unicode(x))
+
+json_encode = lambda x: json.dumps(x)
+json_decode = lambda x: json.loads(x)
 def readStringMsg():
     msg = ""
@@ -30,7 +29,7 @@ def sendToParent(s):
print s
print "end"
sys.stdout.flush()
-
+
def sync():
print "sync"
sys.stdout.flush()
@@ -39,7 +38,7 @@ def sendpid(heartbeatdir):
pid = os.getpid()
print pid
sys.stdout.flush()
- open(heartbeatdir + "/" + str(pid), "w").close()
+ open(heartbeatdir + "/" + str(pid), "w").close()
def sendMsgToParent(amap):
sendToParent(json_encode(amap))
@@ -56,12 +55,12 @@ def emittuple(tup, stream=None, anchors = [], directTask=None):
m["task"] = directTask
m["tuple"] = tup
sendMsgToParent(m)
-
+
def emit(tup, stream=None, anchors = []):
emittuple(tup, stream=stream, anchors=anchors)
#read back task ids
return readMsg()
-
+
def emitDirect(task, tup, stream=None, anchors = []):
emittuple(tup, stream=stream, anchors=anchors, directTask=task)
@@ -89,7 +88,7 @@ def initbolt():
sendpid(heartbeatdir)
return readenv()
-class Tuple:
+class Tuple:
def __init__(self, id, component, stream, task, values):
self.id = id
self.component = component
@@ -105,10 +104,10 @@ def __repr__(self):
class Bolt:
def initialize(self, stormconf, context):
pass
-
+
def process(self, tuple):
pass
-
+
def run(self):
conf, context = initbolt()
self.initialize(conf, context)
@@ -118,15 +117,15 @@ def run(self):
self.process(tup)
sync()
except Exception, e:
- log(traceback.format_exc(e))
+ log(traceback.format_exc(e))
class BasicBolt:
def initialize(self, stormconf, context):
pass
-
+
def process(self, tuple):
pass
-
+
def run(self):
global ANCHOR_TUPLE
conf, context = initbolt()