Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add extended providers support #292

Merged
merged 1 commit into from Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 28 additions & 0 deletions README.md
Expand Up @@ -153,6 +153,33 @@ advertise content, see:

* [`engine/example_test.go`](engine/example_test.go)

#### Publishing ads with extended providers

[Extended providers](https://github.com/filecoin-project/storetheindex/blob/main/doc/ingest.md#extendedprovider)
field allows for specification of provider families, in cases where a provider operates multiple PeerIDs, perhaps
with different transport protocols between them, but over the same database of content.

Such ads can be composed manually or using a convenience builder `ExtendedProvidersAdBuilder`.
```

adv, err := ep.NewExtendedProviderAdBuilder(providerID, priv, addrs). // the main ad's providerID, private key and addresses
WithContextID(contextID). // optional context id
WithMetadata(metadata). // optional metadata
WithOverride(override). // override flag, false by default
WithExtendedProviders(extendedProviders). // one or more extended providers to be included in the ad, represented by ExtendedProviderInfo struct
WithLastAdID(lastAdId). // cid of the last published ad, which is false by default
BuildAndSign()

if err != nil {
//...
}

engine.Publish(ctx, *adv)
)
```

> Identity of the main provider will be added to the extended providers list automatically and should not be passed in explicitly

### `provider` CLI

The `provider` CLI can be used to interact with a running daemon via the admin server to perform a
Expand Down Expand Up @@ -243,6 +270,7 @@ advertisement. The cache expansion is logged in `INFO` level at `provider/engine
* [Indexer Node Design](https://www.notion.so/protocollabs/Indexer-Node-Design-4fb94471b6be4352b6849dc9b9527825)
* [Providing data to a network indexer](https://github.com/filecoin-project/storetheindex/blob/main/doc/ingest.md)
* [`storetheindex`](https://github.com/filecoin-project/storetheindex): indexer node implementation
* [`storetheindex` documentation](https://github.com/filecoin-project/storetheindex/blob/main/doc/)
* [`go-indexer-core`](https://github.com/filecoin-project/go-indexer-core): Core index key-value
store

Expand Down
46 changes: 16 additions & 30 deletions engine/engine_test.go
Expand Up @@ -49,7 +49,7 @@ func TestEngine_NotifyRemoveWithUnknownContextIDIsError(t *testing.T) {

func Test_NewEngineWithNoPublisherAndRoot(t *testing.T) {
rng := rand.New(rand.NewSource(1413))
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
mhs := testutil.RandomMultihashes(t, rng, 1)
contextID := []byte("fish")

Expand All @@ -73,7 +73,7 @@ func Test_NewEngineWithNoPublisherAndRoot(t *testing.T) {
}

func TestEngine_PublishLocal(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand All @@ -92,7 +92,7 @@ func TestEngine_PublishLocal(t *testing.T) {
require.NoError(t, err)
wantAd := schema.Advertisement{
Provider: subject.Host().ID().String(),
Addresses: multiAddsToString(subject.Host().Addrs()),
Addresses: testutil.MultiAddsToString(subject.Host().Addrs()),
Entries: chunkLnk,
ContextID: []byte("fish"),
Metadata: mdBytes,
Expand All @@ -111,7 +111,7 @@ func TestEngine_PublishLocal(t *testing.T) {
}

func TestEngine_PublishWithDataTransferPublisher(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestEngine_PublishWithDataTransferPublisher(t *testing.T) {

wantAd := schema.Advertisement{
Provider: subject.Host().ID().String(),
Addresses: multiAddsToString(subject.Host().Addrs()),
Addresses: testutil.MultiAddsToString(subject.Host().Addrs()),
Entries: chunkLnk,
ContextID: wantContextID,
Metadata: mdBytes,
Expand Down Expand Up @@ -311,7 +311,7 @@ func TestEngine_PublishWithDataTransferPublisher(t *testing.T) {
}

func TestEngine_NotifyPutWithoutListerIsError(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
subject, err := engine.New()
require.NoError(t, err)
err = subject.Start(ctx)
Expand All @@ -324,7 +324,7 @@ func TestEngine_NotifyPutWithoutListerIsError(t *testing.T) {
}

func TestEngine_NotifyPutThenNotifyRemove(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestEngine_NotifyPutThenNotifyRemove(t *testing.T) {
}

func TestEngine_NotifyRemoveWithDefaultProvider(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestEngine_NotifyRemoveWithDefaultProvider(t *testing.T) {
}

func TestEngine_NotifyRemoveWithCustomProvider(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -428,7 +428,7 @@ func TestEngine_NotifyRemoveWithCustomProvider(t *testing.T) {
}

func TestEngine_ProducesSingleChainForMultipleProviders(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs1 := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -480,7 +480,7 @@ func TestEngine_ProducesSingleChainForMultipleProviders(t *testing.T) {
}

func TestEngine_NotifyPutUseDefaultProviderAndAddressesWhenNoneGiven(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -512,7 +512,7 @@ func TestEngine_NotifyPutUseDefaultProviderAndAddressesWhenNoneGiven(t *testing.
}

func TestEngine_VerifyErrAlreadyAdvertised(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -544,7 +544,7 @@ func TestEngine_VerifyErrAlreadyAdvertised(t *testing.T) {
}

func TestEngine_ShouldHaveSameChunksInChunkerForSameCIDs(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -610,7 +610,7 @@ func TestEngine_DatastoreBackwardsCompatibilityTest(t *testing.T) {
ma3, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/62695")
pID, _ := peer.Decode("QmPxKFBM2A7VZURXZhZLCpEnhMFtZ7WSZwFLneFEiYneES")

ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
subject, err := engine.New(engine.WithDatastore(ds), engine.WithProvider(peer.AddrInfo{ID: pID, Addrs: []multiaddr.Multiaddr{ma1, ma2, ma3}}))
require.NoError(t, err)
err = subject.Start(ctx)
Expand Down Expand Up @@ -683,30 +683,16 @@ func verifyAd(t *testing.T, ctx context.Context, subject *engine.Engine, expecte
}
}

func contextWithTimeout(t *testing.T) context.Context {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
t.Cleanup(cancel)
return ctx
}

func requireEqualDagsyncMessage(t *testing.T, got, want gossiptopic.Message) {
require.Equal(t, want.Cid, got.Cid)
require.Equal(t, want.ExtraData, got.ExtraData)
wantAddrs, err := want.GetAddrs()
require.NoError(t, err)
gotAddrs, err := got.GetAddrs()
require.NoError(t, err)
wantAddrsStr := multiAddsToString(wantAddrs)
wantAddrsStr := testutil.MultiAddsToString(wantAddrs)
sort.Strings(wantAddrsStr)
gotAddrsStr := multiAddsToString(gotAddrs)
gotAddrsStr := testutil.MultiAddsToString(gotAddrs)
sort.Strings(gotAddrsStr)
require.Equal(t, wantAddrsStr, gotAddrsStr)
}

func multiAddsToString(addrs []multiaddr.Multiaddr) []string {
var rAddrs []string
for _, addr := range addrs {
rAddrs = append(rAddrs, addr.String())
}
return rAddrs
}
6 changes: 3 additions & 3 deletions engine/linksystem_test.go
Expand Up @@ -26,7 +26,7 @@ import (
var testMetadata = metadata.Default.New(metadata.Bitswap{})

func Test_SchemaNoEntriesErr(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)

subject, err := engine.New()
require.NoError(t, err)
Expand All @@ -40,7 +40,7 @@ func Test_SchemaNoEntriesErr(t *testing.T) {

func Test_RemovalAdvertisementWithNoEntriesIsRetrievable(t *testing.T) {
rng := rand.New(rand.NewSource(1413))
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)

subject, err := engine.New()
require.NoError(t, err)
Expand Down Expand Up @@ -100,7 +100,7 @@ func Test_RemovalAdvertisementWithNoEntriesIsRetrievable(t *testing.T) {

func Test_EvictedCachedEntriesChainIsRegeneratedGracefully(t *testing.T) {
rng := rand.New(rand.NewSource(1413))
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)

chunkSize := 2
cacheCap := 1
Expand Down
164 changes: 164 additions & 0 deletions engine/xproviders/xproviders.go
@@ -0,0 +1,164 @@
package xproviders

import (
"errors"

"github.com/filecoin-project/storetheindex/api/v0/ingest/schema"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
)

// AdBuilder contains fields required for building and signing of a new ad with Extended Providers
type AdBuilder struct {
ischasny marked this conversation as resolved.
Show resolved Hide resolved
// providerID contains a peer ID of the main provider (the one from the body of the ad)
providerID string
// privKey contains a private key of the main provider
privKey crypto.PrivKey
// addrs contains addresses of the main provider
addrs []string
// providers contains providers' identities, keys, metadata and addresses that will be used as extended providers in the ad.
providers []Info
// contextID contains optional context id
contextID []byte
// contextID contains optional metadata
metadata []byte
// override contains override flag that is false by default
override bool
// lastAdID contains optional last ad cid which is cid.Undef by default
lastAdID cid.Cid
}

// Info contains information about extended provider.
type Info struct {
// ID contains peer ID of the extended provider
ID string
// Metadata contains optional metadata of the extended provider
Metadata []byte
// Addrs contains a list of extended provider's addresses
Addrs []string
// Priv contains a provtae key of the extended provider
Priv crypto.PrivKey
}

// NewAdBuilder creates a new ExtendedProvidersAdBuilder
func NewAdBuilder(providerID peer.ID, privKey crypto.PrivKey, addrs []ma.Multiaddr) *AdBuilder {
pub := &AdBuilder{
providerID: providerID.String(),
privKey: privKey,
addrs: multiaddrsToStrings(addrs),
providers: []Info{},
lastAdID: cid.Undef,
}
return pub
}

// WithContextID sets contextID
func (pub *AdBuilder) WithContextID(contextID []byte) *AdBuilder {
pub.contextID = contextID
return pub
}

// WithMetadata sets metadata
func (pub *AdBuilder) WithMetadata(md []byte) *AdBuilder {
pub.metadata = md
return pub
}

// WithOverride sets override
func (pub *AdBuilder) WithOverride(override bool) *AdBuilder {
pub.override = override
return pub
}

// WithExtendedProviders sets extended providers
func (pub *AdBuilder) WithExtendedProviders(eps ...Info) *AdBuilder {
pub.providers = append(pub.providers, eps...)
return pub
}

// WithLastAdID sets last ad cid
func (pub *AdBuilder) WithLastAdID(lastAdID cid.Cid) *AdBuilder {
pub.lastAdID = lastAdID
return pub
}

// BuildAndSign verifies and signs a new extended provider ad. After that it can be published using engine. Identity of the main provider will be appended to the
// extended provider list automatically.
func (pub *AdBuilder) BuildAndSign() (*schema.Advertisement, error) {
if len(pub.providers) == 0 {
return nil, errors.New("providers list is empty")
}

if len(pub.contextID) == 0 && pub.override {
return nil, errors.New("override is true for empty context")
}

adv := schema.Advertisement{
Provider: pub.providerID,
Entries: schema.NoEntries,
Addresses: pub.addrs,
ContextID: pub.contextID,
Metadata: pub.metadata,
ExtendedProvider: &schema.ExtendedProvider{
Override: pub.override,
},
}

epMap := map[string]Info{}
for _, epInfo := range pub.providers {
adv.ExtendedProvider.Providers = append(adv.ExtendedProvider.Providers, schema.Provider{
ID: epInfo.ID,
Addresses: epInfo.Addrs,
Metadata: epInfo.Metadata,
})
epMap[epInfo.ID] = epInfo
}

// The main provider has to be on the extended list too
adv.ExtendedProvider.Providers = append(adv.ExtendedProvider.Providers, schema.Provider{
ID: pub.providerID,
Addresses: pub.addrs,
Metadata: pub.metadata,
})

if pub.lastAdID != cid.Undef {
prev := ipld.Link(cidlink.Link{Cid: pub.lastAdID})
adv.PreviousID = prev
}

err := adv.SignWithExtendedProviders(pub.privKey, func(provId string) (crypto.PrivKey, error) {
epInfo, ok := epMap[provId]
if !ok {
return nil, errors.New("unknown provider")
}
return epInfo.Priv, nil
})

if err != nil {
return nil, err
}

return &adv, nil
}

// NewInfo allows to create a new ExtendedProviderInfo providing type safety around its fields
func NewInfo(peerID peer.ID, priv crypto.PrivKey, metadata []byte, addrs []ma.Multiaddr) Info {
return Info{
ID: peerID.String(),
Metadata: metadata,
Addrs: multiaddrsToStrings(addrs),
Priv: priv,
}
}

func multiaddrsToStrings(addrs []ma.Multiaddr) []string {
var stringAddrs []string
for _, addr := range addrs {
stringAddrs = append(stringAddrs, addr.String())
}
return stringAddrs
}