diff --git a/pkg/util/migrations/migrations.go b/pkg/util/migrations/migrations.go
index b290cab2ef84..31c002ead382 100644
--- a/pkg/util/migrations/migrations.go
+++ b/pkg/util/migrations/migrations.go
@@ -4,6 +4,7 @@ import (
 	v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/tools/cache"
 
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	v1 "kubevirt.io/api/core/v1"
 	"kubevirt.io/client-go/log"
 
@@ -24,6 +25,29 @@ func ListUnfinishedMigrations(informer cache.SharedIndexInformer) []*v1.VirtualM
 	return migrations
 }
 
+func ListWorkloadUpdateMigrations(store cache.Store, vmiName, ns string) []v1.VirtualMachineInstanceMigration {
+	objs := store.List()
+	migrations := []v1.VirtualMachineInstanceMigration{}
+	for _, obj := range objs {
+		migration := obj.(*v1.VirtualMachineInstanceMigration)
+		if migration.IsFinal() {
+			continue
+		}
+		if migration.Namespace != ns {
+			continue
+		}
+		if migration.Spec.VMIName != vmiName {
+			continue
+		}
+		if !metav1.HasAnnotation(migration.ObjectMeta, v1.WorkloadUpdateMigrationAnnotation) {
+			continue
+		}
+		migrations = append(migrations, *migration)
+	}
+
+	return migrations
+}
+
 func FilterRunningMigrations(migrations []*v1.VirtualMachineInstanceMigration) []*v1.VirtualMachineInstanceMigration {
 	runningMigrations := []*v1.VirtualMachineInstanceMigration{}
 	for _, migration := range migrations {
diff --git a/pkg/virt-controller/watch/workload-updater/workload-updater.go b/pkg/virt-controller/watch/workload-updater/workload-updater.go
index feb06a6b7eb9..97a231d5074e 100644
--- a/pkg/virt-controller/watch/workload-updater/workload-updater.go
+++ b/pkg/virt-controller/watch/workload-updater/workload-updater.go
@@ -25,6 +25,7 @@ import (
 
 	migrationutils "kubevirt.io/kubevirt/pkg/util/migrations"
 
+	v1 "kubevirt.io/api/core/v1"
 	virtv1 "kubevirt.io/api/core/v1"
 	"kubevirt.io/client-go/kubecli"
 	"kubevirt.io/client-go/log"
@@ -43,6 +44,12 @@ const (
 	FailedEvictVirtualMachineInstanceReason = "FailedEvict"
 	// SuccessfulEvictVirtualMachineInstanceReason is added in an event if a deletion of a VMI Succeeds
 	SuccessfulEvictVirtualMachineInstanceReason = "SuccessfulEvict"
+	// SuccessfulChangeAbortionReason is added in an event if a deletion of a
+	// migration succeeds
+	SuccessfulChangeAbortionReason = "SuccessfulChangeAbortion"
+	// FailedChangeAbortionReason is added in an event if a deletion of a
+	// migration fails
+	FailedChangeAbortionReason = "FailedChangeAbortion"
 )
 
 // time to wait before re-enqueing when outdated VMIs are still detected
@@ -74,6 +81,7 @@ type updateData struct {
 	allOutdatedVMIs          []*virtv1.VirtualMachineInstance
 	migratableOutdatedVMIs   []*virtv1.VirtualMachineInstance
 	evictOutdatedVMIs        []*virtv1.VirtualMachineInstance
+	abortChangeVMIs          []*virtv1.VirtualMachineInstance
 
 	numActiveMigrations int
 }
@@ -313,7 +321,9 @@ func (c *WorkloadUpdateController) doesRequireMigration(vmi *virtv1.VirtualMachi
 	if vmi.IsFinal() || migrationutils.IsMigrating(vmi) {
 		return false
 	}
-
+	if metav1.HasAnnotation(vmi.ObjectMeta, v1.WorkloadUpdateMigrationAbortionAnnotation) {
+		return false
+	}
 	if isHotplugInProgress(vmi) {
 		return true
 	}
@@ -321,6 +331,17 @@ func (c *WorkloadUpdateController) doesRequireMigration(vmi *virtv1.VirtualMachi
 	return false
 }
 
+func (c *WorkloadUpdateController) shouldAbortMigration(vmi *virtv1.VirtualMachineInstance) bool {
+	numMig := len(migrationutils.ListWorkloadUpdateMigrations(c.migrationInformer.GetStore(), vmi.Name, vmi.Namespace))
+	if metav1.HasAnnotation(vmi.ObjectMeta, virtv1.WorkloadUpdateMigrationAbortionAnnotation) {
+		return 
numMig > 0
+	}
+	if isHotplugInProgress(vmi) {
+		return false
+	}
+	return numMig > 0
+}
+
 func (c *WorkloadUpdateController) getUpdateData(kv *virtv1.KubeVirt) *updateData {
 	data := &updateData{}
 
@@ -349,10 +370,14 @@ func (c *WorkloadUpdateController) getUpdateData(kv *virtv1.KubeVirt) *updateDat
 	objs := c.vmiInformer.GetStore().List()
 	for _, obj := range objs {
 		vmi := obj.(*virtv1.VirtualMachineInstance)
-		if !vmi.IsRunning() || vmi.IsFinal() || vmi.DeletionTimestamp != nil {
+		switch {
+		case !vmi.IsRunning() || vmi.IsFinal() || vmi.DeletionTimestamp != nil:
 			// only consider running VMIs that aren't being shutdown
 			continue
-		} else if !c.isOutdated(vmi) && !c.doesRequireMigration(vmi) {
+		case c.shouldAbortMigration(vmi) && !c.isOutdated(vmi):
+			data.abortChangeVMIs = append(data.abortChangeVMIs, vmi)
+			continue
+		case !c.isOutdated(vmi) && !c.doesRequireMigration(vmi):
 			continue
 		}
 
@@ -453,7 +478,7 @@ func (c *WorkloadUpdateController) sync(kv *virtv1.KubeVirt) error {
 	// Rather than enqueing based on VMI activity, we keep periodically poping the loop
 	// until all VMIs are updated. Watching all VMI activity is chatty for this controller
 	// when we don't need to be that efficent in how quickly the updates are being processed.
-	if len(data.evictOutdatedVMIs) != 0 || len(data.migratableOutdatedVMIs) != 0 {
+	if len(data.evictOutdatedVMIs) != 0 || len(data.migratableOutdatedVMIs) != 0 || len(data.abortChangeVMIs) != 0 {
 		c.queue.AddAfter(key, periodicReEnqueueIntervalSeconds)
 	}
 
@@ -507,7 +532,7 @@ func (c *WorkloadUpdateController) sync(kv *virtv1.KubeVirt) error {
 		evictionCandidates = data.evictOutdatedVMIs[0:batchDeletionCount]
 	}
 
-	wgLen := len(migrationCandidates) + len(evictionCandidates)
+	wgLen := len(migrationCandidates) + len(evictionCandidates) + len(data.abortChangeVMIs)
 	wg := &sync.WaitGroup{}
 	wg.Add(wgLen)
 	errChan := make(chan error, wgLen)
@@ -572,6 +597,25 @@ func (c *WorkloadUpdateController) sync(kv *virtv1.KubeVirt) error {
 		}(vmi)
 	}
 
+	for _, vmi := range data.abortChangeVMIs {
+		go func(vmi *virtv1.VirtualMachineInstance) {
+			defer wg.Done()
+			migList := migrationutils.ListWorkloadUpdateMigrations(c.migrationInformer.GetStore(), vmi.Name, vmi.Namespace)
+			for _, mig := range migList {
+				err := c.clientset.VirtualMachineInstanceMigration(vmi.Namespace).Delete(context.Background(), mig.Name, metav1.DeleteOptions{})
+				if err != nil && !errors.IsNotFound(err) {
+					log.Log.Object(vmi).Reason(err).Errorf("Failed to delete the migration due to a migration abortion")
+					c.recorder.Eventf(vmi, k8sv1.EventTypeWarning, FailedChangeAbortionReason, "Failed to abort change for vmi: %s: %v", vmi.Name, err)
+					errChan <- err
+				} else if err == nil {
+					log.Log.Infof("Deleted migration %s due to an update change abortion", mig.Name)
+					c.recorder.Eventf(vmi, k8sv1.EventTypeNormal, SuccessfulChangeAbortionReason, "Aborted change for vmi: %s", vmi.Name)
+
+				}
+			}
+
+		}(vmi)
+	}
 	wg.Wait()
 
 	select {
diff --git a/pkg/virt-controller/watch/workload-updater/workload-updater_test.go b/pkg/virt-controller/watch/workload-updater/workload-updater_test.go
index 696cb8ce3eaa..754ea7603d74 100644
--- a/pkg/virt-controller/watch/workload-updater/workload-updater_test.go
+++ b/pkg/virt-controller/watch/workload-updater/workload-updater_test.go
@@ -453,6 +453,105 @@ var _ = Describe("Workload Updater", func() {
 		})
 	})
 
+	Context("Abort changes due to an automated live update", func() {
+		createVM := func(annotation, hasChangeCondition bool) *v1.VirtualMachineInstance {
+			vmi := api.NewMinimalVMI("testvm")
+			
vmi.Namespace = k8sv1.NamespaceDefault
+			vmi.Status.Phase = v1.Running
+			vmi.Status.Conditions = append(vmi.Status.Conditions, v1.VirtualMachineInstanceCondition{
+				Type: v1.VirtualMachineInstanceIsMigratable, Status: k8sv1.ConditionTrue})
+			if hasChangeCondition {
+				vmi.Status.Conditions = append(vmi.Status.Conditions, v1.VirtualMachineInstanceCondition{
+					Type:   v1.VirtualMachineInstanceMemoryChange,
+					Status: k8sv1.ConditionTrue,
+				})
+			}
+			if annotation {
+				vmi.ObjectMeta.Annotations = make(map[string]string)
+				vmi.ObjectMeta.Annotations[v1.WorkloadUpdateMigrationAbortionAnnotation] = ""
+			}
+			vmiSource.Add(vmi)
+			return vmi
+		}
+		createMig := func(vmiName string, phase v1.VirtualMachineInstanceMigrationPhase) *v1.VirtualMachineInstanceMigration {
+			mig := newMigration("test", vmiName, phase)
+			mig.Annotations = map[string]string{v1.WorkloadUpdateMigrationAnnotation: ""}
+			migrationFeeder.Add(mig)
+			return mig
+		}
+
+		BeforeEach(func() {
+			kv := newKubeVirt(0)
+			kv.Spec.WorkloadUpdateStrategy.WorkloadUpdateMethods = []v1.WorkloadUpdateMethod{v1.WorkloadUpdateMethodLiveMigrate}
+			addKubeVirt(kv)
+		})
+
+		DescribeTable("should delete the migration", func(phase v1.VirtualMachineInstanceMigrationPhase) {
+			vmi := createVM(false, false)
+			mig := createMig(vmi.Name, phase)
+
+			if !mig.IsFinal() {
+				migrationInterface.EXPECT().Delete(gomock.Any(), mig.Name, metav1.DeleteOptions{}).Return(nil)
+			}
+
+			controller.Execute()
+			if mig.IsFinal() {
+				Expect(recorder.Events).To(BeEmpty())
+			} else {
+				testutils.ExpectEvent(recorder, SuccessfulChangeAbortionReason)
+			}
+		},
+			Entry("in running phase", v1.MigrationRunning),
+			Entry("in failed phase", v1.MigrationFailed),
+			Entry("in succeeded phase", v1.MigrationSucceeded),
+		)
+
+		DescribeTable("should handle", func(hasCond, hasMig bool) {
+			vmi := createVM(false, hasCond)
+			if hasCond {
+				kubeVirtInterface.EXPECT().PatchStatus(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
+			}
+			var mig *v1.VirtualMachineInstanceMigration
+			if hasMig {
+				mig = createMig(vmi.Name, v1.MigrationRunning)
+			}
+			changeAborted := hasMig && !hasCond
+			if changeAborted {
+				migrationInterface.EXPECT().Delete(gomock.Any(), mig.Name, metav1.DeleteOptions{}).Return(nil)
+			}
+			controller.Execute()
+			if changeAborted {
+				testutils.ExpectEvent(recorder, SuccessfulChangeAbortionReason)
+			} else {
+				Expect(recorder.Events).To(BeEmpty())
+			}
+		},
+			Entry("an in-progress change update", true, true),
+			Entry("a change abortion", false, true),
+			Entry("no change in progress", false, false),
+		)
+
+		DescribeTable("should always cancel the migration when the testWorkloadUpdateMigrationAbortion annotation is present", func(hasCond bool) {
+			vmi := createVM(true, hasCond)
+			mig := createMig(vmi.Name, v1.MigrationRunning)
+			migrationInterface.EXPECT().Delete(gomock.Any(), mig.Name, metav1.DeleteOptions{}).Return(nil)
+			controller.Execute()
+			testutils.ExpectEvent(recorder, SuccessfulChangeAbortionReason)
+		},
+			Entry("with the change condition", true),
+			Entry("without the change condition", false),
+		)
+
+		It("should return an error if the migration hasn't been deleted", func() {
+			vmi := createVM(true, false)
+			mig := createMig(vmi.Name, v1.MigrationRunning)
+			migrationInterface.EXPECT().Delete(gomock.Any(), mig.Name, metav1.DeleteOptions{}).Return(fmt.Errorf("some error"))
+
+			controller.Execute()
+			testutils.ExpectEvent(recorder, FailedChangeAbortionReason)
+		})
+	})
+
 	AfterEach(func() {
 		close(stop)
diff --git 
a/staging/src/kubevirt.io/api/core/v1/types.go b/staging/src/kubevirt.io/api/core/v1/types.go index db6a4341a39e..75caca7c83f3 100644 --- a/staging/src/kubevirt.io/api/core/v1/types.go +++ b/staging/src/kubevirt.io/api/core/v1/types.go @@ -853,6 +853,9 @@ const ( // This annotation indicates that a migration is the result of an // automated workload update WorkloadUpdateMigrationAnnotation string = "kubevirt.io/workloadUpdateMigration" + // This annotation indicates to abort any migration due to an automated + // workload update. It should only be used for testing purposes. + WorkloadUpdateMigrationAbortionAnnotation string = "kubevirt.io/testWorkloadUpdateMigrationAbortion" // This label declares whether a particular node is available for // scheduling virtual machine instances on it. Used on Node. NodeSchedulable string = "kubevirt.io/schedulable" diff --git a/tests/hotplug/BUILD.bazel b/tests/hotplug/BUILD.bazel index 5ba3ffbbc64d..2e2cdc3012a6 100644 --- a/tests/hotplug/BUILD.bazel +++ b/tests/hotplug/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/libvmi:go_default_library", "//pkg/pointer:go_default_library", "//staging/src/kubevirt.io/api/core/v1:go_default_library", + "//staging/src/kubevirt.io/api/migrations/v1alpha1:go_default_library", "//staging/src/kubevirt.io/client-go/kubecli:go_default_library", "//tests:go_default_library", "//tests/console:go_default_library", diff --git a/tests/hotplug/cpu.go b/tests/hotplug/cpu.go index 37f971a470e7..46375e755e5f 100644 --- a/tests/hotplug/cpu.go +++ b/tests/hotplug/cpu.go @@ -8,9 +8,10 @@ import ( "kubevirt.io/kubevirt/tests/libnet" + migrationsv1 "kubevirt.io/api/migrations/v1alpha1" + "kubevirt.io/kubevirt/pkg/libvmi" "kubevirt.io/kubevirt/pkg/pointer" - "kubevirt.io/kubevirt/tests/framework/checks" "kubevirt.io/kubevirt/tests/libmigration" @@ -38,6 +39,7 @@ import ( . 
"kubevirt.io/kubevirt/tests/framework/matcher" "kubevirt.io/kubevirt/tests/libpod" "kubevirt.io/kubevirt/tests/libwait" + testsmig "kubevirt.io/kubevirt/tests/migration" ) var _ = Describe("[sig-compute][Serial]CPU Hotplug", decorators.SigCompute, decorators.SigComputeMigrations, decorators.RequiresTwoSchedulableNodes, decorators.VMLiveUpdateFeaturesGate, Serial, func() { @@ -260,6 +262,84 @@ var _ = Describe("[sig-compute][Serial]CPU Hotplug", decorators.SigCompute, deco Expect(reqCpu).To(Equal(expCpu.Value())) }) }) + + Context("Abort CPU change", func() { + It("should cancel the automated workload update", func() { + vmi := libvmifact.NewAlpineWithTestTooling( + libnet.WithMasqueradeNetworking()..., + ) + vmi.Namespace = testsuite.GetTestNamespace(vmi) + vmi.Spec.Domain.CPU = &v1.CPU{ + Sockets: 1, + Cores: 2, + Threads: 1, + MaxSockets: 2, + } + By("Limiting the bandwidth of migrations in the test namespace") + policy := testsmig.PreparePolicyAndVMIWithBandwidthLimitation(vmi, resource.MustParse("1Ki")) + testsmig.CreateMigrationPolicy(virtClient, policy) + Eventually(func() *migrationsv1.MigrationPolicy { + policy, err := virtClient.MigrationPolicy().Get(context.Background(), policy.Name, metav1.GetOptions{}) + if err != nil { + return nil + } + return policy + }, 30*time.Second, time.Second).ShouldNot(BeNil()) + + vm := libvmi.NewVirtualMachine(vmi, libvmi.WithRunning()) + + vm, err := virtClient.VirtualMachine(vm.Namespace).Create(context.Background(), vm, metav1.CreateOptions{}) + Expect(err).ToNot(HaveOccurred()) + Eventually(ThisVM(vm), 360*time.Second, 1*time.Second).Should(BeReady()) + libwait.WaitForSuccessfulVMIStart(vmi) + + // Update the CPU number and trigger the workload update + // and migration + By("Enabling the second socket to trigger the migration update") + p, err := patch.New(patch.WithReplace("/spec/template/spec/domain/cpu/sockets", 2)).GeneratePayload() + Expect(err).NotTo(HaveOccurred()) + _, err = virtClient.VirtualMachine(vm.Namespace).Patch(context.Background(), vm.Name, types.JSONPatchType, p, k8smetav1.PatchOptions{}) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() bool { + migrations, err := virtClient.VirtualMachineInstanceMigration(vm.Namespace).List(context.Background(), k8smetav1.ListOptions{}) + Expect(err).ToNot(HaveOccurred()) + for _, mig := range migrations.Items { + if mig.Spec.VMIName == vmi.Name { + return true + } + } + return false + }, 30*time.Second, time.Second).Should(BeTrue()) + + // Add annotation to cancel the workload update + By("Patching the workload migration abortion annotation") + vmi.ObjectMeta.Annotations[v1.WorkloadUpdateMigrationAbortionAnnotation] = "" + p, err = patch.New(patch.WithAdd("/metadata/annotations", vmi.ObjectMeta.Annotations)).GeneratePayload() + Expect(err).ToNot(HaveOccurred()) + _, err = virtClient.VirtualMachineInstance(vmi.Namespace).Patch(context.Background(), vmi.Name, types.JSONPatchType, p, metav1.PatchOptions{}) + Expect(err).ToNot(HaveOccurred()) + Eventually(func() bool { + vmi, err = virtClient.VirtualMachineInstance(vm.Namespace).Get(context.Background(), vm.Name, metav1.GetOptions{}) + Expect(err).ToNot(HaveOccurred()) + return metav1.HasAnnotation(vmi.ObjectMeta, v1.WorkloadUpdateMigrationAbortionAnnotation) + }, 30*time.Second, time.Second).Should(BeTrue()) + + // Wait until the migration is cancelled by the workload + // updater + Eventually(func() bool { + migrations, err := virtClient.VirtualMachineInstanceMigration(vm.Namespace).List(context.Background(), k8smetav1.ListOptions{}) 
+ Expect(err).ToNot(HaveOccurred()) + for _, mig := range migrations.Items { + if mig.Spec.VMIName == vmi.Name { + return true + } + } + return false + }, 30*time.Second, time.Second).Should(BeFalse()) + + }) + }) }) func patchWorkloadUpdateMethodAndRolloutStrategy(kvName string, virtClient kubecli.KubevirtClient, updateStrategy *v1.KubeVirtWorkloadUpdateStrategy, rolloutStrategy *v1.VMRolloutStrategy) {