-
Notifications
You must be signed in to change notification settings - Fork 0
/
aggregator.go
3295 lines (3140 loc) · 103 KB
/
aggregator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright 2022 Erigon contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregator
import (
"bufio"
"bytes"
"container/heap"
"context"
"encoding/binary"
"errors"
"fmt"
"hash"
"io"
"io/fs"
"math"
"os"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/RoaringBitmap/roaring/roaring64"
"github.com/google/btree"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/log/v3"
"github.com/spaolacci/murmur3"
"golang.org/x/crypto/sha3"
"golang.org/x/exp/slices"
"github.com/ledgerwatch/erigon-lib/commitment"
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/length"
"github.com/ledgerwatch/erigon-lib/compress"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/recsplit"
"github.com/ledgerwatch/erigon-lib/recsplit/eliasfano32"
)
// Aggregator of multiple state files to support state reader and state writer
// The convention for the file names is as follows
// State is composed of three types of files:
// 1. Accounts. keys are addresses (20 bytes), values are encoding of accounts
// 2. Contract storage. Keys are concatenation of addresses (20 bytes) and storage locations (32 bytes), values have their leading zeroes removed
// 3. Contract codes. Keys are addresses (20 bytes), values are bytecodes
// Within each type, any file can cover an interval of block numbers, for example, `accounts.1-16` represents changes in accounts
// that were effected by the blocks from 1 to 16, inclusively. The second component of the interval will be called "end block" for the file.
// Finally, for each type and interval, there are two files - one with the compressed data (extension `dat`),
// and another with the index (extension `idx`) consisting of the minimal perfect hash table mapping keys to the offsets of corresponding keys
// in the data file
// Aggregator consists (apart from the file it is aggregating) of the 4 parts:
// 1. Persistent table of expiration time for each of the files. Key - name of the file, value - timestamp, at which the file can be removed
// 2. Transient (in-memory) mapping the "end block" of each file to the objects required for accessing the file (compress.Decompressor and resplit.Index)
// 3. Persistent tables (one for accounts, one for contract storage, and one for contract code) summarising all the 1-block state diff files
// that were not yet merged together to form larger files. In these tables, keys are the same as keys in the state diff files, but values are also
// augmented by the number of state diff files in which this key is present. This number gets decremented every time a 1-block state diff file is removed
// from the summary table (due to being merged). And when this number gets to 0, the record is deleted from the summary table.
// This number is encoded into first 4 bytes of the value
// 4. Aggregating persistent hash table. Maps state keys to the block numbers for the use in the part 2 (which is not necessarily the block number where
// the item last changed, but it is guaranteed to find the correct element in the Transient mapping of part 2)
// FileType enumerates the kinds of files the aggregator maintains.
type FileType int

const (
	Account FileType = iota
	Storage
	Code
	Commitment
	AccountHistory
	StorageHistory
	CodeHistory
	AccountBitmap
	StorageBitmap
	CodeBitmap
	NumberOfTypes
)

const (
	FirstType                   = Account
	NumberOfAccountStorageTypes = Code
	NumberOfStateTypes          = AccountHistory
)

