Skip to content

Commit

Permalink
ceph: added support for multus for csi
Browse files Browse the repository at this point in the history
CSI pods now utilize multus networking and connect to public
network specified in the CephCluster CR.

Closes: #5356
Signed-off-by: rohan47 <rohgupta@redhat.com>
  • Loading branch information
rohan47 committed Jul 23, 2020
1 parent dcadf10 commit d1054d1
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 21 deletions.
4 changes: 2 additions & 2 deletions pkg/operator/ceph/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ func (c *ClusterController) preClusterStartValidation(cluster *cluster, clusterO
logger.Warning("running osds on directory is not supported anymore, use devices instead.")
}
if cluster.Spec.Network.IsMultus() {
_, isPublic := cluster.Spec.Network.Selectors[config.PublicNetworkSelectorKeyName]
_, isCluster := cluster.Spec.Network.Selectors[config.ClusterNetworkSelectorKeyName]
_, isPublic := cluster.Spec.Network.Selectors[k8sutil.PublicNetworkSelectorKeyName]
_, isCluster := cluster.Spec.Network.Selectors[k8sutil.ClusterNetworkSelectorKeyName]
if !isPublic && !isCluster {
return errors.New("both network selector values for public and cluster selector cannot be empty for multus provider")
}
Expand Down
13 changes: 3 additions & 10 deletions pkg/operator/ceph/config/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// PublicNetworkSelectorKeyName is the network selector key for the ceph public network
PublicNetworkSelectorKeyName = "public"
// ClusterNetworkSelectorKeyName is the network selector key for the ceph cluster network
ClusterNetworkSelectorKeyName = "cluster"
)

var (
// NetworkSelectors is a slice of ceph network selector key name
NetworkSelectors = []string{PublicNetworkSelectorKeyName, ClusterNetworkSelectorKeyName}
NetworkSelectors = []string{k8sutil.PublicNetworkSelectorKeyName, k8sutil.ClusterNetworkSelectorKeyName}
)

