Skip to content
This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

Commit

Permalink
torrent: add --webseeds cli arg (#1122)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov committed Sep 12, 2023
1 parent a177861 commit de8f783
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 81 deletions.
12 changes: 12 additions & 0 deletions common/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"os"
"os/signal"
"strings"
"syscall"

"github.com/ledgerwatch/log/v3"
Expand All @@ -44,3 +45,14 @@ func RootContext() (context.Context, context.CancelFunc) {
}()
return ctx, cancel
}

// CliString2Array parses a comma-separated CLI argument into its elements.
// Whitespace around every element is trimmed and elements that end up empty
// are dropped. The returned slice is never nil, even when nothing is kept.
func CliString2Array(input string) []string {
	parts := strings.Split(input, ",")
	out := make([]string, 0, len(parts))
	for idx := range parts {
		item := strings.TrimSpace(parts[idx])
		if item == "" {
			// Skip blanks produced by ",," or leading/trailing commas.
			continue
		}
		out = append(out, item)
	}
	return out
}
153 changes: 133 additions & 20 deletions downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"errors"
"fmt"
"io/fs"
"net/http"
"net/url"
"os"
"path/filepath"
"runtime"
Expand All @@ -34,10 +36,12 @@ import (
common2 "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
prototypes "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/log/v3"
"github.com/pelletier/go-toml/v2"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
Expand All @@ -57,6 +61,8 @@ type Downloader struct {
folder storage.ClientImplCloser
stopMainLoop context.CancelFunc
wg sync.WaitGroup

webseeds *WebSeeds
}

type AggStats struct {
Expand All @@ -75,18 +81,18 @@ type AggStats struct {
}

func New(ctx context.Context, cfg *downloadercfg.Cfg) (*Downloader, error) {
if err := portMustBeTCPAndUDPOpen(cfg.ListenPort); err != nil {
if err := portMustBeTCPAndUDPOpen(cfg.ClientConfig.ListenPort); err != nil {
return nil, err
}

// Application must never see partially-downloaded files
// To provide such consistent view - downloader does:
// add <datadir>/snapshots/tmp - then method .onComplete will remove this suffix
// and App only works with the <datadir>/snapshots folder
if dir.FileExist(cfg.DataDir + "_tmp") { // migration from prev versions
_ = os.Rename(cfg.DataDir+"_tmp", filepath.Join(cfg.DataDir, "tmp")) // ignore error, because maybe they are on different drive, or target folder already created manually, all is fine
if dir.FileExist(cfg.SnapDir + "_tmp") { // migration from prev versions
_ = os.Rename(cfg.SnapDir+"_tmp", filepath.Join(cfg.SnapDir, "tmp")) // ignore error, because maybe they are on different drive, or target folder already created manually, all is fine
}
if err := moveFromTmp(cfg.DataDir); err != nil {
if err := moveFromTmp(cfg.SnapDir); err != nil {
return nil, err
}

Expand All @@ -99,7 +105,7 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg) (*Downloader, error) {
if err != nil {
return nil, fmt.Errorf("get peer id: %w", err)
}
cfg.PeerID = string(peerID)
cfg.ClientConfig.PeerID = string(peerID)
if len(peerID) == 0 {
if err = savePeerID(db, torrentClient.PeerID()); err != nil {
return nil, fmt.Errorf("save peer id: %w", err)
Expand All @@ -115,10 +121,20 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg) (*Downloader, error) {
clientLock: &sync.RWMutex{},

statsLock: &sync.RWMutex{},

webseeds: &WebSeeds{},
}
if err := d.addSegments(ctx); err != nil {
return nil, err
}
// CornerCase: no peers -> no announcements to trackers -> no magnetlink resolution (but magnetlink has filename)
// means we can start adding webseeds without waiting for `<-t.GotInfo()`
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.webseeds.Discover(ctx, d.cfg.WebSeedUrls, d.cfg.WebSeedFiles)
d.applyWebseeds()
}()
return d, nil
}

Expand All @@ -142,6 +158,8 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error {
go func() {
defer d.wg.Done()

// 2 loops: 1-st waiting for "torrents resolution" (receiving metadata from trackers)

// Torrents that are already taken care of
torrentMap := map[metainfo.Hash]struct{}{}
// First loop drops torrents that were downloaded or are already complete
Expand Down Expand Up @@ -190,9 +208,6 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error {

atomic.StoreUint64(&d.stats.DroppedCompleted, 0)
atomic.StoreUint64(&d.stats.DroppedTotal, 0)
if err := d.addSegments(ctx); err != nil {
return
}
DownloadLoop2:
torrents = d.Torrent().Torrents()
for _, t := range torrents {
Expand Down Expand Up @@ -289,7 +304,7 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error {
func (d *Downloader) SnapDir() string {
d.clientLock.RLock()
defer d.clientLock.RUnlock()
return d.cfg.DataDir
return d.cfg.SnapDir
}

func (d *Downloader) ReCalcStats(interval time.Duration) {
Expand Down Expand Up @@ -451,7 +466,7 @@ func (d *Downloader) VerifyData(ctx context.Context) error {
return d.db.Update(context.Background(), func(tx kv.RwTx) error { return nil })
}

func (d *Downloader) createMagnetLinkWithInfoHash(ctx context.Context, hash *prototypes.H160, snapDir string) (bool, error) {
func (d *Downloader) createMagnetLinkWithInfoHash(ctx context.Context, hash *prototypes.H160, name string, snapDir string) (bool, error) {
mi := &metainfo.MetaInfo{AnnounceList: Trackers}
if hash == nil {
return false, nil
Expand All @@ -464,7 +479,7 @@ func (d *Downloader) createMagnetLinkWithInfoHash(ctx context.Context, hash *pro
return true, nil
}

magnet := mi.Magnet(&infoHash, nil)
magnet := mi.Magnet(&infoHash, &metainfo.Info{Name: name})
t, err := d.torrentClient.AddMagnet(magnet.String())
if err != nil {
//log.Warn("[downloader] add magnet link", "err", err)
Expand All @@ -491,25 +506,40 @@ func (d *Downloader) createMagnetLinkWithInfoHash(ctx context.Context, hash *pro
return false, nil
}

// seedableFiles returns the list produced by seedableSegmentFiles for snapDir,
// followed by the results of seedableHistorySnapshots for "history" and then
// for "warm". The first failing lookup aborts the call and its error is
// returned wrapped with the name of the helper that failed.
func seedableFiles(snapDir string) ([]string, error) {
	all, err := seedableSegmentFiles(snapDir)
	if err != nil {
		return nil, fmt.Errorf("seedableSegmentFiles: %w", err)
	}
	// Order matters for the resulting slice: "history" entries precede "warm".
	for _, kind := range [...]string{"history", "warm"} {
		extra, err := seedableHistorySnapshots(snapDir, kind)
		if err != nil {
			return nil, fmt.Errorf("seedableHistorySnapshots: %w", err)
		}
		all = append(all, extra...)
	}
	return all, nil
}
func (d *Downloader) addSegments(ctx context.Context) error {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
_, err := BuildTorrentFilesIfNeed(context.Background(), d.SnapDir())
if err != nil {
return err
}
files, err := seedableSegmentFiles(d.SnapDir())
if err != nil {
return fmt.Errorf("seedableSegmentFiles: %w", err)
}
files2, err := seedableHistorySnapshots(d.SnapDir())
err = AddTorrentFiles(d.SnapDir(), d.torrentClient)
if err != nil {
return fmt.Errorf("seedableHistorySnapshots: %w", err)
return fmt.Errorf("AddTorrentFiles: %w", err)
}
files = append(files, files2...)

g, ctx := errgroup.WithContext(ctx)
i := atomic.Int64{}
files, err := seedableFiles(d.SnapDir())
if err != nil {
return err
}
for _, f := range files {
f := f
g.Go(func() error {
Expand All @@ -518,7 +548,7 @@ func (d *Downloader) addSegments(ctx context.Context) error {
return ctx.Err()
default:
}
_, err := AddSegment(f, d.cfg.DataDir, d.torrentClient)
_, err := AddSegment(f, d.cfg.SnapDir, d.torrentClient)
if err != nil {
return err
}
Expand Down Expand Up @@ -612,3 +642,86 @@ func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletio

return db, c, m, torrentClient, nil
}

// applyWebseeds attaches the stored webseed URLs to the torrents currently
// known to the client. Torrents are matched by name; a torrent whose name has
// no entry in the webseed list is left untouched.
func (d *Downloader) applyWebseeds() {
	// Fetch the list once: GetByFileNames takes the WebSeeds lock on every call,
	// so asking for it per torrent re-locks on each iteration and could mix two
	// different lists within a single pass.
	byName := d.webseeds.GetByFileNames()
	for _, t := range d.Torrent().Torrents() {
		urls, ok := byName[t.Name()]
		if !ok {
			continue
		}
		log.Debug("[downloader] addd webseeds", "file", t.Name())
		t.AddWebSeeds(urls)
	}
}

// WebSeeds holds the webseed list most recently stored with SetByFileNames.
// Both accessors (GetByFileNames, SetByFileNames) hold lock while touching it.
type WebSeeds struct {
// lock guards webSeedsByFilName.
lock sync.Mutex
// webSeedsByFilName is the stored list; applyWebseeds indexes it by torrent name.
webSeedsByFilName snaptype.WebSeeds
}

// GetByFileNames returns the webseed list currently stored, reading it while
// holding the lock.
func (d *WebSeeds) GetByFileNames() snaptype.WebSeeds {
	d.lock.Lock()
	current := d.webSeedsByFilName
	d.lock.Unlock()
	return current
}
// SetByFileNames replaces the stored webseed list with l, writing it while
// holding the lock.
func (d *WebSeeds) SetByFileNames(l snaptype.WebSeeds) {
	d.lock.Lock()
	d.webSeedsByFilName = l
	d.lock.Unlock()
}

// callWebSeedsProvider issues a GET request to webSeedProviderUrl and decodes
// the TOML response body into a WebSeedsFromProvider.
//
// The request is bound to ctx. An error is returned when the request cannot be
// built or sent, when the provider answers with a non-2xx status, or when the
// body cannot be decoded as TOML.
func (d *WebSeeds) callWebSeedsProvider(ctx context.Context, webSeedProviderUrl *url.URL) (snaptype.WebSeedsFromProvider, error) {
	request, err := http.NewRequestWithContext(ctx, http.MethodGet, webSeedProviderUrl.String(), nil)
	if err != nil {
		return nil, err
	}
	resp, err := http.DefaultClient.Do(request)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	// Do not hand an error page (404, 500, ...) to the TOML decoder: report the
	// status itself so the caller's warning points at the real problem.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("unexpected http status: %s", resp.Status)
	}
	response := snaptype.WebSeedsFromProvider{}
	if err := toml.NewDecoder(resp.Body).Decode(&response); err != nil {
		return nil, err
	}
	return response, nil
}
// readWebSeedsFile loads the file at webSeedProviderPath and parses its
// content as TOML into a WebSeedsFromProvider. Read and parse errors are
// returned unchanged.
func (d *WebSeeds) readWebSeedsFile(webSeedProviderPath string) (snaptype.WebSeedsFromProvider, error) {
	raw, readErr := os.ReadFile(webSeedProviderPath)
	if readErr != nil {
		return nil, readErr
	}
	parsed := snaptype.WebSeedsFromProvider{}
	parseErr := toml.Unmarshal(raw, &parsed)
	if parseErr != nil {
		return nil, parseErr
	}
	return parsed, nil
}

// Discover gathers webseed lists from every provider URL in urls and every
// local file in files, passes the collected lists to snaptype.NewWebSeeds and
// stores the result with SetByFileNames.
//
// A provider or file that fails is logged as a warning and skipped; it never
// aborts discovery. Once ctx is cancelled no further provider URLs are
// requested; local files are still read and the result is still stored.
func (d *WebSeeds) Discover(ctx context.Context, urls []*url.URL, files []string) {
	// Length 0 with pre-sized capacity: the entries are appended below, so a
	// non-zero length would leave empty placeholder entries at the front.
	list := make([]snaptype.WebSeedsFromProvider, 0, len(urls)+len(files))
	for _, webSeedProviderURL := range urls {
		// A `break` inside `select` only leaves the select, so cancellation is
		// checked with a plain `if` to really stop the loop.
		if ctx.Err() != nil {
			break
		}
		response, err := d.callWebSeedsProvider(ctx, webSeedProviderURL)
		if err != nil { // don't fail on error
			log.Warn("[downloader] callWebSeedsProvider", "err", err, "url", webSeedProviderURL.EscapedPath())
			continue
		}
		list = append(list, response)
	}
	for _, webSeedFile := range files {
		response, err := d.readWebSeedsFile(webSeedFile)
		if err != nil { // don't fail on error
			_, fileName := filepath.Split(webSeedFile)
			log.Warn("[downloader] readWebSeedsFile", "err", err, "file", fileName)
			continue
		}
		list = append(list, response)
	}
	d.SetByFileNames(snaptype.NewWebSeeds(list))
}
24 changes: 15 additions & 9 deletions downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,16 @@ type GrpcServer struct {
func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) {
logEvery := time.NewTicker(20 * time.Second)
defer logEvery.Stop()
defer s.d.applyWebseeds()

torrentClient := s.d.Torrent()
snapDir := s.d.SnapDir()
for i, it := range request.Items {
if it.Path == "" {
return nil, fmt.Errorf("field 'path' is required")
}

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-logEvery.C:
log.Info("[snapshots] initializing", "files", fmt.Sprintf("%d/%d", i, len(request.Items)))
default:
Expand All @@ -62,7 +65,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
if it.TorrentHash == nil {
// if we dont have the torrent hash then we seed a new snapshot
log.Info("[snapshots] seeding a new snapshot")
ok, err := seedNewSnapshot(it, torrentClient, snapDir)
ok, err := seedNewSnapshot(ctx, it.Path, torrentClient, snapDir)
if err != nil {
return nil, err
}
Expand All @@ -74,7 +77,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow
continue
}

_, err := s.d.createMagnetLinkWithInfoHash(ctx, it.TorrentHash, snapDir)
_, err := s.d.createMagnetLinkWithInfoHash(ctx, it.TorrentHash, it.Path, snapDir)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -117,14 +120,19 @@ func Proto2InfoHash(in *prototypes.H160) metainfo.Hash {
// decides what we do depending on whether we have the .seg file or the .torrent file
// have .torrent no .seg => get .seg file from .torrent
// have .seg no .torrent => get .torrent from .seg
func seedNewSnapshot(it *proto_downloader.DownloadItem, torrentClient *torrent.Client, snapDir string) (bool, error) {
func seedNewSnapshot(ctx context.Context, name string, torrentClient *torrent.Client, snapDir string) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
// if we dont have the torrent file we build it if we have the .seg file
if err := buildTorrentIfNeed(it.Path, snapDir); err != nil {
if err := buildTorrentIfNeed(ctx, name, snapDir); err != nil {
return false, err
}

// we add the .seg file we have and create the .torrent file if we dont have it
ok, err := AddSegment(it.Path, snapDir, torrentClient)
ok, err := AddSegment(name, snapDir, torrentClient)
if err != nil {
return false, fmt.Errorf("AddSegment: %w", err)
}
Expand All @@ -137,5 +145,3 @@ func seedNewSnapshot(it *proto_downloader.DownloadItem, torrentClient *torrent.C
// we skip the item in for loop since we build the seg and torrent file here
return true, nil
}

// we dont have .seg or .torrent so we get them through the torrent hash
Loading

0 comments on commit de8f783

Please sign in to comment.