From 66758c79607e40bd96d1ccdeb0f57bd4b2d24524 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Fri, 22 Jul 2022 13:44:42 +0700 Subject: [PATCH] RetireBlocks: less arguments (#4785) * save * save --- cmd/state/commands/history22.go | 3 +- cmd/state/commands/state_recon.go | 3 +- core/state/state_recon_writer.go | 2 +- core/vm/lightclient/iavl/proof_path.go | 2 +- eth/stagedsync/stage_headers.go | 2 +- eth/stagedsync/stage_senders.go | 4 +- rpc/handler.go | 5 +- turbo/app/snapshots.go | 4 +- turbo/snapshotsync/block_snapshots.go | 72 +++++++++++--------------- 9 files changed, 37 insertions(+), 60 deletions(-) diff --git a/cmd/state/commands/history22.go b/cmd/state/commands/history22.go index a7ecf4d8ad9..02890f1f3c3 100644 --- a/cmd/state/commands/history22.go +++ b/cmd/state/commands/history22.go @@ -130,8 +130,7 @@ func History22(genesis *core.Genesis, logger log.Logger) error { prevTime := time.Now() var blockReader services.FullBlockReader - var allSnapshots *snapshotsync.RoSnapshots - allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots")) + allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots")) defer allSnapshots.Close() if err := allSnapshots.Reopen(); err != nil { return fmt.Errorf("reopen snapshot segments: %w", err) diff --git a/cmd/state/commands/state_recon.go b/cmd/state/commands/state_recon.go index 231fcca9499..db5ea046249 100644 --- a/cmd/state/commands/state_recon.go +++ b/cmd/state/commands/state_recon.go @@ -376,8 +376,7 @@ func Recon(genesis *core.Genesis, logger log.Logger) error { return err } var blockReader services.FullBlockReader - var allSnapshots *snapshotsync.RoSnapshots - allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots")) + allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots")) defer allSnapshots.Close() if err := allSnapshots.Reopen(); err != nil { return fmt.Errorf("reopen snapshot segments: %w", err) diff --git a/core/state/state_recon_writer.go b/core/state/state_recon_writer.go index 49f2635c75f..0d1c8e8e1ef 100644 --- a/core/state/state_recon_writer.go +++ b/core/state/state_recon_writer.go @@ -156,7 +156,7 @@ func (rs *ReconState) RollbackTx(txTask TxTask, dependency uint64) { if rs.doneBitmap.Contains(dependency) { heap.Push(&rs.queue, txTask) } else { - tt, _ := rs.triggers[dependency] + tt := rs.triggers[dependency] tt = append(tt, txTask) rs.triggers[dependency] = tt } diff --git a/core/vm/lightclient/iavl/proof_path.go b/core/vm/lightclient/iavl/proof_path.go index de366f33813..5b2609654bb 100644 --- a/core/vm/lightclient/iavl/proof_path.go +++ b/core/vm/lightclient/iavl/proof_path.go @@ -118,7 +118,7 @@ func (pl PathToLeaf) isRightmost() bool { } func (pl PathToLeaf) isEmpty() bool { - return pl == nil || len(pl) == 0 + return len(pl) == 0 } func (pl PathToLeaf) dropRoot() PathToLeaf { diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index e1c316e7eb9..b982d87c421 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -1311,7 +1311,7 @@ func WaitForDownloader(ctx context.Context, cfg HeadersCfg, tx kv.RwTx) error { return err } dbEmpty := len(snInDB) == 0 - var missingSnapshots []snapshotsync.MergeRange + var missingSnapshots []snapshotsync.Range if !dbEmpty { _, missingSnapshots, err = snapshotsync.Segments(cfg.snapshots.Dir()) if err != nil { diff --git a/eth/stagedsync/stage_senders.go b/eth/stagedsync/stage_senders.go index 22f6c5ce513..0a9db4af808 100644 --- a/eth/stagedsync/stage_senders.go +++ b/eth/stagedsync/stage_senders.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/holiman/uint256" libcommon "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/cmp" "github.com/ledgerwatch/erigon-lib/common/length" @@ -430,8 +429,7 @@ func retireBlocksInSingleBackgroundThread(s *PruneState, cfg SendersCfg, ctx con } } - chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID) - cfg.blockRetire.RetireBlocksInBackground(ctx, s.ForwardProgress, *chainID, log.LvlInfo) + cfg.blockRetire.RetireBlocksInBackground(ctx, s.ForwardProgress, log.LvlInfo) return nil } diff --git a/rpc/handler.go b/rpc/handler.go index 86985ea56e5..be73dbe7167 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -383,10 +383,7 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage, stream *json func (h *handler) isMethodAllowedByGranularControl(method string) bool { _, isForbidden := h.forbiddenList[method] if len(h.allowList) == 0 { - if isForbidden { - return false - } - return true + return !isForbidden } _, ok := h.allowList[method] diff --git a/turbo/app/snapshots.go b/turbo/app/snapshots.go index fd87d640159..b9c73f302d7 100644 --- a/turbo/app/snapshots.go +++ b/turbo/app/snapshots.go @@ -244,8 +244,6 @@ func doRetireCommand(cliCtx *cli.Context) error { defer chainDB.Close() cfg := ethconfig.NewSnapCfg(true, true, true) - chainConfig := tool.ChainConfigFromDB(chainDB) - chainID, _ := uint256.FromBig(chainConfig.ChainID) snapshots := snapshotsync.NewRoSnapshots(cfg, dirs.Snap) if err := snapshots.Reopen(); err != nil { return err @@ -256,7 +254,7 @@ func doRetireCommand(cliCtx *cli.Context) error { log.Info("Params", "from", from, "to", to, "every", every) for i := from; i < to; i += every { - if err := br.RetireBlocks(ctx, i, i+every, *chainID, log.LvlInfo); err != nil { + if err := br.RetireBlocks(ctx, i, i+every, log.LvlInfo); err != nil { panic(err) } if err := chainDB.Update(ctx, func(tx kv.RwTx) error { diff --git a/turbo/snapshotsync/block_snapshots.go b/turbo/snapshotsync/block_snapshots.go index 144f4c206e4..fc7850eca7a 100644 --- a/turbo/snapshotsync/block_snapshots.go +++ b/turbo/snapshotsync/block_snapshots.go @@ -42,7 +42,7 @@ import ( ) type DownloadRequest struct { - ranges *MergeRange + ranges *Range path string torrentHash string } @@ -50,20 +50,20 @@ type DownloadRequest struct { type HeaderSegment struct { seg *compress.Decompressor // value: first_byte_of_header_hash + header_rlp idxHeaderHash *recsplit.Index // header_hash -> headers_segment_offset - ranges MergeRange + ranges Range } type BodySegment struct { seg *compress.Decompressor // value: rlp(types.BodyForStorage) idxBodyNumber *recsplit.Index // block_num_u64 -> bodies_segment_offset - ranges MergeRange + ranges Range } type TxnSegment struct { Seg *compress.Decompressor // value: first_byte_of_transaction_hash + sender_address + transaction_rlp IdxTxnHash *recsplit.Index // transaction_hash -> transactions_segment_offset IdxTxnHash2BlockNum *recsplit.Index // transaction_hash -> block_number - ranges MergeRange + ranges Range } func (sn *HeaderSegment) close() { @@ -382,22 +382,6 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...snap.Type) (err error) { return nil } -func (s *RoSnapshots) AsyncOpenAll(ctx context.Context) { - go func() { - for !s.segmentsReady.Load() || !s.indicesReady.Load() { - select { - case <-ctx.Done(): - return - default: - } - if err := s.Reopen(); err != nil && !errors.Is(err, os.ErrNotExist) { - log.Error("AsyncOpenAll", "err", err) - } - time.Sleep(15 * time.Second) - } - }() -} - // OptimisticReopen - optimistically open snapshots (ignoring error), useful at App startup because: // - user must be able: delete any snapshot file and Erigon will self-heal by re-downloading // - RPC return Nil for historical blocks if snapshots are not open @@ -422,7 +406,7 @@ func (s *RoSnapshots) Reopen() error { s.Txs.segments = s.Txs.segments[:0] for _, f := range files { { - seg := &BodySegment{ranges: MergeRange{f.From, f.To}} + seg := &BodySegment{ranges: Range{f.From, f.To}} fileName := snap.SegmentFileName(f.From, f.To, snap.Bodies) seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { @@ -434,7 +418,7 @@ func (s *RoSnapshots) Reopen() error { s.Bodies.segments = append(s.Bodies.segments, seg) } { - seg := &HeaderSegment{ranges: MergeRange{f.From, f.To}} + seg := &HeaderSegment{ranges: Range{f.From, f.To}} fileName := snap.SegmentFileName(f.From, f.To, snap.Headers) seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { @@ -446,7 +430,7 @@ func (s *RoSnapshots) Reopen() error { s.Headers.segments = append(s.Headers.segments, seg) } { - seg := &TxnSegment{ranges: MergeRange{f.From, f.To}} + seg := &TxnSegment{ranges: Range{f.From, f.To}} fileName := snap.SegmentFileName(f.From, f.To, snap.Transactions) seg.Seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { @@ -517,7 +501,7 @@ func (s *RoSnapshots) ReopenSegments() error { var segmentsMaxSet bool for _, f := range files { { - seg := &BodySegment{ranges: MergeRange{f.From, f.To}} + seg := &BodySegment{ranges: Range{f.From, f.To}} fileName := snap.SegmentFileName(f.From, f.To, snap.Bodies) seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { @@ -529,7 +513,7 @@ func (s *RoSnapshots) ReopenSegments() error { s.Bodies.segments = append(s.Bodies.segments, seg) } { - seg := &HeaderSegment{ranges: MergeRange{f.From, f.To}} + seg := &HeaderSegment{ranges: Range{f.From, f.To}} fileName := snap.SegmentFileName(f.From, f.To, snap.Headers) seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { @@ -541,7 +525,7 @@ func (s *RoSnapshots) ReopenSegments() error { s.Headers.segments = append(s.Headers.segments, seg) } { - seg := &TxnSegment{ranges: MergeRange{f.From, f.To}} + seg := &TxnSegment{ranges: Range{f.From, f.To}} fileName := snap.SegmentFileName(f.From, f.To, snap.Transactions) seg.Seg, err = compress.NewDecompressor(path.Join(s.dir, fileName)) if err != nil { @@ -793,14 +777,14 @@ func BuildIndices(ctx context.Context, s *RoSnapshots, chainID uint256.Int, tmpD return nil } -func noGaps(in []snap.FileInfo) (out []snap.FileInfo, missingSnapshots []MergeRange) { +func noGaps(in []snap.FileInfo) (out []snap.FileInfo, missingSnapshots []Range) { var prevTo uint64 for _, f := range in { if f.To <= prevTo { continue } if f.From != prevTo { // no gaps - missingSnapshots = append(missingSnapshots, MergeRange{prevTo, f.From}) + missingSnapshots = append(missingSnapshots, Range{prevTo, f.From}) continue } prevTo = f.To @@ -854,7 +838,7 @@ func noOverlaps(in []snap.FileInfo) (res []snap.FileInfo) { return res } -func Segments(dir string) (res []snap.FileInfo, missingSnapshots []MergeRange, err error) { +func Segments(dir string) (res []snap.FileInfo, missingSnapshots []Range, err error) { list, err := snap.Segments(dir) if err != nil { return nil, missingSnapshots, err @@ -944,10 +928,12 @@ func CanDeleteTo(curBlockNum uint64, snapshots *RoSnapshots) (blockTo uint64) { hardLimit := (curBlockNum/1_000)*1_000 - params.FullImmutabilityThreshold return cmp.Min(hardLimit, snapshots.BlocksAvailable()+1) } -func (br *BlockRetire) RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, lvl log.Lvl) error { - return retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, br.downloader, lvl, br.notifier) +func (br *BlockRetire) RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, lvl log.Lvl) error { + chainConfig := tool.ChainConfigFromDB(br.db) + chainID, _ := uint256.FromBig(chainConfig.ChainID) + return retireBlocks(ctx, blockFrom, blockTo, *chainID, br.tmpDir, br.snapshots, br.db, br.workers, br.downloader, lvl, br.notifier) } -func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProgress uint64, chainID uint256.Int, lvl log.Lvl) { +func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProgress uint64, lvl log.Lvl) { if br.working.Load() { // go-routine is still working return @@ -968,7 +954,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProg return } - err := br.RetireBlocks(ctx, blockFrom, blockTo, chainID, lvl) + err := br.RetireBlocks(ctx, blockFrom, blockTo, lvl) br.result = &BlockRetireResult{ BlockFrom: blockFrom, BlockTo: blockTo, @@ -988,7 +974,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25 return fmt.Errorf("DumpBlocks: %w", err) } if err := snapshots.Reopen(); err != nil { - return fmt.Errorf("Reopen: %w", err) + return fmt.Errorf("reopen: %w", err) } idxWorkers := workers if idxWorkers > 4 { @@ -998,7 +984,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25 return err } if err := snapshots.Reopen(); err != nil { - return fmt.Errorf("Reopen: %w", err) + return fmt.Errorf("reopen: %w", err) } merger := NewMerger(tmpDir, workers, lvl, chainID, notifier) ranges := merger.FindMergeRanges(snapshots) @@ -1010,7 +996,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25 return err } if err := snapshots.Reopen(); err != nil { - return fmt.Errorf("Reopen: %w", err) + return fmt.Errorf("reopen: %w", err) } var downloadRequest []DownloadRequest @@ -1723,13 +1709,13 @@ func NewMerger(tmpDir string, workers int, lvl log.Lvl, chainID uint256.Int, not return &Merger{tmpDir: tmpDir, workers: workers, lvl: lvl, chainID: chainID, notifier: notifier} } -type MergeRange struct { +type Range struct { from, to uint64 } -func (r MergeRange) String() string { return fmt.Sprintf("%dk-%dk", r.from/1000, r.to/1000) } +func (r Range) String() string { return fmt.Sprintf("%dk-%dk", r.from/1000, r.to/1000) } -func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []MergeRange) { +func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []Range) { for i := len(snapshots.Headers.segments) - 1; i > 0; i-- { sn := snapshots.Headers.segments[i] if sn.ranges.to-sn.ranges.from >= snap.DEFAULT_SEGMENT_SIZE { // is complete .seg @@ -1744,14 +1730,14 @@ func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []MergeRange) { break } aggFrom := sn.ranges.to - span - res = append(res, MergeRange{from: aggFrom, to: sn.ranges.to}) + res = append(res, Range{from: aggFrom, to: sn.ranges.to}) for snapshots.Headers.segments[i].ranges.from > aggFrom { i-- } break } } - slices.SortFunc(res, func(i, j MergeRange) bool { return i.from < j.from }) + slices.SortFunc(res, func(i, j Range) bool { return i.from < j.from }) return res } func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeHeaders, toMergeBodies, toMergeTxs []string, err error) { @@ -1779,7 +1765,7 @@ func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeH } // Merge does merge segments in given ranges -func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges []MergeRange, snapDir string, doIndex bool) error { +func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges []Range, snapDir string, doIndex bool) error { if len(mergeRanges) == 0 { return nil } @@ -1943,7 +1929,7 @@ func assertSegment(segmentFile string) { } } -func NewDownloadRequest(ranges *MergeRange, path string, torrentHash string) DownloadRequest { +func NewDownloadRequest(ranges *Range, path string, torrentHash string) DownloadRequest { return DownloadRequest{ ranges: ranges, path: path,