(ns jepsen.store.format
"Jepsen tests are logically a map. To save this map to disk, we originally
wrote it as a single Fressian file. This approach works reasonably well, but
has a few problems:
- We write test files multiple times: once at the end of a test, and again
once the analysis is complete--in case the analysis fails. Rewriting the
entire file is inefficient. It would be nice to incrementally append new
state.
- Histories are *enormous* relative to tests, but we force readers to
deserialize them before being able to get to any other high-level keys in the
test--for instance, the result map.
- It might be nice, someday, to have histories bigger than can fit into memory.
- We have no way to incrementally write the history, which means if a test
crashes during the run we lose everything.
- Deserializing histories is a linear process, but it would be nice for
analyses to be able to parallelize.
- The web view needs a *little* metadata quickly: the name, the date, the
valid field of the result map. Forcing it to deserialize the entire world to
get this information is bad.
- Likewise, loading tests at the REPL is cumbersome--if all one wants is the
results, one should be able to skip the history. Working with the history
should ideally be lazy.
I held off on designing a custom serialization format for Jepsen for many
years, but at this point the design constraints feel pretty well set, and I
think the time is right to design a custom format.
## File Format Structure
Jepsen files begin with the magic UTF8 string JEPSEN, followed by a 32-bit
big-endian unsigned integer version field, which we use to read old formats
when necessary. Then there is a 64-bit offset into the file where the *block
index*--the metadata structure--lives. There follows a series of *blocks*:
      6          32              64
| \"JEPSEN\" | version | block-index-offset | block 1 | block 2 | ...
In general, files are written by appending blocks sequentially to the end of
the file---this allows Jepsen to write files in (mostly) a single pass,
without moving large chunks of bytes around. When one is ready to save the
file, one writes a new index block to the end of the file which provides the
offsets of all the (active) blocks in the file, and finally updates the
block-index-offset at the start of the file to point to that most-recent
index block.
All integers are signed and big-endian, unless otherwise noted. This is the
JVM, after all.
Blocks may be sparse--their lengths may be shorter than the distance to the
start of the next block. This is helpful if one needs to rewrite blocks
later: you can leave padding for their sizes to change.
The top-level value of the file (e.g. the test map) is given in the block
index.
## Block Structure
All blocks begin with an 8-byte length prefix which indicates the length of
the block in bytes, including the length prefix itself. Then follows a CRC32
checksum. Third, we have a 16-bit block type field, which
identifies how to interpret the block. Finally, we have the block's data,
which is type-dependent.
    64        32       16
| length | checksum | type | ... data ...
Checksums are computed by taking the CRC32 of the data region, THEN the block
header: the length, the checksum (all zeroes, for purposes of computing the
checksum itself), and the type. We compute checksums this way so that writers
can write large blocks of data with an unknown size in a single pass.
## Index Blocks (Type 1)
An index block lays out the overall arrangement of the file: it stores a map
of logical block numbers to file offsets, and also stores a root id, which
identifies the block containing the top-level test map. The root id comes
first, and is followed by the block map: a series of pairs, each a 32-bit
logical block ID and an offset into the file.
  32       32       64       32       64
root id | id 1 | offset 1 | id 2 | offset 2 | ...
There is no block with ID 0: 0 is used as a nil sentinel when one wishes to
indicate the absence of a block.
## Fressian Blocks (Type 2)
A *Fressian block* encodes data (often a key-value map) using the Fressian
serialization format. This is already the workhorse for Jepsen serialization,
but we introduce a twist: large values, like the history and results, can be
stored in other blocks. That way you don't have to deserialize the entire
thing in order to read the top-level structure.
We create a special datatype, BlockRef, which we encode as a 'block-ref' tag
in Fressian. This ref simply contains the ID of the block which encodes that
tag's value.
| fressian data ... |
## PartialMap (Type 3)
Results are a bit weird. We want to efficiently fetch the :valid? field from
them, but the rest of the result map could be *enormous*. To speed this up,
we want to be able to write *part* of a map (for instance, just the results
:valid? field), and store the rest in a different block.
A PartialMap is essentially a cons cell: it comprises a Fressian-encoded map
and a pointer to the ID of a *rest* block (also a PartialMap) which encodes
the remainder of the map. This makes access to those parts of the map encoded
in the head cell fast.
     32
| rest-ptr | fressian data ...
When rest-ptr is 0, that indicates there is no more data remaining.
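For example (illustrative only), a result map such as {:valid? true, :count
10000, ...} might be stored as a head PartialMap whose Fressian data is just
{:valid? true} and whose rest-ptr names a block holding the remaining keys;
reading :valid? then never touches the large block.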
## FressianStream (Type 4)
A FressianStream block allows us to write multiple Fressian-encoded values
into a single block. We represent it as:
| fressian data 1 ... | fressian data 2 ... | ...
Writers can write any number of Fressian-encoded values to the stream one
after the next. Readers start at the beginning and read values until the
block is exhausted. There is no count associated with this block type; it
must be inferred by reading all elements. We generally deserialize streams as
vectors to enable O(1) access and faster reductions over elements.
## BigVector (Type 5)
Histories are chonky boys. 100K operations (each a map) are common, and it's
conceivable we might want to work with histories of tens of millions of
operations. We also want to write them incrementally, so that we can recover
from crashes. It's also nice to be able to deserialize small bits of the
history, or to reduce over it in parallel. To do this, we need a streaming
format for large vectors.
We write each chunk of the vector as a separate block. Then we refer to those
chunks with a BigVector, which stores some basic metadata about the vector as
a whole, and then pointers to each block. Its format is:
   64       64         32         64         32
| count | index 1 | pointer 1 | index 2 | pointer 2 | ...
Count is the number of elements in the vector overall. Index 1 is always
0--the offset of the first element in the first chunk. Pointer 1 is the block
ID of the Fressian block which contains the first chunk's data. Index 2 is
the index of the first element in the second chunk, and pointer 2 is the
block ID of the second chunk's data, and so on.
Chunk data can be stored in a Fressian block, a FressianStream block, or
another BigVector.
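As a concrete (illustrative) example, a 250,000-element history chunked every
100,000 operations could be laid out as
| 250000 | 0 | 7 | 100000 | 8 | 200000 | 9 |, where 7, 8, and 9 are the block
IDs holding the three chunks.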
Access to BigVectors looks very much like a regular Clojure vector. We
deserialize chunks on-demand, caching results as they're accessed. We can
offer O(1) `count` through the count field. We implement `nth` by finding the
chunk a given index belongs to and then looking up the index in that chunk.
Assoc works by assoc'ing into that particular chunk, leaving other chunks
unchanged.
## That's It
There's a lot of obvious stuff I've left out here--metadata, top-level
integrity checks, garbage collection, etc etc... but I think we can
actually skip almost all of it and get a ton of benefit for the limited
use case Jepsen needs.
To write a file, we:
1. Write the header.
2. Write an empty vector as block 1, for the history.
3. Write the initial test map as a PartialMap block to block 2, pointing to
block 1 as the history. Write an index block pointing to 2 as the root.
4. Write the history incrementally as the test proceeds. Write operations as
they occur to a new FressianStream block. Periodically, and at the end of the
history:
a. Seal that FressianStream block, writing the headers. Call that block id
B.
b. Write a new version of the history block with a new chunk appended: B.
c. Write a new index block with the new history block version.
This ensures that if we crash during the run, we can recover at least some
of the history up to the most recent checkpoint.
5. Write the results as a PartialMap to blocks 4 and 5: 4 containing the
:valid? field, and 5 containing the rest of the results.
6. The test may contain state which changed by the end of the test, and we
might want to save that state. Write the entire test map again as block 6,
again using block 1 as the history, and now block 5 as the results map. Write
a new index block with block 6 as the root.
To read this file, we:
1. Check the magic and version.
2. Read the index block offset.
3. Read the index block into memory.
4. Look up the root block ID, use the index to work out its offset, read that
block, and decode it into a lazy map structure.
When it comes time to reference the results or history in that lazy map, we
look up the right block in the block index, seek to that offset, and decode
whatever's there.
Decoding a block is straightforward. We grab the length header, run a CRC
over that region of the file, check the block type, then decode the remaining
data based on the block structure."
(:require [byte-streams :as bs]
[clojure [set :as set]
[walk :as walk]]
[clojure.data.fressian :as fress]
[clojure.tools.logging :refer [info warn]]
[clojure.core.reducers :as r]
[clojure.java.io :as io]
[dom-top.core :refer [assert+]]
[jepsen [history :as history]
[util :as util :refer [map-vals
with-thread-name]]
[fs-cache :refer [write-atomic!]]]
[jepsen.history.core :refer [soft-chunked-vector]]
[jepsen.store.fressian :as jsf]
[potemkin :refer [def-map-type
definterface+]]
[slingshot.slingshot :refer [try+ throw+]])
(:import (java.io BufferedOutputStream
Closeable
EOFException
File
InputStream
OutputStream
PipedInputStream
PipedOutputStream)
(java.nio ByteBuffer)
(java.nio.channels FileChannel
FileChannel$MapMode)
(java.nio.file StandardOpenOption)
(java.util Arrays)
(java.util.concurrent ArrayBlockingQueue
BlockingQueue
ForkJoinTask)
(java.util.function Consumer)
(java.util.zip CRC32)
(jepsen.history.core SoftChunkedVector)
(jepsen.store.format FileOffsetOutputStream)
(org.fressian.handlers WriteHandler
ReadHandler)))
(def magic
"The magic string at the start of Jepsen files."
"JEPSEN")
(def magic-size
"Bytes it takes to store the magic string."
(count magic))
(def magic-offset
"Where the magic is written"
0)
(def current-version
"The current file version.
Version 0 was the first version of the file format.
Version 1 added support for FressianStream and BigVector blocks."
1)
(def version-size
"Bytes it takes to store a version."
4)
(def version-offset
"Where in the file the version begins"
(+ magic-offset magic-size))
(def block-id-size
"How many bytes per block ID?"
4)
(def block-offset-size
"How many bytes per block offset address?"
8)
(def block-index-offset-offset
"Where in the file do we write the offset of the index block?"
(+ version-offset version-size))
(def first-block-offset
"Where in the file the first block begins."
(+ block-index-offset-offset block-offset-size))
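(comment
  ;; Worked preamble layout, for reference (illustrative, not part of the
  ;; original source): the magic occupies bytes 0-5, the version bytes 6-9,
  ;; the block index offset bytes 10-17, and the first block begins at
  ;; byte 18.
  [magic-offset version-offset block-index-offset-offset first-block-offset]
  ; => [0 6 10 18]
  )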
;; Block Headers
(def block-len-size
"How long is the length prefix for a block?"
8)
(def block-len-offset
"Where do we write a block length in a block header?"
0)
(def block-checksum-size
"How long is the checksum for a block?"
4)
(def block-checksum-offset
"Where do we write a checksum in the block header?"
(+ block-len-offset block-len-size))
(def block-type-size
"How long is the type for a block?"
2)
(def block-type-offset
"Where do we store the block type in a block header?"
(+ block-checksum-offset block-checksum-size))
(def short->block-type
"A map of integers to block types."
{(short 1) :block-index
(short 2) :fressian
(short 3) :partial-map
(short 4) :fressian-stream
(short 5) :big-vector})
(def block-type->short
"A map of block types to integer codes."
(->> short->block-type (map (juxt val key)) (into {})))
(def block-header-size
"How long is a block header?"
(+ block-len-size block-checksum-size block-type-size))
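(comment
  ;; Worked header layout, for reference (illustrative, not part of the
  ;; original source): the length lives at offset 0 (8 bytes), the checksum
  ;; at offset 8 (4 bytes), and the type at offset 12 (2 bytes), so block
  ;; data begins block-header-size = 14 bytes past the block's offset.
  [block-len-offset block-checksum-offset block-type-offset block-header-size]
  ; => [0 8 12 14]
  )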
(def big-vector-count-size
"How many bytes do we use to store a bigvector's count?"
8)
(def big-vector-index-size
"How many bytes do we use to store a bigvector element's index?"
8)
;; Perf tuning
(def large-region-size
"How big does a file region have to be before we just mmap it instead of
doing file reads?"
(* 1024 1024)) ; 1M
(def fressian-buffer-size
"How many bytes should we buffer before writing Fressian data to disk?"
16384) ; 16K
(def big-vector-chunk-size
"How many elements should we write to a chunk of a BigVector before starting
a new one?"
16384)
(defrecord BlockRef [^int id])
(defn block-ref
"Constructs a new BlockRef object pointing to the given block ID."
[id]
(BlockRef. id))
(defrecord Handle
[^FileChannel file ; The filechannel we use for reads and writes
version ; An atom: what version is this file? Initially nil.
block-index ; An atom to a block index: a map of block IDs to offsets
written? ; An atom: have we written to this file yet?
read? ; An atom: have we read this file yet?
]
Closeable
(close [this]
(reset! block-index :closed)
(.close file)))
(defn version
"Returns the version of a Handle."
[^Handle handle]
@(.version handle))
(defn ^Handle open
"Constructs a new handle for a Jepsen file of the given path (anything which
works with io/file)."
[path]
(let [path (-> path io/file .toPath)
f (FileChannel/open path
(into-array StandardOpenOption
[StandardOpenOption/CREATE
StandardOpenOption/READ
StandardOpenOption/WRITE]))
block-index (atom {:root nil
:blocks {}})]
(Handle. f (atom nil) block-index (atom false) (atom false))))
(defn close!
"Closes a Handle"
[^Closeable handle]
(.close handle)
nil)
(defn flush!
"Flushes writes to a Handle to disk."
[handle]
(.force ^FileChannel (:file handle) false)
handle)
; General IO routines
(defn write-file!
"Takes a FileChannel, an offset, and a ByteBuffer. Writes the ByteBuffer to
the FileChannel at the given offset completely. Returns number of bytes
written."
[^FileChannel file offset ^ByteBuffer buffer]
(let [size (.remaining ^ByteBuffer buffer)
written (.write file buffer offset)]
; Gonna punt on this for now because the position semantics are
; tricky and I'm kinda hoping we never hit it
(assert+ (= size written)
{:type ::incomplete-write
:offset offset
:expected size
:actual written})
written))
(defn ^ByteBuffer read-file
"Returns a ByteBuffer corresponding to a given file region. Uses mmap for
large regions, or regular read calls for small ones."
[^FileChannel file, ^long offset, ^long size]
(if (<= size large-region-size)
; Small region: read directly
(let [buf (ByteBuffer/allocate size)
bytes-read (.read file buf offset)]
(assert+ (= size bytes-read)
{:type ::incomplete-read
:offset offset
:expected size
:actual bytes-read})
(.rewind buf))
; Big region: mmap
(.map file FileChannel$MapMode/READ_ONLY offset size)))
; General file headers
(defn write-header!
"Takes a Handle and writes the initial magic bytes and version number.
Initializes the handle's version to current-version if it hasn't already been
set. Returns handle."
[^Handle handle]
(swap! (.version handle) (fn [v]
(if (or (nil? v)
(= v current-version))
current-version
(throw+
{:type ::can't-write-old-version
:current-version current-version
:handle-version v}))))
(let [buf (ByteBuffer/allocate (+ magic-size version-size))
file ^FileChannel (:file handle)]
(.position file 0)
(bs/transfer magic file {:close? false})
(.putInt buf (version handle))
(.flip buf)
(write-file! file version-offset buf))
handle)
(defn check-magic
"Takes a Handle and reads the magic bytes, ensuring they match."
[handle]
(let [file ^FileChannel (:file handle)
buf (ByteBuffer/allocate magic-size)]
(let [read-bytes (.read file buf magic-offset)
_ (.flip buf)
fmagic (bs/convert buf String)]
(when (or (not= magic-size read-bytes)
(not= magic fmagic))
(throw+ {:type ::magic-mismatch
:expected magic
:actual (if (= -1 read-bytes)
:eof
fmagic)}))))
handle)
(defn check-version!
"Takes a Handle and reads the version. Ensures it's a version we can decode,
and updates the Handle's version if it hasn't already been set."
[^Handle handle]
(let [file ^FileChannel (:file handle)
buf (ByteBuffer/allocate version-size)
read-bytes (.read file buf version-offset)
fversion (.getInt buf 0)]
(when-not (= version-size read-bytes)
(throw+ {:type ::version-incomplete
:expected version-size
:actual read-bytes}))
(when-not (contains? #{0 1} fversion)
(throw+ {:type ::version-mismatch
:expected version
:actual (if (= -1 read-bytes)
:eof
fversion)}))
(swap! (.version handle) (fn [v]
(if (or (nil? v)
(= fversion v))
fversion
(throw+ {:type ::can't-load-mixed-version
:handle-version v
:file-version fversion}))))
handle))
(defn prep-write!
"Called when we write anything to a handle. Ensures that we've written the
header before doing anything else. Returns handle."
[handle]
(when (compare-and-set! (:written? handle) false true)
(write-header! handle))
handle)
(declare load-block-index!)
(defn prep-read!
"Called when we read anything from a handle. Ensures that we've checked the
magic, version, and loaded the block index."
[handle]
(when (compare-and-set! (:read? handle) false true)
(-> handle check-magic check-version! load-block-index!))
handle)
; Fetching and updating the block index offset root pointer
(defn write-block-index-offset!
"Takes a handle and the offset of a block index block to use as the new root.
Updates the file's block pointer. Returns handle."
[handle root]
(let [buf (ByteBuffer/allocate block-offset-size)]
(.putLong buf 0 root)
(write-file! (:file handle) block-index-offset-offset buf))
handle)
(defn read-block-index-offset
"Takes a handle and returns the current root block index offset from its
file. Throws :type ::no-block-index if the block index is 0 or the file is
too short."
[handle]
(try+
(let [buf ^ByteBuffer (read-file (:file handle)
block-index-offset-offset
block-offset-size)
offset (.getLong buf 0)]
(when (zero? offset)
(throw+ {:type ::no-block-index}))
offset)
(catch [:type ::incomplete-read] e
(throw+ {:type ::no-block-index}))))
; Working with block headers
(defn ^ByteBuffer block-header
"Returns a blank ByteBuffer for a block header. All fields zero."
[]
(ByteBuffer/allocate block-header-size))
(defn block-header-type
"Returns the type of a block header, as a keyword."
[^ByteBuffer header]
(short->block-type (.getShort header block-type-offset)))
(defn set-block-header-type!
"Sets the type (a keyword) in a block header. Returns the header."
[^ByteBuffer buf block-type]
(let [type-short (assert+ (block-type->short block-type)
{:type ::no-such-block-type
:block-type block-type})]
(.putShort buf block-type-offset type-short)))
(defn block-header-length
"Fetches the length of a block header."
[^ByteBuffer header]
(.getLong header block-len-offset))
(defn set-block-header-length!
"Sets the length in a block header. Returns the block header."
[^ByteBuffer buf length]
(.putLong buf block-len-offset length))
(defn block-header-checksum
"Fetches the checksum of a block header."
[^ByteBuffer header]
(.getInt header block-checksum-offset))
(defn set-block-header-checksum!
"Sets the checksum in a block header. Returns the block header."
[^ByteBuffer buf checksum]
(.putInt buf block-checksum-offset checksum))
(defn block-checksum-given-data-checksum
"Computes the checksum of a block, given a ByteBuffer header, and an
already-computed CRC32 checksum of the data. Useful for streaming writers
which compute their own checksums while writing. Mutates data-crc in place; I
can't figure out how to safely copy it."
[^ByteBuffer header, ^CRC32 data-crc]
(let [header' (-> (block-header)
(set-block-header-type! (block-header-type header))
(set-block-header-length! (block-header-length header)))]
(.update data-crc ^ByteBuffer header')
(unchecked-int (.getValue data-crc))))
(defn ^Integer block-checksum
"Compute the checksum of a block, given two bytebuffers: one for the header,
and one for the data."
[header, ^ByteBuffer data]
(let [c (CRC32.)]
(.rewind data)
;(bs/print-bytes data)
(.update c data)
(block-checksum-given-data-checksum header c)))
(defn check-block-checksum
"Verifies the checksum of a block, given two ByteBuffers: one for the header,
and one for the data."
[^ByteBuffer header ^ByteBuffer data]
(let [expected (block-header-checksum header)
actual (block-checksum header data)]
(assert+ (= expected actual)
{:type ::checksum-mismatch
:expected expected
:actual actual})))
(defn ^ByteBuffer read-block-header
"Fetches the ByteBuffer for a block header at the given offset."
[handle ^long offset]
(let [file ^FileChannel (:file handle)
buf (block-header)
read-bytes (.read file buf offset)]
(assert+ (= read-bytes block-header-size)
{:type ::block-header-truncated
:offset offset
:length (max 0 read-bytes)})
;(info :read-block-header :offset offset
; :type (block-header-type buf)
; :length (block-header-length buf)
; :checksum (block-header-checksum buf))
(.rewind buf)
buf))
(defn ^ByteBuffer read-block-data
"Fetches the ByteBuffer for a block's data, given a block header stored at
the given offset."
[handle offset header]
(let [file ^FileChannel (:file handle)
data-length (- (block-header-length header) block-header-size)
buf (ByteBuffer/allocateDirect data-length)
read-bytes (.read file buf (+ offset block-header-size))]
;(info :read-block-data :offset offset
; :block-header-size block-header-size
; :data-length data-length
; :data "\n"
; (with-out-str (bs/print-bytes (.rewind buf))))
(assert+ (= read-bytes data-length)
{:type ::block-data-truncated
:offset offset
:expected data-length
:actual read-bytes})
(.rewind buf)
buf))
(defn write-block-header!
"Writes a block header to the given offset in the file backed by the given
handle. Returns handle."
[handle ^long offset ^ByteBuffer block-header]
(.rewind block-header)
(let [written (.write ^FileChannel (:file handle) block-header offset)]
(assert+ (= written block-header-size)
{:type ::block-header-write-failed
:written written
:expected block-header-size}))
;(info :wrote-block-header :offset offset
; :type (block-header-type block-header)
; :length (block-header-length block-header)
; :checksum (block-header-checksum block-header))
handle)
(defn write-block-data!
"Writes block data to the given block offset (e.g. the address of the header,
not the data itself) in the file, backed by the given handle. Returns
handle."
[handle ^long offset ^ByteBuffer data]
(.rewind data)
(write-file! (:file handle) (+ offset block-header-size) data)
handle)
(defn ^ByteBuffer block-header-for-length-and-checksum!
"An optimized way to construct a block header, given a block type, the length
of a data region (not including headers), and the CRC checksum of that data.
Mutates the checksum in place."
[block-type data-length data-checksum]
(let [header (block-header)]
(-> header
(set-block-header-type! block-type)
(set-block-header-length! (+ block-header-size data-length))
(set-block-header-checksum! (block-checksum-given-data-checksum
header data-checksum)))))
(defn ^ByteBuffer block-header-for-data
"Takes a block type and a ByteBuffer of data, and constructs a block header
whose type is the given type, and which has the appropriate length and
checksum for the given data."
[block-type ^ByteBuffer data]
(let [header (block-header)]
(-> header
(set-block-header-type! block-type)
(set-block-header-length! (+ block-header-size (.limit data)))
(set-block-header-checksum! (block-checksum header data)))))
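(comment
  ;; Illustrative sketch (not from the original source): build a header for an
  ;; arbitrary four-byte payload and verify it round-trips through
  ;; check-block-checksum. Any ByteBuffer works here; the bytes are made up.
  (let [data   (ByteBuffer/wrap (byte-array [1 2 3 4]))
        header (block-header-for-data :fressian data)]
    (check-block-checksum header data))
  ; => truthy when the checksum matches; throws ::checksum-mismatch otherwise
  )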
(defn write-block!
"Writes a block to a handle at the given offset, given a block type as a
keyword and a ByteBuffer for the block's data. Returns handle."
[handle ^long offset block-type data]
(-> handle
(write-block-header! offset (block-header-for-data block-type data))
(write-block-data! offset data)))
(defn read-block-by-offset*
"Takes a Handle and the offset of a block. Reads the block header and data,
validates the checksum, and returns a map of:
{:header header, as bytebuffer
:data data, as bytebuffer}"
[handle offset]
(let [file ^FileChannel (:file handle)
header (read-block-header handle offset)
data (read-block-data handle offset header)]
(check-block-checksum header data)
(.rewind data)
{:header header
:data data}))
(declare read-block-index-block
read-fressian-block
read-partial-map-block
read-fressian-stream-block
read-big-vector-block)
(defn read-block-by-offset
"Takes a Handle and the offset of a block. Reads the block header, validates
the checksum, and interprets the block data depending on the block type.
Returns a map of:
{:type The block type, as a keyword
:offset The offset of this block
:length How many bytes are in this block, total
:data The interpreted data stored in this block---depends on block type}"
[handle offset]
(prep-read! handle)
(let [{:keys [header data]} (read-block-by-offset* handle offset)
type (block-header-type header)]
{:type type
:offset offset
:length (block-header-length header)
:data (case type
:block-index (read-block-index-block handle data)
:fressian (read-fressian-block handle data)
:partial-map (read-partial-map-block handle data)
:fressian-stream (read-fressian-stream-block handle data)
:big-vector (read-big-vector-block handle data))}))
;; Block indices
(defn new-block-id!
"Takes a handle and returns a fresh block ID for that handle, mutating the
handle so that this ID will not be allocated again."
[handle]
(let [index (:block-index handle)
bs (:blocks @index)
id (int (if (empty? bs)
1 ; Blocks start at 1
(inc (reduce max (keys bs)))))]
(swap! index assoc-in [:blocks id] :reserved)
id))
(defn next-block-offset
"Takes a handle and returns the offset of the next block. Right now this is
just the end of the file."
[handle]
(max (.size ^FileChannel (:file handle))
first-block-offset))
(defn assoc-block!
"Takes a handle, a block ID, and its corresponding offset. Updates the
handle's block index (in-memory) to add this mapping. Returns handle."
[handle id offset]
(swap! (:block-index handle) assoc-in [:blocks id] offset)
handle)
(defn set-root!
"Takes a handle and a block ID. Updates the handle's block index (in-memory)
to point to this block ID as the root. Returns handle."
[handle root-id]
(swap! (:block-index handle) assoc :root root-id)
handle)
(defn block-index-data-size
"Takes a block index and returns the number of bytes required for that block
to be written, NOT including headers."
[index]
(+ block-id-size
(* (count (:blocks index))
(+ block-id-size block-offset-size))))
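(comment
  ;; Worked size example, for reference (illustrative, not part of the
  ;; original source): an index holding a root pointer and three block entries
  ;; needs 4 + 3 * (4 + 8) = 40 bytes of data, not counting the block header
  ;; itself. The IDs and offsets below are made up.
  (block-index-data-size {:root 2, :blocks {1 18, 2 532, 3 1024}})
  ; => 40
  )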
(defn write-block-index!
"Writes a block index for a Handle, based on whatever that Handle's current
block index is. Automatically generates a new block ID for this index and
adds it to the handle as well. Then writes a new block index offset pointing
to this block index. Returns handle."
([handle]
(let [id (new-block-id! handle)
offset (next-block-offset handle)
_ (assoc-block! handle id offset)]
(write-block-index! handle offset)))
([handle offset]
(prep-write! handle)
(let [file ^FileChannel (:file handle)
index @(:block-index handle)
data (ByteBuffer/allocate (block-index-data-size index))]
; Write the root ID
(.putInt data (or (:root index) (int 0)))
; And each block mapping
(doseq [[id offset] (:blocks index)]
(when-not (= :reserved offset)
(.putInt data id)
(.putLong data offset)))
(.flip data)
; Write the header and data to the file.
(write-block! handle offset :block-index data)
; And the block index offset
(write-block-index-offset! handle offset))
handle))
(defn read-block-index-block
"Takes a ByteBuffer and reads a block index from it: a map of
{:root root-id
:blocks {id offset, id2 offset2, ...}}"
[handle ^ByteBuffer data]
(let [root (.getInt data)]
(loop [index (transient {})]
(if (.hasRemaining data)
(let [id (.getInt data)
offset (.getLong data)]
(recur (assoc! index id offset)))
{:root (if (zero? root) nil root)
:blocks (persistent! index)}))))
(defn load-block-index!
"Takes a handle, reloads its block index from disk, and returns handle."
[handle]
(let [block (read-block-by-offset handle (read-block-index-offset handle))]
(assert+ (= :block-index (:type block))
{:type ::block-type-mismatch
:expected :block-index
:actual (:type block)})
(reset! (:block-index handle) (:data block))
;(info :block-index (:data block))
)
handle)
(defn read-block-by-id
"Takes a handle and a logical block id. Looks up the offset for the given
block and reads it using read-block-by-offset (which includes verifying the
checksum)."
[handle id]
(assert (instance? Integer id)
(str "Block ids are integers, not " (class id) " - " (pr-str id)))
(prep-read! handle)
(if-let [offset (get-in @(:block-index handle) [:blocks id])]
(read-block-by-offset handle offset)
(throw+ {:type ::block-not-found
:id id
:known-block-ids (sort (keys (:blocks @(:block-index handle))))})))
(defn read-root
"Takes a handle. Looks up the root block from the current block index and
reads it. Returns nil if there is no root."
[handle]
(prep-read! handle)
(when-let [root (:root @(:block-index handle))]
(read-block-by-id handle root)))
;; Fressian blocks
(def fressian-write-handlers
"How do we write Fressian data?"
(-> jsf/write-handlers*
(assoc jepsen.store.format.BlockRef
{"block-ref" (reify WriteHandler
(write [_ w block-ref]
(.writeTag w "block-ref" 1)
(.writeObject w (:id block-ref))))})
fress/associative-lookup
fress/inheritance-lookup))
(def fressian-read-handlers
"How do we read Fressian data?"
(-> jsf/read-handlers*
(assoc "block-ref" (reify ReadHandler
(read [_ rdr tag component-count]
(block-ref (int (.readObject rdr))))))
fress/associative-lookup))
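(comment
  ;; Illustrative sketch (not from the original source): a BlockRef should
  ;; round-trip through Fressian via the handlers above. This mirrors the
  ;; jsf/writer and jsf/reader call shapes used elsewhere in this namespace.
  (let [baos (java.io.ByteArrayOutputStream.)]
    (with-open [w ^Closeable (jsf/writer baos
                                         {:handlers fressian-write-handlers})]
      (fress/write-object w (block-ref (int 3))))
    (with-open [r ^Closeable (jsf/reader
                               (java.io.ByteArrayInputStream.
                                 (.toByteArray baos))
                               {:handlers fressian-read-handlers})]
      (fress/read-object r)))
  ; => #jepsen.store.format.BlockRef{:id 3}
  )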
(defn write-fressian-to-file!
"Takes a FileChannel, an offset, a checksum, and a data structure as
Fressian. Writes the data structure as Fressian to the file at the given
offset. Returns the size of the data that was just written, in bytes. Mutates
checksum with written bytes."
[^FileChannel file, ^long offset, ^CRC32 checksum, data]
; First, write the data to the file directly; then we'll go back and write
; the header.
(with-open [foos (FileOffsetOutputStream.
file offset checksum)
bos (BufferedOutputStream.
foos fressian-buffer-size)
w ^Closeable (jsf/writer
bos {:handlers fressian-write-handlers})]
(fress/write-object w data)
(.flush bos)
(.bytesWritten foos)))
(defn write-fressian-block!*
"Takes a handle, a byte offset, and some Clojure data. Writes that data to a
Fressian block at the given offset. Returns handle."
[handle offset data]
; First, write the data to the file directly; then we'll go back and write
; the header.
(let [data-offset (+ offset block-header-size)
checksum (CRC32.)
data-size (write-fressian-to-file!
(:file handle) data-offset checksum data)
; Construct a ByteBuffer over the region we just wrote
;TODO: unused?
data-buf (read-file (:file handle) data-offset data-size)
; And build our header
header (block-header-for-length-and-checksum!
:fressian data-size checksum)]
; Now write the header; data's already in the file.
(write-block-header! handle offset header)
handle))
(defn write-fressian-block!
"Takes a handle, an optional block ID, and some Clojure data. Writes that
data to a Fressian block at the end of the file, records the new block in the
handle's block index, and returns the ID of the newly written block."
([handle data]
(write-fressian-block! handle (new-block-id! handle) data))
([handle id data]
(let [offset (next-block-offset handle)]
(-> handle
(write-fressian-block!* offset data)
(assoc-block! id offset))
id)))
(defn read-fressian-block
"Takes a handle and a ByteBuffer of data from a Fressian block. Returns its
parsed contents."
[handle ^ByteBuffer data]
(with-open [is (bs/to-input-stream data)
r ^Closeable (jsf/reader
is {:handlers fressian-read-handlers})]
(fress/read-object r)))
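(comment
  ;; Illustrative end-to-end sketch (not from the original source): write a
  ;; small map as a Fressian block, point the root at it, and read it back.
  ;; The path is an arbitrary scratch file chosen for this example.
  (with-open [h (open "/tmp/format-demo.jepsen")]
    (let [id (write-fressian-block! h {:hello "world"})]
      (-> h (set-root! id) write-block-index! flush!)))
  (with-open [h (open "/tmp/format-demo.jepsen")]
    (:data (read-root h)))
  ; => {:hello "world"}
  )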
;; Fressian stream blocks
(defrecord FressianStreamBlockWriter
[handle
^int block-id
^long offset
^CRC32 checksum
^FileOffsetOutputStream file-offset-output-stream
^BufferedOutputStream buffered-output-stream
^Closeable fressian-writer]