diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0394b71f0..485d313d0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -50,6 +50,7 @@ jobs: uses: golangci/golangci-lint-action@v3 with: version: v1.54 + skip-build-cache: true - name: Lint source code licenses if: matrix.os == 'ubuntu-20.04' diff --git a/.golangci.yml b/.golangci.yml index a227f0e75..ffbb5f792 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,5 +1,8 @@ run: deadline: 10m + build-tags: + - nosqlite + - noboltdb linters: presets: diff --git a/common/dbg/experiments.go b/common/dbg/experiments.go index 26aafa733..fb50a4df8 100644 --- a/common/dbg/experiments.go +++ b/common/dbg/experiments.go @@ -313,3 +313,35 @@ func NoPrune() bool { }) return noPrune } + +var ( + snMadvNormal bool + snMadvNormalOnce sync.Once +) + +func SnMadvNormal() bool { + snMadvNormalOnce.Do(func() { + v, _ := os.LookupEnv("SN_MADV_NORMAL") + if v == "true" { + snMadvNormal = true + log.Info("[Experiment]", "SN_MADV_NORMAL", snMadvNormal) + } + }) + return snMadvNormal +} + +var ( + mdbxLockInRam bool + mdbxLockInRamOnce sync.Once +) + +func MdbxLockInRam() bool { + mdbxLockInRamOnce.Do(func() { + v, _ := os.LookupEnv("MDBX_LOCK_IN_RAM") + if v == "true" { + mdbxLockInRam = true + log.Info("[Experiment]", "MDBX_LOCK_IN_RAM", mdbxLockInRam) + } + }) + return mdbxLockInRam +} diff --git a/compress/decompress.go b/compress/decompress.go index fba8938e4..3877b08b0 100644 --- a/compress/decompress.go +++ b/compress/decompress.go @@ -377,7 +377,11 @@ func (d *Decompressor) DisableReadAhead() { } leftReaders := d.readAheadRefcnt.Add(-1) if leftReaders == 0 { - _ = mmap.MadviseRandom(d.mmapHandle1) + if dbg.SnMadvNormal() { + _ = mmap.MadviseNormal(d.mmapHandle1) + } else { + _ = mmap.MadviseRandom(d.mmapHandle1) + } } else if leftReaders < 0 { log.Warn("read-ahead negative counter", "file", d.FileName()) } diff --git a/downloader/downloader.go b/downloader/downloader.go index 6d55c7be3..d9cf831d7 100644 --- a/downloader/downloader.go +++ b/downloader/downloader.go @@ -51,14 +51,15 @@ type Downloader struct { db kv.RwDB pieceCompletionDB storage.PieceCompletion torrentClient *torrent.Client - clientLock *sync.RWMutex cfg *downloadercfg.Cfg statsLock *sync.RWMutex stats AggStats - folder storage.ClientImplCloser + folder storage.ClientImplCloser + + ctx context.Context stopMainLoop context.CancelFunc wg sync.WaitGroup @@ -118,13 +119,12 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg) (*Downloader, error) { pieceCompletionDB: c, folder: m, torrentClient: torrentClient, - clientLock: &sync.RWMutex{}, - - statsLock: &sync.RWMutex{}, - - webseeds: &WebSeeds{}, + statsLock: &sync.RWMutex{}, + webseeds: &WebSeeds{}, } - if err := d.addSegments(ctx); err != nil { + d.ctx, d.stopMainLoop = context.WithCancel(ctx) + + if err := d.addSegments(d.ctx); err != nil { return nil, err } // CornerCase: no peers -> no anoncments to trackers -> no magnetlink resolution (but magnetlink has filename) @@ -132,18 +132,17 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg) (*Downloader, error) { d.wg.Add(1) go func() { defer d.wg.Done() - d.webseeds.Discover(ctx, d.cfg.WebSeedUrls, d.cfg.WebSeedFiles) + d.webseeds.Discover(d.ctx, d.cfg.WebSeedUrls, d.cfg.WebSeedFiles) d.applyWebseeds() }() return d, nil } -func (d *Downloader) MainLoopInBackground(ctx context.Context, silent bool) { - ctx, d.stopMainLoop = context.WithCancel(ctx) +func (d *Downloader) MainLoopInBackground(silent bool) { d.wg.Add(1) go func() { defer 
d.wg.Done() - if err := d.mainLoop(ctx, silent); err != nil { + if err := d.mainLoop(silent); err != nil { if !errors.Is(err, context.Canceled) { log.Warn("[snapshots]", "err", err) } @@ -151,7 +150,7 @@ func (d *Downloader) MainLoopInBackground(ctx context.Context, silent bool) { }() } -func (d *Downloader) mainLoop(ctx context.Context, silent bool) error { +func (d *Downloader) mainLoop(silent bool) error { var sem = semaphore.NewWeighted(int64(d.cfg.DownloadSlots)) d.wg.Add(1) @@ -162,13 +161,13 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error { torrentMap := map[metainfo.Hash]struct{}{} // First loop drops torrents that were downloaded or are already complete // This improves efficiency of download by reducing number of active torrent (empirical observation) - for torrents := d.Torrent().Torrents(); len(torrents) > 0; torrents = d.Torrent().Torrents() { + for torrents := d.torrentClient.Torrents(); len(torrents) > 0; torrents = d.torrentClient.Torrents() { for _, t := range torrents { if _, already := torrentMap[t.InfoHash()]; already { continue } select { - case <-ctx.Done(): + case <-d.ctx.Done(): return case <-t.GotInfo(): } @@ -179,7 +178,7 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error { torrentMap[t.InfoHash()] = struct{}{} continue } - if err := sem.Acquire(ctx, 1); err != nil { + if err := sem.Acquire(d.ctx, 1); err != nil { return } t.AllowDataDownload() @@ -190,7 +189,7 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error { defer d.wg.Done() defer sem.Release(1) select { - case <-ctx.Done(): + case <-d.ctx.Done(): return case <-t.Complete.On(): } @@ -202,16 +201,16 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error { } atomic.StoreUint64(&d.stats.DroppedCompleted, 0) atomic.StoreUint64(&d.stats.DroppedTotal, 0) - d.addSegments(ctx) + d.addSegments(d.ctx) maps.Clear(torrentMap) for { - torrents := d.Torrent().Torrents() + torrents := d.torrentClient.Torrents() for _, t := range torrents { if _, already := torrentMap[t.InfoHash()]; already { continue } select { - case <-ctx.Done(): + case <-d.ctx.Done(): return case <-t.GotInfo(): } @@ -219,7 +218,7 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error { torrentMap[t.InfoHash()] = struct{}{} continue } - if err := sem.Acquire(ctx, 1); err != nil { + if err := sem.Acquire(d.ctx, 1); err != nil { return } t.AllowDataDownload() @@ -230,7 +229,7 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error { defer d.wg.Done() defer sem.Release(1) select { - case <-ctx.Done(): + case <-d.ctx.Done(): return case <-t.Complete.On(): } @@ -250,8 +249,8 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error { justCompleted := true for { select { - case <-ctx.Done(): - return ctx.Err() + case <-d.ctx.Done(): + return d.ctx.Err() case <-statEvery.C: d.ReCalcStats(statInterval) @@ -266,7 +265,7 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error { if justCompleted { justCompleted = false // force fsync of db. 
to not lose results of downloading on power-off - _ = d.db.Update(ctx, func(tx kv.RwTx) error { return nil }) + _ = d.db.Update(d.ctx, func(tx kv.RwTx) error { return nil }) } log.Info("[snapshots] Seeding", @@ -286,7 +285,7 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) error { "files", stats.FilesTotal) if stats.PeersUnique == 0 { - ips := d.Torrent().BadPeerIPs() + ips := d.TorrentClient().BadPeerIPs() if len(ips) > 0 { log.Info("[snapshots] Stats", "banned", ips) } @@ -295,11 +294,7 @@ } } -func (d *Downloader) SnapDir() string { - d.clientLock.RLock() - defer d.clientLock.RUnlock() - return d.cfg.SnapDir -} +func (d *Downloader) SnapDir() string { return d.cfg.SnapDir } func (d *Downloader) ReCalcStats(interval time.Duration) { //Call these methods outside of `statsLock` critical section, because they have their own locks with contention @@ -366,7 +361,10 @@ func moveFromTmp(snapDir string) error { return err } for _, p := range paths { - if p.Name() == "." || p.Name() == ".." || p.Name() == "tmp" { + if p.IsDir() || !p.Type().IsRegular() { + continue + } + if p.Name() == "tmp" { continue } src := filepath.Join(tmpDir, p.Name()) @@ -446,7 +444,7 @@ func (d *Downloader) VerifyData(ctx context.Context) error { g, ctx := errgroup.WithContext(ctx) // torrent lib internally limiting amount of hashers per file // set limit here just to make load predictable, not to control Disk/CPU consumption - g.SetLimit(runtime.GOMAXPROCS(-1) * 2) + g.SetLimit(runtime.GOMAXPROCS(-1) * 4) for _, t := range d.torrentClient.Torrents() { t := t @@ -460,14 +458,49 @@ return d.db.Update(context.Background(), func(tx kv.RwTx) error { return nil }) } -func (d *Downloader) AddInfoHashAsMagnetLink(ctx context.Context, infoHash metainfo.Hash, name string) error { - mi := &metainfo.MetaInfo{AnnounceList: Trackers} - //log.Debug("[downloader] downloading torrent and seg file", "hash", infoHash) +// AddNewSeedableFile decides what we do depending on whether we have the .seg file or the .torrent file +// have .torrent no .seg => get .seg file from .torrent +// have .seg no .torrent => get .torrent from .seg +func (d *Downloader) AddNewSeedableFile(ctx context.Context, name string) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // if we don't have the torrent file we build it if we have the .seg file + torrentFilePath, err := BuildTorrentIfNeed(ctx, name, d.SnapDir()) + if err != nil { + return err + } + ts, err := loadTorrent(torrentFilePath) + if err != nil { + return err + } + _, err = addTorrentFile(ts, d.torrentClient) + if err != nil { + return fmt.Errorf("addTorrentFile: %w", err) + } + return nil +} - if _, ok := d.torrentClient.Torrent(infoHash); ok { - //log.Debug("[downloader] torrent client related to hash found", "hash", infoHash) +func (d *Downloader) exists(name string) bool { + // Paranoid Mode on: if same file changed infoHash - skip it + // use-cases: + // - release of re-compressed version of same file, + // - ErigonV1.24 produced file X, then ErigonV1.25 released with new compression algorithm and produced X with another infoHash. + // ErigonV1.24 node must keep using existing file instead of downloading new one. 
+ for _, t := range d.torrentClient.Torrents() { + if t.Name() == name { + return true + } + } + return false +} +func (d *Downloader) AddInfoHashAsMagnetLink(ctx context.Context, infoHash metainfo.Hash, name string) error { + if d.exists(name) { return nil } + mi := &metainfo.MetaInfo{AnnounceList: Trackers} magnet := mi.Magnet(&infoHash, &metainfo.Info{Name: name}) t, err := d.torrentClient.AddMagnet(magnet.String()) @@ -514,50 +547,11 @@ func seedableFiles(snapDir string) ([]string, error) { return files, nil } func (d *Downloader) addSegments(ctx context.Context) error { - logEvery := time.NewTicker(20 * time.Second) - defer logEvery.Stop() - _, err := BuildTorrentFilesIfNeed(context.Background(), d.SnapDir()) + _, err := BuildTorrentFilesIfNeed(ctx, d.SnapDir()) if err != nil { return err } - err = AddTorrentFiles(d.SnapDir(), d.torrentClient) - if err != nil { - return fmt.Errorf("AddTorrentFiles: %w", err) - } - g, ctx := errgroup.WithContext(ctx) - i := atomic.Int64{} - files, err := seedableFiles(d.SnapDir()) - if err != nil { - return err - } - for _, f := range files { - f := f - g.Go(func() error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - _, err := AddSegment(f, d.cfg.SnapDir, d.torrentClient) - if err != nil { - return err - } - - i.Add(1) - select { - case <-ctx.Done(): - return ctx.Err() - case <-logEvery.C: - log.Info("[snpshots] initializing", "files", fmt.Sprintf("%d/%d", i.Load(), len(files))) - default: - } - return nil - }) - } - if err := g.Wait(); err != nil { - return err - } - return nil + return AddTorrentFiles(d.SnapDir(), d.torrentClient) } func (d *Downloader) Stats() AggStats { @@ -595,11 +589,7 @@ func (d *Downloader) StopSeeding(hash metainfo.Hash) error { return nil } -func (d *Downloader) Torrent() *torrent.Client { - d.clientLock.RLock() - defer d.clientLock.RUnlock() - return d.torrentClient -} +func (d *Downloader) TorrentClient() *torrent.Client { return d.torrentClient } func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletion, m storage.ClientImplCloser, torrentClient *torrent.Client, err error) { snapDir := cfg.DataDir @@ -610,7 +600,7 @@ func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletio Path(filepath.Join(snapDir, "db")). 
Open() if err != nil { - return nil, nil, nil, nil, err + return nil, nil, nil, nil, fmt.Errorf("torrentcfg.openClient: %w", err) } c, err = NewMdbxPieceCompletion(db) if err != nil { @@ -634,7 +624,7 @@ func openClient(cfg *torrent.ClientConfig) (db kv.RwDB, c storage.PieceCompletio } func (d *Downloader) applyWebseeds() { - for _, t := range d.Torrent().Torrents() { + for _, t := range d.TorrentClient().Torrents() { urls, ok := d.webseeds.GetByFileNames()[t.Name()] if !ok { continue diff --git a/downloader/downloader_grpc_server.go b/downloader/downloader_grpc_server.go index 9dfc524c8..bb50349ff 100644 --- a/downloader/downloader_grpc_server.go +++ b/downloader/downloader_grpc_server.go @@ -21,7 +21,6 @@ import ( "fmt" "time" - "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/metainfo" "github.com/ledgerwatch/erigon-lib/gointerfaces" proto_downloader "github.com/ledgerwatch/erigon-lib/gointerfaces/downloader" @@ -49,8 +48,6 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow defer logEvery.Stop() defer s.d.applyWebseeds() - torrentClient := s.d.Torrent() - snapDir := s.d.SnapDir() for i, it := range request.Items { if it.Path == "" { return nil, fmt.Errorf("field 'path' is required") @@ -63,22 +60,14 @@ func (s *GrpcServer) Download(ctx context.Context, request *proto_downloader.Dow } if it.TorrentHash == nil { - // if we dont have the torrent hash then we seed a new snapshot - log.Info("[snapshots] seeding a new snapshot") - ok, err := seedNewSnapshot(ctx, it.Path, torrentClient, snapDir) - if err != nil { + // if we don't have the torrent hash then we seed a new snapshot + if err := s.d.AddNewSeedableFile(ctx, it.Path); err != nil { return nil, err } - if ok { - log.Debug("[snapshots] already have both seg and torrent file") - } else { - log.Warn("[snapshots] didn't get the seg or the torrent file") - } continue } - err := s.d.AddInfoHashAsMagnetLink(ctx, Proto2InfoHash(it.TorrentHash), it.Path) - if err != nil { + if err := s.d.AddInfoHashAsMagnetLink(ctx, Proto2InfoHash(it.TorrentHash), it.Path); err != nil { return nil, err } } @@ -116,32 +105,3 @@ func (s *GrpcServer) Stats(ctx context.Context, request *proto_downloader.StatsR func Proto2InfoHash(in *prototypes.H160) metainfo.Hash { return gointerfaces.ConvertH160toAddress(in) } - -// decides what we do depending on wether we have the .seg file or the .torrent file -// have .torrent no .seg => get .seg file from .torrent -// have .seg no .torrent => get .torrent from .seg -func seedNewSnapshot(ctx context.Context, name string, torrentClient *torrent.Client, snapDir string) (bool, error) { - select { - case <-ctx.Done(): - return false, ctx.Err() - default: - } - // if we dont have the torrent file we build it if we have the .seg file - if err := buildTorrentIfNeed(ctx, name, snapDir); err != nil { - return false, err - } - - // we add the .seg file we have and create the .torrent file if we dont have it - ok, err := AddSegment(name, snapDir, torrentClient) - if err != nil { - return false, fmt.Errorf("AddSegment: %w", err) - } - - // torrent file does exist and seg - if !ok { - return false, nil - } - - // we skip the item in for loop since we build the seg and torrent file here - return true, nil -} diff --git a/downloader/downloader_test.go b/downloader/downloader_test.go new file mode 100644 index 000000000..2fe045997 --- /dev/null +++ b/downloader/downloader_test.go @@ -0,0 +1,58 @@ +package downloader + +import ( + "context" + lg "github.com/anacrolix/log" + 
"github.com/ledgerwatch/erigon-lib/common/datadir" + downloadercfg2 "github.com/ledgerwatch/erigon-lib/downloader/downloadercfg" + "github.com/ledgerwatch/erigon-lib/downloader/snaptype" + "github.com/stretchr/testify/require" + "path/filepath" + "testing" +) + +func TestChangeInfoHashOfSameFile(t *testing.T) { + require := require.New(t) + dirs := datadir.New(t.TempDir()) + cfg, err := downloadercfg2.New(dirs, "", lg.Info, 0, 0, 0, 0, 0, nil, "") + require.NoError(err) + d, err := New(context.Background(), cfg) + require.NoError(err) + defer d.Close() + err = d.AddInfoHashAsMagnetLink(d.ctx, snaptype.Hex2InfoHash("aa"), "a.seg") + require.NoError(err) + tt, ok := d.torrentClient.Torrent(snaptype.Hex2InfoHash("aa")) + require.True(ok) + require.Equal("a.seg", tt.Name()) + + // adding same file twice is ok + err = d.AddInfoHashAsMagnetLink(d.ctx, snaptype.Hex2InfoHash("aa"), "a.seg") + require.NoError(err) + + // adding same file with another infoHash - is ok, must be skipped + // use-cases: + // - release of re-compressed version of same file, + // - ErigonV1.24 produced file X, then ErigonV1.25 released with new compression algorithm and produced X with anouther infoHash. + // ErigonV1.24 node must keep using existing file instead of downloading new one. + err = d.AddInfoHashAsMagnetLink(d.ctx, snaptype.Hex2InfoHash("bb"), "a.seg") + require.NoError(err) + tt, ok = d.torrentClient.Torrent(snaptype.Hex2InfoHash("aa")) + require.True(ok) + require.Equal("a.seg", tt.Name()) + + // allow adding files only if they are insidesnapshots dir + _, err = BuildTorrentIfNeed(d.ctx, "a.seg", dirs.Snap) + require.NoError(err) + _, err = BuildTorrentIfNeed(d.ctx, "b/a.seg", dirs.Snap) + require.NoError(err) + _, err = BuildTorrentIfNeed(d.ctx, filepath.Join(dirs.Snap, "a.seg"), dirs.Snap) + require.NoError(err) + _, err = BuildTorrentIfNeed(d.ctx, filepath.Join(dirs.Snap, "b", "a.seg"), dirs.Snap) + require.NoError(err) + + // reject escaping snapshots dir + _, err = BuildTorrentIfNeed(d.ctx, filepath.Join(dirs.Chaindata, "b", "a.seg"), dirs.Snap) + require.Error(err) + _, err = BuildTorrentIfNeed(d.ctx, "./../a.seg", dirs.Snap) + require.Error(err) +} diff --git a/downloader/path.go b/downloader/path.go new file mode 100644 index 000000000..06ba51865 --- /dev/null +++ b/downloader/path.go @@ -0,0 +1,272 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package filepath implements utility routines for manipulating filename paths +// in a way compatible with the target operating system-defined file paths. +// +// The filepath package uses either forward slashes or backslashes, +// depending on the operating system. To process paths such as URLs +// that always use forward slashes regardless of the operating +// system, see the path package. +package downloader + +import ( + "io/fs" + "os" + "runtime" + "strings" +) + +// A lazybuf is a lazily constructed path buffer. +// It supports append, reading previously appended bytes, +// and retrieving the final string. It does not allocate a buffer +// to hold the output until that output diverges from s. 
+type lazybuf struct { + path string + buf []byte + w int + volAndPath string + volLen int +} + +func (b *lazybuf) index(i int) byte { + if b.buf != nil { + return b.buf[i] + } + return b.path[i] +} + +func (b *lazybuf) append(c byte) { + if b.buf == nil { + if b.w < len(b.path) && b.path[b.w] == c { + b.w++ + return + } + b.buf = make([]byte, len(b.path)) + copy(b.buf, b.path[:b.w]) + } + b.buf[b.w] = c + b.w++ +} + +func (b *lazybuf) string() string { + if b.buf == nil { + return b.volAndPath[:b.volLen+b.w] + } + return b.volAndPath[:b.volLen] + string(b.buf[:b.w]) +} + +const ( + Separator = os.PathSeparator + ListSeparator = os.PathListSeparator +) + +// Clean returns the shortest path name equivalent to path +// by purely lexical processing. It applies the following rules +// iteratively until no further processing can be done: +// +// 1. Replace multiple Separator elements with a single one. +// 2. Eliminate each . path name element (the current directory). +// 3. Eliminate each inner .. path name element (the parent directory) +// along with the non-.. element that precedes it. +// 4. Eliminate .. elements that begin a rooted path: +// that is, replace "/.." by "/" at the beginning of a path, +// assuming Separator is '/'. +// +// The returned path ends in a slash only if it represents a root directory, +// such as "/" on Unix or `C:\` on Windows. +// +// Finally, any occurrences of slash are replaced by Separator. +// +// If the result of this process is an empty string, Clean +// returns the string ".". +// +// See also Rob Pike, “Lexical File Names in Plan 9 or +// Getting Dot-Dot Right,” +// https://9p.io/sys/doc/lexnames.html +func Clean(path string) string { + originalPath := path + volLen := volumeNameLen(path) + path = path[volLen:] + if path == "" { + if volLen > 1 && os.IsPathSeparator(originalPath[0]) && os.IsPathSeparator(originalPath[1]) { + // should be UNC + return FromSlash(originalPath) + } + return originalPath + "." + } + rooted := os.IsPathSeparator(path[0]) + + // Invariants: + // reading from path; r is index of next byte to process. + // writing to buf; w is index of next byte to write. + // dotdot is index in buf where .. must stop, either because + // it is the leading slash or it is a leading ../../.. prefix. + n := len(path) + out := lazybuf{path: path, volAndPath: originalPath, volLen: volLen} + r, dotdot := 0, 0 + if rooted { + out.append(Separator) + r, dotdot = 1, 1 + } + + for r < n { + switch { + case os.IsPathSeparator(path[r]): + // empty path element + r++ + case path[r] == '.' && (r+1 == n || os.IsPathSeparator(path[r+1])): + // . element + r++ + case path[r] == '.' && path[r+1] == '.' && (r+2 == n || os.IsPathSeparator(path[r+2])): + // .. element: remove to last separator + r += 2 + switch { + case out.w > dotdot: + // can backtrack + out.w-- + for out.w > dotdot && !os.IsPathSeparator(out.index(out.w)) { + out.w-- + } + case !rooted: + // cannot backtrack, but not rooted, so append .. element. + if out.w > 0 { + out.append(Separator) + } + out.append('.') + out.append('.') + dotdot = out.w + } + default: + // real path element. + // add slash if needed + if rooted && out.w != 1 || !rooted && out.w != 0 { + out.append(Separator) + } + // If a ':' appears in the path element at the start of a Windows path, + // insert a .\ at the beginning to avoid converting relative paths + // like a/../c: into c:. 
+ if runtime.GOOS == "windows" && out.w == 0 && out.volLen == 0 && r != 0 { + for i := r; i < n && !os.IsPathSeparator(path[i]); i++ { + if path[i] == ':' { + out.append('.') + out.append(Separator) + break + } + } + } + // copy element + for ; r < n && !os.IsPathSeparator(path[r]); r++ { + out.append(path[r]) + } + } + } + + // Turn empty string into "." + if out.w == 0 { + out.append('.') + } + + return FromSlash(out.string()) +} + +func unixIsLocal(path string) bool { + if IsAbs(path) || path == "" { + return false + } + hasDots := false + for p := path; p != ""; { + var part string + part, p, _ = strings.Cut(p, "/") + if part == "." || part == ".." { + hasDots = true + break + } + } + if hasDots { + path = Clean(path) + } + if path == ".." || strings.HasPrefix(path, "../") { + return false + } + return true +} + +// FromSlash returns the result of replacing each slash ('/') character +// in path with a separator character. Multiple slashes are replaced +// by multiple separators. +func FromSlash(path string) string { + if Separator == '/' { + return path + } + return strings.ReplaceAll(path, "/", string(Separator)) +} + +// Join joins any number of path elements into a single path, +// separating them with an OS specific Separator. Empty elements +// are ignored. The result is Cleaned. However, if the argument +// list is empty or all its elements are empty, Join returns +// an empty string. +// On Windows, the result will only be a UNC path if the first +// non-empty element is a UNC path. +func Join(elem ...string) string { + return join(elem) +} + +// nolint +func unixAbs(path string) (string, error) { + if IsAbs(path) { + return Clean(path), nil + } + wd, err := os.Getwd() + if err != nil { + return "", err + } + return Join(wd, path), nil +} + +// SkipDir is used as a return value from WalkFuncs to indicate that +// the directory named in the call is to be skipped. It is not returned +// as an error by any function. +var SkipDir error = fs.SkipDir + +// WalkFunc is the type of the function called by Walk to visit each +// file or directory. +// +// The path argument contains the argument to Walk as a prefix. +// That is, if Walk is called with root argument "dir" and finds a file +// named "a" in that directory, the walk function will be called with +// argument "dir/a". +// +// The directory and file are joined with Join, which may clean the +// directory name: if Walk is called with the root argument "x/../dir" +// and finds a file named "a" in that directory, the walk function will +// be called with argument "dir/a", not "x/../dir/a". +// +// The info argument is the fs.FileInfo for the named path. +// +// The error result returned by the function controls how Walk continues. +// If the function returns the special value SkipDir, Walk skips the +// current directory (path if info.IsDir() is true, otherwise path's +// parent directory). If the function returns the special value SkipAll, +// Walk skips all remaining files and directories. Otherwise, if the function +// returns a non-nil error, Walk stops entirely and returns that error. +// +// The err argument reports an error related to path, signaling that Walk +// will not walk into that directory. The function can decide how to +// handle that error; as described earlier, returning the error will +// cause Walk to stop walking the entire tree. +// +// Walk calls the function with a non-nil err argument in two cases. 
+// +// First, if an os.Lstat on the root directory or any directory or file +// in the tree fails, Walk calls the function with path set to that +// directory or file's path, info set to nil, and err set to the error +// from os.Lstat. +// +// Second, if a directory's Readdirnames method fails, Walk calls the +// function with path set to the directory's path, info, set to an +// fs.FileInfo describing the directory, and err set to the error from +// Readdirnames. +type WalkFunc func(path string, info fs.FileInfo, err error) error diff --git a/downloader/path_plan9.go b/downloader/path_plan9.go new file mode 100644 index 000000000..01cb95fc9 --- /dev/null +++ b/downloader/path_plan9.go @@ -0,0 +1,55 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package downloader + +import "strings" + +func isLocal(path string) bool { + return unixIsLocal(path) +} + +// IsAbs reports whether the path is absolute. +func IsAbs(path string) bool { + return strings.HasPrefix(path, "/") || strings.HasPrefix(path, "#") +} + +// volumeNameLen returns length of the leading volume name on Windows. +// It returns 0 elsewhere. +func volumeNameLen(path string) int { + return 0 +} + +// HasPrefix exists for historical compatibility and should not be used. +// +// Deprecated: HasPrefix does not respect path boundaries and +// does not ignore case when required. +func HasPrefix(p, prefix string) bool { + return strings.HasPrefix(p, prefix) +} + +func splitList(path string) []string { + if path == "" { + return []string{} + } + return strings.Split(path, string(ListSeparator)) +} + +func abs(path string) (string, error) { + return unixAbs(path) +} + +func join(elem []string) string { + // If there's a bug here, fix the logic in ./path_unix.go too. + for i, e := range elem { + if e != "" { + return Clean(strings.Join(elem[i:], string(Separator))) + } + } + return "" +} + +func sameWord(a, b string) bool { + return a == b +} diff --git a/downloader/path_unix.go b/downloader/path_unix.go new file mode 100644 index 000000000..cb3b3bcd4 --- /dev/null +++ b/downloader/path_unix.go @@ -0,0 +1,34 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build unix || (js && wasm) + +package downloader + +import "strings" + +func isLocal(path string) bool { + return unixIsLocal(path) +} + +// IsAbs reports whether the path is absolute. +func IsAbs(path string) bool { + return strings.HasPrefix(path, "/") +} + +// volumeNameLen returns length of the leading volume name on Windows. +// It returns 0 elsewhere. +func volumeNameLen(path string) int { + return 0 +} + +func join(elem []string) string { + // If there's a bug here, fix the logic in ./path_plan9.go too. + for i, e := range elem { + if e != "" { + return Clean(strings.Join(elem[i:], string(Separator))) + } + } + return "" +} diff --git a/downloader/path_windows.go b/downloader/path_windows.go new file mode 100644 index 000000000..f5f4a01d9 --- /dev/null +++ b/downloader/path_windows.go @@ -0,0 +1,306 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. 
+ +package downloader + +import ( + "strings" + "syscall" +) + +func isSlash(c uint8) bool { + return c == '\\' || c == '/' +} + +func toUpper(c byte) byte { + if 'a' <= c && c <= 'z' { + return c - ('a' - 'A') + } + return c +} + +// isReservedName reports if name is a Windows reserved device name or a console handle. +// It does not detect names with an extension, which are also reserved on some Windows versions. +// +// For details, search for PRN in +// https://docs.microsoft.com/en-us/windows/desktop/fileio/naming-a-file. +func isReservedName(name string) bool { + if 3 <= len(name) && len(name) <= 4 { + switch string([]byte{toUpper(name[0]), toUpper(name[1]), toUpper(name[2])}) { + case "CON", "PRN", "AUX", "NUL": + return len(name) == 3 + case "COM", "LPT": + return len(name) == 4 && '1' <= name[3] && name[3] <= '9' + } + } + // Passing CONIN$ or CONOUT$ to CreateFile opens a console handle. + // https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea#consoles + // + // While CONIN$ and CONOUT$ aren't documented as being files, + // they behave the same as CON. For example, ./CONIN$ also opens the console input. + if len(name) == 6 && name[5] == '$' && strings.EqualFold(name, "CONIN$") { + return true + } + if len(name) == 7 && name[6] == '$' && strings.EqualFold(name, "CONOUT$") { + return true + } + return false +} + +func isLocal(path string) bool { + if path == "" { + return false + } + if isSlash(path[0]) { + // Path rooted in the current drive. + return false + } + if strings.IndexByte(path, ':') >= 0 { + // Colons are only valid when marking a drive letter ("C:foo"). + // Rejecting any path with a colon is conservative but safe. + return false + } + hasDots := false // contains . or .. path elements + for p := path; p != ""; { + var part string + part, p, _ = cutPath(p) + if part == "." || part == ".." { + hasDots = true + } + // Trim the extension and look for a reserved name. + base, _, hasExt := strings.Cut(part, ".") + if isReservedName(base) { + if !hasExt { + return false + } + // The path element is a reserved name with an extension. Some Windows + // versions consider this a reserved name, while others do not. Use + // FullPath to see if the name is reserved. + // + // FullPath will convert references to reserved device names to their + // canonical form: \\.\${DEVICE_NAME} + // + // FullPath does not perform this conversion for paths which contain + // a reserved device name anywhere other than in the last element, + // so check the part rather than the full path. + if p, _ := syscall.FullPath(part); len(p) >= 4 && p[:4] == `\\.\` { + return false + } + } + } + if hasDots { + path = Clean(path) + } + if path == ".." || strings.HasPrefix(path, `..\`) { + return false + } + return true +} + +// IsAbs reports whether the path is absolute. +func IsAbs(path string) (b bool) { + l := volumeNameLen(path) + if l == 0 { + return false + } + // If the volume name starts with a double slash, this is an absolute path. + if isSlash(path[0]) && isSlash(path[1]) { + return true + } + path = path[l:] + if path == "" { + return false + } + return isSlash(path[0]) +} + +// volumeNameLen returns length of the leading volume name on Windows. +// It returns 0 elsewhere. 
+// +// See: https://learn.microsoft.com/en-us/dotnet/standard/io/file-path-formats +func volumeNameLen(path string) int { + if len(path) < 2 { + return 0 + } + // with drive letter + c := path[0] + if path[1] == ':' && ('a' <= c && c <= 'z' || 'A' <= c && c <= 'Z') { + return 2 + } + // UNC and DOS device paths start with two slashes. + if !isSlash(path[0]) || !isSlash(path[1]) { + return 0 + } + rest := path[2:] + p1, rest, _ := cutPath(rest) + p2, rest, ok := cutPath(rest) + if !ok { + return len(path) + } + if p1 != "." && p1 != "?" { + // This is a UNC path: \\${HOST}\${SHARE}\ + return len(path) - len(rest) - 1 + } + // This is a DOS device path. + if len(p2) == 3 && toUpper(p2[0]) == 'U' && toUpper(p2[1]) == 'N' && toUpper(p2[2]) == 'C' { + // This is a DOS device path that links to a UNC: \\.\UNC\${HOST}\${SHARE}\ + _, rest, _ = cutPath(rest) // host + _, rest, ok = cutPath(rest) // share + if !ok { + return len(path) + } + } + return len(path) - len(rest) - 1 +} + +// cutPath slices path around the first path separator. +func cutPath(path string) (before, after string, found bool) { + for i := range path { + if isSlash(path[i]) { + return path[:i], path[i+1:], true + } + } + return path, "", false +} + +// HasPrefix exists for historical compatibility and should not be used. +// +// Deprecated: HasPrefix does not respect path boundaries and +// does not ignore case when required. +func HasPrefix(p, prefix string) bool { + if strings.HasPrefix(p, prefix) { + return true + } + return strings.HasPrefix(strings.ToLower(p), strings.ToLower(prefix)) +} + +func splitList(path string) []string { + // The same implementation is used in LookPath in os/exec; + // consider changing os/exec when changing this. + + if path == "" { + return []string{} + } + + // Split path, respecting but preserving quotes. + list := []string{} + start := 0 + quo := false + for i := 0; i < len(path); i++ { + switch c := path[i]; { + case c == '"': + quo = !quo + case c == ListSeparator && !quo: + list = append(list, path[start:i]) + start = i + 1 + } + } + list = append(list, path[start:]) + + // Remove quotes. + for i, s := range list { + list[i] = strings.ReplaceAll(s, `"`, ``) + } + + return list +} + +func abs(path string) (string, error) { + if path == "" { + // syscall.FullPath returns an error on empty path, because it's not a valid path. + // To implement Abs behavior of returning working directory on empty string input, + // special-case empty path by changing it to "." path. See golang.org/issue/24441. + path = "." + } + fullPath, err := syscall.FullPath(path) + if err != nil { + return "", err + } + return Clean(fullPath), nil +} + +func join(elem []string) string { + var b strings.Builder + var lastChar byte + for _, e := range elem { + switch { + case b.Len() == 0: + // Add the first non-empty path element unchanged. + case isSlash(lastChar): + // If the path ends in a slash, strip any leading slashes from the next + // path element to avoid creating a UNC path (any path starting with "\\") + // from non-UNC elements. + // + // The correct behavior for Join when the first element is an incomplete UNC + // path (for example, "\\") is underspecified. We currently join subsequent + // elements so Join("\\", "host", "share") produces "\\host\share". + for len(e) > 0 && isSlash(e[0]) { + e = e[1:] + } + case lastChar == ':': + // If the path ends in a colon, keep the path relative to the current directory + // on a drive and don't add a separator. 
Preserve leading slashes in the next + // path element, which may make the path absolute. + // + // Join(`C:`, `f`) = `C:f` + // Join(`C:`, `\f`) = `C:\f` + default: + // In all other cases, add a separator between elements. + b.WriteByte('\\') + lastChar = '\\' + } + if len(e) > 0 { + b.WriteString(e) + lastChar = e[len(e)-1] + } + } + if b.Len() == 0 { + return "" + } + return Clean(b.String()) +} + +// joinNonEmpty is like join, but it assumes that the first element is non-empty. +func joinNonEmpty(elem []string) string { + if len(elem[0]) == 2 && elem[0][1] == ':' { + // First element is drive letter without terminating slash. + // Keep path relative to current directory on that drive. + // Skip empty elements. + i := 1 + for ; i < len(elem); i++ { + if elem[i] != "" { + break + } + } + return Clean(elem[0] + strings.Join(elem[i:], string(Separator))) + } + // The following logic prevents Join from inadvertently creating a + // UNC path on Windows. Unless the first element is a UNC path, Join + // shouldn't create a UNC path. See golang.org/issue/9167. + p := Clean(strings.Join(elem, string(Separator))) + if !isUNC(p) { + return p + } + // p == UNC only allowed when the first element is a UNC path. + head := Clean(elem[0]) + if isUNC(head) { + return p + } + // head + tail == UNC, but joining two non-UNC paths should not result + // in a UNC path. Undo creation of UNC path. + tail := Clean(strings.Join(elem[1:], string(Separator))) + if head[len(head)-1] == Separator { + return head + tail + } + return head + string(Separator) + tail +} + +// isUNC reports whether path is a UNC path. +func isUNC(path string) bool { + return len(path) > 1 && isSlash(path[0]) && isSlash(path[1]) +} + +func sameWord(a, b string) bool { + return strings.EqualFold(a, b) +} diff --git a/downloader/util.go b/downloader/util.go index 4180a727b..a5f11a4b4 100644 --- a/downloader/util.go +++ b/downloader/util.go @@ -42,7 +42,7 @@ import ( "golang.org/x/sync/errgroup" ) -// `github.com/anacrolix/torrent` library spawning several goroutines and producing many requests for each tracker. So we limit amout of trackers by 7 +// udpOrHttpTrackers - torrent library spawning several goroutines and producing many requests for each tracker. 
So we limit amount of trackers to 7 var udpOrHttpTrackers = []string{ "udp://tracker.opentrackr.org:1337/announce", "udp://9.rarbg.com:2810/announce", @@ -170,7 +170,7 @@ func seedableSnapshotsBySubDir(dir, subDir string) ([]string, error) { continue } ext := filepath.Ext(f.Name()) - if ext != ".kv" && ext != ".v" && ext != ".ef" && ext != ".kv" { // filter out only compressed files + if ext != ".kv" && ext != ".v" && ext != ".ef" { // keep only compressed files continue } @@ -195,12 +195,33 @@ func seedableSnapshotsBySubDir(dir, subDir string) ([]string, error) { return res, nil } -func buildTorrentIfNeed(ctx context.Context, fName, root string) (err error) { +func ensureCantLeaveDir(fName, root string) (string, error) { + if filepath.IsAbs(fName) { + newFName, err := filepath.Rel(root, fName) + if err != nil { + return fName, err + } + if !IsLocal(newFName) { + return fName, fmt.Errorf("file=%s, is outside of snapshots dir", fName) + } + fName = newFName + } + if !IsLocal(fName) { + return fName, fmt.Errorf("relative paths are not allowed: %s", fName) + } + return fName, nil +} + +func BuildTorrentIfNeed(ctx context.Context, fName, root string) (torrentFilePath string, err error) { select { case <-ctx.Done(): - return ctx.Err() + return "", ctx.Err() default: } + fName, err = ensureCantLeaveDir(fName, root) + if err != nil { + return "", err + } fPath := filepath.Join(root, fName) if dir2.FileExist(fPath + ".torrent") { @@ -209,30 +230,14 @@ if !dir2.FileExist(fPath) { return } + info := &metainfo.Info{PieceLength: downloadercfg.DefaultPieceSize, Name: fName} if err := info.BuildFromFilePath(fPath); err != nil { - return fmt.Errorf("createTorrentFileFromSegment: %w", err) + return "", fmt.Errorf("createTorrentFileFromSegment: %w", err) } info.Name = fName - return CreateTorrentFileFromInfo(root, info, nil) -} - -// AddSegment - add existing .seg file, create corresponding .torrent if need -func AddSegment(originalFileName, snapDir string, client *torrent.Client) (bool, error) { - fPath := filepath.Join(snapDir, originalFileName) - if !dir2.FileExist(fPath + ".torrent") { - return false, nil - } - ts, err := loadTorrent(fPath + ".torrent") - if err != nil { - return false, err - } - _, err = AddTorrentFile(ts, client) - if err != nil { - return false, fmt.Errorf("AddTorrentFile: %w", err) - } - return true, nil + return fPath + ".torrent", CreateTorrentFileFromInfo(root, info, nil) } // BuildTorrentFilesIfNeed - create .torrent files from .seg files (big IO) - if .seg files were added manually @@ -253,7 +258,7 @@ file := file g.Go(func() error { defer i.Add(1) - if err := buildTorrentIfNeed(ctx, file, snapDir); err != nil { + if _, err := BuildTorrentIfNeed(ctx, file, snapDir); err != nil { return err } return nil @@ -332,7 +337,7 @@ func AddTorrentFiles(snapDir string, torrentClient *torrent.Client) error { return err } for _, ts := range files { - _, err := AddTorrentFile(ts, torrentClient) + _, err := addTorrentFile(ts, torrentClient) if err != nil { return err } @@ -389,11 +394,11 @@ func loadTorrent(torrentFilePath string) (*torrent.TorrentSpec, error) { return torrent.TorrentSpecFromMetaInfoErr(mi) } -// AddTorrentFile - adding .torrent file to torrentClient (and checking their hashes), if .torrent file +// addTorrentFile - adding .torrent file to torrentClient (and checking their hashes), if .torrent file added first 
time - pieces verification process will start (disk IO heavy) - Progress // kept in `piece completion storage` (surviving reboot). Once it is done - no disk IO needed again. // Don't need to call torrent.VerifyData manually -func AddTorrentFile(ts *torrent.TorrentSpec, torrentClient *torrent.Client) (*torrent.Torrent, error) { +func addTorrentFile(ts *torrent.TorrentSpec, torrentClient *torrent.Client) (*torrent.Torrent, error) { if _, ok := torrentClient.Torrent(ts.InfoHash); !ok { // can set ChunkSize only for new torrents ts.ChunkSize = downloadercfg.DefaultNetworkChunkSize } else { @@ -403,7 +408,7 @@ ts.DisallowDataDownload = true t, _, err := torrentClient.AddTorrentSpec(ts) if err != nil { - return nil, err + return nil, fmt.Errorf("addTorrentFile %s: %w", ts.DisplayName, err) } t.DisallowDataDownload() @@ -454,3 +459,8 @@ func readPeerID(db kv.RoDB) (peerID []byte, err error) { } return peerID, nil } + +// Deprecated: use `filepath.IsLocal` after dropping go1.19 support +func IsLocal(path string) bool { + return isLocal(path) +} diff --git a/gointerfaces/downloader/downloader.pb.go b/gointerfaces/downloader/downloader.pb.go index 8cef15ac6..773282e31 100644 --- a/gointerfaces/downloader/downloader.pb.go +++ b/gointerfaces/downloader/downloader.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: downloader/downloader.proto package downloader diff --git a/gointerfaces/downloader/downloader_grpc.pb.go b/gointerfaces/downloader/downloader_grpc.pb.go index 8a6a60a7d..831743bbc 100644 --- a/gointerfaces/downloader/downloader_grpc.pb.go +++ b/gointerfaces/downloader/downloader_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: downloader/downloader.proto package downloader diff --git a/gointerfaces/execution/execution.pb.go b/gointerfaces/execution/execution.pb.go index 60dcef584..5c2effd51 100644 --- a/gointerfaces/execution/execution.pb.go +++ b/gointerfaces/execution/execution.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: execution/execution.proto package execution diff --git a/gointerfaces/execution/execution_grpc.pb.go b/gointerfaces/execution/execution_grpc.pb.go index 9faac8573..b3779a0b1 100644 --- a/gointerfaces/execution/execution_grpc.pb.go +++ b/gointerfaces/execution/execution_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: execution/execution.proto package execution diff --git a/gointerfaces/remote/ethbackend.pb.go b/gointerfaces/remote/ethbackend.pb.go index ac0a099c6..118a3f763 100644 --- a/gointerfaces/remote/ethbackend.pb.go +++ b/gointerfaces/remote/ethbackend.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: remote/ethbackend.proto package remote diff --git a/gointerfaces/remote/ethbackend_grpc.pb.go b/gointerfaces/remote/ethbackend_grpc.pb.go index 8e986e082..4a410a32b 100644 --- a/gointerfaces/remote/ethbackend_grpc.pb.go +++ b/gointerfaces/remote/ethbackend_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: remote/ethbackend.proto package remote diff --git a/gointerfaces/remote/kv.pb.go b/gointerfaces/remote/kv.pb.go index 6dd2f965e..b5ac8e64a 100644 --- a/gointerfaces/remote/kv.pb.go +++ b/gointerfaces/remote/kv.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: remote/kv.proto package remote diff --git a/gointerfaces/remote/kv_grpc.pb.go b/gointerfaces/remote/kv_grpc.pb.go index eb32cbf39..d0305cb0f 100644 --- a/gointerfaces/remote/kv_grpc.pb.go +++ b/gointerfaces/remote/kv_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: remote/kv.proto package remote diff --git a/gointerfaces/remote/mocks.go b/gointerfaces/remote/mocks.go index 24e98be04..8300eb434 100644 --- a/gointerfaces/remote/mocks.go +++ b/gointerfaces/remote/mocks.go @@ -650,10 +650,10 @@ var _ KV_StateChangesClient = &KV_StateChangesClientMock{} // RecvFunc: func() (*StateChangeBatch, error) { // panic("mock out the Recv method") // }, -// RecvMsgFunc: func(m interface{}) error { +// RecvMsgFunc: func(m any) error { // panic("mock out the RecvMsg method") // }, -// SendMsgFunc: func(m interface{}) error { +// SendMsgFunc: func(m any) error { // panic("mock out the SendMsg method") // }, // TrailerFunc: func() metadata.MD { @@ -679,10 +679,10 @@ type KV_StateChangesClientMock struct { RecvFunc func() (*StateChangeBatch, error) // RecvMsgFunc mocks the RecvMsg method. - RecvMsgFunc func(m interface{}) error + RecvMsgFunc func(m any) error // SendMsgFunc mocks the SendMsg method. - SendMsgFunc func(m interface{}) error + SendMsgFunc func(m any) error // TrailerFunc mocks the Trailer method. TrailerFunc func() metadata.MD @@ -704,12 +704,12 @@ type KV_StateChangesClientMock struct { // RecvMsg holds details about calls to the RecvMsg method. RecvMsg []struct { // M is the m argument value. - M interface{} + M any } // SendMsg holds details about calls to the SendMsg method. SendMsg []struct { // M is the m argument value. - M interface{} + M any } // Trailer holds details about calls to the Trailer method. Trailer []struct { @@ -847,9 +847,9 @@ func (mock *KV_StateChangesClientMock) RecvCalls() []struct { } // RecvMsg calls RecvMsgFunc. 
-func (mock *KV_StateChangesClientMock) RecvMsg(m interface{}) error { +func (mock *KV_StateChangesClientMock) RecvMsg(m any) error { callInfo := struct { - M interface{} + M any }{ M: m, } @@ -870,10 +870,10 @@ func (mock *KV_StateChangesClientMock) RecvMsg(m interface{}) error { // // len(mockedKV_StateChangesClient.RecvMsgCalls()) func (mock *KV_StateChangesClientMock) RecvMsgCalls() []struct { - M interface{} + M any } { var calls []struct { - M interface{} + M any } mock.lockRecvMsg.RLock() calls = mock.calls.RecvMsg @@ -882,9 +882,9 @@ func (mock *KV_StateChangesClientMock) RecvMsgCalls() []struct { } // SendMsg calls SendMsgFunc. -func (mock *KV_StateChangesClientMock) SendMsg(m interface{}) error { +func (mock *KV_StateChangesClientMock) SendMsg(m any) error { callInfo := struct { - M interface{} + M any }{ M: m, } @@ -905,10 +905,10 @@ func (mock *KV_StateChangesClientMock) SendMsg(m interface{}) error { // // len(mockedKV_StateChangesClient.SendMsgCalls()) func (mock *KV_StateChangesClientMock) SendMsgCalls() []struct { - M interface{} + M any } { var calls []struct { - M interface{} + M any } mock.lockSendMsg.RLock() calls = mock.calls.SendMsg diff --git a/gointerfaces/sentinel/sentinel.pb.go b/gointerfaces/sentinel/sentinel.pb.go index 0e8be2e06..608597e7f 100644 --- a/gointerfaces/sentinel/sentinel.pb.go +++ b/gointerfaces/sentinel/sentinel.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: p2psentinel/sentinel.proto package sentinel diff --git a/gointerfaces/sentinel/sentinel_grpc.pb.go b/gointerfaces/sentinel/sentinel_grpc.pb.go index 13052e192..a62786b60 100644 --- a/gointerfaces/sentinel/sentinel_grpc.pb.go +++ b/gointerfaces/sentinel/sentinel_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: p2psentinel/sentinel.proto package sentinel diff --git a/gointerfaces/sentry/sentry.pb.go b/gointerfaces/sentry/sentry.pb.go index 0e43453fd..87710f442 100644 --- a/gointerfaces/sentry/sentry.pb.go +++ b/gointerfaces/sentry/sentry.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: p2psentry/sentry.proto package sentry diff --git a/gointerfaces/sentry/sentry_grpc.pb.go b/gointerfaces/sentry/sentry_grpc.pb.go index 7802cf4fd..1a9d1959b 100644 --- a/gointerfaces/sentry/sentry_grpc.pb.go +++ b/gointerfaces/sentry/sentry_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: p2psentry/sentry.proto package sentry diff --git a/gointerfaces/txpool/mining.pb.go b/gointerfaces/txpool/mining.pb.go index deacde3e6..20b3e0bd7 100644 --- a/gointerfaces/txpool/mining.pb.go +++ b/gointerfaces/txpool/mining.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: txpool/mining.proto package txpool diff --git a/gointerfaces/txpool/mining_grpc.pb.go b/gointerfaces/txpool/mining_grpc.pb.go index c2054b4e1..d0465eb5f 100644 --- a/gointerfaces/txpool/mining_grpc.pb.go +++ b/gointerfaces/txpool/mining_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. 
DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: txpool/mining.proto package txpool diff --git a/gointerfaces/txpool/txpool.pb.go b/gointerfaces/txpool/txpool.pb.go index 65b061e9a..52b9b02de 100644 --- a/gointerfaces/txpool/txpool.pb.go +++ b/gointerfaces/txpool/txpool.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: txpool/txpool.proto package txpool diff --git a/gointerfaces/txpool/txpool_grpc.pb.go b/gointerfaces/txpool/txpool_grpc.pb.go index a1ae12fc0..d8c6da0d0 100644 --- a/gointerfaces/txpool/txpool_grpc.pb.go +++ b/gointerfaces/txpool/txpool_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: txpool/txpool.proto package txpool diff --git a/gointerfaces/types/types.pb.go b/gointerfaces/types/types.pb.go index 088bbfb73..adae72de7 100644 --- a/gointerfaces/types/types.pb.go +++ b/gointerfaces/types/types.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: types/types.proto package types diff --git a/recsplit/index.go b/recsplit/index.go index a8ba6d076..4fa95025e 100644 --- a/recsplit/index.go +++ b/recsplit/index.go @@ -348,7 +348,11 @@ func (idx *Index) DisableReadAhead() { } leftReaders := idx.readAheadRefcnt.Add(-1) if leftReaders == 0 { - _ = mmap.MadviseRandom(idx.mmapHandle1) + if dbg.SnMadvNormal() { + _ = mmap.MadviseNormal(idx.mmapHandle1) + } else { + _ = mmap.MadviseRandom(idx.mmapHandle1) + } } else if leftReaders < 0 { log.Warn("read-ahead negative counter", "file", idx.FileName()) } diff --git a/state/aggregator_v3.go b/state/aggregator_v3.go index e4c498d7c..1c2209039 100644 --- a/state/aggregator_v3.go +++ b/state/aggregator_v3.go @@ -1539,51 +1539,6 @@ func (a *AggregatorV3) ComputeCommitment(saveStateAfter, trace bool) (rootHash [ return a.domains.Commit(saveStateAfter, trace) } -// DisableReadAhead - usage: `defer d.EnableReadAhead().DisableReadAhead()`. Please don't use this funcs without `defer` to avoid leak. 
-func (a *AggregatorV3) DisableReadAhead() { - a.accounts.DisableReadAhead() - a.storage.DisableReadAhead() - a.code.DisableReadAhead() - a.commitment.DisableReadAhead() - a.logAddrs.DisableReadAhead() - a.logTopics.DisableReadAhead() - a.tracesFrom.DisableReadAhead() - a.tracesTo.DisableReadAhead() -} -func (a *AggregatorV3) EnableReadAhead() *AggregatorV3 { - a.accounts.EnableReadAhead() - a.storage.EnableReadAhead() - a.code.EnableReadAhead() - a.commitment.EnableReadAhead() - a.logAddrs.EnableReadAhead() - a.logTopics.EnableReadAhead() - a.tracesFrom.EnableReadAhead() - a.tracesTo.EnableReadAhead() - return a -} -func (a *AggregatorV3) EnableMadvWillNeed() *AggregatorV3 { - a.accounts.EnableMadvWillNeed() - a.storage.EnableMadvWillNeed() - a.code.EnableMadvWillNeed() - a.commitment.EnableMadvWillNeed() - a.logAddrs.EnableMadvWillNeed() - a.logTopics.EnableMadvWillNeed() - a.tracesFrom.EnableMadvWillNeed() - a.tracesTo.EnableMadvWillNeed() - return a -} -func (a *AggregatorV3) EnableMadvNormal() *AggregatorV3 { - a.accounts.EnableMadvNormalReadAhead() - a.storage.EnableMadvNormalReadAhead() - a.code.EnableMadvNormalReadAhead() - a.commitment.EnableMadvNormalReadAhead() - a.logAddrs.EnableMadvNormalReadAhead() - a.logTopics.EnableMadvNormalReadAhead() - a.tracesFrom.EnableMadvNormalReadAhead() - a.tracesTo.EnableMadvNormalReadAhead() - return a -} - func (ac *AggregatorV3Context) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs int, asc order.By, limit int, tx kv.Tx) (timestamps iter.U64, err error) { switch name { case kv.AccountsHistoryIdx: diff --git a/state/btree_index.go b/state/btree_index.go index 474ff459e..3130a3e91 100644 --- a/state/btree_index.go +++ b/state/btree_index.go @@ -869,11 +869,11 @@ func OpenBtreeIndexWithDecompressor(indexPath string, M uint64, kv *compress.Dec if len(idx.data[pos:]) == 0 { return idx, nil } + defer idx.decompressor.EnableReadAhead().DisableReadAhead() idx.ef, pos = eliasfano32.ReadEliasFano(idx.data[pos:]) getter := NewArchiveGetter(idx.decompressor.MakeGetter(), idx.compressed) - defer idx.decompressor.EnableReadAhead().DisableReadAhead() //fmt.Printf("open btree index %s with %d keys b+=%t data compressed %t\n", indexPath, idx.ef.Count(), UseBpsTree, idx.compressed) switch UseBpsTree { diff --git a/state/domain_shared.go b/state/domain_shared.go index d7a4ef152..83883d8d2 100644 --- a/state/domain_shared.go +++ b/state/domain_shared.go @@ -514,6 +514,9 @@ func (sd *SharedDomains) SetBlockNum(blockNum uint64) { } func (sd *SharedDomains) Commit(saveStateAfter, trace bool) (rootHash []byte, err error) { + t := time.Now() + defer func() { log.Info("[dbg] [agg] commitment", "took", time.Since(t)) }() + // if commitment mode is Disabled, there will be nothing to compute on. 
mxCommitmentRunning.Inc() defer mxCommitmentRunning.Dec() diff --git a/state/history.go b/state/history.go index 298c26bae..6dc7cbfbe 100644 --- a/state/history.go +++ b/state/history.go @@ -2210,46 +2210,6 @@ func (h *History) DisableReadAhead() { }) } -func (h *History) EnableReadAhead() *History { - h.InvertedIndex.EnableReadAhead() - h.files.Walk(func(items []*filesItem) bool { - for _, item := range items { - item.decompressor.EnableReadAhead() - if item.index != nil { - item.index.EnableReadAhead() - } - } - return true - }) - return h -} -func (h *History) EnableMadvWillNeed() *History { - h.InvertedIndex.EnableMadvWillNeed() - h.files.Walk(func(items []*filesItem) bool { - for _, item := range items { - item.decompressor.EnableWillNeed() - if item.index != nil { - item.index.EnableWillNeed() - } - } - return true - }) - return h -} -func (h *History) EnableMadvNormalReadAhead() *History { - h.InvertedIndex.EnableMadvNormalReadAhead() - h.files.Walk(func(items []*filesItem) bool { - for _, item := range items { - item.decompressor.EnableMadvNormal() - if item.index != nil { - item.index.EnableMadvNormal() - } - } - return true - }) - return h -} - // HistoryStep used for incremental state reconsitution, it isolates only one snapshot interval type HistoryStep struct { compressVals bool diff --git a/state/inverted_index.go b/state/inverted_index.go index acabded49..ff8bfae39 100644 --- a/state/inverted_index.go +++ b/state/inverted_index.go @@ -1767,43 +1767,6 @@ func (ii *InvertedIndex) DisableReadAhead() { }) } -func (ii *InvertedIndex) EnableReadAhead() *InvertedIndex { - ii.files.Walk(func(items []*filesItem) bool { - for _, item := range items { - item.decompressor.EnableReadAhead() - if item.index != nil { - item.index.EnableReadAhead() - } - } - return true - }) - return ii -} -func (ii *InvertedIndex) EnableMadvWillNeed() *InvertedIndex { - ii.files.Walk(func(items []*filesItem) bool { - for _, item := range items { - item.decompressor.EnableWillNeed() - if item.index != nil { - item.index.EnableWillNeed() - } - } - return true - }) - return ii -} -func (ii *InvertedIndex) EnableMadvNormalReadAhead() *InvertedIndex { - ii.files.Walk(func(items []*filesItem) bool { - for _, item := range items { - item.decompressor.EnableMadvNormal() - if item.index != nil { - item.index.EnableMadvNormal() - } - } - return true - }) - return ii -} - func (ii *InvertedIndex) collectFilesStat() (filesCount, filesSize, idxSize uint64) { if ii.files == nil { return 0, 0, 0 diff --git a/txpool/fetch.go b/txpool/fetch.go index a9b24a6db..ecc9797b7 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -222,20 +222,15 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes if err != nil { return fmt.Errorf("parsing NewPooledTransactionHashes: %w", err) } - var hashbuf [32]byte - var unknownHashes types2.Hashes - for i := 0; i < hashCount; i++ { - _, pos, err = types2.ParseHash(req.Data, pos, hashbuf[:0]) - if err != nil { - return fmt.Errorf("parsing NewPooledTransactionHashes: %w", err) - } - known, err := f.pool.IdHashKnown(tx, hashbuf[:]) - if err != nil { + hashes := make([]byte, 32*hashCount) + for i := 0; i < len(hashes); i += 32 { + if _, pos, err = types2.ParseHash(req.Data, pos, hashes[i:]); err != nil { return err } - if !known { - unknownHashes = append(unknownHashes, hashbuf[:]...) 
diff --git a/txpool/fetch.go b/txpool/fetch.go
index a9b24a6db..ecc9797b7 100644
--- a/txpool/fetch.go
+++ b/txpool/fetch.go
@@ -222,20 +222,15 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
 		if err != nil {
 			return fmt.Errorf("parsing NewPooledTransactionHashes: %w", err)
 		}
-		var hashbuf [32]byte
-		var unknownHashes types2.Hashes
-		for i := 0; i < hashCount; i++ {
-			_, pos, err = types2.ParseHash(req.Data, pos, hashbuf[:0])
-			if err != nil {
-				return fmt.Errorf("parsing NewPooledTransactionHashes: %w", err)
-			}
-			known, err := f.pool.IdHashKnown(tx, hashbuf[:])
-			if err != nil {
+		hashes := make([]byte, 32*hashCount)
+		for i := 0; i < len(hashes); i += 32 {
+			if _, pos, err = types2.ParseHash(req.Data, pos, hashes[i:]); err != nil {
 				return err
 			}
-			if !known {
-				unknownHashes = append(unknownHashes, hashbuf[:]...)
-			}
+		}
+		unknownHashes, err := f.pool.FilterKnownIdHashes(tx, hashes)
+		if err != nil {
+			return err
 		}
 		if len(unknownHashes) > 0 {
 			var encodedRequest []byte
@@ -256,15 +251,9 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes
 		if err != nil {
 			return fmt.Errorf("parsing NewPooledTransactionHashes88: %w", err)
 		}
-		var unknownHashes types2.Hashes
-		for i := 0; i < len(hashes); i += 32 {
-			known, err := f.pool.IdHashKnown(tx, hashes[i:i+32])
-			if err != nil {
-				return err
-			}
-			if !known {
-				unknownHashes = append(unknownHashes, hashes[i:i+32]...)
-			}
+		unknownHashes, err := f.pool.FilterKnownIdHashes(tx, hashes)
+		if err != nil {
+			return err
 		}
 
 		if len(unknownHashes) > 0 {
@@ -482,7 +471,10 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien
 			if err = f.threadSafeParseStateChangeTxn(func(parseContext *types2.TxParseContext) error {
 				_, err = parseContext.ParseTransaction(change.Txs[i], 0, unwindTxs.Txs[i], unwindTxs.Senders.At(i), false /* hasEnvelope */, false /* wrappedWithBlobs */, nil)
 				if unwindTxs.Txs[i].Type == types2.BlobTxType {
-					knownBlobTxn := f.pool.GetKnownBlobTxn(tx, unwindTxs.Txs[i].IDHash[:])
+					knownBlobTxn, err := f.pool.GetKnownBlobTxn(tx, unwindTxs.Txs[i].IDHash[:])
+					if err != nil {
+						return err
+					}
 					if knownBlobTxn != nil {
 						unwindTxs.Txs[i] = knownBlobTxn.Tx
 					}
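The txpool/fetch.go hunks above replace N per-hash IdHashKnown calls with one contiguous buffer and a single batched pool query. A sketch of the packing half, under stated assumptions: parseHash below is a hypothetical stand-in for types2.ParseHash, which in the real code advances pos through an RLP-encoded announcement payload:

package main

import (
	"encoding/binary"
	"fmt"
)

// parseHash copies the next 32-byte hash from payload[pos:] into out and
// returns the new position (a stand-in for the real RLP-aware parser).
func parseHash(payload []byte, pos int, out []byte) (int, error) {
	if len(payload)-pos < 32 {
		return pos, fmt.Errorf("short payload")
	}
	copy(out, payload[pos:pos+32])
	return pos + 32, nil
}

func main() {
	// Fake announcement payload containing 3 hashes.
	const hashCount = 3
	payload := make([]byte, hashCount*32)
	for i := 0; i < hashCount; i++ {
		binary.BigEndian.PutUint32(payload[i*32:], uint32(i))
	}

	// One allocation for the whole batch, as in the patch: each ParseHash
	// writes directly into its 32-byte window of the shared buffer.
	hashes := make([]byte, 32*hashCount)
	pos := 0
	var err error
	for i := 0; i < len(hashes); i += 32 {
		if pos, err = parseHash(payload, pos, hashes[i:i+32]); err != nil {
			panic(err)
		}
	}
	fmt.Printf("parsed %d hashes into one %d-byte buffer\n", hashCount, len(hashes))
}

The payoff lands in pool.go further down: the whole batch is then filtered against the pool's in-memory indexes under a single lock acquisition.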
diff --git a/txpool/mocks_test.go b/txpool/mocks_test.go
index 78140c64c..22e8e8121 100644
--- a/txpool/mocks_test.go
+++ b/txpool/mocks_test.go
@@ -31,7 +31,10 @@ var _ Pool = &PoolMock{}
 //			AddRemoteTxsFunc: func(ctx context.Context, newTxs types2.TxSlots) {
 //				panic("mock out the AddRemoteTxs method")
 //			},
-//			GetKnownBlobTxnFunc: func(tx kv.Tx, hash []byte) *metaTx {
+//			FilterKnownIdHashesFunc: func(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) {
+//				panic("mock out the FilterKnownIdHashes method")
+//			},
+//			GetKnownBlobTxnFunc: func(tx kv.Tx, hash []byte) (*metaTx, error) {
 //				panic("mock out the GetKnownBlobTxn method")
 //			},
 //			GetRlpFunc: func(tx kv.Tx, hash []byte) ([]byte, error) {
@@ -65,8 +68,11 @@ type PoolMock struct {
 	// AddRemoteTxsFunc mocks the AddRemoteTxs method.
 	AddRemoteTxsFunc func(ctx context.Context, newTxs types2.TxSlots)
 
+	// FilterKnownIdHashesFunc mocks the FilterKnownIdHashes method.
+	FilterKnownIdHashesFunc func(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error)
+
 	// GetKnownBlobTxnFunc mocks the GetKnownBlobTxn method.
-	GetKnownBlobTxnFunc func(tx kv.Tx, hash []byte) *metaTx
+	GetKnownBlobTxnFunc func(tx kv.Tx, hash []byte) (*metaTx, error)
 
 	// GetRlpFunc mocks the GetRlp method.
 	GetRlpFunc func(tx kv.Tx, hash []byte) ([]byte, error)
@@ -106,6 +112,13 @@ type PoolMock struct {
 			// NewTxs is the newTxs argument value.
 			NewTxs types2.TxSlots
 		}
+		// FilterKnownIdHashes holds details about calls to the FilterKnownIdHashes method.
+		FilterKnownIdHashes []struct {
+			// Tx is the tx argument value.
+			Tx kv.Tx
+			// Hashes is the hashes argument value.
+			Hashes types2.Hashes
+		}
 		// GetKnownBlobTxn holds details about calls to the GetKnownBlobTxn method.
 		GetKnownBlobTxn []struct {
 			// Tx is the tx argument value.
@@ -152,6 +165,7 @@ type PoolMock struct {
 	lockAddLocalTxs         sync.RWMutex
 	lockAddNewGoodPeer      sync.RWMutex
 	lockAddRemoteTxs        sync.RWMutex
+	lockFilterKnownIdHashes sync.RWMutex
 	lockGetKnownBlobTxn     sync.RWMutex
 	lockGetRlp              sync.RWMutex
 	lockIdHashKnown         sync.RWMutex
@@ -272,8 +286,48 @@ func (mock *PoolMock) AddRemoteTxsCalls() []struct {
 	return calls
 }
 
+// FilterKnownIdHashes calls FilterKnownIdHashesFunc.
+func (mock *PoolMock) FilterKnownIdHashes(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) {
+	callInfo := struct {
+		Tx     kv.Tx
+		Hashes types2.Hashes
+	}{
+		Tx:     tx,
+		Hashes: hashes,
+	}
+	mock.lockFilterKnownIdHashes.Lock()
+	mock.calls.FilterKnownIdHashes = append(mock.calls.FilterKnownIdHashes, callInfo)
+	mock.lockFilterKnownIdHashes.Unlock()
+	if mock.FilterKnownIdHashesFunc == nil {
+		var (
+			unknownHashesOut types2.Hashes
+			errOut           error
+		)
+		return unknownHashesOut, errOut
+	}
+	return mock.FilterKnownIdHashesFunc(tx, hashes)
+}
+
+// FilterKnownIdHashesCalls gets all the calls that were made to FilterKnownIdHashes.
+// Check the length with:
+//
+//	len(mockedPool.FilterKnownIdHashesCalls())
+func (mock *PoolMock) FilterKnownIdHashesCalls() []struct {
+	Tx     kv.Tx
+	Hashes types2.Hashes
+} {
+	var calls []struct {
+		Tx     kv.Tx
+		Hashes types2.Hashes
+	}
+	mock.lockFilterKnownIdHashes.RLock()
+	calls = mock.calls.FilterKnownIdHashes
+	mock.lockFilterKnownIdHashes.RUnlock()
+	return calls
+}
+
 // GetKnownBlobTxn calls GetKnownBlobTxnFunc.
-func (mock *PoolMock) GetKnownBlobTxn(tx kv.Tx, hash []byte) *metaTx {
+func (mock *PoolMock) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) {
 	callInfo := struct {
 		Tx   kv.Tx
 		Hash []byte
@@ -287,8 +341,9 @@ func (mock *PoolMock) GetKnownBlobTxn(tx kv.Tx, hash []byte) *metaTx {
 	if mock.GetKnownBlobTxnFunc == nil {
 		var (
 			metaTxMoqParamOut *metaTx
+			errOut            error
 		)
-		return metaTxMoqParamOut
+		return metaTxMoqParamOut, errOut
 	}
 	return mock.GetKnownBlobTxnFunc(tx, hash)
 }
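The txpool/mocks_test.go changes above are regenerated moq output. The generated pattern, sketched below with hypothetical simplified names: record the arguments under a per-method mutex, delegate to an optional XxxFunc field, and fall back to zero values when it is unset:

package main

import (
	"fmt"
	"sync"
)

// PoolMock here is a hand-written miniature of the moq-generated mock, not the
// real generated file.
type PoolMock struct {
	// FilterKnownFunc, if set, supplies the behaviour under test.
	FilterKnownFunc func(hashes []byte) ([]byte, error)

	lockFilterKnown  sync.RWMutex
	callsFilterKnown []struct{ Hashes []byte }
}

func (m *PoolMock) FilterKnown(hashes []byte) ([]byte, error) {
	// Record the call for later assertions.
	m.lockFilterKnown.Lock()
	m.callsFilterKnown = append(m.callsFilterKnown, struct{ Hashes []byte }{hashes})
	m.lockFilterKnown.Unlock()
	if m.FilterKnownFunc == nil {
		return nil, nil // zero values, like unknownHashesOut/errOut above
	}
	return m.FilterKnownFunc(hashes)
}

// FilterKnownCalls exposes the recorded calls, mirroring the generated
// FilterKnownIdHashesCalls accessor.
func (m *PoolMock) FilterKnownCalls() []struct{ Hashes []byte } {
	m.lockFilterKnown.RLock()
	defer m.lockFilterKnown.RUnlock()
	return m.callsFilterKnown
}

func main() {
	mock := &PoolMock{}
	_, _ = mock.FilterKnown([]byte("deadbeef"))
	fmt.Println("calls recorded:", len(mock.FilterKnownCalls()))
}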
diff --git a/txpool/pool.go b/txpool/pool.go
index 185730a7d..d99ba9114 100644
--- a/txpool/pool.go
+++ b/txpool/pool.go
@@ -85,9 +85,10 @@ type Pool interface {
 	OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error
 	// IdHashKnown check whether transaction with given Id hash is known to the pool
 	IdHashKnown(tx kv.Tx, hash []byte) (bool, error)
+	FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error)
 	Started() bool
 	GetRlp(tx kv.Tx, hash []byte) ([]byte, error)
-	GetKnownBlobTxn(tx kv.Tx, hash []byte) *metaTx
+	GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error)
 
 	AddNewGoodPeer(peerID types.PeerID)
 }
@@ -291,40 +292,40 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config,
 }
 
 func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error {
+	if err := minedTxs.Valid(); err != nil {
+		return err
+	}
+
 	defer newBlockTimer.UpdateDuration(time.Now())
 	//t := time.Now()
 
-	cache := p.cache()
+	coreDB, cache := p.coreDBWithCache()
 	cache.OnNewBlock(stateChanges)
-	coreTx, err := p.coreDB().BeginRo(ctx)
+	coreTx, err := coreDB.BeginRo(ctx)
 	if err != nil {
 		return err
 	}
 	defer coreTx.Rollback()
 
-	p.lock.Lock()
-	defer p.lock.Unlock()
-
 	p.lastSeenBlock.Store(stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].BlockHeight)
 	if !p.started.Load() {
-		if err := p.fromDB(ctx, tx, coreTx); err != nil {
+		if err := p.fromDBWithLock(ctx, tx, coreTx); err != nil {
 			return fmt.Errorf("OnNewBlock: loading txs from DB: %w", err)
 		}
 	}
-
 	cacheView, err := cache.View(ctx, coreTx)
 	if err != nil {
 		return err
 	}
+
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
 	if assert.Enable {
 		if _, err := kvcache.AssertCheckValues(ctx, coreTx, cache); err != nil {
 			p.logger.Error("AssertCheckValues", "err", err, "stack", stack.Trace().String())
 		}
 	}
-
-	if err := minedTxs.Valid(); err != nil {
-		return err
-	}
 	baseFee := stateChanges.PendingBlockBaseFee
 	pendingBaseFee, baseFeeChanged := p.setBaseFee(baseFee)
 
@@ -404,9 +405,9 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error {
 		return fmt.Errorf("txpool not started yet")
 	}
 
-	cache := p.cache()
 	defer processBatchTxsTimer.UpdateDuration(time.Now())
-	coreTx, err := p.coreDB().BeginRo(ctx)
+	coreDB, cache := p.coreDBWithCache()
+	coreTx, err := coreDB.BeginRo(ctx)
 	if err != nil {
 		return err
 	}
@@ -516,59 +517,87 @@ func (p *TxPool) AppendAllAnnouncements(types []byte, sizes []uint32, hashes []b
 	types, sizes, hashes = p.AppendRemoteAnnouncements(types, sizes, hashes)
 	return types, sizes, hashes
 }
-func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
-	if _, ok := p.discardReasonsLRU.Get(string(hash)); ok {
+func (p *TxPool) idHashKnown(tx kv.Tx, hash []byte, hashS string) (bool, error) {
+	if _, ok := p.unprocessedRemoteByHash[hashS]; ok {
 		return true, nil
 	}
-	if _, ok := p.unprocessedRemoteByHash[string(hash)]; ok {
+	if _, ok := p.discardReasonsLRU.Get(hashS); ok {
 		return true, nil
 	}
-	if _, ok := p.byHash[string(hash)]; ok {
+	if _, ok := p.byHash[hashS]; ok {
 		return true, nil
 	}
-	if _, ok := p.minedBlobTxsByHash[string(hash)]; ok {
+	if _, ok := p.minedBlobTxsByHash[hashS]; ok {
 		return true, nil
 	}
 	return tx.Has(kv.PoolTransaction, hash)
 }
+func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) {
+	hashS := string(hash)
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	return p.idHashKnown(tx, hash, hashS)
+}
+func (p *TxPool) FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	for i := 0; i < len(hashes); i += 32 {
+		known, err := p.idHashKnown(tx, hashes[i:i+32], string(hashes[i:i+32]))
+		if err != nil {
+			return unknownHashes, err
+		}
+		if !known {
+			unknownHashes = append(unknownHashes, hashes[i:i+32]...)
+		}
+	}
+	return unknownHashes, err
+}
+
+func (p *TxPool) getUnprocessedTxn(hashS string) (*types.TxSlot, bool) {
+	if i, ok := p.unprocessedRemoteByHash[hashS]; ok {
+		return p.unprocessedRemoteTxs.Txs[i], true
+	}
+	return nil, false
+}
 
-func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) *metaTx {
+func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) {
+	hashS := string(hash)
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	if mt, ok := p.minedBlobTxsByHash[string(hash)]; ok {
-		return mt
+	if mt, ok := p.minedBlobTxsByHash[hashS]; ok {
+		return mt, nil
 	}
-	if i, ok := p.unprocessedRemoteByHash[string(hash)]; ok {
-		return newMetaTx(p.unprocessedRemoteTxs.Txs[i], false, 0)
+	if txn, ok := p.getUnprocessedTxn(hashS); ok {
+		return newMetaTx(txn, false, 0), nil
 	}
-	if mt, ok := p.byHash[string(hash)]; ok {
-		return mt
+	if mt, ok := p.byHash[hashS]; ok {
+		return mt, nil
 	}
-	if has, _ := tx.Has(kv.PoolTransaction, hash); has {
-		txn, _ := tx.GetOne(kv.PoolTransaction, hash)
-		parseCtx := types.NewTxParseContext(p.chainID)
-		parseCtx.WithSender(false)
-		txSlot := &types.TxSlot{}
-		parseCtx.ParseTransaction(txn, 0, txSlot, nil, false, true, nil)
-		return newMetaTx(txSlot, false, 0)
+	has, err := tx.Has(kv.PoolTransaction, hash)
+	if err != nil {
+		return nil, err
 	}
-	return nil
+	if !has {
+		return nil, nil
+	}
+	txn, _ := tx.GetOne(kv.PoolTransaction, hash)
+	parseCtx := types.NewTxParseContext(p.chainID)
+	parseCtx.WithSender(false)
+	txSlot := &types.TxSlot{}
+	parseCtx.ParseTransaction(txn, 0, txSlot, nil, false, true, nil)
+	return newMetaTx(txSlot, false, 0), nil
 }
 
 func (p *TxPool) IsLocal(idHash []byte) bool {
+	hashS := string(idHash)
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	return p.isLocalLRU.Contains(string(idHash))
+	return p.isLocalLRU.Contains(hashS)
 }
 
 func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) }
 func (p *TxPool) Started() bool                      { return p.started.Load() }
 
 func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
-
 	// First wait for the corresponding block to arrive
 	if p.lastSeenBlock.Load() < onTopOf {
 		return false, 0, nil // Too early
@@ -650,11 +679,15 @@ func (p *TxPool) ResetYieldedStatus() {
 }
 
 func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
 	return p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, toSkip)
 }
 
 func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64) (bool, error) {
 	set := mapset.NewThreadUnsafeSet[[32]byte]()
+	p.lock.Lock()
+	defer p.lock.Unlock()
 	onTime, _, err := p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, set)
 	return onTime, err
 }
@@ -669,11 +702,12 @@ func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs types.TxSlots) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
 	for i, txn := range newTxs.Txs {
-		_, ok := p.unprocessedRemoteByHash[string(txn.IDHash[:])]
+		hashS := string(txn.IDHash[:])
+		_, ok := p.unprocessedRemoteByHash[hashS]
 		if ok {
 			continue
 		}
-		p.unprocessedRemoteByHash[string(txn.IDHash[:])] = len(p.unprocessedRemoteTxs.Txs)
+		p.unprocessedRemoteByHash[hashS] = len(p.unprocessedRemoteTxs.Txs)
 		p.unprocessedRemoteTxs.Append(txn, newTxs.Senders.At(i), false)
 	}
 }
@@ -970,13 +1004,14 @@ func fillDiscardReasons(reasons []txpoolcfg.DiscardReason, newTxs types.TxSlots,
 }
 
 func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error) {
-	coreTx, err := p.coreDB().BeginRo(ctx)
+	coreDb, cache := p.coreDBWithCache()
+	coreTx, err := coreDb.BeginRo(ctx)
 	if err != nil {
 		return nil, err
 	}
 	defer coreTx.Rollback()
 
-	cacheView, err := p.cache().View(ctx, coreTx)
+	cacheView, err := cache.View(ctx, coreTx)
 	if err != nil {
 		return nil, err
 	}
@@ -1034,19 +1069,11 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots,
 	}
 	return reasons, nil
 }
-
-func (p *TxPool) coreDB() kv.RoDB {
-	p.lock.Lock()
-	defer p.lock.Unlock()
-	return p._chainDB
-}
-
-func (p *TxPool) cache() kvcache.Cache {
+func (p *TxPool) coreDBWithCache() (kv.RoDB, kvcache.Cache) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
-	return p._stateCache
+	return p._chainDB, p._stateCache
 }
-
 func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
 	newTxs types.TxSlots, pendingBaseFee, blockGasLimit uint64,
 	pending *PendingPool, baseFee, queued *SubPool,
@@ -1243,11 +1270,12 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
 
 	// Remove from mined cache in case this is coming from unwind txs
 	// and to ensure not double adding into the memory
-	if _, ok := p.minedBlobTxsByHash[string(mt.Tx.IDHash[:])]; ok {
-		p.deleteMinedBlobTxn(string(mt.Tx.IDHash[:]))
+	hashStr := string(mt.Tx.IDHash[:])
+	if _, ok := p.minedBlobTxsByHash[hashStr]; ok {
+		p.deleteMinedBlobTxn(hashStr)
 	}
-
-	p.byHash[string(mt.Tx.IDHash[:])] = mt
+	p.byHash[hashStr] = mt
 
 	if replaced := p.all.replaceOrInsert(mt); replaced != nil {
 		if assert.Enable {
@@ -1256,7 +1284,7 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
 	}
 
 	if mt.subPool&IsLocal != 0 {
-		p.isLocalLRU.Add(string(mt.Tx.IDHash[:]), struct{}{})
+		p.isLocalLRU.Add(hashStr, struct{}{})
 	}
 	// All transactions are first added to the queued pool and then immediately promoted from there if required
 	p.queued.Add(mt, p.logger)
@@ -1266,10 +1294,11 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo
 // dropping transaction from all sub-structures and from db
 // Important: don't call it while iterating by all
 func (p *TxPool) discardLocked(mt *metaTx, reason txpoolcfg.DiscardReason) {
-	delete(p.byHash, string(mt.Tx.IDHash[:]))
+	hashStr := string(mt.Tx.IDHash[:])
+	delete(p.byHash, hashStr)
 	p.deletedTxs = append(p.deletedTxs, mt)
 	p.all.delete(mt)
-	p.discardReasonsLRU.Add(string(mt.Tx.IDHash[:]), reason)
+	p.discardReasonsLRU.Add(hashStr, reason)
 }
 
 // Cache recently mined blobs in anticipation of reorg, delete finalized ones
@@ -1827,6 +1856,11 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
 	return nil
 }
 
+func (p *TxPool) fromDBWithLock(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	return p.fromDB(ctx, tx, coreTx)
+}
 func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
	if p.lastSeenBlock.Load() == 0 {
 		lastSeenBlock, err := LastSeenBlock(tx)
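Taken together, the txpool/pool.go changes above move locking to the API boundary: unexported helpers such as idHashKnown, best, and fromDB assume the caller already holds p.lock, while exported entry points (IdHashKnown, FilterKnownIdHashes, YieldBest, PeekBest, fromDBWithLock) take it exactly once, and coreDBWithCache fetches both guarded fields in one acquisition where the old coreDB()+cache() pair locked twice. A sketch of that pattern with illustrative stand-in types, not the real TxPool:

package main

import (
	"fmt"
	"sync"
)

type pool struct {
	lock   sync.Mutex
	db     string // stands in for p._chainDB
	cache  string // stands in for p._stateCache
	byHash map[string]int
}

// coreDBWithCache returns both lock-guarded fields under one acquisition.
func (p *pool) coreDBWithCache() (string, string) {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.db, p.cache
}

// idHashKnown is the lock-free core; callers must hold p.lock.
func (p *pool) idHashKnown(hashS string) bool {
	_, ok := p.byHash[hashS]
	return ok
}

// FilterKnown locks once for the whole batch and reuses the unlocked core,
// instead of paying one Lock/Unlock per hash.
func (p *pool) FilterKnown(hashes []string) (unknown []string) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for _, h := range hashes {
		if !p.idHashKnown(h) {
			unknown = append(unknown, h)
		}
	}
	return unknown
}

func main() {
	p := &pool{db: "chaindb", cache: "statecache", byHash: map[string]int{"a": 1}}
	db, cache := p.coreDBWithCache()
	fmt.Println(db, cache, p.FilterKnown([]string{"a", "b"})) // chaindb statecache [b]
}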