diff --git a/core/commands/swarm.go b/core/commands/swarm.go index d6a3e8d696d..f904ed67e19 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -409,7 +409,7 @@ Changes made via command line are persisted in the Swarm.ResourceMgr.Limits fiel return errors.New("expected a JSON file") } if err := json.NewDecoder(file).Decode(&newLimit); err != nil { - return errors.New("failed to decode JSON as ResourceMgrScopeConfig") + return fmt.Errorf("decoding JSON as ResourceMgrScopeConfig: %w", err) } return libp2p.NetSetLimit(node.ResourceManager, node.Repo, scope, newLimit) } diff --git a/core/node/libp2p/rcmgr.go b/core/node/libp2p/rcmgr.go index 28d05a131b4..4d4b29a564d 100644 --- a/core/node/libp2p/rcmgr.go +++ b/core/node/libp2p/rcmgr.go @@ -7,9 +7,11 @@ import ( "path/filepath" "strings" + "github.com/benbjohnson/clock" config "github.com/ipfs/go-ipfs/config" + "github.com/ipfs/go-ipfs/core/node/helpers" "github.com/ipfs/go-ipfs/repo" - + logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -24,8 +26,8 @@ const NetLimitTraceFilename = "rcmgr.json.gz" var NoResourceMgrError = fmt.Errorf("missing ResourceMgr: make sure the daemon is running with Swarm.ResourceMgr.Enabled") -func ResourceManager(cfg config.SwarmConfig) func(fx.Lifecycle, repo.Repo) (network.ResourceManager, Libp2pOpts, error) { - return func(lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) { +func ResourceManager(cfg config.SwarmConfig) interface{} { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) { var manager network.ResourceManager var opts Libp2pOpts @@ -72,6 +74,13 @@ func ResourceManager(cfg config.SwarmConfig) func(fx.Lifecycle, repo.Repo) (netw if err != nil { return nil, opts, fmt.Errorf("creating libp2p resource manager: %w", err) } + lrm := &loggingResourceManager{ + clock: clock.New(), + logger: &logging.Logger("resourcemanager").SugaredLogger, + delegate: manager, + } + lrm.start(helpers.LifecycleCtx(mctx, lc)) + manager = lrm } else { log.Debug("libp2p resource manager is disabled") manager = network.NullResourceManager diff --git a/core/node/libp2p/rcmgr_logging.go b/core/node/libp2p/rcmgr_logging.go new file mode 100644 index 00000000000..06d22c71b8c --- /dev/null +++ b/core/node/libp2p/rcmgr_logging.go @@ -0,0 +1,160 @@ +package libp2p + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/benbjohnson/clock" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + rcmgr "github.com/libp2p/go-libp2p-resource-manager" + "go.uber.org/zap" +) + +type loggingResourceManager struct { + clock clock.Clock + logger *zap.SugaredLogger + delegate network.ResourceManager + logInterval time.Duration + + mut sync.Mutex + limitExceededErrs uint64 +} + +type loggingScope struct { + logger *zap.SugaredLogger + delegate network.ResourceScope + countErrs func(error) +} + +var _ network.ResourceManager = (*loggingResourceManager)(nil) + +func (n *loggingResourceManager) start(ctx context.Context) { + logInterval := n.logInterval + if logInterval == 0 { + logInterval = 10 * time.Second + } + ticker := n.clock.Ticker(logInterval) + go func() { + defer ticker.Stop() + for { + select { + case <-ticker.C: + n.mut.Lock() + errs := n.limitExceededErrs + n.limitExceededErrs = 0 + n.mut.Unlock() + if errs != 0 { + n.logger.Warnf("Resource limits were exceeded %d times, consider inspecting logs and raising the resource manager limits.", errs) + } + case <-ctx.Done(): + return + } + } + }() +} + +func (n *loggingResourceManager) countErrs(err error) { + if errors.Is(err, network.ErrResourceLimitExceeded) { + n.mut.Lock() + n.limitExceededErrs++ + n.mut.Unlock() + } +} + +func (n *loggingResourceManager) ViewSystem(f func(network.ResourceScope) error) error { + return n.delegate.ViewSystem(f) +} +func (n *loggingResourceManager) ViewTransient(f func(network.ResourceScope) error) error { + return n.delegate.ViewTransient(func(s network.ResourceScope) error { + return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) + }) +} +func (n *loggingResourceManager) ViewService(svc string, f func(network.ServiceScope) error) error { + return n.delegate.ViewService(svc, func(s network.ServiceScope) error { + return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) + }) +} +func (n *loggingResourceManager) ViewProtocol(p protocol.ID, f func(network.ProtocolScope) error) error { + return n.delegate.ViewProtocol(p, func(s network.ProtocolScope) error { + return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) + }) +} +func (n *loggingResourceManager) ViewPeer(p peer.ID, f func(network.PeerScope) error) error { + return n.delegate.ViewPeer(p, func(s network.PeerScope) error { + return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) + }) +} +func (n *loggingResourceManager) OpenConnection(dir network.Direction, usefd bool) (network.ConnManagementScope, error) { + connMgmtScope, err := n.delegate.OpenConnection(dir, usefd) + n.countErrs(err) + return connMgmtScope, err +} +func (n *loggingResourceManager) OpenStream(p peer.ID, dir network.Direction) (network.StreamManagementScope, error) { + connMgmtScope, err := n.delegate.OpenStream(p, dir) + n.countErrs(err) + return connMgmtScope, err +} +func (n *loggingResourceManager) Close() error { + return n.delegate.Close() +} + +func (s *loggingScope) ReserveMemory(size int, prio uint8) error { + err := s.delegate.ReserveMemory(size, prio) + s.countErrs(err) + return err +} +func (s *loggingScope) ReleaseMemory(size int) { + s.delegate.ReleaseMemory(size) +} +func (s *loggingScope) Stat() network.ScopeStat { + return s.delegate.Stat() +} +func (s *loggingScope) BeginSpan() (network.ResourceScopeSpan, error) { + return s.delegate.BeginSpan() +} +func (s *loggingScope) Done() { + s.delegate.(network.ResourceScopeSpan).Done() +} +func (s *loggingScope) Name() string { + return s.delegate.(network.ServiceScope).Name() +} +func (s *loggingScope) Protocol() protocol.ID { + return s.delegate.(network.ProtocolScope).Protocol() +} +func (s *loggingScope) Peer() peer.ID { + return s.delegate.(network.PeerScope).Peer() +} +func (s *loggingScope) PeerScope() network.PeerScope { + return s.delegate.(network.PeerScope) +} +func (s *loggingScope) SetPeer(p peer.ID) error { + err := s.delegate.(network.ConnManagementScope).SetPeer(p) + s.countErrs(err) + return err +} +func (s *loggingScope) ProtocolScope() network.ProtocolScope { + return s.delegate.(network.ProtocolScope) +} +func (s *loggingScope) SetProtocol(proto protocol.ID) error { + err := s.delegate.(network.StreamManagementScope).SetProtocol(proto) + s.countErrs(err) + return err +} +func (s *loggingScope) ServiceScope() network.ServiceScope { + return s.delegate.(network.ServiceScope) +} +func (s *loggingScope) SetService(srv string) error { + err := s.delegate.(network.StreamManagementScope).SetService(srv) + s.countErrs(err) + return err +} +func (s *loggingScope) Limit() rcmgr.Limit { + return s.delegate.(rcmgr.ResourceScopeLimiter).Limit() +} +func (s *loggingScope) SetLimit(limit rcmgr.Limit) { + s.delegate.(rcmgr.ResourceScopeLimiter).SetLimit(limit) +} diff --git a/core/node/libp2p/rcmgr_logging_test.go b/core/node/libp2p/rcmgr_logging_test.go new file mode 100644 index 00000000000..72f34b80885 --- /dev/null +++ b/core/node/libp2p/rcmgr_logging_test.go @@ -0,0 +1,58 @@ +package libp2p + +import ( + "context" + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/libp2p/go-libp2p-core/network" + rcmgr "github.com/libp2p/go-libp2p-resource-manager" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +func TestLoggingResourceManager(t *testing.T) { + clock := clock.NewMock() + limiter := rcmgr.NewDefaultLimiter() + limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(1, 1, 1) + rm, err := rcmgr.NewResourceManager(limiter) + if err != nil { + t.Fatal(err) + } + + oCore, oLogs := observer.New(zap.WarnLevel) + oLogger := zap.New(oCore) + lrm := &loggingResourceManager{ + clock: clock, + logger: oLogger.Sugar(), + delegate: rm, + logInterval: 1 * time.Second, + } + + // 2 of these should result in resource limit exceeded errors and subsequent log messages + for i := 0; i < 3; i++ { + _, _ = lrm.OpenConnection(network.DirInbound, false) + } + + // run the logger which will write an entry for those errors + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + lrm.start(ctx) + clock.Add(3 * time.Second) + + timer := time.NewTimer(1 * time.Second) + for { + select { + case <-timer.C: + t.Fatalf("expected logs never arrived") + default: + if oLogs.Len() == 0 { + continue + } + require.Equal(t, "Resource limits were exceeded 2 times, consider inspecting logs and raising the resource manager limits.", oLogs.All()[0].Message) + return + } + } +} diff --git a/go.mod b/go.mod index c2b77d793fe..b7946841b27 100644 --- a/go.mod +++ b/go.mod @@ -125,13 +125,17 @@ require ( golang.org/x/sys v0.0.0-20220412211240-33da011f77ad ) +require ( + github.com/benbjohnson/clock v1.3.0 + github.com/ipfs/go-log/v2 v2.5.1 +) + require ( github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect github.com/Kubuxu/go-os-helper v0.0.1 // indirect github.com/Stebalien/go-bitfield v0.0.1 // indirect github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a // indirect github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect - github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/btcsuite/btcd v0.22.0-beta // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect @@ -172,7 +176,6 @@ require ( github.com/ipfs/go-ipfs-delay v0.0.1 // indirect github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect github.com/ipfs/go-ipfs-pq v0.0.2 // indirect - github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/ipfs/go-peertaskqueue v0.7.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/klauspost/compress v1.15.1 // indirect