Skip to content

Commit

Permalink
Merge pull request #903 from jakobmoellerdev/avoid-1s-wait-on-loop
Browse files Browse the repository at this point in the history
fix: avoid initial 1s delay and use exponential backoff on spec / status checks for LogicalVolume
  • Loading branch information
daichimukai committed May 15, 2024
2 parents d7d81c8 + c85dde0 commit 8eec8e5
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 95 deletions.
10 changes: 8 additions & 2 deletions internal/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,12 +624,13 @@ func (s controllerServerNoLocked) ControllerGetCapabilities(context.Context, *cs

func (s controllerServerNoLocked) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
volumeID := req.GetVolumeId()
ctrlLogger.Info("ControllerExpandVolume called",
"volumeID", volumeID,
logger := ctrlLogger.WithValues("volumeID", volumeID,
"required", req.GetCapacityRange().GetRequiredBytes(),
"limit", req.GetCapacityRange().GetLimitBytes(),
"num_secrets", len(req.GetSecrets()))

logger.Info("ControllerExpandVolume called")

if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "volume id is nil")
}
Expand Down Expand Up @@ -657,6 +658,7 @@ func (s controllerServerNoLocked) ControllerExpandVolume(ctx context.Context, re
}

if requestCapacityBytes <= currentSize.Value() {
logger.Info("ControllerExpandVolume is waiting for node expansion to complete")
// "NodeExpansionRequired" is still true because it is unknown
// whether node expansion is completed or not.
return &csi.ControllerExpandVolumeResponse{
Expand All @@ -673,6 +675,7 @@ func (s controllerServerNoLocked) ControllerExpandVolume(ctx context.Context, re
return nil, status.Error(codes.Internal, "not enough space")
}

logger.Info("ControllerExpandVolume triggering lvService.ExpandVolume")
err = s.lvService.ExpandVolume(ctx, volumeID, requestCapacityBytes)
if err != nil {
_, ok := status.FromError(err)
Expand All @@ -681,6 +684,9 @@ func (s controllerServerNoLocked) ControllerExpandVolume(ctx context.Context, re
}
return nil, err
}

logger.Info("ControllerExpandVolume has succeeded")

return &csi.ControllerExpandVolumeResponse{
CapacityBytes: requestCapacityBytes,
NodeExpansionRequired: true,
Expand Down
165 changes: 87 additions & 78 deletions internal/driver/internal/k8s/logicalvolume_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"time"

"github.com/topolvm/topolvm"
Expand All @@ -16,6 +17,8 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -222,7 +225,7 @@ func (s *LogicalVolumeService) DeleteVolume(ctx context.Context, volumeID string

lv, err := s.GetVolume(ctx, volumeID)
if err != nil {
if err == ErrVolumeNotFound {
if errors.Is(err, ErrVolumeNotFound) {
logger.Info("volume is not found", "volume_id", volumeID)
return nil
}
Expand All @@ -238,23 +241,23 @@ func (s *LogicalVolumeService) DeleteVolume(ctx context.Context, volumeID string
}

// wait until the target volume is deleted
for {
logger.Info("waiting for delete LogicalVolume", "name", lv.Name)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
}

err := s.getter.Get(ctx, client.ObjectKey{Name: lv.Name}, new(topolvmv1.LogicalVolume))
if err != nil {
return wait.Backoff{
Duration: 100 * time.Millisecond, // initial backoff
Factor: 2, // factor for duration increase
Jitter: 0.1,
Steps: math.MaxInt, // run for infinity; we assume context gets canceled
Cap: 10 * time.Second,
}.DelayFunc().Until(ctx, true, false, func(ctx context.Context) (bool, error) {
if err := s.getter.Get(ctx, client.ObjectKey{Name: lv.Name}, new(topolvmv1.LogicalVolume)); err != nil {
if apierrors.IsNotFound(err) {
return nil
return true, nil
}
logger.Error(err, "failed to get LogicalVolume", "name", lv.Name)
return err
return false, err
}
}
logger.Info("waiting for LogicalVolume to be deleted", "name", lv.Name)
return false, nil
})
}

// CreateSnapshot creates a snapshot of existing volume.
Expand Down Expand Up @@ -301,48 +304,58 @@ func (s *LogicalVolumeService) CreateSnapshot(ctx context.Context, node, dc, sou

// ExpandVolume expands volume
func (s *LogicalVolumeService) ExpandVolume(ctx context.Context, volumeID string, requestBytes int64) error {
logger.Info("k8s.ExpandVolume called", "volumeID", volumeID, "size", requestBytes)
logger := logger.WithValues("volume_id", volumeID, "size", requestBytes)
logger.Info("k8s.ExpandVolume called")
request := resource.NewQuantity(requestBytes, resource.BinarySI)

lv, err := s.GetVolume(ctx, volumeID)
if err != nil {
return err
}

err = s.updateSpecSize(ctx, volumeID, resource.NewQuantity(requestBytes, resource.BinarySI))
err = s.updateSpecSize(ctx, volumeID, request)
if err != nil {
return err
}

// wait until topolvm-node expands the target volume
for {
logger.Info("waiting for update of 'status.currentSize'", "name", lv.Name)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Second):
}

return wait.Backoff{
Duration: 1 * time.Second, // initial backoff
Factor: 2, // factor for duration increase
Jitter: 0.1,
Steps: math.MaxInt, // run for infinity; we assume context gets canceled
Cap: 10 * time.Second,
}.DelayFunc().Until(ctx, true, false, func(ctx context.Context) (bool, error) {
var changedLV topolvmv1.LogicalVolume
err := s.getter.Get(ctx, client.ObjectKey{Name: lv.Name}, &changedLV)
if err != nil {
if err := s.getter.Get(ctx, client.ObjectKey{Name: lv.Name}, &changedLV); err != nil {
logger.Error(err, "failed to get LogicalVolume", "name", lv.Name)
return err
return false, err
}

if changedLV.Status.Code != codes.OK {
return status.Error(changedLV.Status.Code, changedLV.Status.Message)
return false, status.Error(changedLV.Status.Code, changedLV.Status.Message)
}

if changedLV.Status.CurrentSize == nil {
logger.Info("waiting for update of 'status.currentSize' "+
"to be filled initially", "name", lv.Name)
// Workaround: Status.CurrentSize was added in v0.4.0, so it may be missing.
// It is filled in once the expansion has completed, so wait for that.
continue
return false, nil
}
if changedLV.Status.CurrentSize.Value() != changedLV.Spec.Size.Value() {
logger.Info("failed to match current size and requested size", "current", changedLV.Status.CurrentSize.Value(), "requested", changedLV.Spec.Size.Value())
continue

if changedLV.Status.CurrentSize.Cmp(*request) != 0 {
logger.Info("waiting for update of 'status.currentSize' to be updated to signal successful expansion",
"name", lv.Name,
"status.currentSize", changedLV.Status.CurrentSize,
"spec.size", changedLV.Spec.Size,
"request", request,
)
return false, nil
}

return nil
}
logger.Info("LogicalVolume successfully expanded")
return true, nil
})
}

// GetVolume returns LogicalVolume by volume ID.
Expand All @@ -352,64 +365,60 @@ func (s *LogicalVolumeService) GetVolume(ctx context.Context, volumeID string) (

// updateSpecSize updates .Spec.Size of LogicalVolume.
func (s *LogicalVolumeService) updateSpecSize(ctx context.Context, volumeID string, size *resource.Quantity) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Second):
}

lv, err := s.GetVolume(ctx, volumeID)
if err != nil {
return err
}

lv.Spec.Size = *size
if lv.Annotations == nil {
lv.Annotations = make(map[string]string)
}
lv.Annotations[topolvm.GetResizeRequestedAtKey()] = time.Now().UTC().String()

if err := s.writer.Update(ctx, lv); err != nil {
if apierrors.IsConflict(err) {
logger.Info("detect conflict when LogicalVolume spec update", "name", lv.Name)
continue
return wait.ExponentialBackoffWithContext(ctx,
retry.DefaultBackoff,
func(ctx context.Context) (bool, error) {
lv, err := s.GetVolume(ctx, volumeID)
if err != nil {
return false, err
}
logger.Error(err, "failed to update LogicalVolume spec", "name", lv.Name)
return err
}

return nil
}
lv.Spec.Size = *size
if lv.Annotations == nil {
lv.Annotations = make(map[string]string)
}
lv.Annotations[topolvm.GetResizeRequestedAtKey()] = time.Now().UTC().String()

if err := s.writer.Update(ctx, lv); err != nil {
if apierrors.IsConflict(err) {
logger.Info("detected conflict when trying to update LogicalVolume spec", "name", lv.Name)
return false, nil
} else {
logger.Error(err, "failed to update LogicalVolume spec", "name", lv.Name)
return false, err
}
}
return true, nil
})
}

// waitForStatusUpdate waits for logical volume creation/failure/timeout, whichever comes first.
func (s *LogicalVolumeService) waitForStatusUpdate(ctx context.Context, name string) (string, error) {
for {
logger.Info("waiting for setting 'status.volumeID'", "name", name)
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(100 * time.Millisecond):
}

var volumeID string
return volumeID, wait.Backoff{
Duration: 1 * time.Second, // initial backoff
Factor: 2, // factor for duration increase
Jitter: 0.1,
Steps: math.MaxInt, // run for infinity; we assume context gets canceled
Cap: 10 * time.Second,
}.DelayFunc().Until(ctx, true, false, func(ctx context.Context) (bool, error) {
var newLV topolvmv1.LogicalVolume
err := s.getter.Get(ctx, client.ObjectKey{Name: name}, &newLV)
if err != nil {
if err := s.getter.Get(ctx, client.ObjectKey{Name: name}, &newLV); err != nil {
logger.Error(err, "failed to get LogicalVolume", "name", name)
return "", err
return false, err
}
if newLV.Status.VolumeID != "" {
logger.Info("end k8s.LogicalVolume", "volume_id", newLV.Status.VolumeID)
return newLV.Status.VolumeID, nil
logger.Info("LogicalVolume successfully provisioned", "volume_id", newLV.Status.VolumeID)
volumeID = newLV.Status.VolumeID
return true, nil
}
if newLV.Status.Code != codes.OK {
err := s.writer.Delete(ctx, &newLV)
if err != nil {
// log this error but do not return this error, because newLV.Status.Message is more important
logger.Error(err, "failed to delete LogicalVolume")
}
return "", status.Error(newLV.Status.Code, newLV.Status.Message)
return false, status.Error(newLV.Status.Code, newLV.Status.Message)
}
}
return false, nil
})
}
24 changes: 10 additions & 14 deletions internal/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package driver

import (
"context"
"errors"
"io"
"os"
"path"
Expand Down Expand Up @@ -454,13 +455,12 @@ func (s *nodeServerNoLocked) NodeGetVolumeStats(ctx context.Context, req *csi.No
func (s *nodeServerNoLocked) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
volumeID := req.GetVolumeId()
volumePath := req.GetVolumePath()

nodeLogger.Info("NodeExpandVolume is called",
"volume_id", volumeID,
logger := nodeLogger.WithValues("volume_id", volumeID,
"volume_path", volumePath,
"required", req.GetCapacityRange().GetRequiredBytes(),
"limit", req.GetCapacityRange().GetLimitBytes(),
)
"limit", req.GetCapacityRange().GetLimitBytes())

logger.Info("NodeExpandVolume is called")

if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "no volume_id is provided")
Expand All @@ -480,10 +480,7 @@ func (s *nodeServerNoLocked) NodeExpandVolume(ctx context.Context, req *csi.Node
}

if isBlock := req.GetVolumeCapability().GetBlock() != nil; isBlock {
nodeLogger.Info("NodeExpandVolume(block) is skipped",
"volume_id", volumeID,
"target_path", volumePath,
)
logger.Info("NodeExpandVolume(block) is skipped")
return &csi.NodeExpandVolumeResponse{}, nil
}

Expand All @@ -492,7 +489,7 @@ func (s *nodeServerNoLocked) NodeExpandVolume(ctx context.Context, req *csi.Node
deviceClass := topolvm.DefaultDeviceClassName
if err == nil {
deviceClass = lvr.Spec.DeviceClass
} else if err != k8s.ErrVolumeNotFound {
} else if !errors.Is(err, k8s.ErrVolumeNotFound) {
return nil, err
}
lv, err := s.getLvFromContext(ctx, deviceClass, volumeID)
Expand All @@ -517,16 +514,15 @@ func (s *nodeServerNoLocked) NodeExpandVolume(ctx context.Context, req *csi.Node
if len(devicePath) == 0 {
return nil, status.Errorf(codes.Internal, "filesystem %s is not mounted at %s", volumeID, volumePath)
}
logger = logger.WithValues("device", devicePath)

logger.Info("triggering filesystem resize")
r := mountutil.NewResizeFs(s.mounter.Exec)
if _, err := r.Resize(device, volumePath); err != nil {
return nil, status.Errorf(codes.Internal, "failed to resize filesystem %s (mounted at: %s): %v", volumeID, volumePath, err)
}

nodeLogger.Info("NodeExpandVolume(fs) is succeeded",
"volume_id", volumeID,
"target_path", volumePath,
)
logger.Info("NodeExpandVolume(fs) is succeeded")

// `capacity_bytes` in NodeExpandVolumeResponse is defined as OPTIONAL.
// If this field needs to be filled, the value should be equal to `.status.currentSize` of the corresponding
Expand Down
7 changes: 7 additions & 0 deletions internal/lvmd/lvservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,13 @@ func (s *lvService) ResizeLV(ctx context.Context, req *proto.ResizeLVRequest) (*
return nil, status.Error(codes.Internal, fmt.Sprintf("unsupported device class target: %s", dc.Type))
}

logger.Info(
"lvservice request - ResizeLV",
"requested", requested,
"current", current,
"free", free,
)

if free < (requested - current) {
logger.Error(err, "no enough space left on VG",
"requested", requested,
Expand Down
Loading

0 comments on commit 8eec8e5

Please sign in to comment.