Skip to content

Commit

Permalink
Add state to snapshot create and configurable retry logic
Browse files Browse the repository at this point in the history
Signed-off-by: Grant Griffiths <grant@portworx.com>
  • Loading branch information
Grant Griffiths committed Oct 30, 2019
1 parent 89889f0 commit fd0e055
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 47 deletions.
4 changes: 4 additions & 0 deletions cmd/csi-snapshotter/main.go
Expand Up @@ -62,6 +62,8 @@ var (
csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.")
createSnapshotContentRetryCount = flag.Int("create-snapshotcontent-retrycount", 5, "Number of retries when we create a snapshot content object for a snapshot.")
createSnapshotContentInterval = flag.Duration("create-snapshotcontent-interval", 10*time.Second, "Interval between retries when we create a snapshot content object for a snapshot.")
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed snapshot creation. It doubles with each failure, up to retry-interval-max.")
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed snapshot creation.")
resyncPeriod = flag.Duration("resync-period", 60*time.Second, "Resync interval of the controller.")
snapshotNamePrefix = flag.String("snapshot-name-prefix", "snapshot", "Prefix to apply to the name of a created snapshot")
snapshotNameUUIDLength = flag.Int("snapshot-name-uuid-length", -1, "Length in characters for the generated uuid of a created snapshot. Defaults behavior is to NOT truncate.")
Expand Down Expand Up @@ -176,6 +178,8 @@ func main() {
coreFactory.Core().V1().PersistentVolumeClaims(),
*createSnapshotContentRetryCount,
*createSnapshotContentInterval,
*retryIntervalStart,
*retryIntervalMax,
snapShotter,
*csiTimeout,
*resyncPeriod,
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/csi_handler.go
Expand Up @@ -30,7 +30,7 @@ import (

// Handler is responsible for handling VolumeSnapshot events from informer.
type Handler interface {
CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error)
CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, snapshotter.SnapshottingState, error)
DeleteSnapshot(content *crdv1.VolumeSnapshotContent, snapshotterCredentials map[string]string) error
GetSnapshotStatus(content *crdv1.VolumeSnapshotContent) (bool, time.Time, int64, error)
}
Expand Down Expand Up @@ -58,19 +58,20 @@ func NewCSIHandler(
}
}

