Skip to content

Commit

Permalink
Merge pull request #811 from ipfs/rpi-fixes
Browse files Browse the repository at this point in the history
Multiple fixes to CRDTs
  • Loading branch information
hsanjuan committed Jun 10, 2019
2 parents ad710d5 + 65ad06c commit f1707e4
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 104 deletions.
115 changes: 98 additions & 17 deletions cluster.go
Expand Up @@ -38,8 +38,9 @@ import (
var ReadyTimeout = 30 * time.Second

const (
pingMetricName = "ping"
bootstrapCount = 3
pingMetricName = "ping"
bootstrapCount = 3
reBootstrapInterval = 30 * time.Second
)

// Cluster is the main IPFS cluster component. It provides
Expand Down Expand Up @@ -150,9 +151,18 @@ func NewCluster(
// a non permanent TTL.
c.peerManager.ImportPeersFromPeerstore(false, peerstore.AddressTTL)
// Attempt to connect to some peers (up to bootstrapCount)
actualCount := c.peerManager.Bootstrap(bootstrapCount)
// We cannot warn about this as this is normal if going to Join() later
logger.Debugf("bootstrap count %d", actualCount)
connectedPeers := c.peerManager.Bootstrap(bootstrapCount)
// We cannot warn when count is low as this as this is normal if going
// to Join() later.
logger.Debugf("bootstrap count %d", len(connectedPeers))
// Log a ping metric for every connected peer. This will make them
// visible as peers without having to wait for them to send one.
for _, p := range connectedPeers {
if err := c.logPingMetric(ctx, p); err != nil {
logger.Warning(err)
}
}

// Bootstrap the DHT now that we possibly have some connections
c.dht.Bootstrap(c.ctx)

Expand All @@ -164,7 +174,12 @@ func NewCluster(
return nil, err
}
c.setupRPCClients()

// Note: It is very important to first call Add() once in a non-racy
// place
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.ready(ReadyTimeout)
c.run()
}()
Expand Down Expand Up @@ -295,6 +310,27 @@ func (c *Cluster) sendPingMetric(ctx context.Context) (*api.Metric, error) {
return metric, c.monitor.PublishMetric(ctx, metric)
}

// logPingMetric logs a ping metric as if it had been sent from PID. It is
// used to make peers appear available as soon as we connect to them (without
// having to wait for them to broadcast a metric).
//
// We avoid specifically sending a metric to a peer when we "connect" to it
// because: a) this requires an extra. OPEN RPC endpoint (LogMetric) that can
// be called by everyone b) We have no way of verifying that the peer ID in a
// metric pushed is actually the issuer of the metric (something the regular
// "pubsub" way of pushing metrics allows (by verifying the signature on the
// message). Thus, this reduces chances of abuse until we have something
// better.
func (c *Cluster) logPingMetric(ctx context.Context, pid peer.ID) error {
m := &api.Metric{
Name: pingMetricName,
Peer: pid,
Valid: true,
}
m.SetTTL(c.config.MonitorPingInterval * 2)
return c.monitor.LogMetric(ctx, m)
}

func (c *Cluster) pushPingMetrics(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "cluster/pushPingMetrics")
defer span.End()
Expand Down Expand Up @@ -370,6 +406,7 @@ func (c *Cluster) shouldPeerRepinCid(failed peer.ID, pin *api.Pin) bool {
// detects that we have been removed from the peerset, it shuts down this peer.
func (c *Cluster) watchPeers() {
ticker := time.NewTicker(c.config.PeerWatchInterval)
defer ticker.Stop()

for {
select {
Expand Down Expand Up @@ -402,6 +439,26 @@ func (c *Cluster) watchPeers() {
}
}

// reBootstrap reguarly attempts to bootstrap (re-connect to peers from the
// peerstore). This should ensure that we auto-recover from situations in
// which the network was completely gone and we lost all peers.
func (c *Cluster) reBootstrap() {
ticker := time.NewTicker(reBootstrapInterval)
defer ticker.Stop()

for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
connected := c.peerManager.Bootstrap(bootstrapCount)
for _, p := range connected {
logger.Infof("reconnected to %s", p)
}
}
}
}

// find all Cids pinned to a given peer and triggers re-pins on them.
func (c *Cluster) repinFromPeer(ctx context.Context, p peer.ID) {
ctx, span := trace.StartSpan(ctx, "cluster/repinFromPeer")
Expand Down Expand Up @@ -434,11 +491,41 @@ func (c *Cluster) repinFromPeer(ctx context.Context, p peer.ID) {

// run launches some go-routines which live throughout the cluster's life
func (c *Cluster) run() {
go c.syncWatcher()
go c.pushPingMetrics(c.ctx)
go c.pushInformerMetrics(c.ctx)
go c.watchPeers()
go c.alertsHandler()
c.wg.Add(1)
go func() {
defer c.wg.Done()
c.syncWatcher()
}()

c.wg.Add(1)
go func() {
defer c.wg.Done()
c.pushPingMetrics(c.ctx)
}()

c.wg.Add(1)
go func() {
defer c.wg.Done()
c.pushInformerMetrics(c.ctx)
}()

c.wg.Add(1)
go func() {
defer c.wg.Done()
c.watchPeers()
}()

c.wg.Add(1)
go func() {
defer c.wg.Done()
c.alertsHandler()
}()

c.wg.Add(1)
go func() {
defer c.wg.Done()
c.reBootstrap()
}()
}

func (c *Cluster) ready(timeout time.Duration) {
Expand Down Expand Up @@ -773,13 +860,7 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
// contacting. This will signal a CRDT component that
// we know that peer since we have metrics for it without
// having to wait for the next metric round.
m := &api.Metric{
Name: pingMetricName,
Peer: pid,
Valid: true,
}
m.SetTTL(c.config.MonitorPingInterval * 2)
if err := c.monitor.LogMetric(ctx, m); err != nil {
if err := c.logPingMetric(ctx, pid); err != nil {
logger.Warning(err)
}

Expand Down
48 changes: 24 additions & 24 deletions consensus/crdt/consensus.go
Expand Up @@ -63,6 +63,7 @@ type Consensus struct {

state state.State
crdt *crdt.Datastore
ipfs *ipfslite.Peer

dht *dht.IpfsDHT
pubsub *pubsub.PubSub
Expand Down Expand Up @@ -92,6 +93,25 @@ func New(

ctx, cancel := context.WithCancel(context.Background())

var blocksDatastore ds.Batching
ns := ds.NewKey(cfg.DatastoreNamespace)
blocksDatastore = namespace.Wrap(store, ns.ChildString(blocksNs))

ipfs, err := ipfslite.New(
ctx,
blocksDatastore,
host,
dht,
&ipfslite.Config{
Offline: false,
},
)
if err != nil {
logger.Errorf("error creating ipfs-lite: %s", err)
cancel()
return nil, err
}

css := &Consensus{
ctx: ctx,
cancel: cancel,
Expand All @@ -100,7 +120,8 @@ func New(
peerManager: pstoremgr.New(ctx, host, ""),
dht: dht,
store: store,
namespace: ds.NewKey(cfg.DatastoreNamespace),
ipfs: ipfs,
namespace: ns,
pubsub: pubsub,
rpcReady: make(chan struct{}, 1),
readyCh: make(chan struct{}, 1),
Expand Down Expand Up @@ -147,25 +168,6 @@ func (css *Consensus) setup() {
logger.Errorf("error registering topic validator: %s", err)
}

var blocksDatastore ds.Batching
blocksDatastore = namespace.Wrap(css.store, css.namespace.ChildString(blocksNs))

ipfs, err := ipfslite.New(
css.ctx,
blocksDatastore,
css.host,
css.dht,
&ipfslite.Config{
Offline: false,
},
)
if err != nil {
logger.Errorf("error creating ipfs-lite: %s", err)
return
}

dagSyncer := newLiteDAGSyncer(css.ctx, ipfs)

broadcaster, err := crdt.NewPubSubBroadcaster(
css.ctx,
css.pubsub,
Expand Down Expand Up @@ -233,7 +235,7 @@ func (css *Consensus) setup() {
crdt, err := crdt.New(
css.store,
css.namespace,
dagSyncer,
css.ipfs,
broadcaster,
opts,
)
Expand Down Expand Up @@ -491,12 +493,10 @@ func OfflineState(cfg *Config, store ds.Datastore) (state.BatchingState, error)
return nil, err
}

dags := newLiteDAGSyncer(context.Background(), ipfs)

crdt, err := crdt.New(
batching,
ds.NewKey(cfg.DatastoreNamespace),
dags,
ipfs,
nil,
opts,
)
Expand Down
57 changes: 0 additions & 57 deletions consensus/crdt/dagsyncer.go

This file was deleted.

2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -27,7 +27,6 @@ require (
github.com/ipfs/go-ds-crdt v0.0.14
github.com/ipfs/go-fs-lock v0.0.1
github.com/ipfs/go-ipfs-api v0.0.1
github.com/ipfs/go-ipfs-blockstore v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.1
github.com/ipfs/go-ipfs-ds-help v0.0.1
github.com/ipfs/go-ipfs-files v0.0.3
Expand All @@ -50,6 +49,7 @@ require (
github.com/libp2p/go-libp2p-host v0.0.3
github.com/libp2p/go-libp2p-interface-pnet v0.0.1
github.com/libp2p/go-libp2p-kad-dht v0.0.14
github.com/libp2p/go-libp2p-net v0.0.2
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.1.0
github.com/libp2p/go-libp2p-pnet v0.0.1
Expand Down
1 change: 1 addition & 0 deletions logging.go
Expand Up @@ -39,6 +39,7 @@ var LoggingFacilities = map[string]string{
"localdags": "INFO",
"adder": "INFO",
"optracker": "INFO",
"pstoremgr": "INFO",
}

// LoggingFacilitiesExtra provides logging identifiers
Expand Down

0 comments on commit f1707e4

Please sign in to comment.