Skip to content

Commit

Permalink
Merge pull request #137 from admiyo/job-watch
Browse files Browse the repository at this point in the history
Framework for watch/job unittest
  • Loading branch information
rmohr committed Mar 13, 2017
2 parents 2d814a3 + 31ca507 commit f8195fa
Show file tree
Hide file tree
Showing 3 changed files with 244 additions and 0 deletions.
5 changes: 5 additions & 0 deletions cmd/virt-controller/virt-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func main() {
go migrationController.Run(1, stop)
migrationController.WaitForSync(stop)

_, jobController := watch.NewJobController(vmService, nil, restClient)
jobController.StartInformer(stop)
go jobController.Run(1, stop)
jobController.WaitForSync(stop)

httpLogger := logger.With("service", "http")

httpLogger.Info().Log("action", "listening", "interface", *host, "port", *port)
Expand Down
69 changes: 69 additions & 0 deletions pkg/virt-controller/watch/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package watch

import (
kubeapi "k8s.io/client-go/pkg/api"
batchv1 "k8s.io/client-go/pkg/apis/batch/v1"
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/pkg/labels"
"k8s.io/client-go/pkg/util/workqueue"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
kvirtv1 "kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/kubecli"
"kubevirt.io/kubevirt/pkg/virt-controller/services"
)

func migrationJobSelector() kubeapi.ListOptions {
fieldSelector := fields.Everything()
labelSelector, err := labels.Parse(kvirtv1.DomainLabel)
if err != nil {
panic(err)
}
return kubeapi.ListOptions{FieldSelector: fieldSelector, LabelSelector: labelSelector}
}

func NewJobController(vmService services.VMService, recorder record.EventRecorder, restClient *rest.RESTClient) (cache.Store, *kubecli.Controller) {
selector := migrationJobSelector()
lw := kubecli.NewListWatchFromClient(restClient, "jobs", kubeapi.NamespaceDefault, selector.FieldSelector, selector.LabelSelector)
return NewJobControllerWithListWatch(vmService, recorder, lw, restClient)
}

func NewJobControllerWithListWatch(vmService services.VMService, _ record.EventRecorder, lw cache.ListerWatcher, restClient *rest.RESTClient) (cache.Store, *kubecli.Controller) {

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
return kubecli.NewController(lw, queue, &batchv1.Job{}, func(store cache.Store, queue workqueue.RateLimitingInterface) bool {
key, quit := queue.Get()
if quit {
return false
}
defer queue.Done(key)

// Fetch the latest Job state from cache
obj, exists, err := store.GetByKey(key.(string))

if err != nil {
queue.AddRateLimited(key)
return true
}
if exists {
var job *batchv1.Job = obj.(*batchv1.Job)

if job.Status.Succeeded < 1 {
//Job did not succeed, do not update the vm
return true
}

name := job.ObjectMeta.Labels["vmname"]
vm, err := vmService.FetchVM(name)
if err != nil {
//TODO proper error handling
queue.AddRateLimited(key)
return true
}
vm.Status.Phase = kvirtv1.Running
putVm(vm, restClient, nil)
}
return true
})
}
170 changes: 170 additions & 0 deletions pkg/virt-controller/watch/job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package watch

import (
"net/http"

"github.com/facebookgo/inject"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/ghttp"
"k8s.io/client-go/kubernetes"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/cache/testing"
"kubevirt.io/kubevirt/pkg/kubecli"
"kubevirt.io/kubevirt/pkg/logging"
"kubevirt.io/kubevirt/pkg/virt-controller/services"

corev1 "k8s.io/client-go/pkg/api/v1"
batchv1 "k8s.io/client-go/pkg/apis/batch/v1"
kvirtv1 "kubevirt.io/kubevirt/pkg/api/v1"
)

var _ = Describe("Migration", func() {
var server *ghttp.Server
var stopChan chan struct{}
var jobController *kubecli.Controller
var lw *framework.FakeControllerSource
var jobCache cache.Store

var vmService services.VMService
var restClient *rest.RESTClient
var vm *kvirtv1.VM

logging.DefaultLogger().SetIOWriter(GinkgoWriter)

BeforeEach(func() {
var g inject.Graph
vmService = services.NewVMService()
server = ghttp.NewServer()
config := rest.Config{}
config.Host = server.URL()
clientSet, _ := kubernetes.NewForConfig(&config)
templateService, _ := services.NewTemplateService("kubevirt/virt-launcher")
restClient, _ = kubecli.GetRESTClientFromFlags(server.URL(), "")

g.Provide(
&inject.Object{Value: restClient},
&inject.Object{Value: clientSet},
&inject.Object{Value: vmService},
&inject.Object{Value: templateService},
)
g.Populate()

stopChan = make(chan struct{})
lw = framework.NewFakeControllerSource()
jobCache = cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, nil)

_, jobController = NewJobControllerWithListWatch(vmService, nil, lw, restClient)

vm = kvirtv1.NewMinimalVM("test-vm")

// Start the controller
jobController.StartInformer(stopChan)
go jobController.Run(1, stopChan)
})

Context("Running job with out migration labels", func() {
It("should not attempt to update the VM", func(done Done) {

job := &batchv1.Job{}

// Register the expected REST call
//server.AppendHandlers()

// Tell the controller that there is a new Migration
lw.Add(job)

// Wait until we have processed the added item
finishController(jobController, stopChan)

Expect(len(server.ReceivedRequests())).To(Equal(0))
close(done)
}, 10)
})

Context("Running job with migration labels but no success", func() {
It("should ignore the the VM ", func(done Done) {

job := &batchv1.Job{
ObjectMeta: corev1.ObjectMeta{
Labels: map[string]string{
kvirtv1.DomainLabel: "something",
"vmname": vm.ObjectMeta.Name,
},
},
}

// No registered REST calls
//server.AppendHandlers()

// Tell the controller that there is a new Job
lw.Add(job)

// Wait until we have processed the added item
finishController(jobController, stopChan)

Expect(len(server.ReceivedRequests())).To(Equal(0))
close(done)
}, 10)
})

Context("Running job with migration labels and one success", func() {
It("should update the VM to Running", func(done Done) {

job := &batchv1.Job{
ObjectMeta: corev1.ObjectMeta{
Labels: map[string]string{
kvirtv1.DomainLabel: "something",
"vmname": vm.ObjectMeta.Name,
},
},
Status: batchv1.JobStatus{
Succeeded: 1,
Failed: 0,
Active: 0,
},
}

// Register the expected REST call
server.AppendHandlers(
handlerToFetchTestVM(vm),
handlerToUpdateTestVM(vm),
)

// Tell the controller that there is a new Job
lw.Add(job)
finishController(jobController, stopChan)
Expect(len(server.ReceivedRequests())).To(Equal(2))
close(done)
}, 10)
})

AfterEach(func() {
close(stopChan)
server.Close()
})
})

func handlerToFetchTestVM(vm *kvirtv1.VM) http.HandlerFunc {
return ghttp.CombineHandlers(
ghttp.VerifyRequest("GET", "/apis/kubevirt.io/v1alpha1/namespaces/default/vms/"+vm.ObjectMeta.Name),
ghttp.RespondWithJSONEncoded(http.StatusOK, vm),
)
}

func handlerToUpdateTestVM(vm *kvirtv1.VM) http.HandlerFunc {
return ghttp.CombineHandlers(
ghttp.VerifyRequest("PUT", "/apis/kubevirt.io/v1alpha1/namespaces/default/vms/"+vm.ObjectMeta.Name),
ghttp.RespondWithJSONEncoded(http.StatusOK, vm),
)
}

func finishController(jobController *kubecli.Controller, stopChan chan struct{}) {
// Wait until we have processed the added item

jobController.WaitForSync(stopChan)
jobController.ShutDownQueue()
jobController.WaitUntilDone()
}

0 comments on commit f8195fa

Please sign in to comment.