Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce first strategic provider: do nothing #6292

Merged
merged 18 commits into from
May 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"sync"

"github.com/ipfs/go-metrics-interface"
"go.uber.org/fx"

"github.com/ipfs/go-ipfs/core/bootstrap"
"github.com/ipfs/go-ipfs/core/node"

"github.com/ipfs/go-metrics-interface"
michaelavila marked this conversation as resolved.
Show resolved Hide resolved
"go.uber.org/fx"
)

type BuildCfg = node.BuildCfg // Alias for compatibility until we properly refactor the constructor interface
Expand Down
2 changes: 1 addition & 1 deletion core/commands/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ Trigger reprovider to announce our data to network.
return ErrNotOnline
}

err = nd.Reprovider.Trigger(req.Context)
err = nd.Provider.Reprovide(req.Context)
if err != nil {
return err
}
Expand Down
4 changes: 1 addition & 3 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/provider"
"github.com/ipfs/go-ipfs/repo"
rp "github.com/ipfs/go-ipfs/reprovide"

bserv "github.com/ipfs/go-blockservice"
bstore "github.com/ipfs/go-ipfs-blockstore"
Expand Down Expand Up @@ -92,8 +91,7 @@ type IpfsNode struct {
Routing routing.IpfsRouting `optional:"true"` // the routing system. recommend ipfs-dht
Exchange exchange.Interface // the block exchange + strategy (bitswap)
Namesys namesys.NameSystem // the name system, resolves paths to hashes
Provider provider.Provider // the value provider system
Reprovider *rp.Reprovider `optional:"true"` // the value reprovider system
Provider provider.System // the value provider system
IpnsRepub *ipnsrp.Republisher `optional:"true"`

AutoNAT *autonat.AutoNATService `optional:"true"`
Expand Down
2 changes: 1 addition & 1 deletion core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type CoreAPI struct {
namesys namesys.NameSystem
routing routing.IpfsRouting

provider provider.Provider
provider provider.System

pubSub *pubsub.PubSub

Expand Down
21 changes: 12 additions & 9 deletions core/node/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,18 @@ func Dag(bs blockservice.BlockService) format.DAGService {
}

// OnlineExchange creates new LibP2P backed block exchange (BitSwap)
func OnlineExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.IpfsRouting, bs blockstore.GCBlockstore) exchange.Interface {
bitswapNetwork := network.NewFromIpfsHost(host, rt)
exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, bs)
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return exch.Close()
},
})
return exch
func OnlineExchange(provide bool) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routing.IpfsRouting, bs blockstore.GCBlockstore) exchange.Interface {
bitswapNetwork := network.NewFromIpfsHost(host, rt)
exch := bitswap.New(helpers.LifecycleCtx(mctx, lc), bitswapNetwork, bs, bitswap.ProvideEnabled(provide))
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return exch.Close()
},
})
return exch

}
}

