This repository has been archived by the owner on Jan 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 206
/
checkpoint.clj
77 lines (68 loc) · 2.53 KB
/
checkpoint.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
(ns onyx.state.serializers.checkpoint
"Streaming serializer / deserializer for state checkpoints."
(:import [org.agrona.concurrent UnsafeBuffer]
[org.agrona MutableDirectBuffer]
[org.agrona ExpandableArrayBuffer]))
(def current-schema-version 0)
(defprotocol SEncoder
(set-schema-version [this version])
(set-metadata [this bs])
(set-next-bytes [this bs])
(encoded-bytes [this])
(length [this]))
(defprotocol SDecoder
(get-schema-version [this])
(get-metadata [this])
(get-next-bytes [this]))
(deftype StateCheckpointEncoder [^MutableDirectBuffer buffer
initial-offset
^:unsynchronized-mutable offset]
SEncoder
(set-schema-version [this version]
(.putInt buffer offset (int version))
(set! offset (unchecked-add-int offset 4)))
(set-metadata [this bs]
(let [len (alength ^bytes bs)
bytes-offset (unchecked-add-int offset 8)]
(.putLong buffer offset len)
(.putBytes buffer bytes-offset ^bytes bs)
(set! offset (unchecked-add-int bytes-offset len))))
(set-next-bytes [this bs]
(let [len (int (alength ^bytes bs))
bytes-offset (unchecked-add-int offset 4)]
(.putInt buffer offset len)
(.putBytes buffer bytes-offset ^bytes bs)
(set! offset (unchecked-add-int bytes-offset len))))
(encoded-bytes [this]
(let [bs (byte-array (- offset initial-offset))]
(.getBytes buffer initial-offset bs)
bs))
(length [this]
offset))
(defn empty-checkpoint-encoder []
(->StateCheckpointEncoder (ExpandableArrayBuffer.) 0 0))
(deftype StateCheckpointDecoder [^MutableDirectBuffer buffer
^long length
^:unsynchronized-mutable offset]
SDecoder
(get-schema-version [this]
(let [version (.getInt buffer offset)]
(set! offset (unchecked-add-int offset 4))
version))
(get-metadata [this]
(let [len (.getLong buffer offset)
ba (byte-array len)
bytes-offset (unchecked-add-int offset 8)]
(.getBytes buffer bytes-offset ba)
(set! offset (unchecked-add-int bytes-offset len))
ba))
(get-next-bytes [this]
(when (< offset length)
(let [len (.getInt buffer offset)
ba (byte-array len)
bytes-offset (unchecked-add-int offset 4)]
(.getBytes buffer bytes-offset ba)
(set! offset (unchecked-add-int bytes-offset len))
ba))))
(defn new-decoder [^bytes bs]
(->StateCheckpointDecoder (UnsafeBuffer. bs) (alength bs) 0))