Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
snapshot merger: smaller interface (ledgerwatch#4786)
Browse files Browse the repository at this point in the history
* save

* save

* save
  • Loading branch information
AskAlexSharov committed Jul 22, 2022
1 parent 46a8c53 commit cd8b10f
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 82 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ jobs:
- name: parse hive output
run: docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -v ${{ github.workspace }}:/work --entrypoint /app/hivecioutput gatewayfm/hive:latest --resultsdir=/work/results --outdir=/work/results

- name: archive hive results
- name: archive hive results
uses: actions/upload-artifact@v3
if: always()
with:
Expand Down
8 changes: 1 addition & 7 deletions turbo/app/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,7 @@ func doIndicesCommand(cliCtx *cli.Context) error {

if rebuild {
cfg := ethconfig.NewSnapCfg(true, true, false)
workers := runtime.GOMAXPROCS(-1) - 1
if workers < 1 {
workers = 1
}
if workers > 4 {
workers = 4
}
workers := cmp.InRange(1, 4, runtime.GOMAXPROCS(-1)-1)
if err := rebuildIndices(ctx, chainDB, cfg, dirs, from, workers); err != nil {
log.Error("Error", "err", err)
}
Expand Down
144 changes: 72 additions & 72 deletions turbo/snapshotsync/block_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,17 @@ func (s *RoSnapshots) Reopen() error {

return nil
}

// Ranges returns the block range covered by every header segment
// currently registered in s. The header segment list is used as the
// source of ranges; the View error is intentionally ignored since the
// callback never fails.
func (s *RoSnapshots) Ranges() []Range {
	var out []Range
	_ = s.Headers.View(func(segments []*HeaderSegment) error {
		for _, seg := range segments {
			out = append(out, seg.ranges)
		}
		return nil
	})
	return out
}

func (s *RoSnapshots) ReopenSegments() error {
s.Headers.lock.Lock()
defer s.Headers.lock.Unlock()
Expand Down Expand Up @@ -614,6 +625,25 @@ func (s *RoSnapshots) ViewTxs(blockNum uint64, f func(sn *TxnSegment) error) (fo
return s.Txs.ViewSegment(blockNum, f)
}

// buildIdx builds the index for a single snapshot segment file,
// dispatching on the segment type. Segment types outside the known
// three are silently skipped and nil is returned.
func buildIdx(ctx context.Context, sn snap.FileInfo, chainID uint256.Int, tmpDir string, lvl log.Lvl) error {
	var err error
	switch sn.T {
	case snap.Headers:
		err = HeadersIdx(ctx, sn.Path, sn.From, tmpDir, lvl)
	case snap.Bodies:
		err = BodiesIdx(ctx, sn.Path, sn.From, tmpDir, lvl)
	case snap.Transactions:
		// TransactionsIdx takes the segment's directory rather than the
		// file path; Split keeps the trailing separator intact.
		dir, _ := filepath.Split(sn.Path)
		err = TransactionsIdx(ctx, chainID, sn.From, sn.To, dir, tmpDir, lvl)
	}
	return err
}

func BuildIndices(ctx context.Context, s *RoSnapshots, chainID uint256.Int, tmpDir string, from uint64, workers int, lvl log.Lvl) error {
log.Log(lvl, "[snapshots] Build indices", "from", from)
logEvery := time.NewTicker(20 * time.Second)
Expand Down Expand Up @@ -987,11 +1017,11 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
return fmt.Errorf("reopen: %w", err)
}
merger := NewMerger(tmpDir, workers, lvl, chainID, notifier)
ranges := merger.FindMergeRanges(snapshots)
if len(ranges) == 0 {
rangesToMerge := merger.FindMergeRanges(snapshots.Ranges())
if len(rangesToMerge) == 0 {
return nil
}
err := merger.Merge(ctx, snapshots, ranges, snapshots.Dir(), true)
err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true)
if err != nil {
return err
}
Expand All @@ -1000,7 +1030,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
}

var downloadRequest []DownloadRequest
for _, r := range ranges {
for _, r := range rangesToMerge {
downloadRequest = append(downloadRequest, NewDownloadRequest(&r, "", ""))
}

Expand All @@ -1019,18 +1049,18 @@ func DumpBlocks(ctx context.Context, blockFrom, blockTo, blocksPerFile uint64, t
return nil
}
func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, snapDir string, chainDB kv.RoDB, workers int, lvl log.Lvl) error {
segmentFile := filepath.Join(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Headers))
if err := DumpHeaders(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
f, _ := snap.ParseFileName(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Headers))
if err := DumpHeaders(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
return fmt.Errorf("DumpHeaders: %w", err)
}

segmentFile = filepath.Join(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Bodies))
if err := DumpBodies(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
f, _ = snap.ParseFileName(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Bodies))
if err := DumpBodies(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
return fmt.Errorf("DumpBodies: %w", err)
}

segmentFile = filepath.Join(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Transactions))
if _, err := DumpTxs(ctx, chainDB, segmentFile, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
f, _ = snap.ParseFileName(snapDir, snap.SegmentFileName(blockFrom, blockTo, snap.Transactions))
if _, err := DumpTxs(ctx, chainDB, f.Path, tmpDir, blockFrom, blockTo, workers, lvl); err != nil {
return fmt.Errorf("DumpTxs: %w", err)
}

Expand All @@ -1048,7 +1078,7 @@ func DumpTxs(ctx context.Context, db kv.RoDB, segmentFile, tmpDir string, blockF
chainConfig := tool.ChainConfigFromDB(db)
chainID, _ := uint256.FromBig(chainConfig.ChainID)

f, err := compress.NewCompressor(ctx, "Transactions", segmentFile, tmpDir, compress.MinPatternScore, workers, lvl)
f, err := compress.NewCompressor(ctx, "Snapshots Txs", segmentFile, tmpDir, compress.MinPatternScore, workers, lvl)
if err != nil {
return 0, fmt.Errorf("NewCompressor: %w, %s", err, segmentFile)
}
Expand Down Expand Up @@ -1222,7 +1252,7 @@ func DumpHeaders(ctx context.Context, db kv.RoDB, segmentFilePath, tmpDir string
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()

f, err := compress.NewCompressor(ctx, "Headers", segmentFilePath, tmpDir, compress.MinPatternScore, workers, lvl)
f, err := compress.NewCompressor(ctx, "Snapshots Headers", segmentFilePath, tmpDir, compress.MinPatternScore, workers, lvl)
if err != nil {
return err
}
Expand Down Expand Up @@ -1285,7 +1315,7 @@ func DumpBodies(ctx context.Context, db kv.RoDB, segmentFilePath, tmpDir string,
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()

f, err := compress.NewCompressor(ctx, "Bodies", segmentFilePath, tmpDir, compress.MinPatternScore, workers, lvl)
f, err := compress.NewCompressor(ctx, "Snapshots Bodies", segmentFilePath, tmpDir, compress.MinPatternScore, workers, lvl)
if err != nil {
return err
}
Expand Down Expand Up @@ -1715,33 +1745,35 @@ type Range struct {

// String renders the range in thousands of blocks, e.g. "500k-1000k".
func (r Range) String() string {
	fromK, toK := r.from/1000, r.to/1000
	return fmt.Sprintf("%dk-%dk", fromK, toK)
}

func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []Range) {
for i := len(snapshots.Headers.segments) - 1; i > 0; i-- {
sn := snapshots.Headers.segments[i]
if sn.ranges.to-sn.ranges.from >= snap.DEFAULT_SEGMENT_SIZE { // is complete .seg
func (*Merger) FindMergeRanges(currentRanges []Range) (toMerge []Range) {
for i := len(currentRanges) - 1; i > 0; i-- {
r := currentRanges[i]
if r.to-r.from >= snap.DEFAULT_SEGMENT_SIZE { // is complete .seg
continue
}

for _, span := range []uint64{500_000, 100_000, 10_000} {
if sn.ranges.to%span != 0 {
if r.to%span != 0 {
continue
}
if sn.ranges.to-sn.ranges.from == span {
if r.to-r.from == span {
break
}
aggFrom := sn.ranges.to - span
res = append(res, Range{from: aggFrom, to: sn.ranges.to})
for snapshots.Headers.segments[i].ranges.from > aggFrom {
aggFrom := r.to - span
toMerge = append(toMerge, Range{from: aggFrom, to: r.to})
for currentRanges[i].from > aggFrom {
i--
}
break
}
}
slices.SortFunc(res, func(i, j Range) bool { return i.from < j.from })
return res
slices.SortFunc(toMerge, func(i, j Range) bool { return i.from < j.from })
return toMerge
}
func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeHeaders, toMergeBodies, toMergeTxs []string, err error) {
err = snapshots.Headers.View(func(hSegments []*HeaderSegment) error {

func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (map[snap.Type][]string, error) {
toMerge := map[snap.Type][]string{}
err := snapshots.Headers.View(func(hSegments []*HeaderSegment) error {
return snapshots.Bodies.View(func(bSegments []*BodySegment) error {
return snapshots.Txs.View(func(tSegments []*TxnSegment) error {
for i, sn := range hSegments {
Expand All @@ -1751,17 +1783,16 @@ func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeH
if sn.ranges.to > to {
break
}

toMergeHeaders = append(toMergeHeaders, hSegments[i].seg.FilePath())
toMergeBodies = append(toMergeBodies, bSegments[i].seg.FilePath())
toMergeTxs = append(toMergeTxs, tSegments[i].Seg.FilePath())
toMerge[snap.Headers] = append(toMerge[snap.Headers], hSegments[i].seg.FilePath())
toMerge[snap.Bodies] = append(toMerge[snap.Bodies], bSegments[i].seg.FilePath())
toMerge[snap.Transactions] = append(toMerge[snap.Transactions], tSegments[i].Seg.FilePath())
}

return nil
})
})
})
return
return toMerge, err
}

// Merge does merge segments in given ranges
Expand All @@ -1773,42 +1804,18 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
defer logEvery.Stop()
log.Log(m.lvl, "[snapshots] Merge segments", "ranges", fmt.Sprintf("%v", mergeRanges))
for _, r := range mergeRanges {
toMergeHeaders, toMergeBodies, toMergeTxs, err := m.filesByRange(snapshots, r.from, r.to)
toMerge, err := m.filesByRange(snapshots, r.from, r.to)
if err != nil {
return err
}
{
segFilePath := filepath.Join(snapDir, snap.SegmentFileName(r.from, r.to, snap.Bodies))
if err := m.merge(ctx, toMergeBodies, segFilePath, logEvery); err != nil {
return fmt.Errorf("mergeByAppendSegments: %w", err)
}
if doIndex {
if err := BodiesIdx(ctx, segFilePath, r.from, m.tmpDir, m.lvl); err != nil {
return fmt.Errorf("BodiesIdx: %w", err)
}
}
}

{
segFilePath := filepath.Join(snapDir, snap.SegmentFileName(r.from, r.to, snap.Headers))
if err := m.merge(ctx, toMergeHeaders, segFilePath, logEvery); err != nil {
return fmt.Errorf("mergeByAppendSegments: %w", err)
}
if doIndex {
if err := HeadersIdx(ctx, segFilePath, r.from, m.tmpDir, m.lvl); err != nil {
return fmt.Errorf("HeadersIdx: %w", err)
}
}
}

{
segFilePath := filepath.Join(snapDir, snap.SegmentFileName(r.from, r.to, snap.Transactions))
if err := m.merge(ctx, toMergeTxs, segFilePath, logEvery); err != nil {
for _, t := range snap.AllSnapshotTypes {
f, _ := snap.ParseFileName(snapDir, snap.SegmentFileName(r.from, r.to, t))
if err := m.merge(ctx, toMerge[t], f.Path, logEvery); err != nil {
return fmt.Errorf("mergeByAppendSegments: %w", err)
}
if doIndex {
if err := TransactionsIdx(ctx, m.chainID, r.from, r.to, snapDir, m.tmpDir, m.lvl); err != nil {
return fmt.Errorf("TransactionsIdx: %w", err)
if err := buildIdx(ctx, f, m.chainID, m.tmpDir, m.lvl); err != nil {
return err
}
}
}
Expand All @@ -1820,17 +1827,10 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges
m.notifier.OnNewSnapshot()
time.Sleep(1 * time.Second) // i working on blocking API - to ensure client does not use
}

if err := m.removeOldFiles(toMergeHeaders, snapDir); err != nil {
return err
}

if err := m.removeOldFiles(toMergeBodies, snapDir); err != nil {
return err
}

if err := m.removeOldFiles(toMergeTxs, snapDir); err != nil {
return err
for _, t := range snap.AllSnapshotTypes {
if err := m.removeOldFiles(toMerge[t], snapDir); err != nil {
return err
}
}
}
log.Log(m.lvl, "[snapshots] Merge done", "from", mergeRanges[0].from)
Expand Down
4 changes: 2 additions & 2 deletions turbo/snapshotsync/block_snapshots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestMergeSnapshots(t *testing.T) {

{
merger := NewMerger(dir, 1, log.LvlInfo, uint256.Int{}, nil)
ranges := merger.FindMergeRanges(s)
ranges := merger.FindMergeRanges(s.Ranges())
require.True(len(ranges) > 0)
err := merger.Merge(context.Background(), s, ranges, s.Dir(), false)
require.NoError(err)
Expand All @@ -92,7 +92,7 @@ func TestMergeSnapshots(t *testing.T) {

{
merger := NewMerger(dir, 1, log.LvlInfo, uint256.Int{}, nil)
ranges := merger.FindMergeRanges(s)
ranges := merger.FindMergeRanges(s.Ranges())
require.True(len(ranges) == 0)
err := merger.Merge(context.Background(), s, ranges, s.Dir(), false)
require.NoError(err)
Expand Down

0 comments on commit cd8b10f

Please sign in to comment.