Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prevent downloading new snapshots after initial sync #4585

Merged
merged 5 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions cmd/downloader/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func New(cfg *downloadercfg.Cfg) (*Downloader, error) {
}
}

return &Downloader{
d := &Downloader{
cfg: cfg,
db: db,
pieceCompletionDB: c,
Expand All @@ -95,7 +95,8 @@ func New(cfg *downloadercfg.Cfg) (*Downloader, error) {
clientLock: &sync.RWMutex{},

statsLock: &sync.RWMutex{},
}, nil
}
return d, d.addSegments()
}

func (d *Downloader) SnapDir() string {
Expand All @@ -104,6 +105,13 @@ func (d *Downloader) SnapDir() string {
return d.cfg.DataDir
}

// IsInitialSync reports whether the last element of cfg.DataDir is "tmp",
// which is the condition this package uses to recognise the initial sync.
//
// The configured path is read while holding the client read-lock.
// filepath.Base is used instead of filepath.Split because Split yields an
// empty last element for a path that ends with a separator (".../tmp/"),
// which would make the check fail for an otherwise matching directory.
func (d *Downloader) IsInitialSync() bool {
	d.clientLock.RLock()
	defer d.clientLock.RUnlock()
	return filepath.Base(d.cfg.DataDir) == "tmp"
}

func (d *Downloader) ReCalcStats(interval time.Duration) {
d.statsLock.Lock()
defer d.statsLock.Unlock()
Expand Down Expand Up @@ -215,6 +223,24 @@ func (d *Downloader) onComplete() {
d.pieceCompletionDB = c
d.folder = m
d.torrentClient = torrentClient
_ = d.addSegments()
}

// addSegments builds any missing .torrent files for the configured data
// directory and then registers every seedable segment file with the
// torrent client.
//
// It stops at the first failure. Errors from BuildTorrentFilesIfNeed and
// AddSegment are returned as-is; an error from seedableSegmentFiles is
// wrapped with the "seedableSegmentFiles" prefix.
func (d *Downloader) addSegments() error {
	snapDir := d.cfg.DataDir

	buildErr := BuildTorrentFilesIfNeed(context.Background(), snapDir)
	if buildErr != nil {
		return buildErr
	}

	segNames, listErr := seedableSegmentFiles(snapDir)
	if listErr != nil {
		return fmt.Errorf("seedableSegmentFiles: %w", listErr)
	}

	for i := range segNames {
		// The "was it added" flag is not needed here; only failures matter.
		if _, addErr := AddSegment(segNames[i], snapDir, d.torrentClient); addErr != nil {
			return addErr
		}
	}
	return nil
}

func (d *Downloader) Stats() AggStats {
Expand Down Expand Up @@ -279,12 +305,6 @@ func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletio
return nil, nil, nil, nil, fmt.Errorf("torrent.NewClient: %w", err)
}

if err := BuildTorrentsAndAdd(context.Background(), snapDir, torrentClient); err != nil {
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("BuildTorrentsAndAdd: %w", err)
}
}

return db, c, m, torrentClient, nil
}

Expand Down
20 changes: 17 additions & 3 deletions cmd/downloader/downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package downloader

