-
Notifications
You must be signed in to change notification settings - Fork 2
/
wrap_stream_properties.clj
41 lines (37 loc) · 1.6 KB
/
wrap_stream_properties.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
(ns rill.wheel.wrap-stream-properties
(:require [rill.event-store :refer [EventStore retrieve-events-since append-events]]
[rill.event-stream :refer [all-events-stream-id]]))
(defn valid-props?
[p]
(and (map? p)
(sorted? p)))
(defrecord StreamPropertiesWrapper [delegated-event-store]
EventStore
(retrieve-events-since [this props cursor wait-for-seconds]
(assert (or (= all-events-stream-id props)
(valid-props? props))
"Can only use sorted maps as props")
(let [stream-id (if (= all-events-stream-id props)
all-events-stream-id
(pr-str props))
events (retrieve-events-since delegated-event-store stream-id cursor wait-for-seconds)]
(if (= props all-events-stream-id)
;; must fetch props for each event separately
(map (fn [e]
(let [id (read-string (:rill.message/stream-id e))]
(-> e
(merge id)
(assoc :rill.message/stream-id id))))
events)
;; set these props on every event
(map (fn [e] (-> e
(merge props)
(assoc :rill.message/stream-id props)))
events))))
(append-events [this props from-version events]
(assert (valid-props? props)
"Can only use sorted maps as props")
(append-events delegated-event-store (pr-str props) from-version (map #(apply dissoc % (keys props)) events))))
(defn wrap-stream-properties
[event-store]
(map->StreamPropertiesWrapper {:delegated-event-store event-store}))