diff --git a/cmd/downloader/downloader/downloader.go b/cmd/downloader/downloader/downloader.go index 4e167c951c8..5ea02a0fe33 100644 --- a/cmd/downloader/downloader/downloader.go +++ b/cmd/downloader/downloader/downloader.go @@ -19,6 +19,7 @@ import ( "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/log/v3" mdbx2 "github.com/torquem-ch/mdbx-go/mdbx" + "go.uber.org/atomic" "golang.org/x/sync/semaphore" ) @@ -96,7 +97,10 @@ func New(cfg *downloadercfg.Cfg) (*Downloader, error) { statsLock: &sync.RWMutex{}, } - return d, d.addSegments() + if err := d.addSegments(); err != nil { + return nil, err + } + return d, nil } func (d *Downloader) SnapDir() string { @@ -226,6 +230,45 @@ func (d *Downloader) onComplete() { _ = d.addSegments() } +func (d *Downloader) verify() error { + total := 0 + for _, t := range d.torrentClient.Torrents() { + select { + case <-t.GotInfo(): + total += t.NumPieces() + default: + continue + } + } + logInterval := 20 * time.Second + logEvery := time.NewTicker(logInterval) + defer logEvery.Stop() + + wg := &sync.WaitGroup{} + j := atomic.NewInt64(0) + + for _, t := range d.torrentClient.Torrents() { + wg.Add(1) + go func(t *torrent.Torrent) { + defer wg.Done() + for i := 0; i < t.NumPieces(); i++ { + j.Inc() + t.Piece(i).VerifyData() + + select { + case <-logEvery.C: + log.Info("[snapshots] Verifying", "progress", fmt.Sprintf("%.2f%%", 100*float64(j.Load())/float64(total))) + default: + } + //<-t.Complete.On() + } + }(t) + } + wg.Wait() + + return nil +} + func (d *Downloader) addSegments() error { if err := BuildTorrentFilesIfNeed(context.Background(), d.cfg.DataDir); err != nil { return err @@ -325,12 +368,12 @@ func MainLoop(ctx context.Context, d *Downloader, silent bool) { t.AllowDataDownload() t.DownloadAll() go func(t *torrent.Torrent) { + defer sem.Release(1) //r := t.NewReader() //r.SetReadahead(t.Length()) //_, _ = io.Copy(io.Discard, r) // enable streaming - it will prioritize sequential download <-t.Complete.On() - sem.Release(1) }(t) } time.Sleep(30 * time.Second) diff --git a/cmd/downloader/downloader/downloader_grpc_server.go b/cmd/downloader/downloader/downloader_grpc_server.go index 60bb4642459..11f1f596039 100644 --- a/cmd/downloader/downloader/downloader_grpc_server.go +++ b/cmd/downloader/downloader/downloader_grpc_server.go @@ -77,6 +77,14 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow return &emptypb.Empty{}, nil } +func (s *GrpcServer) Verify(ctx context.Context, request *proto_downloader.VerifyRequest) (*emptypb.Empty, error) { + err := s.d.verify() + if err != nil { + return nil, err + } + return &emptypb.Empty{}, nil +} + func (s *GrpcServer) Stats(ctx context.Context, request *proto_downloader.StatsRequest) (*proto_downloader.StatsReply, error) { stats := s.d.Stats() return &proto_downloader.StatsReply{ diff --git a/cmd/downloader/main.go b/cmd/downloader/main.go index 6f833871ac1..731fd5ab3bf 100644 --- a/cmd/downloader/main.go +++ b/cmd/downloader/main.go @@ -118,7 +118,6 @@ func Downloader(ctx context.Context) error { if err != nil { return err } - log.Info("torrentLogLevel", torrentLogLevel) var downloadRate, uploadRate datasize.ByteSize if err := downloadRate.UnmarshalText([]byte(downloadRateStr)); err != nil { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 50150af6354..d4cc43990ef 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -668,6 +668,10 @@ var ( Name: "no-downloader", Usage: "to disable downloader component", } + DownloaderVerifyFlag = cli.BoolFlag{ + Name: "downloader.verify", + Usage: "verify snapshots on startup. it will not report founded problems but just re-download broken pieces", + } TorrentPortFlag = cli.IntFlag{ Name: "torrent.port", Value: 42069, @@ -1389,6 +1393,7 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *nodecfg.Config, cfg *ethconfig.C cfg.Snapshot.KeepBlocks = ctx.GlobalBool(SnapKeepBlocksFlag.Name) cfg.Snapshot.Produce = !ctx.GlobalBool(SnapStopFlag.Name) cfg.Snapshot.NoDownloader = ctx.GlobalBool(NoDownloaderFlag.Name) + cfg.Snapshot.Verify = ctx.GlobalBool(DownloaderVerifyFlag.Name) cfg.Snapshot.DownloaderAddr = strings.TrimSpace(ctx.GlobalString(DownloaderAddrFlag.Name)) if cfg.Snapshot.DownloaderAddr == "" { downloadRateStr := ctx.GlobalString(TorrentDownloadRateFlag.Name) diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 0e4e745f577..dc54c9b31cb 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -125,6 +125,7 @@ type Snapshot struct { KeepBlocks bool // produce new snapshots of blocks but don't remove blocks from DB Produce bool // produce new snapshots NoDownloader bool // possible to use snapshots without calling Downloader + Verify bool // verify snapshots on startup DownloaderAddr string } diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index e94db76b84b..204f76b301f 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -1302,13 +1302,14 @@ func WaitForDownloader(ctx context.Context, cfg HeadersCfg) error { } logEvery := time.NewTicker(logInterval) defer logEvery.Stop() + var m runtime.MemStats // Check once without delay, for faster erigon re-start - if stats, err := cfg.snapshotDownloader.Stats(ctx, &proto_downloader.StatsRequest{}); err == nil && stats.Completed { - return nil + stats, err := cfg.snapshotDownloader.Stats(ctx, &proto_downloader.StatsRequest{}) + if err == nil && stats.Completed { + goto Finish } - var m runtime.MemStats // Print download progress until all segments are available Loop: for { @@ -1319,6 +1320,11 @@ Loop: if stats, err := cfg.snapshotDownloader.Stats(ctx, &proto_downloader.StatsRequest{}); err != nil { log.Warn("Error while waiting for snapshots progress", "err", err) } else if stats.Completed { + if !cfg.snapshots.Cfg().Verify { // will verify after loop + if _, err := cfg.snapshotDownloader.Verify(ctx, &proto_downloader.VerifyRequest{}); err != nil { + return err + } + } break Loop } else { if stats.MetadataReady < stats.FilesTotal { @@ -1338,5 +1344,12 @@ Loop: } } } + +Finish: + if cfg.snapshots.Cfg().Verify { + if _, err := cfg.snapshotDownloader.Verify(ctx, &proto_downloader.VerifyRequest{}); err != nil { + return err + } + } return nil } diff --git a/go.mod b/go.mod index 68ed3b07b6b..bebbc1d6159 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( github.com/json-iterator/go v1.1.12 github.com/julienschmidt/httprouter v1.3.0 github.com/kevinburke/go-bindata v3.21.0+incompatible - github.com/ledgerwatch/erigon-lib v0.0.0-20220629154434-59f7b5b57b68 + github.com/ledgerwatch/erigon-lib v0.0.0-20220701042032-ed452dbc4b21 github.com/ledgerwatch/log/v3 v3.4.1 github.com/ledgerwatch/secp256k1 v1.0.0 github.com/nxadm/tail v1.4.9-0.20211216163028-4472660a31a6 diff --git a/go.sum b/go.sum index 40b0cee0689..b99c0955a95 100644 --- a/go.sum +++ b/go.sum @@ -386,8 +386,8 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20220629154434-59f7b5b57b68 h1:GWy2Jan7bkQe7xkptxxM2zWCjNyxGNDgSUl30oDMmHQ= -github.com/ledgerwatch/erigon-lib v0.0.0-20220629154434-59f7b5b57b68/go.mod h1:7sQ5B5m54zoo7RVRVukH3YZCYVrCC+BmwDBD+9KyTrE= +github.com/ledgerwatch/erigon-lib v0.0.0-20220701042032-ed452dbc4b21 h1:mZAojUAtvuvFLS8sumuYlZrHKGvkjTBxA6fvvujT/Kc= +github.com/ledgerwatch/erigon-lib v0.0.0-20220701042032-ed452dbc4b21/go.mod h1:7sQ5B5m54zoo7RVRVukH3YZCYVrCC+BmwDBD+9KyTrE= github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc= github.com/ledgerwatch/log/v3 v3.4.1/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY= github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ= diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index af923646c0e..93d61401c22 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -123,6 +123,7 @@ var DefaultFlags = []cli.Flag{ utils.SentryLogPeerInfoFlag, utils.DownloaderAddrFlag, utils.NoDownloaderFlag, + utils.DownloaderVerifyFlag, HealthCheckFlag, utils.HeimdallURLFlag, utils.WithoutHeimdallFlag, diff --git a/turbo/shards/state_change_accumulator.go b/turbo/shards/state_change_accumulator.go index 9fdf9f0b8dc..dc7372c9537 100644 --- a/turbo/shards/state_change_accumulator.go +++ b/turbo/shards/state_change_accumulator.go @@ -81,7 +81,7 @@ func (a *Accumulator) ChangeAccount(address common.Address, incarnation uint64, accountChange.Action = remote.Action_UPSERT case remote.Action_CODE: accountChange.Action = remote.Action_UPSERT_CODE - case remote.Action_DELETE: + case remote.Action_REMOVE: panic("") } accountChange.Incarnation = incarnation @@ -104,7 +104,7 @@ func (a *Accumulator) DeleteAccount(address common.Address) { accountChange.Data = nil accountChange.Code = nil accountChange.StorageChanges = nil - accountChange.Action = remote.Action_DELETE + accountChange.Action = remote.Action_REMOVE } // ChangeCode adds code to the latest change @@ -122,7 +122,7 @@ func (a *Accumulator) ChangeCode(address common.Address, incarnation uint64, cod accountChange.Action = remote.Action_CODE case remote.Action_UPSERT: accountChange.Action = remote.Action_UPSERT_CODE - case remote.Action_DELETE: + case remote.Action_REMOVE: panic("") } accountChange.Incarnation = incarnation @@ -138,7 +138,7 @@ func (a *Accumulator) ChangeStorage(address common.Address, incarnation uint64, a.accountChangeIndex[address] = i } accountChange := a.latestChange.Changes[i] - if accountChange.Action == remote.Action_DELETE { + if accountChange.Action == remote.Action_REMOVE { panic("") } accountChange.Incarnation = incarnation