Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Jun 5, 2024
1 parent 1b1d3bb commit b042795
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 80 deletions.
6 changes: 6 additions & 0 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ func (h *BasicHost) Start() {
h.psManager.Start()
h.refCount.Add(1)
h.ids.Start()
if h.autonatv2 != nil {
err := h.autonatv2.Start()
if err != nil {
log.Errorf("autonat v2 failed to start: %s", err)
}
}
go h.background()
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/black_hole_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (st blackHoleState) String() string {
type BlackHoleFilter struct {
// N is
// 1. The minimum number of completed dials required before evaluating black hole state
// 2. the minimum number of requests after which we probe the state of the black hole in
// 2. the minimum number of requests after which we probe the state of the black hole in
// blocked state
N int
// MinSuccesses is the minimum number of Success required in the last n dials
Expand Down
71 changes: 35 additions & 36 deletions p2p/protocol/autonatv2/autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ type Request struct {

// Result is the result of the CheckReachability call
type Result struct {
// Idx is the index of the dialed address
Idx int
// Addr is the dialed address
Addr ma.Multiaddr
// Reachability of the dialed address
Expand All @@ -70,7 +68,6 @@ type Result struct {
// The server provides amplification attack prevention and rate limiting.
type AutoNAT struct {
host host.Host
sub event.Subscription

// for cleanly closing
ctx context.Context
Expand All @@ -82,10 +79,9 @@ type AutoNAT struct {

mx sync.Mutex
peers *peersMap

// allowAllAddrs enables using private and localhost addresses for reachability checks.
// allowPrivateAddrs enables using private and localhost addresses for reachability checks.
// This is only useful for testing.
allowAllAddrs bool
allowPrivateAddrs bool
}

// New returns a new AutoNAT instance.
Expand All @@ -98,47 +94,28 @@ func New(host host.Host, dialerHost host.Host, opts ...AutoNATOption) (*AutoNAT,
return nil, fmt.Errorf("failed to apply option: %w", err)
}
}
// Listen on event.EvtPeerProtocolsUpdated, event.EvtPeerConnectednessChanged
// event.EvtPeerIdentificationCompleted to maintain our set of autonat supporting peers.
sub, err := host.EventBus().Subscribe([]interface{}{
new(event.EvtPeerProtocolsUpdated),
new(event.EvtPeerConnectednessChanged),
new(event.EvtPeerIdentificationCompleted),
})
if err != nil {
return nil, fmt.Errorf("event subscription failed: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())
an := &AutoNAT{
host: host,
ctx: ctx,
cancel: cancel,
sub: sub,
srv: newServer(host, dialerHost, s),
cli: newClient(host),
allowAllAddrs: s.allowAllAddrs,
peers: newPeersMap(),
host: host,
ctx: ctx,
cancel: cancel,
srv: newServer(host, dialerHost, s),
cli: newClient(host),
allowPrivateAddrs: s.allowPrivateAddrs,
peers: newPeersMap(),
}
an.cli.RegisterDialBack()
an.srv.Enable()

an.wg.Add(1)
go an.background()
return an, nil
}

func (an *AutoNAT) background() {
func (an *AutoNAT) background(sub event.Subscription) {
for {
select {
case <-an.ctx.Done():
an.srv.Disable()
an.srv.Close()
an.sub.Close()
an.peers = nil
sub.Close()
an.wg.Done()
return
case e := <-an.sub.Out():
case e := <-sub.Out():
switch evt := e.(type) {
case event.EvtPeerProtocolsUpdated:
an.updatePeer(evt.Peer)
Expand All @@ -151,14 +128,36 @@ func (an *AutoNAT) background() {
}
}

func (an *AutoNAT) Start() error {
// Listen on event.EvtPeerProtocolsUpdated, event.EvtPeerConnectednessChanged
// event.EvtPeerIdentificationCompleted to maintain our set of autonat supporting peers.
sub, err := an.host.EventBus().Subscribe([]interface{}{
new(event.EvtPeerProtocolsUpdated),
new(event.EvtPeerConnectednessChanged),
new(event.EvtPeerIdentificationCompleted),
})
if err != nil {
return fmt.Errorf("event subscription failed: %w", err)
}
an.cli.Start()
an.srv.Start()

an.wg.Add(1)
go an.background(sub)
return nil
}

func (an *AutoNAT) Close() {
an.cancel()
an.wg.Wait()
an.srv.Close()
an.cli.Close()
an.peers = nil
}

// GetReachability makes a single dial request for checking reachability for requested addresses
func (an *AutoNAT) GetReachability(ctx context.Context, reqs []Request) (Result, error) {
if !an.allowAllAddrs {
if !an.allowPrivateAddrs {
for _, r := range reqs {
if !manet.IsPublicAddr(r.Addr) {
return Result{}, fmt.Errorf("private address cannot be verified by autonatv2: %s", r.Addr)
Expand Down
20 changes: 10 additions & 10 deletions p2p/protocol/autonatv2/autonat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func newAutoNAT(t testing.TB, dialer host.Host, opts ...AutoNATOption) *AutoNAT
if err != nil {
t.Error(err)
}
an.srv.Enable()
an.cli.RegisterDialBack()
an.Start()
t.Cleanup(an.Close)
return an
}

Expand Down Expand Up @@ -92,7 +92,7 @@ func TestAutoNATPrivateAddr(t *testing.T) {
}

func TestClientRequest(t *testing.T) {
an := newAutoNAT(t, nil, allowAllAddrs)
an := newAutoNAT(t, nil, allowPrivateAddrs)
defer an.Close()
defer an.host.Close()

Expand Down Expand Up @@ -127,7 +127,7 @@ func TestClientRequest(t *testing.T) {
}

func TestClientServerError(t *testing.T) {
an := newAutoNAT(t, nil, allowAllAddrs)
an := newAutoNAT(t, nil, allowPrivateAddrs)
defer an.Close()
defer an.host.Close()

Expand Down Expand Up @@ -184,7 +184,7 @@ func TestClientServerError(t *testing.T) {
}

func TestClientDataRequest(t *testing.T) {
an := newAutoNAT(t, nil, allowAllAddrs)
an := newAutoNAT(t, nil, allowPrivateAddrs)
defer an.Close()
defer an.host.Close()

Expand Down Expand Up @@ -299,7 +299,7 @@ func TestClientDataRequest(t *testing.T) {
}

func TestClientDialBacks(t *testing.T) {
an := newAutoNAT(t, nil, allowAllAddrs)
an := newAutoNAT(t, nil, allowPrivateAddrs)
defer an.Close()
defer an.host.Close()

Expand Down Expand Up @@ -605,10 +605,10 @@ func TestAreAddrsConsistency(t *testing.T) {
success: true,
},
{
name: "nat64 match",
name: "nat64",
localAddr: ma.StringCast("/ip6/1::1/tcp/12345"),
dialAddr: ma.StringCast("/ip4/1.2.3.4/tcp/23232"),
success: true,
success: false,
},
{
name: "simple mismatch",
Expand All @@ -623,8 +623,8 @@ func TestAreAddrsConsistency(t *testing.T) {
success: false,
},
{
name: "nat64 mismatch",
localAddr: ma.StringCast("/ip4/192.168.0.1/udp/12345/quic-v1"),
name: "dns",
localAddr: ma.StringCast("/dns/lib.p2p/udp/12345/quic-v1"),
dialAddr: ma.StringCast("/ip6/1::1/udp/123/quic-v1/"),
success: false,
},
Expand Down
33 changes: 25 additions & 8 deletions p2p/protocol/autonatv2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ func newClient(h host.Host) *client {
return &client{host: h, dialData: make([]byte, 4000), dialBackQueues: make(map[uint64]chan ma.Multiaddr)}
}

// RegisterDialBack registers the client to receive DialBack streams initiated by the server to send the nonce.
func (ac *client) RegisterDialBack() {
func (ac *client) Start() {
ac.host.SetStreamHandler(DialBackProtocol, ac.handleDialBack)
}

func (ac *client) Close() {
ac.host.RemoveStreamHandler(DialBackProtocol)
}

// GetReachability verifies address reachability with a AutoNAT v2 server p.
func (ac *client) GetReachability(ctx context.Context, p peer.ID, reqs []Request) (Result, error) {
ctx, cancel := context.WithTimeout(ctx, streamTimeout)
Expand Down Expand Up @@ -166,7 +169,7 @@ func (ac *client) newResult(resp *pb.DialResponse, reqs []Request, dialBackAddr
// the server is misinforming us about the address it successfully dialed
// either we received no dialback or the address on the dialback is inconsistent with
// what the server is telling us
return Result{}, fmt.Errorf("invalid repsonse: dialBackAddr: %s, respAddr: %s", dialBackAddr, addr)
return Result{}, fmt.Errorf("invalid response: dialBackAddr: %s, respAddr: %s", dialBackAddr, addr)
}
rch = network.ReachabilityPublic
case pb.DialStatus_E_DIAL_ERROR:
Expand All @@ -187,7 +190,6 @@ func (ac *client) newResult(resp *pb.DialResponse, reqs []Request, dialBackAddr
}

return Result{
Idx: idx,
Addr: addr,
Reachability: rch,
Status: resp.DialStatus,
Expand Down Expand Up @@ -293,11 +295,26 @@ func areAddrsConsistent(local, external ma.Multiaddr) bool {
}
for i := 0; i < len(localProtos); i++ {
if i == 0 {
if localProtos[i].Code == externalProtos[i].Code ||
localProtos[i].Code == ma.P_IP6 && externalProtos[i].Code == ma.P_IP4 /* NAT64 */ {
continue
switch externalProtos[i].Code {
case ma.P_DNS, ma.P_DNSADDR:
if localProtos[i].Code == ma.P_IP4 || localProtos[i].Code == ma.P_IP6 {
continue
}
return false
case ma.P_DNS4:
if localProtos[i].Code == ma.P_IP4 {
continue
}
return false
case ma.P_DNS6:
if localProtos[i].Code == ma.P_IP6 {
continue
}
return false
}
if localProtos[i].Code != externalProtos[i].Code {
return false
}
return false
} else {
if localProtos[i].Code != externalProtos[i].Code {
return false
Expand Down
8 changes: 4 additions & 4 deletions p2p/protocol/autonatv2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "time"

// autoNATSettings is used to configure AutoNAT
type autoNATSettings struct {
allowAllAddrs bool
allowPrivateAddrs bool
serverRPM int
serverPerPeerRPM int
serverDialDataRPM int
Expand All @@ -14,7 +14,7 @@ type autoNATSettings struct {

func defaultSettings() *autoNATSettings {
return &autoNATSettings{
allowAllAddrs: false,
allowPrivateAddrs: false,
// TODO: confirm rate limiting defaults
serverRPM: 20,
serverPerPeerRPM: 2,
Expand Down Expand Up @@ -42,7 +42,7 @@ func withDataRequestPolicy(drp dataRequestPolicyFunc) AutoNATOption {
}
}

func allowAllAddrs(s *autoNATSettings) error {
s.allowAllAddrs = true
func allowPrivateAddrs(s *autoNATSettings) error {
s.allowPrivateAddrs = true
return nil
}
10 changes: 3 additions & 7 deletions p2p/protocol/autonatv2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newServer(host, dialer host.Host, s *autoNATSettings) *server {
dialerHost: dialer,
host: host,
dialDataRequestPolicy: s.dataRequestPolicy,
allowPrivateAddrs: s.allowAllAddrs,
allowPrivateAddrs: s.allowPrivateAddrs,
limiter: &rateLimiter{
RPM: s.serverRPM,
PerPeerRPM: s.serverPerPeerRPM,
Expand All @@ -54,16 +54,12 @@ func newServer(host, dialer host.Host, s *autoNATSettings) *server {
}

// Enable attaches the stream handler to the host.
func (as *server) Enable() {
func (as *server) Start() {
as.host.SetStreamHandler(DialProtocol, as.handleDialRequest)
}

// Disable removes the stream handles from the host.
func (as *server) Disable() {
as.host.RemoveStreamHandler(DialProtocol)
}

func (as *server) Close() {
as.host.RemoveStreamHandler(DialProtocol)
as.dialerHost.Close()
}

Expand Down
Loading

0 comments on commit b042795

Please sign in to comment.