Skip to content

Commit

Permalink
Read-Only Mode
Browse files Browse the repository at this point in the history
Creating an additional flag (`--readonly`) so the object-cache is
updated just before mounting the volume. It allows the controller to
only watch OpenShift's `Share`.
  • Loading branch information
otaviof committed Aug 21, 2021
1 parent 8a00803 commit 4716365
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 36 deletions.
28 changes: 26 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

v1 "github.com/openshift/api/operator/v1"
"github.com/openshift/csi-driver-shared-resource/pkg/client"
"github.com/openshift/csi-driver-shared-resource/pkg/controller"
"github.com/openshift/csi-driver-shared-resource/pkg/hostpath"
)
Expand All @@ -26,6 +28,7 @@ var (
maxVolumesPerNode int64
version string
shareRelistInterval string
readOnly bool

shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
onlyOneSignalHandler = make(chan struct{})
Expand All @@ -37,7 +40,19 @@ var rootCmd = &cobra.Command{
Short: "",
Long: ``,
Run: func(cmd *cobra.Command, args []string) {
driver, err := hostpath.NewHostPathDriver(hostpath.DataRoot, hostpath.VolumeMapRoot, driverName, nodeID, endPoint, maxVolumesPerNode, version)
var kubeClient kubernetes.Interface
var err error

if readOnly {
fmt.Println("Read-Only mode: Loading Kubernetes client for HostPathDriver")

if kubeClient, err = loadKubeConfig(); err != nil {
fmt.Printf("Failed to load Kubernetes API client: %s", err.Error())
os.Exit(1)
}
}

driver, err := hostpath.NewHostPathDriver(hostpath.DataRoot, hostpath.VolumeMapRoot, driverName, nodeID, endPoint, maxVolumesPerNode, version, kubeClient)
if err != nil {
fmt.Printf("Failed to initialize driver: %s", err.Error())
os.Exit(1)
Expand Down Expand Up @@ -68,6 +83,15 @@ func init() {
rootCmd.Flags().Int64Var(&maxVolumesPerNode, "maxvolumespernode", 0, "limit of volumes per node")
rootCmd.Flags().StringVar(&shareRelistInterval, "share-relist-interval", "",
"the time between controller relist on the share resource expressed with golang time.Duration syntax(default=10m")
rootCmd.Flags().BoolVar(&readOnly, "readonly", false, "does not watch for resource updates")
}

// loadKubeConfig builds a Kubernetes Clientset from the local rest
// configuration (in-cluster config or kubeconfig), returning the error
// from configuration loading or client instantiation.
func loadKubeConfig() (*kubernetes.Clientset, error) {
	restCfg, cfgErr := client.GetConfig()
	if cfgErr != nil {
		return nil, cfgErr
	}
	return kubernetes.NewForConfig(restCfg)
}

func runOperator() {
Expand All @@ -81,7 +105,7 @@ func runOperator() {
shareRelist = controller.DefaultResyncDuration
}
}
c, err := controller.NewController(shareRelist)
c, err := controller.NewController(shareRelist, readOnly)
if err != nil {
fmt.Printf("Failed to set up controller: %s", err.Error())
os.Exit(1)
Expand Down
10 changes: 10 additions & 0 deletions pkg/cache/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"fmt"
"strings"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -19,6 +20,15 @@ func BuildKey(namespace, name string) string {
return namespace + ":" + name
}

// SplitKey splits a shared-data-key of the form "namespace:name" into its
// namespace and name components, returning an error when the key does not
// contain exactly two non-empty parts.
func SplitKey(key string) (string, string, error) {
	parts := strings.Split(key, ":")
	if len(parts) == 2 && parts[0] != "" && parts[1] != "" {
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unable to split key '%s' in namespace and name", key)
}

func buildCallbackMap(key, value interface{}) *sync.Map {
c := &sync.Map{}
c.Store(key, value)
Expand Down
23 changes: 22 additions & 1 deletion pkg/cache/configmaps.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package cache

import (
"context"
"sync"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

sharev1alpha1 "github.com/openshift/csi-driver-shared-resource/pkg/api/sharedresource/v1alpha1"
Expand All @@ -24,7 +27,7 @@ On the use of sync.Map, see the comments in share.go

var (
// configmaps is our global configmap id (namespace + name) to configmap map, where entries are populated from
// contorller events; it serves to facilitate quick lookup during share event processing, when the share references
// controller events; it serves to facilitate quick lookup during share event processing, when the share references
// a configmap
configmaps = sync.Map{}
// configmapUpsertCallbacks has a key of the CSI volume ID and a value of the function to be called when a given
Expand Down Expand Up @@ -57,6 +60,24 @@ func GetConfigMap(key interface{}) *corev1.ConfigMap {
return nil
}

// SetConfigMap fetches the ConfigMap identified by the shared-data-key
// (formatted as "namespace:name") directly from the Kubernetes API and
// stores it in the object cache. Returns an error when the key is
// malformed or the API request fails.
func SetConfigMap(kubeClient kubernetes.Interface, sharedDataKey string) error {
	namespace, name, err := SplitKey(sharedDataKey)
	if err != nil {
		return err
	}
	configMap, err := kubeClient.CoreV1().
		ConfigMaps(namespace).
		Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	UpsertConfigMap(configMap)
	return nil
}

func UpsertConfigMap(configmap *corev1.ConfigMap) {
key := GetKey(configmap)
klog.V(0).Infof("UpsertConfigMap key %s", key)
Expand Down
23 changes: 22 additions & 1 deletion pkg/cache/secrets.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package cache

import (
"context"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

sharev1alpha1 "github.com/openshift/csi-driver-shared-resource/pkg/api/sharedresource/v1alpha1"
Expand All @@ -25,7 +28,7 @@ On the use of sync.Map, see the comments in share.go

var (
// secrets is our global configmap id (namespace + name) to secret map, where entries are populated from
// contorller events; it serves to facilitate quick lookup during share event processing, when the share references
// controller events; it serves to facilitate quick lookup during share event processing, when the share references
// a secret
secrets = sync.Map{}
// secretUpsertCallbacks has a key of the CSI volume ID and a value of the function to be called when a given
Expand Down Expand Up @@ -58,6 +61,24 @@ func GetSecret(key interface{}) *corev1.Secret {
return nil
}

// SetSecret fetches the Secret identified by the shared-data-key
// (formatted as "namespace:name") directly from the Kubernetes API and
// stores it in the object cache. Returns an error when the key is
// malformed or the API request fails.
func SetSecret(kubeClient kubernetes.Interface, sharedDataKey string) error {
	namespace, name, err := SplitKey(sharedDataKey)
	if err != nil {
		return err
	}
	s, err := kubeClient.CoreV1().
		Secrets(namespace).
		Get(context.TODO(), name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	UpsertSecret(s)
	return nil
}

func UpsertSecret(secret *corev1.Secret) {
key := GetKey(secret)
klog.V(4).Infof("UpsertSecret key %s", key)
Expand Down
55 changes: 39 additions & 16 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const (
type Controller struct {
kubeRestConfig *rest.Config

kubeClient *kubernetes.Clientset

cfgMapWorkqueue workqueue.RateLimitingInterface
secretWorkqueue workqueue.RateLimitingInterface
shareWorkqueue workqueue.RateLimitingInterface
Expand All @@ -43,7 +45,9 @@ type Controller struct {
listers *client.Listers
}

func NewController(shareRelist time.Duration) (*Controller, error) {
// NewController instantiates a new controller with a relisting interval, and optional read-only mode.
// Read-only mode means the controller won't keep watching ConfigMaps and Secrets for future changes.
func NewController(shareRelist time.Duration, readOnly bool) (*Controller, error) {
kubeRestConfig, err := client.GetConfig()
if err != nil {
return nil, err
Expand Down Expand Up @@ -112,46 +116,65 @@ func NewController(shareRelist time.Duration) (*Controller, error) {
shareRelist)

c := &Controller{
kubeClient: kubeClient,
kubeRestConfig: kubeRestConfig,
cfgMapWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
"shared-resource-configmap-changes"),
secretWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
"shared-resource-secret-changes"),
shareWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
"shared-resource-share-changes"),
informerFactory: informerFactory,
shareInformerFactory: shareInformerFactory,
cfgMapInformer: informerFactory.Core().V1().ConfigMaps().Informer(),
secInformer: informerFactory.Core().V1().Secrets().Informer(),
shareInformer: shareInformerFactory.Sharedresource().V1alpha1().Shares().Informer(),
listers: client.GetListers(),
}

client.SetConfigMapsLister(c.informerFactory.Core().V1().ConfigMaps().Lister())
client.SetSecretsLister(c.informerFactory.Core().V1().Secrets().Lister())
if !readOnly {
c.cfgMapWorkqueue = workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), "shared-resource-configmap-changes")
c.secretWorkqueue = workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), "shared-resource-secret-changes")
c.cfgMapInformer = informerFactory.Core().V1().ConfigMaps().Informer()
c.secInformer = informerFactory.Core().V1().Secrets().Informer()

client.SetConfigMapsLister(c.informerFactory.Core().V1().ConfigMaps().Lister())
client.SetSecretsLister(c.informerFactory.Core().V1().Secrets().Lister())
}

client.SetSharesLister(c.shareInformerFactory.Sharedresource().V1alpha1().Shares().Lister())

c.cfgMapInformer.AddEventHandler(c.configMapEventHandler())
c.secInformer.AddEventHandler(c.secretEventHandler())
if !readOnly {
c.cfgMapInformer.AddEventHandler(c.configMapEventHandler())
c.secInformer.AddEventHandler(c.secretEventHandler())
}
c.shareInformer.AddEventHandler(c.shareEventHandler())

return c, nil
}

func (c *Controller) Run(stopCh <-chan struct{}) error {
defer c.cfgMapWorkqueue.ShutDown()
defer c.secretWorkqueue.ShutDown()
if c.cfgMapWorkqueue != nil && c.secretWorkqueue != nil {
defer c.cfgMapWorkqueue.ShutDown()
defer c.secretWorkqueue.ShutDown()
}
defer c.shareWorkqueue.ShutDown()

c.informerFactory.Start(stopCh)
c.shareInformerFactory.Start(stopCh)

if !cache.WaitForCacheSync(stopCh, c.cfgMapInformer.HasSynced, c.secInformer.HasSynced, c.shareInformer.HasSynced) {
if c.cfgMapInformer != nil && !cache.WaitForCacheSync(stopCh, c.cfgMapInformer.HasSynced) {
return fmt.Errorf("failed to wait for ConfigMap informer cache to sync")
}
if c.secInformer != nil && !cache.WaitForCacheSync(stopCh, c.secInformer.HasSynced) {
return fmt.Errorf("failed to wait for Secrets informer cache to sync")
}
if !cache.WaitForCacheSync(stopCh, c.shareInformer.HasSynced) {
return fmt.Errorf("failed to wait for caches to sync")
}

go wait.Until(c.configMapEventProcessor, time.Second, stopCh)
go wait.Until(c.secretEventProcessor, time.Second, stopCh)
if c.cfgMapWorkqueue != nil {
go wait.Until(c.configMapEventProcessor, time.Second, stopCh)
}
if c.secretWorkqueue != nil {
go wait.Until(c.secretEventProcessor, time.Second, stopCh)
}
go wait.Until(c.shareEventProcessor, time.Second, stopCh)

<-stopCh
Expand Down
40 changes: 39 additions & 1 deletion pkg/hostpath/hostpath.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

sharev1alpha1 "github.com/openshift/csi-driver-shared-resource/pkg/api/sharedresource/v1alpha1"
Expand All @@ -48,6 +49,10 @@ type hostPath struct {
ns *nodeServer

root string

// kubeClient optional clientset, when informed the driver will employ it to update the cache
// based on the Share's backing-resource.
kubeClient kubernetes.Interface
}

var (
Expand Down Expand Up @@ -101,7 +106,10 @@ type HostPathDriver interface {
mapVolumeToPod(hpv *hostPathVolume) error
}

func NewHostPathDriver(root, volMapRoot, driverName, nodeID, endpoint string, maxVolumesPerNode int64, version string) (*hostPath, error) {
// NewHostPathDriver instantiates the HostPathDriver with the driver details. Optionally, a
// Kubernetes Clientset can be informed to update (warm up) the object cache before creating the
// volume (and its data) for mounting on the incoming pod.
func NewHostPathDriver(root, volMapRoot, driverName, nodeID, endpoint string, maxVolumesPerNode int64, version string, kubeClient kubernetes.Interface) (*hostPath, error) {
if driverName == "" {
return nil, errors.New("no driver name provided")
}
Expand All @@ -128,13 +136,18 @@ func NewHostPathDriver(root, volMapRoot, driverName, nodeID, endpoint string, ma
klog.Infof("Driver: %v ", driverName)
klog.Infof("Version: %s", vendorVersion)

if kubeClient != nil {
klog.Info("HostPathDriver will directly read Kubernetes resources!")
}

hp := &hostPath{
name: driverName,
version: vendorVersion,
nodeID: nodeID,
endpoint: endpoint,
maxVolumesPerNode: maxVolumesPerNode,
root: root,
kubeClient: kubeClient,
}

volMapOnDiskPath = filepath.Join(volMapRoot, VolumeMapFile)
Expand Down Expand Up @@ -524,8 +537,33 @@ func mapBackingResourceToPod(hpv *hostPathVolume) error {
return nil
}

// updateObjCache warms the object-cache with the share's backing resource
// (ConfigMap or Secret) just before the volume is mounted, fetching it via
// the driver's Kubernetes client. An unrecognized resource kind is an error.
func (hp *hostPath) updateObjCache(hpv *hostPathVolume) error {
	kind := hpv.GetSharedDataKind()
	key := hpv.GetSharedDataKey()
	klog.V(4).Infof("populating object-cache with '%s' (key='%s') before mounting", kind, key)
	// comparison tolerates surrounding whitespace, but the error message
	// reports the kind exactly as stored on the volume
	trimmed := strings.TrimSpace(kind)
	if trimmed == "ConfigMap" {
		return objcache.SetConfigMap(hp.kubeClient, key)
	}
	if trimmed == "Secret" {
		return objcache.SetSecret(hp.kubeClient, key)
	}
	return fmt.Errorf("invalid share backing resource kind %s", kind)
}

func (hp *hostPath) mapVolumeToPod(hpv *hostPathVolume) error {
klog.V(4).Infof("mapVolumeToPod calling mapBackingResourceToPod")

// given the kubeclient is instantiated, it will use it to fetch the resources just before
	// mounting the volume on the pod; otherwise, it's expected the object-cache already contains
	// the resource in question
if hp.kubeClient != nil {
if err := hp.updateObjCache(hpv); err != nil {
return err
}
}

err := mapBackingResourceToPod(hpv)
if err != nil {
return err
Expand Down

0 comments on commit 4716365

Please sign in to comment.