Skip to content

Commit

Permalink
fix: avoid initial 1s delay and use exponential backoff
Browse files Browse the repository at this point in the history
Signed-off-by: Jakob Möller <jmoller@redhat.com>
  • Loading branch information
jakobmoellerdev committed Apr 25, 2024
1 parent 86e72f5 commit 1ac3277
Showing 1 changed file with 71 additions and 73 deletions.
144 changes: 71 additions & 73 deletions internal/driver/internal/k8s/logicalvolume_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

// ErrVolumeNotFound represents the specified volume is not found.
var ErrVolumeNotFound = errors.New("VolumeID is not found")
var ErrMismatchedSize = errors.New("current size and requested size are mismatched")
var ErrMissingStatusCurrentSize = errors.New("status.currentSize is missing")
var ErrMissingStatusVolumeID = errors.New("status.volumeID is missing")

// LogicalVolumeService represents service for LogicalVolume.
// This is not concurrent safe, must take lock on caller.
Expand Down Expand Up @@ -238,23 +243,17 @@ func (s *LogicalVolumeService) DeleteVolume(ctx context.Context, volumeID string
}

// wait until delete the target volume
for {
logger.Info("waiting for delete LogicalVolume", "name", lv.Name)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
}

err := s.getter.Get(ctx, client.ObjectKey{Name: lv.Name}, new(topolvmv1.LogicalVolume))
if err != nil {
return wait.ExponentialBackoffWithContext(ctx, retry.DefaultBackoff, func(ctx context.Context) (done bool, err error) {
if err := s.getter.Get(ctx, client.ObjectKey{Name: lv.Name}, new(topolvmv1.LogicalVolume)); err != nil {
if apierrors.IsNotFound(err) {
return nil
return true, nil
}
logger.Error(err, "failed to get LogicalVolume", "name", lv.Name)
return err
return false, err
}
}
logger.Info("waiting for delete LogicalVolume", "name", lv.Name)
return false, nil
})
}

// CreateSnapshot creates a snapshot of existing volume.
Expand Down Expand Up @@ -313,36 +312,39 @@ func (s *LogicalVolumeService) ExpandVolume(ctx context.Context, volumeID string
return err
}

// wait until topolvm-node expands the target volume
for {
logger.Info("waiting for update of 'status.currentSize'", "name", lv.Name)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Second):
}

return wait.ExponentialBackoffWithContext(ctx, wait.Backoff{
Duration: 1 * time.Second, // initial backoff
Factor: 2, // factor for duration increase
Jitter: 0.1,
Cap: 10 * time.Second, // wait up to 10 seconds per retry, 1, 2, 4, 8, 10 seconds.
Steps: 30, // run at most 10 times
}, func(ctx context.Context) (done bool, err error) {
var changedLV topolvmv1.LogicalVolume
err := s.getter.Get(ctx, client.ObjectKey{Name: lv.Name}, &changedLV)
if err != nil {
if err := s.getter.Get(ctx, client.ObjectKey{Name: lv.Name}, &changedLV); err != nil {
logger.Error(err, "failed to get LogicalVolume", "name", lv.Name)
return err
return true, err
}

if changedLV.Status.Code != codes.OK {
return status.Error(changedLV.Status.Code, changedLV.Status.Message)
return true, status.Error(changedLV.Status.Code, changedLV.Status.Message)
}

if changedLV.Status.CurrentSize == nil {
logger.Info("waiting for update of 'status.currentSize'", "name", lv.Name)
// WA: since Status.CurrentSize is added in v0.4.0. it may be missing.
// if the expansion is completed, it is filled, so wait for that.
continue
return false, ErrMissingStatusCurrentSize
}

if changedLV.Status.CurrentSize.Value() != changedLV.Spec.Size.Value() {
logger.Info("failed to match current size and requested size", "current", changedLV.Status.CurrentSize.Value(), "requested", changedLV.Spec.Size.Value())
continue
logger.Info("waiting for update of 'status.currentSize'", "name", lv.Name)
return false, ErrMismatchedSize
}

return nil
}
logger.Info("LogicalVolume successfully expanded", "volume_id", changedLV.Status.VolumeID)
return true, nil
})
}

