Skip to content

Commit

Permalink
Migration controller pod launch
Browse files Browse the repository at this point in the history
Starts the migration process, through the launching of the target pod.
Increased unit test coverage for both migration and VMs
  • Loading branch information
admiyo committed Mar 8, 2017
1 parent 19264e8 commit 70621c1
Show file tree
Hide file tree
Showing 15 changed files with 650 additions and 114 deletions.
16 changes: 14 additions & 2 deletions cmd/virt-controller/virt-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/emicklei/go-restful"
"github.com/facebookgo/inject"
clientrest "k8s.io/client-go/rest"
"kubevirt.io/kubevirt/pkg/kubecli"
"kubevirt.io/kubevirt/pkg/logging"
"kubevirt.io/kubevirt/pkg/virt-controller/rest"
Expand Down Expand Up @@ -40,7 +41,14 @@ func main() {
golog.Fatal(err)
}

var restClient *clientrest.RESTClient
restClient, err = kubecli.GetRESTClient()
if err != nil {
golog.Fatal(err)
}

g.Provide(
&inject.Object{Value: restClient},
&inject.Object{Value: clientSet},
&inject.Object{Value: templateService},
&inject.Object{Value: vmService},
Expand All @@ -57,15 +65,14 @@ func main() {
defer close(stop)

// Start wachting vms
restClient, err := kubecli.GetRESTClient()
restClient, err = kubecli.GetRESTClient()
if err != nil {
golog.Fatal(err)
}
vmCache, vmController := watch.NewVMController(vmService, nil, restClient)

vmController.StartInformer(stop)
go vmController.Run(1, stop)

// Wait until VM cache has warmed up before we start watching pods
vmController.WaitForSync(stop)

Expand All @@ -74,6 +81,11 @@ func main() {
podController.StartInformer(stop)
go podController.Run(1, stop)

_, migrationController := watch.NewMigrationController(vmService, nil, restClient)
migrationController.StartInformer(stop)
go migrationController.Run(1, stop)
migrationController.WaitForSync(stop)

httpLogger := logger.With("service", "http")

httpLogger.Info().Log("action", "listening", "interface", *host, "port", *port)
Expand Down
69 changes: 44 additions & 25 deletions pkg/api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,17 +240,26 @@ func (s SyncEvent) String() string {

func NewMinimalVM(vmName string) *VM {
precond.CheckNotEmpty(vmName)
return &VM{
Spec: VMSpec{Domain: NewMinimalDomainSpec(vmName)},
vm := NewVMReferenceFromName(vmName)
vm.Spec = VMSpec{Domain: NewMinimalDomainSpec(vmName)}
vm.TypeMeta = metav1.TypeMeta{
APIVersion: GroupVersion.String(),
Kind: "VM",
}
return vm
}

// TODO Namespace could be different, also store it somewhere in the domain, so that we can report deletes on handler startup properly
func NewVMReferenceFromName(name string) *VM {
vm := &VM{
ObjectMeta: v1.ObjectMeta{
Name: vmName,
Name: name,
Namespace: kubeapi.NamespaceDefault,
},
TypeMeta: metav1.TypeMeta{
APIVersion: GroupVersion.String(),
Kind: "VM",
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/vms/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
},
}
vm.SetGroupVersionKind(schema.GroupVersionKind{Group: GroupVersion.Group, Kind: "VM", Version: GroupVersion.Version})
return vm
}

type Spice struct {
Expand Down Expand Up @@ -280,33 +289,28 @@ func NewSpice(vmName string) *Spice {
}
}

// TODO Namespace could be different, also store it somewhere in the domain, so that we can report deletes on handler startup properly
func NewVMReferenceFromName(name string) *VM {
vm := &VM{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: kubeapi.NamespaceDefault,
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/vms/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
},
//TODO validate that this is correct
func NewMinimalMigration(name string, vmName string) *Migration {
migration := NewMigrationReferenceFromName(name)
migration.Spec = MigrationSpec{
MigratingVMName: vmName,
}
vm.SetGroupVersionKind(schema.GroupVersionKind{Group: GroupVersion.Group, Kind: "VM", Version: GroupVersion.Version})
return vm
return migration
}

//TODO validate that this is correct
func NewMinimalMigration(name string, vmName string) *Migration {
func NewMigrationReferenceFromName(name string) *Migration {
migration := &Migration{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: kubeapi.NamespaceDefault,
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/migrations/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
SelfLink: fmt.Sprintf("/apis/%s/namespaces/%s/%s", GroupVersion.String(), kubeapi.NamespaceDefault, name),
},
TypeMeta: metav1.TypeMeta{
APIVersion: GroupVersion.String(),
Kind: "Migration",
},
Spec: MigrationSpec{
MigratingVMName: vmName,
Status: MigrationStatus{
Phase: MigrationUnknown,
},
}
return migration
Expand Down Expand Up @@ -341,6 +345,9 @@ type MigrationSpec struct {
type MigrationPhase string

const (
// Create Migration has been called but nothing has been done with it
MigrationUnknown MigrationPhase = ""

// Migration has been scheduled but no update on the status has been recorded
MigrationPending MigrationPhase = "Pending"

Expand All @@ -358,12 +365,24 @@ const (
// MigrationStatus is the last reported status of a VM Migratrion. Status may trail the actual
// state of a migration.
type MigrationStatus struct {
Phase MigrationPhase `json:"message,omitempty"`
Phase MigrationPhase `json:"phase,omitempty"`
}

// Required to satisfy ObjectMetaAccessor interface
func (v *Migration) GetObjectMeta() meta.Object {
return &v.ObjectMeta
func (m *Migration) GetObjectMeta() meta.Object {
return &m.ObjectMeta
}

func (m *Migration) UnmarshalJSON(data []byte) error {
type MigrationCopy Migration
tmp := MigrationCopy{}
err := json.Unmarshal(data, &tmp)
if err != nil {
return err
}
tmp2 := Migration(tmp)
*m = tmp2
return nil
}

//A list of Migrations
Expand Down
1 change: 0 additions & 1 deletion pkg/virt-api/rest/kubeproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
rest2 "kubevirt.io/kubevirt/pkg/rest"
. "kubevirt.io/kubevirt/pkg/virt-api/rest"
. "kubevirt.io/kubevirt/test"

)

const vmResource = "vms"
Expand Down
Binary file added pkg/virt-controller/services/debug.test
Binary file not shown.
Binary file added pkg/virt-controller/services/services.test
Binary file not shown.
97 changes: 71 additions & 26 deletions pkg/virt-controller/services/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,36 @@ import (
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/pkg/labels"
"k8s.io/client-go/rest"
corev1 "kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/middleware"
"kubevirt.io/kubevirt/pkg/precond"
)

type VMService interface {
StartVM(*corev1.VM) error
DeleteVM(*corev1.VM) error
PrepareMigration(*corev1.VM) error
GetRunningPods(*corev1.VM) (*v1.PodList, error)
StartVMPod(*corev1.VM) error
DeleteVMPod(*corev1.VM) error
GetRunningVMPods(*corev1.VM) (*v1.PodList, error)
DeleteMigration(*corev1.Migration) error
GetRunningMigrationPods(*corev1.Migration) (*v1.PodList, error)
SetupMigration(migration *corev1.Migration, vm *corev1.VM) error
UpdateMigration(migration *corev1.Migration) error
FetchVM(vmName string) (*corev1.VM, error)
}

type vmService struct {
KubeCli *kubernetes.Clientset `inject:""`
RestClient *rest.RESTClient `inject:""`
TemplateService TemplateService `inject:""`
}

func (v *vmService) StartVM(vm *corev1.VM) error {
func (v *vmService) StartVMPod(vm *corev1.VM) error {

precond.MustNotBeNil(vm)
precond.MustNotBeEmpty(vm.GetObjectMeta().GetName())
precond.MustNotBeEmpty(string(vm.GetObjectMeta().GetUID()))

podList, err := v.GetRunningPods(vm)
podList, err := v.GetRunningVMPods(vm)
if err != nil {
return err
}
Expand All @@ -49,59 +56,97 @@ func (v *vmService) StartVM(vm *corev1.VM) error {
return nil
}

func (v *vmService) DeleteVM(vm *corev1.VM) error {
func (v *vmService) DeleteVMPod(vm *corev1.VM) error {
precond.MustNotBeNil(vm)
precond.MustNotBeEmpty(vm.GetObjectMeta().GetName())

if err := v.KubeCli.Core().Pods(v1.NamespaceDefault).DeleteCollection(nil, UnfinishedVMPodSelector(vm)); err != nil {
if err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).DeleteCollection(nil, UnfinishedVMPodSelector(vm)); err != nil {
return err
}
return nil
}

func (v *vmService) GetRunningPods(vm *corev1.VM) (*v1.PodList, error) {
podList, err := v.KubeCli.Core().Pods(v1.NamespaceDefault).List(UnfinishedVMPodSelector(vm))
func (v *vmService) GetRunningVMPods(vm *corev1.VM) (*v1.PodList, error) {
podList, err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).List(UnfinishedVMPodSelector(vm))

if err != nil {
return nil, err
}
return podList, nil
}

func (v *vmService) UpdateMigration(migration *corev1.Migration) error {
migrationName := migration.ObjectMeta.Name
_, err := v.RestClient.Put().Namespace(v1.NamespaceDefault).Resource("migrations").Body(migration).Name(migrationName).Do().Get()
return err
}

func (v *vmService) FetchVM(vmName string) (*corev1.VM, error) {
resp, err := v.RestClient.Get().Namespace(v1.NamespaceDefault).Resource("vms").Name(vmName).Do().Get()
if err != nil {
return nil, err
}
vm := resp.(*corev1.VM)
return vm, nil
}

func NewVMService() VMService {
svc := vmService{}
return &svc
}

func (v *vmService) PrepareMigration(vm *corev1.VM) error {
precond.MustNotBeNil(vm)
precond.MustNotBeEmpty(vm.GetObjectMeta().GetName())
precond.MustBeTrue(len(vm.Spec.NodeSelector) > 0)
podList, err := v.GetRunningPods(vm)
func UnfinishedVMPodSelector(vm *corev1.VM) v1.ListOptions {
fieldSelector := fields.ParseSelectorOrDie(
"status.phase!=" + string(v1.PodFailed) +
",status.phase!=" + string(v1.PodSucceeded))
labelSelector, err := labels.Parse(fmt.Sprintf(corev1.DomainLabel+" in (%s)", vm.GetObjectMeta().GetName()))
if err != nil {
return err
panic(err)
}
return v1.ListOptions{FieldSelector: fieldSelector.String(), LabelSelector: labelSelector.String()}
}

// If there are more than one pod in other states than Succeeded or Failed we can't go on
if len(podList.Items) > 1 {
return middleware.NewResourceConflictError(fmt.Sprintf("VM %s is already migrating", vm.GetObjectMeta().GetName()))
func (v *vmService) SetupMigration(migration *corev1.Migration, vm *corev1.VM) error {
pod, err := v.TemplateService.RenderLaunchManifest(vm)
if err == nil {
_, err = v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).Create(pod)
}
if err == nil {
migration.Status.Phase = corev1.MigrationInProgress
} else {
migration.Status.Phase = corev1.MigrationFailed
}

pod, err := v.TemplateService.RenderLaunchManifest(vm)
if err != nil {
return err
err2 := v.UpdateMigration(migration)
if err2 != nil {
err = err2
}
if _, err := v.KubeCli.Core().Pods(v1.NamespaceDefault).Create(pod); err != nil {
return err
}

func (v *vmService) DeleteMigration(migration *corev1.Migration) error {
precond.MustNotBeNil(migration)
precond.MustNotBeEmpty(migration.GetObjectMeta().GetName())

if err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).DeleteCollection(nil, unfinishedMigrationPodSelector(migration)); err != nil {
return err
}
return nil
}

// Visible for tests
func UnfinishedVMPodSelector(vm *corev1.VM) v1.ListOptions {
func (v *vmService) GetRunningMigrationPods(migration *corev1.Migration) (*v1.PodList, error) {
podList, err := v.KubeCli.CoreV1().Pods(v1.NamespaceDefault).List(unfinishedMigrationPodSelector(migration))
if err != nil {
return nil, err
}
return podList, nil
}

func unfinishedMigrationPodSelector(migration *corev1.Migration) v1.ListOptions {
fieldSelector := fields.ParseSelectorOrDie(
"status.phase!=" + string(v1.PodFailed) +
",status.phase!=" + string(v1.PodSucceeded))
labelSelector, err := labels.Parse(fmt.Sprintf(corev1.DomainLabel+" in (%s)", vm.GetObjectMeta().GetName()))
labelSelector, err := labels.Parse(fmt.Sprintf(corev1.DomainLabel+" in (%s)", migration.GetObjectMeta().GetName()))
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 70621c1

Please sign in to comment.