diff --git a/pkg/controller/service/BUILD b/pkg/controller/service/BUILD index 15b4804a20531..9c6daacb8076f 100644 --- a/pkg/controller/service/BUILD +++ b/pkg/controller/service/BUILD @@ -46,10 +46,13 @@ go_test( "//pkg/cloudprovider/providers/fake:go_default_library", "//pkg/controller:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/pkg/controller/service/service_controller.go b/pkg/controller/service/service_controller.go index 21d56029e9282..7681958e2326f 100644 --- a/pkg/controller/service/service_controller.go +++ b/pkg/controller/service/service_controller.go @@ -319,6 +319,10 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S service.Status.LoadBalancer = *newState if err := s.persistUpdate(service); err != nil { + // TODO: This logic needs to be revisited. We might want to retry on all the errors, not just conflicts. + if errors.IsConflict(err) { + return fmt.Errorf("not persisting update to service '%s/%s' that has been changed since we received it: %v", service.Namespace, service.Name, err) + } runtime.HandleError(fmt.Errorf("failed to persist service %q updated status to apiserver, even after retries. Giving up: %v", key, err)) return nil } @@ -347,8 +351,7 @@ func (s *ServiceController) persistUpdate(service *v1.Service) error { // TODO: Try to resolve the conflict if the change was unrelated to load // balancer status. For now, just pass it up the stack. if errors.IsConflict(err) { - return fmt.Errorf("not persisting update to service '%s/%s' that has been changed since we received it: %v", - service.Namespace, service.Name, err) + return err } glog.Warningf("Failed to persist updated LoadBalancerStatus to service '%s/%s' after creating its load balancer: %v", service.Namespace, service.Name, err) diff --git a/pkg/controller/service/service_controller_test.go b/pkg/controller/service/service_controller_test.go index 962fe62b1b223..bbc4d24aae8c2 100644 --- a/pkg/controller/service/service_controller_test.go +++ b/pkg/controller/service/service_controller_test.go @@ -17,15 +17,20 @@ limitations under the License. package service import ( + "errors" "fmt" "reflect" + "strings" "testing" "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/testapi" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" @@ -323,7 +328,6 @@ func TestGetNodeConditionPredicate(t *testing.T) { // TODO(a-robinson): Add tests for update/sync/delete. func TestProcessServiceUpdate(t *testing.T) { - var controller *ServiceController //A pair of old and new loadbalancer IP address @@ -411,6 +415,38 @@ func TestProcessServiceUpdate(t *testing.T) { } +// TestConflictWhenProcessServiceUpdate tests if processServiceUpdate will +// retry creating the load balancer if the update operation returns a conflict +// error. +func TestConflictWhenProcessServiceUpdate(t *testing.T) { + svcName := "conflict-lb" + svc := newService(svcName, types.UID("123"), v1.ServiceTypeLoadBalancer) + controller, _, client := newController() + client.PrependReactor("update", "services", func(action core.Action) (bool, runtime.Object, error) { + update := action.(core.UpdateAction) + return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), svcName, errors.New("Object changed")) + }) + + svcCache := controller.cache.getOrCreate(svcName) + if err := controller.processServiceUpdate(svcCache, svc, svcName); err == nil { + t.Fatalf("controller.processServiceUpdate() = nil, want error") + } + + retryMsg := "Error creating load balancer (will retry)" + if gotEvent := func() bool { + events := controller.eventRecorder.(*record.FakeRecorder).Events + for len(events) > 0 { + e := <-events + if strings.Contains(e, retryMsg) { + return true + } + } + return false + }(); !gotEvent { + t.Errorf("controller.processServiceUpdate() = can't find retry creating lb event, want event contains %q", retryMsg) + } +} + func TestSyncService(t *testing.T) { var controller *ServiceController