From de8f783eeb2d123bd399de303de55803a6d17b41 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Tue, 12 Sep 2023 12:18:39 +0700 Subject: [PATCH] torrent: add --webseeds cli arg (#1122) --- common/cli.go | 12 ++ downloader/downloader.go | 153 +++++++++++++++++++--- downloader/downloader_grpc_server.go | 24 ++-- downloader/downloadercfg/downloadercfg.go | 41 +++++- downloader/snaptype/webseeds.go | 19 +++ downloader/util.go | 133 ++++++++++++------- go.mod | 1 + go.sum | 2 + 8 files changed, 304 insertions(+), 81 deletions(-) create mode 100644 downloader/snaptype/webseeds.go diff --git a/common/cli.go b/common/cli.go index 0f0008e35..c195b9151 100644 --- a/common/cli.go +++ b/common/cli.go @@ -20,6 +20,7 @@ import ( "context" "os" "os/signal" + "strings" "syscall" "github.com/ledgerwatch/log/v3" @@ -44,3 +45,14 @@ func RootContext() (context.Context, context.CancelFunc) { }() return ctx, cancel } + +func CliString2Array(input string) []string { + l := strings.Split(input, ",") + res := make([]string, 0, len(l)) + for _, r := range l { + if r = strings.TrimSpace(r); r != "" { + res = append(res, r) + } + } + return res +} diff --git a/downloader/downloader.go b/downloader/downloader.go index ee0a4c1be..0afafabb9 100644 --- a/downloader/downloader.go +++ b/downloader/downloader.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" "io/fs" + "net/http" + "net/url" "os" "path/filepath" "runtime" @@ -34,10 +36,12 @@ import ( common2 "github.com/ledgerwatch/erigon-lib/common" "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/erigon-lib/downloader/downloadercfg" + "github.com/ledgerwatch/erigon-lib/downloader/snaptype" prototypes "github.com/ledgerwatch/erigon-lib/gointerfaces/types" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/log/v3" + "github.com/pelletier/go-toml/v2" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" ) @@ -57,6 +61,8 @@ type Downloader struct { folder storage.ClientImplCloser stopMainLoop context.CancelFunc wg sync.WaitGroup + + webseeds *WebSeeds } type AggStats struct { @@ -75,7 +81,7 @@ type AggStats struct { } func New(ctx context.Context, cfg *downloadercfg.Cfg) (*Downloader, error) { - if err := portMustBeTCPAndUDPOpen(cfg.ListenPort); err != nil { + if err := portMustBeTCPAndUDPOpen(cfg.ClientConfig.ListenPort); err != nil { return nil, err } @@ -83,10 +89,10 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg) (*Downloader, error) { // To provide such consistent view - downloader does: // add /snapshots/tmp - then method .onComplete will remove this suffix // and App only work with /snapshot s folder - if dir.FileExist(cfg.DataDir + "_tmp") { // migration from prev versions - _ = os.Rename(cfg.DataDir+"_tmp", filepath.Join(cfg.DataDir, "tmp")) // ignore error, because maybe they are on different drive, or target folder already created manually, all is fine + if dir.FileExist(cfg.SnapDir + "_tmp") { // migration from prev versions + _ = os.Rename(cfg.SnapDir+"_tmp", filepath.Join(cfg.SnapDir, "tmp")) // ignore error, because maybe they are on different drive, or target folder already created manually, all is fine } - if err := moveFromTmp(cfg.DataDir); err != nil { + if err := moveFromTmp(cfg.SnapDir); err != nil { return nil, err } @@ -99,7 +105,7 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg) (*Downloader, error) { if err != nil { return nil, fmt.Errorf("get peer id: %w", err) } - cfg.PeerID = string(peerID) + cfg.ClientConfig.PeerID = string(peerID) if len(peerID) == 0 { if err = savePeerID(db, torrentClient.PeerID()); err != nil { return nil, fmt.Errorf("save peer id: %w", err) @@ -115,10 +121,20 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg) (*Downloader, error) { clientLock: &sync.RWMutex{}, statsLock: &sync.RWMutex{}, + + webseeds: &WebSeeds{}, } if err := d.addSegments(ctx); err != nil { return nil, err } + // CornerCase: no peers -> no anoncments to trackers -> no magnetlink resolution (but magnetlink has filename) + // means we can start adding weebseeds without waiting for `<-t.GotInfo()` + d.wg.Add(1) + go func() { + defer d.wg.Done() + d.webseeds.Discover(ctx, d.cfg.WebSeedUrls, d.cfg.WebSeedFiles) + d.applyWebseeds() + }() return d, nil } @@ -142,6 +158,8 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error { go func() { defer d.wg.Done() + // 2 loops: 1-st waiting for "torrents resolution" (receiving metadata from trackers) + // Torrents that are already taken care of torrentMap := map[metainfo.Hash]struct{}{} // First loop drops torrents that were downloaded or are already complete @@ -190,9 +208,6 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error { atomic.StoreUint64(&d.stats.DroppedCompleted, 0) atomic.StoreUint64(&d.stats.DroppedTotal, 0) - if err := d.addSegments(ctx); err != nil { - return - } DownloadLoop2: torrents = d.Torrent().Torrents() for _, t := range torrents { @@ -289,7 +304,7 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error { func (d *Downloader) SnapDir() string { d.clientLock.RLock() defer d.clientLock.RUnlock() - return d.cfg.DataDir + return d.cfg.SnapDir } func (d *Downloader) ReCalcStats(interval time.Duration) { @@ -451,7 +466,7 @@ func (d *Downloader) VerifyData(ctx context.Context) error { return d.db.Update(context.Background(), func(tx kv.RwTx) error { return nil }) } -func (d *Downloader) createMagnetLinkWithInfoHash(ctx context.Context, hash *prototypes.H160, snapDir string) (bool, error) { +func (d *Downloader) createMagnetLinkWithInfoHash(ctx context.Context, hash *prototypes.H160, name string, snapDir string) (bool, error) { mi := &metainfo.MetaInfo{AnnounceList: Trackers} if hash == nil { return false, nil @@ -464,7 +479,7 @@ func (d *Downloader) createMagnetLinkWithInfoHash(ctx context.Context, hash *pro return true, nil } - magnet := mi.Magnet(&infoHash, nil) + magnet := mi.Magnet(&infoHash, &metainfo.Info{Name: name}) t, err := d.torrentClient.AddMagnet(magnet.String()) if err != nil { //log.Warn("[downloader] add magnet link", "err", err) @@ -491,6 +506,23 @@ func (d *Downloader) createMagnetLinkWithInfoHash(ctx context.Context, hash *pro return false, nil } +func seedableFiles(snapDir string) ([]string, error) { + files, err := seedableSegmentFiles(snapDir) + if err != nil { + return nil, fmt.Errorf("seedableSegmentFiles: %w", err) + } + files2, err := seedableHistorySnapshots(snapDir, "history") + if err != nil { + return nil, fmt.Errorf("seedableHistorySnapshots: %w", err) + } + files = append(files, files2...) + files2, err = seedableHistorySnapshots(snapDir, "warm") + if err != nil { + return nil, fmt.Errorf("seedableHistorySnapshots: %w", err) + } + files = append(files, files2...) + return files, nil +} func (d *Downloader) addSegments(ctx context.Context) error { logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() @@ -498,18 +530,16 @@ func (d *Downloader) addSegments(ctx context.Context) error { if err != nil { return err } - files, err := seedableSegmentFiles(d.SnapDir()) - if err != nil { - return fmt.Errorf("seedableSegmentFiles: %w", err) - } - files2, err := seedableHistorySnapshots(d.SnapDir()) + err = AddTorrentFiles(d.SnapDir(), d.torrentClient) if err != nil { - return fmt.Errorf("seedableHistorySnapshots: %w", err) + return fmt.Errorf("AddTorrentFiles: %w", err) } - files = append(files, files2...) - g, ctx := errgroup.WithContext(ctx) i := atomic.Int64{} + files, err := seedableFiles(d.SnapDir()) + if err != nil { + return err + } for _, f := range files { f := f g.Go(func() error { @@ -518,7 +548,7 @@ func (d *Downloader) addSegments(ctx context.Context) error { return ctx.Err() default: } - _, err := AddSegment(f, d.cfg.DataDir, d.torrentClient) + _, err := AddSegment(f, d.cfg.SnapDir, d.torrentClient) if err != nil { return err } @@ -612,3 +642,86 @@ func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletio return db, c, m, torrentClient, nil } + +func (d *Downloader) applyWebseeds() { + for _, t := range d.Torrent().Torrents() { + urls, ok := d.webseeds.GetByFileNames()[t.Name()] + if !ok { + continue + } + log.Debug("[downloader] addd webseeds", "file", t.Name()) + t.AddWebSeeds(urls) + } +} + +type WebSeeds struct { + lock sync.Mutex + webSeedsByFilName snaptype.WebSeeds +} + +func (d *WebSeeds) GetByFileNames() snaptype.WebSeeds { + d.lock.Lock() + defer d.lock.Unlock() + return d.webSeedsByFilName +} +func (d *WebSeeds) SetByFileNames(l snaptype.WebSeeds) { + d.lock.Lock() + defer d.lock.Unlock() + d.webSeedsByFilName = l +} + +func (d *WebSeeds) callWebSeedsProvider(ctx context.Context, webSeedProviderUrl *url.URL) (snaptype.WebSeedsFromProvider, error) { + request, err := http.NewRequest(http.MethodGet, webSeedProviderUrl.String(), nil) + if err != nil { + return nil, err + } + request = request.WithContext(ctx) + resp, err := http.DefaultClient.Do(request) + if err != nil { + return nil, err + } + defer resp.Body.Close() + response := snaptype.WebSeedsFromProvider{} + if err := toml.NewDecoder(resp.Body).Decode(&response); err != nil { + return nil, err + } + return response, nil +} +func (d *WebSeeds) readWebSeedsFile(webSeedProviderPath string) (snaptype.WebSeedsFromProvider, error) { + data, err := os.ReadFile(webSeedProviderPath) + if err != nil { + return nil, err + } + response := snaptype.WebSeedsFromProvider{} + if err := toml.Unmarshal(data, &response); err != nil { + return nil, err + } + return response, nil +} + +func (d *WebSeeds) Discover(ctx context.Context, urls []*url.URL, files []string) { + list := make([]snaptype.WebSeedsFromProvider, len(urls)+len(files)) + for _, webSeedProviderURL := range urls { + select { + case <-ctx.Done(): + break + default: + } + response, err := d.callWebSeedsProvider(ctx, webSeedProviderURL) + if err != nil { // don't fail on error + log.Warn("[downloader] callWebSeedsProvider", "err", err, "url", webSeedProviderURL.EscapedPath()) + continue + } + list = append(list, response) + } + for _, webSeedFile := range files { + response, err := d.readWebSeedsFile(webSeedFile) + if err != nil { // don't fail on error + _, fileName := filepath.Split(webSeedFile) + log.Warn("[downloader] readWebSeedsFile", "err", err, "file", fileName) + continue + } + list = append(list, response) + } + d.SetByFileNames(snaptype.NewWebSeeds(list)) +} diff --git a/downloader/downloader_grpc_server.go b/downloader/downloader_grpc_server.go index e6e7a5c2e..812fe9648 100644 --- a/downloader/downloader_grpc_server.go +++ b/downloader/downloader_grpc_server.go @@ -47,13 +47,16 @@ type GrpcServer struct { func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.DownloadRequest) (*emptypb.Empty, error) { logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() + defer s.d.applyWebseeds() torrentClient := s.d.Torrent() snapDir := s.d.SnapDir() for i, it := range request.Items { + if it.Path == "" { + return nil, fmt.Errorf("field 'path' is required") + } + select { - case <-ctx.Done(): - return nil, ctx.Err() case <-logEvery.C: log.Info("[snapshots] initializing", "files", fmt.Sprintf("%d/%d", i, len(request.Items))) default: @@ -62,7 +65,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow if it.TorrentHash == nil { // if we dont have the torrent hash then we seed a new snapshot log.Info("[snapshots] seeding a new snapshot") - ok, err := seedNewSnapshot(it, torrentClient, snapDir) + ok, err := seedNewSnapshot(ctx, it.Path, torrentClient, snapDir) if err != nil { return nil, err } @@ -74,7 +77,7 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow continue } - _, err := s.d.createMagnetLinkWithInfoHash(ctx, it.TorrentHash, snapDir) + _, err := s.d.createMagnetLinkWithInfoHash(ctx, it.TorrentHash, it.Path, snapDir) if err != nil { return nil, err } @@ -117,14 +120,19 @@ func Proto2InfoHash(in *prototypes.H160) metainfo.Hash { // decides what we do depending on wether we have the .seg file or the .torrent file // have .torrent no .seg => get .seg file from .torrent // have .seg no .torrent => get .torrent from .seg -func seedNewSnapshot(it *proto_downloader.DownloadItem, torrentClient *torrent.Client, snapDir string) (bool, error) { +func seedNewSnapshot(ctx context.Context, name string, torrentClient *torrent.Client, snapDir string) (bool, error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } // if we dont have the torrent file we build it if we have the .seg file - if err := buildTorrentIfNeed(it.Path, snapDir); err != nil { + if err := buildTorrentIfNeed(ctx, name, snapDir); err != nil { return false, err } // we add the .seg file we have and create the .torrent file if we dont have it - ok, err := AddSegment(it.Path, snapDir, torrentClient) + ok, err := AddSegment(name, snapDir, torrentClient) if err != nil { return false, fmt.Errorf("AddSegment: %w", err) } @@ -137,5 +145,3 @@ func seedNewSnapshot(it *proto_downloader.DownloadItem, torrentClient *torrent.C // we skip the item in for loop since we build the seg and torrent file here return true, nil } - -// we dont have .seg or .torrent so we get them through the torrent hash diff --git a/downloader/downloadercfg/downloadercfg.go b/downloader/downloadercfg/downloadercfg.go index aa5dbf241..1cdacd12c 100644 --- a/downloader/downloadercfg/downloadercfg.go +++ b/downloader/downloadercfg/downloadercfg.go @@ -19,6 +19,8 @@ package downloadercfg import ( "io/ioutil" "net" + "net/url" + "path/filepath" "runtime" "strings" @@ -26,6 +28,9 @@ import ( lg "github.com/anacrolix/log" "github.com/anacrolix/torrent" "github.com/c2h5oh/datasize" + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/datadir" + "github.com/ledgerwatch/erigon-lib/common/dir" "github.com/ledgerwatch/log/v3" "golang.org/x/time/rate" ) @@ -40,8 +45,11 @@ const DefaultPieceSize = 2 * 1024 * 1024 const DefaultNetworkChunkSize = 512 * 1024 type Cfg struct { - *torrent.ClientConfig + ClientConfig *torrent.ClientConfig + SnapDir string DownloadSlots int + WebSeedUrls []*url.URL + WebSeedFiles []string } func Default() *torrent.ClientConfig { @@ -51,7 +59,6 @@ func Default() *torrent.ClientConfig { torrentConfig.NoDHT = true //torrentConfig.DisableTrackers = true //torrentConfig.DisableWebtorrent = true - //torrentConfig.DisableWebseeds = true // Reduce defaults - to avoid peers with very bad geography //torrentConfig.MinDialTimeout = 1 * time.Second // default: 3sec @@ -70,13 +77,14 @@ func Default() *torrent.ClientConfig { return torrentConfig } -func New(snapDir string, version string, verbosity lg.Level, downloadRate, uploadRate datasize.ByteSize, port, connsPerFile, downloadSlots int, staticPeers []string) (*Cfg, error) { +func New(dataDir datadir.Dirs, version string, verbosity lg.Level, downloadRate, uploadRate datasize.ByteSize, port, connsPerFile, downloadSlots int, staticPeers []string, webseeds string) (*Cfg, error) { torrentConfig := Default() + torrentConfig.DataDir = dataDir.Snap // `DataDir` of torrent-client-lib is different from Erigon's `DataDir`. Just same naming. + torrentConfig.ExtendedHandshakeClientVersion = version // We would-like to reduce amount of goroutines in Erigon, so reducing next params torrentConfig.EstablishedConnsPerTorrent = connsPerFile // default: 50 - torrentConfig.DataDir = snapDir torrentConfig.ListenPort = port // check if ipv6 is enabled @@ -94,7 +102,7 @@ func New(snapDir string, version string, verbosity lg.Level, downloadRate, uploa // debug // torrentConfig.Debug = false - torrentConfig.Logger = lg.Default.FilterLevel(verbosity) + torrentConfig.Logger.WithFilterLevel(verbosity) torrentConfig.Logger.Handlers = []lg.Handler{adapterHandler{}} if len(staticPeers) > 0 { @@ -133,7 +141,28 @@ func New(snapDir string, version string, verbosity lg.Level, downloadRate, uploa //staticPeers } - return &Cfg{ClientConfig: torrentConfig, DownloadSlots: downloadSlots}, nil + webseedUrlsOrFiles := common.CliString2Array(webseeds) + webseedUrls := make([]*url.URL, 0, len(webseedUrlsOrFiles)) + webseedFiles := make([]string, 0, len(webseedUrlsOrFiles)) + for _, webseed := range webseedUrlsOrFiles { + uri, err := url.ParseRequestURI(webseed) + if err != nil { + if strings.HasSuffix(webseed, ".toml") && dir.FileExist(webseed) { + webseedFiles = append(webseedFiles, webseed) + } + continue + } + webseedUrls = append(webseedUrls, uri) + } + localCfgFile := filepath.Join(dataDir.DataDir, "webseeds.toml") // datadir/webseeds.toml allowed + if dir.FileExist(localCfgFile) { + webseedFiles = append(webseedFiles, localCfgFile) + } + + return &Cfg{SnapDir: torrentConfig.DataDir, + ClientConfig: torrentConfig, DownloadSlots: downloadSlots, + WebSeedUrls: webseedUrls, WebSeedFiles: webseedFiles, + }, nil } func getIpv6Enabled() bool { diff --git a/downloader/snaptype/webseeds.go b/downloader/snaptype/webseeds.go new file mode 100644 index 000000000..0a7e6a7db --- /dev/null +++ b/downloader/snaptype/webseeds.go @@ -0,0 +1,19 @@ +package snaptype + +import "github.com/anacrolix/torrent/metainfo" + +// Each provider can provide only 1 WebSeed url per file +// but overall BitTorrent protocol allowing multiple +type WebSeedsFromProvider map[string]string // fileName -> Url, can be Http/Ftp + +type WebSeeds map[string]metainfo.UrlList // fileName -> []Url, can be Http/Ftp + +func NewWebSeeds(list []WebSeedsFromProvider) WebSeeds { + merged := WebSeeds{} + for _, m := range list { + for name, wUrl := range m { + merged[name] = append(merged[name], wUrl) + } + } + return merged +} diff --git a/downloader/util.go b/downloader/util.go index 88cd38293..0eb4dbc35 100644 --- a/downloader/util.go +++ b/downloader/util.go @@ -127,13 +127,6 @@ func seedableSegmentFiles(dir string) ([]string, error) { if !snaptype.IsCorrectFileName(f.Name()) { continue } - fileInfo, err := f.Info() - if err != nil { - return nil, err - } - if fileInfo.Size() == 0 { - continue - } if filepath.Ext(f.Name()) != ".seg" { // filter out only compressed files continue } @@ -149,10 +142,10 @@ func seedableSegmentFiles(dir string) ([]string, error) { return res, nil } -var historyFileRegex = regexp.MustCompile("^([[:lower:]]+).([0-9]+)-([0-9]+).(v|ef)$") +var historyFileRegex = regexp.MustCompile("^([[:lower:]]+).([0-9]+)-([0-9]+).(.*)$") -func seedableHistorySnapshots(dir string) ([]string, error) { - historyDir := filepath.Join(dir, "history") +func seedableHistorySnapshots(dir, subDir string) ([]string, error) { + historyDir := filepath.Join(dir, subDir) dir2.MustExist(historyDir) files, err := os.ReadDir(historyDir) if err != nil { @@ -166,13 +159,6 @@ func seedableHistorySnapshots(dir string) ([]string, error) { if !f.Type().IsRegular() { continue } - fileInfo, err := f.Info() - if err != nil { - return nil, err - } - if fileInfo.Size() == 0 { - continue - } ext := filepath.Ext(f.Name()) if ext != ".v" && ext != ".ef" && ext != ".kv" { // filter out only compressed files continue @@ -182,7 +168,7 @@ func seedableHistorySnapshots(dir string) ([]string, error) { if len(subs) != 5 { continue } - + // Check that it's seedable from, err := strconv.ParseUint(subs[2], 10, 64) if err != nil { return nil, fmt.Errorf("ParseFileName: %w", err) @@ -191,15 +177,15 @@ func seedableHistorySnapshots(dir string) ([]string, error) { if err != nil { return nil, fmt.Errorf("ParseFileName: %w", err) } - if to-from != snaptype.Erigon3SeedableSteps { + if (to-from)%snaptype.Erigon3SeedableSteps != 0 { continue } - res = append(res, filepath.Join("history", f.Name())) + res = append(res, filepath.Join(subDir, f.Name())) } return res, nil } -func buildTorrentIfNeed(fName, root string) (err error) { +func buildTorrentIfNeed(ctx context.Context, fName, root string) (err error) { fPath := filepath.Join(root, fName) if dir2.FileExist(fPath + ".torrent") { return @@ -213,7 +199,7 @@ func buildTorrentIfNeed(fName, root string) (err error) { } info.Name = fName - return createTorrentFileFromInfo(root, info, nil) + return CreateTorrentFileFromInfo(root, info, nil) } // AddSegment - add existing .seg file, create corresponding .torrent if need @@ -222,7 +208,11 @@ func AddSegment(originalFileName, snapDir string, client *torrent.Client) (bool, if !dir2.FileExist(fPath + ".torrent") { return false, nil } - _, err := AddTorrentFile(fPath+".torrent", client) + ts, err := loadTorrent(fPath + ".torrent") + if err != nil { + return false, err + } + _, err = AddTorrentFile(ts, client) if err != nil { return false, fmt.Errorf("AddTorrentFile: %w", err) } @@ -234,15 +224,10 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) ([]string, err logEvery := time.NewTicker(20 * time.Second) defer logEvery.Stop() - files, err := seedableSegmentFiles(snapDir) - if err != nil { - return nil, err - } - files2, err := seedableHistorySnapshots(snapDir) + files, err := seedableFiles(snapDir) if err != nil { return nil, err } - files = append(files, files2...) errs := make(chan error, len(files)*2) wg := &sync.WaitGroup{} @@ -258,7 +243,7 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) ([]string, err defer i.Add(1) defer sem.Release(1) defer wg.Done() - if err := buildTorrentIfNeed(f, snapDir); err != nil { + if err := buildTorrentIfNeed(ctx, f, snapDir); err != nil { errs <- err } @@ -288,17 +273,17 @@ func CreateTorrentFileIfNotExists(root string, info *metainfo.Info, mi *metainfo if dir2.FileExist(fPath + ".torrent") { return nil } - if err := createTorrentFileFromInfo(root, info, mi); err != nil { + if err := CreateTorrentFileFromInfo(root, info, mi); err != nil { return err } return nil } -func createTorrentFileFromInfo(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error { +func CreateMetaInfo(info *metainfo.Info, mi *metainfo.MetaInfo) (*metainfo.MetaInfo, error) { if mi == nil { infoBytes, err := bencode.Marshal(info) if err != nil { - return err + return nil, err } mi = &metainfo.MetaInfo{ CreationDate: time.Now().Unix(), @@ -309,8 +294,10 @@ func createTorrentFileFromInfo(root string, info *metainfo.Info, mi *metainfo.Me } else { mi.AnnounceList = Trackers } + return mi, nil +} +func CreateTorrentFromMetaInfo(root string, info *metainfo.Info, mi *metainfo.MetaInfo) error { torrentFileName := filepath.Join(root, info.Name+".torrent") - file, err := os.Create(torrentFileName) if err != nil { return err @@ -322,28 +309,82 @@ func createTorrentFileFromInfo(root string, info *metainfo.Info, mi *metainfo.Me file.Sync() return nil } +func CreateTorrentFileFromInfo(root string, info *metainfo.Info, mi *metainfo.MetaInfo) (err error) { + mi, err = CreateMetaInfo(info, mi) + if err != nil { + return err + } + return CreateTorrentFromMetaInfo(root, info, mi) +} -// nolint -func segmentFileNameFromTorrentFileName(in string) string { - ext := filepath.Ext(in) - return in[0 : len(in)-len(ext)] +func AddTorrentFiles(snapDir string, torrentClient *torrent.Client) error { + files, err := allTorrentFiles(snapDir) + if err != nil { + return err + } + for _, ts := range files { + _, err := AddTorrentFile(ts, torrentClient) + if err != nil { + return err + } + } + + return nil } -// AddTorrentFile - adding .torrent file to torrentClient (and checking their hashes), if .torrent file -// added first time - pieces verification process will start (disk IO heavy) - Progress -// kept in `piece completion storage` (surviving reboot). Once it done - no disk IO needed again. -// Don't need call torrent.VerifyData manually -func AddTorrentFile(torrentFilePath string, torrentClient *torrent.Client) (*torrent.Torrent, error) { - mi, err := metainfo.LoadFromFile(torrentFilePath) +func allTorrentFiles(snapDir string) (res []*torrent.TorrentSpec, err error) { + res, err = torrentInDir(snapDir) if err != nil { return nil, err } - mi.AnnounceList = Trackers - ts, err := torrent.TorrentSpecFromMetaInfoErr(mi) + res2, err := torrentInDir(filepath.Join(snapDir, "history")) if err != nil { return nil, err } + res = append(res, res2...) + res2, err = torrentInDir(filepath.Join(snapDir, "warm")) + if err != nil { + return nil, err + } + res = append(res, res2...) + return res, nil +} +func torrentInDir(snapDir string) (res []*torrent.TorrentSpec, err error) { + files, err := os.ReadDir(snapDir) + if err != nil { + return nil, err + } + for _, f := range files { + if f.IsDir() || !f.Type().IsRegular() { + continue + } + if filepath.Ext(f.Name()) != ".torrent" { // filter out only compressed files + continue + } + + a, err := loadTorrent(filepath.Join(snapDir, f.Name())) + if err != nil { + return nil, err + } + res = append(res, a) + } + return res, nil +} + +func loadTorrent(torrentFilePath string) (*torrent.TorrentSpec, error) { + mi, err := metainfo.LoadFromFile(torrentFilePath) + if err != nil { + return nil, fmt.Errorf("LoadFromFile: %w, file=%s", err, torrentFilePath) + } + mi.AnnounceList = Trackers + return torrent.TorrentSpecFromMetaInfoErr(mi) +} +// AddTorrentFile - adding .torrent file to torrentClient (and checking their hashes), if .torrent file +// added first time - pieces verification process will start (disk IO heavy) - Progress +// kept in `piece completion storage` (surviving reboot). Once it done - no disk IO needed again. +// Don't need call torrent.VerifyData manually +func AddTorrentFile(ts *torrent.TorrentSpec, torrentClient *torrent.Client) (*torrent.Torrent, error) { if _, ok := torrentClient.Torrent(ts.InfoHash); !ok { // can set ChunkSize only for new torrents ts.ChunkSize = downloadercfg.DefaultNetworkChunkSize } else { diff --git a/go.mod b/go.mod index 0d74ff6d8..a99a43743 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/holiman/uint256 v1.2.3 github.com/matryer/moq v0.3.2 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 + github.com/pelletier/go-toml/v2 v2.1.0 github.com/quasilyte/go-ruleguard/dsl v0.3.22 github.com/spaolacci/murmur3 v1.1.0 github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index cb7bdc001..f37f81e07 100644 --- a/go.sum +++ b/go.sum @@ -277,6 +277,8 @@ github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= +github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= +github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pion/datachannel v1.5.2 h1:piB93s8LGmbECrpO84DnkIVWasRMk3IimbcXkTQLE6E=