Skip to content

Commit

Permalink
Merge pull request #1975 from libp2p/identify-remove-delta
Browse files Browse the repository at this point in the history
identify: remove support for Identify Delta
  • Loading branch information
marten-seemann committed Jan 7, 2023
2 parents 2a49ff3 + 5095f44 commit 2ac8a31
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 860 deletions.
2 changes: 1 addition & 1 deletion limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func SetDefaultServiceLimits(config *rcmgr.ScalingLimitConfig) {
rcmgr.BaseLimit{StreamsInbound: 16, StreamsOutbound: 16, Streams: 32, Memory: 1 << 20},
rcmgr.BaseLimitIncrease{},
)
for _, id := range [...]protocol.ID{identify.ID, identify.IDDelta, identify.IDPush} {
for _, id := range [...]protocol.ID{identify.ID, identify.IDPush} {
config.AddProtocolLimit(
id,
rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 128, Memory: 4 << 20},
Expand Down
2 changes: 0 additions & 2 deletions p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ func TestHostProtoPreference(t *testing.T) {

// Prevent pushing identify information so this test works.
h1.RemoveStreamHandler(identify.IDPush)
h1.RemoveStreamHandler(identify.IDDelta)

h2.SetStreamHandler(protoOld, handler)

Expand Down Expand Up @@ -362,7 +361,6 @@ func TestHostProtoPreknowledge(t *testing.T) {
h2.SetStreamHandler("/super", handler)
// Prevent pushing identify information so this test actually _uses_ the super protocol.
h1.RemoveStreamHandler(identify.IDPush)
h1.RemoveStreamHandler(identify.IDDelta)

h2pi := h2.Peerstore().PeerInfo(h2.ID())
require.NoError(t, h1.Connect(ctx, h2pi))
Expand Down
73 changes: 51 additions & 22 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/record"

"github.com/libp2p/go-libp2p/p2p/host/eventbus"
pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb"

Expand Down Expand Up @@ -115,7 +115,7 @@ type idService struct {
addPeerHandlerCh chan addPeerHandlerReq
rmPeerHandlerCh chan rmPeerHandlerReq

// pushSemaphore limits the push/delta concurrency to avoid storms
// pushSemaphore limits the push concurrency to avoid storms
// that clog the transient scope.
pushSemaphore chan struct{}
}
Expand Down Expand Up @@ -154,9 +154,6 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())

// handle local protocol handler updates, and push deltas to peers.
var err error

observedAddrs, err := NewObservedAddrManager(h)
if err != nil {
return nil, fmt.Errorf("failed to create observed address manager: %s", err)
Expand All @@ -180,7 +177,6 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) {
}

// register protocols that do not depend on peer records.
h.SetStreamHandler(IDDelta, s.deltaHandler)
h.SetStreamHandler(ID, s.sendIdentifyResp)
h.SetStreamHandler(IDPush, s.pushHandler)

Expand Down Expand Up @@ -269,20 +265,18 @@ func (ids *idService) loop() {
select {
case phs[pid].pushCh <- struct{}{}:
default:
log.Debugf("dropping addr updated message for %s as buffer full", pid.Pretty())
log.Debugf("dropping addr updated message for %s as buffer full", pid)
}
}

case event.EvtLocalProtocolsUpdated:
for pid := range phs {
select {
case phs[pid].deltaCh <- struct{}{}:
case phs[pid].pushCh <- struct{}{}:
default:
log.Debugf("dropping protocol updated message for %s as buffer full", pid.Pretty())
log.Debugf("dropping protocol updated message for %s as buffer full", pid)
}
}
}

case <-ids.ctx.Done():
return
}
Expand Down Expand Up @@ -372,7 +366,7 @@ func (ids *idService) identifyConn(c network.Conn) error {
return err
}

return ids.handleIdentifyResponse(s)
return ids.handleIdentifyResponse(s, false)
}

