Update VM state pre migration #127

Merged · merged 2 commits · Mar 8, 2017

Changes from all commits
6 changes: 6 additions & 0 deletions pkg/kubecli/kubecli.go
@@ -181,3 +181,9 @@ func (c *Controller) runWorker() {
 func (c *Controller) WaitUntilDone() {
     <-c.done
 }
+
+// ShutDownQueue shuts down the embedded queue. After shutdown is issued, items already in the queue will still be
+// processed, but no new items will be accepted. Use WaitUntilDone() to wait until the last item has been processed.
+func (c *Controller) ShutDownQueue() {
+    c.queue.ShutDown()
+}
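
Together with the existing WaitUntilDone(), this enables a deterministic shut-down-then-drain sequence in callers: stop accepting new work, let the queue empty, then stop the workers. A minimal sketch of that pattern, assuming a controller that is already running (the helper name drainAndStop and the import path are illustrative, not part of this change):

```go
// Illustrative sketch, not part of this change. The import path is assumed;
// use whatever path pkg/kubecli is vendored under in your tree.
package example

import "kubevirt.io/kubevirt/pkg/kubecli"

// drainAndStop shuts the controller's queue down, waits until the last queued
// item has been processed, and only then stops the informer and workers.
// It assumes StartInformer(stopChan) and Run(...) were called earlier,
// as the tests below do in their BeforeEach.
func drainAndStop(controller *kubecli.Controller, stopChan chan struct{}) {
    controller.ShutDownQueue() // no new items are accepted; queued items keep being processed
    controller.WaitUntilDone() // blocks until the queue has been drained
    close(stopChan)            // finally stop the informer and worker goroutines
}
```
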
49 changes: 38 additions & 11 deletions pkg/virt-controller/watch/pod.go
@@ -87,22 +87,49 @@ func NewPodControllerWithListWatch(vmCache cache.Store, _ record.EventRecorder,
             vmCopy.ObjectMeta.Labels[corev1.NodeNameLabel] = pod.Spec.NodeName
             vmCopy.Status.NodeName = pod.Spec.NodeName
             // Update the VM
-            logger := logging.DefaultLogger().Object(vm)
-            if err := restClient.Put().Resource("vms").Body(&vmCopy).Name(vmCopy.ObjectMeta.Name).Namespace(kubeapi.NamespaceDefault).Do().Error(); err != nil {
-                logger.Error().Reason(err).Msg("Setting the VM to pending failed.")
-                if e, ok := err.(*errors.StatusError); ok {
-                    if e.Status().Reason == metav1.StatusReasonNotFound {
-                        // VM does not exist anymore, we don't have to retry
-                        return true
-                    }
-
-                }
-                logger.V(3).Info().Msg("Enqueuing VM initialization again.")
+            logger := logging.DefaultLogger()
+            if err := putVm(&vmCopy, restClient, queue); err != nil {
+                logger.V(3).Info().Msg("Enqueuing VM again.")
                 queue.AddRateLimited(key)
                 return true
             }
             logger.Info().Msgf("VM successfully scheduled to %s.", vmCopy.Status.NodeName)
+        } else if vm.Status.Phase == corev1.Running {
+            logger := logging.DefaultLogger()
+            obj, err := kubeapi.Scheme.Copy(vm)
+            if err != nil {
+                logger.Error().Reason(err).Msg("could not copy vm object")
+                queue.AddRateLimited(key)
+                return true
+            }
+            vmCopy := obj.(*corev1.VM)
+            vmCopy.Status.MigrationNodeName = pod.Spec.NodeName
+
+            if err := putVm(vmCopy, restClient, queue); err != nil {
+                logger.V(3).Info().Msg("Enqueuing VM again.")
+                queue.AddRateLimited(key)
+                return true
+            }
+            // TODO: the migration should be started here
+            logger.Info().Msgf("Scheduled VM migration to node %s.", vmCopy.Status.NodeName)
         }
         return true
     })
 }
+
+// putVm synchronously PUTs the updated VM object to the API server.
+func putVm(vm *corev1.VM, restClient *rest.RESTClient, queue workqueue.RateLimitingInterface) error {
+    logger := logging.DefaultLogger().Object(vm)
+    if err := restClient.Put().Resource("vms").Body(vm).Name(vm.ObjectMeta.Name).Namespace(kubeapi.NamespaceDefault).Do().Error(); err != nil {
+        logger.Error().Reason(err).Msg("Setting the VM state failed.")
+        if e, ok := err.(*errors.StatusError); ok {
+            if e.Status().Reason == metav1.StatusReasonNotFound {
+                // VM does not exist anymore, we don't have to retry
+                return nil
+            }
+
+        }
+        return err
+    }
+    return nil
+}
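
The error contract of putVm is worth noting: a NotFound response is swallowed, because the VM was deleted while the update was in flight and there is nothing left to set, while every other error is returned so the caller can requeue the key. For reference, the same contract could be expressed with the IsNotFound helper from the vendored Kubernetes errors package — a sketch only, assuming that helper is available in this vendored client version:

```go
// Sketch only: putVm with the same error contract, written with errors.IsNotFound
// instead of unwrapping *errors.StatusError by hand. Same imports as pod.go assumed.
func putVm(vm *corev1.VM, restClient *rest.RESTClient, queue workqueue.RateLimitingInterface) error {
    logger := logging.DefaultLogger().Object(vm)
    err := restClient.Put().Resource("vms").Body(vm).Name(vm.ObjectMeta.Name).Namespace(kubeapi.NamespaceDefault).Do().Error()
    if err == nil {
        return nil
    }
    logger.Error().Reason(err).Msg("Setting the VM state failed.")
    if errors.IsNotFound(err) {
        // The VM is already gone; nothing left to update, so the caller must not retry.
        return nil
    }
    // Any other error is reported back so the caller can requeue the key.
    return err
}
```
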
78 changes: 66 additions & 12 deletions pkg/virt-controller/watch/pod_test.go
@@ -19,24 +19,30 @@ import (
 var _ = Describe("Pod", func() {
     var server *ghttp.Server
     var stopChan chan struct{}
+    var vmCache cache.Store
+    var podController *kubecli.Controller
+    var lw *framework.FakeControllerSource

     logging.DefaultLogger().SetIOWriter(GinkgoWriter)

     BeforeEach(func() {
         stopChan = make(chan struct{})
         server = ghttp.NewServer()
+        // Wire a Pod controller with a fake source
+        restClient, err := kubecli.GetRESTClientFromFlags(server.URL(), "")
+        Expect(err).To(Not(HaveOccurred()))
+        vmCache = cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, nil)
+        lw = framework.NewFakeControllerSource()
+        _, podController = NewPodControllerWithListWatch(vmCache, nil, lw, restClient)
+
+        // Start the controller
+        podController.StartInformer(stopChan)
+        go podController.Run(1, stopChan)
     })

     Context("Running Pod for unscheduled VM given", func() {
         It("should update the VM with the node of the running Pod", func(done Done) {

-            // Wire a Pod controller with a fake source
-            restClient, err := kubecli.GetRESTClientFromFlags(server.URL(), "")
-            Expect(err).To(Not(HaveOccurred()))
-            vmCache := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, nil)
-            lw := framework.NewFakeControllerSource()
-            _, podController := NewPodControllerWithListWatch(vmCache, nil, lw, restClient)
-
             // Create a VM which is being scheduled
             vm := v1.NewMinimalVM("testvm")
             vm.Status.Phase = v1.Scheduling
@@ -70,15 +76,63 @@ var _ = Describe("Pod", func() {
                 ),
             )

-            // Start the controller
-            podController.StartInformer(stopChan)
-            go podController.Run(1, stopChan)
+            // Tell the controller that there is a new running Pod
+            lw.Add(pod)
+
+            // Wait until we have processed the added item
+            podController.WaitForSync(stopChan)
+            podController.ShutDownQueue()
+            podController.WaitUntilDone()
+
+            Expect(len(server.ReceivedRequests())).To(Equal(1))
+            close(done)
+        }, 10)
+    })
+
+    Context("Running Migration target Pod for a running VM given", func() {
+        It("should update the VM with the migration target node of the running Pod", func(done Done) {
+
+            // Create a running VM
+            vm := v1.NewMinimalVM("testvm")
+            vm.Status.Phase = v1.Running
+            vm.ObjectMeta.SetUID(uuid.NewUUID())
+
+            // Add the VM to the cache
+            vmCache.Add(vm)
+
+            // Create a target Pod for the VM
+            templateService, err := services.NewTemplateService("whatever")
+            Expect(err).ToNot(HaveOccurred())
+            pod, err := templateService.RenderLaunchManifest(vm)
+            Expect(err).ToNot(HaveOccurred())
+            pod.Spec.NodeName = "mynode"
+
+            // Create the expected VM after the update
+            obj, err := conversion.NewCloner().DeepCopy(vm)
+            Expect(err).ToNot(HaveOccurred())
+
+            expectedVM := obj.(*v1.VM)
+            expectedVM.Status.Phase = v1.Running
+            expectedVM.Status.MigrationNodeName = pod.Spec.NodeName
+
+            // Register the expected REST call
+            server.AppendHandlers(
+                ghttp.CombineHandlers(
+                    ghttp.VerifyRequest("PUT", "/apis/kubevirt.io/v1alpha1/namespaces/default/vms/testvm"),
+                    ghttp.VerifyJSONRepresenting(expectedVM),
+                    ghttp.RespondWithJSONEncoded(http.StatusOK, expectedVM),
+                ),
+            )
+
             // Tell the controller that there is a new running Pod
             lw.Add(pod)

-            // Wait until we have proof that a REST call happened
-            Eventually(func() int { return len(server.ReceivedRequests()) }).Should(Equal(1))
+            // Wait until we have processed the added item
+            podController.WaitForSync(stopChan)
+            podController.ShutDownQueue()
+            podController.WaitUntilDone()

+            Expect(len(server.ReceivedRequests())).To(Equal(1))
             close(done)
         }, 10)
     })
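
Both specs now synchronize on the controller itself instead of polling: WaitForSync waits for the informer cache, ShutDownQueue stops new work from being accepted, and WaitUntilDone blocks until the enqueued item has actually been processed, which makes the single-request assertion deterministic. If more specs follow, the shared tail could be factored into a helper; a sketch only, with a hypothetical name that is not part of this PR:

```go
// Hypothetical test helper, not part of this PR: drain the controller
// deterministically and assert that exactly one request reached the fake API server.
func expectOneRequestProcessed(podController *kubecli.Controller, stopChan chan struct{}, server *ghttp.Server) {
    podController.WaitForSync(stopChan)
    podController.ShutDownQueue()
    podController.WaitUntilDone()
    Expect(len(server.ReceivedRequests())).To(Equal(1))
}
```
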