Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migration controller #118

Merged
merged 2 commits into from
Mar 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 14 additions & 2 deletions cmd/virt-controller/virt-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/emicklei/go-restful"
"github.com/facebookgo/inject"
clientrest "k8s.io/client-go/rest"
"kubevirt.io/kubevirt/pkg/kubecli"
"kubevirt.io/kubevirt/pkg/logging"
"kubevirt.io/kubevirt/pkg/virt-controller/rest"
Expand Down Expand Up @@ -40,7 +41,14 @@ func main() {
golog.Fatal(err)
}

var restClient *clientrest.RESTClient
restClient, err = kubecli.GetRESTClient()
if err != nil {
golog.Fatal(err)
}

g.Provide(
&inject.Object{Value: restClient},
&inject.Object{Value: clientSet},
&inject.Object{Value: templateService},
&inject.Object{Value: vmService},
Expand All @@ -57,15 +65,14 @@ func main() {
defer close(stop)

// Start wachting vms
restClient, err := kubecli.GetRESTClient()
restClient, err = kubecli.GetRESTClient()
if err != nil {
golog.Fatal(err)
}
vmCache, vmController := watch.NewVMController(vmService, nil, restClient)

vmController.StartInformer(stop)
go vmController.Run(1, stop)

// Wait until VM cache has warmed up before we start watching pods
vmController.WaitForSync(stop)

Expand All @@ -74,6 +81,11 @@ func main() {
podController.StartInformer(stop)
go podController.Run(1, stop)

_, migrationController := watch.NewMigrationController(vmService, nil, restClient)
migrationController.StartInformer(stop)
go migrationController.Run(1, stop)
migrationController.WaitForSync(stop)

httpLogger := logger.With("service", "http")

httpLogger.Info().Log("action", "listening", "interface", *host, "port", *port)
Expand Down
69 changes: 44 additions & 25 deletions pkg/api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,17 +240,26 @@ func (s SyncEvent) String() string {

func NewMinimalVM(vmName string) *VM {
precond.CheckNotEmpty(vmName)
return &VM{
Spec: VMSpec{Domain: NewMinimalDomainSpec(vmName)},
vm := NewVMReferenceFromName(vmName)
vm.Spec = VMSpec{Domain: NewMinimalDomainSpec(vmName)}
vm.TypeMeta = metav1.TypeMeta{
APIVersion: GroupVersion.String(),
Kind: "VM",
}
return vm
}

// TODO Namespace could be different, also store it somewhere in the domain, so that we can report deletes on handler startup properly
func NewVMReferenceFromName(name string) *VM {
vm := &VM{
ObjectMeta: v1.ObjectMeta{
Name: vmName,
Name: name,
Namespace: kubeapi.NamespaceDefault,
},
TypeMeta: metav1.TypeMeta{
APIVersion: GroupVersion.String(),
Kind: "VM",
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/vms/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
},
}
vm.SetGroupVersionKind(schema.GroupVersionKind{Group: GroupVersion.Group, Kind: "VM", Version: GroupVersion.Version})
return vm
}

type Spice struct {
Expand Down Expand Up @@ -280,33 +289,28 @@ func NewSpice(vmName string) *Spice {
}
}

// TODO Namespace could be different, also store it somewhere in the domain, so that we can report deletes on handler startup properly
func NewVMReferenceFromName(name string) *VM {
vm := &VM{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: kubeapi.NamespaceDefault,
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/vms/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
},
//TODO validate that this is correct
func NewMinimalMigration(name string, vmName string) *Migration {
migration := NewMigrationReferenceFromName(name)
migration.Spec = MigrationSpec{
MigratingVMName: vmName,
}
vm.SetGroupVersionKind(schema.GroupVersionKind{Group: GroupVersion.Group, Kind: "VM", Version: GroupVersion.Version})
return vm
return migration
}

//TODO validate that this is correct
func NewMinimalMigration(name string, vmName string) *Migration {
func NewMigrationReferenceFromName(name string) *Migration {
migration := &Migration{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: kubeapi.NamespaceDefault,
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/migrations/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
},
TypeMeta: metav1.TypeMeta{
APIVersion: GroupVersion.String(),
Kind: "Migration",
},
Spec: MigrationSpec{
MigratingVMName: vmName,
Status: MigrationStatus{
Phase: MigrationUnknown,
},
}
return migration
Expand Down Expand Up @@ -341,6 +345,9 @@ type MigrationSpec struct {
type MigrationPhase string

const (
// Create Migration has been called but nothing has been done with it
MigrationUnknown MigrationPhase = ""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a compelling reason to make this an empty string? It would be more consistent to make this:

  • MigrationUnknown MigrationPhase = "Unknown"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means, that so far, no component touched it, if there is no state present ...


// Migration has been scheduled but no update on the status has been recorded
MigrationPending MigrationPhase = "Pending"

Expand All @@ -358,12 +365,24 @@ const (
// MigrationStatus is the last reported status of a VM Migratrion. Status may trail the actual
// state of a migration.
type MigrationStatus struct {
Phase MigrationPhase `json:"message,omitempty"`
Phase MigrationPhase `json:"phase,omitempty"`
}

// Required to satisfy ObjectMetaAccessor interface
func (v *Migration) GetObjectMeta() meta.Object {
return &v.ObjectMeta
func (m *Migration) GetObjectMeta() meta.Object {
return &m.ObjectMeta
}

func (m *Migration) UnmarshalJSON(data []byte) error {
type MigrationCopy Migration
tmp := MigrationCopy{}
err := json.Unmarshal(data, &tmp)
if err != nil {
return err
}
tmp2 := Migration(tmp)
*m = tmp2
return nil
}

//A list of Migrations
Expand Down
1 change: 1 addition & 0 deletions pkg/virt-api/rest/kubeproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"kubevirt.io/kubevirt/pkg/logging"
rest2 "kubevirt.io/kubevirt/pkg/rest"
. "kubevirt.io/kubevirt/pkg/virt-api/rest"
. "kubevirt.io/kubevirt/test"
)

const vmResource = "vms"
Expand Down
Binary file added pkg/virt-controller/services/debug.test
Binary file not shown.
Binary file added pkg/virt-controller/services/services.test
Binary file not shown.
97 changes: 71 additions & 26 deletions pkg/virt-controller/services/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,36 @@ import (
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/pkg/labels"
"k8s.io/client-go/rest"
corev1 "kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/middleware"
"kubevirt.io/kubevirt/pkg/precond"
)

type VMService interface {
StartVM(*corev1.VM) error
DeleteVM(*corev1.VM) error
PrepareMigration(*corev1.VM) error
GetRunningPods(*corev1.VM) (*v1.PodList, error)
StartVMPod(*corev1.VM) error
DeleteVMPod(*corev1.VM) error
GetRunningVMPods(*corev1.VM) (*v1.PodList, error)
DeleteMigration(*corev1.Migration) error
GetRunningMigrationPods(*corev1.Migration) (*v1.PodList, error)
SetupMigration(migration *corev1.Migration, vm *corev1.VM) error
UpdateMigration(migration *corev1.Migration) error
FetchVM(vmName string) (*corev1.VM, error)
}

type vmService struct {
KubeCli *kubernetes.Clientset `inject:""`
RestClient *rest.RESTClient `inject:""`
TemplateService TemplateService `inject:""`
}

func (v *vmService) StartVM(vm *corev1.VM) error {
func (v *vmService) StartVMPod(vm *corev1.VM) error {

precond.MustNotBeNil(vm)
precond.MustNotBeEmpty(vm.GetObjectMeta().GetName())
precond.MustNotBeEmpty(string(vm.GetObjectMeta().GetUID()))

podList, err := v.GetRunningPods(vm)
podList, err := v.GetRunningVMPods(vm)
if err != nil {
return err
}
Expand All @@ -49,59 +56,97 @@ func (v *vmService) StartVM(vm *corev1.VM) error {
return nil
}

func (v *vmService) DeleteVM(vm *corev1.VM) error {
func (v *vmService) DeleteVMPod(vm *corev1.VM) error {
precond.MustNotBeNil(vm)
precond.MustNotBeEmpty(vm.GetObjectMeta().GetName())

if err := v.KubeCli.Core().Pods(v1.NamespaceDefault).DeleteCollection(nil, UnfinishedVMPodSelector(vm)); err != nil {
if err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).DeleteCollection(nil, UnfinishedVMPodSelector(vm)); err != nil {
return err
}
return nil
}

func (v *vmService) GetRunningPods(vm *corev1.VM) (*v1.PodList, error) {
podList, err := v.KubeCli.Core().Pods(v1.NamespaceDefault).List(UnfinishedVMPodSelector(vm))
func (v *vmService) GetRunningVMPods(vm *corev1.VM) (*v1.PodList, error) {
podList, err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).List(UnfinishedVMPodSelector(vm))

if err != nil {
return nil, err
}
return podList, nil
}

func (v *vmService) UpdateMigration(migration *corev1.Migration) error {
migrationName := migration.ObjectMeta.Name
_, err := v.RestClient.Put().Namespace(v1.NamespaceDefault).Resource("migrations").Body(migration).Name(migrationName).Do().Get()
return err
}

func (v *vmService) FetchVM(vmName string) (*corev1.VM, error) {
resp, err := v.RestClient.Get().Namespace(v1.NamespaceDefault).Resource("vms").Name(vmName).Do().Get()
if err != nil {
return nil, err
}
vm := resp.(*corev1.VM)
return vm, nil
}

func NewVMService() VMService {
svc := vmService{}
return &svc
}

func (v *vmService) PrepareMigration(vm *corev1.VM) error {
precond.MustNotBeNil(vm)
precond.MustNotBeEmpty(vm.GetObjectMeta().GetName())
precond.MustBeTrue(len(vm.Spec.NodeSelector) > 0)
podList, err := v.GetRunningPods(vm)
func UnfinishedVMPodSelector(vm *corev1.VM) v1.ListOptions {
fieldSelector := fields.ParseSelectorOrDie(
"status.phase!=" + string(v1.PodFailed) +
",status.phase!=" + string(v1.PodSucceeded))
labelSelector, err := labels.Parse(fmt.Sprintf(corev1.DomainLabel+" in (%s)", vm.GetObjectMeta().GetName()))
if err != nil {
return err
panic(err)
}
return v1.ListOptions{FieldSelector: fieldSelector.String(), LabelSelector: labelSelector.String()}
}

// If there are more than one pod in other states than Succeeded or Failed we can't go on
if len(podList.Items) > 1 {
return middleware.NewResourceConflictError(fmt.Sprintf("VM %s is already migrating", vm.GetObjectMeta().GetName()))
func (v *vmService) SetupMigration(migration *corev1.Migration, vm *corev1.VM) error {
pod, err := v.TemplateService.RenderLaunchManifest(vm)
if err == nil {
_, err = v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).Create(pod)
}
if err == nil {
migration.Status.Phase = corev1.MigrationInProgress
} else {
migration.Status.Phase = corev1.MigrationFailed
}

pod, err := v.TemplateService.RenderLaunchManifest(vm)
if err != nil {
return err
err2 := v.UpdateMigration(migration)
if err2 != nil {
err = err2
}
if _, err := v.KubeCli.Core().Pods(v1.NamespaceDefault).Create(pod); err != nil {
return err
}

func (v *vmService) DeleteMigration(migration *corev1.Migration) error {
precond.MustNotBeNil(migration)
precond.MustNotBeEmpty(migration.GetObjectMeta().GetName())

if err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).DeleteCollection(nil, unfinishedMigrationPodSelector(migration)); err != nil {
return err
}
return nil
}

// Visible for tests
func UnfinishedVMPodSelector(vm *corev1.VM) v1.ListOptions {
func (v *vmService) GetRunningMigrationPods(migration *corev1.Migration) (*v1.PodList, error) {
podList, err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).List(unfinishedMigrationPodSelector(migration))
if err != nil {
return nil, err
}
return podList, nil
}

func unfinishedMigrationPodSelector(migration *corev1.Migration) v1.ListOptions {
fieldSelector := fields.ParseSelectorOrDie(
"status.phase!=" + string(v1.PodFailed) +
",status.phase!=" + string(v1.PodSucceeded))
labelSelector, err := labels.Parse(fmt.Sprintf(corev1.DomainLabel+" in (%s)", vm.GetObjectMeta().GetName()))
labelSelector, err := labels.Parse(fmt.Sprintf(corev1.DomainLabel+" in (%s)", migration.GetObjectMeta().GetName()))
if err != nil {
panic(err)
}
Expand Down