Skip to content

Commit

Permalink
prevent downloading new snapshots after initial sync (#4585)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov committed Jun 30, 2022
1 parent 087105d commit c03d573
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 106 deletions.
36 changes: 28 additions & 8 deletions cmd/downloader/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func New(cfg *downloadercfg.Cfg) (*Downloader, error) {
}
}

return &Downloader{
d := &Downloader{
cfg: cfg,
db: db,
pieceCompletionDB: c,
Expand All @@ -95,7 +95,8 @@ func New(cfg *downloadercfg.Cfg) (*Downloader, error) {
clientLock: &sync.RWMutex{},

statsLock: &sync.RWMutex{},
}, nil
}
return d, d.addSegments()
}

func (d *Downloader) SnapDir() string {
Expand All @@ -104,6 +105,13 @@ func (d *Downloader) SnapDir() string {
return d.cfg.DataDir
}

func (d *Downloader) IsInitialSync() bool {
d.clientLock.RLock()
defer d.clientLock.RUnlock()
_, lastPart := filepath.Split(d.cfg.DataDir)
return lastPart == "tmp"
}

func (d *Downloader) ReCalcStats(interval time.Duration) {
d.statsLock.Lock()
defer d.statsLock.Unlock()
Expand Down Expand Up @@ -215,6 +223,24 @@ func (d *Downloader) onComplete() {
d.pieceCompletionDB = c
d.folder = m
d.torrentClient = torrentClient
_ = d.addSegments()
}

func (d *Downloader) addSegments() error {
if err := BuildTorrentFilesIfNeed(context.Background(), d.cfg.DataDir); err != nil {
return err
}
files, err := seedableSegmentFiles(d.cfg.DataDir)
if err != nil {
return fmt.Errorf("seedableSegmentFiles: %w", err)
}
for _, f := range files {
_, err := AddSegment(f, d.cfg.DataDir, d.torrentClient)
if err != nil {
return err
}
}
return nil
}

func (d *Downloader) Stats() AggStats {
Expand Down Expand Up @@ -279,12 +305,6 @@ func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletio
return nil, nil, nil, nil, fmt.Errorf("torrent.NewClient: %w", err)
}

if err := BuildTorrentsAndAdd(context.Background(), snapDir, torrentClient); err != nil {
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("BuildTorrentsAndAdd: %w", err)
}
}

return db, c, m, torrentClient, nil
}

Expand Down
20 changes: 17 additions & 3 deletions cmd/downloader/downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package downloader