// String returns the short name used in state file names for this file type.
// It panics on values outside the declared range (including NumberOfTypes).
func (ft FileType) String() string {
	switch ft {
	case Account:
		return "account"
	case Storage:
		return "storage"
	case Code:
		return "code"
	case Commitment:
		return "commitment"
	case AccountHistory:
		return "ahistory"
	case StorageHistory:
		return "shistory"
	case CodeHistory:
		return "chistory"
	case AccountBitmap:
		return "abitmap"
	case StorageBitmap:
		return "sbitmap"
	case CodeBitmap:
		return "cbitmap"
	default:
		panic(fmt.Sprintf("unknown file type: %d", ft))
	}
}
// Table maps a state file type to the DB table holding its not-yet-aggregated
// records. Only the four state types have backing tables; any other value panics.
func (ft FileType) Table() string {
	switch ft {
	case Account:
		return kv.StateAccounts
	case Storage:
		return kv.StateStorage
	case Code:
		return kv.StateCode
	case Commitment:
		return kv.StateCommitment
	}
	panic(fmt.Sprintf("unknown file type: %d", ft))
}
// ParseFileType is the inverse of FileType.String: it converts the short name
// used in file names back into a FileType. The second result is false (and the
// first is NumberOfTypes) when the name is not recognized.
func ParseFileType(s string) (FileType, bool) {
	for ft := FirstType; ft < NumberOfTypes; ft++ {
		if ft.String() == s {
			return ft, true
		}
	}
	return NumberOfTypes, false
}
// Aggregator of multiple state files to support state reader and state writer.
// See the package-level comment above for the file naming scheme and the four
// parts the aggregator consists of.
type Aggregator struct {
	files           [NumberOfTypes]*btree.BTree // Per file type, btree of byEndBlockItem describing the open state files
	hph             commitment.Trie             //*commitment.HexPatriciaHashed
	archHasher      murmur3.Hash128             // Hashes keys into slots of the arches tables (see updateArch)
	keccak          hash.Hash
	historyChannel  chan struct{}
	mergeChannel    chan struct{}
	tracedKeys      map[string]struct{} // Set of keys being traced during aggregations
	changesBtree    *btree.BTree        // btree of ChangesItem
	historyError    chan error
	mergeError      chan error
	aggChannel      chan *AggregationTask
	aggError        chan error
	diffDir         string                       // Directory where the state diff files are stored
	arches          [NumberOfStateTypes][]uint32 // Over-arching hash tables containing the block number of last aggregation
	historyWg       sync.WaitGroup
	aggWg           sync.WaitGroup
	mergeWg         sync.WaitGroup
	unwindLimit     uint64 // How far the chain may unwind
	aggregationStep uint64 // How many items (block, but later perhaps txs or changes) are required to form one state diff file
	fileHits        uint64 // Counter for state file hit ratio
	fileMisses      uint64 // Counter for state file hit ratio
	fileLocks       [NumberOfTypes]sync.RWMutex // One lock per file type — presumably guards the corresponding files btree; confirm with usage elsewhere in the file
	commitments     bool                        // Whether to calculate commitments
	changesets      bool                        // Whether to generate changesets (off by default)
	trace           bool                        // Turns on tracing for specific accounts and locations
}
// ChangeFile represents a pair of on-disk files for one kind of change data:
// a ".chg" file holding length-prefixed words and a ".ctx" file holding one
// (txNum, size) record per transaction. It supports sequential writing
// (add/finish) and sequential reading (nextTx/nextWord).
type ChangeFile struct {
	r           *bufio.Reader // Buffered reader over file (.chg)
	rTx         *bufio.Reader // Buffered reader over fileTx (.ctx)
	w           *bufio.Writer // Buffered writer over file; nil when not open for writing
	fileTx      *os.File      // Transaction-index (.ctx) file
	wTx         *bufio.Writer // Buffered writer over fileTx; nil when not open for writing
	file        *os.File      // Words (.chg) file
	pathTx      string
	path        string
	dir         string
	namebase    string
	words       []byte // Words pending for the next block record, in the same slice
	wordOffsets []int  // Offsets of words in the `words` slice
	step        uint64 // Number of blocks covered by one change file (see openFile)
	txNum       uint64 // Currently read transaction number
	txRemaining uint64 // Remaining number of bytes to read in the current transaction
}
// closeFile flushes any buffered output and closes both the words file and the
// transaction-index file. It refuses to close while words are still pending
// (added but not yet written out by finish). Each handle is nil-ed only after
// its flush/close succeeds, so a failed close can be retried.
func (cf *ChangeFile) closeFile() error {
	if len(cf.wordOffsets) > 0 {
		return fmt.Errorf("closeFile without finish")
	}
	flush := func(w **bufio.Writer) error {
		if *w != nil {
			if err := (*w).Flush(); err != nil {
				return err
			}
			*w = nil
		}
		return nil
	}
	closeF := func(f **os.File) error {
		if *f != nil {
			if err := (*f).Close(); err != nil {
				return err
			}
			*f = nil
		}
		return nil
	}
	if err := flush(&cf.w); err != nil {
		return err
	}
	if err := closeF(&cf.file); err != nil {
		return err
	}
	if err := flush(&cf.wTx); err != nil {
		return err
	}
	return closeF(&cf.fileTx)
}
func (cf *ChangeFile) openFile(blockNum uint64, write bool) error {
if len(cf.wordOffsets) > 0 {
return fmt.Errorf("openFile without finish")
}
rem := blockNum % cf.step
startBlock := blockNum - rem
endBlock := startBlock + cf.step - 1
if cf.w == nil {
cf.path = filepath.Join(cf.dir, fmt.Sprintf("%s.%d-%d.chg", cf.namebase, startBlock, endBlock))
cf.pathTx = filepath.Join(cf.dir, fmt.Sprintf("%s.%d-%d.ctx", cf.namebase, startBlock, endBlock))
var err error
if write {
if cf.file, err = os.OpenFile(cf.path, os.O_RDWR|os.O_CREATE, 0755); err != nil {
return err
}
if cf.fileTx, err = os.OpenFile(cf.pathTx, os.O_RDWR|os.O_CREATE, 0755); err != nil {
return err
}
if _, err = cf.file.Seek(0, 2 /* relative to the end of the file */); err != nil {
return err
}
if _, err = cf.fileTx.Seek(0, 2 /* relative to the end of the file */); err != nil {
return err
}
} else {
if cf.file, err = os.Open(cf.path); err != nil {
return err
}
if cf.fileTx, err = os.Open(cf.pathTx); err != nil {
return err
}
}
if write {
cf.w = bufio.NewWriter(cf.file)
cf.wTx = bufio.NewWriter(cf.fileTx)
}
cf.r = bufio.NewReader(cf.file)
cf.rTx = bufio.NewReader(cf.fileTx)
}
return nil
}
// rewind repositions both files to the beginning and resets the buffered
// readers, so the change file can be iterated again with nextTx/nextWord.
func (cf *ChangeFile) rewind() error {
	// io.SeekStart instead of the magic whence constant 0.
	if _, err := cf.file.Seek(0, io.SeekStart); err != nil {
		return err
	}
	cf.r = bufio.NewReader(cf.file)
	if _, err := cf.fileTx.Seek(0, io.SeekStart); err != nil {
		return err
	}
	cf.rTx = bufio.NewReader(cf.fileTx)
	return nil
}
// add queues one word for the current transaction; it is written to disk by
// the next call to finish. An offset marking the word's end is recorded so
// the flat words slice can be split back into individual words.
func (cf *ChangeFile) add(word []byte) {
	cf.words = append(cf.words, word...)
	end := len(cf.words)
	cf.wordOffsets = append(cf.wordOffsets, end)
}
// finish flushes all pending words as one transaction record: each word goes
// to the words file as a uvarint length followed by the raw bytes, and the
// transaction-index file receives the pair (txNum, total payload size).
// The pending-word buffers are reset (capacity retained) for the next tx.
func (cf *ChangeFile) finish(txNum uint64) error {
	var lenBuf [10]byte
	prev := 0
	var total uint64
	for _, end := range cf.wordOffsets {
		word := cf.words[prev:end]
		prev = end
		n := binary.PutUvarint(lenBuf[:], uint64(len(word)))
		if _, err := cf.w.Write(lenBuf[:n]); err != nil {
			return err
		}
		if len(word) > 0 {
			if _, err := cf.w.Write(word); err != nil {
				return err
			}
		}
		total += uint64(n + len(word))
	}
	cf.words = cf.words[:0]
	cf.wordOffsets = cf.wordOffsets[:0]
	// Transaction index record: txNum followed by the size of the words payload.
	n := binary.PutUvarint(lenBuf[:], txNum)
	if _, err := cf.wTx.Write(lenBuf[:n]); err != nil {
		return err
	}
	n = binary.PutUvarint(lenBuf[:], total)
	if _, err := cf.wTx.Write(lenBuf[:n]); err != nil {
		return err
	}
	return nil
}
// nextTx reads the next record from the transaction-index file, setting
// cf.txNum and cf.txRemaining (bytes of words payload left to consume via
// nextWord). It returns false without error on a clean EOF.
func (cf *ChangeFile) nextTx() (bool, error) {
	var err error
	cf.txNum, err = binary.ReadUvarint(cf.rTx)
	switch {
	case errors.Is(err, io.EOF):
		return false, nil
	case err != nil:
		return false, err
	}
	cf.txRemaining, err = binary.ReadUvarint(cf.rTx)
	if err != nil {
		return false, err
	}
	return true, nil
}
// nextWord reads the next length-prefixed word of the current transaction,
// appending it to wordBuf (reusing its capacity when possible). It returns the
// (possibly reallocated) buffer, whether a word was read — false once the
// transaction's byte budget set by nextTx is exhausted — and an error.
func (cf *ChangeFile) nextWord(wordBuf []byte) ([]byte, bool, error) {
	if cf.txRemaining == 0 {
		return wordBuf, false, nil
	}
	ws, err := binary.ReadUvarint(cf.r)
	if err != nil {
		return wordBuf, false, fmt.Errorf("word size: %w", err)
	}
	var buf []byte
	if total := len(wordBuf) + int(ws); cap(wordBuf) >= total {
		buf = wordBuf[:total] // Reuse the space in wordBuf, if it has enough capacity
	} else {
		buf = make([]byte, total)
		copy(buf, wordBuf)
	}
	if _, err = io.ReadFull(cf.r, buf[len(wordBuf):]); err != nil {
		return wordBuf, false, fmt.Errorf("read word (%d %d): %w", ws, len(buf[len(wordBuf):]), err)
	}
	var numBuf [10]byte
	// Charge both the uvarint length prefix and the word bytes against the
	// transaction's remaining budget — mirrors the size accounting in finish.
	n := binary.PutUvarint(numBuf[:], ws)
	cf.txRemaining -= uint64(n) + ws
	return buf, true, nil
}
// deleteFile removes both the words file and the transaction-index file from
// disk. The paths must have been set by a previous openFile.
func (cf *ChangeFile) deleteFile() error {
	for _, p := range []string{cf.path, cf.pathTx} {
		if err := os.Remove(p); err != nil {
			return err
		}
	}
	return nil
}
// Changes bundles the three change files for one entity: "keys" for the
// changed keys, "before" for pre-change values, and "after" for post-change
// values. The "before" file is only maintained when beforeOn is set.
type Changes struct {
	namebase string
	dir      string
	keys     ChangeFile // Keys of changed records
	before   ChangeFile // Values before each change; only written when beforeOn
	after    ChangeFile // Values after each change
	step     uint64
	beforeOn bool // Whether "before" values are recorded (used by produceChangeSets)
}
// Init configures the three underlying change files with a shared directory,
// step and name base; each file gets a distinguishing suffix.
func (c *Changes) Init(namebase string, step uint64, dir string, beforeOn bool) {
	c.namebase = namebase
	c.step = step
	c.dir = dir
	c.beforeOn = beforeOn
	setup := func(cf *ChangeFile, suffix string) {
		cf.namebase = namebase + suffix
		cf.dir = dir
		cf.step = step
	}
	setup(&c.keys, ".keys")
	setup(&c.before, ".before")
	setup(&c.after, ".after")
}
// closeFiles closes the keys and after change files, plus the before file when
// before-tracking is enabled. The first failure aborts the sequence.
func (c *Changes) closeFiles() error {
	files := []*ChangeFile{&c.keys}
	if c.beforeOn {
		files = append(files, &c.before)
	}
	files = append(files, &c.after)
	for _, f := range files {
		if err := f.closeFile(); err != nil {
			return err
		}
	}
	return nil
}
// openFiles opens the keys and after change files for the step containing
// blockNum, plus the before file when before-tracking is enabled.
func (c *Changes) openFiles(blockNum uint64, write bool) error {
	files := []*ChangeFile{&c.keys}
	if c.beforeOn {
		files = append(files, &c.before)
	}
	files = append(files, &c.after)
	for _, f := range files {
		if err := f.openFile(blockNum, write); err != nil {
			return err
		}
	}
	return nil
}
// insert records creation of a key: an empty "before" word (when tracked)
// and the new value in "after".
func (c *Changes) insert(key, after []byte) {
	c.keys.add(key)
	if c.beforeOn {
		c.before.add(nil)
	}
	c.after.add(after)
}
// update records modification of an existing key: both the old ("before",
// when tracked) and the new ("after") values are captured.
func (c *Changes) update(key, before, after []byte) {
	c.keys.add(key)
	if c.beforeOn {
		c.before.add(before)
	}
	c.after.add(after)
}
// delete records removal of a key: the old value goes to "before" (when
// tracked) and an empty word is written to "after".
func (c *Changes) delete(key, before []byte) {
	c.keys.add(key)
	if c.beforeOn {
		c.before.add(before)
	}
	c.after.add(nil)
}
// finish flushes the pending words of all underlying change files as one
// transaction record tagged with txNum.
func (c *Changes) finish(txNum uint64) error {
	files := []*ChangeFile{&c.keys}
	if c.beforeOn {
		files = append(files, &c.before)
	}
	files = append(files, &c.after)
	for _, f := range files {
		if err := f.finish(txNum); err != nil {
			return err
		}
	}
	return nil
}
// nextTx advances all underlying change files to their next transaction record
// and cross-checks that they agree on both availability and transaction number.
// It returns (hasMore, txNum, error).
func (c *Changes) nextTx() (bool, uint64, error) {
	bkeys, err := c.keys.nextTx()
	if err != nil {
		return false, 0, err
	}
	var bbefore, bafter bool
	if c.beforeOn {
		if bbefore, err = c.before.nextTx(); err != nil {
			return false, 0, err
		}
	}
	if bafter, err = c.after.nextTx(); err != nil {
		return false, 0, err
	}
	if c.beforeOn && bkeys != bbefore {
		return false, 0, fmt.Errorf("inconsistent tx iteration")
	}
	if bkeys != bafter {
		return false, 0, fmt.Errorf("inconsistent tx iteration")
	}
	txNum := c.keys.txNum
	if c.beforeOn && txNum != c.before.txNum {
		return false, 0, fmt.Errorf("inconsistent txNum, keys: %d, before: %d", txNum, c.before.txNum)
	}
	if txNum != c.after.txNum {
		return false, 0, fmt.Errorf("inconsistent txNum, keys: %d, after: %d", txNum, c.after.txNum)
	}
	return bkeys, txNum, nil
}
// rewind repositions every underlying change file back to its beginning so the
// whole change set can be iterated again.
func (c *Changes) rewind() error {
	files := []*ChangeFile{&c.keys}
	if c.beforeOn {
		files = append(files, &c.before)
	}
	files = append(files, &c.after)
	for _, f := range files {
		if err := f.rewind(); err != nil {
			return err
		}
	}
	return nil
}
// nextTriple reads the next (key, before, after) word triple of the current
// transaction, reusing the provided buffers where possible. The boolean result
// is false when the transaction is exhausted; all participating files must
// agree on whether a word was available. The "before" value is only read when
// beforeOn is set (nil otherwise).
func (c *Changes) nextTriple(keyBuf, beforeBuf, afterBuf []byte) ([]byte, []byte, []byte, bool, error) {
	key, bkeys, err := c.keys.nextWord(keyBuf)
	if err != nil {
		return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("next key: %w", err)
	}
	var before, after []byte
	var bbefore, bafter bool
	if c.beforeOn {
		if before, bbefore, err = c.before.nextWord(beforeBuf); err != nil {
			return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("next before: %w", err)
		}
	}
	if c.beforeOn && bkeys != bbefore {
		return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("inconsistent word iteration")
	}
	if after, bafter, err = c.after.nextWord(afterBuf); err != nil {
		return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("next after: %w", err)
	}
	if bkeys != bafter {
		return keyBuf, beforeBuf, afterBuf, false, fmt.Errorf("inconsistent word iteration")
	}
	return key, before, after, bkeys, nil
}
// deleteFiles removes the on-disk files of every underlying change file.
func (c *Changes) deleteFiles() error {
	files := []*ChangeFile{&c.keys}
	if c.beforeOn {
		files = append(files, &c.before)
	}
	files = append(files, &c.after)
	for _, f := range files {
		if err := f.deleteFile(); err != nil {
			return err
		}
	}
	return nil
}
// buildIndex creates a recsplit minimal-perfect-hash index at idxPath over the
// keys of the compressed file d, mapping each key to its offset within the data
// file (the file alternates key and value words; values are skipped). If the
// hash construction hits a collision, it restarts with a new salt until it
// succeeds. Returns the opened index.
func buildIndex(d *compress.Decompressor, idxPath, tmpDir string, count int) (*recsplit.Index, error) {
	var rs *recsplit.RecSplit
	var err error
	if rs, err = recsplit.NewRecSplit(recsplit.RecSplitArgs{
		KeyCount:    count,
		Enums:       false,
		BucketSize:  2000,
		LeafSize:    8,
		TmpDir:      tmpDir,
		IndexFile:   idxPath,
		EtlBufLimit: etl.BufferOptimalSize / 2,
	}); err != nil {
		return nil, err
	}
	defer rs.Close()
	word := make([]byte, 0, 256)
	var pos uint64
	g := d.MakeGetter()
	for {
		// Each attempt re-reads the whole data file from the start.
		g.Reset(0)
		for g.HasNext() {
			word, _ = g.Next(word[:0])
			if err = rs.AddKey(word, pos); err != nil {
				return nil, err
			}
			// Skip value; pos becomes the offset of the next key
			pos = g.Skip()
		}
		if err = rs.Build(); err != nil {
			if rs.Collision() {
				log.Info("Building recsplit. Collision happened. It's ok. Restarting...")
				rs.ResetNextSalt()
			} else {
				return nil, err
			}
		} else {
			break
		}
	}
	var idx *recsplit.Index
	if idx, err = recsplit.OpenIndex(idxPath); err != nil {
		return nil, err
	}
	return idx, nil
}
// aggregate gathers changes from the changefiles into a B-tree, and "removes" them from the database.
// This function is time-critical because it needs to be run in the same go-routine (thread) as the general
// execution (due to read-write tx). After that, we can optimistically execute the rest in the background.
// blockFrom is currently unused here; blockTo selects which change files to open.
func (c *Changes) aggregate(blockFrom, blockTo uint64, prefixLen int, tx kv.RwTx, table string, commitMerger commitmentMerger) (*btree.BTreeG[*AggregateItem], error) {
	if err := c.openFiles(blockTo, false /* write */); err != nil {
		return nil, fmt.Errorf("open files: %w", err)
	}
	bt := btree.NewG[*AggregateItem](32, AggregateItemLess)
	err := c.aggregateToBtree(bt, prefixLen, commitMerger)
	if err != nil {
		return nil, fmt.Errorf("aggregateToBtree: %w", err)
	}
	// Clean up the DB table: for every aggregated key, decrement its reference
	// count (stored in the first 4 bytes of the value) and delete the record
	// once the count reaches zero.
	var e error
	bt.Ascend(func(item *AggregateItem) bool {
		if item.count == 0 {
			return true
		}
		dbPrefix := item.k
		prevV, err := tx.GetOne(table, dbPrefix)
		if err != nil {
			e = err
			return false
		}
		if prevV == nil {
			e = fmt.Errorf("record not found in db for %s key %x", table, dbPrefix)
			return false
		}
		prevNum := binary.BigEndian.Uint32(prevV[:4])
		if prevNum < item.count {
			// Fix: the key is binary, so format it with %x (it was %s, which
			// printed garbage); matches the "record not found" message above.
			e = fmt.Errorf("record count too low for %s key %x count %d, subtracting %d", table, dbPrefix, prevNum, item.count)
			return false
		}
		if prevNum == item.count {
			if e = tx.Delete(table, dbPrefix); e != nil {
				return false
			}
		} else {
			v := make([]byte, len(prevV))
			binary.BigEndian.PutUint32(v[:4], prevNum-item.count)
			copy(v[4:], prevV[4:])
			if e = tx.Put(table, dbPrefix, v); e != nil {
				return false
			}
		}
		return true
	})
	if e != nil {
		return nil, fmt.Errorf("clean up table %s after aggregation: %w", table, e)
	}
	return bt, nil
}
// updateArch records, for every key aggregated in bt, that the key was last
// aggregated at blockNum32: each key is hashed into a slot of the over-arching
// table for fType and the slot is raised (never lowered) to blockNum32 via
// atomics. No-op when the arch table is empty.
func (a *Aggregator) updateArch(bt *btree.BTreeG[*AggregateItem], fType FileType, blockNum32 uint32) {
	arch := a.arches[fType]
	// NOTE(review): h is an interface copy, so it shares the underlying hasher
	// state with a.archHasher — assumes the hasher is not used concurrently.
	h := a.archHasher
	n := uint64(len(arch))
	if n == 0 {
		return
	}
	bt.Ascend(func(item *AggregateItem) bool {
		if item.count == 0 {
			return true
		}
		h.Reset()
		h.Write(item.k) //nolint:errcheck
		p, _ := h.Sum128()
		p = p % n
		v := atomic.LoadUint32(&arch[p])
		if v < blockNum32 {
			//fmt.Printf("Updated %s arch [%x]=%d %d\n", fType.String(), item.k, p, blockNum32)
			atomic.StoreUint32(&arch[p], blockNum32)
		}
		return true
	})
}
// AggregateItem is an element of the aggregation B-tree: a key, its latest
// "after" value, and the number of change-file entries folded into it.
type AggregateItem struct {
	k, v  []byte
	count uint32
}

