Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
otaviof committed Aug 18, 2021
1 parent 8a00803 commit bc99e2e
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 19 deletions.
25 changes: 23 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

v1 "github.com/openshift/api/operator/v1"
"github.com/openshift/csi-driver-shared-resource/pkg/client"
"github.com/openshift/csi-driver-shared-resource/pkg/controller"
"github.com/openshift/csi-driver-shared-resource/pkg/hostpath"
)
Expand All @@ -26,6 +28,7 @@ var (
maxVolumesPerNode int64
version string
shareRelistInterval string
readOnly bool

shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
onlyOneSignalHandler = make(chan struct{})
Expand All @@ -37,7 +40,16 @@ var rootCmd = &cobra.Command{
Short: "",
Long: ``,
Run: func(cmd *cobra.Command, args []string) {
driver, err := hostpath.NewHostPathDriver(hostpath.DataRoot, hostpath.VolumeMapRoot, driverName, nodeID, endPoint, maxVolumesPerNode, version)
var kubeClient *kubernetes.Clientset
var err error
if readOnly {
if kubeClient, err = loadKubeConfig(); err != nil {
fmt.Printf("Failed to load Kubernetes API client: %s", err.Error())
os.Exit(1)
}
}

driver, err := hostpath.NewHostPathDriver(hostpath.DataRoot, hostpath.VolumeMapRoot, driverName, nodeID, endPoint, maxVolumesPerNode, version, kubeClient)
if err != nil {
fmt.Printf("Failed to initialize driver: %s", err.Error())
os.Exit(1)
Expand Down Expand Up @@ -68,6 +80,15 @@ func init() {
rootCmd.Flags().Int64Var(&maxVolumesPerNode, "maxvolumespernode", 0, "limit of volumes per node")
rootCmd.Flags().StringVar(&shareRelistInterval, "share-relist-interval", "",
"the time between controller relist on the share resource expressed with golang time.Duration syntax(default=10m")
rootCmd.Flags().BoolVar(&readOnly, "readonly", false, "read-only mode, does not modify content")
}

func loadKubeConfig() (*kubernetes.Clientset, error) {
kubeRestConfig, err := client.GetConfig()
if err != nil {
return nil, err
}
return kubernetes.NewForConfig(kubeRestConfig)
}

func runOperator() {
Expand All @@ -81,7 +102,7 @@ func runOperator() {
shareRelist = controller.DefaultResyncDuration
}
}
c, err := controller.NewController(shareRelist)
c, err := controller.NewController(shareRelist, readOnly)
if err != nil {
fmt.Printf("Failed to set up controller: %s", err.Error())
os.Exit(1)
Expand Down
9 changes: 9 additions & 0 deletions pkg/cache/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"fmt"
"strings"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -19,6 +20,14 @@ func BuildKey(namespace, name string) string {
return namespace + ":" + name
}

func SplitKey(key string) (string, string, error) {
s := strings.Split(key, ":")
if len(s) != 2 {
return "", "", fmt.Errorf("unable to split key '%s' in namespace and name", key)
}
return s[0], s[1], nil
}

func buildCallbackMap(key, value interface{}) *sync.Map {
c := &sync.Map{}
c.Store(key, value)
Expand Down
19 changes: 19 additions & 0 deletions pkg/cache/configmaps.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package cache

import (
"context"
"sync"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

sharev1alpha1 "github.com/openshift/csi-driver-shared-resource/pkg/api/sharedresource/v1alpha1"
Expand Down Expand Up @@ -57,6 +60,22 @@ func GetConfigMap(key interface{}) *corev1.ConfigMap {
return nil
}

func SetConfigMap(kubeClient *kubernetes.Clientset, sharedDataKey string) error {
ns, name, err := SplitKey(sharedDataKey)
if err != nil {
return err
}

ctx := context.TODO()
cm, err := kubeClient.CoreV1().ConfigMaps(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return err
}

UpsertConfigMap(cm)
return nil
}

func UpsertConfigMap(configmap *corev1.ConfigMap) {
key := GetKey(configmap)
klog.V(0).Infof("UpsertConfigMap key %s", key)
Expand Down
76 changes: 60 additions & 16 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -29,6 +30,9 @@ const (
type Controller struct {
kubeRestConfig *rest.Config

readOnly bool
kubeClient *kubernetes.Clientset

cfgMapWorkqueue workqueue.RateLimitingInterface
secretWorkqueue workqueue.RateLimitingInterface
shareWorkqueue workqueue.RateLimitingInterface
Expand All @@ -43,7 +47,9 @@ type Controller struct {
listers *client.Listers
}

func NewController(shareRelist time.Duration) (*Controller, error) {
// NewController instantiate a new controller with relisting interval, and optional read-only mode.
// Read only mode means the controller won't keep watching ConfigMaps and Secret for future changes.
func NewController(shareRelist time.Duration, readOnly bool) (*Controller, error) {
kubeRestConfig, err := client.GetConfig()
if err != nil {
return nil, err
Expand Down Expand Up @@ -112,46 +118,66 @@ func NewController(shareRelist time.Duration) (*Controller, error) {
shareRelist)

c := &Controller{
readOnly: readOnly,
kubeClient: kubeClient,
kubeRestConfig: kubeRestConfig,
cfgMapWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
"shared-resource-configmap-changes"),
secretWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
"shared-resource-secret-changes"),
shareWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
"shared-resource-share-changes"),
informerFactory: informerFactory,
shareInformerFactory: shareInformerFactory,
cfgMapInformer: informerFactory.Core().V1().ConfigMaps().Informer(),
secInformer: informerFactory.Core().V1().Secrets().Informer(),
shareInformer: shareInformerFactory.Sharedresource().V1alpha1().Shares().Informer(),
listers: client.GetListers(),
}

client.SetConfigMapsLister(c.informerFactory.Core().V1().ConfigMaps().Lister())
client.SetSecretsLister(c.informerFactory.Core().V1().Secrets().Lister())
if !readOnly {
c.cfgMapWorkqueue = workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), "projected-resource-configmap-changes")
c.secretWorkqueue = workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(), "projected-resource-secret-changes")
c.cfgMapInformer = informerFactory.Core().V1().ConfigMaps().Informer()
c.secInformer = informerFactory.Core().V1().Secrets().Informer()

client.SetConfigMapsLister(c.informerFactory.Core().V1().ConfigMaps().Lister())
client.SetSecretsLister(c.informerFactory.Core().V1().Secrets().Lister())
}

client.SetSharesLister(c.shareInformerFactory.Sharedresource().V1alpha1().Shares().Lister())

c.cfgMapInformer.AddEventHandler(c.configMapEventHandler())
c.secInformer.AddEventHandler(c.secretEventHandler())
if !readOnly {
c.cfgMapInformer.AddEventHandler(c.configMapEventHandler())
c.secInformer.AddEventHandler(c.secretEventHandler())
}
c.shareInformer.AddEventHandler(c.shareEventHandler())

return c, nil
}

func (c *Controller) Run(stopCh <-chan struct{}) error {
defer c.cfgMapWorkqueue.ShutDown()
defer c.secretWorkqueue.ShutDown()
if c.cfgMapWorkqueue != nil && c.secretWorkqueue != nil {
defer c.cfgMapWorkqueue.ShutDown()
defer c.secretWorkqueue.ShutDown()
}
defer c.shareWorkqueue.ShutDown()

c.informerFactory.Start(stopCh)
c.shareInformerFactory.Start(stopCh)

if !cache.WaitForCacheSync(stopCh, c.cfgMapInformer.HasSynced, c.secInformer.HasSynced, c.shareInformer.HasSynced) {
if c.cfgMapInformer != nil && !cache.WaitForCacheSync(stopCh, c.cfgMapInformer.HasSynced) {
return fmt.Errorf("failed to wait for ConfigMap informer cache to sync")
}
if c.secInformer != nil && !cache.WaitForCacheSync(stopCh, c.secInformer.HasSynced) {
return fmt.Errorf("failed to wait for Secrets informer cache to sync")
}
if !cache.WaitForCacheSync(stopCh, c.shareInformer.HasSynced) {
return fmt.Errorf("failed to wait for caches to sync")
}

go wait.Until(c.configMapEventProcessor, time.Second, stopCh)
go wait.Until(c.secretEventProcessor, time.Second, stopCh)
if c.cfgMapWorkqueue != nil {
go wait.Until(c.configMapEventProcessor, time.Second, stopCh)
}
if c.secretWorkqueue != nil {
go wait.Until(c.secretEventProcessor, time.Second, stopCh)
}
go wait.Until(c.shareEventProcessor, time.Second, stopCh)

<-stopCh
Expand Down Expand Up @@ -424,6 +450,24 @@ func (c *Controller) shareEventProcessor() {
}
}

func (c *Controller) getConfigMap(ctx context.Context, ns, name string) (*corev1.ConfigMap, error) {
opts := metav1.GetOptions{}
return c.kubeClient.CoreV1().ConfigMaps(ns).Get(ctx, name, opts)
}

func (c *Controller) getSecret(ctx context.Context, ns, name string) (*corev1.Secret, error) {
opts := metav1.GetOptions{}
return c.kubeClient.CoreV1().Secrets(ns).Get(ctx, name, opts)
}

func (c *Controller) manageBackingResource(share *sharev1alpha1.Share) {
switch share.Spec.BackingResource.Kind {
case "ConfigMap":
case "Secret":
default:
}
}

func (c *Controller) syncShare(event client.Event) error {
obj := event.Object.DeepCopyObject()
share, ok := obj.(*sharev1alpha1.Share)
Expand Down
25 changes: 24 additions & 1 deletion pkg/hostpath/hostpath.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

sharev1alpha1 "github.com/openshift/csi-driver-shared-resource/pkg/api/sharedresource/v1alpha1"
Expand All @@ -48,6 +49,8 @@ type hostPath struct {
ns *nodeServer

root string

kubeClient *kubernetes.Clientset
}

var (
Expand Down Expand Up @@ -101,7 +104,7 @@ type HostPathDriver interface {
mapVolumeToPod(hpv *hostPathVolume) error
}

func NewHostPathDriver(root, volMapRoot, driverName, nodeID, endpoint string, maxVolumesPerNode int64, version string) (*hostPath, error) {
func NewHostPathDriver(root, volMapRoot, driverName, nodeID, endpoint string, maxVolumesPerNode int64, version string, kubeClient *kubernetes.Clientset) (*hostPath, error) {
if driverName == "" {
return nil, errors.New("no driver name provided")
}
Expand Down Expand Up @@ -135,6 +138,7 @@ func NewHostPathDriver(root, volMapRoot, driverName, nodeID, endpoint string, ma
endpoint: endpoint,
maxVolumesPerNode: maxVolumesPerNode,
root: root,
kubeClient: kubeClient,
}

volMapOnDiskPath = filepath.Join(volMapRoot, VolumeMapFile)
Expand Down Expand Up @@ -524,8 +528,27 @@ func mapBackingResourceToPod(hpv *hostPathVolume) error {
return nil
}

func (hp *hostPath) populateCache(hpv *hostPathVolume) error {
klog.V(4).Infof("populating object-cache before creating mount scaffold")
switch strings.TrimSpace(hpv.GetSharedDataKind()) {
case "ConfigMap":
return objcache.SetConfigMap(hp.kubeClient, hpv.GetSharedDataKey())
case "Secret":
default:
return fmt.Errorf("invalid share backing resource kind %s", hpv.GetSharedDataKind())
}
return nil
}

func (hp *hostPath) mapVolumeToPod(hpv *hostPathVolume) error {
klog.V(4).Infof("mapVolumeToPod calling mapBackingResourceToPod")

if hp.kubeClient != nil {
if err := hp.populateCache(hpv); err != nil {
return err
}
}

err := mapBackingResourceToPod(hpv)
if err != nil {
return err
Expand Down
12 changes: 12 additions & 0 deletions pkg/hostpath/hpv.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ func (hpv *hostPathVolume) GetTargetPath() string {
defer hpv.Lock.Unlock()
return hpv.TargetPath
}

// TODO: document!
func (hpv *hostPathVolume) GetTargetFullPath() string {
hpv.Lock.Lock()
defer hpv.Lock.Unlock()

if hpv.ReadOnly {
return hpv.GetVolPathBindMountDir()
}
return hpv.GetTargetPath()
}

func (hpv *hostPathVolume) GetSharedDataKey() string {
hpv.Lock.Lock()
defer hpv.Lock.Unlock()
Expand Down

0 comments on commit bc99e2e

Please sign in to comment.