Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
  • 2 commits
  • 4 files changed
  • 0 commit comments
  • 2 contributors
Commits on May 03, 2018
Add pod informer to controller
Start pod informer

also shutdown podQueue

Added getting the PVC from pod's volume list using a global const for vol name

Generalized set-annotation func, added pod to pvc status writes

Use Filtered IndexInformer

Filter Pods by Label, examine all cluster PVCs

Resource dequeuing funcs return true unless key is malformed or processing succeeds

Pod correctly annotates PVC w/ status

Fix error message

Aligned log line with PVC func
Showing with 187 additions and 89 deletions.
  1. +10 −3 cmd/controller/controller.go
  2. +123 −44 pkg/controller/controller.go
  3. +12 −18 pkg/controller/controller_test.go
  4. +42 −24 pkg/controller/util.go
@@ -8,6 +8,7 @@ import (
"github.com/golang/glog"
"github.com/kubevirt/containerized-data-importer/pkg/common"
"github.com/kubevirt/containerized-data-importer/pkg/controller"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
@@ -53,10 +54,16 @@ func main() {
if err != nil {
glog.Fatalf("Error getting kube client: %v\n", err)
}
informerFactory := informers.NewSharedInformerFactory(client, common.DEFAULT_RESYNC_PERIOD)
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims().Informer()
cdiController, err := controller.NewController(client, pvcInformer, importerImage, pullPolicy)
pvcInformerFactory := informers.NewSharedInformerFactory(client, common.DEFAULT_RESYNC_PERIOD)
podInformerFactory := informers.NewFilteredSharedInformerFactory(client, common.DEFAULT_RESYNC_PERIOD, "", func(options *v1.ListOptions) {
options.LabelSelector = common.CDI_LABEL_SELECTOR
})
pvcInformer := pvcInformerFactory.Core().V1().PersistentVolumeClaims().Informer()
podInformer := podInformerFactory.Core().V1().Pods().Informer()
cdiController, err := controller.NewController(client, pvcInformer, podInformer, importerImage, pullPolicy)
if err != nil {
glog.Fatal("Error creating CDI controller: %v", err)
}
@@ -7,6 +7,7 @@ import (
"github.com/golang/glog"
"github.com/kubevirt/containerized-data-importer/pkg/common"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
@@ -20,42 +21,58 @@ const (
AnnImportPod = "kubevirt.io/storage.import.importPodName"
// importer pod annotations
AnnCreatedBy = "kubevirt.io/storage.createdByController"
AnnPodPhase = "kubevirt.io/storage.import.pod.phase"
)
type Controller struct {
clientset kubernetes.Interface
queue workqueue.RateLimitingInterface
pvcInformer cache.SharedIndexInformer
importerImage string
pullPolicy string // Options: IfNotPresent, Always, or Never
clientset kubernetes.Interface
pvcQueue, podQueue workqueue.RateLimitingInterface
pvcInformer, podInformer cache.SharedIndexInformer
importerImage string
pullPolicy string // Options: IfNotPresent, Always, or Never
}
func NewController(client kubernetes.Interface, pvcInformer cache.SharedIndexInformer, importerImage string, pullPolicy string) (*Controller, error) {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
func NewController(client kubernetes.Interface, pvcInformer, podInformer cache.SharedIndexInformer, importerImage string, pullPolicy string) (*Controller, error) {
c := &Controller{
clientset: client,
queue: queue,
pvcQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
podQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
pvcInformer: pvcInformer,
podInformer: podInformer,
importerImage: importerImage,
pullPolicy: pullPolicy,
}
// Bind the Index/Informer to the queue only for new pvcs
// Bind the pvc SharedIndexInformer to the pvc queue
c.pvcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.AddRateLimited(key)
c.pvcQueue.AddRateLimited(key)
}
},
// this is triggered by an update or it will also be
// be triggered periodically even if no changes were made.
UpdateFunc: func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.AddRateLimited(key)
c.pvcQueue.AddRateLimited(key)
}
},
})
// Bind the pod SharedIndexInformer to the pod queue
c.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
c.podQueue.AddRateLimited(key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err == nil {
c.podQueue.AddRateLimited(key)
}
},
})
@@ -64,76 +81,130 @@ func NewController(client kubernetes.Interface, pvcInformer cache.SharedIndexInf
}
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer c.queue.ShutDown()
defer func() {
c.pvcQueue.ShutDown()
c.podQueue.ShutDown()
}()
glog.Infoln("Starting CDI controller loop")
if threadiness < 1 {
return fmt.Errorf("controller.Run: expected >0 threads, got %d", threadiness)
}
go c.pvcInformer.Run(stopCh)
go c.podInformer.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, c.pvcInformer.HasSynced) {
return fmt.Errorf("controller.Run: Timeout waiting for cache sync")
return fmt.Errorf("controller.Run: Timeout waiting for pvc cache sync")
}
if !cache.WaitForCacheSync(stopCh, c.podInformer.HasSynced) {
return fmt.Errorf("controller.Run: Timeout waiting for pod cache sync")
}
glog.Infoln("Controller cache has synced")
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorkers, time.Second, stopCh)
go wait.Until(c.runPVCWorkers, time.Second, stopCh)
go wait.Until(c.runPodWorkers, time.Second, stopCh)
}
<-stopCh
return nil
}
func (c *Controller) runWorkers() {
for c.ProcessNextItem() {
func (c *Controller) runPodWorkers() {
for c.ProcessNextPodItem() {
}
}
func (c *Controller) runPVCWorkers() {
for c.ProcessNextPvcItem() {
}
}
func (c *Controller) ProcessNextPodItem() bool {
key, shutdown := c.podQueue.Get()
if shutdown {
return false
}
defer c.podQueue.Done(key)
pod, err := c.podFromKey(key)
if err != nil {
c.forgetKey(fmt.Sprintf("Unable to get pod object: %v", err), key)
return true
}
if ! metav1.HasAnnotation(pod.ObjectMeta, AnnCreatedBy) {
c.forgetKey("Pod does not have annotation "+AnnCreatedBy, key)
return true
}
if err := c.processPodItem(pod); err == nil {
c.forgetKey(fmt.Sprintf("Processing Pod %q completed", pod.Name), key)
}
return true
}
func (c *Controller) processPodItem(pod *v1.Pod) error {
glog.Infof("processPodItem: processing pod named %q\n", pod.Name)
// First get the pod's CDI-relative pvc name
var pvcKey string
for _, vol := range pod.Spec.Volumes {
if vol.Name == DataVolName {
glog.Infof("processPodItem: Pod has volume matching CDI claim")
pvcKey = fmt.Sprintf("%s/%s", pod.Namespace, vol.PersistentVolumeClaim.ClaimName)
break
}
}
if len(pvcKey) == 0 {
// For some reason, no pvc matching the volume name was found.
return fmt.Errorf("processPodItem: Pod does not contain volume %q", DataVolName)
}
glog.Infof("processPodItem: Getting PVC object for key %q", pvcKey)
pvc, err := c.pvcFromKey(pvcKey)
if err != nil {
return fmt.Errorf("processPodItem: error getting pvc from key: %v", err)
}
err = c.setPVCAnnotation(pvc, AnnPodPhase, string(pod.Status.Phase))
if err != nil {
return fmt.Errorf("processPodItem: error setting PVC annotation: %v", err)
}
glog.Infof("processPodItem: Pod phase %q annotated in PVC %q", pod.Status.Phase, pvcKey)
return nil
}
// Select pvcs with AnnEndpoint
// Note: only new and updated pvcs will trigger an add to the work queue, Deleted pvcs
// are ignored.
func (c *Controller) ProcessNextItem() bool {
key, shutdown := c.queue.Get()
func (c *Controller) ProcessNextPvcItem() bool {
key, shutdown := c.pvcQueue.Get()
if shutdown {
return false
}
defer c.queue.Done(key)
defer c.pvcQueue.Done(key)
pvc, err := c.pvcFromKey(key)
if pvc == nil {
return c.forgetKey("", key)
}
glog.Infof("processNextPVCItem: next pvc to process: %s\n", key)
if err != nil {
return c.forgetKey(fmt.Sprintf("processNextItem: error converting key to pvc: %v", err), key)
return c.forgetKey(fmt.Sprintf("processNextPVCItem: error converting key to pvc: %v", err), key)
}
// check to see if we have our endpoint and we are not already processing this pvc
if !c.checkIfShouldQueuePVC(pvc, "processNextItem") {
return c.forgetKey(fmt.Sprintf("processNextItem: annotation %q not found or pvc %s is already being worked, skipping pvc\n", AnnEndpoint, pvc.Name), key)
if !c.checkIfShouldProcessPVC(pvc, "processNextPVCItem") {
return c.forgetKey(fmt.Sprintf("processNextPVCItem: annotation %q not found or pvc %s is already being worked, skipping pvc\n", AnnEndpoint, pvc.Name), key)
}
glog.Infof("processNextItem: next pvc to process: %s\n", key)
glog.Infof("processNextPVCItem: next pvc to process: %s\n", key)
// all checks have passed, let's process it!
if err := c.processItem(pvc); err == nil {
if err := c.processPvcItem(pvc); err == nil {
// If the proceess succeeds, we're done operating on this key; remove it from the queue
return c.forgetKey(fmt.Sprintf("processNextItem: error processing key %q: %v", key, err), key)
}
return true
}
func (c *Controller) forgetKey(msg string, key interface{}) bool {
if len(msg) > 0 {
glog.Info(msg)
return c.forgetKey(fmt.Sprintf("Processing PVC %q completed", key), key)
}
c.queue.Forget(key)
return true
}
// Create the importer pod with the pvc and optional secret.
func (c *Controller) processItem(pvc *v1.PersistentVolumeClaim) error {
func (c *Controller) processPvcItem(pvc *v1.PersistentVolumeClaim) error {
e := func(err error, s string) error {
if s == "" {
return fmt.Errorf("processItem: %v\n", err)
return fmt.Errorf("processPvcItem: %v\n", err)
}
return fmt.Errorf("processItem: %s: %v\n", s, err)
return fmt.Errorf("processPvcItem: %s: %v\n", s, err)
}
ep, err := getEndpoint(pvc)
@@ -145,12 +216,12 @@ func (c *Controller) processItem(pvc *v1.PersistentVolumeClaim) error {
return e(err, "")
}
if secretName == "" {
glog.Infof("processItem: no secret will be supplied to endpoint %q\n", ep)
glog.Infof("processPvcItem: no secret will be supplied to endpoint %q\n", ep)
}
// check our existing pvc one more time to ensure we should be working on it
// and to help mitigate any unforeseen race conditions.
if !c.checkIfShouldQueuePVC(pvc, "processItem") {
if !c.checkIfShouldProcessPVC(pvc, "processItem") {
return e(nil, "pvc is already being processed")
}
@@ -159,7 +230,7 @@ func (c *Controller) processItem(pvc *v1.PersistentVolumeClaim) error {
if err != nil {
return e(err, "create pod")
}
err = c.setAnnoImportPod(pvc, pod.Name)
err = c.setPVCAnnotation(pvc, AnnImportPod, pod.Name)
if err != nil {
return e(err, "set annotation")
}
@@ -177,3 +248,11 @@ func (c *Controller) processItem(pvc *v1.PersistentVolumeClaim) error {
}
return nil
}
func (c *Controller) forgetKey(msg string, key interface{}) bool {
if len(msg) > 0 {
glog.Info(msg)
}
c.pvcQueue.Forget(key)
return true
}
@@ -15,7 +15,6 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
k8stesting "k8s.io/client-go/tools/cache/testing"
"k8s.io/client-go/util/workqueue"
)
type operation int
@@ -42,31 +41,25 @@ var _ = Describe("Controller", func() {
}
setUpInformer := func(pvc *v1.PersistentVolumeClaim, op operation) {
// build queue value of ns + "/" + pvc name if exists
ns := pvc.Namespace
name := pvc.Name
queueKey := name
if len(ns) > 0 {
queueKey = fmt.Sprintf("%s/%s", ns, name)
}
stop = make(chan struct{})
fakeClient = fake.NewSimpleClientset()
objSource := k8stesting.NewFakeControllerSource()
pvcInformer := cache.NewSharedIndexInformer(objSource, pvc, common.DEFAULT_RESYNC_PERIOD, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
fakeClient = fake.NewSimpleClientset(pvc)
pvcSource := k8stesting.NewFakePVCControllerSource()
podSource := k8stesting.NewFakeControllerSource()
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
pvcInformer := cache.NewSharedIndexInformer(pvcSource, pvc, common.DEFAULT_RESYNC_PERIOD, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
podInformer := cache.NewSharedIndexInformer(podSource, &v1.Pod{}, common.DEFAULT_RESYNC_PERIOD, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
var err error // declare err here to prevent shadowing `controller`, declared in the outer block
controller, err = NewController(fakeClient, pvcInformer, common.IMPORTER_DEFAULT_IMAGE, common.IMPORTER_DEFAULT_PULL_POLICY)
controller, err = NewController(fakeClient, pvcInformer, podInformer, common.IMPORTER_DEFAULT_IMAGE, common.IMPORTER_DEFAULT_PULL_POLICY)
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("setupInformer failed to create controller: %v", err))
if op == opAdd || op == opUpdate {
objSource.Add(pvc)
queue.Add(queueKey)
pvcSource.Add(pvc)
}
go pvcInformer.Run(stop)
go podInformer.Run(stop)
Expect(cache.WaitForCacheSync(stop, pvcInformer.HasSynced)).To(BeTrue())
Expect(cache.WaitForCacheSync(stop, podInformer.HasSynced)).To(BeTrue())
}
AfterEach(func() {
@@ -131,9 +124,10 @@ var _ = Describe("Controller", func() {
It(test.descr, func() {
By(fmt.Sprintf("creating in-mem pvc %q with endpt anno=%q", fullname, annotations))
pvcObj := createInMemPVC(ns, pvcName, annotations)
By("invoking the controller")
By("Creating the controller")
setUpInformer(pvcObj, ops)
controller.ProcessNextItem()
controller.ProcessNextPvcItem()
//controller.ProcessNextPodItem()
By("checking if importer pod is present")
pod, err := getImporterPod(fakeClient, ns, exptPod)
if exptErr {
Oops, something went wrong.

No commit comments for this range