Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

autonat: don't use autonat for address discovery #2148

Merged
merged 2 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 41 additions & 68 deletions p2p/host/autonat/autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package autonat

import (
"context"
"errors"
"math/rand"
"sync/atomic"
"time"
Expand All @@ -20,6 +19,8 @@ import (

var log = logging.Logger("autonat")

const maxConfidence = 3

// AmbientAutoNAT is the implementation of ambient NAT autodiscovery
type AmbientAutoNAT struct {
host host.Host
Expand All @@ -31,9 +32,9 @@ type AmbientAutoNAT struct {
backgroundRunning chan struct{} // is closed when the background go routine exits

inboundConn chan network.Conn
observations chan autoNATResult
observations chan network.Reachability
// status is an autoNATResult reflecting current status.
status atomic.Pointer[autoNATResult]
status atomic.Pointer[network.Reachability]
// Reflects the confidence on of the NATStatus being private, as a single
// dialback may fail for reasons unrelated to NAT.
// If it is <3, then multiple autoNAT peers may be contacted for dialback
Expand All @@ -58,11 +59,6 @@ type StaticAutoNAT struct {
service *autoNATService
}

type autoNATResult struct {
network.Reachability
address ma.Multiaddr
}

// New creates a new NAT autodiscovery system attached to a host
func New(h host.Host, options ...Option) (AutoNAT, error) {
var err error
Expand Down Expand Up @@ -111,13 +107,14 @@ func New(h host.Host, options ...Option) (AutoNAT, error) {
host: h,
config: conf,
inboundConn: make(chan network.Conn, 5),
observations: make(chan autoNATResult, 1),
observations: make(chan network.Reachability, 1),

emitReachabilityChanged: emitReachabilityChanged,
service: service,
recentProbes: make(map[peer.ID]time.Time),
}
as.status.Store(&autoNATResult{network.ReachabilityUnknown, nil})
reachability := network.ReachabilityUnknown
as.status.Store(&reachability)

subscriber, err := as.host.EventBus().Subscribe(
[]any{new(event.EvtLocalAddressesUpdated), new(event.EvtPeerIdentificationCompleted)},
Expand All @@ -137,25 +134,15 @@ func New(h host.Host, options ...Option) (AutoNAT, error) {
// Status returns the AutoNAT observed reachability status.
func (as *AmbientAutoNAT) Status() network.Reachability {
s := as.status.Load()
return s.Reachability
return *s
}

func (as *AmbientAutoNAT) emitStatus() {
status := as.status.Load()
as.emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: status.Reachability})
status := *as.status.Load()
as.emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: status})
if as.metricsTracer != nil {
as.metricsTracer.ReachabilityStatus(status.Reachability)
}
}

// PublicAddr returns the publicly connectable Multiaddr of this node if one is known.
func (as *AmbientAutoNAT) PublicAddr() (ma.Multiaddr, error) {
s := as.status.Load()
if s.Reachability != network.ReachabilityPublic {
return nil, errors.New("NAT status is not public")
as.metricsTracer.ReachabilityStatus(status)
}

return s.address, nil
}

func ipInList(candidate ma.Multiaddr, list []ma.Multiaddr) bool {
Expand All @@ -174,7 +161,6 @@ func (as *AmbientAutoNAT) background() {
// before starting autodetection
delay := as.config.bootDelay

var lastAddrUpdated time.Time
subChan := as.subscriber.Out()
defer as.subscriber.Close()
defer as.emitReachabilityChanged.Close()
Expand All @@ -187,10 +173,6 @@ func (as *AmbientAutoNAT) background() {
// new inbound connection.
case conn := <-as.inboundConn:
localAddrs := as.host.Addrs()
ca := as.status.Load()
if ca.address != nil {
localAddrs = append(localAddrs, ca.address)
}
if manet.IsPublicAddr(conn.RemoteMultiaddr()) &&
!ipInList(conn.RemoteMultiaddr(), localAddrs) {
as.lastInbound = time.Now()
Expand All @@ -199,16 +181,15 @@ func (as *AmbientAutoNAT) background() {
case e := <-subChan:
switch e := e.(type) {
case event.EvtLocalAddressesUpdated:
if !lastAddrUpdated.Add(time.Second).After(time.Now()) {
lastAddrUpdated = time.Now()
if as.confidence > 1 {
as.confidence--
}
// On local address update, reduce confidence from maximum so that we schedule
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is a local address update?

Copy link
Member Author

@sukunrt sukunrt Feb 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is any change in the host address list(host.AllAddrs()). I used the phrase from the event EvtLocalAddressesUpdated.

I didn't change this logic much since it was not related to this PR, but the correct thing to do here would be to only reduce confidence if there are any addresses removed from the host address list and our status is public or if there are addresses added and our status is private.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is fine. Sorry, I messed this up, didn't realize this was literally in the name of the event ;)

// the next probe sooner
if as.confidence == maxConfidence {
as.confidence--
}
case event.EvtPeerIdentificationCompleted:
if s, err := as.host.Peerstore().SupportsProtocols(e.Peer, AutoNATProto); err == nil && len(s) > 0 {
currentStatus := as.status.Load()
if currentStatus.Reachability == network.ReachabilityUnknown {
currentStatus := *as.status.Load()
if currentStatus == network.ReachabilityUnknown {
as.tryProbe(e.Peer)
}
}
Expand Down Expand Up @@ -256,7 +237,7 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
// * recent inbound connections (implying continued connectivity) should decrease the retry when public
// * recent inbound connections when not public mean we should try more actively to see if we're public.
fixedNow := time.Now()
currentStatus := as.status.Load()
currentStatus := *as.status.Load()

nextProbe := fixedNow
// Don't look for peers in the peer store more than once per second.
Expand All @@ -268,13 +249,13 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
}
if !as.lastProbe.IsZero() {
untilNext := as.config.refreshInterval
if currentStatus.Reachability == network.ReachabilityUnknown {
if currentStatus == network.ReachabilityUnknown {
untilNext = as.config.retryInterval
} else if as.confidence < 3 {
} else if as.confidence < maxConfidence {
untilNext = as.config.retryInterval
} else if currentStatus.Reachability == network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) {
} else if currentStatus == network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) {
untilNext *= 2
} else if currentStatus.Reachability != network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) {
} else if currentStatus != network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) {
untilNext /= 5
}

Expand All @@ -289,34 +270,35 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration {
}

// Update the current status based on an observed result.
func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
currentStatus := as.status.Load()
func (as *AmbientAutoNAT) recordObservation(observation network.Reachability) {
currentStatus := *as.status.Load()

if observation.Reachability == network.ReachabilityPublic {
log.Debugf("NAT status is public")
if observation == network.ReachabilityPublic {
changed := false
if currentStatus.Reachability != network.ReachabilityPublic {
if currentStatus != network.ReachabilityPublic {
// Aggressively switch to public from other states ignoring confidence
log.Debugf("NAT status is public")

// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
if as.service != nil {
as.service.Enable()
}
changed = true
} else if as.confidence < 3 {
} else if as.confidence < maxConfidence {
as.confidence++
}
as.status.Store(&observation)
if changed {
as.emitStatus()
}
} else if observation.Reachability == network.ReachabilityPrivate {
log.Debugf("NAT status is private")
if currentStatus.Reachability != network.ReachabilityPrivate {
} else if observation == network.ReachabilityPrivate {
if currentStatus != network.ReachabilityPrivate {
if as.confidence > 0 {
as.confidence--
} else {
log.Debugf("NAT status is private")

// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
as.status.Store(&observation)
Expand All @@ -325,7 +307,7 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
}
as.emitStatus()
}
} else if as.confidence < 3 {
} else if as.confidence < maxConfidence {
as.confidence++
as.status.Store(&observation)
}
Expand All @@ -334,8 +316,8 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
as.confidence--
} else {
log.Debugf("NAT status is unknown")
as.status.Store(&autoNATResult{network.ReachabilityUnknown, nil})
if currentStatus.Reachability != network.ReachabilityUnknown {
as.status.Store(&observation)
if currentStatus != network.ReachabilityUnknown {
if as.service != nil {
as.service.Enable()
}
Expand Down Expand Up @@ -376,19 +358,18 @@ func (as *AmbientAutoNAT) probe(pi *peer.AddrInfo) {
ctx, cancel := context.WithTimeout(as.ctx, as.config.requestTimeout)
defer cancel()

a, err := cli.DialBack(ctx, pi.ID)
err := cli.DialBack(ctx, pi.ID)

var result autoNATResult
var result network.Reachability
switch {
case err == nil:
log.Debugf("Dialback through %s successful; public address is %s", pi.ID.Pretty(), a.String())
result.Reachability = network.ReachabilityPublic
result.address = a
log.Debugf("Dialback through %s successful", pi.ID.Pretty())
result = network.ReachabilityPublic
case IsDialError(err):
log.Debugf("Dialback through %s failed", pi.ID.Pretty())
result.Reachability = network.ReachabilityPrivate
result = network.ReachabilityPrivate
default:
result.Reachability = network.ReachabilityUnknown
result = network.ReachabilityUnknown
}

select {
Expand Down Expand Up @@ -455,14 +436,6 @@ func (s *StaticAutoNAT) Status() network.Reachability {
return s.reachability
}

// PublicAddr returns the publicly connectable Multiaddr of this node if one is known.
func (s *StaticAutoNAT) PublicAddr() (ma.Multiaddr, error) {
if s.reachability != network.ReachabilityPublic {
return nil, errors.New("NAT status is not public")
}
return nil, errors.New("no available address")
}

func (s *StaticAutoNAT) Close() error {
if s.service != nil {
s.service.Disable()
Expand Down
20 changes: 8 additions & 12 deletions p2p/host/autonat/autonat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/libp2p/go-msgio/pbio"

ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -209,46 +208,43 @@ func TestAutoNATObservationRecording(t *testing.T) {
t.Fatalf("failed to subscribe to event EvtLocalRoutabilityPublic, err=%s", err)
}

addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234")
an.recordObservation(autoNATResult{network.ReachabilityPublic, addr})
an.recordObservation(network.ReachabilityPublic)
if an.Status() != network.ReachabilityPublic {
t.Fatalf("failed to transition to public.")
}

expectEvent(t, s, network.ReachabilityPublic, 3*time.Second)

// a single recording should have confidence still at 0, and transition to private quickly.
an.recordObservation(autoNATResult{network.ReachabilityPrivate, nil})
an.recordObservation(network.ReachabilityPrivate)
if an.Status() != network.ReachabilityPrivate {
t.Fatalf("failed to transition to private.")
}

expectEvent(t, s, network.ReachabilityPrivate, 3*time.Second)

// stronger public confidence should be harder to undo.
an.recordObservation(autoNATResult{network.ReachabilityPublic, addr})
an.recordObservation(autoNATResult{network.ReachabilityPublic, addr})
an.recordObservation(network.ReachabilityPublic)
an.recordObservation(network.ReachabilityPublic)
if an.Status() != network.ReachabilityPublic {
t.Fatalf("failed to transition to public.")
}

expectEvent(t, s, network.ReachabilityPublic, 3*time.Second)

an.recordObservation(autoNATResult{network.ReachabilityPrivate, nil})
an.recordObservation(network.ReachabilityPrivate)
if an.Status() != network.ReachabilityPublic {
t.Fatalf("too-extreme private transition.")
}

// don't emit events on observed address change
newAddr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/udp/12345")
an.recordObservation(autoNATResult{network.ReachabilityPublic, newAddr})
// Don't emit events if reachability hasn't changed
an.recordObservation(network.ReachabilityPublic)
if an.Status() != network.ReachabilityPublic {
t.Fatalf("reachability should stay public")
}
select {
case <-s.Out():
t.Fatal("received event without state transition")
case <-time.After(300 * time.Millisecond):
case <-time.After(1 * time.Second):
sukunrt marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
21 changes: 9 additions & 12 deletions p2p/host/autonat/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/autonat/pb"

"github.com/libp2p/go-msgio/pbio"

ma "github.com/multiformats/go-multiaddr"
)

// NewAutoNATClient creates a fresh instance of an AutoNATClient
Expand All @@ -36,22 +34,22 @@ type client struct {
// Note: A returned error Message_E_DIAL_ERROR does not imply that the server
// actually performed a dial attempt. Servers that run a version < v0.20.0 also
// return Message_E_DIAL_ERROR if the dial was skipped due to the dialPolicy.
func (c *client) DialBack(ctx context.Context, p peer.ID) (ma.Multiaddr, error) {
func (c *client) DialBack(ctx context.Context, p peer.ID) error {
s, err := c.h.NewStream(ctx, p, AutoNATProto)
if err != nil {
return nil, err
return err
}

if err := s.Scope().SetService(ServiceName); err != nil {
log.Debugf("error attaching stream to autonat service: %s", err)
s.Reset()
return nil, err
return err
}

if err := s.Scope().ReserveMemory(maxMsgSize, network.ReservationPriorityAlways); err != nil {
log.Debugf("error reserving memory for autonat stream: %s", err)
s.Reset()
return nil, err
return err
}
defer s.Scope().ReleaseMemory(maxMsgSize)

Expand All @@ -66,17 +64,17 @@ func (c *client) DialBack(ctx context.Context, p peer.ID) (ma.Multiaddr, error)
req := newDialMessage(peer.AddrInfo{ID: c.h.ID(), Addrs: c.addrFunc()})
if err := w.WriteMsg(req); err != nil {
s.Reset()
return nil, err
return err
}

var res pb.Message
if err := r.ReadMsg(&res); err != nil {
s.Reset()
return nil, err
return err
}
if res.GetType() != pb.Message_DIAL_RESPONSE {
s.Reset()
return nil, fmt.Errorf("unexpected response: %s", res.GetType().String())
return fmt.Errorf("unexpected response: %s", res.GetType().String())
}

status := res.GetDialResponse().GetStatus()
Expand All @@ -85,10 +83,9 @@ func (c *client) DialBack(ctx context.Context, p peer.ID) (ma.Multiaddr, error)
}
switch status {
case pb.Message_OK:
addr := res.GetDialResponse().GetAddr()
return ma.NewMultiaddrBytes(addr)
return nil
default:
return nil, Error{Status: status, Text: res.GetDialResponse().GetStatusText()}
return Error{Status: status, Text: res.GetDialResponse().GetStatusText()}
}
}

Expand Down
Loading