import (
"context"
"fmt"
"time"

"github.com/anacrolix/torrent/metainfo"
Expand All @@ -25,15 +26,27 @@ type GrpcServer struct {
d *Downloader
}

// Download - create new .torrent ONLY if initialSync, everything else Erigon can generate by itself
func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) {
isInitialSync := s.d.IsInitialSync()

torrentClient := s.d.Torrent()
mi := &metainfo.MetaInfo{AnnounceList: Trackers}
for _, it := range request.Items {
if it.TorrentHash == nil {
err := BuildTorrentAndAdd(ctx, it.Path, s.d.SnapDir(), torrentClient)
if err != nil {
if it.TorrentHash == nil { // seed new snapshot
if err := BuildTorrentFileIfNeed(it.Path, s.d.SnapDir()); err != nil {
return nil, err
}
}
ok, err := AddSegment(it.Path, s.d.SnapDir(), torrentClient)
if err != nil {
return nil, fmt.Errorf("AddSegment: %w", err)
}
if ok {
continue
}

if !isInitialSync {
continue
}

Expand All @@ -52,6 +65,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
t.DisallowDataDownload()
t.AllowDataUpload()
<-t.GotInfo()

mi := t.Metainfo()
if err := CreateTorrentFileIfNotExists(s.d.SnapDir(), t.Info(), &mi); err != nil {
log.Warn("[downloader] create torrent file", "err", err)
Expand Down
129 changes: 50 additions & 79 deletions cmd/downloader/downloader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func AllTorrentFiles(dir string) ([]string, error) {
}
return res, nil
}
func allSegmentFiles(dir string) ([]string, error) {
func seedableSegmentFiles(dir string) ([]string, error) {
files, err := os.ReadDir(dir)
if err != nil {
return nil, err
Expand All @@ -95,68 +95,81 @@ func allSegmentFiles(dir string) ([]string, error) {
if filepath.Ext(f.Name()) != ".seg" { // filter out only compressed files
continue
}
ff, err := snap.ParseFileName(dir, f.Name())
if err != nil {
return nil, fmt.Errorf("ParseFileName: %w", err)
}
if !ff.Seedable() {
continue
}
res = append(res, f.Name())
}
return res, nil
}

// BuildTorrentFileIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually
func BuildTorrentFileIfNeed(ctx context.Context, originalFileName, root string) (ok bool, err error) {
func BuildTorrentFileIfNeed(originalFileName, root string) (err error) {
f, err := snap.ParseFileName(root, originalFileName)
if err != nil {
return false, fmt.Errorf("ParseFileName: %w", err)
return fmt.Errorf("ParseFileName: %w", err)
}
if f.To-f.From != snap.DEFAULT_SEGMENT_SIZE {
return false, nil
if !f.NeedTorrentFile() {
return nil
}
torrentFilePath := filepath.Join(root, originalFileName+".torrent")
if _, err := os.Stat(torrentFilePath); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return false, fmt.Errorf("os.Stat: %w", err)
}
info := &metainfo.Info{PieceLength: downloadercfg.DefaultPieceSize}
if err := info.BuildFromFilePath(filepath.Join(root, originalFileName)); err != nil {
return false, fmt.Errorf("BuildFromFilePath: %w", err)
}
if err := CreateTorrentFile(root, info, nil); err != nil {
return false, fmt.Errorf("CreateTorrentFile: %w", err)
}
if err := createTorrentFileFromSegment(f, nil); err != nil {
return fmt.Errorf("createTorrentFileFromInfo: %w", err)
}
return true, nil
return nil
}

func createTorrentFileFromSegment(f snap.FileInfo, mi *metainfo.MetaInfo) error {
info := &metainfo.Info{PieceLength: downloadercfg.DefaultPieceSize}
if err := info.BuildFromFilePath(f.Path); err != nil {
return fmt.Errorf("createTorrentFileFromSegment: %w", err)
}

dir, _ := filepath.Split(f.Path)
return createTorrentFileFromInfo(dir, info, mi)
}

func BuildTorrentAndAdd(ctx context.Context, originalFileName, snapDir string, client *torrent.Client) error {
ok, err := BuildTorrentFileIfNeed(ctx, originalFileName, snapDir)
// AddSegment - add existing .seg file, create corresponding .torrent if need
func AddSegment(originalFileName, snapDir string, client *torrent.Client) (bool, error) {
f, err := snap.ParseFileName(snapDir, originalFileName)
if err != nil {
return fmt.Errorf("BuildTorrentFileIfNeed: %w", err)
return false, fmt.Errorf("ParseFileName: %w", err)
}
if !ok {
return nil
if !f.TorrentFileExists() {
return false, nil
}
torrentFilePath := filepath.Join(snapDir, originalFileName+".torrent")
_, err = AddTorrentFile(ctx, torrentFilePath, client)
_, err = AddTorrentFile(f.Path+".torrent", client)
if err != nil {
return fmt.Errorf("AddTorrentFile: %w", err)
return false, fmt.Errorf("AddTorrentFile: %w", err)
}
return nil
return true, nil
}

// BuildTorrentFilesIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually
func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()

files, err := allSegmentFiles(snapDir)
files, err := seedableSegmentFiles(snapDir)
if err != nil {
return err
}
errs := make(chan error, len(files)*2)
wg := &sync.WaitGroup{}
workers := cmp.Max(1, runtime.GOMAXPROCS(-1)-1) * 2
var sem = semaphore.NewWeighted(int64(workers))
for i, f := range files {
wg.Add(1)
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
go func(f string, i int) {
defer sem.Release(1)
defer wg.Done()
_, err = BuildTorrentFileIfNeed(ctx, f, snapDir)
err = BuildTorrentFileIfNeed(f, snapDir)
if err != nil {
errs <- err
}
Expand All @@ -182,63 +195,21 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) error {
return nil
}

// BuildTorrentsAndAdd - create .torrent files from .seg files (big IO) - if .seg files were placed manually to snapDir
// torrent.Client does automaticaly read all .torrent files, but we also willing to add .seg files even if corresponding .torrent doesn't exist
func BuildTorrentsAndAdd(ctx context.Context, snapDir string, client *torrent.Client) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
files, err := allSegmentFiles(snapDir)
func CreateTorrentFileIfNotExists(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error {
f, err := snap.ParseFileName(root, info.Name)
if err != nil {
return fmt.Errorf("allSegmentFiles: %w", err)
return fmt.Errorf("ParseFileName: %w", err)
}
errs := make(chan error, len(files)*2)
wg := &sync.WaitGroup{}
workers := cmp.Max(1, runtime.GOMAXPROCS(-1)-1)
var sem = semaphore.NewWeighted(int64(workers))
for i, f := range files {
wg.Add(1)
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
go func(f string, i int) {
defer sem.Release(1)
defer wg.Done()

select {
case <-ctx.Done():
errs <- ctx.Err()
case <-logEvery.C:
log.Info("[Snapshots] Verify snapshots", "Progress", fmt.Sprintf("%d/%d", i, len(files)))
default:
}
errs <- BuildTorrentAndAdd(ctx, f, snapDir, client)
}(f, i)
}
go func() {
wg.Wait()
close(errs)
}()
for err := range errs {
if err != nil {
return err
}
if !f.NeedTorrentFile() {
return nil
}

return nil
}

func CreateTorrentFileIfNotExists(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error {
torrentFileName := filepath.Join(root, info.Name+".torrent")
if _, err := os.Stat(torrentFileName); err != nil {
if errors.Is(err, os.ErrNotExist) {
return CreateTorrentFile(root, info, mi)
}
if err := createTorrentFileFromInfo(root, info, mi); err != nil {
return err
}
return nil
}

func CreateTorrentFile(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error {
func createTorrentFileFromInfo(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error {
if mi == nil {
infoBytes, err := bencode.Marshal(info)
if err != nil {
Expand Down Expand Up @@ -322,7 +293,7 @@ func verifyTorrent(info *metainfo.Info, root string, consumer func(i int, good b
// added first time - pieces verification process will start (disk IO heavy) - Progress
// kept in `piece completion storage` (surviving reboot). Once it done - no disk IO needed again.
// Don't need call torrent.VerifyData manually
func AddTorrentFile(ctx context.Context, torrentFilePath string, torrentClient *torrent.Client) (*torrent.Torrent, error) {
func AddTorrentFile(torrentFilePath string, torrentClient *torrent.Client) (*torrent.Torrent, error) {
mi, err := metainfo.LoadFromFile(torrentFilePath)
if err != nil {
return nil, err
Expand Down
16 changes: 0 additions & 16 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,22 +1155,6 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
if err := cfg.snapshots.Reopen(); err != nil {
return fmt.Errorf("ReopenSegments: %w", err)
}
expect := snapshothashes.KnownConfig(cfg.chainConfig.ChainName).ExpectBlocks
if cfg.snapshots.SegmentsMax() < expect {
k, err := rawdb.SecondKey(tx, kv.Headers) // genesis always first
if err != nil {
return err
}
var hasInDB uint64 = 1
if k != nil {
hasInDB = binary.BigEndian.Uint64(k)
}
if cfg.snapshots.SegmentsMax() < hasInDB {
return fmt.Errorf("not enough snapshots available: snapshots=%d, blockInDB=%d, expect=%d", cfg.snapshots.SegmentsMax(), hasInDB, expect)
} else {
log.Warn(fmt.Sprintf("not enough snapshots available: %d < %d, but we can re-generate them because DB has historical blocks up to: %d", cfg.snapshots.SegmentsMax(), expect, hasInDB))
}
}

var m runtime.MemStats
libcommon.ReadMemStats(&m)
Expand Down
5 changes: 5 additions & 0 deletions turbo/snapshotsync/snap/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"strings"

"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
"golang.org/x/exp/slices"
)
Expand Down Expand Up @@ -140,6 +141,10 @@ type FileInfo struct {
T Type
}

func (f FileInfo) TorrentFileExists() bool { return common.FileExist(f.Path + ".torrent") }
func (f FileInfo) Seedable() bool { return f.To-f.From == DEFAULT_SEGMENT_SIZE }
func (f FileInfo) NeedTorrentFile() bool { return f.Seedable() && !f.TorrentFileExists() }

func IdxFiles(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".idx") }
func Segments(dir string) (res []FileInfo, err error) { return FilesWithExt(dir, ".seg") }
func TmpFiles(dir string) (res []string, err error) {
Expand Down

0 comments on commit c03d573

Please sign in to comment.