|
|
@@ -8,6 +8,7 @@ import ( |
|
|
|
|
|
"github.com/libvirt/libvirt-go"
|
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
|
"k8s.io/apimachinery/pkg/types"
|
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
|
|
|
|
"kubevirt.io/kubevirt/pkg/log"
|
|
|
@@ -75,74 +76,104 @@ func newWatchEventError(err error) watch.Event { |
|
|
return watch.Event{Type: watch.Error, Object: &metav1.Status{Status: metav1.StatusFailure, Message: err.Error()}}
|
|
|
}
|
|
|
|
|
|
func libvirtEventCallback(d cli.VirDomain, event *libvirt.DomainEventLifecycle, client *DomainEventClient, deleteNotificationSent chan watch.Event) {
|
|
|
func libvirtEventCallback(c cli.Connection, domain *api.Domain, event *libvirt.DomainEventLifecycle, client *DomainEventClient, events chan watch.Event) {
|
|
|
|
|
|
// check for reconnects, and emit an error to force a resync
|
|
|
if event == nil {
|
|
|
client.SendDomainEvent(newWatchEventError(fmt.Errorf("Libvirt reconnect")))
|
|
|
return
|
|
|
}
|
|
|
|
|
|
domain, err := util.NewDomain(d)
|
|
|
if err != nil {
|
|
|
log.Log.Reason(err).Error("Could not create the Domain.")
|
|
|
client.SendDomainEvent(newWatchEventError(err))
|
|
|
return
|
|
|
}
|
|
|
|
|
|
// No matter which event, try to fetch the domain xml
|
|
|
// and the state. If we get a IsNotFound error, that
|
|
|
// means that the VirtualMachineInstance was removed.
|
|
|
spec, err := util.GetDomainSpec(d)
|
|
|
d, err := c.LookupDomainByName(util.DomainFromNamespaceName(domain.ObjectMeta.Namespace, domain.ObjectMeta.Name))
|
|
|
if err != nil {
|
|
|
if !domainerrors.IsNotFound(err) {
|
|
|
log.Log.Reason(err).Error("Could not fetch the Domain specification.")
|
|
|
client.SendDomainEvent(newWatchEventError(err))
|
|
|
return
|
|
|
}
|
|
|
} else {
|
|
|
domain.Spec = *spec
|
|
|
domain.ObjectMeta.UID = spec.Metadata.KubeVirt.UID
|
|
|
}
|
|
|
status, reason, err := d.GetState()
|
|
|
if err != nil {
|
|
|
if !domainerrors.IsNotFound(err) {
|
|
|
log.Log.Reason(err).Error("Could not fetch the Domain state.")
|
|
|
log.Log.Reason(err).Error("Could not fetch the Domain.")
|
|
|
client.SendDomainEvent(newWatchEventError(err))
|
|
|
return
|
|
|
}
|
|
|
domain.SetState(api.NoState, api.ReasonNonExistent)
|
|
|
} else {
|
|
|
domain.SetState(util.ConvState(status), util.ConvReason(status, reason))
|
|
|
defer d.Free()
|
|
|
|
|
|
// No matter which event, try to fetch the domain xml
|
|
|
// and the state. If we get a IsNotFound error, that
|
|
|
// means that the VirtualMachineInstance was removed.
|
|
|
status, reason, err := d.GetState()
|
|
|
if err != nil {
|
|
|
if !domainerrors.IsNotFound(err) {
|
|
|
log.Log.Reason(err).Error("Could not fetch the Domain state.")
|
|
|
client.SendDomainEvent(newWatchEventError(err))
|
|
|
return
|
|
|
}
|
|
|
domain.SetState(api.NoState, api.ReasonNonExistent)
|
|
|
} else {
|
|
|
domain.SetState(util.ConvState(status), util.ConvReason(status, reason))
|
|
|
}
|
|
|
spec, err := util.GetDomainSpec(status, d)
|
|
|
if err != nil {
|
|
|
if !domainerrors.IsNotFound(err) {
|
|
|
log.Log.Reason(err).Error("Could not fetch the Domain specification.")
|
|
|
client.SendDomainEvent(newWatchEventError(err))
|
|
|
return
|
|
|
}
|
|
|
} else {
|
|
|
domain.Spec = *spec
|
|
|
domain.ObjectMeta.UID = spec.Metadata.KubeVirt.UID
|
|
|
}
|
|
|
|
|
|
log.Log.Infof("libvirt domain status: %v:%v", status, reason)
|
|
|
log.Log.Infof("kubevirt domain status: %v:%v", domain.Status.Status, domain.Status.Reason)
|
|
|
}
|
|
|
|
|
|
log.Log.Infof("domain status: %v:%v", status, reason)
|
|
|
switch domain.Status.Reason {
|
|
|
case api.ReasonNonExistent:
|
|
|
event := watch.Event{Type: watch.Deleted, Object: domain}
|
|
|
client.SendDomainEvent(event)
|
|
|
deleteNotificationSent <- event
|
|
|
events <- event
|
|
|
default:
|
|
|
if event.Event == libvirt.DOMAIN_EVENT_DEFINED && libvirt.DomainEventDefinedDetailType(event.Detail) == libvirt.DOMAIN_EVENT_DEFINED_ADDED {
|
|
|
client.SendDomainEvent(watch.Event{Type: watch.Added, Object: domain})
|
|
|
event := watch.Event{Type: watch.Added, Object: domain}
|
|
|
client.SendDomainEvent(event)
|
|
|
events <- event
|
|
|
} else {
|
|
|
client.SendDomainEvent(watch.Event{Type: watch.Modified, Object: domain})
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func StartNotifier(virtShareDir string, domainConn cli.Connection, deleteNotificationSent chan watch.Event) error {
|
|
|
func StartNotifier(virtShareDir string, domainConn cli.Connection, deleteNotificationSent chan watch.Event, vmiUID types.UID) error {
|
|
|
type LibvirtEvent struct {
|
|
|
Domain string
|
|
|
Event *libvirt.DomainEventLifecycle
|
|
|
}
|
|
|
|
|
|
eventChan := make(chan LibvirtEvent, 10)
|
|
|
|
|
|
// Run the event process logic in a separate go-routine to not block libvirt
|
|
|
go func() {
|
|
|
for event := range eventChan {
|
|
|
// TODO don't make a client every single time
|
|
|
client, err := NewDomainEventClient(virtShareDir)
|
|
|
if err != nil {
|
|
|
log.Log.Reason(err).Error("Unable to create domain event notify client")
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
libvirtEventCallback(domainConn, util.NewDomainFromName(event.Domain, vmiUID), event.Event, client, deleteNotificationSent)
|
|
|
log.Log.Info("processed event")
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
entrypointCallback := func(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainEventLifecycle) {
|
|
|
log.Log.Infof("Libvirt event %d with reason %d received", event.Event, event.Detail)
|
|
|
// TODO don't make a client every single time
|
|
|
client, err := NewDomainEventClient(virtShareDir)
|
|
|
name, err := d.GetName()
|
|
|
if err != nil {
|
|
|
log.Log.Reason(err).Error("Unable to create domain event notify client")
|
|
|
return
|
|
|
log.Log.Reason(err).Info("Could not determine name of libvirt domain in event callback.")
|
|
|
}
|
|
|
select {
|
|
|
case eventChan <- LibvirtEvent{Event: event, Domain: name}:
|
|
|
default:
|
|
|
log.Log.Infof("Libvirt event channel is full, dropping event.")
|
|
|
}
|
|
|
|
|
|
libvirtEventCallback(d, event, client, deleteNotificationSent)
|
|
|
log.Log.Info("processed event")
|
|
|
}
|
|
|
err := domainConn.DomainEventLifecycleRegister(entrypointCallback)
|
|
|
if err != nil {
|
|
|
|