func (ids *idService) sendIdentifyResp(s network.Stream) {
Expand Down Expand Up @@ -406,14 +400,11 @@ func (ids *idService) sendIdentifyResp(s network.Stream) {
return
}

ph.snapshotMu.RLock()
snapshot := ph.snapshot
ph.snapshotMu.RUnlock()
ids.writeChunkedIdentifyMsg(c, snapshot, s)
ids.writeChunkedIdentifyMsg(c, s)
log.Debugf("%s sent message to %s %s", ID, c.RemotePeer(), c.RemoteMultiaddr())
}

func (ids *idService) handleIdentifyResponse(s network.Stream) error {
func (ids *idService) handleIdentifyResponse(s network.Stream, isPush bool) error {
if err := s.Scope().SetService(ServiceName); err != nil {
log.Warnf("error attaching stream to identify service: %s", err)
s.Reset()
Expand Down Expand Up @@ -444,7 +435,7 @@ func (ids *idService) handleIdentifyResponse(s network.Stream) error {

log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr())

ids.consumeMessage(mes, c)
ids.consumeMessage(mes, c, isPush)

return nil
}
Expand Down Expand Up @@ -477,7 +468,8 @@ func (ids *idService) getSnapshot() *identifySnapshot {
return snapshot
}

func (ids *idService) writeChunkedIdentifyMsg(c network.Conn, snapshot *identifySnapshot, s network.Stream) error {
func (ids *idService) writeChunkedIdentifyMsg(c network.Conn, s network.Stream) error {
snapshot := ids.getSnapshot()
mes := ids.createBaseIdentifyResponse(c, snapshot)
sr := ids.getSignedRecord(snapshot)
mes.SignedPeerRecord = sr
Expand Down Expand Up @@ -566,11 +558,49 @@ func (ids *idService) getSignedRecord(snapshot *identifySnapshot) []byte {
return recBytes
}

func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn) {
// diff takes two slices of strings (a and b) and computes which elements were added and removed in b
func diff(a, b []string) (added, removed []string) {
// This is O(n^2), but it's fine because the slices are small.
for _, x := range b {
var found bool
for _, y := range a {
if x == y {
found = true
break
}
}
if !found {
added = append(added, x)
}
}
for _, x := range a {
var found bool
for _, y := range b {
if x == y {
found = true
break
}
}
if !found {
removed = append(removed, x)
}
}
return
}

func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bool) {
p := c.RemotePeer()

// mes.Protocols
supported, _ := ids.Host.Peerstore().GetProtocols(p)
added, removed := diff(supported, mes.Protocols)
ids.Host.Peerstore().SetProtocols(p, mes.Protocols...)
if isPush {
ids.emitters.evtPeerProtocolsUpdated.Emit(event.EvtPeerProtocolsUpdated{
Peer: p,
Added: protocol.ConvertFromStrings(added),
Removed: protocol.ConvertFromStrings(removed),
})
}

// mes.ObservedAddr
ids.consumeObservedAddress(mes.GetObservedAddr(), c)
Expand Down Expand Up @@ -598,7 +628,6 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn) {

// add certified addresses for the peer, if they sent us a signed peer record
// otherwise use the unsigned addresses.
var signedPeerRecord *record.Envelope
signedPeerRecord, err := signedPeerRecordFromMessage(mes)
if err != nil {
log.Errorf("error getting peer record from Identify message: %v", err)
Expand Down
82 changes: 0 additions & 82 deletions p2p/protocol/identify/id_delta.go

This file was deleted.

9 changes: 3 additions & 6 deletions p2p/protocol/identify/id_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@ import (
"github.com/libp2p/go-libp2p/core/network"
)

// IDPush is the protocol.ID of the Identify push protocol. It sends full identify messages containing
// the current state of the peer.
//
// It is in the process of being replaced by identify delta, which sends only diffs for better
// resource utilisation.
// IDPush is the protocol.ID of the Identify push protocol.
// It sends full identify messages containing the current state of the peer.
const IDPush = "/ipfs/id/push/1.0.0"

// pushHandler handles incoming identify push streams. The behaviour is identical to the ordinary identify protocol.
func (ids *idService) pushHandler(s network.Stream) {
ids.handleIdentifyResponse(s)
ids.handleIdentifyResponse(s, true)
}

0 comments on commit 2ac8a31

Please sign in to comment.