Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cvc-operator): add automatic scaling of volumereplicas for CSI volumes #1613

Merged
merged 11 commits into from
Feb 26, 2020
370 changes: 350 additions & 20 deletions cmd/cvc-operator/controller/cstorvolumeclaim.go

Large diffs are not rendered by default.

76 changes: 67 additions & 9 deletions cmd/cvc-operator/controller/cvc_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
errors "github.com/pkg/errors"
"k8s.io/klog"

cvr "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1"
cvclaim "github.com/openebs/maya/pkg/cstorvolumeclaim/v1alpha1"
corev1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
k8serror "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -203,6 +205,12 @@ func (c *CVCController) syncCVC(cvc *apis.CStorVolumeClaim) error {
if err != nil {
return err
}

if c.isCVCScalePending(cvc) {
// process scale-up/scale-down of volume replicas only if there is
// change in curent and desired state of replicas pool information
_ = c.scaleVolumeReplicas(cvc)
}
return nil
}

Expand Down Expand Up @@ -274,11 +282,19 @@ func (c *CVCController) createVolumeOperation(cvc *apis.CStorVolumeClaim) (*apis
return nil, err
}

// Fetch the volume replica pool names and use them in PDB and updating in
mittachaitu marked this conversation as resolved.
Show resolved Hide resolved
// spec and status of CVC
poolNames, err := cvr.GetVolumeReplicaPoolNames(cvc.Name, openebsNamespace)
if err != nil {
return nil, errors.Wrapf(err,
"failed to get volume replica pool names of volume %s", cvObj.Name)
}