// AggregateItemLess orders AggregateItems by key bytes, ascending.
func AggregateItemLess(a, than *AggregateItem) bool { return bytes.Compare(a.k, than.k) < 0 }

// Less implements btree.Item for AggregateItem (ascending by key bytes).
func (i *AggregateItem) Less(than btree.Item) bool {
	return bytes.Compare(i.k, than.(*AggregateItem).k) < 0
}
// produceChangeSets converts the change files for [blockFrom, blockTo] into two
// compressed file pairs: a "history" file of (txNum||key => before-value)
// records, and a "bitmap" file mapping each key to the Elias-Fano-encoded set
// of transaction numbers in which it changed. Returns the decompressor and
// recsplit index for each of the two files.
func (c *Changes) produceChangeSets(blockFrom, blockTo uint64, historyType, bitmapType FileType) (*compress.Decompressor, *recsplit.Index, *compress.Decompressor, *recsplit.Index, error) {
	chsetDatPath := filepath.Join(c.dir, fmt.Sprintf("%s.%d-%d.dat", historyType.String(), blockFrom, blockTo))
	chsetIdxPath := filepath.Join(c.dir, fmt.Sprintf("%s.%d-%d.idx", historyType.String(), blockFrom, blockTo))
	bitmapDatPath := filepath.Join(c.dir, fmt.Sprintf("%s.%d-%d.dat", bitmapType.String(), blockFrom, blockTo))
	bitmapIdxPath := filepath.Join(c.dir, fmt.Sprintf("%s.%d-%d.idx", bitmapType.String(), blockFrom, blockTo))
	// NOTE(review): blockSuffix is computed but never used below — possibly leftover.
	var blockSuffix [8]byte
	binary.BigEndian.PutUint64(blockSuffix[:], blockTo)
	bitmaps := map[string]*roaring64.Bitmap{}
	comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, chsetDatPath, c.dir, compress.MinPatternScore, 1, log.LvlDebug)
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("produceChangeSets NewCompressor: %w", err)
	}
	// Close the compressor on any early error return; set to nil once it has
	// been closed deliberately.
	defer func() {
		if comp != nil {
			comp.Close()
		}
	}()
	var totalRecords int
	var b bool
	var e error
	var txNum uint64
	var key, before, after []byte
	if err = c.rewind(); err != nil {
		return nil, nil, nil, nil, fmt.Errorf("produceChangeSets rewind: %w", err)
	}
	var txKey = make([]byte, 8, 60)
	for b, txNum, e = c.nextTx(); b && e == nil; b, txNum, e = c.nextTx() {
		binary.BigEndian.PutUint64(txKey[:8], txNum)
		for key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]); b && e == nil; key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]) {
			totalRecords++
			txKey = append(txKey[:8], key...)
			// In the initial files and most merged files, the txKey is added to the file, but it gets removed in the final merge
			if err = comp.AddUncompressedWord(txKey); err != nil {
				return nil, nil, nil, nil, fmt.Errorf("produceChangeSets AddWord key: %w", err)
			}
			if err = comp.AddUncompressedWord(before); err != nil {
				return nil, nil, nil, nil, fmt.Errorf("produceChangeSets AddWord before: %w", err)
			}
			//if historyType == AccountHistory {
			//	fmt.Printf("produce %s.%d-%d [%x]=>[%x]\n", historyType.String(), blockFrom, blockTo, txKey, before)
			//}
			// Accumulate, per key, the set of tx numbers where the key changed.
			var bitmap *roaring64.Bitmap
			var ok bool
			if bitmap, ok = bitmaps[string(key)]; !ok {
				bitmap = roaring64.New()
				bitmaps[string(key)] = bitmap
			}
			bitmap.Add(txNum)
		}
		if e != nil {
			return nil, nil, nil, nil, fmt.Errorf("produceChangeSets nextTriple: %w", e)
		}
	}
	if e != nil {
		return nil, nil, nil, nil, fmt.Errorf("produceChangeSets prevTx: %w", e)
	}
	if err = comp.Compress(); err != nil {
		return nil, nil, nil, nil, fmt.Errorf("produceChangeSets Compress: %w", err)
	}
	comp.Close()
	comp = nil
	var d *compress.Decompressor
	var index *recsplit.Index
	if d, err = compress.NewDecompressor(chsetDatPath); err != nil {
		return nil, nil, nil, nil, fmt.Errorf("produceChangeSets changeset decompressor: %w", err)
	}
	if index, err = buildIndex(d, chsetIdxPath, c.dir, totalRecords); err != nil {
		return nil, nil, nil, nil, fmt.Errorf("produceChangeSets changeset buildIndex: %w", err)
	}
	// Create bitmap files
	bitmapC, err := compress.NewCompressor(context.Background(), AggregatorPrefix, bitmapDatPath, c.dir, compress.MinPatternScore, 1, log.LvlDebug)
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap NewCompressor: %w", err)
	}
	defer func() {
		if bitmapC != nil {
			bitmapC.Close()
		}
	}()
	// Emit bitmap records in sorted key order for deterministic output.
	idxKeys := make([]string, len(bitmaps))
	i := 0
	var buf []byte
	for key := range bitmaps {
		idxKeys[i] = key
		i++
	}
	slices.Sort(idxKeys)
	for _, key := range idxKeys {
		if err = bitmapC.AddUncompressedWord([]byte(key)); err != nil {
			return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap add key: %w", err)
		}
		bitmap := bitmaps[key]
		// Encode each key's tx-number set with Elias-Fano.
		ef := eliasfano32.NewEliasFano(bitmap.GetCardinality(), bitmap.Maximum())
		it := bitmap.Iterator()
		for it.HasNext() {
			v := it.Next()
			ef.AddOffset(v)
		}
		ef.Build()
		buf = ef.AppendBytes(buf[:0])
		if err = bitmapC.AddUncompressedWord(buf); err != nil {
			return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap add val: %w", err)
		}
	}
	if err = bitmapC.Compress(); err != nil {
		return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap Compress: %w", err)
	}
	bitmapC.Close()
	bitmapC = nil
	bitmapD, err := compress.NewDecompressor(bitmapDatPath)
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap decompressor: %w", err)
	}
	bitmapI, err := buildIndex(bitmapD, bitmapIdxPath, c.dir, len(idxKeys))
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("produceChangeSets bitmap buildIndex: %w", err)
	}
	return d, index, bitmapD, bitmapI, nil
}
// aggregateToBtree iterates over all available changes in the change files covered by this instance `c`
// (there are 3 of them, one for "keys", one for values "before" every change, and one for values "after" every change)
// and creates a B-tree where each key is only represented once, with the value corresponding to the "after" value
// of the latest change. Each item's count records how many change entries were folded into it.
func (c *Changes) aggregateToBtree(bt *btree.BTreeG[*AggregateItem], prefixLen int, commitMerge commitmentMerger) error {
	var b bool
	var e error
	var key, before, after []byte
	var ai AggregateItem
	var prefix []byte
	// Note that the following loop iterates over transactions forwards, therefore it replaces entries in the B-tree
	for b, _, e = c.nextTx(); b && e == nil; b, _, e = c.nextTx() {
		// Within each transaction, keys are unique, but they can appear in any order
		for key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]); b && e == nil; key, before, after, b, e = c.nextTriple(key[:0], before[:0], after[:0]) {
			if prefixLen > 0 && !bytes.Equal(prefix, key[:prefixLen]) {
				// New key prefix encountered: insert a zero-count marker item for the prefix itself
				prefix = common.Copy(key[:prefixLen])
				item := &AggregateItem{k: prefix, count: 0}
				bt.ReplaceOrInsert(item)
			}
			ai.k = key
			i, ok := bt.Get(&ai)
			if !ok || i == nil {
				item := &AggregateItem{k: common.Copy(key), v: common.Copy(after), count: 1}
				bt.ReplaceOrInsert(item)
				continue
			}
			item := i
			if commitMerge != nil {
				// For commitment data, merge previous and new branch values instead of overwriting
				mergedVal, err := commitMerge(item.v, after, nil)
				if err != nil {
					return fmt.Errorf("merge branches (%T) : %w", commitMerge, err)
				}
				//fmt.Printf("aggregateToBtree prefix [%x], [%x]+[%x]=>[%x]\n", commitment.CompactToHex(key), after, item.v, mergedVal)
				item.v = mergedVal
			} else {
				item.v = common.Copy(after)
			}
			item.count++
		}
		if e != nil {
			return fmt.Errorf("aggregateToBtree nextTriple: %w", e)
		}
	}
	if e != nil {
		return fmt.Errorf("aggregateToBtree prevTx: %w", e)
	}
	return nil
}
// AggregatorPrefix is the name prefix passed to compress.NewCompressor
// (presumably used for logging/identification inside the compressor; confirm
// in the compress package).
const AggregatorPrefix = "aggregator"

