[US3990]refactor:(cstor-operator): refactor cstor operator (openebs-archive#867)

* refactor:(cstor-operator): refactor cstor operator

This commit does the following:
1. Pool-creation iteration no longer happens in runtasks;
   instead, cstor-operator creates pools one by one to converge
   to the maxPool count.

2. The minPool field in SPC is deprecated. Providing the minPool
   field in an SPC no longer has any effect.

3. The maxPool field is now mandatory for manual provisioning, to
   support reconciliation of manually provisioned pools (see the
   sketch after this list).

4. Reconciliation has been added for manually provisioned pools.

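A minimal, hypothetical sketch of what an SPC object looks like under the new rules. Field and type names are taken from the diff below, except DiskAttr, which is an assumption inferred from the spc.Spec.Disks.DiskList usage; the disk names are made up:

import (
	apis "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Manually provisioned SPC: disks are listed explicitly, and maxPool
// must now be set so that the watcher can reconcile toward it.
manualSpc := &apis.StoragePoolClaim{
	ObjectMeta: metav1.ObjectMeta{Name: "pool-manual"},
	Spec: apis.StoragePoolClaimSpec{
		Type:     string(apis.TypeDiskCPV),
		MaxPools: 2, // mandatory for manual provisioning as of this commit
		PoolSpec: apis.CStorPoolAttr{
			PoolType: string(apis.PoolTypeStripedCPV),
		},
		// DiskAttr is an assumed type name, inferred from spc.Spec.Disks.DiskList.
		Disks: apis.DiskAttr{DiskList: []string{"disk-node1-0", "disk-node2-0"}},
	},
}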
Signed-off-by: sonasingh46 <sonasingh46@gmail.com>
sonasingh46 authored and Ashutosh Kumar committed Jan 21, 2019
1 parent 688a76f commit adf6d67
Showing 12 changed files with 401 additions and 501 deletions.
184 changes: 143 additions & 41 deletions cmd/maya-apiserver/spc-watcher/handler.go
@@ -17,14 +17,19 @@ limitations under the License.
package spc

import (
"fmt"
"github.com/golang/glog"
apis "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
"github.com/openebs/maya/pkg/client/k8s"
"k8s.io/apimachinery/pkg/api/errors"
"github.com/pkg/errors"
k8serror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"strings"
)

const (
// DEFAULTPOOlCOUNT is assigned to the maxPool field of the spc if maxPool is not provided.
DEFAULTPOOlCOUNT = 3
)

// syncHandler compares the actual state with the desired, and attempts to
@@ -44,7 +49,7 @@ func (c *Controller) syncHandler(key, operation string, object interface{}) error {
// A typed nil stored in an interface is not equal to a bare nil: the interface
// retains the dynamic type, so the concrete object must be checked explicitly.
spcObject := object.(*apis.StoragePoolClaim)
if spcObject == nil {
return fmt.Errorf("storagepoolclaim object not found for storage pool deletion")
return errors.New("storagepoolclaim object not found for storage pool deletion")
}
spcGot = spcObject
}
@@ -68,16 +73,8 @@ func (c *Controller) spcEventHandler(operation string, spcGot *apis.StoragePoolClaim) (string, error) {
func (c *Controller) spcEventHandler(operation string, spcGot *apis.StoragePoolClaim) (string, error) {
switch operation {
case addEvent:
// CreateStoragePool function will create the storage pool
// It is a create event so resync should be false and pendingPoolcount is passed 0
// pendingPoolcount is not used when resync is false.
err := c.CreateStoragePool(spcGot, false, 0)
if err != nil {
glog.Error("Storagepool could not be created:", err)
// To-Do
// If Some error occur patch the spc object with appropriate reason
}

// Call addEventHandler in case of add event.
err := c.addEventHandler(spcGot, false)
return addEvent, err

case updateEvent:
@@ -89,7 +86,7 @@ func (c *Controller) spcEventHandler(operation string, spcGot *apis.StoragePoolClaim) (string, error) {
if err != nil {
glog.Errorf("Storagepool %s could not be synced:%v", spcGot.Name, err)
}
return syncEvent, err
return syncEvent, nil
case deleteEvent:
err := DeleteStoragePool(spcGot)

@@ -123,15 +120,15 @@ func (c *Controller) getSpcResource(key string) (*apis.StoragePoolClaim, error) {
// Convert the key(namespace/name) string into a distinct name
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("Invalid resource key: %s", key))
runtime.HandleError(errors.Wrapf(err, "Invalid resource key: %s", key))
return nil, err
}
spcGot, err := c.clientset.OpenebsV1alpha1().StoragePoolClaims().Get(name, metav1.GetOptions{})
if err != nil {
// The SPC resource may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("spcGot '%s' in work queue no longer exists:'%v'", key, err))
if k8serror.IsNotFound(err) {
runtime.HandleError(errors.Wrapf(err, "spcGot '%s' in work queue no longer exists", key))
// No need to return error to caller as we still want to fire the delete handler
// using the spc key(name)
// If error is returned the caller function will return without calling the spcEventHandler
@@ -143,40 +140,145 @@ func (c *Controller) getSpcResource(key string) (*apis.StoragePoolClaim, error) {
return spcGot, nil
}

// syncSpc tries to converge to a desired state for the spc.
func (c *Controller) syncSpc(spcGot *apis.StoragePoolClaim) error {
if len(spcGot.Spec.Disks.DiskList) > 0 {
// TODO : reconciliation for manual storagepool provisioning
glog.V(1).Infof("No reconciliation needed for manual provisioned pool of storagepoolclaim %s", spcGot.Name)
return nil
glog.V(1).Infof("Reconciling storagepoolclaim %s", spcGot.Name)
err := c.addEventHandler(spcGot, true)
return err
}

// addEventHandler is the event handler for the add event of spc.
func (c *Controller) addEventHandler(spc *apis.StoragePoolClaim, resync bool) error {
// TODO: Move these to admission webhook plugins or CRD validations
mutateSpc, err := validate(spc)
if err != nil {
return err
}
glog.V(1).Infof("Syncing storagepoolclaim %s", spcGot.Name)
// Get kubernetes clientset
// namespaces is not required, hence passed empty.
newK8sClient, err := k8s.NewK8sClient("")
// validate can mutate spc object -- for example if maxPool field is not present in case
// of auto provisioning, maxPool will default to 3.
// We need to immediately patch SPC object here.
if mutateSpc && !resync {
spc, err = c.clientset.OpenebsV1alpha1().StoragePoolClaims().Update(spc)
if err != nil {
return errors.Wrap(err, "spc patch for defaulting the field(s) failed")
}
}
pendingPoolCount, err := c.getPendingPoolCount(spc)
if err != nil {
return err
}
// Get openebs clientset using a getter method (i.e. GetOECS() ) as
// the openebs clientset is not exported.
newOecsClient := newK8sClient.GetOECS()
err = c.create(pendingPoolCount, spc)
return nil
}

// create is a wrapper function that calls the actual pool-creation function as many
// times as the number of pools that need to be created.
func (c *Controller) create(pendingPoolCount int, spc *apis.StoragePoolClaim) error {
var newSpcLease Leaser
newSpcLease = &Lease{spc, SpcLeaseKey, c.clientset, c.kubeclientset}
err := newSpcLease.Hold()
if err != nil {
return errors.Wrapf(err, "Could not acquire lease on spc object")
}
glog.Infof("Lease acquired successfully on storagepoolclaim %s ", spc.Name)
defer newSpcLease.Release()
for poolCount := 1; poolCount <= pendingPoolCount; poolCount++ {
glog.Infof("Provisioning pool %d/%d for storagepoolclaim %s", poolCount, pendingPoolCount, spc.Name)
err = CreateStoragePool(spc)
if err != nil {
runtime.HandleError(errors.Wrapf(err, "Pool provisioning failed for %d/%d for storagepoolclaim %s", poolCount, pendingPoolCount, spc.Name))
}
}
return nil
}
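For context, a sketch of the Leaser contract that create relies on, inferred solely from the calls above (the real definition lives elsewhere in this package and may differ):

// Leaser serializes pool provisioning per SPC: Hold fails if the lease
// is already held, and Release frees it once provisioning is done.
type Leaser interface {
	Hold() error
	Release()
}

Taking the lease before the provisioning loop ensures that a resync firing while pools are still being created cannot start a second, overlapping round of pool creation for the same SPC.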

// validate validates the spc configuration before creation of pool.
func validate(spc *apis.StoragePoolClaim) (bool, error) {
// TODO: Move these to admission webhook plugins or CRD validations
// Validations for poolType
var mutateSpc bool
// If the maxPool field is omitted, or set to 0, in the SPC then for the add event it defaults to 3.
// On resync the maxPool field is not mutated.
if len(spc.Spec.Disks.DiskList) == 0 {
maxPools := spc.Spec.MaxPools
if maxPools == 0 {
spc.Spec.MaxPools = DEFAULTPOOlCOUNT
mutateSpc = true
}
if maxPools < 0 {
return mutateSpc, errors.Errorf("aborting storagepool create operation for %s as invalid maxPool value %d", spc.Name, maxPools)
}
}
poolType := spc.Spec.PoolSpec.PoolType
if poolType == "" {
return mutateSpc, errors.Errorf("aborting storagepool create operation for %s as no poolType is specified", spc.Name)
}

if !(poolType == string(apis.PoolTypeStripedCPV) || poolType == string(apis.PoolTypeMirroredCPV)) {
return mutateSpc, errors.Errorf("aborting storagepool create operation as specified poolType is %s which is invalid", poolType)
}

diskType := spc.Spec.Type
if !(diskType == string(apis.TypeSparseCPV) || diskType == string(apis.TypeDiskCPV)) {
return mutateSpc, errors.Errorf("aborting storagepool create operation as specified type is %s which is invalid", diskType)
}
return mutateSpc, nil
}
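For illustration, how the defaulting path in validate plays out for an auto-provisioned SPC that omits maxPool — a hedged sketch of the behavior encoded above, not a test from this commit:

// Auto-provisioned SPC: empty disk list, maxPool left unset.
spc := &apis.StoragePoolClaim{
	Spec: apis.StoragePoolClaimSpec{
		Type:     string(apis.TypeSparseCPV),
		PoolSpec: apis.CStorPoolAttr{PoolType: string(apis.PoolTypeMirroredCPV)},
	},
}
mutated, err := validate(spc)
// Expected: err == nil, mutated == true, spc.Spec.MaxPools == 3
// (DEFAULTPOOlCOUNT). addEventHandler then patches the SPC so that
// the defaulted value is persisted to the cluster.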

// getCurrentPoolCount gives the current pool count for the given auto provisioned spc.
func (c *Controller) getCurrentPoolCount(spc *apis.StoragePoolClaim) (int, error) {
// Get the current count of provisioned pool for the storagepool claim
spList, err := newOecsClient.OpenebsV1alpha1().StoragePools().List(metav1.ListOptions{LabelSelector: string(apis.StoragePoolClaimCPK) + "=" + spcGot.Name})
spList, err := c.clientset.OpenebsV1alpha1().StoragePools().List(metav1.ListOptions{LabelSelector: string(apis.StoragePoolClaimCPK) + "=" + spc.Name})
if err != nil {
return fmt.Errorf("unable to list storagepools: %v", err)
return 0, errors.Errorf("unable to get current pool count:unable to list storagepools: %v", err)
}
currentPoolCount := len(spList.Items)
return len(spList.Items), nil
}

// If current pool count is less than maxpool count, try to converge to maxpool
if currentPoolCount < int(spcGot.Spec.MaxPools) {
glog.Infof("Converging storagepoolclaim %s to desired state:current pool count is %d,desired pool count is %d", spcGot.Name, currentPoolCount, spcGot.Spec.MaxPools)
// pendingPoolCount holds the pending pool that should be provisioned to get the desired state.
pendingPoolCount := int(spcGot.Spec.MaxPools) - currentPoolCount
// Call the storage pool create logic to provision the pending pools.
err := c.CreateStoragePool(spcGot, true, pendingPoolCount)
// getPendingPoolCount gives the count of pools that need to be provisioned for a given spc.
func (c *Controller) getPendingPoolCount(spc *apis.StoragePoolClaim) (int, error) {
var pendingPoolCount int
if len(spc.Spec.Disks.DiskList) == 0 {
// Getting pending pool count in case of auto provisioned spc.
currentPoolCount, err := c.getCurrentPoolCount(spc)
if err != nil {
return 0, err
}
maxPoolCount := int(spc.Spec.MaxPools)
pendingPoolCount = maxPoolCount - currentPoolCount
} else {
// TODO: -- Refactor using disk references to find the used disks
// Getting pending pool count in case of manually provisioned spc.
// allNodeDiskMap maps diskName --> hostName for all the disks.
allNodeDiskMap := make(map[string]string)
// Get all disks in one shot from the kube-apiserver
diskList, err := c.clientset.OpenebsV1alpha1().Disks().List(metav1.ListOptions{})
if err != nil {
return err
return 0, err
}
newClientSet := &clientSet{
c.clientset,
}
// Get the used disks for the SPC.
// usedDiskMap holds the disks which are already used.
usedDiskMap, err := newClientSet.getUsedDiskMap()
if err != nil {
return 0, err
}
for _, disk := range diskList.Items {
if usedDiskMap[disk.Name] == 1 {
continue
}
allNodeDiskMap[disk.Name] = disk.Labels[string(apis.HostNameCPK)]
}
// nodeCountMap holds, as keys, the node names over which pools should be provisioned.
nodeCountMap := make(map[string]int)
for _, spcDisk := range spc.Spec.Disks.DiskList {
if !(len(strings.TrimSpace(allNodeDiskMap[spcDisk])) == 0) {
nodeCountMap[allNodeDiskMap[spcDisk]]++
}
}
pendingPoolCount = len(nodeCountMap)
}
return nil
if pendingPoolCount < 0 {
return 0, errors.Errorf("Got invalid pending pool count %d", pendingPoolCount)
}
return pendingPoolCount, nil
}
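A self-contained sketch of the manual-provisioning arithmetic above: the pending pool count is the number of distinct nodes that still own at least one unused disk from the SPC's disk list. All names and data below are hypothetical:

package main

import "fmt"

func main() {
	// diskName -> hostName for every disk that is not yet used by a pool.
	allNodeDiskMap := map[string]string{
		"disk-a": "node-1",
		"disk-b": "node-1",
		"disk-c": "node-2",
	}
	// Disks claimed by the SPC; "disk-used" is already consumed, so it is
	// absent from allNodeDiskMap and contributes nothing.
	spcDiskList := []string{"disk-a", "disk-b", "disk-c", "disk-used"}

	// One pool is provisioned per distinct eligible node.
	nodeCountMap := map[string]int{}
	for _, d := range spcDiskList {
		if host, ok := allNodeDiskMap[d]; ok {
			nodeCountMap[host]++
		}
	}
	fmt.Println("pending pool count:", len(nodeCountMap)) // prints 2
}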
9 changes: 8 additions & 1 deletion cmd/maya-apiserver/spc-watcher/handler_test.go
@@ -257,8 +257,15 @@ func TestSyncHandler(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "pool1",
},
Spec: apis.StoragePoolClaimSpec{
Type: string(apis.TypeSparseCPV),
MaxPools: 3,
PoolSpec: apis.CStorPoolAttr{
PoolType: string(apis.PoolTypeStripedCPV),
},
},
},
expectedError: true,
expectedError: false,
},

// TestCase#4
