Skip to content

Commit

Permalink
Only process delete once per object.
Browse files Browse the repository at this point in the history
Signed-off-by: Nadia Pinaeva <npinaeva@redhat.com>
(cherry picked from commit bbd773c)
  • Loading branch information
npinaeva authored and trozet committed Dec 15, 2022
1 parent a80d091 commit 5bf74e6
Showing 1 changed file with 26 additions and 10 deletions.
36 changes: 26 additions & 10 deletions go-controller/pkg/retry/obj_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ type RetryFramework struct {
// channel to indicate we need to retry objs immediately
retryChan chan struct{}

watchFactory *factory.WatchFactory
ResourceHandler *ResourceHandler
watchFactory *factory.WatchFactory
ResourceHandler *ResourceHandler
terminatedObjects sync.Map
}

// NewRetryFramework returns a new RetryFramework instance, essential for the whole retry logic.
Expand All @@ -88,10 +89,11 @@ func NewRetryFramework(
watchFactory *factory.WatchFactory,
resourceHandler *ResourceHandler) *RetryFramework {
return &RetryFramework{
retryEntries: syncmap.NewSyncMap[*retryObjEntry](),
retryChan: make(chan struct{}, 1),
watchFactory: watchFactory,
ResourceHandler: resourceHandler,
retryEntries: syncmap.NewSyncMap[*retryObjEntry](),
retryChan: make(chan struct{}, 1),
watchFactory: watchFactory,
ResourceHandler: resourceHandler,
terminatedObjects: sync.Map{},
}
}

Expand Down Expand Up @@ -396,10 +398,17 @@ var (
// free its resources. (for now, this applies to completed pods)
// processObjectInTerminalState doesn't unlock key
func (r *RetryFramework) processObjectInTerminalState(obj interface{}, lockedKey string, event resourceEvent) {
_, loaded := r.terminatedObjects.LoadOrStore(lockedKey, true)
if loaded {
// object was already terminated
klog.Infof("Detected object %s of type %s in terminal state (e.g. completed) will be " +
"ignored as it has already been processed")
return
}

// The object is in a terminal state: delete it from the cluster, delete its retry entry and return.
klog.Infof("Detected object %s of type %s in terminal state (e.g. completed)"+
" during %s event: will remove it", lockedKey, r.ResourceHandler.ObjType, event)

internalCacheEntry := r.ResourceHandler.GetInternalCacheEntry(obj)
retryEntry := r.InitRetryObjWithDelete(obj, lockedKey, internalCacheEntry, true) // set up the retry obj for deletion
if err := r.ResourceHandler.DeleteResource(obj, internalCacheEntry); err != nil {
Expand Down Expand Up @@ -642,9 +651,16 @@ func (r *RetryFramework) WatchResourceFiltered(namespaceForFilteredHandler strin
// If object is in terminal state, we would have already deleted it during update.
// No reason to attempt to delete it here again.
if r.ResourceHandler.IsObjectInTerminalState(obj) {
klog.Infof("Ignoring delete event for resource in terminal state %s %s",
r.ResourceHandler.ObjType, key)
return
// If object is in terminal state, check if we have already processed it in a previous update.
// We cannot blindly handle multiple delete operations for the same pod currently. There can be races
// where other pod handlers are removing IP addresses from address sets when they shouldn't be, etc.
// See: https://github.com/ovn-org/ovn-kubernetes/pull/3318#issuecomment-1349804450
if _, loaded := r.terminatedObjects.LoadAndDelete(key); loaded {
// object was already terminated
klog.Infof("Ignoring delete event for resource in terminal state %s %s",
r.ResourceHandler.ObjType, key)
return
}
}
r.DoWithLock(key, func(key string) {
internalCacheEntry := r.ResourceHandler.GetInternalCacheEntry(obj)
Expand Down

0 comments on commit 5bf74e6

Please sign in to comment.