diff --git a/cmd/provider/daemon.go b/cmd/provider/daemon.go index 2bbf3776..e9007035 100644 --- a/cmd/provider/daemon.go +++ b/cmd/provider/daemon.go @@ -150,7 +150,6 @@ func daemonCommand(cctx *cli.Context) error { // Starting provider core eng, err := engine.New( engine.WithDatastore(ds), - engine.WithDataTransfer(dt), engine.WithDirectAnnounce(cfg.DirectAnnounce.URLs...), engine.WithHost(h), engine.WithEntriesCacheCapacity(cfg.Ingest.LinkCacheSize), diff --git a/cmd/provider/doc.go b/cmd/provider/doc.go index df5caa20..7d1869e7 100644 --- a/cmd/provider/doc.go +++ b/cmd/provider/doc.go @@ -11,25 +11,22 @@ Usage: USAGE: provider [global options] command [command options] [arguments...] - VERSION: - v0.0.0+unknown - COMMANDS: - daemon Starts a reference provider - find Query an indexer for indexed content - index Push a single content index into an indexer - init Initialize reference provider config file and identity - connect Connects to an indexer through its multiaddr - import, i Imports sources of multihashes to the index provider. - register Register provider information with an indexer that trusts the provider - remove, rm Removes previously advertised multihashes by the provider. - verify-ingest, vi Verifies ingestion of multihashes to an indexer node from a Lotus miner, CAR file or a CARv2 Index - list Lists advertisements - help, h Shows a list of commands or help for one command + announce Publish an announcement message for the latest advertisement + announce-http Publish an announcement message for the latest advertisement to a specific indexer via http + connect Connects to an indexer through its multiaddr + daemon Starts a reference provider + import, i Imports sources of multihashes to the index provider. + index Push a single content index into an indexer + init Initialize reference provider config file and identity + list, ls List local paths to data + remove, rm Removes previously advertised multihashes by the provider. + mirror Mirrors the advertisement chain from an existing index provider. + help, h Shows a list of commands or help for one command GLOBAL OPTIONS: - --help, -h show help (default: false) - --version, -v print the version (default: false) + --help, -h show help + --version, -v print the version To run a provider daemon it must first be initialized. To initialize the provider, execute: diff --git a/cmd/provider/init.go b/cmd/provider/init.go index 3b97225a..d7052b21 100644 --- a/cmd/provider/init.go +++ b/cmd/provider/init.go @@ -20,7 +20,7 @@ var InitCmd = &cli.Command{ var initFlags = []cli.Flag{ &cli.StringFlag{ Name: "pubkind", - Usage: "Set publisher king in config. Must be one of 'http', 'libp2p', 'libp2phttp', 'dtsync'", + Usage: "Set publisher kind in config. Must be one of 'http', 'libp2p', 'libp2phttp'", Value: "libp2p", }, } @@ -57,7 +57,7 @@ func initCommand(cctx *cli.Context) error { switch pubkind { case "": pubkind = config.Libp2pPublisherKind - case config.Libp2pPublisherKind, config.HttpPublisherKind, config.Libp2pHttpPublisherKind, config.DTSyncPublisherKind: + case config.Libp2pPublisherKind, config.HttpPublisherKind, config.Libp2pHttpPublisherKind: default: return fmt.Errorf("unknown publisher kind: %s", pubkind) } diff --git a/cmd/provider/internal/config/ingest.go b/cmd/provider/internal/config/ingest.go index b88d022c..46c6899e 100644 --- a/cmd/provider/internal/config/ingest.go +++ b/cmd/provider/internal/config/ingest.go @@ -11,7 +11,6 @@ const ( type PublisherKind string const ( - DTSyncPublisherKind PublisherKind = "dtsync" HttpPublisherKind PublisherKind = "http" Libp2pPublisherKind PublisherKind = "libp2p" Libp2pHttpPublisherKind PublisherKind = "libp2phttp" @@ -37,9 +36,11 @@ type Ingest struct { HttpPublisher HttpPublisher // PublisherKind specifies which dagsync.Publisher implementation to use. - // When set to "http", the publisher serves plain HTTP and libp2phttp. - // Libp2phttp is disabled by setting HttpPublisher.NoLibp2p to true, and - // plain HTTP is disabled by setting HttpPublisher.ListenMultiaddr to "". + // When set to "http", the publisher serves plain HTTP. When set to + // "libp2p" the publisher serves HTTP over libp2p. When set to + // "libp2phttp", the publisher serves both plain HTTP and HTTP over libp2p. + // + // Plain HTTP is disabled if HttpPublisher.ListenMultiaddr is set to "". PublisherKind PublisherKind // SyncPolicy configures which indexers are allowed to sync advertisements diff --git a/delegatedrouting/cid_queue.go b/delegatedrouting/cid_queue.go index 4a503646..b5b955ce 100644 --- a/delegatedrouting/cid_queue.go +++ b/delegatedrouting/cid_queue.go @@ -31,11 +31,10 @@ func (cq *cidQueue) recordCidNode(node *cidNode) *list.Element { listElem.Value.(*cidNode).Timestamp = node.Timestamp cq.nodesLl.MoveToFront(listElem) return listElem - } else { - listElem := cq.nodesLl.PushFront(node) - cq.listNodeByCid[node.C] = listElem - return listElem } + listElem := cq.nodesLl.PushFront(node) + cq.listNodeByCid[node.C] = listElem + return listElem } func (cq *cidQueue) removeCidNode(c cid.Cid) { diff --git a/delegatedrouting/listener.go b/delegatedrouting/listener.go index 21355ccd..31376ae1 100644 --- a/delegatedrouting/listener.go +++ b/delegatedrouting/listener.go @@ -294,11 +294,11 @@ func (listener *Listener) FindProviders(ctx context.Context, key cid.Cid, limit } func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { + const printFrequency = 10_000 cids := req.Keys pid := req.ID paddrs := req.Addrs startTime := time.Now() - printFrequency := 10_000 listener.lock.Lock() defer func() { listener.stats.incDelegatedRoutingCallsProcessed() @@ -326,12 +326,10 @@ func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.Bitswa listener.lastSeenProviderInfo.ID = pid listener.lastSeenProviderInfo.Addrs = paddrs - timestamp := time.Now() for i, c := range cids { - // persisting timestamp only if this is not a snapshot if len(cids) < listener.snapshotSize { - err := listener.dsWrapper.recordCidTimestamp(ctx, c, timestamp) + err := listener.dsWrapper.recordCidTimestamp(ctx, c, startTime) if err != nil { log.Errorw("Error persisting timestamp. Continuing.", "cid", c, "err", err) continue @@ -342,7 +340,7 @@ func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.Bitswa if listElem == nil { listener.cidQueue.recordCidNode(&cidNode{ C: c, - Timestamp: timestamp, + Timestamp: startTime, }) err := listener.chunker.addCidToCurrentChunk(ctx, c, func(cc *cidsChunk) error { return listener.notifyPutAndPersist(ctx, cc) @@ -354,7 +352,7 @@ func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.Bitswa } } else { node := listElem.Value.(*cidNode) - node.Timestamp = timestamp + node.Timestamp = startTime listener.cidQueue.recordCidNode(node) // if no existing chunk has been found for the cid - adding it to the current one // This can happen in the following cases: @@ -393,12 +391,12 @@ func (listener *Listener) ProvideBitswap(ctx context.Context, req *server.Bitswa // Revise logic here func (listener *Listener) removeExpiredCids(ctx context.Context) (bool, error) { + const printFrequency = 100 lastElem := listener.cidQueue.nodesLl.Back() currentTime := time.Now() chunksToRemove := make(map[string]*cidsChunk) cidsToRemove := make(map[cid.Cid]struct{}) removedSomeCids := false - printFrequency := 100 var cidsRemoved, chunksRemoved, chunksReplaced int // find expired cids and their respective chunks for { diff --git a/delegatedrouting/listener_concurrency_test.go b/delegatedrouting/listener_concurrency_test.go index 0c17d281..16d0658a 100644 --- a/delegatedrouting/listener_concurrency_test.go +++ b/delegatedrouting/listener_concurrency_test.go @@ -82,7 +82,7 @@ func TestShouldProcessMillionCIDsInThirtySeconds(t *testing.T) { pID, priv, _ := test.RandomIdentity() ctx := context.Background() - engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.DataTransferPublisher)) + engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.Libp2pPublisher)) require.NoError(t, err) err = engine.Start(ctx) defer engine.Shutdown() diff --git a/delegatedrouting/listener_test.go b/delegatedrouting/listener_test.go index 73bd1e5f..84f55766 100644 --- a/delegatedrouting/listener_test.go +++ b/delegatedrouting/listener_test.go @@ -122,7 +122,7 @@ func TestProvideRoundtrip(t *testing.T) { ctx := context.Background() - engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.DataTransferPublisher)) + engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.Libp2pPublisher)) require.NoError(t, err) err = engine.Start(ctx) defer engine.Shutdown() @@ -181,7 +181,7 @@ func TestProvideRoundtripWithRemove(t *testing.T) { pID, priv, _ := test.RandomIdentity() ctx := context.Background() - engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.DataTransferPublisher)) + engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.Libp2pPublisher)) require.NoError(t, err) err = engine.Start(ctx) defer engine.Shutdown() @@ -1178,7 +1178,7 @@ func TestShouldSplitSnapshotIntoMultipleChunksAndReadThemBack(t *testing.T) { pID, priv, _ := test.RandomIdentity() ctx := context.Background() - engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.DataTransferPublisher)) + engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.Libp2pPublisher)) require.NoError(t, err) err = engine.Start(ctx) defer engine.Shutdown() @@ -1243,7 +1243,7 @@ func TestShouldCleanUpOldSnapshotChunksAfterStoringNewOnes(t *testing.T) { pID, priv, _ := test.RandomIdentity() ctx := context.Background() - engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.DataTransferPublisher)) + engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.Libp2pPublisher)) require.NoError(t, err) err = engine.Start(ctx) defer engine.Shutdown() @@ -1290,7 +1290,7 @@ func TestShouldRecogniseLegacySnapshot(t *testing.T) { pID, priv, _ := test.RandomIdentity() ctx := context.Background() - engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.DataTransferPublisher)) + engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.Libp2pPublisher)) require.NoError(t, err) err = engine.Start(ctx) defer engine.Shutdown() diff --git a/doc/publisher-config.md b/doc/publisher-config.md index 22a2160c..93c5c68a 100644 --- a/doc/publisher-config.md +++ b/doc/publisher-config.md @@ -21,7 +21,7 @@ The index-provider engine must be configured to use a libp2p publisher, HTTP pub - `Libp2pHttpPublisher` serves advertisements using both HTTP and libp2p servers. - `DataTransferPublisher` exposes a data-transfer/graphsync server that allows peers in the network to sync advertisements. This option is being discontinued. Only provided as a fallback in case HttpPublisher and Libp2pHttpPublisher are not working. -If `WithPublisherKind` is not provided a value, it defaults to `NoPublisher` and advertisements are only stored locally and no announcements are made. If configuring the command-line application, `WithPublisherKind` is configured by setting the `Ingest.PublisherKind` item in the configuration file to a value of "http", "libp2p", "libp2phttp, "dtsync", or "". +If `WithPublisherKind` is not provided a value, it defaults to `NoPublisher` and advertisements are only stored locally and no announcements are made. If configuring the command-line application, `WithPublisherKind` is configured by setting the `Ingest.PublisherKind` item in the configuration file to a value of "http", "libp2p", "libp2phttp, or "". For all publisher kinds, except the `DataTransfer` publisher, the `WithHttpPublisherAnnounceAddr` option sets the addresses that are announced to indexers, telling the indexers where to fetch advertisements from. If configuring the command-line application, `WithHttpPublisherAnnounceAddr` is configured by specifying multiaddr strings in `Ingest.HttpPublisher.AnnounceMultiaddr`. diff --git a/e2e_retrieve_test.go b/e2e_retrieve_test.go index 5508b577..a8d70235 100644 --- a/e2e_retrieve_test.go +++ b/e2e_retrieve_test.go @@ -43,13 +43,13 @@ type testCase struct { var testCases = []testCase{ { - name: "DT Publisher", + name: "Libp2p Publisher", serverConfigOpts: func(t *testing.T) []engine.Option { // Use env var to signal what publisher kind is being used. - setPubKindEnvVarKey(t, engine.DataTransferPublisher) + setPubKindEnvVarKey(t, engine.Libp2pPublisher) return []engine.Option{ engine.WithTopicName(testTopic), - engine.WithPublisherKind(engine.DataTransferPublisher), + engine.WithPublisherKind(engine.Libp2pPublisher), } }, }, @@ -109,10 +109,8 @@ func testRetrievalRoundTripWithTestCase(t *testing.T, tc testCase) { advCid, err := server.cs.Put(ctx, contextID, filepath.Join(testutil.ThisDir(t), "./testdata/sample-v1-2.car"), md) require.NoError(t, err) - // TODO: Review after resolution of https://github.com/filecoin-project/go-legs/issues/95 - // For now instantiate a new host for subscriber so that dt constructed by test client works. subHost := newHost(t) - sub, err := dagsync.NewSubscriber(subHost, client.store, client.lsys, testTopic) + sub, err := dagsync.NewSubscriber(subHost, client.lsys) require.NoError(t, err) serverInfo := peer.AddrInfo{ @@ -189,10 +187,8 @@ func testReimportCarWtihTestCase(t *testing.T, tc testCase) { advCid, err := server.cs.Put(ctx, contextID, filepath.Join(testutil.ThisDir(t), "./testdata/sample-v1-2.car"), md) require.NoError(t, err) - // TODO: Review after resolution of https://github.com/filecoin-project/go-legs/issues/95 - // For now instantiate a new host for subscriber so that dt constructed by test client works. subHost := newHost(t) - sub, err := dagsync.NewSubscriber(subHost, client.store, client.lsys, testTopic) + sub, err := dagsync.NewSubscriber(subHost, client.lsys) require.NoError(t, err) serverInfo := peer.AddrInfo{ @@ -274,8 +270,7 @@ func newTestServer(t *testing.T, ctx context.Context, o ...engine.Option) *testS // Explicitly override host so that the host is known for testing purposes. h := newHost(t) store := dssync.MutexWrap(datastore.NewMapDatastore()) - dt := testutil.SetupDataTransferOnHost(t, h, store, cidlink.DefaultLinkSystem()) - o = append(o, engine.WithHost(h), engine.WithDatastore(store), engine.WithDataTransfer(dt)) + o = append(o, engine.WithHost(h), engine.WithDatastore(store)) var publisherAddr multiaddr.Multiaddr pubKind := engine.PublisherKind(os.Getenv(pubKindEnvVarKey(t))) @@ -293,6 +288,7 @@ func newTestServer(t *testing.T, ctx context.Context, o ...engine.Option) *testS require.NoError(t, err) require.NoError(t, e.Start(ctx)) + dt := testutil.SetupDataTransferOnHost(t, h, store, cidlink.DefaultLinkSystem()) cs := supplier.NewCarSupplier(e, store, car.ZeroLengthSectionAsEOF(false)) require.NoError(t, cardatatransfer.StartCarDataTransfer(dt, cs)) diff --git a/engine/datastore.go b/engine/datastore.go new file mode 100644 index 00000000..d278c214 --- /dev/null +++ b/engine/datastore.go @@ -0,0 +1,93 @@ +package engine + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" +) + +const ( + // updateBatchSize is the number of records to update at a time. + updateBatchSize = 500000 +) + +func cleanupDTTempData(ctx context.Context, ds datastore.Batching) { + const dtCleanupTimeout = 10 * time.Minute + const dtPrefix = "/dagsync/dtsync/pub" + + ctx, cancel := context.WithTimeout(ctx, dtCleanupTimeout) + defer cancel() + + count, err := deletePrefix(ctx, ds, dtPrefix) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + log.Info("Not enough time to finish data-transfer state cleanup") + err = ds.Sync(context.Background(), datastore.NewKey(dtPrefix)) + if err != nil { + log.Error("failed to sync datastore: %s", err) + } + return + } + log.Error("failed to remove old data-transfer fsm records: %s", err) + return + } + log.Infow("Removed old temporary data-transfer fsm records", "count", count) +} + +func deletePrefix(ctx context.Context, ds datastore.Batching, prefix string) (int, error) { + q := query.Query{ + KeysOnly: true, + Prefix: prefix, + } + results, err := ds.Query(ctx, q) + if err != nil { + return 0, fmt.Errorf("cannot query datastore: %w", err) + } + defer results.Close() + + batch, err := ds.Batch(ctx) + if err != nil { + return 0, fmt.Errorf("cannot create datastore batch: %w", err) + } + + var keyCount, writeCount int + for result := range results.Next() { + if ctx.Err() != nil { + return 0, ctx.Err() + } + if writeCount >= updateBatchSize { + writeCount = 0 + if err = batch.Commit(ctx); err != nil { + return 0, fmt.Errorf("cannot commit datastore: %w", err) + } + log.Infow("Removed datastore records", "count", keyCount) + } + if result.Error != nil { + return 0, fmt.Errorf("cannot read query result from datastore: %w", result.Error) + } + ent := result.Entry + if len(ent.Key) == 0 { + log.Warnf("result entry has empty key") + continue + } + + if err = batch.Delete(ctx, datastore.NewKey(ent.Key)); err != nil { + return 0, fmt.Errorf("cannot delete key from datastore: %w", err) + } + writeCount++ + keyCount++ + } + + if err = batch.Commit(ctx); err != nil { + return 0, fmt.Errorf("cannot commit datastore: %w", err) + } + if err = ds.Sync(context.Background(), datastore.NewKey(q.Prefix)); err != nil { + return 0, err + } + + return keyCount, nil +} diff --git a/engine/engine.go b/engine/engine.go index 731d36f5..6d269389 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -22,7 +22,6 @@ import ( "github.com/ipni/go-libipni/announce/message" "github.com/ipni/go-libipni/announce/p2psender" "github.com/ipni/go-libipni/dagsync" - "github.com/ipni/go-libipni/dagsync/dtsync" "github.com/ipni/go-libipni/dagsync/ipnisync" "github.com/ipni/go-libipni/ingest/schema" "github.com/ipni/go-libipni/metadata" @@ -107,9 +106,10 @@ func New(o ...Option) (*Engine, error) { // the configured gossipsub topic used for publishing advertisements. // // The context is used to instantiate the internal LRU cache storage. See: -// Engine.Shutdown, chunker.NewCachedEntriesChunker, -// dtsync.NewPublisherFromExisting +// Engine.Shutdown, chunker.NewCachedEntriesChunker. func (e *Engine) Start(ctx context.Context) error { + go cleanupDTTempData(ctx, e.ds) + var err error // Create datastore entriesChunker. entriesCacheDs := dsn.Wrap(e.ds, datastore.NewKey(linksCachePath)) @@ -192,23 +192,8 @@ func (e *Engine) newPublisher(httpListenAddr, httpPath string) (dagsync.Publishe log.Warn("Libp2p + HTTP publisher in use without address for announcements. Using HTTP listen and libp2p host addresses, but external addresses may be needed.", "addrs", libp2phttpPub.Addrs()) } return libp2phttpPub, nil - case DataTransferPublisher: - log.Warn("Support ending for publishing IPNI data over data-transfer/graphsync, Disable this feature in configuration and test that indexing is working over libp2p.") - if e.pubDT != nil { - dtPub, err := dtsync.NewPublisherFromExisting(e.pubDT, e.h, e.pubTopicName, e.lsys, dtsync.WithAllowPeer(e.syncPolicy.Allowed)) - if err != nil { - return nil, fmt.Errorf("cannot create data-transfer publisher with existing dt manager: %w", err) - } - return dtPub, nil - } - ds := dsn.Wrap(e.ds, datastore.NewKey("/dagsync/dtsync/pub")) - dtPub, err := dtsync.NewPublisher(e.h, ds, e.lsys, e.pubTopicName, dtsync.WithAllowPeer(e.syncPolicy.Allowed)) - if err != nil { - return nil, fmt.Errorf("cannot create data-transfer publisher: %w", err) - } - return dtPub, nil } - return nil, fmt.Errorf("unknown publisher kind %s, expecting one of %v", e.pubKind, []PublisherKind{HttpPublisher, Libp2pPublisher, Libp2pHttpPublisher, DataTransferPublisher}) + panic("bad publisher kind") } func (e *Engine) createSenders(announceURLs []*url.URL, pubsubOK bool, extraGossipData []byte) ([]announce.Sender, error) { @@ -261,19 +246,12 @@ func (e *Engine) createSenders(announceURLs []*url.URL, pubsubOK bool, extraGoss // announce uses the engines senders to send advertisement announcement messages. func (e *Engine) announce(ctx context.Context, c cid.Cid) { - var err error - switch e.pubKind { - case HttpPublisher, Libp2pPublisher, Libp2pHttpPublisher: - // e.pubHttpAnnounceAddrs is always set in newPublisher. - err = announce.Send(ctx, c, e.pubHttpAnnounceAddrs, e.senders...) - case DataTransferPublisher: - // TODO: It may be necessary to specify a set of external addresses to - // put into the announce message, instead of using the libp2p host's - // addresses. - err = announce.Send(ctx, c, e.h.Addrs(), e.senders...) - case NoPublisher: - // Announcements disabled. + // If announcements disabled. + if e.pubKind == NoPublisher { + return } + + err := announce.Send(ctx, c, e.pubHttpAnnounceAddrs, e.senders...) if err != nil { log.Errorw("Failed to announce advertisement", "err", err) } @@ -408,18 +386,7 @@ func (e *Engine) httpAnnounce(ctx context.Context, adCid cid.Cid, announceURLs [ Cid: adCid, } - // The publisher kind determines what addresses to put into the announce - // message. - switch e.pubKind { - case HttpPublisher, Libp2pPublisher, Libp2pHttpPublisher: - // e.pubHttpAnnounceAddrs is always set in newPublisher. - msg.SetAddrs(e.pubHttpAnnounceAddrs) - case DataTransferPublisher: - // TODO: It may be necessary to specify a set of external addresses to - // put into the announce message, instead of using the libp2p host's - // addresses. - msg.SetAddrs(e.h.Addrs()) - } + msg.SetAddrs(e.pubHttpAnnounceAddrs) // Create the http announce sender. httpSender, err := httpsender.New(announceURLs, e.h.ID()) @@ -502,9 +469,13 @@ func (e *Engine) Shutdown() error { if err = e.publisher.Close(); err != nil { errs = multierror.Append(errs, fmt.Errorf("error closing leg publisher: %s", err)) } + e.publisher = nil } - if err = e.entriesChunker.Close(); err != nil { - errs = multierror.Append(errs, fmt.Errorf("error closing link entriesChunker: %s", err)) + if e.entriesChunker != nil { + if err = e.entriesChunker.Close(); err != nil { + errs = multierror.Append(errs, fmt.Errorf("error closing link entriesChunker: %s", err)) + } + e.entriesChunker = nil } return errs } @@ -597,7 +568,8 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, p peer.ID, addrs []mult lnk, err := e.entriesChunker.Chunk(ctx, mhIter) if err != nil { return cid.Undef, fmt.Errorf("could not generate entries list: %s", err) - } else if lnk == nil { + } + if lnk == nil { log.Warnw("chunking for context ID resulted in no link", "contextID", contextID) lnk = schema.NoEntries } @@ -705,12 +677,10 @@ func (e *Engine) publishAdvForIndex(ctx context.Context, p peer.ID, addrs []mult } func (e *Engine) keyToCidKey(provider peer.ID, contextID []byte) datastore.Key { - switch provider { - case e.provider.ID: + if provider == e.provider.ID { return datastore.NewKey(keyToCidMapPrefix + string(contextID)) - default: - return datastore.NewKey(keyToCidMapPrefix + provider.String() + "/" + string(contextID)) } + return datastore.NewKey(keyToCidMapPrefix + provider.String() + "/" + string(contextID)) } func (e *Engine) cidToKeyKey(c cid.Cid) datastore.Key { @@ -722,12 +692,10 @@ func (e *Engine) cidToProviderAndKeyKey(c cid.Cid) datastore.Key { } func (e *Engine) keyToMetadataKey(provider peer.ID, contextID []byte) datastore.Key { - switch provider { - case e.provider.ID: + if provider == e.provider.ID { return datastore.NewKey(keyToMetadataMapPrefix + string(contextID)) - default: - return datastore.NewKey(keyToMetadataMapPrefix + provider.String() + "/" + string(contextID)) } + return datastore.NewKey(keyToMetadataMapPrefix + provider.String() + "/" + string(contextID)) } func (e *Engine) putKeyCidMap(ctx context.Context, provider peer.ID, contextID []byte, c cid.Cid) error { diff --git a/engine/example_test.go b/engine/example_test.go index 0c278f3b..d5ea8cd8 100644 --- a/engine/example_test.go +++ b/engine/example_test.go @@ -42,8 +42,8 @@ func Example_advertiseHelloWorld() { fmt.Printf("✓ Instantiated new libp2p host with peer ID: %s...\n", h.ID().String()[:4]) // Construct a new provider engine with given libp2p host that announces advertisements over - // gossipsub and datatrasfer/graphsync. - engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.DataTransferPublisher)) + // gossipsub. + engine, err := engine.New(engine.WithHost(h), engine.WithPublisherKind(engine.Libp2pPublisher)) if err != nil { panic(err) } diff --git a/engine/options.go b/engine/options.go index 20d6b7f4..e7642d15 100644 --- a/engine/options.go +++ b/engine/options.go @@ -4,7 +4,6 @@ import ( "fmt" "net/url" - datatransfer "github.com/filecoin-project/go-data-transfer/v2" "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" _ "github.com/ipni/go-libipni/maurl" @@ -24,13 +23,6 @@ const ( // all advertisements are only stored locally. NoPublisher PublisherKind = "" - // DataTransferPublisher exposes a datatransfer/graphsync server that - // allows peers in the network to sync advertisements. - // - // This option is being discontinued. Only provided as a fallback in case - // HttpPublisher is not working. - DataTransferPublisher PublisherKind = "dtsync" - // HttpPublisher exposes an HTTP server that serves advertisements using an // HTTP server. HttpPublisher PublisherKind = "http" @@ -42,12 +34,15 @@ const ( // engine's libp2p host. This is just the combination of HttpPublisher and // Libp2pPublisher configurable as a single option. Libp2pHttpPublisher PublisherKind = "libp2phttp" + + // Deprecated. Use Libp2pPublisher. + DataTransferPublisher PublisherKind = "dtsync" ) type ( // PublisherKind represents the kind of publisher to use in order to announce a new // advertisement to the network. - // See: WithPublisherKind, NoPublisher, DataTransferPublisher, HttpPublisher. + // See: WithPublisherKind PublisherKind string // Option sets a configuration parameter for the provider engine. @@ -75,7 +70,6 @@ type ( // ---- publisher config ---- pubKind PublisherKind - pubDT datatransfer.Manager // pubHttpAnnounceAddrs are the addresses that are put into announce // messages to tell indexers the addresses to fetch advertisements // from. @@ -251,6 +245,13 @@ func WithEntriesCacheCapacity(s int) Option { // See: PublisherKind. func WithPublisherKind(k PublisherKind) Option { return func(o *options) error { + switch k { + case NoPublisher, HttpPublisher, Libp2pPublisher, Libp2pHttpPublisher: + case DataTransferPublisher: + return fmt.Errorf("publisher kind %q is no longer supported", DataTransferPublisher) + default: + return fmt.Errorf("unknown publisher kind %q, expecting one of %v", k, []PublisherKind{HttpPublisher, Libp2pPublisher, Libp2pHttpPublisher}) + } o.pubKind = k return nil } @@ -330,18 +331,6 @@ func WithTopic(t *pubsub.Topic) Option { } } -// WithDataTransfer sets the instance of datatransfer.Manager to use. -// If unspecified a new instance is created automatically. -// -// Note that this option only takes effect if the PublisherKind is set to DataTransferPublisher. -// See: WithPublisherKind. -func WithDataTransfer(dt datatransfer.Manager) Option { - return func(o *options) error { - o.pubDT = dt - return nil - } -} - // WithHost specifies the host to which the provider engine belongs. // If unspecified, a host is created automatically. // See: libp2p.New. diff --git a/go.mod b/go.mod index 9d0dc15b..67e37f0d 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/ipld/go-codec-dagpb v1.6.0 github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0 github.com/ipld/go-ipld-prime v0.21.0 - github.com/ipni/go-libipni v0.5.9 + github.com/ipni/go-libipni v0.6.0 github.com/libp2p/go-libp2p v0.32.2 github.com/libp2p/go-libp2p-pubsub v0.10.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index 59376287..149538d2 100644 --- a/go.sum +++ b/go.sum @@ -346,8 +346,8 @@ github.com/ipld/go-ipld-adl-hamt v0.0.0-20220616142416-9004dbd839e0/go.mod h1:od github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH9C2E= github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo= -github.com/ipni/go-libipni v0.5.9 h1:AlYlqZScX2jusGXXWkW5j6OMUtMKgQKNcl1Mi8g3glA= -github.com/ipni/go-libipni v0.5.9/go.mod h1:c8mHa6J9iFREpDB29GlPIsbvztRq6bnhg5zJKrnvdUg= +github.com/ipni/go-libipni v0.6.0 h1:pq5grzYb0VjmhYhGWTdb6m67kRIUidj70BbIHhpf4Z8= +github.com/ipni/go-libipni v0.6.0/go.mod h1:AUnSlZSwABAqianohrqVA8HxawHMNmnXMX+6G9UhFsI= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= diff --git a/mirror/mirror.go b/mirror/mirror.go index 05f9b7ea..5e733198 100644 --- a/mirror/mirror.go +++ b/mirror/mirror.go @@ -10,7 +10,6 @@ import ( "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/datamodel" @@ -96,8 +95,7 @@ func New(ctx context.Context, source peer.AddrInfo, o ...Option) (*Mirror, error } m.senders = append(m.senders, p2pSender) - dtds := namespace.Wrap(opts.ds, datastore.NewKey("datatransfer")) - m.sub, err = dagsync.NewSubscriber(m.h, dtds, m.ls, m.topic, dagsync.RecvAnnounce()) + m.sub, err = dagsync.NewSubscriber(m.h, m.ls, dagsync.RecvAnnounce(m.topic)) if err != nil { return nil, err } diff --git a/mirror/mirror_test.go b/mirror/mirror_test.go index d657d420..0216b173 100644 --- a/mirror/mirror_test.go +++ b/mirror/mirror_test.go @@ -41,7 +41,7 @@ func TestMirror_PutAdIsMirrored(t *testing.T) { te := &testEnv{} // Start original index provider - te.startSource(t, ctx, engine.WithPublisherKind(engine.DataTransferPublisher)) + te.startSource(t, ctx, engine.WithPublisherKind(engine.Libp2pPublisher)) // Publish an advertisement on original provider. originalAdCid := te.putAdOnSource(t, ctx, wantCtxID, wantMhs, wantMetadata) @@ -67,7 +67,7 @@ func TestMirror_IsAlsoCdnForOriginalAds(t *testing.T) { te := &testEnv{} // Start original index provider - te.startSource(t, ctx, engine.WithPublisherKind(engine.DataTransferPublisher)) + te.startSource(t, ctx, engine.WithPublisherKind(engine.Libp2pPublisher)) // Publish a bunch of ads on the original provider ad1 := te.putAdOnSource(t, ctx, []byte("ad1"), test.RandomMultihashes(3), md) @@ -103,7 +103,7 @@ func TestMirror_FormsExpectedAdChain(t *testing.T) { te := &testEnv{} // Start original index provider - te.startSource(t, ctx, engine.WithPublisherKind(engine.DataTransferPublisher)) + te.startSource(t, ctx, engine.WithPublisherKind(engine.Libp2pPublisher)) // Publish a bunch of ads on the original provider _ = te.putAdOnSource(t, ctx, []byte("ad1"), test.RandomMultihashes(3), md) @@ -187,7 +187,7 @@ func TestMirror_FormsExpectedAdChainRemap(t *testing.T) { te := &testEnv{} // Start original index provider - te.startSource(t, ctx, engine.WithPublisherKind(engine.DataTransferPublisher)) + te.startSource(t, ctx, engine.WithPublisherKind(engine.Libp2pPublisher)) // Publish a bunch of ads on the original provider _ = te.putAdOnSource(t, ctx, []byte("ad1"), test.RandomMultihashes(1), md) @@ -229,7 +229,7 @@ func TestMirror_PreviousIDIsPreservedOnStartFromPartialAdChain(t *testing.T) { te := &testEnv{} // Start source and publish 3 ads. - te.startSource(t, ctx, engine.WithPublisherKind(engine.DataTransferPublisher)) + te.startSource(t, ctx, engine.WithPublisherKind(engine.Libp2pPublisher)) originalACid := te.putAdOnSource(t, ctx, []byte("ad1"), test.RandomMultihashes(1), md) originalBCid := te.putAdOnSource(t, ctx, []byte("ad2"), test.RandomMultihashes(2), md) orignalHeadCid := te.putAdOnSource(t, ctx, []byte("ad3"), test.RandomMultihashes(3), md) @@ -284,7 +284,7 @@ func TestMirror_MirrorsAdsIdenticallyWhenConfiguredTo(t *testing.T) { te := &testEnv{} // Start source and publish 3 ads. - te.startSource(t, ctx, engine.WithPublisherKind(engine.DataTransferPublisher)) + te.startSource(t, ctx, engine.WithPublisherKind(engine.Libp2pPublisher)) _ = te.putAdOnSource(t, ctx, []byte("ad1"), test.RandomMultihashes(1), md) _ = te.putAdOnSource(t, ctx, []byte("ad2"), test.RandomMultihashes(2), md) _ = te.removeAdOnSource(t, ctx, []byte("ad1"))