From af522b4b3b77d306bc95ae13ba1c53666dc842b9 Mon Sep 17 00:00:00 2001 From: mittachaitu Date: Tue, 11 Feb 2020 19:28:17 +0530 Subject: [PATCH 01/11] feat(cvc-operator): add automatic scaling of volumereplicas via CVC Signed-off-by: mittachaitu --- .../controller/cstorvolumeclaim.go | 396 +++++++++++++++++- cmd/cvc-operator/controller/cvc_controller.go | 58 ++- .../openebs.io/v1alpha1/cstor_volume_claim.go | 2 + .../openebs.io/v1alpha1/cstorvolume_policy.go | 16 + pkg/cstor/volume/v1alpha1/cstorvolume.go | 7 + pkg/cstor/volumereplica/v1alpha1/build.go | 4 +- .../v1alpha1/cstorvolumereplica.go | 21 + pkg/cstorvolumeclaim/v1alpha1/utils.go | 9 + pkg/util/util.go | 23 + 9 files changed, 517 insertions(+), 19 deletions(-) diff --git a/cmd/cvc-operator/controller/cstorvolumeclaim.go b/cmd/cvc-operator/controller/cstorvolumeclaim.go index 55e1e6ab84..fa4cd68c3b 100644 --- a/cmd/cvc-operator/controller/cstorvolumeclaim.go +++ b/cmd/cvc-operator/controller/cstorvolumeclaim.go @@ -17,6 +17,7 @@ limitations under the License. package cstorvolumeclaim import ( + "fmt" "math/rand" "strings" "time" @@ -30,13 +31,16 @@ import ( cv "github.com/openebs/maya/pkg/cstor/volume/v1alpha1" cvr "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1" cvclaim "github.com/openebs/maya/pkg/cstorvolumeclaim/v1alpha1" + "github.com/openebs/maya/pkg/hash" svc "github.com/openebs/maya/pkg/kubernetes/service/v1alpha1" + "github.com/openebs/maya/pkg/util" errors "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1beta1" k8serror "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/klog" ) const ( @@ -50,8 +54,18 @@ const ( // minHAReplicaCount is minimum no.of replicas are required to decide // HighAvailable volume minHAReplicaCount = 3 + volumeID = "openebs.io/volumeID" + cspiLabel = "cstorpoolinstance.openebs.io/name" + cspiOnline = "ONLINE" ) +// replicaInfo struct is used to pass replica information to +// create CVR +type replicaInfo struct { + replicaID string + phase apis.CStorVolumeReplicaPhase +} + var ( cvPorts = []corev1.ServicePort{ corev1.ServicePort{ @@ -375,7 +389,10 @@ func (c *CVCController) distributeCVRs( for count, pool := range usablePoolList.Items { pool := pool if count < pendingReplicaCount { - _, err = createCVR(service, volume, claim, &pool) + rInfo := replicaInfo{ + phase: apis.CVRStatusEmpty, + } + _, err = createCVR(service, volume, claim, &pool, rInfo) if err != nil { return err } @@ -403,6 +420,7 @@ func createCVR( volume *apis.CStorVolume, claim *apis.CStorVolumeClaim, pool *apis.CStorPoolInstance, + rInfo replicaInfo, ) (*apis.CStorVolumeReplica, error) { var ( isClone string @@ -440,9 +458,11 @@ func createCVR( WithOwnerRefernceNew(getCVROwnerReference(volume)). WithFinalizers(getCVRFinalizer()). WithTargetIP(service.Spec.ClusterIP). + WithReplicaID(rInfo.replicaID). WithCapacity(volume.Spec.Capacity.String()). WithNewVersion(version.GetVersion()). WithDependentsUpgraded(). + WithStatusPhase(rInfo.phase). Build() if err != nil { return nil, errors.Wrapf( @@ -562,14 +582,13 @@ func randomizePoolList(list *apis.CStorPoolInstanceList) *apis.CStorPoolInstance // 2. If PDB exist it returns the PDB. // 3. 
If PDB doesn't exist it creates new PDB(With CSPC hash) func getOrCreatePodDisruptionBudget( - cvObj *apis.CStorVolume, cspcName string) (*policy.PodDisruptionBudget, error) { - pvName := cvObj.Labels[string(apis.PersistentVolumeCPK)] - poolNames, err := cvr.GetVolumeReplicaPoolNames(pvName, openebsNamespace) - if err != nil { - return nil, errors.Wrapf(err, - "failed to get volume replica pool names of volume %s", - cvObj.Name) - } + cvObj *apis.CStorVolume, cspcName string, poolNames []string) (*policy.PodDisruptionBudget, error) { + // poolNames, err := cvr.GetVolumeReplicaPoolNames(pvName, openebsNamespace) + // if err != nil { + // return nil, errors.Wrapf(err, + // "failed to get volume replica pool names of volume %s", + // cvObj.Name) + // } pdbLabels := cvclaim.GetPDBPoolLabels(poolNames) labelSelector := apispdb.GetPDBLabelSelector(pdbLabels) pdbList, err := apispdb.KubeClient(). @@ -632,6 +651,15 @@ func getPDBSelector(pools []string) *metav1.LabelSelector { } } +// addReplicaPoolInfo updates in-memory replicas pool information on spec and +// status of CVC +func addReplicaPoolInfo(cvcObj *apis.CStorVolumeClaim, poolNames []string) { + for i, poolName := range poolNames { + cvcObj.Spec.Policy.ReplicaPool.PoolInfo[i].PoolName = poolName + } + cvcObj.Status.PoolInfo = append(cvcObj.Status.PoolInfo, poolNames...) +} + // addPDBLabelOnCVC will add PodDisruptionBudget label on CVC func addPDBLabelOnCVC( cvcObj *apis.CStorVolumeClaim, pdbObj *policy.PodDisruptionBudget) { @@ -643,7 +671,353 @@ func addPDBLabelOnCVC( cvcObj.SetLabels(cvcLabels) } -// isHAVolume returns true if replica count is greater than or equal to 3 +// isHAVolume returns true if no.of replicas are greater than or equal to 3. +// If CVC doesn't hold any volume replica pool information then verify with +// ReplicaCount. If CVC holds any volume replica pool information then verify +// with Status.PoolInfo func isHAVolume(cvcObj *apis.CStorVolumeClaim) bool { - return cvcObj.Spec.ReplicaCount >= minHAReplicaCount + if len(cvcObj.Status.PoolInfo) == 0 { + return cvcObj.Spec.ReplicaCount >= minHAReplicaCount + } + return len(cvcObj.Status.PoolInfo) >= minHAReplicaCount +} + +// 1. If Volume was already pointing to a PDB then check is that same PDB will be +// applicable after scalingup/scalingdown(case might be from 4 to 3 +// replicas) if applicable then return same pdb name. If not applicable do +// following changes: +// 1.1 Delete PDB if no other CVC is pointing to PDB. +// 2. If current volume was not pointing to any PDB then do nothing. +// 3. If current volume is HAVolume then check is there any PDB already +// existing among the current replica pools. If PDB exists then return +// that PDB name. If PDB doesn't exist then create new PDB and return newely +// created PDB name. +// 4. If current volume is not HAVolume then return nothing. +func updatePDBForVolume(cvcObj *apis.CStorVolumeClaim, + cvObj *apis.CStorVolume) (string, error) { + pdbName, hasPDB := cvcObj.GetLabels()[string(apis.PodDisruptionBudgetKey)] + pdbLabels := cvclaim.GetPDBPoolLabels(cvcObj.Status.PoolInfo) + labelSelector := apispdb.GetPDBLabelSelector(pdbLabels) + if hasPDB { + pdbList, err := apispdb.KubeClient(). + WithNamespace(openebsNamespace). 
+ List(metav1.ListOptions{LabelSelector: labelSelector}) + if err != nil { + return "", errors.Wrapf(err, + "failed to get PDB present among pools %v", + cvcObj.Status.PoolInfo, + ) + } + if pdbList.Items[0].Name == pdbName { + return pdbName, nil + } + err = deletePDBIfNotInUse(cvcObj) + if err != nil { + return "", err + } + } + if !isHAVolume(cvcObj) { + return "", nil + } + pdbObj, err := getOrCreatePodDisruptionBudget(cvObj, + getCSPC(cvcObj), cvcObj.Status.PoolInfo) + if err != nil { + return "", err + } + return pdbObj.Name, nil +} + +// isCVCScalePending returns true if there is change in desired replica pool +// names and current replica pool names +// 1. Below function will check whether there is any change in desired replica +// pool names and current replica pool names. +func (c *CVCController) isCVCScalePending(cvc *apis.CStorVolumeClaim) bool { + desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc) + return util.IsChangeInLists(desiredPoolNames, cvc.Status.PoolInfo) +} + +// handlePostScalingProcess will does the following changes: +// 1. Handle PDB updation based on no.of volume replicas. It should handle in +// following ways: +// 1.1 If Volume was already pointing to a PDB then check is that same PDB will be +// applicable after scalingup/scalingdown(case might be from 4 to 3 +// replicas) if applicable then return same pdb name. If not applicable do +// following changes: +// 1.1.1 Delete PDB if no other CVC is pointing to PDB. +// 1.2 If current volume was not pointing to any PDB then do nothing. +// 1.3 If current volume is HAVolume then check is there any PDB already +// existing among the current replica pools. If PDB exists then return +// that PDB name. If PDB doesn't exist then create new PDB and return newely +// created PDB name. +// 2. Update CVC label to point it to newely PDB got from above step and also +// replicas pool information on status of CVC. +func handlePostScalingProcess(cvc *apis.CStorVolumeClaim, + cvObj *apis.CStorVolume, currentRPNames []string) error { + var err error + cvcCopy := cvc.DeepCopy() + cvc.Status.PoolInfo = []string{} + cvc.Status.PoolInfo = append(cvc.Status.PoolInfo, currentRPNames...) + pdbName, err := updatePDBForVolume(cvc, cvObj) + if err != nil { + return errors.Wrapf(err, + "failed to handle PDB for scaled volume %s", + cvc.Name, + ) + } + delete(cvc.Labels, string(apis.PodDisruptionBudgetKey)) + if pdbName != "" { + cvc.Labels[string(apis.PodDisruptionBudgetKey)] = pdbName + } + cvc, err = cvclaim.NewKubeclient().WithNamespace(cvc.Namespace).Update(cvc) + if err != nil { + // If error occured point it to old cvc object it self + cvc = cvcCopy + return errors.Wrapf(err, + "failed to update %s CVC status with scaledup replica pool names", + cvc.Name, + ) + } + return nil +} + +// verifyAndUpdateScaleUpInfo does the following changes: +// 1. Get list of new replica pool names by using CVC(spec and status) +// 2. Verify status of ScalingUp Replica(by using CV object) based on the status +// does following changes: +// 2.1: If scalingUp was completed then update PDB accordingly(only if it was +// HAVolume) and update the replica pool info on CVC(API calls). +// 2.2: If scalingUp was going then return error saying scalingUp was in +// progress. 
+func verifyAndUpdateScaleUpInfo(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVolume) error { + // scaledRPNames contains the new replica pool names where entier data was + // reconstructed from other replicas + scaledRPNames := []string{} + pvName := cvc.GetAnnotations()[volumeID] + desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc) + newPoolNames := util.ListDiff(desiredPoolNames, cvc.Status.PoolInfo) + for _, poolName := range newPoolNames { + cvrName := pvName + "-" + poolName + cvrObj, err := cvr.NewKubeclient(). + WithNamespace(getNamespace()). + Get(cvrName, metav1.GetOptions{}) + if err != nil { + klog.Errorf("failed to get CVR %s error: %v", cvrName, err) + continue + } + _, isIDExists := cvObj.Status.ReplicaDetails.KnownReplicas[apis.ReplicaID(cvrObj.Spec.ReplicaID)] + // ScalingUp was completed only if CVR replicaID exists on CV status + // and also CVR should be Healthy(there might be cases of replica + // migration in that case replicaID will be same zvol guid will be + // different) + if isIDExists && cvrObj.Status.Phase == apis.CVRStatusOnline { + scaledRPNames = append(scaledRPNames, poolName) + } + } + if len(scaledRPNames) > 0 { + var currentRPNames []string + currentRPNames = append(currentRPNames, cvc.Status.PoolInfo...) + currentRPNames = append(currentRPNames, scaledRPNames...) + // handlePostScalingProcess will handle PDB and CVC status + err := handlePostScalingProcess(cvc, cvObj, currentRPNames) + if err != nil { + return errors.Wrapf( + err, + "failed to handle post volume replicas scale up process", + ) + } + return nil + } + return errors.Errorf( + "scaling replicas from %d to %d in progress", + len(cvc.Status.PoolInfo), + len(cvc.Spec.Policy.ReplicaPool.PoolInfo), + ) +} + +func getScaleDownCVR(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeReplica, error) { + pvName := cvc.GetAnnotations()[volumeID] + desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc) + removedPoolNames := util.ListDiff(cvc.Status.PoolInfo, desiredPoolNames) + cvrName := pvName + removedPoolNames[0] + return cvr.NewKubeclient(). + WithNamespace(getNamespace()). + Get(cvrName, metav1.GetOptions{}) +} + +// handleVolumeReplicaCreation does the following changes: +// 1. Get the list of new pool names(i.e poolNames which are in spec but not in +// status of CVC). +// 2. Creates new CVR on new pools only if CVR on that pool doesn't exists. If +// CVR already created then do nothing. +func handleVolumeReplicaCreation(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVolume) error { + pvName := cvc.GetAnnotations()[volumeID] + desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc) + newPoolNames := util.ListDiff(desiredPoolNames, cvc.Status.PoolInfo) + errs := []error{} + var errorMsg string + + svcObj, err := svc.NewKubeClient(svc.WithNamespace(openebsNamespace)). + Get(cvc.Name, metav1.GetOptions{}) + if err != nil { + return errors.Wrapf(err, "failed to get service object %s", cvc.Name) + } + + cvrApiList, err := cvr.NewKubeclient(). + WithNamespace(getNamespace()). + List(metav1.ListOptions{LabelSelector: pvSelector + "=" + pvName}) + if err != nil { + return errors.Wrapf(err, "failed to list cstorvolumereplicas of volume %s", pvName) + } + cvrListbuilder := cvr.NewListBuilder(). + WithAPIList(cvrApiList) + + for _, poolName := range newPoolNames { + if cvrListbuilder. + WithFilter(cvr.HasLabel(cspiLabel, poolName)). + List().Len() == 0 { + cspiObj, err := cspi.NewKubeClient(). + WithNamespace(getNamespace()). 
+ Get(poolName, metav1.GetOptions{}) + if err != nil { + errorMsg = fmt.Sprintf("failed to get cstorpoolinstance %s error: %v", poolName, err) + errs = append(errs, errors.Errorf("%v", errorMsg)) + klog.Errorf("%s", errorMsg) + continue + } + if cspiObj.Status.Phase != cspiOnline { + errorMsg = fmt.Sprintf( + "failed to create cstorvolumerplica on pool %s error: pool is not in %s", + cspiObj.Name, + cspiOnline, + ) + errs = append(errs, errors.Errorf("%v", errorMsg)) + klog.Errorf("%s", errorMsg) + continue + } + hash, err := hash.Hash(pvName + "-" + poolName) + if err != nil { + errorMsg = fmt.Sprintf( + "failed to calculate of hase for new volume replica error: %v", + err) + errs = append(errs, errors.Errorf("%v", errorMsg)) + klog.Errorf("%s", errorMsg) + continue + } + // TODO: Add a check for ClonedVolumeReplica scaleup case + // Create replica with Recreate state + rInfo := replicaInfo{ + replicaID: hash, + phase: apis.CVRStatusRecreate, + } + cvr, err := createCVR(svcObj, cvObj, cvc, cspiObj, rInfo) + if err != nil { + errorMsg = fmt.Sprintf( + "failed to create new replica on pool %s error: %v", + poolName, + err, + ) + errs = append(errs, errors.Errorf("%v", errorMsg)) + klog.Errorf("%s", errorMsg) + continue + } + // Update cvrListbuilder with new replicas + cvrListbuilder = cvrListbuilder.AppendListBuilder(cvr) + } + } + if len(errs) > 0 { + return errors.Errorf("%+v", errs) + } + return nil +} + +// scaleUpVolumeReplicas does the following work +// 1. Fetch corresponding CStorVolume object of CVC. +// 2. Verify is there need to update desiredReplicationFactor of CVC(In etcd). +// 3. Create CVRs if doesn't created on scaled cStor +// pool(handleVolumeReplicaCreation will handle new CVR creations). +// 4. If scalingUp volume replicas was completed then do following +// things(verifyAndUpdateScaleUpInfo will does following things). If +// scalingUp of volume replicas was not completed then return error +// 4.1.1 Update PDB according to the new pools(only if volume is HAVolume). +// 4.1.2 Update PDB label on CVC and replica pool information on status. +func scaleUpVolumeReplicas(cvc *apis.CStorVolumeClaim) error { + drCount := len(cvc.Spec.Policy.ReplicaPool.PoolInfo) + cvObj, err := cv.NewKubeclient(). + WithNamespace(getNamespace()). + Get(cvc.Name, metav1.GetOptions{}) + if err != nil { + return errors.Wrapf(err, "failed to get cstorvolumes object %s", cvc.Name) + } + if cvObj.Spec.DesiredReplicationFactor < drCount { + cvObj.Spec.DesiredReplicationFactor = drCount + cvObj, err = updateCStorVolumeInfo(cvObj) + if err != nil { + return err + } + } + err = handleVolumeReplicaCreation(cvc, cvObj) + if err != nil { + return err + } + err = verifyAndUpdateScaleUpInfo(cvc, cvObj) + return err +} + +// scaleDownVolumeReplicas will process the following steps +// 1. Verify whether operation made by user is valid for scale down +// process(Only one replica scaledown at a time is allowed). +// 2. Update the CV object by decreasing the DRF and removing the +// replicaID entry. +// 3. Check the status of scale down if scale down was completed then +// perform post scaling process(updating PDB if applicable and CVC +// replica pool status). +func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error { + drCount := len(cvc.Spec.Policy.ReplicaPool.PoolInfo) + cvObj, err := cv.NewKubeclient(). + WithNamespace(getNamespace()). 
+	Get(cvc.Name, metav1.GetOptions{})
+	if err != nil {
+		return errors.Wrapf(err, "failed to get cstorvolumes object %s", cvc.Name)
+	}
+	// Return an error if more than one replica is being scaled down at a time
+	if (cvObj.Spec.ReplicationFactor - drCount) > 1 {
+		return errors.Errorf(
+			"can't scale down %d replicas at a time",
+			(cvObj.Spec.DesiredReplicationFactor - drCount),
+		)
+	}
+	if cvObj.Spec.DesiredReplicationFactor > drCount {
+		cvrObj, err := getScaleDownCVR(cvc)
+		if err != nil {
+			return errors.Wrapf(err, "failed to get scale down CVR object")
+		}
+		cvObj.Spec.DesiredReplicationFactor = drCount
+		delete(cvObj.Spec.ReplicaDetails.KnownReplicas, apis.ReplicaID(cvrObj.Spec.ReplicaID))
+		cvObj, err = updateCStorVolumeInfo(cvObj)
+		if err != nil {
+			return err
+		}
+	}
+	if !cv.IsScaleDownInProgress(cvObj) {
+		desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc)
+		err = handlePostScalingProcess(cvc, cvObj, desiredPoolNames)
+		if err != nil {
+			return errors.Wrapf(err,
+				"failed to handle post volume replicas scale down process")
+		}
+		return nil
+	}
+	return errors.Errorf(
+		"Scaling down volume replicas from %d to %d is in progress",
+		len(cvc.Status.PoolInfo),
+		drCount,
+	)
+}
+
+// UpdateCStorVolumeInfo modifies the CV Object in etcd by making update API call
+// Note: Caller code should handle the error
+func updateCStorVolumeInfo(cvObj *apis.CStorVolume) (*apis.CStorVolume, error) {
+	return cv.NewKubeclient().
+		WithNamespace(getNamespace()).
+		Update(cvObj)
+}
diff --git a/cmd/cvc-operator/controller/cvc_controller.go b/cmd/cvc-operator/controller/cvc_controller.go
index 7ee902ae98..79dd705c90 100644
--- a/cmd/cvc-operator/controller/cvc_controller.go
+++ b/cmd/cvc-operator/controller/cvc_controller.go
@@ -26,6 +26,8 @@ import (
 	errors "github.com/pkg/errors"
 	"k8s.io/klog"

+	cvr "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1"
+	cvclaim "github.com/openebs/maya/pkg/cstorvolumeclaim/v1alpha1"
 	corev1 "k8s.io/api/core/v1"
 	policy "k8s.io/api/policy/v1beta1"
 	k8serror "k8s.io/apimachinery/pkg/api/errors"
@@ -203,6 +205,12 @@ func (c *CVCController) syncCVC(cvc *apis.CStorVolumeClaim) error {
 	if err != nil {
 		return err
 	}
+
+	if c.isCVCScalePending(cvc) {
+		// process scalingup/scalingdown of volume replicas only if there is
+		// change in curent and desired state of replicas pool information
+		err = c.scaleVolumeReplicas(cvc)
+	}
 	return nil
 }

@@ -274,11 +282,19 @@ func (c *CVCController) createVolumeOperation(cvc *apis.CStorVolumeClaim) (*apis
 		return nil, err
 	}

+	// Fetch the volume replica pool names and use them in PDB and updating in
+	// spec and status of CVC
+	poolNames, err := cvr.GetVolumeReplicaPoolNames(cvc.Name, openebsNamespace)
+	if err != nil {
+		return nil, errors.Wrapf(err,
+			"failed to get volume replica pool names of volume %s", cvObj.Name)
+	}
+
 	if isHAVolume(cvc) {
 		// TODO: When multiple threads or multiple CVC controllers are set then
 		// we have to revist entier PDB code path
 		var pdbObj *policy.PodDisruptionBudget
-		pdbObj, err = getOrCreatePodDisruptionBudget(cvObj, getCSPC(cvc))
+		pdbObj, err = getOrCreatePodDisruptionBudget(cvObj, getCSPC(cvc), poolNames)
 		if err != nil {
 			return nil, errors.Wrapf(err,
 				"failed to create PDB for volume: %s", cvc.Name)
@@ -291,6 +307,8 @@ func (c *CVCController) createVolumeOperation(cvc *apis.CStorVolumeClaim) (*apis
 		return nil, err
 	}

+	// update volume replica pool information on cvc spec and status
+	addReplicaPoolInfo(cvc, poolNames)
 	// update the cstorvolume reference, phase as "Bound" and
desired // capacity cvc.Spec.CStorVolumeRef = volumeRef @@ -366,7 +384,7 @@ func (c *CVCController) removeClaimFinalizer( cvc *apis.CStorVolumeClaim, ) error { if isHAVolume(cvc) { - err := c.deletePDBIfNotInUse(cvc) + err := deletePDBIfNotInUse(cvc) if err != nil { return errors.Wrapf(err, "failed to verify whether PDB %s is in use by other volumes", @@ -637,14 +655,13 @@ func (c *CVCController) resizeCV(cv *apis.CStorVolume, newCapacity resource.Quan // deletePDBIfNotInUse deletes the PDB if no volume is refering to the // cStorvolumeclaim PDB -func (c *CVCController) deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error { +func deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error { //TODO: If HALease is enabled active-active then below code needs to be //revist pdbName := getPDBName(cvc) cvcLabelSelector := string(apis.PodDisruptionBudgetKey) + "=" + pdbName - cvcList, err := c.clientset. - OpenebsV1alpha1(). - CStorVolumeClaims(cvc.Namespace). + cvcList, err := cvclaim.NewKubeclient(). + WithNamespace(cvc.Namespace). List(metav1.ListOptions{LabelSelector: cvcLabelSelector}) if err != nil { return errors.Wrapf(err, @@ -654,9 +671,38 @@ func (c *CVCController) deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error { err = apispdb.KubeClient(). WithNamespace(openebsNamespace). Delete(pdbName, &metav1.DeleteOptions{}) + if k8serror.IsNotFound(err) { + klog.Infof("pdb %s of volume %s was already deleted", pdbName, cvc.Name) + return nil + } if err != nil { return err } } return nil } + +// scaleVolumeReplicas identifies whether it is scaleup or scaledown case of +// volume replicas. If user added entry of pool info under the spec then changes +// are treated as scaleup case. If user removed poolInfo entry from spec then +// changes are treated as scale down case. If user just modifies the pool entry +// info under the spec then it is a kind of migration which is not yet supported +func (c *CVCController) scaleVolumeReplicas(cvc *apis.CStorVolumeClaim) error { + var err error + if len(cvc.Spec.Policy.ReplicaPool.PoolInfo) > len(cvc.Status.PoolInfo) { + err = scaleUpVolumeReplicas(cvc) + } else if len(cvc.Spec.Policy.ReplicaPool.PoolInfo) < len(cvc.Status.PoolInfo) { + err = scaleDownVolumeReplicas(cvc) + } else { + c.recorder.Event(cvc, corev1.EventTypeWarning, "Migration", + "Migration of volume replicas is not yet supported") + return nil + } + if err != nil { + c.recorder.Eventf(cvc, + corev1.EventTypeWarning, + "ScalingVolumeReplicas", + "%v", err) + } + return nil +} diff --git a/pkg/apis/openebs.io/v1alpha1/cstor_volume_claim.go b/pkg/apis/openebs.io/v1alpha1/cstor_volume_claim.go index b2c7394a0d..9ce5a24466 100644 --- a/pkg/apis/openebs.io/v1alpha1/cstor_volume_claim.go +++ b/pkg/apis/openebs.io/v1alpha1/cstor_volume_claim.go @@ -95,6 +95,8 @@ type CStorVolumeClaimStatus struct { // Capacity the actual resources of the underlying volume. 
Capacity corev1.ResourceList `json:"capacity,omitempty"` Conditions []CStorVolumeClaimCondition `json:"condition,omitempty"` + // PoolInfo represents current pool names where volume replicas exists + PoolInfo []string `json:"poolInfo"` } // CStorVolumeClaimCondition contains details about state of cstor volume diff --git a/pkg/apis/openebs.io/v1alpha1/cstorvolume_policy.go b/pkg/apis/openebs.io/v1alpha1/cstorvolume_policy.go index fb19906f36..0d65f1d583 100644 --- a/pkg/apis/openebs.io/v1alpha1/cstorvolume_policy.go +++ b/pkg/apis/openebs.io/v1alpha1/cstorvolume_policy.go @@ -45,6 +45,9 @@ type CStorVolumePolicySpec struct { Target TargetSpec `json:"target"` // ReplicaSpec represents configuration related to replicas resources Replica ReplicaSpec `json:"replica"` + // ReplicaPool holds the pool information of volume replicas. + // Ex: If volume is provisioned on which CStor pool volume replicas exist + ReplicaPool ReplicaPoolSpec `json:"replicaPool"` } // TargetSpec represents configuration related to cstor target and its resources @@ -95,6 +98,12 @@ type ReplicaSpec struct { Affinity *corev1.PodAffinity `json:"affinity"` } +// ReplicaPoolSpec represents the volume replicas pool information +type ReplicaPoolSpec struct { + // PoolInfo represents the pool information of replicas + PoolInfo []ReplicaPoolInfo `json:"poolInfo"` +} + // Provision represents volume provisioning configuration type Provision struct { // replicaAffinity is set to true then volume replica resources need to be @@ -102,6 +111,13 @@ type Provision struct { ReplicaAffinity bool `json:"replicaAffinity"` } +// ReplicaPoolInfo represents the pool information of volume replica +type ReplicaPoolInfo struct { + // PoolName represents the pool name where volume replica exists + PoolName string `json:"poolName"` + // UID also can be added +} + // CStorVolumePolicyStatus is for handling status of CstorVolumePolicy type CStorVolumePolicyStatus struct { Phase string `json:"phase"` diff --git a/pkg/cstor/volume/v1alpha1/cstorvolume.go b/pkg/cstor/volume/v1alpha1/cstorvolume.go index 0964de1bfa..0b47d95f1d 100644 --- a/pkg/cstor/volume/v1alpha1/cstorvolume.go +++ b/pkg/cstor/volume/v1alpha1/cstorvolume.go @@ -448,3 +448,10 @@ func (csr *CVReplicationDetails) UpdateCVWithReplicationDetails(kubeclient *Kube } return err } + +// IsScaleDownInProgress return true if length of status replica details is +// greater than length of spec replica details +func IsScaleDownInProgress(cvObj *apis.CStorVolume) bool { + return len(cvObj.Status.ReplicaDetails.KnownReplicas) > + len(cvObj.Spec.ReplicaDetails.KnownReplicas) +} diff --git a/pkg/cstor/volumereplica/v1alpha1/build.go b/pkg/cstor/volumereplica/v1alpha1/build.go index 52ba9c2d5b..bb61aa4c8e 100644 --- a/pkg/cstor/volumereplica/v1alpha1/build.go +++ b/pkg/cstor/volumereplica/v1alpha1/build.go @@ -204,8 +204,8 @@ func (b *Builder) WithCapacity(capacity string) *Builder { // WithStatusPhase sets the Status Phase of CStorVolumeReplica with provided //arguments -func (b *Builder) WithStatusPhase(phase string) *Builder { - b.cvr.object.Status.Phase = apis.CStorVolumeReplicaPhase(phase) +func (b *Builder) WithStatusPhase(phase apis.CStorVolumeReplicaPhase) *Builder { + b.cvr.object.Status.Phase = phase return b } diff --git a/pkg/cstor/volumereplica/v1alpha1/cstorvolumereplica.go b/pkg/cstor/volumereplica/v1alpha1/cstorvolumereplica.go index 05d020dc7d..f7bf4b5b0b 100644 --- a/pkg/cstor/volumereplica/v1alpha1/cstorvolumereplica.go +++ b/pkg/cstor/volumereplica/v1alpha1/cstorvolumereplica.go @@ 
-111,6 +111,13 @@ func (b *ListBuilder) WithAPIList( return b } +// AppendListBuilder append the provided CVR API object into existing ListBuilder +func (b *ListBuilder) AppendListBuilder( + cvr *apis.CStorVolumeReplica) *ListBuilder { + b.list.items = append(b.list.items, &CVR{object: cvr}) + return b +} + // List returns the list of cvr // instances that was built by this // builder @@ -145,6 +152,20 @@ func (p *CVR) IsHealthy() bool { return p.object.Status.Phase == "Healthy" } +// HasLabel returns true only if label key value matched to provided +// value. +func (p *CVR) HasLabel(key, value string) bool { + return p.object.Labels[key] == value +} + +// HasLabel returns predicate to filter out CVRs based on the +// provided key and values +func HasLabel(key, value string) Predicate { + return func(p *CVR) bool { + return p.HasLabel(key, value) + } +} + // IsHealthy is a Predicate to filter out cvrs // which is healthy func IsHealthy() Predicate { diff --git a/pkg/cstorvolumeclaim/v1alpha1/utils.go b/pkg/cstorvolumeclaim/v1alpha1/utils.go index 37d9152dcf..1d9ffbf792 100644 --- a/pkg/cstorvolumeclaim/v1alpha1/utils.go +++ b/pkg/cstorvolumeclaim/v1alpha1/utils.go @@ -61,3 +61,12 @@ func GetPDBLabels(poolNames []string, cspcName string) map[string]string { pdbLabels[string(apis.CStorPoolClusterCPK)] = cspcName return pdbLabels } + +// GetDesiredReplicaPoolNames returns list of desired pool names +func GetDesiredReplicaPoolNames(cvc *apis.CStorVolumeClaim) []string { + poolNames := []string{} + for _, poolInfo := range cvc.Spec.Policy.ReplicaPool.PoolInfo { + poolNames = append(poolNames, poolInfo.PoolName) + } + return poolNames +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 5c4384514f..fa3feb533a 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -218,3 +218,26 @@ func RemoveString(slice []string, s string) (result []string) { } return result } + +// IsChangeInLists returns true if there is any difference in listA and listB +func IsChangeInLists(listA, listB []string) bool { + listAMap := map[string]bool{} + listBMap := map[string]bool{} + for _, name := range listA { + listAMap[name] = true + } + for _, name := range listB { + listBMap[name] = true + } + for _, name := range listA { + if !listBMap[name] { + return true + } + } + for _, name := range listB { + if !listAMap[name] { + return true + } + } + return false +} From 21245cd1dc018109a8c1b2a33c3aae03a392d788 Mon Sep 17 00:00:00 2001 From: mittachaitu Date: Wed, 12 Feb 2020 18:19:20 +0530 Subject: [PATCH 02/11] This commit fixes the bugs in replica scaling feature Signed-off-by: mittachaitu --- .../controller/cstorvolumeclaim.go | 39 +++++++++++++------ cmd/cvc-operator/controller/cvc_controller.go | 12 ++++-- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/cmd/cvc-operator/controller/cstorvolumeclaim.go b/cmd/cvc-operator/controller/cstorvolumeclaim.go index fa4cd68c3b..acb35486c9 100644 --- a/cmd/cvc-operator/controller/cstorvolumeclaim.go +++ b/cmd/cvc-operator/controller/cstorvolumeclaim.go @@ -582,7 +582,7 @@ func randomizePoolList(list *apis.CStorPoolInstanceList) *apis.CStorPoolInstance // 2. If PDB exist it returns the PDB. // 3. 
If PDB doesn't exist it creates new PDB(With CSPC hash) func getOrCreatePodDisruptionBudget( - cvObj *apis.CStorVolume, cspcName string, poolNames []string) (*policy.PodDisruptionBudget, error) { + cspcName string, poolNames []string) (*policy.PodDisruptionBudget, error) { // poolNames, err := cvr.GetVolumeReplicaPoolNames(pvName, openebsNamespace) // if err != nil { // return nil, errors.Wrapf(err, @@ -654,8 +654,10 @@ func getPDBSelector(pools []string) *metav1.LabelSelector { // addReplicaPoolInfo updates in-memory replicas pool information on spec and // status of CVC func addReplicaPoolInfo(cvcObj *apis.CStorVolumeClaim, poolNames []string) { - for i, poolName := range poolNames { - cvcObj.Spec.Policy.ReplicaPool.PoolInfo[i].PoolName = poolName + for _, poolName := range poolNames { + cvcObj.Spec.Policy.ReplicaPool.PoolInfo = append( + cvcObj.Spec.Policy.ReplicaPool.PoolInfo, + apis.ReplicaPoolInfo{PoolName: poolName}) } cvcObj.Status.PoolInfo = append(cvcObj.Status.PoolInfo, poolNames...) } @@ -699,6 +701,7 @@ func updatePDBForVolume(cvcObj *apis.CStorVolumeClaim, pdbLabels := cvclaim.GetPDBPoolLabels(cvcObj.Status.PoolInfo) labelSelector := apispdb.GetPDBLabelSelector(pdbLabels) if hasPDB { + // Get PDB if exists among newely updated volume replicas pools pdbList, err := apispdb.KubeClient(). WithNamespace(openebsNamespace). List(metav1.ListOptions{LabelSelector: labelSelector}) @@ -708,8 +711,18 @@ func updatePDBForVolume(cvcObj *apis.CStorVolumeClaim, cvcObj.Status.PoolInfo, ) } - if pdbList.Items[0].Name == pdbName { - return pdbName, nil + if len(pdbList.Items) >= 1 && isHAVolume(cvcObj) { + for _, pdbObj := range pdbList.Items { + pdbPoolCount := len(pdbObj.Spec.Selector.MatchExpressions[0].Values) + // Let us assume that volume replicas was scale down from 4 to + // 3(i.e PDB was created on top of 4 pools). Now when scale down + // happens better to delete the PDB(if no one refering to it) and + // create PDB among 3 pools so that scaling down of cluster will + // not hold unnecessarily(i.e draining the node). + if pdbObj.Name == pdbName && pdbPoolCount < len(cvcObj.Status.PoolInfo) { + return pdbName, nil + } + } } err = deletePDBIfNotInUse(cvcObj) if err != nil { @@ -719,8 +732,7 @@ func updatePDBForVolume(cvcObj *apis.CStorVolumeClaim, if !isHAVolume(cvcObj) { return "", nil } - pdbObj, err := getOrCreatePodDisruptionBudget(cvObj, - getCSPC(cvcObj), cvcObj.Status.PoolInfo) + pdbObj, err := getOrCreatePodDisruptionBudget(getCSPC(cvcObj), cvcObj.Status.PoolInfo) if err != nil { return "", err } @@ -768,7 +780,7 @@ func handlePostScalingProcess(cvc *apis.CStorVolumeClaim, if pdbName != "" { cvc.Labels[string(apis.PodDisruptionBudgetKey)] = pdbName } - cvc, err = cvclaim.NewKubeclient().WithNamespace(cvc.Namespace).Update(cvc) + newCVCObj, err := cvclaim.NewKubeclient().WithNamespace(cvc.Namespace).Update(cvc) if err != nil { // If error occured point it to old cvc object it self cvc = cvcCopy @@ -777,6 +789,8 @@ func handlePostScalingProcess(cvc *apis.CStorVolumeClaim, cvc.Name, ) } + // Point cvc to updated cvc object + cvc = newCVCObj return nil } @@ -838,7 +852,7 @@ func getScaleDownCVR(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeReplica, erro pvName := cvc.GetAnnotations()[volumeID] desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc) removedPoolNames := util.ListDiff(cvc.Status.PoolInfo, desiredPoolNames) - cvrName := pvName + removedPoolNames[0] + cvrName := pvName + "-" + removedPoolNames[0] return cvr.NewKubeclient(). WithNamespace(getNamespace()). 
Get(cvrName, metav1.GetOptions{}) @@ -862,14 +876,14 @@ func handleVolumeReplicaCreation(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVo return errors.Wrapf(err, "failed to get service object %s", cvc.Name) } - cvrApiList, err := cvr.NewKubeclient(). + cvrAPIList, err := cvr.NewKubeclient(). WithNamespace(getNamespace()). List(metav1.ListOptions{LabelSelector: pvSelector + "=" + pvName}) if err != nil { return errors.Wrapf(err, "failed to list cstorvolumereplicas of volume %s", pvName) } cvrListbuilder := cvr.NewListBuilder(). - WithAPIList(cvrApiList) + WithAPIList(cvrAPIList) for _, poolName := range newPoolNames { if cvrListbuilder. @@ -987,7 +1001,8 @@ func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error { ) } if cvObj.Spec.DesiredReplicationFactor > drCount { - cvrObj, err := getScaleDownCVR(cvc) + var cvrObj *apis.CStorVolumeReplica + cvrObj, err = getScaleDownCVR(cvc) if err != nil { return errors.Wrapf(err, "failed to get scale down CVR object") } diff --git a/cmd/cvc-operator/controller/cvc_controller.go b/cmd/cvc-operator/controller/cvc_controller.go index 79dd705c90..e0840494d3 100644 --- a/cmd/cvc-operator/controller/cvc_controller.go +++ b/cmd/cvc-operator/controller/cvc_controller.go @@ -294,7 +294,7 @@ func (c *CVCController) createVolumeOperation(cvc *apis.CStorVolumeClaim) (*apis // TODO: When multiple threads or multiple CVC controllers are set then // we have to revist entier PDB code path var pdbObj *policy.PodDisruptionBudget - pdbObj, err = getOrCreatePodDisruptionBudget(cvObj, getCSPC(cvc), poolNames) + pdbObj, err = getOrCreatePodDisruptionBudget(getCSPC(cvc), poolNames) if err != nil { return nil, errors.Wrapf(err, "failed to create PDB for volume: %s", cvc.Name) @@ -307,14 +307,14 @@ func (c *CVCController) createVolumeOperation(cvc *apis.CStorVolumeClaim) (*apis return nil, err } - // update volume replica pool information on cvc spec and status - addReplicaPoolInfo(cvc, poolNames) // update the cstorvolume reference, phase as "Bound" and desired // capacity cvc.Spec.CStorVolumeRef = volumeRef cvc.Spec.Policy = volumePolicy.Spec cvc.Status.Phase = apis.CStorVolumeClaimPhaseBound cvc.Status.Capacity = cvc.Spec.Capacity + // update volume replica pool information on cvc spec and status + addReplicaPoolInfo(cvc, poolNames) err = c.updateCVCObj(cvc, cvObj) if err != nil { @@ -678,6 +678,7 @@ func deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error { if err != nil { return err } + klog.Infof("Successfully deleted the PDB %s of volume %s", pdbName, cvc.Name) } return nil } @@ -703,6 +704,11 @@ func (c *CVCController) scaleVolumeReplicas(cvc *apis.CStorVolumeClaim) error { corev1.EventTypeWarning, "ScalingVolumeReplicas", "%v", err) + return err } + c.recorder.Eventf(cvc, + corev1.EventTypeNormal, + "ScalingVolumeReplicas", + "successfully scaled volume replicas to %d", len(cvc.Status.PoolInfo)) return nil } From 641bc8c6e42f427b663528b7ed3c764afafeed02 Mon Sep 17 00:00:00 2001 From: mittachaitu Date: Wed, 12 Feb 2020 19:52:28 +0530 Subject: [PATCH 03/11] This commit fixes the golang ci linting issues Signed-off-by: mittachaitu --- .../controller/cstorvolumeclaim.go | 32 ++++++++++++------- cmd/cvc-operator/controller/cvc_controller.go | 2 +- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/cmd/cvc-operator/controller/cstorvolumeclaim.go b/cmd/cvc-operator/controller/cstorvolumeclaim.go index acb35486c9..0d4b555fd4 100644 --- a/cmd/cvc-operator/controller/cstorvolumeclaim.go +++ b/cmd/cvc-operator/controller/cstorvolumeclaim.go @@ -695,8 
+695,7 @@ func isHAVolume(cvcObj *apis.CStorVolumeClaim) bool { // that PDB name. If PDB doesn't exist then create new PDB and return newely // created PDB name. // 4. If current volume is not HAVolume then return nothing. -func updatePDBForVolume(cvcObj *apis.CStorVolumeClaim, - cvObj *apis.CStorVolume) (string, error) { +func updatePDBForVolume(cvcObj *apis.CStorVolumeClaim) (string, error) { pdbName, hasPDB := cvcObj.GetLabels()[string(apis.PodDisruptionBudgetKey)] pdbLabels := cvclaim.GetPDBPoolLabels(cvcObj.Status.PoolInfo) labelSelector := apispdb.GetPDBLabelSelector(pdbLabels) @@ -764,12 +763,12 @@ func (c *CVCController) isCVCScalePending(cvc *apis.CStorVolumeClaim) bool { // 2. Update CVC label to point it to newely PDB got from above step and also // replicas pool information on status of CVC. func handlePostScalingProcess(cvc *apis.CStorVolumeClaim, - cvObj *apis.CStorVolume, currentRPNames []string) error { + currentRPNames []string) error { var err error cvcCopy := cvc.DeepCopy() cvc.Status.PoolInfo = []string{} cvc.Status.PoolInfo = append(cvc.Status.PoolInfo, currentRPNames...) - pdbName, err := updatePDBForVolume(cvc, cvObj) + pdbName, err := updatePDBForVolume(cvc) if err != nil { return errors.Wrapf(err, "failed to handle PDB for scaled volume %s", @@ -832,7 +831,7 @@ func verifyAndUpdateScaleUpInfo(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVol currentRPNames = append(currentRPNames, cvc.Status.PoolInfo...) currentRPNames = append(currentRPNames, scaledRPNames...) // handlePostScalingProcess will handle PDB and CVC status - err := handlePostScalingProcess(cvc, cvObj, currentRPNames) + err := handlePostScalingProcess(cvc, currentRPNames) if err != nil { return errors.Wrapf( err, @@ -982,10 +981,12 @@ func scaleUpVolumeReplicas(cvc *apis.CStorVolumeClaim) error { // process(Only one replica scaledown at a time is allowed). // 2. Update the CV object by decreasing the DRF and removing the // replicaID entry. -// 3. Check the status of scale down if scale down was completed then +// 3. Delete the CVR that belongs to removed pool entry. +// 4. Check the status of scale down if scale down was completed then // perform post scaling process(updating PDB if applicable and CVC // replica pool status). func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error { + var cvrObj *apis.CStorVolumeReplica drCount := len(cvc.Spec.Policy.ReplicaPool.PoolInfo) cvObj, err := cv.NewKubeclient(). WithNamespace(getNamespace()). @@ -1000,12 +1001,13 @@ func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error { (cvObj.Spec.DesiredReplicationFactor - drCount), ) } - if cvObj.Spec.DesiredReplicationFactor > drCount { - var cvrObj *apis.CStorVolumeReplica - cvrObj, err = getScaleDownCVR(cvc) - if err != nil { + cvrObj, err = getScaleDownCVR(cvc) + if err != nil { + if !k8serror.IsNotFound(err) { return errors.Wrapf(err, "failed to get scale down CVR object") } + } + if cvObj.Spec.DesiredReplicationFactor > drCount { cvObj.Spec.DesiredReplicationFactor = drCount delete(cvObj.Spec.ReplicaDetails.KnownReplicas, apis.ReplicaID(cvrObj.Spec.ReplicaID)) cvObj, err = updateCStorVolumeInfo(cvObj) @@ -1013,9 +1015,17 @@ func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error { return err } } + if cvrObj != nil { + err = cvr.NewKubeclient(). + WithNamespace(getNamespace()). 
+ Delete(cvrObj.Name) + if err != nil { + return errors.Wrapf(err, "failed to delete cstorvolumereplica %s", cvrObj.Name) + } + } if !cv.IsScaleDownInProgress(cvObj) { desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc) - err = handlePostScalingProcess(cvc, cvObj, desiredPoolNames) + err = handlePostScalingProcess(cvc, desiredPoolNames) if err != nil { return errors.Wrapf(err, "failed to handle post volume replicas scale down process") diff --git a/cmd/cvc-operator/controller/cvc_controller.go b/cmd/cvc-operator/controller/cvc_controller.go index e0840494d3..6ffcdaba4f 100644 --- a/cmd/cvc-operator/controller/cvc_controller.go +++ b/cmd/cvc-operator/controller/cvc_controller.go @@ -209,7 +209,7 @@ func (c *CVCController) syncCVC(cvc *apis.CStorVolumeClaim) error { if c.isCVCScalePending(cvc) { // process scalingup/scalingdown of volume replicas only if there is // change in curent and desired state of replicas pool information - err = c.scaleVolumeReplicas(cvc) + _ = c.scaleVolumeReplicas(cvc) } return nil } From 0030e9a1f870021cd5e9ea3999a8797f4081347d Mon Sep 17 00:00:00 2001 From: mittachaitu Date: Wed, 12 Feb 2020 20:03:26 +0530 Subject: [PATCH 04/11] This commit adds autogenerated file Signed-off-by: mittachaitu --- .../v1alpha1/zz_generated.deepcopy.go | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/pkg/apis/openebs.io/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/openebs.io/v1alpha1/zz_generated.deepcopy.go index dab39b859a..b403062c4b 100644 --- a/pkg/apis/openebs.io/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/openebs.io/v1alpha1/zz_generated.deepcopy.go @@ -1038,6 +1038,11 @@ func (in *CStorVolumeClaimStatus) DeepCopyInto(out *CStorVolumeClaimStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.PoolInfo != nil { + in, out := &in.PoolInfo, &out.PoolInfo + *out = make([]string, len(*in)) + copy(*out, *in) + } return } @@ -1169,6 +1174,7 @@ func (in *CStorVolumePolicySpec) DeepCopyInto(out *CStorVolumePolicySpec) { out.Provision = in.Provision in.Target.DeepCopyInto(&out.Target) in.Replica.DeepCopyInto(&out.Replica) + in.ReplicaPool.DeepCopyInto(&out.ReplicaPool) return } @@ -1732,6 +1738,43 @@ func (in *RaidGroup) DeepCopy() *RaidGroup { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ReplicaPoolInfo) DeepCopyInto(out *ReplicaPoolInfo) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReplicaPoolInfo. +func (in *ReplicaPoolInfo) DeepCopy() *ReplicaPoolInfo { + if in == nil { + return nil + } + out := new(ReplicaPoolInfo) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ReplicaPoolSpec) DeepCopyInto(out *ReplicaPoolSpec) { + *out = *in + if in.PoolInfo != nil { + in, out := &in.PoolInfo, &out.PoolInfo + *out = make([]ReplicaPoolInfo, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReplicaPoolSpec. +func (in *ReplicaPoolSpec) DeepCopy() *ReplicaPoolSpec { + if in == nil { + return nil + } + out := new(ReplicaPoolSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *ReplicaSpec) DeepCopyInto(out *ReplicaSpec) { *out = *in From 15cb229d6306da061debc48c11cfb226f1a0cf60 Mon Sep 17 00:00:00 2001 From: mittachaitu Date: Wed, 12 Feb 2020 22:23:40 +0530 Subject: [PATCH 05/11] This commit updates getNamespace() to openebsNamespace variable Signed-off-by: mittachaitu --- .../controller/cstorvolumeclaim.go | 21 +++++++------------ cmd/cvc-operator/controller/cvc_controller.go | 4 ++-- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/cmd/cvc-operator/controller/cstorvolumeclaim.go b/cmd/cvc-operator/controller/cstorvolumeclaim.go index 0d4b555fd4..b82c0fe5fc 100644 --- a/cmd/cvc-operator/controller/cstorvolumeclaim.go +++ b/cmd/cvc-operator/controller/cstorvolumeclaim.go @@ -218,11 +218,6 @@ func listCStorPools( LabelSelector: string(apis.CStorPoolClusterCPK) + "=" + cspcName, }) - // cspList, err := ncsp.NewKubeClient().WithNamespace(getNamespace()). - // List(metav1.ListOptions{ - // LabelSelector: string(apis.CStorPoolClusterCPK) + "=" + cspcName, - // }) - if err != nil { return nil, errors.Wrapf( err, @@ -811,7 +806,7 @@ func verifyAndUpdateScaleUpInfo(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVol for _, poolName := range newPoolNames { cvrName := pvName + "-" + poolName cvrObj, err := cvr.NewKubeclient(). - WithNamespace(getNamespace()). + WithNamespace(openebsNamespace). Get(cvrName, metav1.GetOptions{}) if err != nil { klog.Errorf("failed to get CVR %s error: %v", cvrName, err) @@ -853,7 +848,7 @@ func getScaleDownCVR(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeReplica, erro removedPoolNames := util.ListDiff(cvc.Status.PoolInfo, desiredPoolNames) cvrName := pvName + "-" + removedPoolNames[0] return cvr.NewKubeclient(). - WithNamespace(getNamespace()). + WithNamespace(openebsNamespace). Get(cvrName, metav1.GetOptions{}) } @@ -876,7 +871,7 @@ func handleVolumeReplicaCreation(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVo } cvrAPIList, err := cvr.NewKubeclient(). - WithNamespace(getNamespace()). + WithNamespace(openebsNamespace). List(metav1.ListOptions{LabelSelector: pvSelector + "=" + pvName}) if err != nil { return errors.Wrapf(err, "failed to list cstorvolumereplicas of volume %s", pvName) @@ -889,7 +884,7 @@ func handleVolumeReplicaCreation(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVo WithFilter(cvr.HasLabel(cspiLabel, poolName)). List().Len() == 0 { cspiObj, err := cspi.NewKubeClient(). - WithNamespace(getNamespace()). + WithNamespace(openebsNamespace). Get(poolName, metav1.GetOptions{}) if err != nil { errorMsg = fmt.Sprintf("failed to get cstorpoolinstance %s error: %v", poolName, err) @@ -956,7 +951,7 @@ func handleVolumeReplicaCreation(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVo func scaleUpVolumeReplicas(cvc *apis.CStorVolumeClaim) error { drCount := len(cvc.Spec.Policy.ReplicaPool.PoolInfo) cvObj, err := cv.NewKubeclient(). - WithNamespace(getNamespace()). + WithNamespace(openebsNamespace). Get(cvc.Name, metav1.GetOptions{}) if err != nil { return errors.Wrapf(err, "failed to get cstorvolumes object %s", cvc.Name) @@ -989,7 +984,7 @@ func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error { var cvrObj *apis.CStorVolumeReplica drCount := len(cvc.Spec.Policy.ReplicaPool.PoolInfo) cvObj, err := cv.NewKubeclient(). - WithNamespace(getNamespace()). + WithNamespace(openebsNamespace). 
Get(cvc.Name, metav1.GetOptions{}) if err != nil { return errors.Wrapf(err, "failed to get cstorvolumes object %s", cvc.Name) @@ -1017,7 +1012,7 @@ func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error { } if cvrObj != nil { err = cvr.NewKubeclient(). - WithNamespace(getNamespace()). + WithNamespace(openebsNamespace). Delete(cvrObj.Name) if err != nil { return errors.Wrapf(err, "failed to delete cstorvolumereplica %s", cvrObj.Name) @@ -1043,6 +1038,6 @@ func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error { // Note: Caller code should handle the error func updateCStorVolumeInfo(cvObj *apis.CStorVolume) (*apis.CStorVolume, error) { return cv.NewKubeclient(). - WithNamespace(getNamespace()). + WithNamespace(openebsNamespace). Update(cvObj) } diff --git a/cmd/cvc-operator/controller/cvc_controller.go b/cmd/cvc-operator/controller/cvc_controller.go index 6ffcdaba4f..7759149e71 100644 --- a/cmd/cvc-operator/controller/cvc_controller.go +++ b/cmd/cvc-operator/controller/cvc_controller.go @@ -333,7 +333,7 @@ func (c *CVCController) getVolumePolicy( if policyName != "" { klog.Infof("uses cstorvolume policy %q to configure volume %q", policyName, cvc.Name) - volumePolicy, err = c.clientset.OpenebsV1alpha1().CStorVolumePolicies(getNamespace()).Get(policyName, metav1.GetOptions{}) + volumePolicy, err = c.clientset.OpenebsV1alpha1().CStorVolumePolicies(openebsNamespace).Get(policyName, metav1.GetOptions{}) if err != nil { return nil, errors.Wrapf( err, @@ -645,7 +645,7 @@ func (c *CVCController) resizeCV(cv *apis.CStorVolume, newCapacity resource.Quan if err != nil { return fmt.Errorf("can't update capacity of CV %s as generate patch data failed: %v", cv.Name, err) } - _, updateErr := c.clientset.OpenebsV1alpha1().CStorVolumes(getNamespace()). + _, updateErr := c.clientset.OpenebsV1alpha1().CStorVolumes(openebsNamespace). Patch(cv.Name, types.MergePatchType, patchBytes) if updateErr != nil { return updateErr From 2129e1e9f3ebcd437f8215001b67935d768f9742 Mon Sep 17 00:00:00 2001 From: mittachaitu Date: Fri, 14 Feb 2020 15:39:31 +0530 Subject: [PATCH 06/11] This commit fixes the issues in travis Signed-off-by: mittachaitu --- tests/operations.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/operations.go b/tests/operations.go index f4dfcce470..0969856f29 100644 --- a/tests/operations.go +++ b/tests/operations.go @@ -1145,7 +1145,7 @@ func (ops *Operations) BuildAndCreateCVR() *apis.CStorVolumeReplica { WithFinalizers([]string{cvr.CStorVolumeReplicaFinalizer}). WithCapacity(cvrConfig.Capacity). WithTargetIP(cvrConfig.TargetIP). - WithStatusPhase(cvrConfig.Phase). + WithStatusPhase(apis.CStorVolumeReplicaPhase(cvrConfig.Phase)). WithReplicaID(cvrConfig.ReplicaID). 
 		Build()
 	Expect(err).To(BeNil())

From 16f4afa655c922bbcf31413eb4ff686c5e148ee8 Mon Sep 17 00:00:00 2001
From: mittachaitu
Date: Fri, 21 Feb 2020 01:19:20 +0530
Subject: [PATCH 07/11] This commit addresses the review comments

Signed-off-by: mittachaitu
---
 .../controller/cstorvolumeclaim.go            | 189 ++++++++----------
 cmd/cvc-operator/controller/cvc_controller.go |  12 +-
 .../openebs.io/v1alpha1/cstorvolume_policy.go |  10 +-
 .../v1alpha1/zz_generated.deepcopy.go         |  27 +--
 pkg/cstorvolumeclaim/v1alpha1/utils.go        |   2 +-
 .../replica_replace_utils_test.go             |   2 +-
 6 files changed, 103 insertions(+), 139 deletions(-)

diff --git a/cmd/cvc-operator/controller/cstorvolumeclaim.go b/cmd/cvc-operator/controller/cstorvolumeclaim.go
index b82c0fe5fc..97be712c8e 100644
--- a/cmd/cvc-operator/controller/cstorvolumeclaim.go
+++ b/cmd/cvc-operator/controller/cstorvolumeclaim.go
@@ -225,9 +225,6 @@ func listCStorPools(
 			cspcName,
 		)
 	}
-	if len(cstorPoolList.Items) < replicaCount {
-		return nil, errors.New("not enough pools available to create replicas")
-	}
 	return cstorPoolList, nil
 }

@@ -352,13 +349,16 @@ func (c *CVCController) distributeCVRs(
 		srcVolName string
 		err        error
 	)
+	rInfo := replicaInfo{
+		phase: apis.CVRStatusEmpty,
+	}

 	cspcName := getCSPC(claim)
 	if len(cspcName) == 0 {
 		return errors.New("failed to get cspc name from cstorvolumeclaim")
 	}

-	poolList, err := listCStorPools(cspcName, claim.Spec.ReplicaCount)
+	poolList, err := listCStorPools(cspcName, pendingReplicaCount)
 	if err != nil {
 		return err
 	}
@@ -381,12 +381,14 @@ func (c *CVCController) distributeCVRs(
 	if c.isReplicaAffinityEnabled(policy) {
 		usablePoolList = prioritizedPoolList(claim.Publish.NodeID, usablePoolList)
 	}
+
+	if len(usablePoolList.Items) < pendingReplicaCount {
+		return errors.New("not enough pools available to create replicas")
+	}
+
 	for count, pool := range usablePoolList.Items {
 		pool := pool
 		if count < pendingReplicaCount {
-			rInfo := replicaInfo{
-				phase: apis.CVRStatusEmpty,
-			}
 			_, err = createCVR(service, volume, claim, &pool, rInfo)
 			if err != nil {
 				return err
@@ -474,6 +476,12 @@ func createCVR(
 				cvrObj.Name,
 			)
 		}
+		klog.V(2).Infof(
+			"Created CVR %s with phase %s on cstor pool %s",
+			cvrObj.Name,
+			rInfo.phase,
+			pool.Name,
+		)
 		return cvrObj, nil
 	}
 	return cvrObj, nil
@@ -650,8 +658,8 @@ func getPDBSelector(pools []string) *metav1.LabelSelector {
 // status of CVC
 func addReplicaPoolInfo(cvcObj *apis.CStorVolumeClaim, poolNames []string) {
 	for _, poolName := range poolNames {
-		cvcObj.Spec.Policy.ReplicaPool.PoolInfo = append(
-			cvcObj.Spec.Policy.ReplicaPool.PoolInfo,
+		cvcObj.Spec.Policy.ReplicaPoolInfo = append(
+			cvcObj.Spec.Policy.ReplicaPoolInfo,
 			apis.ReplicaPoolInfo{PoolName: poolName})
 	}
 	cvcObj.Status.PoolInfo = append(cvcObj.Status.PoolInfo, poolNames...)
@@ -737,6 +745,7 @@ func updatePDBForVolume(cvcObj *apis.CStorVolumeClaim) (string, error) {
 // names and current replica pool names
 // 1. Below function will check whether there is any change in desired replica
 // pool names and current replica pool names.
+// Note: Scale up/down of cvc will not work until cvc is in bound state func (c *CVCController) isCVCScalePending(cvc *apis.CStorVolumeClaim) bool { desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc) return util.IsChangeInLists(desiredPoolNames, cvc.Status.PoolInfo) @@ -803,21 +812,18 @@ func verifyAndUpdateScaleUpInfo(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVol pvName := cvc.GetAnnotations()[volumeID] desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc) newPoolNames := util.ListDiff(desiredPoolNames, cvc.Status.PoolInfo) + replicaPoolMap := map[string]bool{} + + replicaPoolNames, err := cvr.GetVolumeReplicaPoolNames(pvName, cvc.Namespace) + if err != nil { + return errors.Wrapf(err, "failed to get current replica pool information") + } + + for _, poolName := range replicaPoolNames { + replicaPoolMap[poolName] = true + } for _, poolName := range newPoolNames { - cvrName := pvName + "-" + poolName - cvrObj, err := cvr.NewKubeclient(). - WithNamespace(openebsNamespace). - Get(cvrName, metav1.GetOptions{}) - if err != nil { - klog.Errorf("failed to get CVR %s error: %v", cvrName, err) - continue - } - _, isIDExists := cvObj.Status.ReplicaDetails.KnownReplicas[apis.ReplicaID(cvrObj.Spec.ReplicaID)] - // ScalingUp was completed only if CVR replicaID exists on CV status - // and also CVR should be Healthy(there might be cases of replica - // migration in that case replicaID will be same zvol guid will be - // different) - if isIDExists && cvrObj.Status.Phase == apis.CVRStatusOnline { + if _, ok := replicaPoolMap[poolName]; ok { scaledRPNames = append(scaledRPNames, poolName) } } @@ -838,10 +844,11 @@ func verifyAndUpdateScaleUpInfo(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVol return errors.Errorf( "scaling replicas from %d to %d in progress", len(cvc.Status.PoolInfo), - len(cvc.Spec.Policy.ReplicaPool.PoolInfo), + len(cvc.Spec.Policy.ReplicaPoolInfo), ) } +// getScaleDownCVR return CVR which belongs to scale down pool func getScaleDownCVR(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeReplica, error) { pvName := cvc.GetAnnotations()[volumeID] desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc) @@ -870,66 +877,41 @@ func handleVolumeReplicaCreation(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVo return errors.Wrapf(err, "failed to get service object %s", cvc.Name) } - cvrAPIList, err := cvr.NewKubeclient(). - WithNamespace(openebsNamespace). - List(metav1.ListOptions{LabelSelector: pvSelector + "=" + pvName}) - if err != nil { - return errors.Wrapf(err, "failed to list cstorvolumereplicas of volume %s", pvName) - } - cvrListbuilder := cvr.NewListBuilder(). - WithAPIList(cvrAPIList) - for _, poolName := range newPoolNames { - if cvrListbuilder. - WithFilter(cvr.HasLabel(cspiLabel, poolName)). - List().Len() == 0 { - cspiObj, err := cspi.NewKubeClient(). - WithNamespace(openebsNamespace). 
- Get(poolName, metav1.GetOptions{}) - if err != nil { - errorMsg = fmt.Sprintf("failed to get cstorpoolinstance %s error: %v", poolName, err) - errs = append(errs, errors.Errorf("%v", errorMsg)) - klog.Errorf("%s", errorMsg) - continue - } - if cspiObj.Status.Phase != cspiOnline { - errorMsg = fmt.Sprintf( - "failed to create cstorvolumerplica on pool %s error: pool is not in %s", - cspiObj.Name, - cspiOnline, - ) - errs = append(errs, errors.Errorf("%v", errorMsg)) - klog.Errorf("%s", errorMsg) - continue - } - hash, err := hash.Hash(pvName + "-" + poolName) - if err != nil { - errorMsg = fmt.Sprintf( - "failed to calculate of hase for new volume replica error: %v", - err) - errs = append(errs, errors.Errorf("%v", errorMsg)) - klog.Errorf("%s", errorMsg) - continue - } - // TODO: Add a check for ClonedVolumeReplica scaleup case - // Create replica with Recreate state - rInfo := replicaInfo{ - replicaID: hash, - phase: apis.CVRStatusRecreate, - } - cvr, err := createCVR(svcObj, cvObj, cvc, cspiObj, rInfo) - if err != nil { - errorMsg = fmt.Sprintf( - "failed to create new replica on pool %s error: %v", - poolName, - err, - ) - errs = append(errs, errors.Errorf("%v", errorMsg)) - klog.Errorf("%s", errorMsg) - continue - } - // Update cvrListbuilder with new replicas - cvrListbuilder = cvrListbuilder.AppendListBuilder(cvr) + cspiObj, err := cspi.NewKubeClient(). + WithNamespace(openebsNamespace). + Get(poolName, metav1.GetOptions{}) + if err != nil { + errorMsg = fmt.Sprintf("failed to get cstorpoolinstance %s error: %v", poolName, err) + errs = append(errs, errors.Errorf("%v", errorMsg)) + klog.Errorf("%s", errorMsg) + continue + } + hash, err := hash.Hash(pvName + "-" + poolName) + if err != nil { + errorMsg = fmt.Sprintf( + "failed to calculate of hase for new volume replica error: %v", + err) + errs = append(errs, errors.Errorf("%v", errorMsg)) + klog.Errorf("%s", errorMsg) + continue + } + // TODO: Add a check for ClonedVolumeReplica scaleup case + // Create replica with Recreate state + rInfo := replicaInfo{ + replicaID: hash, + phase: apis.CVRStatusRecreate, + } + _, err = createCVR(svcObj, cvObj, cvc, cspiObj, rInfo) + if err != nil { + errorMsg = fmt.Sprintf( + "failed to create new replica on pool %s error: %v", + poolName, + err, + ) + errs = append(errs, errors.Errorf("%v", errorMsg)) + klog.Errorf("%s", errorMsg) + continue } } if len(errs) > 0 { @@ -941,15 +923,15 @@ func handleVolumeReplicaCreation(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVo // scaleUpVolumeReplicas does the following work // 1. Fetch corresponding CStorVolume object of CVC. // 2. Verify is there need to update desiredReplicationFactor of CVC(In etcd). -// 3. Create CVRs if doesn't created on scaled cStor +// 3. Create CVRs if CVR doesn't exist on scaled cStor // pool(handleVolumeReplicaCreation will handle new CVR creations). // 4. If scalingUp volume replicas was completed then do following -// things(verifyAndUpdateScaleUpInfo will does following things). If -// scalingUp of volume replicas was not completed then return error +// things(verifyAndUpdateScaleUpInfo will does following things). // 4.1.1 Update PDB according to the new pools(only if volume is HAVolume). // 4.1.2 Update PDB label on CVC and replica pool information on status. +// 5. 
If scalingUp of volume replicas was not completed then return error func scaleUpVolumeReplicas(cvc *apis.CStorVolumeClaim) error { - drCount := len(cvc.Spec.Policy.ReplicaPool.PoolInfo) + drCount := len(cvc.Spec.Policy.ReplicaPoolInfo) cvObj, err := cv.NewKubeclient(). WithNamespace(openebsNamespace). Get(cvc.Name, metav1.GetOptions{}) @@ -963,6 +945,7 @@ func scaleUpVolumeReplicas(cvc *apis.CStorVolumeClaim) error { return err } } + // Create replicas on new pools err = handleVolumeReplicaCreation(cvc, cvObj) if err != nil { return err @@ -976,13 +959,12 @@ func scaleUpVolumeReplicas(cvc *apis.CStorVolumeClaim) error { // process(Only one replica scaledown at a time is allowed). // 2. Update the CV object by decreasing the DRF and removing the // replicaID entry. -// 3. Delete the CVR that belongs to removed pool entry. -// 4. Check the status of scale down if scale down was completed then -// perform post scaling process(updating PDB if applicable and CVC -// replica pool status). +// 3. Check the status of scale down if scale down was completed then +// delete the CVR which belongs to scale down pool and then perform post scaling +// process(updating PDB accordingly if it is applicable and CVC replica pool status). func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error { var cvrObj *apis.CStorVolumeReplica - drCount := len(cvc.Spec.Policy.ReplicaPool.PoolInfo) + drCount := len(cvc.Spec.Policy.ReplicaPoolInfo) cvObj, err := cv.NewKubeclient(). WithNamespace(openebsNamespace). Get(cvc.Name, metav1.GetOptions{}) @@ -997,10 +979,8 @@ func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error { ) } cvrObj, err = getScaleDownCVR(cvc) - if err != nil { - if !k8serror.IsNotFound(err) { - return errors.Wrapf(err, "failed to get scale down CVR object") - } + if err != nil && !k8serror.IsNotFound(err) { + return errors.Wrapf(err, "failed to get scale down CVR object") } if cvObj.Spec.DesiredReplicationFactor > drCount { cvObj.Spec.DesiredReplicationFactor = drCount @@ -1010,15 +990,16 @@ func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error { return err } } - if cvrObj != nil { - err = cvr.NewKubeclient(). - WithNamespace(openebsNamespace). - Delete(cvrObj.Name) - if err != nil { - return errors.Wrapf(err, "failed to delete cstorvolumereplica %s", cvrObj.Name) - } - } + // TODO: Make below function as cvObj.IsScaleDownInProgress() if !cv.IsScaleDownInProgress(cvObj) { + if cvrObj != nil { + err = cvr.NewKubeclient(). + WithNamespace(openebsNamespace). 
+ Delete(cvrObj.Name) + if err != nil { + return errors.Wrapf(err, "failed to delete cstorvolumereplica %s", cvrObj.Name) + } + } desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc) err = handlePostScalingProcess(cvc, desiredPoolNames) if err != nil { diff --git a/cmd/cvc-operator/controller/cvc_controller.go b/cmd/cvc-operator/controller/cvc_controller.go index 7759149e71..aa12200116 100644 --- a/cmd/cvc-operator/controller/cvc_controller.go +++ b/cmd/cvc-operator/controller/cvc_controller.go @@ -313,6 +313,11 @@ func (c *CVCController) createVolumeOperation(cvc *apis.CStorVolumeClaim) (*apis cvc.Spec.Policy = volumePolicy.Spec cvc.Status.Phase = apis.CStorVolumeClaimPhaseBound cvc.Status.Capacity = cvc.Spec.Capacity + + // TODO: Below function needs to be converted into + // cvc.addReplicaPoolInfo(poolNames) while moving to cstor-operators + // repo(Currently in Maya writing functions in API package is not encouraged) + // update volume replica pool information on cvc spec and status addReplicaPoolInfo(cvc, poolNames) @@ -353,7 +358,8 @@ func (c *CVCController) isReplicaAffinityEnabled(policy *apis.CStorVolumePolicy) } // distributePendingCVRs trigers create and distribute pending cstorvolumereplica -// resource among the available cstor pools +// resource among the available cstor pools. This func returns error even when +// required no.of CVRs are Not created func (c *CVCController) distributePendingCVRs( cvc *apis.CStorVolumeClaim, cv *apis.CStorVolume, @@ -690,9 +696,9 @@ func deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error { // info under the spec then it is a kind of migration which is not yet supported func (c *CVCController) scaleVolumeReplicas(cvc *apis.CStorVolumeClaim) error { var err error - if len(cvc.Spec.Policy.ReplicaPool.PoolInfo) > len(cvc.Status.PoolInfo) { + if len(cvc.Spec.Policy.ReplicaPoolInfo) > len(cvc.Status.PoolInfo) { err = scaleUpVolumeReplicas(cvc) - } else if len(cvc.Spec.Policy.ReplicaPool.PoolInfo) < len(cvc.Status.PoolInfo) { + } else if len(cvc.Spec.Policy.ReplicaPoolInfo) < len(cvc.Status.PoolInfo) { err = scaleDownVolumeReplicas(cvc) } else { c.recorder.Event(cvc, corev1.EventTypeWarning, "Migration", diff --git a/pkg/apis/openebs.io/v1alpha1/cstorvolume_policy.go b/pkg/apis/openebs.io/v1alpha1/cstorvolume_policy.go index 0d65f1d583..c854c30ac2 100644 --- a/pkg/apis/openebs.io/v1alpha1/cstorvolume_policy.go +++ b/pkg/apis/openebs.io/v1alpha1/cstorvolume_policy.go @@ -45,9 +45,9 @@ type CStorVolumePolicySpec struct { Target TargetSpec `json:"target"` // ReplicaSpec represents configuration related to replicas resources Replica ReplicaSpec `json:"replica"` - // ReplicaPool holds the pool information of volume replicas. + // ReplicaPoolInfo holds the pool information of volume replicas. 
// Ex: If volume is provisioned on which CStor pool volume replicas exist - ReplicaPool ReplicaPoolSpec `json:"replicaPool"` + ReplicaPoolInfo []ReplicaPoolInfo `json:"replicaPoolInfo"` } // TargetSpec represents configuration related to cstor target and its resources @@ -98,12 +98,6 @@ type ReplicaSpec struct { Affinity *corev1.PodAffinity `json:"affinity"` } -// ReplicaPoolSpec represents the volume replicas pool information -type ReplicaPoolSpec struct { - // PoolInfo represents the pool information of replicas - PoolInfo []ReplicaPoolInfo `json:"poolInfo"` -} - // Provision represents volume provisioning configuration type Provision struct { // replicaAffinity is set to true then volume replica resources need to be diff --git a/pkg/apis/openebs.io/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/openebs.io/v1alpha1/zz_generated.deepcopy.go index b403062c4b..b747d7bc1f 100644 --- a/pkg/apis/openebs.io/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/openebs.io/v1alpha1/zz_generated.deepcopy.go @@ -1174,7 +1174,11 @@ func (in *CStorVolumePolicySpec) DeepCopyInto(out *CStorVolumePolicySpec) { out.Provision = in.Provision in.Target.DeepCopyInto(&out.Target) in.Replica.DeepCopyInto(&out.Replica) - in.ReplicaPool.DeepCopyInto(&out.ReplicaPool) + if in.ReplicaPoolInfo != nil { + in, out := &in.ReplicaPoolInfo, &out.ReplicaPoolInfo + *out = make([]ReplicaPoolInfo, len(*in)) + copy(*out, *in) + } return } @@ -1754,27 +1758,6 @@ func (in *ReplicaPoolInfo) DeepCopy() *ReplicaPoolInfo { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *ReplicaPoolSpec) DeepCopyInto(out *ReplicaPoolSpec) { - *out = *in - if in.PoolInfo != nil { - in, out := &in.PoolInfo, &out.PoolInfo - *out = make([]ReplicaPoolInfo, len(*in)) - copy(*out, *in) - } - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReplicaPoolSpec. -func (in *ReplicaPoolSpec) DeepCopy() *ReplicaPoolSpec { - if in == nil { - return nil - } - out := new(ReplicaPoolSpec) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ReplicaSpec) DeepCopyInto(out *ReplicaSpec) { *out = *in diff --git a/pkg/cstorvolumeclaim/v1alpha1/utils.go b/pkg/cstorvolumeclaim/v1alpha1/utils.go index 1d9ffbf792..0c09f5c3c8 100644 --- a/pkg/cstorvolumeclaim/v1alpha1/utils.go +++ b/pkg/cstorvolumeclaim/v1alpha1/utils.go @@ -65,7 +65,7 @@ func GetPDBLabels(poolNames []string, cspcName string) map[string]string { // GetDesiredReplicaPoolNames returns list of desired pool names func GetDesiredReplicaPoolNames(cvc *apis.CStorVolumeClaim) []string { poolNames := []string{} - for _, poolInfo := range cvc.Spec.Policy.ReplicaPool.PoolInfo { + for _, poolInfo := range cvc.Spec.Policy.ReplicaPoolInfo { poolNames = append(poolNames, poolInfo.PoolName) } return poolNames diff --git a/tests/cstor/volume/replica_replace/replica_replace_utils_test.go b/tests/cstor/volume/replica_replace/replica_replace_utils_test.go index 814932dd94..66c10390a2 100644 --- a/tests/cstor/volume/replica_replace/replica_replace_utils_test.go +++ b/tests/cstor/volume/replica_replace/replica_replace_utils_test.go @@ -206,7 +206,7 @@ func buildCVRFromExistingCVR( WithCapacity(cvrObj.Spec.Capacity). WithReplicaID(cvrObj.Spec.ReplicaID). WithFinalizers([]string{cvr.CStorVolumeReplicaFinalizer}). 
- WithStatusPhase("Recreate") + WithStatusPhase(apis.CVRStatusRecreate) cvrObj, err := buildCVRObj.Build() Expect(err).To(BeNil()) return cvrObj From 715acc83bb46e6d39fb63d0cf043535c79d8e678 Mon Sep 17 00:00:00 2001 From: mittachaitu Date: Tue, 25 Feb 2020 21:54:34 +0530 Subject: [PATCH 08/11] This commit incorporates review comments Signed-off-by: mittachaitu --- .../controller/cstorvolumeclaim.go | 149 ++++++------------ cmd/cvc-operator/controller/cvc_controller.go | 6 +- 2 files changed, 55 insertions(+), 100 deletions(-) diff --git a/cmd/cvc-operator/controller/cstorvolumeclaim.go b/cmd/cvc-operator/controller/cstorvolumeclaim.go index 97be712c8e..02fc904529 100644 --- a/cmd/cvc-operator/controller/cstorvolumeclaim.go +++ b/cmd/cvc-operator/controller/cstorvolumeclaim.go @@ -478,8 +478,8 @@ func createCVR( } klog.V(2).Infof( "Created CVR %s with phase %s on cstor pool %s", - rInfo.phase, cvrObj.Name, + rInfo.phase, pool.Name, ) return cvrObj, nil @@ -586,12 +586,6 @@ func randomizePoolList(list *apis.CStorPoolInstanceList) *apis.CStorPoolInstance // 3. If PDB doesn't exist it creates new PDB(With CSPC hash) func getOrCreatePodDisruptionBudget( cspcName string, poolNames []string) (*policy.PodDisruptionBudget, error) { - // poolNames, err := cvr.GetVolumeReplicaPoolNames(pvName, openebsNamespace) - // if err != nil { - // return nil, errors.Wrapf(err, - // "failed to get volume replica pool names of volume %s", - // cvObj.Name) - // } pdbLabels := cvclaim.GetPDBPoolLabels(poolNames) labelSelector := apispdb.GetPDBLabelSelector(pdbLabels) pdbList, err := apispdb.KubeClient(). @@ -698,35 +692,10 @@ func isHAVolume(cvcObj *apis.CStorVolumeClaim) bool { // that PDB name. If PDB doesn't exist then create new PDB and return newely // created PDB name. // 4. If current volume is not HAVolume then return nothing. -func updatePDBForVolume(cvcObj *apis.CStorVolumeClaim) (string, error) { - pdbName, hasPDB := cvcObj.GetLabels()[string(apis.PodDisruptionBudgetKey)] - pdbLabels := cvclaim.GetPDBPoolLabels(cvcObj.Status.PoolInfo) - labelSelector := apispdb.GetPDBLabelSelector(pdbLabels) +func getUpdatePDBForVolume(cvcObj *apis.CStorVolumeClaim) (string, error) { + _, hasPDB := cvcObj.GetLabels()[string(apis.PodDisruptionBudgetKey)] if hasPDB { - // Get PDB if exists among newely updated volume replicas pools - pdbList, err := apispdb.KubeClient(). - WithNamespace(openebsNamespace). - List(metav1.ListOptions{LabelSelector: labelSelector}) - if err != nil { - return "", errors.Wrapf(err, - "failed to get PDB present among pools %v", - cvcObj.Status.PoolInfo, - ) - } - if len(pdbList.Items) >= 1 && isHAVolume(cvcObj) { - for _, pdbObj := range pdbList.Items { - pdbPoolCount := len(pdbObj.Spec.Selector.MatchExpressions[0].Values) - // Let us assume that volume replicas was scale down from 4 to - // 3(i.e PDB was created on top of 4 pools). Now when scale down - // happens better to delete the PDB(if no one refering to it) and - // create PDB among 3 pools so that scaling down of cluster will - // not hold unnecessarily(i.e draining the node). 
- if pdbObj.Name == pdbName && pdbPoolCount < len(cvcObj.Status.PoolInfo) { - return pdbName, nil - } - } - } - err = deletePDBIfNotInUse(cvcObj) + err := deletePDBIfNotInUse(cvcObj) if err != nil { return "", err } @@ -751,7 +720,7 @@ func (c *CVCController) isCVCScalePending(cvc *apis.CStorVolumeClaim) bool { return util.IsChangeInLists(desiredPoolNames, cvc.Status.PoolInfo) } -// handlePostScalingProcess will does the following changes: +// updatePDBForScaledVolume will does the following changes: // 1. Handle PDB updation based on no.of volume replicas. It should handle in // following ways: // 1.1 If Volume was already pointing to a PDB then check is that same PDB will be @@ -766,15 +735,13 @@ func (c *CVCController) isCVCScalePending(cvc *apis.CStorVolumeClaim) bool { // created PDB name. // 2. Update CVC label to point it to newely PDB got from above step and also // replicas pool information on status of CVC. -func handlePostScalingProcess(cvc *apis.CStorVolumeClaim, - currentRPNames []string) error { +// NOTE: This function return object as well as error if error occured +func updatePDBForScaledVolume(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeClaim, error) { var err error cvcCopy := cvc.DeepCopy() - cvc.Status.PoolInfo = []string{} - cvc.Status.PoolInfo = append(cvc.Status.PoolInfo, currentRPNames...) - pdbName, err := updatePDBForVolume(cvc) + pdbName, err := getUpdatePDBForVolume(cvc) if err != nil { - return errors.Wrapf(err, + return cvcCopy, errors.Wrapf(err, "failed to handle PDB for scaled volume %s", cvc.Name, ) @@ -786,18 +753,15 @@ func handlePostScalingProcess(cvc *apis.CStorVolumeClaim, newCVCObj, err := cvclaim.NewKubeclient().WithNamespace(cvc.Namespace).Update(cvc) if err != nil { // If error occured point it to old cvc object it self - cvc = cvcCopy - return errors.Wrapf(err, + return cvcCopy, errors.Wrapf(err, "failed to update %s CVC status with scaledup replica pool names", cvc.Name, ) } - // Point cvc to updated cvc object - cvc = newCVCObj - return nil + return newCVCObj, nil } -// verifyAndUpdateScaleUpInfo does the following changes: +// updateCVCWithScaledUpInfo does the following changes: // 1. Get list of new replica pool names by using CVC(spec and status) // 2. Verify status of ScalingUp Replica(by using CV object) based on the status // does following changes: @@ -805,47 +769,42 @@ func handlePostScalingProcess(cvc *apis.CStorVolumeClaim, // HAVolume) and update the replica pool info on CVC(API calls). // 2.2: If scalingUp was going then return error saying scalingUp was in // progress. 
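For readers following the scale-up verification described in the comment above, the check boils down to: diff the desired pools (CVC spec) against the pools already recorded in the CVC status, and treat scale-up as complete only when every newly desired pool already hosts a volume replica. A minimal, self-contained Go sketch of that decision follows; the types and names are simplified stand-ins, not the actual maya code.

package main

import "fmt"

// listDiff returns the entries present in desired but missing from current;
// it mirrors the role util.ListDiff plays in the patch above.
func listDiff(desired, current []string) []string {
	seen := map[string]bool{}
	for _, c := range current {
		seen[c] = true
	}
	var diff []string
	for _, d := range desired {
		if !seen[d] {
			diff = append(diff, d)
		}
	}
	return diff
}

// scaleUpComplete reports whether every newly desired pool already hosts a
// volume replica. replicaPools is the list of pools that currently have a CVR.
func scaleUpComplete(desired, current, replicaPools []string) bool {
	replicaPoolMap := map[string]bool{}
	for _, p := range replicaPools {
		replicaPoolMap[p] = true
	}
	for _, newPool := range listDiff(desired, current) {
		if !replicaPoolMap[newPool] {
			// a replica is still missing on this pool, so scale-up is in progress
			return false
		}
	}
	return true
}

func main() {
	desired := []string{"pool-1", "pool-2", "pool-3"}
	current := []string{"pool-1", "pool-2"}
	replicas := []string{"pool-1", "pool-2", "pool-3"}
	fmt.Println(scaleUpComplete(desired, current, replicas)) // true: a CVR already exists on pool-3
}

Only after this check succeeds does the controller move on to updating the PDB and the replica pool information on the CVC status.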
-func verifyAndUpdateScaleUpInfo(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVolume) error { - // scaledRPNames contains the new replica pool names where entier data was - // reconstructed from other replicas - scaledRPNames := []string{} +func updateCVCWithScaledUpInfo(cvc *apis.CStorVolumeClaim, + cvObj *apis.CStorVolume) (*apis.CStorVolumeClaim, error) { pvName := cvc.GetAnnotations()[volumeID] desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc) newPoolNames := util.ListDiff(desiredPoolNames, cvc.Status.PoolInfo) replicaPoolMap := map[string]bool{} - replicaPoolNames, err := cvr.GetVolumeReplicaPoolNames(pvName, cvc.Namespace) + replicaPoolNames, err := cvr.GetVolumeReplicaPoolNames(pvName, openebsNamespace) if err != nil { - return errors.Wrapf(err, "failed to get current replica pool information") + return cvc, errors.Wrapf(err, "failed to get current replica pool information") } for _, poolName := range replicaPoolNames { replicaPoolMap[poolName] = true } for _, poolName := range newPoolNames { - if _, ok := replicaPoolMap[poolName]; ok { - scaledRPNames = append(scaledRPNames, poolName) - } - } - if len(scaledRPNames) > 0 { - var currentRPNames []string - currentRPNames = append(currentRPNames, cvc.Status.PoolInfo...) - currentRPNames = append(currentRPNames, scaledRPNames...) - // handlePostScalingProcess will handle PDB and CVC status - err := handlePostScalingProcess(cvc, currentRPNames) - if err != nil { - return errors.Wrapf( - err, - "failed to handle post volume replicas scale up process", + if _, ok := replicaPoolMap[poolName]; !ok { + return cvc, errors.Errorf( + "scaling replicas from %d to %d in progress", + len(cvc.Status.PoolInfo), + len(cvc.Spec.Policy.ReplicaPoolInfo), ) } - return nil } - return errors.Errorf( - "scaling replicas from %d to %d in progress", - len(cvc.Status.PoolInfo), - len(cvc.Spec.Policy.ReplicaPoolInfo), - ) + cvcCopy := cvc.DeepCopy() + // store volume replica pool names in currentRPNames + cvc.Status.PoolInfo = append(cvc.Status.PoolInfo, newPoolNames...) + // updatePDBForScaledVolume will handle updating PDB and CVC status + cvc, err = updatePDBForScaledVolume(cvc) + if err != nil { + return cvcCopy, errors.Wrapf( + err, + "failed to handle post volume replicas scale up process", + ) + } + return cvc, nil } // getScaleDownCVR return CVR which belongs to scale down pool @@ -926,32 +885,32 @@ func handleVolumeReplicaCreation(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVo // 3. Create CVRs if CVR doesn't exist on scaled cStor // pool(handleVolumeReplicaCreation will handle new CVR creations). // 4. If scalingUp volume replicas was completed then do following -// things(verifyAndUpdateScaleUpInfo will does following things). +// things(updateCVCWithScaledUpInfo will does following things). // 4.1.1 Update PDB according to the new pools(only if volume is HAVolume). // 4.1.2 Update PDB label on CVC and replica pool information on status. // 5. If scalingUp of volume replicas was not completed then return error -func scaleUpVolumeReplicas(cvc *apis.CStorVolumeClaim) error { +func scaleUpVolumeReplicas(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeClaim, error) { drCount := len(cvc.Spec.Policy.ReplicaPoolInfo) cvObj, err := cv.NewKubeclient(). WithNamespace(openebsNamespace). 
Get(cvc.Name, metav1.GetOptions{}) if err != nil { - return errors.Wrapf(err, "failed to get cstorvolumes object %s", cvc.Name) + return cvc, errors.Wrapf(err, "failed to get cstorvolumes object %s", cvc.Name) } if cvObj.Spec.DesiredReplicationFactor < drCount { cvObj.Spec.DesiredReplicationFactor = drCount cvObj, err = updateCStorVolumeInfo(cvObj) if err != nil { - return err + return cvc, err } } // Create replicas on new pools err = handleVolumeReplicaCreation(cvc, cvObj) if err != nil { - return err + return cvc, err } - err = verifyAndUpdateScaleUpInfo(cvc, cvObj) - return err + cvc, err = updateCVCWithScaledUpInfo(cvc, cvObj) + return cvc, err } // scaleDownVolumeReplicas will process the following steps @@ -962,32 +921,25 @@ func scaleUpVolumeReplicas(cvc *apis.CStorVolumeClaim) error { // 3. Check the status of scale down if scale down was completed then // delete the CVR which belongs to scale down pool and then perform post scaling // process(updating PDB accordingly if it is applicable and CVC replica pool status). -func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error { +func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeClaim, error) { var cvrObj *apis.CStorVolumeReplica drCount := len(cvc.Spec.Policy.ReplicaPoolInfo) cvObj, err := cv.NewKubeclient(). WithNamespace(openebsNamespace). Get(cvc.Name, metav1.GetOptions{}) if err != nil { - return errors.Wrapf(err, "failed to get cstorvolumes object %s", cvc.Name) - } - // If more than one replica was scale down at a time keep on return the error - if (cvObj.Spec.ReplicationFactor - drCount) > 1 { - return errors.Wrapf(err, - "cann't perform %d replicas scaledown at a time", - (cvObj.Spec.DesiredReplicationFactor - drCount), - ) + return cvc, errors.Wrapf(err, "failed to get cstorvolumes object %s", cvc.Name) } cvrObj, err = getScaleDownCVR(cvc) if err != nil && !k8serror.IsNotFound(err) { - return errors.Wrapf(err, "failed to get scale down CVR object") + return cvc, errors.Wrapf(err, "failed to get CVR requested for scale down operation") } if cvObj.Spec.DesiredReplicationFactor > drCount { cvObj.Spec.DesiredReplicationFactor = drCount delete(cvObj.Spec.ReplicaDetails.KnownReplicas, apis.ReplicaID(cvrObj.Spec.ReplicaID)) cvObj, err = updateCStorVolumeInfo(cvObj) if err != nil { - return err + return cvc, err } } // TODO: Make below function as cvObj.IsScaleDownInProgress() @@ -997,18 +949,21 @@ func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) error { WithNamespace(openebsNamespace). 
Delete(cvrObj.Name) if err != nil { - return errors.Wrapf(err, "failed to delete cstorvolumereplica %s", cvrObj.Name) + return cvc, errors.Wrapf(err, "failed to delete cstorvolumereplica %s", cvrObj.Name) } } desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc) - err = handlePostScalingProcess(cvc, desiredPoolNames) + cvcCopy := cvc.DeepCopy() + cvc.Status.PoolInfo = desiredPoolNames + // updatePDBForScaledVolume will handle updating PDB and CVC status + cvc, err = updatePDBForScaledVolume(cvc) if err != nil { - return errors.Wrapf(err, + return cvcCopy, errors.Wrapf(err, "failed to handle post volume replicas scale down process") } - return nil + return cvc, nil } - return errors.Errorf( + return cvc, errors.Errorf( "Scaling down volume replicas from %d to %d is in progress", len(cvc.Status.PoolInfo), drCount, diff --git a/cmd/cvc-operator/controller/cvc_controller.go b/cmd/cvc-operator/controller/cvc_controller.go index aa12200116..75fd25e11a 100644 --- a/cmd/cvc-operator/controller/cvc_controller.go +++ b/cmd/cvc-operator/controller/cvc_controller.go @@ -207,7 +207,7 @@ func (c *CVCController) syncCVC(cvc *apis.CStorVolumeClaim) error { } if c.isCVCScalePending(cvc) { - // process scalingup/scalingdown of volume replicas only if there is + // process scale-up/scale-down of volume replicas only if there is // change in curent and desired state of replicas pool information _ = c.scaleVolumeReplicas(cvc) } @@ -697,9 +697,9 @@ func deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error { func (c *CVCController) scaleVolumeReplicas(cvc *apis.CStorVolumeClaim) error { var err error if len(cvc.Spec.Policy.ReplicaPoolInfo) > len(cvc.Status.PoolInfo) { - err = scaleUpVolumeReplicas(cvc) + cvc, err = scaleUpVolumeReplicas(cvc) } else if len(cvc.Spec.Policy.ReplicaPoolInfo) < len(cvc.Status.PoolInfo) { - err = scaleDownVolumeReplicas(cvc) + cvc, err = scaleDownVolumeReplicas(cvc) } else { c.recorder.Event(cvc, corev1.EventTypeWarning, "Migration", "Migration of volume replicas is not yet supported") From 68c2eaef97c8cf49375765dc16a5190fb3d0e995 Mon Sep 17 00:00:00 2001 From: mittachaitu Date: Tue, 25 Feb 2020 22:52:57 +0530 Subject: [PATCH 09/11] This commit handles PDB creation Signed-off-by: mittachaitu --- .../controller/cstorvolumeclaim.go | 38 ++++++++----------- 1 file changed, 15 insertions(+), 23 deletions(-) diff --git a/cmd/cvc-operator/controller/cstorvolumeclaim.go b/cmd/cvc-operator/controller/cstorvolumeclaim.go index 02fc904529..f3e3a3b4c4 100644 --- a/cmd/cvc-operator/controller/cstorvolumeclaim.go +++ b/cmd/cvc-operator/controller/cstorvolumeclaim.go @@ -595,14 +595,11 @@ func getOrCreatePodDisruptionBudget( return nil, errors.Wrapf(err, "failed to list PDB belongs to pools %v", pdbLabels) } - if len(pdbList.Items) > 1 { - return nil, errors.Wrapf(err, - "current PDB count %d of pools %v", - len(pdbList.Items), - pdbLabels) - } - if len(pdbList.Items) == 1 { - return &pdbList.Items[0], nil + for _, pdbObj := range pdbList.Items { + pdbObj := pdbObj + if !util.IsChangeInLists(pdbObj.Spec.Selector.MatchExpressions[0].Values, poolNames) { + return &pdbObj, nil + } } return createPDB(poolNames, cspcName) } @@ -681,17 +678,13 @@ func isHAVolume(cvcObj *apis.CStorVolumeClaim) bool { return len(cvcObj.Status.PoolInfo) >= minHAReplicaCount } -// 1. If Volume was already pointing to a PDB then check is that same PDB will be -// applicable after scalingup/scalingdown(case might be from 4 to 3 -// replicas) if applicable then return same pdb name. 
If not applicable do -// following changes: -// 1.1 Delete PDB if no other CVC is pointing to PDB. -// 2. If current volume was not pointing to any PDB then do nothing. -// 3. If current volume is HAVolume then check is there any PDB already +// 1. If Volume was pointing to PDB then delete PDB if no other CVC is +// pointing to PDB. +// 2. If current volume is HAVolume then check is there any PDB already // existing among the current replica pools. If PDB exists then return // that PDB name. If PDB doesn't exist then create new PDB and return newely // created PDB name. -// 4. If current volume is not HAVolume then return nothing. +// 3. If current volume is not HAVolume then return nothing. func getUpdatePDBForVolume(cvcObj *apis.CStorVolumeClaim) (string, error) { _, hasPDB := cvcObj.GetLabels()[string(apis.PodDisruptionBudgetKey)] if hasPDB { @@ -763,12 +756,12 @@ func updatePDBForScaledVolume(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeClai // updateCVCWithScaledUpInfo does the following changes: // 1. Get list of new replica pool names by using CVC(spec and status) -// 2. Verify status of ScalingUp Replica(by using CV object) based on the status -// does following changes: -// 2.1: If scalingUp was completed then update PDB accordingly(only if it was +// 2. Get the list of CVR pool names and verify whether CVRs exist on new pools. +// If new pools exist then does following changes: +// 2.1: Then update PDB accordingly(only if it was // HAVolume) and update the replica pool info on CVC(API calls). -// 2.2: If scalingUp was going then return error saying scalingUp was in -// progress. +// 3. If CVR doesn't exist on new pool names then return error saying scaledown +// is in progress. func updateCVCWithScaledUpInfo(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVolume) (*apis.CStorVolumeClaim, error) { pvName := cvc.GetAnnotations()[volumeID] @@ -909,8 +902,7 @@ func scaleUpVolumeReplicas(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeClaim, if err != nil { return cvc, err } - cvc, err = updateCVCWithScaledUpInfo(cvc, cvObj) - return cvc, err + return updateCVCWithScaledUpInfo(cvc, cvObj) } // scaleDownVolumeReplicas will process the following steps From ecf5801322bb7cab201740b42a0e133b6cf7c208 Mon Sep 17 00:00:00 2001 From: mittachaitu Date: Wed, 26 Feb 2020 11:40:13 +0530 Subject: [PATCH 10/11] This commit fixes the function signature Signed-off-by: mittachaitu --- cmd/cvc-operator/controller/cstorvolumeclaim.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/cmd/cvc-operator/controller/cstorvolumeclaim.go b/cmd/cvc-operator/controller/cstorvolumeclaim.go index f3e3a3b4c4..83893309bb 100644 --- a/cmd/cvc-operator/controller/cstorvolumeclaim.go +++ b/cmd/cvc-operator/controller/cstorvolumeclaim.go @@ -205,10 +205,7 @@ func getPDBName(claim *apis.CStorVolumeClaim) string { // listCStorPools get the list of available pool using the storagePoolClaim // as labelSelector. 
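As the comment above notes, candidate pools are discovered by listing CStorPoolInstances that carry the CSPC name as a label; the signature change in this patch only drops the unused replica count argument. A rough standalone sketch of that label-based selection follows; the label key and types are illustrative assumptions, not the exact maya API.

package main

import "fmt"

// pool is a simplified stand-in for a CStorPoolInstance.
type pool struct {
	name   string
	labels map[string]string
}

// poolsForCSPC returns the pools that belong to the given CSPC, mirroring the
// label-selector based listing done by listCStorPools. The label key below is
// an assumption made for illustration only.
func poolsForCSPC(all []pool, cspcName string) []pool {
	const cspcLabelKey = "openebs.io/cstor-pool-cluster" // assumed label key
	var out []pool
	for _, p := range all {
		if p.labels[cspcLabelKey] == cspcName {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	pools := []pool{
		{name: "pool-a", labels: map[string]string{"openebs.io/cstor-pool-cluster": "cspc-disk"}},
		{name: "pool-b", labels: map[string]string{"openebs.io/cstor-pool-cluster": "cspc-other"}},
	}
	for _, p := range poolsForCSPC(pools, "cspc-disk") {
		fmt.Println("eligible pool:", p.name)
	}
}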
-func listCStorPools(
-	cspcName string,
-	replicaCount int,
-) (*apis.CStorPoolInstanceList, error) {
+func listCStorPools(cspcName string) (*apis.CStorPoolInstanceList, error) {
 
 	if cspcName == "" {
 		return nil, errors.New("failed to list cstorpool: cspc name missing")
@@ -358,7 +355,7 @@ func (c *CVCController) distributeCVRs(
 		return errors.New("failed to get cspc name from cstorvolumeclaim")
 	}
 
-	poolList, err := listCStorPools(cspcName, pendingReplicaCount)
+	poolList, err := listCStorPools(cspcName)
 	if err != nil {
 		return err
 	}

From 05b4bf28e07ef36b54df919f84cfdb7356d26386 Mon Sep 17 00:00:00 2001
From: mittachaitu
Date: Wed, 26 Feb 2020 12:53:46 +0530
Subject: [PATCH 11/11] This commit fixes typo in logs

Signed-off-by: mittachaitu
---
 cmd/cvc-operator/controller/cstorvolumeclaim.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cmd/cvc-operator/controller/cstorvolumeclaim.go b/cmd/cvc-operator/controller/cstorvolumeclaim.go
index 83893309bb..f4ffe1a788 100644
--- a/cmd/cvc-operator/controller/cstorvolumeclaim.go
+++ b/cmd/cvc-operator/controller/cstorvolumeclaim.go
@@ -839,7 +839,7 @@ func handleVolumeReplicaCreation(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVo
 		hash, err := hash.Hash(pvName + "-" + poolName)
 		if err != nil {
 			errorMsg = fmt.Sprintf(
-				"failed to calculate of hase for new volume replica error: %v",
+				"failed to calculate hash for new volume replica error: %v",
 				err)
 			errs = append(errs, errors.Errorf("%v", errorMsg))
 			klog.Errorf("%s", errorMsg)
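The last hunk touches the error path around hash.Hash(pvName + "-" + poolName), which derives a deterministic replica ID for a CVR that is recreated on a new pool during scale-up. A small standalone sketch of the idea follows; the real pkg/hash helper may use a different algorithm, md5 is used here purely for illustration, and the names in main are hypothetical.

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// replicaID derives a deterministic identifier for a new volume replica from
// the PV name and the destination pool name, in the spirit of the
// hash.Hash(pvName + "-" + poolName) call in the patch above.
func replicaID(pvName, poolName string) string {
	sum := md5.Sum([]byte(pvName + "-" + poolName))
	return hex.EncodeToString(sum[:])
}

func main() {
	// hypothetical PV and pool names, only to show the call shape
	id := replicaID("pvc-sample", "cstor-pool-1")
	fmt.Println("replica ID:", id)
}

Because the input is just the PV name plus the pool name, retrying the same scale-up produces the same replica ID, which is what lets the controller create the CVR in the Recreate phase idempotently.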