-
Notifications
You must be signed in to change notification settings - Fork 78
/
utils.clj
73 lines (66 loc) · 3.04 KB
/
utils.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
(ns clj-kafka.test.utils
(:import
[kafka.admin AdminUtils]
[kafka.server KafkaConfig KafkaServer]
[java.net InetSocketAddress]
[org.apache.zookeeper.server ZooKeeperServer NIOServerCnxnFactory]
[org.apache.commons.io FileUtils]
[org.I0Itec.zkclient ZkClient]
[org.I0Itec.zkclient.serialize ZkSerializer]
[kafka.utils Time]
[java.util Properties])
(:require [clj-kafka.admin :as admin])
(:use [clojure.java.io :only (file)]
[clj-kafka.core :only (as-properties)]))
(defn tmp-dir
  "Returns the path (as a string) of a location underneath the
  \"clj-kafka\" directory inside the JVM temp dir (java.io.tmpdir).
  Each element of `parts` becomes one further nested path segment; with
  no parts the \"clj-kafka\" directory itself is returned. Nothing is
  created on disk."
  [& parts]
  (let [root (System/getProperty "java.io.tmpdir")]
    (->> parts
         (apply file root "clj-kafka")
         (.getPath))))
(def system-time
  "A kafka.utils.Time implementation that delegates to the JVM clock:
  System/currentTimeMillis, System/nanoTime and Thread/sleep."
  (proxy [Time] []
    (sleep [millis]
      (Thread/sleep millis))
    (nanoseconds []
      (System/nanoTime))
    (milliseconds []
      (System/currentTimeMillis))))
;; TODO: "enable.zookeeper" does not appear to be used; check whether it actually has any effect.
(defn create-broker
  "Constructs (but does not start) a KafkaServer with broker id 0 on
  localhost.

  Takes a map with:
    :kafka-port     - port the broker is configured with
    :zookeeper-port - port of the ZooKeeper at 127.0.0.1 to connect to

  Logs are written to the \"kafka-log\" directory under `tmp-dir`, and the
  server uses `system-time` as its clock."
  [{:keys [kafka-port zookeeper-port]}]
  (let [log-dir    (.getAbsolutePath (file (tmp-dir "kafka-log")))
        zk-address (str "127.0.0.1:" zookeeper-port)
        defaults   {"broker.id"                   "0"
                    "port"                        "9999"
                    "host.name"                   "localhost"
                    "zookeeper.connect"           zk-address
                    "enable.zookeeper"            "true"
                    "log.flush.interval.messages" "1"
                    "auto.create.topics.enable"   "true"
                    "log.dir"                     log-dir}
        ;; The caller-supplied port always replaces the "9999" placeholder.
        props      (-> defaults
                       (assoc "port" (str kafka-port))
                       (as-properties))]
    (KafkaServer. (KafkaConfig. props) system-time)))
(defn create-zookeeper
  "Starts an in-process ZooKeeper server bound to 127.0.0.1 on
  `:zookeeper-port` (taken from the given map) and returns the
  NIOServerCnxnFactory that is serving it.

  Snapshots and transaction logs go to the \"zookeeper-snapshot\" and
  \"zookeeper-log\" directories under `tmp-dir`."
  [{:keys [zookeeper-port]}]
  (let [tick-ms      500
        snapshot-dir (file (tmp-dir "zookeeper-snapshot"))
        log-dir      (file (tmp-dir "zookeeper-log"))
        server       (ZooKeeperServer. snapshot-dir log-dir tick-ms)
        factory      (NIOServerCnxnFactory.)
        address      (InetSocketAddress. "127.0.0.1" zookeeper-port)]
    ;; The second argument to configure (10) is passed through unchanged.
    (.configure factory address 10)
    (.startup factory server)
    factory))
(defn wait-until-initialised
  "Blocks the calling thread until the broker's metadata cache returns at
  least one metadata entry for `topic`, polling every 500 ms. There is no
  timeout: it keeps polling for as long as no entry is returned.
  Returns nil."
  [^KafkaServer kafka-server topic]
  (let [metadata-cache (.metadataCache (.apis kafka-server))
        wanted         (scala.collection.JavaConversions/asScalaSet #{topic})]
    (loop []
      (when (< (.size (.getTopicMetadata metadata-cache wanted)) 1)
        (Thread/sleep 500)
        (recur)))))
(defmacro with-test-broker
  "Creates an in-process broker that can be used to test against.

  `config` is a map with :zookeeper-port, :kafka-port and :topic. The
  macro wipes the temp directory, starts ZooKeeper and a Kafka broker,
  creates the topic, waits until the broker reports metadata for it and
  then evaluates `body`, returning the value of its last form.

  `config` is evaluated exactly once. Teardown always runs, even when
  setup or `body` throws: the broker is shut down, then ZooKeeper, and
  finally the temp directory is deleted."
  [config & body]
  `(let [config#  ~config
         zk-port# (:zookeeper-port config#)
         topic#   (:topic config#)]
     (FileUtils/deleteDirectory (file (tmp-dir)))
     (let [zk# (create-zookeeper config#)]
       (try
         ;; The broker is created inside this try so that a failure while
         ;; constructing it still shuts down the running ZooKeeper.
         (let [kafka# (create-broker config#)]
           (try
             (.startup kafka#)
             ;; with-open closes the client once the body is done.
             ;; NOTE(review): assumes admin/zk-client returns an object with a
             ;; no-arg close method (e.g. ZkClient) -- confirm.
             (with-open [zk-client# (admin/zk-client (str "127.0.0.1:" zk-port#)
                                                     {:session-timeout-ms 500
                                                      :connection-timeout-ms 500})]
               (admin/create-topic zk-client# topic#)
               (wait-until-initialised kafka# topic#)
               ~@body)
             (finally
               (.shutdown kafka#)
               (.awaitShutdown kafka#))))
         (finally
           ;; Nested so the temp dir is removed even if ZooKeeper shutdown throws.
           (try
             (.shutdown zk#)
             (finally
               (FileUtils/deleteDirectory (file (tmp-dir))))))))))