forked from FundingCircle/jackdaw
-
Notifications
You must be signed in to change notification settings - Fork 0
/
journal.clj
155 lines (132 loc) · 5.46 KB
/
journal.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
(ns jackdaw.test.journal
""
(:require
[clojure.set :refer [subset?]]
[clojure.tools.logging :as log]
[manifold.stream :as s]
[manifold.deferred :as d]))
(set! *warn-on-reflection* true)
;; Journal
;;
;; The journal represents the test output. It captures the output of all
;; messages arriving on the test-machine' `:consumer` channel.
(defn watch-for
"Returns the first true application of the journal to the specified condition `condition?`"
[machine condition? timeout info]
(let [journal (:journal machine)
p (promise)
id (java.util.UUID/randomUUID)
check-condition (fn [new]
(when-let [result (condition? new)]
(remove-watch journal id)
(deliver p {:result :found
:info result})))]
(add-watch journal id (fn [k r old new]
(check-condition new)))
;; don't rely on watcher to 'check-condition'
;; in case journal is already in a final, good state
(check-condition @journal)
(deref p timeout {:error :timeout :info info})))
(defn journal-read
"Append `record` into the `journal` under `journal-key`"
[journal journal-key v]
(let [result (update-in journal journal-key concat v)]
result))
(defn reverse-lookup
"Given a map of `topic-metadata`, find the name of the entry whose
`:topic-name` matches the supplied topic.
The name supplied in the key positions in the supplied map do not necessarily
match the topic name. This allows users to provide a 'logical' name that may stay
constant if the user wishes over version changes.
This function is used when populating the journal to ensure that the topic
identifiers given by the user are used when populating the journal (instead of
the real topic names)."
[topic-metadata topic]
(let [m (reduce-kv (fn [m k v]
(assoc m (:topic-name v) k))
{}
topic-metadata)]
(get m topic)))
(defn journal-result
[machine record]
"Journals the `record` in the appropriate place in the supplied test
machine's `:journal`"
(let [journal (:journal machine)]
(if-let [err (agent-error journal)]
(throw err)
(send journal journal-read [:topics (:topic record)] [record]))))
(defn journaller
"Returns an asynchronous process that reads all messages produced by
the supplied `machine`'s `:consumer` and records them in the journal"
[machine stop?]
(when-not (:journal machine)
(log/error machine "no journal available")
(throw (ex-info "no journal available: " {})))
(when-not (get-in machine [:consumer :messages])
(log/error machine "no message stream to journal")
(throw (ex-info "no message stream to journal:" machine)))
(let [{:keys [messages]} (:consumer machine)]
(d/loop [record (s/take! messages)]
(d/chain record
(fn [record]
(when-not @stop?
(when record
(journal-result machine record)
(d/recur (s/take! messages)))))))))
(defn with-journal
"Enriches the supplied `machine` with a journaller that will write to
the supplied `journal`."
[machine journal]
(let [machine' (assoc machine :journal journal)
stop? (atom false)
jloop (journaller machine' stop?)]
(assoc machine'
:journal journal
:jloop jloop
:exit-hooks (concat
(:exit-hooks machine)
[#(do
(log/debug "stop accepting journal messages")
(reset! stop? true)
(log/debug "wait for journaller to finish")
@jloop)]))))
(defn raw-messages
[journal topic-name]
(sort-by :offset (get-in journal [:topics topic-name])))
(defn messages
[journal topic-name]
(->> (raw-messages journal topic-name)
(map :value)))
(defn messages-by-kv-fn
[journal topic-name ks pred]
(->> (messages journal topic-name)
(filter (fn [m]
(pred (get-in m ks))))))
(defn messages-by-kv
[journal topic-name ks value]
(messages-by-kv-fn journal topic-name ks #(= value %)))
(defn by-key [topic-name ks value]
"Returns the first message in the topic where attribute 'ks' is equal to 'value'. Can be
combined with the :watch command to assert that a message has been published:
[:watch (j/by-key :result-topic [:object :color] \"red\")]"
(fn [journal]
(first (messages-by-kv journal topic-name ks value))))
(defn by-keys [topic-name ks values]
"Returns all of the messages in the topic where attribute 'ks' is equal to one of the values.
Can be combined with the :watch command to assert that messages have been published:
[:watch (j/by-key :result-topic [:object :color] #{\"red\" \"green\" \"blue\"})]"
(fn [journal]
(messages-by-kv-fn journal topic-name ks (set values))))
(defn by-id [topic-name value]
"Returns all of the messages in the topic with an id of `value`. Can be combined with the
:watch command to assert that a message with the supplied id has been published:
[:watch (j/by-id :result-topic 123)]"
(by-key topic-name [:id] value))
(defn all-keys-present
"Returns true if all the passed ids can be found in the topic by key ks"
[topic-name ks ids]
(fn [journal]
(subset? (set ids)
(->> (messages journal topic-name)
(map #(get-in % ks))
set))))