Skip to content
Permalink
Browse files

Merge branch 'master' into fix/expose-badger-conf

  • Loading branch information...
hsanjuan committed Jun 7, 2019
2 parents fd82463 + 445e484 commit 588596d28e1dd3c8c52346a7fbd57d57e231d229
@@ -75,9 +75,9 @@ test_problem: gx-deps

$(sharness):
@echo "Downloading sharness"
@curl -L -s -o sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/master.tar.gz
@curl -L -s -o sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/8fa4b9b0465d21b7ec114ec4528fa17f5a6eb361.tar.gz
@cd sharness/lib; tar -zxf sharness.tar.gz; cd ../..
@mv sharness/lib/sharness-master sharness/lib/sharness
@mv sharness/lib/sharness-8fa4b9b0465d21b7ec114ec4528fa17f5a6eb361 sharness/lib/sharness
@rm sharness/lib/sharness.tar.gz

clean_sharness:
@@ -206,20 +206,23 @@ func (cfg *Config) LoadJSON(raw []byte) error {
}

func (cfg *Config) applyJSONConfig(jcfg *jsonConfig) error {
proxyAddr, err := ma.NewMultiaddr(jcfg.ListenMultiaddress)
if err != nil {
return fmt.Errorf("error parsing proxy listen_multiaddress: %s", err)
if jcfg.ListenMultiaddress != "" {
proxyAddr, err := ma.NewMultiaddr(jcfg.ListenMultiaddress)
if err != nil {
return fmt.Errorf("error parsing proxy listen_multiaddress: %s", err)
}
cfg.ListenAddr = proxyAddr
}
nodeAddr, err := ma.NewMultiaddr(jcfg.NodeMultiaddress)
if err != nil {
return fmt.Errorf("error parsing ipfs node_multiaddress: %s", err)
if jcfg.NodeMultiaddress != "" {
nodeAddr, err := ma.NewMultiaddr(jcfg.NodeMultiaddress)
if err != nil {
return fmt.Errorf("error parsing ipfs node_multiaddress: %s", err)
}
cfg.NodeAddr = nodeAddr
}

cfg.ListenAddr = proxyAddr
cfg.NodeAddr = nodeAddr
config.SetIfNotDefault(jcfg.NodeHTTPS, &cfg.NodeHTTPS)

err = config.ParseDurations(
err := config.ParseDurations(
"ipfsproxy",
&config.DurationOpt{Duration: jcfg.ReadTimeout, Dst: &cfg.ReadTimeout, Name: "read_timeout"},
&config.DurationOpt{Duration: jcfg.ReadHeaderTimeout, Dst: &cfg.ReadHeaderTimeout, Name: "read_header_timeout"},
@@ -8,11 +8,10 @@ import (
"net/http"
"time"

"github.com/ipfs/ipfs-cluster/api"

p2phttp "github.com/hsanjuan/go-libp2p-http"
libp2p "github.com/libp2p/go-libp2p"
ipnet "github.com/libp2p/go-libp2p-interface-pnet"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
pnet "github.com/libp2p/go-libp2p-pnet"
madns "github.com/multiformats/go-multiaddr-dns"
@@ -41,11 +40,15 @@ func (c *defaultClient) defaultTransport() {
func (c *defaultClient) enableLibp2p() error {
c.defaultTransport()

pid, addr, err := api.Libp2pMultiaddrSplit(c.config.APIAddr)
pinfo, err := peerstore.InfoFromP2pAddr(c.config.APIAddr)
if err != nil {
return err
}

if len(pinfo.Addrs) == 0 {
return errors.New("APIAddr only includes a Peer ID")
}

var prot ipnet.Protector
if c.config.ProtectorKey != nil && len(c.config.ProtectorKey) > 0 {
if len(c.config.ProtectorKey) != 32 {
@@ -67,16 +70,16 @@ func (c *defaultClient) enableLibp2p() error {

ctx, cancel := context.WithTimeout(c.ctx, ResolveTimeout)
defer cancel()
resolvedAddrs, err := madns.Resolve(ctx, addr)
resolvedAddrs, err := madns.Resolve(ctx, pinfo.Addrs[0])
if err != nil {
return err
}

h.Peerstore().AddAddrs(pid, resolvedAddrs, peerstore.PermanentAddrTTL)
h.Peerstore().AddAddrs(pinfo.ID, resolvedAddrs, peerstore.PermanentAddrTTL)
c.transport.RegisterProtocol("libp2p", p2phttp.NewTransport(h))
c.net = "libp2p"
c.p2p = h
c.hostname = pid.Pretty()
c.hostname = peer.IDB58Encode(pinfo.ID)
return nil
}

@@ -12,7 +12,6 @@ import (
"encoding/json"
"fmt"
"net/url"
"regexp"
"sort"
"strconv"
"strings"
@@ -191,14 +190,13 @@ type IPFSPinStatus int
// IPFSPinStatusFromString parses a string and returns the matching
// IPFSPinStatus.
func IPFSPinStatusFromString(t string) IPFSPinStatus {
// Since indirect statuses are of the form "indirect through <cid>",
// use a regexp to match
var ind, _ = regexp.MatchString("^indirect", t)
var rec, _ = regexp.MatchString("^recursive", t)
// Since indirect statuses are of the form "indirect through <cid>"
// use a prefix match

switch {
case ind:
case strings.HasPrefix(t, "indirect"):
return IPFSPinStatusIndirect
case rec:
case strings.HasPrefix(t, "recursive"):
// FIXME: Maxdepth?
return IPFSPinStatusRecursive
case t == "direct":
@@ -53,6 +53,12 @@ func TestIPFSPinStatusFromString(t *testing.T) {
}
}

func BenchmarkIPFSPinStatusFromString(b *testing.B) {
for i := 0; i < b.N; i++ {
IPFSPinStatusFromString("indirect")
}
}

func TestMetric(t *testing.T) {
m := Metric{
Name: "hello",
@@ -1,8 +1,6 @@
package api

import (
"fmt"

peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)
@@ -32,29 +30,6 @@ func StringsToPeers(strs []string) []peer.ID {
return peers
}

// Libp2pMultiaddrSplit takes a LibP2P multiaddress (/<multiaddr>/ipfs/<peerID>)
// and decapsulates it, parsing the peer ID. Returns an error if there is
// any problem (for example, the provided address not being a Libp2p one).
func Libp2pMultiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) {
pid, err := addr.ValueForProtocol(ma.P_IPFS)
if err != nil {
err = fmt.Errorf("invalid peer multiaddress: %s: %s", addr, err)
logger.Error(err)
return "", nil, err
}

ipfs, _ := ma.NewMultiaddr("/ipfs/" + pid)
decapAddr := addr.Decapsulate(ipfs)

peerID, err := peer.IDB58Decode(pid)
if err != nil {
err = fmt.Errorf("invalid peer ID in multiaddress: %s: %s", pid, err)
logger.Error(err)
return "", nil, err
}
return peerID, decapAddr, nil
}

// MustLibp2pMultiaddrJoin takes a LibP2P multiaddress and a peer ID and
// encapsulates a new /ipfs/<peerID> address. It will panic if the given
// peer ID is bad.
@@ -24,6 +24,7 @@ import (
host "github.com/libp2p/go-libp2p-host"
dht "github.com/libp2p/go-libp2p-kad-dht"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"

ocgorpc "github.com/lanzafame/go-libp2p-ocgorpc"
@@ -36,7 +37,10 @@ import (
// consensus layer.
var ReadyTimeout = 30 * time.Second

var pingMetricName = "ping"
const (
pingMetricName = "ping"
bootstrapCount = 3
)

// Cluster is the main IPFS cluster component. It provides
// the go-API for it and orchestrates the components that make up the system.
@@ -116,9 +120,7 @@ func NewCluster(

logger.Infof("IPFS Cluster v%s listening on:\n%s\n", version.Version, listenAddrs)

// Note, we already loaded peers from peerstore into the host
// in daemon.go.
peerManager := pstoremgr.New(host, cfg.GetPeerstorePath())
peerManager := pstoremgr.New(ctx, host, cfg.GetPeerstorePath())

c := &Cluster{
ctx: ctx,
@@ -144,6 +146,18 @@ func NewCluster(
readyB: false,
}

// Import known cluster peers from peerstore file. Set
// a non permanent TTL.
c.peerManager.ImportPeersFromPeerstore(false, peerstore.AddressTTL)
// Attempt to connect to some peers (up to bootstrapCount)
actualCount := c.peerManager.Bootstrap(bootstrapCount)
// We cannot warn about this as this is normal if going to Join() later
logger.Debugf("bootstrap count %d", actualCount)
// Bootstrap the DHT now that we possibly have some connections
c.dht.Bootstrap(c.ctx)

// After setupRPC components can do their tasks with a fully operative
// routed libp2p host with some connections and a working DHT (hopefully).
err = c.setupRPC()
if err != nil {
c.Shutdown(ctx)
@@ -465,9 +479,6 @@ This might be due to one or several causes:

// Cluster is ready.

// Bootstrap the DHT now that we possibly have some connections
c.dht.Bootstrap(c.ctx)

peers, err := c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
@@ -582,7 +593,6 @@ func (c *Cluster) Shutdown(ctx context.Context) error {
}

c.cancel()
c.host.Close() // Shutdown all network services
c.wg.Wait()

// Cleanly close the datastore
@@ -633,12 +643,24 @@ func (c *Cluster) ID(ctx context.Context) *api.ID {
peers, _ = c.consensus.Peers(ctx)
}

clusterPeerInfos := c.peerManager.PeerInfos(peers)
addresses := []api.Multiaddr{}
for _, pinfo := range clusterPeerInfos {
addrs, err := peerstore.InfoToP2pAddrs(&pinfo)
if err != nil {
continue
}
for _, a := range addrs {
addresses = append(addresses, api.NewMultiaddrWithValue(a))
}
}

return &api.ID{
ID: c.id,
//PublicKey: c.host.Peerstore().PubKey(c.id),
Addresses: addrs,
ClusterPeers: peers,
ClusterPeersAddresses: c.peerManager.PeersAddresses(peers),
ClusterPeersAddresses: addresses,
Version: version.Version.String(),
RPCProtocolVersion: version.RPCProtocol,
IPFS: ipfsID,
@@ -721,20 +743,15 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {

logger.Debugf("Join(%s)", addr)

pid, _, err := api.Libp2pMultiaddrSplit(addr)
// Add peer to peerstore so we can talk to it (and connect)
pid, err := c.peerManager.ImportPeer(addr, true, peerstore.PermanentAddrTTL)
if err != nil {
logger.Error(err)
return err
}

// Bootstrap to myself
if pid == c.id {
return nil
}

// Add peer to peerstore so we can talk to it (and connect)
c.peerManager.ImportPeer(addr, true)

// Note that PeerAdd() on the remote peer will
// figure out what our real address is (obviously not
// ListenAddr).

0 comments on commit 588596d

Please sign in to comment.
You can’t perform that action at this time.