Skip to content

Commit

Permalink
feat: use hashing to distribute node checks
Browse files Browse the repository at this point in the history
instead of querying all (valid) neighbouring pods, we filter and
only query KUBENURSE_NEIGHBOUR_LIMIT other neighbours.
To make sure that all nodes get the right number of queries, we
hash each node name, put that in a sorted list (l), and each pod will
query the e.g. 10 next nodes in the list, starting from the position
given by the node name' hash.
i.e. for node named n, we query the 10 next nodes in the list,
starting from position l[hash(n)] + 1

cf #55

Signed-off-by: Clément Nussbaumer <clement.nussbaumer@postfinance.ch>
  • Loading branch information
clementnuss committed Mar 11, 2024
1 parent 4fa4b9f commit a0b49bb
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 19 deletions.
4 changes: 1 addition & 3 deletions go.mod
@@ -1,8 +1,6 @@
module github.com/postfinance/kubenurse

go 1.21

toolchain go1.21.5
go 1.22

require (
github.com/prometheus/client_golang v1.19.0
Expand Down
6 changes: 3 additions & 3 deletions internal/kubenurse/handler.go
Expand Up @@ -9,7 +9,7 @@ import (
)

func (s *Server) readyHandler() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, _ *http.Request) {
s.mu.Lock()
defer s.mu.Unlock()

Expand All @@ -34,8 +34,8 @@ func (s *Server) aliveHandler() func(w http.ResponseWriter, r *http.Request) {
servicecheck.Result

// kubediscovery
NeighbourhoodState string `json:"neighbourhood_state"`
Neighbourhood []servicecheck.Neighbour `json:"neighbourhood"`
NeighbourhoodState string `json:"neighbourhood_state"`
Neighbourhood []*servicecheck.Neighbour `json:"neighbourhood"`
}

res := s.checker.LastCheckResult
Expand Down
9 changes: 7 additions & 2 deletions internal/kubenurse/server.go
Expand Up @@ -48,6 +48,7 @@ type Server struct {
// * KUBERNETES_SERVICE_PORT
// * KUBENURSE_NAMESPACE
// * KUBENURSE_NEIGHBOUR_FILTER
// * KUBENURSE_NEIGHBOUR_LIMIT
// * KUBENURSE_SHUTDOWN_DURATION
// * KUBENURSE_CHECK_API_SERVER_DIRECT
// * KUBENURSE_CHECK_API_SERVER_DNS
Expand Down Expand Up @@ -126,21 +127,25 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle
shutdownDuration := 5 * time.Second

if v, ok := os.LookupEnv("KUBENURSE_SHUTDOWN_DURATION"); ok {
var err error
shutdownDuration, err = time.ParseDuration(v)

if err != nil {
return nil, err
}
}

chk.ShutdownDuration = shutdownDuration
chk.KubenurseIngressURL = os.Getenv("KUBENURSE_INGRESS_URL")
chk.KubenurseServiceURL = os.Getenv("KUBENURSE_SERVICE_URL")
chk.KubernetesServiceHost = os.Getenv("KUBERNETES_SERVICE_HOST")
chk.KubernetesServicePort = os.Getenv("KUBERNETES_SERVICE_PORT")
chk.KubenurseNamespace = os.Getenv("KUBENURSE_NAMESPACE")
chk.NeighbourFilter = os.Getenv("KUBENURSE_NEIGHBOUR_FILTER")
chk.ShutdownDuration = shutdownDuration
neighLimit := os.Getenv("KUBENURSE_NEIGHBOUR_LIMIT")

if chk.NeighbourLimit, err = strconv.Atoi(neighLimit); err != nil {
return nil, err
}

//nolint:goconst // No need to make "false" a constant in my opinion, readability is better like this.
chk.SkipCheckAPIServerDirect = os.Getenv("KUBENURSE_CHECK_API_SERVER_DIRECT") == "false"
Expand Down
55 changes: 50 additions & 5 deletions internal/servicecheck/neighbours.go
Expand Up @@ -2,25 +2,30 @@ package servicecheck

import (
"context"
"crypto/sha256"
"fmt"
"os"
"slices"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var osHostname = os.Hostname //nolint:gochecknoglobals // used during testing

// Neighbour represents a kubenurse which should be reachable
type Neighbour struct {
PodName string
PodIP string
HostIP string
NodeName string
NodeHash string
}

// GetNeighbours returns a slice of neighbour kubenurses for the given namespace and labelSelector.
func (c *Checker) GetNeighbours(ctx context.Context, namespace, labelSelector string) ([]Neighbour, error) {
func (c *Checker) GetNeighbours(ctx context.Context, namespace, labelSelector string) ([]*Neighbour, error) {
// Get all pods
pods := v1.PodList{}
selector, _ := labels.Parse(labelSelector)
Expand All @@ -33,9 +38,9 @@ func (c *Checker) GetNeighbours(ctx context.Context, namespace, labelSelector st
return nil, fmt.Errorf("list pods: %w", err)
}

var neighbours = make([]Neighbour, 0, len(pods.Items))
var neighbours = make([]*Neighbour, 0, len(pods.Items))

var hostname, _ = os.Hostname()
var hostname, _ = osHostname()

// process pods
for idx := range pods.Items {
Expand Down Expand Up @@ -64,16 +69,21 @@ func (c *Checker) GetNeighbours(ctx context.Context, namespace, labelSelector st
PodIP: pod.Status.PodIP,
HostIP: pod.Status.HostIP,
NodeName: pod.Spec.NodeName,
NodeHash: sha256String(pod.Spec.NodeName),
}
neighbours = append(neighbours, n)
neighbours = append(neighbours, &n)
}

return neighbours, nil
}

// checkNeighbours checks the /alwayshappy endpoint from every discovered kubenurse neighbour. Neighbour pods on nodes
// which are not schedulable are excluded from this check to avoid possible false errors.
func (c *Checker) checkNeighbours(nh []Neighbour) {
func (c *Checker) checkNeighbours(nh []*Neighbour) {
if c.NeighbourLimit > 0 && len(nh) > c.NeighbourLimit {
nh = c.filterNeighbours(nh)
}

for _, neighbour := range nh {
neighbour := neighbour // pin

Expand All @@ -88,3 +98,38 @@ func (c *Checker) checkNeighbours(nh []Neighbour) {
_, _ = c.measure(check, "path_"+neighbour.NodeName)
}
}

func (c *Checker) filterNeighbours(nh []*Neighbour) []*Neighbour {
m := make(map[string]*Neighbour, len(nh))
l := make([]string, 0, len(nh))

for _, n := range nh {
m[n.NodeHash] = n
l = append(l, n.NodeHash)
}

slices.Sort(l)

currentHostName, _ := osHostname()
hostnameHash := sha256String(currentHostName)

if m[hostnameHash].NodeName != currentHostName {
panic("the current hostname hash doesn't match the value in the map")
}

idx, _ := slices.BinarySearch(l, hostnameHash)

filteredNeighbours := make([]*Neighbour, 0, c.NeighbourLimit)

for i := 0; i < c.NeighbourLimit; i++ {
hash := l[(idx+i+1)%len(l)]
filteredNeighbours = append(filteredNeighbours, m[hash])
}

return filteredNeighbours
}

func sha256String(s string) string {
h := sha256.Sum256([]byte(s))
return string(h[:])
}
62 changes: 62 additions & 0 deletions internal/servicecheck/neighbours_test.go
@@ -0,0 +1,62 @@
package servicecheck

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func generateNeighbours(n int) (nh []*Neighbour) {

nh = make([]*Neighbour, 0, n)

for i := range n {
nodeName := fmt.Sprintf("a1-k8s-abcd%03d.domain.tld", i)
neigh := Neighbour{
NodeName: nodeName,
NodeHash: sha256String(nodeName),
}
nh = append(nh, &neigh)
}

return
}

func TestNodeFiltering(t *testing.T) {

n := 1_000
neighbourLimit := 10
nh := generateNeighbours(n)
require.NotNil(t, nh)

trueOsHostname := osHostname
defer func() { osHostname = trueOsHostname }()

// fake client, with a dummy neighbour pod
// fakeClient := fake.NewFakeClient(&fakeNeighbourPod)
checker := Checker{
NeighbourLimit: neighbourLimit,
}

t.Run("all nodes should get NEIGHBOUR_LIMIT checks", func(t *testing.T) {
counter := make(map[string]int, n)

for i := range n {
osHostname = func() (name string, err error) {
return nh[i].NodeName, nil
}
filtered := checker.filterNeighbours(nh)
require.Equal(t, neighbourLimit, len(filtered))

for _, neigh := range filtered {
counter[neigh.NodeName]++
}
}

for _, count := range counter {
require.Equal(t, neighbourLimit, count, "one node didn't receive exactly NEIGHBOUR_LIMIT checks")
}

})
}
13 changes: 7 additions & 6 deletions internal/servicecheck/types.go
Expand Up @@ -29,6 +29,7 @@ type Checker struct {
// Neighbourhood
KubenurseNamespace string
NeighbourFilter string
NeighbourLimit int
allowUnschedulable bool
SkipCheckNeighbourhood bool

Expand Down Expand Up @@ -57,12 +58,12 @@ type Checker struct {

// Result contains the result of a performed check run
type Result struct {
APIServerDirect string `json:"api_server_direct"`
APIServerDNS string `json:"api_server_dns"`
MeIngress string `json:"me_ingress"`
MeService string `json:"me_service"`
NeighbourhoodState string `json:"neighbourhood_state"`
Neighbourhood []Neighbour `json:"neighbourhood"`
APIServerDirect string `json:"api_server_direct"`
APIServerDNS string `json:"api_server_dns"`
MeIngress string `json:"me_ingress"`
MeService string `json:"me_service"`
NeighbourhoodState string `json:"neighbourhood_state"`
Neighbourhood []*Neighbour `json:"neighbourhood"`
}

// Check is the signature used by all checks that the checker can execute.
Expand Down

0 comments on commit a0b49bb

Please sign in to comment.