Merge pull request #108 from rmohr/virt-handler-migration-handling
Host side migration handling code
rmohr committed Mar 7, 2017
2 parents 1eeea88 + 040b286 commit 54a5629
Showing 18 changed files with 372 additions and 121 deletions.
11 changes: 11 additions & 0 deletions cluster/vagrant/setup_kubernetes_common.sh
@@ -61,6 +61,17 @@ systemctl enable kubelet && systemctl start kubelet
# Disable libvirt cgroup management
echo "cgroup_controllers = [ ]" >> /etc/libvirt/qemu.conf

# Let libvirt listen on TCP for migrations
echo 'LIBVIRTD_ARGS="--listen"' >> /etc/sysconfig/libvirtd

cat << EOT >>/etc/libvirt/libvirtd.conf
listen_tcp = 1
tcp_port = "16509"
auth_tcp = "none"
listen_addr = "0.0.0.0"
listen_tls = 0
EOT

# Disable SASL for libvirt. VDSM configured that
sed -i '/^auth_unix_rw/c\auth_unix_rw="none"' /etc/libvirt/libvirtd.conf
systemctl restart libvirtd
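
(Not part of the diff: a minimal Go sketch of what the listener above enables, namely a peer host connecting to libvirtd over TCP as migrations require. The binding import path, host name and port are assumptions, not taken from this commit.)

package main

import (
	"fmt"

	libvirt "github.com/libvirt/libvirt-go" // assumed binding, not pinned by this commit
)

func main() {
	// "node01" is a placeholder peer; 16509 matches the tcp_port configured above.
	conn, err := libvirt.NewConnect("qemu+tcp://node01:16509/system")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	hostname, err := conn.GetHostname()
	if err != nil {
		panic(err)
	}
	fmt.Println("connected to libvirtd on", hostname)
}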
2 changes: 2 additions & 0 deletions cmd/virt-controller/virt-controller.go
@@ -63,13 +63,15 @@ func main() {
}
vmCache, vmController := watch.NewVMController(vmService, nil, restClient)

vmController.StartInformer(stop)
go vmController.Run(1, stop)

// Wait until VM cache has warmed up before we start watching pods
vmController.WaitForSync(stop)

// Start watching pods
_, podController := watch.NewPodController(vmCache, nil, clientSet, restClient)
podController.StartInformer(stop)
go podController.Run(1, stop)

httpLogger := logger.With("service", "http")
31 changes: 17 additions & 14 deletions cmd/virt-handler/virt-handler.go
@@ -12,7 +12,6 @@ import (
kubev1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/pkg/labels"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/kubecli"
@@ -71,46 +70,50 @@ func main() {
panic(err)
}

domainSharedInformer, err := virtcache.NewSharedInformer(domainConn)

restClient, err := kubecli.GetRESTClient()
if err != nil {
panic(err)
}

err = virthandler.RegisterDomainListener(domainSharedInformer)

l, err := labels.Parse(fmt.Sprintf(v1.NodeNameLabel+" in (%s)", *host))
if err != nil {
panic(err)
}

restClient, err := kubecli.GetRESTClient()
// Wire VM controller
vmListWatcher := kubecli.NewListWatchFromClient(restClient, "vms", api.NamespaceDefault, fields.Everything(), l)
vmStore, vmQueue, vmController := virthandler.NewVMController(vmListWatcher, domainManager, recorder, *restClient, *host)

// Wire Domain controller
domainSharedInformer, err := virtcache.NewSharedInformer(domainConn)
if err != nil {
panic(err)
}
domainStore, domainController := virthandler.NewDomainController(vmQueue, vmStore, domainSharedInformer)

l, err := labels.Parse(fmt.Sprintf(v1.NodeNameLabel+" in (%s)", *host))
if err != nil {
panic(err)
}

vmListWatcher := kubecli.NewListWatchFromClient(restClient, "vms", api.NamespaceDefault, fields.Everything(), l)

vmStore, vmController := virthandler.NewVMController(vmListWatcher, domainManager, recorder, *restClient)

// Bootstrapping. From here on the startup order matters
stop := make(chan struct{})
defer close(stop)

go domainSharedInformer.Run(stop)
cache.WaitForCacheSync(stop, domainSharedInformer.HasSynced)
// Start domain controller and wait for Domain cache sync
domainController.StartInformer(stop)
domainController.WaitForSync(stop)

// Populate the VM store with known Domains on the host, to detect deletions since the last run
for _, domain := range domainSharedInformer.GetStore().List() {
for _, domain := range domainStore.List() {
d := domain.(*virtwrap.Domain)
vmStore.Add(v1.NewVMReferenceFromName(d.ObjectMeta.Name))
}

// Watch for VM changes
vmController.StartInformer(stop)
vmController.WaitForSync(stop)

go domainController.Run(1, stop)
go vmController.Run(1, stop)

// Sleep forever
4 changes: 2 additions & 2 deletions pkg/api/v1/types.go
@@ -286,7 +286,7 @@ func NewVMReferenceFromName(name string) *VM {
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: kubeapi.NamespaceDefault,
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/vms/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
},
}
vm.SetGroupVersionKind(schema.GroupVersionKind{Group: GroupVersion.Group, Kind: "VM", Version: GroupVersion.Version})
@@ -299,7 +299,7 @@ func NewMinimalMigration(name string, vmName string) *Migration {
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: kubeapi.NamespaceDefault,
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/migrations/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
},
TypeMeta: metav1.TypeMeta{
APIVersion: GroupVersion.String(),
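
(Not part of the diff: illustrative output of the corrected format strings above. The group/version string and the object names are placeholders, not values confirmed by this commit.)

package main

import "fmt"

func main() {
	// "kubevirt.io/v1alpha1" stands in for GroupVersion.String(); "testvm" and
	// "testvm-migration" are example object names.
	fmt.Println(fmt.Sprintf("/apis/%s/namespaces/%s/vms/%s", "kubevirt.io/v1alpha1", "default", "testvm"))
	// /apis/kubevirt.io/v1alpha1/namespaces/default/vms/testvm
	fmt.Println(fmt.Sprintf("/apis/%s/namespaces/%s/migrations/%s", "kubevirt.io/v1alpha1", "default", "testvm-migration"))
	// /apis/kubevirt.io/v1alpha1/namespaces/default/migrations/testvm-migration
}

Before the fix both links ended in ".../namespaces/default/<name>", dropping the resource segment; the change restores the conventional /apis/<group-version>/namespaces/<namespace>/<resource>/<name> shape.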
47 changes: 33 additions & 14 deletions pkg/kubecli/kubecli.go
@@ -96,8 +96,8 @@ func HandlePanic() {
}
}

func NewIndexerInformerForWorkQueue(lw cache.ListerWatcher, queue workqueue.RateLimitingInterface, objType runtime.Object) (cache.Indexer, *cache.Controller) {
return cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{
func NewResourceEventHandlerFuncsForQorkqueue(queue workqueue.RateLimitingInterface) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
@@ -116,42 +116,57 @@ func NewIndexerInformerForWorkQueue(lw cache.ListerWatcher, queue workqueue.Rate
queue.Add(key)
}
},
}, cache.Indexers{})
}
}

type Controller struct {
indexer cache.Indexer
indexer cache.Store
queue workqueue.RateLimitingInterface
informer *cache.Controller
informer cache.ControllerInterface
f ControllerFunc
done chan struct{}
}

func NewController(lw cache.ListerWatcher, queue workqueue.RateLimitingInterface, objType runtime.Object, f ControllerFunc) (cache.Indexer, *Controller) {
indexer, informer := NewIndexerInformerForWorkQueue(lw, queue, objType)
func NewController(lw cache.ListerWatcher, queue workqueue.RateLimitingInterface, objType runtime.Object, f ControllerFunc) (cache.Store, *Controller) {

indexer, informer := cache.NewIndexerInformer(lw, objType, 0, NewResourceEventHandlerFuncsForQorkqueue(queue), cache.Indexers{})
return NewControllerFromInformer(indexer, informer, queue, f)
}

return indexer, &Controller{
func NewControllerFromInformer(indexer cache.Store, informer cache.ControllerInterface, queue workqueue.RateLimitingInterface, f ControllerFunc) (cache.Store, *Controller) {
c := &Controller{
informer: informer,
indexer: indexer,
queue: queue,
f: f,
done: make(chan struct{}),
}
c.f = func(s cache.Store, w workqueue.RateLimitingInterface) bool {
running := f(s, w)
if !running {
close(c.done)
}
return running
}
return indexer, c
}

type ControllerFunc func(cache.Indexer, workqueue.RateLimitingInterface) bool
type ControllerFunc func(cache.Store, workqueue.RateLimitingInterface) bool

func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
defer HandlePanic()
defer c.queue.ShutDown()
logging.DefaultLogger().Info().Msg("Starting VM controller.")

go c.informer.Run(stopCh)
logging.DefaultLogger().Info().Msg("Starting controller.")

for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

<-stopCh
logging.DefaultLogger().Info().Msg("Stopping VM controller.")
logging.DefaultLogger().Info().Msg("Stopping controller.")
}

func (c *Controller) StartInformer(stopCh chan struct{}) {
go c.informer.Run(stopCh)
}

func (c *Controller) WaitForSync(stopCh chan struct{}) {
@@ -162,3 +177,7 @@ func (c *Controller) runWorker() {
for c.f(c.indexer, c.queue) {
}
}

func (c *Controller) WaitUntilDone() {
<-c.done
}
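
(Not part of the diff: a sketch of how a caller is expected to drive the reworked Controller, mirroring the wiring in virt-controller and virt-handler shown elsewhere in this commit. The ListerWatcher and the watched type are placeholders.)

package example

import (
	"k8s.io/client-go/pkg/util/workqueue"
	"k8s.io/client-go/tools/cache"
	"kubevirt.io/kubevirt/pkg/api/v1"
	"kubevirt.io/kubevirt/pkg/kubecli"
)

// runExampleController is hypothetical; lw is any configured ListerWatcher and
// stop is closed by the caller on shutdown.
func runExampleController(lw cache.ListerWatcher, stop chan struct{}) {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	store, controller := kubecli.NewController(lw, queue, &v1.VM{}, func(store cache.Store, queue workqueue.RateLimitingInterface) bool {
		key, quit := queue.Get()
		if quit {
			return false // stops the worker and closes the controller's done channel
		}
		defer queue.Done(key)
		// reconcile the object behind the key
		_, _, _ = store.GetByKey(key.(string))
		return true
	})
	_ = store

	controller.StartInformer(stop) // informers are now started explicitly, no longer inside Run
	controller.WaitForSync(stop)   // block until the cache has warmed up
	go controller.Run(1, stop)     // then start the worker threads
	controller.WaitUntilDone()     // blocks until the worker loop reports it is done
}

Since Run no longer starts the informer itself, tests (see pod_test.go below) and binaries decide when the informer starts and when caches count as warm.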
8 changes: 4 additions & 4 deletions pkg/virt-controller/watch/pod.go
@@ -28,25 +28,25 @@ func scheduledVMPodSelector() kubeapi.ListOptions {
return kubeapi.ListOptions{FieldSelector: fieldSelector, LabelSelector: labelSelector}
}

func NewPodController(vmCache cache.Indexer, recorder record.EventRecorder, clientset *kubernetes.Clientset, restClient *rest.RESTClient) (cache.Indexer, *kubecli.Controller) {
func NewPodController(vmCache cache.Store, recorder record.EventRecorder, clientset *kubernetes.Clientset, restClient *rest.RESTClient) (cache.Store, *kubecli.Controller) {

selector := scheduledVMPodSelector()
lw := kubecli.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", kubeapi.NamespaceDefault, selector.FieldSelector, selector.LabelSelector)
return NewPodControllerWithListWatch(vmCache, recorder, lw, restClient)
}

func NewPodControllerWithListWatch(vmCache cache.Indexer, recorder record.EventRecorder, lw cache.ListerWatcher, restClient *rest.RESTClient) (cache.Indexer, *kubecli.Controller) {
func NewPodControllerWithListWatch(vmCache cache.Store, _ record.EventRecorder, lw cache.ListerWatcher, restClient *rest.RESTClient) (cache.Store, *kubecli.Controller) {

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
return kubecli.NewController(lw, queue, &v1.Pod{}, func(indexer cache.Indexer, queue workqueue.RateLimitingInterface) bool {
return kubecli.NewController(lw, queue, &v1.Pod{}, func(store cache.Store, queue workqueue.RateLimitingInterface) bool {
key, quit := queue.Get()
if quit {
return false
}
defer queue.Done(key)

// Fetch the latest Pod state from the cache
obj, exists, err := indexer.GetByKey(key.(string))
obj, exists, err := store.GetByKey(key.(string))

if err != nil {
queue.AddRateLimited(key)
25 changes: 6 additions & 19 deletions pkg/virt-controller/watch/pod_test.go
@@ -1,22 +1,19 @@
package watch

import (
"net/http"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/ghttp"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/conversion"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/runtime/serializer"
"k8s.io/client-go/pkg/util/uuid"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/cache/testing"
"k8s.io/client-go/tools/clientcmd"
"kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/kubecli"
"kubevirt.io/kubevirt/pkg/logging"
"kubevirt.io/kubevirt/pkg/virt-controller/services"
"net/http"
)

var _ = Describe("Pod", func() {
@@ -34,7 +31,8 @@ var _ = Describe("Pod", func() {
It("should update the VM with the node of the running Pod", func(done Done) {

// Wire a Pod controller with a fake source
restClient := getRestClient(server.URL())
restClient, err := kubecli.GetRESTClientFromFlags(server.URL(), "")
Expect(err).To(Not(HaveOccurred()))
vmCache := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, nil)
lw := framework.NewFakeControllerSource()
_, podController := NewPodControllerWithListWatch(vmCache, nil, lw, restClient)
@@ -73,6 +71,7 @@ var _ = Describe("Pod", func() {
)

// Start the controller
podController.StartInformer(stopChan)
go podController.Run(1, stopChan)

// Tell the controller that there is a new running Pod
@@ -89,15 +88,3 @@ var _ = Describe("Pod", func() {
server.Close()
})
})

func getRestClient(url string) *rest.RESTClient {
restConfig, err := clientcmd.BuildConfigFromFlags(url, "")
Expect(err).NotTo(HaveOccurred())
restConfig.GroupVersion = &v1.GroupVersion
restConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: api.Codecs}
restConfig.APIPath = "/apis"
restConfig.ContentType = runtime.ContentTypeJSON
restClient, err := rest.RESTClientFor(restConfig)
Expect(err).ToNot(HaveOccurred())
return restClient
}
6 changes: 3 additions & 3 deletions pkg/virt-controller/watch/vm.go
@@ -17,19 +17,19 @@ import (
"strings"
)

func NewVMController(vmService services.VMService, recorder record.EventRecorder, restClient *rest.RESTClient) (cache.Indexer, *kubecli.Controller) {
func NewVMController(vmService services.VMService, recorder record.EventRecorder, restClient *rest.RESTClient) (cache.Store, *kubecli.Controller) {
lw := cache.NewListWatchFromClient(restClient, "vms", kubeapi.NamespaceDefault, fields.Everything())

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
return kubecli.NewController(lw, queue, &v1.VM{}, func(indexer cache.Indexer, queue workqueue.RateLimitingInterface) bool {
return kubecli.NewController(lw, queue, &v1.VM{}, func(store cache.Store, queue workqueue.RateLimitingInterface) bool {
key, quit := queue.Get()
if quit {
return false
}
defer queue.Done(key)

// Fetch the latest VM state from the cache
obj, exists, err := indexer.GetByKey(key.(string))
obj, exists, err := store.GetByKey(key.(string))

if err != nil {
queue.AddRateLimited(key)
49 changes: 38 additions & 11 deletions pkg/virt-handler/domain.go
@@ -1,7 +1,10 @@
package virthandler

import (
"k8s.io/client-go/pkg/util/workqueue"
"k8s.io/client-go/tools/cache"
"kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/kubecli"
"kubevirt.io/kubevirt/pkg/logging"
"kubevirt.io/kubevirt/pkg/virt-handler/virtwrap"
)
@@ -11,18 +14,42 @@ TODO: Define the exact scope of this controller.
For now it looks like we should use domain events to detect unexpected domain changes like crashes or VMs going
into paused mode because of resource shortages or cut-off connections to storage.
*/
func NewDomainController(vmQueue workqueue.RateLimitingInterface, vmStore cache.Store, informer cache.SharedInformer) (cache.Store, *kubecli.Controller) {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
informer.AddEventHandler(kubecli.NewResourceEventHandlerFuncsForQorkqueue(queue))

func RegisterDomainListener(informer cache.SharedInformer) error {
return informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
logging.DefaultLogger().Info().Object(obj.(*virtwrap.Domain)).Msgf("Added domain is in state %s", obj.(*virtwrap.Domain).Status.Status)
},
DeleteFunc: func(obj interface{}) {
logging.DefaultLogger().Info().Object(obj.(*virtwrap.Domain)).Msgf("Deleted domain is in state %s", obj.(*virtwrap.Domain).Status.Status)
},
UpdateFunc: func(old interface{}, new interface{}) {
logging.DefaultLogger().Info().Object(new.(*virtwrap.Domain)).Msgf("Updated domain is in state %s", new.(*virtwrap.Domain).Status.Status)
return kubecli.NewControllerFromInformer(informer.GetStore(), informer, queue, func(indexer cache.Store, queue workqueue.RateLimitingInterface) bool {
key, quit := queue.Get()
if quit {
return false
}
defer queue.Done(key)

},
obj, exists, err := indexer.GetByKey(key.(string))
if err != nil {
queue.AddRateLimited(key)
return true
}
var domain *virtwrap.Domain
if !exists {
_, name, err := cache.SplitMetaNamespaceKey(key.(string))
if err != nil {
queue.AddRateLimited(key)
return true
}
domain = virtwrap.NewDomainReferenceFromName(name)
logging.DefaultLogger().Info().Object(domain).Msgf("Domain deleted")
} else {
domain = obj.(*virtwrap.Domain)
logging.DefaultLogger().Info().Object(domain).Msgf("Domain is in state %s", domain.Status.Status)
}
obj, vmExists, err := vmStore.GetByKey(key.(string))
if err != nil {
queue.AddRateLimited(key)
} else if !vmExists || obj.(*v1.VM).GetObjectMeta().GetUID() != domain.GetObjectMeta().GetUID() {
// The VM is not in the VM cache, or it is a VM with a different UID; tell the VM controller to investigate it
vmQueue.Add(key)
}
return true
})
}