// Files loads persisted MFS root
Expand Down
61 changes: 14 additions & 47 deletions core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (

"github.com/ipfs/go-ipfs/core/node/libp2p"
"github.com/ipfs/go-ipfs/p2p"
"github.com/ipfs/go-ipfs/provider"
"github.com/ipfs/go-ipfs/reprovide"

offline "github.com/ipfs/go-ipfs-exchange-offline"
offroute "github.com/ipfs/go-ipfs-routing/offline"
Expand Down Expand Up @@ -186,42 +184,6 @@ var IPNS = fx.Options(
fx.Provide(RecordValidator),
)

// Providers groups units managing provider routing records
func Providers(cfg *config.Config) fx.Option {
reproviderInterval := kReprovideFrequency
if cfg.Reprovider.Interval != "" {
dur, err := time.ParseDuration(cfg.Reprovider.Interval)
if err != nil {
return fx.Error(err)
}

reproviderInterval = dur
}

var keyProvider fx.Option
switch cfg.Reprovider.Strategy {
case "all":
fallthrough
case "":
keyProvider = fx.Provide(reprovide.NewBlockstoreProvider)
case "roots":
keyProvider = fx.Provide(reprovide.NewPinnedProvider(true))
case "pinned":
keyProvider = fx.Provide(reprovide.NewPinnedProvider(false))
default:
return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", cfg.Reprovider.Strategy))
}

return fx.Options(
fx.Provide(ProviderQueue),
fx.Provide(ProviderCtor),
fx.Provide(ReproviderCtor(reproviderInterval)),
keyProvider,

fx.Invoke(Reprovider),
)
}

// Online groups online-only units
func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option {

Expand Down Expand Up @@ -261,26 +223,31 @@ func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option {
recordLifetime = d
}

/* don't provide from bitswap when the strategic provider service is active */
shouldBitswapProvide := !cfg.Experimental.StrategicProviding

return fx.Options(
fx.Provide(OnlineExchange),
fx.Provide(OnlineExchange(shouldBitswapProvide)),
fx.Provide(Namesys(ipnsCacheSize)),

fx.Invoke(IpnsRepublisher(repubPeriod, recordLifetime)),

fx.Provide(p2p.New),

LibP2P(bcfg, cfg),
Providers(cfg),
OnlineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
)
}

// Offline groups offline alternatives to Online units
var Offline = fx.Options(
fx.Provide(offline.Exchange),
fx.Provide(Namesys(0)),
fx.Provide(offroute.NewOfflineRouter),
fx.Provide(provider.NewOfflineProvider),
)
func Offline(cfg *config.Config) fx.Option {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
return fx.Options(
fx.Provide(offline.Exchange),
fx.Provide(Namesys(0)),
fx.Provide(offroute.NewOfflineRouter),
OfflineProviders(cfg.Experimental.StrategicProviding, cfg.Reprovider.Strategy, cfg.Reprovider.Interval),
)
}

// Core groups basic IPFS services
var Core = fx.Options(
Expand All @@ -295,7 +262,7 @@ func Networked(bcfg *BuildCfg, cfg *config.Config) fx.Option {
if bcfg.Online {
return Online(bcfg, cfg)
}
return Offline
return Offline(cfg)
}

// IPFS builds a group of fx Options based on the passed BuildCfg
Expand Down
121 changes: 94 additions & 27 deletions core/node/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,117 @@ package node

import (
"context"
"fmt"
"time"

"github.com/libp2p/go-libp2p-routing"
"go.uber.org/fx"

"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/provider"
q "github.com/ipfs/go-ipfs/provider/queue"
"github.com/ipfs/go-ipfs/provider/simple"
"github.com/ipfs/go-ipfs/repo"
"github.com/ipfs/go-ipfs/reprovide"
"github.com/libp2p/go-libp2p-routing"
)

const kReprovideFrequency = time.Hour * 12

// SIMPLE

// ProviderQueue creates new datastore backed provider queue
func ProviderQueue(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*provider.Queue, error) {
return provider.NewQueue(helpers.LifecycleCtx(mctx, lc), "provider-v1", repo.Datastore())
func ProviderQueue(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (*q.Queue, error) {
return q.NewQueue(helpers.LifecycleCtx(mctx, lc), "provider-v1", repo.Datastore())
}

// SimpleProvider creates new record provider
func SimpleProvider(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *q.Queue, rt routing.IpfsRouting) provider.Provider {
return simple.NewProvider(helpers.LifecycleCtx(mctx, lc), queue, rt)
}

// SimpleReprovider creates new reprovider
func SimpleReprovider(reproviderInterval time.Duration) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt routing.IpfsRouting, keyProvider simple.KeyChanFunc) (provider.Reprovider, error) {
return simple.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
}
}

// SimpleProviderSys creates new provider system
func SimpleProviderSys(isOnline bool) interface{} {
return func(lc fx.Lifecycle, p provider.Provider, r provider.Reprovider) provider.System {
sys := provider.NewSystem(p, r)

if isOnline {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
sys.Run()
return nil
},
OnStop: func(ctx context.Context) error {
return sys.Close()
},
})
}

return sys
}
}

// ProviderCtor creates new record provider
func ProviderCtor(mctx helpers.MetricsCtx, lc fx.Lifecycle, queue *provider.Queue, rt routing.IpfsRouting) provider.Provider {
p := provider.NewProvider(helpers.LifecycleCtx(mctx, lc), queue, rt)

lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
p.Run()
return nil
},
OnStop: func(ctx context.Context) error {
return p.Close()
},
})

