Skip to content

Commit

Permalink
fix: minor bugfix in replica management logic
Browse files Browse the repository at this point in the history
fix: minor bugfix in replica management logic
  • Loading branch information
landreasyan committed Sep 15, 2022
1 parent cee1ff9 commit 7482738
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 16 deletions.
3 changes: 2 additions & 1 deletion pkg/controller/attach_detach.go
Expand Up @@ -209,7 +209,8 @@ func (r *ReconcileAttachDetach) triggerAttach(ctx context.Context, azVolumeAttac
if publishCtx = attachResult.PublishContext(); publishCtx != nil {
handleSuccess(false)
}
if attachErr = <-attachResult.ResultChannel(); attachErr != nil {
var ok bool
if attachErr, ok = <-attachResult.ResultChannel(); !ok || attachErr != nil {
handleError()
} else {
handleSuccess(true)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/azvolume.go
Expand Up @@ -289,7 +289,7 @@ func (r *ReconcileAzVolume) triggerDelete(ctx context.Context, azVolume *azdiskv

// start waiting for replica AzVolumeAttachment CRIs to be deleted
for i, attachment := range attachments {
waiter, err := r.conditionWatcher.NewConditionWaiter(deleteCtx, watcher.AzVolumeAttachmentType, attachment.Name, verifyObjectDeleted)
waiter, err := r.conditionWatcher.NewConditionWaiter(deleteCtx, watcher.AzVolumeAttachmentType, attachment.Name, verifyObjectFailedOrDeleted)
if err != nil {
updateFunc = func(obj client.Object) error {
return reportError(obj, err)
Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/common.go
Expand Up @@ -995,6 +995,13 @@ func verifyObjectDeleted(obj interface{}, objectDeleted bool) (bool, error) {
if obj == nil || objectDeleted {
return true, nil
}
return false, nil
}

func verifyObjectFailedOrDeleted(obj interface{}, objectDeleted bool) (bool, error) {
if obj == nil || objectDeleted {
return true, nil
}

// otherwise, the volume detachment has either failed with an error or is still pending
azVolumeAttachmentInstance := obj.(*azdiskv1beta2.AzVolumeAttachment)
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/replica.go
Expand Up @@ -78,7 +78,7 @@ func (r *ReconcileReplica) Reconcile(ctx context.Context, request reconcile.Requ

// If promotion event, create a replacement replica
if isAttached(azVolumeAttachment) && azVolumeAttachment.Status.Detail.PreviousRole == azdiskv1beta2.ReplicaRole {
r.triggerManageReplica(ctx, azVolumeAttachment.Spec.VolumeName)
r.triggerManageReplica(azVolumeAttachment.Spec.VolumeName)
}
}
} else {
Expand Down Expand Up @@ -111,7 +111,7 @@ func (r *ReconcileReplica) Reconcile(ctx context.Context, request reconcile.Requ
_, _ = waiter.Wait(goCtx)

// add replica management operation to the queue
r.triggerManageReplica(goCtx, azVolumeAttachment.Spec.VolumeName)
r.triggerManageReplica(azVolumeAttachment.Spec.VolumeName)
}()
}
} else if azVolumeAttachment.Status.State == azdiskv1beta2.AttachmentFailed {
Expand All @@ -125,7 +125,7 @@ func (r *ReconcileReplica) Reconcile(ctx context.Context, request reconcile.Requ
}

//nolint:contextcheck // context is not inherited by design
func (r *ReconcileReplica) triggerManageReplica(ctx context.Context, volumeName string) {
func (r *ReconcileReplica) triggerManageReplica(volumeName string) {
manageReplicaCtx, w := workflow.New(context.Background(), workflow.WithDetails(consts.VolumeNameLabel, volumeName))
defer w.Finish(nil)
r.addToOperationQueue(
Expand Down
20 changes: 9 additions & 11 deletions pkg/controller/shared_state.go
Expand Up @@ -177,7 +177,6 @@ func (c *SharedState) addToOperationQueue(ctx context.Context, volumeName string
}
lockable := v.(*lockableEntry)
lockable.Lock()

isFirst := lockable.entry.(*operationQueue).Len() <= 0
_ = lockable.entry.(*operationQueue).PushBack(&replicaOperation{
ctx: ctx,
Expand All @@ -199,35 +198,34 @@ func (c *SharedState) addToOperationQueue(ctx context.Context, volumeName string
if isFirst {
go func() {
lockable.Lock()
defer lockable.Unlock()
for {
operationQueue := lockable.entry.(*operationQueue)
// pop the first operation
front := operationQueue.Front()
operation := front.Value.(*replicaOperation)
lockable.Unlock()

// only run the operation if the operation requester is not enlisted in blacklist
if !operationQueue.gcExclusionList.has(operation.requester) {
if err := operation.operationFunc(operation.ctx); err != nil {
lockable.Unlock()
err := operation.operationFunc(operation.ctx)
lockable.Lock()
if err != nil {
if shouldRequeueReplicaOperation(operation.isReplicaGarbageCollection, err) {
// if failed, push it to the end of the queue
lockable.Lock()
if operationQueue.isActive {
operationQueue.PushBack(operation)
}
lockable.Unlock()
}
}
}

lockable.Lock()
operationQueue.remove(front)
// there is no entry remaining, exit the loop
if operationQueue.Front() == nil {
break
}
}
lockable.Unlock()
}()
}
}
Expand All @@ -241,8 +239,8 @@ func (c *SharedState) deleteOperationQueue(volumeName string) {
// clear the queue in case there is still an entry in the queue
lockable := v.(*lockableEntry)
lockable.Lock()
defer lockable.Unlock()
lockable.entry.(*operationQueue).Init()
lockable.Unlock()
}

func (c *SharedState) closeOperationQueue(volumeName string) func() {
Expand All @@ -265,8 +263,8 @@ func (c *SharedState) addToGcExclusionList(volumeName string, target operationRe
}
lockable := v.(*lockableEntry)
lockable.Lock()
defer lockable.Unlock()
lockable.entry.(*operationQueue).gcExclusionList.add(target)
lockable.Unlock()
}

func (c *SharedState) removeFromExclusionList(volumeName string, target operationRequester) {
Expand All @@ -276,8 +274,8 @@ func (c *SharedState) removeFromExclusionList(volumeName string, target operatio
}
lockable := v.(*lockableEntry)
lockable.Lock()
defer lockable.Unlock()
delete(lockable.entry.(*operationQueue).gcExclusionList, target)
lockable.Unlock()
}

func (c *SharedState) dequeueGarbageCollection(volumeName string) {
Expand All @@ -287,6 +285,7 @@ func (c *SharedState) dequeueGarbageCollection(volumeName string) {
}
lockable := v.(*lockableEntry)
lockable.Lock()
defer lockable.Unlock()
queue := lockable.entry.(*operationQueue)
// look for garbage collection operation in the queue and remove from queue
var next *list.Element
Expand All @@ -296,7 +295,6 @@ func (c *SharedState) dequeueGarbageCollection(volumeName string) {
queue.remove(cur)
}
}
lockable.Unlock()
}

func (c *SharedState) getVolumesFromPod(ctx context.Context, podName string) ([]string, error) {
Expand Down

0 comments on commit 7482738

Please sign in to comment.