
Merge pull request #900 from ipfs/feat/mdns-properly

mDNS: attach mDNS inside the Cluster. Allow interval configuration.
hsanjuan committed Aug 26, 2019
2 parents d71c1cb + 33f111c commit 5d888f739059c56f2dc67b9853050daefbe487a6
Showing with 61 additions and 52 deletions.
  1. +21 −0 cluster.go
  2. +13 −2 cluster_config.go
  3. +5 −42 clusterhost.go
  4. +2 −5 cmd/ipfs-cluster-service/daemon.go
  5. +2 −1 config_test.go
  6. +1 −2 peer_manager_test.go
  7. +17 −0 pstoremgr/pstoremgr.go
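
Before the per-file hunks, some orientation: the commit moves libp2p's mDNS discovery service out of NewClusterHost and into NewCluster itself, registering the peerstore manager as the notifee and making the broadcast interval configurable. A minimal standalone sketch of the pattern, assuming the pre-0.14 go-libp2p API used throughout this diff (the bare host and the printing notifee are illustrative stand-ins; the cluster registers pstoremgr.Manager instead):

    package main

    import (
        "context"
        "fmt"
        "time"

        libp2p "github.com/libp2p/go-libp2p"
        peer "github.com/libp2p/go-libp2p-core/peer"
        "github.com/libp2p/go-libp2p/p2p/discovery"
    )

    // printNotifee implements discovery.Notifee, standing in for pstoremgr.Manager.
    type printNotifee struct{}

    func (n *printNotifee) HandlePeerFound(p peer.AddrInfo) {
        fmt.Println("mDNS found peer:", p.ID)
    }

    func main() {
        ctx := context.Background()
        h, err := libp2p.New(ctx) // bare host; the cluster builds its own in NewClusterHost
        if err != nil {
            panic(err)
        }
        defer h.Close()

        // Same service tag and default interval as defined in this commit.
        mdns, err := discovery.NewMdnsService(ctx, h, 10*time.Second, "_ipfs-cluster-discovery._udp")
        if err != nil {
            panic(err)
        }
        defer mdns.Close()
        mdns.RegisterNotifee(&printNotifee{})

        select {} // keep announcing and handling discovered peers
    }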
@@ -25,6 +25,7 @@ import (
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
rpc "github.com/libp2p/go-libp2p-gorpc"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/p2p/discovery"
ma "github.com/multiformats/go-multiaddr"

ocgorpc "github.com/lanzafame/go-libp2p-ocgorpc"
@@ -41,6 +42,7 @@ const (
pingMetricName = "ping"
bootstrapCount = 3
reBootstrapInterval = 30 * time.Second
mdnsServiceTag = "_ipfs-cluster-discovery._udp"
)

var (
@@ -57,6 +59,7 @@ type Cluster struct {
config *Config
host host.Host
dht *dht.IpfsDHT
discovery discovery.Service
datastore ds.Datastore

rpcServer *rpc.Server
@@ -127,13 +130,25 @@ func NewCluster(

peerManager := pstoremgr.New(ctx, host, cfg.GetPeerstorePath())

var mdns discovery.Service
if cfg.MDNSInterval > 0 {
logger.Infof("mDNS announcements enabled; interval: %s", cfg.MDNSInterval)
// Assign to the mdns declared above; ":=" would shadow it and leave c.discovery nil.
mdns, err = discovery.NewMdnsService(ctx, host, cfg.MDNSInterval, mdnsServiceTag)
if err != nil {
cancel()
return nil, err
}
mdns.RegisterNotifee(peerManager)
}

c := &Cluster{
ctx: ctx,
cancel: cancel,
id: host.ID(),
config: cfg,
host: host,
dht: dht,
discovery: mdns,
datastore: datastore,
consensus: consensus,
apis: apis,
@@ -621,6 +636,12 @@ func (c *Cluster) Shutdown(ctx context.Context) error {

logger.Info("shutting down Cluster")

// Cancel the discovery service (this shuts down announcing). Handling
// of discovered entries is cancelled along with the context below.
if c.discovery != nil {
c.discovery.Close()
}

// Try to store peerset file for all known peers whatsoever
// if we got ready (otherwise, don't overwrite anything)
if c.readyB {
@@ -37,6 +37,7 @@ const (
DefaultConnMgrLowWater = 100
DefaultConnMgrGracePeriod = 2 * time.Minute
DefaultFollowerMode = false
DefaultMDNSInterval = 10 * time.Second
)

// ConnMgrConfig configures the libp2p host connection manager.
@@ -74,8 +75,9 @@ type Config struct {
// the RPC and Consensus components.
ListenAddr ma.Multiaddr

// ConnMgr holds configuration values for the connection manager
// for the libp2p host.
// ConnMgr holds configuration values for the connection manager for
// the libp2p host.
// FIXME: This only applies to ipfs-cluster-service.
ConnMgr ConnMgrConfig

// Time between syncs of the consensus state to the
@@ -125,6 +127,11 @@ type Config struct {
// been removed from a cluster.
PeerWatchInterval time.Duration

// MDNSInterval controls the time between mDNS broadcasts to the
// network announcing the peer addresses. Set to 0 to disable
// mDNS.
MDNSInterval time.Duration

// If true, DisableRepinning ensures that no repinning happens
// when a node goes down.
// This is useful when doing certain types of maintenance, or simply
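
For users, the new knob surfaces as a duration string in the cluster section of the service configuration. An illustrative fragment (surrounding fields trimmed; "10s" is the default added below, and "0s" disables mDNS as described above):

    {
      "cluster": {
        "monitor_ping_interval": "15s",
        "peer_watch_interval": "5s",
        "mdns_interval": "10s"
      }
    }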
@@ -162,6 +169,7 @@ type configJSON struct {
ReplicationFactorMax int `json:"replication_factor_max"`
MonitorPingInterval string `json:"monitor_ping_interval"`
PeerWatchInterval string `json:"peer_watch_interval"`
MDNSInterval string `json:"mdns_interval"`
DisableRepinning bool `json:"disable_repinning"`
FollowerMode bool `json:"follower_mode,omitempty"`
PeerstoreFile string `json:"peerstore_file,omitempty"`
@@ -341,6 +349,7 @@ func (cfg *Config) setDefaults() {
cfg.ReplicationFactorMax = DefaultReplicationFactor
cfg.MonitorPingInterval = DefaultMonitorPingInterval
cfg.PeerWatchInterval = DefaultPeerWatchInterval
cfg.MDNSInterval = DefaultMDNSInterval
cfg.DisableRepinning = DefaultDisableRepinning
cfg.PeerstoreFile = "" // empty so it gets omitted.
cfg.FollowerMode = DefaultFollowerMode
@@ -406,6 +415,7 @@ func (cfg *Config) applyConfigJSON(jcfg *configJSON) error {
&config.DurationOpt{Duration: jcfg.PinRecoverInterval, Dst: &cfg.PinRecoverInterval, Name: "pin_recover_interval"},
&config.DurationOpt{Duration: jcfg.MonitorPingInterval, Dst: &cfg.MonitorPingInterval, Name: "monitor_ping_interval"},
&config.DurationOpt{Duration: jcfg.PeerWatchInterval, Dst: &cfg.PeerWatchInterval, Name: "peer_watch_interval"},
&config.DurationOpt{Duration: jcfg.MDNSInterval, Dst: &cfg.MDNSInterval, Name: "mdns_interval"},
)
if err != nil {
return err
@@ -456,6 +466,7 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) {
jcfg.PinRecoverInterval = cfg.PinRecoverInterval.String()
jcfg.MonitorPingInterval = cfg.MonitorPingInterval.String()
jcfg.PeerWatchInterval = cfg.PeerWatchInterval.String()
jcfg.MDNSInterval = cfg.MDNSInterval.String()
jcfg.DisableRepinning = cfg.DisableRepinning
jcfg.PeerstoreFile = cfg.PeerstoreFile
jcfg.FollowerMode = cfg.FollowerMode
@@ -3,29 +3,19 @@ package ipfscluster
import (
"context"
"encoding/hex"
"time"

"github.com/ipfs/ipfs-cluster/config"
"github.com/ipfs/ipfs-cluster/pstoremgr"
libp2p "github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
ipnet "github.com/libp2p/go-libp2p-interface-pnet"
dht "github.com/libp2p/go-libp2p-kad-dht"
pnet "github.com/libp2p/go-libp2p-pnet"
pubsub "github.com/libp2p/go-libp2p-pubsub"
discovery "github.com/libp2p/go-libp2p/p2p/discovery"
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
)

const (
mdnsServiceTag = "_ipfs-cluster-discovery._udp"
mdnsInterval = 30 * time.Second
)

// NewClusterHost creates a libp2p Host with the options from the provided
// cluster configuration. Using that host, it creates pubsub and DHT
// instances for shared use by all cluster components. The returned host uses
@@ -34,7 +24,7 @@ func NewClusterHost(
ctx context.Context,
ident *config.Identity,
cfg *Config,
) (host.Host, *pubsub.PubSub, *dht.IpfsDHT, discovery.Service, error) {
) (host.Host, *pubsub.PubSub, *dht.IpfsDHT, error) {

connman := connmgr.NewConnManager(cfg.ConnMgr.LowWater, cfg.ConnMgr.HighWater, cfg.ConnMgr.GracePeriod)

@@ -47,29 +37,22 @@ func NewClusterHost(
libp2p.ConnectionManager(connman),
)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, err
}

psub, err := newPubSub(ctx, h)
if err != nil {
h.Close()
return nil, nil, nil, nil, err
return nil, nil, nil, err
}

idht, err := newDHT(ctx, h)
if err != nil {
h.Close()
return nil, nil, nil, nil, err
}

mdns, err := discovery.NewMdnsService(ctx, h, mdnsInterval, mdnsServiceTag)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, err
}

mdns.RegisterNotifee(&mdnsNotifee{mgr: pstoremgr.New(ctx, h, "")})

return routedHost(h, idht), psub, idht, mdns, nil
return routedHost(h, idht), psub, idht, nil
}

func newHost(ctx context.Context, secret []byte, priv crypto.PrivKey, opts ...libp2p.Option) (host.Host, error) {
@@ -115,26 +98,6 @@ func routedHost(h host.Host, d *dht.IpfsDHT) host.Host {
return routedhost.Wrap(h, d)
}

type mdnsNotifee struct {
mgr *pstoremgr.Manager
}

func (mdnsNot *mdnsNotifee) HandlePeerFound(p peer.AddrInfo) {
addrs, err := peer.AddrInfoToP2pAddrs(&p)
if err != nil {
logger.Error(err)
return
}
// mDNS actually returns a single address, but handle the
// general case of multiple addresses anyway
for _, a := range addrs {
_, err = mdnsNot.mgr.ImportPeer(a, true, peerstore.ConnectedAddrTTL)
if err != nil {
logger.Error(err)
}
}
}

// EncodeProtectorKey converts a byte slice to its hex string representation.
func EncodeProtectorKey(secretBytes []byte) string {
return hex.EncodeToString(secretBytes)
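
(EncodeProtectorKey is untouched by this commit; for reference, it simply hex-encodes the private network secret, e.g. EncodeProtectorKey([]byte{0xde, 0xad, 0xbe, 0xef}) returns "deadbeef".)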
@@ -29,7 +29,6 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
discovery "github.com/libp2p/go-libp2p/p2p/discovery"

ma "github.com/multiformats/go-multiaddr"

@@ -91,7 +90,7 @@ func daemon(c *cli.Context) error {
cfgs.Cluster.LeaveOnShutdown = true
}

host, pubsub, dht, mdns, err := ipfscluster.NewClusterHost(ctx, cfgHelper.Identity(), cfgs.Cluster)
host, pubsub, dht, err := ipfscluster.NewClusterHost(ctx, cfgHelper.Identity(), cfgs.Cluster)
checkErr("creating libp2p host", err)

cluster, err := createCluster(ctx, c, cfgHelper, host, pubsub, dht, raftStaging)
@@ -104,7 +103,7 @@ func daemon(c *cli.Context) error {
// will realize).
go bootstrap(ctx, cluster, bootstraps)

return handleSignals(ctx, cancel, cluster, host, dht, mdns)
return handleSignals(ctx, cancel, cluster, host, dht)
}

// createCluster creates all the necessary things to produce the cluster
@@ -224,7 +223,6 @@ func handleSignals(
cluster *ipfscluster.Cluster,
host host.Host,
dht *dht.IpfsDHT,
mdns discovery.Service,
) error {
signalChan := make(chan os.Signal, 20)
signal.Notify(
@@ -243,7 +241,6 @@ func handleSignals(
case <-cluster.Done():
cancel()
dht.Close()
mdns.Close()
host.Close()
return nil
}
@@ -36,7 +36,8 @@ var testingClusterCfg = []byte(`{
"replication_factor": -1,
"monitor_ping_interval": "350ms",
"peer_watch_interval": "200ms",
"disable_repinning": false
"disable_repinning": false,
"mdns_interval": "0s"
}`)

var testingRaftCfg = []byte(`{
@@ -45,11 +45,10 @@ func peerManagerClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock, host.Host)
cfg.Secret = testingClusterSecret

// Create a bootstrapping libp2p host
h, _, dht, mdns, err := NewClusterHost(context.Background(), ident, cfg)
h, _, dht, err := NewClusterHost(context.Background(), ident, cfg)
if err != nil {
t.Fatal(err)
}
mdns.Close()

// Connect all peers to that host. This will allow that they
// can discover each others via DHT.
@@ -341,6 +341,23 @@ func (pm *Manager) SetPriority(pid peer.ID, prio int) error {
return pm.host.Peerstore().Put(pid, PriorityTag, prio)
}

// HandlePeerFound implements the Notifee interface for discovery.
func (pm *Manager) HandlePeerFound(p peer.AddrInfo) {
addrs, err := peer.AddrInfoToP2pAddrs(&p)
if err != nil {
logger.Error(err)
return
}
// mDNS actually returns a single address, but handle the
// general case of multiple addresses anyway
for _, a := range addrs {
_, err = pm.ImportPeer(a, true, peerstore.ConnectedAddrTTL)
if err != nil {
logger.Error(err)
}
}
}

// peerSort is used to sort a slice of peers given the PriorityTag in the
// peerstore, from the lowest tag value (0 is the highest priority) to the
// highest. Peers without a valid priority tag are considered as having a tag
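
For reference on the conversion HandlePeerFound performs: peer.AddrInfoToP2pAddrs expands an AddrInfo into complete multiaddrs ending in /p2p/<peer-id>, which is what ImportPeer expects. A small hedged sketch (the address and peer ID are placeholder values):

    import (
        peer "github.com/libp2p/go-libp2p-core/peer"
        ma "github.com/multiformats/go-multiaddr"
    )

    func expandExample() ([]ma.Multiaddr, error) {
        // Parse an example /p2p/ multiaddr back into an AddrInfo.
        ai, err := peer.AddrInfoFromP2pAddr(
            ma.StringCast("/ip4/192.0.2.1/tcp/9096/p2p/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"))
        if err != nil {
            return nil, err
        }
        // Expand back out: each returned multiaddr ends in /p2p/<peer-id>,
        // ready to be imported into the peerstore.
        return peer.AddrInfoToP2pAddrs(ai)
    }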
