KCCM: add test validating slow node sync issue
alexanderConstantinescu committed Nov 3, 2023
1 parent e8d4559 commit a8673fa
Showing 2 changed files with 203 additions and 36 deletions.
staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
211 changes: 178 additions & 33 deletions
@@ -23,6 +23,7 @@ import (
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"

@@ -146,13 +147,16 @@ func defaultExternalService() *v1.Service {
return newService("external-balancer", v1.ServiceTypeLoadBalancer)
}

func alwaysReady() bool { return true }

func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
// newController creates a new service controller. Callers have the option to
// specify `stopCh` for test cases which might require running the
// node/service informers and reacting to resource events. Callers can also
// specify `objects` which represent the initial state of objects, used to
// populate the client set / informer cache at start-up.
func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controller, *fakecloud.Cloud, *fake.Clientset) {
cloud := &fakecloud.Cloud{}
cloud.Region = region

kubeClient := fake.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset(objects...)
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
serviceInformer := informerFactory.Core().V1().Services()
nodeInformer := informerFactory.Core().V1().Nodes()
@@ -162,26 +166,36 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})

controller := &Controller{
cloud: cloud,
kubeClient: kubeClient,
clusterName: "test-cluster",
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
eventBroadcaster: broadcaster,
eventRecorder: recorder,
nodeLister: newFakeNodeLister(nil),
nodeListerSynced: nodeInformer.Informer().HasSynced,
serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
lastSyncedNodes: []*v1.Node{},
cloud: cloud,
kubeClient: kubeClient,
clusterName: "test-cluster",
eventBroadcaster: broadcaster,
eventRecorder: recorder,
serviceLister: serviceInformer.Lister(),
serviceListerSynced: serviceInformer.Informer().HasSynced,
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
lastSyncedNodes: []*v1.Node{},
}

informerFactory.Start(stopCh)
informerFactory.WaitForCacheSync(stopCh)

serviceMap := make(map[string]*cachedService)
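// Seed the controller's service cache from any services already present in the informer.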
services, _ := serviceInformer.Lister().List(labels.Everything())
for _, service := range services {
serviceMap[service.Name] = &cachedService{
state: service,
}
}

controller.cache = &serviceCache{serviceMap: serviceMap}

balancer, _ := cloud.LoadBalancer()
controller.balancer = balancer

controller.serviceLister = serviceInformer.Lister()

controller.nodeListerSynced = alwaysReady
controller.serviceListerSynced = alwaysReady
controller.eventRecorder = record.NewFakeRecorder(100)

cloud.Calls = nil // ignore any cloud calls made in init()
@@ -265,7 +279,7 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
controller, cloud, client := newController()
controller, cloud, client := newController(nil)
cloud.Exists = tc.lbExists
key := fmt.Sprintf("%s/%s", tc.service.Namespace, tc.service.Name)
if _, err := client.CoreV1().Services(tc.service.Namespace).Create(ctx, tc.service, metav1.CreateOptions{}); err != nil {
@@ -439,7 +453,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
t.Run(item.desc, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
controller, cloud, _ := newController()
controller, cloud, _ := newController(nil)
controller.nodeLister = newFakeNodeLister(nil, nodes...)
if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 {
t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry)
@@ -590,7 +604,8 @@ func TestNodeChangesForExternalTrafficPolicyLocalServices(t *testing.T) {
expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
}} {
t.Run(tc.desc, func(t *testing.T) {
controller, cloud, _ := newController()
controller, cloud, _ := newController(nil)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

@@ -762,7 +777,8 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) {
expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
}} {
t.Run(tc.desc, func(t *testing.T) {
controller, cloud, _ := newController()
controller, cloud, _ := newController(nil)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

@@ -806,7 +822,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
serviceNames.Insert(fmt.Sprintf("%s/%s", svc.GetObjectMeta().GetNamespace(), svc.GetObjectMeta().GetName()))
}

controller, cloud, _ := newController()
controller, cloud, _ := newController(nil)
for _, tc := range []struct {
desc string
nodes []*v1.Node
@@ -901,8 +917,28 @@ func compareUpdateCalls(t *testing.T, left, right []fakecloud.UpdateBalancerCall
}
}

// compareHostSets returns true if the nodes in left are all present in right, regardless of order.
func compareHostSets(t *testing.T, left, right []*v1.Node) bool {
if len(left) != len(right) {
return false
}
for _, lHost := range left {
found := false
for _, rHost := range right {
if reflect.DeepEqual(lHost, rHost) {
found = true
break
}
}
if !found {
return false
}
}
return true
}

func TestNodesNotEqual(t *testing.T) {
controller, cloud, _ := newController()
controller, cloud, _ := newController(nil)

services := []*v1.Service{
newService("s0", v1.ServiceTypeLoadBalancer),
@@ -952,7 +988,9 @@ func TestNodesNotEqual(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
controller.nodeLister = newFakeNodeLister(nil, tc.newNodes...)

controller.lastSyncedNodes = tc.lastSyncNodes

controller.updateLoadBalancerHosts(ctx, services, 5)
compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls)
cloud.UpdateCalls = []fakecloud.UpdateBalancerCall{}
@@ -961,7 +999,7 @@
}

func TestProcessServiceCreateOrUpdate(t *testing.T) {
controller, _, client := newController()
controller, _, client := newController(nil)

// A pair of old and new load balancer IP addresses
oldLBIP := "192.168.1.1"
@@ -1076,7 +1114,7 @@ func TestProcessServiceCreateOrUpdateK8sError(t *testing.T) {
svc := newService(svcName, v1.ServiceTypeLoadBalancer)
// Preset finalizer so k8s error only happens when patching status.
svc.Finalizers = []string{servicehelper.LoadBalancerCleanupFinalizer}
controller, _, client := newController()
controller, _, client := newController(nil)
client.PrependReactor("patch", "services", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, tc.k8sErr
})
@@ -1120,7 +1158,7 @@ func TestSyncService(t *testing.T) {
testName: "if an invalid service name is synced",
key: "invalid/key/string",
updateFn: func() {
controller, _, _ = newController()
controller, _, _ = newController(nil)
},
expectedFn: func(e error) error {
//TODO: should find a way to test for dependent package errors in such a way that it won't break
@@ -1152,7 +1190,7 @@
key: "external-balancer",
updateFn: func() {
testSvc := defaultExternalService()
controller, _, _ = newController()
controller, _, _ = newController(nil)
controller.enqueueService(testSvc)
svc := controller.cache.getOrCreate("external-balancer")
svc.state = testSvc
@@ -1258,7 +1296,7 @@ func TestProcessServiceDeletion(t *testing.T) {
defer cancel()

//Create a new controller.
controller, cloud, _ = newController()
controller, cloud, _ = newController(nil)
tc.updateFn(controller)
obtainedErr := controller.processServiceDeletion(ctx, svcKey)
if err := tc.expectedFn(obtainedErr); err != nil {
@@ -1333,8 +1371,115 @@ func TestNeedsCleanup(t *testing.T) {

}

func TestNeedsUpdate(t *testing.T) {
// This tests a service update while a slow node sync is happening. If we have multiple
// services to process from a node sync, each service will experience a sync delta.
// If a new Node is added and a service is synced while this happens, we want to
// make sure that the slow node sync never removes the Node from the LB set because it
// has stale data.
func TestSlowNodeSync(t *testing.T) {
stopCh, updateCallCh := make(chan struct{}), make(chan fakecloud.UpdateBalancerCall)
defer close(stopCh)
defer close(updateCallCh)

duration := time.Millisecond

syncService := make(chan string)
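// syncService hands the key of the service impacted by the slow node sync from the
// validation goroutine back to the main test flow.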

node1 := makeNode(tweakName("node1"))
node2 := makeNode(tweakName("node2"))
node3 := makeNode(tweakName("node3"))
service1 := newService("service1", v1.ServiceTypeLoadBalancer)
service2 := newService("service2", v1.ServiceTypeLoadBalancer)

sKey1, _ := cache.MetaNamespaceKeyFunc(service1)
sKey2, _ := cache.MetaNamespaceKeyFunc(service2)
serviceKeys := sets.New(sKey1, sKey2)

controller, cloudProvider, kubeClient := newController(stopCh, node1, node2, service1, service2)
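// Slow down every cloud provider call and surface each ensure/update call on updateCallCh.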
cloudProvider.RequestDelay = 4 * duration
cloudProvider.UpdateCallCb = func(update fakecloud.UpdateBalancerCall) {
updateCallCh <- update
}
cloudProvider.EnsureCallCb = func(update fakecloud.UpdateBalancerCall) {
updateCallCh <- update
}

// Three update calls are expected. This is because this test calls
// controller.syncNodes once with two existing services, so we will have an
// update call for each service, and controller.syncService once. The end
// result is therefore three update calls. Each update call takes
// cloudProvider.RequestDelay to process. The test asserts that the order of
// the Hosts defined by the update calls is respected, but doesn't
// necessarily assert the order of the Services. This is because the
// controller implementation doesn't use a deterministic order when syncing
// services. The test therefore works out which service is impacted by the
// slow node sync (which will be whatever service is not synced first) and
// then validates that the Hosts for each update call are respected.
expectedUpdateCalls := []fakecloud.UpdateBalancerCall{
// First update call for first service from controller.syncNodes
{Service: service1, Hosts: []*v1.Node{node1, node2}},
// Second update call for impacted service from controller.syncService
{Service: service2, Hosts: []*v1.Node{node1, node2, node3}},
// Third update call for second service from controller.syncNodes. Here
// is the problem: this update call removes the previously added node3.
{Service: service2, Hosts: []*v1.Node{node1, node2}},
}

wg := sync.WaitGroup{}
wg.Add(1)
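// Kick off the node sync in the background; each of its per-service update calls
// takes cloudProvider.RequestDelay to complete.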
go func() {
defer wg.Done()
controller.syncNodes(context.TODO(), 1)
}()

wg.Add(1)
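// Consume the update calls from the fake cloud provider and validate the hosts
// and ordering of each call.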
go func() {
defer wg.Done()
updateCallIdx := 0
impactedService := ""
for update := range updateCallCh {
// Validate that the call hosts are what we expect
if !compareHostSets(t, expectedUpdateCalls[updateCallIdx].Hosts, update.Hosts) {
t.Errorf("unexpected updated hosts for update: %v, expected: %v, got: %v", updateCallIdx, expectedUpdateCalls[updateCallIdx].Hosts, update.Hosts)
return
}
key, _ := cache.MetaNamespaceKeyFunc(update.Service)
// For call 0: determine impacted service
if updateCallIdx == 0 {
impactedService = serviceKeys.Difference(sets.New(key)).UnsortedList()[0]
syncService <- impactedService
}
// For calls > 0: validate the impacted service
if updateCallIdx > 0 {
if key != impactedService {
t.Error("unexpected impacted service")
return
}
}
if updateCallIdx == len(expectedUpdateCalls)-1 {
return
}
updateCallIdx++
}
}()

key := <-syncService
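// The validation goroutine has identified the impacted service; add node3 so the
// upcoming service sync sees three nodes.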
if _, err := kubeClient.CoreV1().Nodes().Create(context.TODO(), node3, metav1.CreateOptions{}); err != nil {
t.Fatalf("error creating node3, err: %v", err)
}

// Give the informer cache some time to update; this needs to be shorter than
// cloudProvider.RequestDelay.
time.Sleep(duration)
// Sync the service
if err := controller.syncService(context.TODO(), key); err != nil {
t.Errorf("unexpected service sync error, err: %v", err)
}

wg.Wait()
}

func TestNeedsUpdate(t *testing.T) {
testCases := []struct {
testName string //Name of the test case
updateFn func() (*v1.Service, *v1.Service) //Function to update the service object
@@ -1494,7 +1639,7 @@ func TestNeedsUpdate(t *testing.T) {
expectedNeedsUpdate: true,
}}

controller, _, _ := newController()
controller, _, _ := newController(nil)
for _, tc := range testCases {
oldSvc, newSvc := tc.updateFn()
obtainedResult := controller.needsUpdate(oldSvc, newSvc)
@@ -2441,7 +2586,7 @@ func TestServiceQueueDelay(t *testing.T) {

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
controller, cloud, client := newController()
controller, cloud, client := newController(nil)
queue := &spyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")}
controller.serviceQueue = queue
cloud.Err = tc.lbCloudErr