func (handler *csiHandler) CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error) {
func (handler *csiHandler) CreateSnapshot(snapshot *crdv1.VolumeSnapshot, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, snapshotter.SnapshottingState, error) {

ctx, cancel := context.WithTimeout(context.Background(), handler.timeout)
defer cancel()

snapshotName, err := makeSnapshotName(handler.snapshotNamePrefix, string(snapshot.UID), handler.snapshotNameUUIDLength)
if err != nil {
return "", "", time.Time{}, 0, false, err
return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, err
}
newParameters, err := removePrefixedParameters(parameters)
if err != nil {
return "", "", time.Time{}, 0, false, fmt.Errorf("failed to remove CSI Parameters of prefixed keys: %v", err)
return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, fmt.Errorf("failed to remove CSI Parameters of prefixed keys: %v", err)
}

return handler.snapshotter.CreateSnapshot(ctx, snapshotName, volume, newParameters, snapshotterCredentials)
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/controller/framework_test.go
Expand Up @@ -35,6 +35,7 @@ import (
snapshotscheme "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned/scheme"
informers "github.com/kubernetes-csi/external-snapshotter/pkg/client/informers/externalversions"
storagelisters "github.com/kubernetes-csi/external-snapshotter/pkg/client/listers/volumesnapshot/v1beta1"
"github.com/kubernetes-csi/external-snapshotter/pkg/snapshotter"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -757,6 +758,8 @@ func newTestController(kubeClient kubernetes.Interface, clientset clientset.Inte
coreFactory.Core().V1().PersistentVolumeClaims(),
3,
5*time.Millisecond,
5*time.Millisecond,
10*time.Second,
fakeSnapshot,
5*time.Millisecond,
60*time.Second,
Expand Down Expand Up @@ -1372,10 +1375,10 @@ type fakeSnapshotter struct {
t *testing.T
}

func (f *fakeSnapshotter) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, error) {
func (f *fakeSnapshotter) CreateSnapshot(ctx context.Context, snapshotName string, volume *v1.PersistentVolume, parameters map[string]string, snapshotterCredentials map[string]string) (string, string, time.Time, int64, bool, snapshotter.SnapshottingState, error) {
if f.createCallCounter >= len(f.createCalls) {
f.t.Errorf("Unexpected CSI Create Snapshot call: snapshotName=%s, volume=%v, index: %d, calls: %+v", snapshotName, volume.Name, f.createCallCounter, f.createCalls)
return "", "", time.Time{}, 0, false, fmt.Errorf("unexpected call")
return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, fmt.Errorf("unexpected call")
}
call := f.createCalls[f.createCallCounter]
f.createCallCounter++
Expand All @@ -1402,9 +1405,10 @@ func (f *fakeSnapshotter) CreateSnapshot(ctx context.Context, snapshotName strin
}

if err != nil {
return "", "", time.Time{}, 0, false, fmt.Errorf("unexpected call")
return "", "", time.Time{}, 0, false, snapshotter.SnapshottingFinished, fmt.Errorf("unexpected call")
}
return call.driverName, call.snapshotId, call.creationTime, call.size, call.readyToUse, call.err

return call.driverName, call.snapshotId, call.creationTime, call.size, call.readyToUse, snapshotter.SnapshottingFinished, call.err
}

func (f *fakeSnapshotter) DeleteSnapshot(ctx context.Context, snapshotID string, snapshotterCredentials map[string]string) error {
Expand Down
56 changes: 37 additions & 19 deletions pkg/controller/snapshot_controller.go
Expand Up @@ -18,10 +18,8 @@ package controller

import (
"fmt"
"strings"
"time"

crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1beta1"
"github.com/kubernetes-csi/external-snapshotter/pkg/snapshotter"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -34,6 +32,8 @@ import (
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/util/slice"
"strings"
"time"
)

// ==================================================================
Expand Down Expand Up @@ -363,20 +363,37 @@ func (ctrl *csiSnapshotController) storeContentUpdate(content interface{}) (bool

// createSnapshot starts new asynchronous operation to create snapshot
func (ctrl *csiSnapshotController) createSnapshot(snapshot *crdv1.VolumeSnapshot) error {
klog.V(5).Infof("createSnapshot[%s]: started", snapshotKey(snapshot))
opName := fmt.Sprintf("create-%s[%s]", snapshotKey(snapshot), string(snapshot.UID))
key := snapshotKey(snapshot)
klog.V(5).Infof("createSnapshot[%s]: started", key)
opName := fmt.Sprintf("create-%s[%s]", key, string(snapshot.UID))
ctrl.scheduleOperation(opName, func() error {
snapshotObj, err := ctrl.createSnapshotOperation(snapshot)
snapshotObj, state, err := ctrl.createSnapshotOperation(snapshot)
if err != nil {
ctrl.updateSnapshotErrorStatusWithEvent(snapshot, v1.EventTypeWarning, "SnapshotCreationFailed", fmt.Sprintf("Failed to create snapshot: %v", err))
klog.Errorf("createSnapshot [%s]: error occurred in createSnapshotOperation: %v", opName, err)

// Handle state:
if state == snapshotter.SnapshottingFinished {
// Snapshotting finished, remove obj from snapshotsInProgress.
ctrl.snapshotsInProgress.Delete(key)
} else if state == snapshotter.SnapshottingInBackground {
// Snapshotting still in progress.
klog.V(4).Infof("createSnapshot [%s]: Temporary error received, adding Snapshot back in queue: %v", key, err)
ctrl.snapshotsInProgress.Store(key, snapshotObj)
} else {
// State is SnapshottingNoChange. Don't change snapshotsInProgress.
}

return err
}

// If no errors, update the snapshot.
_, updateErr := ctrl.storeSnapshotUpdate(snapshotObj)
if updateErr != nil {
// We will get an "snapshot update" event soon, this is not a big error
klog.V(4).Infof("createSnapshot [%s]: cannot update internal cache: %v", snapshotKey(snapshotObj), updateErr)
klog.V(4).Infof("createSnapshot [%s]: cannot update internal cache: %v", key, updateErr)
}

return nil
})
return nil
Expand Down Expand Up @@ -588,7 +605,7 @@ func (ctrl *csiSnapshotController) checkandUpdateBoundSnapshotStatusOperation(sn
if err != nil {
return nil, err
}
driverName, snapshotID, creationTime, size, readyToUse, err = ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
driverName, snapshotID, creationTime, size, readyToUse, _, err = ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
if err != nil {
klog.Errorf("checkandUpdateBoundSnapshotStatusOperation: failed to call create snapshot to check whether the snapshot is ready to use %q", err)
return nil, err
Expand Down Expand Up @@ -622,35 +639,35 @@ func (ctrl *csiSnapshotController) checkandUpdateBoundSnapshotStatusOperation(sn
// 2. Update VolumeSnapshot status with creationtimestamp information
// 3. Create the VolumeSnapshotContent object with the snapshot id information.
// 4. Bind the VolumeSnapshot and VolumeSnapshotContent object
func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, error) {
func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.VolumeSnapshot) (*crdv1.VolumeSnapshot, snapshotter.SnapshottingState, error) {
klog.Infof("createSnapshot: Creating snapshot %s through the plugin ...", snapshotKey(snapshot))

if snapshot.Status != nil && snapshot.Status.Error != nil && snapshot.Status.Error.Message != nil && !isControllerUpdateFailError(snapshot.Status.Error) {
klog.V(4).Infof("error is already set in snapshot, do not retry to create: %s", *snapshot.Status.Error.Message)
return snapshot, nil
return snapshot, snapshotter.SnapshottingNoChange, nil
}

// If PVC is not being deleted and finalizer is not added yet, a finalizer should be added.
klog.V(5).Infof("createSnapshotOperation: Check if PVC is not being deleted and add Finalizer for source of snapshot [%s] if needed", snapshot.Name)
err := ctrl.ensureSnapshotSourceFinalizer(snapshot)
if err != nil {
klog.Errorf("createSnapshotOperation failed to add finalizer for source of snapshot %s", err)
return nil, err
return nil, snapshotter.SnapshottingNoChange, err
}

class, volume, contentName, snapshotterSecretRef, err := ctrl.getCreateSnapshotInput(snapshot)
if err != nil {
return nil, fmt.Errorf("failed to get input parameters to create snapshot %s: %q", snapshot.Name, err)
return nil, snapshotter.SnapshottingNoChange, fmt.Errorf("failed to get input parameters to create snapshot %s: %q", snapshot.Name, err)
}

snapshotterCredentials, err := getCredentials(ctrl.client, snapshotterSecretRef)
if err != nil {
return nil, err
return nil, snapshotter.SnapshottingNoChange, err
}

driverName, snapshotID, creationTime, size, readyToUse, err := ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
driverName, snapshotID, creationTime, size, readyToUse, state, err := ctrl.handler.CreateSnapshot(snapshot, volume, class.Parameters, snapshotterCredentials)
if err != nil {
return nil, fmt.Errorf("failed to take snapshot of the volume, %s: %q", volume.Name, err)
return nil, state, fmt.Errorf("failed to take snapshot of the volume, %s: %q", volume.Name, err)
}

klog.V(5).Infof("Created snapshot: driver %s, snapshotId %s, creationTime %v, size %d, readyToUse %t", driverName, snapshotID, creationTime, size, readyToUse)
Expand All @@ -667,12 +684,12 @@ func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.Volum
}

if err != nil {
return nil, err
return nil, snapshotter.SnapshottingInBackground, err
}
// Create VolumeSnapshotContent in the database
snapshotRef, err := ref.GetReference(scheme.Scheme, snapshot)
if err != nil {
return nil, err
return nil, snapshotter.SnapshottingInBackground, err
}

timestamp := creationTime.UnixNano()
Expand Down Expand Up @@ -730,9 +747,10 @@ func (ctrl *csiSnapshotController) createSnapshotOperation(snapshot *crdv1.Volum
strerr := fmt.Sprintf("Error creating volume snapshot content object for snapshot %s: %v.", snapshotKey(snapshot), err)
klog.Error(strerr)
ctrl.eventRecorder.Event(newSnapshot, v1.EventTypeWarning, "CreateSnapshotContentFailed", strerr)
return nil, newControllerUpdateError(snapshotKey(snapshot), err.Error())
return nil, snapshotter.SnapshottingInBackground, newControllerUpdateError(snapshotKey(snapshot), err.Error())
}
return newSnapshot, nil

return newSnapshot, snapshotter.SnapshottingFinished, nil
}

// Delete a snapshot
Expand Down
64 changes: 52 additions & 12 deletions pkg/controller/snapshot_controller_base.go
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"fmt"
"sync"
"time"

crdv1 "github.com/kubernetes-csi/external-snapshotter/pkg/apis/volumesnapshot/v1beta1"
Expand Down Expand Up @@ -50,6 +51,9 @@ type csiSnapshotController struct {
snapshotQueue workqueue.RateLimitingInterface
contentQueue workqueue.RateLimitingInterface

// Map UID -> *Snapshot with all snapshots in progress in the background.
snapshotsInProgress sync.Map

snapshotLister storagelisters.VolumeSnapshotLister
snapshotListerSynced cache.InformerSynced
contentLister storagelisters.VolumeSnapshotContentLister
Expand All @@ -68,6 +72,8 @@ type csiSnapshotController struct {

createSnapshotContentRetryCount int
createSnapshotContentInterval time.Duration
retryIntervalStart time.Duration
retryIntervalMax time.Duration
resyncPeriod time.Duration
}

Expand All @@ -82,6 +88,8 @@ func NewCSISnapshotController(
pvcInformer coreinformers.PersistentVolumeClaimInformer,
createSnapshotContentRetryCount int,
createSnapshotContentInterval time.Duration,
retryIntervalStart time.Duration,
retryIntervalMax time.Duration,
snapshotter snapshotter.Snapshotter,
timeout time.Duration,
resyncPeriod time.Duration,
Expand All @@ -103,10 +111,12 @@ func NewCSISnapshotController(
runningOperations: goroutinemap.NewGoRoutineMap(true),
createSnapshotContentRetryCount: createSnapshotContentRetryCount,
createSnapshotContentInterval: createSnapshotContentInterval,
retryIntervalStart: retryIntervalStart,
retryIntervalMax: retryIntervalMax,
resyncPeriod: resyncPeriod,
snapshotStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
contentStore: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-snapshot"),
snapshotQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(retryIntervalStart, retryIntervalMax), "csi-snapshotter-snapshot"),
contentQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "csi-snapshotter-content"),
}

Expand Down Expand Up @@ -215,22 +225,38 @@ func (ctrl *csiSnapshotController) snapshotWorker() {
klog.Errorf("error getting namespace & name of snapshot %q to get snapshot from informer: %v", key, err)
return false
}
snapshot, err := ctrl.snapshotLister.VolumeSnapshots(namespace).Get(name)
if err == nil {
// The volume snapshot still exists in informer cache, the event must have
// been add/update/sync

// Attempt to get snapshot from the informer
var snapshot *crdv1.VolumeSnapshot
snapshot, err = ctrl.snapshotLister.VolumeSnapshots(namespace).Get(name)
if err != nil && !errors.IsNotFound(err) {
klog.V(2).Infof("error getting snapshot %q from informer: %v", key, err)
return false
} else if errors.IsNotFound(err) {
// Check snapshotsInProgress for the snapshot if not found from the informer
inProgressObj, ok := ctrl.snapshotsInProgress.Load(key)
if ok {
snapshot, ok = inProgressObj.(*crdv1.VolumeSnapshot)
if !ok {
klog.Errorf("expected vs, got %+v", inProgressObj)
return false
}
}

}

if snapshot != nil {
// If the volume snapshot still exists in informer cache, the event must have
// been add/update/sync. Otherwise, the volume snapshot was still in progress.
newSnapshot, err := ctrl.checkAndUpdateSnapshotClass(snapshot)
if err == nil {
klog.V(5).Infof("passed checkAndUpdateSnapshotClass for snapshot %q", key)
ctrl.updateSnapshot(newSnapshot)
}
return false
}
if err != nil && !errors.IsNotFound(err) {
klog.V(2).Infof("error getting snapshot %q from informer: %v", key, err)
return false
}
// The snapshot is not in informer cache, the event must have been "delete"

// The snapshot is not in informer cache or in progress, the event must have been "delete"
vsObj, found, err := ctrl.snapshotStore.GetByKey(key)
if err != nil {
klog.V(2).Infof("error getting snapshot %q from cache: %v", key, err)
Expand All @@ -251,6 +277,10 @@ func (ctrl *csiSnapshotController) snapshotWorker() {
if err == nil {
ctrl.deleteSnapshot(newSnapshot)
}

ctrl.snapshotQueue.Forget(keyObj)
ctrl.snapshotsInProgress.Delete(key)

return false
}

Expand Down Expand Up @@ -377,12 +407,22 @@ func (ctrl *csiSnapshotController) updateSnapshot(snapshot *crdv1.VolumeSnapshot
}
err = ctrl.syncSnapshot(snapshot)
if err != nil {
sKey := snapshotKey(snapshot)

// if the snapshot has been deleted, remove from snapshots in progress
if _, exists, _ := ctrl.snapshotStore.Get(sKey); !exists {
ctrl.snapshotsInProgress.Delete(sKey)
} else {
// otherwise, add back to the snapshot queue to retry.
ctrl.snapshotQueue.AddRateLimited(sKey)
}

if errors.IsConflict(err) {
// Version conflict error happens quite often and the controller
// recovers from it easily.
klog.V(3).Infof("could not sync claim %q: %+v", snapshotKey(snapshot), err)
klog.V(3).Infof("could not sync claim %q: %+v", sKey, err)
} else {
klog.Errorf("could not sync volume %q: %+v", snapshotKey(snapshot), err)
klog.Errorf("could not sync volume %q: %+v", sKey, err)
}
}
}
Expand Down

0 comments on commit fd0e055

Please sign in to comment.