if isHAVolume(cvc) {
// TODO: When multiple threads or multiple CVC controllers are set then
// we have to revist entier PDB code path
var pdbObj *policy.PodDisruptionBudget
pdbObj, err = getOrCreatePodDisruptionBudget(cvObj, getCSPC(cvc))
pdbObj, err = getOrCreatePodDisruptionBudget(getCSPC(cvc), poolNames)
if err != nil {
return nil, errors.Wrapf(err,
"failed to create PDB for volume: %s", cvc.Name)
Expand All @@ -298,6 +314,13 @@ func (c *CVCController) createVolumeOperation(cvc *apis.CStorVolumeClaim) (*apis
cvc.Status.Phase = apis.CStorVolumeClaimPhaseBound
cvc.Status.Capacity = cvc.Spec.Capacity

// TODO: Below function needs to be converted into
// cvc.addReplicaPoolInfo(poolNames) while moving to cstor-operators
// repo(Currently in Maya writing functions in API package is not encouraged)

// update volume replica pool information on cvc spec and status
addReplicaPoolInfo(cvc, poolNames)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about changing it as receiver function like cvc.addReplicaPoolInfo(poolNames)


err = c.updateCVCObj(cvc, cvObj)
if err != nil {
return nil, err
Expand All @@ -315,7 +338,7 @@ func (c *CVCController) getVolumePolicy(

if policyName != "" {
klog.Infof("uses cstorvolume policy %q to configure volume %q", policyName, cvc.Name)
volumePolicy, err = c.clientset.OpenebsV1alpha1().CStorVolumePolicies(getNamespace()).Get(policyName, metav1.GetOptions{})
volumePolicy, err = c.clientset.OpenebsV1alpha1().CStorVolumePolicies(openebsNamespace).Get(policyName, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrapf(
err,
Expand All @@ -335,7 +358,8 @@ func (c *CVCController) isReplicaAffinityEnabled(policy *apis.CStorVolumePolicy)
}

// distributePendingCVRs trigers create and distribute pending cstorvolumereplica
// resource among the available cstor pools
// resource among the available cstor pools. This func returns error even when
// required no.of CVRs are Not created
func (c *CVCController) distributePendingCVRs(
cvc *apis.CStorVolumeClaim,
cv *apis.CStorVolume,
Expand Down Expand Up @@ -366,7 +390,7 @@ func (c *CVCController) removeClaimFinalizer(
cvc *apis.CStorVolumeClaim,
) error {
if isHAVolume(cvc) {
err := c.deletePDBIfNotInUse(cvc)
err := deletePDBIfNotInUse(cvc)
if err != nil {
return errors.Wrapf(err,
"failed to verify whether PDB %s is in use by other volumes",
Expand Down Expand Up @@ -627,7 +651,7 @@ func (c *CVCController) resizeCV(cv *apis.CStorVolume, newCapacity resource.Quan
if err != nil {
return fmt.Errorf("can't update capacity of CV %s as generate patch data failed: %v", cv.Name, err)
}
_, updateErr := c.clientset.OpenebsV1alpha1().CStorVolumes(getNamespace()).
_, updateErr := c.clientset.OpenebsV1alpha1().CStorVolumes(openebsNamespace).
Patch(cv.Name, types.MergePatchType, patchBytes)
if updateErr != nil {
return updateErr
Expand All @@ -637,14 +661,13 @@ func (c *CVCController) resizeCV(cv *apis.CStorVolume, newCapacity resource.Quan

// deletePDBIfNotInUse deletes the PDB if no volume is refering to the
// cStorvolumeclaim PDB
func (c *CVCController) deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error {
func deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error {
//TODO: If HALease is enabled active-active then below code needs to be
//revist
pdbName := getPDBName(cvc)
cvcLabelSelector := string(apis.PodDisruptionBudgetKey) + "=" + pdbName
cvcList, err := c.clientset.
OpenebsV1alpha1().
CStorVolumeClaims(cvc.Namespace).
cvcList, err := cvclaim.NewKubeclient().
WithNamespace(cvc.Namespace).
List(metav1.ListOptions{LabelSelector: cvcLabelSelector})
if err != nil {
return errors.Wrapf(err,
Expand All @@ -654,9 +677,44 @@ func (c *CVCController) deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error {
err = apispdb.KubeClient().
WithNamespace(openebsNamespace).
Delete(pdbName, &metav1.DeleteOptions{})
if k8serror.IsNotFound(err) {
klog.Infof("pdb %s of volume %s was already deleted", pdbName, cvc.Name)
return nil
}
if err != nil {
return err
}
klog.Infof("Successfully deleted the PDB %s of volume %s", pdbName, cvc.Name)
}
return nil
}

// scaleVolumeReplicas identifies whether it is scaleup or scaledown case of
// volume replicas. If user added entry of pool info under the spec then changes
// are treated as scaleup case. If user removed poolInfo entry from spec then
// changes are treated as scale down case. If user just modifies the pool entry
// info under the spec then it is a kind of migration which is not yet supported
func (c *CVCController) scaleVolumeReplicas(cvc *apis.CStorVolumeClaim) error {
mittachaitu marked this conversation as resolved.
Show resolved Hide resolved
var err error
if len(cvc.Spec.Policy.ReplicaPoolInfo) > len(cvc.Status.PoolInfo) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this check sufficient? we need to make sure that - all the content in PoolInfo need to be part of ReplicaPoolInfo also, and then, few more are there in it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These kind checks should be a head before that is admission server.

cvc, err = scaleUpVolumeReplicas(cvc)
} else if len(cvc.Spec.Policy.ReplicaPoolInfo) < len(cvc.Status.PoolInfo) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here as well, check should be - there are no extra things in RepliaPoolInfo other than what is present in PoolInfo

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

cvc, err = scaleDownVolumeReplicas(cvc)
} else {
c.recorder.Event(cvc, corev1.EventTypeWarning, "Migration",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this limitation get into webhooks?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the webhook for CVC is ready we will have following checks

  • Repetition of pool names either under spec or status shouldn't be there.
  • InitialReplicaCount shouldn't be modified.
  • The existing pool name can't be updated with a new pool name(Which is migration case).
  • Not allow scale down/up of volume replicas when one other in progress.
  • Not allow more that one replica at a time for scale down.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Failure of scale up/down if added pool is not healthy ? ( but admin/user shouldn't be providing the unhealthy pool name in first place)

"Migration of volume replicas is not yet supported")
return nil
}
if err != nil {
c.recorder.Eventf(cvc,
corev1.EventTypeWarning,
"ScalingVolumeReplicas",
"%v", err)
return err
}
c.recorder.Eventf(cvc,
corev1.EventTypeNormal,
"ScalingVolumeReplicas",
"successfully scaled volume replicas to %d", len(cvc.Status.PoolInfo))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need to make cvc point to new object

return nil
}
2 changes: 2 additions & 0 deletions pkg/apis/openebs.io/v1alpha1/cstor_volume_claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type CStorVolumeClaimStatus struct {
// Capacity the actual resources of the underlying volume.
Capacity corev1.ResourceList `json:"capacity,omitempty"`
Conditions []CStorVolumeClaimCondition `json:"condition,omitempty"`
// PoolInfo represents current pool names where volume replicas exists
PoolInfo []string `json:"poolInfo"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this is different from Spec? do you see a need for this as well same as []ReplicaPoolInfo instead of []string?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO spec can have different things like name & uid.. then the status should strictly have only one information which conveys the intent.

}

// CStorVolumeClaimCondition contains details about state of cstor volume
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/openebs.io/v1alpha1/cstorvolume_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type CStorVolumePolicySpec struct {
Target TargetSpec `json:"target"`
// ReplicaSpec represents configuration related to replicas resources
Replica ReplicaSpec `json:"replica"`
// ReplicaPoolInfo holds the pool information of volume replicas.
// Ex: If volume is provisioned on which CStor pool volume replicas exist
ReplicaPoolInfo []ReplicaPoolInfo `json:"replicaPoolInfo"`
}

// TargetSpec represents configuration related to cstor target and its resources
Expand Down Expand Up @@ -102,6 +105,13 @@ type Provision struct {
ReplicaAffinity bool `json:"replicaAffinity"`
}

// ReplicaPoolInfo represents the pool information of volume replica
type ReplicaPoolInfo struct {
// PoolName represents the pool name where volume replica exists
PoolName string `json:"poolName"`
// UID also can be added
}

// CStorVolumePolicyStatus is for handling status of CstorVolumePolicy
type CStorVolumePolicyStatus struct {
Phase string `json:"phase"`
Expand Down
26 changes: 26 additions & 0 deletions pkg/apis/openebs.io/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/cstor/volume/v1alpha1/cstorvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,3 +448,10 @@ func (csr *CVReplicationDetails) UpdateCVWithReplicationDetails(kubeclient *Kube
}
return err
}

// IsScaleDownInProgress return true if length of status replica details is
// greater than length of spec replica details
func IsScaleDownInProgress(cvObj *apis.CStorVolume) bool {
return len(cvObj.Status.ReplicaDetails.KnownReplicas) >
len(cvObj.Spec.ReplicaDetails.KnownReplicas)
}
4 changes: 2 additions & 2 deletions pkg/cstor/volumereplica/v1alpha1/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ func (b *Builder) WithCapacity(capacity string) *Builder {

// WithStatusPhase sets the Status Phase of CStorVolumeReplica with provided
//arguments
func (b *Builder) WithStatusPhase(phase string) *Builder {
b.cvr.object.Status.Phase = apis.CStorVolumeReplicaPhase(phase)
func (b *Builder) WithStatusPhase(phase apis.CStorVolumeReplicaPhase) *Builder {
b.cvr.object.Status.Phase = phase
return b
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/cstor/volumereplica/v1alpha1/cstorvolumereplica.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ func (b *ListBuilder) WithAPIList(
return b
}

// AppendListBuilder append the provided CVR API object into existing ListBuilder
func (b *ListBuilder) AppendListBuilder(
cvr *apis.CStorVolumeReplica) *ListBuilder {
b.list.items = append(b.list.items, &CVR{object: cvr})
return b
}

// List returns the list of cvr
// instances that was built by this
// builder
Expand Down Expand Up @@ -145,6 +152,20 @@ func (p *CVR) IsHealthy() bool {
return p.object.Status.Phase == "Healthy"
}

// HasLabel returns true only if label key value matched to provided
// value.
func (p *CVR) HasLabel(key, value string) bool {
return p.object.Labels[key] == value
}

// HasLabel returns predicate to filter out CVRs based on the
// provided key and values
func HasLabel(key, value string) Predicate {
return func(p *CVR) bool {
return p.HasLabel(key, value)
}
}

// IsHealthy is a Predicate to filter out cvrs
// which is healthy
func IsHealthy() Predicate {
Expand Down
9 changes: 9 additions & 0 deletions pkg/cstorvolumeclaim/v1alpha1/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,12 @@ func GetPDBLabels(poolNames []string, cspcName string) map[string]string {
pdbLabels[string(apis.CStorPoolClusterCPK)] = cspcName
return pdbLabels
}

// GetDesiredReplicaPoolNames returns list of desired pool names
func GetDesiredReplicaPoolNames(cvc *apis.CStorVolumeClaim) []string {
poolNames := []string{}
for _, poolInfo := range cvc.Spec.Policy.ReplicaPoolInfo {
poolNames = append(poolNames, poolInfo.PoolName)
}
return poolNames
}
23 changes: 23 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,26 @@ func RemoveString(slice []string, s string) (result []string) {
}
return result
}

// IsChangeInLists returns true if there is any difference in listA and listB
func IsChangeInLists(listA, listB []string) bool {
listAMap := map[string]bool{}
listBMap := map[string]bool{}
for _, name := range listA {
listAMap[name] = true
}
for _, name := range listB {
listBMap[name] = true
}
for _, name := range listA {
if !listBMap[name] {
return true
}
}
for _, name := range listB {
if !listAMap[name] {
return true
}
}
return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func buildCVRFromExistingCVR(
WithCapacity(cvrObj.Spec.Capacity).
WithReplicaID(cvrObj.Spec.ReplicaID).
WithFinalizers([]string{cvr.CStorVolumeReplicaFinalizer}).
WithStatusPhase("Recreate")
WithStatusPhase(apis.CVRStatusRecreate)
cvrObj, err := buildCVRObj.Build()
Expect(err).To(BeNil())
return cvrObj
Expand Down
2 changes: 1 addition & 1 deletion tests/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,7 +1145,7 @@ func (ops *Operations) BuildAndCreateCVR() *apis.CStorVolumeReplica {
WithFinalizers([]string{cvr.CStorVolumeReplicaFinalizer}).
WithCapacity(cvrConfig.Capacity).
WithTargetIP(cvrConfig.TargetIP).
WithStatusPhase(cvrConfig.Phase).
WithStatusPhase(apis.CStorVolumeReplicaPhase(cvrConfig.Phase)).
WithReplicaID(cvrConfig.ReplicaID).
Build()
Expect(err).To(BeNil())
Expand Down