// btreeToFile writes the aggregation B-tree to the compressed data file at
// datPath, alternating key and value words in ascending key order, and returns
// the number of keys written.
func btreeToFile(bt *btree.BTreeG[*AggregateItem], datPath, tmpdir string, trace bool, workers int) (int, error) {
	comp, err := compress.NewCompressor(context.Background(), AggregatorPrefix, datPath, tmpdir, compress.MinPatternScore, workers, log.LvlDebug)
	if err != nil {
		return 0, err
	}
	defer comp.Close()
	comp.SetTrace(trace)
	count := 0
	bt.Ascend(func(item *AggregateItem) bool {
		//fmt.Printf("btreeToFile %s [%x]=>[%x]\n", datPath, item.k, item.v)
		// A false return stops the iteration; err carries the failure out.
		if err = comp.AddUncompressedWord(item.k); err != nil {
			return false
		}
		count++ // Only counting keys, not values
		if err = comp.AddUncompressedWord(item.v); err != nil {
			return false
		}
		return true
	})
	if err != nil {
		return 0, err
	}
	if err = comp.Compress(); err != nil {
		return 0, err
	}
	return count, nil
}
// ChangesItem is an element of Aggregator.changesBtree, describing one group
// of change files covering the block range [startBlock, endBlock].
type ChangesItem struct {
	endBlock   uint64
	startBlock uint64
	fileCount  int
}

