-
Notifications
You must be signed in to change notification settings - Fork 1k
/
nat_emitter.go
119 lines (108 loc) · 3.06 KB
/
nat_emitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package identify
import (
"context"
"fmt"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
)
type natEmitter struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
reachabilitySub event.Subscription
reachability network.Reachability
eventInterval time.Duration
currentUDPNATDeviceType network.NATDeviceType
currentTCPNATDeviceType network.NATDeviceType
emitNATDeviceTypeChanged event.Emitter
observedAddrMgr *ObservedAddrManager
}
func newNATEmitter(h host.Host, o *ObservedAddrManager, eventInterval time.Duration) (*natEmitter, error) {
ctx, cancel := context.WithCancel(context.Background())
n := &natEmitter{
observedAddrMgr: o,
ctx: ctx,
cancel: cancel,
eventInterval: eventInterval,
}
reachabilitySub, err := h.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged), eventbus.Name("identify (nat emitter)"))
if err != nil {
return nil, fmt.Errorf("failed to subscribe to reachability event: %s", err)
}
n.reachabilitySub = reachabilitySub
emitter, err := h.EventBus().Emitter(new(event.EvtNATDeviceTypeChanged), eventbus.Stateful)
if err != nil {
return nil, fmt.Errorf("failed to create emitter for NATDeviceType: %s", err)
}
n.emitNATDeviceTypeChanged = emitter
n.wg.Add(1)
go n.worker()
return n, nil
}
func (n *natEmitter) worker() {
defer n.wg.Done()
subCh := n.reachabilitySub.Out()
ticker := time.NewTicker(n.eventInterval)
pendingUpdate := false
enoughTimeSinceLastUpdate := true
for {
select {
case evt, ok := <-subCh:
if !ok {
subCh = nil
continue
}
ev, ok := evt.(event.EvtLocalReachabilityChanged)
if !ok {
log.Error("invalid event: %v", evt)
continue
}
n.reachability = ev.Reachability
case <-ticker.C:
enoughTimeSinceLastUpdate = true
if pendingUpdate {
n.maybeNotify()
pendingUpdate = false
enoughTimeSinceLastUpdate = false
}
case <-n.observedAddrMgr.addrRecordedNotif:
pendingUpdate = true
if enoughTimeSinceLastUpdate {
n.maybeNotify()
pendingUpdate = false
enoughTimeSinceLastUpdate = false
}
case <-n.ctx.Done():
return
}
}
}
func (n *natEmitter) maybeNotify() {
if n.reachability == network.ReachabilityPrivate {
tcpNATType, udpNATType := n.observedAddrMgr.getNATType()
if tcpNATType != n.currentTCPNATDeviceType {
n.currentTCPNATDeviceType = tcpNATType
n.emitNATDeviceTypeChanged.Emit(event.EvtNATDeviceTypeChanged{
TransportProtocol: network.NATTransportTCP,
NatDeviceType: n.currentTCPNATDeviceType,
})
}
if udpNATType != n.currentUDPNATDeviceType {
n.currentUDPNATDeviceType = udpNATType
n.emitNATDeviceTypeChanged.Emit(event.EvtNATDeviceTypeChanged{
TransportProtocol: network.NATTransportUDP,
NatDeviceType: n.currentUDPNATDeviceType,
})
}
}
}
func (n *natEmitter) Close() {
n.cancel()
n.wg.Wait()
n.reachabilitySub.Close()
n.emitNATDeviceTypeChanged.Close()
}