Skip to content

Commit

Permalink
test: integrate new controlplane loadbalancer
Browse files Browse the repository at this point in the history
See siderolabs/go-loadbalancer#7

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>

(cherry picked from commit c0dfd23)
  • Loading branch information
smira authored and utkuozdemir committed Jul 15, 2022
1 parent e087307 commit bee6536
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 60 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ ENV GOMODCACHE /.cache/mod
RUN --mount=type=cache,target=/.cache go install sigs.k8s.io/controller-tools/cmd/controller-gen@v0.8.0
RUN --mount=type=cache,target=/.cache go install k8s.io/code-generator/cmd/conversion-gen@v0.23.1
RUN --mount=type=cache,target=/.cache go install mvdan.cc/gofumpt/gofumports@v0.1.1
RUN curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | bash -s -- -b /toolchain/bin v1.43.0
RUN curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/v1.43.0/install.sh | bash -s -- -b /toolchain/bin v1.43.0
WORKDIR /src
COPY ./go.mod ./
COPY ./go.sum ./
Expand Down
2 changes: 1 addition & 1 deletion sfyra/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/stretchr/testify v1.7.1
github.com/talos-systems/cluster-api-control-plane-provider-talos v0.4.5
github.com/talos-systems/go-debug v0.2.1
github.com/talos-systems/go-loadbalancer v0.1.1
github.com/talos-systems/go-loadbalancer v0.1.2
github.com/talos-systems/go-procfs v0.1.0
github.com/talos-systems/go-retry v0.3.1
github.com/talos-systems/net v0.3.2
Expand Down
3 changes: 2 additions & 1 deletion sfyra/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1241,8 +1241,9 @@ github.com/talos-systems/go-cmd v0.1.0/go.mod h1:kf+rZzTEmlDiYQ6ulslvRONnKLQH8x8
github.com/talos-systems/go-debug v0.2.1 h1:VSN8P1zXWeHWgUBZn4cVT3keBcecCAJBG9Up+F6N2KM=
github.com/talos-systems/go-debug v0.2.1/go.mod h1:pR4NjsZQNFqGx3n4qkD4MIj1F2CxyIF8DCiO1+05JO0=
github.com/talos-systems/go-kmsg v0.1.1/go.mod h1:dppwQn+/mrdvsziGMbXjzfc4E+75oZhr39UIP6LgL0w=
github.com/talos-systems/go-loadbalancer v0.1.1 h1:qjC0uWHu6O7VXG9EN4ovVPg79sRbypXTrJZJskdaa2k=
github.com/talos-systems/go-loadbalancer v0.1.1/go.mod h1:L0uLhCBUQVkdBqtnxqbrw+wzopQyoeluPos8okqdxLo=
github.com/talos-systems/go-loadbalancer v0.1.2 h1:gWgOY5c4gVrahbDOAPHA2UbDY8QSgiR8S6bRXoC5Wqw=
github.com/talos-systems/go-loadbalancer v0.1.2/go.mod h1:ptznwIJopZLeXBviQnIGqEuN1xJ5xVQSB7KWqGvqK8A=
github.com/talos-systems/go-procfs v0.1.0 h1:AuS3/4fx5Me6CUyPVDxBH79eSSnl+8C83tzGmsMAPzs=
github.com/talos-systems/go-procfs v0.1.0/go.mod h1:ATyUGFQIW8OnbnmvqefZWVPgL9g+CAmXHfkgny21xX8=
github.com/talos-systems/go-retry v0.1.0/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM=
Expand Down
95 changes: 38 additions & 57 deletions sfyra/pkg/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,31 @@ import (
"net"
"reflect"
"sort"
"strconv"
"sync"
"time"

cacpt "github.com/talos-systems/cluster-api-control-plane-provider-talos/api/v1alpha3"
"github.com/talos-systems/go-loadbalancer/loadbalancer"
"github.com/talos-systems/go-loadbalancer/controlplane"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
capiv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client"

sidero "github.com/talos-systems/sidero/app/caps-controller-manager/api/v1alpha3"
infrav1 "github.com/talos-systems/sidero/app/caps-controller-manager/api/v1alpha3"
metal "github.com/talos-systems/sidero/app/sidero-controller-manager/api/v1alpha1"
)

