From 7cbd05cfcfb54415e453f9ef13f1532e2193f750 Mon Sep 17 00:00:00 2001 From: Alice Frosi Date: Tue, 9 Apr 2024 15:01:18 +0200 Subject: [PATCH 1/6] api: add WorkloadUpdateMigrationAbortionAnnotation This annotation enables the deletion of a migration due to a workload update of the VMI. The main use of this annotation is to test the abortion of an update. Users should also manually remove this annotation once the update has been aborted. Signed-off-by: Alice Frosi --- staging/src/kubevirt.io/api/core/v1/types.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/staging/src/kubevirt.io/api/core/v1/types.go b/staging/src/kubevirt.io/api/core/v1/types.go index db6a4341a39e..75caca7c83f3 100644 --- a/staging/src/kubevirt.io/api/core/v1/types.go +++ b/staging/src/kubevirt.io/api/core/v1/types.go @@ -853,6 +853,9 @@ const ( // This annotation indicates that a migration is the result of an // automated workload update WorkloadUpdateMigrationAnnotation string = "kubevirt.io/workloadUpdateMigration" + // This annotation indicates to abort any migration due to an automated + // workload update. It should only be used for testing purposes. + WorkloadUpdateMigrationAbortionAnnotation string = "kubevirt.io/testWorkloadUpdateMigrationAbortion" // This label declares whether a particular node is available for // scheduling virtual machine instances on it. Used on Node. NodeSchedulable string = "kubevirt.io/schedulable" From 4c1ee821c5ec85694c08c9e5910018479213a02e Mon Sep 17 00:00:00 2001 From: Alice Frosi Date: Wed, 3 Apr 2024 13:58:33 +0200 Subject: [PATCH 2/6] migrations: add ListWorkloadUpdateMigrations The function ListWorkloadUpdateMigrations is a helper function that returns the non-finalized migrations due to a workload update. 
Signed-off-by: Alice Frosi --- pkg/util/migrations/migrations.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/pkg/util/migrations/migrations.go b/pkg/util/migrations/migrations.go index b290cab2ef84..31c002ead382 100644 --- a/pkg/util/migrations/migrations.go +++ b/pkg/util/migrations/migrations.go @@ -4,6 +4,7 @@ import ( v12 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "kubevirt.io/api/core/v1" "kubevirt.io/client-go/log" @@ -24,6 +25,29 @@ func ListUnfinishedMigrations(informer cache.SharedIndexInformer) []*v1.VirtualM return migrations } +func ListWorkloadUpdateMigrations(store cache.Store, vmiName, ns string) []v1.VirtualMachineInstanceMigration { + objs := store.List() + migrations := []v1.VirtualMachineInstanceMigration{} + for _, obj := range objs { + migration := obj.(*v1.VirtualMachineInstanceMigration) + if migration.IsFinal() { + continue + } + if migration.Namespace != ns { + continue + } + if migration.Spec.VMIName != vmiName { + continue + } + if !metav1.HasAnnotation(migration.ObjectMeta, v1.WorkloadUpdateMigrationAnnotation) { + continue + } + migrations = append(migrations, *migration) + } + + return migrations +} + func FilterRunningMigrations(migrations []*v1.VirtualMachineInstanceMigration) []*v1.VirtualMachineInstanceMigration { runningMigrations := []*v1.VirtualMachineInstanceMigration{} for _, migration := range migrations { From e77f709076567adfaead4816ca4b7c837784a287 Mon Sep 17 00:00:00 2001 From: Alice Frosi Date: Mon, 22 Apr 2024 14:50:01 +0200 Subject: [PATCH 3/6] workload-updater: ignore changes with the abortion annotation The testWorkloadUpdateMigrationAbortionAnnotation cancels the migrations due to a workload update. If a VMI has this annotation, new changes shouldn't be considered until this annotation is removed. 
Signed-off-by: Alice Frosi --- .../watch/workload-updater/workload-updater.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/virt-controller/watch/workload-updater/workload-updater.go b/pkg/virt-controller/watch/workload-updater/workload-updater.go index feb06a6b7eb9..ac0f0592dbd1 100644 --- a/pkg/virt-controller/watch/workload-updater/workload-updater.go +++ b/pkg/virt-controller/watch/workload-updater/workload-updater.go @@ -25,6 +25,7 @@ import ( migrationutils "kubevirt.io/kubevirt/pkg/util/migrations" + v1 "kubevirt.io/api/core/v1" virtv1 "kubevirt.io/api/core/v1" "kubevirt.io/client-go/kubecli" "kubevirt.io/client-go/log" @@ -313,7 +314,9 @@ func (c *WorkloadUpdateController) doesRequireMigration(vmi *virtv1.VirtualMachi if vmi.IsFinal() || migrationutils.IsMigrating(vmi) { return false } - + if metav1.HasAnnotation(vmi.ObjectMeta, v1.WorkloadUpdateMigrationAbortionAnnotation) { + return false + } if isHotplugInProgress(vmi) { return true } From be31f56df104aa999f3dbff95ec4dd13328fc472 Mon Sep 17 00:00:00 2001 From: Alice Frosi Date: Wed, 3 Apr 2024 14:01:20 +0200 Subject: [PATCH 4/6] workload-updater: cancel a workload update migration For certain workload updates, we might desire to cancel a migration triggered by the update. The workload updater checks whether the VMI has a VirtualMachineInstance*Change condition. If the condition isn't present but there is a migration associated with an automated update, then the migration will be aborted. 
Signed-off-by: Alice Frosi --- .../workload-updater/workload-updater.go | 49 +++++++++++++++++-- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/pkg/virt-controller/watch/workload-updater/workload-updater.go b/pkg/virt-controller/watch/workload-updater/workload-updater.go index ac0f0592dbd1..97a231d5074e 100644 --- a/pkg/virt-controller/watch/workload-updater/workload-updater.go +++ b/pkg/virt-controller/watch/workload-updater/workload-updater.go @@ -44,6 +44,12 @@ const ( FailedEvictVirtualMachineInstanceReason = "FailedEvict" // SuccessfulEvictVirtualMachineInstanceReason is added in an event if a deletion of a VMI Succeeds SuccessfulEvictVirtualMachineInstanceReason = "SuccessfulEvict" + // SuccessfulChangeAbortionReason is added in an event if a deletion of a + // migration succeeds + SuccessfulChangeAbortionReason = "SuccessfulChangeAbortion" + // FailedChangeAbortionReason is added in an event if a deletion of a + // migration fails + FailedChangeAbortionReason = "FailedChangeAbortion" ) // time to wait before re-enqueing when outdated VMIs are still detected @@ -75,6 +81,7 @@ type updateData struct { allOutdatedVMIs []*virtv1.VirtualMachineInstance migratableOutdatedVMIs []*virtv1.VirtualMachineInstance evictOutdatedVMIs []*virtv1.VirtualMachineInstance + abortChangeVMIs []*virtv1.VirtualMachineInstance numActiveMigrations int } @@ -324,6 +331,17 @@ func (c *WorkloadUpdateController) doesRequireMigration(vmi *virtv1.VirtualMachi return false } +func (c *WorkloadUpdateController) shouldAbortMigration(vmi *virtv1.VirtualMachineInstance) bool { + numMig := len(migrationutils.ListWorkloadUpdateMigrations(c.migrationInformer.GetStore(), vmi.Name, vmi.Namespace)) + if metav1.HasAnnotation(vmi.ObjectMeta, virtv1.WorkloadUpdateMigrationAbortionAnnotation) { + return numMig > 0 + } + if isHotplugInProgress(vmi) { + return false + } + return numMig > 0 +} + func (c *WorkloadUpdateController) getUpdateData(kv *virtv1.KubeVirt) *updateData { data := 
&updateData{} @@ -352,10 +370,14 @@ func (c *WorkloadUpdateController) getUpdateData(kv *virtv1.KubeVirt) *updateDat objs := c.vmiInformer.GetStore().List() for _, obj := range objs { vmi := obj.(*virtv1.VirtualMachineInstance) - if !vmi.IsRunning() || vmi.IsFinal() || vmi.DeletionTimestamp != nil { + switch { + case !vmi.IsRunning() || vmi.IsFinal() || vmi.DeletionTimestamp != nil: // only consider running VMIs that aren't being shutdown continue - } else if !c.isOutdated(vmi) && !c.doesRequireMigration(vmi) { + case c.shouldAbortMigration(vmi) && !c.isOutdated(vmi): + data.abortChangeVMIs = append(data.abortChangeVMIs, vmi) + continue + case !c.isOutdated(vmi) && !c.doesRequireMigration(vmi): continue } @@ -456,7 +478,7 @@ func (c *WorkloadUpdateController) sync(kv *virtv1.KubeVirt) error { // Rather than enqueing based on VMI activity, we keep periodically poping the loop // until all VMIs are updated. Watching all VMI activity is chatty for this controller // when we don't need to be that efficent in how quickly the updates are being processed. 
- if len(data.evictOutdatedVMIs) != 0 || len(data.migratableOutdatedVMIs) != 0 { + if len(data.evictOutdatedVMIs) != 0 || len(data.migratableOutdatedVMIs) != 0 || len(data.abortChangeVMIs) != 0 { c.queue.AddAfter(key, periodicReEnqueueIntervalSeconds) } @@ -510,7 +532,7 @@ func (c *WorkloadUpdateController) sync(kv *virtv1.KubeVirt) error { evictionCandidates = data.evictOutdatedVMIs[0:batchDeletionCount] } - wgLen := len(migrationCandidates) + len(evictionCandidates) + wgLen := len(migrationCandidates) + len(evictionCandidates) + len(data.abortChangeVMIs) wg := &sync.WaitGroup{} wg.Add(wgLen) errChan := make(chan error, wgLen) @@ -575,6 +597,25 @@ func (c *WorkloadUpdateController) sync(kv *virtv1.KubeVirt) error { }(vmi) } + for _, vmi := range data.abortChangeVMIs { + go func(vmi *virtv1.VirtualMachineInstance) { + defer wg.Done() + migList := migrationutils.ListWorkloadUpdateMigrations(c.migrationInformer.GetStore(), vmi.Name, vmi.Namespace) + for _, mig := range migList { + err = c.clientset.VirtualMachineInstanceMigration(vmi.Namespace).Delete(context.Background(), mig.Name, metav1.DeleteOptions{}) + if err != nil && !errors.IsNotFound(err) { + log.Log.Object(vmi).Reason(err).Errorf("Failed to delete the migration due to a migration abortion") + c.recorder.Eventf(vmi, k8sv1.EventTypeNormal, FailedChangeAbortionReason, "Failed to abort change for vmi: %s: %v", vmi.Name, err) + errChan <- err + } else if err == nil { + log.Log.Infof("Delete migration %s due to an update change abortion", mig.Name) + c.recorder.Eventf(vmi, k8sv1.EventTypeNormal, SuccessfulChangeAbortionReason, "Aborted change for vmi: %s", vmi.Name) + + } + } + + }(vmi) + } wg.Wait() select { From a60b7267a13af60facb2ba7b99f2d9f5b266856f Mon Sep 17 00:00:00 2001 From: Alice Frosi Date: Wed, 3 Apr 2024 14:03:55 +0200 Subject: [PATCH 5/6] workload-updater: unit test for update abortion Signed-off-by: Alice Frosi --- .../workload-updater/workload-updater_test.go | 99 +++++++++++++++++++ 1 file 
changed, 99 insertions(+) diff --git a/pkg/virt-controller/watch/workload-updater/workload-updater_test.go b/pkg/virt-controller/watch/workload-updater/workload-updater_test.go index 696cb8ce3eaa..754ea7603d74 100644 --- a/pkg/virt-controller/watch/workload-updater/workload-updater_test.go +++ b/pkg/virt-controller/watch/workload-updater/workload-updater_test.go @@ -453,6 +453,105 @@ var _ = Describe("Workload Updater", func() { }) }) + Context("Abort changes due to an automated live update", func() { + createVM := func(annotation, hasChangeCondition bool) *v1.VirtualMachineInstance { + vmi := api.NewMinimalVMI("testvm") + vmi.Namespace = k8sv1.NamespaceDefault + vmi.Status.Phase = v1.Running + vmi.Status.Conditions = append(vmi.Status.Conditions, v1.VirtualMachineInstanceCondition{ + Type: v1.VirtualMachineInstanceIsMigratable, Status: k8sv1.ConditionTrue}) + if hasChangeCondition { + vmi.Status.Conditions = append(vmi.Status.Conditions, v1.VirtualMachineInstanceCondition{ + Type: v1.VirtualMachineInstanceMemoryChange, + Status: k8sv1.ConditionTrue, + }) + } + if annotation { + vmi.ObjectMeta.Annotations = make(map[string]string) + vmi.ObjectMeta.Annotations[v1.WorkloadUpdateMigrationAbortionAnnotation] = "" + } + vmiSource.Add(vmi) + return vmi + } + createMig := func(vmiName string, phase v1.VirtualMachineInstanceMigrationPhase) *v1.VirtualMachineInstanceMigration { + mig := newMigration("test", vmiName, phase) + mig.Annotations = map[string]string{v1.WorkloadUpdateMigrationAnnotation: ""} + migrationFeeder.Add(mig) + return mig + } + + BeforeEach(func() { + kv := newKubeVirt(0) + kv.Spec.WorkloadUpdateStrategy.WorkloadUpdateMethods = []v1.WorkloadUpdateMethod{v1.WorkloadUpdateMethodLiveMigrate} + addKubeVirt(kv) + }) + + DescribeTable("should delete the migration", func(phase v1.VirtualMachineInstanceMigrationPhase) { + vmi := createVM(false, false) + mig := createMig(vmi.Name, phase) + + if !mig.IsFinal() { + migrationInterface.EXPECT().Delete(gomock.Any(), 
mig.Name, metav1.DeleteOptions{}).Return(nil) + } + + controller.Execute() + if mig.IsFinal() { + Expect(recorder.Events).To(BeEmpty()) + } else { + testutils.ExpectEvent(recorder, SuccessfulChangeAbortionReason) + } + }, + Entry("in running phase", v1.MigrationRunning), + Entry("in failed phase", v1.MigrationFailed), + Entry("in succeeded phase", v1.MigrationSucceeded), + ) + + DescribeTable("should handle", func(hasCond, hasMig bool) { + vmi := createVM(false, hasCond) + if hasCond { + kubeVirtInterface.EXPECT().PatchStatus(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + } + var mig *v1.VirtualMachineInstanceMigration + if hasMig { + mig = createMig(vmi.Name, v1.MigrationRunning) + } + changeAborted := hasMig && !hasCond + if changeAborted { + migrationInterface.EXPECT().Delete(gomock.Any(), mig.Name, metav1.DeleteOptions{}).Return(nil) + } + controller.Execute() + if changeAborted { + testutils.ExpectEvent(recorder, SuccessfulChangeAbortionReason) + } else { + Expect(recorder.Events).To(BeEmpty()) + } + }, + Entry("a in progress change update", true, true), + Entry("a change abortion", false, true), + Entry("no change in progress", false, false), + ) + + DescribeTable("should always cancel the migration when the testWorkloadUpdateMigrationAbortion annotation is present", func(hasCond bool) { + vmi := createVM(true, hasCond) + mig := createMig(vmi.Name, v1.MigrationRunning) + migrationInterface.EXPECT().Delete(gomock.Any(), mig.Name, metav1.DeleteOptions{}).Return(nil) + controller.Execute() + testutils.ExpectEvent(recorder, SuccessfulChangeAbortionReason) + }, + Entry("with the change condition", true), + Entry("without the change condition", false), + ) + + It("should return an error if the migration hasn't been deleted", func() { + vmi := createVM(true, false) + mig := createMig(vmi.Name, v1.MigrationRunning) + migrationInterface.EXPECT().Delete(gomock.Any(), mig.Name, metav1.DeleteOptions{}).Return(fmt.Errorf("some 
error")) + + controller.Execute() + testutils.ExpectEvent(recorder, FailedChangeAbortionReason) + }) + }) + AfterEach(func() { close(stop) From 4dfde6f41a9a4603c8f8764d5ee0f8777e54ad87 Mon Sep 17 00:00:00 2001 From: Alice Frosi Date: Tue, 9 Apr 2024 15:59:12 +0200 Subject: [PATCH 6/6] tests: test migration abortion with CPU hotplug Add functional test to verify the abortion of the migration during the CPU update. The migration cancellation is triggered by the annotation kubevirt.io/testWorkloadUpdateMigrationAbortion on the VMI. Signed-off-by: Alice Frosi --- tests/hotplug/BUILD.bazel | 1 + tests/hotplug/cpu.go | 82 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/tests/hotplug/BUILD.bazel b/tests/hotplug/BUILD.bazel index 5ba3ffbbc64d..2e2cdc3012a6 100644 --- a/tests/hotplug/BUILD.bazel +++ b/tests/hotplug/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/libvmi:go_default_library", "//pkg/pointer:go_default_library", "//staging/src/kubevirt.io/api/core/v1:go_default_library", + "//staging/src/kubevirt.io/api/migrations/v1alpha1:go_default_library", "//staging/src/kubevirt.io/client-go/kubecli:go_default_library", "//tests:go_default_library", "//tests/console:go_default_library", diff --git a/tests/hotplug/cpu.go b/tests/hotplug/cpu.go index 37f971a470e7..46375e755e5f 100644 --- a/tests/hotplug/cpu.go +++ b/tests/hotplug/cpu.go @@ -8,9 +8,10 @@ import ( "kubevirt.io/kubevirt/tests/libnet" + migrationsv1 "kubevirt.io/api/migrations/v1alpha1" + "kubevirt.io/kubevirt/pkg/libvmi" "kubevirt.io/kubevirt/pkg/pointer" - "kubevirt.io/kubevirt/tests/framework/checks" "kubevirt.io/kubevirt/tests/libmigration" @@ -38,6 +39,7 @@ import ( . 
"kubevirt.io/kubevirt/tests/framework/matcher" "kubevirt.io/kubevirt/tests/libpod" "kubevirt.io/kubevirt/tests/libwait" + testsmig "kubevirt.io/kubevirt/tests/migration" ) var _ = Describe("[sig-compute][Serial]CPU Hotplug", decorators.SigCompute, decorators.SigComputeMigrations, decorators.RequiresTwoSchedulableNodes, decorators.VMLiveUpdateFeaturesGate, Serial, func() { @@ -260,6 +262,84 @@ var _ = Describe("[sig-compute][Serial]CPU Hotplug", decorators.SigCompute, deco Expect(reqCpu).To(Equal(expCpu.Value())) }) }) + + Context("Abort CPU change", func() { + It("should cancel the automated workload update", func() { + vmi := libvmifact.NewAlpineWithTestTooling( + libnet.WithMasqueradeNetworking()..., + ) + vmi.Namespace = testsuite.GetTestNamespace(vmi) + vmi.Spec.Domain.CPU = &v1.CPU{ + Sockets: 1, + Cores: 2, + Threads: 1, + MaxSockets: 2, + } + By("Limiting the bandwidth of migrations in the test namespace") + policy := testsmig.PreparePolicyAndVMIWithBandwidthLimitation(vmi, resource.MustParse("1Ki")) + testsmig.CreateMigrationPolicy(virtClient, policy) + Eventually(func() *migrationsv1.MigrationPolicy { + policy, err := virtClient.MigrationPolicy().Get(context.Background(), policy.Name, metav1.GetOptions{}) + if err != nil { + return nil + } + return policy + }, 30*time.Second, time.Second).ShouldNot(BeNil()) + + vm := libvmi.NewVirtualMachine(vmi, libvmi.WithRunning()) + + vm, err := virtClient.VirtualMachine(vm.Namespace).Create(context.Background(), vm, metav1.CreateOptions{}) + Expect(err).ToNot(HaveOccurred()) + Eventually(ThisVM(vm), 360*time.Second, 1*time.Second).Should(BeReady()) + libwait.WaitForSuccessfulVMIStart(vmi) + + // Update the CPU number and trigger the workload update + // and migration + By("Enabling the second socket to trigger the migration update") + p, err := patch.New(patch.WithReplace("/spec/template/spec/domain/cpu/sockets", 2)).GeneratePayload() + Expect(err).NotTo(HaveOccurred()) + _, err = 
virtClient.VirtualMachine(vm.Namespace).Patch(context.Background(), vm.Name, types.JSONPatchType, p, k8smetav1.PatchOptions{}) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() bool { + migrations, err := virtClient.VirtualMachineInstanceMigration(vm.Namespace).List(context.Background(), k8smetav1.ListOptions{}) + Expect(err).ToNot(HaveOccurred()) + for _, mig := range migrations.Items { + if mig.Spec.VMIName == vmi.Name { + return true + } + } + return false + }, 30*time.Second, time.Second).Should(BeTrue()) + + // Add annotation to cancel the workload update + By("Patching the workload migration abortion annotation") + vmi.ObjectMeta.Annotations[v1.WorkloadUpdateMigrationAbortionAnnotation] = "" + p, err = patch.New(patch.WithAdd("/metadata/annotations", vmi.ObjectMeta.Annotations)).GeneratePayload() + Expect(err).ToNot(HaveOccurred()) + _, err = virtClient.VirtualMachineInstance(vmi.Namespace).Patch(context.Background(), vmi.Name, types.JSONPatchType, p, metav1.PatchOptions{}) + Expect(err).ToNot(HaveOccurred()) + Eventually(func() bool { + vmi, err = virtClient.VirtualMachineInstance(vm.Namespace).Get(context.Background(), vm.Name, metav1.GetOptions{}) + Expect(err).ToNot(HaveOccurred()) + return metav1.HasAnnotation(vmi.ObjectMeta, v1.WorkloadUpdateMigrationAbortionAnnotation) + }, 30*time.Second, time.Second).Should(BeTrue()) + + // Wait until the migration is cancelled by the workload + // updater + Eventually(func() bool { + migrations, err := virtClient.VirtualMachineInstanceMigration(vm.Namespace).List(context.Background(), k8smetav1.ListOptions{}) + Expect(err).ToNot(HaveOccurred()) + for _, mig := range migrations.Items { + if mig.Spec.VMIName == vmi.Name { + return true + } + } + return false + }, 30*time.Second, time.Second).Should(BeFalse()) + + }) + }) }) func patchWorkloadUpdateMethodAndRolloutStrategy(kvName string, virtClient kubecli.KubevirtClient, updateStrategy *v1.KubeVirtWorkloadUpdateStrategy, rolloutStrategy *v1.VMRolloutStrategy) {