Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

Commit

Permalink
--downloader.verfiy flag to verify once on startup (ledgerwatch#4597)
Browse files Browse the repository at this point in the history
* save

* save

* save

* save

* save

* save
  • Loading branch information
AskAlexSharov committed Jul 1, 2022
1 parent c92ef88 commit 2415fec
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 13 deletions.
47 changes: 45 additions & 2 deletions cmd/downloader/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/log/v3"
mdbx2 "github.com/torquem-ch/mdbx-go/mdbx"
"go.uber.org/atomic"
"golang.org/x/sync/semaphore"
)

Expand Down Expand Up @@ -96,7 +97,10 @@ func New(cfg *downloadercfg.Cfg) (*Downloader, error) {

statsLock: &sync.RWMutex{},
}
return d, d.addSegments()
if err := d.addSegments(); err != nil {
return nil, err
}
return d, nil
}

func (d *Downloader) SnapDir() string {
Expand Down Expand Up @@ -226,6 +230,45 @@ func (d *Downloader) onComplete() {
_ = d.addSegments()
}

func (d *Downloader) verify() error {
total := 0
for _, t := range d.torrentClient.Torrents() {
select {
case <-t.GotInfo():
total += t.NumPieces()
default:
continue
}
}
logInterval := 20 * time.Second
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()

wg := &sync.WaitGroup{}
j := atomic.NewInt64(0)

for _, t := range d.torrentClient.Torrents() {
wg.Add(1)
go func(t *torrent.Torrent) {
defer wg.Done()
for i := 0; i < t.NumPieces(); i++ {
j.Inc()
t.Piece(i).VerifyData()

select {
case <-logEvery.C:
log.Info("[snapshots] Verifying", "progress", fmt.Sprintf("%.2f%%", 100*float64(j.Load())/float64(total)))
default:
}
//<-t.Complete.On()
}
}(t)
}
wg.Wait()

return nil
}

func (d *Downloader) addSegments() error {
if err := BuildTorrentFilesIfNeed(context.Background(), d.cfg.DataDir); err != nil {
return err
Expand Down Expand Up @@ -325,12 +368,12 @@ func MainLoop(ctx context.Context, d *Downloader, silent bool) {
t.AllowDataDownload()
t.DownloadAll()
go func(t *torrent.Torrent) {
defer sem.Release(1)
//r := t.NewReader()
//r.SetReadahead(t.Length())
//_, _ = io.Copy(io.Discard, r) // enable streaming - it will prioritize sequential download

<-t.Complete.On()
sem.Release(1)
}(t)
}
time.Sleep(30 * time.Second)
Expand Down
8 changes: 8 additions & 0 deletions cmd/downloader/downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
return &emptypb.Empty{}, nil
}

func (s *GrpcServer) Verify(ctx context.Context, request *proto_downloader.VerifyRequest) (*emptypb.Empty, error) {
err := s.d.verify()
if err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}

func (s *GrpcServer) Stats(ctx context.Context, request *proto_downloader.StatsRequest) (*proto_downloader.StatsReply, error) {
stats := s.d.Stats()
return &proto_downloader.StatsReply{
Expand Down
1 change: 0 additions & 1 deletion cmd/downloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ func Downloader(ctx context.Context) error {
if err != nil {
return err
}
log.Info("torrentLogLevel", torrentLogLevel)

var downloadRate, uploadRate datasize.ByteSize
if err := downloadRate.UnmarshalText([]byte(downloadRateStr)); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,10 @@ var (
Name: "no-downloader",
Usage: "to disable downloader component",
}
DownloaderVerifyFlag = cli.BoolFlag{
Name: "downloader.verify",
Usage: "verify snapshots on startup. it will not report founded problems but just re-download broken pieces",
}
TorrentPortFlag = cli.IntFlag{
Name: "torrent.port",
Value: 42069,
Expand Down Expand Up @@ -1389,6 +1393,7 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *nodecfg.Config, cfg *ethconfig.C
cfg.Snapshot.KeepBlocks = ctx.GlobalBool(SnapKeepBlocksFlag.Name)
cfg.Snapshot.Produce = !ctx.GlobalBool(SnapStopFlag.Name)
cfg.Snapshot.NoDownloader = ctx.GlobalBool(NoDownloaderFlag.Name)
cfg.Snapshot.Verify = ctx.GlobalBool(DownloaderVerifyFlag.Name)
cfg.Snapshot.DownloaderAddr = strings.TrimSpace(ctx.GlobalString(DownloaderAddrFlag.Name))
if cfg.Snapshot.DownloaderAddr == "" {
downloadRateStr := ctx.GlobalString(TorrentDownloadRateFlag.Name)
Expand Down
1 change: 1 addition & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type Snapshot struct {
KeepBlocks bool // produce new snapshots of blocks but don't remove blocks from DB
Produce bool // produce new snapshots
NoDownloader bool // possible to use snapshots without calling Downloader
Verify bool // verify snapshots on startup
DownloaderAddr string
}

Expand Down
19 changes: 16 additions & 3 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1302,13 +1302,14 @@ func WaitForDownloader(ctx context.Context, cfg HeadersCfg) error {
}
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
var m runtime.MemStats

// Check once without delay, for faster erigon re-start
if stats, err := cfg.snapshotDownloader.Stats(ctx, &proto_downloader.StatsRequest{}); err == nil && stats.Completed {
return nil
stats, err := cfg.snapshotDownloader.Stats(ctx, &proto_downloader.StatsRequest{})
if err == nil && stats.Completed {
goto Finish
}

var m runtime.MemStats
// Print download progress until all segments are available
Loop:
for {
Expand All @@ -1319,6 +1320,11 @@ Loop:
if stats, err := cfg.snapshotDownloader.Stats(ctx, &proto_downloader.StatsRequest{}); err != nil {
log.Warn("Error while waiting for snapshots progress", "err", err)
} else if stats.Completed {
if !cfg.snapshots.Cfg().Verify { // will verify after loop
if _, err := cfg.snapshotDownloader.Verify(ctx, &proto_downloader.VerifyRequest{}); err != nil {
return err
}
}
break Loop
} else {
if stats.MetadataReady < stats.FilesTotal {
Expand All @@ -1338,5 +1344,12 @@ Loop:
}
}
}

Finish:
if cfg.snapshots.Cfg().Verify {
if _, err := cfg.snapshotDownloader.Verify(ctx, &proto_downloader.VerifyRequest{}); err != nil {
return err
}
}
return nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/json-iterator/go v1.1.12
github.com/julienschmidt/httprouter v1.3.0
github.com/kevinburke/go-bindata v3.21.0+incompatible
github.com/ledgerwatch/erigon-lib v0.0.0-20220629154434-59f7b5b57b68
github.com/ledgerwatch/erigon-lib v0.0.0-20220701042032-ed452dbc4b21
github.com/ledgerwatch/log/v3 v3.4.1
github.com/ledgerwatch/secp256k1 v1.0.0
github.com/nxadm/tail v1.4.9-0.20211216163028-4472660a31a6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20220629154434-59f7b5b57b68 h1:GWy2Jan7bkQe7xkptxxM2zWCjNyxGNDgSUl30oDMmHQ=
github.com/ledgerwatch/erigon-lib v0.0.0-20220629154434-59f7b5b57b68/go.mod h1:7sQ5B5m54zoo7RVRVukH3YZCYVrCC+BmwDBD+9KyTrE=
github.com/ledgerwatch/erigon-lib v0.0.0-20220701042032-ed452dbc4b21 h1:mZAojUAtvuvFLS8sumuYlZrHKGvkjTBxA6fvvujT/Kc=
github.com/ledgerwatch/erigon-lib v0.0.0-20220701042032-ed452dbc4b21/go.mod h1:7sQ5B5m54zoo7RVRVukH3YZCYVrCC+BmwDBD+9KyTrE=
github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc=
github.com/ledgerwatch/log/v3 v3.4.1/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
Expand Down
1 change: 1 addition & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ var DefaultFlags = []cli.Flag{
utils.SentryLogPeerInfoFlag,
utils.DownloaderAddrFlag,
utils.NoDownloaderFlag,
utils.DownloaderVerifyFlag,
HealthCheckFlag,
utils.HeimdallURLFlag,
utils.WithoutHeimdallFlag,
Expand Down
8 changes: 4 additions & 4 deletions turbo/shards/state_change_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (a *Accumulator) ChangeAccount(address common.Address, incarnation uint64,
accountChange.Action = remote.Action_UPSERT
case remote.Action_CODE:
accountChange.Action = remote.Action_UPSERT_CODE
case remote.Action_DELETE:
case remote.Action_REMOVE:
panic("")
}
accountChange.Incarnation = incarnation
Expand All @@ -104,7 +104,7 @@ func (a *Accumulator) DeleteAccount(address common.Address) {
accountChange.Data = nil
accountChange.Code = nil
accountChange.StorageChanges = nil
accountChange.Action = remote.Action_DELETE
accountChange.Action = remote.Action_REMOVE
}

// ChangeCode adds code to the latest change
Expand All @@ -122,7 +122,7 @@ func (a *Accumulator) ChangeCode(address common.Address, incarnation uint64, cod
accountChange.Action = remote.Action_CODE
case remote.Action_UPSERT:
accountChange.Action = remote.Action_UPSERT_CODE
case remote.Action_DELETE:
case remote.Action_REMOVE:
panic("")
}
accountChange.Incarnation = incarnation
Expand All @@ -138,7 +138,7 @@ func (a *Accumulator) ChangeStorage(address common.Address, incarnation uint64,
a.accountChangeIndex[address] = i
}
accountChange := a.latestChange.Changes[i]
if accountChange.Action == remote.Action_DELETE {
if accountChange.Action == remote.Action_REMOVE {
panic("")
}
accountChange.Incarnation = incarnation
Expand Down

0 comments on commit 2415fec

Please sign in to comment.