// ControlPlane implements dynamic loadbalancer for the control plane.
type ControlPlane struct {
client client.Client

endpoint string
lb loadbalancer.TCP

prevUpstreams []string

clusterNamespace, clusterName string

ctx context.Context
lb *controlplane.LoadBalancer

ctxCancel context.CancelFunc

wg sync.WaitGroup
Expand All @@ -54,58 +51,45 @@ func NewControlPlane(client client.Client, address net.IP, port int, clusterName
clusterName: clusterName,
}

cp.lb.DialTimeout = 5 * time.Second
cp.lb.KeepAlivePeriod = time.Second
cp.lb.TCPUserTimeout = 5 * time.Second

cp.ctx, cp.ctxCancel = context.WithCancel(context.Background())
logWriter := log.Writer()
if !verboseLog {
logWriter = ioutil.Discard
}

var err error

if port == 0 {
port, err = findListenPort(address)
if err != nil {
return nil, err
}
cp.lb, err = controlplane.NewLoadBalancer(address.String(), port, logWriter)
if err != nil {
return nil, err
}

cp.endpoint = net.JoinHostPort(address.String(), strconv.Itoa(port))
upstreamCh := make(chan []string)

if !verboseLog {
// send logs to /dev/null
cp.lb.Logger = log.New(ioutil.Discard, "", 0)
}
var ctx context.Context

// create route without any upstreams yet
if err := cp.lb.AddRoute(cp.endpoint, nil); err != nil {
return nil, err
}
ctx, cp.ctxCancel = context.WithCancel(context.Background())

cp.wg.Add(1)

go cp.reconcileLoop()
go cp.reconcileLoop(ctx, upstreamCh)

return &cp, cp.lb.Start()
return &cp, cp.lb.Start(upstreamCh)
}

// GetEndpoint returns loadbalancer endpoint.
func (cp *ControlPlane) GetEndpoint() string {
return cp.endpoint
return cp.lb.Endpoint()
}

// Close the load balancer.
func (cp *ControlPlane) Close() error {
cp.ctxCancel()
cp.wg.Wait()

if err := cp.lb.Close(); err != nil {
return err
}

return cp.lb.Wait()
return cp.lb.Shutdown()
}

func (cp *ControlPlane) reconcileLoop() {
func (cp *ControlPlane) reconcileLoop(ctx context.Context, upstreamCh chan<- []string) {
defer cp.wg.Done()

const interval = 15 * time.Second
Expand All @@ -114,28 +98,34 @@ func (cp *ControlPlane) reconcileLoop() {
defer ticker.Stop()

for {
if err := cp.reconcile(); err != nil {
if err := cp.reconcile(ctx); err != nil {
log.Printf("load balancer reconcile failed: %s", err)
} else {
select {
case upstreamCh <- cp.prevUpstreams:
case <-ctx.Done():
return
}
}

select {
case <-cp.ctx.Done():
case <-ctx.Done():
return
case <-ticker.C:
}
}
}

func (cp *ControlPlane) reconcile() error {
func (cp *ControlPlane) reconcile(ctx context.Context) error {
var cluster capiv1.Cluster

if err := cp.client.Get(cp.ctx, types.NamespacedName{Namespace: cp.clusterNamespace, Name: cp.clusterName}, &cluster); err != nil {
if err := cp.client.Get(ctx, types.NamespacedName{Namespace: cp.clusterNamespace, Name: cp.clusterName}, &cluster); err != nil {
return err
}

var controlPlane cacpt.TalosControlPlane

if err := cp.client.Get(cp.ctx, types.NamespacedName{Namespace: cluster.Spec.ControlPlaneRef.Namespace, Name: cluster.Spec.ControlPlaneRef.Name}, &controlPlane); err != nil {
if err := cp.client.Get(ctx, types.NamespacedName{Namespace: cluster.Spec.ControlPlaneRef.Namespace, Name: cluster.Spec.ControlPlaneRef.Name}, &controlPlane); err != nil {
return err
}

Expand All @@ -146,16 +136,18 @@ func (cp *ControlPlane) reconcile() error {
return err
}

if err := cp.client.List(cp.ctx, &machines, client.MatchingLabelsSelector{Selector: labelSelector}); err != nil {
if err := cp.client.List(ctx, &machines, client.MatchingLabelsSelector{Selector: labelSelector}); err != nil {
return err
}

var upstreams []string

for _, machine := range machines.Items {
var metalMachine sidero.MetalMachine
// we could have looked up addresses via Machine.status, but as we still have tests with Talos 0.13 (before SideroLink was introduced),
// we need to keep this way of looking up addresses.
var metalMachine infrav1.MetalMachine

if err := cp.client.Get(cp.ctx, types.NamespacedName{Namespace: machine.Spec.InfrastructureRef.Namespace, Name: machine.Spec.InfrastructureRef.Name}, &metalMachine); err != nil {
if err := cp.client.Get(ctx, types.NamespacedName{Namespace: machine.Spec.InfrastructureRef.Namespace, Name: machine.Spec.InfrastructureRef.Name}, &metalMachine); err != nil {
continue
}

Expand All @@ -165,7 +157,7 @@ func (cp *ControlPlane) reconcile() error {
continue
}

if err := cp.client.Get(cp.ctx, types.NamespacedName{Namespace: metalMachine.Spec.ServerRef.Namespace, Name: metalMachine.Spec.ServerRef.Name}, &server); err != nil {
if err := cp.client.Get(ctx, types.NamespacedName{Namespace: metalMachine.Spec.ServerRef.Namespace, Name: metalMachine.Spec.ServerRef.Name}, &server); err != nil {
return err
}

Expand All @@ -179,21 +171,10 @@ func (cp *ControlPlane) reconcile() error {
sort.Strings(upstreams)

if !reflect.DeepEqual(cp.prevUpstreams, upstreams) {
log.Printf("new control plane loadbalancer %q routes: %v", cp.endpoint, upstreams)
log.Printf("new control plane loadbalancer %q routes: %v", cp.lb.Endpoint(), upstreams)
}

cp.prevUpstreams = upstreams

return cp.lb.ReconcileRoute(cp.endpoint, upstreams)
}

func findListenPort(address net.IP) (int, error) {
l, err := net.Listen("tcp", net.JoinHostPort(address.String(), "0"))
if err != nil {
return 0, err
}

port := l.Addr().(*net.TCPAddr).Port

return port, l.Close()
return nil
}

0 comments on commit bee6536

Please sign in to comment.