// Less orders ChangesItems by endBlock ascending; for equal endBlock the item
// with the larger startBlock (i.e. the smaller interval) comes first.
func (i *ChangesItem) Less(than btree.Item) bool {
	if i.endBlock == than.(*ChangesItem).endBlock {
		// Larger intervals will come last
		return i.startBlock > than.(*ChangesItem).startBlock
	}
	return i.endBlock < than.(*ChangesItem).endBlock
}
// byEndBlockItem describes one state file (or its in-memory substitute) in the
// per-type files btrees, keyed by the block range it covers. Separate getters
// and index readers are kept for the background merge thread so it does not
// share reader state with the foreground.
type byEndBlockItem struct {
	decompressor *compress.Decompressor
	getter       *compress.Getter // reader for the decompressor
	getterMerge  *compress.Getter // reader for the decompressor used in the background merge thread
	index        *recsplit.Index
	indexReader  *recsplit.IndexReader // reader for the index
	readerMerge  *recsplit.IndexReader // index reader for the background merge thread
	tree         *btree.BTreeG[*AggregateItem] // Substitute for decompressor+index combination
	startBlock   uint64
	endBlock     uint64
}

// ByEndBlockItemLess orders items by endBlock ascending; for equal endBlock
// the item with the larger startBlock (smaller interval) comes first.
func ByEndBlockItemLess(i, than *byEndBlockItem) bool {
	if i.endBlock == than.endBlock {
		return i.startBlock > than.startBlock
	}
	return i.endBlock < than.endBlock
}

