Skip to content

Commit

Permalink
Identify: emit useful events after identification (#2759)
Browse files Browse the repository at this point in the history
* Identify should emit useful events after identification

* Compare strings rather than objects

* Include other fields in emitted event

* Sort addrs when comparing in test
  • Loading branch information
MarcoPolo authored Apr 16, 2024
1 parent 5d1c4a6 commit be32b5b
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 21 deletions.
31 changes: 30 additions & 1 deletion core/event/identify.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,40 @@
package event

import "github.com/libp2p/go-libp2p/core/peer"
import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/record"
"github.com/multiformats/go-multiaddr"
)

// EvtPeerIdentificationCompleted is emitted when the initial identification round for a peer is completed.
type EvtPeerIdentificationCompleted struct {
// Peer is the ID of the peer whose identification succeeded.
Peer peer.ID

// Conn is the connection we identified.
Conn network.Conn

// ListenAddrs is the list of addresses the peer is listening on.
ListenAddrs []multiaddr.Multiaddr

// Protocols is the list of protocols the peer advertised on this connection.
Protocols []protocol.ID

// SignedPeerRecord is the provided signed peer record of the peer. May be nil.
SignedPeerRecord *record.Envelope

// AgentVersion is like a UserAgent string in browsers, or client version in
// bittorrent includes the client name and client.
AgentVersion string

// ProtocolVersion is the protocolVersion field in the identify message
ProtocolVersion string

// ObservedAddr is the our side's connection address as observed by the
// peer. This is not verified, the peer could return anything here.
ObservedAddr multiaddr.Multiaddr
}

// EvtPeerIdentificationFailed is emitted when the initial identification round for a peer failed.
Expand Down
41 changes: 23 additions & 18 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,6 @@ func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} {
ids.emitters.evtPeerIdentificationFailed.Emit(event.EvtPeerIdentificationFailed{Peer: c.RemotePeer(), Reason: err})
return
}

ids.emitters.evtPeerIdentificationCompleted.Emit(event.EvtPeerIdentificationCompleted{Peer: c.RemotePeer()})
}()

return e.IdentifyWaitChan
Expand Down Expand Up @@ -711,8 +709,16 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo
})
}

// mes.ObservedAddr
ids.consumeObservedAddress(mes.GetObservedAddr(), c)
obsAddr, err := ma.NewMultiaddrBytes(mes.GetObservedAddr())
if err != nil {
log.Debugf("error parsing received observed addr for %s: %s", c, err)
obsAddr = nil
}

if obsAddr != nil {
// TODO refactor this to use the emitted events instead of having this func call explicitly.
ids.observedAddrs.Record(c, obsAddr)
}

// mes.ListenAddrs
laddrs := mes.GetListenAddrs()
Expand Down Expand Up @@ -763,6 +769,7 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo
signedAddrs, err := ids.consumeSignedPeerRecord(c.RemotePeer(), signedPeerRecord)
if err != nil {
log.Debugf("failed to consume signed peer record: %s", err)
signedPeerRecord = nil
} else {
addrs = signedAddrs
}
Expand All @@ -786,6 +793,18 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo

// get the key from the other side. we may not have it (no-auth transport)
ids.consumeReceivedPubKey(c, mes.PublicKey)

ids.emitters.evtPeerIdentificationCompleted.Emit(event.EvtPeerIdentificationCompleted{
Peer: c.RemotePeer(),
Conn: c,
ListenAddrs: lmaddrs,
Protocols: mesProtocols,
SignedPeerRecord: signedPeerRecord,
ObservedAddr: obsAddr,
ProtocolVersion: pv,
AgentVersion: av,
})

}

func (ids *idService) consumeSignedPeerRecord(p peer.ID, signedPeerRecord *record.Envelope) ([]ma.Multiaddr, error) {
Expand Down Expand Up @@ -919,20 +938,6 @@ func HasConsistentTransport(a ma.Multiaddr, green []ma.Multiaddr) bool {
return false
}

func (ids *idService) consumeObservedAddress(observed []byte, c network.Conn) {
if observed == nil {
return
}

maddr, err := ma.NewMultiaddrBytes(observed)
if err != nil {
log.Debugf("error parsing received observed addr for %s: %s", c, err)
return
}

ids.observedAddrs.Record(c, maddr)
}

// addConnWithLock assuems caller holds the connsMu lock
func (ids *idService) addConnWithLock(c network.Conn) {
_, found := ids.conns[c]
Expand Down
41 changes: 39 additions & 2 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -200,12 +201,47 @@ func TestIDService(t *testing.T) {

// test that we received the "identify completed" event.
select {
case <-sub.Out():
case evtAny := <-sub.Out():
assertCorrectEvtPeerIdentificationCompleted(t, evtAny, h2)
case <-time.After(3 * time.Second):
t.Fatalf("expected EvtPeerIdentificationCompleted event within 10 seconds; none received")
}
}

func assertCorrectEvtPeerIdentificationCompleted(t *testing.T, evtAny interface{}, other host.Host) {
t.Helper()
evt := evtAny.(event.EvtPeerIdentificationCompleted)
require.NotNil(t, evt.Conn)
require.Equal(t, other.ID(), evt.Peer)

require.Equal(t, len(other.Addrs()), len(evt.ListenAddrs))
if len(other.Addrs()) == len(evt.ListenAddrs) {
otherAddrsStrings := make([]string, len(other.Addrs()))
evtAddrStrings := make([]string, len(evt.ListenAddrs))
for i, a := range other.Addrs() {
otherAddrsStrings[i] = a.String()
evtAddrStrings[i] = evt.ListenAddrs[i].String()
}
slices.Sort(otherAddrsStrings)
slices.Sort(evtAddrStrings)
require.Equal(t, otherAddrsStrings, evtAddrStrings)
}

otherProtos := other.Mux().Protocols()
slices.Sort(otherProtos)
evtProtos := evt.Protocols
slices.Sort(evtProtos)
require.Equal(t, otherProtos, evtProtos)
idFromSignedRecord, err := peer.IDFromPublicKey(evt.SignedPeerRecord.PublicKey)
require.NoError(t, err)
require.Equal(t, other.ID(), idFromSignedRecord)
require.Equal(t, peer.PeerRecordEnvelopePayloadType, evt.SignedPeerRecord.PayloadType)
var peerRecord peer.PeerRecord
evt.SignedPeerRecord.TypedRecord(&peerRecord)
require.Equal(t, other.ID(), peerRecord.PeerID)
require.Equal(t, other.Addrs(), peerRecord.Addrs)
}

func TestProtoMatching(t *testing.T) {
tcp1, _ := ma.NewMultiaddr("/ip4/1.2.3.4/tcp/1234")
tcp2, _ := ma.NewMultiaddr("/ip4/1.2.3.4/tcp/2345")
Expand Down Expand Up @@ -665,7 +701,8 @@ func TestLargeIdentifyMessage(t *testing.T) {

// test that we received the "identify completed" event.
select {
case <-sub.Out():
case evtAny := <-sub.Out():
assertCorrectEvtPeerIdentificationCompleted(t, evtAny, h2)
case <-time.After(3 * time.Second):
t.Fatalf("expected EvtPeerIdentificationCompleted event within 3 seconds; none received")
}
Expand Down

0 comments on commit be32b5b

Please sign in to comment.