Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

checking if we build torrent file #4723

Merged
merged 13 commits into from
Jul 16, 2022
110 changes: 76 additions & 34 deletions cmd/downloader/downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/metainfo"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader"
Expand Down Expand Up @@ -32,49 +33,31 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
defer logEvery.Stop()

torrentClient := s.d.Torrent()
mi := &metainfo.MetaInfo{AnnounceList: Trackers}
snapDir := s.d.SnapDir()
for i, it := range request.Items {
if it.TorrentHash == nil { // seed new snapshot
if err := BuildTorrentFileIfNeed(it.Path, s.d.SnapDir()); err != nil {
return nil, err
}
}

hash := Proto2InfoHash(it.TorrentHash)
if _, ok := torrentClient.Torrent(hash); ok {
continue
}

ok, err := AddSegment(it.Path, s.d.SnapDir(), torrentClient)
if err != nil {
return nil, fmt.Errorf("AddSegment: %w", err)
}
select {
case <-logEvery.C:
log.Info("[snpshots] initializing", "files", fmt.Sprintf("%d/%d", i, len(request.Items)))
log.Info("[snapshots] initializing", "files", fmt.Sprintf("%d/%d", i, len(request.Items)))
default:
}
if ok {
continue
}

magnet := mi.Magnet(&hash, nil)
go func(magnetUrl string) {
t, err := torrentClient.AddMagnet(magnetUrl)
if it.TorrentHash == nil {
// if we dont have the torrent hash then we seed a new snapshot
log.Info("[snapshots] seeding a new snapshot")
ok, err := seedNewSnapshot(it, torrentClient, snapDir)
if err != nil {
log.Warn("[downloader] add magnet link", "err", err)
return
return nil, err
}
t.DisallowDataDownload()
t.AllowDataUpload()
<-t.GotInfo()

mi := t.Metainfo()
if err := CreateTorrentFileIfNotExists(s.d.SnapDir(), t.Info(), &mi); err != nil {
log.Warn("[downloader] create torrent file", "err", err)
return
if ok {
log.Debug("[snapshots] already have both seg and torrent file")
continue
}
enriavil1 marked this conversation as resolved.
Show resolved Hide resolved
}(magnet.String())
}

_, err := createMagnetLinkWithInfoHash(it.TorrentHash, torrentClient, snapDir)
enriavil1 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
}
s.d.ReCalcStats(10 * time.Second) // immediately call ReCalc to set stat.Complete flag
return &emptypb.Empty{}, nil
Expand Down Expand Up @@ -110,3 +93,62 @@ func (s *GrpcServer) Stats(ctx context.Context, request *proto_downloader.StatsR
func Proto2InfoHash(in *prototypes.H160) metainfo.Hash {
return gointerfaces.ConvertH160toAddress(in)
}

// decides what we do depending on wether we have the .seg file or the .torrent file
// have .torrent no .seg => get .seg file from .torrent
// have .seg no .torrent => get .torrent from .seg
func seedNewSnapshot(it *proto_downloader.DownloadItem, torrentClient *torrent.Client, snapDir string) (bool, error) {
// if we dont have the torrent file we build it if we have the .seg file
if err := BuildTorrentFileIfNeed(it.Path, snapDir); err != nil {
return false, err
}

// we add the .seg file we have and create the .torrent file if we dont have it
ok, err := AddSegment(it.Path, snapDir, torrentClient)
if err != nil {
return false, fmt.Errorf("AddSegment: %w", err)
}

// torrent file does exist and seg
if !ok {
return false, nil
}

// we skip the item in for loop since we build the seg and torrent file here
return true, nil
}

// we dont have .seg or .torrent so we get them through the torrent hash
func createMagnetLinkWithInfoHash(hash *prototypes.H160, torrentClient *torrent.Client, snapDir string) (bool, error) {
mi := &metainfo.MetaInfo{AnnounceList: Trackers}
if hash == nil {
return false, nil
}
infoHash := Proto2InfoHash(hash)
log.Debug("[downloader] downloading torrent and seg file", "hash", infoHash)

if _, ok := torrentClient.Torrent(infoHash); ok {
log.Debug("[downloader] torrent client related to hash found", "hash", infoHash)
return true, nil
}

magnet := mi.Magnet(&infoHash, nil)
go func(magnetUrl string) {
t, err := torrentClient.AddMagnet(magnetUrl)
if err != nil {
log.Warn("[downloader] add magnet link", "err", err)
return
}
t.DisallowDataDownload()
t.AllowDataUpload()
<-t.GotInfo()

mi := t.Metainfo()
if err := CreateTorrentFileIfNotExists(snapDir, t.Info(), &mi); err != nil {
log.Warn("[downloader] create torrent file", "err", err)
return
}
}(magnet.String())
log.Debug("[downloader] downloaded both seg and torrent files", "hash", infoHash)
return false, nil
}