-
Notifications
You must be signed in to change notification settings - Fork 26
/
ctlconnectivty.go
134 lines (110 loc) · 3.15 KB
/
ctlconnectivty.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package nexodus
import (
"encoding/json"
"fmt"
"github.com/nexodus-io/nexodus/internal/api"
"net"
"go.uber.org/zap"
)
const (
batchSize = 10
v4 = "v4"
v6 = "v6"
)
// ConnectivityV4 pings all peers via IPv4
func (ac *NexdCtl) ConnectivityV4(_ string, keepaliveResults *string) error {
res := ac.nx.connectivityProbe(v4)
var err error
// Marshal the map into a JSON string.
keepaliveJson, err := json.Marshal(res)
if err != nil {
return fmt.Errorf("error marshalling connectivty results")
}
*keepaliveResults = string(keepaliveJson)
return nil
}
// ConnectivityV6 pings all peers via IPv6
func (ac *NexdCtl) ConnectivityV6(_ string, keepaliveResults *string) error {
res := ac.nx.connectivityProbe(v6)
var err error
// Marshal the map into a JSON string.
keepaliveJson, err := json.Marshal(res)
if err != nil {
return fmt.Errorf("error marshalling connectivty results")
}
*keepaliveResults = string(keepaliveJson)
return nil
}
func (nx *Nexodus) connectivityProbe(family string) api.PingPeersResponse {
peersByKey := make(map[string]api.KeepaliveStatus)
res := api.PingPeersResponse{
RelayRequired: nx.symmetricNat,
}
if !nx.relay {
nx.deviceCacheIterRead(func(value deviceCacheEntry) {
// skip the node sourcing the probe
if nx.wireguardPubKey == value.device.GetPublicKey() {
return
}
var nodeAddr string
pubKey := value.device.GetPublicKey()
if family == v6 {
nodeAddr = value.device.Ipv6TunnelIps[0].GetAddress()
} else {
nodeAddr = value.device.Ipv4TunnelIps[0].GetAddress()
}
if net.ParseIP(nodeAddr) == nil {
nx.logger.Debugf("failed parsing an ip address from %s", nodeAddr)
return
}
hostname := value.device.GetHostname()
peersByKey[pubKey] = api.KeepaliveStatus{
WgIP: nodeAddr,
IsReachable: false,
Hostname: hostname,
Method: value.peeringMethod,
}
})
}
res.Peers = nx.probeConnectivity(peersByKey, nx.logger)
return res
}
// probeConnectivity check connectivity in batches to limit excessive traffic in the case of a large number of peers
func (nx *Nexodus) probeConnectivity(peersByKey map[string]api.KeepaliveStatus, logger *zap.SugaredLogger) map[string]api.KeepaliveStatus {
peerConnResultsMap := make(map[string]api.KeepaliveStatus)
peerKeys := make([]string, 0, len(peersByKey))
for key := range peersByKey {
peerKeys = append(peerKeys, key)
}
for i := 0; i < len(peerKeys); i += batchSize {
end := i + batchSize
if end > len(peerKeys) {
end = len(peerKeys)
}
batch := peerKeys[i:end]
c := make(chan struct {
api.KeepaliveStatus
IsReachable bool
})
for _, pubKey := range batch {
go nx.runProbe(peersByKey[pubKey], c)
}
for range batch {
result := <-c
ip := result.WgIP
if result.IsReachable {
logger.Debugf("connectivty probe [ %s ] is reachable", ip)
} else {
logger.Debugf("connectivty probe [ %s ] is not reachable", ip)
}
peerConnResultsMap[ip] = api.KeepaliveStatus{
WgIP: result.WgIP,
IsReachable: result.IsReachable,
Hostname: result.Hostname,
Latency: result.Latency,
Method: result.Method,
}
}
}
return peerConnResultsMap
}