// Less implements btree.Item with the same ordering as ByEndBlockItemLess.
func (i *byEndBlockItem) Less(than btree.Item) bool {
	if i.endBlock == than.(*byEndBlockItem).endBlock {
		return i.startBlock > than.(*byEndBlockItem).startBlock
	}
	return i.endBlock < than.(*byEndBlockItem).endBlock
}
// scanStateFiles scans a directory listing for state files matching the naming
// scheme "<type>.<startBlock>-<endBlock>.(dat|idx)" and registers, for each
// type and end block, the widest-covering file in a.files. Malformed or
// unknown entries are logged and skipped.
func (a *Aggregator) scanStateFiles(files []fs.DirEntry) {
	typeStrings := make([]string, NumberOfTypes)
	for fType := FileType(0); fType < NumberOfTypes; fType++ {
		typeStrings[fType] = fType.String()
	}
	// Fix: dots are escaped so "." matches only the literal separators
	// (previously "." matched any character, accepting bogus names).
	re := regexp.MustCompile("^(" + strings.Join(typeStrings, "|") + ")\\.([0-9]+)-([0-9]+)\\.(dat|idx)$")
	var err error
	for _, f := range files {
		name := f.Name()
		subs := re.FindStringSubmatch(name)
		if len(subs) != 5 {
			if len(subs) != 0 {
				log.Warn("File ignored by aggregator, more than 4 submatches", "name", name, "submatches", len(subs))
			}
			continue
		}
		var startBlock, endBlock uint64
		if startBlock, err = strconv.ParseUint(subs[2], 10, 64); err != nil {
			log.Warn("File ignored by aggregator, parsing startBlock", "error", err, "name", name)
			continue
		}
		if endBlock, err = strconv.ParseUint(subs[3], 10, 64); err != nil {
			log.Warn("File ignored by aggregator, parsing endBlock", "error", err, "name", name)
			continue
		}
		if startBlock > endBlock {
			log.Warn("File ignored by aggregator, startBlock > endBlock", "name", name)
			continue
		}
		fType, ok := ParseFileType(subs[1])
		if !ok {
			log.Warn("File ignored by aggregator, type unknown", "type", subs[1])
			// Fix: previously fell through without continue and indexed
			// a.files[NumberOfTypes], which panics with index out of range.
			continue
		}
		var item = &byEndBlockItem{startBlock: startBlock, endBlock: endBlock}
		var foundI *byEndBlockItem
		a.files[fType].AscendGreaterOrEqual(&byEndBlockItem{startBlock: endBlock, endBlock: endBlock}, func(i btree.Item) bool {
			it := i.(*byEndBlockItem)
			if it.endBlock == endBlock {
				foundI = it
			}
			return false
		})
		// Register only when no file with the same end block covers a wider range.
		if foundI == nil || foundI.startBlock > startBlock {
			log.Info("Load state file", "name", name, "type", fType.String(), "startBlock", startBlock, "endBlock", endBlock)
			a.files[fType].ReplaceOrInsert(item)
		}
	}
}
func NewAggregator(diffDir string, unwindLimit uint64, aggregationStep uint64, changesets, commitments bool, minArch uint64, trie commitment.Trie, tx kv.RwTx) (*Aggregator, error) {
a := &Aggregator{
diffDir: diffDir,
unwindLimit: unwindLimit,