
Commit

incorporate review comments
Signed-off-by: sonasingh46 <sonasingh46@gmail.com>
sonasingh46 committed Dec 19, 2018
1 parent bd1e57c commit ee5f590
Showing 6 changed files with 55 additions and 54 deletions.
48 changes: 22 additions & 26 deletions cmd/maya-apiserver/spc-watcher/handler.go
@@ -17,11 +17,10 @@ limitations under the License.
 package spc
 
 import (
-	"fmt"
 	"github.com/golang/glog"
-	"github.com/openebs/CITF/pkg/apis/openebs.io/v1alpha1"
 	apis "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
-	"k8s.io/apimachinery/pkg/api/errors"
+	"github.com/pkg/errors"
+	k8serror "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/client-go/tools/cache"
@@ -44,7 +43,7 @@ func (c *Controller) syncHandler(key, operation string, object interface{}) erro
 	// interface type of nil is different from nil but all other type of nil has the same type as that of nil.
 	spcObject := object.(*apis.StoragePoolClaim)
 	if spcObject == nil {
-		return fmt.Errorf("storagepoolclaim object not found for storage pool deletion")
+		return errors.New("storagepoolclaim object not found for storage pool deletion")
 	}
 	spcGot = spcObject
 }
@@ -115,15 +114,15 @@ func (c *Controller) getSpcResource(key string) (*apis.StoragePoolClaim, error)
 	// Convert the key(namespace/name) string into a distinct name
 	_, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
-		runtime.HandleError(fmt.Errorf("Invalid resource key: %s", key))
+		runtime.HandleError(errors.Wrapf(err, "Invalid resource key: %s", key))
 		return nil, err
 	}
 	spcGot, err := c.clientset.OpenebsV1alpha1().StoragePoolClaims().Get(name, metav1.GetOptions{})
 	if err != nil {
 		// The SPC resource may no longer exist, in which case we stop
 		// processing.
-		if errors.IsNotFound(err) {
-			runtime.HandleError(fmt.Errorf("spcGot '%s' in work queue no longer exists:'%v'", key, err))
+		if k8serror.IsNotFound(err) {
+			runtime.HandleError(errors.Wrapf(err, "spcGot '%s' in work queue no longer exists", key))
 			// No need to return error to caller as we still want to fire the delete handler
 			// using the spc key(name)
 			// If error is returned the caller function will return without calling the spcEventHandler
@@ -149,7 +148,7 @@ func (c *Controller) syncSpc(spcGot *apis.StoragePoolClaim) error {
 	// pendingPoolCount holds the pending pool that should be provisioned to get the desired state.
 	pendingPoolCount := maxPoolCount - currentPoolCount
 	// Call the storage pool create logic to provision the pending pools.
-	err = c.storagePoolCreateWrapper(pendingPoolCount, spcGot)
+	err = c.create(pendingPoolCount, spcGot)
 	if err != nil {
 		return err
 	}
@@ -159,60 +158,57 @@ func (c *Controller) syncSpc(spcGot *apis.StoragePoolClaim) error {
 
 // addEventHandler is the event handler for the add event of spc.
 func (c *Controller) addEventHandler(spc *apis.StoragePoolClaim) error {
-	err := storagePoolValidator(spc)
+	err := validate(spc)
 	if err != nil {
 		return err
 	}
 	currentPoolCount, err := c.getCurrentPoolCount(spc)
 	if err != nil {
 		return err
 	}
-	err = c.storagePoolCreateWrapper(spc.Spec.MaxPools-currentPoolCount, spc)
-	if err != nil {
-		return err
-	}
+	err = c.create(spc.Spec.MaxPools-currentPoolCount, spc)
 	return nil
 }
 
-// storagePoolCreateWrapper is a wrapper function that calls the actual function to create pool as many time
+// create is a wrapper function that calls the actual function to create pool as many time
 // as the number of pools need to be created.
-func (c *Controller) storagePoolCreateWrapper(maxPool int, spc *apis.StoragePoolClaim) error {
+func (c *Controller) create(maxPool int, spc *apis.StoragePoolClaim) error {
 	var newSpcLease Leaser
 	newSpcLease = &Lease{spc, SpcLeaseKey, c.clientset, c.kubeclientset}
 	err := newSpcLease.Hold()
 	if err != nil {
-		return fmt.Errorf("Could not acquire lease on spc object:%v", err)
+		return errors.Wrapf(err, "Could not acquire lease on spc object")
 	}
 	glog.Infof("Lease acquired successfully on storagepoolclaim %s ", spc.Name)
 	defer newSpcLease.Release()
 	for poolCount := 1; poolCount <= maxPool; poolCount++ {
 		glog.Infof("Provisioning pool %d/%d for storagepoolclaim %s", poolCount, maxPool, spc.Name)
 		err = CreateStoragePool(spc)
 		if err != nil {
-			glog.Errorf("Pool provisioning failed for %d/%d for storagepoolclaim %s", poolCount, maxPool, spc.Name)
+			runtime.HandleError(errors.Wrapf(err, "Pool provisioning failed for %d/%d for storagepoolclaim %s", poolCount, maxPool, spc.Name))
 		}
 	}
 	return nil
 }
 
-// storagePoolValidator validates the spc configuration before creation of pool.
-func storagePoolValidator(spc *apis.StoragePoolClaim) error {
+// validate validates the spc configuration before creation of pool.
+func validate(spc *apis.StoragePoolClaim) error {
 	// Validations for poolType
 	if spc.Spec.MaxPools <= 0 {
-		return fmt.Errorf("aborting storagepool create operation for %s as maxPool count is invalid ", spc.Name)
+		return errors.Errorf("aborting storagepool create operation for %s as maxPool count is invalid ", spc.Name)
 	}
 	poolType := spc.Spec.PoolSpec.PoolType
 	if poolType == "" {
-		return fmt.Errorf("aborting storagepool create operation for %s as no poolType is specified", spc.Name)
+		return errors.Errorf("aborting storagepool create operation for %s as no poolType is specified", spc.Name)
 	}
 
-	if !(poolType == string(v1alpha1.PoolTypeStripedCPV) || poolType == string(v1alpha1.PoolTypeMirroredCPV)) {
-		return fmt.Errorf("aborting storagepool create operation as specified poolType is %s which is invalid", poolType)
+	if !(poolType == string(apis.PoolTypeStripedCPV) || poolType == string(apis.PoolTypeMirroredCPV)) {
+		return errors.Errorf("aborting storagepool create operation as specified poolType is %s which is invalid", poolType)
 	}
 
 	diskType := spc.Spec.Type
-	if !(diskType == string(v1alpha1.TypeSparseCPV) || diskType == string(v1alpha1.TypeDiskCPV)) {
-		return fmt.Errorf("aborting storagepool create operation as specified type is %s which is invalid", diskType)
+	if !(diskType == string(apis.TypeSparseCPV) || diskType == string(apis.TypeDiskCPV)) {
+		return errors.Errorf("aborting storagepool create operation as specified type is %s which is invalid", diskType)
 	}
 	return nil
 }
@@ -222,7 +218,7 @@ func (c *Controller) getCurrentPoolCount(spc *apis.StoragePoolClaim) (int, error
 	// Get the current count of provisioned pool for the storagepool claim
 	spList, err := c.clientset.OpenebsV1alpha1().StoragePools().List(metav1.ListOptions{LabelSelector: string(apis.StoragePoolClaimCPK) + "=" + spc.Name})
 	if err != nil {
-		return 0, fmt.Errorf("unable to get current pool count:unable to list storagepools: %v", err)
+		return 0, errors.Errorf("unable to get current pool count:unable to list storagepools: %v", err)
 	}
 	return len(spList.Items), nil
 }
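
The recurring change in this file is the move from fmt.Errorf to the github.com/pkg/errors package, with the Kubernetes API errors package aliased to k8serror so the errors identifier stays free for the wrapping library. A minimal standalone sketch of the pattern (the fetch and sync helpers are hypothetical, not part of the repository):

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// fetch is a hypothetical operation standing in for a clientset Get call.
func fetch(name string) error {
	return errors.Errorf("object %s not found", name)
}

// sync wraps the failure with context; unlike fmt.Errorf, errors.Wrapf
// keeps the original cause (and its stack trace) attached to the error.
func sync(name string) error {
	if err := fetch(name); err != nil {
		return errors.Wrapf(err, "failed to sync spc %s", name)
	}
	return nil
}

func main() {
	err := sync("pool-claim-1")
	fmt.Println(err)               // failed to sync spc pool-claim-1: object pool-claim-1 not found
	fmt.Println(errors.Cause(err)) // object pool-claim-1 not found
}

Because the cause stays attached, call sites such as runtime.HandleError can log the full chain without re-formatting it into a flat string.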
25 changes: 12 additions & 13 deletions cmd/maya-apiserver/spc-watcher/select_disk.go
@@ -19,11 +19,10 @@ package spc
 import (
 	mach_apis_meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	//openebs "github.com/openebs/maya/pkg/client/clientset/versioned"
-	"errors"
-	"fmt"
-	"github.com/golang/glog"
 	"github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
 	openebs "github.com/openebs/maya/pkg/client/generated/clientset/internalclientset"
+	"github.com/pkg/errors"
+	"k8s.io/apimachinery/pkg/util/runtime"
 )
 
 const (
@@ -41,7 +40,7 @@ type clientSet struct {
 
 type diskList struct {
 	//diskList is the list of usable disks that can be used in storagepool provisioning.
-	diskList []string
+	items []string
 }
 
 type nodeDisk struct {
@@ -55,7 +54,7 @@ func (k *clientSet) nodeDiskAlloter(cp *v1alpha1.StoragePoolClaim) (*nodeDisk, e
 	// should not be returned.
 	listDisk, err := k.getDisk(cp)
 	if err != nil {
-		return nil, fmt.Errorf("error in getting the disk list:%v", err)
+		return nil, errors.Errorf("error in getting the disk list:%v", err)
 	}
 	if len(listDisk.Items) == 0 {
 		return nil, errors.New("no disk object found")
@@ -113,12 +112,12 @@ func (k *clientSet) nodeSelector(listDisk *v1alpha1.DiskList, poolType string, s
 		if nodeDiskMap[value.Labels[string(v1alpha1.HostNameCPK)]] == nil {
 			// Entry to this block means first time the hostname will be mapped for the first time.
 			// Obviously, this entry of hostname(node) is for a usable disk and initialize diskCount to 1.
-			nodeDiskMap[value.Labels[string(v1alpha1.HostNameCPK)]] = &diskList{diskList: []string{value.Name}}
+			nodeDiskMap[value.Labels[string(v1alpha1.HostNameCPK)]] = &diskList{items: []string{value.Name}}
 		} else {
 			// Entry to this block means the hostname was already mapped and it has more than one disk and at least two disks.
 			nodeDisk := nodeDiskMap[value.Labels[string(v1alpha1.HostNameCPK)]]
 			// Add the current disk to the diskList for this node.
-			nodeDisk.diskList = append(nodeDisk.diskList, value.Name)
+			nodeDisk.items = append(nodeDisk.items, value.Name)
 		}
 
 	}
@@ -134,7 +133,7 @@ func diskSelector(nodeDiskMap map[string]*diskList, poolType string) *nodeDisk {
 	selectedDisk := &nodeDisk{
 		nodeName: "",
 		disks: diskList{
-			diskList: []string{},
+			items: []string{},
 		},
 	}
 
@@ -154,12 +153,12 @@ func diskSelector(nodeDiskMap map[string]*diskList, poolType string) *nodeDisk {
 
 		// If the current disk count on the node is less than the required disks
 		// then this is a dirty node and it will not qualify.
-		if len(val.diskList) < requiredDiskCount {
+		if len(val.items) < requiredDiskCount {
 			continue
 		}
 		// Select the required disk from qualified nodes.
 		for i := 0; i < requiredDiskCount; i++ {
-			selectedDisk.disks.diskList = append(selectedDisk.disks.diskList, val.diskList[i])
+			selectedDisk.disks.items = append(selectedDisk.disks.items, val.items[i])
 		}
 		selectedDisk.nodeName = node
 		break
@@ -186,7 +185,7 @@ func (k *clientSet) getUsedDiskMap() (map[string]int, error) {
 	// Get the list of disk that has been used already for pool provisioning
 	spList, err := k.oecs.OpenebsV1alpha1().StoragePools().List(mach_apis_meta_v1.ListOptions{})
 	if err != nil {
-		return nil, fmt.Errorf("unable to get the list of storagepools:%v", err)
+		return nil, errors.Wrapf(err, "unable to get the list of storagepools")
 	}
 	// Form a map that will hold all the used disk
 	usedDiskMap := make(map[string]int)
@@ -205,7 +204,7 @@ func (k *clientSet) getUsedNodeMap(spc string) (map[string]int, error) {
 	// Get the list of storagepool
 	spList, err := k.oecs.OpenebsV1alpha1().StoragePools().List(mach_apis_meta_v1.ListOptions{LabelSelector: string(v1alpha1.StoragePoolClaimCPK) + "=" + spc})
 	if err != nil {
-		return nil, fmt.Errorf("unable to get the list of storagepools for stragepoolclaim %s:%v", spc, err)
+		return nil, errors.Wrapf(err, "unable to get the list of storagepools for stragepoolclaim %s", spc)
 	}
 	// Form a map that will hold all the nodes where storagepool for the spc has been already created.
 	usedNodeMap := make(map[string]int)
@@ -231,7 +230,7 @@ func (k *clientSet) getDisk(cp *v1alpha1.StoragePoolClaim) (*v1alpha1.DiskList,
 	for _, v := range spcDisks {
 		getDisk, err := k.oecs.OpenebsV1alpha1().Disks().Get(v, mach_apis_meta_v1.GetOptions{})
 		if err != nil {
-			glog.Error("Error in fetching disk:", err)
+			runtime.HandleError(errors.Wrapf(err, "Error in fetching disk"))
 		} else {
 			// Deep-copy not required unless the object internal fields of objects are pointer referenced.
 			listDisk.Items = append(listDisk.Items, *getDisk)
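
Across select_disk.go the field diskList.diskList becomes diskList.items, which reads less awkwardly at call sites such as len(val.items). A self-contained sketch of the qualification loop that diskSelector performs over the node-to-disk map (the pickNode helper is hypothetical, and the requiredDiskCount of 2 for a mirrored pool is an assumption for illustration):

package main

import "fmt"

// diskList mirrors the renamed struct: items holds usable disk names.
type diskList struct {
	items []string
}

// nodeDisk pairs a node with the disks selected from it.
type nodeDisk struct {
	nodeName string
	disks    diskList
}

// pickNode returns the first node holding at least requiredDiskCount disks,
// mimicking the qualification loop in diskSelector: nodes with too few
// usable disks are skipped, and the first qualifying node wins.
func pickNode(nodeDiskMap map[string]*diskList, requiredDiskCount int) *nodeDisk {
	selected := &nodeDisk{disks: diskList{items: []string{}}}
	for node, val := range nodeDiskMap {
		if len(val.items) < requiredDiskCount {
			continue // dirty node: not enough usable disks
		}
		selected.disks.items = append(selected.disks.items, val.items[:requiredDiskCount]...)
		selected.nodeName = node
		break
	}
	return selected
}

func main() {
	nodeDiskMap := map[string]*diskList{
		"node-1": {items: []string{"disk-a"}},
		"node-2": {items: []string{"disk-b", "disk-c"}},
	}
	// Assumed for illustration: a mirrored pool needs two disks on one node,
	// so only node-2 qualifies here.
	fmt.Printf("%+v\n", pickNode(nodeDiskMap, 2))
}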
4 changes: 2 additions & 2 deletions cmd/maya-apiserver/spc-watcher/select_disk_test.go
@@ -190,8 +190,8 @@ func TestNodeDiskAlloter(t *testing.T) {
 			if gotErr != test.err {
 				t.Fatalf("Test case failed as the expected error %v but got %v", test.err, gotErr)
 			}
-			if len(diskList.disks.diskList) != test.expectedDiskListLength {
-				t.Errorf("Test case failed as the expected disk list length is %d but got %d", test.expectedDiskListLength, len(diskList.disks.diskList))
+			if len(diskList.disks.items) != test.expectedDiskListLength {
+				t.Errorf("Test case failed as the expected disk list length is %d but got %d", test.expectedDiskListLength, len(diskList.disks.items))
 			}
 		})
 	}
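
For context, a cut-down sketch of the table-driven shape that TestNodeDiskAlloter follows after the rename; the local types, case values, and fakeAlloter stand-in are all hypothetical:

package spc_test

import "testing"

// Minimal stand-ins for the watcher's diskList/nodeDisk types.
type diskList struct{ items []string }
type nodeDisk struct {
	nodeName string
	disks    diskList
}

// fakeAlloter is a hypothetical stand-in for nodeDiskAlloter.
func fakeAlloter(poolType string) *nodeDisk {
	if poolType == "mirrored" {
		return &nodeDisk{disks: diskList{items: []string{"disk-a", "disk-b"}}}
	}
	return &nodeDisk{disks: diskList{items: []string{"disk-a"}}}
}

func TestDiskListLength(t *testing.T) {
	tests := map[string]struct {
		poolType               string
		expectedDiskListLength int
	}{
		"striped pool":  {poolType: "striped", expectedDiskListLength: 1},
		"mirrored pool": {poolType: "mirrored", expectedDiskListLength: 2},
	}
	for name, test := range tests {
		t.Run(name, func(t *testing.T) {
			got := fakeAlloter(test.poolType)
			if len(got.disks.items) != test.expectedDiskListLength {
				t.Errorf("expected disk list length %d but got %d",
					test.expectedDiskListLength, len(got.disks.items))
			}
		})
	}
}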
18 changes: 9 additions & 9 deletions cmd/maya-apiserver/spc-watcher/storagepool_create.go
@@ -17,12 +17,12 @@ limitations under the License.
 package spc
 
 import (
-	"fmt"
 	"github.com/golang/glog"
-	"github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
+	apis "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
 	"github.com/openebs/maya/pkg/client/k8s"
 	"github.com/openebs/maya/pkg/storagepool"
+	"github.com/pkg/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
@@ -74,11 +74,11 @@ func poolCreateWorker(pool *apis.CasPool) error {
 
 	storagepoolOps, err := storagepool.NewCasPoolOperation(pool)
 	if err != nil {
-		return fmt.Errorf("NewCasPoolOperation failed error '%s'", err.Error())
+		return errors.Wrapf(err, "NewCasPoolOperation failed error")
 	}
 	_, err = storagepoolOps.Create()
 	if err != nil {
-		return fmt.Errorf("failed to create cas template based storagepool: error '%s'", err.Error())
+		return errors.Wrapf(err, "failed to create cas template based storagepool")
 
 	}
 	glog.Infof("Cas template based storagepool created successfully: name '%s'", pool.StoragePoolClaim)
@@ -109,17 +109,17 @@ func (newClientSet *clientSet) casPoolBuilder(casPool *apis.CasPool, spc *apis.S
 	// getDiskList will hold node and disks attached to it to be used for storagepool provisioning.
 	nodeDisks, err := newClientSet.nodeDiskAlloter(spc)
 	if err != nil {
-		return nil, fmt.Errorf("aborting storagepool create operation as no node qualified: %v", err)
+		return nil, errors.Wrapf(err, "aborting storagepool create operation as no node qualified")
 	}
-	if len(nodeDisks.disks.diskList) == 0 {
-		return nil, fmt.Errorf("aborting storagepool create operation as no disk was found")
+	if len(nodeDisks.disks.items) == 0 {
+		return nil, errors.New("aborting storagepool create operation as no disk was found")
 	}
 	// For each of the disks, extract the device Id and fill the 'DeviceId' field of the CasPool object with it.
 	// In case, device Id is not available, fill the 'DeviceId' field of the CasPool object with device path.
-	for _, v := range nodeDisks.disks.diskList {
+	for _, v := range nodeDisks.disks.items {
 		gotDisk, err := newClientSet.oecs.OpenebsV1alpha1().Disks().Get(v, metav1.GetOptions{})
 		if err != nil {
-			return nil, fmt.Errorf("Failed to get device id for disk:failed to list the disks:%s", err)
+			return nil, errors.Wrapf(err, "Failed to get device id for disk:failed to list the disks")
 		}
 		if len(gotDisk.Spec.DevLinks) != 0 && len(gotDisk.Spec.DevLinks[0].Links) != 0 {
 			// Fill device Id of the disk to the CasPool object.
@@ -134,6 +134,6 @@ func (newClientSet *clientSet) casPoolBuilder(casPool *apis.CasPool, spc *apis.S
 	// Fill the node name to the CasPool object.
 	casPool.NodeName = nodeDisks.nodeName
 	// Fill the disks attached to this node to the CasPool object.
-	casPool.DiskList = nodeDisks.disks.diskList
+	casPool.DiskList = nodeDisks.disks.items
 	return casPool, nil
 }
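
As the comments in casPoolBuilder describe, each selected disk contributes a 'DeviceId' to the CasPool object: the first device link is preferred, and the raw device path is the fallback when no link exists. A self-contained sketch of that fallback, with hypothetical stand-ins for the Disk CR fields:

package main

import "fmt"

// devLink and disk are hypothetical stand-ins for the Disk CR's
// Spec.DevLinks structure referenced in casPoolBuilder.
type devLink struct {
	Links []string
}

type disk struct {
	Name     string
	Path     string
	DevLinks []devLink
}

// deviceID prefers the first device link when one exists and falls back
// to the device path otherwise, mirroring the CasPool 'DeviceId' logic.
func deviceID(d disk) string {
	if len(d.DevLinks) != 0 && len(d.DevLinks[0].Links) != 0 {
		return d.DevLinks[0].Links[0]
	}
	return d.Path
}

func main() {
	withLink := disk{Name: "disk-a", Path: "/dev/sdb",
		DevLinks: []devLink{{Links: []string{"/dev/disk/by-id/ata-123"}}}}
	withoutLink := disk{Name: "disk-b", Path: "/dev/sdc"}
	fmt.Println(deviceID(withLink))    // /dev/disk/by-id/ata-123
	fmt.Println(deviceID(withoutLink)) // /dev/sdc
}

Device links are stable across reboots, which is why they are preferred over device paths that the kernel may reassign.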
6 changes: 6 additions & 0 deletions pkg/apis/openebs.io/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

8 changes: 4 additions & 4 deletions pkg/install/v1alpha1/cstor_pool.go
@@ -105,7 +105,7 @@ spec:
 apiVersion: openebs.io/v1alpha1
 kind: RunTask
 metadata:
-  name: cstor-pool-create-putcstorpoolcr-default-0.8.0
+  name: cstor-pool-create-putcstorpoolcr-default
 spec:
   meta: |
     apiVersion: openebs.io/v1alpha1
@@ -139,7 +139,7 @@ spec:
 apiVersion: openebs.io/v1alpha1
 kind: RunTask
 metadata:
-  name: cstor-pool-create-putcstorpooldeployment-default-0.8.0
+  name: cstor-pool-create-putcstorpooldeployment-default
 spec:
   meta: |
     runNamespace: {{.Config.RunNamespace.value}}
@@ -283,7 +283,7 @@ spec:
 apiVersion: openebs.io/v1alpha1
 kind: RunTask
 metadata:
-  name: cstor-pool-create-putstoragepoolcr-default-0.8.0
+  name: cstor-pool-create-putstoragepoolcr-default
 spec:
   meta: |
     apiVersion: openebs.io/v1alpha1
@@ -315,7 +315,7 @@ spec:
 apiVersion: openebs.io/v1alpha1
 kind: RunTask
 metadata:
-  name: cstor-pool-create-patchstoragepoolclaim-default-0.8.0
+  name: cstor-pool-create-patchstoragepoolclaim-default
 spec:
   meta: |
     id: patchstoragepoolclaim
