Skip to content

Commit

Permalink
p2p: server: use zap for logging (#5979)
Browse files Browse the repository at this point in the history
## Motivation

Use `zap` for logging in `p2p/server` instead of the custom logger.
One of the minor changes needed for syncv2



Co-authored-by: Ivan Shvedunov <ivan4th@users.noreply.github.com>
  • Loading branch information
ivan4th and ivan4th committed May 24, 2024
1 parent 9035def commit dd8ed36
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 38 deletions.
2 changes: 1 addition & 1 deletion fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (f *Fetch) registerServer(
opts := []server.Opt{
server.WithTimeout(f.cfg.RequestTimeout),
server.WithHardTimeout(f.cfg.RequestHardTimeout),
server.WithLog(f.logger),
server.WithLog(f.logger.Zap()),
server.WithDecayingTag(f.cfg.DecayingTag),
}
if f.cfg.EnableServerMetrics {
Expand Down
70 changes: 36 additions & 34 deletions p2p/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-varint"
dto "github.com/prometheus/client_model/go"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"

Expand Down Expand Up @@ -58,7 +59,7 @@ func WithHardTimeout(timeout time.Duration) Opt {
}

// WithLog configures logger for the server.
func WithLog(log log.Log) Opt {
func WithLog(log *zap.Logger) Opt {
return func(s *Server) {
s.logger = log
}
Expand Down Expand Up @@ -147,7 +148,7 @@ type Response struct {

// Server for the Handler.
type Server struct {
logger log.Log
logger *zap.Logger
protocol string
handler StreamHandler
timeout time.Duration
Expand All @@ -167,7 +168,7 @@ type Server struct {
// New server for the handler.
func New(h Host, proto string, handler StreamHandler, opts ...Opt) *Server {
srv := &Server{
logger: log.NewNop(),
logger: zap.NewNop(),
protocol: proto,
handler: handler,
h: h,
Expand All @@ -191,7 +192,7 @@ func New(h Host, proto string, handler StreamHandler, opts ...Opt) *Server {
connmgr.DecayFixed(srv.decayingTagSpec.Dec),
connmgr.BumpSumBounded(0, srv.decayingTagSpec.Cap))
if err != nil {
srv.logger.Error("error registering decaying tag", log.Err(err))
srv.logger.Error("error registering decaying tag", zap.Error(err))
} else {
srv.decayingTag = tag
}
Expand Down Expand Up @@ -268,51 +269,51 @@ func (s *Server) queueHandler(ctx context.Context, stream network.Stream) bool {
rd := bufio.NewReader(dadj)
size, err := varint.ReadUvarint(rd)
if err != nil {
s.logger.With().Debug("initial read failed",
log.String("protocol", s.protocol),
log.Stringer("remotePeer", stream.Conn().RemotePeer()),
log.Stringer("remoteMultiaddr", stream.Conn().RemoteMultiaddr()),
log.Err(err),
s.logger.Debug("initial read failed",
zap.String("protocol", s.protocol),
zap.Stringer("remotePeer", stream.Conn().RemotePeer()),
zap.Stringer("remoteMultiaddr", stream.Conn().RemoteMultiaddr()),
zap.Error(err),
)
return false
}
if size > uint64(s.requestLimit) {
s.logger.With().Warning("request limit overflow",
log.String("protocol", s.protocol),
log.Stringer("remotePeer", stream.Conn().RemotePeer()),
log.Stringer("remoteMultiaddr", stream.Conn().RemoteMultiaddr()),
log.Int("limit", s.requestLimit),
log.Uint64("request", size),
s.logger.Warn("request limit overflow",
zap.String("protocol", s.protocol),
zap.Stringer("remotePeer", stream.Conn().RemotePeer()),
zap.Stringer("remoteMultiaddr", stream.Conn().RemoteMultiaddr()),
zap.Int("limit", s.requestLimit),
zap.Uint64("request", size),
)
stream.Conn().Close()
return false
}
buf := make([]byte, size)
_, err = io.ReadFull(rd, buf)
if err != nil {
s.logger.With().Debug("error reading request",
log.String("protocol", s.protocol),
log.Stringer("remotePeer", stream.Conn().RemotePeer()),
log.Stringer("remoteMultiaddr", stream.Conn().RemoteMultiaddr()),
log.Err(err),
s.logger.Debug("error reading request",
zap.String("protocol", s.protocol),
zap.Stringer("remotePeer", stream.Conn().RemotePeer()),
zap.Stringer("remoteMultiaddr", stream.Conn().RemoteMultiaddr()),
zap.Error(err),
)
return false
}
start := time.Now()
if err = s.handler(log.WithNewRequestID(ctx), buf, dadj); err != nil {
s.logger.With().Debug("handler reported error",
log.String("protocol", s.protocol),
log.Stringer("remotePeer", stream.Conn().RemotePeer()),
log.Stringer("remoteMultiaddr", stream.Conn().RemoteMultiaddr()),
log.Err(err),
s.logger.Debug("handler reported error",
zap.String("protocol", s.protocol),
zap.Stringer("remotePeer", stream.Conn().RemotePeer()),
zap.Stringer("remoteMultiaddr", stream.Conn().RemoteMultiaddr()),
zap.Error(err),
)
return false
}
s.logger.With().Debug("protocol handler execution time",
log.String("protocol", s.protocol),
log.Stringer("remotePeer", stream.Conn().RemotePeer()),
log.Stringer("remoteMultiaddr", stream.Conn().RemoteMultiaddr()),
log.Duration("duration", time.Since(start)),
s.logger.Debug("protocol handler execution time",
zap.String("protocol", s.protocol),
zap.Stringer("remotePeer", stream.Conn().RemotePeer()),
zap.Stringer("remoteMultiaddr", stream.Conn().RemoteMultiaddr()),
zap.Duration("duration", time.Since(start)),
)
return true
}
Expand Down Expand Up @@ -357,10 +358,11 @@ func (s *Server) StreamRequest(
stream, err := s.streamRequest(ctx, pid, req, extraProtocols...)
if err == nil {
err = callback(ctx, stream)
s.logger.WithContext(ctx).With().Debug("request execution time",
log.String("protocol", s.protocol),
log.Duration("duration", time.Since(start)),
log.Err(err),
s.logger.Debug("request execution time",
zap.String("protocol", s.protocol),
zap.Duration("duration", time.Since(start)),
zap.Error(err),
log.ZContext(ctx),
)
}

Expand Down
6 changes: 3 additions & 3 deletions p2p/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"github.com/spacemeshos/go-scale/tester"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/log/logtest"
)

func TestServer(t *testing.T) {
Expand All @@ -33,7 +33,7 @@ func TestServer(t *testing.T) {
}
opts := []Opt{
WithTimeout(100 * time.Millisecond),
WithLog(logtest.New(t)),
WithLog(zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))),
WithMetrics(),
}
client := New(mesh.Hosts()[0], proto, WrapHandler(handler), append(opts, WithRequestSizeLimit(2*limit))...)
Expand Down

0 comments on commit dd8ed36

Please sign in to comment.