-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils_test.clj
104 lines (76 loc) · 3.06 KB
/
utils_test.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
(ns kafka-clj-test-utils.utils-test
(:require [clojure.test :refer :all]
[kafka-clj-test-utils.core :as co]
[kafka-clj-test-utils.consumer :as ktc]
[kafka-clj-test-utils.producer :as ktp]
[zookareg.core :as zkr]
[integrant.core :as ig]))
;; Taken from https://gitlab.com/vise890/multistub with permission
(defn with-around-fns
  "Runs the zero-argument function `f` wrapped by every function in
  `around-fns`, returning whatever the outermost wrapper returns.

  `around-fns` is an ordered sequence of functions that each accept a
  zero-argument function. Order matters: the first element becomes the
  outermost wrapper and the last element is the one that receives `f`
  itself. When there are no wrappers, `f` is simply called.

  E.g.:
    (with-around-fns [with-zookeeper-fn
                      with-kafka-fn]
      produce-some-stuff-on-kafka-fn)"
  [around-fns f]
  (let [;; Turns `inner` into a thunk that runs it inside `wrapper`.
        nest    (fn [inner wrapper] (fn [] (wrapper inner)))
        ;; Fold from the innermost (last) wrapper outwards so that the
        ;; first wrapper ends up on the outside.
        wrapped (reduce nest f (reverse around-fns))]
    (wrapped)))
;; Names of the two topics used by the fixtures and tests in this namespace.
(def topic-a "test.topic.a")
(def topic-b "test.topic.b")
;; Settings passed to the producer and consumer helpers below: a Kafka
;; bootstrap server at 127.0.0.1:9092 and a schema registry base URL at
;; http://localhost:8081.
;; NOTE(review): presumably these are the endpoints brought up by the zookareg
;; fixture -- confirm.
(def config {:kafka/config {:bootstrap.servers "127.0.0.1:9092"}
:kafka.serde/config {:schema-registry/base-url "http://localhost:8081"}})
;; Each test runs inside the wrappers listed here. `with-around-fns` makes the
;; first entry the outermost one, so `zkr/with-zookareg-fn` wraps both
;; `co/with-topic` calls, which in turn wrap the test itself.
(use-fixtures :each
  (fn [run-test]
    (with-around-fns [zkr/with-zookareg-fn
                      (partial co/with-topic topic-a)
                      (partial co/with-topic topic-b)]
      run-test)))
;; Schema handed to `ktp/produce` together with every event: a record named
;; "TestRecord" in namespace "kafkaCljTestUtils" with two string fields,
;; "foo" and "bar".
;; NOTE(review): looks like an Avro record schema written as Clojure data --
;; confirm.
(def test-schema
{:namespace "kafkaCljTestUtils"
:type "record"
:name "TestRecord"
:fields [{:name "foo" :type "string"}
{:name "bar" :type "string"}]})
;; Sample events; their keys match the schema's "foo" and "bar" fields.
(def event1 {:foo "FOO" :bar "BAR"})
(def event2 {:foo "BAZ" :bar "QUX"})
;; Produces two events on topic-a and checks what a manually positioned
;; consumer sees after seeking to the end and to the beginning, then checks
;; the one-shot `ktc/consume` helper against the same topic.
(deftest produce-consume-test
  ;; event1 is produced before the consumer is created and positioned.
  (ktp/produce config topic-a test-schema event1)
  (ktc/with-consumer config
    (fn [kc]
      (ktc/assign-partition-0 kc topic-a)
      (ktc/seek-to-end kc)

      ;; After seeking to the end, event1 must not be returned.
      (let [polled (ktc/poll* kc)]
        (is (empty? polled)))

      ;; Only the event produced after the seek shows up on the next poll.
      (ktp/produce config topic-a test-schema event2)
      (let [batch  (ktc/poll* kc)
            [only] batch]
        (is (= 1 (count batch)))
        (is (= event2 only)))

      ;; Rewinding yields both events in the order they were produced.
      (ktc/seek-to-beginning kc)
      (let [batch          (ktc/poll* kc)
            [older newer] batch]
        (is (= 2 (count batch)))
        (is (= event1 older))
        (is (= event2 newer)))))

  ;; `ktc/consume` is called with only the config and topic and must return
  ;; the same two events.
  (let [consumed       (ktc/consume config topic-a)
        [older newer] consumed]
    (is (= 2 (count consumed)))
    (is (= event1 older))
    (is (= event2 newer))))
;; Subscribes one consumer to both topics, checks it receives an event from
;; each, then unsubscribes, re-subscribes to topic-a only and checks that a
;; rewind returns everything produced on topic-a.
(deftest subscribe-unsubscribe-test
  (ktc/with-consumer config
    (fn [kc]
      ;; Phase 1: subscribed to both topics, positioned at the end.
      (ktc/subscribe kc topic-a topic-b)
      (ktc/seek-to-end kc)
      (ktp/produce config topic-a test-schema event1)
      (ktp/produce config topic-b test-schema event1)
      ;; NOTE(review): `:expected-msgs 2` presumably makes `poll*` wait for two
      ;; messages -- confirm against `ktc/poll*`.
      (let [batch           (ktc/poll* kc :expected-msgs 2)
            [first-evt second-evt] batch]
        (is (= 2 (count batch)))
        (is (= event1 first-evt))
        (is (= event1 second-evt)))

      ;; Phase 2: event2 is produced while the consumer is unsubscribed, then
      ;; the consumer re-subscribes to topic-a alone and rewinds.
      (ktc/unsubscribe kc)
      (ktp/produce config topic-a test-schema event2)
      (ktc/subscribe kc topic-a)
      (ktc/seek-to-beginning kc)
      (let [batch          (ktc/poll* kc :expected-msgs 2)
            [older newer] batch]
        (is (= 2 (count batch)))
        (is (= event1 older))
        (is (= event2 newer))))))