func generateNetworkSettings(context *clusterd.Context, namespace string, networkSelectors map[string]string) ([]Option, error) {
Expand Down Expand Up @@ -65,8 +58,8 @@ func generateNetworkSettings(context *clusterd.Context, namespace string, networ
return []Option{}, errors.Wrapf(err, "failed to get network attachment definition configuration for selector %q", selectorKey)
}

if netConfig.Ipam.Subnet != "" {
cephNetworks = append(cephNetworks, configOverride("global", fmt.Sprintf("%s_network", selectorKey), netConfig.Ipam.Subnet))
if netConfig.Ipam.Range != "" {
cephNetworks = append(cephNetworks, configOverride("global", fmt.Sprintf("%s_network", selectorKey), netConfig.Ipam.Range))
} else {
return []Option{}, errors.Errorf("empty subnet from network attachment definition %q", networkSelectors[selectorKey])
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/operator/ceph/csi/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@ import (
"strconv"

"github.com/pkg/errors"
rookclient "github.com/rook/rook/pkg/client/clientset/versioned"
controllerutil "github.com/rook/rook/pkg/operator/ceph/controller"
"github.com/rook/rook/pkg/operator/k8sutil"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/version"
"k8s.io/client-go/kubernetes"
)

func ValidateAndConfigureDrivers(clientset kubernetes.Interface, namespace, rookImage, securityAccount string, serverVersion *version.Info, ownerRef *metav1.OwnerReference) {
func ValidateAndConfigureDrivers(clientset kubernetes.Interface, rookclientset rookclient.Interface, namespace, rookImage, securityAccount string, serverVersion *version.Info, ownerRef *metav1.OwnerReference) {
if err := validateCSIVersion(clientset, namespace, rookImage, securityAccount, ownerRef); err != nil {
logger.Errorf("invalid csi version. %+v", err)
return
}

if err := startDrivers(clientset, namespace, serverVersion, ownerRef); err != nil {
if err := startDrivers(clientset, rookclientset, namespace, serverVersion, ownerRef); err != nil {
logger.Errorf("failed to start Ceph csi drivers. %v", err)
return
}
Expand Down
44 changes: 43 additions & 1 deletion pkg/operator/ceph/csi/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"time"

rookclient "github.com/rook/rook/pkg/client/clientset/versioned"
controllerutil "github.com/rook/rook/pkg/operator/ceph/controller"

"github.com/pkg/errors"
Expand Down Expand Up @@ -205,7 +206,7 @@ func ValidateCSIParam() error {
return nil
}

func startDrivers(clientset kubernetes.Interface, namespace string, ver *version.Info, ownerRef *metav1.OwnerReference) error {
func startDrivers(clientset kubernetes.Interface, rookclientset rookclient.Interface, namespace string, ver *version.Info, ownerRef *metav1.OwnerReference) error {
var (
err error
rbdPlugin, cephfsPlugin *apps.DaemonSet
Expand Down Expand Up @@ -387,6 +388,13 @@ func startDrivers(clientset kubernetes.Interface, namespace string, ver *version
// apply resource request and limit to rbdplugin containers
applyResourcesToContainers(clientset, rbdPluginResource, &rbdPlugin.Spec.Template.Spec)
k8sutil.SetOwnerRef(&rbdPlugin.ObjectMeta, ownerRef)
multusApplied, err := applyCephClusterNetworkConfig(&rbdPlugin.Spec.Template.ObjectMeta, rookclientset)
if err != nil {
return errors.Wrapf(err, "failed to apply network config to rbd plugin daemonset: %+v", rbdPlugin)
}
if multusApplied {
rbdPlugin.Spec.Template.Spec.HostNetwork = false
}
err = k8sutil.CreateDaemonSet(csiRBDPlugin, namespace, clientset, rbdPlugin)
if err != nil {
return errors.Wrapf(err, "failed to start rbdplugin daemonset: %+v", rbdPlugin)
Expand Down Expand Up @@ -415,6 +423,10 @@ func startDrivers(clientset kubernetes.Interface, namespace string, ver *version
Type: apps.RecreateDeploymentStrategyType,
}

_, err = applyCephClusterNetworkConfig(&rbdProvisionerDeployment.Spec.Template.ObjectMeta, rookclientset)
if err != nil {
return errors.Wrapf(err, "failed to apply network config to rbd plugin provisioner deployment: %+v", rbdProvisionerDeployment)
}
err = k8sutil.CreateDeployment(clientset, csiRBDProvisioner, namespace, rbdProvisionerDeployment)
if err != nil {
return errors.Wrapf(err, "failed to start rbd provisioner deployment: %+v", rbdProvisionerDeployment)
Expand All @@ -435,6 +447,13 @@ func startDrivers(clientset kubernetes.Interface, namespace string, ver *version
// apply resource request and limit to cephfs plugin containers
applyResourcesToContainers(clientset, cephFSPluginResource, &cephfsPlugin.Spec.Template.Spec)
k8sutil.SetOwnerRef(&cephfsPlugin.ObjectMeta, ownerRef)
multusApplied, err := applyCephClusterNetworkConfig(&cephfsPlugin.Spec.Template.ObjectMeta, rookclientset)
if err != nil {
return errors.Wrapf(err, "failed to apply network config to cephfs plugin daemonset: %+v", cephfsPlugin)
}
if multusApplied {
cephfsPlugin.Spec.Template.Spec.HostNetwork = false
}
err = k8sutil.CreateDaemonSet(csiCephFSPlugin, namespace, clientset, cephfsPlugin)
if err != nil {
return errors.Wrapf(err, "failed to start cephfs plugin daemonset: %+v", cephfsPlugin)
Expand Down Expand Up @@ -464,6 +483,11 @@ func startDrivers(clientset kubernetes.Interface, namespace string, ver *version
cephfsProvisionerDeployment.Spec.Strategy = apps.DeploymentStrategy{
Type: apps.RecreateDeploymentStrategyType,
}

_, err = applyCephClusterNetworkConfig(&cephfsProvisionerDeployment.Spec.Template.ObjectMeta, rookclientset)
if err != nil {
return errors.Wrapf(err, "failed to apply network config to cephfs plugin provisioner deployment: %+v", cephfsProvisionerDeployment)
}
err = k8sutil.CreateDeployment(clientset, csiCephFSProvisioner, namespace, cephfsProvisionerDeployment)
if err != nil {
return errors.Wrapf(err, "failed to start cephfs provisioner deployment: %+v", cephfsProvisionerDeployment)
Expand Down Expand Up @@ -554,6 +578,24 @@ func deleteCSIDriverResources(
return succeeded
}

func applyCephClusterNetworkConfig(objectMeta *metav1.ObjectMeta, rookclientset rookclient.Interface) (bool, error) {
cephClusters, err := rookclientset.CephV1().CephClusters(objectMeta.Namespace).List(metav1.ListOptions{})
if err != nil {
logger.Debugf("No CephClusters found in namespace %s.", objectMeta.Namespace)
return false, nil
}
for _, cephCluster := range cephClusters.Items {
if cephCluster.Spec.Network.IsMultus() {
err = k8sutil.ApplyMultus(cephCluster.Spec.Network.NetworkSpec, objectMeta)
if err != nil {
return false, err
}
}
}

return true, nil
}

// createCSIDriverInfo Registers CSI driver by creating a CSIDriver object
func createCSIDriverInfo(clientset kubernetes.Interface, name string, ownerRef *metav1.OwnerReference) error {
attach := true
Expand Down
4 changes: 3 additions & 1 deletion pkg/operator/ceph/csi/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package csi
import (
"testing"

rookclient "github.com/rook/rook/pkg/client/clientset/versioned"
"github.com/rook/rook/pkg/operator/test"

"github.com/stretchr/testify/assert"
Expand All @@ -40,10 +41,11 @@ func TestStartCSI(t *testing.T) {
SnapshotterImage: "image",
}
clientset := test.New(t, 3)
var rookclientset rookclient.Interface
serverVersion, err := clientset.Discovery().ServerVersion()
if err != nil {
assert.Nil(t, err)
}
err = startDrivers(clientset, "ns", serverVersion, nil)
err = startDrivers(clientset, rookclientset, "ns", serverVersion, nil)
assert.Nil(t, err)
}
2 changes: 1 addition & 1 deletion pkg/operator/ceph/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (o *Operator) updateDrivers() error {
return errors.Wrap(err, "invalid csi params")
}

go csi.ValidateAndConfigureDrivers(o.context.Clientset, o.operatorNamespace, o.rookImage, o.securityAccount, serverVersion, ownerRef)
go csi.ValidateAndConfigureDrivers(o.context.Clientset, o.context.RookClientset, o.operatorNamespace, o.rookImage, o.securityAccount, serverVersion, ownerRef)
return nil
}

Expand Down
20 changes: 17 additions & 3 deletions pkg/operator/k8sutil/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// PublicNetworkSelectorKeyName is the network selector key for the ceph public network
PublicNetworkSelectorKeyName = "public"
// ClusterNetworkSelectorKeyName is the network selector key for the ceph cluster network
ClusterNetworkSelectorKeyName = "cluster"
)

// NetworkAttachmentConfig represents the configuration of the NetworkAttachmentDefinitions object
type NetworkAttachmentConfig struct {
CniVersion string `json:"cniVersion"`
Expand All @@ -34,7 +41,7 @@ type NetworkAttachmentConfig struct {
Mode string `json:"mode"`
Ipam struct {
Type string `json:"type"`
Subnet string `json:"subnet"`
Range string `json:"range"`
RangeStart string `json:"rangeStart"`
RangeEnd string `json:"rangeEnd"`
Routes []struct {
Expand Down Expand Up @@ -109,7 +116,7 @@ func ApplyMultus(net rookv1.NetworkSpec, objectMeta *metav1.ObjectMeta) error {
shortSyntax := false
jsonSyntax := false

for _, ns := range net.Selectors {
for k, ns := range net.Selectors {
var multusMap map[string]string
err := json.Unmarshal([]byte(ns), &multusMap)

Expand All @@ -119,7 +126,14 @@ func ApplyMultus(net rookv1.NetworkSpec, objectMeta *metav1.ObjectMeta) error {
shortSyntax = true
}

v = append(v, string(ns))
isCsi := strings.Contains(objectMeta.Labels["app"], "csi-")
if isCsi {
if k == PublicNetworkSelectorKeyName {
v = append(v, string(ns))
}
} else {
v = append(v, string(ns))
}
}

if shortSyntax && jsonSyntax {
Expand Down
2 changes: 1 addition & 1 deletion pkg/operator/k8sutil/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,5 +152,5 @@ func TestGetNetworkAttachmentConfig(t *testing.T) {

config, err := GetNetworkAttachmentConfig(dummyNetAttachDef)
assert.NoError(t, err)
assert.Equal(t, "172.18.8.0/24", config.Ipam.Subnet)
assert.Equal(t, "172.18.8.0/24", config.Ipam.Range)
}

0 comments on commit d1054d1

Please sign in to comment.