Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OADP-1067: Add queuing for VSB and VSR #190

Merged
merged 9 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions api/v1alpha1/volumesnapshotbackup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type VolumeSnapshotBackupStatus struct {
ResticRepository string `json:"resticrepository,omitempty"`
// volumesnapshot backup phase status
Phase VolumeSnapshotBackupPhase `json:"phase,omitempty"`
// volumesnapshotbackup batching status
BatchingStatus VolumeSnapshotBackupBatchingStatus `json:"batchingStatus,omitempty"`
// name of the VolumeSnapshotClass
VolumeSnapshotClassName string `json:"volumeSnapshotClassName,omitempty"`
// StartTimestamp records the time a volsumesnapshotbackup was started.
Expand Down Expand Up @@ -90,6 +92,16 @@ const (
SnapMoverBackupPhaseCleanup VolumeSnapshotBackupPhase = "Cleanup"
)

type VolumeSnapshotBackupBatchingStatus string

const (
SnapMoverBackupBatchingCompleted VolumeSnapshotBackupBatchingStatus = "Completed"

SnapMoverBackupBatchingQueued VolumeSnapshotBackupBatchingStatus = "Queued"

SnapMoverBackupBatchingProcessing VolumeSnapshotBackupBatchingStatus = "Processing"
)

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:resource:path=volumesnapshotbackups,shortName=vsb
Expand Down
12 changes: 12 additions & 0 deletions api/v1alpha1/volumesnapshotrestore_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type VolumeSnapshotRestoreStatus struct {
Conditions []metav1.Condition `json:"conditions,omitempty"`
// volumesnapshot restore phase status
Phase VolumeSnapshotRestorePhase `json:"phase,omitempty"`
// volumesnapshotrestore batching status
BatchingStatus VolumeSnapshotRestoreBatchingStatus `json:"batchingStatus,omitempty"`
// name of the volumesnapshot snaphandle that is backed up
SnapshotHandle string `json:"snapshotHandle,omitempty"`
// name of the volumesnapshotcontent that is backed up
Expand Down Expand Up @@ -72,6 +74,16 @@ const (
SnapMoverRestorePhaseCleanup VolumeSnapshotRestorePhase = "Cleanup"
)

type VolumeSnapshotRestoreBatchingStatus string

const (
SnapMoverRestoreBatchingCompleted VolumeSnapshotRestoreBatchingStatus = "Completed"

SnapMoverRestoreBatchingQueued VolumeSnapshotRestoreBatchingStatus = "Queued"

SnapMoverRestoreBatchingProcessing VolumeSnapshotRestoreBatchingStatus = "Processing"
)

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:resource:path=volumesnapshotrestores,shortName=vsr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ spec:
description: VolumeSnapshotBackupStatus defines the observed state of
VolumeSnapshotBackup
properties:
batchingStatus:
description: volumesnapshotbackup batching status
type: string
completed:
type: boolean
completionTimestamp:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ spec:
description: VolumeSnapshotRestoreStatus defines the observed state of
VolumeSnapshotRestore
properties:
batchingStatus:
description: volumesnapshotrestore batching status
type: string
completionTimestamp:
description: CompletionTimestamp records the time a volumesnapshotrestore
reached a terminal state.
Expand Down
142 changes: 141 additions & 1 deletion controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
volsnapmoverv1alpha1 "github.com/konveyor/volume-snapshot-mover/api/v1alpha1"
snapv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
velero "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
k8serror "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -29,6 +30,12 @@ const (
volumeSnapshotClassDefaultKey = "snapshot.storage.kubernetes.io/is-default-class"
storageClassDefaultKey = "storageclass.kubernetes.io/is-default-class"
OADPBSLProviderName = "openshift.io/oadp-bsl-provider"

// VSM deployment vars
vsmDeploymentName = "volume-snapshot-mover"
vsmContainerName = "data-mover-controller-container"
batchBackupName = "DATAMOVER_CONCURRENT_BACKUP"
batchRestoreName = "DATAMOVER_CONCURRENT_RESTORE"
)

// Restic secret data keys
Expand Down Expand Up @@ -498,7 +505,6 @@ func updateVSBStatusPhase(vsb *volsnapmoverv1alpha1.VolumeSnapshotBackup, phase
func GetDataMoverConfigMap(namespace string, log logr.Logger, client client.Client) (*corev1.ConfigMap, error) {

cm := corev1.ConfigMap{}

err := client.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: DataMoverConfMapName}, &cm)
// configmap will not exist if config values were not set
if k8serrors.IsNotFound(err) {
Expand All @@ -521,3 +527,137 @@ func GetVeleroServiceAccount(namespace string, client client.Client) (*corev1.Se

return &sa, nil
}

func getVSMContainer(namespace string, client client.Client) (*corev1.Container, error) {
vsmDeployment := appsv1.Deployment{}
err := client.Get(context.TODO(), types.NamespacedName{Name: vsmDeploymentName, Namespace: namespace}, &vsmDeployment)
if err != nil {
return nil, err
}

// get VSM container
var vsmContainer *corev1.Container
for i, container := range vsmDeployment.Spec.Template.Spec.Containers {
if container.Name == vsmContainerName {
vsmContainer = &vsmDeployment.Spec.Template.Spec.Containers[i]
break
}
}

if vsmContainer == nil {
return nil, errors.New(fmt.Sprintf("cannot obtain vsm container %s", vsmContainerName))
}
return vsmContainer, nil
}

func GetBackupBatchValue(namespace string, client client.Client) (string, error) {

vsmContainer, err := getVSMContainer(namespace, client)
if err != nil {
return "", err
}
// get batching values from deployment env
var backupBatchValue string

for i, env := range vsmContainer.Env {
if env.Name == batchBackupName {
backupBatchValue = vsmContainer.Env[i].Value
break
}
}

if backupBatchValue == "" {
return "", errors.New(fmt.Sprint("cannot obtain vsb batch value"))
}

return backupBatchValue, nil
}

func GetRestoreBatchValue(namespace string, client client.Client) (string, error) {

vsmContainer, err := getVSMContainer(namespace, client)
if err != nil {
return "", err
}
// get batching values from deployment env
var restoreBatchValue string

for i, env := range vsmContainer.Env {
if env.Name == batchRestoreName {
restoreBatchValue = vsmContainer.Env[i].Value
break
}
}

if restoreBatchValue == "" {
return "", errors.New(fmt.Sprint("cannot obtain vsb batch value"))
eemcmullan marked this conversation as resolved.
Show resolved Hide resolved
}

return restoreBatchValue, nil
}

func (r *VolumeSnapshotBackupReconciler) setVSBQueue(vsb *volsnapmoverv1alpha1.VolumeSnapshotBackup, log logr.Logger) (bool, error) {

// update non-processed VSB as queued
if processingVSBs >= VSBBatchNumber && len(vsb.Status.BatchingStatus) == 0 {
log.Info(fmt.Sprintf("marking vsb %v batching status as queued", vsb.Name))

vsb.Status.BatchingStatus = volsnapmoverv1alpha1.SnapMoverBackupBatchingQueued
err := r.Status().Update(context.Background(), vsb)
if err != nil {
return false, err
}

// requeue VSB is max batch number is still being processed
} else if processingVSBs >= VSBBatchNumber && vsb.Status.BatchingStatus == volsnapmoverv1alpha1.SnapMoverBackupBatchingQueued {
return false, nil

// add a queued VSB to processing batch
} else if processingVSBs < VSBBatchNumber && (vsb.Status.BatchingStatus == "" ||
vsb.Status.BatchingStatus == volsnapmoverv1alpha1.SnapMoverBackupBatchingQueued) {

processingVSBs++
log.Info(fmt.Sprintf("marking vsb %v batching status as processing", vsb.Name))

vsb.Status.BatchingStatus = volsnapmoverv1alpha1.SnapMoverBackupBatchingProcessing
err := r.Status().Update(context.Background(), vsb)
if err != nil {
return false, err
}
}

return true, nil
}

func (r *VolumeSnapshotRestoreReconciler) setVSRQueue(vsr *volsnapmoverv1alpha1.VolumeSnapshotRestore, log logr.Logger) (bool, error) {

// update non-processed VSR as queued
if processingVSRs >= VSRBatchNumber && len(vsr.Status.BatchingStatus) == 0 {
log.Info(fmt.Sprintf("marking vsr %v batching status as queued", vsr.Name))

vsr.Status.BatchingStatus = volsnapmoverv1alpha1.SnapMoverRestoreBatchingQueued
err := r.Status().Update(context.Background(), vsr)
if err != nil {
return false, err
}

// requeue VSR is max batch number is still being processed
} else if processingVSRs >= VSRBatchNumber && vsr.Status.BatchingStatus == volsnapmoverv1alpha1.SnapMoverRestoreBatchingQueued {
return false, nil

// add a queued VSR to processing batch
} else if processingVSRs < VSRBatchNumber && (vsr.Status.BatchingStatus == "" ||
vsr.Status.BatchingStatus == volsnapmoverv1alpha1.SnapMoverRestoreBatchingQueued) {

processingVSRs++
log.Info(fmt.Sprintf("marking vsr %v batching status as processing", vsr.Name))

vsr.Status.BatchingStatus = volsnapmoverv1alpha1.SnapMoverRestoreBatchingProcessing
err := r.Status().Update(context.Background(), vsr)
if err != nil {
return false, err
}
}

return true, nil
}
9 changes: 8 additions & 1 deletion controllers/replicationdestination.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ func (r *VolumeSnapshotRestoreReconciler) SetVSRStatus(log logr.Logger) (bool, e
sourceSpec := repDest.Spec.Trigger.Manual
if sourceStatus == sourceSpec {

r.Log.Info(fmt.Sprintf("marking volumesnapshotrestore %s as VolSync phase completed", r.req.NamespacedName))
vsr.Status.Phase = volsnapmoverv1alpha1.SnapMoverRestoreVolSyncPhaseCompleted

// recording completion timestamp for VSR as completed is a terminal state
now := metav1.Now()
vsr.Status.CompletionTimestamp = &now
Expand All @@ -195,12 +197,17 @@ func (r *VolumeSnapshotRestoreReconciler) SetVSRStatus(log logr.Logger) (bool, e
vsr.Status.ReplicationDestinationData.CompletionTimestamp = repDest.Status.LastSyncTime
}

r.Log.Info(fmt.Sprintf("marking volumesnapshotrestore %s batching status as completed", vsr.Name))
vsr.Status.BatchingStatus = volsnapmoverv1alpha1.SnapMoverRestoreBatchingCompleted

processingVSRs--

// Update VSR status as completed
err := r.Status().Update(context.Background(), &vsr)
if err != nil {
return false, err
}
r.Log.Info(fmt.Sprintf("marking volumesnapshotrestore %s as completed", r.req.NamespacedName))

return true, nil
}

Expand Down
11 changes: 9 additions & 2 deletions controllers/replicationsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,10 @@ func (r *VolumeSnapshotBackupReconciler) setStatusFromRepSource(vsb *volsnapmove

if repSourceCompleted && reconConditionCompleted.Type == volsyncv1alpha1.ConditionSynchronizing {

// Update VSB status as completed
// Update VSB status as volsync completed
vsb.Status.Phase = volsnapmoverv1alpha1.SnapMoverVolSyncPhaseCompleted
r.Log.Info(fmt.Sprintf("marking volumesnapshotbackup %s VolSync phase as complete", r.req.NamespacedName))

// recording completion timestamp for VSB as completed is a terminal state
now := metav1.Now()
vsb.Status.CompletionTimestamp = &now
Expand All @@ -191,11 +193,16 @@ func (r *VolumeSnapshotBackupReconciler) setStatusFromRepSource(vsb *volsnapmove
vsb.Status.ReplicationSourceData.CompletionTimestamp = repSource.Status.LastSyncTime
}

vsb.Status.BatchingStatus = volsnapmoverv1alpha1.SnapMoverBackupBatchingCompleted
r.Log.Info(fmt.Sprintf("marking volumesnapshotbackup %s batching status as completed", vsb.Name))

processingVSBs--

err := r.Status().Update(context.Background(), vsb)
if err != nil {
return false, err
}
r.Log.Info(fmt.Sprintf("marking volumesnapshotbackup %s VolSync phase as complete", r.req.NamespacedName))

return true, nil

// ReplicationSource phase is still in progress
Expand Down
35 changes: 35 additions & 0 deletions controllers/volumesnapshotbackup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"
"strconv"
"time"

"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -47,6 +48,9 @@ const ReconciledReasonError = "Error"
const ReconciledReasonComplete = "Complete"
const ReconcileCompleteMessage = "Reconcile complete"

var processingVSBs = 0
var VSBBatchNumber = 0

// VolumeSnapshotBackupReconciler reconciles a VolumeSnapshotBackup object
type VolumeSnapshotBackupReconciler struct {
client.Client
Expand Down Expand Up @@ -100,11 +104,25 @@ func (r *VolumeSnapshotBackupReconciler) Reconcile(ctx context.Context, req ctrl
Name: vsb.Name,
}

if VSBBatchNumber == 0 {
batchValue, err := GetBackupBatchValue(vsb.Spec.ProtectedNamespace, r.Client)
if err != nil {
return ctrl.Result{}, err
}

VSBBatchNumber, err = strconv.Atoi(batchValue)
if err != nil {
return ctrl.Result{}, err
}
}

// stop reconciling on this resource when completed or failed
if (vsb.Status.Phase == volsnapmoverv1alpha1.SnapMoverBackupPhaseCompleted ||
vsb.Status.Phase == volsnapmoverv1alpha1.SnapMoverBackupPhaseFailed ||
vsb.Status.Phase == volsnapmoverv1alpha1.SnapMoverBackupPhasePartiallyFailed) &&
vsb.DeletionTimestamp.IsZero() {

// remove from queue
return ctrl.Result{
Requeue: false,
}, nil
Expand All @@ -120,7 +138,24 @@ func (r *VolumeSnapshotBackupReconciler) Reconcile(ctx context.Context, req ctrl
return ctrl.Result{Requeue: true}, nil
}

// Check and add VSBs to queue until full
processed, err := r.setVSBQueue(&vsb, r.Log)
if err != nil {
return ctrl.Result{}, err
}

// no error but VSB queue is full
if !processed && err == nil {
r.Log.Info(fmt.Sprintf("requeuing vsb %v as max vsbs are being processed", vsb.Name))
return ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Second}, nil
}

if !vsb.DeletionTimestamp.IsZero() {
// if batchingStatus is completed then processingVSBs has already been decremented

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the comment does not match with the condition, are we checking for BatchingStatus == completed or BatchingStatus != completed ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So processingVSBs already gets decremented once SnapMoverBackupBatchingCompleted. Therefore we would not want to decrement again if a VSB has that status but then starts to be deleted, because then it would be decremented twice for a single VSB

if vsb.Status.BatchingStatus != "" && vsb.Status.BatchingStatus != volsnapmoverv1alpha1.SnapMoverBackupBatchingCompleted {
processingVSBs--
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully we can do similar thing for VSR

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's the plan


_, err := r.CleanBackupResources(r.Log)
if err != nil {
return ctrl.Result{}, err
Expand Down