Skip to content

Commit

Permalink
Add extended providers builder
Browse files Browse the repository at this point in the history
xproviders.AdBuilder allows to easily create, verify and sign extended provider ads that then can be published using Engine
  • Loading branch information
ischasny committed Nov 14, 2022
1 parent 8ddbc29 commit 5afd290
Show file tree
Hide file tree
Showing 10 changed files with 397 additions and 75 deletions.
28 changes: 28 additions & 0 deletions README.md
Expand Up @@ -153,6 +153,33 @@ advertise content, see:

* [`engine/example_test.go`](engine/example_test.go)

#### Publishing ads with extended providers

[Extended providers](https://github.com/filecoin-project/storetheindex/blob/main/doc/ingest.md#extendedprovider)
field allows for specification of provider families, in cases where a provider operates multiple PeerIDs, perhaps
with different transport protocols between them, but over the same database of content.

Such ads can be composed manually or using a convenience builder `ExtendedProvidersAdBuilder`.
```
adv, err := ep.NewExtendedProviderAdBuilder(providerID, priv, addrs). // the main ad's providerID, private key and addresses
WithContextID(contextID). // optional context id
WithMetadata(metadata). // optional metadata
WithOverride(override). // override flag, false by default
WithExtendedProviders(extendedProviders). // one or more extended providers to be included in the ad, represented by ExtendedProviderInfo struct
WithLastAdID(lastAdId). // cid of the last published ad, which is false by default
BuildAndSign()
if err != nil {
//...
}
engine.Publish(ctx, *adv)
)
```

> Identity of the main provider will be added to the extended providers list automatically and should not be passed in explicitly
### `provider` CLI

The `provider` CLI can be used to interact with a running daemon via the admin server to perform a
Expand Down Expand Up @@ -243,6 +270,7 @@ advertisement. The cache expansion is logged in `INFO` level at `provider/engine
* [Indexer Node Design](https://www.notion.so/protocollabs/Indexer-Node-Design-4fb94471b6be4352b6849dc9b9527825)
* [Providing data to a network indexer](https://github.com/filecoin-project/storetheindex/blob/main/doc/ingest.md)
* [`storetheindex`](https://github.com/filecoin-project/storetheindex): indexer node implementation
* [`storetheindex` documentation](https://github.com/filecoin-project/storetheindex/blob/main/doc/)
* [`go-indexer-core`](https://github.com/filecoin-project/go-indexer-core): Core index key-value
store

Expand Down
46 changes: 16 additions & 30 deletions engine/engine_test.go
Expand Up @@ -49,7 +49,7 @@ func TestEngine_NotifyRemoveWithUnknownContextIDIsError(t *testing.T) {

func Test_NewEngineWithNoPublisherAndRoot(t *testing.T) {
rng := rand.New(rand.NewSource(1413))
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
mhs := testutil.RandomMultihashes(t, rng, 1)
contextID := []byte("fish")

Expand All @@ -73,7 +73,7 @@ func Test_NewEngineWithNoPublisherAndRoot(t *testing.T) {
}

func TestEngine_PublishLocal(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand All @@ -92,7 +92,7 @@ func TestEngine_PublishLocal(t *testing.T) {
require.NoError(t, err)
wantAd := schema.Advertisement{
Provider: subject.Host().ID().String(),
Addresses: multiAddsToString(subject.Host().Addrs()),
Addresses: testutil.MultiAddsToString(subject.Host().Addrs()),
Entries: chunkLnk,
ContextID: []byte("fish"),
Metadata: mdBytes,
Expand All @@ -111,7 +111,7 @@ func TestEngine_PublishLocal(t *testing.T) {
}

func TestEngine_PublishWithDataTransferPublisher(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -240,7 +240,7 @@ func TestEngine_PublishWithDataTransferPublisher(t *testing.T) {

wantAd := schema.Advertisement{
Provider: subject.Host().ID().String(),
Addresses: multiAddsToString(subject.Host().Addrs()),
Addresses: testutil.MultiAddsToString(subject.Host().Addrs()),
Entries: chunkLnk,
ContextID: wantContextID,
Metadata: mdBytes,
Expand Down Expand Up @@ -311,7 +311,7 @@ func TestEngine_PublishWithDataTransferPublisher(t *testing.T) {
}

func TestEngine_NotifyPutWithoutListerIsError(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
subject, err := engine.New()
require.NoError(t, err)
err = subject.Start(ctx)
Expand All @@ -324,7 +324,7 @@ func TestEngine_NotifyPutWithoutListerIsError(t *testing.T) {
}

func TestEngine_NotifyPutThenNotifyRemove(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -362,7 +362,7 @@ func TestEngine_NotifyPutThenNotifyRemove(t *testing.T) {
}

func TestEngine_NotifyRemoveWithDefaultProvider(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestEngine_NotifyRemoveWithDefaultProvider(t *testing.T) {
}

func TestEngine_NotifyRemoveWithCustomProvider(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -428,7 +428,7 @@ func TestEngine_NotifyRemoveWithCustomProvider(t *testing.T) {
}

func TestEngine_ProducesSingleChainForMultipleProviders(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs1 := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -480,7 +480,7 @@ func TestEngine_ProducesSingleChainForMultipleProviders(t *testing.T) {
}

func TestEngine_NotifyPutUseDefaultProviderAndAddressesWhenNoneGiven(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -512,7 +512,7 @@ func TestEngine_NotifyPutUseDefaultProviderAndAddressesWhenNoneGiven(t *testing.
}

func TestEngine_VerifyErrAlreadyAdvertised(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -544,7 +544,7 @@ func TestEngine_VerifyErrAlreadyAdvertised(t *testing.T) {
}

func TestEngine_ShouldHaveSameChunksInChunkerForSameCIDs(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
rng := rand.New(rand.NewSource(1413))

mhs := testutil.RandomMultihashes(t, rng, 42)
Expand Down Expand Up @@ -610,7 +610,7 @@ func TestEngine_DatastoreBackwardsCompatibilityTest(t *testing.T) {
ma3, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/62695")
pID, _ := peer.Decode("QmPxKFBM2A7VZURXZhZLCpEnhMFtZ7WSZwFLneFEiYneES")

ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)
subject, err := engine.New(engine.WithDatastore(ds), engine.WithProvider(peer.AddrInfo{ID: pID, Addrs: []multiaddr.Multiaddr{ma1, ma2, ma3}}))
require.NoError(t, err)
err = subject.Start(ctx)
Expand Down Expand Up @@ -683,30 +683,16 @@ func verifyAd(t *testing.T, ctx context.Context, subject *engine.Engine, expecte
}
}

func contextWithTimeout(t *testing.T) context.Context {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
t.Cleanup(cancel)
return ctx
}

func requireEqualDagsyncMessage(t *testing.T, got, want gossiptopic.Message) {
require.Equal(t, want.Cid, got.Cid)
require.Equal(t, want.ExtraData, got.ExtraData)
wantAddrs, err := want.GetAddrs()
require.NoError(t, err)
gotAddrs, err := got.GetAddrs()
require.NoError(t, err)
wantAddrsStr := multiAddsToString(wantAddrs)
wantAddrsStr := testutil.MultiAddsToString(wantAddrs)
sort.Strings(wantAddrsStr)
gotAddrsStr := multiAddsToString(gotAddrs)
gotAddrsStr := testutil.MultiAddsToString(gotAddrs)
sort.Strings(gotAddrsStr)
require.Equal(t, wantAddrsStr, gotAddrsStr)
}

func multiAddsToString(addrs []multiaddr.Multiaddr) []string {
var rAddrs []string
for _, addr := range addrs {
rAddrs = append(rAddrs, addr.String())
}
return rAddrs
}
6 changes: 3 additions & 3 deletions engine/linksystem_test.go
Expand Up @@ -26,7 +26,7 @@ import (
var testMetadata = metadata.Default.New(metadata.Bitswap{})

func Test_SchemaNoEntriesErr(t *testing.T) {
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)

subject, err := engine.New()
require.NoError(t, err)
Expand All @@ -40,7 +40,7 @@ func Test_SchemaNoEntriesErr(t *testing.T) {

func Test_RemovalAdvertisementWithNoEntriesIsRetrievable(t *testing.T) {
rng := rand.New(rand.NewSource(1413))
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)

subject, err := engine.New()
require.NoError(t, err)
Expand Down Expand Up @@ -100,7 +100,7 @@ func Test_RemovalAdvertisementWithNoEntriesIsRetrievable(t *testing.T) {

func Test_EvictedCachedEntriesChainIsRegeneratedGracefully(t *testing.T) {
rng := rand.New(rand.NewSource(1413))
ctx := contextWithTimeout(t)
ctx := testutil.ContextWithTimeout(t)

chunkSize := 2
cacheCap := 1
Expand Down
164 changes: 164 additions & 0 deletions engine/xproviders/xproviders.go
@@ -0,0 +1,164 @@
package xproviders

import (
"errors"

"github.com/filecoin-project/storetheindex/api/v0/ingest/schema"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
)

// AdBuilder contains fields required for building and signing of a new ad with Extended Providers
type AdBuilder struct {
// providerID contains a peer ID of the main provider (the one from the body of the ad)
providerID string
// privKey contains a private key of the main provider
privKey crypto.PrivKey
// addrs contains addresses of the main provider
addrs []string
// providers contains providers' identities, keys, metadata and addresses that will be used as extended providers in the ad.
providers []Info
// contextID contains optional context id
contextID []byte
// contextID contains optional metadata
metadata []byte
// override contains override flag that is false by default
override bool
// lastAdID contains optional last ad cid which is cid.Undef by default
lastAdID cid.Cid
}

// Info contains information about extended provider.
type Info struct {
// ID contains peer ID of the extended provider
ID string
// Metadata contains optional metadata of the extended provider
Metadata []byte
// Addrs contains a list of extended provider's addresses
Addrs []string
// Priv contains a provtae key of the extended provider
Priv crypto.PrivKey
}

// NewAdBuilder creates a new ExtendedProvidersAdBuilder
func NewAdBuilder(providerID peer.ID, privKey crypto.PrivKey, addrs []ma.Multiaddr) *AdBuilder {
pub := &AdBuilder{
providerID: providerID.String(),
privKey: privKey,
addrs: multiaddrsToStrings(addrs),
providers: []Info{},
lastAdID: cid.Undef,
}
return pub
}

// WithContextID sets contextID
func (pub *AdBuilder) WithContextID(contextID []byte) *AdBuilder {
pub.contextID = contextID
return pub
}

// WithMetadata sets metadata
func (pub *AdBuilder) WithMetadata(md []byte) *AdBuilder {
pub.metadata = md
return pub
}

// WithOverride sets override
func (pub *AdBuilder) WithOverride(override bool) *AdBuilder {
pub.override = override
return pub
}

// WithExtendedProviders sets extended providers
func (pub *AdBuilder) WithExtendedProviders(eps ...Info) *AdBuilder {
pub.providers = append(pub.providers, eps...)
return pub
}

// WithLastAdID sets last ad cid
func (pub *AdBuilder) WithLastAdID(lastAdID cid.Cid) *AdBuilder {
pub.lastAdID = lastAdID
return pub
}

// BuildAndSign verifies and signs a new extended provider ad. After that it can be published using engine. Identity of the main provider will be appended to the
// extended provider list automatically.
func (pub *AdBuilder) BuildAndSign() (*schema.Advertisement, error) {
if len(pub.providers) == 0 {
return nil, errors.New("providers list is empty")
}

if len(pub.contextID) == 0 && pub.override {
return nil, errors.New("override is true for empty context")
}

adv := schema.Advertisement{
Provider: pub.providerID,
Entries: schema.NoEntries,
Addresses: pub.addrs,
ContextID: pub.contextID,
Metadata: pub.metadata,
ExtendedProvider: &schema.ExtendedProvider{
Override: pub.override,
},
}

epMap := map[string]Info{}
for _, epInfo := range pub.providers {
adv.ExtendedProvider.Providers = append(adv.ExtendedProvider.Providers, schema.Provider{
ID: epInfo.ID,
Addresses: epInfo.Addrs,
Metadata: epInfo.Metadata,
})
epMap[epInfo.ID] = epInfo
}

// The main provider has to be on the extended list too
adv.ExtendedProvider.Providers = append(adv.ExtendedProvider.Providers, schema.Provider{
ID: pub.providerID,
Addresses: pub.addrs,
Metadata: pub.metadata,
})

if pub.lastAdID != cid.Undef {
prev := ipld.Link(cidlink.Link{Cid: pub.lastAdID})
adv.PreviousID = prev
}

err := adv.SignWithExtendedProviders(pub.privKey, func(provId string) (crypto.PrivKey, error) {
epInfo, ok := epMap[provId]
if !ok {
return nil, errors.New("unknown provider")
}
return epInfo.Priv, nil
})

if err != nil {
return nil, err
}

return &adv, nil
}

// NewInfo allows to create a new ExtendedProviderInfo providing type safety around its fields
func NewInfo(peerID peer.ID, priv crypto.PrivKey, metadata []byte, addrs []ma.Multiaddr) Info {
return Info{
ID: peerID.String(),
Metadata: metadata,
Addrs: multiaddrsToStrings(addrs),
Priv: priv,
}
}

func multiaddrsToStrings(addrs []ma.Multiaddr) []string {
var stringAddrs []string
for _, addr := range addrs {
stringAddrs = append(stringAddrs, addr.String())
}
return stringAddrs
}

0 comments on commit 5afd290

Please sign in to comment.