Skip to content

Commit

Permalink
Merge branch 'master' of github.com:aphyr/riemann
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyle Kingsbury committed Jun 4, 2012
2 parents 25a72b2 + c1229f2 commit 40ac15f
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 3 deletions.
19 changes: 19 additions & 0 deletions src/riemann/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
(:require [riemann.folds :as folds])
(:use clojure.tools.logging)
(:use riemann.client)
(:require [riemann.pubsub :as pubsub])
(:use riemann.streams)
(:use riemann.email)
(:use riemann.graphite)
Expand Down Expand Up @@ -50,6 +51,24 @@
([]
(periodically-expire 10)))

(defn pubsub
"Returns this core's pubsub registry."
[]
(:pubsub core))

(defn publish
"Returns a stream which publishes events to this the given channel. Uses this
core's pubsub registry."
[channel]
(fn [event]
(pubsub/publish (:pubsub core) channel event)))

(defn subscribe
"Subscribes to the given channel with f, which will receive events. Uses this
core's pubsub registry."
[channel f]
(pubsub/subscribe (:pubsub core) channel f))

; Start the core
(defn start []
(riemann.core/start core))
Expand Down
6 changes: 4 additions & 2 deletions src/riemann/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
(:use riemann.common)
(:use clojure.tools.logging)
(:require riemann.streams)
(:require [riemann.index :as index]))
(:require [riemann.index :as index])
(:require [riemann.pubsub :as ps]))

(defn core
"Create a new core."
[]
{:servers (ref [])
:streams (ref [])
:index (ref nil)
:reaper (ref nil)})
:reaper (ref nil)
:pubsub (ps/pubsub-registry)})

(defn periodically-expire
"Every interval (default 10) seconds, expire states from this core's index
Expand Down
55 changes: 55 additions & 0 deletions src/riemann/pubsub.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
(ns riemann.pubsub
"Provides publish-subscribe handling of events. Publishers push events onto a
channel, which has n subscribers. Each subscriber subscribes to a channel
with an optional predicate function. Events which match the predicate are
sent to the subscriber."

(:use riemann.common)
(:use [clojure.core.incubator :only [dissoc-in]]))

; Registry:
; channel1:
; id1: fun1
; id2: fun2
; id3: fun3
; channel2:
; id4: fun1

(defn pubsub-registry
"Returns a new pubsub registry, which tracks which subscribers are
listening to which channels."
[]
{:channels (ref {})
:last-sub-id (ref 0)})

(defn publish
"Publish an event to the given channel in a registry."
[registry channel event]
(let [channels (deref (:channels registry))]
(doseq [[id f] (channels channel)]
(f event))))

(defn subscribe
"Subscribe to the given channel in a registry with f, which is called with
each event that arrives on that channel. Returns an ID for the subscription."
[registry channel f]
(let [channels (:channels registry)]
(dosync
(let [sub-id (alter (:last-sub-id registry) inc)]
(alter channels assoc-in [channel sub-id] f)
sub-id))))

(defn unsubscribe
"Unsubscribe from the given registry by id. If you provide a channel to
unsubscribe from, O(1). If you provide only the id, O(channels)."
([registry channel id]
(let [channels (:channels registry)]
(dosync
(alter channels dissoc-in [channel id]))))

([registry id]
(let [channels (:channels registry)]
(dosync
(ref-set channels
(into {} (for [[channel channel-subs] (deref channels)]
[channel (dissoc channel-subs id)])))))))
1 change: 0 additions & 1 deletion src/riemann/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -808,4 +808,3 @@
[index]
(fn [event]
(index/delete index event)))

47 changes: 47 additions & 0 deletions test/riemann/test/pubsub.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
(ns riemann.test.pubsub
(:use riemann.pubsub)
(:use clojure.test))

(defn pusher [out]
"push events onto x"
(fn [x] (dosync (alter out conj x))))

(deftest one-to-one
(let [r (pubsub-registry)
out (ref [])
id (subscribe r :foo (pusher out))]

(publish r :foo 1)
(publish r :foo 2)
(is (= (deref out) [1 2]))))

(deftest one-to-many
(let [r (pubsub-registry)
out1 (ref [])
out2 (ref [])
id1 (subscribe r :foo (pusher out1))
id2 (subscribe r :foo (pusher out2))]

(publish r :foo 1)
(publish r :foo 2)
(is (= (deref out1) (deref out2) [1 2]))))

(deftest unsub
(let [r (pubsub-registry)
out1 (ref [])
out2 (ref [])
foo1 (subscribe r :foo (pusher out1))
foo2 (subscribe r :foo (pusher out2))]

(publish r :foo 1)

; Unsub with channel
(unsubscribe r :foo foo1)
(publish r :foo 2)

; Unsub without channel
(unsubscribe r foo2)
(publish r :foo 3)

(is (= (deref out1) [1]))
(is (= (deref out2) [1 2]))))

0 comments on commit 40ac15f

Please sign in to comment.