-
Notifications
You must be signed in to change notification settings - Fork 1
/
test_helpers.clj
184 lines (157 loc) · 6.84 KB
/
test_helpers.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
(ns common-clj.test-helpers
(:require [cheshire.core :refer [generate-string]]
[com.stuartsierra.component :as component]
[common-clj.coercion :refer [coerce]]
[common-clj.components.consumer.protocol :as consumer.protocol]
[common-clj.components.logger.protocol :as logger.protocol]
[common-clj.components.producer.protocol :as producer.protocol]
[common-clj.json :refer [json->string string->json]]
[common-clj.lib.kafka :refer [kafka-topic->topic]]
[common-clj.time :as time]
[io.pedestal.http :as http]
[io.pedestal.http.route :as http.routes]
[io.pedestal.test :as test]
[matcher-combinators.midje :refer [match]]
[midje.sweet :refer [throws]])
(:import (clojure.lang ExceptionInfo)
(org.apache.kafka.clients.consumer ConsumerRecord KafkaConsumer
MockConsumer OffsetResetStrategy)
(org.apache.kafka.clients.producer MockProducer)
(org.apache.kafka.common TopicPartition)))
(defn init!
"setup components and store them in the world"
[system-map world]
(let [system (component/start system-map)]
(assoc world :system system)))
(defn message-arrived!
[topic message world]
(let [consumer (-> world :system :consumer)]
(consumer.protocol/consume! consumer topic message)
world))
(defn try-consume!
"Try to consume message and, if any error is thrown while consuming,
add the error to world in path [:consumption-errors <topic>]"
[topic message world]
(try
(message-arrived! topic message world)
(catch Exception e
(update-in world [:consumption-errors topic] (partial cons e)))))
(defn schema-error? [exception-info]
(-> exception-info
ex-data
:type
(= :schema.core/error)))
(defn coercion-error? [exception-info]
(-> exception-info
ex-data
:type
(= :schema-tools.coerce/error)))
(defn kafka-message-arrived!
[topic message world]
(let [kafka-client (-> world :system :consumer :kafka-client)]
(.rebalance kafka-client [(TopicPartition. topic 0)])
(.updateBeginningOffsets kafka-client {(TopicPartition. topic 0) 0})
(.updateEndOffsets kafka-client {(TopicPartition. topic 0) 1})
(.addRecord kafka-client (ConsumerRecord. topic 0 0 "key" (generate-string message))))
world)
(defn kafka-try-consume!
[topic message world]
(let [logger (-> world :system :logger)
error-handler (reify Thread$UncaughtExceptionHandler
(uncaughtException [_ _ e]
(logger.protocol/log! logger topic e)))
consumer-thread (-> world :system :consumer :consumer-thread)]
(.setUncaughtExceptionHandler consumer-thread error-handler)
(kafka-message-arrived! topic message world)))
(defn mock-kafka-client [& args] (MockConsumer. OffsetResetStrategy/EARLIEST))
(defn produce! [topic message world]
(let [producer (-> world :system :producer)]
(producer.protocol/produce! producer topic message)
world))
(defn try-produce!
[topic message world]
(try
(produce! topic message world)
(catch Exception e
(update-in world [:producer-error topic] #(conj % e)))))
(defn produced-messages
[topic world]
(or (-> world :system :producer :messages deref topic)
[]))
(defn clear-produced-messages! [world]
(-> world :system :producer :messages (reset! {}))
world)
(defn produced-errors
[topic world]
(or (-> world :producer-error topic)
[]))
(def exception? (partial instance? Exception))
(defn kafka-produce!
[topic message world]
(let [producer (-> world :system :producer)]
(producer.protocol/produce! producer topic message)
world))
(defn kafka-try-produce!
[topic message world]
(try
(produce! topic message world)
(catch Exception e
(update-in world [:producer-error topic] #(conj % e)))))
(defn check-kafka-produced-messages [topic world]
(let [kafka-client (-> world :system :producer :kafka-client)]
(->> kafka-client
.history
vec
(map (fn [record] {:kafka-topic (.topic record)
:value (.value record)}))
(filter (fn [{:keys [kafka-topic]}] (= topic kafka-topic)))
(map :value))))
(defn check-kafka-produced-errors [topic world]
(or (-> world :producer-error topic)
[]))
(defn mock-kafka-producer [& args] (MockProducer.))
(defn request-arrived!
([route world]
(request-arrived! route {} world))
([route {:keys [path-params body supress-errors]} world]
(let [service (-> world :system :http-server :service :io.pedestal.http/service-fn)
routes (-> world :system :http-server :routes)
pedestal-routes (-> world :system :http-server :pedestal-routes)
url-for (http.routes/url-for-routes
(http.routes/expand-routes pedestal-routes))
{:keys [method path]} (route routes)
{:keys [status body] :as response} (test/response-for
service method
(url-for route :path-params path-params)
:headers {"Content-Type" "application/json"}
:body (json->string body))]
(when (and (or (= 500 status) (= 400 status))
(not supress-errors))
(throw (Exception. body)))
(if (and (not= 500 status) (not= 400 status))
(let [{:keys [response-schema]} (route routes)
coerced-body (coerce response-schema (string->json body))]
(update-in world [:http-responses route] (comp vec conj) (assoc response
:body
coerced-body)))
(update-in world [:http-responses route] (comp vec conj) response)))))
(defn throws-ex
[m]
(throws ExceptionInfo (fn [ex]
(let [matcher (match m)]
(matcher (ex-data ex))))))
(defn random-uuid [] (java.util.UUID/randomUUID))
(defmacro as-of [value stuff-to-do]
`(with-redefs [time/now (if (= java.time.LocalDateTime (class ~value))
(constantly ~value)
time/now)
time/today (if (= java.time.LocalDate (class ~value))
(constantly ~value)
time/today)]
~stuff-to-do))
(defn http-response [route world]
"Returns last http response found for :route"
(-> world :http-responses route last))
(defn http-responses [route world]
"Returns all http responses found for :route"
(-> world :http-responses route))