Permalink
Comparing changes
Open a pull request
- 2 commits
- 6 files changed
- 0 commit comments
- 1 contributor
Unified
Split
Showing
with
210 additions
and 97 deletions.
- +16 −3 cmd/controller/controller.go
- +1 −1 manifests/controller/cdi-controller-deployment.yaml
- +14 −8 pkg/common/common.go
- +124 −42 pkg/controller/controller.go
- +12 −18 pkg/controller/controller_test.go
- +43 −25 pkg/controller/util.go
| @@ -8,6 +8,7 @@ import ( | ||
| "github.com/golang/glog" | ||
| "github.com/kubevirt/containerized-data-importer/pkg/common" | ||
| "github.com/kubevirt/containerized-data-importer/pkg/controller" | ||
| "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "k8s.io/client-go/informers" | ||
| "k8s.io/client-go/kubernetes" | ||
| "k8s.io/client-go/tools/clientcmd" | ||
| @@ -17,6 +18,7 @@ var ( | ||
| configPath string | ||
| masterURL string | ||
| importerImage string | ||
| pullPolicy string | ||
| ) | ||
| // The optional importer image is obtained here along with the supported flags. | ||
| @@ -30,11 +32,16 @@ func init() { | ||
| flag.StringVar(&configPath, "kubeconfig", os.Getenv("KUBECONFIG"), "(Optional) Overrides $KUBECONFIG") | ||
| flag.StringVar(&masterURL, "server", "", "(Optional) URL address of a remote api server. Do not set for local clusters.") | ||
| flag.Parse() | ||
| // env variables | ||
| importerImage = os.Getenv(IMPORTER_IMAGE) | ||
| if importerImage == "" { | ||
| importerImage = common.IMPORTER_DEFAULT_IMAGE | ||
| } | ||
| pullPolicy = common.IMPORTER_DEFAULT_PULL_POLICY | ||
| if pp := os.Getenv(common.IMPORTER_PULL_POLICY); len(pp) != 0 { | ||
| pullPolicy = pp | ||
| } | ||
| glog.Infof("init: complete: CDI controller will create the %q version of the importer\n", importerImage) | ||
| } | ||
| @@ -47,10 +54,16 @@ func main() { | ||
| if err != nil { | ||
| glog.Fatalf("Error getting kube client: %v\n", err) | ||
| } | ||
| informerFactory := informers.NewSharedInformerFactory(client, common.DEFAULT_RESYNC_PERIOD) | ||
| pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims().Informer() | ||
| cdiController, err := controller.NewController(client, pvcInformer, importerImage) | ||
| pvcInformerFactory := informers.NewSharedInformerFactory(client, common.DEFAULT_RESYNC_PERIOD) | ||
| podInformerFactory := informers.NewFilteredSharedInformerFactory(client, common.DEFAULT_RESYNC_PERIOD, "", func(options *v1.ListOptions) { | ||
| options.LabelSelector = common.CDI_LABEL_SELECTOR | ||
| }) | ||
| pvcInformer := pvcInformerFactory.Core().V1().PersistentVolumeClaims().Informer() | ||
| podInformer := podInformerFactory.Core().V1().Pods().Informer() | ||
| cdiController, err := controller.NewController(client, pvcInformer, podInformer, importerImage, pullPolicy) | ||
| if err != nil { | ||
| glog.Fatal("Error creating CDI controller: %v", err) | ||
| } | ||
| @@ -17,4 +17,4 @@ spec: | ||
| containers: | ||
| - name: cdi-controller | ||
| image: kubevirt/cdi-controller:0.4.0-alpha.0 | ||
| imagePullPolicy: Always | ||
| imagePullPolicy: IfNotPresent | ||
| @@ -1,17 +1,21 @@ | ||
| package common | ||
| import "time" | ||
| import ( | ||
| "time" | ||
| "k8s.io/api/core/v1" | ||
| ) | ||
| // Common types and constants used by the importer and controller. | ||
| // TODO: maybe the vm cloner can use these common values | ||
| const ( | ||
| CDI_VERSION = "0.4.0-alpha.0" | ||
| CDI_VERSION = "0.4.0-alpha.0" | ||
| IMPORTER_DEFAULT_IMAGE = "docker.io/kubevirt/cdi-importer:" + CDI_VERSION | ||
| CDI_LABEL_KEY = "app" | ||
| CDI_LABEL_VALUE = "containerized-data-importer" | ||
| CDI_LABEL_SELECTOR = CDI_LABEL_KEY + "=" + CDI_LABEL_VALUE | ||
| CDI_LABEL_KEY = "app" | ||
| CDI_LABEL_VALUE = "containerized-data-importer" | ||
| CDI_LABEL_SELECTOR = CDI_LABEL_KEY + "=" + CDI_LABEL_VALUE | ||
| // host file constants: | ||
| IMPORTER_WRITE_DIR = "/data" | ||
| @@ -21,10 +25,12 @@ const ( | ||
| IMPORTER_PODNAME = "importer" | ||
| IMPORTER_DATA_DIR = "/data" | ||
| IMPORTER_S3_HOST = "s3.amazonaws.com" | ||
| IMPORTER_DEFAULT_PULL_POLICY = string(v1.PullIfNotPresent) | ||
| // env var names | ||
| IMPORTER_ENDPOINT = "IMPORTER_ENDPOINT" | ||
| IMPORTER_ACCESS_KEY_ID = "IMPORTER_ACCESS_KEY_ID" | ||
| IMPORTER_SECRET_KEY = "IMPORTER_SECRET_KEY" | ||
| IMPORTER_PULL_POLICY = "IMPORTER_PULL_POLICY" | ||
| IMPORTER_ENDPOINT = "IMPORTER_ENDPOINT" | ||
| IMPORTER_ACCESS_KEY_ID = "IMPORTER_ACCESS_KEY_ID" | ||
| IMPORTER_SECRET_KEY = "IMPORTER_SECRET_KEY" | ||
| // key names expected in credential secret | ||
| KeyAccess = "accessKeyId" | ||
| KeySecret = "secretKey" | ||
| @@ -7,6 +7,7 @@ import ( | ||
| "github.com/golang/glog" | ||
| "github.com/kubevirt/containerized-data-importer/pkg/common" | ||
| "k8s.io/api/core/v1" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "k8s.io/apimachinery/pkg/util/wait" | ||
| "k8s.io/client-go/kubernetes" | ||
| "k8s.io/client-go/tools/cache" | ||
| @@ -20,39 +21,58 @@ const ( | ||
| AnnImportPod = "kubevirt.io/storage.import.importPodName" | ||
| // importer pod annotations | ||
| AnnCreatedBy = "kubevirt.io/storage.createdByController" | ||
| AnnPodPhase = "kubevirt.io/storage.import.pod.phase" | ||
| ) | ||
| type Controller struct { | ||
| clientset kubernetes.Interface | ||
| queue workqueue.RateLimitingInterface | ||
| pvcInformer cache.SharedIndexInformer | ||
| importerImage string | ||
| clientset kubernetes.Interface | ||
| pvcQueue, podQueue workqueue.RateLimitingInterface | ||
| pvcInformer, podInformer cache.SharedIndexInformer | ||
| importerImage string | ||
| pullPolicy string // Options: IfNotPresent, Always, or Never | ||
| } | ||
| func NewController(client kubernetes.Interface, pvcInformer cache.SharedIndexInformer, importerImage string) (*Controller, error) { | ||
| queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) | ||
| func NewController(client kubernetes.Interface, pvcInformer, podInformer cache.SharedIndexInformer, importerImage string, pullPolicy string) (*Controller, error) { | ||
| c := &Controller{ | ||
| clientset: client, | ||
| queue: queue, | ||
| pvcQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), | ||
| podQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), | ||
| pvcInformer: pvcInformer, | ||
| podInformer: podInformer, | ||
| importerImage: importerImage, | ||
| pullPolicy: pullPolicy, | ||
| } | ||
| // Bind the Index/Informer to the queue only for new pvcs | ||
| // Bind the pvc SharedIndexInformer to the pvc queue | ||
| c.pvcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||
| AddFunc: func(obj interface{}) { | ||
| key, err := cache.MetaNamespaceKeyFunc(obj) | ||
| if err == nil { | ||
| queue.AddRateLimited(key) | ||
| c.pvcQueue.AddRateLimited(key) | ||
| } | ||
| }, | ||
| // this is triggered by an update or it will also be | ||
| // be triggered periodically even if no changes were made. | ||
| UpdateFunc: func(old, new interface{}) { | ||
| key, err := cache.MetaNamespaceKeyFunc(new) | ||
| if err == nil { | ||
| queue.AddRateLimited(key) | ||
| c.pvcQueue.AddRateLimited(key) | ||
| } | ||
| }, | ||
| }) | ||
| // Bind the pod SharedIndexInformer to the pod queue | ||
| c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||
| AddFunc: func(obj interface{}) { | ||
| key, err := cache.MetaNamespaceKeyFunc(obj) | ||
| if err == nil { | ||
| c.podQueue.AddRateLimited(key) | ||
| } | ||
| }, | ||
| UpdateFunc: func(oldObj, newObj interface{}) { | ||
| key, err := cache.MetaNamespaceKeyFunc(newObj) | ||
| if err == nil { | ||
| c.podQueue.AddRateLimited(key) | ||
| } | ||
| }, | ||
| }) | ||
| @@ -61,76 +81,130 @@ func NewController(client kubernetes.Interface, pvcInformer cache.SharedIndexInf | ||
| } | ||
| func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { | ||
| defer c.queue.ShutDown() | ||
| defer func() { | ||
| c.pvcQueue.ShutDown() | ||
| c.podQueue.ShutDown() | ||
| }() | ||
| glog.Infoln("Starting CDI controller loop") | ||
| if threadiness < 1 { | ||
| return fmt.Errorf("controller.Run: expected >0 threads, got %d", threadiness) | ||
| } | ||
| go c.pvcInformer.Run(stopCh) | ||
| go c.podInformer.Run(stopCh) | ||
| if !cache.WaitForCacheSync(stopCh, c.pvcInformer.HasSynced) { | ||
| return fmt.Errorf("controller.Run: Timeout waiting for cache sync") | ||
| return fmt.Errorf("controller.Run: Timeout waiting for pvc cache sync") | ||
| } | ||
| if !cache.WaitForCacheSync(stopCh, c.podInformer.HasSynced) { | ||
| return fmt.Errorf("controller.Run: Timeout waiting for pod cache sync") | ||
| } | ||
| glog.Infoln("Controller cache has synced") | ||
| for i := 0; i < threadiness; i++ { | ||
| go wait.Until(c.runWorkers, time.Second, stopCh) | ||
| go wait.Until(c.runPVCWorkers, time.Second, stopCh) | ||
| go wait.Until(c.runPodWorkers, time.Second, stopCh) | ||
| } | ||
| <-stopCh | ||
| return nil | ||
| } | ||
| func (c *Controller) runWorkers() { | ||
| for c.ProcessNextItem() { | ||
| func (c *Controller) runPodWorkers() { | ||
| for c.ProcessNextPodItem() { | ||
| } | ||
| } | ||
| func (c *Controller) runPVCWorkers() { | ||
| for c.ProcessNextPvcItem() { | ||
| } | ||
| } | ||
| func (c *Controller) ProcessNextPodItem() bool { | ||
| key, shutdown := c.podQueue.Get() | ||
| if shutdown { | ||
| return false | ||
| } | ||
| defer c.podQueue.Done(key) | ||
| pod, err := c.podFromKey(key) | ||
| if err != nil { | ||
| c.forgetKey(fmt.Sprintf("Unable to get pod object: %v", err), key) | ||
| return true | ||
| } | ||
| if ! metav1.HasAnnotation(pod.ObjectMeta, AnnCreatedBy) { | ||
| c.forgetKey("Pod does not have annotation "+AnnCreatedBy, key) | ||
| return true | ||
| } | ||
| if err := c.processPodItem(pod); err == nil { | ||
| c.forgetKey(fmt.Sprintf("Processing Pod %q completed", pod.Name), key) | ||
| } | ||
| return true | ||
| } | ||
| func (c *Controller) processPodItem(pod *v1.Pod) error { | ||
| glog.Infof("processPodItem: processing pod named %q\n", pod.Name) | ||
| // First get the pod's CDI-relative pvc name | ||
| var pvcKey string | ||
| for _, vol := range pod.Spec.Volumes { | ||
| if vol.Name == DataVolName { | ||
| glog.Infof("processPodItem: Pod has volume matching CDI claim") | ||
| pvcKey = fmt.Sprintf("%s/%s", pod.Namespace, vol.PersistentVolumeClaim.ClaimName) | ||
| break | ||
| } | ||
| } | ||
| if len(pvcKey) == 0 { | ||
| // For some reason, no pvc matching the volume name was found. | ||
| return fmt.Errorf("processPodItem: Pod does not contain volume %q", DataVolName) | ||
| } | ||
| glog.Infof("processPodItem: Getting PVC object for key %q", pvcKey) | ||
| pvc, err := c.pvcFromKey(pvcKey) | ||
| if err != nil { | ||
| return fmt.Errorf("processPodItem: error getting pvc from key: %v", err) | ||
| } | ||
| err = c.setPVCAnnotation(pvc, AnnPodPhase, string(pod.Status.Phase)) | ||
| if err != nil { | ||
| return fmt.Errorf("processPodItem: error setting PVC annotation: %v", err) | ||
| } | ||
| glog.Infof("processPodItem: Pod phase %q annotated in PVC %q", pod.Status.Phase, pvcKey) | ||
| return nil | ||
| } | ||
| // Select pvcs with AnnEndpoint | ||
| // Note: only new and updated pvcs will trigger an add to the work queue, Deleted pvcs | ||
| // are ignored. | ||
| func (c *Controller) ProcessNextItem() bool { | ||
| key, shutdown := c.queue.Get() | ||
| func (c *Controller) ProcessNextPvcItem() bool { | ||
| key, shutdown := c.pvcQueue.Get() | ||
| if shutdown { | ||
| return false | ||
| } | ||
| defer c.queue.Done(key) | ||
| defer c.pvcQueue.Done(key) | ||
| pvc, err := c.pvcFromKey(key) | ||
| if pvc == nil { | ||
| return c.forgetKey("", key) | ||
| } | ||
| glog.Infof("processNextPVCItem: next pvc to process: %s\n", key) | ||
| if err != nil { | ||
| return c.forgetKey(fmt.Sprintf("processNextItem: error converting key to pvc: %v", err), key) | ||
| return c.forgetKey(fmt.Sprintf("processNextPVCItem: error converting key to pvc: %v", err), key) | ||
| } | ||
| // check to see if we have our endpoint and we are not already processing this pvc | ||
| if !c.checkIfShouldQueuePVC(pvc, "processNextItem") { | ||
| return c.forgetKey(fmt.Sprintf("processNextItem: annotation %q not found or pvc %s is already being worked, skipping pvc\n", AnnEndpoint, pvc.Name), key) | ||
| if !c.checkIfShouldProcessPVC(pvc, "processNextPVCItem") { | ||
| return c.forgetKey(fmt.Sprintf("processNextPVCItem: annotation %q not found or pvc %s is already being worked, skipping pvc\n", AnnEndpoint, pvc.Name), key) | ||
| } | ||
| glog.Infof("processNextItem: next pvc to process: %s\n", key) | ||
| glog.Infof("processNextPVCItem: next pvc to process: %s\n", key) | ||
| // all checks have passed, let's process it! | ||
| if err := c.processItem(pvc); err == nil { | ||
| if err := c.processPvcItem(pvc); err == nil { | ||
| // If the proceess succeeds, we're done operating on this key; remove it from the queue | ||
| return c.forgetKey(fmt.Sprintf("processNextItem: error processing key %q: %v", key, err), key) | ||
| } | ||
| return true | ||
| } | ||
| func (c *Controller) forgetKey(msg string, key interface{}) bool { | ||
| if len(msg) > 0 { | ||
| glog.Info(msg) | ||
| return c.forgetKey(fmt.Sprintf("Processing PVC %q completed", key), key) | ||
| } | ||
| c.queue.Forget(key) | ||
| return true | ||
| } | ||
| // Create the importer pod with the pvc and optional secret. | ||
| func (c *Controller) processItem(pvc *v1.PersistentVolumeClaim) error { | ||
| func (c *Controller) processPvcItem(pvc *v1.PersistentVolumeClaim) error { | ||
| e := func(err error, s string) error { | ||
| if s == "" { | ||
| return fmt.Errorf("processItem: %v\n", err) | ||
| return fmt.Errorf("processPvcItem: %v\n", err) | ||
| } | ||
| return fmt.Errorf("processItem: %s: %v\n", s, err) | ||
| return fmt.Errorf("processPvcItem: %s: %v\n", s, err) | ||
| } | ||
| ep, err := getEndpoint(pvc) | ||
| @@ -142,12 +216,12 @@ func (c *Controller) processItem(pvc *v1.PersistentVolumeClaim) error { | ||
| return e(err, "") | ||
| } | ||
| if secretName == "" { | ||
| glog.Infof("processItem: no secret will be supplied to endpoint %q\n", ep) | ||
| glog.Infof("processPvcItem: no secret will be supplied to endpoint %q\n", ep) | ||
| } | ||
| // check our existing pvc one more time to ensure we should be working on it | ||
| // and to help mitigate any unforeseen race conditions. | ||
| if !c.checkIfShouldQueuePVC(pvc, "processItem") { | ||
| if !c.checkIfShouldProcessPVC(pvc, "processItem") { | ||
| return e(nil, "pvc is already being processed") | ||
| } | ||
| @@ -156,7 +230,7 @@ func (c *Controller) processItem(pvc *v1.PersistentVolumeClaim) error { | ||
| if err != nil { | ||
| return e(err, "create pod") | ||
| } | ||
| err = c.setAnnoImportPod(pvc, pod.Name) | ||
| err = c.setPVCAnnotation(pvc, AnnImportPod, pod.Name) | ||
| if err != nil { | ||
| return e(err, "set annotation") | ||
| } | ||
| @@ -174,3 +248,11 @@ func (c *Controller) processItem(pvc *v1.PersistentVolumeClaim) error { | ||
| } | ||
| return nil | ||
| } | ||
| func (c *Controller) forgetKey(msg string, key interface{}) bool { | ||
| if len(msg) > 0 { | ||
| glog.Info(msg) | ||
| } | ||
| c.pvcQueue.Forget(key) | ||
| return true | ||
| } | ||
Oops, something went wrong.