-
Notifications
You must be signed in to change notification settings - Fork 458
/
peer_status.go
153 lines (131 loc) · 5.6 KB
/
peer_status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package kubespan
import (
"net/netip"
"time"
"github.com/siderolabs/gen/value"
"go4.org/netipx"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/siderolabs/talos/internal/app/machined/pkg/adapters/wireguard"
"github.com/siderolabs/talos/pkg/machinery/resources/kubespan"
)
// PeerStatusSpec wraps a KubeSpan peer status spec into an adapter which adds
// Wireguard integration and connection state management on top of it.
//
// The adapter keeps the pointer it is given, so every adapter method reads and
// modifies the spec passed in here.
//
//nolint:revive,golint
func PeerStatusSpec(r *kubespan.PeerStatusSpec) peerStatus {
	var adapter peerStatus

	adapter.PeerStatusSpec = r

	return adapter
}
// peerStatus is the adapter type returned by PeerStatusSpec.
//
// It embeds the wrapped *kubespan.PeerStatusSpec, so the adapter methods
// read and update that spec through the stored pointer.
type peerStatus struct {
*kubespan.PeerStatusSpec
}
// EndpointConnectionTimeout is how long to wait for the initial handshake after
// an endpoint has just been set.
//
// While less than this time has passed since the endpoint change and no handshake
// has followed it, CalculateStateWithDurations reports the state as unknown.
const EndpointConnectionTimeout = 15 * time.Second
// CalculateState recomputes the connection state from the timestamps stored in the spec.
//
// It measures how long ago the last handshake and the last endpoint change happened
// (relative to the current time) and hands both durations to CalculateStateWithDurations.
//
// Goal: the endpoint is ultimately down if no handshake has been seen for more than
// wireguard.PeerDownInterval, but right after an endpoint update faster feedback is wanted,
// so the handshake is expected within EndpointConnectionTimeout of the last endpoint change.
//
// Timeline:
//
//	---------------------------------------------------------------------->
//	^            ^                                   ^
//	|            |                                   |
//	T0           T0+EndpointConnectionTimeout        T0+PeerDownInterval
//
// Where T0 = LastEndpointChange.
//
// The result depends on where the current time and LastHandshakeTime sit relative to those points:
//
//   - past (T0+PeerDownInterval): the peer is up only if the last handshake happened
//     less than PeerDownInterval ago, otherwise it is down
//   - between (T0+EndpointConnectionTimeout) and (T0+PeerDownInterval): if there was no
//     handshake after the endpoint change, the endpoint is assumed to be down
//   - between (T0) and (T0+EndpointConnectionTimeout): if there was no handshake after
//     the endpoint change, the state is considered unknown
func (a peerStatus) CalculateState() {
	spec := a.PeerStatusSpec

	a.CalculateStateWithDurations(
		time.Since(spec.LastHandshakeTime),
		time.Since(spec.LastEndpointChange),
	)
}
// CalculateStateWithDurations derives the peer state from the time elapsed since the
// last handshake and since the last endpoint change, and stores it in the spec.
//
// Besides the two durations, it also compares the LastHandshakeTime and
// LastEndpointChange timestamps of the spec to tell whether a handshake happened
// after the endpoint was changed.
func (a peerStatus) CalculateStateWithDurations(sinceLastHandshake, sinceEndpointChange time.Duration) {
	spec := a.PeerStatusSpec
	handshakeAfterChange := spec.LastHandshakeTime.After(spec.LastEndpointChange)

	// assume the peer is down unless one of the rules below says otherwise
	state := kubespan.PeerStateDown

	switch {
	case sinceEndpointChange > wireguard.PeerDownInterval:
		// past T0+PeerDownInterval: only the age of the last handshake matters
		if sinceLastHandshake < wireguard.PeerDownInterval {
			state = kubespan.PeerStateUp
		}
	case handshakeAfterChange:
		// before T0+PeerDownInterval, a handshake after the endpoint change means the endpoint works
		state = kubespan.PeerStateUp
	case sinceEndpointChange < EndpointConnectionTimeout:
		// the endpoint was updated recently and had no handshake yet: too early to call it down
		state = kubespan.PeerStateUnknown
	}

	// without any endpoint ever used, "down" can't be claimed, so report unknown
	if state == kubespan.PeerStateDown && value.IsZero(spec.LastUsedEndpoint) {
		state = kubespan.PeerStateUnknown
	}

	spec.State = state
}
// UpdateFromWireguard copies the endpoint, last handshake time and traffic counters
// reported by Wireguard for the peer into the spec.
//
// If Wireguard reports no endpoint for the peer, the spec endpoint is reset to the zero value.
func (a peerStatus) UpdateFromWireguard(peer wgtypes.Peer) {
	spec := a.PeerStatusSpec

	// start from "no endpoint" and override it only when Wireguard knows one
	endpoint := netip.AddrPort{}

	if udpAddr := peer.Endpoint; udpAddr != nil {
		// the success flag of the conversion is deliberately ignored
		// NOTE(review): presumably a failed conversion yields the zero AddrPort — confirm against netipx
		endpoint, _ = netipx.FromStdAddr(udpAddr.IP, udpAddr.Port, "")
	}

	spec.Endpoint = endpoint
	spec.LastHandshakeTime = peer.LastHandshakeTime
	spec.ReceiveBytes = peer.ReceiveBytes
	spec.TransmitBytes = peer.TransmitBytes
}
// UpdateEndpoint records the given endpoint as both the current and the last used one,
// stamps the endpoint change with the current time and resets the state to unknown.
func (a peerStatus) UpdateEndpoint(endpoint netip.AddrPort) {
	spec := a.PeerStatusSpec

	spec.Endpoint, spec.LastUsedEndpoint = endpoint, endpoint
	spec.LastEndpointChange = time.Now()

	// nothing is known about the new endpoint until the state is calculated again
	spec.State = kubespan.PeerStateUnknown
}
// ShouldChangeEndpoint reports whether a new endpoint should be picked for the peer:
// either no endpoint has been used yet, or the peer is in the down state.
func (a peerStatus) ShouldChangeEndpoint() bool {
	if value.IsZero(a.PeerStatusSpec.LastUsedEndpoint) {
		// no endpoint was ever set
		return true
	}

	return a.PeerStatusSpec.State == kubespan.PeerStateDown
}
// PickNewEndpoint chooses the endpoint to try next out of the available endpoints.
//
// When no endpoint has been used so far, the first one in the list is returned.
// Otherwise the endpoint following LastUsedEndpoint is returned, wrapping around at
// the end of the list; if LastUsedEndpoint is not in the list, the first one is returned.
//
// If returned newEndpoint is zero value, no new endpoint is available.
func (a peerStatus) PickNewEndpoint(endpoints []netip.AddrPort) (newEndpoint netip.AddrPort) {
	count := len(endpoints)
	if count == 0 {
		return netip.AddrPort{}
	}

	lastUsed := a.PeerStatusSpec.LastUsedEndpoint

	if value.IsZero(lastUsed) {
		// first time setting the endpoint
		return endpoints[0]
	}

	// locate the last used endpoint; -1 makes the rotation below start from the first entry
	pos := -1

	for i, candidate := range endpoints {
		if candidate == lastUsed {
			pos = i

			break
		}
	}

	// special case: the peer has only the endpoint that is already in use, so there is nothing to rotate to
	if count == 1 && pos == 0 && a.PeerStatusSpec.Endpoint == lastUsed {
		return netip.AddrPort{}
	}

	return endpoints[(pos+1)%count]
}