From c03d57356c225eeafd703e95d52ec9823881d8e9 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Thu, 30 Jun 2022 22:35:44 +0600 Subject: [PATCH] prevent downloading new snapshots after initial sync (#4585) --- cmd/downloader/downloader/downloader.go | 36 +++-- .../downloader/downloader_grpc_server.go | 20 ++- cmd/downloader/downloader/util.go | 129 +++++++----------- eth/stagedsync/stage_headers.go | 16 --- turbo/snapshotsync/snap/files.go | 5 + 5 files changed, 100 insertions(+), 106 deletions(-) diff --git a/cmd/downloader/downloader/downloader.go b/cmd/downloader/downloader/downloader.go index 39ea6b980a4..4e167c951c8 100644 --- a/cmd/downloader/downloader/downloader.go +++ b/cmd/downloader/downloader/downloader.go @@ -86,7 +86,7 @@ func New(cfg *downloadercfg.Cfg) (*Downloader, error) { } } - return &Downloader{ + d := &Downloader{ cfg: cfg, db: db, pieceCompletionDB: c, @@ -95,7 +95,8 @@ func New(cfg *downloadercfg.Cfg) (*Downloader, error) { clientLock: &sync.RWMutex{}, statsLock: &sync.RWMutex{}, - }, nil + } + return d, d.addSegments() } func (d *Downloader) SnapDir() string { @@ -104,6 +105,13 @@ func (d *Downloader) SnapDir() string { return d.cfg.DataDir } +func (d *Downloader) IsInitialSync() bool { + d.clientLock.RLock() + defer d.clientLock.RUnlock() + _, lastPart := filepath.Split(d.cfg.DataDir) + return lastPart == "tmp" +} + func (d *Downloader) ReCalcStats(interval time.Duration) { d.statsLock.Lock() defer d.statsLock.Unlock() @@ -215,6 +223,24 @@ func (d *Downloader) onComplete() { d.pieceCompletionDB = c d.folder = m d.torrentClient = torrentClient + _ = d.addSegments() +} + +func (d *Downloader) addSegments() error { + if err := BuildTorrentFilesIfNeed(context.Background(), d.cfg.DataDir); err != nil { + return err + } + files, err := seedableSegmentFiles(d.cfg.DataDir) + if err != nil { + return fmt.Errorf("seedableSegmentFiles: %w", err) + } + for _, f := range files { + _, err := AddSegment(f, d.cfg.DataDir, d.torrentClient) + if err != nil { + return err + } + } + return nil } func (d *Downloader) Stats() AggStats { @@ -279,12 +305,6 @@ func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletio return nil, nil, nil, nil, fmt.Errorf("torrent.NewClient: %w", err) } - if err := BuildTorrentsAndAdd(context.Background(), snapDir, torrentClient); err != nil { - if err != nil { - return nil, nil, nil, nil, fmt.Errorf("BuildTorrentsAndAdd: %w", err) - } - } - return db, c, m, torrentClient, nil } diff --git a/cmd/downloader/downloader/downloader_grpc_server.go b/cmd/downloader/downloader/downloader_grpc_server.go index 8c7411e2158..60bb4642459 100644 --- a/cmd/downloader/downloader/downloader_grpc_server.go +++ b/cmd/downloader/downloader/downloader_grpc_server.go @@ -2,6 +2,7 @@ package downloader import ( "context" + "fmt" "time" "github.com/anacrolix/torrent/metainfo" @@ -25,15 +26,27 @@ type GrpcServer struct { d *Downloader } +// Download - create new .torrent ONLY if initialSync, everything else Erigon can generate by itself func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) { + isInitialSync := s.d.IsInitialSync() + torrentClient := s.d.Torrent() mi := &metainfo.MetaInfo{AnnounceList: Trackers} for _, it := range request.Items { - if it.TorrentHash == nil { - err := BuildTorrentAndAdd(ctx, it.Path, s.d.SnapDir(), torrentClient) - if err != nil { + if it.TorrentHash == nil { // seed new snapshot + if err := BuildTorrentFileIfNeed(it.Path, s.d.SnapDir()); err != nil { return nil, err } + } + ok, err := AddSegment(it.Path, s.d.SnapDir(), torrentClient) + if err != nil { + return nil, fmt.Errorf("AddSegment: %w", err) + } + if ok { + continue + } + + if !isInitialSync { continue } @@ -52,6 +65,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow t.DisallowDataDownload() t.AllowDataUpload() <-t.GotInfo() + mi := t.Metainfo() if err := CreateTorrentFileIfNotExists(s.d.SnapDir(), t.Info(), &mi); err != nil { log.Warn("[downloader] create torrent file", "err", err) diff --git a/cmd/downloader/downloader/util.go b/cmd/downloader/downloader/util.go index 1cc79d588ed..905dfa101d5 100644 --- a/cmd/downloader/downloader/util.go +++ b/cmd/downloader/downloader/util.go @@ -75,7 +75,7 @@ func AllTorrentFiles(dir string) ([]string, error) { } return res, nil } -func allSegmentFiles(dir string) ([]string, error) { +func seedableSegmentFiles(dir string) ([]string, error) { files, err := os.ReadDir(dir) if err != nil { return nil, err @@ -95,50 +95,57 @@ func allSegmentFiles(dir string) ([]string, error) { if filepath.Ext(f.Name()) != ".seg" { // filter out only compressed files continue } + ff, err := snap.ParseFileName(dir, f.Name()) + if err != nil { + return nil, fmt.Errorf("ParseFileName: %w", err) + } + if !ff.Seedable() { + continue + } res = append(res, f.Name()) } return res, nil } // BuildTorrentFileIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually -func BuildTorrentFileIfNeed(ctx context.Context, originalFileName, root string) (ok bool, err error) { +func BuildTorrentFileIfNeed(originalFileName, root string) (err error) { f, err := snap.ParseFileName(root, originalFileName) if err != nil { - return false, fmt.Errorf("ParseFileName: %w", err) + return fmt.Errorf("ParseFileName: %w", err) } - if f.To-f.From != snap.DEFAULT_SEGMENT_SIZE { - return false, nil + if !f.NeedTorrentFile() { + return nil } - torrentFilePath := filepath.Join(root, originalFileName+".torrent") - if _, err := os.Stat(torrentFilePath); err != nil { - if !errors.Is(err, os.ErrNotExist) { - return false, fmt.Errorf("os.Stat: %w", err) - } - info := &metainfo.Info{PieceLength: downloadercfg.DefaultPieceSize} - if err := info.BuildFromFilePath(filepath.Join(root, originalFileName)); err != nil { - return false, fmt.Errorf("BuildFromFilePath: %w", err) - } - if err := CreateTorrentFile(root, info, nil); err != nil { - return false, fmt.Errorf("CreateTorrentFile: %w", err) - } + if err := createTorrentFileFromSegment(f, nil); err != nil { + return fmt.Errorf("createTorrentFileFromInfo: %w", err) } - return true, nil + return nil +} + +func createTorrentFileFromSegment(f snap.FileInfo, mi *metainfo.MetaInfo) error { + info := &metainfo.Info{PieceLength: downloadercfg.DefaultPieceSize} + if err := info.BuildFromFilePath(f.Path); err != nil { + return fmt.Errorf("createTorrentFileFromSegment: %w", err) + } + + dir, _ := filepath.Split(f.Path) + return createTorrentFileFromInfo(dir, info, mi) } -func BuildTorrentAndAdd(ctx context.Context, originalFileName, snapDir string, client *torrent.Client) error { - ok, err := BuildTorrentFileIfNeed(ctx, originalFileName, snapDir) +// AddSegment - add existing .seg file, create corresponding .torrent if need +func AddSegment(originalFileName, snapDir string, client *torrent.Client) (bool, error) { + f, err := snap.ParseFileName(snapDir, originalFileName) if err != nil { - return fmt.Errorf("BuildTorrentFileIfNeed: %w", err) + return false, fmt.Errorf("ParseFileName: %w", err) } - if !ok { - return nil + if !f.TorrentFileExists() { + return false, nil } - torrentFilePath := filepath.Join(snapDir, originalFileName+".torrent") - _, err = AddTorrentFile(ctx, torrentFilePath, client) + _, err = AddTorrentFile(f.Path+".torrent", client) if err != nil { - return fmt.Errorf("AddTorrentFile: %w", err) + return false, fmt.Errorf("AddTorrentFile: %w", err) } - return nil + return true, nil } // BuildTorrentFilesIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually @@ -146,17 +153,23 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) error { logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() - files, err := allSegmentFiles(snapDir) + files, err := seedableSegmentFiles(snapDir) if err != nil { return err } errs := make(chan error, len(files)*2) wg := &sync.WaitGroup{} + workers := cmp.Max(1, runtime.GOMAXPROCS(-1)-1) * 2 + var sem = semaphore.NewWeighted(int64(workers)) for i, f := range files { wg.Add(1) + if err := sem.Acquire(ctx, 1); err != nil { + return err + } go func(f string, i int) { + defer sem.Release(1) defer wg.Done() - _, err = BuildTorrentFileIfNeed(ctx, f, snapDir) + err = BuildTorrentFileIfNeed(f, snapDir) if err != nil { errs <- err } @@ -182,63 +195,21 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) error { return nil } -// BuildTorrentsAndAdd - create .torrent files from .seg files (big IO) - if .seg files were placed manually to snapDir -// torrent.Client does automaticaly read all .torrent files, but we also willing to add .seg files even if corresponding .torrent doesn't exist -func BuildTorrentsAndAdd(ctx context.Context, snapDir string, client *torrent.Client) error { - logEvery := time.NewTicker(20 * time.Second) - defer logEvery.Stop() - files, err := allSegmentFiles(snapDir) +func CreateTorrentFileIfNotExists(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error { + f, err := snap.ParseFileName(root, info.Name) if err != nil { - return fmt.Errorf("allSegmentFiles: %w", err) + return fmt.Errorf("ParseFileName: %w", err) } - errs := make(chan error, len(files)*2) - wg := &sync.WaitGroup{} - workers := cmp.Max(1, runtime.GOMAXPROCS(-1)-1) - var sem = semaphore.NewWeighted(int64(workers)) - for i, f := range files { - wg.Add(1) - if err := sem.Acquire(ctx, 1); err != nil { - return err - } - go func(f string, i int) { - defer sem.Release(1) - defer wg.Done() - - select { - case <-ctx.Done(): - errs <- ctx.Err() - case <-logEvery.C: - log.Info("[Snapshots] Verify snapshots", "Progress", fmt.Sprintf("%d/%d", i, len(files))) - default: - } - errs <- BuildTorrentAndAdd(ctx, f, snapDir, client) - }(f, i) - } - go func() { - wg.Wait() - close(errs) - }() - for err := range errs { - if err != nil { - return err - } + if !f.NeedTorrentFile() { + return nil } - - return nil -} - -func CreateTorrentFileIfNotExists(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error { - torrentFileName := filepath.Join(root, info.Name+".torrent") - if _, err := os.Stat(torrentFileName); err != nil { - if errors.Is(err, os.ErrNotExist) { - return CreateTorrentFile(root, info, mi) - } + if err := createTorrentFileFromInfo(root, info, mi); err != nil { return err } return nil } -func CreateTorrentFile(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error { +func createTorrentFileFromInfo(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error { if mi == nil { infoBytes, err := bencode.Marshal(info) if err != nil { @@ -322,7 +293,7 @@ func verifyTorrent(info *metainfo.Info, root string, consumer func(i int, good b // added first time - pieces verification process will start (disk IO heavy) - Progress // kept in `piece completion storage` (surviving reboot). Once it done - no disk IO needed again. // Don't need call torrent.VerifyData manually -func AddTorrentFile(ctx context.Context, torrentFilePath string, torrentClient *torrent.Client) (*torrent.Torrent, error) { +func AddTorrentFile(torrentFilePath string, torrentClient *torrent.Client) (*torrent.Torrent, error) { mi, err := metainfo.LoadFromFile(torrentFilePath) if err != nil { return nil, err diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 4aefc5079e6..2a88d823da3 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -1155,22 +1155,6 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R if err := cfg.snapshots.Reopen(); err != nil { return fmt.Errorf("ReopenSegments: %w", err) } - expect := snapshothashes.KnownConfig(cfg.chainConfig.ChainName).ExpectBlocks - if cfg.snapshots.SegmentsMax() < expect { - k, err := rawdb.SecondKey(tx, kv.Headers) // genesis always first - if err != nil { - return err - } - var hasInDB uint64 = 1 - if k != nil { - hasInDB = binary.BigEndian.Uint64(k) - } - if cfg.snapshots.SegmentsMax() < hasInDB { - return fmt.Errorf("not enough snapshots available: snapshots=%d, blockInDB=%d, expect=%d", cfg.snapshots.SegmentsMax(), hasInDB, expect) - } else { - log.Warn(fmt.Sprintf("not enough snapshots available: %d < %d, but we can re-generate them because DB has historical blocks up to: %d", cfg.snapshots.SegmentsMax(), expect, hasInDB)) - } - } var m runtime.MemStats libcommon.ReadMemStats(&m) diff --git a/turbo/snapshotsync/snap/files.go b/turbo/snapshotsync/snap/files.go index e5ff027a00e..fa009192ce0 100644 --- a/turbo/snapshotsync/snap/files.go +++ b/turbo/snapshotsync/snap/files.go @@ -9,6 +9,7 @@ import ( "strconv" "strings" + "github.com/ledgerwatch/erigon/common" "github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes" "golang.org/x/exp/slices" ) @@ -140,6 +141,10 @@ type FileInfo struct { T Type } +func (f FileInfo) TorrentFileExists() bool { return common.FileExist(f.Path + ".torrent") } +func (f FileInfo) Seedable() bool { return f.To-f.From == DEFAULT_SEGMENT_SIZE } +func (f FileInfo) NeedTorrentFile() bool { return f.Seedable() && !f.TorrentFileExists() } + func IdxFiles(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".idx") } func Segments(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".seg") } func TmpFiles(dir string) (res []string, err error) {