Skip to content

Commit

Permalink
feat(topology): adding support for custom topology keys
Browse files Browse the repository at this point in the history
Now user can label the nodes with the required topology, the ZFSPV
driver will support all the node labels as topology keys.

We should label the nodes first and then deploy the driver to make it aware of
all the labels that node has. If we want to add labels after ZFS-LocalPV driver
has been deployed, a restart all the node agents are required so that the driver
can pick the lables and add them as supported topology keys.

Signed-off-by: Pawan <pawan@mayadata.io>
  • Loading branch information
pawanpraka1 committed Apr 27, 2020
1 parent d57976e commit 649ff9f
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 8 deletions.
22 changes: 22 additions & 0 deletions pkg/client/k8s/v1alpha1/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,28 @@ func NumberOfNodes() (int, error) {
}
}

// GetNode returns a node instance from kubernetes cluster
func GetNode(name string) (*corev1.Node, error) {
n := Node()
node, err := n.Get(name, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to get node")
} else {
return node, nil
}
}

// ListNodes returns list of node instance from kubernetes cluster
func ListNodes(options metav1.ListOptions) (*corev1.NodeList, error) {
n := Node()
nodelist, err := n.List(options)
if err != nil {
return nil, errors.Wrapf(err, "failed to list node")
} else {
return nodelist, nil
}
}

// GetOSAndKernelVersion gets us the OS,Kernel version
func GetOSAndKernelVersion() (string, error) {
nodes := Node()
Expand Down
12 changes: 11 additions & 1 deletion pkg/driver/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi"
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1alpha1"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
k8sapi "github.com/openebs/zfs-localpv/pkg/client/k8s/v1alpha1"
"github.com/openebs/zfs-localpv/pkg/mgmt/snapshot"
"github.com/openebs/zfs-localpv/pkg/mgmt/volume"
"github.com/openebs/zfs-localpv/pkg/zfs"
Expand Down Expand Up @@ -171,7 +172,16 @@ func (ns *node) NodeGetInfo(
req *csi.NodeGetInfoRequest,
) (*csi.NodeGetInfoResponse, error) {

topology := map[string]string{zfs.ZFSTopologyKey: ns.driver.config.NodeID}
node, err := k8sapi.GetNode(ns.driver.config.NodeID)
if err != nil {
logrus.Errorf("failed to get the node %s", ns.driver.config.NodeID)
return nil, err
}

// support all the keys that node has
topology := node.Labels
topology[zfs.ZFSTopologyKey] = ns.driver.config.NodeID

return &csi.NodeGetInfoResponse{
NodeId: ns.driver.config.NodeID,
AccessibleTopology: &csi.Topology{
Expand Down
52 changes: 45 additions & 7 deletions pkg/driver/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
k8sapi "github.com/openebs/zfs-localpv/pkg/client/k8s/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

zfs "github.com/openebs/zfs-localpv/pkg/zfs"
Expand All @@ -34,10 +35,39 @@ const (
VolumeWeighted = "VolumeWeighted"
)

// GetNodeList gets the nodelist which satisfies the topology info
func GetNodeList(topo *csi.TopologyRequirement) ([]string, error) {

var nodelist []string

list, err := k8sapi.ListNodes(metav1.ListOptions{})
if err != nil {
return nil, err
}

for _, node := range list.Items {
for _, prf := range topo.Preferred {
nodeFiltered := false
for key, value := range prf.Segments {
if node.Labels[key] != value {
nodeFiltered = true
break
}
}
if nodeFiltered == false {
nodelist = append(nodelist, node.Name)
break
}
}
}

return nodelist, nil
}

// volumeWeightedScheduler goes through all the pools on the nodes mentioned
// in the topology and picks the node which has less volume on
// the given zfs pool.
func volumeWeightedScheduler(topo *csi.TopologyRequirement, pool string) string {
func volumeWeightedScheduler(nodelist []string, pool string) string {
var selected string

zvlist, err := volbuilder.NewKubeclient().
Expand All @@ -62,8 +92,7 @@ func volumeWeightedScheduler(topo *csi.TopologyRequirement, pool string) string

// schedule it on the node which has less
// number of volume for the given pool
for _, prf := range topo.Preferred {
node := prf.Segments[zfs.ZFSTopologyKey]
for _, node := range nodelist {
if volmap[node] < numVol {
selected = node
numVol = volmap[node]
Expand All @@ -81,16 +110,25 @@ func scheduler(topo *csi.TopologyRequirement, schld string, pool string) string
logrus.Errorf("topology information not provided")
return ""
}

nodelist, err := GetNodeList(topo)
if err != nil || len(nodelist) == 0 {
logrus.Errorf("ndelist err : %v", err.Error())
return ""
} else {
logrus.Infof("nodelist : %v", nodelist)
}

// if there is a single node, schedule it on that
if len(topo.Preferred) == 1 {
return topo.Preferred[0].Segments[zfs.ZFSTopologyKey]
if len(nodelist) == 1 {
return nodelist[0]
}

switch schld {
case VolumeWeighted:
return volumeWeightedScheduler(topo, pool)
return volumeWeightedScheduler(nodelist, pool)
default:
return volumeWeightedScheduler(topo, pool)
return volumeWeightedScheduler(nodelist, pool)
}

return ""
Expand Down

0 comments on commit 649ff9f

Please sign in to comment.