-
-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: log when resource manager limits are exceeded
This periodically logs how many times Resource Manager limits were exceeded. If they aren't exceeded, then nothing is logged. The log levels are at ERROR log level so that they are shown by default. The motivation is so that users know when they have exceeded resource manager limits. To find what is exceeding the limits, they'll need to turn on debug logging and inspect the errors being logged. This could collect the specific limits being reached, but that's more complicated to implement and could result in much longer log messages.
- Loading branch information
Showing
5 changed files
with
236 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
package libp2p | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"sync" | ||
"time" | ||
|
||
"github.com/benbjohnson/clock" | ||
"github.com/libp2p/go-libp2p-core/network" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
"github.com/libp2p/go-libp2p-core/protocol" | ||
rcmgr "github.com/libp2p/go-libp2p-resource-manager" | ||
"go.uber.org/zap" | ||
) | ||
|
||
type loggingResourceManager struct { | ||
clock clock.Clock | ||
logger *zap.SugaredLogger | ||
delegate network.ResourceManager | ||
logInterval time.Duration | ||
|
||
mut sync.Mutex | ||
limitExceededErrs uint64 | ||
} | ||
|
||
type loggingScope struct { | ||
logger *zap.SugaredLogger | ||
delegate network.ResourceScope | ||
countErrs func(error) | ||
} | ||
|
||
var _ network.ResourceManager = (*loggingResourceManager)(nil) | ||
|
||
func (n *loggingResourceManager) start(ctx context.Context) { | ||
logInterval := n.logInterval | ||
if logInterval == 0 { | ||
logInterval = 10 * time.Second | ||
} | ||
ticker := n.clock.Ticker(logInterval) | ||
go func() { | ||
defer ticker.Stop() | ||
for { | ||
select { | ||
case <-ticker.C: | ||
n.mut.Lock() | ||
errs := n.limitExceededErrs | ||
n.limitExceededErrs = 0 | ||
n.mut.Unlock() | ||
if errs != 0 { | ||
n.logger.Errorf("Resource limits were exceeded %d times, consider inspecting logs and raising the resource manager limits.", errs) | ||
} | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
}() | ||
} | ||
|
||
func (n *loggingResourceManager) countErrs(err error) { | ||
if errors.Is(err, network.ErrResourceLimitExceeded) { | ||
n.mut.Lock() | ||
n.limitExceededErrs++ | ||
n.mut.Unlock() | ||
} | ||
} | ||
|
||
func (n *loggingResourceManager) ViewSystem(f func(network.ResourceScope) error) error { | ||
return n.delegate.ViewSystem(f) | ||
} | ||
func (n *loggingResourceManager) ViewTransient(f func(network.ResourceScope) error) error { | ||
return n.delegate.ViewTransient(func(s network.ResourceScope) error { | ||
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) | ||
}) | ||
} | ||
func (n *loggingResourceManager) ViewService(svc string, f func(network.ServiceScope) error) error { | ||
return n.delegate.ViewService(svc, func(s network.ServiceScope) error { | ||
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) | ||
}) | ||
} | ||
func (n *loggingResourceManager) ViewProtocol(p protocol.ID, f func(network.ProtocolScope) error) error { | ||
return n.delegate.ViewProtocol(p, func(s network.ProtocolScope) error { | ||
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) | ||
}) | ||
} | ||
func (n *loggingResourceManager) ViewPeer(p peer.ID, f func(network.PeerScope) error) error { | ||
return n.delegate.ViewPeer(p, func(s network.PeerScope) error { | ||
return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) | ||
}) | ||
} | ||
func (n *loggingResourceManager) OpenConnection(dir network.Direction, usefd bool) (network.ConnManagementScope, error) { | ||
connMgmtScope, err := n.delegate.OpenConnection(dir, usefd) | ||
n.countErrs(err) | ||
return connMgmtScope, err | ||
} | ||
func (n *loggingResourceManager) OpenStream(p peer.ID, dir network.Direction) (network.StreamManagementScope, error) { | ||
connMgmtScope, err := n.delegate.OpenStream(p, dir) | ||
n.countErrs(err) | ||
return connMgmtScope, err | ||
} | ||
func (n *loggingResourceManager) Close() error { | ||
return n.delegate.Close() | ||
} | ||
|
||
func (s *loggingScope) ReserveMemory(size int, prio uint8) error { | ||
err := s.delegate.ReserveMemory(size, prio) | ||
s.countErrs(err) | ||
return err | ||
} | ||
func (s *loggingScope) ReleaseMemory(size int) { | ||
s.delegate.ReleaseMemory(size) | ||
} | ||
func (s *loggingScope) Stat() network.ScopeStat { | ||
return s.delegate.Stat() | ||
} | ||
func (s *loggingScope) BeginSpan() (network.ResourceScopeSpan, error) { | ||
return s.delegate.BeginSpan() | ||
} | ||
func (s *loggingScope) Done() { | ||
s.delegate.(network.ResourceScopeSpan).Done() | ||
} | ||
func (s *loggingScope) Name() string { | ||
return s.delegate.(network.ServiceScope).Name() | ||
} | ||
func (s *loggingScope) Protocol() protocol.ID { | ||
return s.delegate.(network.ProtocolScope).Protocol() | ||
} | ||
func (s *loggingScope) Peer() peer.ID { | ||
return s.delegate.(network.PeerScope).Peer() | ||
} | ||
func (s *loggingScope) PeerScope() network.PeerScope { | ||
return s.delegate.(network.PeerScope) | ||
} | ||
func (s *loggingScope) SetPeer(p peer.ID) error { | ||
err := s.delegate.(network.ConnManagementScope).SetPeer(p) | ||
s.countErrs(err) | ||
return err | ||
} | ||
func (s *loggingScope) ProtocolScope() network.ProtocolScope { | ||
return s.delegate.(network.ProtocolScope) | ||
} | ||
func (s *loggingScope) SetProtocol(proto protocol.ID) error { | ||
err := s.delegate.(network.StreamManagementScope).SetProtocol(proto) | ||
s.countErrs(err) | ||
return err | ||
} | ||
func (s *loggingScope) ServiceScope() network.ServiceScope { | ||
return s.delegate.(network.ServiceScope) | ||
} | ||
func (s *loggingScope) SetService(srv string) error { | ||
err := s.delegate.(network.StreamManagementScope).SetService(srv) | ||
s.countErrs(err) | ||
return err | ||
} | ||
func (s *loggingScope) Limit() rcmgr.Limit { | ||
return s.delegate.(rcmgr.ResourceScopeLimiter).Limit() | ||
} | ||
func (s *loggingScope) SetLimit(limit rcmgr.Limit) { | ||
s.delegate.(rcmgr.ResourceScopeLimiter).SetLimit(limit) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package libp2p | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/benbjohnson/clock" | ||
"github.com/libp2p/go-libp2p-core/network" | ||
rcmgr "github.com/libp2p/go-libp2p-resource-manager" | ||
"github.com/stretchr/testify/require" | ||
"go.uber.org/zap" | ||
"go.uber.org/zap/zaptest/observer" | ||
) | ||
|
||
func TestLoggingResourceManager(t *testing.T) { | ||
clock := clock.NewMock() | ||
limiter := rcmgr.NewDefaultLimiter() | ||
limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(1, 1, 1) | ||
rm, err := rcmgr.NewResourceManager(limiter) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
oCore, oLogs := observer.New(zap.ErrorLevel) | ||
oLogger := zap.New(oCore) | ||
lrm := &loggingResourceManager{ | ||
clock: clock, | ||
logger: oLogger.Sugar(), | ||
delegate: rm, | ||
logInterval: 1 * time.Second, | ||
} | ||
|
||
// 2 of these should result in resource limit exceeded errors and subsequent log messages | ||
for i := 0; i < 3; i++ { | ||
_, _ = lrm.OpenConnection(network.DirInbound, false) | ||
} | ||
|
||
// run the logger which will write an entry for those errors | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
lrm.start(ctx) | ||
clock.Add(3 * time.Second) | ||
|
||
timer := time.NewTimer(1 * time.Second) | ||
for { | ||
select { | ||
case <-timer.C: | ||
t.Fatalf("expected logs never arrived") | ||
default: | ||
if oLogs.Len() == 0 { | ||
continue | ||
} | ||
require.Equal(t, "Resource limits were exceeded 2 times, consider inspecting logs and raising the resource manager limits.", oLogs.All()[0].Message) | ||
return | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters