Fixes service controller update race condition #55336
@@ -89,7 +89,6 @@ type serviceCache struct {
 type ServiceController struct {
     cloud            cloudprovider.Interface
     knownHosts       []*v1.Node
-    servicesToUpdate []*v1.Service
     kubeClient       clientset.Interface
     clusterName      string
     balancer         cloudprovider.LoadBalancer
@@ -241,6 +240,20 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
             }
         }
     }
+
+    if cachedService.state != nil {
+        if !s.needsUpdate(cachedService.state, service) {
+            // The service does not require an update which means it was placed on the work queue
+            // by the node sync loop and indicates that the hosts need to be updated.
+            err := s.updateLoadBalancerHosts(service)
+            if err != nil {
+                return err, cachedService.nextRetryDelay()
+            }
+            cachedService.resetRetryDelay()
+            return nil, doNotRetry
+        }
+    }
+
     // cache the service, we need the info for service deletion
     cachedService.state = service
     err, retry := s.createLoadBalancerIfNeeded(key, service)
@@ -435,6 +448,8 @@ func (s *serviceCache) delete(serviceName string) {
     delete(s.serviceMap, serviceName)
 }
 
+// needsUpdate checks to see if there were any changes between the old and new service that would require a load balancer update.
+// This method does not and should not check if the hosts have changed.
 func (s *ServiceController) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
     if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
         return false
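To make the intent of that new doc comment concrete, here is a minimal illustrative sketch of the kind of comparison `needsUpdate` performs. The field list below is an assumption chosen for illustration, not the actual implementation, which inspects more of the service spec:

```go
// Illustrative sketch only: a needsUpdate-style comparison that looks at
// load-balancer-relevant service fields and deliberately ignores host changes.
package service

import (
	"reflect"

	"k8s.io/api/core/v1"
)

func needsUpdateSketch(oldService, newService *v1.Service) bool {
	// A change to or from type LoadBalancer always requires an update.
	if oldService.Spec.Type != newService.Spec.Type {
		return true
	}
	// Port and source-range changes must be pushed to the cloud load balancer.
	if !reflect.DeepEqual(oldService.Spec.Ports, newService.Spec.Ports) {
		return true
	}
	if !reflect.DeepEqual(oldService.Spec.LoadBalancerSourceRanges, newService.Spec.LoadBalancerSourceRanges) {
		return true
	}
	// Node (host) changes are intentionally not considered here; the node sync
	// loop re-queues services so hosts are updated separately.
	return false
}
```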
@@ -637,62 +652,45 @@ func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
     }
 }
 
-// nodeSyncLoop handles updating the hosts pointed to by all load
-// balancers whenever the set of nodes in the cluster changes.
+// nodeSyncLoop handles adding all existing cached services to the work queue
+// to be reprocessed so that they can have their hosts updated, if any
+// host changes have occurred since the last sync loop.
 func (s *ServiceController) nodeSyncLoop() {
     newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
     if err != nil {
         glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
         return
     }
 
     if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
-        // The set of nodes in the cluster hasn't changed, but we can retry
-        // updating any services that we failed to update last time around.
-        s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
+        // Nothing to do since the hosts have not changed.
         return
     }
 
-    glog.Infof("Detected change in list of current cluster nodes. New node set: %v",
-        nodeNames(newHosts))
+    glog.Infof("Detected change in list of current cluster nodes. New node set: %v", nodeNames(newHosts))
 
-    // Try updating all services, and save the ones that fail to try again next
-    // round.
-    s.servicesToUpdate = s.cache.allServices()
-    numServices := len(s.servicesToUpdate)
-    s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
-    glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
-        numServices-len(s.servicesToUpdate), numServices)
+    for _, svc := range s.cache.allServices() {
+        s.enqueueService(svc)
+    }
 
     // Update the known hosts so we can check next sync loop for changes.
     s.knownHosts = newHosts
 }
 
-// updateLoadBalancerHosts updates all existing load balancers so that
-// they will match the list of hosts provided.
-// Returns the list of services that couldn't be updated.
-func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
-    for _, service := range services {
-        func() {
-            if service == nil {
-                return
-            }
-            if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
-                glog.Errorf("External error while updating load balancer: %v.", err)
-                servicesToRetry = append(servicesToRetry, service)
-            }
-        }()
-    }
-    return servicesToRetry
-}
-
-// Updates the load balancer of a service, assuming we hold the mutex
-// associated with the service.
-func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
+// Updates the load balancer of the service with updated nodes ONLY.
+// This method will not trigger the cloud provider to create or full update a load balancer.
+func (s *ServiceController) updateLoadBalancerHosts(service *v1.Service) error {
     if !wantsLoadBalancer(service) {
         return nil
     }
 
+    hosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
+    if err != nil {
+        return err
+    }
+
     // This operation doesn't normally take very long (and happens pretty often), so we only record the final event
-    err := s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
+    err = s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
     if err == nil {
         // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
         if len(hosts) == 0 {
@@ -19,6 +19,7 @@ package service
 import (
     "fmt"
     "reflect"
+    "sort"
     "testing"
     "time"
@@ -27,6 +28,8 @@ import (
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes/fake"
+    corelisters "k8s.io/client-go/listers/core/v1"
+    "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"
     "k8s.io/kubernetes/pkg/api/testapi"
     fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
@@ -174,31 +177,53 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
     }
 }
 
+// newLoadBalancerNode returns a node that passes the predicate check for a
+// node to receive load balancer traffic.
+func newLoadBalancerNode(name string) *v1.Node {
+    return &v1.Node{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: name,
+        },
+        Spec: v1.NodeSpec{
+            Unschedulable: false,
+        },
+        Status: v1.NodeStatus{
+            Conditions: []v1.NodeCondition{
+                {Type: v1.NodeReady, Status: v1.ConditionTrue},
+            },
+        },
+    }
+}
+
+func sortNodesByName(nodes []*v1.Node) {
+    sort.Slice(nodes, func(i, j int) bool {
+        return nodes[i].Name < nodes[j].Name
+    })
+}
+
 // TODO: Finish converting and update comments
 func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
     nodes := []*v1.Node{
-        {ObjectMeta: metav1.ObjectMeta{Name: "node0"}},
-        {ObjectMeta: metav1.ObjectMeta{Name: "node1"}},
-        {ObjectMeta: metav1.ObjectMeta{Name: "node73"}},
+        newLoadBalancerNode("node0"),
+        newLoadBalancerNode("node1"),
+        newLoadBalancerNode("node73"),
     }
-    table := []struct {
+    sortNodesByName(nodes)
+
+    table := map[string]struct {
         services            []*v1.Service
         expectedUpdateCalls []fakecloud.FakeUpdateBalancerCall
     }{
-        {
-            // No services present: no calls should be made.
-            services:            []*v1.Service{},
-            expectedUpdateCalls: nil,
-        },
-        {
+        "update no load balancer": {
             // Services do not have external load balancers: no calls should be made.
             services: []*v1.Service{
                 newService("s0", "111", v1.ServiceTypeClusterIP),
                 newService("s1", "222", v1.ServiceTypeNodePort),
             },
             expectedUpdateCalls: nil,
         },
-        {
+        "update 1 load balancer": {
             // Services does have an external load balancer: one call should be made.
             services: []*v1.Service{
                 newService("s0", "333", v1.ServiceTypeLoadBalancer),
@@ -207,7 +232,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
                 {Service: newService("s0", "333", v1.ServiceTypeLoadBalancer), Hosts: nodes},
             },
         },
-        {
+        "update 3 load balancers": {
             // Three services have an external load balancer: three calls.
             services: []*v1.Service{
                 newService("s0", "444", v1.ServiceTypeLoadBalancer),
@@ -220,7 +245,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
                 {Service: newService("s2", "666", v1.ServiceTypeLoadBalancer), Hosts: nodes},
             },
         },
-        {
+        "update 2 load balancers": {
             // Two services have an external load balancer and two don't: two calls.
             services: []*v1.Service{
                 newService("s0", "777", v1.ServiceTypeNodePort),
@@ -233,30 +258,44 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
                 {Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: nodes},
             },
         },
-        {
-            // One service has an external load balancer and one is nil: one call.
-            services: []*v1.Service{
-                newService("s0", "234", v1.ServiceTypeLoadBalancer),
-                nil,
-            },
-            expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
-                {Service: newService("s0", "234", v1.ServiceTypeLoadBalancer), Hosts: nodes},
-            },
-        },
     }
-    for _, item := range table {
-        controller, cloud, _ := newController()
-
-        var services []*v1.Service
-        for _, service := range item.services {
-            services = append(services, service)
-        }
-        if err := controller.updateLoadBalancerHosts(services, nodes); err != nil {
-            t.Errorf("unexpected error: %v", err)
-        }
-        if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) {
-            t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, cloud.UpdateCalls)
-        }
+    for name, item := range table {
+        t.Run(name, func(t *testing.T) {
+            controller, cloud, _ := newController()
+
+            var services []*v1.Service
+            for _, service := range item.services {
+                services = append(services, service)
+            }
+            nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
+            for _, node := range nodes {
+                nodeIndexer.Add(node)
+            }
+            controller.nodeLister = corelisters.NewNodeLister(nodeIndexer)
+
+            for _, service := range services {
+                if err := controller.updateLoadBalancerHosts(service); err != nil {
+                    t.Errorf("unexpected error: %v", err)
+                }
+            }
+
+            if len(item.expectedUpdateCalls) != len(cloud.UpdateCalls) {
+                t.Errorf("expected %d update calls but only got %d", len(item.expectedUpdateCalls), len(cloud.UpdateCalls))
+            }
+
+            for i, expectedCall := range item.expectedUpdateCalls {
+                actualCall := cloud.UpdateCalls[i]
+                if !reflect.DeepEqual(expectedCall.Service, actualCall.Service) {
+                    t.Errorf("expected update call to contain service %+v, got %+v", expectedCall.Service, actualCall.Service)
+                }
+
+                sortNodesByName(actualCall.Hosts)
+                if !reflect.DeepEqual(expectedCall.Hosts, actualCall.Hosts) {
+                    t.Errorf("expected update call to contain hosts %+v, got %+v", expectedCall.Hosts, actualCall.Hosts)
+                }
+            }
+        })
     }
 }

Inline review thread on the removed nil-service case:

Why was this case removed?

It's not possible for a service to be nil when
@@ -311,6 +350,13 @@ func TestProcessServiceUpdate(t *testing.T) {
     var controller *ServiceController
     var cloud *fakecloud.FakeCloud
 
+    nodes := []*v1.Node{
+        newLoadBalancerNode("node0"),
+        newLoadBalancerNode("node1"),
+        newLoadBalancerNode("node73"),
+    }
+    sortNodesByName(nodes)
+
     //A pair of old and new loadbalancer IP address
     oldLBIP := "192.168.1.1"
     newLBIP := "192.168.1.11"
@@ -344,6 +390,51 @@ func TestProcessServiceUpdate(t *testing.T) {
                 return nil
             },
         },
+        {
+            testName: "If updating hosts only",
+            key:      "default/sync-test-name",
+            svc:      newService("sync-test-name", types.UID("sync-test-uid"), v1.ServiceTypeLoadBalancer),
+            updateFn: func(svc *v1.Service) *v1.Service {
+                keyExpected := svc.GetObjectMeta().GetNamespace() + "/" + svc.GetObjectMeta().GetName()
+                cachedServiceTest := controller.cache.getOrCreate(keyExpected)
+                cachedServiceTest.state = svc
+                controller.cache.set(keyExpected, cachedServiceTest)
+
+                // Set the nodes for the cloud's UpdateLoadBalancer call to use.
+                nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
+                for _, node := range nodes {
+                    nodeIndexer.Add(node)
+                }
+                controller.nodeLister = corelisters.NewNodeLister(nodeIndexer)
+
+                // This should trigger the needsUpdate false check since the service equals the cached service
+                return svc
+            },
+            expectedFn: func(svc *v1.Service, err error, retryDuration time.Duration) error {
+                if err != nil {
+                    return err
+                }
+                if retryDuration != doNotRetry {
+                    return fmt.Errorf("retryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration)
+                }
+
+                if len(cloud.UpdateCalls) != 1 {
+                    return fmt.Errorf("expected one update host call but only got %+v", cloud.UpdateCalls)
+                }
+
+                actualCall := cloud.UpdateCalls[0]
+                if !reflect.DeepEqual(svc, actualCall.Service) {
+                    return fmt.Errorf("expected update call to contain service %+v, got %+v", svc, actualCall.Service)
+                }
+
+                sortNodesByName(actualCall.Hosts)
+                if !reflect.DeepEqual(nodes, actualCall.Hosts) {
+                    return fmt.Errorf("expected update call to contain hosts %+v, got %+v", nodes, actualCall.Hosts)
+                }
+
+                return nil
+            },
+        },
         {
             testName: "If Updating Loadbalancer IP",
             key:      "default/sync-test-name",
Inferring source from state seems a little odd… does doing host updates when needsUpdate returns false mean this controller will do work on every service even when it should be in steady state?
Services are only added to the queue under three conditions, one of which is `needsUpdate`, which is checked in the informer OnUpdate. The `UpdateLoadBalancer` method is supposed to be cheap for cloud providers; it was added so that cloud providers could have a method that handles only updating load balancer hosts. This approach should not (and doesn't, from what I can tell in my tests) add any extra calls compared to before.
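For reference, the enqueue-on-update path described here is roughly the following (a paraphrased sketch of the event handler registration inside the controller constructor, not an exact copy of the code):

```go
// Paraphrased sketch: the service informer only enqueues a service on update
// when needsUpdate reports a load-balancer-relevant change.
serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
	cache.ResourceEventHandlerFuncs{
		AddFunc: s.enqueueService,
		UpdateFunc: func(old, cur interface{}) {
			oldSvc, ok1 := old.(*v1.Service)
			curSvc, ok2 := cur.(*v1.Service)
			if ok1 && ok2 && s.needsUpdate(oldSvc, curSvc) {
				s.enqueueService(cur)
			}
		},
		DeleteFunc: s.enqueueService,
	},
	serviceSyncPeriod,
)
```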
From #56443: I think the logic here is too hacky and may not cover all corner cases. What we really want here is to distinguish a "service update" from a "nodeSync update". The condition `!s.needsUpdate(cachedService.state, service)` is too broad in that it also includes the retry case of a "service update" (given how we cache the service).

Besides, putting a "nodeSync update" into the same work queue as a "service update" might introduce another problem: one update could override the other. Ref #52495 (comment): the work queue does not keep duplicate keys, so if both a "nodeSync update" and a "service update" come in before either leaves the queue, they end up as only one update (which one depends on how we decide what the update is). It seems to me that the work queue mechanism also needs to be adjusted before we can put in the "nodeSync update".
cc @bowei
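The key-deduplication behavior referred to here can be demonstrated in isolation with client-go's workqueue (a standalone sketch, not controller code):

```go
// Standalone sketch: a client-go workqueue collapses identical keys, so a
// "service update" and a "nodeSync update" for the same service key become
// one item if both arrive before a worker dequeues that key.
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	queue := workqueue.New()

	queue.Add("default/my-service") // e.g. enqueued by the informer update handler
	queue.Add("default/my-service") // e.g. enqueued by nodeSyncLoop

	fmt.Println(queue.Len()) // prints 1: the second Add was deduplicated
}
```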
One update won't override the other because the sync checks whether the service needs an update. Both cloud provider calls (`EnsureLoadBalancer` and `UpdateLoadBalancer`) will update the hosts, so if both happen it will go with `EnsureLoadBalancer`, which is what we want.
As I mentioned in the other PR #56448 (comment), the finalizer support will clean this all up and I think we should revert this until the finalizer PR cleans up all the cache/delete logic.
I think this assumption is inaccurate. Ref the LoadBalancer interface: it doesn't explicitly define that `EnsureLoadBalancer()` should update the hosts (kubernetes/pkg/cloudprovider/cloud.go, lines 100 to 109 at e5aec86). And in fact, `EnsureLoadBalancer()` in the GCE cloud provider doesn't update hosts. Hence for GCE this is the case where a "service update" overrides a "nodeSync update".
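For reference, the interface being discussed (pkg/cloudprovider/cloud.go at the time) is roughly the following paraphrase; exact method comments and signatures may differ slightly between releases:

```go
// Paraphrased sketch of the cloudprovider.LoadBalancer interface referenced above.
type LoadBalancer interface {
	// GetLoadBalancer returns whether the specified load balancer exists and, if so, its status.
	GetLoadBalancer(clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error)
	// EnsureLoadBalancer creates a new load balancer or updates the existing one.
	// Whether it must also reconcile the node list is the point of contention above.
	EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error)
	// UpdateLoadBalancer updates the hosts under the specified load balancer.
	UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error
	// EnsureLoadBalancerDeleted deletes the specified load balancer if it exists.
	EnsureLoadBalancerDeleted(clusterName string, service *v1.Service) error
}
```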
Should include the backends (nodes). Why would it partially update the load balancer?
@MrHohn from what I see, GCE does make sure that backends are up to date on `EnsureLoadBalancer` calls via this method, which is called by `ensureInternalLoadBalancer`.
Thanks for gathering the links. I didn't look into the internal one before; it seems like it does check for a hosts update. Though ATM the external one doesn't check for a hosts update (kubernetes/pkg/cloudprovider/providers/gce/gce_loadbalancer_external.go, lines 675 to 676 at e57accb).
Ah, missed that comment. The external one is definitely more complex than the internal one 👼

It sounds like we should definitely revert this then, and we need to come to an agreement on what `EnsureLoadBalancer` and `UpdateLoadBalancer` should do for each cloud provider. It was my understanding that `EnsureLoadBalancer` should completely update the load balancer, which is how AWS, Azure, Oracle & DigitalOcean handle it (the only ones I checked). cc @wlan0 @luxas

I'll open a PR to revert this for 1.9 @MrHohn