Skip to content

Commit

Permalink
adding logging around libp2p one-to-one stream management (#785)
Browse files Browse the repository at this point in the history
  • Loading branch information
vishalchangrani committed Jun 8, 2021
1 parent 802be06 commit cb558ea
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 20 deletions.
12 changes: 12 additions & 0 deletions network/p2p/libp2pUtils.go
Expand Up @@ -10,9 +10,11 @@ import (
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/multiformats/go-multiaddr"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/model/flow"
)
Expand Down Expand Up @@ -208,3 +210,13 @@ func peerInfosFromIDs(ids flow.IdentityList) ([]peer.AddrInfo, map[flow.Identifi
}
return validIDs, invalidIDs
}

// streamLogger creates a logger for libp2p stream which logs the remote and local peer IDs and addresses
func streamLogger(log zerolog.Logger, stream libp2pnetwork.Stream) zerolog.Logger {
logger := log.With().
Str("remote_peer", stream.Conn().RemotePeer().String()).
Str("remote_address", stream.Conn().RemoteMultiaddr().String()).
Str("local_peer", stream.Conn().LocalPeer().String()).
Str("local_address", stream.Conn().LocalMultiaddr().String()).Logger()
return logger
}
11 changes: 4 additions & 7 deletions network/p2p/middleware.go
Expand Up @@ -277,13 +277,13 @@ func (m *Middleware) SendDirect(msg *message.Message, targetID flow.Identifier)
// flush the stream
err = bufw.Flush()
if err != nil {
return fmt.Errorf("failed to flush stream for %s: %w", targetID.String(), err)
return fmt.Errorf("failed to flush stream for %s: %w", targetIdentity.String(), err)
}

// close the stream immediately
err = stream.Close()
if err != nil {
return fmt.Errorf("failed to close the stream for %s: %w", targetID.String(), err)
return fmt.Errorf("failed to close the stream for %s: %w", targetIdentity.String(), err)
}

// OneToOne communication metrics are reported with topic OneToOne
Expand Down Expand Up @@ -327,12 +327,9 @@ func identityList(identityMap map[flow.Identifier]flow.Identity) flow.IdentityLi
func (m *Middleware) handleIncomingStream(s libp2pnetwork.Stream) {

// qualify the logger with local and remote address
log := m.log.With().
Str("local_addr", s.Conn().LocalMultiaddr().String()).
Str("remote_addr", s.Conn().RemoteMultiaddr().String()).
Logger()
log := streamLogger(m.log, s)

log.Info().Msg("incoming connection established")
log.Info().Msg("incoming stream received")

//create a new readConnection with the context of the middleware
conn := newReadConnection(m.ctx, s, m.processMessage, log, m.metrics, LargeMsgMaxUnicastMsgSize)
Expand Down
15 changes: 2 additions & 13 deletions network/p2p/readConnection.go
Expand Up @@ -37,13 +37,11 @@ func newReadConnection(ctx context.Context,
maxMsgSize = DefaultMaxUnicastMsgSize
}

streamLogger := streamLogger(log, stream)

c := readConnection{
ctx: ctx,
stream: stream,
callback: callback,
log: streamLogger,
log: log,
metrics: metrics,
maxMsgSize: maxMsgSize,
}
Expand Down Expand Up @@ -79,7 +77,7 @@ func (rc *readConnection) receiveLoop(wg *sync.WaitGroup) {
rc.closeStream()
return
}
rc.log.Error().Err(err)
rc.log.Error().Err(err).Msg("failed to read message")
rc.resetStream()
return
}
Expand Down Expand Up @@ -120,12 +118,3 @@ func (rc *readConnection) resetStream() {
rc.log.Error().Err(err).Msg("failed to reset stream")
}
}

func streamLogger(log zerolog.Logger, stream libp2pnetwork.Stream) zerolog.Logger {
logger := log.With().
Str("remote_peer", stream.Conn().RemotePeer().String()).
Str("remote_address", stream.Conn().RemoteMultiaddr().String()).
Str("local_peer", stream.Conn().LocalPeer().String()).
Str("local_address", stream.Conn().LocalMultiaddr().String()).Logger()
return logger
}

0 comments on commit cb558ea

Please sign in to comment.