Skip to content

Commit

Permalink
Snapshots: save initial list to db, to avoid future snapshots downloa…
Browse files Browse the repository at this point in the history
…ding #4625
  • Loading branch information
AskAlexSharov committed Jul 4, 2022
1 parent 99d9535 commit ff847cd
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 15 deletions.
7 changes: 0 additions & 7 deletions cmd/downloader/downloader/downloader.go
Expand Up @@ -109,13 +109,6 @@ func (d *Downloader) SnapDir() string {
return d.cfg.DataDir
}

func (d *Downloader) IsInitialSync() bool {
d.clientLock.RLock()
defer d.clientLock.RUnlock()
_, lastPart := filepath.Split(d.cfg.DataDir)
return lastPart == "tmp"
}

func (d *Downloader) ReCalcStats(interval time.Duration) {
d.statsLock.Lock()
defer d.statsLock.Unlock()
Expand Down
7 changes: 1 addition & 6 deletions cmd/downloader/downloader/downloader_grpc_server.go
Expand Up @@ -28,8 +28,6 @@ type GrpcServer struct {

// Download - create new .torrent ONLY if initialSync, everything else Erigon can generate by itself
func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) {
isInitialSync := s.d.IsInitialSync()

torrentClient := s.d.Torrent()
mi := &metainfo.MetaInfo{AnnounceList: Trackers}
for _, it := range request.Items {
Expand All @@ -38,6 +36,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
return nil, err
}
}

ok, err := AddSegment(it.Path, s.d.SnapDir(), torrentClient)
if err != nil {
return nil, fmt.Errorf("AddSegment: %w", err)
Expand All @@ -46,10 +45,6 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
continue
}

if !isInitialSync {
continue
}

hash := Proto2InfoHash(it.TorrentHash)
if _, ok := torrentClient.Torrent(hash); ok {
continue
Expand Down
27 changes: 27 additions & 0 deletions core/rawdb/accessors_chain.go
Expand Up @@ -1616,3 +1616,30 @@ func IsPosBlock(db kv.Getter, blockHash common.Hash) (trans bool, err error) {

return header.Difficulty.Cmp(common.Big0) == 0, nil
}

func ReadSnapshots(tx kv.Tx) (map[string]string, error) {
res := map[string]string{}
if err := tx.ForEach(kv.Snapshots, nil, func(k, v []byte) error {
res[string(k)] = string(v)
return nil
}); err != nil {
return nil, err
}
return res, nil
}

func WriteSnapshots(tx kv.RwTx, list map[string]string) error {
for k, v := range list {
has, err := tx.Has(kv.Snapshots, []byte(k))
if err != nil {
return err
}
if has {
continue
}
if err = tx.Put(kv.Snapshots, []byte(k), []byte(v)); err != nil {
return err
}
}
return nil
}
24 changes: 22 additions & 2 deletions eth/stagedsync/stage_headers.go
Expand Up @@ -1165,7 +1165,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
return nil
}

if err := WaitForDownloader(ctx, cfg); err != nil {
if err := WaitForDownloader(ctx, cfg, tx); err != nil {
return err
}
if err := cfg.snapshots.Reopen(); err != nil {
Expand Down Expand Up @@ -1280,16 +1280,30 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R

// WaitForDownloader - wait for Downloader service to download all expected snapshots
// for MVP we sync with Downloader only once, in future will send new snapshots also
func WaitForDownloader(ctx context.Context, cfg HeadersCfg) error {
func WaitForDownloader(ctx context.Context, cfg HeadersCfg, tx kv.RwTx) error {
if cfg.snapshots.Cfg().NoDownloader {
return nil
}

snInDB, err := rawdb.ReadSnapshots(tx)
if err != nil {
return err
}
dbEmpty := len(snInDB) == 0

// send all hashes to the Downloader service
preverified := snapshothashes.KnownConfig(cfg.chainConfig.ChainName).Preverified
req := &proto_downloader.DownloadRequest{Items: make([]*proto_downloader.DownloadItem, 0, len(preverified))}
i := 0
for _, p := range preverified {
_, has := snInDB[p.Name]
if !dbEmpty && !has {
continue
}
if dbEmpty {
snInDB[p.Name] = p.Hash
}

req.Items = append(req.Items, &proto_downloader.DownloadItem{
TorrentHash: downloadergrpc.String2Proto(p.Hash),
Path: p.Name,
Expand Down Expand Up @@ -1361,5 +1375,11 @@ Finish:
return err
}
}

if dbEmpty {
if err = rawdb.WriteSnapshots(tx, snInDB); err != nil {
return err
}
}
return nil
}

0 comments on commit ff847cd

Please sign in to comment.