Remove support for publishing ads via data-transfer/graphsync (#429)
* Remove support for publishing ads via data-transfer/graphsync
* Update to release v0.6.0 of go-libipni
* minor code cleanup
gammazero committed Feb 1, 2024
1 parent 9e4358c commit f6b2259
Showing 18 changed files with 180 additions and 142 deletions.
1 change: 0 additions & 1 deletion cmd/provider/daemon.go
@@ -150,7 +150,6 @@ func daemonCommand(cctx *cli.Context) error {
// Starting provider core
eng, err := engine.New(
engine.WithDatastore(ds),
engine.WithDataTransfer(dt),
engine.WithDirectAnnounce(cfg.DirectAnnounce.URLs...),
engine.WithHost(h),
engine.WithEntriesCacheCapacity(cfg.Ingest.LinkCacheSize),
29 changes: 13 additions & 16 deletions cmd/provider/doc.go
@@ -11,25 +11,22 @@ Usage:
USAGE:
provider [global options] command [command options] [arguments...]
VERSION:
v0.0.0+unknown
COMMANDS:
daemon Starts a reference provider
find Query an indexer for indexed content
index Push a single content index into an indexer
init Initialize reference provider config file and identity
connect Connects to an indexer through its multiaddr
import, i Imports sources of multihashes to the index provider.
register Register provider information with an indexer that trusts the provider
remove, rm Removes previously advertised multihashes by the provider.
verify-ingest, vi Verifies ingestion of multihashes to an indexer node from a Lotus miner, CAR file or a CARv2 Index
list Lists advertisements
help, h Shows a list of commands or help for one command
announce Publish an announcement message for the latest advertisement
announce-http Publish an announcement message for the latest advertisement to a specific indexer via http
connect Connects to an indexer through its multiaddr
daemon Starts a reference provider
import, i Imports sources of multihashes to the index provider.
index Push a single content index into an indexer
init Initialize reference provider config file and identity
list, ls List local paths to data
remove, rm Removes previously advertised multihashes by the provider.
mirror Mirrors the advertisement chain from an existing index provider.
help, h Shows a list of commands or help for one command
GLOBAL OPTIONS:
--help, -h show help (default: false)
--version, -v print the version (default: false)
--help, -h show help
--version, -v print the version
To run a provider daemon it must first be initialized. To initialize the provider, execute:
4 changes: 2 additions & 2 deletions cmd/provider/init.go
@@ -20,7 +20,7 @@ var InitCmd = &cli.Command{
var initFlags = []cli.Flag{
&cli.StringFlag{
Name: "pubkind",
Usage: "Set publisher king in config. Must be one of 'http', 'libp2p', 'libp2phttp', 'dtsync'",
Usage: "Set publisher kind in config. Must be one of 'http', 'libp2p', 'libp2phttp'",
Value: "libp2p",
},
}
@@ -57,7 +57,7 @@ func initCommand(cctx *cli.Context) error {
switch pubkind {
case "":
pubkind = config.Libp2pPublisherKind
case config.Libp2pPublisherKind, config.HttpPublisherKind, config.Libp2pHttpPublisherKind, config.DTSyncPublisherKind:
case config.Libp2pPublisherKind, config.HttpPublisherKind, config.Libp2pHttpPublisherKind:
default:
return fmt.Errorf("unknown publisher kind: %s", pubkind)
}
9 changes: 5 additions & 4 deletions cmd/provider/internal/config/ingest.go
@@ -11,7 +11,6 @@ const (
type PublisherKind string

const (
DTSyncPublisherKind PublisherKind = "dtsync"
HttpPublisherKind PublisherKind = "http"
Libp2pPublisherKind PublisherKind = "libp2p"
Libp2pHttpPublisherKind PublisherKind = "libp2phttp"
@@ -37,9 +36,11 @@ type Ingest struct {
HttpPublisher HttpPublisher

// PublisherKind specifies which dagsync.Publisher implementation to use.
// When set to "http", the publisher serves plain HTTP and libp2phttp.
// Libp2phttp is disabled by setting HttpPublisher.NoLibp2p to true, and
// plain HTTP is disabled by setting HttpPublisher.ListenMultiaddr to "".
// When set to "http", the publisher serves plain HTTP. When set to
// "libp2p" the publisher serves HTTP over libp2p. When set to
// "libp2phttp", the publisher serves both plain HTTP and HTTP over libp2p.
//
// Plain HTTP is disabled if HttpPublisher.ListenMultiaddr is set to "".
PublisherKind PublisherKind

// SyncPolicy configures which indexers are allowed to sync advertisements
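With the dtsync kind removed, the remaining publisher kinds map directly onto engine options. The following is a minimal sketch, not part of this diff (the import path and the exact constant and option names, e.g. engine.Libp2pHttpPublisher, are assumptions), showing how a configured kind string might be validated and applied when building the engine:

package main

import (
	"context"
	"fmt"

	"github.com/ipni/index-provider/engine"
)

// buildEngine is a hypothetical helper that maps the three remaining
// Ingest.PublisherKind values onto an engine option; "dtsync" is no
// longer accepted.
func buildEngine(ctx context.Context, kind string) (*engine.Engine, error) {
	var pub engine.PublisherKind
	switch kind {
	case "", "libp2p":
		pub = engine.Libp2pPublisher // default kind
	case "http":
		pub = engine.HttpPublisher
	case "libp2phttp":
		pub = engine.Libp2pHttpPublisher
	default:
		return nil, fmt.Errorf("unknown publisher kind: %s", kind)
	}
	eng, err := engine.New(engine.WithPublisherKind(pub))
	if err != nil {
		return nil, err
	}
	return eng, eng.Start(ctx)
}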
7 changes: 3 additions & 4 deletions delegatedrouting/cid_queue.go
@@ -31,11 +31,10 @@ func (cq *cidQueue) recordCidNode(node *cidNode) *list.Element {
listElem.Value.(*cidNode).Timestamp = node.Timestamp
cq.nodesLl.MoveToFront(listElem)
return listElem
} else {
listElem := cq.nodesLl.PushFront(node)
cq.listNodeByCid[node.C] = listElem
return listElem
}
listElem := cq.nodesLl.PushFront(node)
cq.listNodeByCid[node.C] = listElem
return listElem
}

func (cq *cidQueue) removeCidNode(c cid.Cid) {
12 changes: 5 additions & 7 deletions delegatedrouting/listener.go
@@ -294,11 +294,11 @@ func (listener *Listener) FindProviders(ctx context.Context, key cid.Cid, limit
}

func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) {

[GitHub Actions go-check annotation on line 296: server.BitswapWriteProvideRequest is deprecated; protocol-agnostic provide is being worked on in IPIP-378 (SA1019)]
const printFrequency = 10_000
cids := req.Keys
pid := req.ID
paddrs := req.Addrs
startTime := time.Now()
printFrequency := 10_000
listener.lock.Lock()
defer func() {
listener.stats.incDelegatedRoutingCallsProcessed()
@@ -326,12 +326,10 @@ func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.Bitswa
listener.lastSeenProviderInfo.ID = pid
listener.lastSeenProviderInfo.Addrs = paddrs

timestamp := time.Now()
for i, c := range cids {

// persisting timestamp only if this is not a snapshot
if len(cids) < listener.snapshotSize {
err := listener.dsWrapper.recordCidTimestamp(ctx, c, timestamp)
err := listener.dsWrapper.recordCidTimestamp(ctx, c, startTime)
if err != nil {
log.Errorw("Error persisting timestamp. Continuing.", "cid", c, "err", err)
continue
@@ -342,7 +340,7 @@ func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.Bitswa
if listElem == nil {
listener.cidQueue.recordCidNode(&cidNode{
C: c,
Timestamp: timestamp,
Timestamp: startTime,
})
err := listener.chunker.addCidToCurrentChunk(ctx, c, func(cc *cidsChunk) error {
return listener.notifyPutAndPersist(ctx, cc)
@@ -354,7 +352,7 @@ func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.Bitswa
}
} else {
node := listElem.Value.(*cidNode)
node.Timestamp = timestamp
node.Timestamp = startTime
listener.cidQueue.recordCidNode(node)
// if no existing chunk has been found for the cid - adding it to the current one
// This can happen in the following cases:
@@ -393,12 +391,12 @@ func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.Bitswa

// Revise logic here
func (listener *Listener) removeExpiredCids(ctx context.Context) (bool, error) {
const printFrequency = 100
lastElem := listener.cidQueue.nodesLl.Back()
currentTime := time.Now()
chunksToRemove := make(map[string]*cidsChunk)
cidsToRemove := make(map[cid.Cid]struct{})
removedSomeCids := false
printFrequency := 100
var cidsRemoved, chunksRemoved, chunksReplaced int
// find expired cids and their respective chunks
for {
2 changes: 1 addition & 1 deletion delegatedrouting/listener_concurrency_test.go
@@ -82,7 +82,7 @@ func TestShouldProcessMillionCIDsInThirtySeconds(t *testing.T) {
pID, priv, _ := test.RandomIdentity()
ctx := context.Background()

engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.DataTransferPublisher))
engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.Libp2pPublisher))
require.NoError(t, err)
err = engine.Start(ctx)
defer engine.Shutdown()
10 changes: 5 additions & 5 deletions delegatedrouting/listener_test.go
@@ -122,7 +122,7 @@ func TestProvideRoundtrip(t *testing.T) {

ctx := context.Background()

engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.DataTransferPublisher))
engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.Libp2pPublisher))
require.NoError(t, err)
err = engine.Start(ctx)
defer engine.Shutdown()
@@ -181,7 +181,7 @@ func TestProvideRoundtripWithRemove(t *testing.T) {
pID, priv, _ := test.RandomIdentity()
ctx := context.Background()

engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.DataTransferPublisher))
engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.Libp2pPublisher))
require.NoError(t, err)
err = engine.Start(ctx)
defer engine.Shutdown()
@@ -1178,7 +1178,7 @@ func TestShouldSplitSnapshotIntoMultipleChunksAndReadThemBack(t *testing.T) {
pID, priv, _ := test.RandomIdentity()
ctx := context.Background()

engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.DataTransferPublisher))
engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.Libp2pPublisher))
require.NoError(t, err)
err = engine.Start(ctx)
defer engine.Shutdown()
@@ -1243,7 +1243,7 @@ func TestShouldCleanUpOldSnapshotChunksAfterStoringNewOnes(t *testing.T) {
pID, priv, _ := test.RandomIdentity()
ctx := context.Background()

engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.DataTransferPublisher))
engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.Libp2pPublisher))
require.NoError(t, err)
err = engine.Start(ctx)
defer engine.Shutdown()
@@ -1290,7 +1290,7 @@ func TestShouldRecogniseLegacySnapshot(t *testing.T) {
pID, priv, _ := test.RandomIdentity()
ctx := context.Background()

engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.DataTransferPublisher))
engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.Libp2pPublisher))
require.NoError(t, err)
err = engine.Start(ctx)
defer engine.Shutdown()
2 changes: 1 addition & 1 deletion doc/publisher-config.md
@@ -21,7 +21,7 @@ The index-provider engine must be configured to use a libp2p publisher, HTTP pub
- `Libp2pHttpPublisher` serves advertisements using both HTTP and libp2p servers.
- `DataTransferPublisher` exposes a data-transfer/graphsync server that allows peers in the network to sync advertisements. This option is being discontinued. Only provided as a fallback in case HttpPublisher and Libp2pHttpPublisher are not working.

If `WithPublisherKind` is not provided a value, it defaults to `NoPublisher` and advertisements are only stored locally and no announcements are made. If configuring the command-line application, `WithPublisherKind` is configured by setting the `Ingest.PublisherKind` item in the configuration file to a value of "http", "libp2p", "libp2phttp, "dtsync", or "".
If `WithPublisherKind` is not provided a value, it defaults to `NoPublisher` and advertisements are only stored locally and no announcements are made. If configuring the command-line application, `WithPublisherKind` is configured by setting the `Ingest.PublisherKind` item in the configuration file to a value of "http", "libp2p", "libp2phttp", or "".

For all publisher kinds, except the `DataTransfer` publisher, the `WithHttpPublisherAnnounceAddr` option sets the addresses that are announced to indexers, telling the indexers where to fetch advertisements from. If configuring the command-line application, `WithHttpPublisherAnnounceAddr` is configured by specifying multiaddr strings in `Ingest.HttpPublisher.AnnounceMultiaddr`.
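As an illustration, a minimal sketch (assuming the github.com/ipni/index-provider/engine import path and that WithHttpPublisherAnnounceAddr accepts a multiaddr string) that serves advertisements over plain HTTP and announces an explicit fetch address to indexers:

package main

import (
	"context"

	"github.com/ipni/index-provider/engine"
)

func main() {
	ctx := context.Background()
	// The multiaddr below is a placeholder for the provider's public address.
	eng, err := engine.New(
		engine.WithPublisherKind(engine.HttpPublisher),
		engine.WithHttpPublisherAnnounceAddr("/dns4/ads.example.org/tcp/443/https"),
	)
	if err != nil {
		panic(err)
	}
	if err := eng.Start(ctx); err != nil {
		panic(err)
	}
	defer eng.Shutdown()
}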

18 changes: 7 additions & 11 deletions e2e_retrieve_test.go
@@ -43,13 +43,13 @@ type testCase struct {

var testCases = []testCase{
{
name: "DT Publisher",
name: "Libp2p Publisher",
serverConfigOpts: func(t *testing.T) []engine.Option {
// Use env var to signal what publisher kind is being used.
setPubKindEnvVarKey(t, engine.DataTransferPublisher)
setPubKindEnvVarKey(t, engine.Libp2pPublisher)
return []engine.Option{
engine.WithTopicName(testTopic),
engine.WithPublisherKind(engine.DataTransferPublisher),
engine.WithPublisherKind(engine.Libp2pPublisher),
}
},
},
@@ -109,10 +109,8 @@ func testRetrievalRoundTripWithTestCase(t *testing.T, tc testCase) {
advCid, err := server.cs.Put(ctx, contextID, filepath.Join(testutil.ThisDir(t), "./testdata/sample-v1-2.car"), md)
require.NoError(t, err)

// TODO: Review after resolution of https://github.com/filecoin-project/go-legs/issues/95
// For now instantiate a new host for subscriber so that dt constructed by test client works.
subHost := newHost(t)
sub, err := dagsync.NewSubscriber(subHost, client.store, client.lsys, testTopic)
sub, err := dagsync.NewSubscriber(subHost, client.lsys)
require.NoError(t, err)

serverInfo := peer.AddrInfo{
Expand Down Expand Up @@ -189,10 +187,8 @@ func testReimportCarWtihTestCase(t *testing.T, tc testCase) {
advCid, err := server.cs.Put(ctx, contextID, filepath.Join(testutil.ThisDir(t), "./testdata/sample-v1-2.car"), md)
require.NoError(t, err)

// TODO: Review after resolution of https://github.com/filecoin-project/go-legs/issues/95
// For now instantiate a new host for subscriber so that dt constructed by test client works.
subHost := newHost(t)
sub, err := dagsync.NewSubscriber(subHost, client.store, client.lsys, testTopic)
sub, err := dagsync.NewSubscriber(subHost, client.lsys)
require.NoError(t, err)

serverInfo := peer.AddrInfo{
Expand Down Expand Up @@ -274,8 +270,7 @@ func newTestServer(t *testing.T, ctx context.Context, o ...engine.Option) *testS
// Explicitly override host so that the host is known for testing purposes.
h := newHost(t)
store := dssync.MutexWrap(datastore.NewMapDatastore())
dt := testutil.SetupDataTransferOnHost(t, h, store, cidlink.DefaultLinkSystem())
o = append(o, engine.WithHost(h), engine.WithDatastore(store), engine.WithDataTransfer(dt))
o = append(o, engine.WithHost(h), engine.WithDatastore(store))

var publisherAddr multiaddr.Multiaddr
pubKind := engine.PublisherKind(os.Getenv(pubKindEnvVarKey(t)))
@@ -293,6 +288,7 @@ func newTestServer(t *testing.T, ctx context.Context, o ...engine.Option) *testS
require.NoError(t, err)
require.NoError(t, e.Start(ctx))

dt := testutil.SetupDataTransferOnHost(t, h, store, cidlink.DefaultLinkSystem())
cs := supplier.NewCarSupplier(e, store, car.ZeroLengthSectionAsEOF(false))
require.NoError(t, cardatatransfer.StartCarDataTransfer(dt, cs))

93 changes: 93 additions & 0 deletions engine/datastore.go
@@ -0,0 +1,93 @@
package engine

import (
"context"
"errors"
"fmt"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
)

const (
// updateBatchSize is the number of records to update at a time.
updateBatchSize = 500000
)

func cleanupDTTempData(ctx context.Context, ds datastore.Batching) {
const dtCleanupTimeout = 10 * time.Minute
const dtPrefix = "/dagsync/dtsync/pub"

ctx, cancel := context.WithTimeout(ctx, dtCleanupTimeout)
defer cancel()

count, err := deletePrefix(ctx, ds, dtPrefix)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
log.Info("Not enough time to finish data-transfer state cleanup")
err = ds.Sync(context.Background(), datastore.NewKey(dtPrefix))
if err != nil {
log.Error("failed to sync datastore: %s", err)
}
return
}
log.Error("failed to remove old data-transfer fsm records: %s", err)
return
}
log.Infow("Removed old temporary data-transfer fsm records", "count", count)
}

func deletePrefix(ctx context.Context, ds datastore.Batching, prefix string) (int, error) {
q := query.Query{
KeysOnly: true,
Prefix: prefix,
}
results, err := ds.Query(ctx, q)
if err != nil {
return 0, fmt.Errorf("cannot query datastore: %w", err)
}
defer results.Close()

batch, err := ds.Batch(ctx)
if err != nil {
return 0, fmt.Errorf("cannot create datastore batch: %w", err)
}

var keyCount, writeCount int
for result := range results.Next() {
if ctx.Err() != nil {
return 0, ctx.Err()
}
if writeCount >= updateBatchSize {
writeCount = 0
if err = batch.Commit(ctx); err != nil {
return 0, fmt.Errorf("cannot commit datastore: %w", err)
}
log.Infow("Removed datastore records", "count", keyCount)
}
if result.Error != nil {
return 0, fmt.Errorf("cannot read query result from datastore: %w", result.Error)
}
ent := result.Entry
if len(ent.Key) == 0 {
log.Warnf("result entry has empty key")
continue
}

if err = batch.Delete(ctx, datastore.NewKey(ent.Key)); err != nil {
return 0, fmt.Errorf("cannot delete key from datastore: %w", err)
}
writeCount++
keyCount++
}

if err = batch.Commit(ctx); err != nil {
return 0, fmt.Errorf("cannot commit datastore: %w", err)
}
if err = ds.Sync(context.Background(), datastore.NewKey(q.Prefix)); err != nil {
return 0, err
}

return keyCount, nil
}
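This cleanup exists to reclaim the datastore space that the removed dtsync publisher used for its data-transfer FSM records. A hypothetical call site (the method, the field name e.ds, and the use of a goroutine are assumptions, not part of this diff) might look like:

// Sketch only: run the cleanup in the background when the engine starts so
// that leftover "/dagsync/dtsync/pub" records from earlier versions are
// removed without blocking startup.
func (e *Engine) startDTCleanup(ctx context.Context) {
	go cleanupDTTempData(ctx, e.ds)
}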