Skip to content

Commit

Permalink
Merge pull request #932 from ipfs/feat/nat
Browse files Browse the repository at this point in the history
Enable P2P Circuit relay and QUIC
  • Loading branch information
hsanjuan committed Nov 5, 2019
2 parents dcb6c08 + b08fe09 commit ab4f2b4
Show file tree
Hide file tree
Showing 13 changed files with 241 additions and 62 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -24,7 +24,7 @@ jobs:
- stage: "Testing stage"
name: "Tests (all modules) + Coverage"
script:
- go test -v -timeout 15m -coverprofile=coverage.txt -covermode=atomic ./...
- travis_wait go test -v -timeout 15m -coverprofile=coverage.txt -covermode=atomic ./...
after_success:
- bash <(curl -s https://codecov.io/bash)
- name: "Main Tests with crdt consensus"
Expand Down
11 changes: 10 additions & 1 deletion api/rest/client/transports.go
Expand Up @@ -14,6 +14,9 @@ import (
ipnet "github.com/libp2p/go-libp2p-core/pnet"
p2phttp "github.com/libp2p/go-libp2p-http"
pnet "github.com/libp2p/go-libp2p-pnet"
libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
secio "github.com/libp2p/go-libp2p-secio"
libp2ptls "github.com/libp2p/go-libp2p-tls"
madns "github.com/multiformats/go-multiaddr-dns"
)

Expand Down Expand Up @@ -63,7 +66,13 @@ func (c *defaultClient) enableLibp2p() error {
}
}