return p
// ONLINE/OFFLINE

// OnlineProviders groups units managing provider routing records online
func OnlineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option {
if useStrategicProviding {
return fx.Provide(provider.NewOfflineProvider)
}

return fx.Options(
SimpleProviders(reprovideStrategy, reprovideInterval),
fx.Provide(SimpleProviderSys(true)),
)
}

// ReproviderCtor creates new reprovider
func ReproviderCtor(reproviderInterval time.Duration) func(helpers.MetricsCtx, fx.Lifecycle, routing.IpfsRouting, reprovide.KeyChanFunc) (*reprovide.Reprovider, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, rt routing.IpfsRouting, keyProvider reprovide.KeyChanFunc) (*reprovide.Reprovider, error) {
return reprovide.NewReprovider(helpers.LifecycleCtx(mctx, lc), reproviderInterval, rt, keyProvider), nil
// OfflineProviders groups units managing provider routing records offline
func OfflineProviders(useStrategicProviding bool, reprovideStrategy string, reprovideInterval string) fx.Option {
if useStrategicProviding {
return fx.Provide(provider.NewOfflineProvider)
}

return fx.Options(
SimpleProviders(reprovideStrategy, reprovideInterval),
fx.Provide(SimpleProviderSys(false)),
)
}

// Reprovider runs the reprovider service
func Reprovider(lp lcProcess, reprovider *reprovide.Reprovider) error {
lp.Append(reprovider.Run)
return nil
// SimpleProviders creates the simple provider/reprovider dependencies
func SimpleProviders(reprovideStrategy string, reprovideInterval string) fx.Option {
reproviderInterval := kReprovideFrequency
if reprovideInterval != "" {
dur, err := time.ParseDuration(reprovideInterval)
if err != nil {
return fx.Error(err)
}

reproviderInterval = dur
}

var keyProvider fx.Option
switch reprovideStrategy {
case "all":
fallthrough
case "":
keyProvider = fx.Provide(simple.NewBlockstoreProvider)
case "roots":
keyProvider = fx.Provide(simple.NewPinnedProvider(true))
case "pinned":
keyProvider = fx.Provide(simple.NewPinnedProvider(false))
default:
return fx.Error(fmt.Errorf("unknown reprovider strategy '%s'", reprovideStrategy))
}

return fx.Options(
fx.Provide(ProviderQueue),
fx.Provide(SimpleProvider),
keyProvider,
fx.Provide(SimpleReprovider(reproviderInterval)),
)
}
29 changes: 29 additions & 0 deletions docs/experimental-features.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ the above issue.
- [QUIC](#quic)
- [AutoRelay](#autorelay)
- [TLS 1.3 Handshake](#tls-13-as-default-handshake-protocol)
- [Strategic Providing](#strategic-providing)

---

Expand Down Expand Up @@ -674,3 +675,31 @@ ipfs config --json Experimental.PreferTLS true

- [ ] needs testing
- [ ] needs adoption

---

## Strategic Providing

### State

Experimental, disabled by default.

Replaces the existing provide mechanism with a robust, strategic provider system.

### How to enable

Modify your ipfs config:

```
ipfs config --json Experimental.StrategicProviding true
```

### Road to being a real feature

- [ ] needs real world testing
- [ ] needs adoption
- [ ] needs to support all providing features
- [X] provide nothing
- [ ] provide roots
- [ ] provide all
- [ ] provide strategic
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.1
github.com/ipfs/go-ipfs-cmds v0.0.8
github.com/ipfs/go-ipfs-config v0.0.3
github.com/ipfs/go-ipfs-config v0.0.4
github.com/ipfs/go-ipfs-ds-help v0.0.1
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
Expand Down Expand Up @@ -122,6 +122,7 @@ require (
go.uber.org/multierr v1.1.0 // indirect
go4.org v0.0.0-20190313082347-94abd6928b1d // indirect
golang.org/x/sys v0.0.0-20190522044717-8097e1b27ff5
google.golang.org/appengine v1.4.0 // indirect
gopkg.in/cheggaaa/pb.v1 v1.0.28
gotest.tools/gotestsum v0.3.4
)
Expand Down
Loading