Skip to content

Commit

Permalink
chore: enable "WG over GRPC" testing in siderolink agent tests
Browse files Browse the repository at this point in the history
Fixes #8514
For #8392

Signed-off-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
  • Loading branch information
DmitriyMV committed Apr 1, 2024
1 parent bac366e commit 8dc4910
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 192 deletions.
8 changes: 8 additions & 0 deletions .drone.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,13 @@ local integration_siderolink = Step('e2e-siderolink', target='e2e-qemu', privile
REGISTRY: local_registry,
});

local integration_siderolink_tunnel = Step('e2e-siderolink-tunnel', target='e2e-qemu', privileged=true, depends_on=[integration_siderolink], environment={
SHORT_INTEGRATION_TEST: 'yes',
WITH_SIDEROLINK_AGENT: 'tunnel',
VIA_MAINTENANCE_MODE: 'true',
REGISTRY: local_registry,
});

local push_edge = {
name: 'push-edge',
image: 'autonomy/build-container:latest',
Expand Down Expand Up @@ -705,6 +712,7 @@ local integration_pipelines = [
integration_kubespan,
integration_default_hostname,
integration_siderolink,
integration_siderolink_tunnel,
]) + integration_trigger(['integration-misc']),
Pipeline('integration-extensions', default_pipeline_steps + integration_extensions) + integration_trigger(['integration-extensions']),
Pipeline('integration-cilium', default_pipeline_steps + [integration_cilium, integration_cilium_strict, integration_cilium_strict_kubespan]) + integration_trigger(['integration-cilium']),
Expand Down
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ linters-settings:
- gopkg.in/yaml.v3
- github.com/coredns/coredns
- github.com/mdlayher/kobject
- golang.zx2c4.com/wireguard
- golang.zx2c4.com/wireguard/wgctrl
retract-allow-no-explanation: false
exclude-forbidden: true

Expand Down
236 changes: 155 additions & 81 deletions cmd/talosctl/cmd/mgmt/cluster/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/dustin/go-humanize"
"github.com/google/uuid"
"github.com/hashicorp/go-getter/v2"
"github.com/siderolabs/gen/maps"
"github.com/siderolabs/go-blockdevice/blockdevice/encryption"
"github.com/siderolabs/go-kubeconfig"
"github.com/siderolabs/go-pointer"
Expand Down Expand Up @@ -174,7 +175,7 @@ var (
diskEncryptionKeyTypes []string
withFirewall string
withUUIDHostnames bool
withSiderolinkAgent bool
withSiderolinkAgent agentFlag
)

// createCmd represents the cluster up command.
Expand Down Expand Up @@ -425,7 +426,7 @@ func create(ctx context.Context, flags *pflag.FlagSet) error {
provision.WithTPM2(tpm2Enabled),
provision.WithExtraUEFISearchPaths(extraUEFISearchPaths),
provision.WithTargetArch(targetArch),
provision.WithSiderolinkAgent(withSiderolinkAgent),
provision.WithSiderolinkAgent(withSiderolinkAgent.IsEnabled()),
}

