Skip to content

Commit

Permalink
swarm: add mock net.PacketConn for testing
Browse files Browse the repository at this point in the history
This uses https://github.com/tailscale/tailscale/tree/main/tstest/natlab
to provide a mock net.PacketConn. This allows us to fake quic addresses
on nodes to test for address discovery. This can also be used to test
nat traversal.
  • Loading branch information
sukunrt committed Jul 26, 2023
1 parent 260b969 commit 8541839
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 365 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ require (
golang.org/x/sys v0.10.0
golang.org/x/tools v0.11.0
google.golang.org/protobuf v1.30.0
tailscale.com v1.40.1
)

require (
Expand All @@ -75,7 +76,6 @@ require (
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand All @@ -99,8 +99,8 @@ require (
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/prometheus/common v0.41.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-19 v0.3.2 // indirect
github.com/quic-go/qtls-go1-20 v0.2.2 // indirect
Expand Down
355 changes: 11 additions & 344 deletions go.sum

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"net/netip"
"reflect"
"sort"
"strings"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"tailscale.com/tstest/natlab"

ma "github.com/multiformats/go-multiaddr"

Expand Down Expand Up @@ -892,5 +894,50 @@ func TestInferWebtransportAddrsFromQuic(t *testing.T) {
})

}
}

func TestAddressDiscovery(t *testing.T) {
inet := &natlab.Network{Name: "internet", Prefix4: netip.MustParsePrefix("1.2.3.0/24")}
type TestHost struct {
H *BasicHost
Addr ma.Multiaddr
}
port := 1000
newHost := func(name string) TestHost {
m := &natlab.Machine{Name: name}
mif := m.Attach("eth0", inet)
port++
listenAddr := ma.StringCast(fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", port))
publicAddr := ma.StringCast(fmt.Sprintf("/ip4/%s/udp/%d/quic-v1", mif.V4().String(), port))
h, err := NewHost(swarmt.GenSwarm(t, swarmt.OptQUICListenAddress(listenAddr), swarmt.OptUDPTransport(swarmt.UDPTransport(m))), nil)
if err != nil {
t.Fatal(err)
}
return TestHost{H: h, Addr: publicAddr}
}

var peers []TestHost
for i := 0; i < identify.ActivationThresh; i++ {
h := newHost(fmt.Sprintf("peer-%d", i))
h.H.IDService().Start()
peers = append(peers, h)
}

h := newHost("host")
h.H.IDService().Start()
for _, p := range peers {
ctx := network.WithDialPeerTimeout(context.Background(), 1*time.Second)
h.H.Peerstore().AddAddr(p.H.ID(), p.Addr, peerstore.TempAddrTTL)
err := h.H.Connect(ctx, peer.AddrInfo{ID: p.H.ID()})
if err != nil {
t.Fatal(err)
}
}

require.Eventually(t,
func() bool {
return ma.Contains(h.H.Addrs(), h.Addr)
},
5*time.Second,
100*time.Millisecond)
}
45 changes: 42 additions & 3 deletions p2p/net/swarm/testing/testing.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package testing

import (
"context"
"crypto/rand"
"net"
"testing"
"time"

Expand All @@ -24,6 +26,7 @@ import (
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"tailscale.com/tstest/natlab"

ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
Expand All @@ -38,6 +41,8 @@ type config struct {
sk crypto.PrivKey
swarmOpts []swarm.Option
eventBus event.Bus
quicListenAddr ma.Multiaddr
udpTransport quicreuse.UDPTransport
clock
}

Expand Down Expand Up @@ -101,6 +106,18 @@ func OptPeerPrivateKey(sk crypto.PrivKey) Option {
}
}

func OptQUICListenAddress(addr ma.Multiaddr) Option {
return func(_ *testing.T, c *config) {
c.quicListenAddr = addr
}
}

func OptUDPTransport(tr quicreuse.UDPTransport) Option {
return func(_ *testing.T, c *config) {
c.udpTransport = tr
}
}

func EventBus(b event.Bus) Option {
return func(_ *testing.T, c *config) {
c.eventBus = b
Expand Down Expand Up @@ -175,7 +192,7 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
}
}
if !cfg.disableQUIC {
reuse, err := quicreuse.NewConnManager([32]byte{})
reuse, err := quicreuse.NewConnManager([32]byte{}, quicreuse.WithUDPTransport(cfg.udpTransport))
if err != nil {
t.Fatal(err)
}
Expand All @@ -187,8 +204,14 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
t.Fatal(err)
}
if !cfg.dialOnly {
if err := s.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/quic")); err != nil {
t.Fatal(err)
if cfg.quicListenAddr == nil {
if err := s.Listen(ma.StringCast("/ip4/127.0.0.1/udp/0/quic")); err != nil {
t.Fatal(err)
}
} else {
if err := s.Listen(cfg.quicListenAddr); err != nil {
t.Fatal(err)
}
}
}
}
Expand All @@ -198,6 +221,22 @@ func GenSwarm(t *testing.T, opts ...Option) *swarm.Swarm {
return s
}

type udpTransport struct {
listenPacket func(network string, laddr *net.UDPAddr) (net.PacketConn, error)
}

func (u udpTransport) ListenPacket(network string, laddr *net.UDPAddr) (net.PacketConn, error) {
return u.listenPacket(network, laddr)
}

func UDPTransport(m *natlab.Machine) quicreuse.UDPTransport {
tr := udpTransport{}
tr.listenPacket = func(network string, laddr *net.UDPAddr) (net.PacketConn, error) {
return m.ListenPacket(context.Background(), network, laddr.String())
}
return tr
}

// DivulgeAddresses adds swarm a's addresses to swarm b's peerstore.
func DivulgeAddresses(a, b network.Network) {
id := a.LocalPeer()
Expand Down
22 changes: 18 additions & 4 deletions p2p/transport/quicreuse/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ type ConnManager struct {
quicListenersMu sync.Mutex
quicListeners map[string]quicListenerEntry

srk quic.StatelessResetKey
mt *metricsTracer
srk quic.StatelessResetKey
mt *metricsTracer
udpTransport UDPTransport
}

type quicListenerEntry struct {
Expand Down Expand Up @@ -67,12 +68,15 @@ func NewConnManager(statelessResetKey quic.StatelessResetKey, opts ...Option) (*
if !cm.enableDraft29 {
serverConfig.Versions = []quic.VersionNumber{quic.Version1}
}
if cm.udpTransport == nil {
cm.udpTransport = defaultUDPTranport{}
}

cm.clientConfig = quicConf
cm.serverConfig = serverConfig
if cm.enableReuseport {
cm.reuseUDP4 = newReuse(&statelessResetKey, cm.mt)
cm.reuseUDP6 = newReuse(&statelessResetKey, cm.mt)
cm.reuseUDP4 = newReuse(&statelessResetKey, cm.mt, cm.udpTransport)
cm.reuseUDP6 = newReuse(&statelessResetKey, cm.mt, cm.udpTransport)
}
return cm, nil
}
Expand Down Expand Up @@ -246,6 +250,16 @@ func (c *ConnManager) Close() error {
return c.reuseUDP4.Close()
}

type UDPTransport interface {
ListenPacket(network string, laddr *net.UDPAddr) (net.PacketConn, error)
}

type defaultUDPTranport struct{}

func (defaultUDPTranport) ListenPacket(network string, laddr *net.UDPAddr) (net.PacketConn, error) {
return listenAndOptimize(network, laddr)
}

// listenAndOptimize same as net.ListenUDP, but also calls quic.OptimizeConn
func listenAndOptimize(network string, laddr *net.UDPAddr) (net.PacketConn, error) {
conn, err := net.ListenUDP(network, laddr)
Expand Down
7 changes: 7 additions & 0 deletions p2p/transport/quicreuse/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,10 @@ func EnableMetrics() Option {
return nil
}
}

func WithUDPTransport(tr UDPTransport) Option {
return func(m *ConnManager) error {
m.udpTransport = tr
return nil
}
}
6 changes: 4 additions & 2 deletions p2p/transport/quicreuse/reuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,10 @@ type reuse struct {

statelessResetKey *quic.StatelessResetKey
metricsTracer *metricsTracer
udpTransport UDPTransport
}

func newReuse(srk *quic.StatelessResetKey, mt *metricsTracer) *reuse {
func newReuse(srk *quic.StatelessResetKey, mt *metricsTracer, tr UDPTransport) *reuse {
r := &reuse{
unicast: make(map[string]map[int]*refcountedTransport),
globalListeners: make(map[int]*refcountedTransport),
Expand All @@ -137,6 +138,7 @@ func newReuse(srk *quic.StatelessResetKey, mt *metricsTracer) *reuse {
gcStopChan: make(chan struct{}),
statelessResetKey: srk,
metricsTracer: mt,
udpTransport: tr,
}
go r.gc()
return r
Expand Down Expand Up @@ -314,7 +316,7 @@ func (r *reuse) TransportForListen(network string, laddr *net.UDPAddr) (*refcoun
}
}

conn, err := listenAndOptimize(network, laddr)
conn, err := r.udpTransport.ListenPacket(network, laddr)
if err != nil {
return nil, err
}
Expand Down
16 changes: 8 additions & 8 deletions p2p/transport/quicreuse/reuse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func cleanup(t *testing.T, reuse *reuse) {
}

func TestReuseListenOnAllIPv4(t *testing.T) {
reuse := newReuse(nil, nil)
reuse := newReuse(nil, nil, defaultUDPTranport{})
require.Eventually(t, isGarbageCollectorRunning, 500*time.Millisecond, 50*time.Millisecond, "expected garbage collector to be running")
cleanup(t, reuse)

Expand All @@ -73,7 +73,7 @@ func TestReuseListenOnAllIPv4(t *testing.T) {
}

func TestReuseListenOnAllIPv6(t *testing.T) {
reuse := newReuse(nil, nil)
reuse := newReuse(nil, nil, defaultUDPTranport{})
require.Eventually(t, isGarbageCollectorRunning, 500*time.Millisecond, 50*time.Millisecond, "expected garbage collector to be running")
cleanup(t, reuse)

Expand All @@ -86,7 +86,7 @@ func TestReuseListenOnAllIPv6(t *testing.T) {
}

func TestReuseCreateNewGlobalConnOnDial(t *testing.T) {
reuse := newReuse(nil, nil)
reuse := newReuse(nil, nil, defaultUDPTranport{})
cleanup(t, reuse)

addr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
Expand All @@ -100,7 +100,7 @@ func TestReuseCreateNewGlobalConnOnDial(t *testing.T) {
}

func TestReuseConnectionWhenDialing(t *testing.T) {
reuse := newReuse(nil, nil)
reuse := newReuse(nil, nil, defaultUDPTranport{})
cleanup(t, reuse)

addr, err := net.ResolveUDPAddr("udp4", "0.0.0.0:0")
Expand All @@ -117,7 +117,7 @@ func TestReuseConnectionWhenDialing(t *testing.T) {
}

func TestReuseConnectionWhenListening(t *testing.T) {
reuse := newReuse(nil, nil)
reuse := newReuse(nil, nil, defaultUDPTranport{})
cleanup(t, reuse)

raddr, err := net.ResolveUDPAddr("udp4", "1.1.1.1:1234")
Expand All @@ -132,7 +132,7 @@ func TestReuseConnectionWhenListening(t *testing.T) {
}

func TestReuseConnectionWhenDialBeforeListen(t *testing.T) {
reuse := newReuse(nil, nil)
reuse := newReuse(nil, nil, defaultUDPTranport{})
cleanup(t, reuse)

// dial any address
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestReuseListenOnSpecificInterface(t *testing.T) {
if platformHasRoutingTables() {
t.Skip("this test only works on platforms that support routing tables")
}
reuse := newReuse(nil, nil)
reuse := newReuse(nil, nil, defaultUDPTranport{})
cleanup(t, reuse)

router, err := netroute.New()
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestReuseGarbageCollect(t *testing.T) {
maxUnusedDuration = 10 * maxUnusedDuration
}

reuse := newReuse(nil, nil)
reuse := newReuse(nil, nil, defaultUDPTranport{})
cleanup(t, reuse)

numGlobals := func() int {
Expand Down
3 changes: 2 additions & 1 deletion test-plans/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwU
github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk=
github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
Expand Down Expand Up @@ -456,3 +456,4 @@ lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI=
lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k=
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
tailscale.com v1.40.1 h1:NdIUKoD6DHXZY7SnMN85ERPbXZsx6ipiRa6odnrYZtU=

0 comments on commit 8541839

Please sign in to comment.