Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cvc-operator): add automatic scaling of volumereplicas for CSI volumes #1613

Merged
merged 11 commits into from
Feb 26, 2020
396 changes: 385 additions & 11 deletions cmd/cvc-operator/controller/cstorvolumeclaim.go

Large diffs are not rendered by default.

58 changes: 52 additions & 6 deletions cmd/cvc-operator/controller/cvc_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
errors "github.com/pkg/errors"
"k8s.io/klog"

cvr "github.com/openebs/maya/pkg/cstor/volumereplica/v1alpha1"
cvclaim "github.com/openebs/maya/pkg/cstorvolumeclaim/v1alpha1"
corev1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
k8serror "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -203,6 +205,12 @@ func (c *CVCController) syncCVC(cvc *apis.CStorVolumeClaim) error {
if err != nil {
return err
}

if c.isCVCScalePending(cvc) {
// process scalingup/scalingdown of volume replicas only if there is
mittachaitu marked this conversation as resolved.
Show resolved Hide resolved
// change in curent and desired state of replicas pool information
err = c.scaleVolumeReplicas(cvc)
mittachaitu marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

Expand Down Expand Up @@ -274,11 +282,19 @@ func (c *CVCController) createVolumeOperation(cvc *apis.CStorVolumeClaim) (*apis
return nil, err
}

// Fetch the volume replica pool names and use them in PDB and updating in
mittachaitu marked this conversation as resolved.
Show resolved Hide resolved
// spec and status of CVC
poolNames, err := cvr.GetVolumeReplicaPoolNames(cvc.Name, openebsNamespace)
if err != nil {
return nil, errors.Wrapf(err,
"failed to get volume replica pool names of volume %s", cvObj.Name)
}

if isHAVolume(cvc) {
// TODO: When multiple threads or multiple CVC controllers are set then
// we have to revist entier PDB code path
var pdbObj *policy.PodDisruptionBudget
pdbObj, err = getOrCreatePodDisruptionBudget(cvObj, getCSPC(cvc))
pdbObj, err = getOrCreatePodDisruptionBudget(cvObj, getCSPC(cvc), poolNames)
if err != nil {
return nil, errors.Wrapf(err,
"failed to create PDB for volume: %s", cvc.Name)
Expand All @@ -291,6 +307,8 @@ func (c *CVCController) createVolumeOperation(cvc *apis.CStorVolumeClaim) (*apis
return nil, err
}

// update volume replica pool information on cvc spec and status
addReplicaPoolInfo(cvc, poolNames)
// update the cstorvolume reference, phase as "Bound" and desired
// capacity
cvc.Spec.CStorVolumeRef = volumeRef
Expand Down Expand Up @@ -366,7 +384,7 @@ func (c *CVCController) removeClaimFinalizer(
cvc *apis.CStorVolumeClaim,
) error {
if isHAVolume(cvc) {
err := c.deletePDBIfNotInUse(cvc)
err := deletePDBIfNotInUse(cvc)
if err != nil {
return errors.Wrapf(err,
"failed to verify whether PDB %s is in use by other volumes",
Expand Down Expand Up @@ -637,14 +655,13 @@ func (c *CVCController) resizeCV(cv *apis.CStorVolume, newCapacity resource.Quan

// deletePDBIfNotInUse deletes the PDB if no volume is refering to the
// cStorvolumeclaim PDB
func (c *CVCController) deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error {
func deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error {
//TODO: If HALease is enabled active-active then below code needs to be
//revist
pdbName := getPDBName(cvc)
cvcLabelSelector := string(apis.PodDisruptionBudgetKey) + "=" + pdbName
cvcList, err := c.clientset.
OpenebsV1alpha1().
CStorVolumeClaims(cvc.Namespace).
cvcList, err := cvclaim.NewKubeclient().
WithNamespace(cvc.Namespace).
List(metav1.ListOptions{LabelSelector: cvcLabelSelector})
if err != nil {
return errors.Wrapf(err,
Expand All @@ -654,9 +671,38 @@ func (c *CVCController) deletePDBIfNotInUse(cvc *apis.CStorVolumeClaim) error {
err = apispdb.KubeClient().
WithNamespace(openebsNamespace).
Delete(pdbName, &metav1.DeleteOptions{})
if k8serror.IsNotFound(err) {
klog.Infof("pdb %s of volume %s was already deleted", pdbName, cvc.Name)
return nil
}
if err != nil {
return err
}
}
return nil
}

// scaleVolumeReplicas identifies whether it is scaleup or scaledown case of
// volume replicas. If user added entry of pool info under the spec then changes
// are treated as scaleup case. If user removed poolInfo entry from spec then
// changes are treated as scale down case. If user just modifies the pool entry
// info under the spec then it is a kind of migration which is not yet supported
func (c *CVCController) scaleVolumeReplicas(cvc *apis.CStorVolumeClaim) error {
mittachaitu marked this conversation as resolved.
Show resolved Hide resolved
var err error
if len(cvc.Spec.Policy.ReplicaPool.PoolInfo) > len(cvc.Status.PoolInfo) {
err = scaleUpVolumeReplicas(cvc)
} else if len(cvc.Spec.Policy.ReplicaPool.PoolInfo) < len(cvc.Status.PoolInfo) {
err = scaleDownVolumeReplicas(cvc)
} else {
c.recorder.Event(cvc, corev1.EventTypeWarning, "Migration",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this limitation get into webhooks?

Copy link
Author

@mittachaitu mittachaitu Feb 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the webhook for CVC is ready we will have following checks

  • Repetition of pool names either under spec or status shouldn't be there.
  • InitialReplicaCount shouldn't be modified.
  • The existing pool name can't be updated with a new pool name(Which is migration case).
  • Not allow scale down/up of volume replicas when one other in progress.
  • Not allow more that one replica at a time for scale down.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Failure of scale up/down if added pool is not healthy ? ( but admin/user shouldn't be providing the unhealthy pool name in first place)

"Migration of volume replicas is not yet supported")
return nil
}
if err != nil {
c.recorder.Eventf(cvc,
corev1.EventTypeWarning,
"ScalingVolumeReplicas",
"%v", err)
}
return nil
}
2 changes: 2 additions & 0 deletions pkg/apis/openebs.io/v1alpha1/cstor_volume_claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ type CStorVolumeClaimStatus struct {
// Capacity the actual resources of the underlying volume.
Capacity corev1.ResourceList `json:"capacity,omitempty"`
Conditions []CStorVolumeClaimCondition `json:"condition,omitempty"`
// PoolInfo represents current pool names where volume replicas exists
PoolInfo []string `json:"poolInfo"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this is different from Spec? do you see a need for this as well same as []ReplicaPoolInfo instead of []string?

Copy link
Author

@mittachaitu mittachaitu Feb 25, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO spec can have different things like name & uid.. then the status should strictly have only one information which conveys the intent.

}

// CStorVolumeClaimCondition contains details about state of cstor volume
Expand Down
16 changes: 16 additions & 0 deletions pkg/apis/openebs.io/v1alpha1/cstorvolume_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type CStorVolumePolicySpec struct {
Target TargetSpec `json:"target"`
// ReplicaSpec represents configuration related to replicas resources
Replica ReplicaSpec `json:"replica"`
// ReplicaPool holds the pool information of volume replicas.
// Ex: If volume is provisioned on which CStor pool volume replicas exist
ReplicaPool ReplicaPoolSpec `json:"replicaPool"`
}

// TargetSpec represents configuration related to cstor target and its resources
Expand Down Expand Up @@ -95,13 +98,26 @@ type ReplicaSpec struct {
Affinity *corev1.PodAffinity `json:"affinity"`
}

// ReplicaPoolSpec represents the volume replicas pool information
type ReplicaPoolSpec struct {
// PoolInfo represents the pool information of replicas
PoolInfo []ReplicaPoolInfo `json:"poolInfo"`
}

// Provision represents volume provisioning configuration
type Provision struct {
// replicaAffinity is set to true then volume replica resources need to be
// distributed across the cstor pool instances based on the given topology
ReplicaAffinity bool `json:"replicaAffinity"`
}

// ReplicaPoolInfo represents the pool information of volume replica
type ReplicaPoolInfo struct {
// PoolName represents the pool name where volume replica exists
PoolName string `json:"poolName"`
// UID also can be added
}

// CStorVolumePolicyStatus is for handling status of CstorVolumePolicy
type CStorVolumePolicyStatus struct {
Phase string `json:"phase"`
Expand Down
7 changes: 7 additions & 0 deletions pkg/cstor/volume/v1alpha1/cstorvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,3 +448,10 @@ func (csr *CVReplicationDetails) UpdateCVWithReplicationDetails(kubeclient *Kube
}
return err
}

// IsScaleDownInProgress return true if length of status replica details is
// greater than length of spec replica details
func IsScaleDownInProgress(cvObj *apis.CStorVolume) bool {
return len(cvObj.Status.ReplicaDetails.KnownReplicas) >
len(cvObj.Spec.ReplicaDetails.KnownReplicas)
}
4 changes: 2 additions & 2 deletions pkg/cstor/volumereplica/v1alpha1/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ func (b *Builder) WithCapacity(capacity string) *Builder {

// WithStatusPhase sets the Status Phase of CStorVolumeReplica with provided
//arguments
func (b *Builder) WithStatusPhase(phase string) *Builder {
b.cvr.object.Status.Phase = apis.CStorVolumeReplicaPhase(phase)
func (b *Builder) WithStatusPhase(phase apis.CStorVolumeReplicaPhase) *Builder {
b.cvr.object.Status.Phase = phase
return b
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/cstor/volumereplica/v1alpha1/cstorvolumereplica.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ func (b *ListBuilder) WithAPIList(
return b
}

// AppendListBuilder append the provided CVR API object into existing ListBuilder
func (b *ListBuilder) AppendListBuilder(
cvr *apis.CStorVolumeReplica) *ListBuilder {
b.list.items = append(b.list.items, &CVR{object: cvr})
return b
}

// List returns the list of cvr
// instances that was built by this
// builder
Expand Down Expand Up @@ -145,6 +152,20 @@ func (p *CVR) IsHealthy() bool {
return p.object.Status.Phase == "Healthy"
}

// HasLabel returns true only if label key value matched to provided
// value.
func (p *CVR) HasLabel(key, value string) bool {
return p.object.Labels[key] == value
}

// HasLabel returns predicate to filter out CVRs based on the
// provided key and values
func HasLabel(key, value string) Predicate {
return func(p *CVR) bool {
return p.HasLabel(key, value)
}
}

// IsHealthy is a Predicate to filter out cvrs
// which is healthy
func IsHealthy() Predicate {
Expand Down
9 changes: 9 additions & 0 deletions pkg/cstorvolumeclaim/v1alpha1/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,12 @@ func GetPDBLabels(poolNames []string, cspcName string) map[string]string {
pdbLabels[string(apis.CStorPoolClusterCPK)] = cspcName
return pdbLabels
}

// GetDesiredReplicaPoolNames returns list of desired pool names
func GetDesiredReplicaPoolNames(cvc *apis.CStorVolumeClaim) []string {
poolNames := []string{}
for _, poolInfo := range cvc.Spec.Policy.ReplicaPool.PoolInfo {
poolNames = append(poolNames, poolInfo.PoolName)
}
return poolNames
}
23 changes: 23 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,26 @@ func RemoveString(slice []string, s string) (result []string) {
}
return result
}

// IsChangeInLists returns true if there is any difference in listA and listB
func IsChangeInLists(listA, listB []string) bool {
listAMap := map[string]bool{}
listBMap := map[string]bool{}
for _, name := range listA {
listAMap[name] = true
}
for _, name := range listB {
listBMap[name] = true
}
for _, name := range listA {
if !listBMap[name] {
return true
}
}
for _, name := range listB {
if !listAMap[name] {
return true
}
}
return false
}