Skip to content

Commit

Permalink
intermediate cleanup
Browse files Browse the repository at this point in the history
moved most logic to watch/migration
  • Loading branch information
admiyo committed Mar 6, 2017
1 parent 8f95546 commit b2e42ed
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 74 deletions.
55 changes: 8 additions & 47 deletions pkg/virt-controller/services/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ type VMService interface {
StartVMPod(*corev1.VM) error
DeleteVMPod(*corev1.VM) error
GetRunningVMPods(*corev1.VM) (*v1.PodList, error)
StartMigrationTargetPod(*corev1.Migration) error
DeleteMigration(*corev1.Migration) error
GetRunningMigrationPods(*corev1.Migration) (*v1.PodList, error)
SetupMigration(migration *corev1.Migration, vm *corev1.VM) error
}

type vmService struct {
Expand Down Expand Up @@ -73,7 +73,7 @@ func (v *vmService) GetRunningVMPods(vm *corev1.VM) (*v1.PodList, error) {
return podList, nil
}

func (v *vmService) UpdateMigration(migration *corev1.Migration) error {
func UpdateMigration(migration *corev1.Migration) error {
restClient, err := kubecli.GetRESTClient()
if err != nil {
return err
Expand All @@ -83,7 +83,7 @@ func (v *vmService) UpdateMigration(migration *corev1.Migration) error {
return err
}

func (v *vmService) GetDefinedVMs(vmName string) (*corev1.VM, error) {
func GetDefinedVMs(vmName string) (*corev1.VM, error) {
restClient, err := kubecli.GetRESTClient()
list, err := restClient.Get().Namespace(v1.NamespaceDefault).Resource("vms").Name(vmName).Do().Get()
if err != nil {
Expand All @@ -110,47 +110,10 @@ func UnfinishedVMPodSelector(vm *corev1.VM) v1.ListOptions {
return v1.ListOptions{FieldSelector: fieldSelector.String(), LabelSelector: labelSelector.String()}
}

func (v *vmService) StartMigrationTargetPod(migration *corev1.Migration) error {
precond.MustNotBeNil(migration)
precond.MustNotBeEmpty(migration.GetObjectMeta().GetName())
precond.MustNotBeEmpty(string(migration.GetObjectMeta().GetUID()))

vm, err := v.GetDefinedVMs(migration.Spec.MigratingVMName)
if err != nil {
migration.Status.Phase = corev1.MigrationFailed
err2 := v.UpdateMigration(migration)
if err2 != nil {
return err2
}
// Report the error with the migration in the controller log
return err
}

podList, err := v.GetRunningVMPods(vm)
if err != nil {
return err
}

if len(podList.Items) < 1 {
return middleware.NewResourceConflictError(fmt.Sprintf("VM %s Pod does not exist", vm.GetObjectMeta().GetName()))
}

// If there are more than one pod in other states than Succeeded or Failed we can't go on
if len(podList.Items) > 1 {
return middleware.NewResourceConflictError(fmt.Sprintf("VM %s Pod is already migrating", vm.GetObjectMeta().GetName()))
}

//TODO: detect collisions
for k, v := range migration.Spec.DestinationNodeSelector {
vm.Spec.NodeSelector[k] = v
}

func (v *vmService) SetupMigration(migration *corev1.Migration, vm *corev1.VM) error {
pod, err := v.TemplateService.RenderLaunchManifest(vm)
if err != nil {
return err
}
if _, err := v.KubeCli.Core().Pods(v1.NamespaceDefault).Create(pod); err != nil {
return err
if err == nil {
_, err = v.KubeCli.Core().Pods(v1.NamespaceDefault).Create(pod)
}

if err == nil {
Expand All @@ -159,12 +122,10 @@ func (v *vmService) StartMigrationTargetPod(migration *corev1.Migration) error {
migration.Status.Phase = corev1.MigrationFailed
}

err2 := v.UpdateMigration(migration)
err2 := UpdateMigration(migration)
if err2 != nil {
return err2
err = err2
}

// Report the result of the `Create` call
return err
}

Expand Down
69 changes: 46 additions & 23 deletions pkg/virt-controller/watch/migration.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package watch

import (
"fmt"
"github.com/jeevatkm/go-model"
kubeapi "k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/errors"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/pkg/util/workqueue"
"k8s.io/client-go/rest"
Expand All @@ -13,6 +12,8 @@ import (
"kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/kubecli"
"kubevirt.io/kubevirt/pkg/logging"
"kubevirt.io/kubevirt/pkg/middleware"
"kubevirt.io/kubevirt/pkg/precond"
"kubevirt.io/kubevirt/pkg/virt-controller/services"
)

Expand Down Expand Up @@ -68,33 +69,14 @@ func cleanupOldMigration(key interface{}, queue workqueue.RateLimitingInterface,
func scheduleMigration(migration *v1.Migration, migrationService services.VMService, queue workqueue.RateLimitingInterface, key interface{}, restClient *rest.RESTClient) {
migrationCopy := copyMigration(migration)
logger := logging.DefaultLogger().Object(&migrationCopy)
if err := migrationService.StartMigrationTargetPod(&migrationCopy); err != nil {
if err := StartMigrationTargetPod(migrationService, &migrationCopy); err != nil {
handleStartMigrationError(logger, err, migrationService, migrationCopy)
//queue.AddRateLimited(key)
return
}

//TODO make status into enumeration
migrationCopy.Status.Phase = "Scheduling"
if err := restClient.Put().Resource("migrations").Body(&migrationCopy).Name(migrationCopy.ObjectMeta.Name).Namespace(kubeapi.NamespaceDefault).Do().Error(); err != nil {
handleMigrationSchedulingError(logger, err, queue, key)

} else {
logger.Info().Msg("Handing over the VM to the scheduler succeeded.")
}

}
func handleMigrationSchedulingError(logger *logging.FilteredLogger, err error, queue workqueue.RateLimitingInterface, key interface{}) {
logger.Error().Reason(err).Msg("Updating the VM state to 'Scheduling' failed.")
if e, ok := err.(*errors.StatusError); ok {
if e.Status().Reason == metav1.StatusReasonNotFound ||
e.Status().Reason == metav1.StatusReasonConflict {
// Nothing to do for us, VM got either deleted in the meantime or a newer version is enqueued already
return
}
}
queue.AddRateLimited(key)
}

func handleStartMigrationError(logger *logging.FilteredLogger, err error, migrationService services.VMService, migrationCopy v1.Migration) {
logger.Error().Reason(err).Msg("Defining a target pod for the Migration.")
pl, err := migrationService.GetRunningMigrationPods(&migrationCopy)
Expand Down Expand Up @@ -130,3 +112,44 @@ func copyMigration(migration *v1.Migration) v1.Migration {
model.Copy(&migrationCopy, migration)
return migrationCopy
}

func StartMigrationTargetPod(v services.VMService, migration *v1.Migration) error {
precond.MustNotBeNil(migration)
precond.MustNotBeEmpty(migration.GetObjectMeta().GetName())
precond.MustNotBeEmpty(string(migration.GetObjectMeta().GetUID()))

vm, err := services.GetDefinedVMs(migration.Spec.MigratingVMName)
if err != nil {
migration.Status.Phase = v1.MigrationFailed
err2 := services.UpdateMigration(migration)
if err2 != nil {
return err2
}
// Report the error with the migration in the controller log
return err
}

podList, err := v.GetRunningVMPods(vm)
if err != nil {
return err
}

if len(podList.Items) < 1 {
return middleware.NewResourceConflictError(fmt.Sprintf("VM %s Pod does not exist", vm.GetObjectMeta().GetName()))
}

// If there are more than one pod in other states than Succeeded or Failed we can't go on
if len(podList.Items) > 1 {
return middleware.NewResourceConflictError(fmt.Sprintf("VM %s Pod is already migrating", vm.GetObjectMeta().GetName()))
}

//TODO: detect collisions
for k, v := range migration.Spec.DestinationNodeSelector {
vm.Spec.NodeSelector[k] = v
}

err = v.SetupMigration(migration, vm)

// Report the result of the `Create` call
return err
}
2 changes: 1 addition & 1 deletion tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewRandomVMWithLun(lun int) *v1.VM {
}

func NewMigrationForVm(vm *v1.VM) *v1.Migration {
return v1.NewMinimalMigration(vm.ObjectMeta.Name + "migrate", vm.ObjectMeta.Name)
return v1.NewMinimalMigration(vm.ObjectMeta.Name+"migrate", vm.ObjectMeta.Name)
}

func NewRandomVMWithSpice() *v1.VM {
Expand Down
4 changes: 1 addition & 3 deletions tests/vm_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
. "github.com/onsi/gomega"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/errors"
kubev1 "k8s.io/client-go/pkg/api/v1"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/util/json"
kubev1 "k8s.io/client-go/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/api/v1"
"kubevirt.io/kubevirt/pkg/kubecli"
"kubevirt.io/kubevirt/tests"
Expand Down Expand Up @@ -98,8 +98,6 @@ var _ = Describe("VmMigration", func() {
var vmR *v1.VM = r.(*v1.VM)
Expect(vmR.ObjectMeta.Name).To(Equal(vm.ObjectMeta.Name))

println("Got a VM with name " + vmR.ObjectMeta.Name)

//wait for the controller to update status
time.Sleep(time.Second * 3)

Expand Down

0 comments on commit b2e42ed

Please sign in to comment.