Skip to content

Commit

Permalink
Merge pull request #1911 from edreed/edreed-add-to-operation-queue-race
Browse files Browse the repository at this point in the history
[V2] fix: fix segfault due to empty operation queue
  • Loading branch information
k8s-ci-robot committed Jul 20, 2023
2 parents 6b1261f + e3f59cf commit d384db4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 14 deletions.
10 changes: 7 additions & 3 deletions pkg/controller/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,13 @@ type operationQueue struct {
isActive bool
}

func (q *operationQueue) remove(element *list.Element) {
// operationQueue might have been cleared before the lock was acquired
// so always check if the list is empty or not before removing object from the queue, otherwise it would set the underlying length of the queue to be < 0, causing issues
// Remove the element from the queue if the queue is not empty and the element
// is in the list.
//
// Because the queue may have been cleared while the element is in use, this
// function checks if the queue is empty before removing the element to prevent
// underflow of the queue length.
func (q *operationQueue) safeRemove(element *list.Element) {
if q.Front() != nil {
_ = q.Remove(element)
}
Expand Down
28 changes: 17 additions & 11 deletions pkg/controller/shared_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,35 +205,41 @@ func (c *SharedState) addToOperationQueue(ctx context.Context, volumeName string
})
lockable.Unlock()

// if first operation, start goroutine
// If this is the first operation, start the goroutine
if isFirst {
go func() {
lockable.Lock()
defer lockable.Unlock()
operationQueue := lockable.entry.(*operationQueue)

for {
operationQueue := lockable.entry.(*operationQueue)
// pop the first operation
// Get the first operation exiting the loop if the queue is empty.
front := operationQueue.Front()
if front == nil {
break
}

operation := front.Value.(*replicaOperation)

// only run the operation if the operation requester is not enlisted in blacklist
// Only run the operation if the operation requester is not enlisted in blacklist
if !operationQueue.gcExclusionList.has(operation.requester) {

// Release the lock while executing the operation to avoid deadlocks.
lockable.Unlock()
err := operation.operationFunc(operation.ctx)
lockable.Lock()

if shouldRequeueReplicaOperation(operation.isReplicaGarbageCollection, err) {
// if operation failed, push it to the end of the queue
// If operation failed, push it to the end of the queue if the queue is
// still active.
if operationQueue.isActive {
operationQueue.PushBack(operation)
}
}
}

operationQueue.remove(front)
// there is no entry remaining, exit the loop
if operationQueue.Front() == nil {
break
}
// Remove the operation from the queue.
operationQueue.safeRemove(front)
}
}()
}
Expand Down Expand Up @@ -301,7 +307,7 @@ func (c *SharedState) dequeueGarbageCollection(volumeName string) {
for cur := queue.Front(); cur != nil; cur = next {
next = cur.Next()
if cur.Value.(*replicaOperation).isReplicaGarbageCollection {
queue.remove(cur)
queue.safeRemove(cur)
}
}
}
Expand Down

0 comments on commit d384db4

Please sign in to comment.