diff --git a/Makefile b/Makefile
index 670d0518b..4211e47d5 100644
--- a/Makefile
+++ b/Makefile
@@ -75,9 +75,9 @@ test_problem: gx-deps
 
 $(sharness):
	@echo "Downloading sharness"
-	@curl -L -s -o sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/master.tar.gz
+	@curl -L -s -o sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/8fa4b9b0465d21b7ec114ec4528fa17f5a6eb361.tar.gz
	@cd sharness/lib; tar -zxf sharness.tar.gz; cd ../..
-	@mv sharness/lib/sharness-master sharness/lib/sharness
+	@mv sharness/lib/sharness-8fa4b9b0465d21b7ec114ec4528fa17f5a6eb361 sharness/lib/sharness
	@rm sharness/lib/sharness.tar.gz
 
 clean_sharness:
diff --git a/api/ipfsproxy/config.go b/api/ipfsproxy/config.go
index 0ef1e72ec..cacfce574 100644
--- a/api/ipfsproxy/config.go
+++ b/api/ipfsproxy/config.go
@@ -206,20 +206,23 @@ func (cfg *Config) LoadJSON(raw []byte) error {
 }
 
 func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
-	proxyAddr, err := ma.NewMultiaddr(jcfg.ListenMultiaddress)
-	if err != nil {
-		return fmt.Errorf("error parsing proxy listen_multiaddress: %s", err)
+	if jcfg.ListenMultiaddress != "" {
+		proxyAddr, err := ma.NewMultiaddr(jcfg.ListenMultiaddress)
+		if err != nil {
+			return fmt.Errorf("error parsing proxy listen_multiaddress: %s", err)
+		}
+		cfg.ListenAddr = proxyAddr
 	}
-	nodeAddr, err := ma.NewMultiaddr(jcfg.NodeMultiaddress)
-	if err != nil {
-		return fmt.Errorf("error parsing ipfs node_multiaddress: %s", err)
+	if jcfg.NodeMultiaddress != "" {
+		nodeAddr, err := ma.NewMultiaddr(jcfg.NodeMultiaddress)
+		if err != nil {
+			return fmt.Errorf("error parsing ipfs node_multiaddress: %s", err)
+		}
+		cfg.NodeAddr = nodeAddr
 	}
-
-	cfg.ListenAddr = proxyAddr
-	cfg.NodeAddr = nodeAddr
 
 	config.SetIfNotDefault(jcfg.NodeHTTPS, &cfg.NodeHTTPS)
 
-	err = config.ParseDurations(
+	err := config.ParseDurations(
 		"ipfsproxy",
 		&config.DurationOpt{Duration: jcfg.ReadTimeout, Dst: &cfg.ReadTimeout, Name: "read_timeout"},
 		&config.DurationOpt{Duration: jcfg.ReadHeaderTimeout, Dst: &cfg.ReadHeaderTimeout, Name: "read_header_timeout"},
diff --git a/api/rest/client/transports.go b/api/rest/client/transports.go
index ea38a8d74..92ac7ded0 100644
--- a/api/rest/client/transports.go
+++ b/api/rest/client/transports.go
@@ -8,11 +8,10 @@ import (
	"net/http"
	"time"
 
-	"github.com/ipfs/ipfs-cluster/api"
-
	p2phttp "github.com/hsanjuan/go-libp2p-http"
	libp2p "github.com/libp2p/go-libp2p"
	ipnet "github.com/libp2p/go-libp2p-interface-pnet"
+	peer "github.com/libp2p/go-libp2p-peer"
	peerstore "github.com/libp2p/go-libp2p-peerstore"
	pnet "github.com/libp2p/go-libp2p-pnet"
	madns "github.com/multiformats/go-multiaddr-dns"
@@ -41,11 +40,15 @@ func (c *defaultClient) defaultTransport() {
 
 func (c *defaultClient) enableLibp2p() error {
	c.defaultTransport()
 
-	pid, addr, err := api.Libp2pMultiaddrSplit(c.config.APIAddr)
+	pinfo, err := peerstore.InfoFromP2pAddr(c.config.APIAddr)
	if err != nil {
		return err
	}
 
+	if len(pinfo.Addrs) == 0 {
+		return errors.New("APIAddr only includes a Peer ID")
+	}
+
	var prot ipnet.Protector
	if c.config.ProtectorKey != nil && len(c.config.ProtectorKey) > 0 {
		if len(c.config.ProtectorKey) != 32 {
@@ -67,16 +70,16 @@ func (c *defaultClient) enableLibp2p() error {
	ctx, cancel := context.WithTimeout(c.ctx, ResolveTimeout)
	defer cancel()
 
-	resolvedAddrs, err := madns.Resolve(ctx, addr)
+	resolvedAddrs, err := madns.Resolve(ctx, pinfo.Addrs[0])
	if err != nil {
		return err
	}
 
-	h.Peerstore().AddAddrs(pid, resolvedAddrs, peerstore.PermanentAddrTTL)
+	h.Peerstore().AddAddrs(pinfo.ID, resolvedAddrs, peerstore.PermanentAddrTTL)
	c.transport.RegisterProtocol("libp2p", p2phttp.NewTransport(h))
	c.net = "libp2p"
	c.p2p = h
-	c.hostname = pid.Pretty()
+	c.hostname = peer.IDB58Encode(pinfo.ID)
	return nil
 }
diff --git a/api/util.go b/api/util.go
index 3031d93b8..da24250c2 100644
--- a/api/util.go
+++ b/api/util.go
@@ -1,8 +1,6 @@
 package api
 
 import (
-	"fmt"
-
	peer "github.com/libp2p/go-libp2p-peer"
	ma "github.com/multiformats/go-multiaddr"
 )
@@ -32,29 +30,6 @@ func StringsToPeers(strs []string) []peer.ID {
	return peers
 }
 
-// Libp2pMultiaddrSplit takes a LibP2P multiaddress (/<multiaddr>/ipfs/<peerid>)
-// and decapsulates it, parsing the peer ID. Returns an error if there is
-// any problem (for example, the provided address not being a Libp2p one).
-func Libp2pMultiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) {
-	pid, err := addr.ValueForProtocol(ma.P_IPFS)
-	if err != nil {
-		err = fmt.Errorf("invalid peer multiaddress: %s: %s", addr, err)
-		logger.Error(err)
-		return "", nil, err
-	}
-
-	ipfs, _ := ma.NewMultiaddr("/ipfs/" + pid)
-	decapAddr := addr.Decapsulate(ipfs)
-
-	peerID, err := peer.IDB58Decode(pid)
-	if err != nil {
-		err = fmt.Errorf("invalid peer ID in multiaddress: %s: %s", pid, err)
-		logger.Error(err)
-		return "", nil, err
-	}
-	return peerID, decapAddr, nil
-}
-
 // MustLibp2pMultiaddrJoin takes a LibP2P multiaddress and a peer ID and
 // encapsulates a new /ipfs/<peerid> address. It will panic if the given
 // peer ID is bad.
diff --git a/cluster.go b/cluster.go
index 65fda785a..4619b13ba 100644
--- a/cluster.go
+++ b/cluster.go
@@ -24,6 +24,7 @@ import (
	host "github.com/libp2p/go-libp2p-host"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	peer "github.com/libp2p/go-libp2p-peer"
+	peerstore "github.com/libp2p/go-libp2p-peerstore"
	ma "github.com/multiformats/go-multiaddr"
 
	ocgorpc "github.com/lanzafame/go-libp2p-ocgorpc"
@@ -36,7 +37,10 @@ import (
 // consensus layer.
 var ReadyTimeout = 30 * time.Second
 
-var pingMetricName = "ping"
+const (
+	pingMetricName = "ping"
+	bootstrapCount = 3
+)
 
 // Cluster is the main IPFS cluster component. It provides
 // the go-API for it and orchestrates the components that make up the system.
@@ -116,9 +120,7 @@ func NewCluster(
 
	logger.Infof("IPFS Cluster v%s listening on:\n%s\n", version.Version, listenAddrs)
 
-	// Note, we already loaded peers from peerstore into the host
-	// in daemon.go.
-	peerManager := pstoremgr.New(host, cfg.GetPeerstorePath())
+	peerManager := pstoremgr.New(ctx, host, cfg.GetPeerstorePath())
 
	c := &Cluster{
		ctx:         ctx,
@@ -144,6 +146,18 @@ func NewCluster(
		readyB:      false,
	}
 
+	// Import known cluster peers from the peerstore file. Set
+	// a non-permanent TTL.
+	c.peerManager.ImportPeersFromPeerstore(false, peerstore.AddressTTL)
+	// Attempt to connect to some peers (up to bootstrapCount).
+	actualCount := c.peerManager.Bootstrap(bootstrapCount)
+	// We cannot warn about a low count, as it is normal when going to Join() later.
+	logger.Debugf("bootstrap count %d", actualCount)
+	// Bootstrap the DHT now that we possibly have some connections.
+	c.dht.Bootstrap(c.ctx)
+
+	// After setupRPC, components can do their tasks with a fully operative
+	// routed libp2p host with some connections and a working DHT (hopefully).
	err = c.setupRPC()
	if err != nil {
		c.Shutdown(ctx)
@@ -465,9 +479,6 @@ This might be due to one or several causes:
 
	// Cluster is ready.
 
-	// Bootstrap the DHT now that we possibly have some connections
-	c.dht.Bootstrap(c.ctx)
-
	peers, err := c.consensus.Peers(ctx)
	if err != nil {
		logger.Error(err)
@@ -632,12 +643,24 @@ func (c *Cluster) ID(ctx context.Context) *api.ID {
		peers, _ = c.consensus.Peers(ctx)
	}
 
+	clusterPeerInfos := c.peerManager.PeerInfos(peers)
+	addresses := []api.Multiaddr{}
+	for _, pinfo := range clusterPeerInfos {
+		addrs, err := peerstore.InfoToP2pAddrs(&pinfo)
+		if err != nil {
+			continue
+		}
+		for _, a := range addrs {
+			addresses = append(addresses, api.NewMultiaddrWithValue(a))
+		}
+	}
+
	return &api.ID{
		ID: c.id,
		//PublicKey: c.host.Peerstore().PubKey(c.id),
		Addresses:             addrs,
		ClusterPeers:          peers,
-		ClusterPeersAddresses: c.peerManager.PeersAddresses(peers),
+		ClusterPeersAddresses: addresses,
		Version:               version.Version.String(),
		RPCProtocolVersion:    version.RPCProtocol,
		IPFS:                  ipfsID,
@@ -720,20 +743,15 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {
 
	logger.Debugf("Join(%s)", addr)
 
-	pid, _, err := api.Libp2pMultiaddrSplit(addr)
+	// Add the peer to the peerstore so we can talk to it (and connect).
+	pid, err := c.peerManager.ImportPeer(addr, true, peerstore.PermanentAddrTTL)
	if err != nil {
-		logger.Error(err)
		return err
	}
-
-	// Bootstrap to myself
	if pid == c.id {
		return nil
	}
 
-	// Add peer to peerstore so we can talk to it (and connect)
-	c.peerManager.ImportPeer(addr, true)
-
	// Note that PeerAdd() on the remote peer will
	// figure out what our real address is (obviously not
	// ListenAddr).
diff --git a/cmd/ipfs-cluster-service/daemon.go b/cmd/ipfs-cluster-service/daemon.go
index dd9bc346f..59c488b25 100644
--- a/cmd/ipfs-cluster-service/daemon.go
+++ b/cmd/ipfs-cluster-service/daemon.go
@@ -23,7 +23,6 @@ import (
	"github.com/ipfs/ipfs-cluster/observations"
	"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
	"github.com/ipfs/ipfs-cluster/pintracker/stateless"
-	"github.com/ipfs/ipfs-cluster/pstoremgr"
 
	"go.opencensus.io/tag"
 
	ds "github.com/ipfs/go-datastore"
@@ -113,13 +112,6 @@ func createCluster(
	ctx, err := tag.New(ctx, tag.Upsert(observations.HostKey, host.ID().Pretty()))
	checkErr("tag context with host id", err)
 
-	peerstoreMgr := pstoremgr.New(host, cfgs.clusterCfg.GetPeerstorePath())
-	// Import peers but do not connect. We cannot connect to peers until
-	// everything has been created (dht, pubsub, bitswap). Otherwise things
-	// fail.
-	// Connections will happen as needed during bootstrap, rpc etc.
-	peerstoreMgr.ImportPeersFromPeerstore(false)
-
	api, err := rest.NewAPIWithHost(ctx, cfgs.apiCfg, host)
	checkErr("creating REST API component", err)
diff --git a/cmd/ipfs-cluster-service/state.go b/cmd/ipfs-cluster-service/state.go
index 4d0cca584..c15ed798d 100644
--- a/cmd/ipfs-cluster-service/state.go
+++ b/cmd/ipfs-cluster-service/state.go
@@ -73,7 +73,7 @@ func (raftsm *raftStateManager) ImportState(r io.Reader) error {
	if err != nil {
		return err
	}
-	pm := pstoremgr.New(nil, raftsm.cfgs.clusterCfg.GetPeerstorePath())
+	pm := pstoremgr.New(context.Background(), nil, raftsm.cfgs.clusterCfg.GetPeerstorePath())
	raftPeers := append(
		ipfscluster.PeersFromMultiaddrs(pm.LoadPeerstore()),
		raftsm.ident.ID,
diff --git a/consensus/crdt/consensus.go b/consensus/crdt/consensus.go
index 61f5d1678..fe4472985 100644
--- a/consensus/crdt/consensus.go
+++ b/consensus/crdt/consensus.go
@@ -9,6 +9,7 @@ import (
	ipfslite "github.com/hsanjuan/ipfs-lite"
	dshelp "github.com/ipfs/go-ipfs-ds-help"
	"github.com/ipfs/ipfs-cluster/api"
+	"github.com/ipfs/ipfs-cluster/pstoremgr"
	"github.com/ipfs/ipfs-cluster/state"
	"github.com/ipfs/ipfs-cluster/state/dsstate"
	multihash "github.com/multiformats/go-multihash"
@@ -22,13 +23,15 @@ import (
	host "github.com/libp2p/go-libp2p-host"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	peer "github.com/libp2p/go-libp2p-peer"
+	peerstore "github.com/libp2p/go-libp2p-peerstore"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
 )
 
 var logger = logging.Logger("crdt")
 
 var (
-	blocksNs = "b" // blockstore namespace
+	blocksNs   = "b" // blockstore namespace
+	connMgrTag = "crdt"
 )
 
 // Common variables for the module.
@@ -48,7 +51,8 @@ type Consensus struct {
 
	trustedPeers sync.Map
 
-	host host.Host
+	host        host.Host
+	peerManager *pstoremgr.Manager
 
	store     ds.Datastore
	namespace ds.Key
@@ -85,21 +89,17 @@ func New(
	ctx, cancel := context.WithCancel(context.Background())
 
	css := &Consensus{
-		ctx:       ctx,
-		cancel:    cancel,
-		config:    cfg,
-		host:      host,
-		dht:       dht,
-		store:     store,
-		namespace: ds.NewKey(cfg.DatastoreNamespace),
-		pubsub:    pubsub,
-		rpcReady:  make(chan struct{}, 1),
-		readyCh:   make(chan struct{}, 1),
-	}
-
-	// Set up a fast-lookup trusted peers cache.
-	for _, p := range css.config.TrustedPeers {
-		css.Trust(ctx, p)
+		ctx:         ctx,
+		cancel:      cancel,
+		config:      cfg,
+		host:        host,
+		peerManager: pstoremgr.New(ctx, host, ""),
+		dht:         dht,
+		store:       store,
+		namespace:   ds.NewKey(cfg.DatastoreNamespace),
+		pubsub:      pubsub,
+		rpcReady:    make(chan struct{}, 1),
+		readyCh:     make(chan struct{}, 1),
	}
 
	go css.setup()
@@ -113,6 +113,12 @@ func (css *Consensus) setup() {
	case <-css.rpcReady:
	}
 
+	// Set up a fast-lookup trusted peers cache.
+	// Protect these peers in the ConnMgr.
+	for _, p := range css.config.TrustedPeers {
+		css.Trust(css.ctx, p)
+	}
+
	// Hash the cluster name and produce the topic name from there
	// as a way to avoid pubsub topic collisions with other
	// pubsub applications potentially when both potentially use
@@ -296,9 +302,18 @@ func (css *Consensus) IsTrustedPeer(ctx context.Context, pid peer.ID) bool {
	return ok
 }
 
-// Trust marks a peer as "trusted".
+// Trust marks a peer as "trusted". It makes sure it is trusted as an issuer
+// of pubsub updates, that it is protected in the connection manager, that it
+// has the highest priority when the peerstore is saved, and that its addresses
+// are always remembered.
 func (css *Consensus) Trust(ctx context.Context, pid peer.ID) error {
	css.trustedPeers.Store(pid, struct{}{})
+	if conman := css.host.ConnManager(); conman != nil {
+		conman.Protect(pid, connMgrTag)
+	}
+	css.peerManager.SetPriority(pid, 0)
+	addrs := css.host.Peerstore().Addrs(pid)
+	css.host.Peerstore().SetAddrs(pid, addrs, peerstore.PermanentAddrTTL)
	return nil
 }
diff --git a/peer_manager_test.go b/peer_manager_test.go
index 839d2d248..7596dbe9b 100644
--- a/peer_manager_test.go
+++ b/peer_manager_test.go
@@ -155,11 +155,11 @@ func TestClustersPeerAdd(t *testing.T) {
		addrs := c.peerManager.LoadPeerstore()
		peerMap := make(map[peer.ID]struct{})
		for _, a := range addrs {
-			pid, _, err := api.Libp2pMultiaddrSplit(a)
+			pinfo, err := peerstore.InfoFromP2pAddr(a)
			if err != nil {
				t.Fatal(err)
			}
-			peerMap[pid] = struct{}{}
+			peerMap[pinfo.ID] = struct{}{}
		}
 
		if len(peerMap) == 0 {
diff --git a/pstoremgr/pstoremgr.go b/pstoremgr/pstoremgr.go
index d6c102c74..81ee2cda8 100644
--- a/pstoremgr/pstoremgr.go
+++ b/pstoremgr/pstoremgr.go
@@ -10,11 +10,10 @@ import (
	"context"
	"fmt"
	"os"
+	"sort"
	"sync"
	"time"
 
-	"github.com/ipfs/ipfs-cluster/api"
-
	logging "github.com/ipfs/go-log"
	host "github.com/libp2p/go-libp2p-host"
	peer "github.com/libp2p/go-libp2p-peer"
@@ -25,10 +24,14 @@ import (
 
 var logger = logging.Logger("pstoremgr")
 
-// Timeouts for network operations triggered by the Manager
+// PriorityTag is used to attach metadata to peers in the peerstore
+// so they can be sorted.
+var PriorityTag = "cluster"
+
+// Timeouts for network operations triggered by the Manager.
 var (
	DNSTimeout     = 5 * time.Second
-	ConnectTimeout = 10 * time.Second
+	ConnectTimeout = 5 * time.Second
 )
 
 // Manager provides utilities for handling cluster peer addresses
@@ -43,9 +46,9 @@ type Manager struct {
 // New creates a Manager with the given libp2p Host and peerstorePath.
 // The path indicates the place to persist and read peer addresses from.
 // If empty, these operations (LoadPeerstore, SavePeerstore) will no-op.
-func New(h host.Host, peerstorePath string) *Manager {
+func New(ctx context.Context, h host.Host, peerstorePath string) *Manager {
	return &Manager{
-		ctx:           context.Background(),
+		ctx:           ctx,
		host:          h,
		peerstorePath: peerstorePath,
	}
@@ -54,38 +57,33 @@ func New(h host.Host, peerstorePath string) *Manager {
 // ImportPeer adds a new peer address to the host's peerstore, optionally
 // dialing to it. It will resolve any DNS multiaddresses before adding them.
 // The address is expected to include the /ipfs/<peerid> protocol part.
-func (pm *Manager) ImportPeer(addr ma.Multiaddr, connect bool) error {
+// Peers are added with the given ttl.
+func (pm *Manager) ImportPeer(addr ma.Multiaddr, connect bool, ttl time.Duration) (peer.ID, error) {
	if pm.host == nil {
-		return nil
+		return "", nil
	}
 
	logger.Debugf("adding peer address %s", addr)
-	pid, decapAddr, err := api.Libp2pMultiaddrSplit(addr)
+	pinfo, err := peerstore.InfoFromP2pAddr(addr)
	if err != nil {
-		return err
+		return "", err
	}
-	pm.host.Peerstore().AddAddr(pid, decapAddr, peerstore.PermanentAddrTTL)
 
-	// dns multiaddresses need to be resolved because libp2p only does that
-	// on explicit bhost.Connect().
-	if madns.Matches(addr) {
-		ctx, cancel := context.WithTimeout(pm.ctx, DNSTimeout)
-		defer cancel()
-		resolvedAddrs, err := madns.Resolve(ctx, addr)
-		if err != nil {
-			logger.Error(err)
-			return err
-		}
-		pm.ImportPeers(resolvedAddrs, connect)
+	// Do not add ourselves.
+	if pinfo.ID == pm.host.ID() {
+		return pinfo.ID, nil
	}
+
+	pm.host.Peerstore().AddAddrs(pinfo.ID, pinfo.Addrs, ttl)
+
	if connect {
		go func() {
			ctx, cancel := context.WithTimeout(pm.ctx, ConnectTimeout)
			defer cancel()
-			pm.host.Network().DialPeer(ctx, pid)
+			pm.host.Connect(ctx, *pinfo)
		}()
	}
-	return nil
+	return pinfo.ID, nil
 }
 
 // RmPeer clear all addresses for a given peer ID from the host's peerstore.
@@ -100,19 +98,17 @@ func (pm *Manager) RmPeer(pid peer.ID) error {
 }
 
 // if the peer has dns addresses, return only those, otherwise
-// return all. In all cases, encapsulate the peer ID.
-func (pm *Manager) filteredPeerAddrs(p peer.ID) []api.Multiaddr {
+// return all.
+func (pm *Manager) filteredPeerAddrs(p peer.ID) []ma.Multiaddr {
	all := pm.host.Peerstore().Addrs(p)
-	peerAddrs := []api.Multiaddr{}
-	peerDNSAddrs := []api.Multiaddr{}
-	peerPart, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(p)))
+	peerAddrs := []ma.Multiaddr{}
+	peerDNSAddrs := []ma.Multiaddr{}
 
	for _, a := range all {
-		encAddr := a.Encapsulate(peerPart)
-		if madns.Matches(encAddr) {
-			peerDNSAddrs = append(peerDNSAddrs, api.NewMultiaddrWithValue(encAddr))
+		if madns.Matches(a) {
+			peerDNSAddrs = append(peerDNSAddrs, a)
		} else {
-			peerAddrs = append(peerAddrs, api.NewMultiaddrWithValue(encAddr))
+			peerAddrs = append(peerAddrs, a)
		}
	}
 
@@ -123,11 +119,12 @@
	return peerAddrs
 }
 
-// PeersAddresses returns the list of multiaddresses (encapsulating the
-// /ipfs/<peerid> part) for the given set of peers. For peers for which
-// we know DNS multiaddresses, we only return those. Otherwise, we return
-// all the multiaddresses known for that peer.
-func (pm *Manager) PeersAddresses(peers []peer.ID) []api.Multiaddr {
+// PeerInfos returns a slice of PeerInfos for the given set of peers, in
+// order of priority. For peers for which we know DNS
+// multiaddresses, we only include those. Otherwise, the PeerInfo includes all
+// the multiaddresses known for that peer. Peers without addresses are not
+// included.
+func (pm *Manager) PeerInfos(peers []peer.ID) []peerstore.PeerInfo {
	if pm.host == nil {
		return nil
	}
@@ -136,29 +133,47 @@
		return nil
	}
 
-	var addrs []api.Multiaddr
+	var pinfos []peerstore.PeerInfo
	for _, p := range peers {
		if p == pm.host.ID() {
			continue
		}
-		addrs = append(addrs, pm.filteredPeerAddrs(p)...)
+		pinfo := peerstore.PeerInfo{
+			ID:    p,
+			Addrs: pm.filteredPeerAddrs(p),
+		}
+		if len(pinfo.Addrs) > 0 {
+			pinfos = append(pinfos, pinfo)
+		}
	}
-	return addrs
+
+	toSort := &peerSort{
+		pinfos: pinfos,
+		pstore: pm.host.Peerstore(),
+	}
+	// Sort from highest to lowest priority.
+	sort.Sort(toSort)
+
+	return toSort.pinfos
 }
 
 // ImportPeers calls ImportPeer for every address in the given slice, using the
-// given connect parameter.
-func (pm *Manager) ImportPeers(addrs []ma.Multiaddr, connect bool) error {
-	for _, a := range addrs {
-		pm.ImportPeer(a, connect)
+// given connect parameter. Peers are tagged with a priority given
+// by their position in the list.
+func (pm *Manager) ImportPeers(addrs []ma.Multiaddr, connect bool, ttl time.Duration) error {
+	for i, a := range addrs {
+		pid, err := pm.ImportPeer(a, connect, ttl)
+		if err == nil {
+			pm.SetPriority(pid, i)
+		}
	}
	return nil
 }
 
 // ImportPeersFromPeerstore reads the peerstore file and calls ImportPeers with
 // the addresses obtained from it.
-func (pm *Manager) ImportPeersFromPeerstore(connect bool) error {
-	return pm.ImportPeers(pm.LoadPeerstore(), connect)
+func (pm *Manager) ImportPeersFromPeerstore(connect bool, ttl time.Duration) error {
+	return pm.ImportPeers(pm.LoadPeerstore(), connect, ttl)
 }
 
 // LoadPeerstore parses the peerstore file and returns the list
@@ -202,7 +217,7 @@ func (pm *Manager) LoadPeerstore() (addrs []ma.Multiaddr) {
 
 // SavePeerstore stores a slice of multiaddresses in the peerstore file, one
 // per line.
-func (pm *Manager) SavePeerstore(addrs []api.Multiaddr) {
+func (pm *Manager) SavePeerstore(pinfos []peerstore.PeerInfo) {
	if pm.peerstorePath == "" {
		return
	}
@@ -221,13 +236,100 @@ func (pm *Manager) SavePeerstore(pinfos []peerstore.PeerInfo) {
	}
	defer f.Close()
 
-	for _, a := range addrs {
-		f.Write([]byte(fmt.Sprintf("%s\n", a.Value().String())))
+	for _, pinfo := range pinfos {
+		addrs, err := peerstore.InfoToP2pAddrs(&pinfo)
+		if err != nil {
+			logger.Warning(err)
+			continue
+		}
+		for _, a := range addrs {
+			f.Write([]byte(fmt.Sprintf("%s\n", a.String())))
+		}
	}
 }
 
-// SavePeerstoreForPeers calls PeersAddresses and then saves the peerstore
+// SavePeerstoreForPeers calls PeerInfos and then saves the peerstore
 // file using the result.
 func (pm *Manager) SavePeerstoreForPeers(peers []peer.ID) {
-	pm.SavePeerstore(pm.PeersAddresses(peers))
+	pm.SavePeerstore(pm.PeerInfos(peers))
+}
+
+// Bootstrap attempts to get as many as count connected peers by trying
+// those in the libp2p host peerstore in priority order. It returns the
+// number of peers it successfully connected to.
+func (pm *Manager) Bootstrap(count int) int {
+	knownPeers := pm.host.Peerstore().PeersWithAddrs()
+	toSort := &peerSort{
+		pinfos: peerstore.PeerInfos(pm.host.Peerstore(), knownPeers),
+		pstore: pm.host.Peerstore(),
+	}
+
+	// Sort from highest to lowest priority.
+	sort.Sort(toSort)
+
+	pinfos := toSort.pinfos
+	lenKnown := len(pinfos)
+	totalConns := 0
+
+	// Keep connecting while we have peers in the store
+	// and we have not reached count.
+	for i := 0; i < lenKnown && totalConns < count; i++ {
+		pinfo := pinfos[i]
+		ctx, cancel := context.WithTimeout(pm.ctx, ConnectTimeout)
+		defer cancel()
+
+		logger.Infof("connecting to %s", pinfo.ID)
+		err := pm.host.Connect(ctx, pinfo)
+		if err != nil {
+			logger.Warning(err)
+			pm.SetPriority(pinfo.ID, 9999)
+			continue
+		}
+		totalConns++
+	}
+	return totalConns
+}
+
+// SetPriority attaches a priority to a peer. 0 means more priority than
+// 1, 1 means more priority than 2, and so on.
+func (pm *Manager) SetPriority(pid peer.ID, prio int) error {
+	return pm.host.Peerstore().Put(pid, PriorityTag, prio)
+}
+
+// peerSort is used to sort a slice of PeerInfos given the PriorityTag in the
+// peerstore, from the lowest tag value (0 is the highest priority) to the
+// highest. Peers without a valid priority tag are considered as having a tag
+// with value 0, so they will be among the first elements in the resulting
+// slice.
+type peerSort struct {
+	pinfos []peerstore.PeerInfo
+	pstore peerstore.Peerstore
+}
+
+func (ps *peerSort) Len() int {
+	return len(ps.pinfos)
+}
+
+func (ps *peerSort) Less(i, j int) bool {
+	pinfo1 := ps.pinfos[i]
+	pinfo2 := ps.pinfos[j]
+
+	var prio1, prio2 int
+
+	prio1iface, err := ps.pstore.Get(pinfo1.ID, PriorityTag)
+	if err == nil {
+		prio1 = prio1iface.(int)
+	}
+	prio2iface, err := ps.pstore.Get(pinfo2.ID, PriorityTag)
+	if err == nil {
+		prio2 = prio2iface.(int)
+	}
+	return prio1 < prio2
+}
+
+func (ps *peerSort) Swap(i, j int) {
+	pinfo1 := ps.pinfos[i]
+	pinfo2 := ps.pinfos[j]
+	ps.pinfos[i] = pinfo2
+	ps.pinfos[j] = pinfo1
+}
diff --git a/pstoremgr/pstoremgr_test.go b/pstoremgr/pstoremgr_test.go
index 40bd764ba..017b910da 100644
--- a/pstoremgr/pstoremgr_test.go
+++ b/pstoremgr/pstoremgr_test.go
@@ -4,21 +4,21 @@ import (
	"context"
	"os"
	"testing"
+	"time"
 
-	"github.com/ipfs/ipfs-cluster/api"
+	"github.com/ipfs/ipfs-cluster/test"
 
	libp2p "github.com/libp2p/go-libp2p"
+	peer "github.com/libp2p/go-libp2p-peer"
	ma "github.com/multiformats/go-multiaddr"
 )
 
-var pid = "QmXZrtE5jQwXNqCJMfHUTQkvhQ4ZAnqMnmzFMJfLewuabc"
-
 func makeMgr(t *testing.T) *Manager {
	h, err := libp2p.New(context.Background())
	if err != nil {
		t.Fatal(err)
	}
-	return New(h, "peerstore")
+	return New(context.Background(), h, "peerstore")
 }
 
 func clean(pm *Manager) {
@@ -27,31 +27,45 @@ func clean(pm *Manager) {
	}
 }
 
+func testAddr(loc string, pid peer.ID) ma.Multiaddr {
+	m, _ := ma.NewMultiaddr(loc + "/ipfs/" + peer.IDB58Encode(pid))
+	return m
+}
+
 func TestManager(t *testing.T) {
	pm := makeMgr(t)
	defer clean(pm)
 
-	testPeer, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234/ipfs/" + pid)
+	loc := "/ip4/127.0.0.1/tcp/1234"
+	testAddr := testAddr(loc, test.PeerID1)
 
-	err := pm.ImportPeer(testPeer, false)
+	_, err := pm.ImportPeer(testAddr, false, time.Minute)
	if err != nil {
		t.Fatal(err)
	}
 
-	peers := api.StringsToPeers([]string{pid, pm.host.ID().Pretty()})
-	addrs := pm.PeersAddresses(peers)
-	if len(addrs) != 1 {
-		t.Fatal("expected 1 address")
+	peers := []peer.ID{test.PeerID1, pm.host.ID()}
+	pinfos := pm.PeerInfos(peers)
+	if len(pinfos) != 1 {
+		t.Fatal("expected 1 peerinfo")
+	}
+
+	if pinfos[0].ID != test.PeerID1 {
+		t.Error("expected same peer as added")
	}
 
-	if !addrs[0].Equal(testPeer) {
+	if len(pinfos[0].Addrs) != 1 {
+		t.Fatal("expected an address")
+	}
+
+	if pinfos[0].Addrs[0].String() != loc {
		t.Error("expected same address as added")
	}
 
	pm.RmPeer(peers[0])
-	addrs = pm.PeersAddresses(peers)
-	if len(addrs) != 0 {
-		t.Fatal("expected 0 addresses")
+	pinfos = pm.PeerInfos(peers)
+	if len(pinfos) != 0 {
+		t.Fatal("expected 0 pinfos")
	}
 }
 
@@ -59,21 +73,27 @@ func TestManagerDNS(t *testing.T) {
	pm := makeMgr(t)
	defer clean(pm)
 
-	testPeer, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234/ipfs/" + pid)
-	testPeer2, _ := ma.NewMultiaddr("/dns4/localhost/tcp/1235/ipfs/" + pid)
+	loc1 := "/ip4/127.0.0.1/tcp/1234"
+	testAddr1 := testAddr(loc1, test.PeerID1)
+	loc2 := "/dns4/localhost/tcp/1235"
+	testAddr2 := testAddr(loc2, test.PeerID1)
 
-	err := pm.ImportPeers([]ma.Multiaddr{testPeer, testPeer2}, false)
+	err := pm.ImportPeers([]ma.Multiaddr{testAddr1, testAddr2}, false, time.Minute)
	if err != nil {
		t.Fatal(err)
	}
 
-	addrs := pm.PeersAddresses(api.StringsToPeers([]string{pid}))
-	if len(addrs) != 1 {
-		t.Fatal("expected 1 address")
+	pinfos := pm.PeerInfos([]peer.ID{test.PeerID1})
+	if len(pinfos) != 1 {
+		t.Fatal("expected 1 pinfo")
	}
 
-	if !addrs[0].Equal(testPeer2) {
-		t.Error("expected only the dns address")
+	if len(pinfos[0].Addrs) != 1 {
+		t.Error("expected a single address")
+	}
+
+	if pinfos[0].Addrs[0].String() != "/dns4/localhost/tcp/1235" {
+		t.Error("expected the dns address")
	}
 }
 
@@ -81,25 +101,95 @@ func TestPeerstore(t *testing.T) {
	pm := makeMgr(t)
	defer clean(pm)
 
-	testPeer, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234/ipfs/" + pid)
-	testPeer2, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235/ipfs/" + pid)
+	loc1 := "/ip4/127.0.0.1/tcp/1234"
+	testAddr1 := testAddr(loc1, test.PeerID1)
+	loc2 := "/ip4/127.0.0.1/tcp/1235"
+	testAddr2 := testAddr(loc2, test.PeerID1)
+
+	err := pm.ImportPeers([]ma.Multiaddr{testAddr1, testAddr2}, false, time.Minute)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pm.SavePeerstoreForPeers([]peer.ID{test.PeerID1})
+
+	pm2 := makeMgr(t)
+	defer clean(pm2)
+
+	err = pm2.ImportPeersFromPeerstore(false, time.Minute)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pinfos := pm2.PeerInfos([]peer.ID{test.PeerID1})
+	if len(pinfos) != 1 {
+		t.Fatal("expected 1 peer in the peerstore")
+	}
+
+	if len(pinfos[0].Addrs) != 2 {
+		t.Error("expected 2 addresses")
+	}
+}
+
+func TestPriority(t *testing.T) {
+	pm := makeMgr(t)
+	defer clean(pm)
+
+	loc1 := "/ip4/127.0.0.1/tcp/1234"
+	testAddr1 := testAddr(loc1, test.PeerID1)
+	loc2 := "/ip4/127.0.0.2/tcp/1235"
+	testAddr2 := testAddr(loc2, test.PeerID2)
+	loc3 := "/ip4/127.0.0.3/tcp/1234"
+	testAddr3 := testAddr(loc3, test.PeerID3)
+	loc4 := "/ip4/127.0.0.4/tcp/1235"
+	testAddr4 := testAddr(loc4, test.PeerID4)
 
-	err := pm.ImportPeers([]ma.Multiaddr{testPeer, testPeer2}, false)
+	err := pm.ImportPeers([]ma.Multiaddr{testAddr1, testAddr2, testAddr3, testAddr4}, false, time.Minute)
	if err != nil {
		t.Fatal(err)
	}
 
-	pm.SavePeerstoreForPeers(api.StringsToPeers([]string{pid}))
+	pinfos := pm.PeerInfos([]peer.ID{test.PeerID4, test.PeerID2, test.PeerID3, test.PeerID1})
+	if len(pinfos) != 4 {
+		t.Fatal("expected 4 pinfos")
+	}
+
+	if pinfos[0].ID != test.PeerID1 ||
+		pinfos[1].ID != test.PeerID2 ||
+		pinfos[2].ID != test.PeerID3 ||
+		pinfos[3].ID != test.PeerID4 {
+		t.Error("wrong order of peerinfos")
+	}
+
+	pm.SetPriority(test.PeerID1, 100)
+
+	pinfos = pm.PeerInfos([]peer.ID{test.PeerID4, test.PeerID2, test.PeerID3, test.PeerID1})
+	if len(pinfos) != 4 {
+		t.Fatal("expected 4 pinfos")
+	}
+
+	if pinfos[3].ID != test.PeerID1 {
+		t.Fatal("PeerID1 should be last in the list")
+	}
+
+	pm.SavePeerstoreForPeers([]peer.ID{test.PeerID4, test.PeerID2, test.PeerID3, test.PeerID1})
 
	pm2 := makeMgr(t)
	defer clean(pm2)
 
-	err = pm2.ImportPeersFromPeerstore(false)
+	err = pm2.ImportPeersFromPeerstore(false, time.Minute)
	if err != nil {
		t.Fatal(err)
	}
+	pinfos = pm2.PeerInfos([]peer.ID{test.PeerID4, test.PeerID2, test.PeerID3, test.PeerID1})
+	if len(pinfos) != 4 {
+		t.Fatal("expected 4 pinfos")
+	}
 
-	if len(pm2.PeersAddresses(api.StringsToPeers([]string{pid}))) != 2 {
-		t.Error("expected 2 addresses from the peerstore")
+	if pinfos[0].ID != test.PeerID2 ||
+		pinfos[1].ID != test.PeerID3 ||
+		pinfos[2].ID != test.PeerID4 ||
+		pinfos[3].ID != test.PeerID1 {
+		t.Error("wrong order of peerinfos")
	}
 }
diff --git a/util.go b/util.go
index e856f5f39..41e09a88c 100644
--- a/util.go
+++ b/util.go
@@ -4,10 +4,9 @@ import (
	"errors"
	"fmt"
 
-	"github.com/ipfs/ipfs-cluster/api"
-
	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-peer"
+	peerstore "github.com/libp2p/go-libp2p-peerstore"
	ma "github.com/multiformats/go-multiaddr"
 )
 
@@ -18,14 +17,14 @@ func PeersFromMultiaddrs(addrs []ma.Multiaddr) []peer.ID {
	var pids []peer.ID
	pm := make(map[peer.ID]struct{})
	for _, addr := range addrs {
-		pid, _, err := api.Libp2pMultiaddrSplit(addr)
+		pinfo, err := peerstore.InfoFromP2pAddr(addr)
		if err != nil {
			continue
		}
-		_, ok := pm[pid]
+		_, ok := pm[pinfo.ID]
		if !ok {
-			pm[pid] = struct{}{}
-			pids = append(pids, pid)
+			pm[pinfo.ID] = struct{}{}
+			pids = append(pids, pinfo.ID)
		}
	}
	return pids
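Below are a few review notes on the libp2p APIs and patterns this diff leans on. The snippets are sketches written for this review, not code from the repository; any names or values not in the diff are illustrative.

The `applyJSONConfig` change makes partial JSON configurations possible: a field left empty no longer clobbers the current value (or fails to parse). The pattern, reduced to a minimal sketch with a made-up config type:

```go
package main

import (
	"fmt"

	ma "github.com/multiformats/go-multiaddr"
)

type config struct{ ListenAddr ma.Multiaddr }

// applyJSON only overrides ListenAddr when the JSON field was set.
func (cfg *config) applyJSON(listen string) error {
	if listen != "" {
		addr, err := ma.NewMultiaddr(listen)
		if err != nil {
			return fmt.Errorf("error parsing listen_multiaddress: %s", err)
		}
		cfg.ListenAddr = addr
	}
	return nil
}

func main() {
	def, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/9095")
	cfg := &config{ListenAddr: def}
	cfg.applyJSON("") // empty field: the default survives
	fmt.Println(cfg.ListenAddr)
}
```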
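The recurring substitution in this diff replaces the custom `api.Libp2pMultiaddrSplit` helper with `peerstore.InfoFromP2pAddr` from go-libp2p-peerstore, which decapsulates a `/.../ipfs/<peerid>` multiaddress into a `PeerInfo{ID, Addrs}`. A minimal sketch of the behavior (the peer ID below is just a sample value):

```go
package main

import (
	"fmt"

	peerstore "github.com/libp2p/go-libp2p-peerstore"
	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	// A full p2p multiaddress: transport part plus /ipfs/<peerid>.
	addr, _ := ma.NewMultiaddr(
		"/ip4/127.0.0.1/tcp/9096/ipfs/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N")

	pinfo, err := peerstore.InfoFromP2pAddr(addr)
	if err != nil {
		panic(err)
	}

	fmt.Println(pinfo.ID)    // the decoded peer ID
	fmt.Println(pinfo.Addrs) // [/ip4/127.0.0.1/tcp/9096] (decapsulated)
}
```

This is also why `enableLibp2p` gains the `len(pinfo.Addrs) == 0` check: a bare `/ipfs/<peerid>` address parses fine but carries nothing to dial.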
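`peerstore.InfoToP2pAddrs` is the inverse operation, used above in `Cluster.ID()` and in `SavePeerstore` to re-encapsulate every known address with the peer ID before publishing or persisting it. Roughly:

```go
package main

import (
	"fmt"

	peer "github.com/libp2p/go-libp2p-peer"
	peerstore "github.com/libp2p/go-libp2p-peerstore"
	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	// Sample peer ID and addresses, mirroring the shapes used in the tests.
	pid, _ := peer.IDB58Decode("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N")
	a1, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234")
	a2, _ := ma.NewMultiaddr("/dns4/localhost/tcp/1235")

	pinfo := peerstore.PeerInfo{ID: pid, Addrs: []ma.Multiaddr{a1, a2}}
	addrs, err := peerstore.InfoToP2pAddrs(&pinfo)
	if err != nil {
		panic(err)
	}
	// Each address comes back with /ipfs/<peerid> appended, which is the
	// line format written to the peerstore file.
	for _, a := range addrs {
		fmt.Println(a)
	}
}
```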
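`ImportPeer` now dials through `host.Connect` instead of `Network().DialPeer`. Per the comment deleted from the old code, `Connect` is the path that resolves DNS multiaddresses, and it also absorbs the `PeerInfo` addresses into the peerstore before dialing, so the manual `madns.Resolve` block became redundant. A self-contained two-host sketch of the call shape used by `ImportPeer` and `Bootstrap`:

```go
package main

import (
	"context"
	"fmt"
	"time"

	libp2p "github.com/libp2p/go-libp2p"
	peerstore "github.com/libp2p/go-libp2p-peerstore"
)

func main() {
	ctx := context.Background()
	h1, _ := libp2p.New(ctx)
	h2, _ := libp2p.New(ctx)
	defer h1.Close()
	defer h2.Close()

	// Connect takes a full PeerInfo: it stores the addresses in h1's
	// peerstore and then dials.
	pinfo := peerstore.PeerInfo{ID: h2.ID(), Addrs: h2.Addrs()}

	cctx, cancel := context.WithTimeout(ctx, 5*time.Second) // cf. ConnectTimeout
	defer cancel()
	if err := h1.Connect(cctx, pinfo); err != nil {
		panic(err)
	}
	fmt.Println(h1.Network().Connectedness(h2.ID())) // Connected
}
```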
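The TTL parameter threaded through `ImportPeer`/`ImportPeers` maps directly onto the peerstore address-book TTLs: `NewCluster` imports the peerstore file with the default, expiring `peerstore.AddressTTL`, while `Join()` and `Trust()` upgrade peers to `peerstore.PermanentAddrTTL`. A sketch of what that means for a host's address book, using a throwaway host and a sample peer ID:

```go
package main

import (
	"context"
	"fmt"
	"time"

	libp2p "github.com/libp2p/go-libp2p"
	peer "github.com/libp2p/go-libp2p-peer"
	peerstore "github.com/libp2p/go-libp2p-peerstore"
	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	h, err := libp2p.New(context.Background())
	if err != nil {
		panic(err)
	}
	defer h.Close()

	pid, _ := peer.IDB58Decode("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N")
	a, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1234")

	// Known-but-untrusted peer: the address is forgotten after the TTL.
	h.Peerstore().AddAddrs(pid, []ma.Multiaddr{a}, time.Minute)

	// Trusted peer: SetAddrs upgrades the TTL so addresses are kept forever,
	// which is what Trust() does above.
	h.Peerstore().SetAddrs(pid, h.Peerstore().Addrs(pid), peerstore.PermanentAddrTTL)

	fmt.Println(h.Peerstore().Addrs(pid))
}
```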
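`Trust()` additionally protects the peer in the connection manager. Tagged connections are exempt from pruning when the manager trims from its high to its low watermark; the nil check in the diff guards against hosts without a usable manager. A hypothetical setup (the connmgr package and watermark values are assumptions, not part of this diff):

```go
package main

import (
	"context"
	"fmt"
	"time"

	libp2p "github.com/libp2p/go-libp2p"
	connmgr "github.com/libp2p/go-libp2p-connmgr"
)

func main() {
	// A host built with a real connection manager so that Protect has effect.
	h, err := libp2p.New(
		context.Background(),
		libp2p.ConnectionManager(connmgr.NewConnManager(100, 400, time.Minute)),
	)
	if err != nil {
		panic(err)
	}
	defer h.Close()

	other, _ := libp2p.New(context.Background())
	defer other.Close()

	// Connections to this peer will never be selected for pruning.
	h.ConnManager().Protect(other.ID(), "crdt")
	fmt.Println("protected", other.ID())
}
```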
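Finally, the priority machinery: `SetPriority` stores a plain `int` under the `PriorityTag` key in the peerstore's metadata book (`Peerstore.Put`/`Get`, hence the `.(int)` assertion in `Less`), and `peerSort` orders `PeerInfo`s ascending by that value, treating a missing or invalid tag as 0, i.e. highest priority. The same ordering rule on a toy data set:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// Lower value = higher priority; untagged peers default to 0.
	prios := map[string]int{
		"peerB": 9999, // e.g. a peer that failed to connect in Bootstrap
		"peerC": 1,
	}
	ids := []string{"peerB", "peerA", "peerC"} // peerA is untagged

	sort.SliceStable(ids, func(i, j int) bool {
		return prios[ids[i]] < prios[ids[j]]
	})

	fmt.Println(ids) // [peerA peerC peerB]
}
```

One note on the design: the diff uses `sort.Sort`, which is not stable, so the relative order of peers sharing a priority value is unspecified; `TestPriority` passes because `ImportPeers` assigns each peer a distinct priority from its list position.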