diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 12ba6dac513..018f9934933 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -116,7 +116,7 @@ jobs:
       - name: parse hive output
         run: docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -v ${{ github.workspace }}:/work --entrypoint /app/hivecioutput gatewayfm/hive:latest --resultsdir=/work/results --outdir=/work/results
-      - name: archive hive results
+      - name: archive hive results
        uses: actions/upload-artifact@v3
        if: always()
        with:
diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go
index b9c73f302d7..13a1155c629 100644
--- a/turbo/app/snapshots.go
+++ b/turbo/app/snapshots.go
@@ -130,13 +130,7 @@ func doIndicesCommand(cliCtx *cli.Context) error {
 	if rebuild {
 		cfg := ethconfig.NewSnapCfg(true, true, false)
-		workers := runtime.GOMAXPROCS(-1) - 1
-		if workers < 1 {
-			workers = 1
-		}
-		if workers > 4 {
-			workers = 4
-		}
+		workers := cmp.InRange(1, 4, runtime.GOMAXPROCS(-1)-1)
 		if err := rebuildIndices(ctx, chainDB, cfg, dirs, from, workers); err != nil {
 			log.Error("Error", "err", err)
 		}
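The `turbo/app/snapshots.go` hunk above collapses two manual bounds checks into a single clamp call. Below is a minimal sketch of the equivalent behavior, assuming the `(min, max, value)` argument order shown in the diff's `cmp.InRange` call; the `inRange` helper here is a hypothetical stand-in for illustration, not the erigon-lib implementation:

```go
package main

import (
	"fmt"
	"runtime"
)

// inRange clamps val into [min, max]; a hypothetical stand-in for
// the generic clamp helper the new code imports as cmp.InRange.
func inRange(min, max, val int) int {
	if val < min {
		return min
	}
	if val > max {
		return max
	}
	return val
}

func main() {
	// Same policy as the removed lines: leave one CPU free,
	// but never use fewer than 1 or more than 4 workers.
	workers := inRange(1, 4, runtime.GOMAXPROCS(-1)-1)
	fmt.Println("workers:", workers)
}
```
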
diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go
index fc7850eca7a..1ba201f4cc0 100644
--- a/turbo/snapshotsync/block_snapshots.go
+++ b/turbo/snapshotsync/block_snapshots.go
@@ -482,6 +482,17 @@ func (s *RoSnapshots) Reopen() error {
 	return nil
 }
+
+func (s *RoSnapshots) Ranges() (ranges []Range) {
+	_ = s.Headers.View(func(segments []*HeaderSegment) error {
+		for _, sn := range segments {
+			ranges = append(ranges, sn.ranges)
+		}
+		return nil
+	})
+	return ranges
+}
+
 
 func (s *RoSnapshots) ReopenSegments() error {
 	s.Headers.lock.Lock()
 	defer s.Headers.lock.Unlock()
@@ -614,6 +625,25 @@ func (s *RoSnapshots) ViewTxs(blockNum uint64, f func(sn *TxnSegment) error) (fo
 	return s.Txs.ViewSegment(blockNum, f)
 }
 
+func buildIdx(ctx context.Context, sn snap.FileInfo, chainID uint256.Int, tmpDir string, lvl log.Lvl) error {
+	switch sn.T {
+	case snap.Headers:
+		if err := HeadersIdx(ctx, sn.Path, sn.From, tmpDir, lvl); err != nil {
+			return err
+		}
+	case snap.Bodies:
+		if err := BodiesIdx(ctx, sn.Path, sn.From, tmpDir, lvl); err != nil {
+			return err
+		}
+	case snap.Transactions:
+		dir, _ := filepath.Split(sn.Path)
+		if err := TransactionsIdx(ctx, chainID, sn.From, sn.To, dir, tmpDir, lvl); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func BuildIndices(ctx context.Context, s *RoSnapshots, chainID uint256.Int, tmpDir string, from uint64, workers int, lvl log.Lvl) error {
 	log.Log(lvl, "[snapshots] Build indices", "from", from)
 	logEvery := time.NewTicker(20 * time.Second)
@@ -987,11 +1017,11 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
 		return fmt.Errorf("reopen: %w", err)
 	}
 	merger := NewMerger(tmpDir, workers, lvl, chainID, notifier)
-	ranges := merger.FindMergeRanges(snapshots)
-	if len(ranges) == 0 {
+	rangesToMerge := merger.FindMergeRanges(snapshots.Ranges())
+	if len(rangesToMerge) == 0 {
 		return nil
 	}
-	err := merger.Merge(ctx, snapshots, ranges, snapshots.Dir(), true)
+	err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true)
 	if err != nil {
 		return err
 	}
@@ -1000,7 +1030,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
 
 	var downloadRequest []DownloadRequest
-	for _, r := range ranges {
+	for _, r := range rangesToMerge {
 		downloadRequest = append(downloadRequest, NewDownloadRequest(&r, "", ""))
 	}
 
@@ -1019,18 +1049,18 @@ func DumpBlocks(ctx context.Context, blockFrom, blockTo, blocksPerFile uint64, t
 	return nil
 }
 
 func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, snapDir string, chainDB kv.RoDB, workers int, lvl log.Lvl) error {
-	segmentFile := filepath.Join(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Headers))
-	if err := DumpHeaders(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
+	f, _ := snap.ParseFileName(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Headers))
+	if err := DumpHeaders(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
 		return fmt.Errorf("DumpHeaders: %w", err)
 	}
-	segmentFile = filepath.Join(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Bodies))
-	if err := DumpBodies(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
+	f, _ = snap.ParseFileName(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Bodies))
+	if err := DumpBodies(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
 		return fmt.Errorf("DumpBodies: %w", err)
 	}
-	segmentFile = filepath.Join(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Transactions))
-	if _, err := DumpTxs(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
+	f, _ = snap.ParseFileName(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Transactions))
+	if _, err := DumpTxs(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
 		return fmt.Errorf("DumpTxs: %w", err)
 	}
 
@@ -1048,7 +1078,7 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
 	chainConfig := tool.ChainConfigFromDB(db)
 	chainID, _ := uint256.FromBig(chainConfig.ChainID)
 
-	f, err := compress.NewCompressor(ctx, "Transactions", segmentFile, tmpDir, compress.MinPatternScore, workers, lvl)
+	f, err := compress.NewCompressor(ctx, "Snapshots Txs", segmentFile, tmpDir, compress.MinPatternScore, workers, lvl)
 	if err != nil {
 		return 0, fmt.Errorf("NewCompressor: %w, %s", err, segmentFile)
 	}
@@ -1222,7 +1252,7 @@ func DumpHeaders(ctx context.Context, db kv.RoDB, segmentFilePath, tmpDir string
 	logEvery := time.NewTicker(20 * time.Second)
 	defer logEvery.Stop()
 
-	f, err := compress.NewCompressor(ctx, "Headers", segmentFilePath, tmpDir, compress.MinPatternScore, workers, lvl)
+	f, err := compress.NewCompressor(ctx, "Snapshots Headers", segmentFilePath, tmpDir, compress.MinPatternScore, workers, lvl)
 	if err != nil {
 		return err
 	}
@@ -1285,7 +1315,7 @@ func DumpBodies(ctx context.Context, db kv.RoDB, segmentFilePath, tmpDir string,
 	logEvery := time.NewTicker(20 * time.Second)
 	defer logEvery.Stop()
 
-	f, err := compress.NewCompressor(ctx, "Bodies", segmentFilePath, tmpDir, compress.MinPatternScore, workers, lvl)
+	f, err := compress.NewCompressor(ctx, "Snapshots Bodies", segmentFilePath, tmpDir, compress.MinPatternScore, workers, lvl)
 	if err != nil {
 		return err
 	}
@@ -1715,33 +1745,35 @@ type Range struct {
 
 func (r Range) String() string { return fmt.Sprintf("%dk-%dk", r.from/1000, r.to/1000) }
 
-func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []Range) {
-	for i := len(snapshots.Headers.segments) - 1; i > 0; i-- {
-		sn := snapshots.Headers.segments[i]
-		if sn.ranges.to-sn.ranges.from >= snap.DEFAULT_SEGMENT_SIZE { // is complete .seg
+func (*Merger) FindMergeRanges(currentRanges []Range) (toMerge []Range) {
+	for i := len(currentRanges) - 1; i > 0; i-- {
+		r := currentRanges[i]
+		if r.to-r.from >= snap.DEFAULT_SEGMENT_SIZE { // is complete .seg
 			continue
 		}
 
 		for _, span := range []uint64{500_000, 100_000, 10_000} {
-			if sn.ranges.to%span != 0 {
+			if r.to%span != 0 {
 				continue
 			}
-			if sn.ranges.to-sn.ranges.from == span {
+			if r.to-r.from == span {
 				break
 			}
-			aggFrom := sn.ranges.to - span
-			res = append(res, Range{from: aggFrom, to: sn.ranges.to})
-			for snapshots.Headers.segments[i].ranges.from > aggFrom {
+			aggFrom := r.to - span
+			toMerge = append(toMerge, Range{from: aggFrom, to: r.to})
+			for currentRanges[i].from > aggFrom {
 				i--
 			}
 			break
 		}
 	}
-	slices.SortFunc(res, func(i, j Range) bool { return i.from < j.from })
-	return res
+	slices.SortFunc(toMerge, func(i, j Range) bool { return i.from < j.from })
+	return toMerge
 }
-func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeHeaders, toMergeBodies, toMergeTxs []string, err error) {
-	err = snapshots.Headers.View(func(hSegments []*HeaderSegment) error {
+
+func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (map[snap.Type][]string, error) {
+	toMerge := map[snap.Type][]string{}
+	err := snapshots.Headers.View(func(hSegments []*HeaderSegment) error {
 		return snapshots.Bodies.View(func(bSegments []*BodySegment) error {
 			return snapshots.Txs.View(func(tSegments []*TxnSegment) error {
 				for i, sn := range hSegments {
@@ -1751,17 +1783,16 @@ func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeH
 				if sn.ranges.to > to {
 					break
 				}
-
-				toMergeHeaders = append(toMergeHeaders, hSegments[i].seg.FilePath())
-				toMergeBodies = append(toMergeBodies, bSegments[i].seg.FilePath())
-				toMergeTxs = append(toMergeTxs, tSegments[i].Seg.FilePath())
+				toMerge[snap.Headers] = append(toMerge[snap.Headers], hSegments[i].seg.FilePath())
+				toMerge[snap.Bodies] = append(toMerge[snap.Bodies], bSegments[i].seg.FilePath())
+				toMerge[snap.Transactions] = append(toMerge[snap.Transactions], tSegments[i].Seg.FilePath())
 			}
 			return nil
 		})
 	})
 	})
-	return
+	return toMerge, err
 }
 
 // Merge does merge segments in given ranges
@@ -1773,42 +1804,18 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
 	defer logEvery.Stop()
 	log.Log(m.lvl, "[snapshots] Merge segments", "ranges", fmt.Sprintf("%v", mergeRanges))
 	for _, r := range mergeRanges {
-		toMergeHeaders, toMergeBodies, toMergeTxs, err := m.filesByRange(snapshots, r.from, r.to)
+		toMerge, err := m.filesByRange(snapshots, r.from, r.to)
 		if err != nil {
 			return err
 		}
-		{
-			segFilePath := filepath.Join(snapDir, snap.SegmentFileName(r.from, r.to, snap.Bodies))
-			if err := m.merge(ctx, toMergeBodies, segFilePath, logEvery); err != nil {
-				return fmt.Errorf("mergeByAppendSegments: %w", err)
-			}
-			if doIndex {
-				if err := BodiesIdx(ctx, segFilePath, r.from, m.tmpDir, m.lvl); err != nil {
-					return fmt.Errorf("BodiesIdx: %w", err)
-				}
-			}
-		}
-
-		{
-			segFilePath := filepath.Join(snapDir, snap.SegmentFileName(r.from, r.to, snap.Headers))
-			if err := m.merge(ctx, toMergeHeaders, segFilePath, logEvery); err != nil {
-				return fmt.Errorf("mergeByAppendSegments: %w", err)
-			}
-			if doIndex {
-				if err := HeadersIdx(ctx, segFilePath, r.from, m.tmpDir, m.lvl); err != nil {
-					return fmt.Errorf("HeadersIdx: %w", err)
-				}
-			}
-		}
-
-		{
-			segFilePath := filepath.Join(snapDir, snap.SegmentFileName(r.from, r.to, snap.Transactions))
-			if err := m.merge(ctx, toMergeTxs, segFilePath, logEvery); err != nil {
+		for _, t := range snap.AllSnapshotTypes {
+			f, _ := snap.ParseFileName(snapDir, snap.SegmentFileName(r.from, r.to, t))
+			if err := m.merge(ctx, toMerge[t], f.Path, logEvery); err != nil {
 				return fmt.Errorf("mergeByAppendSegments: %w", err)
 			}
 			if doIndex {
-				if err := TransactionsIdx(ctx, m.chainID, r.from, r.to, snapDir, m.tmpDir, m.lvl); err != nil {
-					return fmt.Errorf("TransactionsIdx: %w", err)
+				if err := buildIdx(ctx, f, m.chainID, m.tmpDir, m.lvl); err != nil {
+					return err
 				}
 			}
 		}
@@ -1820,17 +1827,10 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
 			m.notifier.OnNewSnapshot()
 			time.Sleep(1 * time.Second) // i working on blocking API - to ensure client does not use
 		}
-
-		if err := m.removeOldFiles(toMergeHeaders, snapDir); err != nil {
-			return err
-		}
-
-		if err := m.removeOldFiles(toMergeBodies, snapDir); err != nil {
-			return err
-		}
-
-		if err := m.removeOldFiles(toMergeTxs, snapDir); err != nil {
-			return err
+		for _, t := range snap.AllSnapshotTypes {
+			if err := m.removeOldFiles(toMerge[t], snapDir); err != nil {
+				return err
+			}
 		}
 	}
 	log.Log(m.lvl, "[snapshots] Merge done", "from", mergeRanges[0].from)
diff --git a/turbo/snapshotsync/block_snapshots_test.go b/turbo/snapshotsync/block_snapshots_test.go
index fb1c8f8ab83..bcb75e7695a 100644
--- a/turbo/snapshotsync/block_snapshots_test.go
+++ b/turbo/snapshotsync/block_snapshots_test.go
@@ -77,7 +77,7 @@ func TestMergeSnapshots(t *testing.T) {
 
 	{
 		merger := NewMerger(dir, 1, log.LvlInfo, uint256.Int{}, nil)
-		ranges := merger.FindMergeRanges(s)
+		ranges := merger.FindMergeRanges(s.Ranges())
 		require.True(len(ranges) > 0)
 		err := merger.Merge(context.Background(), s, ranges, s.Dir(), false)
 		require.NoError(err)
@@ -92,7 +92,7 @@ func TestMergeSnapshots(t *testing.T) {
 
 	{
 		merger := NewMerger(dir, 1, log.LvlInfo, uint256.Int{}, nil)
-		ranges := merger.FindMergeRanges(s)
+		ranges := merger.FindMergeRanges(s.Ranges())
 		require.True(len(ranges) == 0)
 		err := merger.Merge(context.Background(), s, ranges, s.Dir(), false)
 		require.NoError(err)
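Taken together, the block_snapshots.go changes decouple merge planning from *RoSnapshots: FindMergeRanges now operates on a plain []Range, which is what lets the test pass s.Ranges() directly. Below is a minimal self-contained sketch of the same windowing logic, assuming a local Range type and the span ladder copied from the diff; the names are illustrative, not the package's API:

```go
package main

import (
	"fmt"
	"sort"
)

// Range mirrors the diff's from/to block range (an illustrative copy,
// not the snapshotsync type itself).
type Range struct{ from, to uint64 }

const segmentSize = 500_000 // stand-in for snap.DEFAULT_SEGMENT_SIZE

// findMergeRanges walks existing ranges from newest to oldest and
// proposes larger span-aligned windows to merge small segments into,
// following the same span ladder as the diff (500k, 100k, 10k).
func findMergeRanges(current []Range) (toMerge []Range) {
	for i := len(current) - 1; i > 0; i-- {
		r := current[i]
		if r.to-r.from >= segmentSize { // already a complete segment
			continue
		}
		for _, span := range []uint64{500_000, 100_000, 10_000} {
			if r.to%span != 0 {
				continue // window must end on a span boundary
			}
			if r.to-r.from == span {
				break // already exactly span-sized
			}
			aggFrom := r.to - span
			toMerge = append(toMerge, Range{from: aggFrom, to: r.to})
			for current[i].from > aggFrom {
				i-- // skip the segments swallowed by this window
			}
			break
		}
	}
	sort.Slice(toMerge, func(i, j int) bool { return toMerge[i].from < toMerge[j].from })
	return toMerge
}

func main() {
	// Ten 10k segments below 100k collapse into one 0-100k merge window.
	var ranges []Range
	for i := uint64(0); i < 10; i++ {
		ranges = append(ranges, Range{from: i * 10_000, to: (i + 1) * 10_000})
	}
	fmt.Println(findMergeRanges(ranges)) // [{0 100000}]
}
```

Walking from newest to oldest and skipping the segments swallowed by each proposed window is what keeps the returned ranges non-overlapping.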