Fix #787: Connectivity fixes #792

Merged · 4 commits · May 27, 2019

Changes from all commits

Makefile (4 changes: 2 additions & 2 deletions)

@@ -75,9 +75,9 @@ test_problem: gx-deps

 $(sharness):
 	@echo "Downloading sharness"
-	@curl -L -s -o sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/master.tar.gz
+	@curl -L -s -o sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/8fa4b9b0465d21b7ec114ec4528fa17f5a6eb361.tar.gz
 	@cd sharness/lib; tar -zxf sharness.tar.gz; cd ../..
-	@mv sharness/lib/sharness-master sharness/lib/sharness
+	@mv sharness/lib/sharness-8fa4b9b0465d21b7ec114ec4528fa17f5a6eb361 sharness/lib/sharness
 	@rm sharness/lib/sharness.tar.gz

 clean_sharness:

api/ipfsproxy/config.go (23 changes: 13 additions & 10 deletions)

@@ -206,20 +206,23 @@ func (cfg *Config) LoadJSON(raw []byte) error {
 }

 func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
-	proxyAddr, err := ma.NewMultiaddr(jcfg.ListenMultiaddress)
-	if err != nil {
-		return fmt.Errorf("error parsing proxy listen_multiaddress: %s", err)
+	if jcfg.ListenMultiaddress != "" {
+		proxyAddr, err := ma.NewMultiaddr(jcfg.ListenMultiaddress)
+		if err != nil {
+			return fmt.Errorf("error parsing proxy listen_multiaddress: %s", err)
+		}
+		cfg.ListenAddr = proxyAddr
 	}
-	nodeAddr, err := ma.NewMultiaddr(jcfg.NodeMultiaddress)
-	if err != nil {
-		return fmt.Errorf("error parsing ipfs node_multiaddress: %s", err)
+	if jcfg.NodeMultiaddress != "" {

> Author (collaborator): These two changes are to fix broken tests that I must have missed when updating deps.

+		nodeAddr, err := ma.NewMultiaddr(jcfg.NodeMultiaddress)
+		if err != nil {
+			return fmt.Errorf("error parsing ipfs node_multiaddress: %s", err)
+		}
+		cfg.NodeAddr = nodeAddr
 	}
-
-	cfg.ListenAddr = proxyAddr
-	cfg.NodeAddr = nodeAddr
 	config.SetIfNotDefault(jcfg.NodeHTTPS, &cfg.NodeHTTPS)

-	err = config.ParseDurations(
+	err := config.ParseDurations(
 		"ipfsproxy",
 		&config.DurationOpt{Duration: jcfg.ReadTimeout, Dst: &cfg.ReadTimeout, Name: "read_timeout"},
 		&config.DurationOpt{Duration: jcfg.ReadHeaderTimeout, Dst: &cfg.ReadHeaderTimeout, Name: "read_header_timeout"},

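As a side note, a minimal sketch of why the guards matter (illustrative, not part of the PR): go-multiaddr rejects the empty string, so an unset JSON field previously made the whole config fail to load.

```go
package main

import (
	"fmt"

	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	// An unset JSON field decodes to "", which is not a parseable
	// multiaddr, so the unguarded code returned an error for partial configs.
	_, err := ma.NewMultiaddr("")
	fmt.Println(err != nil) // true

	// The guarded version only parses values that are actually set.
	if s := "/ip4/127.0.0.1/tcp/9095"; s != "" {
		addr, err := ma.NewMultiaddr(s)
		fmt.Println(addr, err) // /ip4/127.0.0.1/tcp/9095 <nil>
	}
}
```
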
api/rest/client/transports.go (15 changes: 9 additions & 6 deletions)

@@ -8,11 +8,10 @@ import (
 	"net/http"
 	"time"

-	"github.com/ipfs/ipfs-cluster/api"
-
 	p2phttp "github.com/hsanjuan/go-libp2p-http"
 	libp2p "github.com/libp2p/go-libp2p"
 	ipnet "github.com/libp2p/go-libp2p-interface-pnet"
+	peer "github.com/libp2p/go-libp2p-peer"
 	peerstore "github.com/libp2p/go-libp2p-peerstore"
 	pnet "github.com/libp2p/go-libp2p-pnet"
 	madns "github.com/multiformats/go-multiaddr-dns"
@@ -41,11 +40,15 @@ func (c *defaultClient) defaultTransport() {
 func (c *defaultClient) enableLibp2p() error {
 	c.defaultTransport()

-	pid, addr, err := api.Libp2pMultiaddrSplit(c.config.APIAddr)
+	pinfo, err := peerstore.InfoFromP2pAddr(c.config.APIAddr)

> Author (collaborator): A function in peerstore does mostly the same as a function we had, so I started using that one.

 	if err != nil {
 		return err
 	}

+	if len(pinfo.Addrs) == 0 {
+		return errors.New("APIAddr only includes a Peer ID")
+	}
+
 	var prot ipnet.Protector
 	if c.config.ProtectorKey != nil && len(c.config.ProtectorKey) > 0 {
 		if len(c.config.ProtectorKey) != 32 {
@@ -67,16 +70,16 @@ func (c *defaultClient) enableLibp2p() error {

 	ctx, cancel := context.WithTimeout(c.ctx, ResolveTimeout)
 	defer cancel()
-	resolvedAddrs, err := madns.Resolve(ctx, addr)
+	resolvedAddrs, err := madns.Resolve(ctx, pinfo.Addrs[0])
 	if err != nil {
 		return err
 	}

-	h.Peerstore().AddAddrs(pid, resolvedAddrs, peerstore.PermanentAddrTTL)
+	h.Peerstore().AddAddrs(pinfo.ID, resolvedAddrs, peerstore.PermanentAddrTTL)
 	c.transport.RegisterProtocol("libp2p", p2phttp.NewTransport(h))
 	c.net = "libp2p"
 	c.p2p = h
-	c.hostname = pid.Pretty()
+	c.hostname = peer.IDB58Encode(pinfo.ID)
 	return nil
 }

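For readers unfamiliar with it, a rough sketch of what `peerstore.InfoFromP2pAddr` does (illustrative only; the peer ID below is an arbitrary example value): it splits a full p2p multiaddress into a `PeerInfo` carrying the peer ID and the bare transport address, much like the removed `api.Libp2pMultiaddrSplit`.

```go
package main

import (
	"fmt"

	peerstore "github.com/libp2p/go-libp2p-peerstore"
	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	// A full p2p address: transport part plus the /ipfs/<peerID> suffix.
	addr, _ := ma.NewMultiaddr(
		"/ip4/127.0.0.1/tcp/9096/ipfs/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N")

	pinfo, err := peerstore.InfoFromP2pAddr(addr)
	if err != nil {
		panic(err)
	}
	fmt.Println(pinfo.ID)    // the decoded peer ID
	fmt.Println(pinfo.Addrs) // [/ip4/127.0.0.1/tcp/9096]
}
```
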
api/util.go (25 changes: 0 additions & 25 deletions)

@@ -1,8 +1,6 @@
 package api

 import (
-	"fmt"
-
 	peer "github.com/libp2p/go-libp2p-peer"
 	ma "github.com/multiformats/go-multiaddr"
 )
@@ -32,29 +30,6 @@ func StringsToPeers(strs []string) []peer.ID {
 	return peers
 }

-// Libp2pMultiaddrSplit takes a LibP2P multiaddress (/<multiaddr>/ipfs/<peerID>)
-// and decapsulates it, parsing the peer ID. Returns an error if there is
-// any problem (for example, the provided address not being a Libp2p one).
-func Libp2pMultiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) {
-	pid, err := addr.ValueForProtocol(ma.P_IPFS)
-	if err != nil {
-		err = fmt.Errorf("invalid peer multiaddress: %s: %s", addr, err)
-		logger.Error(err)
-		return "", nil, err
-	}
-
-	ipfs, _ := ma.NewMultiaddr("/ipfs/" + pid)
-	decapAddr := addr.Decapsulate(ipfs)
-
-	peerID, err := peer.IDB58Decode(pid)
-	if err != nil {
-		err = fmt.Errorf("invalid peer ID in multiaddress: %s: %s", pid, err)
-		logger.Error(err)
-		return "", nil, err
-	}
-	return peerID, decapAddr, nil
-}
-
 // MustLibp2pMultiaddrJoin takes a LibP2P multiaddress and a peer ID and
 // encapsulates a new /ipfs/<peerID> address. It will panic if the given
 // peer ID is bad.

cluster.go (48 changes: 33 additions & 15 deletions)

@@ -24,6 +24,7 @@ import (
 	host "github.com/libp2p/go-libp2p-host"
 	dht "github.com/libp2p/go-libp2p-kad-dht"
 	peer "github.com/libp2p/go-libp2p-peer"
+	peerstore "github.com/libp2p/go-libp2p-peerstore"
 	ma "github.com/multiformats/go-multiaddr"

 	ocgorpc "github.com/lanzafame/go-libp2p-ocgorpc"
@@ -36,7 +37,10 @@ import (
 // consensus layer.
 var ReadyTimeout = 30 * time.Second

-var pingMetricName = "ping"
+const (
+	pingMetricName = "ping"
+	bootstrapCount = 3

> Author (collaborator): This is the number of peers we attempt to connect to. We block until 3 connections succeed or we run out of addresses to try; after that, the DHT takes care of discovering the swarm in the background.

+)

 // Cluster is the main IPFS cluster component. It provides
 // the go-API for it and orchestrates the components that make up the system.

@@ -116,9 +120,7 @@ func NewCluster(

 	logger.Infof("IPFS Cluster v%s listening on:\n%s\n", version.Version, listenAddrs)

-	// Note, we already loaded peers from peerstore into the host
-	// in daemon.go.
-	peerManager := pstoremgr.New(host, cfg.GetPeerstorePath())
+	peerManager := pstoremgr.New(ctx, host, cfg.GetPeerstorePath())

 	c := &Cluster{
 		ctx: ctx,
@@ -144,6 +146,18 @@
 		readyB: false,
 	}

+	// Import known cluster peers from peerstore file. Set
+	// a non-permanent TTL.
+	c.peerManager.ImportPeersFromPeerstore(false, peerstore.AddressTTL)
+	// Attempt to connect to some peers (up to bootstrapCount).
+	actualCount := c.peerManager.Bootstrap(bootstrapCount)
+	// We cannot warn about this, as it is normal if we are going to Join() later.
+	logger.Debugf("bootstrap count %d", actualCount)
+	// Bootstrap the DHT now that we possibly have some connections.
+	c.dht.Bootstrap(c.ctx)
+
+	// After setupRPC, components can do their tasks with a fully operative
+	// routed libp2p host with some connections and a working DHT (hopefully).
 	err = c.setupRPC()
 	if err != nil {
 		c.Shutdown(ctx)
@@ -465,9 +479,6 @@ This might be due to one or several causes:

 	// Cluster is ready.

-	// Bootstrap the DHT now that we possibly have some connections
-	c.dht.Bootstrap(c.ctx)
-
 	peers, err := c.consensus.Peers(ctx)
 	if err != nil {
 		logger.Error(err)
@@ -632,12 +643,24 @@ func (c *Cluster) ID(ctx context.Context) *api.ID {
 		peers, _ = c.consensus.Peers(ctx)
 	}

+	clusterPeerInfos := c.peerManager.PeerInfos(peers)

> Author (collaborator): This block changed because the PeerManager no longer returns the api.Multiaddr type (our serializable wrapper for multiaddresses), so we now build that address list by hand.

+	addresses := []api.Multiaddr{}
+	for _, pinfo := range clusterPeerInfos {
+		addrs, err := peerstore.InfoToP2pAddrs(&pinfo)
+		if err != nil {
+			continue
+		}
+		for _, a := range addrs {
+			addresses = append(addresses, api.NewMultiaddrWithValue(a))
+		}
+	}
+
 	return &api.ID{
 		ID: c.id,
 		//PublicKey: c.host.Peerstore().PubKey(c.id),
 		Addresses: addrs,
 		ClusterPeers: peers,
-		ClusterPeersAddresses: c.peerManager.PeersAddresses(peers),
+		ClusterPeersAddresses: addresses,
 		Version: version.Version.String(),
 		RPCProtocolVersion: version.RPCProtocol,
 		IPFS: ipfsID,

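`peerstore.InfoToP2pAddrs` is the inverse of `InfoFromP2pAddr`: it re-encapsulates each known address with the `/ipfs/<peerID>` suffix. A small sketch with illustrative values (the peer ID and address below are arbitrary examples):

```go
// Illustrative only: pid and addr stand in for real values.
pid, _ := peer.IDB58Decode("QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N")
addr, _ := ma.NewMultiaddr("/ip4/192.0.2.1/tcp/9096")

pinfo := peerstore.PeerInfo{ID: pid, Addrs: []ma.Multiaddr{addr}}
full, _ := peerstore.InfoToP2pAddrs(&pinfo)
// full: [/ip4/192.0.2.1/tcp/9096/ipfs/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N]
```
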
@@ -720,20 +743,15 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {

 	logger.Debugf("Join(%s)", addr)

-	pid, _, err := api.Libp2pMultiaddrSplit(addr)
+	// Add peer to peerstore so we can talk to it (and connect)
+	pid, err := c.peerManager.ImportPeer(addr, true, peerstore.PermanentAddrTTL)

> Author (collaborator): Returning the pid from that function allowed some simplification here, but it does not change behaviour.

 	if err != nil {
 		logger.Error(err)
 		return err
 	}

 	// Bootstrap to myself
 	if pid == c.id {
 		return nil
 	}

-	// Add peer to peerstore so we can talk to it (and connect)
-	c.peerManager.ImportPeer(addr, true)
-
 	// Note that PeerAdd() on the remote peer will
 	// figure out what our real address is (obviously not
 	// ListenAddr).

cmd/ipfs-cluster-service/daemon.go (8 changes: 0 additions & 8 deletions)

@@ -23,7 +23,6 @@ import (
 	"github.com/ipfs/ipfs-cluster/observations"
 	"github.com/ipfs/ipfs-cluster/pintracker/maptracker"
 	"github.com/ipfs/ipfs-cluster/pintracker/stateless"
-	"github.com/ipfs/ipfs-cluster/pstoremgr"
 	"go.opencensus.io/tag"

 	ds "github.com/ipfs/go-datastore"
@@ -113,13 +112,6 @@ func createCluster(
 	ctx, err := tag.New(ctx, tag.Upsert(observations.HostKey, host.ID().Pretty()))
 	checkErr("tag context with host id", err)

-	peerstoreMgr := pstoremgr.New(host, cfgs.clusterCfg.GetPeerstorePath())
-	// Import peers but do not connect. We cannot connect to peers until
-	// everything has been created (dht, pubsub, bitswap). Otherwise things
-	// fail.
-	// Connections will happen as needed during bootstrap, rpc etc.
-	peerstoreMgr.ImportPeersFromPeerstore(false)
-
 	api, err := rest.NewAPIWithHost(ctx, cfgs.apiCfg, host)
 	checkErr("creating REST API component", err)

cmd/ipfs-cluster-service/state.go (2 changes: 1 addition & 1 deletion)

@@ -73,7 +73,7 @@ func (raftsm *raftStateManager) ImportState(r io.Reader) error {
 	if err != nil {
 		return err
 	}
-	pm := pstoremgr.New(nil, raftsm.cfgs.clusterCfg.GetPeerstorePath())
+	pm := pstoremgr.New(context.Background(), nil, raftsm.cfgs.clusterCfg.GetPeerstorePath())
 	raftPeers := append(
 		ipfscluster.PeersFromMultiaddrs(pm.LoadPeerstore()),
 		raftsm.ident.ID,

consensus/crdt/consensus.go (51 changes: 33 additions & 18 deletions)

@@ -9,6 +9,7 @@ import (
 	ipfslite "github.com/hsanjuan/ipfs-lite"
 	dshelp "github.com/ipfs/go-ipfs-ds-help"
 	"github.com/ipfs/ipfs-cluster/api"
+	"github.com/ipfs/ipfs-cluster/pstoremgr"
 	"github.com/ipfs/ipfs-cluster/state"
 	"github.com/ipfs/ipfs-cluster/state/dsstate"
 	multihash "github.com/multiformats/go-multihash"
@@ -22,13 +23,15 @@
 	host "github.com/libp2p/go-libp2p-host"
 	dht "github.com/libp2p/go-libp2p-kad-dht"
 	peer "github.com/libp2p/go-libp2p-peer"
+	peerstore "github.com/libp2p/go-libp2p-peerstore"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 )

 var logger = logging.Logger("crdt")

 var (
-	blocksNs = "b" // blockstore namespace
+	blocksNs   = "b" // blockstore namespace
+	connMgrTag = "crdt"

> Author (collaborator): The connection manager needs a tag when we mark a peer with Protect().

 )

 // Common variables for the module.
@@ -48,7 +51,8 @@ type Consensus struct {

 	trustedPeers sync.Map

-	host host.Host
+	host        host.Host
+	peerManager *pstoremgr.Manager

 	store     ds.Datastore
 	namespace ds.Key
@@ -85,21 +89,17 @@ func New(
 	ctx, cancel := context.WithCancel(context.Background())

 	css := &Consensus{
-		ctx:       ctx,
-		cancel:    cancel,
-		config:    cfg,
-		host:      host,
-		dht:       dht,
-		store:     store,
-		namespace: ds.NewKey(cfg.DatastoreNamespace),
-		pubsub:    pubsub,
-		rpcReady:  make(chan struct{}, 1),
-		readyCh:   make(chan struct{}, 1),
-	}
-
-	// Set up a fast-lookup trusted peers cache.
-	for _, p := range css.config.TrustedPeers {
-		css.Trust(ctx, p)
+		ctx:         ctx,
+		cancel:      cancel,
+		config:      cfg,
+		host:        host,
+		peerManager: pstoremgr.New(ctx, host, ""),

> Author (collaborator): We keep a peerManager around to give trusted peers a higher priority.

+		dht:         dht,
+		store:       store,
+		namespace:   ds.NewKey(cfg.DatastoreNamespace),
+		pubsub:      pubsub,
+		rpcReady:    make(chan struct{}, 1),
+		readyCh:     make(chan struct{}, 1),
 	}

 	go css.setup()
@@ -113,6 +113,12 @@ func (css *Consensus) setup() {
 	case <-css.rpcReady:
 	}

+	// Set up a fast-lookup trusted peers cache.
+	// Protect these peers in the ConnMgr.
+	for _, p := range css.config.TrustedPeers {
+		css.Trust(css.ctx, p)
+	}
+
 	// Hash the cluster name and produce the topic name from there
 	// as a way to avoid pubsub topic collisions with other
 	// pubsub applications potentially when both potentially use
@@ -296,9 +302,18 @@ func (css *Consensus) IsTrustedPeer(ctx context.Context, pid peer.ID) bool {
 	return ok
 }

-// Trust marks a peer as "trusted".
+// Trust marks a peer as "trusted". It makes sure it is trusted as issuer
+// for pubsub updates, it is protected in the connection manager, it
+// has the highest priority when the peerstore is saved, and its addresses
+// are always remembered.
 func (css *Consensus) Trust(ctx context.Context, pid peer.ID) error {
 	css.trustedPeers.Store(pid, struct{}{})
+	if conman := css.host.ConnManager(); conman != nil {
+		conman.Protect(pid, connMgrTag)
+	}
+	css.peerManager.SetPriority(pid, 0)

> Author (collaborator): 0 means highest priority.

+	addrs := css.host.Peerstore().Addrs(pid)
+	css.host.Peerstore().SetAddrs(pid, addrs, peerstore.PermanentAddrTTL)
 	return nil
 }

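On the `Protect()` tagging above: the connection manager tracks protections per (peer, tag) pair, so different subsystems can protect the same peer independently. A brief illustrative sketch (the tag name is made up; `h` is any libp2p host and `pid` a peer.ID):

```go
if cm := h.ConnManager(); cm != nil { // hosts may be built without one
	cm.Protect(pid, "my-subsystem")   // pin this peer's connections under our tag
	// ...
	cm.Unprotect(pid, "my-subsystem") // releases only our tag's protection
}
```
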
peer_manager_test.go (4 changes: 2 additions & 2 deletions)

@@ -155,11 +155,11 @@ func TestClustersPeerAdd(t *testing.T) {
 		addrs := c.peerManager.LoadPeerstore()
 		peerMap := make(map[peer.ID]struct{})
 		for _, a := range addrs {
-			pid, _, err := api.Libp2pMultiaddrSplit(a)
+			pinfo, err := peerstore.InfoFromP2pAddr(a)
 			if err != nil {
 				t.Fatal(err)
 			}
-			peerMap[pid] = struct{}{}
+			peerMap[pinfo.ID] = struct{}{}
 		}

 		if len(peerMap) == 0 {