// GetVolume returns LogicalVolume by volume ID.
Expand All @@ -352,64 +354,60 @@ func (s *LogicalVolumeService) GetVolume(ctx context.Context, volumeID string) (

// updateSpecSize updates .Spec.Size of LogicalVolume.
func (s *LogicalVolumeService) updateSpecSize(ctx context.Context, volumeID string, size *resource.Quantity) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Second):
}

lv, err := s.GetVolume(ctx, volumeID)
if err != nil {
return err
}

lv.Spec.Size = *size
if lv.Annotations == nil {
lv.Annotations = make(map[string]string)
}
lv.Annotations[topolvm.GetResizeRequestedAtKey()] = time.Now().UTC().String()

if err := s.writer.Update(ctx, lv); err != nil {
if apierrors.IsConflict(err) {
logger.Info("detect conflict when LogicalVolume spec update", "name", lv.Name)
continue
return wait.ExponentialBackoffWithContext(ctx,
retry.DefaultBackoff,
func(ctx context.Context) (done bool, err error) {
lv, err := s.GetVolume(ctx, volumeID)
if err != nil {
return true, err
}
logger.Error(err, "failed to update LogicalVolume spec", "name", lv.Name)
return err
}

return nil
}
lv.Spec.Size = *size
if lv.Annotations == nil {
lv.Annotations = make(map[string]string)
}
lv.Annotations[topolvm.GetResizeRequestedAtKey()] = time.Now().UTC().String()

if err := s.writer.Update(ctx, lv); err != nil {
if apierrors.IsConflict(err) {
logger.Info("detected conflict when trying to update LogicalVolume spec", "name", lv.Name)
return false, err
} else {
logger.Error(err, "failed to update LogicalVolume spec", "name", lv.Name)
return true, err
}
}
return true, nil
})
}

// waitForStatusUpdate waits for logical volume creation/failure/timeout, whichever comes first.
func (s *LogicalVolumeService) waitForStatusUpdate(ctx context.Context, name string) (string, error) {
for {
logger.Info("waiting for setting 'status.volumeID'", "name", name)
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(100 * time.Millisecond):
}

var volumeID string
return volumeID, wait.ExponentialBackoffWithContext(ctx, wait.Backoff{
Duration: 1 * time.Second, // initial backoff
Factor: 2, // factor for duration increase
Jitter: 0.1,
Cap: 10 * time.Second, // wait up to 10 seconds per retry, 1, 2, 4, 8, 10 seconds.
Steps: 10, // run at most 10 times
}, func(ctx context.Context) (done bool, err error) {
var newLV topolvmv1.LogicalVolume
err := s.getter.Get(ctx, client.ObjectKey{Name: name}, &newLV)
if err != nil {
if err := s.getter.Get(ctx, client.ObjectKey{Name: name}, &newLV); err != nil {
logger.Error(err, "failed to get LogicalVolume", "name", name)
return "", err
return true, err
}
if newLV.Status.VolumeID != "" {
logger.Info("end k8s.LogicalVolume", "volume_id", newLV.Status.VolumeID)
return newLV.Status.VolumeID, nil
logger.Info("LogicalVolume successfully provisioned", "volume_id", newLV.Status.VolumeID)
volumeID = newLV.Status.VolumeID
return true, nil
}
if newLV.Status.Code != codes.OK {
err := s.writer.Delete(ctx, &newLV)
if err != nil {
// log this error but do not return this error, because newLV.Status.Message is more important
logger.Error(err, "failed to delete LogicalVolume")
}
return "", status.Error(newLV.Status.Code, newLV.Status.Message)
return true, status.Error(newLV.Status.Code, newLV.Status.Message)
}
}
return false, ErrMissingStatusVolumeID
})
}

0 comments on commit 1ac3277

Please sign in to comment.