Permalink
Browse files

allow parameterised config

  • Loading branch information...
1 parent 11641c8 commit 6ea10790963b090def6d6dc52c779fcf561e3e3f @scarytom scarytom committed Jun 21, 2012
Showing with 28 additions and 18 deletions.
  1. +5 −0 config.clj
  2. +23 −18 src/clj_statsd_svr.clj
View
@@ -0,0 +1,5 @@
+{:port 8125
+ :mgmt-port 8126
+ :flush-interval 10000
+ :backends '[backends.console
+ backends.graphite]}
@@ -5,19 +5,12 @@
(:import [java.net DatagramPacket DatagramSocket])
(:import [java.util.concurrent Executors LinkedBlockingQueue TimeUnit]))
-;configuration
-(def config {:port 8125
- :mgmt-port 8126
- :flush-interval 10000
- :backends '[backends.console backends.graphite]})
-
;initialisation
(def statistics (agent { :counters {} :timers {} :gauges {} }))
;backends
-(doseq [backend (config :backends)] (require backend))
-(defn backend-send [function & args]
- (doall (for [backend (config :backends)] (future (apply (ns-resolve backend function) args)))))
+(defn backend-send [backends f & args]
+ (doall (for [backend backends] (future (apply (ns-resolve backend f) args)))))
;statistics
(defn update-stat [stats stat bucket f]
@@ -73,25 +66,25 @@
(assoc @snapshot :timestamp (System/currentTimeMillis))))
(defn distribute [report config]
- (backend-send 'publish report config))
+ (backend-send (config :backends) 'publish report config))
;manangement
(defn seconds-since [time-millis]
(int (/ (- (System/currentTimeMillis) (or time-millis 0)) 1000)))
-(defn vitals [startup-time-millis]
+(defn vitals [backends startup-time-millis]
(str "uptime: " (seconds-since startup-time-millis) "\r\n"
"messages.bad_lines_seen: " (or (:bad_lines_seen (:counters @statistics)) 0) "\r\n"
"messages.last_msg_seen: " (seconds-since (:last_msg_seen (:gauges @statistics))) "\r\n"
- (reduce str (map #(str @% "\r\n") (backend-send 'status)))))
+ (reduce str (map #(str @% "\r\n") (backend-send backends 'status)))))
-(defn manage-via [socket startup-time-millis]
+(defn manage-via [socket backends startup-time-millis]
(let [in (.useDelimiter (java.util.Scanner. (.getInputStream socket)) #"[^\w\.\t]")
out (java.io.PrintWriter. (.getOutputStream socket) true)
done (atom false)]
(def commands {"quit" #(do (swap! done (fn [x] (not x))) "bye")
"help" #(str "Commands: " (reduce (fn [x y] (str x ", " y)) (keys commands)))
- "stats" #(vitals startup-time-millis)
+ "stats" #(vitals backends startup-time-millis)
"counters" #(@statistics :counters)
"timers" #(@statistics :timers)
"gauges" #(@statistics :gauges)})
@@ -100,17 +93,29 @@
(.println out (str (response) "\n"))))
(.close socket)))
-(defn start-manager [port-no startup-time-millis]
+(defn start-manager [port-no backends startup-time-millis]
(let [server (java.net.ServerSocket. port-no)]
- (.start (Thread. #(while true (let [socket (.accept server)] (future (manage-via socket startup-time-millis))))))))
+ (.start (Thread. #(while true (let [socket (.accept server)] (future (manage-via socket backends startup-time-millis))))))))
;lifecycle
-(defn -main []
+(defn start [config]
(let [worker-count 2
work-queue (LinkedBlockingQueue.)
work-executor (Executors/newFixedThreadPool worker-count)
report-executor (Executors/newSingleThreadScheduledExecutor)]
(start-receiver (config :port) work-queue)
(dotimes [_ worker-count] (.submit work-executor (new-worker work-queue)))
- (start-manager (config :mgmt-port) (System/currentTimeMillis))
+ (start-manager (config :mgmt-port) (config :backends) (System/currentTimeMillis))
(.scheduleAtFixedRate report-executor #(distribute (make-report) config) (config :flush-interval) (config :flush-interval) TimeUnit/MILLISECONDS)))
+
+;configuration
+(def default-config {:port 8125
+ :mgmt-port 8126
+ :flush-interval 10000
+ :backends '[backends.console]})
+
+;main
+(defn -main [& [config-file]]
+ (let [config (if config-file (load-file config-file) default-config)]
+ (doseq [backend (config :backends)] (require backend))
+ (start config)))

0 comments on commit 6ea1079

Please sign in to comment.