Skip to content

Commit

Permalink
Fix incorrect log messages (#151)
Browse files Browse the repository at this point in the history
* Fix incorrect log messages
* Cache advertisement while processing entry chunks for advertisement to prevent reloading re-validating the same data.
* Sync datastore after saving ad mapping
* Log completion of entry block indexing at info level
* Index current chunk even if unable to set mapping for next chunk
* Update config defaults
* Populate defaults if config file missing values
* bump version
  • Loading branch information
gammazero committed Jan 13, 2022
1 parent f7059e5 commit 83d2304
Show file tree
Hide file tree
Showing 12 changed files with 254 additions and 110 deletions.
18 changes: 18 additions & 0 deletions config/addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,21 @@ func NewAddresses() Addresses {
P2PAddr: "/ip4/0.0.0.0/tcp/3003",
}
}

// populateUnset replaces zero-values in the config with default values.
func (c *Addresses) populateUnset() {
def := NewAddresses()

if c.Admin == "" {
c.Admin = def.Admin
}
if c.Finder == "" {
c.Finder = def.Finder
}
if c.Ingest == "" {
c.Ingest = def.Ingest
}
if c.P2PAddr == "" {
c.P2PAddr = def.P2PAddr
}
}
10 changes: 10 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ func Load(filePath string) (*Config, error) {
return nil, err
}

cfg.populateUnset()

return &cfg, nil
}

Expand Down Expand Up @@ -138,3 +140,11 @@ func (c *Config) String() string {
}
return string(b)
}

func (c *Config) populateUnset() {
c.Addresses.populateUnset()
c.Datastore.populateUnset()
c.Discovery.populateUnset()
c.Indexer.populateUnset()
c.Ingest.populateUnset()
}
12 changes: 12 additions & 0 deletions config/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,15 @@ func NewDatastore() Datastore {
Dir: "datastore",
}
}

// populateUnset replaces zero-values in the config with default values.
func (c *Datastore) populateUnset() {
def := NewDatastore()

if c.Type == "" {
c.Type = def.Type
}
if c.Dir == "" {
c.Dir = def.Dir
}
}
15 changes: 14 additions & 1 deletion config/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ type Discovery struct {
// Values are a number ending in "s", "m", "h" for seconds. minutes, hours.
PollInterval Duration
// RediscoverWait is the amount of time that must pass before a provider
// can be discovered following a previous discovery attempt
// can be discovered following a previous discovery attempt. A value of 0
// means there is no wait time.
RediscoverWait Duration
// Timeout is the maximum amount of time that the indexer will spend trying
// to discover and verify a new provider.
Expand All @@ -41,3 +42,15 @@ func NewDiscovery() Discovery {
Timeout: Duration(2 * time.Minute),
}
}

// populateUnset replaces zero-values in the config with default values.
func (c *Discovery) populateUnset() {
def := NewDiscovery()

if c.PollInterval == 0 {
c.PollInterval = def.PollInterval
}
if c.Timeout == 0 {
c.Timeout = def.Timeout
}
}
15 changes: 15 additions & 0 deletions config/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,18 @@ func NewIndexer() Indexer {
ValueStoreType: "sth",
}
}

// populateUnset replaces zero-values in the config with default values.
func (c *Indexer) populateUnset() {
def := NewIndexer()

if c.CacheSize == 0 {
c.CacheSize = def.CacheSize
}
if c.ValueStoreDir == "" {
c.ValueStoreDir = def.ValueStoreDir
}
if c.ValueStoreType == "" {
c.ValueStoreType = def.ValueStoreType
}
}
19 changes: 18 additions & 1 deletion config/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ type Ingest struct {
// PubSubTopic used to advertise ingestion announcements.
PubSubTopic string
// StoreBatchSize is the number of entries in each write to the value
// store. Specifying a value less than 2 disables batching.
// store. Specifying a value less than 2 disables batching. This should
// be smaller than the maximum number of multihashes in an entry block to
// write concurrently to the value store.
StoreBatchSize int
// SyncTimeout is the maximum amount of time allowed for a sync to complete
// before it is canceled. This can be a sync of a chain of advertisements
Expand All @@ -24,3 +26,18 @@ func NewIngest() Ingest {
SyncTimeout: Duration(2 * time.Hour),
}
}

// populateUnset replaces zero-values in the config with default values.
func (c *Ingest) populateUnset() {
def := NewIngest()

if c.PubSubTopic == "" {
c.PubSubTopic = def.PubSubTopic
}
if c.StoreBatchSize == 0 {
c.StoreBatchSize = def.StoreBatchSize
}
if c.SyncTimeout == 0 {
c.SyncTimeout = def.SyncTimeout
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/ipfs/go-datastore v0.5.1
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-ipfs v0.11.0
github.com/ipfs/go-log/v2 v2.4.0
github.com/ipfs/go-log/v2 v2.5.0
github.com/ipld/go-ipld-prime v0.14.3
github.com/libp2p/go-libp2p v0.17.0
github.com/libp2p/go-libp2p-core v0.13.0
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -565,8 +565,9 @@ github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscw
github.com/ipfs/go-log/v2 v2.1.1/go.mod h1:2v2nsGfZsvvAJz13SyFzf9ObaqwHiHxsPLEHntrv9KM=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/ipfs/go-log/v2 v2.3.0/go.mod h1:QqGoj30OTpnKaG/LKTGTxoP2mmQtjVMEnK72gynbe/g=
github.com/ipfs/go-log/v2 v2.4.0 h1:iR/2o9PGWanVJrBgIH5Ff8mPGOwpqLaPIAFqSnsdlzk=
github.com/ipfs/go-log/v2 v2.4.0/go.mod h1:nPZnh7Cj7lwS3LpRU5Mwr2ol1c2gXIEXuF6aywqrtmo=
github.com/ipfs/go-log/v2 v2.5.0 h1:+MhAooFd9XZNvR0i9FriKW6HB0ql7HNXUuflWtc0dd4=
github.com/ipfs/go-log/v2 v2.5.0/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI=
github.com/ipfs/go-merkledag v0.0.6/go.mod h1:QYPdnlvkOg7GnQRofu9XZimC5ZW5Wi3bKys/4GQQfto=
github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
github.com/ipfs/go-merkledag v0.2.4/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk=
Expand Down
4 changes: 4 additions & 0 deletions internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

indexer "github.com/filecoin-project/go-indexer-core/engine"
Expand Down Expand Up @@ -49,6 +50,9 @@ type Ingester struct {
syncTimeout time.Duration
adLocks *lockChain
watchDone chan struct{}

adCache map[cid.Cid]adCacheItem
adCacheMutex sync.Mutex
}

// NewIngester creates a new Ingester that uses a go-legs Subscriber to handle
Expand Down
2 changes: 1 addition & 1 deletion internal/ingest/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func publishRandomAdv(t *testing.T, i *Ingester, pubHost host.Host, pub legs.Pub
requireTrueEventually(t, func() bool {
has, err := i.ds.Has(context.Background(), datastore.NewKey(c.String()))
return err == nil && has
}, 2*time.Second, 15*time.Second, "expected advertisement with ID %s was not received", c)
}, 3*time.Second, 21*time.Second, "expected advertisement with ID %s was not received", c)
}

// Check if advertisement in datastore.
Expand Down
Loading

0 comments on commit 83d2304

Please sign in to comment.