From 0e9abdf2282165891e18c75de1c9547d20da6070 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 4 Jan 2023 10:57:41 +1300 Subject: [PATCH 1/4] identify: emit EvtPeerProtocolsUpdated for Identify Push as well --- p2p/protocol/identify/id.go | 51 +++++++++++++++++++++++++++----- p2p/protocol/identify/id_push.go | 2 +- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 505b0346af..c342a21d60 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -13,8 +13,8 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/record" - "github.com/libp2p/go-libp2p/p2p/host/eventbus" pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb" @@ -372,7 +372,7 @@ func (ids *idService) identifyConn(c network.Conn) error { return err } - return ids.handleIdentifyResponse(s) + return ids.handleIdentifyResponse(s, false) } func (ids *idService) sendIdentifyResp(s network.Stream) { @@ -413,7 +413,7 @@ func (ids *idService) sendIdentifyResp(s network.Stream) { log.Debugf("%s sent message to %s %s", ID, c.RemotePeer(), c.RemoteMultiaddr()) } -func (ids *idService) handleIdentifyResponse(s network.Stream) error { +func (ids *idService) handleIdentifyResponse(s network.Stream, isPush bool) error { if err := s.Scope().SetService(ServiceName); err != nil { log.Warnf("error attaching stream to identify service: %s", err) s.Reset() @@ -444,7 +444,7 @@ func (ids *idService) handleIdentifyResponse(s network.Stream) error { log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr()) - ids.consumeMessage(mes, c) + ids.consumeMessage(mes, c, isPush) return nil } @@ -566,11 +566,49 @@ func (ids *idService) getSignedRecord(snapshot *identifySnapshot) []byte { return recBytes } -func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn) { +// diff takes two slices of strings (a and b) and computes which elements were added and removed in b +func diff(a, b []string) (added, removed []string) { + // This is O(n^2), but it's fine because the slices are small. + for _, x := range b { + var found bool + for _, y := range a { + if x == y { + found = true + break + } + } + if !found { + added = append(added, x) + } + } + for _, x := range a { + var found bool + for _, y := range b { + if x == y { + found = true + break + } + } + if !found { + removed = append(removed, x) + } + } + return +} + +func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bool) { p := c.RemotePeer() - // mes.Protocols + supported, _ := ids.Host.Peerstore().GetProtocols(p) + added, removed := diff(supported, mes.Protocols) ids.Host.Peerstore().SetProtocols(p, mes.Protocols...) + if isPush { + ids.emitters.evtPeerProtocolsUpdated.Emit(event.EvtPeerProtocolsUpdated{ + Peer: p, + Added: protocol.ConvertFromStrings(added), + Removed: protocol.ConvertFromStrings(removed), + }) + } // mes.ObservedAddr ids.consumeObservedAddress(mes.GetObservedAddr(), c) @@ -598,7 +636,6 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn) { // add certified addresses for the peer, if they sent us a signed peer record // otherwise use the unsigned addresses. - var signedPeerRecord *record.Envelope signedPeerRecord, err := signedPeerRecordFromMessage(mes) if err != nil { log.Errorf("error getting peer record from Identify message: %v", err) diff --git a/p2p/protocol/identify/id_push.go b/p2p/protocol/identify/id_push.go index cbb47a9fab..c68c972974 100644 --- a/p2p/protocol/identify/id_push.go +++ b/p2p/protocol/identify/id_push.go @@ -13,5 +13,5 @@ const IDPush = "/ipfs/id/push/1.0.0" // pushHandler handles incoming identify push streams. The behaviour is identical to the ordinary identify protocol. func (ids *idService) pushHandler(s network.Stream) { - ids.handleIdentifyResponse(s) + ids.handleIdentifyResponse(s, true) } From aa1f32484a650397dec1c0d6ff70b88a3117cfcf Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 4 Jan 2023 11:29:01 +1300 Subject: [PATCH 2/4] identify: remove support for Identify Delta --- limits.go | 2 +- p2p/host/basic/basic_host_test.go | 2 - p2p/protocol/identify/id.go | 14 +- p2p/protocol/identify/id_delta.go | 82 ------ p2p/protocol/identify/id_push.go | 7 +- p2p/protocol/identify/id_test.go | 202 +------------- p2p/protocol/identify/pb/identify.pb.go | 341 ++---------------------- p2p/protocol/identify/pb/identify.proto | 10 - p2p/protocol/identify/peer_loop.go | 95 +------ p2p/protocol/identify/peer_loop_test.go | 47 ---- 10 files changed, 33 insertions(+), 769 deletions(-) delete mode 100644 p2p/protocol/identify/id_delta.go diff --git a/limits.go b/limits.go index cf81fa7629..41b3da8dcb 100644 --- a/limits.go +++ b/limits.go @@ -25,7 +25,7 @@ func SetDefaultServiceLimits(config *rcmgr.ScalingLimitConfig) { rcmgr.BaseLimit{StreamsInbound: 16, StreamsOutbound: 16, Streams: 32, Memory: 1 << 20}, rcmgr.BaseLimitIncrease{}, ) - for _, id := range [...]protocol.ID{identify.ID, identify.IDDelta, identify.IDPush} { + for _, id := range [...]protocol.ID{identify.ID, identify.IDPush} { config.AddProtocolLimit( id, rcmgr.BaseLimit{StreamsInbound: 64, StreamsOutbound: 64, Streams: 128, Memory: 4 << 20}, diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index b1c2cf6203..9093beea01 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -286,7 +286,6 @@ func TestHostProtoPreference(t *testing.T) { // Prevent pushing identify information so this test works. h1.RemoveStreamHandler(identify.IDPush) - h1.RemoveStreamHandler(identify.IDDelta) h2.SetStreamHandler(protoOld, handler) @@ -362,7 +361,6 @@ func TestHostProtoPreknowledge(t *testing.T) { h2.SetStreamHandler("/super", handler) // Prevent pushing identify information so this test actually _uses_ the super protocol. h1.RemoveStreamHandler(identify.IDPush) - h1.RemoveStreamHandler(identify.IDDelta) h2pi := h2.Peerstore().PeerInfo(h2.ID()) require.NoError(t, h1.Connect(ctx, h2pi)) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index c342a21d60..f33ddc7cae 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -115,7 +115,7 @@ type idService struct { addPeerHandlerCh chan addPeerHandlerReq rmPeerHandlerCh chan rmPeerHandlerReq - // pushSemaphore limits the push/delta concurrency to avoid storms + // pushSemaphore limits the push concurrency to avoid storms // that clog the transient scope. pushSemaphore chan struct{} } @@ -154,9 +154,6 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) { } s.ctx, s.ctxCancel = context.WithCancel(context.Background()) - // handle local protocol handler updates, and push deltas to peers. - var err error - observedAddrs, err := NewObservedAddrManager(h) if err != nil { return nil, fmt.Errorf("failed to create observed address manager: %s", err) @@ -180,7 +177,6 @@ func NewIDService(h host.Host, opts ...Option) (*idService, error) { } // register protocols that do not depend on peer records. - h.SetStreamHandler(IDDelta, s.deltaHandler) h.SetStreamHandler(ID, s.sendIdentifyResp) h.SetStreamHandler(IDPush, s.pushHandler) @@ -269,20 +265,18 @@ func (ids *idService) loop() { select { case phs[pid].pushCh <- struct{}{}: default: - log.Debugf("dropping addr updated message for %s as buffer full", pid.Pretty()) + log.Debugf("dropping addr updated message for %s as buffer full", pid) } } - case event.EvtLocalProtocolsUpdated: for pid := range phs { select { - case phs[pid].deltaCh <- struct{}{}: + case phs[pid].pushCh <- struct{}{}: default: - log.Debugf("dropping protocol updated message for %s as buffer full", pid.Pretty()) + log.Debugf("dropping protocol updated message for %s as buffer full", pid) } } } - case <-ids.ctx.Done(): return } diff --git a/p2p/protocol/identify/id_delta.go b/p2p/protocol/identify/id_delta.go deleted file mode 100644 index 7f7c75f12d..0000000000 --- a/p2p/protocol/identify/id_delta.go +++ /dev/null @@ -1,82 +0,0 @@ -package identify - -import ( - "time" - - "github.com/libp2p/go-libp2p/core/event" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/protocol" - pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb" - - "github.com/libp2p/go-msgio/protoio" -) - -const IDDelta = "/p2p/id/delta/1.0.0" - -const deltaMsgSize = 2048 - -// deltaHandler handles incoming delta updates from peers. -func (ids *idService) deltaHandler(s network.Stream) { - if err := s.Scope().SetService(ServiceName); err != nil { - log.Warnf("error attaching stream to identify service: %s", err) - s.Reset() - return - } - - if err := s.Scope().ReserveMemory(deltaMsgSize, network.ReservationPriorityAlways); err != nil { - log.Warnf("error reserving memory for identify stream: %s", err) - s.Reset() - return - } - defer s.Scope().ReleaseMemory(deltaMsgSize) - - _ = s.SetReadDeadline(time.Now().Add(StreamReadTimeout)) - - c := s.Conn() - - r := protoio.NewDelimitedReader(s, deltaMsgSize) - mes := pb.Identify{} - if err := r.ReadMsg(&mes); err != nil { - log.Warn("error reading identify message: ", err) - _ = s.Reset() - return - } - - defer s.Close() - - log.Debugf("%s received message from %s %s", s.Protocol(), c.RemotePeer(), c.RemoteMultiaddr()) - - delta := mes.GetDelta() - if delta == nil { - return - } - - p := s.Conn().RemotePeer() - if err := ids.consumeDelta(p, delta); err != nil { - _ = s.Reset() - log.Warnf("delta update from peer %s failed: %s", p, err) - } -} - -// consumeDelta processes an incoming delta from a peer, updating the peerstore -// and emitting the appropriate events. -func (ids *idService) consumeDelta(id peer.ID, delta *pb.Delta) error { - err := ids.Host.Peerstore().AddProtocols(id, delta.GetAddedProtocols()...) - if err != nil { - return err - } - - err = ids.Host.Peerstore().RemoveProtocols(id, delta.GetRmProtocols()...) - if err != nil { - return err - } - - evt := event.EvtPeerProtocolsUpdated{ - Peer: id, - Added: protocol.ConvertFromStrings(delta.GetAddedProtocols()), - Removed: protocol.ConvertFromStrings(delta.GetRmProtocols()), - } - ids.emitters.evtPeerProtocolsUpdated.Emit(evt) - return nil -} diff --git a/p2p/protocol/identify/id_push.go b/p2p/protocol/identify/id_push.go index c68c972974..3b2c6a1ca9 100644 --- a/p2p/protocol/identify/id_push.go +++ b/p2p/protocol/identify/id_push.go @@ -4,11 +4,8 @@ import ( "github.com/libp2p/go-libp2p/core/network" ) -// IDPush is the protocol.ID of the Identify push protocol. It sends full identify messages containing -// the current state of the peer. -// -// It is in the process of being replaced by identify delta, which sends only diffs for better -// resource utilisation. +// IDPush is the protocol.ID of the Identify push protocol. +// It sends full identify messages containing the current state of the peer. const IDPush = "/ipfs/id/push/1.0.0" // pushHandler handles incoming identify push streams. The behaviour is identical to the ordinary identify protocol. diff --git a/p2p/protocol/identify/id_test.go b/p2p/protocol/identify/id_test.go index ebdb634961..8b1ddc900d 100644 --- a/p2p/protocol/identify/id_test.go +++ b/p2p/protocol/identify/id_test.go @@ -3,8 +3,6 @@ package identify_test import ( "context" "fmt" - "reflect" - "sort" "sync" "testing" "time" @@ -360,169 +358,8 @@ func TestLocalhostAddrFiltering(t *testing.T) { } } -func TestIdentifyDeltaOnProtocolChange(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - h1 := blhost.NewBlankHost(swarmt.GenSwarm(t)) - h2 := blhost.NewBlankHost(swarmt.GenSwarm(t)) - defer h2.Close() - defer h1.Close() - - h2.SetStreamHandler(protocol.TestingID, func(_ network.Stream) {}) - - ids1, err := identify.NewIDService(h1) - require.NoError(t, err) - - ids2, err := identify.NewIDService(h2) - require.NoError(t, err) - - defer func() { - ids1.Close() - ids2.Close() - }() - - idComplete, err := h1.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{}) - require.NoError(t, err) - defer idComplete.Close() - idFailed, err := h1.EventBus().Subscribe(&event.EvtPeerIdentificationFailed{}) - require.NoError(t, err) - defer idFailed.Close() - - if err := h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { - t.Fatal(err) - } - - conn := h1.Network().ConnsToPeer(h2.ID())[0] - select { - case <-ids1.IdentifyWait(conn): - case <-time.After(5 * time.Second): - t.Fatal("took over 5 seconds to identify") - } - - select { - case <-idComplete.Out(): - case evt := <-idFailed.Out(): - t.Fatalf("Failed to identify: %v", evt.(event.EvtPeerIdentificationFailed).Reason) - case <-time.After(5 * time.Second): - t.Fatal("Missing id event") - } - - protos, err := h1.Peerstore().GetProtocols(h2.ID()) - if err != nil { - t.Fatal(err) - } - sort.Strings(protos) - if sort.SearchStrings(protos, string(protocol.TestingID)) == len(protos) { - t.Fatalf("expected peer 1 to know that peer 2 speaks the Test protocol amongst others") - } - - // set up a subscriber to listen to peer protocol updated events in h1. We expect to receive events from h2 - // as protocols are added and removed. - sub, err := h1.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{}) - if err != nil { - t.Fatal(err) - } - defer sub.Close() - - h1ProtocolsUpdates, err := h1.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{}) - require.NoError(t, err) - defer h1ProtocolsUpdates.Close() - - waitForDelta := make(chan struct{}) - go func() { - expectedCount := 2 - for expectedCount > 0 { - evt := <-h1ProtocolsUpdates.Out() - expectedCount -= len(evt.(event.EvtPeerProtocolsUpdated).Added) - } - close(waitForDelta) - }() - - // add two new protocols in h2 and wait for identify to send deltas. - h2.SetStreamHandler(protocol.ID("foo"), func(_ network.Stream) {}) - h2.SetStreamHandler(protocol.ID("bar"), func(_ network.Stream) {}) - - recvWithTimeout(t, waitForDelta, 10*time.Second, "Timed out waiting to read protocol ids from the wire") - - protos, err = h1.Peerstore().GetProtocols(h2.ID()) - require.NoError(t, err) - - have := make(map[string]bool, len(protos)) - for _, p := range protos { - have[p] = true - } - require.True(t, have["foo"]) - require.True(t, have["bar"]) - - // remove one of the newly added protocols from h2, and wait for identify to send the delta. - h2.RemoveStreamHandler(protocol.ID("bar")) - - waitForDelta = make(chan struct{}) - go func() { - expectedCount := 1 - for expectedCount > 0 { - evt := <-h1ProtocolsUpdates.Out() - expectedCount -= len(evt.(event.EvtPeerProtocolsUpdated).Removed) - } - close(waitForDelta) - }() - - // check that h1 now has forgotten about h2's bar protocol. - recvWithTimeout(t, waitForDelta, 10*time.Second, "timed out waiting for protocol to be removed") - protos, err = h1.Peerstore().GetProtocols(h2.ID()) - require.NoError(t, err) - have = make(map[string]bool, len(protos)) - for _, p := range protos { - have[p] = true - } - require.True(t, have["foo"]) - require.False(t, have["bar"]) - - // make sure that h1 emitted events in the eventbus for h2's protocol updates. - done := make(chan struct{}) - - var lk sync.Mutex - var added []string - var removed []string - var success bool - - go func() { - defer close(done) - for { - select { - case <-time.After(5 * time.Second): - return - case e, ok := <-sub.Out(): - if !ok { - return - } - evt := e.(event.EvtPeerProtocolsUpdated) - lk.Lock() - added = append(added, protocol.ConvertToStrings(evt.Added)...) - removed = append(removed, protocol.ConvertToStrings(evt.Removed)...) - sort.Strings(added) - sort.Strings(removed) - if reflect.DeepEqual(added, []string{"bar", "foo"}) && - reflect.DeepEqual(removed, []string{"bar"}) { - success = true - lk.Unlock() - return - } - lk.Unlock() - } - } - }() - - <-done - - lk.Lock() - defer lk.Unlock() - require.True(t, success, "did not get correct peer protocol updated events") -} - -// TestIdentifyDeltaWhileIdentifyingConn tests that the host waits to push delta updates if an identify is ongoing. -func TestIdentifyDeltaWhileIdentifyingConn(t *testing.T) { +// TestIdentifyPushWhileIdentifyingConn tests that the host waits to push updates if an identify is ongoing. +func TestIdentifyPushWhileIdentifyingConn(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -559,20 +396,18 @@ func TestIdentifyDeltaWhileIdentifyingConn(t *testing.T) { // from h2, identify h1. conn := h2.Network().ConnsToPeer(h1.ID())[0] - go func() { - ids2.IdentifyConn(conn) - }() + go ids2.IdentifyConn(conn) <-time.After(500 * time.Millisecond) - // subscribe to events in h1; after identify h1 should receive the delta from h2 and publish an event in the bus. + // subscribe to events in h1; after identify h1 should receive the update from h2 and publish an event in the bus. sub, err := h1.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{}) if err != nil { t.Fatal(err) } defer sub.Close() - // add a handler in h2; the delta to h1 will queue until we're done identifying h1. + // add a handler in h2; the update to h1 will queue until we're done identifying h1. h2.SetStreamHandler(protocol.TestingID, func(_ network.Stream) {}) <-time.After(500 * time.Millisecond) @@ -722,7 +557,7 @@ func TestNotListening(t *testing.T) { } } -func TestSendPushIfDeltaNotSupported(t *testing.T) { +func TestSendPush(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -749,19 +584,6 @@ func TestSendPushIfDeltaNotSupported(t *testing.T) { ids1.IdentifyConn(h1.Network().ConnsToPeer(h2.ID())[0]) ids2.IdentifyConn(h2.Network().ConnsToPeer(h1.ID())[0]) - // h1 knows h2 speaks Delta - sup, err := h1.Peerstore().SupportsProtocols(h2.ID(), []string{identify.IDDelta}...) - require.NoError(t, err) - require.Equal(t, []string{identify.IDDelta}, sup) - - // h2 stops supporting Delta and that information flows to h1 - h2.RemoveStreamHandler(identify.IDDelta) - - require.Eventually(t, func() bool { - sup, err := h1.Peerstore().SupportsProtocols(h2.ID(), []string{identify.IDDelta}...) - return err == nil && len(sup) == 0 - }, time.Second, 10*time.Millisecond) - // h1 starts listening on a new protocol and h2 finds out about that through a push h1.SetStreamHandler("rand", func(network.Stream) {}) require.Eventually(t, func() bool { @@ -1019,7 +841,7 @@ func TestIncomingIDStreamsTimeout(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - protocols := []protocol.ID{identify.IDPush, identify.IDDelta} + protocols := []protocol.ID{identify.IDPush} for _, p := range protocols { h1 := blhost.NewBlankHost(swarmt.GenSwarm(t)) @@ -1055,16 +877,6 @@ func TestIncomingIDStreamsTimeout(t *testing.T) { } } -func recvWithTimeout(t *testing.T, s <-chan struct{}, timeout time.Duration, failMsg string) { - t.Helper() - select { - case <-s: - return - case <-time.After(timeout): - t.Fatalf("Hit time while waiting to recv from channel: %s", failMsg) - } -} - func waitForAddrInStream(t *testing.T, s <-chan ma.Multiaddr, expected ma.Multiaddr, timeout time.Duration, failMsg string) { t.Helper() for { diff --git a/p2p/protocol/identify/pb/identify.pb.go b/p2p/protocol/identify/pb/identify.pb.go index 3cfed82707..e0e08d9cb2 100644 --- a/p2p/protocol/identify/pb/identify.pb.go +++ b/p2p/protocol/identify/pb/identify.pb.go @@ -5,11 +5,10 @@ package identify_pb import ( fmt "fmt" + proto "github.com/gogo/protobuf/proto" io "io" math "math" math_bits "math/bits" - - proto "github.com/gogo/protobuf/proto" ) // Reference imports to suppress errors if they are not otherwise used. @@ -23,63 +22,6 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package -type Delta struct { - // new protocols now serviced by the peer. - AddedProtocols []string `protobuf:"bytes,1,rep,name=added_protocols,json=addedProtocols" json:"added_protocols,omitempty"` - // protocols dropped by the peer. - RmProtocols []string `protobuf:"bytes,2,rep,name=rm_protocols,json=rmProtocols" json:"rm_protocols,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Delta) Reset() { *m = Delta{} } -func (m *Delta) String() string { return proto.CompactTextString(m) } -func (*Delta) ProtoMessage() {} -func (*Delta) Descriptor() ([]byte, []int) { - return fileDescriptor_83f1e7e6b485409f, []int{0} -} -func (m *Delta) XXX_Unmarshal(b []byte) error { - return m.Unmarshal(b) -} -func (m *Delta) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - if deterministic { - return xxx_messageInfo_Delta.Marshal(b, m, deterministic) - } else { - b = b[:cap(b)] - n, err := m.MarshalToSizedBuffer(b) - if err != nil { - return nil, err - } - return b[:n], nil - } -} -func (m *Delta) XXX_Merge(src proto.Message) { - xxx_messageInfo_Delta.Merge(m, src) -} -func (m *Delta) XXX_Size() int { - return m.Size() -} -func (m *Delta) XXX_DiscardUnknown() { - xxx_messageInfo_Delta.DiscardUnknown(m) -} - -var xxx_messageInfo_Delta proto.InternalMessageInfo - -func (m *Delta) GetAddedProtocols() []string { - if m != nil { - return m.AddedProtocols - } - return nil -} - -func (m *Delta) GetRmProtocols() []string { - if m != nil { - return m.RmProtocols - } - return nil -} - type Identify struct { // protocolVersion determines compatibility between peers ProtocolVersion *string `protobuf:"bytes,5,opt,name=protocolVersion" json:"protocolVersion,omitempty"` @@ -98,8 +40,6 @@ type Identify struct { ObservedAddr []byte `protobuf:"bytes,4,opt,name=observedAddr" json:"observedAddr,omitempty"` // protocols are the services this node is running Protocols []string `protobuf:"bytes,3,rep,name=protocols" json:"protocols,omitempty"` - // a delta update is incompatible with everything else. If this field is included, none of the others can appear. - Delta *Delta `protobuf:"bytes,7,opt,name=delta" json:"delta,omitempty"` // signedPeerRecord contains a serialized SignedEnvelope containing a PeerRecord, // signed by the sending node. It contains the same addresses as the listenAddrs field, but // in a form that lets us share authenticated addrs with other peers. @@ -115,7 +55,7 @@ func (m *Identify) Reset() { *m = Identify{} } func (m *Identify) String() string { return proto.CompactTextString(m) } func (*Identify) ProtoMessage() {} func (*Identify) Descriptor() ([]byte, []int) { - return fileDescriptor_83f1e7e6b485409f, []int{1} + return fileDescriptor_83f1e7e6b485409f, []int{0} } func (m *Identify) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -186,13 +126,6 @@ func (m *Identify) GetProtocols() []string { return nil } -func (m *Identify) GetDelta() *Delta { - if m != nil { - return m.Delta - } - return nil -} - func (m *Identify) GetSignedPeerRecord() []byte { if m != nil { return m.SignedPeerRecord @@ -201,76 +134,27 @@ func (m *Identify) GetSignedPeerRecord() []byte { } func init() { - proto.RegisterType((*Delta)(nil), "identify.pb.Delta") proto.RegisterType((*Identify)(nil), "identify.pb.Identify") } func init() { proto.RegisterFile("identify.proto", fileDescriptor_83f1e7e6b485409f) } var fileDescriptor_83f1e7e6b485409f = []byte{ - // 272 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x90, 0x41, 0x4a, 0xc3, 0x40, - 0x14, 0x86, 0x99, 0xd6, 0x6a, 0xf3, 0x12, 0x5a, 0x99, 0xd5, 0x2c, 0x24, 0xc4, 0x6c, 0x1c, 0x5c, - 0x64, 0xe1, 0x0d, 0x14, 0x37, 0xe2, 0xa6, 0x8c, 0xe0, 0x56, 0x92, 0xbc, 0x67, 0x19, 0x48, 0x33, - 0x65, 0x32, 0x0a, 0xbd, 0x95, 0xc7, 0x70, 0xe9, 0x11, 0x24, 0x27, 0x91, 0x4c, 0x4d, 0x93, 0xea, - 0x72, 0x3e, 0x3e, 0xe6, 0x7f, 0xff, 0x0f, 0x0b, 0x8d, 0x54, 0x3b, 0xfd, 0xba, 0xcb, 0xb6, 0xd6, - 0x38, 0xc3, 0xc3, 0xe1, 0x5d, 0xa4, 0x4f, 0x30, 0xbb, 0xa7, 0xca, 0xe5, 0xfc, 0x0a, 0x96, 0x39, - 0x22, 0xe1, 0x8b, 0x97, 0x4a, 0x53, 0x35, 0x82, 0x25, 0x53, 0x19, 0xa8, 0x85, 0xc7, 0xab, 0x9e, - 0xf2, 0x4b, 0x88, 0xec, 0x66, 0x64, 0x4d, 0xbc, 0x15, 0xda, 0xcd, 0x41, 0x49, 0x3f, 0x26, 0x30, - 0x7f, 0xf8, 0x0d, 0xe1, 0x12, 0x96, 0xbd, 0xfc, 0x4c, 0xb6, 0xd1, 0xa6, 0x16, 0xb3, 0x84, 0xc9, - 0x40, 0xfd, 0xc5, 0x3c, 0x85, 0x28, 0x5f, 0x53, 0xed, 0x7a, 0xed, 0xd4, 0x6b, 0x47, 0x8c, 0x5f, - 0x40, 0xb0, 0x7d, 0x2b, 0x2a, 0x5d, 0x3e, 0xd2, 0x4e, 0xb0, 0x84, 0xc9, 0x48, 0x0d, 0x80, 0x27, - 0x10, 0x56, 0xba, 0x71, 0x54, 0xdf, 0x22, 0xda, 0xfd, 0x69, 0x91, 0x1a, 0xa3, 0x2e, 0xc3, 0x14, - 0x0d, 0xd9, 0x77, 0xc2, 0x0e, 0x88, 0x13, 0xff, 0xc5, 0x11, 0xf3, 0x19, 0x87, 0x7a, 0x53, 0x5f, - 0x6f, 0x00, 0x5c, 0xc2, 0x0c, 0xbb, 0xc5, 0xc4, 0x59, 0xc2, 0x64, 0x78, 0xc3, 0xb3, 0xd1, 0x9c, - 0x99, 0xdf, 0x52, 0xed, 0x05, 0x7e, 0x0d, 0xe7, 0x8d, 0x5e, 0xd7, 0x84, 0x2b, 0x22, 0xab, 0xa8, - 0x34, 0x16, 0xc5, 0xdc, 0xe7, 0xfd, 0xe3, 0x77, 0xd1, 0x67, 0x1b, 0xb3, 0xaf, 0x36, 0x66, 0xdf, - 0x6d, 0xcc, 0x7e, 0x02, 0x00, 0x00, 0xff, 0xff, 0xc0, 0x03, 0xc8, 0x41, 0xb3, 0x01, 0x00, 0x00, -} - -func (m *Delta) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalToSizedBuffer(dAtA[:size]) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *Delta) MarshalTo(dAtA []byte) (int, error) { - size := m.Size() - return m.MarshalToSizedBuffer(dAtA[:size]) -} - -func (m *Delta) MarshalToSizedBuffer(dAtA []byte) (int, error) { - i := len(dAtA) - _ = i - var l int - _ = l - if m.XXX_unrecognized != nil { - i -= len(m.XXX_unrecognized) - copy(dAtA[i:], m.XXX_unrecognized) - } - if len(m.RmProtocols) > 0 { - for iNdEx := len(m.RmProtocols) - 1; iNdEx >= 0; iNdEx-- { - i -= len(m.RmProtocols[iNdEx]) - copy(dAtA[i:], m.RmProtocols[iNdEx]) - i = encodeVarintIdentify(dAtA, i, uint64(len(m.RmProtocols[iNdEx]))) - i-- - dAtA[i] = 0x12 - } - } - if len(m.AddedProtocols) > 0 { - for iNdEx := len(m.AddedProtocols) - 1; iNdEx >= 0; iNdEx-- { - i -= len(m.AddedProtocols[iNdEx]) - copy(dAtA[i:], m.AddedProtocols[iNdEx]) - i = encodeVarintIdentify(dAtA, i, uint64(len(m.AddedProtocols[iNdEx]))) - i-- - dAtA[i] = 0xa - } - } - return len(dAtA) - i, nil + // 213 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcb, 0x4c, 0x49, 0xcd, + 0x2b, 0xc9, 0x4c, 0xab, 0xd4, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x46, 0xf0, 0x93, 0x94, + 0xda, 0x98, 0xb8, 0x38, 0x3c, 0xa1, 0x7c, 0x21, 0x0d, 0x2e, 0x7e, 0xb0, 0x92, 0xe4, 0xfc, 0x9c, + 0xb0, 0xd4, 0xa2, 0xe2, 0xcc, 0xfc, 0x3c, 0x09, 0x56, 0x05, 0x46, 0x0d, 0xce, 0x20, 0x74, 0x61, + 0x21, 0x25, 0x2e, 0x9e, 0xc4, 0xf4, 0xd4, 0xbc, 0x12, 0x98, 0x32, 0x36, 0xb0, 0x32, 0x14, 0x31, + 0x21, 0x19, 0x2e, 0xce, 0x82, 0xd2, 0xa4, 0x9c, 0xcc, 0x64, 0xef, 0xd4, 0x4a, 0x09, 0x46, 0x05, + 0x46, 0x0d, 0x9e, 0x20, 0x84, 0x80, 0x90, 0x02, 0x17, 0x77, 0x4e, 0x66, 0x71, 0x49, 0x6a, 0x9e, + 0x63, 0x4a, 0x4a, 0x51, 0xb1, 0x04, 0x93, 0x02, 0xb3, 0x06, 0x4f, 0x10, 0xb2, 0x10, 0xc8, 0x8e, + 0xfc, 0xa4, 0xe2, 0xd4, 0xa2, 0xb2, 0xd4, 0x14, 0x90, 0x80, 0x04, 0x0b, 0xd8, 0x08, 0x14, 0x31, + 0xb0, 0x1d, 0x50, 0xa7, 0x15, 0x4b, 0x30, 0x2b, 0x30, 0x6b, 0x70, 0x06, 0x21, 0x04, 0x84, 0xb4, + 0xb8, 0x04, 0x8a, 0x33, 0xd3, 0xf3, 0x52, 0x53, 0x02, 0x52, 0x53, 0x8b, 0x82, 0x52, 0x93, 0xf3, + 0x8b, 0x52, 0x24, 0x38, 0xc0, 0xa6, 0x60, 0x88, 0x3b, 0xf1, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, + 0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0xc3, 0xb6, 0xc0, + 0x32, 0x34, 0x01, 0x00, 0x00, } func (m *Identify) Marshal() (dAtA []byte, err error) { @@ -304,18 +188,6 @@ func (m *Identify) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x42 } - if m.Delta != nil { - { - size, err := m.Delta.MarshalToSizedBuffer(dAtA[:i]) - if err != nil { - return 0, err - } - i -= size - i = encodeVarintIdentify(dAtA, i, uint64(size)) - } - i-- - dAtA[i] = 0x3a - } if m.AgentVersion != nil { i -= len(*m.AgentVersion) copy(dAtA[i:], *m.AgentVersion) @@ -376,30 +248,6 @@ func encodeVarintIdentify(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } -func (m *Delta) Size() (n int) { - if m == nil { - return 0 - } - var l int - _ = l - if len(m.AddedProtocols) > 0 { - for _, s := range m.AddedProtocols { - l = len(s) - n += 1 + l + sovIdentify(uint64(l)) - } - } - if len(m.RmProtocols) > 0 { - for _, s := range m.RmProtocols { - l = len(s) - n += 1 + l + sovIdentify(uint64(l)) - } - } - if m.XXX_unrecognized != nil { - n += len(m.XXX_unrecognized) - } - return n -} - func (m *Identify) Size() (n int) { if m == nil { return 0 @@ -434,10 +282,6 @@ func (m *Identify) Size() (n int) { l = len(*m.AgentVersion) n += 1 + l + sovIdentify(uint64(l)) } - if m.Delta != nil { - l = m.Delta.Size() - n += 1 + l + sovIdentify(uint64(l)) - } if m.SignedPeerRecord != nil { l = len(m.SignedPeerRecord) n += 1 + l + sovIdentify(uint64(l)) @@ -454,121 +298,6 @@ func sovIdentify(x uint64) (n int) { func sozIdentify(x uint64) (n int) { return sovIdentify(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } -func (m *Delta) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowIdentify - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: Delta: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: Delta: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field AddedProtocols", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowIdentify - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthIdentify - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthIdentify - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.AddedProtocols = append(m.AddedProtocols, string(dAtA[iNdEx:postIndex])) - iNdEx = postIndex - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field RmProtocols", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowIdentify - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthIdentify - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthIdentify - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.RmProtocols = append(m.RmProtocols, string(dAtA[iNdEx:postIndex])) - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipIdentify(dAtA[iNdEx:]) - if err != nil { - return err - } - if (skippy < 0) || (iNdEx+skippy) < 0 { - return ErrInvalidLengthIdentify - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} func (m *Identify) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -796,42 +525,6 @@ func (m *Identify) Unmarshal(dAtA []byte) error { s := string(dAtA[iNdEx:postIndex]) m.AgentVersion = &s iNdEx = postIndex - case 7: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Delta", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowIdentify - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthIdentify - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthIdentify - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Delta == nil { - m.Delta = &Delta{} - } - if err := m.Delta.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex case 8: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field SignedPeerRecord", wireType) diff --git a/p2p/protocol/identify/pb/identify.proto b/p2p/protocol/identify/pb/identify.proto index bdb283305b..cda102d41f 100644 --- a/p2p/protocol/identify/pb/identify.proto +++ b/p2p/protocol/identify/pb/identify.proto @@ -2,13 +2,6 @@ syntax = "proto2"; package identify.pb; -message Delta { - // new protocols now serviced by the peer. - repeated string added_protocols = 1; - // protocols dropped by the peer. - repeated string rm_protocols = 2; -} - message Identify { // protocolVersion determines compatibility between peers @@ -34,9 +27,6 @@ message Identify { // protocols are the services this node is running repeated string protocols = 3; - // a delta update is incompatible with everything else. If this field is included, none of the others can appear. - optional Delta delta = 7; - // signedPeerRecord contains a serialized SignedEnvelope containing a PeerRecord, // signed by the sending node. It contains the same addresses as the listenAddrs field, but // in a form that lets us share authenticated addrs with other peers. diff --git a/p2p/protocol/identify/peer_loop.go b/p2p/protocol/identify/peer_loop.go index af8549339e..e6520de951 100644 --- a/p2p/protocol/identify/peer_loop.go +++ b/p2p/protocol/identify/peer_loop.go @@ -11,9 +11,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/record" - pb "github.com/libp2p/go-libp2p/p2p/protocol/identify/pb" - - "github.com/libp2p/go-msgio/protoio" ma "github.com/multiformats/go-multiaddr" ) @@ -35,8 +32,7 @@ type peerHandler struct { snapshotMu sync.RWMutex snapshot *identifySnapshot - pushCh chan struct{} - deltaCh chan struct{} + pushCh chan struct{} } func newPeerHandler(pid peer.ID, ids *idService) *peerHandler { @@ -46,8 +42,7 @@ func newPeerHandler(pid peer.ID, ids *idService) *peerHandler { snapshot: ids.getSnapshot(), - pushCh: make(chan struct{}, 1), - deltaCh: make(chan struct{}, 1), + pushCh: make(chan struct{}, 1), } return ph @@ -92,52 +87,12 @@ func (ph *peerHandler) loop(ctx context.Context, onExit func()) { if err := ph.sendPush(ctx); err != nil { log.Warnw("failed to send Identify Push", "peer", ph.pid, "error", err) } - - case <-ph.deltaCh: - if err := ph.sendDelta(ctx); err != nil { - log.Warnw("failed to send Identify Delta", "peer", ph.pid, "error", err) - } - case <-ctx.Done(): return } } } -func (ph *peerHandler) sendDelta(ctx context.Context) error { - // send a push if the peer does not support the Delta protocol. - if !ph.peerSupportsProtos(ctx, []string{IDDelta}) { - log.Debugw("will send push as peer does not support delta", "peer", ph.pid) - if err := ph.sendPush(ctx); err != nil { - return fmt.Errorf("failed to send push on delta message: %w", err) - } - return nil - } - - // extract a delta message, updating the last state. - mes := ph.nextDelta() - if mes == nil || (len(mes.AddedProtocols) == 0 && len(mes.RmProtocols) == 0) { - return nil - } - - ds, err := ph.openStream(ctx, []string{IDDelta}) - if err != nil { - return fmt.Errorf("failed to open delta stream: %w", err) - } - - defer ds.Close() - - c := ds.Conn() - if err := protoio.NewDelimitedWriter(ds).WriteMsg(&pb.Identify{Delta: mes}); err != nil { - _ = ds.Reset() - return fmt.Errorf("failed to send delta message, %w", err) - } - log.Debugw("sent identify update", "protocol", ds.Protocol(), "peer", c.RemotePeer(), - "peer address", c.RemoteMultiaddr()) - - return nil -} - func (ph *peerHandler) sendPush(ctx context.Context) error { dp, err := ph.openStream(ctx, []string{IDPush}) if err == errProtocolNotSupported { @@ -216,49 +171,3 @@ func (ph *peerHandler) peerSupportsProtos(ctx context.Context, protos []string) } return true } - -func (ph *peerHandler) nextDelta() *pb.Delta { - curr := ph.ids.Host.Mux().Protocols() - - // Extract the old protocol list and replace the old snapshot with an - // updated one. - ph.snapshotMu.Lock() - snapshot := *ph.snapshot - old := snapshot.protocols - snapshot.protocols = curr - ph.snapshot = &snapshot - ph.snapshotMu.Unlock() - - oldProtos := make(map[string]struct{}, len(old)) - currProtos := make(map[string]struct{}, len(curr)) - - for _, proto := range old { - oldProtos[proto] = struct{}{} - } - - for _, proto := range curr { - currProtos[proto] = struct{}{} - } - - var added []string - var removed []string - - // has it been added ? - for p := range currProtos { - if _, ok := oldProtos[p]; !ok { - added = append(added, p) - } - } - - // has it been removed ? - for p := range oldProtos { - if _, ok := currProtos[p]; !ok { - removed = append(removed, p) - } - } - - return &pb.Delta{ - AddedProtocols: added, - RmProtocols: removed, - } -} diff --git a/p2p/protocol/identify/peer_loop_test.go b/p2p/protocol/identify/peer_loop_test.go index 54fe63b243..88e2a1258a 100644 --- a/p2p/protocol/identify/peer_loop_test.go +++ b/p2p/protocol/identify/peer_loop_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" blhost "github.com/libp2p/go-libp2p/p2p/host/blank" swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" @@ -13,52 +12,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestMakeApplyDelta(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - h1 := blhost.NewBlankHost(swarmt.GenSwarm(t)) - defer h1.Close() - ids1, err := NewIDService(h1) - require.NoError(t, err) - ph := newPeerHandler(h1.ID(), ids1) - ph.start(ctx, func() {}) - defer ph.stop() - - m1 := ph.nextDelta() - require.NotNil(t, m1) - // We haven't changed anything since creating the peer handler - require.Empty(t, m1.AddedProtocols) - - h1.SetStreamHandler("p1", func(network.Stream) {}) - m2 := ph.nextDelta() - require.Len(t, m2.AddedProtocols, 1) - require.Contains(t, m2.AddedProtocols, "p1") - require.Empty(t, m2.RmProtocols) - - h1.SetStreamHandler("p2", func(network.Stream) {}) - h1.SetStreamHandler("p3", func(stream network.Stream) {}) - m3 := ph.nextDelta() - require.Len(t, m3.AddedProtocols, 2) - require.Contains(t, m3.AddedProtocols, "p2") - require.Contains(t, m3.AddedProtocols, "p3") - require.Empty(t, m3.RmProtocols) - - h1.RemoveStreamHandler("p3") - m4 := ph.nextDelta() - require.Empty(t, m4.AddedProtocols) - require.Len(t, m4.RmProtocols, 1) - require.Contains(t, m4.RmProtocols, "p3") - - h1.RemoveStreamHandler("p2") - h1.RemoveStreamHandler("p1") - m5 := ph.nextDelta() - require.Empty(t, m5.AddedProtocols) - require.Len(t, m5.RmProtocols, 2) - require.Contains(t, m5.RmProtocols, "p2") - require.Contains(t, m5.RmProtocols, "p1") -} - func TestHandlerClose(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 56078a18f7e6863a0b802693510391b07bf7798d Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Wed, 4 Jan 2023 13:09:41 +1300 Subject: [PATCH 3/4] identify: simply stream handling logic when sending pushes --- p2p/protocol/identify/peer_loop.go | 42 ++++--------------------- p2p/protocol/identify/peer_loop_test.go | 21 ------------- 2 files changed, 6 insertions(+), 57 deletions(-) diff --git a/p2p/protocol/identify/peer_loop.go b/p2p/protocol/identify/peer_loop.go index e6520de951..15c7e85b68 100644 --- a/p2p/protocol/identify/peer_loop.go +++ b/p2p/protocol/identify/peer_loop.go @@ -94,7 +94,7 @@ func (ph *peerHandler) loop(ctx context.Context, onExit func()) { } func (ph *peerHandler) sendPush(ctx context.Context) error { - dp, err := ph.openStream(ctx, []string{IDPush}) + dp, err := ph.openStream(ctx, IDPush) if err == errProtocolNotSupported { log.Debugw("not sending push as peer does not support protocol", "peer", ph.pid) return nil @@ -112,14 +112,13 @@ func (ph *peerHandler) sendPush(ctx context.Context) error { _ = dp.Reset() return fmt.Errorf("failed to send push message: %w", err) } - return nil } -func (ph *peerHandler) openStream(ctx context.Context, protos []string) (network.Stream, error) { +func (ph *peerHandler) openStream(ctx context.Context, proto string) (network.Stream, error) { // wait for the other peer to send us an Identify response on "all" connections we have with it // so we can look at it's supported protocols and avoid a multistream-select roundtrip to negotiate the protocol - // if we know for a fact that it dosen't support the protocol. + // if we know for a fact that it doesn't support the protocol. conns := ph.ids.Host.Network().ConnsToPeer(ph.pid) for _, c := range conns { select { @@ -129,45 +128,16 @@ func (ph *peerHandler) openStream(ctx context.Context, protos []string) (network } } - if !ph.peerSupportsProtos(ctx, protos) { + if sup, err := ph.ids.Host.Peerstore().SupportsProtocols(ph.pid, proto); err != nil || len(sup) == 0 { return nil, errProtocolNotSupported } - ph.ids.pushSemaphore <- struct{}{} defer func() { <-ph.ids.pushSemaphore }() // negotiate a stream without opening a new connection as we "should" already have a connection. - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + ctx, cancel := context.WithTimeout(network.WithNoDial(ctx, "should already have connection"), 30*time.Second) defer cancel() - ctx = network.WithNoDial(ctx, "should already have connection") - - // newstream will open a stream on the first protocol the remote peer supports from the among - // the list of protocols passed to it. - s, err := ph.ids.Host.NewStream(ctx, ph.pid, protocol.ConvertFromStrings(protos)...) - if err != nil { - return nil, err - } - - return s, err -} - -// returns true if the peer supports atleast one of the given protocols -func (ph *peerHandler) peerSupportsProtos(ctx context.Context, protos []string) bool { - conns := ph.ids.Host.Network().ConnsToPeer(ph.pid) - for _, c := range conns { - select { - case <-ph.ids.IdentifyWait(c): - case <-ctx.Done(): - return false - } - } - - pstore := ph.ids.Host.Peerstore() - - if sup, err := pstore.SupportsProtocols(ph.pid, protos...); err == nil && len(sup) == 0 { - return false - } - return true + return ph.ids.Host.NewStream(ctx, ph.pid, protocol.ID(proto)) } diff --git a/p2p/protocol/identify/peer_loop_test.go b/p2p/protocol/identify/peer_loop_test.go index 88e2a1258a..d7219f6a98 100644 --- a/p2p/protocol/identify/peer_loop_test.go +++ b/p2p/protocol/identify/peer_loop_test.go @@ -5,7 +5,6 @@ import ( "testing" "time" - "github.com/libp2p/go-libp2p/core/peer" blhost "github.com/libp2p/go-libp2p/p2p/host/blank" swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" @@ -40,23 +39,3 @@ func TestHandlerClose(t *testing.T) { case <-time.After(10 * time.Millisecond): } } - -func TestPeerSupportsProto(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - h1 := blhost.NewBlankHost(swarmt.GenSwarm(t)) - defer h1.Close() - ids1, err := NewIDService(h1) - require.NoError(t, err) - - rp := peer.ID("test") - ph := newPeerHandler(rp, ids1) - require.NoError(t, h1.Peerstore().AddProtocols(rp, "test")) - require.True(t, ph.peerSupportsProtos(ctx, []string{"test"})) - require.False(t, ph.peerSupportsProtos(ctx, []string{"random"})) - - // remove support for protocol and check - require.NoError(t, h1.Peerstore().RemoveProtocols(rp, "test")) - require.False(t, ph.peerSupportsProtos(ctx, []string{"test"})) -} From 5095f44da24d2c354e8c8e634d41fba9f5081f09 Mon Sep 17 00:00:00 2001 From: Marten Seemann Date: Thu, 5 Jan 2023 17:31:22 +1300 Subject: [PATCH 4/4] identify: remove snapshot handling --- p2p/protocol/identify/id.go | 8 +++----- p2p/protocol/identify/peer_loop.go | 30 +++++++++--------------------- 2 files changed, 12 insertions(+), 26 deletions(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index f33ddc7cae..ecfa8feaed 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -400,10 +400,7 @@ func (ids *idService) sendIdentifyResp(s network.Stream) { return } - ph.snapshotMu.RLock() - snapshot := ph.snapshot - ph.snapshotMu.RUnlock() - ids.writeChunkedIdentifyMsg(c, snapshot, s) + ids.writeChunkedIdentifyMsg(c, s) log.Debugf("%s sent message to %s %s", ID, c.RemotePeer(), c.RemoteMultiaddr()) } @@ -471,7 +468,8 @@ func (ids *idService) getSnapshot() *identifySnapshot { return snapshot } -func (ids *idService) writeChunkedIdentifyMsg(c network.Conn, snapshot *identifySnapshot, s network.Stream) error { +func (ids *idService) writeChunkedIdentifyMsg(c network.Conn, s network.Stream) error { + snapshot := ids.getSnapshot() mes := ids.createBaseIdentifyResponse(c, snapshot) sr := ids.getSignedRecord(snapshot) mes.SignedPeerRecord = sr diff --git a/p2p/protocol/identify/peer_loop.go b/p2p/protocol/identify/peer_loop.go index 15c7e85b68..589f04a471 100644 --- a/p2p/protocol/identify/peer_loop.go +++ b/p2p/protocol/identify/peer_loop.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "sync" "time" "github.com/libp2p/go-libp2p/core/network" @@ -29,23 +28,15 @@ type peerHandler struct { pid peer.ID - snapshotMu sync.RWMutex - snapshot *identifySnapshot - pushCh chan struct{} } func newPeerHandler(pid peer.ID, ids *idService) *peerHandler { - ph := &peerHandler{ - ids: ids, - pid: pid, - - snapshot: ids.getSnapshot(), - + return &peerHandler{ + ids: ids, + pid: pid, pushCh: make(chan struct{}, 1), } - - return ph } // start starts a handler. This may only be called on a stopped handler, and must @@ -63,7 +54,10 @@ func (ph *peerHandler) start(ctx context.Context, onExit func()) { ctx, cancel := context.WithCancel(ctx) ph.cancel = cancel - go ph.loop(ctx, onExit) + go func() { + ph.loop(ctx) + onExit() + }() } // stop stops a handler. This may not be called concurrently with any @@ -77,9 +71,7 @@ func (ph *peerHandler) stop() error { } // per peer loop for pushing updates -func (ph *peerHandler) loop(ctx context.Context, onExit func()) { - defer onExit() - +func (ph *peerHandler) loop(ctx context.Context) { for { select { // our listen addresses have changed, send an IDPush. @@ -104,11 +96,7 @@ func (ph *peerHandler) sendPush(ctx context.Context) error { } defer dp.Close() - snapshot := ph.ids.getSnapshot() - ph.snapshotMu.Lock() - ph.snapshot = snapshot - ph.snapshotMu.Unlock() - if err := ph.ids.writeChunkedIdentifyMsg(dp.Conn(), snapshot, dp); err != nil { + if err := ph.ids.writeChunkedIdentifyMsg(dp.Conn(), dp); err != nil { _ = dp.Reset() return fmt.Errorf("failed to send push message: %w", err) }