Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added drpc helpers and version is now 0.2.0

  • Loading branch information...
commit db55be66c059e5310a2a90ae1423f3a4667b238a 1 parent 9207998
@schleyfox authored
View
10 README.md
@@ -52,6 +52,15 @@ Usage is like so:
You can also play with the clusters time and other such nasty pranks inside of
here.
+### storm.test.drpc
+
+Simulated time is awesome. DRPC is awesome. Together though, there is only
+pain and your test will hang forever. The <tt>execute-drpc!</tt> helper
+allows your tests to use drpc with or without simulated time. It may be
+exceptionally useful in a capturing topology. Usage is like so:
+
+ (execute-drpc! cluster drpc-client "exclamation" "hello")
+
### storm.test.visualization
Sometimes it is nice to see what your topology actually looks like. This
@@ -70,7 +79,6 @@ will create and open a png of your topology. The result looks like this:
1. tracked-capturing-topology for use with tracked clusters. I go back and
forth on whether this would help many situations.
-2. DRPC testing helpers.
3. Failure testing: attach a bolt that fails tuples under certain conditions
as well as beef up testing spouts to replay realistically.
4. Test-generative helpers. test-generative seems cool, would be nice to test
View
2  project.clj
@@ -1,4 +1,4 @@
-(defproject storm-test "0.1.0"
+(defproject storm-test "0.2.0"
:description "Testing utilities for storm"
:source-path "src/clj"
:java-source-path "src/jvm"
View
3  src/clj/storm/test/capturing_topology.clj
@@ -28,7 +28,8 @@
(for [[id spec]
(merge (clojurify-structure spouts)
(clojurify-structure bolts))]
- (for [[stream _] (.. spec get_common get_streams)]
+ (for [[stream info] (.. spec get_common get_streams)
+ :when (not (.is_direct info))]
(GlobalStreamId. id stream))))
capturer (persistent-tuple-capture-bolt storm-name) ]
(doseq [[id spout] feeders]
View
19 src/clj/storm/test/drpc.clj
@@ -0,0 +1,19 @@
+(ns storm.test.drpc
+ (:import [backtype.storm LocalDRPC])
+ (:import [backtype.storm.utils Time])
+ (:use [backtype.storm testing util]))
+
+(defn- do-execute
+ [_ client func func-args]
+ (.execute client func func-args))
+
+(defn execute-drpc!
+ "Executes a DRPC request in a possibly simulated time environment"
+ [cluster-map client func func-args]
+ (if (Time/isSimulating)
+ (let [ req (agent nil) ]
+ (send-off req do-execute client func func-args)
+ (while (not (await-for 100 req))
+ (advance-cluster-time cluster-map 10))
+ @req)
+ (do-execute client func func-args)))
View
46 src/jvm/storm/test/DRPCExclamationTopology.java
@@ -0,0 +1,46 @@
+package storm.test;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.LocalDRPC;
+import backtype.storm.StormSubmitter;
+import backtype.storm.drpc.LinearDRPCTopologyBuilder;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.IBasicBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import java.util.Map;
+
+public class DRPCExclamationTopology {
+ public static class ExclaimBolt implements IBasicBolt {
+ public void prepare(Map conf, TopologyContext context) {
+ }
+
+ @Override
+ public void execute(Tuple tuple, BasicOutputCollector collector) {
+ String input = tuple.getString(1);
+ collector.emit(new Values(tuple.getValue(0), input + "!"));
+ }
+
+ @Override
+ public void cleanup() {
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("id", "result"));
+ }
+
+ }
+
+ public static StormTopology makeTopology(LocalDRPC drpc) {
+ LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
+ builder.addBolt(new ExclaimBolt(), 3);
+ return builder.createLocalTopology(drpc);
+ }
+}
+
View
24 test/storm/test/test/drpc.clj
@@ -0,0 +1,24 @@
+(ns storm.test.test.drpc
+ (:use [clojure.test])
+ (:use [backtype.storm testing config])
+ (:use [storm.test drpc capturing-topology util])
+ (:import [backtype.storm LocalDRPC])
+ (:import [storm.test DRPCExclamationTopology]))
+
+(defn drpc-exclamation-topology
+ [drpc]
+ (DRPCExclamationTopology/makeTopology drpc))
+
+(deftest test-drpc
+ (with-quiet-logs
+ (with-simulated-time-local-cluster [cluster]
+ (let [drpc (LocalDRPC.)]
+ (with-capturing-topology [capture
+ cluster
+ (drpc-exclamation-topology drpc)
+ :storm-conf {TOPOLOGY-DEBUG true}]
+ (dotimes [i 5]
+ (is (= (str "hello" i "!")
+ (execute-drpc! cluster drpc "exclamation" (str "hello" i))))))))
+ )
+)
Please sign in to comment.
Something went wrong with that request. Please try again.