Skip to content

Commit

Permalink
Add ClickHouse Support (#1009)
Browse files Browse the repository at this point in the history
  • Loading branch information
chhetripradeep committed Feb 27, 2022
1 parent 497704c commit cec0dd1
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 0 deletions.
1 change: 1 addition & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
:kafka :kafka
:pushover :pushover
:msteams :msteams
:clickhouse :clickhouse
:all (fn [_] true)}
;; :javac-options ["-target" "1.6" "-source" "1.6"]
:java-source-paths ["src/riemann/"]
Expand Down
88 changes: 88 additions & 0 deletions src/riemann/clickhouse.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
(ns riemann.clickhouse
"Forwards riemann events to ClickHouse."
(:require [clojure.string :as str]
[clj-http.client :as http]))

(defn generate-tags
[tags]
(str/replace tags #"\"" "'"))

(defn generate-datapoint
"Accepts riemann event and converts it into clickhouse datapoint."
[event]
(let [timestamp (long (:time event))
host (:host event)
metric (:metric event)
service (:service event)
tags (generate-tags (:tags event))]
(when (and host metric service)
(str timestamp "," host "," service "," metric "," tags \newline))))

(defn generate-datapoint-batch
"Accepts riemann events and converts it into clickhouse datapoint batch."
[events]
(let [processed-events (map generate-datapoint events)]
(str/join "" processed-events)))

(defn generate-url
"Generates the URL to which datapoint should be posted."
[opts]
(let [scheme (:scheme opts)
host (:host opts)
port (:port opts)
database (:database opts)
table (:table opts)]
(str scheme host ":" port "/?query=INSERT%20INTO%20" database "." table "%20FORMAT%20CSV")))

(defn post-datapoint
"Post the riemann event as clickhouse datapoint."
[url datapoint]
(let [http-options {:body datapoint
:content-type :csv
:conn-timeout 5000
:socket-timeout 5000
:throw-entire-message? true}]
(http/post url http-options)))

(defn clickhouse
"Returns a function which accepts an event and sends it to clickhouse.
Usage:
(batch 10000 5 (clickhouse {:host \"play.clickhouse.com\"}))
Options:
- `:scheme` ClickHouse URL Scheme (default: \"http://\")
- `:host` ClickHouse Server IP (default: \"localhost\")
- `:port` ClickHouse Server Port (default: 8123)
- `:database` ClickHouse Database Name (default: \"default\")
- `:table` ClickHouse Table Name (default: \"riemann\")
You need to first create a clickhouse table:
CREATE TABLE default.riemann
(
`timestamp` DateTime,
`host` String,
`service` String,
`metric` Float32,
`tags` Array(String)
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, host, service)
SETTINGS index_granularity = 8192;
"
[opts]
(let [opts (merge {:scheme "http://"
:host "localhost"
:port 8123
:database "default"
:table "riemann"}
opts)]
(fn [events]
(let [url (generate-url opts)
datapoint (generate-datapoint-batch events)]
(if-not (nil? datapoint)
(post-datapoint url datapoint))))))
1 change: 1 addition & 0 deletions src/riemann/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
(:import (java.io File))
(:require [riemann [boundary :refer [boundary]]
[client :refer [udp-client tcp-client multi-client]]
[clickhouse :refer [clickhouse]]
[cloudwatch :refer [cloudwatch]]
[common :as common :refer [event]]
[core :as core]
Expand Down
30 changes: 30 additions & 0 deletions test/riemann/clickhouse_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
(ns riemann.clickhouse-test
(:use riemann.clickhouse
[riemann.time :only [unix-time]]
clojure.test)
(:require [riemann.logging :as logging]))

(logging/init)

(deftest ^:clickhouse ^:integration clickhouse-test
(let [k (clickhouse {:block-start true})]
(k {:host "riemann.local"
:service "clickhouse test"
:state "ok"
:description "all clear, uh, situation normal"
:metric -2
:time (unix-time)}))

(let [k (clickhouse {:block-start true})]
(k {:service "clickhouse test"
:state "ok"
:description "all clear, uh, situation normal"
:metric 3.14159
:time (unix-time)}))

(let [k (clickhouse {:block-start true})]
(k {:host "no-service.riemann.local"
:state "ok"
:description "missing service, not transmitted"
:metric 4
:time (unix-time)})))

0 comments on commit cec0dd1

Please sign in to comment.