Skip to content

Commit

Permalink
Merge pull request #5456 from oasisprotocol/peternose/bugfix/improve-…
Browse files Browse the repository at this point in the history
…seed-node-performance

go/p2p: Improve seed node performance
  • Loading branch information
peternose committed Nov 21, 2023
2 parents 0fe5e68 + ecb0be7 commit adf3314
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 3 deletions.
1 change: 1 addition & 0 deletions .changelog/5456.bugfix.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/p2p: Increase incoming connection limit for seed nodes
7 changes: 7 additions & 0 deletions .changelog/5456.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
go/p2p: Close connection to seed node after every request

Bootstrap client, which is responsible for peer discovery and advertisement,
now terminates connection to the seed node after every request. This action
should free up recourses (e.g. inbound/outbound connections) on both sides
without affecting performance since discovered peers are cached (see retention
period) and advertisement is done infrequently (see TTL).
14 changes: 14 additions & 0 deletions go/p2p/discovery/bootstrap/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ func (c *client) Advertise(ctx context.Context, ns string, _ ...discovery.Option

pf.RecordSuccess()

// Close connections after every call because requests to the seed node are infrequent.
if err = c.rc.Close(c.seed.ID); err != nil {
c.logger.Warn("failed to close connections to seed node",
"err", err,
)
}

return res.TTL, nil
}

Expand Down Expand Up @@ -239,6 +246,13 @@ func (c *client) fetchPeers(ctx context.Context, ns string, limit int) []peer.Ad
pf.RecordFailure()
}

// Close connections after every call because requests to the seed node are infrequent.
if err := c.rc.Close(c.seed.ID); err != nil {
c.logger.Warn("failed to close connections to seed node",
"err", err,
)
}

return cache.peers
}

Expand Down
40 changes: 40 additions & 0 deletions go/p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -36,6 +38,12 @@ type HostConfig struct {
func NewHost(cfg *HostConfig) (host.Host, *conngater.BasicConnectionGater, error) {
id := api.SignerToPrivKey(cfg.Signer)

// Set up a resource manager so that we can reserve more resources.
rm, err := NewResourceManager()
if err != nil {
return nil, nil, err
}

// Set up a connection manager so we can limit the number of connections.
cm, err := NewConnManager(&cfg.ConnManagerConfig)
if err != nil {
Expand All @@ -52,6 +60,7 @@ func NewHost(cfg *HostConfig) (host.Host, *conngater.BasicConnectionGater, error
libp2p.UserAgent(cfg.UserAgent),
libp2p.ListenAddrs(cfg.ListenAddr),
libp2p.Identity(id),
libp2p.ResourceManager(rm),
libp2p.ConnectionManager(cm),
libp2p.ConnectionGater(cg),
)
Expand Down Expand Up @@ -197,3 +206,34 @@ func (cfg *ConnGaterConfig) Load() error {

return nil
}

// NewResourceManager constructs a new resource manager.
func NewResourceManager() (network.ResourceManager, error) {
// Use the default resource manager for non-seed nodes.
if config.GlobalConfig.Mode != config.ModeSeed {
return nil, nil
}

// Tweak limits for seed nodes.
//
// Note: The connection manager will trim connections when the total number of inbound and
// outbound connections exceeds the high watermark (default set to 130). Using autoscaling
// and configuring the default limit to 128 seems to be a prudent choice.
defaultLimits := rcmgr.DefaultLimits
defaultLimits.SystemBaseLimit.ConnsInbound = 128
defaultLimits.SystemBaseLimit.StreamsInbound = 128 * 16
defaultLimits.SystemLimitIncrease.ConnsInbound = 128
defaultLimits.SystemLimitIncrease.StreamsInbound = 128 * 16

// Add limits around included libp2p protocols.
libp2p.SetDefaultServiceLimits(&defaultLimits)

// Scale limits.
scaledLimits := defaultLimits.AutoScale()

// The resource manager expects a limiter, se we create one from our limits.
limiter := rcmgr.NewFixedLimiter(scaledLimits)

// Initialize the resource manager.
return rcmgr.NewResourceManager(limiter)
}
25 changes: 22 additions & 3 deletions go/p2p/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rpc

import (
"context"
"errors"
"fmt"
"reflect"
"sync"
Expand All @@ -13,7 +14,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"

"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/errors"
commonErrors "github.com/oasisprotocol/oasis-core/go/common/errors"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/workerpool"
)
Expand Down Expand Up @@ -252,6 +253,9 @@ type Client interface {
opts ...CallMultiOption,
) ([]interface{}, []PeerFeedback, error)

// Close closes all connections to the given peer.
Close(peerID core.PeerID) error

// RegisterListener subscribes the listener to the client notification events.
// If the listener is already registered this is a noop operation.
RegisterListener(l ClientListener)
Expand All @@ -273,6 +277,7 @@ type client struct {
logger *logging.Logger
}

// Implements Client.
func (c *client) Call(
ctx context.Context,
peer core.PeerID,
Expand All @@ -283,6 +288,7 @@ func (c *client) Call(
return c.CallOne(ctx, []core.PeerID{peer}, method, body, rsp, opts...)
}

// Implements Client.
func (c *client) CallOne(
ctx context.Context,
peers []core.PeerID,
Expand Down Expand Up @@ -345,6 +351,7 @@ func (c *client) CallOne(
return pf, err
}

// Implements Client.
func (c *client) CallMulti(
ctx context.Context,
peers []core.PeerID,
Expand Down Expand Up @@ -449,7 +456,7 @@ func (c *client) timeCall(

if err != nil {
// If the caller canceled the context we should not degrade the peer.
if !errors.Is(err, context.Canceled) {
if !commonErrors.Is(err, context.Canceled) {
c.recordFailure(peerID, latency)
}

Expand Down Expand Up @@ -519,7 +526,7 @@ func (c *client) call(

// Decode response.
if rawRsp.Error != nil {
return errors.FromCode(rawRsp.Error.Module, rawRsp.Error.Code, rawRsp.Error.Message)
return commonErrors.FromCode(rawRsp.Error.Module, rawRsp.Error.Code, rawRsp.Error.Message)
}

if rsp != nil {
Expand All @@ -528,13 +535,25 @@ func (c *client) call(
return nil
}

// Implements Client.
func (c *client) Close(peerID core.PeerID) error {
var errs error
for _, conn := range c.host.Network().ConnsToPeer(peerID) {
err := conn.Close()
errs = errors.Join(errs, err)
}
return errs
}

// Implements Client.
func (c *client) RegisterListener(l ClientListener) {
c.listeners.Lock()
defer c.listeners.Unlock()

c.listeners.m[l] = struct{}{}
}

// Implements Client.
func (c *client) UnregisterListener(l ClientListener) {
c.listeners.Lock()
defer c.listeners.Unlock()
Expand Down
7 changes: 7 additions & 0 deletions go/p2p/rpc/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ func (c *nopClient) CallMulti(
return nil, nil, errUnsupported
}

// Implements Client.
func (c *nopClient) Close(
peer.ID,
) error {
return nil
}

// Implements Client.
func (c *nopClient) RegisterListener(ClientListener) {}

Expand Down

0 comments on commit adf3314

Please sign in to comment.