var configBundleOpts []bundle.Option
Expand Down Expand Up @@ -746,42 +747,22 @@ func create(ctx context.Context, flags *pflag.FlagSet) error {

var extraKernelArgs *procfs.Cmdline

if extraBootKernelArgs != "" {
if extraBootKernelArgs != "" || withSiderolinkAgent.IsEnabled() {
extraKernelArgs = procfs.NewCmdline(extraBootKernelArgs)
}

wgNodeGen := makeNodeAddrGenerator()
var slb *siderolinkBuilder

if withSiderolinkAgent {
if extraKernelArgs == nil {
extraKernelArgs = procfs.NewCmdline("")
}

if extraKernelArgs.Get("siderolink.api") != nil || extraKernelArgs.Get("talos.events.sink") != nil || extraKernelArgs.Get("talos.logging.kernel") != nil {
return errors.New("siderolink kernel arguments are already set, cannot run with --with-siderolink")
}

wgHost := gatewayIPs[0].String()

ports, err := getDynamicPorts()
if withSiderolinkAgent.IsEnabled() {
slb, err = newSiderolinkBuilder(gatewayIPs[0].String())
if err != nil {
return err
}
}

request.SiderolinkRequest.WireguardEndpoint = net.JoinHostPort(wgHost, ports.wgPort)
request.SiderolinkRequest.APIEndpoint = ":" + ports.apiPort
request.SiderolinkRequest.SinkEndpoint = ":" + ports.sinkPort
request.SiderolinkRequest.LogEndpoint = ":" + ports.logPort

agentNodeAddr := wgNodeGen.GetAgentNodeAddr()

apiLink := "grpc://" + net.JoinHostPort(wgHost, ports.apiPort) + "?jointoken=foo"
sinkURL := net.JoinHostPort(agentNodeAddr, ports.sinkPort)
kernelURL := "tcp://" + net.JoinHostPort(agentNodeAddr, ports.logPort)

extraKernelArgs.Append("siderolink.api", apiLink)
extraKernelArgs.Append("talos.events.sink", sinkURL)
extraKernelArgs.Append("talos.logging.kernel", kernelURL)
err = slb.SetKernelArgs(extraKernelArgs, withSiderolinkAgent.IsTunnel())
if err != nil {
return err
}

// Add talosconfig to provision options, so we'll have it to parse there
Expand All @@ -798,15 +779,9 @@ func create(ctx context.Context, flags *pflag.FlagSet) error {

nodeUUID := uuid.New()

if withSiderolinkAgent {
var generated netip.Addr

generated, err = wgNodeGen.GenerateRandomNodeAddr()
if err != nil {
return err
}

request.SiderolinkRequest.AddBind(nodeUUID, generated)
err = slb.DefineIPv6ForUUID(nodeUUID)
if err != nil {
return err
}

nodeReq := provision.NodeRequest{
Expand Down Expand Up @@ -869,15 +844,9 @@ func create(ctx context.Context, flags *pflag.FlagSet) error {

nodeUUID := uuid.New()

if withSiderolinkAgent {
var generated netip.Addr

generated, err = wgNodeGen.GenerateRandomNodeAddr()
if err != nil {
return err
}

request.SiderolinkRequest.AddBind(nodeUUID, generated)
err = slb.DefineIPv6ForUUID(nodeUUID)
if err != nil {
return err
}

request.Nodes = append(request.Nodes,
Expand All @@ -896,6 +865,8 @@ func create(ctx context.Context, flags *pflag.FlagSet) error {
})
}

request.SiderolinkRequest = slb.SiderolinkRequest()

cluster, err := provisioner.Create(ctx, request, provisionOptions...)
if err != nil {
return err
Expand Down Expand Up @@ -1213,7 +1184,7 @@ func init() {
createCmd.Flags().IntVar(&bandwidth, "with-network-bandwidth", 0, "specify bandwidth restriction (in kbps) on the bridge interface when creating a qemu cluster")
createCmd.Flags().StringVar(&withFirewall, firewallFlag, "", "inject firewall rules into the cluster, value is default policy - accept/block (QEMU only)")
createCmd.Flags().BoolVar(&withUUIDHostnames, "with-uuid-hostnames", false, "use machine UUIDs as default hostnames (QEMU only)")
createCmd.Flags().BoolVar(&withSiderolinkAgent, "with-siderolink", false, "enables the use of siderolink agent as configuration apply mechanism")
createCmd.Flags().Var(&withSiderolinkAgent, "with-siderolink", "enables the use of siderolink agent as configuration apply mechanism. `true` or `wireguard` enables the agent, `tunnel` enables the agent with grpc tunneling") //nolint:lll

Cmd.AddCommand(createCmd)
}
Expand Down Expand Up @@ -1254,51 +1225,124 @@ func checkForDefinedGenFlag(flags *pflag.FlagSet) string {
return ""
}

type generatedPorts struct {
wgPort string
apiPort string
sinkPort string
logPort string
}
func newSiderolinkBuilder(wgHost string) (*siderolinkBuilder, error) {
prefix, err := networkPrefix("")
if err != nil {
return nil, err
}

result := &siderolinkBuilder{
wgHost: wgHost,
binds: map[uuid.UUID]netip.Addr{},
prefix: prefix,
nodeIPv6Addr: prefix.Addr().Next().String(),
}

func getDynamicPorts() (generatedPorts, error) {
var resultErr error

for range 10 {
wgPort, err := getDynamicPort("udp")
if err != nil {
return generatedPorts{}, fmt.Errorf("failed to get dynamic port for WireGuard: %w", err)
for _, d := range []struct {
field *int
net string
what string
}{
{&result.wgPort, "udp", "WireGuard"},
{&result.apiPort, "tcp", "gRPC API"},
{&result.sinkPort, "tcp", "Event Sink"},
{&result.logPort, "tcp", "Log Receiver"},
} {
var err error

*d.field, err = getDynamicPort(d.net)
if err != nil {
return nil, fmt.Errorf("failed to get dynamic port for %s: %w", d.what, err)
}
}

apiPort, err := getDynamicPort("tcp")
if err != nil {
return generatedPorts{}, fmt.Errorf("failed to get dynamic port for GRPC API: %w", err)
resultErr = checkPortsDontOverlap(result.wgPort, result.apiPort, result.sinkPort, result.logPort)
if resultErr == nil {
break
}
}

sinkPort, err := getDynamicPort("tcp")
if err != nil {
return generatedPorts{}, fmt.Errorf("failed to get dynamic port for Sink: %w", err)
}
if resultErr != nil {
return nil, fmt.Errorf("failed to get non-overlapping dynamic ports in 10 attempts: %w", resultErr)
}

logPort, err := getDynamicPort("tcp")
if err != nil {
return generatedPorts{}, fmt.Errorf("failed to get dynamic port for Log: %w", err)
}
return result, nil
}

resultErr = checkPortsDontOverlap(wgPort, apiPort, sinkPort, logPort)
if resultErr != nil {
continue
}
type siderolinkBuilder struct {
wgHost string

binds map[uuid.UUID]netip.Addr
prefix netip.Prefix
nodeIPv6Addr string
wgPort int
apiPort int
sinkPort int
logPort int
}

return generatedPorts{
wgPort: strconv.Itoa(wgPort),
apiPort: strconv.Itoa(apiPort),
sinkPort: strconv.Itoa(sinkPort),
logPort: strconv.Itoa(logPort),
}, nil
// DefineIPv6ForUUID defines an IPv6 address for a given UUID. It is safe to call this method on a nil pointer.
func (slb *siderolinkBuilder) DefineIPv6ForUUID(id uuid.UUID) error {
if slb == nil {
return nil
}

result, err := generateRandomNodeAddr(slb.prefix)
if err != nil {
return err
}

return generatedPorts{}, fmt.Errorf("failed to get non-overlapping dynamic ports in 10 attempts: %w", resultErr)
slb.binds[id] = result.Addr()

return nil
}

// SiderolinkRequest returns a SiderolinkRequest based on the current state of the builder.
// It is safe to call this method on a nil pointer.
func (slb *siderolinkBuilder) SiderolinkRequest() provision.SiderolinkRequest {
if slb == nil {
return provision.SiderolinkRequest{}
}

return provision.SiderolinkRequest{
WireguardEndpoint: net.JoinHostPort(slb.wgHost, strconv.Itoa(slb.wgPort)),
APIEndpoint: ":" + strconv.Itoa(slb.apiPort),
SinkEndpoint: ":" + strconv.Itoa(slb.sinkPort),
LogEndpoint: ":" + strconv.Itoa(slb.logPort),
SiderolinkBind: maps.ToSlice(slb.binds, func(k uuid.UUID, v netip.Addr) provision.SiderolinkBind {
return provision.SiderolinkBind{
UUID: k,
Addr: v,
}
}),
}
}

// SetKernelArgs sets the kernel arguments for the current builder. It is safe to call this method on a nil pointer.
func (slb *siderolinkBuilder) SetKernelArgs(extraKernelArgs *procfs.Cmdline, tunnel bool) error {
switch {
case slb == nil:
return nil
case extraKernelArgs.Get("siderolink.api") != nil,
extraKernelArgs.Get("talos.events.sink") != nil,
extraKernelArgs.Get("talos.logging.kernel") != nil:
return errors.New("siderolink kernel arguments are already set, cannot run with --with-siderolink")
default:
apiLink := "grpc://" + net.JoinHostPort(slb.wgHost, strconv.Itoa(slb.apiPort)) + "?jointoken=foo"

if tunnel {
apiLink += "&grpc_tunnel=true"
}

extraKernelArgs.Append("siderolink.api", apiLink)
extraKernelArgs.Append("talos.events.sink", net.JoinHostPort(slb.nodeIPv6Addr, strconv.Itoa(slb.sinkPort)))
extraKernelArgs.Append("talos.logging.kernel", "tcp://"+net.JoinHostPort(slb.nodeIPv6Addr, strconv.Itoa(slb.logPort)))

return nil
}
}

func getDynamicPort(network string) (int, error) {
Expand Down Expand Up @@ -1361,3 +1405,33 @@ func checkPortsDontOverlap(ports ...int) error {

return nil
}

type agentFlag uint8

func (a *agentFlag) String() string {
switch *a {
case 1:
return "wireguard"
case 2:
return "grpc-tunnel"
default:
return "none"
}
}

func (a *agentFlag) Set(s string) error {
switch s {
case "true", "wireguard":
*a = 1
case "tunnel":
*a = 2
default:
return fmt.Errorf("unknown type: %s, possible values: 'true', 'wireguard' for the usual WG; 'tunnel' for WG over GRPC", s)
}

return nil
}

func (a *agentFlag) Type() string { return "agent" }
func (a *agentFlag) IsEnabled() bool { return *a != 0 }
func (a *agentFlag) IsTunnel() bool { return *a == 2 }
19 changes: 19 additions & 0 deletions cmd/talosctl/cmd/mgmt/cluster/create_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package cluster

import (
"net/netip"

"github.com/siderolabs/siderolink/pkg/wireguard"
)

func generateRandomNodeAddr(prefix netip.Prefix) (netip.Prefix, error) {
return wireguard.GenerateRandomNodeAddr(prefix)
}

func networkPrefix(prefix string) (netip.Prefix, error) {
return wireguard.NetworkPrefix(prefix), nil
}
20 changes: 20 additions & 0 deletions cmd/talosctl/cmd/mgmt/cluster/create_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//go:build !linux

package cluster

import (
"errors"
"net/netip"
)

func generateRandomNodeAddr(prefix netip.Prefix) (netip.Prefix, error) {
return netip.Prefix{}, nil
}

func networkPrefix(prefix string) (netip.Prefix, error) {
return netip.Prefix{}, errors.New("unsupported platform")
}
Loading

0 comments on commit 8dc4910

Please sign in to comment.