Permalink
Browse files

add some probe-channel hooks, mark beta1

  • Loading branch information...
1 parent 8e55f2d commit 7de95b5607305c5df7d55085006aecda46a80510 @ztellman committed Aug 22, 2012
Showing with 62 additions and 23 deletions.
  1. +3 −1 project.clj
  2. +11 −5 src/lamina/connections.clj
  3. +17 −11 src/lamina/executor.clj
  4. +2 −0 src/lamina/trace.clj
  5. +29 −6 src/lamina/trace/probe.clj
View
@@ -1,4 +1,4 @@
-(defproject lamina "0.5.0-SNAPSHOT"
+(defproject lamina "0.5.0-beta1"
:description "event-driven data structures for clojure"
:dependencies [[org.clojure/clojure "1.4.0"]
[org.clojure/tools.logging "0.2.3"]
@@ -11,6 +11,7 @@
[org.clojure/tools.logging "0.2.3"]
[useful "0.8.2"]
[potemkin "0.1.5"]]
+ "master" [[org.clojure/clojure "1.5.0-master-SNAPSHOT"]]
"1.2" [[org.clojure/clojure "1.2.1"]]
"1.4" [[org.clojure/clojure "1.4.0"]]
}
@@ -19,6 +20,7 @@
:codox {:include [lamina.core
lamina.trace
lamina.viz
+ lamina.walk
lamina.executor
lamina.stats
lamina.api
@@ -60,11 +60,17 @@
;; handle the new connection, and wait for it to close
(fn [conn]
- (when on-connected
- (on-connected conn))
- (success @connection conn)
- (trace {:state :connected})
- (closed-result conn))
+ (if-not (channel? conn)
+ (reset! done? true)
+ (do
+ (when on-connected
+ (try
+ (on-connected conn)
+ (catch Throwable e
+ (log/error e "Error in on-connected callback"))))
+ (success @connection conn)
+ (trace {:state :connected})
+ (closed-result conn))))
;; handle the lost connection, and restart
(fn [_]
@@ -25,22 +25,28 @@
"Executes the body on a separate thread, returning an unrealized result representing the eventual
value or error."
[& body]
- `((trace/instrumented-fn
- {:name "task"
- :executor default-executor}
- []
- ~@body)))
+ (let [explicit-name? (string? (first body))
+ name (if explicit-name? (first body) "task")
+ body (if explicit-name? (rest body) body)]
+ `((trace/instrumented-fn
+ {:name "task"
+ :executor default-executor}
+ []
+ ~@body))))
(defmacro bound-task
"Executes the body on a separate thread, returning an unrealized result representing the eventual
value or error. Unlike 'task', thread-local bindings are preserved when evaluating the body."
[& body]
- `((trace/instrumented-fn
- {:name "bound-task"
- :executor default-executor
- :with-bindings? true}
- []
- ~@body)))
+ (let [explicit-name? (string? (first body))
+ name (if explicit-name? (first body) "bound-task")
+ body (if explicit-name? (rest body) body)]
+ `((trace/instrumented-fn
+ {:name ~name
+ :with-bindings? true
+ :executor default-executor}
+ []
+ ~@body))))
(defn executor-channel
"Creates a channel that ensures all downstream channels will receive messages on the thread-pool
@@ -27,6 +27,8 @@
(import-macro u/time*)
(import-macro u/with-instrumentation)
+(import-fn pr/on-enabled-changed)
+(import-fn pr/on-new-probe)
(import-fn pr/canonical-probe-name)
(import-fn pr/probe-channel)
(import-fn pr/error-probe-channel)
@@ -22,15 +22,18 @@
[java.io
Writer]
[java.util.concurrent
- ConcurrentHashMap]
+ ConcurrentHashMap
+ CopyOnWriteArrayList]
[java.util.concurrent.atomic
AtomicBoolean]))
(defprotocol-once IProbe
- (probe-enabled? [_] "Returns true if the probe has downstream channels."))
+ (probe-enabled? [_] "Returns true if the probe has downstream channels.")
+ (trigger-callbacks [_ enabled?]))
(deftype-once ProbeChannel
[^AtomicBoolean enabled?
+ ^CopyOnWriteArrayList callbacks
channel
log-on-disabled?
description]
@@ -43,25 +46,42 @@
IProbe
(probe-enabled? [_]
(.get enabled?))
+ (trigger-callbacks [_ enabled?]
+ (doseq [c callbacks]
+ (try
+ (c enabled?)
+ (catch Exception e
+ (log/error e "Error in on-enabled-changed callback.")))))
c/IChannel
(receiver-node [_]
(c/receiver-node channel))
- (emitter-node [_]
+ (emitter-node [_]
(c/emitter-node channel)))
(defn probe-channel- [description log-on-disabled?]
(let [flag (AtomicBoolean. false)
ch (c/channel*
:grounded? true
:permanent? true
- :description (str "probe: " (name description)))]
+ :description (str "probe: " (name description)))
+ p (ProbeChannel.
+ flag
+ (CopyOnWriteArrayList.)
+ ch
+ log-on-disabled?
+ description)]
;; set the flag whenever the downstream count changes
(g/on-state-changed (c/emitter-node ch) nil
(fn [_ downstream _]
- (.set flag (pos? downstream))))
+ (let [enabled? (pos? downstream)]
+ (when-not (= enabled? (.getAndSet flag enabled?))
+ (trigger-callbacks p enabled?)))))
+
+ p))
- (ProbeChannel. flag ch log-on-disabled? description)))
+(defn on-enabled-changed [^ProbeChannel p callback]
+ (.add (.callbacks p) callback))
(defn canonical-probe-name
"something goes here"
@@ -75,6 +95,9 @@
(def ^ConcurrentHashMap probes (ConcurrentHashMap.))
(def new-probe-broadcaster (c/channel* :grounded? true, :permanent? true))
+(defn on-new-probe [callback]
+ (c/receive-all new-probe-broadcaster callback))
+
(defn reset-probes
"something goes here"
[]

0 comments on commit 7de95b5

Please sign in to comment.