/
in_memory_consumer.clj
34 lines (28 loc) · 1.36 KB
/
in_memory_consumer.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
(ns common-clj.components.consumer.in-memory-consumer
(:require [clojure.repl :refer [demunge]]
[com.stuartsierra.component :as component]
[common-clj.components.consumer.protocol]
[common-clj.components.consumer.protocol :refer [Consumer]]
[common-clj.schemata.consumer :as schemata.consumer]
[schema.core :as s]))
(defn subscription-key [topic handler]
(str topic "-" (demunge (str handler))))
(defn maybe-call [handler schema handler-topic component]
(fn [_ _ _ {:keys [message topic]}]
(when (= topic handler-topic) (s/validate schema message) (handler message component))))
(s/defrecord InMemoryConsumer [consumer-topics :- schemata.consumer/ConsumerTopics]
component/Lifecycle
(start [component]
(let [channel (atom nil)]
(doseq [[topic {:keys [handler schema]}] consumer-topics]
(add-watch channel
(subscription-key topic handler)
(maybe-call handler schema topic component)))
(assoc component :channel channel)))
(stop [{:keys [consumer] :as component}])
Consumer
(consume! [component topic message]
(let [channel (:channel component)]
(reset! channel {:topic topic :message message}))))
(s/defn new-consumer [consumer-topics :- schemata.consumer/ConsumerTopics]
(map->InMemoryConsumer {:consumer-topics consumer-topics}))