servicecontroller fixes: exponential backoff #21982

Merged 1 commit on Mar 5, 2016
64 changes: 48 additions & 16 deletions pkg/controller/service/servicecontroller.go
@@ -44,13 +44,16 @@ const (
// How long to wait before retrying the processing of a service change.
// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
// should be changed appropriately.
- processingRetryInterval = 5 * time.Second
+ minRetryDelay = 5 * time.Second
+ maxRetryDelay = 300 * time.Second

clientRetryCount = 5
clientRetryInterval = 5 * time.Second

retryable = true
notRetryable = false

+ doNotRetry = time.Duration(0)
)

type cachedService struct {
@@ -61,6 +64,9 @@ type cachedService struct {

// Ensures only one goroutine can operate on this service at any given time.
mu sync.Mutex

+ // Controls error back-off
+ lastRetryDelay time.Duration
}
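
Taken together, the minRetryDelay/maxRetryDelay constants above and the new lastRetryDelay field give each cached service its own back-off state. As a rough sketch of the rule they encode (nextDelay is a hypothetical helper written only for this illustration, not code from the PR), the delay doubles on every failure and is clamped to the 5s-300s window:

```go
package main

import (
	"fmt"
	"time"
)

const (
	minRetryDelay = 5 * time.Second
	maxRetryDelay = 300 * time.Second
)

// nextDelay doubles the previous delay and clamps it to [minRetryDelay, maxRetryDelay],
// mirroring the rule cachedService.nextRetryDelay applies further down in this diff.
func nextDelay(last time.Duration) time.Duration {
	next := last * 2
	if next < minRetryDelay {
		next = minRetryDelay
	}
	if next > maxRetryDelay {
		next = maxRetryDelay
	}
	return next
}

func main() {
	d := time.Duration(0)
	for i := 0; i < 8; i++ {
		d = nextDelay(d)
		fmt.Println(d) // 5s, 10s, 20s, 40s, 1m20s, 2m40s, 5m0s, 5m0s
	}
}
```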

type serviceCache struct {
@@ -184,21 +190,26 @@ func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) {
glog.Errorf("Received nil delta from watcher queue.")
continue
}
- err, shouldRetry := s.processDelta(delta)
- if shouldRetry {
+ err, retryDelay := s.processDelta(delta)
+ if retryDelay != 0 {
// Add the failed service back to the queue so we'll retry it.
glog.Errorf("Failed to process service delta. Retrying: %v", err)
time.Sleep(processingRetryInterval)
serviceQueue.AddIfNotPresent(deltas)
glog.Errorf("Failed to process service delta. Retrying in %s: %v", retryDelay, err)
go func(deltas cache.Deltas, delay time.Duration) {
time.Sleep(delay)
if err := serviceQueue.AddIfNotPresent(deltas); err != nil {
glog.Errorf("Error requeuing service delta - will not retry: %v", err)
}
}(deltas, retryDelay)
} else if err != nil {
runtime.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err))
}
}
}
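
Compared with the old code, which slept for a fixed processingRetryInterval inline and therefore stalled the whole watch loop, the retry now happens from a goroutine, so other deltas keep flowing while a failed service waits out its delay. A minimal sketch of that delayed-requeue pattern, with a hypothetical requeuer interface and fakeQueue standing in for cache.DeltaFIFO:

```go
package main

import (
	"fmt"
	"log"
	"time"
)

// requeuer is a hypothetical stand-in for the piece of cache.DeltaFIFO used here.
type requeuer interface {
	AddIfNotPresent(obj interface{}) error
}

// requeueAfter re-adds obj to q after delay without blocking the caller,
// mirroring the goroutine-based retry in watchServices above.
func requeueAfter(q requeuer, obj interface{}, delay time.Duration) {
	go func() {
		time.Sleep(delay)
		if err := q.AddIfNotPresent(obj); err != nil {
			// As in the controller, a failed requeue is logged and dropped.
			log.Printf("error requeuing %v - will not retry: %v", obj, err)
		}
	}()
}

// fakeQueue records whatever gets re-added.
type fakeQueue struct{ added chan interface{} }

func (f *fakeQueue) AddIfNotPresent(obj interface{}) error {
	f.added <- obj
	return nil
}

func main() {
	q := &fakeQueue{added: make(chan interface{}, 1)}
	requeueAfter(q, "default/my-service", 10*time.Millisecond)
	fmt.Println("requeued:", <-q.added) // printed after ~10ms
}
```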

- // Returns an error if processing the delta failed, along with a boolean
- // indicator of whether the processing should be retried.
- func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
+ // Returns an error if processing the delta failed, along with a time.Duration
+ // indicating whether processing should be retried; zero means no-retry; otherwise
+ // we should retry in that Duration.
+ func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Duration) {
deltaService, ok := delta.Object.(*api.Service)
var namespacedName types.NamespacedName
var cachedService *cachedService
@@ -208,11 +219,11 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
// cache for deleting.
key, ok := delta.Object.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), notRetryable
return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), doNotRetry
}
cachedService, ok = s.cache.get(key.Key)
if !ok {
return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable
return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
}
deltaService = cachedService.lastState
delta.Object = deltaService
@@ -236,19 +247,21 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
service, err := s.kubeClient.Core().Services(namespacedName.Namespace).Get(namespacedName.Name)
if err != nil && !errors.IsNotFound(err) {
glog.Warningf("Failed to get most recent state of service %v from API (will retry): %v", namespacedName, err)
- return err, retryable
+ return err, cachedService.nextRetryDelay()
} else if errors.IsNotFound(err) {
glog.V(2).Infof("Service %v not found, ensuring load balancer is deleted", namespacedName)
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(deltaService), s.zone.Region)
if err != nil {
message := "Error deleting load balancer (will retry): " + err.Error()
s.eventRecorder.Event(deltaService, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
- return err, retryable
+ return err, cachedService.nextRetryDelay()
}
s.eventRecorder.Event(deltaService, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
s.cache.delete(namespacedName.String())
- return nil, notRetryable
+
+ cachedService.resetRetryDelay()
+ return nil, doNotRetry
}

// Update the cached service (used above for populating synthetic deletes)
@@ -265,7 +278,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
message += err.Error()
s.eventRecorder.Event(service, api.EventTypeWarning, "CreatingLoadBalancerFailed", message)

- return err, retry
+ return err, cachedService.nextRetryDelay()
}
// Always update the cache upon success.
// NOTE: Since we update the cached service if and only if we successfully
@@ -274,7 +287,8 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
cachedService.appliedState = service
s.cache.set(namespacedName.String(), cachedService)

- return nil, notRetryable
+ cachedService.resetRetryDelay()
+ return nil, doNotRetry
}

// Returns whatever error occurred along with a boolean indicator of whether it
@@ -738,3 +752,21 @@ func wantsLoadBalancer(service *api.Service) bool {
func loadBalancerIPsAreEqual(oldService, newService *api.Service) bool {
return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
}

+ // Computes the next retry, using exponential backoff
+ // mutex must be held.
+ func (s *cachedService) nextRetryDelay() time.Duration {
+ s.lastRetryDelay = s.lastRetryDelay * 2
+ if s.lastRetryDelay < minRetryDelay {
+ s.lastRetryDelay = minRetryDelay
+ }
+ if s.lastRetryDelay > maxRetryDelay {
+ s.lastRetryDelay = maxRetryDelay
+ }
+ return s.lastRetryDelay
+ }
+
+ // Resets the retry exponential backoff. mutex must be held.
+ func (s *cachedService) resetRetryDelay() {
+ s.lastRetryDelay = time.Duration(0)
+ }
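
To see how processDelta is expected to drive these helpers: every failed get, create, or delete returns cachedService.nextRetryDelay(), so the requeue delay grows 5s, 10s, 20s, ... up to the 300s cap, while a successful operation calls resetRetryDelay() so the next failure starts over at 5s. An illustrative test sketch (not part of this PR) that assumes it lives in package service next to this file, so it can reach the unexported type and constants:

```go
package service

import (
	"testing"
	"time"
)

// Exercises the back-off helpers added above. The real callers hold s.mu;
// a single-goroutine test does not need the lock.
func TestRetryDelayBackoffAndReset(t *testing.T) {
	s := &cachedService{}

	// Successive failures double the delay, starting at minRetryDelay and
	// capping at maxRetryDelay.
	want := []time.Duration{
		5 * time.Second, 10 * time.Second, 20 * time.Second, 40 * time.Second,
		80 * time.Second, 160 * time.Second, 300 * time.Second, 300 * time.Second,
	}
	for i, w := range want {
		if got := s.nextRetryDelay(); got != w {
			t.Errorf("call %d: got %v, want %v", i+1, got, w)
		}
	}

	// A successful operation resets the back-off to the minimum.
	s.resetRetryDelay()
	if got := s.nextRetryDelay(); got != minRetryDelay {
		t.Errorf("after reset: got %v, want %v", got, minRetryDelay)
	}
}
```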