Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable P2P Circuit relay and QUIC #932

Merged
merged 13 commits into from Nov 5, 2019
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -24,7 +24,7 @@ jobs:
- stage: "Testing stage"
name: "Tests (all modules) + Coverage"
script:
- go test -v -timeout 15m -coverprofile=coverage.txt -covermode=atomic ./...
- travis_wait go test -v -timeout 15m -coverprofile=coverage.txt -covermode=atomic ./...
after_success:
- bash <(curl -s https://codecov.io/bash)
- name: "Main Tests with crdt consensus"
Expand Down
11 changes: 10 additions & 1 deletion api/rest/client/transports.go
Expand Up @@ -14,6 +14,9 @@ import (
ipnet "github.com/libp2p/go-libp2p-core/pnet"
p2phttp "github.com/libp2p/go-libp2p-http"
pnet "github.com/libp2p/go-libp2p-pnet"
libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
secio "github.com/libp2p/go-libp2p-secio"
libp2ptls "github.com/libp2p/go-libp2p-tls"
madns "github.com/multiformats/go-multiaddr-dns"
)

Expand Down Expand Up @@ -63,7 +66,13 @@ func (c *defaultClient) enableLibp2p() error {
}
}

h, err := libp2p.New(c.ctx, libp2p.PrivateNetwork(prot))
h, err := libp2p.New(c.ctx,
libp2p.PrivateNetwork(prot),
libp2p.Security(libp2ptls.ID, libp2ptls.New),
libp2p.Security(secio.ID, secio.New),
libp2p.Transport(libp2pquic.NewTransport),
libp2p.DefaultTransports,
)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions api/rest/restapi.go
Expand Up @@ -35,6 +35,9 @@ import (
rpc "github.com/libp2p/go-libp2p-gorpc"
gostream "github.com/libp2p/go-libp2p-gostream"
p2phttp "github.com/libp2p/go-libp2p-http"
libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
secio "github.com/libp2p/go-libp2p-secio"
libp2ptls "github.com/libp2p/go-libp2p-tls"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"

Expand Down Expand Up @@ -238,6 +241,10 @@ func (api *API) setupLibp2p() error {
context.Background(),
libp2p.Identity(api.config.PrivateKey),
libp2p.ListenAddrs([]ma.Multiaddr{api.config.Libp2pListenAddr}...),
libp2p.Security(libp2ptls.ID, libp2ptls.New),
libp2p.Security(secio.ID, secio.New),
libp2p.Transport(libp2pquic.NewTransport),
libp2p.DefaultTransports,
)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cluster.go
Expand Up @@ -877,8 +877,8 @@ func (c *Cluster) Join(ctx context.Context, addr ma.Multiaddr) error {

logger.Debugf("Join(%s)", addr)

// Add peer to peerstore so we can talk to it (and connect)
pid, err := c.peerManager.ImportPeer(addr, true, peerstore.PermanentAddrTTL)
// Add peer to peerstore so we can talk to it
pid, err := c.peerManager.ImportPeer(addr, false, peerstore.PermanentAddrTTL)
if err != nil {
return err
}
Expand Down
50 changes: 39 additions & 11 deletions cluster_config.go
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/ipfs/ipfs-cluster/config"

ipfsconfig "github.com/ipfs/go-ipfs-config"
pnet "github.com/libp2p/go-libp2p-pnet"
ma "github.com/multiformats/go-multiaddr"

Expand All @@ -21,9 +22,12 @@ import (

const configKey = "cluster"

// DefaultListenAddrs contains TCP and QUIC listen addresses
var DefaultListenAddrs = []string{"/ip4/0.0.0.0/tcp/9096", "/ip4/0.0.0.0/udp/9096/quic"}

// Configuration defaults
const (
DefaultListenAddr = "/ip4/0.0.0.0/tcp/9096"
DefaultEnableRelayHop = true
DefaultStateSyncInterval = 600 * time.Second
DefaultIPFSSyncInterval = 130 * time.Second
DefaultPinRecoverInterval = 1 * time.Hour
Expand Down Expand Up @@ -73,7 +77,11 @@ type Config struct {

// Listen parameters for the Cluster libp2p Host. Used by
// the RPC and Consensus components.
ListenAddr ma.Multiaddr
ListenAddr []ma.Multiaddr

// Enables HOP relay for the node. If this is enabled, the node will act as
// an intermediate (Hop Relay) node in relay circuits for connected peers.
EnableRelayHop bool

// ConnMgr holds configuration values for the connection manager for
// the libp2p host.
Expand Down Expand Up @@ -160,7 +168,8 @@ type configJSON struct {
PrivateKey string `json:"private_key,omitempty"`
Secret string `json:"secret"`
LeaveOnShutdown bool `json:"leave_on_shutdown"`
ListenMultiaddress string `json:"listen_multiaddress"`
ListenMultiaddress ipfsconfig.Strings `json:"listen_multiaddress"`
EnableRelayHop bool `json:"enable_relay_hop"`
ConnectionManager *connMgrConfigJSON `json:"connection_manager"`
StateSyncInterval string `json:"state_sync_interval"`
IPFSSyncInterval string `json:"ipfs_sync_interval"`
Expand Down Expand Up @@ -228,6 +237,10 @@ func (cfg *Config) Validate() error {
return errors.New("cluster.listen_multiaddress is undefined")
}

if len(cfg.ListenAddr) == 0 {
return errors.New("cluster.listen_multiaddress is empty")
}

if cfg.ConnMgr.LowWater <= 0 {
return errors.New("cluster.connection_manager.low_water is invalid")
}
Expand Down Expand Up @@ -334,8 +347,13 @@ func (cfg *Config) setDefaults() {
}
cfg.Peername = hostname

addr, _ := ma.NewMultiaddr(DefaultListenAddr)
cfg.ListenAddr = addr
listenAddrs := []ma.Multiaddr{}
for _, m := range DefaultListenAddrs {
addr, _ := ma.NewMultiaddr(m)
listenAddrs = append(listenAddrs, addr)
}
cfg.ListenAddr = listenAddrs
cfg.EnableRelayHop = DefaultEnableRelayHop
cfg.ConnMgr = ConnMgrConfig{
HighWater: DefaultConnMgrHighWater,
LowWater: DefaultConnMgrLowWater,
Expand Down Expand Up @@ -384,13 +402,18 @@ func (cfg *Config) applyConfigJSON(jcfg *configJSON) error {
}
cfg.Secret = clusterSecret

clusterAddr, err := ma.NewMultiaddr(jcfg.ListenMultiaddress)
if err != nil {
err = fmt.Errorf("error parsing cluster_listen_multiaddress: %s", err)
return err
var listenAddrs []ma.Multiaddr
for _, addr := range jcfg.ListenMultiaddress {
listenAddr, err := ma.NewMultiaddr(addr)
if err != nil {
err = fmt.Errorf("error parsing a listen_multiaddress: %s", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

include the addr in the error so the user knows which one

return err
}
listenAddrs = append(listenAddrs, listenAddr)
}
cfg.ListenAddr = clusterAddr

cfg.ListenAddr = listenAddrs
cfg.EnableRelayHop = jcfg.EnableRelayHop
if conman := jcfg.ConnectionManager; conman != nil {
cfg.ConnMgr = ConnMgrConfig{
HighWater: jcfg.ConnectionManager.HighWater,
Expand Down Expand Up @@ -455,7 +478,12 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) {
jcfg.ReplicationFactorMin = cfg.ReplicationFactorMin
jcfg.ReplicationFactorMax = cfg.ReplicationFactorMax
jcfg.LeaveOnShutdown = cfg.LeaveOnShutdown
jcfg.ListenMultiaddress = cfg.ListenAddr.String()
var listenAddrs ipfsconfig.Strings
for _, addr := range cfg.ListenAddr {
listenAddrs = append(listenAddrs, addr.String())
}
jcfg.ListenMultiaddress = ipfsconfig.Strings(listenAddrs)
jcfg.EnableRelayHop = cfg.EnableRelayHop
jcfg.ConnectionManager = &connMgrConfigJSON{
HighWater: cfg.ConnMgr.HighWater,
LowWater: cfg.ConnMgr.LowWater,
Expand Down
14 changes: 11 additions & 3 deletions cluster_config_test.go
Expand Up @@ -5,6 +5,8 @@ import (
"os"
"testing"
"time"

ipfsconfig "github.com/ipfs/go-ipfs-config"
)

var ccfgTestJSON = []byte(`
Expand All @@ -17,7 +19,10 @@ var ccfgTestJSON = []byte(`
"low_water": 500,
"grace_period": "100m0s"
},
"listen_multiaddress": "/ip4/127.0.0.1/tcp/10000",
"listen_multiaddress": [
"/ip4/127.0.0.1/tcp/10000",
"/ip4/127.0.0.1/udp/10000/quic"
],
"state_sync_interval": "1m0s",
"ipfs_sync_interval": "2m10s",
"pin_recover_interval": "1m",
Expand Down Expand Up @@ -114,7 +119,7 @@ func TestLoadJSON(t *testing.T) {
})

t.Run("bad listen multiaddress", func(t *testing.T) {
_, err := loadJSON2(t, func(j *configJSON) { j.ListenMultiaddress = "abc" })
_, err := loadJSON2(t, func(j *configJSON) { j.ListenMultiaddress = ipfsconfig.Strings{"abc"} })
if err == nil {
t.Error("expected error parsing listen_multiaddress")
}
Expand Down Expand Up @@ -197,7 +202,10 @@ func TestLoadJSON(t *testing.T) {

func TestToJSON(t *testing.T) {
cfg := &Config{}
cfg.LoadJSON(ccfgTestJSON)
err := cfg.LoadJSON(ccfgTestJSON)
if err != nil {
t.Fatal(err)
}
newjson, err := cfg.ToJSON()
if err != nil {
t.Fatal(err)
Expand Down
99 changes: 72 additions & 27 deletions clusterhost.go
Expand Up @@ -6,20 +6,27 @@ import (

"github.com/ipfs/ipfs-cluster/config"
libp2p "github.com/libp2p/go-libp2p"
autonat "github.com/libp2p/go-libp2p-autonat-svc"
relay "github.com/libp2p/go-libp2p-circuit"
connmgr "github.com/libp2p/go-libp2p-connmgr"
corepnet "github.com/libp2p/go-libp2p-core/pnet"
routing "github.com/libp2p/go-libp2p-core/routing"
crypto "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
ipnet "github.com/libp2p/go-libp2p-interface-pnet"
dht "github.com/libp2p/go-libp2p-kad-dht"
pnet "github.com/libp2p/go-libp2p-pnet"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
secio "github.com/libp2p/go-libp2p-secio"
libp2ptls "github.com/libp2p/go-libp2p-tls"
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
)

// NewClusterHost creates a libp2p Host with the options from the provided
// cluster configuration. Using that host, it creates pubsub and a DHT
// instances, for shared use by all cluster components. The returned host uses
// the DHT for routing. The resulting DHT is not bootstrapped.
// NewClusterHost creates a fully-featured libp2p Host with the options from
// the provided cluster configuration. Using that host, it creates pubsub and
// a DHT instances, for shared use by all cluster components. The returned
// host uses the DHT for routing. The resulting DHT is not bootstrapped. Relay
// and AutoNATService are additionally setup for this host.
func NewClusterHost(
ctx context.Context,
ident *config.Identity,
Expand All @@ -28,13 +35,35 @@ func NewClusterHost(

connman := connmgr.NewConnManager(cfg.ConnMgr.LowWater, cfg.ConnMgr.HighWater, cfg.ConnMgr.GracePeriod)

relayOpts := []relay.RelayOpt{relay.OptDiscovery}
if cfg.EnableRelayHop {
relayOpts = append(relayOpts, relay.OptHop)
}

var idht *dht.IpfsDHT
var err error
opts := []libp2p.Option{
libp2p.ListenAddrs(cfg.ListenAddr...),
libp2p.NATPortMap(),
libp2p.ConnectionManager(connman),
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
hsanjuan marked this conversation as resolved.
Show resolved Hide resolved
idht, err = newDHT(ctx, h)
return idht, err
}),
libp2p.EnableRelay(relayOpts...),
libp2p.EnableAutoRelay(),
}

prot, err := newProtector(cfg.Secret)
if err != nil {
return nil, nil, nil, err
}

h, err := newHost(
ctx,
cfg.Secret,
prot,
ident.PrivateKey,
libp2p.ListenAddrs(cfg.ListenAddr),
libp2p.NATPortMap(),
libp2p.ConnectionManager(connman),
opts...,
)
if err != nil {
return nil, nil, nil, err
Expand All @@ -46,39 +75,55 @@ func NewClusterHost(
return nil, nil, nil, err
}

idht, err := newDHT(ctx, h)
// needed for auto relay
_, err = autonat.NewAutoNATService(ctx, h, baseOpts(prot)...)
if err != nil {
h.Close()
return nil, nil, nil, err
}

return routedHost(h, idht), psub, idht, nil
return h, psub, idht, nil
}

func newHost(ctx context.Context, secret []byte, priv crypto.PrivKey, opts ...libp2p.Option) (host.Host, error) {
var prot ipnet.Protector
var err error

// Create protector if we have a secret.
if secret != nil && len(secret) > 0 {
var key [32]byte
copy(key[:], secret)
prot, err = pnet.NewV1ProtectorFromBytes(&key)
if err != nil {
return nil, err
}
}

// newHost creates a base cluster host without dht, pubsub, relay or nat etc.
// mostly used for testing.
func newHost(ctx context.Context, prot corepnet.Protector, priv crypto.PrivKey, opts ...libp2p.Option) (host.Host, error) {
finalOpts := []libp2p.Option{
libp2p.Identity(priv),
libp2p.PrivateNetwork(prot),
}
finalOpts = append(finalOpts, baseOpts(prot)...)
finalOpts = append(finalOpts, opts...)

return libp2p.New(
h, err := libp2p.New(
ctx,
finalOpts...,
)
if err != nil {
return nil, err
}

return h, nil
}

func baseOpts(prot corepnet.Protector) []libp2p.Option {
return []libp2p.Option{
libp2p.PrivateNetwork(prot),
libp2p.Security(libp2ptls.ID, libp2ptls.New),
libp2p.Security(secio.ID, secio.New),
libp2p.Transport(libp2pquic.NewTransport),
libp2p.DefaultTransports,
}
}

func newProtector(secret []byte) (corepnet.Protector, error) {
// Create protector if we have a secret.
if len(secret) == 0 {
return nil, nil
}

var key [32]byte
copy(key[:], secret)
return pnet.NewV1ProtectorFromBytes(&key)
}

func newDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
Expand Down
4 changes: 3 additions & 1 deletion cmd/ipfs-cluster-service/main.go
Expand Up @@ -326,7 +326,9 @@ the peer IDs in the given multiaddresses.
if c.Bool("randomports") {
cfgs := cfgHelper.Configs()

cfgs.Cluster.ListenAddr, err = cmdutils.RandomizePorts(cfgs.Cluster.ListenAddr)
for i := range cfgs.Cluster.ListenAddr {
cfgs.Cluster.ListenAddr[i], err = cmdutils.RandomizePorts(cfgs.Cluster.ListenAddr[i])
}
checkErr("randomizing ports", err)
cfgs.Restapi.HTTPListenAddr, err = cmdutils.RandomizePorts(cfgs.Restapi.HTTPListenAddr)
checkErr("randomizing ports", err)
Expand Down
8 changes: 8 additions & 0 deletions consensus/raft/logging.go
Expand Up @@ -92,6 +92,10 @@ func (log *hcLogToLogger) IsError() bool {
return true
}

func (log *hcLogToLogger) Name() string {
return log.name
}

func (log *hcLogToLogger) With(args ...interface{}) hclog.Logger {
return &hcLogToLogger{extraArgs: args}
}
Expand All @@ -114,6 +118,10 @@ func (log *hcLogToLogger) StandardWriter(opts *hclog.StandardLoggerOptions) io.W
return nil
}

func (log *hcLogToLogger) ImpliedArgs() []interface{} {
return nil
}

const repeatPoolSize = 10
const repeatReset = time.Minute

Expand Down