This repository has been archived by the owner on Apr 23, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
state_store.clj
64 lines (56 loc) · 1.86 KB
/
state_store.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
(ns rp.jackdaw.state-store
"Adds a protocol-based wrapper for interacting with KV state stores.
Includes some store-related utilities as well.
Nothing really jackdaw-specific here, but including in this library anyway.
Useful when dealing with explicit state stores using the lower-level Processor API
(for example via the Streams `transform` method)."
(:require [jackdaw.serdes.edn :as edn])
(:import [org.apache.kafka.streams.state KeyValueStore StoreBuilder Stores]))
;; Minimal key/value abstraction: one method to read a key and one to write it.
;; In this file it is implemented for Kafka Streams' KeyValueStore (via
;; extend-type) and by the in-memory MockKVStore record used in tests.
(defprotocol KVStore
  ;; Read the value currently stored under k.
  (get-key [this k] "Gets the value for key")
  ;; Write v under k; a nil v is the way to request removal of k.
  (set-key! [this k v] "Sets the value for key; deletes key when value is nil."))
;; KVStore implementation for real Kafka Streams key/value stores.
;; Keys are normalised with `name`: a string key is used as-is, while a keyword
;; or symbol key is stored under its name string (any namespace part is dropped).
(extend-type KeyValueStore
  KVStore
  (get-key [this k]
    (.get this (name k)))
  (set-key! [this k v]
    ;; The protocol promises "deletes key when value is nil". Calling `.put`
    ;; with a nil value only removes the entry when the value serde happens to
    ;; serialise nil to null bytes, so request the delete explicitly instead of
    ;; depending on serde behaviour.
    (if (nil? v)
      (.delete this (name k))
      (.put this (name k) v))
    ;; `.put` is void but `.delete` returns the previous value; always return
    ;; nil so both branches give callers the same result as before.
    nil))
(defn state-store-builder
  "Returns a builder (for use with `.addStateStore`) for a persistent store with the specified name and serdes. Defaults to EDN serdes.

  Arities:
    [store-name]                       - uses `(edn/serde)` for both keys and values.
    [store-name key-serde value-serde] - uses the supplied serdes."
  ;; The return type hint lives on each argument vector: that is where the
  ;; compiler reads it. A hint placed on the enclosing arity list is ignored.
  (^StoreBuilder [store-name key-serde value-serde]
   (Stores/keyValueStoreBuilder
    (Stores/persistentKeyValueStore store-name)
    key-serde
    value-serde))
  (^StoreBuilder [store-name]
   (state-store-builder store-name (edn/serde) (edn/serde))))
;;
;; Misc
;;
(defn get-all-kvs
  "Get _all_ the keys/values in a KeyValueStore. Handy for dev debugging, but be careful not to call it on a huge store.

  Returns a fully realised seq of the entries produced by the store's `.all`
  iterator, or nil when the store is empty. The whole store is read into
  memory before this function returns."
  [^KeyValueStore state-store]
  ;; The iterator returned by `.all` holds store resources until it is closed.
  ;; A lazy iterator-seq would never close it, so realise every entry inside
  ;; with-open and let the iterator be closed before returning.
  (with-open [kv-iter (.all state-store)]
    (doall (iterator-seq kv-iter))))
;;
;; A mock implementation for tests that uses a map atom as a fake store.
;;
;; In-memory KVStore for tests. `store` is an atom holding a map; every key is
;; coerced with `keyword` before it is looked up or written.
(defrecord MockKVStore [store]
  KVStore
  (get-key [_ k]
    (-> store
        deref
        (get (keyword k))))
  (set-key! [_ k v]
    ;; NOTE(review): any falsey v (false as well as nil) removes the key here,
    ;; whereas the protocol docstring only mentions nil - confirm this is intended.
    (let [kw (keyword k)]
      (if-not v
        (swap! store dissoc kw)
        (swap! store assoc kw v)))
    ;; Always return nil rather than the updated map.
    nil))
;; Convenience factory fn
(defn make-mock-store
  "Builds a MockKVStore backed by a new atom.

  With no arguments the atom starts as an empty map; with `init-map` the atom
  starts with that value."
  ([]
   (make-mock-store {}))
  ([init-map]
   (let [backing (atom init-map)]
     (->MockKVStore backing))))
;; Mock helper: read back the current contents of a mock store (handy for test assertions)
(defn get-mock-data
  "Returns the current value of the atom held under `:store` in `mock-store`."
  [mock-store]
  (-> mock-store
      :store
      deref))