From 7fa873d10e147e2363c064f0c3ca64893022e51a Mon Sep 17 00:00:00 2001
From: Nathan Marz
Date: Wed, 28 Sep 2011 00:35:52 -0700
Subject: [PATCH] added example of clojure DSL

---
Reviewer note (ignored by git-am): this patch was recovered from a copy whose
line breaks and indentation had been collapsed. Line boundaries were rebuilt
from the +/-/context markers and checked against the @@ hunk counts; the exact
original indentation could not be recovered, so apply with
`git apply --ignore-whitespace` if the project.clj context lines do not match.

 project.clj                              |  3 +-
 src/clj/storm/starter/clj/word_count.clj | 60 ++++++++++++++++++++++++
 2 files changed, 62 insertions(+), 1 deletion(-)
 create mode 100644 src/clj/storm/starter/clj/word_count.clj

diff --git a/project.clj b/project.clj
index bbc13e3..1ba1632 100644
--- a/project.clj
+++ b/project.clj
@@ -1,4 +1,5 @@
 (defproject storm-starter "0.0.1-SNAPSHOT"
+  :source-path "src/clj"
   :java-source-path "src/jvm"
   :javac-options {:debug "true" :fork "true"}
   :resources-path "multilang"
@@ -15,6 +16,6 @@
                    [com.googlecode.json-simple/json-simple "1.1"]
                    ]
 
-  :dev-dependencies [[storm "0.5.2"]
+  :dev-dependencies [[storm "0.5.3-SNAPSHOT"]
                      ])
 
diff --git a/src/clj/storm/starter/clj/word_count.clj b/src/clj/storm/starter/clj/word_count.clj
new file mode 100644
index 0000000..41f87e1
--- /dev/null
+++ b/src/clj/storm/starter/clj/word_count.clj
@@ -0,0 +1,60 @@
+(ns storm.starter.clj.word-count
+  (:import backtype.storm.LocalCluster)
+  (:use [backtype.storm clojure config]))
+
+(defspout sentence-spout ["sentence"]
+  [conf context collector]
+  (let [sentences ["a little brown dog"
+                   "the man petted the dog"
+                   "four score and seven years ago"
+                   "an apple a day keeps the doctor away"]]
+    (spout
+     (nextTuple []
+       (Thread/sleep 100)
+       (emit-spout! collector [(rand-nth sentences)])
+       )
+     (ack [id]
+        ;; You only need to define this method for reliable spouts
+        ;; (such as one that reads off of a queue like Kestrel)
+        ;; This is an unreliable spout, so it does nothing here
+        ))))
+
+(defspout sentence-spout-parameterized ["word"] {:params [sentences] :prepare false}
+  [collector]
+  (Thread/sleep 500)
+  (emit-spout! collector [(rand-nth sentences)]))
+
+(defbolt split-sentence ["word"] [tuple collector]
+  (let [words (.split (.getString tuple 0) " ")]
+    (doseq [w words]
+      (emit-bolt! collector [w] :anchor tuple))
+    (ack! collector tuple)
+    ))
+
+(defbolt word-count ["word" "count"] {:prepare true}
+  [conf context collector]
+  (let [counts (atom {})]
+    (bolt
+     (execute [tuple]
+       (let [word (.getString tuple 0)]
+         (swap! counts (partial merge-with +) {word 1})
+         (emit-bolt! collector [word (@counts word)] :anchor tuple)
+         (ack! collector tuple)
+         )))))
+
+(defn mk-topology []
+  (topology
+   {1 (spout-spec sentence-spout)
+    2 (spout-spec (sentence-spout-parameterized
+                   ["the cat jumped over the door"
+                    "greetings from a faraway land"])
+                  :p 2)}
+   {3 (bolt-spec {1 :shuffle 2 :shuffle} split-sentence :p 5)
+    4 (bolt-spec {3 ["word"]} word-count :p 6)}))
+
+(defn run-local! []
+  (let [cluster (LocalCluster.)]
+    (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
+    (Thread/sleep 10000)
+    (.shutdown cluster)
+    ))