import (
"context"
"fmt"
"time"

"github.com/anacrolix/torrent/metainfo"
Expand All @@ -25,15 +26,27 @@ type GrpcServer struct {
d *Downloader
}

// Download - create new .torrent ONLY if initialSync, everything else Erigon can generate by itself
func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) {
isInitialSync := s.d.IsInitialSync()

torrentClient := s.d.Torrent()
mi := &metainfo.MetaInfo{AnnounceList: Trackers}
for _, it := range request.Items {
if it.TorrentHash == nil {
err := BuildTorrentAndAdd(ctx, it.Path, s.d.SnapDir(), torrentClient)
if err != nil {
if it.TorrentHash == nil { // seed new snapshot
if err := BuildTorrentFileIfNeed(it.Path, s.d.SnapDir()); err != nil {
return nil, err
}
}
ok, err := AddSegment(it.Path, s.d.SnapDir(), torrentClient)
if err != nil {
return nil, fmt.Errorf("AddSegment: %w", err)
}
if ok {
continue
}

if !isInitialSync {
continue
}

Expand All @@ -52,6 +65,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
t.DisallowDataDownload()
t.AllowDataUpload()
<-t.GotInfo()

mi := t.Metainfo()
if err := CreateTorrentFileIfNotExists(s.d.SnapDir(), t.Info(), &mi); err != nil {
log.Warn("[downloader] create torrent file", "err", err)
Expand Down
129 changes: 50 additions & 79 deletions cmd/downloader/downloader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func AllTorrentFiles(dir string) ([]string, error) {
}
return res, nil
}
func allSegmentFiles(dir string) ([]string, error) {
func seedableSegmentFiles(dir string) ([]string, error) {
files, err := os.ReadDir(dir)
if err != nil {
return nil, err
Expand All @@ -95,68 +95,81 @@ func allSegmentFiles(dir string) ([]string, error) {
if filepath.Ext(f.Name()) != ".seg" { // filter out only compressed files
continue
}
ff, err := snap.ParseFileName(dir, f.Name())
if err != nil {
return nil, fmt.Errorf("ParseFileName: %w", err)
}
if !ff.Seedable() {
continue
}
res = append(res, f.Name())
}
return res, nil
}

// BuildTorrentFileIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually
func BuildTorrentFileIfNeed(ctx context.Context, originalFileName, root string) (ok bool, err error) {
func BuildTorrentFileIfNeed(originalFileName, root string) (err error) {
f, err := snap.ParseFileName(root, originalFileName)
if err != nil {
return false, fmt.Errorf("ParseFileName: %w", err)
return fmt.Errorf("ParseFileName: %w", err)
}
if f.To-f.From != snap.DEFAULT_SEGMENT_SIZE {
return false, nil
if !f.NeedTorrentFile() {
return nil
}
torrentFilePath := filepath.Join(root, originalFileName+".torrent")
if _, err := os.Stat(torrentFilePath); err != nil {
if !errors.Is(err, os.ErrNotExist) {
return false, fmt.Errorf("os.Stat: %w", err)
}
info := &metainfo.Info{PieceLength: downloadercfg.DefaultPieceSize}
if err := info.BuildFromFilePath(filepath.Join(root, originalFileName)); err != nil {
return false, fmt.Errorf("BuildFromFilePath: %w", err)
}
if err := CreateTorrentFile(root, info, nil); err != nil {
return false, fmt.Errorf("CreateTorrentFile: %w", err)
}
if err := createTorrentFileFromSegment(f, nil); err != nil {
return fmt.Errorf("createTorrentFileFromInfo: %w", err)
}
return true, nil
return nil
}

// createTorrentFileFromSegment builds torrent info for the segment file at
// f.Path (using downloadercfg.DefaultPieceSize as the piece length) and hands
// it to createTorrentFileFromInfo together with the directory part of f.Path.
//
// mi is passed through unchanged. A failure while building the info from the
// file is wrapped with the "createTorrentFileFromSegment" prefix.
func createTorrentFileFromSegment(f snap.FileInfo, mi *metainfo.MetaInfo) error {
	segInfo := metainfo.Info{PieceLength: downloadercfg.DefaultPieceSize}

	buildErr := segInfo.BuildFromFilePath(f.Path)
	if buildErr != nil {
		return fmt.Errorf("createTorrentFileFromSegment: %w", buildErr)
	}

	// Only the directory component is needed; the file name is discarded.
	segDir, _ := filepath.Split(f.Path)
	return createTorrentFileFromInfo(segDir, &segInfo, mi)
}

func BuildTorrentAndAdd(ctx context.Context, originalFileName, snapDir string, client *torrent.Client) error {
ok, err := BuildTorrentFileIfNeed(ctx, originalFileName, snapDir)
// AddSegment - add existing .seg file to the torrent client. Returns false (and adds nothing) if the corresponding .torrent file doesn't exist - it is NOT created here
func AddSegment(originalFileName, snapDir string, client *torrent.Client) (bool, error) {
f, err := snap.ParseFileName(snapDir, originalFileName)
if err != nil {
return fmt.Errorf("BuildTorrentFileIfNeed: %w", err)
return false, fmt.Errorf("ParseFileName: %w", err)
}
if !ok {
return nil
if !f.TorrentFileExists() {
return false, nil
}
torrentFilePath := filepath.Join(snapDir, originalFileName+".torrent")
_, err = AddTorrentFile(ctx, torrentFilePath, client)
_, err = AddTorrentFile(f.Path+".torrent", client)
if err != nil {
return fmt.Errorf("AddTorrentFile: %w", err)
return false, fmt.Errorf("AddTorrentFile: %w", err)
}
return nil
return true, nil
}

// BuildTorrentFilesIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually
func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()

files, err := allSegmentFiles(snapDir)
files, err := seedableSegmentFiles(snapDir)
if err != nil {
return err
}
errs := make(chan error, len(files)*2)
wg := &sync.WaitGroup{}
workers := cmp.Max(1, runtime.GOMAXPROCS(-1)-1) * 2
var sem = semaphore.NewWeighted(int64(workers))
for i, f := range files {
wg.Add(1)
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
go func(f string, i int) {
defer sem.Release(1)
defer wg.Done()
_, err = BuildTorrentFileIfNeed(ctx, f, snapDir)
err = BuildTorrentFileIfNeed(f, snapDir)
if err != nil {
errs <- err
}
Expand All @@ -182,63 +195,21 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) error {
return nil
}

// BuildTorrentsAndAdd - create .torrent files from .seg files (big IO) - if .seg files were placed manually in snapDir
// torrent.Client automatically reads all .torrent files, but we are also willing to add .seg files even if the corresponding .torrent doesn't exist
func BuildTorrentsAndAdd(ctx context.Context, snapDir string, client *torrent.Client) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
files, err := allSegmentFiles(snapDir)
func CreateTorrentFileIfNotExists(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error {
f, err := snap.ParseFileName(root, info.Name)
if err != nil {
return fmt.Errorf("allSegmentFiles: %w", err)
return fmt.Errorf("ParseFileName: %w", err)
}
errs := make(chan error, len(files)*2)
wg := &sync.WaitGroup{}
workers := cmp.Max(1, runtime.GOMAXPROCS(-1)-1)
var sem = semaphore.NewWeighted(int64(workers))
for i, f := range files {
wg.Add(1)
if err := sem.Acquire(ctx, 1); err != nil {
return err
}
go func(f string, i int) {
defer sem.Release(1)
defer wg.Done()

select {
case <-ctx.Done():
errs <- ctx.Err()
case <-logEvery.C:
log.Info("[Snapshots] Verify snapshots", "Progress", fmt.Sprintf("%d/%d", i, len(files)))
default:
}
errs <- BuildTorrentAndAdd(ctx, f, snapDir, client)
}(f, i)
}
go func() {
wg.Wait()
close(errs)
}()
for err := range errs {
if err != nil {
return err
}
if !f.NeedTorrentFile() {
return nil
}

return nil
}

func CreateTorrentFileIfNotExists(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error {
torrentFileName := filepath.Join(root, info.Name+".torrent")
if _, err := os.Stat(torrentFileName); err != nil {
if errors.Is(err, os.ErrNotExist) {
return CreateTorrentFile(root, info, mi)
}
if err := createTorrentFileFromInfo(root, info, mi); err != nil {
return err
}
return nil
}

func CreateTorrentFile(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error {
func createTorrentFileFromInfo(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error {
if mi == nil {
infoBytes, err := bencode.Marshal(info)
if err != nil {
Expand Down Expand Up @@ -322,7 +293,7 @@ func verifyTorrent(info *metainfo.Info, root string, consumer func(i int, good b
// added first time - pieces verification process will start (disk IO heavy) - Progress
// kept in `piece completion storage` (surviving reboot). Once it is done - no disk IO is needed again.
// No need to call torrent.VerifyData manually
func AddTorrentFile(ctx context.Context, torrentFilePath string, torrentClient *torrent.Client) (*torrent.Torrent, error) {
func AddTorrentFile(torrentFilePath string, torrentClient *torrent.Client) (*torrent.Torrent, error) {
mi, err := metainfo.LoadFromFile(torrentFilePath)
if err != nil {
return nil, err
Expand Down
16 changes: 0 additions & 16 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,22 +1155,6 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
if err := cfg.snapshots.Reopen(); err != nil {
return fmt.Errorf("ReopenSegments: %w", err)
}
expect := snapshothashes.KnownConfig(cfg.chainConfig.ChainName).ExpectBlocks
if cfg.snapshots.SegmentsMax() < expect {
k, err := rawdb.SecondKey(tx, kv.Headers) // genesis always first
if err != nil {
return err
}
var hasInDB uint64 = 1
if k != nil {
hasInDB = binary.BigEndian.Uint64(k)
}
if cfg.snapshots.SegmentsMax() < hasInDB {
return fmt.Errorf("not enough snapshots available: snapshots=%d, blockInDB=%d, expect=%d", cfg.snapshots.SegmentsMax(), hasInDB, expect)
} else {
log.Warn(fmt.Sprintf("not enough snapshots available: %d < %d, but we can re-generate them because DB has historical blocks up to: %d", cfg.snapshots.SegmentsMax(), expect, hasInDB))
}
}

var m runtime.MemStats
libcommon.ReadMemStats(&m)
Expand Down
5 changes: 5 additions & 0 deletions turbo/snapshotsync/snap/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"strings"

"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/turbo/snapshotsync/snapshothashes"
"golang.org/x/exp/slices"
)
Expand Down Expand Up @@ -140,6 +141,10 @@ type FileInfo struct {
T Type
}

// TorrentFileExists reports whether common.FileExist finds a file at
// f.Path with the ".torrent" suffix appended.
func (f FileInfo) TorrentFileExists() bool {
	torrentPath := f.Path + ".torrent"
	return common.FileExist(torrentPath)
}
// Seedable reports whether the range covered by the file (To minus From)
// is exactly DEFAULT_SEGMENT_SIZE.
func (f FileInfo) Seedable() bool {
	span := f.To - f.From
	return span == DEFAULT_SEGMENT_SIZE
}
// NeedTorrentFile reports whether the file is seedable but has no .torrent
// file yet. The existence check is only performed for seedable files.
func (f FileInfo) NeedTorrentFile() bool {
	if !f.Seedable() {
		return false
	}
	return !f.TorrentFileExists()
}

// IdxFiles returns whatever FilesWithExt reports for dir and the ".idx"
// extension, including its error.
func IdxFiles(dir string) (res []FileInfo, err error) {
	res, err = FilesWithExt(dir, ".idx")
	return res, err
}
// Segments returns whatever FilesWithExt reports for dir and the ".seg"
// extension, including its error.
func Segments(dir string) (res []FileInfo, err error) {
	res, err = FilesWithExt(dir, ".seg")
	return res, err
}
func TmpFiles(dir string) (res []string, err error) {
Expand Down