Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
RetireBlocks: fewer arguments (ledgerwatch#4785)
Browse files Browse the repository at this point in the history
* save

* save
  • Loading branch information
AskAlexSharov committed Jul 22, 2022
1 parent 6060b87 commit 66758c7
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 60 deletions.
3 changes: 1 addition & 2 deletions cmd/state/commands/history22.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ func History22(genesis *core.Genesis, logger log.Logger) error {
prevTime := time.Now()

var blockReader services.FullBlockReader
var allSnapshots *snapshotsync.RoSnapshots
allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots"))
allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots"))
defer allSnapshots.Close()
if err := allSnapshots.Reopen(); err != nil {
return fmt.Errorf("reopen snapshot segments: %w", err)
Expand Down
3 changes: 1 addition & 2 deletions cmd/state/commands/state_recon.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,7 @@ func Recon(genesis *core.Genesis, logger log.Logger) error {
return err
}
var blockReader services.FullBlockReader
var allSnapshots *snapshotsync.RoSnapshots
allSnapshots = snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots"))
allSnapshots := snapshotsync.NewRoSnapshots(ethconfig.NewSnapCfg(true, false, true), path.Join(datadir, "snapshots"))
defer allSnapshots.Close()
if err := allSnapshots.Reopen(); err != nil {
return fmt.Errorf("reopen snapshot segments: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion core/state/state_recon_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (rs *ReconState) RollbackTx(txTask TxTask, dependency uint64) {
if rs.doneBitmap.Contains(dependency) {
heap.Push(&rs.queue, txTask)
} else {
tt, _ := rs.triggers[dependency]
tt := rs.triggers[dependency]
tt = append(tt, txTask)
rs.triggers[dependency] = tt
}
Expand Down
2 changes: 1 addition & 1 deletion core/vm/lightclient/iavl/proof_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (pl PathToLeaf) isRightmost() bool {
}

func (pl PathToLeaf) isEmpty() bool {
return pl == nil || len(pl) == 0
return len(pl) == 0
}

func (pl PathToLeaf) dropRoot() PathToLeaf {
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1311,7 +1311,7 @@ func WaitForDownloader(ctx context.Context, cfg HeadersCfg, tx kv.RwTx) error {
return err
}
dbEmpty := len(snInDB) == 0
var missingSnapshots []snapshotsync.MergeRange
var missingSnapshots []snapshotsync.Range
if !dbEmpty {
_, missingSnapshots, err = snapshotsync.Segments(cfg.snapshots.Dir())
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions eth/stagedsync/stage_senders.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/holiman/uint256"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/cmp"
"github.com/ledgerwatch/erigon-lib/common/length"
Expand Down Expand Up @@ -430,8 +429,7 @@ func retireBlocksInSingleBackgroundThread(s *PruneState, cfg SendersCfg, ctx con
}
}

chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID)
cfg.blockRetire.RetireBlocksInBackground(ctx, s.ForwardProgress, *chainID, log.LvlInfo)
cfg.blockRetire.RetireBlocksInBackground(ctx, s.ForwardProgress, log.LvlInfo)

return nil
}
5 changes: 1 addition & 4 deletions rpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,7 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage, stream *json
func (h *handler) isMethodAllowedByGranularControl(method string) bool {
_, isForbidden := h.forbiddenList[method]
if len(h.allowList) == 0 {
if isForbidden {
return false
}
return true
return !isForbidden
}

_, ok := h.allowList[method]
Expand Down
4 changes: 1 addition & 3 deletions turbo/app/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,6 @@ func doRetireCommand(cliCtx *cli.Context) error {
defer chainDB.Close()

cfg := ethconfig.NewSnapCfg(true, true, true)
chainConfig := tool.ChainConfigFromDB(chainDB)
chainID, _ := uint256.FromBig(chainConfig.ChainID)
snapshots := snapshotsync.NewRoSnapshots(cfg, dirs.Snap)
if err := snapshots.Reopen(); err != nil {
return err
Expand All @@ -256,7 +254,7 @@ func doRetireCommand(cliCtx *cli.Context) error {

log.Info("Params", "from", from, "to", to, "every", every)
for i := from; i < to; i += every {
if err := br.RetireBlocks(ctx, i, i+every, *chainID, log.LvlInfo); err != nil {
if err := br.RetireBlocks(ctx, i, i+every, log.LvlInfo); err != nil {
panic(err)
}
if err := chainDB.Update(ctx, func(tx kv.RwTx) error {
Expand Down
72 changes: 29 additions & 43 deletions turbo/snapshotsync/block_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,28 @@ import (
)

type DownloadRequest struct {
ranges *MergeRange
ranges *Range
path string
torrentHash string
}

type HeaderSegment struct {
seg *compress.Decompressor // value: first_byte_of_header_hash + header_rlp
idxHeaderHash *recsplit.Index // header_hash -> headers_segment_offset
ranges MergeRange
ranges Range
}

type BodySegment struct {
seg *compress.Decompressor // value: rlp(types.BodyForStorage)
idxBodyNumber *recsplit.Index // block_num_u64 -> bodies_segment_offset
ranges MergeRange
ranges Range
}

type TxnSegment struct {
Seg *compress.Decompressor // value: first_byte_of_transaction_hash + sender_address + transaction_rlp
IdxTxnHash *recsplit.Index // transaction_hash -> transactions_segment_offset
IdxTxnHash2BlockNum *recsplit.Index // transaction_hash -> block_number
ranges MergeRange
ranges Range
}

func (sn *HeaderSegment) close() {
Expand Down Expand Up @@ -382,22 +382,6 @@ func (s *RoSnapshots) ReopenSomeIndices(types ...snap.Type) (err error) {
return nil
}

func (s *RoSnapshots) AsyncOpenAll(ctx context.Context) {
go func() {
for !s.segmentsReady.Load() || !s.indicesReady.Load() {
select {
case <-ctx.Done():
return
default:
}
if err := s.Reopen(); err != nil && !errors.Is(err, os.ErrNotExist) {
log.Error("AsyncOpenAll", "err", err)
}
time.Sleep(15 * time.Second)
}
}()
}

// OptimisticReopen - optimistically open snapshots (ignoring error), useful at App startup because:
// - user must be able: delete any snapshot file and Erigon will self-heal by re-downloading
// - RPC return Nil for historical blocks if snapshots are not open
Expand All @@ -422,7 +406,7 @@ func (s *RoSnapshots) Reopen() error {
s.Txs.segments = s.Txs.segments[:0]
for _, f := range files {
{
seg := &BodySegment{ranges: MergeRange{f.From, f.To}}
seg := &BodySegment{ranges: Range{f.From, f.To}}
fileName := snap.SegmentFileName(f.From, f.To, snap.Bodies)
seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
Expand All @@ -434,7 +418,7 @@ func (s *RoSnapshots) Reopen() error {
s.Bodies.segments = append(s.Bodies.segments, seg)
}
{
seg := &HeaderSegment{ranges: MergeRange{f.From, f.To}}
seg := &HeaderSegment{ranges: Range{f.From, f.To}}
fileName := snap.SegmentFileName(f.From, f.To, snap.Headers)
seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
Expand All @@ -446,7 +430,7 @@ func (s *RoSnapshots) Reopen() error {
s.Headers.segments = append(s.Headers.segments, seg)
}
{
seg := &TxnSegment{ranges: MergeRange{f.From, f.To}}
seg := &TxnSegment{ranges: Range{f.From, f.To}}
fileName := snap.SegmentFileName(f.From, f.To, snap.Transactions)
seg.Seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
Expand Down Expand Up @@ -517,7 +501,7 @@ func (s *RoSnapshots) ReopenSegments() error {
var segmentsMaxSet bool
for _, f := range files {
{
seg := &BodySegment{ranges: MergeRange{f.From, f.To}}
seg := &BodySegment{ranges: Range{f.From, f.To}}
fileName := snap.SegmentFileName(f.From, f.To, snap.Bodies)
seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
Expand All @@ -529,7 +513,7 @@ func (s *RoSnapshots) ReopenSegments() error {
s.Bodies.segments = append(s.Bodies.segments, seg)
}
{
seg := &HeaderSegment{ranges: MergeRange{f.From, f.To}}
seg := &HeaderSegment{ranges: Range{f.From, f.To}}
fileName := snap.SegmentFileName(f.From, f.To, snap.Headers)
seg.seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
Expand All @@ -541,7 +525,7 @@ func (s *RoSnapshots) ReopenSegments() error {
s.Headers.segments = append(s.Headers.segments, seg)
}
{
seg := &TxnSegment{ranges: MergeRange{f.From, f.To}}
seg := &TxnSegment{ranges: Range{f.From, f.To}}
fileName := snap.SegmentFileName(f.From, f.To, snap.Transactions)
seg.Seg, err = compress.NewDecompressor(path.Join(s.dir, fileName))
if err != nil {
Expand Down Expand Up @@ -793,14 +777,14 @@ func BuildIndices(ctx context.Context, s *RoSnapshots, chainID uint256.Int, tmpD
return nil
}

func noGaps(in []snap.FileInfo) (out []snap.FileInfo, missingSnapshots []MergeRange) {
func noGaps(in []snap.FileInfo) (out []snap.FileInfo, missingSnapshots []Range) {
var prevTo uint64
for _, f := range in {
if f.To <= prevTo {
continue
}
if f.From != prevTo { // no gaps
missingSnapshots = append(missingSnapshots, MergeRange{prevTo, f.From})
missingSnapshots = append(missingSnapshots, Range{prevTo, f.From})
continue
}
prevTo = f.To
Expand Down Expand Up @@ -854,7 +838,7 @@ func noOverlaps(in []snap.FileInfo) (res []snap.FileInfo) {
return res
}

func Segments(dir string) (res []snap.FileInfo, missingSnapshots []MergeRange, err error) {
func Segments(dir string) (res []snap.FileInfo, missingSnapshots []Range, err error) {
list, err := snap.Segments(dir)
if err != nil {
return nil, missingSnapshots, err
Expand Down Expand Up @@ -944,10 +928,12 @@ func CanDeleteTo(curBlockNum uint64, snapshots *RoSnapshots) (blockTo uint64) {
hardLimit := (curBlockNum/1_000)*1_000 - params.FullImmutabilityThreshold
return cmp.Min(hardLimit, snapshots.BlocksAvailable()+1)
}
func (br *BlockRetire) RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint256.Int, lvl log.Lvl) error {
return retireBlocks(ctx, blockFrom, blockTo, chainID, br.tmpDir, br.snapshots, br.db, br.workers, br.downloader, lvl, br.notifier)
func (br *BlockRetire) RetireBlocks(ctx context.Context, blockFrom, blockTo uint64, lvl log.Lvl) error {
chainConfig := tool.ChainConfigFromDB(br.db)
chainID, _ := uint256.FromBig(chainConfig.ChainID)
return retireBlocks(ctx, blockFrom, blockTo, *chainID, br.tmpDir, br.snapshots, br.db, br.workers, br.downloader, lvl, br.notifier)
}
func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProgress uint64, chainID uint256.Int, lvl log.Lvl) {
func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProgress uint64, lvl log.Lvl) {
if br.working.Load() {
// go-routine is still working
return
Expand All @@ -968,7 +954,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProg
return
}

err := br.RetireBlocks(ctx, blockFrom, blockTo, chainID, lvl)
err := br.RetireBlocks(ctx, blockFrom, blockTo, lvl)
br.result = &BlockRetireResult{
BlockFrom: blockFrom,
BlockTo: blockTo,
Expand All @@ -988,7 +974,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
return fmt.Errorf("DumpBlocks: %w", err)
}
if err := snapshots.Reopen(); err != nil {
return fmt.Errorf("Reopen: %w", err)
return fmt.Errorf("reopen: %w", err)
}
idxWorkers := workers
if idxWorkers > 4 {
Expand All @@ -998,7 +984,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
return err
}
if err := snapshots.Reopen(); err != nil {
return fmt.Errorf("Reopen: %w", err)
return fmt.Errorf("reopen: %w", err)
}
merger := NewMerger(tmpDir, workers, lvl, chainID, notifier)
ranges := merger.FindMergeRanges(snapshots)
Expand All @@ -1010,7 +996,7 @@ func retireBlocks(ctx context.Context, blockFrom, blockTo uint64, chainID uint25
return err
}
if err := snapshots.Reopen(); err != nil {
return fmt.Errorf("Reopen: %w", err)
return fmt.Errorf("reopen: %w", err)
}

var downloadRequest []DownloadRequest
Expand Down Expand Up @@ -1723,13 +1709,13 @@ func NewMerger(tmpDir string, workers int, lvl log.Lvl, chainID uint256.Int, not
return &Merger{tmpDir: tmpDir, workers: workers, lvl: lvl, chainID: chainID, notifier: notifier}
}

type MergeRange struct {
type Range struct {
from, to uint64
}

func (r MergeRange) String() string { return fmt.Sprintf("%dk-%dk", r.from/1000, r.to/1000) }
func (r Range) String() string { return fmt.Sprintf("%dk-%dk", r.from/1000, r.to/1000) }

func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []MergeRange) {
func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []Range) {
for i := len(snapshots.Headers.segments) - 1; i > 0; i-- {
sn := snapshots.Headers.segments[i]
if sn.ranges.to-sn.ranges.from >= snap.DEFAULT_SEGMENT_SIZE { // is complete .seg
Expand All @@ -1744,14 +1730,14 @@ func (*Merger) FindMergeRanges(snapshots *RoSnapshots) (res []MergeRange) {
break
}
aggFrom := sn.ranges.to - span
res = append(res, MergeRange{from: aggFrom, to: sn.ranges.to})
res = append(res, Range{from: aggFrom, to: sn.ranges.to})
for snapshots.Headers.segments[i].ranges.from > aggFrom {
i--
}
break
}
}
slices.SortFunc(res, func(i, j MergeRange) bool { return i.from < j.from })
slices.SortFunc(res, func(i, j Range) bool { return i.from < j.from })
return res
}
func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeHeaders, toMergeBodies, toMergeTxs []string, err error) {
Expand Down Expand Up @@ -1779,7 +1765,7 @@ func (m *Merger) filesByRange(snapshots *RoSnapshots, from, to uint64) (toMergeH
}

// Merge does merge segments in given ranges
func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges []MergeRange, snapDir string, doIndex bool) error {
func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges []Range, snapDir string, doIndex bool) error {
if len(mergeRanges) == 0 {
return nil
}
Expand Down Expand Up @@ -1943,7 +1929,7 @@ func assertSegment(segmentFile string) {
}
}

func NewDownloadRequest(ranges *MergeRange, path string, torrentHash string) DownloadRequest {
func NewDownloadRequest(ranges *Range, path string, torrentHash string) DownloadRequest {
return DownloadRequest{
ranges: ranges,
path: path,
Expand Down

0 comments on commit 66758c7

Please sign in to comment.