h, err := libp2p.New(c.ctx, libp2p.PrivateNetwork(prot))
h, err := libp2p.New(c.ctx,
libp2p.PrivateNetwork(prot),
libp2p.Security(libp2ptls.ID, libp2ptls.New),
libp2p.Security(secio.ID, secio.New),
libp2p.Transport(libp2pquic.NewTransport),
libp2p.DefaultTransports,
)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions api/rest/restapi.go
Expand Up @@ -35,6 +35,9 @@ import (
rpc "github.com/libp2p/go-libp2p-gorpc"
gostream "github.com/libp2p/go-libp2p-gostream"
p2phttp "github.com/libp2p/go-libp2p-http"
libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
secio "github.com/libp2p/go-libp2p-secio"
libp2ptls "github.com/libp2p/go-libp2p-tls"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"

Expand Down Expand Up @@ -238,6 +241,10 @@ func (api *API) setupLibp2p() error {
context.Background(),
libp2p.Identity(api.config.PrivateKey),
libp2p.ListenAddrs([]ma.Multiaddr{api.config.Libp2pListenAddr}...),
libp2p.Security(libp2ptls.ID, libp2ptls.New),
libp2p.Security(secio.ID, secio.New),
libp2p.Transport(libp2pquic.NewTransport),
libp2p.DefaultTransports,
)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cluster.go
Expand Up @@ -877,8 +877,8 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {

logger.Debugf("Join(%s)", addr)

// Add peer to peerstore so we can talk to it (and connect)
pid, err := c.peerManager.ImportPeer(addr, true, peerstore.PermanentAddrTTL)
// Add peer to peerstore so we can talk to it
pid, err := c.peerManager.ImportPeer(addr, false, peerstore.PermanentAddrTTL)
if err != nil {
return err
}
Expand Down
50 changes: 39 additions & 11 deletions cluster_config.go
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/ipfs/ipfs-cluster/config"

ipfsconfig "github.com/ipfs/go-ipfs-config"
pnet "github.com/libp2p/go-libp2p-pnet"
ma "github.com/multiformats/go-multiaddr"

Expand All @@ -21,9 +22,12 @@ import (

const configKey = "cluster"

// DefaultListenAddrs contains TCP and QUIC listen addresses
var DefaultListenAddrs = []string{"/ip4/0.0.0.0/tcp/9096", "/ip4/0.0.0.0/udp/9096/quic"}

// Configuration defaults
const (
DefaultListenAddr = "/ip4/0.0.0.0/tcp/9096"
DefaultEnableRelayHop = true
DefaultStateSyncInterval = 600 * time.Second
DefaultIPFSSyncInterval = 130 * time.Second
DefaultPinRecoverInterval = 1 * time.Hour
Expand Down Expand Up @@ -73,7 +77,11 @@ type Config struct {

// Listen parameters for the Cluster libp2p Host. Used by
// the RPC and Consensus components.
ListenAddr ma.Multiaddr
ListenAddr []ma.Multiaddr

// Enables HOP relay for the node. If this is enabled, the node will act as
// an intermediate (Hop Relay) node in relay circuits for connected peers.
EnableRelayHop bool

// ConnMgr holds configuration values for the connection manager for
// the libp2p host.
Expand Down Expand Up @@ -160,7 +168,8 @@ type configJSON struct {
PrivateKey string `json:"private_key,omitempty"`
Secret string `json:"secret"`
LeaveOnShutdown bool `json:"leave_on_shutdown"`
ListenMultiaddress string `json:"listen_multiaddress"`
ListenMultiaddress ipfsconfig.Strings `json:"listen_multiaddress"`
EnableRelayHop bool `json:"enable_relay_hop"`
ConnectionManager *connMgrConfigJSON `json:"connection_manager"`
StateSyncInterval string `json:"state_sync_interval"`
IPFSSyncInterval string `json:"ipfs_sync_interval"`
Expand Down Expand Up @@ -228,6 +237,10 @@ func (cfg *Config) Validate() error {
return errors.New("cluster.listen_multiaddress is undefined")
}

if len(cfg.ListenAddr) == 0 {
return errors.New("cluster.listen_multiaddress is empty")
}

if cfg.ConnMgr.LowWater <= 0 {
return errors.New("cluster.connection_manager.low_water is invalid")
}
Expand Down Expand Up @@ -334,8 +347,13 @@ func (cfg *Config) setDefaults() {
}
cfg.Peername = hostname

addr, _ := ma.NewMultiaddr(DefaultListenAddr)
cfg.ListenAddr = addr
listenAddrs := []ma.Multiaddr{}
for _, m := range DefaultListenAddrs {
addr, _ := ma.NewMultiaddr(m)
listenAddrs = append(listenAddrs, addr)
}
cfg.ListenAddr = listenAddrs
cfg.EnableRelayHop = DefaultEnableRelayHop
cfg.ConnMgr = ConnMgrConfig{
HighWater: DefaultConnMgrHighWater,
LowWater: DefaultConnMgrLowWater,
Expand Down Expand Up @@ -384,13 +402,18 @@ func (cfg *Config) applyConfigJSON(jcfg *configJSON) error {
}
cfg.Secret = clusterSecret

clusterAddr, err := ma.NewMultiaddr(jcfg.ListenMultiaddress)
if err != nil {
err = fmt.Errorf("error parsing cluster_listen_multiaddress: %s", err)
return err
var listenAddrs []ma.Multiaddr
for _, addr := range jcfg.ListenMultiaddress {
listenAddr, err := ma.NewMultiaddr(addr)
if err != nil {
err = fmt.Errorf("error parsing a listen_multiaddress: %s", err)
return err
}
listenAddrs = append(listenAddrs, listenAddr)
}
cfg.ListenAddr = clusterAddr

cfg.ListenAddr = listenAddrs
cfg.EnableRelayHop = jcfg.EnableRelayHop
if conman := jcfg.ConnectionManager; conman != nil {
cfg.ConnMgr = ConnMgrConfig{
HighWater: jcfg.ConnectionManager.HighWater,
Expand Down Expand Up @@ -455,7 +478,12 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) {
jcfg.ReplicationFactorMin = cfg.ReplicationFactorMin
jcfg.ReplicationFactorMax = cfg.ReplicationFactorMax
jcfg.LeaveOnShutdown = cfg.LeaveOnShutdown
jcfg.ListenMultiaddress = cfg.ListenAddr.String()
var listenAddrs ipfsconfig.Strings
for _, addr := range cfg.ListenAddr {
listenAddrs = append(listenAddrs, addr.String())
}
jcfg.ListenMultiaddress = ipfsconfig.Strings(listenAddrs)
jcfg.EnableRelayHop = cfg.EnableRelayHop
jcfg.ConnectionManager = &connMgrConfigJSON{
HighWater: cfg.ConnMgr.HighWater,
LowWater: cfg.ConnMgr.LowWater,
Expand Down
14 changes: 11 additions & 3 deletions cluster_config_test.go
Expand Up @@ -5,6 +5,8 @@ import (
"os"
"testing"
"time"

ipfsconfig "github.com/ipfs/go-ipfs-config"
)

var ccfgTestJSON = []byte(`
Expand All @@ -17,7 +19,10 @@ var ccfgTestJSON = []byte(`
"low_water": 500,
"grace_period": "100m0s"
},
"listen_multiaddress": "/ip4/127.0.0.1/tcp/10000",
"listen_multiaddress": [
"/ip4/127.0.0.1/tcp/10000",
"/ip4/127.0.0.1/udp/10000/quic"
],
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"pin_recover_interval": "1m",
Expand Down Expand Up @@ -114,7 +119,7 @@ func TestLoadJSON(t *testing.T) {
})

t.Run("bad listen multiaddress", func(t *testing.T) {
_, err := loadJSON2(t, func(j *configJSON) { j.ListenMultiaddress = "abc" })
_, err := loadJSON2(t, func(j *configJSON) { j.ListenMultiaddress = ipfsconfig.Strings{"abc"} })
if err == nil {
t.Error("expected error parsing listen_multiaddress")
}
Expand Down Expand Up @@ -197,7 +202,10 @@ func TestLoadJSON(t *testing.T) {

func TestToJSON(t *testing.T) {
cfg := &Config{}
cfg.LoadJSON(ccfgTestJSON)
err := cfg.LoadJSON(ccfgTestJSON)
if err != nil {
t.Fatal(err)
}
newjson, err := cfg.ToJSON()
if err != nil {
t.Fatal(err)
Expand Down
99 changes: 72 additions & 27 deletions clusterhost.go
Expand Up @@ -6,20 +6,27 @@ import (

"github.com/ipfs/ipfs-cluster/config"
libp2p "github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat-svc"
relay "github.com/libp2p/go-libp2p-circuit"
connmgr "github.com/libp2p/go-libp2p-connmgr"
corepnet "github.com/libp2p/go-libp2p-core/pnet"
routing "github.com/libp2p/go-libp2p-core/routing"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
ipnet "github.com/libp2p/go-libp2p-interface-pnet"
dht "github.com/libp2p/go-libp2p-kad-dht"
pnet "github.com/libp2p/go-libp2p-pnet"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
secio "github.com/libp2p/go-libp2p-secio"
libp2ptls "github.com/libp2p/go-libp2p-tls"
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
)

// NewClusterHost creates a libp2p Host with the options from the provided
// cluster configuration. Using that host, it creates pubsub and a DHT
// instances, for shared use by all cluster components. The returned host uses
// the DHT for routing. The resulting DHT is not bootstrapped.
// NewClusterHost creates a fully-featured libp2p Host with the options from
// the provided cluster configuration. Using that host, it creates pubsub and
// a DHT instances, for shared use by all cluster components. The returned
// host uses the DHT for routing. The resulting DHT is not bootstrapped. Relay
// and AutoNATService are additionally setup for this host.
func NewClusterHost(
ctx context.Context,
ident *config.Identity,
Expand All @@ -28,13 +35,35 @@ func NewClusterHost(

connman := connmgr.NewConnManager(cfg.ConnMgr.LowWater, cfg.ConnMgr.HighWater, cfg.ConnMgr.GracePeriod)

relayOpts := []relay.RelayOpt{relay.OptDiscovery}
if cfg.EnableRelayHop {
relayOpts = append(relayOpts, relay.OptHop)
}

var idht *dht.IpfsDHT
var err error
opts := []libp2p.Option{
libp2p.ListenAddrs(cfg.ListenAddr...),
libp2p.NATPortMap(),
libp2p.ConnectionManager(connman),
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
idht, err = newDHT(ctx, h)
return idht, err
}),
libp2p.EnableRelay(relayOpts...),
libp2p.EnableAutoRelay(),
}

prot, err := newProtector(cfg.Secret)
if err != nil {
return nil, nil, nil, err
}

h, err := newHost(
ctx,
cfg.Secret,
prot,
ident.PrivateKey,
libp2p.ListenAddrs(cfg.ListenAddr),
libp2p.NATPortMap(),
libp2p.ConnectionManager(connman),
opts...,
)
if err != nil {
return nil, nil, nil, err
Expand All @@ -46,39 +75,55 @@ func NewClusterHost(
return nil, nil, nil, err
}

idht, err := newDHT(ctx, h)
// needed for auto relay
_, err = autonat.NewAutoNATService(ctx, h, baseOpts(prot)...)
if err != nil {
h.Close()
return nil, nil, nil, err
}

return routedHost(h, idht), psub, idht, nil
return h, psub, idht, nil
}

func newHost(ctx context.Context, secret []byte, priv crypto.PrivKey, opts ...libp2p.Option) (host.Host, error) {
var prot ipnet.Protector
var err error

// Create protector if we have a secret.
if secret != nil && len(secret) > 0 {
var key [32]byte
copy(key[:], secret)
prot, err = pnet.NewV1ProtectorFromBytes(&key)
if err != nil {
return nil, err
}
}

// newHost creates a base cluster host without dht, pubsub, relay or nat etc.
// mostly used for testing.
func newHost(ctx context.Context, prot corepnet.Protector, priv crypto.PrivKey, opts ...libp2p.Option) (host.Host, error) {
finalOpts := []libp2p.Option{
libp2p.Identity(priv),
libp2p.PrivateNetwork(prot),
}
finalOpts = append(finalOpts, baseOpts(prot)...)
finalOpts = append(finalOpts, opts...)

return libp2p.New(
h, err := libp2p.New(
ctx,
finalOpts...,
)
if err != nil {
return nil, err
}

return h, nil
}

func baseOpts(prot corepnet.Protector) []libp2p.Option {
return []libp2p.Option{
libp2p.PrivateNetwork(prot),
libp2p.Security(libp2ptls.ID, libp2ptls.New),
libp2p.Security(secio.ID, secio.New),
libp2p.Transport(libp2pquic.NewTransport),
libp2p.DefaultTransports,
}
}

func newProtector(secret []byte) (corepnet.Protector, error) {
// Create protector if we have a secret.
if len(secret) == 0 {
return nil, nil
}

var key [32]byte
copy(key[:], secret)
return pnet.NewV1ProtectorFromBytes(&key)
}

func newDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
Expand Down
4 changes: 3 additions & 1 deletion cmd/ipfs-cluster-service/main.go
Expand Up @@ -326,7 +326,9 @@ the peer IDs in the given multiaddresses.
if c.Bool("randomports") {
cfgs := cfgHelper.Configs()

cfgs.Cluster.ListenAddr, err = cmdutils.RandomizePorts(cfgs.Cluster.ListenAddr)
for i := range cfgs.Cluster.ListenAddr {
cfgs.Cluster.ListenAddr[i], err = cmdutils.RandomizePorts(cfgs.Cluster.ListenAddr[i])
}
checkErr("randomizing ports", err)
cfgs.Restapi.HTTPListenAddr, err = cmdutils.RandomizePorts(cfgs.Restapi.HTTPListenAddr)
checkErr("randomizing ports", err)
Expand Down
8 changes: 8 additions & 0 deletions consensus/raft/logging.go
Expand Up @@ -92,6 +92,10 @@ func (log *hcLogToLogger) IsError() bool {
return true
}

func (log *hcLogToLogger) Name() string {
return log.name
}

func (log *hcLogToLogger) With(args ...interface{}) hclog.Logger {
return &hcLogToLogger{extraArgs: args}
}
Expand All @@ -114,6 +118,10 @@ func (log *hcLogToLogger) StandardWriter(opts *hclog.StandardLoggerOptions) io.W
return nil
}

func (log *hcLogToLogger) ImpliedArgs() []interface{} {
return nil
}

const repeatPoolSize = 10
const repeatReset = time.Minute

Expand Down

0 comments on commit ab4f2b4

Please sign in to comment.