Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement control plane loadbalancer #7

Merged
merged 1 commit into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions controlplane/controlplane.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// Package controlplane wraps generic TCP loadbalancer for Kubernetes controlplane endpoint LB.
package controlplane

import (
"fmt"
"io"
"log"
"net"
"strconv"
"time"

"github.com/talos-systems/go-loadbalancer/loadbalancer"
)

// LoadBalancer provides Kubernetes control plane TCP loadbalancer with a way to update endpoints (list of control plane nodes).
type LoadBalancer struct {
lb loadbalancer.TCP

done chan struct{}

endpoint string
}

// NewLoadBalancer initializes the load balancer.
//
// If bindPort is zero, load balancer will bind to a random available port.
func NewLoadBalancer(bindAddress string, bindPort int, logWriter io.Writer) (*LoadBalancer, error) {
if bindPort == 0 {
var err error

bindPort, err = findListenPort(bindAddress)
if err != nil {
return nil, fmt.Errorf("unable to find available port: %w", err)
}
}

lb := &LoadBalancer{
endpoint: net.JoinHostPort(bindAddress, strconv.Itoa(bindPort)),
}

// set aggressive timeouts to prevent proxying to unhealthy upstreams
lb.lb.DialTimeout = 5 * time.Second
lb.lb.KeepAlivePeriod = time.Second
lb.lb.TCPUserTimeout = 5 * time.Second

lb.lb.Logger = log.New(logWriter, lb.endpoint+" ", log.Default().Flags())

// create a route without any upstreams yet
if err := lb.lb.AddRoute(lb.endpoint, nil); err != nil {
return nil, err
}

return lb, nil
}

// Endpoint returns loadbalancer endpoint as "host:port".
func (lb *LoadBalancer) Endpoint() string {
return lb.endpoint
}

// Start the loadbalancer providing a channel which provides endpoint list update.
//
// Load balancer starts with an empty list of endpoints, so initial list should be provided on the channel.
func (lb *LoadBalancer) Start(upstreamCh <-chan []string) error {
if err := lb.lb.Start(); err != nil {
return err
}

lb.done = make(chan struct{})

go func() {
for {
select {
case upstreams := <-upstreamCh:
if err := lb.lb.ReconcileRoute(lb.endpoint, upstreams); err != nil {
lb.lb.Logger.Printf("failed reconciling list of upstreams: %s", err)
}
case <-lb.done:
return
}
}
}()

return nil
}

// Shutdown the loadbalancer listener and wait for the connections to be closed.
func (lb *LoadBalancer) Shutdown() error {
if err := lb.lb.Close(); err != nil {
return err
}

close(lb.done)

lb.lb.Wait() //nolint:errcheck

return nil
}

func findListenPort(address string) (int, error) {
l, err := net.Listen("tcp", net.JoinHostPort(address, "0"))
if err != nil {
return 0, err
}

port := l.Addr().(*net.TCPAddr).Port //nolint:forcetypeassert

return port, l.Close()
}
141 changes: 141 additions & 0 deletions controlplane/controlplane_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package controlplane_test

import (
"fmt"
"io/ioutil"
"net"
"os"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/talos-systems/go-retry/retry"
"go.uber.org/goleak"

"github.com/talos-systems/go-loadbalancer/controlplane"
)

type mockUpstream struct {
addr string
l net.Listener

identity string
}

func (u *mockUpstream) Start() error {
var err error

u.l, err = net.Listen("tcp", "localhost:0")
if err != nil {
return err
}

u.addr = u.l.Addr().String()

go u.serve()

return nil
}

func (u *mockUpstream) serve() {
for {
c, err := u.l.Accept()
if err != nil {
return
}

c.Write([]byte(u.identity)) //nolint: errcheck
c.Close() //nolint: errcheck
}
}

func (u *mockUpstream) Close() {
u.l.Close() //nolint: errcheck
}

func TestLoadBalancer(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

const (
upstreamCount = 5
pivot = 2
)

upstreams := make([]mockUpstream, upstreamCount)
for i := range upstreams {
upstreams[i].identity = strconv.Itoa(i)
require.NoError(t, upstreams[i].Start())
}

upstreamAddrs := make([]string, len(upstreams))
for i := range upstreamAddrs {
upstreamAddrs[i] = upstreams[i].addr
}

lb, err := controlplane.NewLoadBalancer("localhost", 0, os.Stderr)
require.NoError(t, err)

upstreamCh := make(chan []string)

require.NoError(t, lb.Start(upstreamCh))

upstreamCh <- upstreamAddrs[:pivot]

readIdentity := func() (int, error) {
c, err := net.Dial("tcp", lb.Endpoint())
if err != nil {
return 0, retry.ExpectedError(err)
}

defer c.Close() //nolint:errcheck

id, err := ioutil.ReadAll(c)
if err != nil {
return 0, retry.ExpectedError(err)
}

return strconv.Atoi(string(id))
}

assert.NoError(t, retry.Constant(10*time.Second, retry.WithUnits(time.Second)).Retry(func() error {
identity, err := readIdentity()
if err != nil {
return err
}

if identity < 0 || identity > pivot-1 {
return fmt.Errorf("unexpected response: %d", identity)
}

return nil
}))

// change the upstreams
upstreamCh <- upstreamAddrs[pivot:]

assert.NoError(t, retry.Constant(10*time.Second, retry.WithUnits(time.Second)).Retry(func() error {
identity, err := readIdentity()
if err != nil {
return err
}

// upstreams are not changed immediately, there might be some stale responses
if identity < pivot {
return retry.ExpectedErrorf("unexpected response: %d", identity)
}

return nil
}))

assert.NoError(t, lb.Shutdown())

for i := range upstreams {
upstreams[i].Close()
}
}