Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OADP-1067: Add queuing for VSB and VSR #190

Merged
merged 9 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions api/v1alpha1/volumesnapshotbackup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type VolumeSnapshotBackupStatus struct {
ResticRepository string `json:"resticrepository,omitempty"`
// volumesnapshot backup phase status
Phase VolumeSnapshotBackupPhase `json:"phase,omitempty"`
// volumesnapshotbackup batching status
BatchingStatus VolumeSnapshotBackupBatchingStatus `json:"batchingStatus,omitempty"`
// name of the VolumeSnapshotClass
VolumeSnapshotClassName string `json:"volumeSnapshotClassName,omitempty"`
// StartTimestamp records the time a volsumesnapshotbackup was started.
Expand Down Expand Up @@ -90,6 +92,18 @@ const (
SnapMoverBackupPhaseCleanup VolumeSnapshotBackupPhase = "Cleanup"
)

type VolumeSnapshotBackupBatchingStatus string

const (
SnapMoverBackupBatchingCompleted VolumeSnapshotBackupBatchingStatus = "Completed"

SnapMoverBackupBatchingQueued VolumeSnapshotBackupBatchingStatus = "Queued"

SnapMoverBackupBatchingProcessing VolumeSnapshotBackupBatchingStatus = "Processing"

SnapMoverBackupBatchingFailed VolumeSnapshotBackupBatchingStatus = "Failed"
eemcmullan marked this conversation as resolved.
Show resolved Hide resolved
)

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:resource:path=volumesnapshotbackups,shortName=vsb
Expand Down
14 changes: 14 additions & 0 deletions api/v1alpha1/volumesnapshotrestore_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type VolumeSnapshotRestoreStatus struct {
Conditions []metav1.Condition `json:"conditions,omitempty"`
// volumesnapshot restore phase status
Phase VolumeSnapshotRestorePhase `json:"phase,omitempty"`
// volumesnapshotrestore batching status
BatchingStatus VolumeSnapshotRestoreBatchingStatus `json:"batchingStatus,omitempty"`
// name of the volumesnapshot snaphandle that is backed up
SnapshotHandle string `json:"snapshotHandle,omitempty"`
// name of the volumesnapshotcontent that is backed up
Expand Down Expand Up @@ -72,6 +74,18 @@ const (
SnapMoverRestorePhaseCleanup VolumeSnapshotRestorePhase = "Cleanup"
)

type VolumeSnapshotRestoreBatchingStatus string

const (
SnapMoverRestoreBatchingCompleted VolumeSnapshotRestoreBatchingStatus = "Completed"

SnapMoverRestoreBatchingQueued VolumeSnapshotRestoreBatchingStatus = "Queued"

SnapMoverRestoreBatchingProcessing VolumeSnapshotRestoreBatchingStatus = "Processing"

SnapMoverRestoreBatchingFailed VolumeSnapshotRestoreBatchingStatus = "Failed"
eemcmullan marked this conversation as resolved.
Show resolved Hide resolved
)

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:resource:path=volumesnapshotrestores,shortName=vsr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ spec:
description: VolumeSnapshotBackupStatus defines the observed state of
VolumeSnapshotBackup
properties:
batchingStatus:
description: volumesnapshotbackup batching status
type: string
completed:
type: boolean
completionTimestamp:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ spec:
description: VolumeSnapshotRestoreStatus defines the observed state of
VolumeSnapshotRestore
properties:
batchingStatus:
description: volumesnapshotrestore batching status
type: string
completionTimestamp:
description: CompletionTimestamp records the time a volumesnapshotrestore
reached a terminal state.
Expand Down
75 changes: 74 additions & 1 deletion controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
volsnapmoverv1alpha1 "github.com/konveyor/volume-snapshot-mover/api/v1alpha1"
snapv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
velero "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
k8serror "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -29,6 +30,12 @@ const (
volumeSnapshotClassDefaultKey = "snapshot.storage.kubernetes.io/is-default-class"
storageClassDefaultKey = "storageclass.kubernetes.io/is-default-class"
OADPBSLProviderName = "openshift.io/oadp-bsl-provider"

// VSM deployment vars
vsmDeploymentName = "volume-snapshot-mover"
vsmContainerName = "data-mover-controller-container"
batchBackupName = "DATAMOVER_CONCURRENT_BACKUP"
batchRestoreName = "DATAMOVER_CONCURRENT_RESTORE"
)

// Restic secret data keys
Expand Down Expand Up @@ -498,7 +505,6 @@ func updateVSBStatusPhase(vsb *volsnapmoverv1alpha1.VolumeSnapshotBackup, phase
func GetDataMoverConfigMap(namespace string, log logr.Logger, client client.Client) (*corev1.ConfigMap, error) {

cm := corev1.ConfigMap{}

err := client.Get(context.Background(), types.NamespacedName{Namespace: namespace, Name: DataMoverConfMapName}, &cm)
// configmap will not exist if config values were not set
if k8serrors.IsNotFound(err) {
Expand All @@ -521,3 +527,70 @@ func GetVeleroServiceAccount(namespace string, client client.Client) (*corev1.Se

return &sa, nil
}

func getVSMContainer(namespace string, client client.Client) (*corev1.Container, error) {
vsmDeployment := appsv1.Deployment{}
err := client.Get(context.TODO(), types.NamespacedName{Name: vsmDeploymentName, Namespace: namespace}, &vsmDeployment)
if err != nil {
return nil, err
}

// get VSM container
var vsmContainer *corev1.Container
for i, container := range vsmDeployment.Spec.Template.Spec.Containers {
if container.Name == vsmContainerName {
vsmContainer = &vsmDeployment.Spec.Template.Spec.Containers[i]
break
}
}

if vsmContainer == nil {
return nil, errors.New(fmt.Sprintf("cannot obtain vsm container %s", vsmContainerName))
}
return vsmContainer, nil
}

func GetBackupBatchValue(namespace string, client client.Client) (string, error) {

vsmContainer, err := getVSMContainer(namespace, client)
if err != nil {
return "", err
}
// get batching values from deployment env
var backupBatchValue string

for i, env := range vsmContainer.Env {
if env.Name == batchBackupName {
backupBatchValue = vsmContainer.Env[i].Value
break
}
}

if backupBatchValue == "" {
return "", errors.New(fmt.Sprint("cannot obtain vsb batch value"))
}
return backupBatchValue, nil
}

func GetRestoreBatchValue(namespace string, client client.Client) (string, error) {

vsmContainer, err := getVSMContainer(namespace, client)
if err != nil {
return "", err
}
// get batching values from deployment env
var restoreBatchValue string

for i, env := range vsmContainer.Env {
if env.Name == batchRestoreName {
restoreBatchValue = vsmContainer.Env[i].Value
break
}
}

if restoreBatchValue == "" {
return "", errors.New(fmt.Sprint("cannot obtain vsb batch value"))
eemcmullan marked this conversation as resolved.
Show resolved Hide resolved
}
return restoreBatchValue, nil

}
7 changes: 6 additions & 1 deletion controllers/replicationdestination.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ func (r *VolumeSnapshotRestoreReconciler) SetVSRStatus(log logr.Logger) (bool, e
sourceSpec := repDest.Spec.Trigger.Manual
if sourceStatus == sourceSpec {

r.Log.Info(fmt.Sprintf("marking volumesnapshotrestore %s as VolSync phase completed", r.req.NamespacedName))
vsr.Status.Phase = volsnapmoverv1alpha1.SnapMoverRestoreVolSyncPhaseCompleted

// recording completion timestamp for VSR as completed is a terminal state
now := metav1.Now()
vsr.Status.CompletionTimestamp = &now
Expand All @@ -195,12 +197,15 @@ func (r *VolumeSnapshotRestoreReconciler) SetVSRStatus(log logr.Logger) (bool, e
vsr.Status.ReplicationDestinationData.CompletionTimestamp = repDest.Status.LastSyncTime
}

r.Log.Info(fmt.Sprintf("marking volumesnapshotrestore %s batching status as completed", vsr.Name))
vsr.Status.BatchingStatus = volsnapmoverv1alpha1.SnapMoverRestoreBatchingCompleted

// Update VSR status as completed
err := r.Status().Update(context.Background(), &vsr)
if err != nil {
return false, err
}
r.Log.Info(fmt.Sprintf("marking volumesnapshotrestore %s as completed", r.req.NamespacedName))

return true, nil
}

Expand Down
9 changes: 7 additions & 2 deletions controllers/replicationsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,10 @@ func (r *VolumeSnapshotBackupReconciler) setStatusFromRepSource(vsb *volsnapmove

if repSourceCompleted && reconConditionCompleted.Type == volsyncv1alpha1.ConditionSynchronizing {

// Update VSB status as completed
// Update VSB status as volsync completed
vsb.Status.Phase = volsnapmoverv1alpha1.SnapMoverVolSyncPhaseCompleted
r.Log.Info(fmt.Sprintf("marking volumesnapshotbackup %s VolSync phase as complete", r.req.NamespacedName))

// recording completion timestamp for VSB as completed is a terminal state
now := metav1.Now()
vsb.Status.CompletionTimestamp = &now
Expand All @@ -191,11 +193,14 @@ func (r *VolumeSnapshotBackupReconciler) setStatusFromRepSource(vsb *volsnapmove
vsb.Status.ReplicationSourceData.CompletionTimestamp = repSource.Status.LastSyncTime
}

vsb.Status.BatchingStatus = volsnapmoverv1alpha1.SnapMoverBackupBatchingCompleted
r.Log.Info(fmt.Sprintf("marking volumesnapshotbackup %s batching status as completed", vsb.Name))

err := r.Status().Update(context.Background(), vsb)
if err != nil {
return false, err
}
r.Log.Info(fmt.Sprintf("marking volumesnapshotbackup %s VolSync phase as complete", r.req.NamespacedName))

return true, nil

// ReplicationSource phase is still in progress
Expand Down
50 changes: 50 additions & 0 deletions controllers/volumesnapshotbackup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"
"strconv"
"time"

"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -47,6 +48,9 @@ const ReconciledReasonError = "Error"
const ReconciledReasonComplete = "Complete"
const ReconcileCompleteMessage = "Reconcile complete"

var processingVSBs = 0
var VSBBatchNumber = 0

// VolumeSnapshotBackupReconciler reconciles a VolumeSnapshotBackup object
type VolumeSnapshotBackupReconciler struct {
client.Client
Expand Down Expand Up @@ -100,11 +104,21 @@ func (r *VolumeSnapshotBackupReconciler) Reconcile(ctx context.Context, req ctrl
Name: vsb.Name,
}

if VSBBatchNumber == 0 {
batchValue, err := GetBackupBatchValue(vsb.Spec.ProtectedNamespace, r.Client)
if err != nil {
return ctrl.Result{}, err
}
VSBBatchNumber, _ = strconv.Atoi(batchValue)
eemcmullan marked this conversation as resolved.
Show resolved Hide resolved
}

// stop reconciling on this resource when completed or failed
if (vsb.Status.Phase == volsnapmoverv1alpha1.SnapMoverBackupPhaseCompleted ||
vsb.Status.Phase == volsnapmoverv1alpha1.SnapMoverBackupPhaseFailed ||
vsb.Status.Phase == volsnapmoverv1alpha1.SnapMoverBackupPhasePartiallyFailed) &&
vsb.DeletionTimestamp.IsZero() {

// remove from queue
return ctrl.Result{
Requeue: false,
}, nil
Expand All @@ -121,6 +135,8 @@ func (r *VolumeSnapshotBackupReconciler) Reconcile(ctx context.Context, req ctrl
}

if !vsb.DeletionTimestamp.IsZero() {
processingVSBs--

_, err := r.CleanBackupResources(r.Log)
if err != nil {
return ctrl.Result{}, err
Expand All @@ -135,6 +151,40 @@ func (r *VolumeSnapshotBackupReconciler) Reconcile(ctx context.Context, req ctrl
return ctrl.Result{}, nil
}

// another VSB can be added to processing batch
eemcmullan marked this conversation as resolved.
Show resolved Hide resolved
if len(vsb.Status.BatchingStatus) > 0 && vsb.Status.BatchingStatus == volsnapmoverv1alpha1.SnapMoverBackupBatchingCompleted {
processingVSBs--
}

// update non-processed VSB as queued
if processingVSBs >= VSBBatchNumber && len(vsb.Status.BatchingStatus) == 0 {
r.Log.Info(fmt.Sprintf("marking vsb %v batching status as queued", vsb.Name))

vsb.Status.BatchingStatus = volsnapmoverv1alpha1.SnapMoverBackupBatchingQueued
err := r.Status().Update(context.Background(), &vsb)
if err != nil {
return ctrl.Result{}, err
}

// requeue VSB is max batch number is still being processed
} else if processingVSBs >= VSBBatchNumber && vsb.Status.BatchingStatus == volsnapmoverv1alpha1.SnapMoverBackupBatchingQueued {
r.Log.Info(fmt.Sprintf("requeuing vsb %v as max vsbs are being processed", vsb.Name))
return ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Second}, nil

// add a queued VSB to processing batch
} else if processingVSBs < VSBBatchNumber && (vsb.Status.BatchingStatus == "" ||
vsb.Status.BatchingStatus == volsnapmoverv1alpha1.SnapMoverBackupBatchingQueued) {

processingVSBs++
r.Log.Info(fmt.Sprintf("marking vsb %v batching status as processing", vsb.Name))

vsb.Status.BatchingStatus = volsnapmoverv1alpha1.SnapMoverBackupBatchingProcessing
err := r.Status().Update(context.Background(), &vsb)
if err != nil {
return ctrl.Result{}, err
}
}

// Run through all reconcilers associated with VSB needs
// Reconciliation logic

Expand Down
47 changes: 47 additions & 0 deletions controllers/volumesnapshotrestore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"
"strconv"
"time"

k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -39,6 +40,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
)

var processingVSRs = 0
var VSRBatchNumber = 0

// VolumeSnapshotRestoreReconciler reconciles a VolumeSnapshotRestore object
type VolumeSnapshotRestoreReconciler struct {
client.Client
Expand Down Expand Up @@ -88,15 +92,58 @@ func (r *VolumeSnapshotRestoreReconciler) Reconcile(ctx context.Context, req ctr
Name: vsr.Name,
}

if VSRBatchNumber == 0 {
batchValue, err := GetRestoreBatchValue(vsr.Spec.ProtectedNamespace, r.Client)
if err != nil {
return ctrl.Result{}, err
}
VSRBatchNumber, _ = strconv.Atoi(batchValue)
eemcmullan marked this conversation as resolved.
Show resolved Hide resolved
}

// stop reconciling on this resource when completed or failed
if vsr.Status.Phase == volsnapmoverv1alpha1.SnapMoverRestorePhaseCompleted ||
vsr.Status.Phase == volsnapmoverv1alpha1.SnapMoverRestorePhaseFailed ||
vsr.Status.Phase == volsnapmoverv1alpha1.SnapMoverRestorePhasePartiallyFailed {

return ctrl.Result{
Requeue: false,
}, nil
}

// another VSR can be added to processing batch
if len(vsr.Status.BatchingStatus) > 0 && vsr.Status.BatchingStatus == volsnapmoverv1alpha1.SnapMoverRestoreBatchingCompleted {
processingVSRs--
}

// update non-processed VSR as queued
if processingVSRs >= VSRBatchNumber && len(vsr.Status.BatchingStatus) == 0 {
r.Log.Info(fmt.Sprintf("marking vsr %v batching status as queued", vsr.Name))

vsr.Status.BatchingStatus = volsnapmoverv1alpha1.SnapMoverRestoreBatchingQueued
err := r.Status().Update(context.Background(), &vsr)
if err != nil {
return ctrl.Result{}, err
}

// requeue VSR is max batch number is still being processed
} else if processingVSRs >= VSRBatchNumber && vsr.Status.BatchingStatus == volsnapmoverv1alpha1.SnapMoverRestoreBatchingQueued {
r.Log.Info(fmt.Sprintf("requeuing vsr %v as max vsrs are being processed", vsr.Name))
return ctrl.Result{Requeue: true, RequeueAfter: 5 * time.Second}, nil

// add a queued VSR to processing batch
} else if processingVSRs < VSRBatchNumber && (vsr.Status.BatchingStatus == "" ||
vsr.Status.BatchingStatus == volsnapmoverv1alpha1.SnapMoverRestoreBatchingQueued) {

processingVSRs++
r.Log.Info(fmt.Sprintf("marking vsr %v batching status as processing", vsr.Name))

vsr.Status.BatchingStatus = volsnapmoverv1alpha1.SnapMoverRestoreBatchingProcessing
err := r.Status().Update(context.Background(), &vsr)
if err != nil {
return ctrl.Result{}, err
}
}

// Run through all reconcilers associated with VSR needs
// Reconciliation logic

Expand Down