This repository has been archived by the owner on Apr 29, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
common.clj
641 lines (560 loc) · 23.8 KB
/
common.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
(ns oc.lib.db.common
"CRUD functions on resources stored in RethinkDB."
(:require [clojure.string :as s]
[clojure.core.async :as async]
[defun.core :refer (defun defun-)]
[rethinkdb.query :as r]
[oc.lib.schema :as lib-schema]
[oc.lib.time :as oc-time]))
;; ----- ISO 8601 timestamp -----
;; Aliases for the vars of the same name in oc.lib.time.
;; `current-timestamp` is invoked as a zero-argument function elsewhere in this namespace.
(def timestamp-format oc-time/timestamp-format)
(def current-timestamp oc-time/current-timestamp)
;; ----- Utility functions -----
(defn conn?
  "Predicate for a usable RethinkDB connection. Delegates entirely to `oc.lib.schema/conn?`
  and returns whatever that returns for `conn`."
  [conn]
  (lib-schema/conn? conn))
(defn updated-at-order
  "Sort the items of `coll` by their :updated-at value, newest (greatest) first,
  and return the result as a vector."
  [coll]
  (let [descending (fn [a b] (compare b a))]
    (vec (sort-by :updated-at descending coll))))
(defn unique-id
  "Return a random ID built from the 2nd, 3rd and 4th dash-separated groups of a fresh random
  UUID: 12 hex characters in three groups of four joined by dashes (14 characters in total),
  e.g. 51ab-4c86-a474"
  []
  (let [uuid-groups (s/split (str (java.util.UUID/randomUUID)) #"-")]
    (->> uuid-groups
         (drop 1)
         (take 3)
         (s/join "-"))))
(defn s-or-k?
  "Return true when `value` is a string or a keyword, false otherwise."
  [value]
  (if (string? value)
    true
    (keyword? value)))
(defn- drain-cursor
  "When `result` is exactly a rethinkdb.net.Cursor, realize it with `seq`;
  any other value is returned untouched."
  [result]
  (cond-> result
    (= rethinkdb.net.Cursor (type result)) seq))
(defn- iterate-filters
  "Translate each filter in `filter-map` into a RethinkDB term that tests `row`.

  Each filter is a map with :fn (the operator), :field (the field of `row` to test) and :value
  (what to test it against). Supported operators are :contains, :ne, :eq, :le, :lt, :ge and :gt.
  For :contains, :value is passed as the sequence and the row's field as the element.
  A filter with any other :fn yields nil. The result is a lazy sequence, one entry per filter."
  [filter-map row]
  (for [criterion filter-map
        :let [op (:fn criterion)]]
    (case op
      :contains (r/contains (:value criterion) (r/get-field row (:field criterion)))
      :ne (r/ne (:value criterion) (r/get-field row (:field criterion)))
      :eq (r/eq (:value criterion) (r/get-field row (:field criterion)))
      ;; NB: For the four ordering operators the filter value is the left-hand side and the
      ;; row's field the right-hand side, so the opposite RethinkDB comparison is used to get
      ;; the meaning the caller asked for (e.g. field <= value is expressed as value >= field).
      :le (r/ge (:value criterion) (r/get-field row (:field criterion)))
      :lt (r/gt (:value criterion) (r/get-field row (:field criterion)))
      :ge (r/le (:value criterion) (r/get-field row (:field criterion)))
      :gt (r/lt (:value criterion) (r/get-field row (:field criterion)))
      ;; Unknown operator: same as falling off the end of the original `cond`.
      nil)))
(defn build-filter-fn
  "Build a RethinkDB function of one row from `filter-map`, a collection of filter maps as
  understood by `iterate-filters`. With more than one filter the individual terms are combined
  with `r/and`; otherwise the function body is the first (possibly absent, i.e. nil) term."
  [filter-map]
  (r/fn [row]
    (let [clauses (iterate-filters filter-map row)]
      (if (next clauses)
        (apply r/and clauses)
        (first clauses)))))
;; ----- DB Access Timeouts ----
;; Timeout, in milliseconds, that the DB access functions in this namespace pass to `with-timeout`.
(def default-timeout 50000) ; 50 sec
(defmacro with-timeout
  "A basic macro to wrap things in a timeout.

  Runs `body` on a core.async thread and waits up to `ms` milliseconds for it to finish.
  Returns the value of `body` (including nil). Throws an ex-info with the message
  'Operation timed out' if the body has not finished in time; the body is not interrupted
  and keeps running in the background.

  If `body` itself throws, that same throwable is rethrown to the caller. Without this, the
  exception would die on the worker thread and the caller would just see nil, because
  `async/thread-call` closes its channel without a value when the function throws.

  Note: This is a simplistic approach and piggybacks on core.asyncs executor-pool.
  Read this discussion for more info: https://gist.github.com/martinklepsch/0caf92b5e42eefa3a894"
  [ms & body]
  `(let [c# (async/thread-call
              (fn []
                ;; Always put a non-nil [status value] pair on the channel, so that both nil
                ;; results and thrown exceptions make it back to the calling thread.
                (try
                  [:ok (do ~@body)]
                  (catch Throwable t#
                    [:error t#]))))
         [v# ch#] (async/alts!! [c# (async/timeout ~ms)])]
     (if-not (= ch# c#)
       (throw (ex-info "Operation timed out" {}))
       (let [[status# value#] v#]
         (if (= :error status#)
           (throw value#)
           value#)))))
;; ----- Resource CRUD -----
(defn create-resource
  "Insert `resource` into `table-name`, stamping both :created-at and :updated-at with
  `timestamp`. Returns the stamped property map when exactly one document was inserted,
  otherwise throws a RuntimeException carrying the RethinkDB response."
  [conn table-name resource timestamp]
  {:pre [(conn? conn)
         (s-or-k? table-name)
         (map? resource)
         (string? timestamp)]}
  (let [stamped (merge resource {:created-at timestamp
                                 :updated-at timestamp})
        outcome (with-timeout default-timeout
                  (r/run (r/insert (r/table table-name) stamped) conn))]
    (when-not (= 1 (:inserted outcome))
      (throw (RuntimeException. (str "RethinkDB insert failure: " outcome))))
    stamped))
(defn read-resource
  "Fetch the single resource in `table-name` whose primary key is `primary-key-value`.
  Returns the result of the RethinkDB `get`, i.e. nil if there is no such resource.
  Unlike most functions in this namespace, this call is not wrapped in `with-timeout`."
  [conn table-name primary-key-value]
  {:pre [(conn? conn)
         (s-or-k? table-name)]}
  (let [lookup (r/get (r/table table-name) primary-key-value)]
    (r/run lookup conn)))
(defn read-resources
  "Read multiple resources from `table-name`, draining any cursor into a sequence.

  Arities:
  - [conn table-name]: the whole table.
  - [conn table-name fields]: the whole table, restricted with RethinkDB's with-fields.
  - [conn table-name index-name index-value]: the resources matching `index-value` (a single
    string/keyword, or a sequence of values) on the secondary index `index-name`.
  - [conn table-name index-name index-value fields]: as above, plucking only `fields`.
  - [conn table-name index-name index-value fields limit]: as above, keeping at most `limit`
    resources (the limit is applied before the pluck).

  Every query runs under `default-timeout`."
  ([conn table-name]
    {:pre [(conn? conn)
           (s-or-k? table-name)]}
    (with-timeout default-timeout
      (let [query (r/table table-name)]
        (drain-cursor (r/run query conn)))))

  ([conn table-name fields]
    {:pre [(conn? conn)
           (s-or-k? table-name)
           (sequential? fields)
           (every? s-or-k? fields)]}
    (with-timeout default-timeout
      (let [query (r/with-fields (r/table table-name) fields)]
        (drain-cursor (r/run query conn)))))

  ([conn table-name index-name index-value]
    {:pre [(conn? conn)
           (s-or-k? table-name)
           (s-or-k? index-name)
           (or (s-or-k? index-value) (sequential? index-value))]}
    ;; A lone index value is wrapped in a vector so get-all always receives a sequence.
    (let [index-values (cond-> index-value
                         (not (sequential? index-value)) vector)]
      (with-timeout default-timeout
        (let [query (r/get-all (r/table table-name) index-values {:index index-name})]
          (drain-cursor (r/run query conn))))))

  ([conn table-name index-name index-value fields]
    {:pre [(conn? conn)
           (s-or-k? table-name)
           (s-or-k? index-name)
           (or (s-or-k? index-value) (sequential? index-value))
           (sequential? fields)
           (every? s-or-k? fields)]}
    (let [index-values (cond-> index-value
                         (not (sequential? index-value)) vector)]
      (with-timeout default-timeout
        (let [query (-> (r/table table-name)
                        (r/get-all index-values {:index index-name})
                        (r/pluck fields))]
          (drain-cursor (r/run query conn))))))

  ([conn table-name index-name index-value fields limit]
    {:pre [(conn? conn)
           (s-or-k? table-name)
           (s-or-k? index-name)
           (or (s-or-k? index-value) (sequential? index-value))
           (sequential? fields)
           (every? s-or-k? fields)
           (number? limit)]}
    (let [index-values (cond-> index-value
                         (not (sequential? index-value)) vector)]
      (with-timeout default-timeout
        (let [query (-> (r/table table-name)
                        (r/get-all index-values {:index index-name})
                        (r/limit limit)
                        (r/pluck fields))]
          (drain-cursor (r/run query conn)))))))
(defn filter-resources
  "Return the resources of `table-name` that pass every filter in `filter-map`, optionally
  plucking only `fields`.

  `filter-map` is a sequence of filters, for example:

    [{:fn :ne :value 'blah' :field 'foo'}
     {:fn :eq :value true :field 'bar'}]

  The filters are turned into a RethinkDB function by `build-filter-fn`; when more than one is
  given they are AND'ed together. Operators handled there are :eq, :ne, :gt, :ge, :lt, :le and
  :contains. The query runs under `default-timeout`."
  ([conn table-name filter-map]
    {:pre [(conn? conn)
           (s-or-k? table-name)
           (sequential? filter-map)]}
    (with-timeout default-timeout
      (let [query (r/filter (r/table table-name) (build-filter-fn filter-map))]
        (drain-cursor (r/run query conn)))))

  ([conn table-name filter-map fields]
    {:pre [(conn? conn)
           (s-or-k? table-name)
           (sequential? filter-map)
           (sequential? fields)]}
    (with-timeout default-timeout
      (let [query (-> (r/table table-name)
                      (r/filter (build-filter-fn filter-map))
                      (r/pluck fields))]
        (drain-cursor (r/run query conn))))))
(defn- with-relation
  "Extend `query` so that every resource it yields gains a `relation-name` property: an array
  of the documents in `relation-table-name` whose `relation-index-name` index matches the
  resource's `relation-field-name` value, each plucked down to `relation-fields`."
  [query relation-name relation-table-name relation-field-name relation-index-name relation-fields]
  (r/merge query
           (r/fn [resource]
             {relation-name (-> (r/table relation-table-name)
                                (r/get-all [(r/get-field resource relation-field-name)] {:index relation-index-name})
                                (r/pluck relation-fields)
                                (r/coerce-to :array))})))

(defn read-resources-and-relations
  "
  Every arity takes a trailing options map as its LAST positional argument. The only option read
  is :count (default false). When :count is truthy the relations are not fetched, no ordering or
  limit is applied, and the number of matching resources is returned instead of the resources.
  Pass {} (or nil) for the default behavior.

  In the first arity (10 arguments): Given a table name, an index name and value, and what amounts
  to a document key, foreign table, foreign key, foreign key index and the fields of the foreign
  table that are interesting, return all the resources that match the index, and any related
  resources in the other table in an array in each resource.

  E.g.

  Table A: foo, bar
  Table B: blat, a-bar, blu

  Here `bar` and `a-bar` have the same value, with `a-bar` acting as a foreign key pointing each B at an A.

  (read-resources-and-relations conn 'A' :foo-index '1234' :my-bees 'B' :bar :a-bar-index ['blat', 'blu'] {})

  will return something like:

  [
    {
      :foo '1234'
      :bar 'abcd'
      :my-bees [{:blat 'ferret' :blu 42} {:blat 'monkey' :blu 7}]
    }
    {
      :foo '1234'
      :bar 'efgh'
      :my-bees [{:blat 'mouse' :blu 77} {:blat 'mudskipper' :blu 17}]
    }
  ]

  The second arity (15 arguments) is largely the same functionality as the first, but with more
  control over the selection and order of the returned resources in the form of:

  - an `order-by` field for the order of the returned resources
  - an `order`, one of either `:desc` or `:asc`
  - an initial value of the order-by field to `start` the limited set from
  - a `direction`, one of either `:before` or `:after` the `start` value
  - `limit`, a numeric limit to the number returned

  The third arity (16 arguments) is largely the same functionality as the second, but with an
  additional filter map (placed right after `limit`) in the form of:

  - a `filter-map` A sequence of filters used to filter results.
    example: [{:fn :ne :value 'blah' :field 'foo'}
              {:fn :eq :value true :field 'bar'}]

  Valid filters are: :eq, :ne, :gt, :ge, :lt, :le, :contains

  All filters are AND'ed together if more than one is provided.
  "
  ;; TODO: Switch to a query map for index, relationship, filtering, and ordering.
  ([conn table-name index-name index-value
    relation-name relation-table-name relation-field-name relation-index-name relation-fields {:keys [count] :or {count false}}]
    {:pre [(conn? conn)
           (s-or-k? table-name)
           (s-or-k? index-name)
           (or (string? index-value) (sequential? index-value))
           (s-or-k? relation-name)
           (s-or-k? relation-table-name)
           (s-or-k? relation-field-name)
           (s-or-k? relation-index-name)
           (sequential? relation-fields)
           (every? s-or-k? relation-fields)]}
    (let [index-values (if (sequential? index-value) index-value [index-value])]
      (with-timeout default-timeout
        (as-> (r/table table-name) query
          (r/get-all query index-values {:index index-name})
          ;; NB: `count` here is the :count option, not clojure.core/count.
          (if count
            (r/count query)
            (with-relation query relation-name relation-table-name relation-field-name
                           relation-index-name relation-fields))
          (r/run query conn)
          (drain-cursor query)))))

  ([conn table-name index-name index-value
    order-by order start direction limit
    relation-name relation-table-name relation-field-name relation-index-name relation-fields {:keys [count] :or {count false}}]
    {:pre [(conn? conn)
           (s-or-k? table-name)
           (s-or-k? index-name)
           (or (string? index-value) (sequential? index-value))
           (s-or-k? order-by)
           (#{:desc :asc} order)
           (not (nil? start))
           (#{:before :after} direction)
           (number? limit)
           (s-or-k? relation-name)
           (s-or-k? relation-table-name)
           (s-or-k? relation-field-name)
           (s-or-k? relation-index-name)
           (sequential? relation-fields)
           (every? s-or-k? relation-fields)]}
    (let [index-values (if (sequential? index-value) index-value [index-value])
          order-fn (if (= order :desc) r/desc r/asc)
          ;; `start` is the left-hand operand: :before keeps rows where start > field,
          ;; :after keeps rows where start < field.
          filter-fn (if (= direction :before) r/gt r/lt)]
      (with-timeout default-timeout
        (as-> (r/table table-name) query
          (r/get-all query index-values {:index index-name})
          (r/filter query (r/fn [row]
                            (filter-fn start (r/get-field row order-by))))
          (if count
            (r/count query)
            (-> query
                (r/order-by (order-fn order-by))
                (r/limit limit)
                (with-relation relation-name relation-table-name relation-field-name
                               relation-index-name relation-fields)))
          (r/run query conn)
          (drain-cursor query)))))

  ([conn table-name index-name index-value
    order-by order start direction limit
    filter-map
    relation-name relation-table-name
    relation-field-name relation-index-name relation-fields {:keys [count] :or {count false}}]
    {:pre [(conn? conn)
           (s-or-k? table-name)
           (s-or-k? index-name)
           (or (string? index-value) (sequential? index-value))
           (s-or-k? order-by)
           (#{:desc :asc} order)
           (not (nil? start))
           (#{:before :after} direction)
           (number? limit)
           (sequential? filter-map)
           (s-or-k? relation-name)
           (s-or-k? relation-table-name)
           (s-or-k? relation-field-name)
           (s-or-k? relation-index-name)
           (sequential? relation-fields)
           (every? s-or-k? relation-fields)]}
    (let [index-values (if (sequential? index-value) index-value [index-value])
          order-fn (if (= order :desc) r/desc r/asc)
          ;; Same operand order as in the previous arity: `start` is on the left.
          filter-fn (if (= direction :before) r/gt r/lt)
          filter-by-fn (build-filter-fn filter-map)]
      (with-timeout default-timeout
        (as-> (r/table table-name) query
          (r/get-all query index-values {:index index-name})
          (r/filter query filter-by-fn)
          (r/filter query (r/fn [row]
                            (filter-fn start (r/get-field row order-by))))
          (if count
            (r/count query)
            (-> query
                (r/order-by (order-fn order-by))
                (r/limit limit)
                (with-relation relation-name relation-table-name relation-field-name
                               relation-index-name relation-fields)))
          (r/run query conn)
          (drain-cursor query))))))
(defn read-resources-by-primary-keys
  "Fetch the resources of `table-name` whose primary keys are listed in `primary-keys`
  (a sequence of strings). With the extra `fields` argument only those fields are plucked
  from each resource. The query runs under `default-timeout`."
  ([conn table-name primary-keys]
    {:pre [(conn? conn)
           (s-or-k? table-name)
           (sequential? primary-keys)
           (every? string? primary-keys)]}
    (with-timeout default-timeout
      (let [query (r/get-all (r/table table-name) primary-keys)]
        (drain-cursor (r/run query conn)))))

  ([conn table-name primary-keys fields]
    {:pre [(conn? conn)
           (s-or-k? table-name)
           (sequential? primary-keys)
           (every? string? primary-keys)
           (sequential? fields)
           (every? s-or-k? fields)]}
    (with-timeout default-timeout
      (let [query (-> (r/table table-name)
                      (r/get-all primary-keys)
                      (r/pluck fields))]
        (drain-cursor (r/run query conn))))))
(defn read-resources-in-order
  "Read the resources matching `index-value` on `index-name` (optionally plucking only
  `fields`) via `read-resources`, and return them as a vector sorted by :updated-at,
  newest first."
  ([conn table-name index-name index-value]
    {:pre [(conn? conn)]}
    (-> (read-resources conn table-name index-name index-value)
        updated-at-order))

  ([conn table-name index-name index-value fields]
    {:pre [(conn? conn)]}
    (-> (read-resources conn table-name index-name index-value fields)
        updated-at-order)))
(defn grouped-resources-by-most-common
  "
  Group the resources matching `index-value` on `index-name` by `group-field` and return a
  vector of [group-value count] pairs, ordered from the most common group to the least.
  An optional integer `limit` caps how many pairs are returned.

  Response:

  [['😜' 3] ['👌' 2] ['💥' 1]]
  "
  ([conn table-name index-name index-value group-field]
    (grouped-resources-by-most-common conn table-name index-name index-value group-field nil))

  ([conn table-name index-name index-value group-field limit]
    {:pre [(conn? conn)
           (s-or-k? table-name)
           (s-or-k? index-name)
           (or (string? index-value) (sequential? index-value))
           (or (nil? limit) (integer? limit))]}
    (let [index-values (cond-> index-value
                         (not (sequential? index-value)) vector)
          ;; Each grouped row is mapped to 1 and the ones are summed, giving a count per group.
          counts-by-group (with-timeout default-timeout
                            (-> (r/table table-name)
                                (r/get-all index-values {:index index-name})
                                (r/with-fields [group-field])
                                (r/group group-field)
                                (r/map (r/fn [value] 1))
                                (r/reduce (r/fn [l r] (r/add l r)))
                                (r/run conn)
                                (drain-cursor)))
          ;; Ascending sort by count, then reversed, so the largest count comes first.
          most-common-first (reverse (sort-by counts-by-group (keys counts-by-group)))
          selected (cond->> most-common-first
                     limit (take limit))]
      (mapv (fn [group] [group (counts-by-group group)]) selected))))
(defn months-with-resource
  "
  For the resources matching `index-value` on `index-name`, take the ISO8601 `date-field` of
  each and return the distinct [year month] pairs found, i.e. every month that has at least one
  resource.

  Response:

  [['2017' '06'] ['2017' '01'] ['2016' '05']]

  Sequence is ordered, newest to oldest.
  "
  [conn table-name index-name index-value date-field]
  {:pre [(conn? conn)
         (s-or-k? table-name)
         (s-or-k? index-name)
         (or (string? index-value) (sequential? index-value))]}
  (let [index-values (cond-> index-value
                       (not (sequential? index-value)) vector)
        year-months (with-timeout default-timeout
                      (-> (r/table table-name)
                          (r/get-all index-values {:index index-name})
                          (r/get-field date-field)
                          ;; only the first 2 parts (year and month) of the ISO8601 date
                          (r/map (r/fn [value] (r/limit (r/split value "-" 2) 2)))
                          (r/distinct)
                          (r/run conn)
                          (drain-cursor)))
        sort-key (fn [parts] (str (first parts) "-" (last parts)))]
    (reverse (sort-by sort-key year-months))))
(defun update-resource
  "
  Replace a resource in `table-name` and return the property map that was stored.

  Clauses, tried in order:

  - [conn table-name primary-key-name original-resource new-resource] where `original-resource`
    is a map: replace using the current timestamp.
  - [conn table-name primary-key-name primary-key-value new-resource]: read the original by
    `primary-key-value`, merge `new-resource` over it and replace; returns nil if there is no
    such resource.
  - [conn table-name primary-key-name original-resource new-resource timestamp]: replace the
    stored document with `new-resource`, carrying over the primary key and :created-at of
    `original-resource` and setting :updated-at to `timestamp`.

  Throws a RuntimeException carrying the RethinkDB response unless exactly one document was
  replaced or left unchanged.
  "
  ([conn table-name primary-key-name original-resource :guard map? new-resource]
    (update-resource conn table-name primary-key-name original-resource new-resource (current-timestamp)))

  ([conn table-name primary-key-name primary-key-value new-resource]
    (when-let [stored (read-resource conn table-name primary-key-value)]
      (update-resource conn table-name primary-key-name stored (merge stored new-resource))))

  ([conn :guard conn?
    table-name :guard s-or-k?
    primary-key-name :guard s-or-k?
    original-resource :guard map?
    new-resource :guard map?
    timestamp :guard string?]
    (let [pk-value (original-resource primary-key-name)
          ;; The primary key and creation time always come from the original resource.
          timed-resource (merge new-resource {primary-key-name pk-value
                                              :created-at (:created-at original-resource)
                                              :updated-at timestamp})
          outcome (with-timeout default-timeout
                    (-> (r/table table-name)
                        (r/get pk-value)
                        (r/replace timed-resource)
                        (r/run conn)))]
      (when-not (or (= 1 (:replaced outcome)) (= 1 (:unchanged outcome)))
        (throw (RuntimeException. (str "RethinkDB update failure: " outcome))))
      timed-resource)))
(defn remove-property
  "
  Remove `property-name` from the resource in `table-name` identified by `primary-key-value`,
  setting its :updated-at to `timestamp` (the current timestamp when not provided).

  Returns the resource as re-read from the DB, or throws a RuntimeException carrying the
  RethinkDB response unless exactly one document was replaced or left unchanged.
  "
  ([conn table-name primary-key-value property-name]
    (remove-property conn table-name primary-key-value property-name (current-timestamp)))

  ([conn table-name primary-key-value property-name timestamp]
    {:pre [(conn? conn)]}
    (let [outcome (with-timeout default-timeout
                    (-> (r/table table-name)
                        (r/get primary-key-value)
                        (r/replace (r/fn [resource]
                                     (r/merge
                                       (r/without resource [property-name])
                                       {:updated-at timestamp})))
                        (r/run conn)))]
      (when-not (or (= 1 (:replaced outcome)) (= 1 (:unchanged outcome)))
        (throw (RuntimeException. (str "RethinkDB update failure: " outcome))))
      (read-resource conn table-name primary-key-value))))
(defn delete-resource
  "Delete resources from `table-name` and return `true`.

  - [conn table-name primary-key-value]: delete the one resource with that primary key; throws
    a RuntimeException carrying the RethinkDB response unless exactly one document was deleted.
  - [conn table-name key-name key-value]: delete every resource matching `key-value` on the
    index `key-name`; throws a RuntimeException carrying the RethinkDB response unless the
    response reports zero errors."
  ([conn table-name primary-key-value]
    {:pre [(conn? conn)]}
    (let [outcome (with-timeout default-timeout
                    (-> (r/table table-name)
                        (r/get primary-key-value)
                        (r/delete)
                        (r/run conn)))]
      (or (= 1 (:deleted outcome))
          (throw (RuntimeException. (str "RethinkDB delete failure: " outcome))))))

  ([conn table-name key-name key-value]
    {:pre [(conn? conn)]}
    (let [outcome (with-timeout default-timeout
                    (-> (r/table table-name)
                        (r/get-all [key-value] {:index key-name})
                        (r/delete)
                        (r/run conn)))]
      (or (zero? (:errors outcome))
          (throw (RuntimeException. (str "RethinkDB delete failure: " outcome)))))))
(defn delete-all-resources!
  "Use with caution! Deletes every resource in `table-name`. Failure can result in partial
  deletes of just some resources. Returns `true` if successful, and throws a RuntimeException
  carrying the RethinkDB response when the response reports any errors."
  [conn table-name]
  {:pre [(conn? conn)]}
  (let [outcome (with-timeout default-timeout
                  (r/run (r/delete (r/table table-name)) conn))]
    (when (pos? (:errors outcome))
      (throw (RuntimeException. (str "RethinkDB delete failure: " outcome))))
    true))
;; ----- Set operations -----
(defn- update-set
  "
  For the resource identified by `primary-key-value`, replace the value of `field` with the
  result of calling `set-operation` on the field's current value and `element`, and set
  :updated-at to the current timestamp.

  Returns the resource as re-read from the DB, or throws a RuntimeException carrying the
  RethinkDB response unless exactly one document was replaced or left unchanged.
  "
  [conn table-name primary-key-value field element set-operation]
  {:pre [(conn? conn)
         (s-or-k? table-name)
         (s-or-k? field)]}
  (let [field-key (keyword field)
        ts (current-timestamp)
        outcome (with-timeout default-timeout
                  (-> (r/table table-name)
                      (r/get primary-key-value)
                      (r/update (r/fn [document]
                                  {:updated-at ts
                                   field-key (set-operation (r/get-field document field-key) element)}))
                      (r/run conn)))]
    (when-not (or (= 1 (:replaced outcome)) (= 1 (:unchanged outcome)))
      (throw (RuntimeException. (str "RethinkDB update failure: " outcome))))
    (read-resource conn table-name primary-key-value)))
(defn add-to-set
  "
  Insert `element` into the set stored in `field` of the resource identified by
  `primary-key-value`, using RethinkDB's set-insert. Returns the resource as re-read from the
  DB (whether or not it changed), and throws an exception on DB error.
  "
  [conn table-name primary-key-value field element]
  (update-set conn
              table-name
              primary-key-value
              field
              element
              r/set-insert))
(defn remove-from-set
  "
  Remove `element` from the set stored in `field` of the resource identified by
  `primary-key-value`, using RethinkDB's set-difference with a one-element sequence. Returns
  the resource as re-read from the DB (whether or not it changed), and throws an exception on
  DB error.
  "
  [conn table-name primary-key-value field element]
  (update-set conn
              table-name
              primary-key-value
              field
              [element]
              r/set-difference))
;; ----- REPL usage -----
;; Scratch forms for trying this namespace out at a REPL; nothing in here is evaluated on load.
(comment
;; Load the query library and (re)load this namespace.
(require '[rethinkdb.query :as r])
(require '[oc.lib.db.common :as db-common] :reload)
;; Two connections to a RethinkDB at 127.0.0.1:28015, one per database.
(def conn (apply r/connect [:host "127.0.0.1" :port 28015 :db "open_company_storage_dev"]))
(def conn2 (apply r/connect [:host "127.0.0.1" :port 28015 :db "open_company_auth_dev"]))
;; Read a team, then add an ID to and remove it from the team's "admins" set.
(db-common/read-resource conn2 "teams" "c55c-47f1-898e")
(db-common/add-to-set conn2 "teams" "c55c-47f1-898e" "admins" "1234-1234-1234")
(db-common/remove-from-set conn2 "teams" "c55c-47f1-898e" "admins" "1234-1234-1234")
)