Skip to content

Commit

Permalink
Merge pull request #1275 from libp2p/rcmgr
Browse files Browse the repository at this point in the history
use the resource manager
  • Loading branch information
marten-seemann committed Jan 18, 2022
2 parents 05fe672 + 33e8768 commit 15d7dfb
Show file tree
Hide file tree
Showing 30 changed files with 681 additions and 170 deletions.
18 changes: 13 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ type Config struct {
AddrsFactory bhost.AddrsFactory
ConnectionGater connmgr.ConnectionGater

ConnManager connmgr.ConnManager
NATManager NATManagerC
Peerstore peerstore.Peerstore
Reporter metrics.Reporter
ConnManager connmgr.ConnManager
ResourceManager network.ResourceManager

NATManager NATManagerC
Peerstore peerstore.Peerstore
Reporter metrics.Reporter

MultiaddrResolver *madns.Resolver

Expand Down Expand Up @@ -148,6 +150,9 @@ func (cfg *Config) makeSwarm() (*swarm.Swarm, error) {
if cfg.DialTimeout != 0 {
opts = append(opts, swarm.WithDialTimeout(cfg.DialTimeout))
}
if cfg.ResourceManager != nil {
opts = append(opts, swarm.WithResourceManager(cfg.ResourceManager))
}
// TODO: Make the swarm implementation configurable.
return swarm.NewSwarm(pid, cfg.Peerstore, opts...)
}
Expand Down Expand Up @@ -179,11 +184,14 @@ func (cfg *Config) addTransports(h host.Host) error {
if cfg.ConnectionGater != nil {
opts = append(opts, tptu.WithConnectionGater(cfg.ConnectionGater))
}
if cfg.ResourceManager != nil {
opts = append(opts, tptu.WithResourceManager(cfg.ResourceManager))
}
upgrader, err := tptu.New(secure, muxer, opts...)
if err != nil {
return err
}
tpts, err := makeTransports(h, upgrader, cfg.ConnectionGater, cfg.PSK, cfg.Transports)
tpts, err := makeTransports(h, upgrader, cfg.ConnectionGater, cfg.PSK, cfg.ResourceManager, cfg.Transports)
if err != nil {
return err
}
Expand Down
31 changes: 20 additions & 11 deletions config/constructor_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
Expand All @@ -21,41 +20,51 @@ var (
hostType = reflect.TypeOf((*host.Host)(nil)).Elem()
networkType = reflect.TypeOf((*network.Network)(nil)).Elem()
transportType = reflect.TypeOf((*transport.Transport)(nil)).Elem()
muxType = reflect.TypeOf((*mux.Multiplexer)(nil)).Elem()
muxType = reflect.TypeOf((*network.Multiplexer)(nil)).Elem()
securityType = reflect.TypeOf((*sec.SecureTransport)(nil)).Elem()
privKeyType = reflect.TypeOf((*crypto.PrivKey)(nil)).Elem()
pubKeyType = reflect.TypeOf((*crypto.PubKey)(nil)).Elem()
pstoreType = reflect.TypeOf((*peerstore.Peerstore)(nil)).Elem()
connGaterType = reflect.TypeOf((*connmgr.ConnectionGater)(nil)).Elem()
upgraderType = reflect.TypeOf((*transport.Upgrader)(nil)).Elem()
rcmgrType = reflect.TypeOf((*network.ResourceManager)(nil)).Elem()

// concrete types
peerIDType = reflect.TypeOf((peer.ID)(""))
pskType = reflect.TypeOf((pnet.PSK)(nil))
)

var argTypes = map[reflect.Type]constructor{
upgraderType: func(_ host.Host, u transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater) interface{} { return u },
hostType: func(h host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater) interface{} { return h },
networkType: func(h host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater) interface{} {
upgraderType: func(_ host.Host, u transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater, _ network.ResourceManager) interface{} {
return u
},
hostType: func(h host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater, _ network.ResourceManager) interface{} {
return h
},
networkType: func(h host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater, _ network.ResourceManager) interface{} {
return h.Network()
},
pskType: func(_ host.Host, _ transport.Upgrader, psk pnet.PSK, _ connmgr.ConnectionGater) interface{} {
pskType: func(_ host.Host, _ transport.Upgrader, psk pnet.PSK, _ connmgr.ConnectionGater, _ network.ResourceManager) interface{} {
return psk
},
connGaterType: func(_ host.Host, _ transport.Upgrader, _ pnet.PSK, cg connmgr.ConnectionGater) interface{} { return cg },
peerIDType: func(h host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater) interface{} {
connGaterType: func(_ host.Host, _ transport.Upgrader, _ pnet.PSK, cg connmgr.ConnectionGater, _ network.ResourceManager) interface{} {
return cg
},
peerIDType: func(h host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater, _ network.ResourceManager) interface{} {
return h.ID()
},
privKeyType: func(h host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater) interface{} {
privKeyType: func(h host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater, _ network.ResourceManager) interface{} {
return h.Peerstore().PrivKey(h.ID())
},
pubKeyType: func(h host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater) interface{} {
pubKeyType: func(h host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater, _ network.ResourceManager) interface{} {
return h.Peerstore().PubKey(h.ID())
},
pstoreType: func(h host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater) interface{} {
pstoreType: func(h host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater, _ network.ResourceManager) interface{} {
return h.Peerstore()
},
rcmgrType: func(_ host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater, rcmgr network.ResourceManager) interface{} {
return rcmgr
},
}

func newArgTypeSet(types ...reflect.Type) map[reflect.Type]constructor {
Expand Down
21 changes: 11 additions & 10 deletions config/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package config
import (
"fmt"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/mux"

msmux "github.com/libp2p/go-stream-muxer-multistream"

"github.com/libp2p/go-libp2p-core/network"

"github.com/libp2p/go-libp2p-core/host"
)

// MuxC is a stream multiplex transport constructor.
type MuxC func(h host.Host) (mux.Multiplexer, error)
type MuxC func(h host.Host) (network.Multiplexer, error)

// MsMuxC is a tuple containing a multiplex transport constructor and a protocol
// ID.
Expand All @@ -25,8 +26,8 @@ var muxArgTypes = newArgTypeSet(hostType, networkType, peerIDType, pstoreType)
// using reflection.
func MuxerConstructor(m interface{}) (MuxC, error) {
// Already constructed?
if t, ok := m.(mux.Multiplexer); ok {
return func(_ host.Host) (mux.Multiplexer, error) {
if t, ok := m.(network.Multiplexer); ok {
return func(_ host.Host) (network.Multiplexer, error) {
return t, nil
}, nil
}
Expand All @@ -35,16 +36,16 @@ func MuxerConstructor(m interface{}) (MuxC, error) {
if err != nil {
return nil, err
}
return func(h host.Host) (mux.Multiplexer, error) {
t, err := ctor(h, nil, nil, nil)
return func(h host.Host) (network.Multiplexer, error) {
t, err := ctor(h, nil, nil, nil, nil)
if err != nil {
return nil, err
}
return t.(mux.Multiplexer), nil
return t.(network.Multiplexer), nil
}, nil
}

func makeMuxer(h host.Host, tpts []MsMuxC) (mux.Multiplexer, error) {
func makeMuxer(h host.Host, tpts []MsMuxC) (network.Multiplexer, error) {
muxMuxer := msmux.NewBlankTransport()
transportSet := make(map[string]struct{}, len(tpts))
for _, tptC := range tpts {
Expand Down
13 changes: 7 additions & 6 deletions config/muxer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@ package config
import (
"testing"

"github.com/libp2p/go-libp2p-core/network"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"

"github.com/libp2p/go-libp2p-core/mux"
yamux "github.com/libp2p/go-libp2p-yamux"
)

func TestMuxerSimple(t *testing.T) {
// single
_, err := MuxerConstructor(func(_ peer.ID) mux.Multiplexer { return nil })
_, err := MuxerConstructor(func(_ peer.ID) network.Multiplexer { return nil })
if err != nil {
t.Fatal(err)
}
Expand All @@ -27,14 +28,14 @@ func TestMuxerByValue(t *testing.T) {
}
}
func TestMuxerDuplicate(t *testing.T) {
_, err := MuxerConstructor(func(_ peer.ID, _ peer.ID) mux.Multiplexer { return nil })
_, err := MuxerConstructor(func(_ peer.ID, _ peer.ID) network.Multiplexer { return nil })
if err != nil {
t.Fatal(err)
}
}

func TestMuxerError(t *testing.T) {
_, err := MuxerConstructor(func() (mux.Multiplexer, error) { return nil, nil })
_, err := MuxerConstructor(func() (network.Multiplexer, error) { return nil, nil })
if err != nil {
t.Fatal(err)
}
Expand All @@ -45,8 +46,8 @@ func TestMuxerBadTypes(t *testing.T) {
func() error { return nil },
func() string { return "" },
func() {},
func(string) mux.Multiplexer { return nil },
func(string) (mux.Multiplexer, error) { return nil, nil },
func(string) network.Multiplexer { return nil },
func(string) (network.Multiplexer, error) { return nil, nil },
nil,
"testing",
} {
Expand Down
10 changes: 6 additions & 4 deletions config/reflection_magic.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"reflect"
"runtime"

"github.com/libp2p/go-libp2p-core/network"

"github.com/libp2p/go-libp2p-core/pnet"

"github.com/libp2p/go-libp2p-core/connmgr"
Expand Down Expand Up @@ -80,7 +82,7 @@ func callConstructor(c reflect.Value, args []reflect.Value) (interface{}, error)
return val, err
}

type constructor func(h host.Host, u transport.Upgrader, psk pnet.PSK, cg connmgr.ConnectionGater) interface{}
type constructor func(host.Host, transport.Upgrader, pnet.PSK, connmgr.ConnectionGater, network.ResourceManager) interface{}

func makeArgumentConstructors(fnType reflect.Type, argTypes map[reflect.Type]constructor) ([]constructor, error) {
params := fnType.NumIn()
Expand Down Expand Up @@ -131,7 +133,7 @@ func makeConstructor(
tptType reflect.Type,
argTypes map[reflect.Type]constructor,
opts ...interface{},
) (func(host.Host, transport.Upgrader, pnet.PSK, connmgr.ConnectionGater) (interface{}, error), error) {
) (func(host.Host, transport.Upgrader, pnet.PSK, connmgr.ConnectionGater, network.ResourceManager) (interface{}, error), error) {
v := reflect.ValueOf(tpt)
// avoid panicing on nil/zero value.
if v == (reflect.Value{}) {
Expand All @@ -155,10 +157,10 @@ func makeConstructor(
return nil, err
}

return func(h host.Host, u transport.Upgrader, psk pnet.PSK, cg connmgr.ConnectionGater) (interface{}, error) {
return func(h host.Host, u transport.Upgrader, psk pnet.PSK, cg connmgr.ConnectionGater, rcmgr network.ResourceManager) (interface{}, error) {
arguments := make([]reflect.Value, 0, len(argConstructors)+len(opts))
for i, makeArg := range argConstructors {
if arg := makeArg(h, u, psk, cg); arg != nil {
if arg := makeArg(h, u, psk, cg, rcmgr); arg != nil {
arguments = append(arguments, reflect.ValueOf(arg))
} else {
// ValueOf an un-typed nil yields a zero reflect
Expand Down
2 changes: 1 addition & 1 deletion config/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func SecurityConstructor(security interface{}) (SecC, error) {
return nil, err
}
return func(h host.Host) (sec.SecureTransport, error) {
t, err := ctor(h, nil, nil, nil)
t, err := ctor(h, nil, nil, nil, nil)
if err != nil {
return nil, err
}
Expand Down
13 changes: 7 additions & 6 deletions config/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package config
import (
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/pnet"
"github.com/libp2p/go-libp2p-core/transport"
)

// TptC is the type for libp2p transport constructors. You probably won't ever
// implement this function interface directly. Instead, pass your transport
// constructor to TransportConstructor.
type TptC func(host.Host, transport.Upgrader, pnet.PSK, connmgr.ConnectionGater) (transport.Transport, error)
type TptC func(host.Host, transport.Upgrader, pnet.PSK, connmgr.ConnectionGater, network.ResourceManager) (transport.Transport, error)

var transportArgTypes = argTypes

Expand Down Expand Up @@ -38,27 +39,27 @@ var transportArgTypes = argTypes
func TransportConstructor(tpt interface{}, opts ...interface{}) (TptC, error) {
// Already constructed?
if t, ok := tpt.(transport.Transport); ok {
return func(_ host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater) (transport.Transport, error) {
return func(_ host.Host, _ transport.Upgrader, _ pnet.PSK, _ connmgr.ConnectionGater, _ network.ResourceManager) (transport.Transport, error) {
return t, nil
}, nil
}
ctor, err := makeConstructor(tpt, transportType, transportArgTypes, opts...)
if err != nil {
return nil, err
}
return func(h host.Host, u transport.Upgrader, psk pnet.PSK, cg connmgr.ConnectionGater) (transport.Transport, error) {
t, err := ctor(h, u, psk, cg)
return func(h host.Host, u transport.Upgrader, psk pnet.PSK, cg connmgr.ConnectionGater, rcmgr network.ResourceManager) (transport.Transport, error) {
t, err := ctor(h, u, psk, cg, rcmgr)
if err != nil {
return nil, err
}
return t.(transport.Transport), nil
}, nil
}

func makeTransports(h host.Host, u transport.Upgrader, cg connmgr.ConnectionGater, psk pnet.PSK, tpts []TptC) ([]transport.Transport, error) {
func makeTransports(h host.Host, u transport.Upgrader, cg connmgr.ConnectionGater, psk pnet.PSK, rcmgr network.ResourceManager, tpts []TptC) ([]transport.Transport, error) {
transports := make([]transport.Transport, len(tpts))
for i, tC := range tpts {
t, err := tC(h, u, psk, cg)
t, err := tC(h, u, psk, cg, rcmgr)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions config/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ func TestConstructorWithOpts(t *testing.T) {
var options []int
c, err := TransportConstructor(func(_ transport.Upgrader, opts ...int) (transport.Transport, error) {
options = opts
return tcp.NewTCPTransport(nil)
return tcp.NewTCPTransport(nil, nil)
}, 42, 1337)
require.NoError(t, err)
_, err = c(nil, nil, nil, nil)
_, err = c(nil, nil, nil, nil, nil)
require.NoError(t, err)
require.Equal(t, []int{42, 1337}, options)
}
18 changes: 18 additions & 0 deletions defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
noise "github.com/libp2p/go-libp2p-noise"
"github.com/libp2p/go-libp2p-peerstore/pstoremem"
quic "github.com/libp2p/go-libp2p-quic-transport"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
tls "github.com/libp2p/go-libp2p-tls"
yamux "github.com/libp2p/go-libp2p-yamux"
"github.com/libp2p/go-tcp-transport"
Expand Down Expand Up @@ -85,6 +86,19 @@ var DefaultEnableRelay = func(cfg *Config) error {
return cfg.Apply(EnableRelay())
}

var DefaultResourceManager = func(cfg *Config) error {
// Default memory limit: 1/8th of total memory, minimum 128MB, maximum 1GB
limiter := rcmgr.NewDefaultLimiter()
SetDefaultServiceLimits(limiter)

mgr, err := rcmgr.NewResourceManager(limiter)
if err != nil {
return err
}

return cfg.Apply(ResourceManager(mgr))
}

// Complete list of default options and when to fallback on them.
//
// Please *DON'T* specify default options any other way. Putting this all here
Expand Down Expand Up @@ -121,6 +135,10 @@ var defaults = []struct {
fallback: func(cfg *Config) bool { return !cfg.RelayCustom },
opt: DefaultEnableRelay,
},
{
fallback: func(cfg *Config) bool { return cfg.ResourceManager == nil },
opt: DefaultResourceManager,
},
}

// Defaults configures libp2p to use the default options. Can be combined with
Expand Down
Loading

0 comments on commit 15d7dfb

Please sign in to comment.