Skip to content
This repository has been archived by the owner on May 11, 2022. It is now read-only.

Commit

Permalink
Add option to force nat into a specified reachability state
Browse files Browse the repository at this point in the history
  • Loading branch information
willscott committed Mar 18, 2020
1 parent 4a4a0ff commit 849d407
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 21 deletions.
60 changes: 47 additions & 13 deletions autonat.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,57 @@ type AmbientAutoNAT struct {
emitReachabilityChanged event.Emitter
}

type StaticAutoNAT struct {
ctx context.Context
host host.Host
reachability network.Reachability
service *autoNATService
}

type autoNATResult struct {
network.Reachability
address ma.Multiaddr
}

// New creates a new NAT autodiscovery system attached to a host
func New(ctx context.Context, h host.Host, options ...Option) (AutoNAT, error) {
var err error
conf := new(config)
conf.host = h

if err := defaults(conf); err != nil {
if err = defaults(conf); err != nil {
return nil, err
}
if conf.addressFunc == nil {
conf.addressFunc = h.Addrs
}

for _, o := range options {
if err := o(conf); err != nil {
if err = o(conf); err != nil {
return nil, err
}
}
emitReachabilityChanged, _ := h.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful)

var service *autoNATService
if (!conf.forceReachability || conf.reachability == network.ReachabilityPublic) && conf.dialer != nil {
service, err = newAutoNATService(ctx, conf)
if err != nil {
return nil, err
}
}

if conf.forceReachability {
emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: conf.reachability})
return &StaticAutoNAT{
ctx: ctx,
host: h,
reachability: conf.reachability,
service: service,
}, nil
}

subAddrUpdated, _ := h.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated))
emitReachabilityChanged, _ := h.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful)

as := &AmbientAutoNAT{
ctx: ctx,
Expand All @@ -83,20 +109,13 @@ func New(ctx context.Context, h host.Host, options ...Option) (AutoNAT, error) {
subAddrUpdated: subAddrUpdated,

emitReachabilityChanged: emitReachabilityChanged,
service: service,
}
as.status.Store(autoNATResult{network.ReachabilityUnknown, nil})

h.Network().Notify(as)
go as.background()

if conf.dialer != nil {
var err error
as.service, err = newAutoNATService(ctx, conf)
if err != nil {
return nil, err
}
}

return as, nil
}

Expand All @@ -115,7 +134,7 @@ func (as *AmbientAutoNAT) emitStatus() {
func (as *AmbientAutoNAT) PublicAddr() (ma.Multiaddr, error) {
s := as.status.Load().(autoNATResult)
if s.Reachability != network.ReachabilityPublic {
return nil, errors.New("NAT Status is not public")
return nil, errors.New("NAT status is not public")
}

return s.address, nil
Expand Down Expand Up @@ -225,7 +244,7 @@ func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) {
if currentStatus.Reachability != network.ReachabilityPublic {
// we are flipping our NATStatus, so confidence drops to 0
as.confidence = 0
if as.service != nil && !as.config.forceServer {
if as.service != nil {
ctx, cancel := context.WithCancel(as.ctx)
go as.service.Enable(ctx)
as.serviceCancel = cancel
Expand Down Expand Up @@ -337,3 +356,18 @@ func shufflePeers(peers []peer.AddrInfo) {
peers[i], peers[j] = peers[j], peers[i]
}
}

func (s *StaticAutoNAT) Status() network.Reachability {
return s.reachability
}

func (s *StaticAutoNAT) PublicAddr() (ma.Multiaddr, error) {
if s.reachability != network.ReachabilityPublic {
return nil, errors.New("NAT status is not public")
}
addrs := s.host.Addrs()
if len(addrs) > 0 {
return s.host.Addrs()[0], nil
}
return nil, errors.New("No available address")
}
17 changes: 17 additions & 0 deletions autonat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,20 @@ func TestAutoNATObservationRecording(t *testing.T) {
}

}

func TestStaticNat(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
s, _ := h.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{})

nat, err := New(ctx, h, WithReachability(network.ReachabilityPrivate))
if err != nil {
t.Fatal(err)
}
if nat.Status() != network.ReachabilityPrivate {
t.Fatalf("should be private")
}
expectEvent(t, s, network.ReachabilityPrivate)
}
20 changes: 15 additions & 5 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
type config struct {
host host.Host

addressFunc AddrFunc
dialer network.Network
forceServer bool
addressFunc AddrFunc
dialer network.Network
forceReachability bool
reachability network.Reachability

// client
bootDelay time.Duration
Expand Down Expand Up @@ -52,13 +53,22 @@ var defaults = func(c *config) error {
// make parallel connections, and as such will modify both the associated peerstore
// and terminate connections of this dialer. The dialer provided
// should be compatible (TCP/UDP) however with the transports of the libp2p network.
func EnableService(dialer network.Network, forceServer bool) Option {
func EnableService(dialer network.Network) Option {
return func(c *config) error {
if dialer == c.host.Network() || dialer.Peerstore() == c.host.Peerstore() {
return errors.New("dialer should not be that of the host")
}
c.dialer = dialer
c.forceServer = forceServer
return nil
}
}

// WithReachability overrides autonat to simply report an over-ridden reachability
// status.
func WithReachability(reachability network.Reachability) Option {
return func(c *config) error {
c.forceReachability = true
c.reachability = reachability
return nil
}
}
Expand Down
12 changes: 11 additions & 1 deletion svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/rand"
"net"
"sync"
"sync/atomic"
"time"

"github.com/libp2p/go-libp2p-core/helpers"
Expand All @@ -29,6 +30,7 @@ type autoNATService struct {
config *config

// rate limiter
running uint32
mx sync.Mutex
reqs map[peer.ID]int
globalReqs int
Expand All @@ -46,7 +48,7 @@ func newAutoNATService(ctx context.Context, c *config) (*autoNATService, error)
reqs: make(map[peer.ID]int),
}

if c.forceServer {
if c.forceReachability {
go as.Enable(ctx)
}

Expand Down Expand Up @@ -212,6 +214,14 @@ func (as *autoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse {

// Enable the autoNAT service temporarily until the associated context is canceled.
func (as *autoNATService) Enable(ctx context.Context) {
alreadyRunning := atomic.SwapUint32(&as.running, 1)
if alreadyRunning > 0 {
return
}
defer func() {
atomic.StoreUint32(&as.running, 0)
}()

as.config.host.SetStreamHandler(AutoNATProto, as.handleStream)

timer := time.NewTimer(as.config.throttleResetPeriod)
Expand Down
4 changes: 2 additions & 2 deletions svc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func makeAutoNATConfig(ctx context.Context, t *testing.T) *config {
dh := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
c := config{host: h, dialer: dh.Network()}
_ = defaults(&c)
c.forceServer = true
c.forceReachability = true
return &c
}

Expand Down Expand Up @@ -195,7 +195,7 @@ func TestAutoNATServiceStartup(t *testing.T) {

h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
dh := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
an, err := New(ctx, h, EnableService(dh.Network(), false))
an, err := New(ctx, h, EnableService(dh.Network()))
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 849d407

Please sign in to comment.