diff --git a/cmd/cvc-operator/controller/cstorvolumeclaim.go b/cmd/cvc-operator/controller/cstorvolumeclaim.go
index 55e1e6ab84..f4ffe1a788 100644
--- a/cmd/cvc-operator/controller/cstorvolumeclaim.go
+++ b/cmd/cvc-operator/controller/cstorvolumeclaim.go
@@ -17,6 +17,7 @@ limitations under the License.
 package cstorvolumeclaim

 import (
+    "fmt"
     "math/rand"
     "strings"
     "time"
@@ -30,13 +31,16 @@ import (
     cv "github.com/openebs/maya/pkg/cstor/volume/v1alpha1"
     cvr "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1"
     cvclaim "github.com/openebs/maya/pkg/cstorvolumeclaim/v1alpha1"
+    "github.com/openebs/maya/pkg/hash"
     svc "github.com/openebs/maya/pkg/kubernetes/service/v1alpha1"
+    "github.com/openebs/maya/pkg/util"
     errors "github.com/pkg/errors"
     corev1 "k8s.io/api/core/v1"
     policy "k8s.io/api/policy/v1beta1"
     k8serror "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/intstr"
+    "k8s.io/klog"
 )

 const (
@@ -50,8 +54,18 @@ const (
     // minHAReplicaCount is minimum no.of replicas are required to decide
     // HighAvailable volume
     minHAReplicaCount = 3
+    volumeID          = "openebs.io/volumeID"
+    cspiLabel         = "cstorpoolinstance.openebs.io/name"
+    cspiOnline        = "ONLINE"
 )

+// replicaInfo struct is used to pass replica information to
+// create CVR
+type replicaInfo struct {
+    replicaID string
+    phase     apis.CStorVolumeReplicaPhase
+}
+
 var (
     cvPorts = []corev1.ServicePort{
         corev1.ServicePort{
@@ -191,10 +205,7 @@ func getPDBName(claim *apis.CStorVolumeClaim) string {

 // listCStorPools get the list of available pool using the storagePoolClaim
 // as labelSelector.
-func listCStorPools(
-    cspcName string,
-    replicaCount int,
-) (*apis.CStorPoolInstanceList, error) {
+func listCStorPools(cspcName string) (*apis.CStorPoolInstanceList, error) {

     if cspcName == "" {
         return nil, errors.New("failed to list cstorpool: cspc name missing")
@@ -204,11 +215,6 @@ func listCStorPools(
         LabelSelector: string(apis.CStorPoolClusterCPK) + "=" + cspcName,
     })

-    // cspList, err := ncsp.NewKubeClient().WithNamespace(getNamespace()).
-    //     List(metav1.ListOptions{
-    //     LabelSelector: string(apis.CStorPoolClusterCPK) + "=" + cspcName,
-    // })
-
     if err != nil {
         return nil, errors.Wrapf(
             err,
@@ -216,9 +222,6 @@ func listCStorPools(
             cspcName,
         )
     }
-    if len(cstorPoolList.Items) < replicaCount {
-        return nil, errors.New("not enough pools available to create replicas")
-    }
     return cstorPoolList, nil
 }

@@ -343,13 +346,16 @@ func (c *CVCController) distributeCVRs(
         srcVolName string
         err        error
     )
+    rInfo := replicaInfo{
+        phase: apis.CVRStatusEmpty,
+    }

     cspcName := getCSPC(claim)
     if len(cspcName) == 0 {
         return errors.New("failed to get cspc name from cstorvolumeclaim")
     }

-    poolList, err := listCStorPools(cspcName, claim.Spec.ReplicaCount)
+    poolList, err := listCStorPools(cspcName)
     if err != nil {
         return err
     }
@@ -372,10 +378,15 @@ func (c *CVCController) distributeCVRs(
     if c.isReplicaAffinityEnabled(policy) {
         usablePoolList = prioritizedPoolList(claim.Publish.NodeID, usablePoolList)
     }
+
+    if len(usablePoolList.Items) < pendingReplicaCount {
+        return errors.New("not enough pools available to create replicas")
+    }
+
     for count, pool := range usablePoolList.Items {
         pool := pool
         if count < pendingReplicaCount {
-            _, err = createCVR(service, volume, claim, &pool)
+            _, err = createCVR(service, volume, claim, &pool, rInfo)
             if err != nil {
                 return err
             }
@@ -403,6 +414,7 @@ func createCVR(
     volume *apis.CStorVolume,
     claim *apis.CStorVolumeClaim,
     pool *apis.CStorPoolInstance,
+    rInfo replicaInfo,
 ) (*apis.CStorVolumeReplica, error) {
     var (
         isClone string
@@ -440,9 +452,11 @@ func createCVR(
             WithOwnerRefernceNew(getCVROwnerReference(volume)).
             WithFinalizers(getCVRFinalizer()).
             WithTargetIP(service.Spec.ClusterIP).
+            WithReplicaID(rInfo.replicaID).
             WithCapacity(volume.Spec.Capacity.String()).
             WithNewVersion(version.GetVersion()).
             WithDependentsUpgraded().
+            WithStatusPhase(rInfo.phase).
             Build()
         if err != nil {
             return nil, errors.Wrapf(
@@ -459,6 +473,12 @@ func createCVR(
                 cvrObj.Name,
             )
         }
+        klog.V(2).Infof(
+            "Created CVR %s with phase %s on cstor pool %s",
+            cvrObj.Name,
+            rInfo.phase,
+            pool.Name,
+        )
         return cvrObj, nil
     }
     return cvrObj, nil
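For orientation, a minimal illustrative sketch (not a hunk of this patch) of the two replicaInfo values that now flow into createCVR: an empty phase during normal provisioning, and a precomputed replica ID plus the Recreate phase during replica scale-up (see handleVolumeReplicaCreation further below). The variable rid is hypothetical shorthand for the hash of "<pvName>-<poolName>".

    // Normal provisioning path (distributeCVRs): phase left empty, no replica ID.
    rInfo := replicaInfo{phase: apis.CVRStatusEmpty}
    // Replica scale-up path (handleVolumeReplicaCreation): recreate an identified replica.
    rInfo = replicaInfo{replicaID: rid, phase: apis.CVRStatusRecreate}
    _, err = createCVR(service, volume, claim, &pool, rInfo)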
@@ -562,14 +582,7 @@ func randomizePoolList(list *apis.CStorPoolInstanceList) *apis.CStorPoolInstance
 // 2. If PDB exist it returns the PDB.
 // 3. If PDB doesn't exist it creates new PDB(With CSPC hash)
 func getOrCreatePodDisruptionBudget(
-    cvObj *apis.CStorVolume, cspcName string) (*policy.PodDisruptionBudget, error) {
-    pvName := cvObj.Labels[string(apis.PersistentVolumeCPK)]
-    poolNames, err := cvr.GetVolumeReplicaPoolNames(pvName, openebsNamespace)
-    if err != nil {
-        return nil, errors.Wrapf(err,
-            "failed to get volume replica pool names of volume %s",
-            cvObj.Name)
-    }
+    cspcName string, poolNames []string) (*policy.PodDisruptionBudget, error) {
     pdbLabels := cvclaim.GetPDBPoolLabels(poolNames)
     labelSelector := apispdb.GetPDBLabelSelector(pdbLabels)
     pdbList, err := apispdb.KubeClient().
@@ -579,14 +592,11 @@ func getOrCreatePodDisruptionBudget(
         return nil, errors.Wrapf(err,
             "failed to list PDB belongs to pools %v", pdbLabels)
     }
-    if len(pdbList.Items) > 1 {
-        return nil, errors.Wrapf(err,
-            "current PDB count %d of pools %v",
-            len(pdbList.Items),
-            pdbLabels)
-    }
-    if len(pdbList.Items) == 1 {
-        return &pdbList.Items[0], nil
+    for _, pdbObj := range pdbList.Items {
+        pdbObj := pdbObj
+        if !util.IsChangeInLists(pdbObj.Spec.Selector.MatchExpressions[0].Values, poolNames) {
+            return &pdbObj, nil
+        }
     }
     return createPDB(poolNames, cspcName)
 }
@@ -632,6 +642,17 @@ func getPDBSelector(pools []string) *metav1.LabelSelector {
     }
 }

+// addReplicaPoolInfo updates the in-memory replica pool information on the
+// spec and status of the CVC
+func addReplicaPoolInfo(cvcObj *apis.CStorVolumeClaim, poolNames []string) {
+    for _, poolName := range poolNames {
+        cvcObj.Spec.Policy.ReplicaPoolInfo = append(
+            cvcObj.Spec.Policy.ReplicaPoolInfo,
+            apis.ReplicaPoolInfo{PoolName: poolName})
+    }
+    cvcObj.Status.PoolInfo = append(cvcObj.Status.PoolInfo, poolNames...)
+}
+
 // addPDBLabelOnCVC will add PodDisruptionBudget label on CVC
 func addPDBLabelOnCVC(
     cvcObj *apis.CStorVolumeClaim, pdbObj *policy.PodDisruptionBudget) {
@@ -643,7 +664,305 @@ func addPDBLabelOnCVC(
     cvcObj.SetLabels(cvcLabels)
 }

-// isHAVolume returns true if replica count is greater than or equal to 3
+// isHAVolume returns true if the number of replicas is greater than or equal to 3.
+// If the CVC doesn't hold any volume replica pool information then verify against
+// Spec.ReplicaCount. If the CVC holds volume replica pool information then verify
+// against Status.PoolInfo.
 func isHAVolume(cvcObj *apis.CStorVolumeClaim) bool {
-    return cvcObj.Spec.ReplicaCount >= minHAReplicaCount
+    if len(cvcObj.Status.PoolInfo) == 0 {
+        return cvcObj.Spec.ReplicaCount >= minHAReplicaCount
+    }
+    return len(cvcObj.Status.PoolInfo) >= minHAReplicaCount
+}
+
+// getUpdatePDBForVolume does the following:
+// 1. If the volume was pointing to a PDB, delete that PDB if no other CVC is
+//    pointing to it.
+// 2. If the current volume is an HA volume, check whether a PDB already exists
+//    among the current replica pools. If a PDB exists, return its name; if not,
+//    create a new PDB and return the newly created PDB name.
+// 3. If the current volume is not an HA volume, return nothing.
+func getUpdatePDBForVolume(cvcObj *apis.CStorVolumeClaim) (string, error) {
+    _, hasPDB := cvcObj.GetLabels()[string(apis.PodDisruptionBudgetKey)]
+    if hasPDB {
+        err := deletePDBIfNotInUse(cvcObj)
+        if err != nil {
+            return "", err
+        }
+    }
+    if !isHAVolume(cvcObj) {
+        return "", nil
+    }
+    pdbObj, err := getOrCreatePodDisruptionBudget(getCSPC(cvcObj), cvcObj.Status.PoolInfo)
+    if err != nil {
+        return "", err
+    }
+    return pdbObj.Name, nil
+}
+
+// isCVCScalePending returns true if there is a difference between the desired
+// replica pool names and the current replica pool names.
+// Note: Scale up/down of a CVC will not work until the CVC is in Bound state.
+func (c *CVCController) isCVCScalePending(cvc *apis.CStorVolumeClaim) bool {
+    desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc)
+    return util.IsChangeInLists(desiredPoolNames, cvc.Status.PoolInfo)
+}
+
+// updatePDBForScaledVolume does the following:
+// 1. Handle the PDB update based on the number of volume replicas, in the
+//    following way:
+//    1.1 If the volume was already pointing to a PDB, check whether that PDB is
+//        still applicable after scaling up/down (for example, going from 4 to 3
+//        replicas). If applicable, return the same PDB name. If not:
+//        1.1.1 Delete the PDB if no other CVC is pointing to it.
+//    1.2 If the current volume was not pointing to any PDB, do nothing.
+//    1.3 If the current volume is an HA volume, check whether a PDB already exists
+//        among the current replica pools. If a PDB exists, return its name; if not,
+//        create a new PDB and return the newly created PDB name.
+// 2. Update the CVC label to point to the PDB obtained in the above step, and
+//    update the replica pool information on the CVC status.
+// NOTE: This function returns the object as well as an error, if an error occurred.
+func updatePDBForScaledVolume(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeClaim, error) {
+    var err error
+    cvcCopy := cvc.DeepCopy()
+    pdbName, err := getUpdatePDBForVolume(cvc)
+    if err != nil {
+        return cvcCopy, errors.Wrapf(err,
+            "failed to handle PDB for scaled volume %s",
+            cvc.Name,
+        )
+    }
+    delete(cvc.Labels, string(apis.PodDisruptionBudgetKey))
+    if pdbName != "" {
+        cvc.Labels[string(apis.PodDisruptionBudgetKey)] = pdbName
+    }
+    newCVCObj, err := cvclaim.NewKubeclient().WithNamespace(cvc.Namespace).Update(cvc)
+    if err != nil {
+        // If an error occurred, return the old CVC object itself
+        return cvcCopy, errors.Wrapf(err,
+            "failed to update %s CVC status with scaledup replica pool names",
+            cvc.Name,
+        )
+    }
+    return newCVCObj, nil
+}
+
+// updateCVCWithScaledUpInfo does the following:
+// 1. Get the list of new replica pool names by using the CVC (spec and status).
+// 2. Get the list of CVR pool names and verify whether CVRs exist on the new
+//    pools. If they do, update the PDB accordingly (only if the volume is an
+//    HA volume) and update the replica pool info on the CVC (API calls).
+// 3. If CVRs don't exist on the new pool names, return an error saying that
+//    the scale-up is still in progress.
+func updateCVCWithScaledUpInfo(cvc *apis.CStorVolumeClaim,
+    cvObj *apis.CStorVolume) (*apis.CStorVolumeClaim, error) {
+    pvName := cvc.GetAnnotations()[volumeID]
+    desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc)
+    newPoolNames := util.ListDiff(desiredPoolNames, cvc.Status.PoolInfo)
+    replicaPoolMap := map[string]bool{}
+
+    replicaPoolNames, err := cvr.GetVolumeReplicaPoolNames(pvName, openebsNamespace)
+    if err != nil {
+        return cvc, errors.Wrapf(err, "failed to get current replica pool information")
+    }
+
+    for _, poolName := range replicaPoolNames {
+        replicaPoolMap[poolName] = true
+    }
+    for _, poolName := range newPoolNames {
+        if _, ok := replicaPoolMap[poolName]; !ok {
+            return cvc, errors.Errorf(
+                "scaling replicas from %d to %d in progress",
+                len(cvc.Status.PoolInfo),
+                len(cvc.Spec.Policy.ReplicaPoolInfo),
+            )
+        }
+    }
+    cvcCopy := cvc.DeepCopy()
+    // store the new volume replica pool names on the CVC status
+    cvc.Status.PoolInfo = append(cvc.Status.PoolInfo, newPoolNames...)
+    // updatePDBForScaledVolume will handle updating PDB and CVC status
+    cvc, err = updatePDBForScaledVolume(cvc)
+    if err != nil {
+        return cvcCopy, errors.Wrapf(
+            err,
+            "failed to handle post volume replicas scale up process",
+        )
+    }
+    return cvc, nil
+}
+
+// getScaleDownCVR returns the CVR which belongs to the scale-down pool
+func getScaleDownCVR(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeReplica, error) {
+    pvName := cvc.GetAnnotations()[volumeID]
+    desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc)
+    removedPoolNames := util.ListDiff(cvc.Status.PoolInfo, desiredPoolNames)
+    cvrName := pvName + "-" + removedPoolNames[0]
+    return cvr.NewKubeclient().
+        WithNamespace(openebsNamespace).
+        Get(cvrName, metav1.GetOptions{})
+}
+
+// handleVolumeReplicaCreation does the following:
+// 1. Get the list of new pool names (i.e. pool names which are in the spec but
+//    not in the status of the CVC).
+// 2. Create new CVRs on the new pools only if a CVR on that pool doesn't exist.
+//    If the CVR was already created, do nothing.
+func handleVolumeReplicaCreation(cvc *apis.CStorVolumeClaim, cvObj *apis.CStorVolume) error {
+    pvName := cvc.GetAnnotations()[volumeID]
+    desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc)
+    newPoolNames := util.ListDiff(desiredPoolNames, cvc.Status.PoolInfo)
+    errs := []error{}
+    var errorMsg string
+
+    svcObj, err := svc.NewKubeClient(svc.WithNamespace(openebsNamespace)).
+        Get(cvc.Name, metav1.GetOptions{})
+    if err != nil {
+        return errors.Wrapf(err, "failed to get service object %s", cvc.Name)
+    }
+
+    for _, poolName := range newPoolNames {
+        cspiObj, err := cspi.NewKubeClient().
+            WithNamespace(openebsNamespace).
+            Get(poolName, metav1.GetOptions{})
+        if err != nil {
+            errorMsg = fmt.Sprintf("failed to get cstorpoolinstance %s error: %v", poolName, err)
+            errs = append(errs, errors.Errorf("%v", errorMsg))
+            klog.Errorf("%s", errorMsg)
+            continue
+        }
+        hash, err := hash.Hash(pvName + "-" + poolName)
+        if err != nil {
+            errorMsg = fmt.Sprintf(
+                "failed to calculate hash for new volume replica error: %v",
+                err)
+            errs = append(errs, errors.Errorf("%v", errorMsg))
+            klog.Errorf("%s", errorMsg)
+            continue
+        }
+        // TODO: Add a check for ClonedVolumeReplica scaleup case
+        // Create replica with Recreate state
+        rInfo := replicaInfo{
+            replicaID: hash,
+            phase:     apis.CVRStatusRecreate,
+        }
+        _, err = createCVR(svcObj, cvObj, cvc, cspiObj, rInfo)
+        if err != nil {
+            errorMsg = fmt.Sprintf(
+                "failed to create new replica on pool %s error: %v",
+                poolName,
+                err,
+            )
+            errs = append(errs, errors.Errorf("%v", errorMsg))
+            klog.Errorf("%s", errorMsg)
+            continue
+        }
+    }
+    if len(errs) > 0 {
+        return errors.Errorf("%+v", errs)
+    }
+    return nil
+}
+
+// scaleUpVolumeReplicas does the following:
+// 1. Fetch the CStorVolume object corresponding to the CVC.
+// 2. Verify whether the desiredReplicationFactor of the CStorVolume needs to be
+//    updated (in etcd).
+// 3. Create CVRs if they don't exist on the scaled cStor pools
+//    (handleVolumeReplicaCreation handles the new CVR creations).
+// 4. If scaling up the volume replicas has completed, then, via
+//    updateCVCWithScaledUpInfo:
+//    4.1 Update the PDB according to the new pools (only if the volume is an HA volume).
+//    4.2 Update the PDB label on the CVC and the replica pool information on its status.
+// 5. If scaling up the volume replicas has not completed, return an error.
+func scaleUpVolumeReplicas(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeClaim, error) {
+    drCount := len(cvc.Spec.Policy.ReplicaPoolInfo)
+    cvObj, err := cv.NewKubeclient().
+        WithNamespace(openebsNamespace).
+        Get(cvc.Name, metav1.GetOptions{})
+    if err != nil {
+        return cvc, errors.Wrapf(err, "failed to get cstorvolumes object %s", cvc.Name)
+    }
+    if cvObj.Spec.DesiredReplicationFactor < drCount {
+        cvObj.Spec.DesiredReplicationFactor = drCount
+        cvObj, err = updateCStorVolumeInfo(cvObj)
+        if err != nil {
+            return cvc, err
+        }
+    }
+    // Create replicas on new pools
+    err = handleVolumeReplicaCreation(cvc, cvObj)
+    if err != nil {
+        return cvc, err
+    }
+    return updateCVCWithScaledUpInfo(cvc, cvObj)
+}
+
+// scaleDownVolumeReplicas processes the following steps:
+// 1. Verify whether the operation requested by the user is valid for the
+//    scale-down process (only one replica scale-down at a time is allowed).
+// 2. Update the CV object by decreasing the DRF and removing the replicaID entry.
+// 3. Check the status of the scale-down; if it has completed, delete the CVR that
+//    belongs to the scaled-down pool and then perform the post-scaling process
+//    (update the PDB if applicable and the CVC replica pool status).
+func scaleDownVolumeReplicas(cvc *apis.CStorVolumeClaim) (*apis.CStorVolumeClaim, error) {
+    var cvrObj *apis.CStorVolumeReplica
+    drCount := len(cvc.Spec.Policy.ReplicaPoolInfo)
+    cvObj, err := cv.NewKubeclient().
+        WithNamespace(openebsNamespace).
+        Get(cvc.Name, metav1.GetOptions{})
+    if err != nil {
+        return cvc, errors.Wrapf(err, "failed to get cstorvolumes object %s", cvc.Name)
+    }
+    cvrObj, err = getScaleDownCVR(cvc)
+    if err != nil && !k8serror.IsNotFound(err) {
+        return cvc, errors.Wrapf(err, "failed to get CVR requested for scale down operation")
+    }
+    if cvObj.Spec.DesiredReplicationFactor > drCount {
+        cvObj.Spec.DesiredReplicationFactor = drCount
+        delete(cvObj.Spec.ReplicaDetails.KnownReplicas, apis.ReplicaID(cvrObj.Spec.ReplicaID))
+        cvObj, err = updateCStorVolumeInfo(cvObj)
+        if err != nil {
+            return cvc, err
+        }
+    }
+    // TODO: Make below function as cvObj.IsScaleDownInProgress()
+    if !cv.IsScaleDownInProgress(cvObj) {
+        if cvrObj != nil {
+            err = cvr.NewKubeclient().
+                WithNamespace(openebsNamespace).
+                Delete(cvrObj.Name)
+            if err != nil {
+                return cvc, errors.Wrapf(err, "failed to delete cstorvolumereplica %s", cvrObj.Name)
+            }
+        }
+        desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc)
+        cvcCopy := cvc.DeepCopy()
+        cvc.Status.PoolInfo = desiredPoolNames
+        // updatePDBForScaledVolume will handle updating PDB and CVC status
+        cvc, err = updatePDBForScaledVolume(cvc)
+        if err != nil {
+            return cvcCopy, errors.Wrapf(err,
+                "failed to handle post volume replicas scale down process")
+        }
+        return cvc, nil
+    }
+    return cvc, errors.Errorf(
+        "Scaling down volume replicas from %d to %d is in progress",
+        len(cvc.Status.PoolInfo),
+        drCount,
+    )
+}
+
+// updateCStorVolumeInfo modifies the CV object in etcd by making an update API call.
+// Note: the caller should handle the returned error
+func updateCStorVolumeInfo(cvObj *apis.CStorVolume) (*apis.CStorVolume, error) {
+    return cv.NewKubeclient().
+        WithNamespace(openebsNamespace).
+        Update(cvObj)
 }
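To summarize how the pieces above fit together before the controller wiring: the desired replica pools come from the CVC spec, the current replica pools from the CVC status, and the helpers introduced in this patch compare them. A minimal illustrative sketch, assuming cvc has already been fetched by the controller:

    desiredPoolNames := cvclaim.GetDesiredReplicaPoolNames(cvc) // from cvc.Spec.Policy.ReplicaPoolInfo
    if util.IsChangeInLists(desiredPoolNames, cvc.Status.PoolInfo) {
        // more pools in spec than in status  -> scale-up   (scaleUpVolumeReplicas)
        // fewer pools in spec than in status -> scale-down (scaleDownVolumeReplicas)
        // same count but different names     -> migration, not yet supported
    }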
diff --git a/cmd/cvc-operator/controller/cvc_controller.go b/cmd/cvc-operator/controller/cvc_controller.go
index 7ee902ae98..75fd25e11a 100644
--- a/cmd/cvc-operator/controller/cvc_controller.go
+++ b/cmd/cvc-operator/controller/cvc_controller.go
@@ -26,6 +26,8 @@ import (
     errors "github.com/pkg/errors"
     "k8s.io/klog"

+    cvr "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1"
+    cvclaim "github.com/openebs/maya/pkg/cstorvolumeclaim/v1alpha1"
     corev1 "k8s.io/api/core/v1"
     policy "k8s.io/api/policy/v1beta1"
     k8serror "k8s.io/apimachinery/pkg/api/errors"
@@ -203,6 +205,12 @@ func (c *CVCController) syncCVC(cvc *apis.CStorVolumeClaim) error {
     if err != nil {
         return err
     }
+
+    if c.isCVCScalePending(cvc) {
+        // process scale-up/scale-down of volume replicas only if there is a
+        // change between the current and desired state of the replica pools
+        _ = c.scaleVolumeReplicas(cvc)
+    }
     return nil
 }
@@ -274,11 +282,19 @@ func (c *CVCController) createVolumeOperation(cvc *apis.CStorVolumeClaim) (*apis
         return nil, err
     }

+    // Fetch the volume replica pool names and use them while creating the PDB
+    // and while updating the spec and status of the CVC
+    poolNames, err := cvr.GetVolumeReplicaPoolNames(cvc.Name, openebsNamespace)
+    if err != nil {
+        return nil, errors.Wrapf(err,
+            "failed to get volume replica pool names of volume %s", cvObj.Name)
+    }
+
     if isHAVolume(cvc) {
         // TODO: When multiple threads or multiple CVC controllers are set then
         // we have to revist entier PDB code path
         var pdbObj *policy.PodDisruptionBudget
-        pdbObj, err = getOrCreatePodDisruptionBudget(cvObj, getCSPC(cvc))
+        pdbObj, err = getOrCreatePodDisruptionBudget(getCSPC(cvc), poolNames)
         if err != nil {
             return nil, errors.Wrapf(err,
                 "failed to create PDB for volume: %s", cvc.Name)
@@ -298,6 +314,13 @@ func (c *CVCController) createVolumeOperation(cvc *apis.CStorVolumeClaim) (*apis
     cvc.Status.Phase = apis.CStorVolumeClaimPhaseBound
     cvc.Status.Capacity = cvc.Spec.Capacity

+    // TODO: Below function needs to be converted into
+    // cvc.addReplicaPoolInfo(poolNames) while moving to the cstor-operators
+    // repo (currently, in Maya, writing functions in the API package is not encouraged)
+
+    // update volume replica pool information on cvc spec and status
+    addReplicaPoolInfo(cvc, poolNames)
+
     err = c.updateCVCObj(cvc, cvObj)
     if err != nil {
         return nil, err
@@ -315,7 +338,7 @@ func (c *CVCController) getVolumePolicy(

     if policyName != "" {
         klog.Infof("uses cstorvolume policy %q to configure volume %q", policyName, cvc.Name)
-        volumePolicy, err = c.clientset.OpenebsV1alpha1().CStorVolumePolicies(getNamespace()).Get(policyName, metav1.GetOptions{})
+        volumePolicy, err = c.clientset.OpenebsV1alpha1().CStorVolumePolicies(openebsNamespace).Get(policyName, metav1.GetOptions{})
         if err != nil {
             return nil, errors.Wrapf(
                 err,
@@ -335,7 +358,8 @@ func (c *CVCController) isReplicaAffinityEnabled(policy *apis.CStorVolumePolicy)
 }

 // distributePendingCVRs trigers create and distribute pending cstorvolumereplica
-// resource among the available cstor pools
+// resource among the available cstor pools. This function returns an error when
+// the required number of CVRs could not be created
 func (c *CVCController) distributePendingCVRs(
     cvc *apis.CStorVolumeClaim,
     cv *apis.CStorVolume,
@@ -366,7 +390,7 @@ func (c *CVCController) removeClaimFinalizer(
     cvc *apis.CStorVolumeClaim,
 ) error {
     if isHAVolume(cvc) {
-        err := c.deletePDBIfNotInUse(cvc)
+        err := deletePDBIfNotInUse(cvc)
         if err != nil {
             return errors.Wrapf(err,
                 "failed to verify whether PDB %s is in use by other volumes",
@@ -627,7 +651,7 @@ func (c *CVCController) resizeCV(cv *apis.CStorVolume, newCapacity resource.Quan
     if err != nil {
         return fmt.Errorf("can't update capacity of CV %s as generate patch data failed: %v", cv.Name, err)
     }
-    _, updateErr := c.clientset.OpenebsV1alpha1().CStorVolumes(getNamespace()).
+    _, updateErr := c.clientset.OpenebsV1alpha1().CStorVolumes(openebsNamespace).
         Patch(cv.Name, types.MergePatchType, patchBytes)
     if updateErr != nil {
         return updateErr
@@ -637,14 +661,13 @@ func (c *CVCController) resizeCV(cv *apis.CStorVolume, newCapacity resource.Quan

 // deletePDBIfNotInUse deletes the PDB if no volume is refering to the
 // cStorvolumeclaim PDB
-func (c *CVCController) deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error {
+func deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error {
     //TODO: If HALease is enabled active-active then below code needs to be
     //revist
     pdbName := getPDBName(cvc)
     cvcLabelSelector := string(apis.PodDisruptionBudgetKey) + "=" + pdbName
-    cvcList, err := c.clientset.
-        OpenebsV1alpha1().
-        CStorVolumeClaims(cvc.Namespace).
+    cvcList, err := cvclaim.NewKubeclient().
+        WithNamespace(cvc.Namespace).
         List(metav1.ListOptions{LabelSelector: cvcLabelSelector})
     if err != nil {
         return errors.Wrapf(err,
@@ -654,9 +677,44 @@ func (c *CVCController) deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error {
         err = apispdb.KubeClient().
             WithNamespace(openebsNamespace).
             Delete(pdbName, &metav1.DeleteOptions{})
+        if k8serror.IsNotFound(err) {
+            klog.Infof("pdb %s of volume %s was already deleted", pdbName, cvc.Name)
+            return nil
+        }
         if err != nil {
             return err
         }
+        klog.Infof("Successfully deleted the PDB %s of volume %s", pdbName, cvc.Name)
+    }
+    return nil
+}
+
+// scaleVolumeReplicas identifies whether the change is a scale-up or a scale-down
+// of volume replicas. If the user added a pool entry under the spec, the change is
+// treated as a scale-up. If the user removed a pool entry from the spec, it is
+// treated as a scale-down. If the user only modified an existing pool entry under
+// the spec, it is a kind of migration, which is not yet supported.
+func (c *CVCController) scaleVolumeReplicas(cvc *apis.CStorVolumeClaim) error {
+    var err error
+    if len(cvc.Spec.Policy.ReplicaPoolInfo) > len(cvc.Status.PoolInfo) {
+        cvc, err = scaleUpVolumeReplicas(cvc)
+    } else if len(cvc.Spec.Policy.ReplicaPoolInfo) < len(cvc.Status.PoolInfo) {
+        cvc, err = scaleDownVolumeReplicas(cvc)
+    } else {
+        c.recorder.Event(cvc, corev1.EventTypeWarning, "Migration",
+            "Migration of volume replicas is not yet supported")
+        return nil
+    }
+    if err != nil {
+        c.recorder.Eventf(cvc,
+            corev1.EventTypeWarning,
+            "ScalingVolumeReplicas",
+            "%v", err)
+        return err
     }
+    c.recorder.Eventf(cvc,
+        corev1.EventTypeNormal,
+        "ScalingVolumeReplicas",
+        "successfully scaled volume replicas to %d", len(cvc.Status.PoolInfo))
     return nil
 }
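For a concrete picture of what drives the scale-up path above, a hypothetical example (pool names are made up): the user, or a higher-level operator, appends one more pool entry to the CVC spec while the status still records the previous pools.

    cvc.Spec.Policy.ReplicaPoolInfo = append(cvc.Spec.Policy.ReplicaPoolInfo,
        apis.ReplicaPoolInfo{PoolName: "cstor-pool-3"}) // hypothetical pool name
    // cvc.Status.PoolInfo is still ["cstor-pool-1", "cstor-pool-2"], so
    // len(Spec.Policy.ReplicaPoolInfo) > len(Status.PoolInfo) and
    // scaleVolumeReplicas takes the scale-up path.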
diff --git a/pkg/apis/openebs.io/v1alpha1/cstor_volume_claim.go b/pkg/apis/openebs.io/v1alpha1/cstor_volume_claim.go
index b2c7394a0d..9ce5a24466 100644
--- a/pkg/apis/openebs.io/v1alpha1/cstor_volume_claim.go
+++ b/pkg/apis/openebs.io/v1alpha1/cstor_volume_claim.go
@@ -95,6 +95,8 @@ type CStorVolumeClaimStatus struct {
     // Capacity the actual resources of the underlying volume.
     Capacity   corev1.ResourceList         `json:"capacity,omitempty"`
     Conditions []CStorVolumeClaimCondition `json:"condition,omitempty"`
+    // PoolInfo represents the current pool names where the volume replicas exist
+    PoolInfo []string `json:"poolInfo"`
 }

 // CStorVolumeClaimCondition contains details about state of cstor volume
diff --git a/pkg/apis/openebs.io/v1alpha1/cstorvolume_policy.go b/pkg/apis/openebs.io/v1alpha1/cstorvolume_policy.go
index fb19906f36..c854c30ac2 100644
--- a/pkg/apis/openebs.io/v1alpha1/cstorvolume_policy.go
+++ b/pkg/apis/openebs.io/v1alpha1/cstorvolume_policy.go
@@ -45,6 +45,9 @@ type CStorVolumePolicySpec struct {
     Target TargetSpec `json:"target"`
     // ReplicaSpec represents configuration related to replicas resources
     Replica ReplicaSpec `json:"replica"`
+    // ReplicaPoolInfo holds the pool information of volume replicas.
+    // Ex: once the volume is provisioned, the CStor pools on which its replicas exist
+    ReplicaPoolInfo []ReplicaPoolInfo `json:"replicaPoolInfo"`
 }

 // TargetSpec represents configuration related to cstor target and its resources
@@ -102,6 +105,13 @@ type Provision struct {
     ReplicaAffinity bool `json:"replicaAffinity"`
 }

+// ReplicaPoolInfo represents the pool information of a volume replica
+type ReplicaPoolInfo struct {
+    // PoolName represents the pool name where the volume replica exists
+    PoolName string `json:"poolName"`
+    // UID can also be added
+}
+
 // CStorVolumePolicyStatus is for handling status of CstorVolumePolicy
 type CStorVolumePolicyStatus struct {
     Phase string `json:"phase"`
diff --git a/pkg/apis/openebs.io/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/openebs.io/v1alpha1/zz_generated.deepcopy.go
index dab39b859a..b747d7bc1f 100644
--- a/pkg/apis/openebs.io/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/openebs.io/v1alpha1/zz_generated.deepcopy.go
@@ -1038,6 +1038,11 @@ func (in *CStorVolumeClaimStatus) DeepCopyInto(out *CStorVolumeClaimStatus) {
             (*in)[i].DeepCopyInto(&(*out)[i])
         }
     }
+    if in.PoolInfo != nil {
+        in, out := &in.PoolInfo, &out.PoolInfo
+        *out = make([]string, len(*in))
+        copy(*out, *in)
+    }
     return
 }

@@ -1169,6 +1174,11 @@ func (in *CStorVolumePolicySpec) DeepCopyInto(out *CStorVolumePolicySpec) {
     out.Provision = in.Provision
     in.Target.DeepCopyInto(&out.Target)
     in.Replica.DeepCopyInto(&out.Replica)
+    if in.ReplicaPoolInfo != nil {
+        in, out := &in.ReplicaPoolInfo, &out.ReplicaPoolInfo
+        *out = make([]ReplicaPoolInfo, len(*in))
+        copy(*out, *in)
+    }
     return
 }

@@ -1732,6 +1742,22 @@ func (in *RaidGroup) DeepCopy() *RaidGroup {
     return out
 }

+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ReplicaPoolInfo) DeepCopyInto(out *ReplicaPoolInfo) {
+    *out = *in
+    return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ReplicaPoolInfo.
+func (in *ReplicaPoolInfo) DeepCopy() *ReplicaPoolInfo {
+    if in == nil {
+        return nil
+    }
+    out := new(ReplicaPoolInfo)
+    in.DeepCopyInto(out)
+    return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *ReplicaSpec) DeepCopyInto(out *ReplicaSpec) {
     *out = *in
diff --git a/pkg/cstor/volume/v1alpha1/cstorvolume.go b/pkg/cstor/volume/v1alpha1/cstorvolume.go
index 0964de1bfa..0b47d95f1d 100644
--- a/pkg/cstor/volume/v1alpha1/cstorvolume.go
+++ b/pkg/cstor/volume/v1alpha1/cstorvolume.go
@@ -448,3 +448,10 @@ func (csr *CVReplicationDetails) UpdateCVWithReplicationDetails(kubeclient *Kube
     }
     return err
 }
+
+// IsScaleDownInProgress returns true if the length of the status replica details
+// is greater than the length of the spec replica details
+func IsScaleDownInProgress(cvObj *apis.CStorVolume) bool {
+    return len(cvObj.Status.ReplicaDetails.KnownReplicas) >
+        len(cvObj.Spec.ReplicaDetails.KnownReplicas)
+}
diff --git a/pkg/cstor/volumereplica/v1alpha1/build.go b/pkg/cstor/volumereplica/v1alpha1/build.go
index 52ba9c2d5b..bb61aa4c8e 100644
--- a/pkg/cstor/volumereplica/v1alpha1/build.go
+++ b/pkg/cstor/volumereplica/v1alpha1/build.go
@@ -204,8 +204,8 @@ func (b *Builder) WithCapacity(capacity string) *Builder {

 // WithStatusPhase sets the Status Phase of CStorVolumeReplica with provided
 //arguments
-func (b *Builder) WithStatusPhase(phase string) *Builder {
-    b.cvr.object.Status.Phase = apis.CStorVolumeReplicaPhase(phase)
+func (b *Builder) WithStatusPhase(phase apis.CStorVolumeReplicaPhase) *Builder {
+    b.cvr.object.Status.Phase = phase
     return b
 }

diff --git a/pkg/cstor/volumereplica/v1alpha1/cstorvolumereplica.go b/pkg/cstor/volumereplica/v1alpha1/cstorvolumereplica.go
index 05d020dc7d..f7bf4b5b0b 100644
--- a/pkg/cstor/volumereplica/v1alpha1/cstorvolumereplica.go
+++ b/pkg/cstor/volumereplica/v1alpha1/cstorvolumereplica.go
@@ -111,6 +111,13 @@ func (b *ListBuilder) WithAPIList(
     return b
 }

+// AppendListBuilder appends the provided CVR API object to the existing ListBuilder
+func (b *ListBuilder) AppendListBuilder(
+    cvr *apis.CStorVolumeReplica) *ListBuilder {
+    b.list.items = append(b.list.items, &CVR{object: cvr})
+    return b
+}
+
 // List returns the list of cvr
 // instances that was built by this
 // builder
@@ -145,6 +152,20 @@ func (p *CVR) IsHealthy() bool {
     return p.object.Status.Phase == "Healthy"
 }

+// HasLabel returns true only if the value of the provided label key matches
+// the provided value.
+func (p *CVR) HasLabel(key, value string) bool {
+    return p.object.Labels[key] == value
+}
+
+// HasLabel returns a predicate to filter out CVRs based on the
+// provided key and value
+func HasLabel(key, value string) Predicate {
+    return func(p *CVR) bool {
+        return p.HasLabel(key, value)
+    }
+}
+
 // IsHealthy is a Predicate to filter out cvrs
 // which is healthy
 func IsHealthy() Predicate {
diff --git a/pkg/cstorvolumeclaim/v1alpha1/utils.go b/pkg/cstorvolumeclaim/v1alpha1/utils.go
index 37d9152dcf..0c09f5c3c8 100644
--- a/pkg/cstorvolumeclaim/v1alpha1/utils.go
+++ b/pkg/cstorvolumeclaim/v1alpha1/utils.go
@@ -61,3 +61,12 @@ func GetPDBLabels(poolNames []string, cspcName string) map[string]string {
     pdbLabels[string(apis.CStorPoolClusterCPK)] = cspcName
     return pdbLabels
 }
+
+// GetDesiredReplicaPoolNames returns the list of desired pool names
+func GetDesiredReplicaPoolNames(cvc *apis.CStorVolumeClaim) []string {
+    poolNames := []string{}
+    for _, poolInfo := range cvc.Spec.Policy.ReplicaPoolInfo {
+        poolNames = append(poolNames, poolInfo.PoolName)
+    }
+    return poolNames
+}
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 5c4384514f..fa3feb533a 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -218,3 +218,26 @@ func RemoveString(slice []string, s string) (result []string) {
     }
     return result
 }
+
+// IsChangeInLists returns true if there is any difference between listA and listB
+func IsChangeInLists(listA, listB []string) bool {
+    listAMap := map[string]bool{}
+    listBMap := map[string]bool{}
+    for _, name := range listA {
+        listAMap[name] = true
+    }
+    for _, name := range listB {
+        listBMap[name] = true
+    }
+    for _, name := range listA {
+        if !listBMap[name] {
+            return true
+        }
+    }
+    for _, name := range listB {
+        if !listAMap[name] {
+            return true
+        }
+    }
+    return false
+}
diff --git a/tests/cstor/volume/replica_replace/replica_replace_utils_test.go b/tests/cstor/volume/replica_replace/replica_replace_utils_test.go
index 814932dd94..66c10390a2 100644
--- a/tests/cstor/volume/replica_replace/replica_replace_utils_test.go
+++ b/tests/cstor/volume/replica_replace/replica_replace_utils_test.go
@@ -206,7 +206,7 @@ func buildCVRFromExistingCVR(
         WithCapacity(cvrObj.Spec.Capacity).
         WithReplicaID(cvrObj.Spec.ReplicaID).
         WithFinalizers([]string{cvr.CStorVolumeReplicaFinalizer}).
-        WithStatusPhase("Recreate")
+        WithStatusPhase(apis.CVRStatusRecreate)
     cvrObj, err := buildCVRObj.Build()
     Expect(err).To(BeNil())
     return cvrObj
diff --git a/tests/operations.go b/tests/operations.go
index f4dfcce470..0969856f29 100644
--- a/tests/operations.go
+++ b/tests/operations.go
@@ -1145,7 +1145,7 @@ func (ops *Operations) BuildAndCreateCVR() *apis.CStorVolumeReplica {
         WithFinalizers([]string{cvr.CStorVolumeReplicaFinalizer}).
         WithCapacity(cvrConfig.Capacity).
         WithTargetIP(cvrConfig.TargetIP).
-        WithStatusPhase(cvrConfig.Phase).
+        WithStatusPhase(apis.CStorVolumeReplicaPhase(cvrConfig.Phase)).
         WithReplicaID(cvrConfig.ReplicaID).
         Build()
     Expect(err).To(BeNil())
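A closing note on the IsChangeInLists helper that the scaling and PDB logic above relies on: it compares the two lists as sets, so ordering is ignored and any element present in only one list counts as a change. Illustrative calls (pool names are made up):

    util.IsChangeInLists([]string{"pool-1", "pool-2"}, []string{"pool-2", "pool-1"}) // false: same members
    util.IsChangeInLists([]string{"pool-1", "pool-2"}, []string{"pool-1"})           // true: "pool-2" missing from listB
    util.IsChangeInLists([]string{"pool-1"}, []string{"pool-1", "pool-3"})           // true: "pool-3" missing from listA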