Skip to content

Commit

Permalink
Support step-by-step stage sync (#112)
Browse files Browse the repository at this point in the history
* support step-by-step stage sync

* update sync logic
  • Loading branch information
calmbeing committed Jun 1, 2023
1 parent 27caf6e commit 425ddf2
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 4 deletions.
1 change: 1 addition & 0 deletions cmd/erigon-el/stages/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func NewStagedSync(ctx context.Context,
notifications,
forkValidator,
cfg.StageSyncUpperBound,
cfg.StageSyncStep,
),
stagedsync.StageCumulativeIndexCfg(db),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig),
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,11 @@ var (
Usage: "Upper bound for stage sync",
Value: 0,
}
StageSyncStepFlag = cli.Uint64Flag{
Name: "stage.step",
Usage: "step size for stage sync",
Value: 0,
}
)

var MetricFlags = []cli.Flag{&MetricsEnabledFlag, &MetricsHTTPFlag, &MetricsPortFlag}
Expand Down Expand Up @@ -1613,6 +1618,10 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *nodecfg.Config, cfg *ethconfig.C
if ctx.IsSet(StageSyncUpperBoundFlag.Name) {
cfg.StageSyncUpperBound = ctx.Uint64(StageSyncUpperBoundFlag.Name)
}

if ctx.IsSet(StageSyncStepFlag.Name) {
cfg.StageSyncStep = ctx.Uint64(StageSyncStepFlag.Name)
}
}

// SetDNSDiscoveryDefaults configures DNS discovery with the given URL if
Expand Down
1 change: 1 addition & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ type Config struct {
DropUselessPeers bool

StageSyncUpperBound uint64
StageSyncStep uint64
}

type Sync struct {
Expand Down
13 changes: 10 additions & 3 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type HeadersCfg struct {
forkValidator *engineapi.ForkValidator
notifications *shards.Notifications
StageSyncUpperBound uint64
StageSyncStep uint64
}

func StageHeadersCfg(
Expand All @@ -70,7 +71,7 @@ func StageHeadersCfg(
tmpdir string,
notifications *shards.Notifications,
forkValidator *engineapi.ForkValidator,
StageSyncUpperBound uint64) HeadersCfg {
StageSyncUpperBound, StageSyncStep uint64) HeadersCfg {
return HeadersCfg{
db: db,
hd: headerDownload,
Expand All @@ -87,6 +88,7 @@ func StageHeadersCfg(
forkValidator: forkValidator,
notifications: notifications,
StageSyncUpperBound: StageSyncUpperBound,
StageSyncStep: StageSyncStep,
}
}

Expand Down Expand Up @@ -799,7 +801,13 @@ func HeadersPOW(
headerInserter := headerdownload.NewHeaderInserter(logPrefix, localTd, headerProgress, cfg.blockReader)
cfg.hd.SetHeaderReader(&ChainReaderImpl{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader})

cfg.hd.SetStageSyncUpperBound(cfg.StageSyncUpperBound)
if cfg.StageSyncUpperBound > 0 && cfg.StageSyncStep == 0 {
// if sync upperbound enabled but disabled step sync, then set upperbound, otherwise not to avoid repeated set.
cfg.hd.SetStageSyncUpperBound(cfg.StageSyncUpperBound)
}

cfg.hd.SetStageSyncStep(cfg.StageSyncStep)

stopped := false
var noProgressCounter uint = 0
prevProgress := headerProgress
Expand All @@ -809,7 +817,6 @@ func HeadersPOW(
var sentToPeer bool
Loop:
for !stopped {

transitionedToPoS, err := rawdb.Transitioned(tx, headerProgress, cfg.chainConfig.TerminalTotalDifficulty)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,5 @@ var DefaultFlags = []cli.Flag{
&utils.SentinelAddrFlag,
&utils.SentinelPortFlag,
&utils.StageSyncUpperBoundFlag,
&utils.StageSyncStepFlag,
}
8 changes: 7 additions & 1 deletion turbo/stages/headerdownload/header_algos.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ func (hd *HeaderDownload) InsertHeader(hf FeedHeaderFunc, terminalTotalDifficult
var lastTime uint64
if hd.insertQueue.Len() > 0 && hd.insertQueue[0].blockHeight <= hd.highestInDb+1 {
link := hd.insertQueue[0]
if link.blockHeight > hd.stageSyncUpperBound {
if hd.stageSyncUpperBound > 0 && link.blockHeight > hd.stageSyncUpperBound {
log.Warn("Link Beyond the specified upper bound, will not insert")
return false, true, 0, lastTime, nil
}
Expand Down Expand Up @@ -1226,6 +1226,12 @@ func (hd *HeaderDownload) SetStageSyncUpperBound(stageSyncUpperBound uint64) {
hd.stageSyncUpperBound = stageSyncUpperBound
}

func (hd *HeaderDownload) SetStageSyncStep(stageSyncStep uint64) {
hd.lock.Lock()
defer hd.lock.Unlock()
hd.stageSyncUpperBound += stageSyncStep
}

func (hd *HeaderDownload) SetRequestId(requestId int) {
hd.lock.Lock()
defer hd.lock.Unlock()
Expand Down
1 change: 1 addition & 0 deletions turbo/stages/mock_sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
mock.Notifications,
engineapi.NewForkValidatorMock(1),
cfg.StageSyncUpperBound,
cfg.StageSyncStep,
),
stagedsync.StageCumulativeIndexCfg(mock.DB),
stagedsync.StageBlockHashesCfg(mock.DB, mock.Dirs.Tmp, mock.ChainConfig),
Expand Down
2 changes: 2 additions & 0 deletions turbo/stages/stageloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ func NewDefaultStages(ctx context.Context,
notifications,
forkValidator,
cfg.StageSyncUpperBound,
cfg.StageSyncStep,
),
stagedsync.StageCumulativeIndexCfg(db),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig),
Expand Down Expand Up @@ -458,6 +459,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config
dirs.Tmp,
nil, nil,
cfg.StageSyncUpperBound,
cfg.StageSyncStep,
),
stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, snapshots, blockReader, cfg.HistoryV3, cfg.TransactionsV3),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig),
Expand Down

0 comments on commit 425ddf2

Please sign in to comment.