Skip to content

Commit

Permalink
release v0.35.1 (#2834)
Browse files Browse the repository at this point in the history
* identify: Don't filter addr if remote is neither public nor private (#2820)

Updates the filterAddrs logic to no-op if the address is neither public nor private.

This fixes an issue in mocknet that assigns each node an address in the IPv6 discard prefix space. That doesn't interact well with this logic in identify.

The issue mocknet hits is that it filters out all received listen addresses and then doesn't remember any address for the peer.

* identify: fix bug in observed address handling (#2825)

* identify: add test for observed address handling (#2828)

This modifies TestObservedAddrManager to verify the fix in #2825

* libp2phttp: workaround for ResponseWriter's CloseNotifier (#2821)

* libp2phttp: workaround for CloseNotifier

* Add lintignore

* circuitv2: improve voucher validation (#2826)

* webrtc: fix ufrag prefix for dialing (#2832)

* webrtc: add a test for establishing many connections (#2801)

Update pion/ice to include the fix for out of order 
ConnectionState update callbacks

* release v0.35.1

---------

Co-authored-by: Marco Munizaga <git@marcopolo.io>
Co-authored-by: Ivan Shvedunov <ivan4th@users.noreply.github.com>
  • Loading branch information
3 people committed Jun 12, 2024
1 parent f52cec1 commit ea57d61
Show file tree
Hide file tree
Showing 17 changed files with 311 additions and 41 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/multiformats/go-varint v0.0.7
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/pion/datachannel v1.5.6
github.com/pion/ice/v2 v2.3.24
github.com/pion/ice/v2 v2.3.25
github.com/pion/logging v0.2.2
github.com/pion/sctp v1.8.16
github.com/pion/stun v0.6.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks=
github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI=
github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs=
github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
Expand Down
2 changes: 1 addition & 1 deletion p2p/http/libp2phttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type WellKnownHandler struct {

// streamHostListen returns a net.Listener that listens on libp2p streams for HTTP/1.1 messages.
func streamHostListen(streamHost host.Host) (net.Listener, error) {
return gostream.Listen(streamHost, ProtocolIDForMultistreamSelect)
return gostream.Listen(streamHost, ProtocolIDForMultistreamSelect, gostream.IgnoreEOF())
}

func (h *WellKnownHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down
36 changes: 36 additions & 0 deletions p2p/http/libp2phttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,3 +719,39 @@ func TestServerLegacyWellKnownResource(t *testing.T) {
}

}

func TestResponseWriterShouldNotHaveCancelledContext(t *testing.T) {
h, err := libp2p.New()
require.NoError(t, err)
defer h.Close()
httpHost := libp2phttp.Host{StreamHost: h}
go httpHost.Serve()
defer httpHost.Close()

closeNotifyCh := make(chan bool, 1)
httpHost.SetHTTPHandlerAtPath("/test", "/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Legacy code uses this to check if the connection was closed
//lint:ignore SA1019 This is a test to assert we do the right thing since Go HTTP stdlib depends on this.
ch := w.(http.CloseNotifier).CloseNotify()
select {
case <-ch:
closeNotifyCh <- true
case <-time.After(100 * time.Millisecond):
closeNotifyCh <- false
}
w.WriteHeader(http.StatusOK)
}))

clientH, err := libp2p.New()
require.NoError(t, err)
defer clientH.Close()
clientHost := libp2phttp.Host{StreamHost: clientH}

rt, err := clientHost.NewConstrainedRoundTripper(peer.AddrInfo{ID: h.ID(), Addrs: h.Addrs()})
require.NoError(t, err)
httpClient := &http.Client{Transport: rt}
_, err = httpClient.Get("/")
require.NoError(t, err)

require.False(t, <-closeNotifyCh)
}
16 changes: 13 additions & 3 deletions p2p/net/gostream/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gostream

import (
"context"
"io"
"net"

"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -14,11 +15,20 @@ import (
// libp2p streams.
type conn struct {
network.Stream
ignoreEOF bool
}

func (c *conn) Read(b []byte) (int, error) {
n, err := c.Stream.Read(b)
if err != nil && c.ignoreEOF && err == io.EOF {
return n, nil
}
return n, err
}

// newConn creates a conn given a libp2p stream
func newConn(s network.Stream) net.Conn {
return &conn{s}
func newConn(s network.Stream, ignoreEOF bool) net.Conn {
return &conn{s, ignoreEOF}
}

// LocalAddr returns the local network address.
Expand All @@ -39,5 +49,5 @@ func Dial(ctx context.Context, h host.Host, pid peer.ID, tag protocol.ID) (net.C
if err != nil {
return nil, err
}
return newConn(s), nil
return newConn(s, false), nil
}
22 changes: 20 additions & 2 deletions p2p/net/gostream/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type listener struct {
tag protocol.ID
cancel func()
streamCh chan network.Stream
// ignoreEOF is a flag that tells the listener to return conns that ignore EOF errors.
// Necessary because the default responsewriter will consider a connection closed if it reads EOF.
// But when on streams, it's fine for us to read EOF, but still be able to write.
ignoreEOF bool
}

// Accept returns the next a connection to this listener.
Expand All @@ -26,7 +30,7 @@ type listener struct {
func (l *listener) Accept() (net.Conn, error) {
select {
case s := <-l.streamCh:
return newConn(s), nil
return newConn(s, l.ignoreEOF), nil
case <-l.ctx.Done():
return nil, l.ctx.Err()
}
Expand All @@ -48,7 +52,7 @@ func (l *listener) Addr() net.Addr {
// Listen provides a standard net.Listener ready to accept "connections".
// Under the hood, these connections are libp2p streams tagged with the
// given protocol.ID.
func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {
func Listen(h host.Host, tag protocol.ID, opts ...ListenerOption) (net.Listener, error) {
ctx, cancel := context.WithCancel(context.Background())

l := &listener{
Expand All @@ -58,6 +62,11 @@ func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {
tag: tag,
streamCh: make(chan network.Stream),
}
for _, opt := range opts {
if err := opt(l); err != nil {
return nil, err
}
}

h.SetStreamHandler(tag, func(s network.Stream) {
select {
Expand All @@ -69,3 +78,12 @@ func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {

return l, nil
}

type ListenerOption func(*listener) error

func IgnoreEOF() ListenerOption {
return func(l *listener) error {
l.ignoreEOF = true
return nil
}
}
28 changes: 23 additions & 5 deletions p2p/protocol/circuitv2/client/reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,7 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation,
}

if msg.GetType() != pbv2.HopMessage_STATUS {
return nil, ReservationError{
Status: pbv2.Status_MALFORMED_MESSAGE,
Reason: fmt.Sprintf("unexpected relay response: not a status message (%d)", msg.GetType()),
err: err}
return nil, ReservationError{Status: pbv2.Status_MALFORMED_MESSAGE, Reason: fmt.Sprintf("unexpected relay response: not a status message (%d)", msg.GetType())}
}

if status := msg.GetStatus(); status != pbv2.Status_OK {
Expand Down Expand Up @@ -130,7 +127,7 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation,

voucherBytes := rsvp.GetVoucher()
if voucherBytes != nil {
_, rec, err := record.ConsumeEnvelope(voucherBytes, proto.RecordDomain)
env, rec, err := record.ConsumeEnvelope(voucherBytes, proto.RecordDomain)
if err != nil {
return nil, ReservationError{
Status: pbv2.Status_MALFORMED_MESSAGE,
Expand All @@ -146,6 +143,27 @@ func Reserve(ctx context.Context, h host.Host, ai peer.AddrInfo) (*Reservation,
Reason: fmt.Sprintf("unexpected voucher record type: %+T", rec),
}
}
signerPeerID, err := peer.IDFromPublicKey(env.PublicKey)
if err != nil {
return nil, ReservationError{
Status: pbv2.Status_MALFORMED_MESSAGE,
Reason: fmt.Sprintf("invalid voucher signing public key: %s", err),
err: err,
}
}
if signerPeerID != voucher.Relay {
return nil, ReservationError{
Status: pbv2.Status_MALFORMED_MESSAGE,
Reason: fmt.Sprintf("invalid voucher relay id: expected %s, got %s", signerPeerID, voucher.Relay),
}
}
if h.ID() != voucher.Peer {
return nil, ReservationError{
Status: pbv2.Status_MALFORMED_MESSAGE,
Reason: fmt.Sprintf("invalid voucher peer id: expected %s, got %s", h.ID(), voucher.Peer),
}

}
result.Voucher = voucher
}

Expand Down
42 changes: 42 additions & 0 deletions p2p/protocol/circuitv2/client/reservation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/record"
"github.com/libp2p/go-libp2p/core/test"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
pbv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
Expand Down Expand Up @@ -84,6 +87,45 @@ func TestReservationFailures(t *testing.T) {
err: "error consuming voucher envelope: failed when unmarshalling the envelope",
status: pbv2.Status_MALFORMED_MESSAGE,
},
{
name: "invalid voucher 2",
streamHandler: func(s network.Stream) {
status := pbv2.Status_OK
expire := uint64(time.Now().Add(time.Hour).UnixNano())
priv, _, err := test.RandTestKeyPair(crypto.Ed25519, 256)
if err != nil {
s.Reset()
return
}
relay, _ := test.RandPeerID()
peer, _ := test.RandPeerID()
voucher := &proto.ReservationVoucher{
Relay: relay,
Peer: peer,
Expiration: time.Now().Add(time.Hour),
}
signedVoucher, err := record.Seal(voucher, priv)
if err != nil {
s.Reset()
return
}
env, err := signedVoucher.Marshal()
if err != nil {
s.Reset()
return
}
util.NewDelimitedWriter(s).WriteMsg(&pbv2.HopMessage{
Type: pbv2.HopMessage_STATUS.Enum(),
Status: &status,
Reservation: &pbv2.Reservation{
Expire: &expire,
Voucher: env,
},
})
},
err: "invalid voucher relay id",
status: pbv2.Status_MALFORMED_MESSAGE,
},
}

for _, tc := range testcases {
Expand Down
23 changes: 14 additions & 9 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo
ids.Host.Peerstore().UpdateAddrs(p, peerstore.TempAddrTTL, 0)
ids.addrMu.Unlock()

log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), lmaddrs)
log.Debugf("%s received listen addrs for %s: %s", c.LocalPeer(), c.RemotePeer(), addrs)

// get protocol versions
pv := mes.GetProtocolVersion()
Expand Down Expand Up @@ -1064,18 +1064,23 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) {
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}
func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {}

// filterAddrs filters the address slice based on the remove multiaddr:
// * if it's a localhost address, no filtering is applied
// * if it's a local network address, all localhost addresses are filtered out
// * if it's a public address, all localhost and local network addresses are filtered out
// filterAddrs filters the address slice based on the remote multiaddr:
// - if it's a localhost address, no filtering is applied
// - if it's a private network address, all localhost addresses are filtered out
// - if it's a public address, all non-public addresses are filtered out
// - if none of the above, (e.g. discard prefix), no filtering is applied.
// We can't do anything meaningful here so we do nothing.
func filterAddrs(addrs []ma.Multiaddr, remote ma.Multiaddr) []ma.Multiaddr {
if manet.IsIPLoopback(remote) {
switch {
case manet.IsIPLoopback(remote):
return addrs
}
if manet.IsPrivateAddr(remote) {
case manet.IsPrivateAddr(remote):
return ma.FilterAddrs(addrs, func(a ma.Multiaddr) bool { return !manet.IsIPLoopback(a) })
case manet.IsPublicAddr(remote):
return ma.FilterAddrs(addrs, manet.IsPublicAddr)
default:
return addrs
}
return ma.FilterAddrs(addrs, manet.IsPublicAddr)
}

func trimHostAddrList(addrs []ma.Multiaddr, maxSize int) []ma.Multiaddr {
Expand Down
11 changes: 6 additions & 5 deletions p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,12 @@ func (o *ObservedAddrManager) removeConn(conn connMultiaddrs) {
o.mu.Lock()
defer o.mu.Unlock()

observedTWAddr, ok := o.connObservedTWAddrs[conn]
if !ok {
return
}
delete(o.connObservedTWAddrs, conn)

// normalize before obtaining the thinWaist so that we are always dealing
// with the normalized form of the address
localTW, err := thinWaistForm(o.normalize(conn.LocalMultiaddr()))
Expand All @@ -467,11 +473,6 @@ func (o *ObservedAddrManager) removeConn(conn connMultiaddrs) {
delete(o.localAddrs, string(localTW.Addr.Bytes()))
}

observedTWAddr, ok := o.connObservedTWAddrs[conn]
if !ok {
return
}
delete(o.connObservedTWAddrs, conn)
observer, err := getObserver(conn.RemoteMultiaddr())
if err != nil {
return
Expand Down
Loading

0 comments on commit ea57d61

Please sign in to comment.