Converted the DRPC mechanism to use binary (ByteBuffer) results sets but preserve String output for existing users. #211

Closed
wants to merge 1 commit into
from

Projects

None yet

3 participants

@jasonkolb

executeBinary returns java.nio.ByteBuffer, legacy "execute" method now
converts from ByteBuffer to String using default encoding

@jasonkolb jasonkolb Added executeBinary method to RPC
executeBinary returns java.nio.ByteBuffer, legacy "execute" method now
converts from ByteBuffer to String using default encoding
370478c
@tomjack
Contributor
tomjack commented May 8, 2012

sweet!

@nathanmarz nathanmarz commented on the diff May 9, 2012
src/clj/backtype/storm/daemon/drpc.clj
@@ -70,8 +71,36 @@
(throw result)
result
))))
+ (^String execute [this ^String function ^String args]
@nathanmarz
nathanmarz May 9, 2012 Owner

Need to get rid of the code duplication here. Execute should be a simple wrapper around executeBinary

@nathanmarz nathanmarz commented on the diff May 9, 2012
src/jvm/backtype/storm/ILocalDRPC.java
public interface ILocalDRPC extends DistributedRPC.Iface, DistributedRPCInvocations.Iface, Shutdownable {
public String getServiceId();
+
@nathanmarz
nathanmarz May 9, 2012 Owner

What is this?

@nathanmarz nathanmarz commented on the diff May 9, 2012
src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
@@ -48,6 +52,29 @@ public void result(String id, String result) throws TException {
}
}
+ /**
+ * Overloaded method which accepts a string as a result and converts it to a ByteBuffer in the background
+ **/
+ public void result(String id, String result) throws TException, java.nio.charset.CharacterCodingException {
+ try {
@nathanmarz
nathanmarz May 9, 2012 Owner

Need to get rid of code duplication here. This should be a wrapper around result(String, ByteBuffer)

@nathanmarz nathanmarz commented on the diff May 9, 2012
src/jvm/backtype/storm/drpc/ReturnResults.java
@@ -33,7 +36,28 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
@Override
public void execute(Tuple input) {
- String result = (String) input.getValue(0);
+
+ ByteBuffer result = null;
+
+ if( input.getValue(0) instanceof ByteBuffer )
@nathanmarz
nathanmarz May 9, 2012 Owner

I believe this is dependent on #199 being implemented.

Also, I think you should also handle the case of getting a regular byte array too.

@nathanmarz nathanmarz commented on the diff May 9, 2012
src/clj/backtype/storm/daemon/drpc.clj
+ (swap! id->start assoc id (current-time-secs))
+ (swap! id->sem assoc id sem)
+ (.add queue req)
+ (log-debug "Waiting for DRPC result for " function " " args " at " (System/currentTimeMillis))
+ (.acquire sem)
+ (log-debug "Acquired DRPC result for " function " " args " at " (System/currentTimeMillis))
+ (let [result (@id->result id)]
+ (cleanup id)
+ (log-debug "Returning DRPC result for " function " " args " at " (System/currentTimeMillis))
+ (if (instance? DRPCExecutionException result)
+ (throw result)
+
+ ;(-> (.newDecoder (Charset/defaultCharset)) (.decode result) str)
+ ;str
+
+ (-> (Charset/defaultCharset)
@nathanmarz
nathanmarz May 9, 2012 Owner

The String encoding/decoding stuff should be wrapped in a utility function (probably in Utils.java)

@nathanmarz
Owner

@jasonkolb I left some comments. I also think adding a short test to test the binary code paths in drpc_test.clj would be good.

@nathanmarz
Owner

Closing this since there's been no activity on this pull request and there's still work that needs to be done. I'd be happy to accept another pull request that completes this feature.

@nathanmarz nathanmarz closed this Aug 28, 2012
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment