Skip to content

Commit

Permalink
refactor: remove pkg/resources dependencies on wgtypes, netx
Browse files Browse the repository at this point in the history
This finishes the work started in #4469, #4524.

The only thing left for #4420 is to move the package in the source tree.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
  • Loading branch information
smira committed Nov 15, 2021
1 parent 7462733 commit 0102a64
Show file tree
Hide file tree
Showing 29 changed files with 983 additions and 809 deletions.
2 changes: 1 addition & 1 deletion internal/app/machined/pkg/adapters/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// Package cluster providers adapters wrapping resources/cluster to provide additional functionality.
// Package cluster implements adapters wrapping resources/cluster to provide additional functionality.
package cluster
2 changes: 1 addition & 1 deletion internal/app/machined/pkg/adapters/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// Package k8s providers adapters wrapping resources/k8s to provide additional functionality.
// Package k8s implements adapters wrapping resources/k8s to provide additional functionality.
package k8s
72 changes: 72 additions & 0 deletions internal/app/machined/pkg/adapters/kubespan/identity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package kubespan

import (
"fmt"
"net"

"github.com/mdlayher/netx/eui64"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"inet.af/netaddr"

"github.com/talos-systems/talos/pkg/resources/kubespan"
"github.com/talos-systems/talos/pkg/resources/network"
)

// IdentitySpec adapter provides identity generation.
//
//nolint:revive,golint
func IdentitySpec(r *kubespan.IdentitySpec) identity {
return identity{
IdentitySpec: r,
}
}

type identity struct {
*kubespan.IdentitySpec
}

// GenerateKey generates new Wireguard key.
func (a identity) GenerateKey() error {
key, err := wgtypes.GeneratePrivateKey()
if err != nil {
return err
}

a.IdentitySpec.PrivateKey = key.String()
a.IdentitySpec.PublicKey = key.PublicKey().String()

return nil
}

// UpdateAddress re-calculates node address based on input data.
func (a identity) UpdateAddress(clusterID string, mac net.HardwareAddr) error {
a.IdentitySpec.Subnet = network.ULAPrefix(clusterID, network.ULAKubeSpan)

var err error

a.IdentitySpec.Address, err = wgEUI64(a.IdentitySpec.Subnet, mac)

return err
}

func wgEUI64(prefix netaddr.IPPrefix, mac net.HardwareAddr) (out netaddr.IPPrefix, err error) {
if prefix.IsZero() {
return out, fmt.Errorf("cannot calculate IP from zero prefix")
}

stdIP, err := eui64.ParseMAC(prefix.IPNet().IP, mac)
if err != nil {
return out, fmt.Errorf("failed to parse MAC into EUI-64 address: %w", err)
}

ip, ok := netaddr.FromStdIP(stdIP)
if !ok {
return out, fmt.Errorf("failed to parse intermediate standard IP %q: %w", stdIP.String(), err)
}

return netaddr.IPPrefixFrom(ip, ip.BitLen()), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

kubespanadapter "github.com/talos-systems/talos/internal/app/machined/pkg/adapters/kubespan"
"github.com/talos-systems/talos/pkg/resources/kubespan"
)

func TestIdentityGenerateKey(t *testing.T) {
var spec kubespan.IdentitySpec

assert.NoError(t, spec.GenerateKey())
assert.NoError(t, kubespanadapter.IdentitySpec(&spec).GenerateKey())
}

func TestIdentityUpdateAddress(t *testing.T) {
Expand All @@ -26,7 +27,7 @@ func TestIdentityUpdateAddress(t *testing.T) {
mac, err := net.ParseMAC("2e:1a:b6:53:81:69")
require.NoError(t, err)

assert.NoError(t, spec.UpdateAddress("8XuV9TZHW08DOk3bVxQjH9ih_TBKjnh-j44tsCLSBzo=", mac))
assert.NoError(t, kubespanadapter.IdentitySpec(&spec).UpdateAddress("8XuV9TZHW08DOk3bVxQjH9ih_TBKjnh-j44tsCLSBzo=", mac))

assert.Equal(t, "fd7f:175a:b97c:5602:2c1a:b6ff:fe53:8169/128", spec.Address.String())
assert.Equal(t, "fd7f:175a:b97c:5602::/64", spec.Subnet.String())
Expand Down
6 changes: 6 additions & 0 deletions internal/app/machined/pkg/adapters/kubespan/kubespan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// Package kubespan implements adapters wrapping resources/kubespan to provide additional functionality.
package kubespan
158 changes: 158 additions & 0 deletions internal/app/machined/pkg/adapters/kubespan/peer_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package kubespan

import (
"time"

"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"inet.af/netaddr"

"github.com/talos-systems/talos/pkg/resources/kubespan"
)

// PeerStatusSpec adapter provides Wiregard integration and state management.
//
//nolint:revive,golint
func PeerStatusSpec(r *kubespan.PeerStatusSpec) peerStatus {
return peerStatus{
PeerStatusSpec: r,
}
}

type peerStatus struct {
*kubespan.PeerStatusSpec
}

// PeerDownInterval is the time since last handshake when established peer is considered to be down.
//
// WG whitepaper defines a downed peer as being:
// Handshake Timeout (180s) + Rekey Timeout (5s) + Rekey Attempt Timeout (90s)
//
// This interval is applied when the link is already established.
const PeerDownInterval = (180 + 5 + 90) * time.Second

// EndpointConnectionTimeout is time to wait for initial handshake when the endpoint is just set.
const EndpointConnectionTimeout = 15 * time.Second

// CalculateState updates connection state based on other fields values.
//
// Goal: endpoint is ultimately down if we haven't seen handshake for more than peerDownInterval,
// but as the endpoints get updated we want faster feedback, so we start checking more aggressively
// that the handshake happened within endpointConnectionTimeout since last endpoint change.
//
// Timeline:
//
// ---------------------------------------------------------------------->
// ^ ^ ^
// | | |
// T0 T0+endpointConnectionTimeout T0+peerDownInterval
//
// Where T0 = LastEndpontChange
//
// The question is where is LastHandshakeTimeout vs. those points above:
//
// * if we're past (T0+peerDownInterval), simply check that time since last handshake < peerDownInterval
// * if we're between (T0+endpointConnectionTimeout) and (T0+peerDownInterval), and there's no handshake
// after the endpoint change, assume that the endpoint is down
// * if we're between (T0) and (T0+endpointConnectionTimeout), and there's no handshake since the endpoint change,
// consider the state to be unknown
func (a peerStatus) CalculateState() {
sinceLastHandshake := time.Since(a.PeerStatusSpec.LastHandshakeTime)
sinceEndpointChange := time.Since(a.PeerStatusSpec.LastEndpointChange)

a.CalculateStateWithDurations(sinceLastHandshake, sinceEndpointChange)
}

// CalculateStateWithDurations calculates the state based on the time since events.
func (a peerStatus) CalculateStateWithDurations(sinceLastHandshake, sinceEndpointChange time.Duration) {
switch {
case sinceEndpointChange > PeerDownInterval: // past T0+peerDownInterval
// if we got handshake in the last peerDownInterval, endpoint is up
if sinceLastHandshake < PeerDownInterval {
a.PeerStatusSpec.State = kubespan.PeerStateUp
} else {
a.PeerStatusSpec.State = kubespan.PeerStateDown
}
case sinceEndpointChange < EndpointConnectionTimeout: // between (T0) and (T0+endpointConnectionTimeout)
// endpoint got recently updated, consider no handshake as 'unknown'
if a.PeerStatusSpec.LastHandshakeTime.After(a.PeerStatusSpec.LastEndpointChange) {
a.PeerStatusSpec.State = kubespan.PeerStateUp
} else {
a.PeerStatusSpec.State = kubespan.PeerStateUnknown
}

default: // otherwise, we're between (T0+endpointConnectionTimeout) and (T0+peerDownInterval)
// if we haven't had the handshake yet, consider the endpoint to be down
if a.PeerStatusSpec.LastHandshakeTime.After(a.PeerStatusSpec.LastEndpointChange) {
a.PeerStatusSpec.State = kubespan.PeerStateUp
} else {
a.PeerStatusSpec.State = kubespan.PeerStateDown
}
}

if a.PeerStatusSpec.State == kubespan.PeerStateDown && a.PeerStatusSpec.LastUsedEndpoint.IsZero() {
// no endpoint, so unknown
a.PeerStatusSpec.State = kubespan.PeerStateUnknown
}
}

// UpdateFromWireguard updates fields from wgtypes information.
func (a peerStatus) UpdateFromWireguard(peer wgtypes.Peer) {
if peer.Endpoint != nil {
a.PeerStatusSpec.Endpoint, _ = netaddr.FromStdAddr(peer.Endpoint.IP, peer.Endpoint.Port, "")
} else {
a.PeerStatusSpec.Endpoint = netaddr.IPPort{}
}

a.PeerStatusSpec.LastHandshakeTime = peer.LastHandshakeTime
a.PeerStatusSpec.TransmitBytes = peer.TransmitBytes
a.PeerStatusSpec.ReceiveBytes = peer.ReceiveBytes
}

// UpdateEndpoint updates the endpoint information and last update timestamp.
func (a peerStatus) UpdateEndpoint(endpoint netaddr.IPPort) {
a.PeerStatusSpec.Endpoint = endpoint
a.PeerStatusSpec.LastUsedEndpoint = endpoint
a.PeerStatusSpec.LastEndpointChange = time.Now()
a.PeerStatusSpec.State = kubespan.PeerStateUnknown
}

// ShouldChangeEndpoint tells whether endpoint should be updated.
func (a peerStatus) ShouldChangeEndpoint() bool {
return a.PeerStatusSpec.State == kubespan.PeerStateDown || a.PeerStatusSpec.LastUsedEndpoint.IsZero()
}

// PickNewEndpoint picks new endpoint given the state and list of available endpoints.
//
// If returned newEndpoint is zero value, no new endpoint is available.
func (a peerStatus) PickNewEndpoint(endpoints []netaddr.IPPort) (newEndpoint netaddr.IPPort) {
if len(endpoints) == 0 {
return
}

if a.PeerStatusSpec.LastUsedEndpoint.IsZero() {
// first time setting the endpoint
newEndpoint = endpoints[0]
} else {
// find the next endpoint after LastUsedEndpoint and use it
idx := -1

for i := range endpoints {
if endpoints[i] == a.PeerStatusSpec.LastUsedEndpoint {
idx = i

break
}
}

// special case: if the peer has just a single endpoint, we can't rotate
if !(len(endpoints) == 1 && idx == 0 && a.PeerStatusSpec.Endpoint == a.PeerStatusSpec.LastUsedEndpoint) {
newEndpoint = endpoints[(idx+1)%len(endpoints)]
}
}

return
}

0 comments on commit 0102a64

Please sign in to comment.