feat(cvc-operator): add automatic scaling of volumereplicas for CSI volumes #1613

Merged: 11 commits, Feb 26, 2020
149 changes: 52 additions & 97 deletions cmd/cvc-operator/controller/cstorvolumeclaim.go
@@ -478,8 +478,8 @@ func createCVR(
}
klog.V(2).Infof(
"Created CVR %s with phase %s on cstor pool %s",
rInfo.phase,
cvrObj.Name,
rInfo.phase,
pool.Name,
)
return cvrObj, nil
@@ -586,12 +586,6 @@ func randomizePoolList(list *apis.CStorPoolInstanceList) *apis.CStorPoolInstance
// 3. If the PDB doesn't exist it creates a new PDB (with the CSPC hash)
func getOrCreatePodDisruptionBudget(
cspcName string, poolNames []string) (*policy.PodDisruptionBudget, error) {
// poolNames, err := cvr.GetVolumeReplicaPoolNames(pvName, openebsNamespace)
// if err != nil {
// return nil, errors.Wrapf(err,
// "failed to get volume replica pool names of volume %s",
// cvObj.Name)
// }
pdbLabels := cvclaim.GetPDBPoolLabels(poolNames)
labelSelector := apispdb.GetPDBLabelSelector(pdbLabels)
pdbList, err := apispdb.KubeClient().
@@ -698,35 +692,10 @@ func isHAVolume(cvcObj *apis.CStorVolumeClaim) bool {
// that PDB name. If the PDB doesn't exist then create a new PDB and return the newly
// created PDB name.
// 4. If current volume is not HAVolume then return nothing.
func updatePDBForVolume(cvcObj *apis.CStorVolumeClaim) (string, error) {
pdbName, hasPDB := cvcObj.GetLabels()[string(apis.PodDisruptionBudgetKey)]
pdbLabels := cvclaim.GetPDBPoolLabels(cvcObj.Status.PoolInfo)
labelSelector := apispdb.GetPDBLabelSelector(pdbLabels)
func getUpdatePDBForVolume(cvcObj *apis.CStorVolumeClaim) (string, error) {
Reviewer (Contributor): suggestion: rename this function to getModifiedPDBForScaledVolume.

_, hasPDB := cvcObj.GetLabels()[string(apis.PodDisruptionBudgetKey)]
if hasPDB {
// Get the PDB, if one exists, among the newly updated volume replica pools
pdbList, err := apispdb.KubeClient().
WithNamespace(openebsNamespace).
List(metav1.ListOptions{LabelSelector: labelSelector})
if err != nil {
return "", errors.Wrapf(err,
"failed to get PDB present among pools %v",
cvcObj.Status.PoolInfo,
)
}
if len(pdbList.Items) >= 1 && isHAVolume(cvcObj) {
for _, pdbObj := range pdbList.Items {
pdbPoolCount := len(pdbObj.Spec.Selector.MatchExpressions[0].Values)
// Let us assume the volume replicas were scaled down from 4 to
// 3 (i.e. the PDB was created on top of 4 pools). When the scale down
// happens it is better to delete the PDB (if no one is referring to it) and
// create a PDB among the 3 pools, so that scaling down of the cluster will
// not be held up unnecessarily (i.e. while draining a node).
if pdbObj.Name == pdbName && pdbPoolCount < len(cvcObj.Status.PoolInfo) {
return pdbName, nil
}
}
}
err = deletePDBIfNotInUse(cvcObj)
err := deletePDBIfNotInUse(cvcObj)
if err != nil {
return "", err
}
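Most of getUpdatePDBForVolume's tail is folded out of the hunk above. Below is a hedged reconstruction of the flow described in its doc comment (steps 3 and 4), shown only for orientation; the CSPC name lookup is elided and it simplifies the real function, which also reuses an existing PDB when it still fits.

```go
// sketchGetUpdatePDBForVolume mirrors the documented flow: drop the stale PDB
// once nothing references it, skip PDB handling for non-HA volumes, and
// otherwise get or create a PDB spanning the volume's current replica pools.
func sketchGetUpdatePDBForVolume(cvcObj *apis.CStorVolumeClaim) (string, error) {
	_, hasPDB := cvcObj.GetLabels()[string(apis.PodDisruptionBudgetKey)]
	if hasPDB {
		// The PDB built for the previous pool set is stale after scaling.
		if err := deletePDBIfNotInUse(cvcObj); err != nil {
			return "", err
		}
	}
	if !isHAVolume(cvcObj) {
		// Step 4: non-HA volumes do not need a PDB at all.
		return "", nil
	}
	var cspcName string // derived from the CVC in the real code; lookup elided here
	pdbObj, err := getOrCreatePodDisruptionBudget(cspcName, cvcObj.Status.PoolInfo)
	if err != nil {
		return "", err
	}
	return pdbObj.Name, nil
}
```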
@@ -751,7 +720,7 @@ func (c *CVCController) isCVCScalePending(cvc *apis.CStorVolumeClaim) bool {
return util.IsChangeInLists(desiredPoolNames, cvc.Status.PoolInfo)
}
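isCVCScalePending defers to util.IsChangeInLists above. Here is a minimal sketch of the assumed semantics (true when the two slices differ as sets, ignoring order); it is an illustration, not the project's actual helper.

```go
// isChangeInLists reports whether the desired and observed pool lists differ,
// ignoring ordering; any length mismatch or missing entry counts as a change.
func isChangeInLists(desired, current []string) bool {
	if len(desired) != len(current) {
		return true
	}
	seen := map[string]bool{}
	for _, name := range desired {
		seen[name] = true
	}
	for _, name := range current {
		if !seen[name] {
			return true
		}
	}
	return false
}
```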

// handlePostScalingProcess does the following changes:
// updatePDBForScaledVolume does the following changes:
// 1. Handle PDB updates based on the no. of volume replicas. It should handle them in
// the following ways:
// 1.1 If the volume was already pointing to a PDB then check whether that same PDB will be
@@ -766,15 +735,13 @@ func (c *CVCController) isCVCScalePending(cvc *apis.CStorVolumeClaim) bool {
// created PDB name.
// 2. Update the CVC label to point to the newly created PDB obtained in the above step and also
// update the replica pool information on the status of the CVC.
func handlePostScalingProcess(cvc *apis.CStorVolumeClaim,
currentRPNames []string) error {
// NOTE: This function returns the object as well as an error if an error occurred
func updatePDBForScaledVolume(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeClaim, error) {
var err error
cvcCopy := cvc.DeepCopy()
cvc.Status.PoolInfo = []string{}
cvc.Status.PoolInfo = append(cvc.Status.PoolInfo, currentRPNames...)
pdbName, err := updatePDBForVolume(cvc)
pdbName, err := getUpdatePDBForVolume(cvc)
if err != nil {
return errors.Wrapf(err,
return cvcCopy, errors.Wrapf(err,
"failed to handle PDB for scaled volume %s",
cvc.Name,
)
@@ -786,66 +753,58 @@ func handlePostScalingProcess(cvc *apis.CStorVolumeClaim,
newCVCObj, err := cvclaim.NewKubeclient().WithNamespace(cvc.Namespace).Update(cvc)
if err != nil {
// If an error occurred, point it to the old cvc object itself
cvc = cvcCopy
return errors.Wrapf(err,
return cvcCopy, errors.Wrapf(err,
"failed to update %s CVC status with scaledup replica pool names",
cvc.Name,
)
}
// Point cvc to updated cvc object
cvc = newCVCObj
return nil
return newCVCObj, nil
}

// verifyAndUpdateScaleUpInfo does the following changes:
// updateCVCWithScaledUpInfo does the following changes:
// 1. Get the list of new replica pool names by using the CVC (spec and status)
// 2. Verify the status of the scaling-up replica (by using the CV object) and based on the status
// do the following changes:
// 2.1: If scaling up has completed then update the PDB accordingly (only if it is an
// HAVolume) and update the replica pool info on the CVC (API calls).
// 2.2: If scaling up is still in progress then return an error saying scaling up is in
// progress.
func verifyAndUpdateScaleUpInfo(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVolume) error {
// scaledRPNames contains the new replica pool names where the entire data was
// reconstructed from other replicas
scaledRPNames := []string{}
func updateCVCWithScaledUpInfo(cvc *apis.CStorVolumeClaim,
cvObj *apis.CStorVolume) (*apis.CStorVolumeClaim, error) {
pvName := cvc.GetAnnotations()[volumeID]
desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc)
newPoolNames := util.ListDiff(desiredPoolNames, cvc.Status.PoolInfo)
replicaPoolMap := map[string]bool{}

replicaPoolNames, err := cvr.GetVolumeReplicaPoolNames(pvName, cvc.Namespace)
replicaPoolNames, err := cvr.GetVolumeReplicaPoolNames(pvName, openebsNamespace)
if err != nil {
return errors.Wrapf(err, "failed to get current replica pool information")
return cvc, errors.Wrapf(err, "failed to get current replica pool information")
}

for _, poolName := range replicaPoolNames {
replicaPoolMap[poolName] = true
}
for _, poolName := range newPoolNames {
if _, ok := replicaPoolMap[poolName]; ok {
scaledRPNames = append(scaledRPNames, poolName)
}
}
if len(scaledRPNames) > 0 {
var currentRPNames []string
currentRPNames = append(currentRPNames, cvc.Status.PoolInfo...)
currentRPNames = append(currentRPNames, scaledRPNames...)
// handlePostScalingProcess will handle PDB and CVC status
err := handlePostScalingProcess(cvc, currentRPNames)
if err != nil {
return errors.Wrapf(
err,
"failed to handle post volume replicas scale up process",
if _, ok := replicaPoolMap[poolName]; !ok {
return cvc, errors.Errorf(
"scaling replicas from %d to %d in progress",
len(cvc.Status.PoolInfo),
len(cvc.Spec.Policy.ReplicaPoolInfo),
)
}
return nil
}
return errors.Errorf(
"scaling replicas from %d to %d in progress",
len(cvc.Status.PoolInfo),
len(cvc.Spec.Policy.ReplicaPoolInfo),
)
cvcCopy := cvc.DeepCopy()
// append the newly added replica pool names to the CVC status pool info
cvc.Status.PoolInfo = append(cvc.Status.PoolInfo, newPoolNames...)
// updatePDBForScaledVolume will handle updating PDB and CVC status
cvc, err = updatePDBForScaledVolume(cvc)
if err != nil {
return cvcCopy, errors.Wrapf(
err,
"failed to handle post volume replicas scale up process",
)
}
return cvc, nil
}
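updateCVCWithScaledUpInfo above uses util.ListDiff to work out which desired pools do not yet hold a replica. Below is a minimal sketch of the assumed semantics (entries of the first slice that are missing from the second); it is illustrative only, not the project's actual helper.

```go
// listDiff returns the pool names present in desired but absent from current,
// i.e. the pools that still need a volume replica.
func listDiff(desired, current []string) []string {
	currentSet := map[string]bool{}
	for _, name := range current {
		currentSet[name] = true
	}
	var missing []string
	for _, name := range desired {
		if !currentSet[name] {
			missing = append(missing, name)
		}
	}
	return missing
}
```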

// getScaleDownCVR returns the CVR which belongs to the scale-down pool
@@ -926,32 +885,32 @@ func handleVolumeReplicaCreation(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVo
// 3. Create CVRs if CVR doesn't exist on scaled cStor
// pool(handleVolumeReplicaCreation will handle new CVR creations).
// 4. If scaling up the volume replicas has completed then do the following
// things (verifyAndUpdateScaleUpInfo does the following things).
// things (updateCVCWithScaledUpInfo does the following things).
// 4.1.1 Update the PDB according to the new pools (only if the volume is an HAVolume).
// 4.1.2 Update the PDB label on the CVC and the replica pool information on the status.
// 5. If scaling up of the volume replicas has not completed then return an error
func scaleUpVolumeReplicas(cvc *apis.CStorVolumeClaim) error {
func scaleUpVolumeReplicas(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeClaim, error) {
drCount := len(cvc.Spec.Policy.ReplicaPoolInfo)
cvObj, err := cv.NewKubeclient().
WithNamespace(openebsNamespace).
Get(cvc.Name, metav1.GetOptions{})
if err != nil {
return errors.Wrapf(err, "failed to get cstorvolumes object %s", cvc.Name)
return cvc, errors.Wrapf(err, "failed to get cstorvolumes object %s", cvc.Name)
}
if cvObj.Spec.DesiredReplicationFactor < drCount {
cvObj.Spec.DesiredReplicationFactor = drCount
Reviewer (Contributor): Should we consider picking the replication count from the CVC instead of the CV for cstor-volume-mgmt? This would avoid maintaining this info in multiple places. cstor-volume-mgmt would then do the following (a minimal sketch of this selection logic is shown below):

  • pick cvc.spec.ReplicaCount if the length of ReplicaPoolInfo is zero
  • else take the replication factor as the length of ReplicaPoolInfo if it is present

What steps does cstor-volume-mgmt perform on noticing a change in the DRF?
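A minimal sketch of the selection logic suggested in the comment above; it is illustrative only and not part of the PR. The ReplicaCount field is taken from the reviewer's wording and is an assumption here.

```go
// desiredReplicationFactor derives the replication factor from the CVC alone,
// per the reviewer's suggestion: fall back to the replica count when no
// replica pools are listed, otherwise use the number of listed pools.
func desiredReplicationFactor(cvc *apis.CStorVolumeClaim) int {
	if len(cvc.Spec.Policy.ReplicaPoolInfo) == 0 {
		return cvc.Spec.ReplicaCount // assumed field name, from the reviewer's comment
	}
	return len(cvc.Spec.Policy.ReplicaPoolInfo)
}
```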

Reviewer (Contributor): I tried to find this out from the code, but could not find anything other than the scale-down handling.

Reviewer (Contributor): This info on how cstor-volume-mgmt works would help with the review.

Author (mittachaitu): I will update the PR with the relevant PRs.
cvObj, err = updateCStorVolumeInfo(cvObj)
if err != nil {
return err
return cvc, err
}
}
// Create replicas on new pools
err = handleVolumeReplicaCreation(cvc, cvObj)
if err != nil {
return err
return cvc, err
}
err = verifyAndUpdateScaleUpInfo(cvc, cvObj)
return err
cvc, err = updateCVCWithScaledUpInfo(cvc, cvObj)
return cvc, err
}

// scaleDownVolumeReplicas will process the following steps
@@ -962,32 +921,25 @@ func scaleUpVolumeReplicas(cvc *apis.CStorVolumeClaim) error {
// 3. Check the status of the scale down; if the scale down has completed then
// delete the CVR which belongs to the scaled-down pool and then perform the post-scaling
// process (update the PDB if applicable and the CVC replica pool status).
func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error {
func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeClaim, error) {
var cvrObj *apis.CStorVolumeReplica
drCount := len(cvc.Spec.Policy.ReplicaPoolInfo)
cvObj, err := cv.NewKubeclient().
WithNamespace(openebsNamespace).
Get(cvc.Name, metav1.GetOptions{})
if err != nil {
return errors.Wrapf(err, "failed to get cstorvolumes object %s", cvc.Name)
}
// If more than one replica is scaled down at a time, keep returning the error
if (cvObj.Spec.ReplicationFactor - drCount) > 1 {
return errors.Wrapf(err,
"cann't perform %d replicas scaledown at a time",
(cvObj.Spec.DesiredReplicationFactor - drCount),
)
return cvc, errors.Wrapf(err, "failed to get cstorvolumes object %s", cvc.Name)
}
cvrObj, err = getScaleDownCVR(cvc)
if err != nil && !k8serror.IsNotFound(err) {
return errors.Wrapf(err, "failed to get scale down CVR object")
return cvc, errors.Wrapf(err, "failed to get CVR requested for scale down operation")
}
if cvObj.Spec.DesiredReplicationFactor > drCount {
cvObj.Spec.DesiredReplicationFactor = drCount
delete(cvObj.Spec.ReplicaDetails.KnownReplicas, apis.ReplicaID(cvrObj.Spec.ReplicaID))
cvObj, err = updateCStorVolumeInfo(cvObj)
if err != nil {
return err
return cvc, err
}
}
// TODO: Make the check below a cvObj.IsScaleDownInProgress() method
@@ -997,18 +949,21 @@ func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error {
WithNamespace(openebsNamespace).
Delete(cvrObj.Name)
if err != nil {
return errors.Wrapf(err, "failed to delete cstorvolumereplica %s", cvrObj.Name)
return cvc, errors.Wrapf(err, "failed to delete cstorvolumereplica %s", cvrObj.Name)
}
}
desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc)
err = handlePostScalingProcess(cvc, desiredPoolNames)
cvcCopy := cvc.DeepCopy()
cvc.Status.PoolInfo = desiredPoolNames
// updatePDBForScaledVolume will handle updating PDB and CVC status
cvc, err = updatePDBForScaledVolume(cvc)
if err != nil {
return errors.Wrapf(err,
return cvcCopy, errors.Wrapf(err,
"failed to handle post volume replicas scale down process")
}
return nil
return cvc, nil
}
return errors.Errorf(
return cvc, errors.Errorf(
"Scaling down volume replicas from %d to %d is in progress",
len(cvc.Status.PoolInfo),
drCount,
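The body of getScaleDownCVR and the rest of scaleDownVolumeReplicas are folded out of the hunk above. Below is a hedged sketch of the pool-selection part only: the pool to scale down is the one present in the CVC status but absent from the desired replica pool list. The actual CVR lookup is elided because the client call is not shown in this diff.

```go
// scaleDownPoolNames returns the pool names that carry a replica today but are
// no longer desired, i.e. the candidates whose CVRs will be removed.
func scaleDownPoolNames(cvc *apis.CStorVolumeClaim) []string {
	desiredSet := map[string]bool{}
	for _, name := range cvclaim.GetDesiredReplicaPoolNames(cvc) {
		desiredSet[name] = true
	}
	var removed []string
	for _, name := range cvc.Status.PoolInfo {
		if !desiredSet[name] {
			removed = append(removed, name)
		}
	}
	return removed
}
```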
6 changes: 3 additions & 3 deletions cmd/cvc-operator/controller/cvc_controller.go
@@ -207,7 +207,7 @@ func (c *CVCController) syncCVC(cvc *apis.CStorVolumeClaim) error {
}

if c.isCVCScalePending(cvc) {
// process scalingup/scalingdown of volume replicas only if there is
// process scale-up/scale-down of volume replicas only if there is
// a change between the current and desired state of the replica pool information
_ = c.scaleVolumeReplicas(cvc)
}
@@ -697,9 +697,9 @@ func deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error {
func (c *CVCController) scaleVolumeReplicas(cvc *apis.CStorVolumeClaim) error {
var err error
if len(cvc.Spec.Policy.ReplicaPoolInfo) > len(cvc.Status.PoolInfo) {
Reviewer (Contributor): Is this check sufficient? We need to make sure that all the entries in PoolInfo are also part of ReplicaPoolInfo, and that a few more are present on top of them.

Author (mittachaitu): These kinds of checks should happen ahead of this, in the admission server.

err = scaleUpVolumeReplicas(cvc)
cvc, err = scaleUpVolumeReplicas(cvc)
} else if len(cvc.Spec.Policy.ReplicaPoolInfo) < len(cvc.Status.PoolInfo) {
Reviewer (Contributor): Here as well, the check should be that there is nothing extra in ReplicaPoolInfo beyond what is present in PoolInfo.

Author (mittachaitu): Same as above.

err = scaleDownVolumeReplicas(cvc)
cvc, err = scaleDownVolumeReplicas(cvc)
} else {
c.recorder.Event(cvc, corev1.EventTypeWarning, "Migration",
Reviewer (Contributor): Should this limitation go into the webhooks?

Author (mittachaitu, Feb 20, 2020): Once the webhook for CVC is ready we will have the following checks (a hedged validation sketch follows this hunk):

  • Pool names must not repeat under either spec or status.
  • InitialReplicaCount shouldn't be modified.
  • An existing pool name can't be updated with a new pool name (which is the migration case).
  • Don't allow scale down/up of volume replicas while another one is in progress.
  • Don't allow more than one replica at a time for scale down.

Reviewer (Contributor): Also: failure of scale up/down if the added pool is not healthy? (But the admin/user shouldn't be providing an unhealthy pool name in the first place.)

"Migration of volume replicas is not yet supported")