Skip to content

Commit

Permalink
Added Druid plugin (http://druid.io)
Browse files Browse the repository at this point in the history
  • Loading branch information
pradeepchhetri committed May 21, 2016
1 parent 27608c0 commit 7bb6ec4
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 0 deletions.
1 change: 1 addition & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
:bench :bench
:focus :focus
:slack :slack
:druid :druid
:cloudwatch :cloudwatch
:datadog :datadog
:stackdriver :stackdriver
Expand Down
1 change: 1 addition & 0 deletions src/riemann/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
[common :as common :refer [event]]
[core :as core]
[datadog :refer [datadog]]
[druid :refer [druid]]
[email :refer [mailer]]
[folds :as folds]
[graphite :as graphite-client :refer [graphite]]
Expand Down
64 changes: 64 additions & 0 deletions src/riemann/druid.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
(ns riemann.druid
"Forwards events to Druid"
(:require
[clj-http.client :as http]
[cheshire.core :refer [generate-string]]
[riemann.common :refer [unix-to-iso8601]]))

(defn post-datapoint
"Post the riemann metrics as datapoints."
[host port dataset json-data]
(let [p (str port)
scheme "http://"
endpoint "/v1/post/"
url (str scheme host ":" p endpoint dataset)
http-options {:body json-data
:content-type :json
:conn-timeout 5000
:socket-timeout 5000
:throw-entire-message? true}]
(http/post url http-options)))

(defn generate-event [event]
{:host (:host event)
:service (:service event)
:state (:state event)
:timestamp (unix-to-iso8601 (:time event))
:tags (:tags event)
:description (:description event)
:value (:metric event)})

(defn druid
"Returns a function which accepts single events or batches of
events in a vectorand send them to Druid Tranquility Server.
Usage:
(druid {:host \"druid.example.com\"})
Options:
`:host` Hostname of Druid Tranquility server. (default: `\"localhost\"`)
`:port` Port at which Druid Tranquility is listening (default: `8200`)
`:dataset` Dataset name to be given (default: `\"riemann\"`)
Example:
(def druid-async
(batch 100 1/10
(async-queue!
:druid-async ; A name for the forwarder
{:queue-size 1e4 ; 10,000 events max
:core-pool-size 5 ; Minimum 5 threads
:max-pools-size 100} ; Maxium 100 threads
(druid {:host \"localhost\"}))))
"
[opts]
(let [opts (merge {:host "localhost"
:port 8200
:dataset "riemann"}
opts)]
(fn [event]
(let [events (if (sequential? event)
event
[event])
post-data (mapv generate-event events)
json-data (generate-string post-data)]
(post-datapoint (:host opts) (:port opts) (:dataset opts) json-data)))))
27 changes: 27 additions & 0 deletions test/riemann/druid_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
(ns riemann.druid-test
(:require [clj-http.client :as client]
[clojure.test :refer :all]
[riemann.druid :as druid]
[riemann.logging :as logging]
[riemann.test-utils :refer [with-mock]]))

(logging/init)

(deftest ^:druid druid-test
(with-mock [calls client/post]
(let [d (druid/druid {:host "localhost"})]

(testing "an event with metric")
(d {:host "testhost"
:service "testservice"
:metric 42
:time 123456789
:state "ok"})
(is (= 1 (count @calls)))
(is (= (vec (last @calls))
["http://localhost:8200/v1/post/riemann"
{:body "[{\"host\":\"testhost\",\"service\":\"testservice\",\"state\":\"ok\",\"timestamp\":\"1973-11-29T21:33:09.000Z\",\"tags\":null,\"description\":null,\"value\":42}]"
:socket-timeout 5000
:conn-timeout 5000
:content-type :json
:throw-entire-message? true}])))))

0 comments on commit 7bb6ec4

Please sign in to comment.