Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: log when resource manager limits are exceeded #8980

Merged
merged 1 commit into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/commands/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ Changes made via command line are persisted in the Swarm.ResourceMgr.Limits fiel
return errors.New("expected a JSON file")
}
if err := json.NewDecoder(file).Decode(&newLimit); err != nil {
return errors.New("failed to decode JSON as ResourceMgrScopeConfig")
return fmt.Errorf("decoding JSON as ResourceMgrScopeConfig: %w", err)
}
return libp2p.NetSetLimit(node.ResourceManager, node.Repo, scope, newLimit)
}
Expand Down
15 changes: 12 additions & 3 deletions core/node/libp2p/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"path/filepath"
"strings"

"github.com/benbjohnson/clock"
config "github.com/ipfs/go-ipfs/config"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/repo"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -24,8 +26,8 @@ const NetLimitTraceFilename = "rcmgr.json.gz"

var NoResourceMgrError = fmt.Errorf("missing ResourceMgr: make sure the daemon is running with Swarm.ResourceMgr.Enabled")

func ResourceManager(cfg config.SwarmConfig) func(fx.Lifecycle, repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
return func(lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
func ResourceManager(cfg config.SwarmConfig) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) {
var manager network.ResourceManager
var opts Libp2pOpts

Expand Down Expand Up @@ -72,6 +74,13 @@ func ResourceManager(cfg config.SwarmConfig) func(fx.Lifecycle, repo.Repo) (netw
if err != nil {
return nil, opts, fmt.Errorf("creating libp2p resource manager: %w", err)
}
lrm := &loggingResourceManager{
clock: clock.New(),
logger: &logging.Logger("resourcemanager").SugaredLogger,
delegate: manager,
}
lrm.start(helpers.LifecycleCtx(mctx, lc))
manager = lrm
} else {
log.Debug("libp2p resource manager is disabled")
manager = network.NullResourceManager
Expand Down
160 changes: 160 additions & 0 deletions core/node/libp2p/rcmgr_logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package libp2p

import (
"context"
"errors"
"sync"
"time"

"github.com/benbjohnson/clock"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
"go.uber.org/zap"
)

type loggingResourceManager struct {
clock clock.Clock
logger *zap.SugaredLogger
delegate network.ResourceManager
logInterval time.Duration

mut sync.Mutex
limitExceededErrs uint64
}

type loggingScope struct {
logger *zap.SugaredLogger
delegate network.ResourceScope
countErrs func(error)
}

var _ network.ResourceManager = (*loggingResourceManager)(nil)

func (n *loggingResourceManager) start(ctx context.Context) {
logInterval := n.logInterval
if logInterval == 0 {
logInterval = 10 * time.Second
}
ticker := n.clock.Ticker(logInterval)
go func() {
defer ticker.Stop()
for {
select {
case <-ticker.C:
n.mut.Lock()
errs := n.limitExceededErrs
n.limitExceededErrs = 0
n.mut.Unlock()
if errs != 0 {
n.logger.Warnf("Resource limits were exceeded %d times, consider inspecting logs and raising the resource manager limits.", errs)
}
case <-ctx.Done():
return
}
}
}()
}

func (n *loggingResourceManager) countErrs(err error) {
if errors.Is(err, network.ErrResourceLimitExceeded) {
n.mut.Lock()
n.limitExceededErrs++
n.mut.Unlock()
}
}

func (n *loggingResourceManager) ViewSystem(f func(network.ResourceScope) error) error {
return n.delegate.ViewSystem(f)
}
func (n *loggingResourceManager) ViewTransient(f func(network.ResourceScope) error) error {
return n.delegate.ViewTransient(func(s network.ResourceScope) error {
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
})
}
func (n *loggingResourceManager) ViewService(svc string, f func(network.ServiceScope) error) error {
return n.delegate.ViewService(svc, func(s network.ServiceScope) error {
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
})
}
func (n *loggingResourceManager) ViewProtocol(p protocol.ID, f func(network.ProtocolScope) error) error {
return n.delegate.ViewProtocol(p, func(s network.ProtocolScope) error {
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
})
}
func (n *loggingResourceManager) ViewPeer(p peer.ID, f func(network.PeerScope) error) error {
return n.delegate.ViewPeer(p, func(s network.PeerScope) error {
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs})
})
}
func (n *loggingResourceManager) OpenConnection(dir network.Direction, usefd bool) (network.ConnManagementScope, error) {
connMgmtScope, err := n.delegate.OpenConnection(dir, usefd)
n.countErrs(err)
return connMgmtScope, err
}
func (n *loggingResourceManager) OpenStream(p peer.ID, dir network.Direction) (network.StreamManagementScope, error) {
connMgmtScope, err := n.delegate.OpenStream(p, dir)
n.countErrs(err)
return connMgmtScope, err
}
func (n *loggingResourceManager) Close() error {
return n.delegate.Close()
}

func (s *loggingScope) ReserveMemory(size int, prio uint8) error {
err := s.delegate.ReserveMemory(size, prio)
s.countErrs(err)
return err
}
func (s *loggingScope) ReleaseMemory(size int) {
s.delegate.ReleaseMemory(size)
}
func (s *loggingScope) Stat() network.ScopeStat {
return s.delegate.Stat()
}
func (s *loggingScope) BeginSpan() (network.ResourceScopeSpan, error) {
return s.delegate.BeginSpan()
}
func (s *loggingScope) Done() {
s.delegate.(network.ResourceScopeSpan).Done()
}
func (s *loggingScope) Name() string {
return s.delegate.(network.ServiceScope).Name()
}
func (s *loggingScope) Protocol() protocol.ID {
return s.delegate.(network.ProtocolScope).Protocol()
}
func (s *loggingScope) Peer() peer.ID {
return s.delegate.(network.PeerScope).Peer()
}
func (s *loggingScope) PeerScope() network.PeerScope {
return s.delegate.(network.PeerScope)
}
func (s *loggingScope) SetPeer(p peer.ID) error {
err := s.delegate.(network.ConnManagementScope).SetPeer(p)
s.countErrs(err)
return err
}
func (s *loggingScope) ProtocolScope() network.ProtocolScope {
return s.delegate.(network.ProtocolScope)
}
func (s *loggingScope) SetProtocol(proto protocol.ID) error {
err := s.delegate.(network.StreamManagementScope).SetProtocol(proto)
s.countErrs(err)
return err
}
func (s *loggingScope) ServiceScope() network.ServiceScope {
return s.delegate.(network.ServiceScope)
}
func (s *loggingScope) SetService(srv string) error {
err := s.delegate.(network.StreamManagementScope).SetService(srv)
s.countErrs(err)
return err
}
func (s *loggingScope) Limit() rcmgr.Limit {
return s.delegate.(rcmgr.ResourceScopeLimiter).Limit()
}
func (s *loggingScope) SetLimit(limit rcmgr.Limit) {
s.delegate.(rcmgr.ResourceScopeLimiter).SetLimit(limit)
}
58 changes: 58 additions & 0 deletions core/node/libp2p/rcmgr_logging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package libp2p

import (
"context"
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/libp2p/go-libp2p-core/network"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
)

func TestLoggingResourceManager(t *testing.T) {
clock := clock.NewMock()
limiter := rcmgr.NewDefaultLimiter()
limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(1, 1, 1)
rm, err := rcmgr.NewResourceManager(limiter)
if err != nil {
t.Fatal(err)
}

oCore, oLogs := observer.New(zap.WarnLevel)
oLogger := zap.New(oCore)
lrm := &loggingResourceManager{
clock: clock,
logger: oLogger.Sugar(),
delegate: rm,
logInterval: 1 * time.Second,
}

// 2 of these should result in resource limit exceeded errors and subsequent log messages
for i := 0; i < 3; i++ {
_, _ = lrm.OpenConnection(network.DirInbound, false)
}

// run the logger which will write an entry for those errors
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
lrm.start(ctx)
clock.Add(3 * time.Second)

timer := time.NewTimer(1 * time.Second)
for {
select {
case <-timer.C:
t.Fatalf("expected logs never arrived")
default:
if oLogs.Len() == 0 {
continue
}
require.Equal(t, "Resource limits were exceeded 2 times, consider inspecting logs and raising the resource manager limits.", oLogs.All()[0].Message)
return
}
}
}
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,17 @@ require (
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad
)

require (
github.com/benbjohnson/clock v1.3.0
github.com/ipfs/go-log/v2 v2.5.1
)

require (
Comment on lines +128 to 133
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WTF ?
I don't understand what go.mod is up to know

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a block for direct dependencies and a block for transitive/indirect dependencies

github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
github.com/Kubuxu/go-os-helper v0.0.1 // indirect
github.com/Stebalien/go-bitfield v0.0.1 // indirect
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a // indirect
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd v0.22.0-beta // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
Expand Down Expand Up @@ -172,7 +176,6 @@ require (
github.com/ipfs/go-ipfs-delay v0.0.1 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
github.com/ipfs/go-ipfs-pq v0.0.2 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/ipfs/go-peertaskqueue v0.7.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/klauspost/compress v1.15.1 // indirect
Expand Down