Merge pull request #77 from gabemontero/redo-63
BUILD-382: fix umount issue, cross-volume corruption issue; complete read-only on restart support
openshift-merge-robot committed Dec 1, 2021
2 parents 32fffa0 + 88514b5 commit b34ea57
Showing 29 changed files with 1,178 additions and 793 deletions.
35 changes: 25 additions & 10 deletions cmd/main.go
@@ -14,6 +14,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

sharev1clientset "github.com/openshift/client-go/sharedresource/clientset/versioned"
"github.com/openshift/csi-driver-shared-resource/pkg/client"
"github.com/openshift/csi-driver-shared-resource/pkg/config"
"github.com/openshift/csi-driver-shared-resource/pkg/controller"
@@ -41,7 +42,6 @@ var rootCmd = &cobra.Command{
Short: "",
Long: ``,
Run: func(cmd *cobra.Command, args []string) {
var kubeClient kubernetes.Interface
var err error

cfgManager := config.NewManager(cfgFilePath)
@@ -52,12 +52,20 @@
}

if !cfg.RefreshResources {
fmt.Println("Refresh-Resources disabled, loading a Kubernetes client for HostPathDriver")
fmt.Println("Refresh-Resources disabled")

if kubeClient, err = loadKubernetesClientset(); err != nil {
fmt.Printf("Failed to load Kubernetes API client: %s", err.Error())
os.Exit(1)
}
}
if kubeClient, err := loadKubernetesClientset(); err != nil {
fmt.Printf("Failed to load Kubernetes API client: %s", err.Error())
os.Exit(1)
} else {
client.SetClient(kubeClient)
}
if shareClient, err := loadSharedresourceClientset(); err != nil {
fmt.Printf("Failed to load SharedResource API client: %s", err.Error())
os.Exit(1)
} else {
client.SetShareClient(shareClient)
}

driver, err := hostpath.NewHostPathDriver(
@@ -68,14 +76,13 @@
endPoint,
maxVolumesPerNode,
version,
kubeClient,
)
if err != nil {
fmt.Printf("Failed to initialize driver: %s", err.Error())
os.Exit(1)
}

go runOperator(cfg)
go runOperator(cfg, driver)
go watchForConfigChanges(cfgManager)
driver.Run()
},
@@ -110,10 +117,18 @@ func loadKubernetesClientset() (kubernetes.Interface, error) {
return kubernetes.NewForConfig(kubeRestConfig)
}

func loadSharedresourceClientset() (sharev1clientset.Interface, error) {
kubeRestConfig, err := client.GetConfig()
if err != nil {
return nil, err
}
return sharev1clientset.NewForConfig(kubeRestConfig)
}

// runOperator spawns and runs the Controller based on the provided configuration, until
// trapping OS signals.
func runOperator(cfg *config.Config) {
c, err := controller.NewController(cfg.GetShareRelistInterval(), cfg.RefreshResources, cfg.IgnoredNamespaces)
func runOperator(cfg *config.Config, hp hostpath.HostPathDriver) {
c, err := controller.NewController(cfg.GetShareRelistInterval(), cfg.RefreshResources, cfg.IgnoredNamespaces, hp)
if err != nil {
fmt.Printf("Failed to set up controller: %s", err.Error())
os.Exit(1)
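
For context, the commit moves clientset construction out of the HostPathDriver constructor and behind package-level setters (client.SetClient / client.SetShareClient), so the driver and the controller share the same clients. Below is a minimal, self-contained sketch of that wiring, not the repository's actual code: the holder variables and setters here are illustrative stand-ins for pkg/client, and rest.InClusterConfig stands in for client.GetConfig.

package main

import (
	"fmt"
	"os"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"

	sharev1clientset "github.com/openshift/client-go/sharedresource/clientset/versioned"
)

// Package-level holders, analogous to the repository's pkg/client package.
var (
	kubeClient  kubernetes.Interface
	shareClient sharev1clientset.Interface
)

func setClient(c kubernetes.Interface)            { kubeClient = c }
func setShareClient(c sharev1clientset.Interface) { shareClient = c }

func main() {
	// Load one rest.Config; both clientsets are built from it.
	restConfig, err := rest.InClusterConfig()
	if err != nil {
		fmt.Printf("Failed to load rest config: %s", err.Error())
		os.Exit(1)
	}
	kc, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		fmt.Printf("Failed to load Kubernetes API client: %s", err.Error())
		os.Exit(1)
	}
	setClient(kc)
	sc, err := sharev1clientset.NewForConfig(restConfig)
	if err != nil {
		fmt.Printf("Failed to load SharedResource API client: %s", err.Error())
		os.Exit(1)
	}
	setShareClient(sc)
	// The driver and controller now fetch these via accessors rather than
	// receiving a kubeClient through their constructors.
}
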
2 changes: 1 addition & 1 deletion go.mod
@@ -14,7 +14,7 @@ require (
github.com/spf13/pflag v1.0.5
golang.org/x/net v0.0.0-20210520170846-37e1c6afe023
google.golang.org/grpc v1.31.0
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.22.1
k8s.io/apimachinery v0.22.1
k8s.io/client-go v0.22.1
93 changes: 21 additions & 72 deletions pkg/cache/configmaps.go
@@ -1,15 +1,13 @@
package cache

import (
"context"
"sync"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"

sharev1alpha1 "github.com/openshift/api/sharedresource/v1alpha1"
"github.com/openshift/csi-driver-shared-resource/pkg/client"
)

/*
@@ -26,10 +24,6 @@ On the use of sync.Map, see the comments in share.go
*/

var (
// configmaps is our global configmap id (namespace + name) to configmap map, where entries are populated from
// controller events; it serves to facilitate quick lookup during share event processing, when the share references
// a configmap
configmaps = sync.Map{}
// configmapUpsertCallbacks has a key of the CSI volume ID and a value of the function to be called when a given
// configmap is updated, assuming the driver has mounted a share CSI volume with the configmap in a pod somewhere, and
// the corresponding storage on the pod gets updated by the function that is the value of the entry. Otherwise,
@@ -39,93 +33,48 @@ var (
// same thing as configmapUpsertCallbacks, but for deletion of configmaps; of course the controller relist does not
// come into play here.
configmapDeleteCallbacks = sync.Map{}
// configmapsWithShares is a filtered list of configmaps where, via share events, we know at least one active share references
// a given configmap; when possible we range over this list vs. configsmaps
configmapsWithShares = sync.Map{}
// sharesWaitingOnConfigmaps conversely is for when a share has been created that references a configmap, but that
// configmap has not been recognized by the controller; quite possibly timing events on when we learn of sharedConfigMaps
// and configmaps if they happen to be created at roughly the same time come into play; also, if a pod with a share
// pointing to a configmap has been provisioned, but the csi driver daemonset has been restarted, such timing
// of events where we learn of sharedConfigMaps before their configmaps can also occur, as we attempt to rebuild the CSI driver's
// state
sharesWaitingOnConfigmaps = sync.Map{}
)

// GetConfigMap retrieves a config map from the list of config maps referenced by SharedConfigMaps
func GetConfigMap(key interface{}) *corev1.ConfigMap {
obj, loaded := configmapsWithShares.Load(key)
if loaded {
cm, _ := obj.(*corev1.ConfigMap)
return cm
}
return nil
}

// SetConfigMap based on the shared-data-key, which contains the resource's namespace and name, this
// method can fetch and store it on cache. This method is called when the controller is not watching
// config maps, and the CSI driver must retrieve the config map when processing a NodePublishVolume call
// from the kubelet.
func SetConfigMap(kubeClient kubernetes.Interface, sharedDataKey string) error {
ns, name, err := SplitKey(sharedDataKey)
if err != nil {
return err
}

cm, err := kubeClient.CoreV1().ConfigMaps(ns).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}

UpsertConfigMap(cm)
return nil
}

// UpsertConfigMap adds or updates the config map as needed in our various maps correlating it with SharedConfigMaps and
// calls registered upsert callbacks
func UpsertConfigMap(configmap *corev1.ConfigMap) {
key := GetKey(configmap)
klog.V(6).Infof("UpsertConfigMap key %s", key)
configmaps.Store(key, configmap)
// in case share arrived before configmap
processSharesWithoutConfigmaps := []string{}
sharesWaitingOnConfigmaps.Range(func(key, value interface{}) bool {
shareKey := key.(string)
share := value.(*sharev1alpha1.SharedConfigMap)
br := share.Spec.ConfigMapRef
configmapKey := BuildKey(br.Namespace, br.Name)
configmapsWithShares.Store(configmapKey, configmap)
//NOTE: share update ranger will store share in sharedConfigMaps sync.Map
// and we are supplying only this specific share to the csi driver update range callbacks.
shareConfigMapsUpdateCallbacks.Range(buildRanger(buildCallbackMap(share.Name, share)))
processSharesWithoutConfigmaps = append(processSharesWithoutConfigmaps, shareKey)
return true
})
for _, shareKey := range processSharesWithoutConfigmaps {
sharesWaitingOnConfigmaps.Delete(shareKey)
// first, find the shares pointing to this configmap, and call the callbacks, in case certain pods
// have had their permissions revoked; this will also handle if we had share events arrive before
// the corresponding configmap
sharedConfigMapList, err := client.GetListers().SharedConfigMaps.List(labels.Everything())
if err != nil {
klog.Warningf("error listing shared configmaps during UpsertConfigMap: %s", err.Error())
}
for _, share := range sharedConfigMapList {
if share.Spec.ConfigMapRef.Namespace == configmap.Namespace && share.Spec.ConfigMapRef.Name == configmap.Name {
shareConfigMapsUpdateCallbacks.Range(buildRanger(buildCallbackMap(share.Name, share)))
}
}
// otherwise process any share that arrived after the configmap
configmapsWithShares.Store(key, configmap)
configmapUpsertCallbacks.Range(buildRanger(buildCallbackMap(key, configmap)))
}

// DelConfigMap deletes this config map from the various configmap related maps
func DelConfigMap(configmap *corev1.ConfigMap) {
key := GetKey(configmap)
klog.V(4).Infof("DelConfigMap key %s", key)
configmaps.Delete(key)
configmapDeleteCallbacks.Range(buildRanger(buildCallbackMap(key, configmap)))
configmapsWithShares.Delete(key)
}

// RegisterConfigMapUpsertCallback will be called as part of the kubelet sending a mount CSI volume request for a pod;
// if the corresponding share references a configmap, then the function registered here will be called to possibly change
// storage
func RegisterConfigMapUpsertCallback(volID string, f func(key, value interface{}) bool) {
func RegisterConfigMapUpsertCallback(volID, cmID string, f func(key, value interface{}) bool) {
configmapUpsertCallbacks.Store(volID, f)
// cycle through the configmaps with sharedConfigMaps, where if the share associated with the volID CSI volume mount references
// one of the configmaps provided by the Range, the storage of the corresponding data on the pod will be completed using
// the supplied function
configmapsWithShares.Range(f)
ns, name, _ := SplitKey(cmID)
cm := client.GetConfigMap(ns, name)
if cm != nil {
f(BuildKey(cm.Namespace, cm.Name), cm)
} else {
klog.Warningf("not found on get configmap for %s vol %s", cmID, volID)
}
}

// UnregisterConfigMapUpsertCallback will be called as part of the kubelet sending a delete CSI volume request for a pod
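
To make the new flow in UpsertConfigMap concrete, here is a minimal, self-contained sketch of the lister-driven matching that replaces the configmaps / configmapsWithShares / sharesWaitingOnConfigmaps maps; the types and callback below are simplified stand-ins for the real API objects and range machinery, not the repository's code.

package main

import "fmt"

// Simplified stand-ins for the sharedresource API types used in the diff.
type configMapRef struct{ namespace, name string }

type sharedConfigMap struct {
	name string
	ref  configMapRef
}

type configMap struct{ namespace, name string }

// upsertConfigMap mirrors the new logic: list every SharedConfigMap and fire
// the share update callback for each one that references the incoming
// configmap, instead of parking shares in a sharesWaitingOnConfigmaps map.
func upsertConfigMap(cm configMap, shares []sharedConfigMap, onShareUpdate func(sharedConfigMap)) {
	for _, share := range shares {
		if share.ref.namespace == cm.namespace && share.ref.name == cm.name {
			onShareUpdate(share)
		}
	}
}

func main() {
	shares := []sharedConfigMap{
		{name: "share-a", ref: configMapRef{namespace: "ns1", name: "cm1"}},
		{name: "share-b", ref: configMapRef{namespace: "ns2", name: "cm2"}},
	}
	// Only share-a references ns1/cm1, so only its callback fires.
	upsertConfigMap(configMap{namespace: "ns1", name: "cm1"}, shares, func(s sharedConfigMap) {
		fmt.Printf("refreshing mounted volumes for share %s\n", s.name)
	})
}
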
96 changes: 23 additions & 73 deletions pkg/cache/secrets.go
@@ -1,15 +1,13 @@
package cache

import (
"context"
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"

sharev1alpha1 "github.com/openshift/api/sharedresource/v1alpha1"
corev1 "k8s.io/api/core/v1"
"github.com/openshift/csi-driver-shared-resource/pkg/client"
)

/*
@@ -27,10 +25,6 @@ On the use of sync.Map, see the comments in share.go
*/

var (
// secrets is our global secret id (namespace + name) to secret map, where entries are populated from
// controller events; it serves to facilitate quick lookup during share event processing, when the share references
// a secret
secrets = sync.Map{}
// secretUpsertCallbacks has a key of the CSI volume ID and a value of the function to be called when a given
// secret is updated, assuming the driver has mounted a share CSI volume with the secret in a pod somewhere, and
// the corresponding storage on the pod gets updated by the function that is the value of the entry. Otherwise,
@@ -40,93 +34,49 @@ var (
// same thing as secretUpsertCallbacks, but for deletion of secrets; of course the controller relist does not
// come into play here.
secretDeleteCallbacks = sync.Map{}
// secretsWithShare is a filtered list of secrets where, via share events, we know at least one active share references
// a given secret; when possible we range over this list vs. secrets
secretsWithShare = sync.Map{}
// sharesWaitingOnSecrets conversely is for when a share has been created that references a secret, but that
// secret has not been recognized by the controller; quite possibly timing events on when we learn of sharedSecrets
// and secrets if they happen to be created at roughly the same time come into play; also, if a pod with a share
// pointing to a secret has been provisioned, but the csi driver daemonset has been restarted, such timing
// of events where we learn of sharedSecrets before their secrets can also occur, as we attempt to rebuild the CSI driver's
// state
sharesWaitingOnSecrets = sync.Map{}
)

// GetSecret retrieves a secret from the list of secrets referenced by SharedSecrets
func GetSecret(key interface{}) *corev1.Secret {
obj, loaded := secretsWithShare.Load(key)
if loaded {
s, _ := obj.(*corev1.Secret)
return s
}
return nil
}

// SetSecret based on the shared-data-key, which contains the resource's namespace and name, this
// method can fetch and store it on cache. This method is called when the controller is not watching
// secrets, and the CSI driver must retrieve the secret when processing a NodePublishVolume call
// from the kubelet.
func SetSecret(kubeClient kubernetes.Interface, sharedDataKey string) error {
ns, name, err := SplitKey(sharedDataKey)
if err != nil {
return err
}

secret, err := kubeClient.CoreV1().Secrets(ns).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}

UpsertSecret(secret)
return nil
}

// UpsertSecret adds or updates the secret as needed in our various maps correlating it with SharedSecrets and
// calls registered upsert callbacks
func UpsertSecret(secret *corev1.Secret) {
key := GetKey(secret)
klog.V(6).Infof("UpsertSecret key %s", key)
secrets.Store(key, secret)
// in case share arrived before secret
processedSharesWithoutSecrets := []string{}
sharesWaitingOnSecrets.Range(func(key, value interface{}) bool {
shareKey := key.(string)
share := value.(*sharev1alpha1.SharedSecret)
br := share.Spec.SecretRef
secretKey := BuildKey(br.Namespace, br.Name)
secretsWithShare.Store(secretKey, secret)
//NOTE: share update ranger will store share in sharedSecrets sync.Map
// and we are supplying only this specific share to the csi driver update range callbacks.
shareSecretsUpdateCallbacks.Range(buildRanger(buildCallbackMap(share.Name, share)))
processedSharesWithoutSecrets = append(processedSharesWithoutSecrets, shareKey)
return true
})
for _, shareKey := range processedSharesWithoutSecrets {
sharesWaitingOnSecrets.Delete(shareKey)
// first, find the shares pointing to this secret, and call the callbacks, in case certain pods
// have had their permissions revoked; this will also handle if we had share events arrive before
// the corresponding secret
sharedSecretsList, err := client.GetListers().SharedSecrets.List(labels.Everything())
if err != nil {
klog.Warningf("error during UpsertSecret on shared secrets lister list: %s", err.Error())
}
for _, share := range sharedSecretsList {
if share.Spec.SecretRef.Namespace == secret.Namespace && share.Spec.SecretRef.Name == secret.Name {
shareSecretsUpdateCallbacks.Range(buildRanger(buildCallbackMap(share.Name, share)))
}
}

// otherwise process any share that arrived after the secret
secretsWithShare.Store(key, secret)
secretUpsertCallbacks.Range(buildRanger(buildCallbackMap(key, secret)))
}

// DelSecret deletes this secret from the various secret related maps
func DelSecret(secret *corev1.Secret) {
key := GetKey(secret)
klog.V(4).Infof("DelSecret key %s", key)
secrets.Delete(key)
secretDeleteCallbacks.Range(buildRanger(buildCallbackMap(key, secret)))
secretsWithShare.Delete(key)
}

// RegisterSecretUpsertCallback will be called as part of the kubelet sending a mount CSI volume request for a pod;
// if the corresponding share references a secret, then the function registered here will be called to possibly change
// storage
func RegisterSecretUpsertCallback(volID string, f func(key, value interface{}) bool) {
func RegisterSecretUpsertCallback(volID, sID string, f func(key, value interface{}) bool) {
secretUpsertCallbacks.Store(volID, f)
// cycle through the secrets with sharedSecrets, where if the share associated with the volID CSI volume mount references
// one of the secrets provided by the Range, the storage of the corresponding data on the pod will be completed using
// the supplied function
secretsWithShare.Range(f)
ns, name, _ := SplitKey(sID)
s := client.GetSecret(ns, name)
if s != nil {
f(BuildKey(s.Namespace, s.Name), s)
} else {
klog.Warningf("not found on secret with key %s vol %s", sID, volID)
}
}

// UnregisterSecretUpsertCallback will be called as part of the kubelet sending a delete CSI volume request for a pod
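
The secret-side registration change follows the same shape; here is a hedged sketch of the targeted lookup RegisterSecretUpsertCallback now performs instead of ranging over every cached secret. splitKey and the lookup function are simplified stand-ins for the repository's SplitKey and the lister-backed client.GetSecret.

package main

import (
	"fmt"
	"strings"
)

type secret struct{ namespace, name string }

// splitKey mirrors cache.SplitKey: "namespace/name" -> (namespace, name).
func splitKey(key string) (string, string, error) {
	parts := strings.SplitN(key, "/", 2)
	if len(parts) != 2 {
		return "", "", fmt.Errorf("unexpected key format: %s", key)
	}
	return parts[0], parts[1], nil
}

// registerSecretUpsertCallback sketches the new flow: look up exactly the
// secret the share references and, if present, push its contents into the
// freshly mounted CSI volume via the supplied callback f.
func registerSecretUpsertCallback(volID, sID string, getSecret func(ns, name string) *secret, f func(key string, s *secret) bool) {
	ns, name, err := splitKey(sID)
	if err != nil {
		fmt.Printf("bad secret key for volume %s: %s\n", volID, err.Error())
		return
	}
	if s := getSecret(ns, name); s != nil {
		f(sID, s)
	} else {
		fmt.Printf("secret %s not found for volume %s\n", sID, volID)
	}
}

func main() {
	// A toy lookup standing in for the informer-lister-backed client.GetSecret.
	lookup := func(ns, name string) *secret {
		if ns == "ns1" && name == "token" {
			return &secret{namespace: ns, name: name}
		}
		return nil
	}
	registerSecretUpsertCallback("vol-123", "ns1/token", lookup, func(key string, s *secret) bool {
		fmt.Printf("writing secret %s into volume storage\n", key)
		return true
	})
}
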
