Skip to content

Commit

Permalink
Merge pull request #118 from admiyo/migration-controller
Browse files Browse the repository at this point in the history
Migration controller
  • Loading branch information
admiyo committed Mar 9, 2017
2 parents f994574 + 6216ff9 commit 5f8af89
Show file tree
Hide file tree
Showing 16 changed files with 651 additions and 113 deletions.
16 changes: 14 additions & 2 deletions cmd/virt-controller/virt-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/emicklei/go-restful"
"github.com/facebookgo/inject"
clientrest "k8s.io/client-go/rest"
"kubevirt.io/kubevirt/pkg/kubecli"
"kubevirt.io/kubevirt/pkg/logging"
"kubevirt.io/kubevirt/pkg/virt-controller/rest"
Expand Down Expand Up @@ -40,7 +41,14 @@ func main() {
golog.Fatal(err)
}

var restClient *clientrest.RESTClient
restClient, err = kubecli.GetRESTClient()
if err != nil {
golog.Fatal(err)
}

g.Provide(
&inject.Object{Value: restClient},
&inject.Object{Value: clientSet},
&inject.Object{Value: templateService},
&inject.Object{Value: vmService},
Expand All @@ -57,15 +65,14 @@ func main() {
defer close(stop)

// Start wachting vms
restClient, err := kubecli.GetRESTClient()
restClient, err = kubecli.GetRESTClient()
if err != nil {
golog.Fatal(err)
}
vmCache, vmController := watch.NewVMController(vmService, nil, restClient)

vmController.StartInformer(stop)
go vmController.Run(1, stop)

// Wait until VM cache has warmed up before we start watching pods
vmController.WaitForSync(stop)

Expand All @@ -74,6 +81,11 @@ func main() {
podController.StartInformer(stop)
go podController.Run(1, stop)

_, migrationController := watch.NewMigrationController(vmService, nil, restClient)
migrationController.StartInformer(stop)
go migrationController.Run(1, stop)
migrationController.WaitForSync(stop)

httpLogger := logger.With("service", "http")

httpLogger.Info().Log("action", "listening", "interface", *host, "port", *port)
Expand Down
69 changes: 44 additions & 25 deletions pkg/api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,17 +240,26 @@ func (s SyncEvent) String() string {

func NewMinimalVM(vmName string) *VM {
precond.CheckNotEmpty(vmName)
return &VM{
Spec: VMSpec{Domain: NewMinimalDomainSpec(vmName)},
vm := NewVMReferenceFromName(vmName)
vm.Spec = VMSpec{Domain: NewMinimalDomainSpec(vmName)}
vm.TypeMeta = metav1.TypeMeta{
APIVersion: GroupVersion.String(),
Kind: "VM",
}
return vm
}

// TODO Namespace could be different, also store it somewhere in the domain, so that we can report deletes on handler startup properly
func NewVMReferenceFromName(name string) *VM {
vm := &VM{
ObjectMeta: v1.ObjectMeta{
Name: vmName,
Name: name,
Namespace: kubeapi.NamespaceDefault,
},
TypeMeta: metav1.TypeMeta{
APIVersion: GroupVersion.String(),
Kind: "VM",
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/vms/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
},
}
vm.SetGroupVersionKind(schema.GroupVersionKind{Group: GroupVersion.Group, Kind: "VM", Version: GroupVersion.Version})
return vm
}

type Spice struct {
Expand Down Expand Up @@ -280,33 +289,28 @@ func NewSpice(vmName string) *Spice {
}
}

// TODO Namespace could be different, also store it somewhere in the domain, so that we can report deletes on handler startup properly
func NewVMReferenceFromName(name string) *VM {
vm := &VM{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: kubeapi.NamespaceDefault,
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/vms/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
},
//TODO validate that this is correct
func NewMinimalMigration(name string, vmName string) *Migration {
migration := NewMigrationReferenceFromName(name)
migration.Spec = MigrationSpec{
MigratingVMName: vmName,
}
vm.SetGroupVersionKind(schema.GroupVersionKind{Group: GroupVersion.Group, Kind: "VM", Version: GroupVersion.Version})
return vm
return migration
}

//TODO validate that this is correct
func NewMinimalMigration(name string, vmName string) *Migration {
func NewMigrationReferenceFromName(name string) *Migration {
migration := &Migration{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: kubeapi.NamespaceDefault,
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/migrations/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
},
TypeMeta: metav1.TypeMeta{
APIVersion: GroupVersion.String(),
Kind: "Migration",
},
Spec: MigrationSpec{
MigratingVMName: vmName,
Status: MigrationStatus{
Phase: MigrationUnknown,
},
}
return migration
Expand Down Expand Up @@ -341,6 +345,9 @@ type MigrationSpec struct {
type MigrationPhase string

const (
// Create Migration has been called but nothing has been done with it
MigrationUnknown MigrationPhase = ""

// Migration has been scheduled but no update on the status has been recorded
MigrationPending MigrationPhase = "Pending"

Expand All @@ -358,12 +365,24 @@ const (
// MigrationStatus is the last reported status of a VM Migratrion. Status may trail the actual
// state of a migration.
type MigrationStatus struct {
Phase MigrationPhase `json:"message,omitempty"`
Phase MigrationPhase `json:"phase,omitempty"`
}

// Required to satisfy ObjectMetaAccessor interface
func (v *Migration) GetObjectMeta() meta.Object {
return &v.ObjectMeta
func (m *Migration) GetObjectMeta() meta.Object {
return &m.ObjectMeta
}

func (m *Migration) UnmarshalJSON(data []byte) error {
type MigrationCopy Migration
tmp := MigrationCopy{}
err := json.Unmarshal(data, &tmp)
if err != nil {
return err
}
tmp2 := Migration(tmp)
*m = tmp2
return nil
}

//A list of Migrations
Expand Down
1 change: 1 addition & 0 deletions pkg/virt-api/rest/kubeproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"kubevirt.io/kubevirt/pkg/logging"
rest2 "kubevirt.io/kubevirt/pkg/rest"
. "kubevirt.io/kubevirt/pkg/virt-api/rest"
. "kubevirt.io/kubevirt/test"
)

const vmResource = "vms"
Expand Down
Binary file added pkg/virt-controller/services/debug.test
Binary file not shown.
Binary file added pkg/virt-controller/services/services.test
Binary file not shown.
97 changes: 71 additions & 26 deletions pkg/virt-controller/services/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,36 @@ import (
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/pkg/labels"
"k8s.io/client-go/rest"
corev1 "kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/middleware"
"kubevirt.io/kubevirt/pkg/precond"
)

type VMService interface {
StartVM(*corev1.VM) error
DeleteVM(*corev1.VM) error
PrepareMigration(*corev1.VM) error
GetRunningPods(*corev1.VM) (*v1.PodList, error)
StartVMPod(*corev1.VM) error
DeleteVMPod(*corev1.VM) error
GetRunningVMPods(*corev1.VM) (*v1.PodList, error)
DeleteMigration(*corev1.Migration) error
GetRunningMigrationPods(*corev1.Migration) (*v1.PodList, error)
SetupMigration(migration *corev1.Migration, vm *corev1.VM) error
UpdateMigration(migration *corev1.Migration) error
FetchVM(vmName string) (*corev1.VM, error)
}

type vmService struct {
KubeCli *kubernetes.Clientset `inject:""`
RestClient *rest.RESTClient `inject:""`
TemplateService TemplateService `inject:""`
}

func (v *vmService) StartVM(vm *corev1.VM) error {
func (v *vmService) StartVMPod(vm *corev1.VM) error {

precond.MustNotBeNil(vm)
precond.MustNotBeEmpty(vm.GetObjectMeta().GetName())
precond.MustNotBeEmpty(string(vm.GetObjectMeta().GetUID()))

podList, err := v.GetRunningPods(vm)
podList, err := v.GetRunningVMPods(vm)
if err != nil {
return err
}
Expand All @@ -49,59 +56,97 @@ func (v *vmService) StartVM(vm *corev1.VM) error {
return nil
}

func (v *vmService) DeleteVM(vm *corev1.VM) error {
func (v *vmService) DeleteVMPod(vm *corev1.VM) error {
precond.MustNotBeNil(vm)
precond.MustNotBeEmpty(vm.GetObjectMeta().GetName())

if err := v.KubeCli.Core().Pods(v1.NamespaceDefault).DeleteCollection(nil, UnfinishedVMPodSelector(vm)); err != nil {
if err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).DeleteCollection(nil, UnfinishedVMPodSelector(vm)); err != nil {
return err
}
return nil
}

func (v *vmService) GetRunningPods(vm *corev1.VM) (*v1.PodList, error) {
podList, err := v.KubeCli.Core().Pods(v1.NamespaceDefault).List(UnfinishedVMPodSelector(vm))
func (v *vmService) GetRunningVMPods(vm *corev1.VM) (*v1.PodList, error) {
podList, err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).List(UnfinishedVMPodSelector(vm))

if err != nil {
return nil, err
}
return podList, nil
}

func (v *vmService) UpdateMigration(migration *corev1.Migration) error {
migrationName := migration.ObjectMeta.Name
_, err := v.RestClient.Put().Namespace(v1.NamespaceDefault).Resource("migrations").Body(migration).Name(migrationName).Do().Get()
return err
}

func (v *vmService) FetchVM(vmName string) (*corev1.VM, error) {
resp, err := v.RestClient.Get().Namespace(v1.NamespaceDefault).Resource("vms").Name(vmName).Do().Get()
if err != nil {
return nil, err
}
vm := resp.(*corev1.VM)
return vm, nil
}

func NewVMService() VMService {
svc := vmService{}
return &svc
}

func (v *vmService) PrepareMigration(vm *corev1.VM) error {
precond.MustNotBeNil(vm)
precond.MustNotBeEmpty(vm.GetObjectMeta().GetName())
precond.MustBeTrue(len(vm.Spec.NodeSelector) > 0)
podList, err := v.GetRunningPods(vm)
func UnfinishedVMPodSelector(vm *corev1.VM) v1.ListOptions {
fieldSelector := fields.ParseSelectorOrDie(
"status.phase!=" + string(v1.PodFailed) +
",status.phase!=" + string(v1.PodSucceeded))
labelSelector, err := labels.Parse(fmt.Sprintf(corev1.DomainLabel+" in (%s)", vm.GetObjectMeta().GetName()))
if err != nil {
return err
panic(err)
}
return v1.ListOptions{FieldSelector: fieldSelector.String(), LabelSelector: labelSelector.String()}
}

// If there are more than one pod in other states than Succeeded or Failed we can't go on
if len(podList.Items) > 1 {
return middleware.NewResourceConflictError(fmt.Sprintf("VM %s is already migrating", vm.GetObjectMeta().GetName()))
func (v *vmService) SetupMigration(migration *corev1.Migration, vm *corev1.VM) error {
pod, err := v.TemplateService.RenderLaunchManifest(vm)
if err == nil {
_, err = v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).Create(pod)
}
if err == nil {
migration.Status.Phase = corev1.MigrationInProgress
} else {
migration.Status.Phase = corev1.MigrationFailed
}

pod, err := v.TemplateService.RenderLaunchManifest(vm)
if err != nil {
return err
err2 := v.UpdateMigration(migration)
if err2 != nil {
err = err2
}
if _, err := v.KubeCli.Core().Pods(v1.NamespaceDefault).Create(pod); err != nil {
return err
}

func (v *vmService) DeleteMigration(migration *corev1.Migration) error {
precond.MustNotBeNil(migration)
precond.MustNotBeEmpty(migration.GetObjectMeta().GetName())

if err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).DeleteCollection(nil, unfinishedMigrationPodSelector(migration)); err != nil {
return err
}
return nil
}

// Visible for tests
func UnfinishedVMPodSelector(vm *corev1.VM) v1.ListOptions {
func (v *vmService) GetRunningMigrationPods(migration *corev1.Migration) (*v1.PodList, error) {
podList, err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).List(unfinishedMigrationPodSelector(migration))
if err != nil {
return nil, err
}
return podList, nil
}

func unfinishedMigrationPodSelector(migration *corev1.Migration) v1.ListOptions {
fieldSelector := fields.ParseSelectorOrDie(
"status.phase!=" + string(v1.PodFailed) +
",status.phase!=" + string(v1.PodSucceeded))
labelSelector, err := labels.Parse(fmt.Sprintf(corev1.DomainLabel+" in (%s)", vm.GetObjectMeta().GetName()))
labelSelector, err := labels.Parse(fmt.Sprintf(corev1.DomainLabel+" in (%s)", migration.GetObjectMeta().GetName()))
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 5f8af89

Please sign in to comment.