From e44e658cb393ded1de11c5c9a1b2b9079a4585dc Mon Sep 17 00:00:00 2001 From: Jorropo Date: Sat, 3 Jun 2023 09:00:17 +0200 Subject: [PATCH] provider: second round of reprovider refactor This reverts the revert 4c5c98b94b219d9870c3181f73a99d2f15edae7d, fixes a few deadlocks, and includes a required version bump of `go-libp2p-routing-helpers`. --- CHANGELOG.md | 9 + examples/go.mod | 2 +- examples/go.sum | 4 +- go.mod | 3 +- go.sum | 6 +- provider/README.md | 30 -- provider/batched/system.go | 420 --------------- provider/batched/system_test.go | 119 ----- provider/{ => internal}/queue/queue.go | 133 ++--- provider/{ => internal}/queue/queue_test.go | 39 +- provider/noop.go | 32 ++ provider/offline.go | 29 -- provider/provider.go | 107 +++- provider/reprovider.go | 545 ++++++++++++++++++++ provider/reprovider_test.go | 219 ++++++++ provider/simple/provider.go | 116 ----- provider/simple/provider_test.go | 166 ------ provider/simple/reprovide.go | 256 --------- provider/simple/reprovide_test.go | 313 ----------- provider/system.go | 60 --- 20 files changed, 990 insertions(+), 1618 deletions(-) delete mode 100644 provider/README.md delete mode 100644 provider/batched/system.go delete mode 100644 provider/batched/system_test.go rename provider/{ => internal}/queue/queue.go (50%) rename provider/{ => internal}/queue/queue_test.go (80%) create mode 100644 provider/noop.go delete mode 100644 provider/offline.go create mode 100644 provider/reprovider.go create mode 100644 provider/reprovider_test.go delete mode 100644 provider/simple/provider.go delete mode 100644 provider/simple/provider_test.go delete mode 100644 provider/simple/reprovide.go delete mode 100644 provider/simple/reprovide_test.go delete mode 100644 provider/system.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a2b623ac2..60a4b8180 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,15 @@ The following emojis are used to highlight certain changes: - `DirectKeys` - `RecursiveKeys` - `InternalKeys` +- 🛠 Provider API refactor + - `provider/queue` has been moved to `provider/internal/queue`. + - `provider/batched.New` has been moved to `provider.New` and its arguments have changed: + - a routing system is now passed with the `provider.Online` option; by default the system runs in offline mode (it only pushes CIDs onto the queue); and + - you no longer pass a queue, only a `datastore.Datastore`. + - `provider/simple` has been removed; `provider.New` now also accepts non-batched routing systems and uses a type assertion for the `ProvideMany` call, giving a single implementation. + - `provider.NewOfflineProvider` has been renamed to `provider.NewNoopProvider` to make clearer that it does nothing. + - `provider.NewSystem` has been removed; `provider.New` now returns a `provider.System` directly.
+ - `provider.Provider` and `provider.Reprovider` have been merged into one `provider.System`. ## [0.8.0] - 2023-04-05 ### Added diff --git a/examples/go.mod b/examples/go.mod index a6ca81cdf..76ac8bf02 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -10,7 +10,7 @@ require ( github.com/ipfs/go-datastore v0.6.0 github.com/ipld/go-ipld-prime v0.20.0 github.com/libp2p/go-libp2p v0.26.3 - github.com/libp2p/go-libp2p-routing-helpers v0.6.0 + github.com/libp2p/go-libp2p-routing-helpers v0.7.0 github.com/multiformats/go-multiaddr v0.8.0 github.com/multiformats/go-multicodec v0.8.1 github.com/prometheus/client_golang v1.14.0 diff --git a/examples/go.sum b/examples/go.sum index 420a041c3..a70076d58 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -399,8 +399,8 @@ github.com/libp2p/go-libp2p-kbucket v0.5.0 h1:g/7tVm8ACHDxH29BGrpsQlnNeu+6OF1A9b github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= github.com/libp2p/go-libp2p-record v0.2.0/go.mod h1:I+3zMkvvg5m2OcSdoL0KPljyJyvNDFGKX7QdlpYUcwk= -github.com/libp2p/go-libp2p-routing-helpers v0.6.0 h1:Rfyd+wp/cU0PjNjCphGzLYzd7Q51fjOMs5Sjj6zWGT0= -github.com/libp2p/go-libp2p-routing-helpers v0.6.0/go.mod h1:wwK/XSLt6njjO7sRbjhf8w7PGBOfdntMQ2mOQPZ5s/Q= +github.com/libp2p/go-libp2p-routing-helpers v0.7.0 h1:sirOYVD0wGWjkDwHZvinunIpaqPLBXkcnXApVHwZFGA= +github.com/libp2p/go-libp2p-routing-helpers v0.7.0/go.mod h1:R289GUxUMzRXIbWGSuUUTPrlVJZ3Y/pPz495+qgXJX8= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0= github.com/libp2p/go-msgio v0.3.0/go.mod h1:nyRM819GmVaF9LX3l03RMh10QdOroF++NBbxAb0mmDM= diff --git a/go.mod b/go.mod index ad907d114..fd7271db0 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.19 require ( github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a github.com/benbjohnson/clock v1.3.0 - github.com/cenkalti/backoff v2.2.1+incompatible github.com/cespare/xxhash/v2 v2.2.0 github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 github.com/cskr/pubsub v1.0.2 @@ -41,7 +40,7 @@ require ( github.com/libp2p/go-libp2p v0.26.3 github.com/libp2p/go-libp2p-kad-dht v0.21.1 github.com/libp2p/go-libp2p-record v0.2.0 - github.com/libp2p/go-libp2p-routing-helpers v0.4.0 + github.com/libp2p/go-libp2p-routing-helpers v0.7.0 github.com/libp2p/go-libp2p-testing v0.12.0 github.com/libp2p/go-msgio v0.3.0 github.com/miekg/dns v1.1.50 diff --git a/go.sum b/go.sum index 26c09b584..e95249d53 100644 --- a/go.sum +++ b/go.sum @@ -59,8 +59,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= -github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4= github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod
h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -421,8 +419,8 @@ github.com/libp2p/go-libp2p-kbucket v0.5.0 h1:g/7tVm8ACHDxH29BGrpsQlnNeu+6OF1A9b github.com/libp2p/go-libp2p-kbucket v0.5.0/go.mod h1:zGzGCpQd78b5BNTDGHNDLaTt9aDK/A02xeZp9QeFC4U= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= github.com/libp2p/go-libp2p-record v0.2.0/go.mod h1:I+3zMkvvg5m2OcSdoL0KPljyJyvNDFGKX7QdlpYUcwk= -github.com/libp2p/go-libp2p-routing-helpers v0.4.0 h1:b7y4aixQ7AwbqYfcOQ6wTw8DQvuRZeTAA0Od3YYN5yc= -github.com/libp2p/go-libp2p-routing-helpers v0.4.0/go.mod h1:dYEAgkVhqho3/YKxfOEGdFMIcWfAFNlZX8iAIihYA2E= +github.com/libp2p/go-libp2p-routing-helpers v0.7.0 h1:sirOYVD0wGWjkDwHZvinunIpaqPLBXkcnXApVHwZFGA= +github.com/libp2p/go-libp2p-routing-helpers v0.7.0/go.mod h1:R289GUxUMzRXIbWGSuUUTPrlVJZ3Y/pPz495+qgXJX8= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg= github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0= diff --git a/provider/README.md b/provider/README.md deleted file mode 100644 index 0e4f4650d..000000000 --- a/provider/README.md +++ /dev/null @@ -1,30 +0,0 @@ -## Usage - -Here's how you create, start, interact with, and stop the provider system: - -```golang -import ( - "context" - "time" - - "github.com/ipfs/boxo/provider" - "github.com/ipfs/boxo/provider/queue" - "github.com/ipfs/boxo/provider/simple" -) - -rsys := (your routing system here) -dstore := (your datastore here) -cid := (your cid to provide here) - -q := queue.NewQueue(context.Background(), "example", dstore) - -reprov := simple.NewReprovider(context.Background(), time.Hour * 12, rsys, simple.NewBlockstoreProvider(dstore)) -prov := simple.NewProvider(context.Background(), q, rsys) -sys := provider.NewSystem(prov, reprov) - -sys.Run() - -sys.Provide(cid) - -sys.Close() -``` diff --git a/provider/batched/system.go b/provider/batched/system.go deleted file mode 100644 index 37504a8ce..000000000 --- a/provider/batched/system.go +++ /dev/null @@ -1,420 +0,0 @@ -package batched - -import ( - "context" - "errors" - "fmt" - "strconv" - "sync" - "time" - - provider "github.com/ipfs/boxo/provider" - "github.com/ipfs/boxo/provider/queue" - "github.com/ipfs/boxo/provider/simple" - "github.com/ipfs/boxo/verifcid" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - logging "github.com/ipfs/go-log/v2" - "github.com/multiformats/go-multihash" -) - -var log = logging.Logger("provider.batched") - -type BatchProvidingSystem struct { - ctx context.Context - close context.CancelFunc - closewg sync.WaitGroup - - reprovideInterval time.Duration - initalReprovideDelay time.Duration - initialReprovideDelaySet bool - - rsys provideMany - keyProvider simple.KeyChanFunc - - q *queue.Queue - ds datastore.Batching - - reprovideCh chan cid.Cid - - totalProvides, lastReprovideBatchSize int - avgProvideDuration, lastReprovideDuration time.Duration -} - -var _ provider.System = (*BatchProvidingSystem)(nil) - -type provideMany interface { - ProvideMany(ctx context.Context, keys []multihash.Multihash) error - Ready() bool -} - -// Option defines the functional option type that can be used to configure -// BatchProvidingSystem instances -type Option func(system *BatchProvidingSystem) error - -var lastReprovideKey = datastore.NewKey("/provider/reprovide/lastreprovide") - -func New(provider provideMany, q *queue.Queue, opts ...Option) 
(*BatchProvidingSystem, error) { - s := &BatchProvidingSystem{ - reprovideInterval: time.Hour * 24, - rsys: provider, - keyProvider: nil, - q: q, - ds: datastore.NewMapDatastore(), - reprovideCh: make(chan cid.Cid), - } - - for _, o := range opts { - if err := o(s); err != nil { - return nil, err - } - } - - // Setup default behavior for the initial reprovide delay - // - // If the reprovide ticker is larger than a minute (likely), - // provide once after we've been up a minute. - // - // Don't provide _immediately_ as we might be just about to stop. - if !s.initialReprovideDelaySet && s.reprovideInterval > time.Minute { - s.initalReprovideDelay = time.Minute - s.initialReprovideDelaySet = true - } - - if s.keyProvider == nil { - s.keyProvider = func(ctx context.Context) (<-chan cid.Cid, error) { - ch := make(chan cid.Cid) - close(ch) - return ch, nil - } - } - - // This is after the options processing so we do not have to worry about leaking a context if there is an - // initialization error processing the options - ctx, cancel := context.WithCancel(context.Background()) - s.ctx = ctx - s.close = cancel - - return s, nil -} - -func Datastore(batching datastore.Batching) Option { - return func(system *BatchProvidingSystem) error { - system.ds = batching - return nil - } -} - -func ReproviderInterval(duration time.Duration) Option { - return func(system *BatchProvidingSystem) error { - system.reprovideInterval = duration - return nil - } -} - -func KeyProvider(fn simple.KeyChanFunc) Option { - return func(system *BatchProvidingSystem) error { - system.keyProvider = fn - return nil - } -} - -func initialReprovideDelay(duration time.Duration) Option { - return func(system *BatchProvidingSystem) error { - system.initialReprovideDelaySet = true - system.initalReprovideDelay = duration - return nil - } -} - -func (s *BatchProvidingSystem) Run() { - // how long we wait between the first provider we hear about and batching up the provides to send out - const pauseDetectionThreshold = time.Millisecond * 500 - // how long we are willing to collect providers for the batch after we receive the first one - const maxCollectionDuration = time.Minute * 10 - - provCh := s.q.Dequeue() - - s.closewg.Add(1) - go func() { - defer s.closewg.Done() - - m := make(map[cid.Cid]struct{}) - - // setup stopped timers - maxCollectionDurationTimer := time.NewTimer(time.Hour) - pauseDetectTimer := time.NewTimer(time.Hour) - stopAndEmptyTimer(maxCollectionDurationTimer) - stopAndEmptyTimer(pauseDetectTimer) - - // make sure timers are cleaned up - defer maxCollectionDurationTimer.Stop() - defer pauseDetectTimer.Stop() - - resetTimersAfterReceivingProvide := func() { - firstProvide := len(m) == 0 - if firstProvide { - // after receiving the first provider start up the timers - maxCollectionDurationTimer.Reset(maxCollectionDuration) - pauseDetectTimer.Reset(pauseDetectionThreshold) - } else { - // otherwise just do a full restart of the pause timer - stopAndEmptyTimer(pauseDetectTimer) - pauseDetectTimer.Reset(pauseDetectionThreshold) - } - } - - for { - performedReprovide := false - - // at the start of every loop the maxCollectionDurationTimer and pauseDetectTimer should be already be - // stopped and have empty channels - loop: - for { - select { - case <-maxCollectionDurationTimer.C: - // if this timer has fired then the pause timer has started so let's stop and empty it - stopAndEmptyTimer(pauseDetectTimer) - break loop - default: - } - - select { - case c := <-provCh: - resetTimersAfterReceivingProvide() - m[c] = 
struct{}{} - continue - default: - } - - select { - case c := <-provCh: - resetTimersAfterReceivingProvide() - m[c] = struct{}{} - case c := <-s.reprovideCh: - resetTimersAfterReceivingProvide() - m[c] = struct{}{} - performedReprovide = true - case <-pauseDetectTimer.C: - // if this timer has fired then the max collection timer has started so let's stop and empty it - stopAndEmptyTimer(maxCollectionDurationTimer) - break loop - case <-maxCollectionDurationTimer.C: - // if this timer has fired then the pause timer has started so let's stop and empty it - stopAndEmptyTimer(pauseDetectTimer) - break loop - case <-s.ctx.Done(): - return - } - } - - if len(m) == 0 { - continue - } - - keys := make([]multihash.Multihash, 0, len(m)) - for c := range m { - delete(m, c) - - // hash security - if err := verifcid.ValidateCid(c); err != nil { - log.Errorf("insecure hash in reprovider, %s (%s)", c, err) - continue - } - - keys = append(keys, c.Hash()) - } - - // in case after removing all the invalid CIDs there are no valid ones left - if len(keys) == 0 { - continue - } - - for !s.rsys.Ready() { - log.Debugf("reprovider system not ready") - select { - case <-time.After(time.Minute): - case <-s.ctx.Done(): - return - } - } - - log.Debugf("starting provide of %d keys", len(keys)) - start := time.Now() - err := s.rsys.ProvideMany(s.ctx, keys) - if err != nil { - log.Debugf("providing failed %v", err) - continue - } - dur := time.Since(start) - - totalProvideTime := int64(s.totalProvides) * int64(s.avgProvideDuration) - recentAvgProvideDuration := time.Duration(int64(dur) / int64(len(keys))) - s.avgProvideDuration = time.Duration((totalProvideTime + int64(dur)) / int64(s.totalProvides+len(keys))) - s.totalProvides += len(keys) - - log.Debugf("finished providing of %d keys. 
It took %v with an average of %v per provide", len(keys), dur, recentAvgProvideDuration) - - if performedReprovide { - s.lastReprovideBatchSize = len(keys) - s.lastReprovideDuration = dur - - if err := s.ds.Put(s.ctx, lastReprovideKey, storeTime(time.Now())); err != nil { - log.Errorf("could not store last reprovide time: %v", err) - } - if err := s.ds.Sync(s.ctx, lastReprovideKey); err != nil { - log.Errorf("could not perform sync of last reprovide time: %v", err) - } - } - } - }() - - s.closewg.Add(1) - go func() { - defer s.closewg.Done() - - var initialReprovideCh, reprovideCh <-chan time.Time - - // If reproviding is enabled (non-zero) - if s.reprovideInterval > 0 { - reprovideTicker := time.NewTicker(s.reprovideInterval) - defer reprovideTicker.Stop() - reprovideCh = reprovideTicker.C - - // if there is a non-zero initial reprovide time that was set in the initializer or if the fallback has been - if s.initialReprovideDelaySet { - initialReprovideTimer := time.NewTimer(s.initalReprovideDelay) - defer initialReprovideTimer.Stop() - - initialReprovideCh = initialReprovideTimer.C - } - } - - for s.ctx.Err() == nil { - select { - case <-initialReprovideCh: - case <-reprovideCh: - case <-s.ctx.Done(): - return - } - - err := s.reprovide(s.ctx, false) - - // only log if we've hit an actual error, otherwise just tell the client we're shutting down - if s.ctx.Err() == nil && err != nil { - log.Errorf("failed to reprovide: %s", err) - } - } - }() -} - -func stopAndEmptyTimer(t *time.Timer) { - if !t.Stop() { - <-t.C - } -} - -func storeTime(t time.Time) []byte { - val := []byte(fmt.Sprintf("%d", t.UnixNano())) - return val -} - -func parseTime(b []byte) (time.Time, error) { - tns, err := strconv.ParseInt(string(b), 10, 64) - if err != nil { - return time.Time{}, err - } - return time.Unix(0, tns), nil -} - -func (s *BatchProvidingSystem) Close() error { - s.close() - err := s.q.Close() - s.closewg.Wait() - return err -} - -func (s *BatchProvidingSystem) Provide(cid cid.Cid) error { - return s.q.Enqueue(cid) -} - -func (s *BatchProvidingSystem) Reprovide(ctx context.Context) error { - return s.reprovide(ctx, true) -} - -func (s *BatchProvidingSystem) reprovide(ctx context.Context, force bool) error { - if !s.shouldReprovide() && !force { - return nil - } - - kch, err := s.keyProvider(ctx) - if err != nil { - return err - } - -reprovideCidLoop: - for { - select { - case c, ok := <-kch: - if !ok { - break reprovideCidLoop - } - - select { - case s.reprovideCh <- c: - case <-ctx.Done(): - return ctx.Err() - } - case <-ctx.Done(): - return ctx.Err() - } - } - - return nil -} - -func (s *BatchProvidingSystem) getLastReprovideTime() (time.Time, error) { - val, err := s.ds.Get(s.ctx, lastReprovideKey) - if errors.Is(err, datastore.ErrNotFound) { - return time.Time{}, nil - } - if err != nil { - return time.Time{}, fmt.Errorf("could not get last reprovide time") - } - - t, err := parseTime(val) - if err != nil { - return time.Time{}, fmt.Errorf("could not decode last reprovide time, got %q", string(val)) - } - - return t, nil -} - -func (s *BatchProvidingSystem) shouldReprovide() bool { - t, err := s.getLastReprovideTime() - if err != nil { - log.Debugf("getting last reprovide time failed: %s", err) - return false - } - - if time.Since(t) < time.Duration(float64(s.reprovideInterval)*0.5) { - return false - } - return true -} - -type BatchedProviderStats struct { - TotalProvides, LastReprovideBatchSize int - AvgProvideDuration, LastReprovideDuration time.Duration -} - -// Stat returns various stats 
about this provider system -func (s *BatchProvidingSystem) Stat(ctx context.Context) (BatchedProviderStats, error) { - // TODO: Does it matter that there is no locking around the total+average values? - return BatchedProviderStats{ - TotalProvides: s.totalProvides, - LastReprovideBatchSize: s.lastReprovideBatchSize, - AvgProvideDuration: s.avgProvideDuration, - LastReprovideDuration: s.lastReprovideDuration, - }, nil -} diff --git a/provider/batched/system_test.go b/provider/batched/system_test.go deleted file mode 100644 index c8a7d7b84..000000000 --- a/provider/batched/system_test.go +++ /dev/null @@ -1,119 +0,0 @@ -package batched - -import ( - "context" - "strconv" - "sync" - "testing" - "time" - - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - mh "github.com/multiformats/go-multihash" - - "github.com/ipfs/boxo/internal/test" - q "github.com/ipfs/boxo/provider/queue" -) - -type mockProvideMany struct { - lk sync.Mutex - keys []mh.Multihash -} - -func (m *mockProvideMany) ProvideMany(ctx context.Context, keys []mh.Multihash) error { - m.lk.Lock() - defer m.lk.Unlock() - m.keys = keys - return nil -} - -func (m *mockProvideMany) Ready() bool { - return true -} - -func (m *mockProvideMany) GetKeys() []mh.Multihash { - m.lk.Lock() - defer m.lk.Unlock() - return m.keys[:] -} - -var _ provideMany = (*mockProvideMany)(nil) - -func TestBatched(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - queue, err := q.NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - provider := &mockProvideMany{} - - ctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - - const numProvides = 100 - keysToProvide := make(map[cid.Cid]int) - for i := 0; i < numProvides; i++ { - h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) - if err != nil { - panic(err) - } - c := cid.NewCidV1(cid.Raw, h) - keysToProvide[c] = i - } - - batchSystem, err := New(provider, queue, KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { - ch := make(chan cid.Cid) - go func() { - for k := range keysToProvide { - select { - case ch <- k: - case <-ctx.Done(): - return - } - } - }() - return ch, nil - }), initialReprovideDelay(0)) - if err != nil { - t.Fatal(err) - } - - batchSystem.Run() - - var keys []mh.Multihash - for { - if ctx.Err() != nil { - t.Fatal("test hung") - } - keys = provider.GetKeys() - if len(keys) != 0 { - break - } - time.Sleep(time.Millisecond * 100) - } - - if len(keys) != numProvides { - t.Fatalf("expected %d provider keys, got %d", numProvides, len(keys)) - } - - provMap := make(map[string]struct{}) - for _, k := range keys { - provMap[string(k)] = struct{}{} - } - - for i := 0; i < numProvides; i++ { - h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) - if err != nil { - panic(err) - } - if _, found := provMap[string(h)]; !found { - t.Fatalf("could not find provider with value %d", i) - } - } -} diff --git a/provider/queue/queue.go b/provider/internal/queue/queue.go similarity index 50% rename from provider/queue/queue.go rename to provider/internal/queue/queue.go index f4fd73f54..800d3be4e 100644 --- a/provider/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -3,6 +3,7 @@ package queue import ( "context" "fmt" + "sync" cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" @@ -21,38 +22,39 @@ var log = logging.Logger("provider.queue") type Queue struct { // used to differentiate queues in 
datastore // e.g. provider vs reprovider - name string ctx context.Context ds datastore.Datastore // Must be threadsafe dequeue chan cid.Cid enqueue chan cid.Cid close context.CancelFunc - closed chan struct{} + closed sync.WaitGroup counter uint64 } // NewQueue creates a queue for cids -func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) { - namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/")) - cancelCtx, cancel := context.WithCancel(ctx) +func NewQueue(ds datastore.Datastore) *Queue { + namespaced := namespace.Wrap(ds, datastore.NewKey("/queue")) + cancelCtx, cancel := context.WithCancel(context.Background()) q := &Queue{ - name: name, ctx: cancelCtx, ds: namespaced, dequeue: make(chan cid.Cid), enqueue: make(chan cid.Cid), close: cancel, - closed: make(chan struct{}, 1), } - q.work() - return q, nil + q.closed.Add(1) + go q.worker() + return q } // Close stops the queue func (q *Queue) Close() error { q.close() - <-q.closed + q.closed.Wait() + // We don't close dequeue because a provider consuming it would get caught in + // an infinite loop dequeuing cid.Undef if we did. + // The provider has its own select on top of dequeue and will handle this by itself. return nil } @@ -71,74 +73,73 @@ func (q *Queue) Dequeue() <-chan cid.Cid { return q.dequeue } -// Run dequeues and enqueues when available. -func (q *Queue) work() { - go func() { - var k datastore.Key = datastore.Key{} - var c cid.Cid = cid.Undef - - defer func() { - // also cancels any in-progess enqueue tasks. - q.close() - // unblocks anyone waiting - close(q.dequeue) - // unblocks the close call - close(q.closed) - }() - - for { - if c == cid.Undef { - head, err := q.getQueueHead() +// worker dequeues and enqueues when available.
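+// worker is the sole goroutine reading and writing the queue's datastore entries: +// enqueued CIDs are persisted under zero-padded increasing keys (the "%020d/..." +// format below), so lexicographic datastore order matches insertion order and the +// queue head read back on restart is the oldest surviving entry.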
+func (q *Queue) worker() { + var k datastore.Key = datastore.Key{} + var c cid.Cid = cid.Undef + + defer q.closed.Done() + defer q.close() + + for { + if c == cid.Undef { + head, err := q.getQueueHead() + switch { + case err != nil: + log.Errorf("error querying for head of queue: %s, stopping provider", err) + return + case head != nil: + k = datastore.NewKey(head.Key) + c, err = cid.Parse(head.Value) if err != nil { - log.Errorf("error querying for head of queue: %s, stopping provider", err) - return - } else if head != nil { - k = datastore.NewKey(head.Key) - c, err = cid.Parse(head.Value) + log.Warnf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, err) + err = q.ds.Delete(q.ctx, k) if err != nil { - log.Warnf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, err) - err = q.ds.Delete(q.ctx, k) - if err != nil { - log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err) - return - } - continue + log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err) + return } - } else { - c = cid.Undef + continue } + default: + c = cid.Undef } + } - // If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue - var dequeue chan cid.Cid - if c != cid.Undef { - dequeue = q.dequeue - } + // If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue + var dequeue chan cid.Cid + if c != cid.Undef { + dequeue = q.dequeue + } - select { - case toQueue := <-q.enqueue: - keyPath := fmt.Sprintf("%020d/%s", q.counter, c.String()) - q.counter++ - nextKey := datastore.NewKey(keyPath) + select { + case toQueue := <-q.enqueue: + keyPath := fmt.Sprintf("%020d/%s", q.counter, c.String()) + q.counter++ + nextKey := datastore.NewKey(keyPath) - if err := q.ds.Put(q.ctx, nextKey, toQueue.Bytes()); err != nil { - log.Errorf("Failed to enqueue cid: %s", err) - continue - } - case dequeue <- c: - err := q.ds.Delete(q.ctx, k) + if c == cid.Undef { + // fast path, skip rereading the datastore if we don't have anything in hand yet + c = toQueue + k = nextKey + } - if err != nil { - log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) - continue - } - c = cid.Undef - case <-q.ctx.Done(): - return + if err := q.ds.Put(q.ctx, nextKey, toQueue.Bytes()); err != nil { + log.Errorf("Failed to enqueue cid: %s", err) + continue } + case dequeue <- c: + err := q.ds.Delete(q.ctx, k) + + if err != nil { + log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) + continue + } + c = cid.Undef + case <-q.ctx.Done(): + return } - }() + } } func (q *Queue) getQueueHead() (*query.Entry, error) { diff --git a/provider/queue/queue_test.go b/provider/internal/queue/queue_test.go similarity index 80% rename from provider/queue/queue_test.go rename to provider/internal/queue/queue_test.go index 9eacf4349..a2d9f0be4 100644 --- a/provider/queue/queue_test.go +++ b/provider/internal/queue/queue_test.go @@ -43,10 +43,8 @@ func TestBasicOperation(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue := NewQueue(ds) + defer queue.Close() cids := makeCids(10) @@ -63,10 +61,8 @@ func TestMangledData(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue := NewQueue(ds) + defer queue.Close() 
cids := makeCids(10) for _, c := range cids { @@ -75,7 +71,7 @@ func TestMangledData(t *testing.T) { // put bad data in the queue queueKey := datastore.NewKey("/test/0") - err = queue.ds.Put(ctx, queueKey, []byte("borked")) + err := queue.ds.Put(ctx, queueKey, []byte("borked")) if err != nil { t.Fatal(err) } @@ -91,10 +87,8 @@ func TestInitialization(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue := NewQueue(ds) + defer queue.Close() cids := makeCids(10) for _, c := range cids { @@ -104,10 +98,8 @@ func TestInitialization(t *testing.T) { assertOrdered(cids[:5], queue, t) // make a new queue, same data - queue, err = NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue = NewQueue(ds) + defer queue.Close() assertOrdered(cids[5:], queue, t) } @@ -118,21 +110,18 @@ func TestInitializationWithManyCids(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue := NewQueue(ds) cids := makeCids(25) for _, c := range cids { queue.Enqueue(c) } + queue.Close() + // make a new queue, same data - queue, err = NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue = NewQueue(ds) + defer queue.Close() assertOrdered(cids, queue, t) } diff --git a/provider/noop.go b/provider/noop.go new file mode 100644 index 000000000..5367ccb30 --- /dev/null +++ b/provider/noop.go @@ -0,0 +1,32 @@ +package provider + +import ( + "context" + + "github.com/ipfs/go-cid" +) + +type noopProvider struct{} + +var _ System = (*noopProvider)(nil) + +// NewNoopProvider creates a provider [System] that does nothing. +func NewNoopProvider() System { + return &noopProvider{} +} + +func (op *noopProvider) Close() error { + return nil +} + +func (op *noopProvider) Provide(cid.Cid) error { + return nil +} + +func (op *noopProvider) Reprovide(context.Context) error { + return nil +} + +func (op *noopProvider) Stat() (ReproviderStats, error) { + return ReproviderStats{}, nil +} diff --git a/provider/offline.go b/provider/offline.go deleted file mode 100644 index 030a70ab1..000000000 --- a/provider/offline.go +++ /dev/null @@ -1,29 +0,0 @@ -package provider - -import ( - "context" - - "github.com/ipfs/go-cid" -) - -type offlineProvider struct{} - -// NewOfflineProvider creates a ProviderSystem that does nothing -func NewOfflineProvider() System { - return &offlineProvider{} -} - -func (op *offlineProvider) Run() { -} - -func (op *offlineProvider) Close() error { - return nil -} - -func (op *offlineProvider) Provide(cid.Cid) error { - return nil -} - -func (op *offlineProvider) Reprovide(context.Context) error { - return nil -} diff --git a/provider/provider.go b/provider/provider.go index 3b9c6ba3e..a3eff2f04 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -3,25 +3,114 @@ package provider import ( "context" + blocks "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/fetcher" + fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" + pin "github.com/ipfs/boxo/pinning/pinner" "github.com/ipfs/go-cid" + "github.com/ipfs/go-cidutil" + logging "github.com/ipfs/go-log/v2" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" ) + +var logR = logging.Logger("reprovider.simple") + // Provider announces blocks to the network type Provider interface { - // Run is used to begin processing the provider work - Run() // Provide takes a cid and makes an attempt to announce it
to the network Provide(cid.Cid) error - // Close stops the provider - Close() error } // Reprovider reannounces blocks to the network type Reprovider interface { - // Run is used to begin processing the reprovider work and waiting for reprovide triggers - Run() - // Trigger a reprovide - Trigger(context.Context) error - // Close stops the reprovider + // Reprovide starts a new reprovide if one isn't running already. + Reprovide(context.Context) error +} + +// System defines the interface for interacting with the value +// provider system +type System interface { Close() error + Stat() (ReproviderStats, error) + Provider + Reprovider +} + +// KeyChanFunc is a function streaming CIDs to pass to content routing +type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) + +// NewBlockstoreProvider returns a key provider using bstore.AllKeysChan +func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + return bstore.AllKeysChan(ctx) + } +} + +// NewPinnedProvider returns a provider supplying pinned keys +func NewPinnedProvider(onlyRoots bool, pinning pin.Pinner, fetchConfig fetcher.Factory) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + set, err := pinSet(ctx, pinning, fetchConfig, onlyRoots) + if err != nil { + return nil, err + } + + outCh := make(chan cid.Cid) + go func() { + defer close(outCh) + for c := range set.New { + select { + case <-ctx.Done(): + return + case outCh <- c: + } + } + + }() + + return outCh, nil + } +} + +func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory, onlyRoots bool) (*cidutil.StreamingSet, error) { + // FIXME: the code listing all pins is duplicated three times (twice in Kubo and once here), maybe more. + // If this were a method on [pin.Pinner], life would be easier. + set := cidutil.NewStreamingSet() + + go func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + defer close(set.New) + + for sc := range pinning.DirectKeys(ctx) { + if sc.Err != nil { + logR.Errorf("reprovide direct pins: %s", sc.Err) + return + } + set.Visitor(ctx)(sc.C) + } + + session := fetchConfig.NewSession(ctx) + for sc := range pinning.RecursiveKeys(ctx) { + if sc.Err != nil { + logR.Errorf("reprovide recursive pins: %s", sc.Err) + return + } + set.Visitor(ctx)(sc.C) + if !onlyRoots { + err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: sc.C}, func(res fetcher.FetchResult) error { + clink, ok := res.LastBlockLink.(cidlink.Link) + if ok { + set.Visitor(ctx)(clink.Cid) + } + return nil + }) + if err != nil { + logR.Errorf("reprovide indirect pins: %s", err) + return + } + } + } + }() + + return set, nil } diff --git a/provider/reprovider.go b/provider/reprovider.go new file mode 100644 index 000000000..619bf8196 --- /dev/null +++ b/provider/reprovider.go @@ -0,0 +1,545 @@ +package provider + +import ( + "context" + "errors" + "fmt" + "math" + "strconv" + "sync" + "time" + + "github.com/ipfs/boxo/provider/internal/queue" + "github.com/ipfs/boxo/verifcid" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + namespace "github.com/ipfs/go-datastore/namespace" + logging "github.com/ipfs/go-log/v2" + "github.com/multiformats/go-multihash" +) + +const ( + // MAGIC: how long we wait before reproviding a key + DefaultReproviderInterval = time.Hour * 22 // https://github.com/ipfs/kubo/pull/9326 + + // MAGIC: If the reprovide ticker is larger than a minute (likely), provide
once after we've been up a minute. Don't provide _immediately_ as we + might be just about to stop. + defaultInitialReprovideDelay = time.Minute + + // MAGIC: how long we wait between the first provider we hear about and + // batching up the provides to send out + pauseDetectionThreshold = time.Millisecond * 500 + + // MAGIC: how long we are willing to collect providers for the batch after + // we receive the first one + maxCollectionDuration = time.Minute * 10 +) + +var log = logging.Logger("provider.batched") + +type reprovider struct { + ctx context.Context + close context.CancelFunc + closewg sync.WaitGroup + + reprovideInterval time.Duration + initalReprovideDelay time.Duration + initialReprovideDelaySet bool + + rsys Provide + keyProvider KeyChanFunc + + q *queue.Queue + ds datastore.Batching + + reprovideCh chan cid.Cid + noReprovideInFlight chan struct{} + + maxReprovideBatchSize uint + + statLk sync.Mutex + totalProvides, lastReprovideBatchSize uint64 + avgProvideDuration, lastReprovideDuration time.Duration + + throughputCallback ThroughputCallback + // throughputProvideCurrentCount counts how many provides have been done since the last call to throughputCallback + throughputProvideCurrentCount uint + // throughputDurationSum sums up durations between two calls to the throughputCallback + throughputDurationSum time.Duration + throughputMinimumProvides uint + + keyPrefix datastore.Key +} + +var _ System = (*reprovider)(nil) + +type Provide interface { + Provide(context.Context, cid.Cid, bool) error +} + +type ProvideMany interface { + ProvideMany(ctx context.Context, keys []multihash.Multihash) error +} + +type Ready interface { + Ready() bool +} + +// Option defines the functional option type that can be used to configure +// provider [System] instances +type Option func(system *reprovider) error + +var lastReprovideKey = datastore.NewKey("/reprovide/lastreprovide") +var DefaultKeyPrefix = datastore.NewKey("/provider") + +// New creates a new [System]. By default it is offline, meaning it will only +// enqueue tasks in ds. +// To have it publish records to the network use the [Online] option. +// If the provider implements [ProvideMany], the [ProvideMany.ProvideMany] method +// will be called instead. +// +// If the provider implements [Ready], it will wait until [Ready.Ready] is true.
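+// +// A minimal usage sketch (dstore, router and bstore are illustrative names): +// +//	sys, err := New(dstore, Online(router), KeyProvider(NewBlockstoreProvider(bstore))) +// +// announces queued CIDs through router in batches and re-walks bstore's keys on +// each reprovide interval.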
+func New(ds datastore.Batching, opts ...Option) (System, error) { + s := &reprovider{ + reprovideInterval: DefaultReproviderInterval, + maxReprovideBatchSize: math.MaxUint, + keyPrefix: DefaultKeyPrefix, + reprovideCh: make(chan cid.Cid), + noReprovideInFlight: make(chan struct{}), + } + + for _, o := range opts { + if err := o(s); err != nil { + return nil, err + } + } + + // Setup default behavior for the initial reprovide delay + if !s.initialReprovideDelaySet && s.reprovideInterval > defaultInitialReprovideDelay { + s.initalReprovideDelay = defaultInitialReprovideDelay + s.initialReprovideDelaySet = true + } + + if s.keyProvider == nil { + s.keyProvider = func(ctx context.Context) (<-chan cid.Cid, error) { + ch := make(chan cid.Cid) + close(ch) + return ch, nil + } + } + + s.ds = namespace.Wrap(ds, s.keyPrefix) + s.q = queue.NewQueue(s.ds) + + // This is after the options processing so we do not have to worry about leaking a context if there is an + // initialization error processing the options + ctx, cancel := context.WithCancel(context.Background()) + s.ctx = ctx + s.close = cancel + + if s.rsys != nil { + if _, ok := s.rsys.(ProvideMany); !ok { + s.maxReprovideBatchSize = 1 + } + + s.run() + } + + return s, nil +} + +func ReproviderInterval(duration time.Duration) Option { + return func(system *reprovider) error { + system.reprovideInterval = duration + return nil + } +} + +func KeyProvider(fn KeyChanFunc) Option { + return func(system *reprovider) error { + system.keyProvider = fn + return nil + } +} + +// DatastorePrefix sets a prefix for internal state stored in the Datastore. +// Defaults to [DefaultKeyPrefix]. +func DatastorePrefix(k datastore.Key) Option { + return func(system *reprovider) error { + system.keyPrefix = k + return nil + } +} + +// ThroughputReport will fire the callback synchronously once at least limit +// multihashes have been advertised; it will then wait until a new set of at least +// limit multihashes has been advertised. +// While ThroughputReport is set, batches will contain at most minimumProvides keys. +// If the callback returns false it won't ever be called again. +func ThroughputReport(f ThroughputCallback, minimumProvides uint) Option { + return func(system *reprovider) error { + system.throughputCallback = f + system.throughputMinimumProvides = minimumProvides + return nil + } +} + +type ThroughputCallback = func(reprovide bool, complete bool, totalKeysProvided uint, totalDuration time.Duration) (continueWatching bool) + +// Online enables the router and makes the system publish records online. +// nil can be used to turn the router offline. +// You can't register multiple providers; if this option is passed multiple times +// it will error.
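+// +// For example, Online(dht) — where dht is an illustrative router implementing +// [Provide] (and optionally [ProvideMany] for batched announcements) — switches +// the system from queue-only mode to actively publishing records.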
+func Online(rsys Provide) Option { + return func(system *reprovider) error { + if system.rsys != nil { + return fmt.Errorf("trying to register two providers on the same reprovider") + } + system.rsys = rsys + return nil + } +} + +func initialReprovideDelay(duration time.Duration) Option { + return func(system *reprovider) error { + system.initialReprovideDelaySet = true + system.initalReprovideDelay = duration + return nil + } +} + +func (s *reprovider) run() { + provCh := s.q.Dequeue() + + s.closewg.Add(1) + go func() { + defer s.closewg.Done() + + m := make(map[cid.Cid]struct{}) + + // setup stopped timers + maxCollectionDurationTimer := time.NewTimer(time.Hour) + pauseDetectTimer := time.NewTimer(time.Hour) + stopAndEmptyTimer(maxCollectionDurationTimer) + stopAndEmptyTimer(pauseDetectTimer) + + // make sure timers are cleaned up + defer maxCollectionDurationTimer.Stop() + defer pauseDetectTimer.Stop() + + resetTimersAfterReceivingProvide := func() { + firstProvide := len(m) == 0 + if firstProvide { + // after receiving the first provider start up the timers + maxCollectionDurationTimer.Reset(maxCollectionDuration) + pauseDetectTimer.Reset(pauseDetectionThreshold) + } else { + // otherwise just do a full restart of the pause timer + stopAndEmptyTimer(pauseDetectTimer) + pauseDetectTimer.Reset(pauseDetectionThreshold) + } + } + + for { + performedReprovide := false + complete := false + + batchSize := s.maxReprovideBatchSize + if s.throughputCallback != nil && s.throughputMinimumProvides < batchSize { + batchSize = s.throughputMinimumProvides + } + + // at the start of every loop the maxCollectionDurationTimer and pauseDetectTimer should already be + // stopped and have empty channels + for uint(len(m)) < batchSize { + var noReprovideInFlight chan struct{} + if len(m) == 0 { + noReprovideInFlight = s.noReprovideInFlight + } + + select { + case c := <-provCh: + resetTimersAfterReceivingProvide() + m[c] = struct{}{} + case c := <-s.reprovideCh: + resetTimersAfterReceivingProvide() + m[c] = struct{}{} + performedReprovide = true + case <-pauseDetectTimer.C: + // if this timer has fired then the max collection timer has started so let's stop and empty it + stopAndEmptyTimer(maxCollectionDurationTimer) + complete = true + goto AfterLoop + case <-maxCollectionDurationTimer.C: + // if this timer has fired then the pause timer has started so let's stop and empty it + stopAndEmptyTimer(pauseDetectTimer) + goto AfterLoop + case <-s.ctx.Done(): + return + case noReprovideInFlight <- struct{}{}: + // if no reprovide is in flight, unblock consumers asking for reprovides + } + } + stopAndEmptyTimer(pauseDetectTimer) + stopAndEmptyTimer(maxCollectionDurationTimer) + AfterLoop: + + if len(m) == 0 { + continue + } + + keys := make([]multihash.Multihash, 0, len(m)) + for c := range m { + delete(m, c) + + // hash security + if err := verifcid.ValidateCid(c); err != nil { + log.Errorf("insecure hash in reprovider, %s (%s)", c, err) + continue + } + + keys = append(keys, c.Hash()) + } + + // in case after removing all the invalid CIDs there are no valid ones left + if len(keys) == 0 { + continue + } + + if r, ok := s.rsys.(Ready); ok { + ticker := time.NewTicker(time.Minute) + for !r.Ready() { + log.Debugf("reprovider system not ready") + select { + case <-ticker.C: + case <-s.ctx.Done(): + return + } + } + ticker.Stop() + } + + log.Debugf("starting provide of %d keys", len(keys)) + start := time.Now() + err := doProvideMany(s.ctx, s.rsys, keys) + if err != nil { + log.Debugf("providing failed %v", err)
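+ // Note: the keys collected for this batch are dropped on failure; they are + // only announced again by a later reprovide pass.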
+ continue + } + dur := time.Since(start) + + totalProvideTime := time.Duration(s.totalProvides) * s.avgProvideDuration + recentAvgProvideDuration := dur / time.Duration(len(keys)) + + s.statLk.Lock() + s.avgProvideDuration = time.Duration((totalProvideTime + dur) / (time.Duration(s.totalProvides) + time.Duration(len(keys)))) + s.totalProvides += uint64(len(keys)) + + log.Debugf("finished providing of %d keys. It took %v with an average of %v per provide", len(keys), dur, recentAvgProvideDuration) + + if performedReprovide { + s.lastReprovideBatchSize = uint64(len(keys)) + s.lastReprovideDuration = dur + + s.statLk.Unlock() + + // Don't hold the lock while writing to disk; consumers don't need to wait on IO to read those fields. + + if err := s.ds.Put(s.ctx, lastReprovideKey, storeTime(time.Now())); err != nil { + log.Errorf("could not store last reprovide time: %v", err) + } + if err := s.ds.Sync(s.ctx, lastReprovideKey); err != nil { + log.Errorf("could not perform sync of last reprovide time: %v", err) + } + } else { + s.statLk.Unlock() + } + + s.throughputDurationSum += dur + s.throughputProvideCurrentCount += uint(len(keys)) + if s.throughputCallback != nil && s.throughputProvideCurrentCount >= s.throughputMinimumProvides { + if more := s.throughputCallback(performedReprovide, complete, s.throughputProvideCurrentCount, s.throughputDurationSum); !more { + s.throughputCallback = nil + } + s.throughputProvideCurrentCount = 0 + s.throughputDurationSum = 0 + } + } + }() + + s.closewg.Add(1) + go func() { + defer s.closewg.Done() + + var initialReprovideCh, reprovideCh <-chan time.Time + + // If reproviding is enabled (non-zero) + if s.reprovideInterval > 0 { + reprovideTicker := time.NewTicker(s.reprovideInterval) + defer reprovideTicker.Stop() + reprovideCh = reprovideTicker.C + + // if there is a non-zero initial reprovide time that was set in the initializer or if the fallback has been set + if s.initialReprovideDelaySet { + initialReprovideTimer := time.NewTimer(s.initalReprovideDelay) + defer initialReprovideTimer.Stop() + + initialReprovideCh = initialReprovideTimer.C + } + } + + for s.ctx.Err() == nil { + select { + case <-initialReprovideCh: + case <-reprovideCh: + case <-s.ctx.Done(): + return + } + + err := s.reprovide(s.ctx, false) + + // only log if we've hit an actual error, otherwise just tell the client we're shutting down + if s.ctx.Err() == nil && err != nil { + log.Errorf("failed to reprovide: %s", err) + } + } + }() +} + +func stopAndEmptyTimer(t *time.Timer) { + if !t.Stop() { + <-t.C + } +} + +func storeTime(t time.Time) []byte { + val := []byte(fmt.Sprintf("%d", t.UnixNano())) + return val +} + +func parseTime(b []byte) (time.Time, error) { + tns, err := strconv.ParseInt(string(b), 10, 64) + if err != nil { + return time.Time{}, err + } + return time.Unix(0, tns), nil +} + +func (s *reprovider) Close() error { + s.close() + err := s.q.Close() + s.closewg.Wait() + return err +} + +func (s *reprovider) Provide(cid cid.Cid) error { + return s.q.Enqueue(cid) +} + +func (s *reprovider) Reprovide(ctx context.Context) error { + return s.reprovide(ctx, true) +} + +func (s *reprovider) reprovide(ctx context.Context, force bool) error { + if !s.shouldReprovide() && !force { + return nil + } + + kch, err := s.keyProvider(ctx) + if err != nil { + return err + } + +reprovideCidLoop: + for { + select { + case c, ok := <-kch: + if !ok { + break reprovideCidLoop + } + + select { + case s.reprovideCh <- c: + case <-ctx.Done(): + return ctx.Err() + case <-s.ctx.Done(): + return
errors.New("failed to reprovide: shutting down") + } + case <-ctx.Done(): + return ctx.Err() + case <-s.ctx.Done(): + return errors.New("failed to reprovide: shutting down") + } + } + + // Wait until the underlying operation has completed + select { + case <-s.noReprovideInFlight: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-s.ctx.Done(): + return errors.New("failed to reprovide: shutting down") + } +} + +func (s *reprovider) getLastReprovideTime() (time.Time, error) { + val, err := s.ds.Get(s.ctx, lastReprovideKey) + if errors.Is(err, datastore.ErrNotFound) { + return time.Time{}, nil + } + if err != nil { + return time.Time{}, fmt.Errorf("could not get last reprovide time") + } + + t, err := parseTime(val) + if err != nil { + return time.Time{}, fmt.Errorf("could not decode last reprovide time, got %q", string(val)) + } + + return t, nil +} + +func (s *reprovider) shouldReprovide() bool { + t, err := s.getLastReprovideTime() + if err != nil { + log.Debugf("getting last reprovide time failed: %s", err) + return false + } + + if time.Since(t) < s.reprovideInterval { + return false + } + return true +} + +type ReproviderStats struct { + TotalProvides, LastReprovideBatchSize uint64 + AvgProvideDuration, LastReprovideDuration time.Duration +} + +// Stat returns various stats about this provider system +func (s *reprovider) Stat() (ReproviderStats, error) { + s.statLk.Lock() + defer s.statLk.Unlock() + return ReproviderStats{ + TotalProvides: s.totalProvides, + LastReprovideBatchSize: s.lastReprovideBatchSize, + AvgProvideDuration: s.avgProvideDuration, + LastReprovideDuration: s.lastReprovideDuration, + }, nil +} + +func doProvideMany(ctx context.Context, r Provide, keys []multihash.Multihash) error { + if many, ok := r.(ProvideMany); ok { + return many.ProvideMany(ctx, keys) + } + + for _, k := range keys { + if err := r.Provide(ctx, cid.NewCidV1(cid.Raw, k), true); err != nil { + return err + } + } + return nil +} diff --git a/provider/reprovider_test.go b/provider/reprovider_test.go new file mode 100644 index 000000000..bfb8fc187 --- /dev/null +++ b/provider/reprovider_test.go @@ -0,0 +1,219 @@ +package provider + +import ( + "bytes" + "context" + "runtime" + "strconv" + "sync" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/assert" +) + +type allFeatures interface { + Provide + ProvideMany + Ready +} + +type mockProvideMany struct { + delay time.Duration + lk sync.Mutex + keys []mh.Multihash + calls uint +} + +func (m *mockProvideMany) ProvideMany(ctx context.Context, keys []mh.Multihash) error { + m.lk.Lock() + defer m.lk.Unlock() + m.keys = append(m.keys, keys...) 
+ m.calls++ + time.Sleep(time.Duration(len(keys)) * m.delay) + return nil +} + +func (m *mockProvideMany) Provide(ctx context.Context, key cid.Cid, _ bool) error { + m.lk.Lock() + defer m.lk.Unlock() + m.keys = append(m.keys, key.Hash()) + m.calls++ + time.Sleep(m.delay) + return nil +} + +func (m *mockProvideMany) Ready() bool { + return true +} + +func (m *mockProvideMany) GetKeys() (keys []mh.Multihash, calls uint) { + m.lk.Lock() + defer m.lk.Unlock() + return append([]mh.Multihash(nil), m.keys...), m.calls +} + +var _ allFeatures = (*mockProvideMany)(nil) + +type allButMany interface { + Provide + Ready +} + +type singleMockWrapper struct { + allButMany +} + +func TestReprovider(t *testing.T) { + t.Parallel() + t.Run("many", func(t *testing.T) { + t.Parallel() + testProvider(t, false) + }) + t.Run("single", func(t *testing.T) { + t.Parallel() + testProvider(t, true) + }) +} + +func testProvider(t *testing.T, singleProvide bool) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + + // It has to be this big because the combination of noisy CI runners and OSes that + // don't have a scheduler as good as Linux's adds a whole lot of jitter. + const provideDelay = time.Millisecond * 5 + orig := &mockProvideMany{ + delay: provideDelay, + } + var provider Provide = orig + if singleProvide { + provider = singleMockWrapper{orig} + } + + const numProvides = 100 + keysToProvide := make([]cid.Cid, numProvides) + for i := range keysToProvide { + h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) + if err != nil { + panic(err) + } + c := cid.NewCidV1(cid.Raw, h) + keysToProvide[i] = c + } + + var keyWait sync.Mutex + keyWait.Lock() + batchSystem, err := New(ds, Online(provider), KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { + ch := make(chan cid.Cid) + go func() { + defer keyWait.Unlock() + for _, k := range keysToProvide { + select { + case ch <- k: + case <-ctx.Done(): + return + } + } + }() + return ch, nil + }), + initialReprovideDelay(0), + ThroughputReport(func(_, complete bool, n uint, d time.Duration) bool { + if !singleProvide && complete { + t.Errorf("expected an incomplete report but got a complete one") + } + + const twentyFivePercent = provideDelay / 4 + const seventyFivePercent = provideDelay - twentyFivePercent + const hundredTwentyFivePercent = provideDelay + twentyFivePercent + + avg := d / time.Duration(n) + + // windows's and darwin's schedulers and timers are too unreliable for this check + if runtime.GOOS != "windows" && runtime.GOOS != "darwin" && !(seventyFivePercent <= avg && avg <= hundredTwentyFivePercent) { + t.Errorf("average computed duration is not within bounds, expected between %v and %v but got %v.", seventyFivePercent, hundredTwentyFivePercent, avg) + } + return false + }, numProvides/2), + ) + if err != nil { + t.Fatal(err) + } + defer batchSystem.Close() + + keyWait.Lock() + time.Sleep(pauseDetectionThreshold + time.Millisecond*50) // give it time to call provider after that + + keys, calls := orig.GetKeys() + if len(keys) != numProvides { + t.Fatalf("expected %d provider keys, got %d", numProvides, len(keys)) + } + if singleProvide { + if calls != 100 { + t.Fatalf("expected 100 single provide calls, got %d", calls) + } + } else { + // Two because of ThroughputReport's limit being half.
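+ // (minimumProvides = numProvides/2 = 50 caps each batch at 50 keys, so the + // 100 CIDs arrive as exactly two ProvideMany calls.)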
+ if calls != 2 { + t.Fatalf("expected 2 batched provide calls, got %d", calls) + } + } + + provMap := make(map[string]struct{}) + for _, k := range keys { + provMap[string(k)] = struct{}{} + } + + for i := 0; i < numProvides; i++ { + h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) + if err != nil { + panic(err) + } + if _, found := provMap[string(h)]; !found { + t.Fatalf("could not find provider with value %d", i) + } + } +} + +func TestOfflineRecordsThenOnlineRepublish(t *testing.T) { + // Don't run in Parallel as this test is time sensitive. + + someHash, err := mh.Sum([]byte("Vires in Numeris!"), mh.BLAKE3, -1) + assert.NoError(t, err) + c := cid.NewCidV1(cid.Raw, someHash) + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + + // First, publish using an offline system to enqueue into the datastore. + sys, err := New(ds) + assert.NoError(t, err) + + err = sys.Provide(c) + assert.NoError(t, err) + + err = sys.Close() + assert.NoError(t, err) + + // Secondly, restart an online system and check that the previously provided cid is published. + prov := &mockProvideMany{} + sys, err = New(ds, Online(prov), initialReprovideDelay(0)) + assert.NoError(t, err) + + time.Sleep(pauseDetectionThreshold + time.Millisecond*10) // give it time to call provider after that + + err = sys.Close() + assert.NoError(t, err) + + prov.lk.Lock() + defer prov.lk.Unlock() + if len(prov.keys) != 1 { + t.Fatalf("expected to see 1 provide; got %d", len(prov.keys)) + } + if !bytes.Equal(prov.keys[0], someHash) { + t.Fatalf("keys are not equal expected %v, got %v", someHash, prov.keys[0]) + } +} diff --git a/provider/simple/provider.go b/provider/simple/provider.go deleted file mode 100644 index d308f1f63..000000000 --- a/provider/simple/provider.go +++ /dev/null @@ -1,116 +0,0 @@ -// Package simple implements structures and methods to provide blocks, -// keep track of which blocks are provided, and to allow those blocks to -// be reprovided.
-package simple - -import ( - "context" - "time" - - q "github.com/ipfs/boxo/provider/queue" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "github.com/libp2p/go-libp2p/core/routing" -) - -var logP = logging.Logger("provider.simple") - -// Provider announces blocks to the network -type Provider struct { - ctx context.Context - // the CIDs for which provide announcements should be made - queue *q.Queue - // used to announce providing to the network - contentRouting routing.ContentRouting - // how long to wait for announce to complete before giving up - timeout time.Duration - // how many workers concurrently work through thhe queue - workerLimit int -} - -// Option defines the functional option type that can be used to configure -// provider instances -type Option func(*Provider) - -// WithTimeout is an option to set a timeout on a provider -func WithTimeout(timeout time.Duration) Option { - return func(p *Provider) { - p.timeout = timeout - } -} - -// MaxWorkers is an option to set the max workers on a provider -func MaxWorkers(count int) Option { - return func(p *Provider) { - p.workerLimit = count - } -} - -// NewProvider creates a provider that announces blocks to the network using a content router -func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting, options ...Option) *Provider { - p := &Provider{ - ctx: ctx, - queue: queue, - contentRouting: contentRouting, - workerLimit: 8, - } - - for _, option := range options { - option(p) - } - - return p -} - -// Close stops the provider -func (p *Provider) Close() error { - return p.queue.Close() -} - -// Run workers to handle provide requests. -func (p *Provider) Run() { - p.handleAnnouncements() -} - -// Provide the given cid using specified strategy. -func (p *Provider) Provide(root cid.Cid) error { - return p.queue.Enqueue(root) -} - -// Handle all outgoing cids by providing (announcing) them -func (p *Provider) handleAnnouncements() { - for workers := 0; workers < p.workerLimit; workers++ { - go func() { - for p.ctx.Err() == nil { - select { - case <-p.ctx.Done(): - return - case c, ok := <-p.queue.Dequeue(): - if !ok { - // queue closed. - return - } - - p.doProvide(c) - } - } - }() - } -} - -func (p *Provider) doProvide(c cid.Cid) { - ctx := p.ctx - if p.timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, p.timeout) - defer cancel() - } else { - ctx = p.ctx - } - - logP.Info("announce - start - ", c) - if err := p.contentRouting.Provide(ctx, c, true); err != nil { - logP.Warnf("Unable to provide entry: %s, %s", c, err) - } - logP.Info("announce - end - ", c) -} diff --git a/provider/simple/provider_test.go b/provider/simple/provider_test.go deleted file mode 100644 index 8734c8ff6..000000000 --- a/provider/simple/provider_test.go +++ /dev/null @@ -1,166 +0,0 @@ -package simple_test - -import ( - "context" - "math/rand" - "testing" - "time" - - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/sync" - blocksutil "github.com/ipfs/go-ipfs-blocksutil" - "github.com/libp2p/go-libp2p/core/peer" - - "github.com/ipfs/boxo/internal/test" - q "github.com/ipfs/boxo/provider/queue" - - . 
"github.com/ipfs/boxo/provider/simple" -) - -var blockGenerator = blocksutil.NewBlockGenerator() - -type mockRouting struct { - provided chan cid.Cid -} - -func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error { - select { - case r.provided <- cid: - case <-ctx.Done(): - panic("context cancelled, but shouldn't have") - } - return nil -} - -func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan peer.AddrInfo { - return nil -} - -func mockContentRouting() *mockRouting { - r := mockRouting{} - r.provided = make(chan cid.Cid) - return &r -} - -func TestAnnouncement(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := q.NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - r := mockContentRouting() - - prov := NewProvider(ctx, queue, r) - prov.Run() - - cids := cid.NewSet() - - for i := 0; i < 100; i++ { - c := blockGenerator.Next().Cid() - cids.Add(c) - } - - go func() { - for _, c := range cids.Keys() { - err = prov.Provide(c) - // A little goroutine stirring to exercise some different states - r := rand.Intn(10) - time.Sleep(time.Microsecond * time.Duration(r)) - } - }() - - for cids.Len() > 0 { - select { - case cp := <-r.provided: - if !cids.Has(cp) { - t.Fatal("Wrong CID provided") - } - cids.Remove(cp) - case <-time.After(time.Second * 5): - t.Fatal("Timeout waiting for cids to be provided.") - } - } - prov.Close() - - select { - case cp := <-r.provided: - t.Fatal("did not expect to provide CID: ", cp) - case <-time.After(time.Second * 1): - } -} - -func TestClose(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := q.NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - r := mockContentRouting() - - prov := NewProvider(ctx, queue, r) - prov.Run() - - prov.Close() - - select { - case cp := <-r.provided: - t.Fatal("did not expect to provide anything, provided: ", cp) - case <-time.After(time.Second * 1): - } -} - -func TestAnnouncementTimeout(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := q.NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - r := mockContentRouting() - - prov := NewProvider(ctx, queue, r, WithTimeout(1*time.Second)) - prov.Run() - - cids := cid.NewSet() - - for i := 0; i < 100; i++ { - c := blockGenerator.Next().Cid() - cids.Add(c) - } - - go func() { - for _, c := range cids.Keys() { - err = prov.Provide(c) - // A little goroutine stirring to exercise some different states - r := rand.Intn(10) - time.Sleep(time.Microsecond * time.Duration(r)) - } - }() - - for cids.Len() > 0 { - select { - case cp := <-r.provided: - if !cids.Has(cp) { - t.Fatal("Wrong CID provided") - } - cids.Remove(cp) - case <-time.After(time.Second * 5): - t.Fatal("Timeout waiting for cids to be provided.") - } - } -} diff --git a/provider/simple/reprovide.go b/provider/simple/reprovide.go deleted file mode 100644 index d679a2028..000000000 --- a/provider/simple/reprovide.go +++ /dev/null @@ -1,256 +0,0 @@ -package simple - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/cenkalti/backoff" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-cidutil" - logging "github.com/ipfs/go-log/v2" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - 
"github.com/libp2p/go-libp2p/core/routing" - - blocks "github.com/ipfs/boxo/blockstore" - "github.com/ipfs/boxo/fetcher" - fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" - pin "github.com/ipfs/boxo/pinning/pinner" - "github.com/ipfs/boxo/verifcid" -) - -var logR = logging.Logger("reprovider.simple") - -// ErrClosed is returned by Trigger when operating on a closed reprovider. -var ErrClosed = errors.New("reprovider service stopped") - -// KeyChanFunc is function streaming CIDs to pass to content routing -type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) - -// Reprovider reannounces blocks to the network -type Reprovider struct { - // Reprovider context. Cancel to stop, then wait on closedCh. - ctx context.Context - cancel context.CancelFunc - closedCh chan struct{} - - // Trigger triggers a reprovide. - trigger chan chan<- error - - // The routing system to provide values through - rsys routing.ContentRouting - - keyProvider KeyChanFunc - - tick time.Duration -} - -// NewReprovider creates new Reprovider instance. -func NewReprovider(ctx context.Context, reprovideInterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { - ctx, cancel := context.WithCancel(ctx) - return &Reprovider{ - ctx: ctx, - cancel: cancel, - closedCh: make(chan struct{}), - trigger: make(chan chan<- error), - - rsys: rsys, - keyProvider: keyProvider, - tick: reprovideInterval, - } -} - -// Close the reprovider -func (rp *Reprovider) Close() error { - rp.cancel() - <-rp.closedCh - return nil -} - -// Run re-provides keys with 'tick' interval or when triggered -func (rp *Reprovider) Run() { - defer close(rp.closedCh) - - var initialReprovideCh, reprovideCh <-chan time.Time - - // If reproviding is enabled (non-zero) - if rp.tick > 0 { - reprovideTicker := time.NewTicker(rp.tick) - defer reprovideTicker.Stop() - reprovideCh = reprovideTicker.C - - // If the reprovide ticker is larger than a minute (likely), - // provide once after we've been up a minute. - // - // Don't provide _immediately_ as we might be just about to stop. 
- if rp.tick > time.Minute { - initialReprovideTimer := time.NewTimer(time.Minute) - defer initialReprovideTimer.Stop() - - initialReprovideCh = initialReprovideTimer.C - } - } - - var done chan<- error - for rp.ctx.Err() == nil { - select { - case <-initialReprovideCh: - case <-reprovideCh: - case done = <-rp.trigger: - case <-rp.ctx.Done(): - return - } - - err := rp.Reprovide() - - // only log if we've hit an actual error, otherwise just tell the client we're shutting down - if rp.ctx.Err() != nil { - err = ErrClosed - } else if err != nil { - logR.Errorf("failed to reprovide: %s", err) - } - - if done != nil { - if err != nil { - done <- err - } - close(done) - } - } -} - -// Reprovide registers all keys given by rp.keyProvider to libp2p content routing -func (rp *Reprovider) Reprovide() error { - keychan, err := rp.keyProvider(rp.ctx) - if err != nil { - return fmt.Errorf("failed to get key chan: %s", err) - } - for c := range keychan { - // hash security - if err := verifcid.ValidateCid(c); err != nil { - logR.Errorf("insecure hash in reprovider, %s (%s)", c, err) - continue - } - op := func() error { - err := rp.rsys.Provide(rp.ctx, c, true) - if err != nil { - logR.Debugf("Failed to provide key: %s", err) - } - return err - } - - err := backoff.Retry(op, backoff.WithContext(backoff.NewExponentialBackOff(), rp.ctx)) - if err != nil { - logR.Debugf("Providing failed after number of retries: %s", err) - return err - } - } - return nil -} - -// Trigger starts the reprovision process in rp.Run and waits for it to finish. -// -// Returns an error if a reprovide is already in progress. -func (rp *Reprovider) Trigger(ctx context.Context) error { - resultCh := make(chan error, 1) - select { - case rp.trigger <- resultCh: - default: - return fmt.Errorf("reprovider is already running") - } - - select { - case err := <-resultCh: - return err - case <-rp.ctx.Done(): - return ErrClosed - case <-ctx.Done(): - return ctx.Err() - } -} - -// Strategies - -// NewBlockstoreProvider returns key provider using bstore.AllKeysChan -func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - return bstore.AllKeysChan(ctx) - } -} - -// Pinner interface defines how the simple.Reprovider wants to interact -// with a Pinning service -type Pinner interface { - DirectKeys(ctx context.Context) <-chan pin.StreamedCid - RecursiveKeys(ctx context.Context) <-chan pin.StreamedCid -} - -// NewPinnedProvider returns provider supplying pinned keys -func NewPinnedProvider(onlyRoots bool, pinning Pinner, fetchConfig fetcher.Factory) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - set, err := pinSet(ctx, pinning, fetchConfig, onlyRoots) - if err != nil { - return nil, err - } - - outCh := make(chan cid.Cid) - go func() { - defer close(outCh) - for c := range set.New { - select { - case <-ctx.Done(): - return - case outCh <- c: - } - } - - }() - - return outCh, nil - } -} - -func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, onlyRoots bool) (*cidutil.StreamingSet, error) { - set := cidutil.NewStreamingSet() - - go func() { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - defer close(set.New) - - dkeys := pinning.DirectKeys(ctx) - for wrapper := range dkeys { - if wrapper.Err != nil { - logR.Errorf("reprovide direct pins: %s", wrapper.Err) - return - } - set.Visitor(ctx)(wrapper.C) - } - - rkeys := pinning.RecursiveKeys(ctx) - session := fetchConfig.NewSession(ctx) - for wrapper := range 
rkeys { - if wrapper.Err != nil { - logR.Errorf("reprovide indirect pins: %s", wrapper.Err) - return - } - set.Visitor(ctx)(wrapper.C) - if !onlyRoots { - err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: wrapper.C}, func(res fetcher.FetchResult) error { - clink, ok := res.LastBlockLink.(cidlink.Link) - if ok { - set.Visitor(ctx)(clink.Cid) - } - return nil - }) - if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return - } - } - } - }() - - return set, nil -} diff --git a/provider/simple/reprovide_test.go b/provider/simple/reprovide_test.go deleted file mode 100644 index 6641a3315..000000000 --- a/provider/simple/reprovide_test.go +++ /dev/null @@ -1,313 +0,0 @@ -package simple_test - -import ( - "bytes" - "context" - "testing" - "time" - - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/codec/dagcbor" - "github.com/ipld/go-ipld-prime/fluent/qp" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - basicnode "github.com/ipld/go-ipld-prime/node/basic" - testutil "github.com/libp2p/go-libp2p-testing/net" - "github.com/libp2p/go-libp2p/core/peer" - mh "github.com/multiformats/go-multihash" - - bsrv "github.com/ipfs/boxo/blockservice" - blockstore "github.com/ipfs/boxo/blockstore" - offline "github.com/ipfs/boxo/exchange/offline" - bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" - "github.com/ipfs/boxo/internal/test" - pin "github.com/ipfs/boxo/pinning/pinner" - mock "github.com/ipfs/boxo/routing/mock" - - . "github.com/ipfs/boxo/provider/simple" -) - -func setupRouting(t *testing.T) (clA, clB mock.Client, idA, idB peer.ID) { - mrserv := mock.NewServer() - - iidA := testutil.RandIdentityOrFatal(t) - iidB := testutil.RandIdentityOrFatal(t) - - clA = mrserv.Client(iidA) - clB = mrserv.Client(iidB) - - return clA, clB, iidA.ID(), iidB.ID() -} - -func setupDag(t *testing.T) (nodes []cid.Cid, bstore blockstore.Blockstore) { - bstore = blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - for _, data := range []string{"foo", "bar"} { - nb := basicnode.Prototype.Any.NewBuilder() - err := nb.AssignString(data) - if err != nil { - t.Fatal(err) - } - blk := toBlock(t, nb.Build()) - err = bstore.Put(context.Background(), blk) - if err != nil { - t.Fatal(err) - } - nodes = append(nodes, blk.Cid()) - nd, err := qp.BuildMap(basicnode.Prototype.Map, 1, func(ma ipld.MapAssembler) { - qp.MapEntry(ma, "child", qp.Link(cidlink.Link{Cid: blk.Cid()})) - }) - if err != nil { - t.Fatal(err) - } - blk = toBlock(t, nd) - err = bstore.Put(context.Background(), blk) - if err != nil { - t.Fatal(err) - } - nodes = append(nodes, blk.Cid()) - } - - return nodes, bstore -} - -func toBlock(t *testing.T, nd ipld.Node) blocks.Block { - buf := new(bytes.Buffer) - err := dagcbor.Encode(nd, buf) - if err != nil { - t.Fatal(err) - } - c, err := cid.Prefix{ - Version: 1, - Codec: cid.DagCBOR, - MhType: mh.SHA2_256, - MhLength: -1, - }.Sum(buf.Bytes()) - if err != nil { - t.Fatal(err) - } - blk, err := blocks.NewBlockWithCid(buf.Bytes(), c) - if err != nil { - t.Fatal(err) - } - return blk -} - -func TestReprovide(t *testing.T) { - test.Flaky(t) - testReprovide(t, func(r *Reprovider, ctx context.Context) error { - return r.Reprovide() - }) -} - -func TestTrigger(t *testing.T) { - test.Flaky(t) - testReprovide(t, func(r *Reprovider, ctx context.Context) error { - go r.Run() - time.Sleep(1 * time.Second) - defer 
r.Close() - err := r.Trigger(ctx) - return err - }) -} - -func testReprovide(t *testing.T, trigger func(r *Reprovider, ctx context.Context) error) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - clA, clB, idA, _ := setupRouting(t) - nodes, bstore := setupDag(t) - - keyProvider := NewBlockstoreProvider(bstore) - reprov := NewReprovider(ctx, time.Hour, clA, keyProvider) - reprov.Trigger(context.Background()) - err := trigger(reprov, ctx) - if err != nil { - t.Fatal(err) - } - - var providers []peer.AddrInfo - maxProvs := 100 - - for _, c := range nodes { - // We provide raw cids because of the multihash keying - // FIXME(@Jorropo): I think this change should be done in the DHT layer, probably an issue with our routing mock. - b := c.Bytes() - b[1] = 0x55 // rewrite the cid to raw - _, c, err := cid.CidFromBytes(b) - if err != nil { - t.Fatal(err) - } - provChan := clB.FindProvidersAsync(ctx, c, maxProvs) - for p := range provChan { - providers = append(providers, p) - } - - if len(providers) == 0 { - t.Fatal("Should have gotten a provider") - } - - if providers[0].ID != idA { - t.Fatal("Somehow got the wrong peer back as a provider.") - } - } -} - -func TestTriggerTwice(t *testing.T) { - test.Flaky(t) - // Ensure we can only trigger once at a time. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - clA, _, _, _ := setupRouting(t) - - keyCh := make(chan cid.Cid) - startCh := make(chan struct{}) - keyFunc := func(ctx context.Context) (<-chan cid.Cid, error) { - <-startCh - return keyCh, nil - } - - reprov := NewReprovider(ctx, time.Hour, clA, keyFunc) - go reprov.Run() - defer reprov.Close() - - // Wait for the reprovider to start, otherwise, the reprovider will - // think a concurrent reprovide is running. - // - // We _could_ fix this race... but that would be complexity for nothing. - // 1. We start a reprovide 1 minute after startup anyways. - // 2. The window is really narrow. - time.Sleep(1 * time.Second) - - errCh := make(chan error, 2) - - // Trigger in the background - go func() { - errCh <- reprov.Trigger(ctx) - }() - - // Wait for the trigger to really start. - startCh <- struct{}{} - - start := time.Now() - // Try to trigger again, this should fail immediately. - if err := reprov.Trigger(ctx); err == nil { - t.Fatal("expected an error") - } - if time.Since(start) > 10*time.Millisecond { - t.Fatal("expected reprovide to fail instantly") - } - - // Let the trigger progress. - close(keyCh) - - // Check the result. - err := <-errCh - if err != nil { - t.Fatal(err) - } - - // Try to trigger again, this should work. 
- go func() { - errCh <- reprov.Trigger(ctx) - }() - startCh <- struct{}{} - err = <-errCh - if err != nil { - t.Fatal(err) - } -} - -type mockPinner struct { - recursive []cid.Cid - direct []cid.Cid -} - -func (mp *mockPinner) DirectKeys(ctx context.Context) <-chan pin.StreamedCid { - out := make(chan pin.StreamedCid) - go func() { - defer close(out) - for _, c := range mp.direct { - select { - case <-ctx.Done(): - return - case out <- pin.StreamedCid{C: c}: - } - } - }() - return out -} - -func (mp *mockPinner) RecursiveKeys(ctx context.Context) <-chan pin.StreamedCid { - out := make(chan pin.StreamedCid) - go func() { - defer close(out) - for _, c := range mp.recursive { - select { - case <-ctx.Done(): - return - case out <- pin.StreamedCid{C: c}: - } - } - }() - return out -} - -func TestReprovidePinned(t *testing.T) { - test.Flaky(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - nodes, bstore := setupDag(t) - - fetchConfig := bsfetcher.NewFetcherConfig(bsrv.New(bstore, offline.Exchange(bstore))) - - for i := 0; i < 2; i++ { - clA, clB, idA, _ := setupRouting(t) - - onlyRoots := i == 0 - t.Logf("only roots: %v", onlyRoots) - - var provide, dont []cid.Cid - if onlyRoots { - provide = []cid.Cid{nodes[1], nodes[3]} - dont = []cid.Cid{nodes[0], nodes[2]} - } else { - provide = []cid.Cid{nodes[0], nodes[1], nodes[3]} - dont = []cid.Cid{nodes[2]} - } - - keyProvider := NewPinnedProvider(onlyRoots, &mockPinner{ - recursive: []cid.Cid{nodes[1]}, - direct: []cid.Cid{nodes[3]}, - }, fetchConfig) - - reprov := NewReprovider(ctx, time.Hour, clA, keyProvider) - err := reprov.Reprovide() - if err != nil { - t.Fatal(err) - } - - for i, c := range provide { - prov, ok := <-clB.FindProvidersAsync(ctx, c, 1) - if !ok { - t.Errorf("Should have gotten a provider for %d", i) - continue - } - - if prov.ID != idA { - t.Errorf("Somehow got the wrong peer back as a provider.") - continue - } - } - for i, c := range dont { - prov, ok := <-clB.FindProvidersAsync(ctx, c, 1) - if ok { - t.Fatalf("found provider %s for %d, expected none", prov.ID, i) - } - } - } -} diff --git a/provider/system.go b/provider/system.go deleted file mode 100644 index 9fc3e8879..000000000 --- a/provider/system.go +++ /dev/null @@ -1,60 +0,0 @@ -package provider - -import ( - "context" - - "github.com/ipfs/go-cid" -) - -// System defines the interface for interacting with the value -// provider system -type System interface { - Run() - Close() error - Provide(cid.Cid) error - Reprovide(context.Context) error -} - -type system struct { - provider Provider - reprovider Reprovider -} - -// NewSystem constructs a new provider system from a provider and reprovider -func NewSystem(provider Provider, reprovider Reprovider) System { - return &system{provider, reprovider} -} - -// Run the provider system by running the provider and reprovider -func (s *system) Run() { - go s.provider.Run() - go s.reprovider.Run() -} - -// Close the provider and reprovider -func (s *system) Close() error { - var errs []error - - if err := s.provider.Close(); err != nil { - errs = append(errs, err) - } - - if err := s.reprovider.Close(); err != nil { - errs = append(errs, err) - } - - if len(errs) > 0 { - return errs[0] - } - return nil -} - -// Provide a value -func (s *system) Provide(cid cid.Cid) error { - return s.provider.Provide(cid) -} - -// Reprovide all the previously provided values -func (s *system) Reprovide(ctx context.Context) error { - return s.reprovider.Trigger(ctx) -}
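
For reference, the wiring removed by this patch had to be assembled by hand from three pieces: a queue-backed `Provider`, a ticking `Reprovider`, and a `System` gluing them together. The sketch below reconstructs that setup from the deleted `provider/simple` and `provider/system.go` code above; the constructor signatures are the ones visible in this patch, while the queue name, interval, timeout, and surrounding function are illustrative:

    package main

    import (
    	"context"
    	"time"

    	blockstore "github.com/ipfs/boxo/blockstore"
    	"github.com/ipfs/boxo/provider"
    	q "github.com/ipfs/boxo/provider/queue"
    	"github.com/ipfs/boxo/provider/simple"
    	datastore "github.com/ipfs/go-datastore"
    	"github.com/libp2p/go-libp2p/core/routing"
    )

    // oldWiring reconstructs the removed three-piece setup. Each piece had its
    // own lifecycle: the queue persisted pending CIDs, the Provider announced
    // them, and the Reprovider reannounced everything on a fixed interval.
    func oldWiring(ctx context.Context, ds datastore.Datastore, bs blockstore.Blockstore, rsys routing.ContentRouting) (provider.System, error) {
    	queue, err := q.NewQueue(ctx, "provider-queue", ds) // queue name is illustrative
    	if err != nil {
    		return nil, err
    	}
    	prov := simple.NewProvider(ctx, queue, rsys, simple.WithTimeout(time.Minute))
    	reprov := simple.NewReprovider(ctx, 12*time.Hour, rsys, simple.NewBlockstoreProvider(bs))
    	sys := provider.NewSystem(prov, reprov)
    	sys.Run() // starts the provide workers and the reprovide loop
    	return sys, nil
    }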
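After this patch the same behaviour comes from the single constructor exercised in `provider/reprovider_test.go` above: `New` takes the datastore directly (the queue is now internal), `Online` supplies the routing system, and omitting `Online` leaves the system in offline, enqueue-only mode. A minimal sketch, assuming `Online` accepts any value with the content-routing `Provide` method (the test passes a mock; a DHT or routing helper would be used in practice):

    package main

    import (
    	"github.com/ipfs/boxo/provider"
    	"github.com/ipfs/go-cid"
    	datastore "github.com/ipfs/go-datastore"
    	dssync "github.com/ipfs/go-datastore/sync"
    	"github.com/libp2p/go-libp2p/core/routing"
    )

    // newWiring mirrors TestOfflineRecordsThenOnlineRepublish: one constructor,
    // one Close. Provide persists the CID in the datastore-backed queue, so a
    // later (online) system can still pick it up and announce it.
    func newWiring(rsys routing.ContentRouting, c cid.Cid) error {
    	ds := dssync.MutexWrap(datastore.NewMapDatastore())

    	sys, err := provider.New(ds, provider.Online(rsys)) // drop Online for offline mode
    	if err != nil {
    		return err
    	}
    	defer sys.Close()

    	return sys.Provide(c)
    }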