-
Notifications
You must be signed in to change notification settings - Fork 25
/
storage_layout.cljc
177 lines (166 loc) · 8.48 KB
/
storage_layout.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
(ns konserve.impl.storage-layout
"One of these protocols must be implemented by each store to provide low level
access depending on the low-level storage layout chosen. Stores can support
multiple layouts."
(:require [konserve.serializers :refer [serializer-class->byte byte->key]]
[konserve.compressor :refer [compressor->byte byte->compressor]]
[konserve.encryptor :refer [encryptor->byte byte->encryptor]])
#?(:clj (:import [java.nio ByteBuffer])))
(def ^:const header-size 20)
(defn create-header
"Return Byte Array with following content
1th Byte = Storage layout used
2th Byte = Serializer Type
3th Byte = Compressor Type
4th Byte = Encryptor Type
5-8th Byte = Meta-Size
9th-20th Byte are spare"
[version serializer compressor encryptor meta]
#?(:clj
(let [serializer-id (get serializer-class->byte (type serializer))
compressor-id (get compressor->byte compressor)
encryptor-id (get encryptor->byte encryptor)
env-array (byte-array [version serializer-id compressor-id encryptor-id])
return-buffer (ByteBuffer/allocate header-size)
_ (.put return-buffer env-array)
_ (.putInt return-buffer 4 meta)
return-array (.array return-buffer)]
(.clear return-buffer)
return-array)
:cljs
(let [serializer-id (get serializer-class->byte (type serializer)) ;;TODO
compressor-id (get compressor->byte compressor)
encryptor-id (get encryptor->byte encryptor)
env-array #js [version serializer-id compressor-id encryptor-id]
return-buffer (js/Uint8Array. header-size)] ;;possibly sparse?
(dotimes [i (alength env-array)]
(aset return-buffer i (aget env-array i)))
(aset return-buffer 4 meta)
return-buffer)))
(defn- header-not-zero-padded? [^bytes bs]
;; does not have zero padding from byte 9 to 20
(or (not= 0 (aget bs 8))
(not= 0 (aget bs 9))
(not= 0 (aget bs 10))
(not= 0 (aget bs 11))
(not= 0 (aget bs 12))
(not= 0 (aget bs 13))
(not= 0 (aget bs 14))
(not= 0 (aget bs 15))
(not= 0 (aget bs 16))
(not= 0 (aget bs 17))
(not= 0 (aget bs 18))
(not= 0 (aget bs 19))))
(defn parse-header
"Inverse function to create-header. serializers are a map of serializer-id to
instance that are potentially initialized with custom handlers by the store
user. We assume compressors and encryptors to use system-wide standard configurations."
[header-bytes serializers]
#?(:clj
(let [bb (ByteBuffer/allocate header-size)
_ (.put bb ^bytes header-bytes)
version (.get bb 0)
_ (when-not (= version 1)
(throw (ex-info "Konserve version not supported."
{:type :konserve-version-in-header-no-supported
:header-version version
:supported-versions #{1}})))
serializer-id (.get bb 1)
compressor-id (.get bb 2)
encryptor-id (.get bb 3)
meta-size (.getInt bb 4)
;; was used temporarily at some point during 0.6.0-alpha (JVM only)
small-header-size 8
actual-header-size (if (and (= version 1)
(= 20 (count header-bytes))
;; use rest of header to detect actual 8
;; byte size requires version bump to set
;; these bytes to non-zero now
(header-not-zero-padded? header-bytes))
small-header-size header-size)
serializer (serializers (byte->key serializer-id))
compressor (byte->compressor compressor-id)
encryptor (byte->encryptor encryptor-id)]
(when-not serializer
(throw (ex-info "Serializer not found."
{:type :serializer-not-found
:serializers serializers
:serializer-id serializer-id})))
(when-not compressor
(throw (ex-info "Compressor not found."
{:type :compressor-not-found
:compressor-id compressor-id})))
(when-not encryptor
(throw (ex-info "Encryptor not found."
{:type :encryptor-not-found
:encryptor-id encryptor-id})))
[version
serializer
compressor
encryptor
meta-size
actual-header-size])
:cljs
(let [version (aget header-bytes 0)
_ (when-not (= version 1)
(throw (ex-info "Konserve version not supported."
{:type :konserve-version-in-header-no-supported
:header-version version
:supported-versions #{1}})))
serializer-id (aget header-bytes 1)
compressor-id (aget header-bytes 2)
encryptor-id (aget header-bytes 3)
meta-size (aget header-bytes 4)
serializer (serializers (byte->key serializer-id))
compressor (byte->compressor compressor-id)
encryptor (byte->encryptor encryptor-id)]
(when-not serializer
(throw (ex-info "Serializer not found."
{:type :serializer-not-found
:serializers serializers
:serializer-id serializer-id})))
(when-not compressor
(throw (ex-info "Compressor not found."
{:type :compressor-not-found
:compressor-id compressor-id})))
(when-not encryptor
(throw (ex-info "Encryptor not found."
{:type :encryptor-not-found
:encryptor-id encryptor-id})))
[version
serializer
compressor
encryptor
meta-size
header-size])))
(def ^:const default-version 1)
(defprotocol PBackingStore
"Backing store protocol for default implementation of the high-level konserve protocol."
(-create-blob [this store-key env] "Create a blob object to write a metadata and value into.")
(-delete-blob [this store-key env] "Delete a blob object under path.")
(-blob-exists? [this store-key env] "Check whether blob exists under path")
(-migratable [this key store-key env] "Check if blob exists elsewhere and return a migration key or nil.")
(-migrate [this migration-key key-vec serializer read-handlers write-handlers env] "Use the key returned from -migratable to import blob for key-vec.")
(-copy [this from to env] "Copy a blob from one key to another.")
(-atomic-move [this from to env] "Atomically move (rename) a blob.")
(-create-store [this env] "Create the underlying store.")
(-delete-store [this env] "Delete the underlying store.")
(-store-exists? [this env] "Check if underlying store already exists.")
(-sync-store [this env] "Synchronize the store. This is only needed if your store does not guarantee durability without this synchronisation command, e.g. fsync in the file system.")
(-keys [this env] "List all the keys representing blobs in the store.")
(-handle-foreign-key [this migration-key serializer read-handlers write-handlers env] "Handle keys not recognized by the current konserve version."))
(defprotocol PBackingBlob
"Blob object that is backing a stored value and its metadata."
(-sync [this env] "Synchronize this object and ensure it is stored. This is not necessary in many stores.")
(-close [this env] "Close the blob and ensure that pending data is send to the store.")
(-get-lock [this env] "Acquire a store specific lock on the blob. This is needed for access to the store from concurrent processes.")
(-read-header [this env] "Read header array.")
(-read-meta [this meta-size env] "Read metadata array.")
(-read-value [this meta-size env] "Read serialized edn value array.")
(-read-binary [this meta-size locked-cb env] "Read binary object and pass to locked-cb.")
(-write-header [this header-arr env] "Write header array.")
(-write-meta [this meta-arr env] "Write metadata array.")
(-write-value [this value-arr meta-size env] "Write value array.")
(-write-binary [this meta-size blob env] "Write binary blob."))
(defprotocol PBackingLock
(-release [this env] "Release this lock."))