/
client.clj
346 lines (293 loc) · 10.6 KB
/
client.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
(ns jepsen.mongodb.client
"Wraps the MongoDB Java client."
(:require [clojure.walk :as walk]
[clojure.tools.logging :refer [info warn]]
[jepsen [util :as util :refer [timeout]]]
[slingshot.slingshot :refer [try+ throw+]])
(:import (java.util ArrayList
List)
(java.util.concurrent TimeUnit)
(com.mongodb Block
ConnectionString
MongoClientSettings
MongoClientSettings$Builder
ServerAddress
WriteConcern
ReadConcern
ReadPreference)
(com.mongodb.client MongoClient
MongoClients
MongoCollection
MongoDatabase
TransactionBody)
(com.mongodb.client.model Filters
FindOneAndUpdateOptions
ReplaceOptions
ReturnDocument
Sorts
Updates
UpdateOptions)
(com.mongodb.client.result UpdateResult)
(com.mongodb.session ClientSession)
(org.bson Document)))
; TCP port numbers, named after the MongoDB process role each is meant for.
; NOTE(review): which processes actually listen on each port is decided by the
; code that starts them, outside this file -- confirm there.
; Port for mongos processes.
(def mongos-port 27017)
; Port for shard nodes.
(def shard-port 27018)
; Port for config nodes.
(def config-port 27019)
;; Basic node manipulation
(defn addr->node
  "Takes a node address like n1:27017 and returns just the node name, n1.
  Returns nil when addr is nil or does not contain a `host:port` pair, instead
  of throwing a NullPointerException."
  [addr]
  ; re-find yields [whole-match host] on success and nil on failure. `second`
  ; is nil-safe, whereas invoking a nil match as a function is not.
  (some->> addr
           (re-find #"(.+):\d+")
           second))
(defmacro with-block
  "Expands to a reified implementation of the Mongo driver's functional Block
  interface. `x` is the symbol bound to the single argument passed to `apply`;
  `body` is evaluated with that binding in scope, and its last value is what
  `apply` returns."
  [x & body]
  ; The receiver is never used by body, so give it a generated name.
  `(reify Block
     (apply [self# ~x]
       ~@body)))
;; Connection management
(defn open
  "Opens a connection to a node on the given port and returns the client built
  by MongoClients/create. The client is configured with:

  - cluster: the single host node:port, 1 second server selection timeout
  - socket:  5 second connect timeout, 5 second read timeout
  - pool:    min and max size of 1, 1 second max wait time"
  [node port]
  (let [cluster-settings (with-block builder
                           (-> builder
                               (.hosts [(ServerAddress. node port)])
                               (.serverSelectionTimeout 1 TimeUnit/SECONDS)))
        socket-settings  (with-block builder
                           (-> builder
                               (.connectTimeout 5 TimeUnit/SECONDS)
                               (.readTimeout 5 TimeUnit/SECONDS)))
        pool-settings    (with-block builder
                           (-> builder
                               (.minSize 1)
                               (.maxSize 1)
                               (.maxWaitTime 1 TimeUnit/SECONDS)))]
    (-> (MongoClientSettings/builder)
        (.applyToClusterSettings cluster-settings)
        (.applyToSocketSettings socket-settings)
        (.applyToConnectionPoolSettings pool-settings)
        .build
        MongoClients/create)))
(defn await-open
  "Blocks until (open node port) succeeds and the resulting client can list
  database names, then returns that client. Helpful for initial cluster setup.

  Mongo timeouts and socket read timeouts are logged and retried after a one
  second pause; any other exception propagates. The whole loop runs under
  `timeout` with a 30000 ms limit, whose timeout form throws
  {:type ::timed-out-awaiting-connection, :node node, :port port}."
  [node port]
  (timeout 30000
           (throw+ {:type ::timed-out-awaiting-connection
                    :node node
                    :port port})
           (loop []
             (if-let [client (try
                               (let [conn (open node port)]
                                 (try
                                   ; Issue a request to check the connection
                                   ; is usable; the result is discarded.
                                   (.first (.listDatabaseNames conn))
                                   conn
                                   ; Don't leak clients when they fail
                                   (catch Throwable err
                                     (.close conn)
                                     (throw err))))
                               (catch com.mongodb.MongoTimeoutException ex
                                 (info "Mongo timeout while waiting for conn; retrying. "
                                       (.getMessage ex))
                                 nil)
                               (catch com.mongodb.MongoSocketReadTimeoutException ex
                                 (info "Mongo socket read timeout waiting for conn; retrying")
                                 nil))]
               client
               ; If we aren't ready, sleep and retry
               (do (Thread/sleep 1000)
                   (recur))))))
; Basic plumbing
(defprotocol ToDoc
  "Coerces Clojure values into values suitable for the MongoDB driver, chiefly
  BSON Documents."
  (->doc [x]))

(extend-protocol ToDoc
  ; nil becomes an empty Document.
  nil
  (->doc [_] (Document.))

  ; Keywords become their name, as a string.
  clojure.lang.Keyword
  (->doc [kw] (name kw))

  ; Maps become Documents: each key is passed through `name`, each value is
  ; coerced recursively.
  clojure.lang.IPersistentMap
  (->doc [m]
    (Document. (into {}
                     (map (fn [[k v]] [(name k) (->doc v)]))
                     m)))

  ; Sequential collections become ArrayLists of coerced elements.
  clojure.lang.Sequential
  (->doc [xs]
    (ArrayList. (mapv ->doc xs)))

  ; Everything else is passed through unchanged.
  Object
  (->doc [x] x))
(defprotocol FromDoc
  "Supports coercion from MongoDB BSON Documents (and related driver results)
  into Clojure data."
  (parse [x]))

(extend-protocol FromDoc
  nil
  (parse [x] nil)

  ; Documents become maps with keyword keys and recursively parsed values.
  Document
  (parse [x]
    (persistent!
      (reduce (fn [m [k v]]
                (assoc! m (keyword k) (parse v)))
              (transient {})
              (.entrySet x))))

  ; Update results become a map of their counters. For an unacknowledged
  ; write the driver's getters throw UnsupportedOperationException, so only
  ; read them when the write was acknowledged. The unacknowledged map keeps
  ; the same keys, with nil for the values the server never reported.
  UpdateResult
  (parse [r]
    (if (.wasAcknowledged r)
      {:matched-count  (.getMatchedCount r)
       :modified-count (.getModifiedCount r)
       :upserted-id    (.getUpsertedId r)
       :acknowledged?  true}
      {:matched-count  nil
       :modified-count nil
       :upserted-id    nil
       :acknowledged?  false}))

  ; Lists become lazy seqs of parsed elements.
  List
  (parse [x]
    (map parse x))

  ; Everything else is passed through unchanged.
  Object
  (parse [x]
    x))
;; Write and read concerns
(defn write-concern
  "Turns a write concern name (e.g. :majority, \"majority\") into a
  WriteConcern. Returns nil when wc is nil or false. A name that is not one of
  acknowledged, journaled, majority, or unacknowledged throws
  IllegalArgumentException."
  [wc]
  (if-not wc
    nil
    (condp = (name wc)
      "acknowledged"   WriteConcern/ACKNOWLEDGED
      "journaled"      WriteConcern/JOURNALED
      "majority"       WriteConcern/MAJORITY
      "unacknowledged" WriteConcern/UNACKNOWLEDGED)))
(defn read-concern
  "Turns a read concern name (e.g. :majority, \"majority\") into a ReadConcern.
  Returns nil when rc is nil or false. A name that is not one of available,
  default, linearizable, local, majority, or snapshot throws
  IllegalArgumentException."
  [rc]
  (if-not rc
    nil
    (condp = (name rc)
      "available"    ReadConcern/AVAILABLE
      "default"      ReadConcern/DEFAULT
      "linearizable" ReadConcern/LINEARIZABLE
      "local"        ReadConcern/LOCAL
      "majority"     ReadConcern/MAJORITY
      "snapshot"     ReadConcern/SNAPSHOT)))
(defn transactionless-read-concern
  "Read concern SNAPSHOT isn't supported outside transactions; we weaken it to
  MAJORITY. Accepts the same named forms as read-concern: the string
  \"snapshot\" becomes \"majority\" and the keyword :snapshot becomes
  :majority. Any other value, including nil, is returned unchanged."
  [rc]
  (case rc
    "snapshot" "majority"
    ; read-concern takes keywords as well as strings, so weaken those too.
    :snapshot  :majority
    rc))
;; Error handling
(defmacro with-errors
  "Remaps common errors; takes an operation and returns a :fail or :info op
  when a throw occurs in body. When body completes normally, its value is
  returned.

  :fail is used for errors treated here as definite failures, and :info for
  errors whose outcome is indeterminate. Exceptions of the handled classes
  whose message matches none of the known patterns are rethrown, except for
  MongoTimeoutException, which defaults to an :info op with error
  :mongo-timeout."
  [op & body]
  `(try ~@body
        (catch com.mongodb.MongoNotPrimaryException e#
          (assoc ~op :type :fail, :error :not-primary))

        (catch com.mongodb.MongoNodeIsRecoveringException e#
          (assoc ~op :type :fail, :error :node-recovering))

        (catch com.mongodb.MongoSocketReadTimeoutException e#
          (assoc ~op :type :info, :error :socket-read-timeout))

        (catch com.mongodb.MongoTimeoutException e#
          (condp re-find (.getMessage e#)
            #"Timed out after \d+ ms while waiting to connect"
            (assoc ~op :type :fail, :error :connect-timeout)

            ; condp allows exactly one trailing default form. A second one
            ; would turn this form into a test expression handed to re-find
            ; as if it were a pattern.
            (assoc ~op :type :info, :error :mongo-timeout)))

        (catch com.mongodb.MongoExecutionTimeoutException e#
          (assoc ~op :type :info, :error :mongo-execution-timeout))

        (catch com.mongodb.MongoWriteException e#
          (condp re-find (.getMessage e#)
            #"Not primary so we cannot begin or continue a transaction"
            (assoc ~op :type :fail, :error :not-primary-cannot-txn)

            #"Could not find host matching read preference"
            (assoc ~op :type :fail, :error :no-host-matching-read-preference)

            (throw e#)))

        (catch com.mongodb.MongoCommandException e#
          (condp re-find (.getMessage e#)
            ; Huh, this is NOT, as it turns out, a determinate failure.
            #"TransactionCoordinatorSteppingDown"
            (assoc ~op :type :info, :error :transaction-coordinator-stepping-down)

            ; This can be the underlying cause of issues like "unable to
            ; initialize targeter for write op for collection..."
            ; These are ALSO apparently not... determinate failures?
            #"Connection refused"
            (assoc ~op :type :info, :error :connection-refused)

            ; Likewise
            #"Connection reset by peer"
            (assoc ~op :type :info, :error :connection-reset-by-peer)

            (throw e#)))

        (catch com.mongodb.MongoClientException e#
          (condp re-find (.getMessage e#)
            ; This... seems like a bug too
            #"Sessions are not supported by the MongoDB cluster to which this client is connected"
            (assoc ~op :type :fail, :error :sessions-not-supported-by-cluster)

            (throw e#)))

        ; NOTE(review): if the driver version in use makes MongoQueryException
        ; a subclass of MongoCommandException, the clause above catches it
        ; first and this one never runs -- confirm against the driver.
        (catch com.mongodb.MongoQueryException e#
          (condp re-find (.getMessage e#)
            #"Could not find host matching read preference"
            (assoc ~op :type :fail, :error :no-host-matching-read-preference)

            #"code 251" (assoc ~op :type :fail, :error :transaction-aborted)

            ; Why are there two ways to report this?
            #"code 10107 " (assoc ~op :type :fail, :error :not-primary-2)

            #"code 13436 " (assoc ~op :type :fail, :error :not-primary-or-recovering)

            (throw e#)))))
(defn ^MongoDatabase db
  "Looks up the database called db-name on a connection. The three-argument
  form takes an options map which may include

    :write-concern    e.g. :majority
    :read-concern     e.g. :local

  Each concern that is given is resolved via write-concern / read-concern and
  applied to the returned database; concerns that are absent are left alone."
  ([conn db-name]
   (.getDatabase conn db-name))
  ([conn db-name opts]
   ; Resolve both concerns first, so an unknown name fails before the database
   ; is looked up.
   (let [rc      (read-concern (get opts :read-concern))
         wc      (write-concern (get opts :write-concern))
         base    (db conn db-name)
         with-rc (if rc
                   (.withReadConcern base rc)
                   base)]
     (if wc
       (.withWriteConcern with-rc wc)
       with-rc))))
(defn ^MongoCollection collection
"Gets a Mongo collection from a DB. Takes a MongoDatabase and a collection
name, and returns the result of calling getCollection on the database."
[^MongoDatabase db collection-name]
(.getCollection db collection-name))
(defn create-collection!
"Calls createCollection on the given MongoDatabase with the given collection
name, and returns whatever that call returns."
[^MongoDatabase db collection-name]
(.createCollection db collection-name))
;; Sessions
(defn start-session
"Starts a new session by calling startSession on the given connection, and
returns the result."
[conn]
(.startSession conn))
;; Transactions
(defmacro txn
  "Wraps body in a reified TransactionBody. The forms in body are evaluated
  each time `execute` is called, and the value of the last form is what
  `execute` returns."
  [& body]
  ; The receiver is never referenced by body, so give it a generated name.
  `(reify TransactionBody
     (execute [self#]
       ~@body)))
;; Actual commands
(defn command!
  "Runs a command on the given db. cmd is coerced with ->doc before being
  handed to runCommand, and the reply is converted back with parse."
  [^MongoDatabase db cmd]
  (->> cmd
       ->doc
       (.runCommand db)
       parse))
(defn admin-command!
  "Runs a command on the admin database of the given connection, returning the
  parsed reply from command!."
  [conn cmd]
  (-> conn
      (db "admin")
      (command! cmd)))
(defn find-one
  "Finds a document by ID, matching on the _id field, and returns the first
  result converted with parse; nil when nothing matches. If a session is
  provided, the query is issued through that session (for a causally
  consistent read); with a nil session, or the two-argument form, it is issued
  without one."
  ([coll id]
   (find-one coll nil id))
  ([^MongoCollection coll ^ClientSession session id]
   (let [by-id   (Filters/eq "_id" id)
         results (if session
                   (.find coll session by-id)
                   (.find coll by-id))]
     (parse (.first results)))))
(defn upsert!
  "Ensures the existence of the given document, a map with at minimum an :_id
  key. Replaces the document whose _id equals (:_id doc) with doc, with the
  upsert option enabled, and returns the parsed update result. When a session
  is given, the replace is issued through it. Asserts that doc has a truthy
  :_id."
  ([^MongoCollection coll doc]
   (upsert! nil coll doc))
  ([^ClientSession session ^MongoCollection coll doc]
   (assert (:_id doc))
   ; Both call shapes share the same filter, replacement, and options.
   (let [by-id       (Filters/eq "_id" (:_id doc))
         replacement (->doc doc)
         opts        (.upsert (ReplaceOptions.) true)
         result      (if session
                       (.replaceOne coll session by-id replacement opts)
                       (.replaceOne coll by-id replacement opts))]
     (parse result))))