Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid initial 1s delay and use exponential backoff on spec / status checks for LogicalVolume #903

Merged
merged 1 commit into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions internal/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,12 +624,13 @@ func (s controllerServerNoLocked) ControllerGetCapabilities(context.Context, *cs

func (s controllerServerNoLocked) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
volumeID := req.GetVolumeId()
ctrlLogger.Info("ControllerExpandVolume called",
"volumeID", volumeID,
logger := ctrlLogger.WithValues("volumeID", volumeID,
"required", req.GetCapacityRange().GetRequiredBytes(),
"limit", req.GetCapacityRange().GetLimitBytes(),
"num_secrets", len(req.GetSecrets()))

logger.Info("ControllerExpandVolume called")

if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "volume id is nil")
}
Expand Down Expand Up @@ -657,6 +658,7 @@ func (s controllerServerNoLocked) ControllerExpandVolume(ctx context.Context, re
}

if requestCapacityBytes <= currentSize.Value() {
logger.Info("ControllerExpandVolume is waiting for node expansion to complete")
// "NodeExpansionRequired" is still true because it is unknown
// whether node expansion is completed or not.
return &csi.ControllerExpandVolumeResponse{
Expand All @@ -673,6 +675,7 @@ func (s controllerServerNoLocked) ControllerExpandVolume(ctx context.Context, re
return nil, status.Error(codes.Internal, "not enough space")
}

logger.Info("ControllerExpandVolume triggering lvService.ExpandVolume")
err = s.lvService.ExpandVolume(ctx, volumeID, requestCapacityBytes)
if err != nil {
_, ok := status.FromError(err)
Expand All @@ -681,6 +684,9 @@ func (s controllerServerNoLocked) ControllerExpandVolume(ctx context.Context, re
}
return nil, err
}

logger.Info("ControllerExpandVolume has succeeded")

return &csi.ControllerExpandVolumeResponse{
CapacityBytes: requestCapacityBytes,
NodeExpansionRequired: true,
Expand Down
165 changes: 87 additions & 78 deletions internal/driver/internal/k8s/logicalvolume_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"time"

"github.com/topolvm/topolvm"
Expand All @@ -16,6 +17,8 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -222,7 +225,7 @@ func (s *LogicalVolumeService) DeleteVolume(ctx context.Context, volumeID string

lv, err := s.GetVolume(ctx, volumeID)
if err != nil {
if err == ErrVolumeNotFound {
if errors.Is(err, ErrVolumeNotFound) {
logger.Info("volume is not found", "volume_id", volumeID)
return nil
}
Expand All @@ -238,23 +241,23 @@ func (s *LogicalVolumeService) DeleteVolume(ctx context.Context, volumeID string
}

// wait until delete the target volume
for {
logger.Info("waiting for delete LogicalVolume", "name", lv.Name)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
}

err := s.getter.Get(ctx, client.ObjectKey{Name: lv.Name}, new(topolvmv1.LogicalVolume))
if err != nil {
return wait.Backoff{
Duration: 100 * time.Millisecond, // initial backoff
Factor: 2, // factor for duration increase
Jitter: 0.1,
Steps: math.MaxInt, // run for infinity; we assume context gets canceled
Cap: 10 * time.Second,
}.DelayFunc().Until(ctx, true, false, func(ctx context.Context) (bool, error) {
if err := s.getter.Get(ctx, client.ObjectKey{Name: lv.Name}, new(topolvmv1.LogicalVolume)); err != nil {
if apierrors.IsNotFound(err) {
return nil
return true, nil
}
logger.Error(err, "failed to get LogicalVolume", "name", lv.Name)
return err
return false, err
}
}
logger.Info("waiting for LogicalVolume to be deleted", "name", lv.Name)
return false, nil
})
}

// CreateSnapshot creates a snapshot of existing volume.
Expand Down Expand Up @@ -301,48 +304,58 @@ func (s *LogicalVolumeService) CreateSnapshot(ctx context.Context, node, dc, sou

// ExpandVolume expands volume
func (s *LogicalVolumeService) ExpandVolume(ctx context.Context, volumeID string, requestBytes int64) error {
logger.Info("k8s.ExpandVolume called", "volumeID", volumeID, "size", requestBytes)
logger := logger.WithValues("volume_id", volumeID, "size", requestBytes)
logger.Info("k8s.ExpandVolume called")
request := resource.NewQuantity(requestBytes, resource.BinarySI)

lv, err := s.GetVolume(ctx, volumeID)
if err != nil {
return err
}

err = s.updateSpecSize(ctx, volumeID, resource.NewQuantity(requestBytes, resource.BinarySI))
err = s.updateSpecSize(ctx, volumeID, request)
if err != nil {
return err
}

// wait until topolvm-node expands the target volume
for {
logger.Info("waiting for update of 'status.currentSize'", "name", lv.Name)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Second):
}

return wait.Backoff{
Duration: 1 * time.Second, // initial backoff
Factor: 2, // factor for duration increase
Jitter: 0.1,
Steps: math.MaxInt, // run for infinity; we assume context gets canceled
Cap: 10 * time.Second,
}.DelayFunc().Until(ctx, true, false, func(ctx context.Context) (bool, error) {
var changedLV topolvmv1.LogicalVolume
err := s.getter.Get(ctx, client.ObjectKey{Name: lv.Name}, &changedLV)
if err != nil {
if err := s.getter.Get(ctx, client.ObjectKey{Name: lv.Name}, &changedLV); err != nil {
logger.Error(err, "failed to get LogicalVolume", "name", lv.Name)
return err
return false, err
}

if changedLV.Status.Code != codes.OK {
return status.Error(changedLV.Status.Code, changedLV.Status.Message)
return false, status.Error(changedLV.Status.Code, changedLV.Status.Message)
}

if changedLV.Status.CurrentSize == nil {
logger.Info("waiting for update of 'status.currentSize' "+
"to be filled initially", "name", lv.Name)
// WA: since Status.CurrentSize is added in v0.4.0. it may be missing.
// if the expansion is completed, it is filled, so wait for that.
continue
return false, nil
}
if changedLV.Status.CurrentSize.Value() != changedLV.Spec.Size.Value() {
logger.Info("failed to match current size and requested size", "current", changedLV.Status.CurrentSize.Value(), "requested", changedLV.Spec.Size.Value())
continue

if changedLV.Status.CurrentSize.Cmp(*request) != 0 {
logger.Info("waiting for update of 'status.currentSize' to be updated to signal successful expansion",
"name", lv.Name,
"status.currentSize", changedLV.Status.CurrentSize,
"spec.size", changedLV.Spec.Size,
"request", request,
)
return false, nil
}

return nil
}
logger.Info("LogicalVolume successfully expanded")
return true, nil
})
}

// GetVolume returns LogicalVolume by volume ID.
Expand All @@ -352,64 +365,60 @@ func (s *LogicalVolumeService) GetVolume(ctx context.Context, volumeID string) (

// updateSpecSize updates .Spec.Size of LogicalVolume.
func (s *LogicalVolumeService) updateSpecSize(ctx context.Context, volumeID string, size *resource.Quantity) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Second):
}

lv, err := s.GetVolume(ctx, volumeID)
if err != nil {
return err
}

lv.Spec.Size = *size
if lv.Annotations == nil {
lv.Annotations = make(map[string]string)
}
lv.Annotations[topolvm.GetResizeRequestedAtKey()] = time.Now().UTC().String()

if err := s.writer.Update(ctx, lv); err != nil {
if apierrors.IsConflict(err) {
logger.Info("detect conflict when LogicalVolume spec update", "name", lv.Name)
continue
return wait.ExponentialBackoffWithContext(ctx,
retry.DefaultBackoff,
func(ctx context.Context) (bool, error) {
toshipp marked this conversation as resolved.
Show resolved Hide resolved
lv, err := s.GetVolume(ctx, volumeID)
if err != nil {
return false, err
}
logger.Error(err, "failed to update LogicalVolume spec", "name", lv.Name)
return err
}

return nil
}
lv.Spec.Size = *size
if lv.Annotations == nil {
lv.Annotations = make(map[string]string)
}
lv.Annotations[topolvm.GetResizeRequestedAtKey()] = time.Now().UTC().String()

if err := s.writer.Update(ctx, lv); err != nil {
if apierrors.IsConflict(err) {
logger.Info("detected conflict when trying to update LogicalVolume spec", "name", lv.Name)
return false, nil
} else {
logger.Error(err, "failed to update LogicalVolume spec", "name", lv.Name)
return false, err
}
}
return true, nil
})
}

// waitForStatusUpdate waits for logical volume creation/failure/timeout, whichever comes first.
func (s *LogicalVolumeService) waitForStatusUpdate(ctx context.Context, name string) (string, error) {
for {
logger.Info("waiting for setting 'status.volumeID'", "name", name)
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(100 * time.Millisecond):
}

var volumeID string
return volumeID, wait.Backoff{
Duration: 1 * time.Second, // initial backoff
toshipp marked this conversation as resolved.
Show resolved Hide resolved
Factor: 2, // factor for duration increase
Jitter: 0.1,
Steps: math.MaxInt, // run for infinity; we assume context gets canceled
Cap: 10 * time.Second,
}.DelayFunc().Until(ctx, true, false, func(ctx context.Context) (bool, error) {
var newLV topolvmv1.LogicalVolume
err := s.getter.Get(ctx, client.ObjectKey{Name: name}, &newLV)
if err != nil {
if err := s.getter.Get(ctx, client.ObjectKey{Name: name}, &newLV); err != nil {
logger.Error(err, "failed to get LogicalVolume", "name", name)
return "", err
return false, err
}
if newLV.Status.VolumeID != "" {
logger.Info("end k8s.LogicalVolume", "volume_id", newLV.Status.VolumeID)
return newLV.Status.VolumeID, nil
logger.Info("LogicalVolume successfully provisioned", "volume_id", newLV.Status.VolumeID)
volumeID = newLV.Status.VolumeID
return true, nil
}
if newLV.Status.Code != codes.OK {
err := s.writer.Delete(ctx, &newLV)
if err != nil {
// log this error but do not return this error, because newLV.Status.Message is more important
logger.Error(err, "failed to delete LogicalVolume")
}
return "", status.Error(newLV.Status.Code, newLV.Status.Message)
return false, status.Error(newLV.Status.Code, newLV.Status.Message)
}
}
return false, nil
})
}
24 changes: 10 additions & 14 deletions internal/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package driver

import (
"context"
"errors"
"io"
"os"
"path"
Expand Down Expand Up @@ -454,13 +455,12 @@ func (s *nodeServerNoLocked) NodeGetVolumeStats(ctx context.Context, req *csi.No
func (s *nodeServerNoLocked) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
volumeID := req.GetVolumeId()
volumePath := req.GetVolumePath()

nodeLogger.Info("NodeExpandVolume is called",
"volume_id", volumeID,
logger := nodeLogger.WithValues("volume_id", volumeID,
"volume_path", volumePath,
"required", req.GetCapacityRange().GetRequiredBytes(),
"limit", req.GetCapacityRange().GetLimitBytes(),
)
"limit", req.GetCapacityRange().GetLimitBytes())

logger.Info("NodeExpandVolume is called")

if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "no volume_id is provided")
Expand All @@ -480,10 +480,7 @@ func (s *nodeServerNoLocked) NodeExpandVolume(ctx context.Context, req *csi.Node
}

if isBlock := req.GetVolumeCapability().GetBlock() != nil; isBlock {
nodeLogger.Info("NodeExpandVolume(block) is skipped",
"volume_id", volumeID,
"target_path", volumePath,
)
logger.Info("NodeExpandVolume(block) is skipped")
return &csi.NodeExpandVolumeResponse{}, nil
}

Expand All @@ -492,7 +489,7 @@ func (s *nodeServerNoLocked) NodeExpandVolume(ctx context.Context, req *csi.Node
deviceClass := topolvm.DefaultDeviceClassName
if err == nil {
deviceClass = lvr.Spec.DeviceClass
} else if err != k8s.ErrVolumeNotFound {
} else if !errors.Is(err, k8s.ErrVolumeNotFound) {
return nil, err
}
lv, err := s.getLvFromContext(ctx, deviceClass, volumeID)
Expand All @@ -517,16 +514,15 @@ func (s *nodeServerNoLocked) NodeExpandVolume(ctx context.Context, req *csi.Node
if len(devicePath) == 0 {
return nil, status.Errorf(codes.Internal, "filesystem %s is not mounted at %s", volumeID, volumePath)
}
logger = logger.WithValues("device", devicePath)

logger.Info("triggering filesystem resize")
r := mountutil.NewResizeFs(s.mounter.Exec)
if _, err := r.Resize(device, volumePath); err != nil {
return nil, status.Errorf(codes.Internal, "failed to resize filesystem %s (mounted at: %s): %v", volumeID, volumePath, err)
}

nodeLogger.Info("NodeExpandVolume(fs) is succeeded",
"volume_id", volumeID,
"target_path", volumePath,
)
logger.Info("NodeExpandVolume(fs) is succeeded")
jakobmoellerdev marked this conversation as resolved.
Show resolved Hide resolved

// `capacity_bytes` in NodeExpandVolumeResponse is defined as OPTIONAL.
// If this field needs to be filled, the value should be equal to `.status.currentSize` of the corresponding
Expand Down
7 changes: 7 additions & 0 deletions internal/lvmd/lvservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,13 @@ func (s *lvService) ResizeLV(ctx context.Context, req *proto.ResizeLVRequest) (*
return nil, status.Error(codes.Internal, fmt.Sprintf("unsupported device class target: %s", dc.Type))
}

logger.Info(
"lvservice request - ResizeLV",
"requested", requested,
"current", current,
"free", free,
)

if free < (requested - current) {
logger.Error(err, "no enough space left on VG",
"requested", requested,
Expand Down
Loading