Skip to content

Commit

Permalink
added example of clojure DSL
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Sep 28, 2011
1 parent 7027ea9 commit 7fa873d
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
3 changes: 2 additions & 1 deletion project.clj
@@ -1,4 +1,5 @@
(defproject storm-starter "0.0.1-SNAPSHOT"
:source-path "src/clj"
:java-source-path "src/jvm"
:javac-options {:debug "true" :fork "true"}
:resources-path "multilang"
Expand All @@ -15,6 +16,6 @@
[com.googlecode.json-simple/json-simple "1.1"]
]

:dev-dependencies [[storm "0.5.2"]
:dev-dependencies [[storm "0.5.3-SNAPSHOT"]
])

60 changes: 60 additions & 0 deletions src/clj/storm/starter/clj/word_count.clj
@@ -0,0 +1,60 @@
(ns storm.starter.clj.word-count
(:import backtype.storm.LocalCluster)
(:use [backtype.storm clojure config]))

(defspout sentence-spout ["sentence"]
[conf context collector]
(let [sentences ["a little brown dog"
"the man petted the dog"
"four score and seven years ago"
"an apple a day keeps the doctor away"]]
(spout
(nextTuple []
(Thread/sleep 100)
(emit-spout! collector [(rand-nth sentences)])
)
(ack [id]
;; You only need to define this method for reliable spouts
;; (such as one that reads off of a queue like Kestrel)
;; This is an unreliable spout, so it does nothing here
))))

(defspout sentence-spout-parameterized ["word"] {:params [sentences] :prepare false}
[collector]
(Thread/sleep 500)
(emit-spout! collector [(rand-nth sentences)]))

(defbolt split-sentence ["word"] [tuple collector]
(let [words (.split (.getString tuple 0) " ")]
(doseq [w words]
(emit-bolt! collector [w] :anchor tuple))
(ack! collector tuple)
))

(defbolt word-count ["word" "count"] {:prepare true}
[conf context collector]
(let [counts (atom {})]
(bolt
(execute [tuple]
(let [word (.getString tuple 0)]
(swap! counts (partial merge-with +) {word 1})
(emit-bolt! collector [word (@counts word)] :anchor tuple)
(ack! collector tuple)
)))))

(defn mk-topology []
(topology
{1 (spout-spec sentence-spout)
2 (spout-spec (sentence-spout-parameterized
["the cat jumped over the door"
"greetings from a faraway land"])
:p 2)}
{3 (bolt-spec {1 :shuffle 2 :shuffle} split-sentence :p 5)
4 (bolt-spec {3 ["word"]} word-count :p 6)}))

(defn run-local! []
(let [cluster (LocalCluster.)]
(.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
(Thread/sleep 10000)
(.shutdown cluster)
))

0 comments on commit 7fa873d

Please sign in to comment.