workload-update: cancel a workload update migration #11641

Merged · 6 commits · May 6, 2024
24 changes: 24 additions & 0 deletions pkg/util/migrations/migrations.go
@@ -4,6 +4,7 @@ import (
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "kubevirt.io/api/core/v1"
"kubevirt.io/client-go/log"

@@ -24,6 +25,29 @@ func ListUnfinishedMigrations(informer cache.SharedIndexInformer) []*v1.VirtualM
return migrations
}

func ListWorkloadUpdateMigrations(store cache.Store, vmiName, ns string) []v1.VirtualMachineInstanceMigration {
objs := store.List()
migrations := []v1.VirtualMachineInstanceMigration{}
for _, obj := range objs {
migration := obj.(*v1.VirtualMachineInstanceMigration)
if migration.IsFinal() {
continue
}
if migration.Namespace != ns {
mhenriks (Member): can't/shouldn't this informer have a "namespace" index?

Member: Could also add your own "vmi" index that indexes on namespace/name

alicefr (Member Author): @mhenriks I'm not sure I understand the feedback, could you please expand?

Contributor: The disruption budget controller's migration informer is initially created here:

func (f *kubeInformerFactory) VirtualMachineInstanceMigration() cache.SharedIndexInformer {
return f.getInformer("vmimInformer", func() cache.SharedIndexInformer {
lw := cache.NewListWatchFromClient(f.restClient, "virtualmachineinstancemigrations", k8sv1.NamespaceAll, fields.Everything())
return cache.NewSharedIndexInformer(lw, &kubev1.VirtualMachineInstanceMigration{}, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
})
}

The cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc} part provides a namespace indexer. This is how it's used:

// Get all pods from the namespace
objs, err := podIndexer.ByIndex(cache.NamespaceIndex, vmi.Namespace)

The idea is that you can provide an indexer name and a key (e.g. the namespace indexer with the my-namespace key).

What you can do is pass the indexer as an argument instead of a store, but that's not a blocker IMO.
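
For illustration only, a sketch of that suggestion, assuming the function accepts the informer's indexer instead of a plain store (this signature is hypothetical, not what the PR merged):

func ListWorkloadUpdateMigrations(indexer cache.Indexer, vmiName, ns string) []v1.VirtualMachineInstanceMigration {
    // Query only the objects in the VMI's namespace via the namespace index,
    // instead of listing the entire store.
    objs, err := indexer.ByIndex(cache.NamespaceIndex, ns)
    if err != nil {
        return nil
    }
    migrations := []v1.VirtualMachineInstanceMigration{}
    for _, obj := range objs {
        migration := obj.(*v1.VirtualMachineInstanceMigration)
        if migration.IsFinal() || migration.Spec.VMIName != vmiName {
            continue
        }
        if !metav1.HasAnnotation(migration.ObjectMeta, v1.WorkloadUpdateMigrationAnnotation) {
            continue
        }
        migrations = append(migrations, *migration)
    }
    return migrations
}

The caller would then pass c.migrationInformer.GetIndexer() instead of c.migrationInformer.GetStore().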

Contributor: If you want to go further with this approach you can even add a new indexer that would get the aborted migrations and use it here 😃
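
A sketch of that follow-up idea; the index name and helper are hypothetical, not part of this PR:

// byVMIIndex is a hypothetical index keyed on the namespace/name of the
// VMI a migration targets.
const byVMIIndex = "byVMI"

func addVMIIndex(migrationInformer cache.SharedIndexInformer) error {
    return migrationInformer.AddIndexers(cache.Indexers{
        byVMIIndex: func(obj interface{}) ([]string, error) {
            migration, ok := obj.(*v1.VirtualMachineInstanceMigration)
            if !ok {
                return nil, nil
            }
            return []string{migration.Namespace + "/" + migration.Spec.VMIName}, nil
        },
    })
}

A lookup then becomes a single index query, e.g. migrationInformer.GetIndexer().ByIndex(byVMIIndex, vmi.Namespace+"/"+vmi.Name).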

alicefr (Member Author): do you mind if I address this in a follow-up PR? I'd like to get this merged in order to continue with volume migration

Contributor: Not at all, as I said it's not a blocker from my side :)

continue
}
if migration.Spec.VMIName != vmiName {
continue
}
if !metav1.HasAnnotation(migration.ObjectMeta, v1.WorkloadUpdateMigrationAnnotation) {
continue
}
migrations = append(migrations, *migration)
}

return migrations
}

func FilterRunningMigrations(migrations []*v1.VirtualMachineInstanceMigration) []*v1.VirtualMachineInstanceMigration {
runningMigrations := []*v1.VirtualMachineInstanceMigration{}
for _, migration := range migrations {
54 changes: 49 additions & 5 deletions pkg/virt-controller/watch/workload-updater/workload-updater.go
@@ -25,6 +25,7 @@ import (

migrationutils "kubevirt.io/kubevirt/pkg/util/migrations"

v1 "kubevirt.io/api/core/v1"
virtv1 "kubevirt.io/api/core/v1"
"kubevirt.io/client-go/kubecli"
"kubevirt.io/client-go/log"
@@ -43,6 +44,12 @@ const (
FailedEvictVirtualMachineInstanceReason = "FailedEvict"
// SuccessfulEvictVirtualMachineInstanceReason is added in an event if a deletion of a VMI Succeeds
SuccessfulEvictVirtualMachineInstanceReason = "SuccessfulEvict"
// SuccessfulChangeAbortionReason is added in an event if a deletion of a
// migration succeeds
SuccessfulChangeAbortionReason = "SuccessfulChangeAbortion"
// FailedChangeAbortionReason is added in an event if a deletion of a
// migration fails
FailedChangeAbortionReason = "FailedChangeAbortion"
)

// time to wait before re-enqueuing when outdated VMIs are still detected
@@ -74,6 +81,7 @@ type updateData struct {
allOutdatedVMIs []*virtv1.VirtualMachineInstance
migratableOutdatedVMIs []*virtv1.VirtualMachineInstance
evictOutdatedVMIs []*virtv1.VirtualMachineInstance
abortChangeVMIs []*virtv1.VirtualMachineInstance

numActiveMigrations int
}
@@ -313,14 +321,27 @@ func (c *WorkloadUpdateController) doesRequireMigration(vmi *virtv1.VirtualMachi
if vmi.IsFinal() || migrationutils.IsMigrating(vmi) {
return false
}

if metav1.HasAnnotation(vmi.ObjectMeta, v1.WorkloadUpdateMigrationAbortionAnnotation) {
return false
}
if isHotplugInProgress(vmi) {
return true
}

return false
}

func (c *WorkloadUpdateController) shouldAbortMigration(vmi *virtv1.VirtualMachineInstance) bool {
numMig := len(migrationutils.ListWorkloadUpdateMigrations(c.migrationInformer.GetStore(), vmi.Name, vmi.Namespace))
Contributor: nit

Suggested change
numMig := len(migrationutils.ListWorkloadUpdateMigrations(c.migrationInformer.GetStore(), vmi.Name, vmi.Namespace))
shouldMigrate := len(migrationutils.ListWorkloadUpdateMigrations(c.migrationInformer.GetStore(), vmi.Name, vmi.Namespace)) > 0

Then return it below

if metav1.HasAnnotation(vmi.ObjectMeta, virtv1.WorkloadUpdateMigrationAbortionAnnotation) {
return numMig > 0
}
if isHotplugInProgress(vmi) {
return false
}
return numMig > 0
}
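
Applying the nit above to the whole function gives roughly the following (a sketch of the suggested refactor, not the merged code; the variable is renamed to shouldAbort here for clarity):

func (c *WorkloadUpdateController) shouldAbortMigration(vmi *virtv1.VirtualMachineInstance) bool {
    // Whether any unfinished workload-update migration exists for this VMI.
    shouldAbort := len(migrationutils.ListWorkloadUpdateMigrations(c.migrationInformer.GetStore(), vmi.Name, vmi.Namespace)) > 0
    if metav1.HasAnnotation(vmi.ObjectMeta, virtv1.WorkloadUpdateMigrationAbortionAnnotation) {
        return shouldAbort
    }
    // While a hotplug is still in progress the migration is doing its job,
    // so don't abort it.
    if isHotplugInProgress(vmi) {
        return false
    }
    return shouldAbort
}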

func (c *WorkloadUpdateController) getUpdateData(kv *virtv1.KubeVirt) *updateData {
data := &updateData{}

@@ -349,10 +370,14 @@ func (c *WorkloadUpdateController) getUpdateData(kv *virtv1.KubeVirt) *updateDat
objs := c.vmiInformer.GetStore().List()
for _, obj := range objs {
vmi := obj.(*virtv1.VirtualMachineInstance)
if !vmi.IsRunning() || vmi.IsFinal() || vmi.DeletionTimestamp != nil {
switch {
case !vmi.IsRunning() || vmi.IsFinal() || vmi.DeletionTimestamp != nil:
// only consider running VMIs that aren't being shutdown
continue
} else if !c.isOutdated(vmi) && !c.doesRequireMigration(vmi) {
case c.shouldAbortMigration(vmi) && !c.isOutdated(vmi):
data.abortChangeVMIs = append(data.abortChangeVMIs, vmi)
continue
case !c.isOutdated(vmi) && !c.doesRequireMigration(vmi):
continue
}

@@ -453,7 +478,7 @@ func (c *WorkloadUpdateController) sync(kv *virtv1.KubeVirt) error {
// Rather than enqueuing based on VMI activity, we keep periodically popping the loop
// until all VMIs are updated. Watching all VMI activity is chatty for this controller
// when we don't need to be that efficient in how quickly the updates are being processed.
if len(data.evictOutdatedVMIs) != 0 || len(data.migratableOutdatedVMIs) != 0 {
if len(data.evictOutdatedVMIs) != 0 || len(data.migratableOutdatedVMIs) != 0 || len(data.abortChangeVMIs) != 0 {
c.queue.AddAfter(key, periodicReEnqueueIntervalSeconds)
}

@@ -507,7 +532,7 @@ func (c *WorkloadUpdateController) sync(kv *virtv1.KubeVirt) error {
evictionCandidates = data.evictOutdatedVMIs[0:batchDeletionCount]
}

wgLen := len(migrationCandidates) + len(evictionCandidates)
wgLen := len(migrationCandidates) + len(evictionCandidates) + len(data.abortChangeVMIs)
wg := &sync.WaitGroup{}
wg.Add(wgLen)
errChan := make(chan error, wgLen)
@@ -572,6 +597,25 @@ func (c *WorkloadUpdateController) sync(kv *virtv1.KubeVirt) error {
}(vmi)
}

for _, vmi := range data.abortChangeVMIs {
go func(vmi *virtv1.VirtualMachineInstance) {
defer wg.Done()
migList := migrationutils.ListWorkloadUpdateMigrations(c.migrationInformer.GetStore(), vmi.Name, vmi.Namespace)
for _, mig := range migList {
err := c.clientset.VirtualMachineInstanceMigration(vmi.Namespace).Delete(context.Background(), mig.Name, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
log.Log.Object(vmi).Reason(err).Errorf("Failed to delete migration %s when aborting the change", mig.Name)
c.recorder.Eventf(vmi, k8sv1.EventTypeWarning, FailedChangeAbortionReason, "Failed to abort change for vmi: %s: %v", vmi.Name, err)
errChan <- err
} else if err == nil {
log.Log.Infof("Deleted migration %s due to an update change abortion", mig.Name)
c.recorder.Eventf(vmi, k8sv1.EventTypeNormal, SuccessfulChangeAbortionReason, "Aborted change for vmi: %s", vmi.Name)
}
}

}(vmi)
}
wg.Wait()

select {
@@ -453,6 +453,105 @@ var _ = Describe("Workload Updater", func() {
})
})

Context("Abort changes due to an automated live update", func() {
createVM := func(annotation, hasChangeCondition bool) *v1.VirtualMachineInstance {
vmi := api.NewMinimalVMI("testvm")
vmi.Namespace = k8sv1.NamespaceDefault
vmi.Status.Phase = v1.Running
vmi.Status.Conditions = append(vmi.Status.Conditions, v1.VirtualMachineInstanceCondition{
Type: v1.VirtualMachineInstanceIsMigratable, Status: k8sv1.ConditionTrue})
if hasChangeCondition {
vmi.Status.Conditions = append(vmi.Status.Conditions, v1.VirtualMachineInstanceCondition{
Type: v1.VirtualMachineInstanceMemoryChange,
Status: k8sv1.ConditionTrue,
})
}
if annotation {
vmi.ObjectMeta.Annotations = make(map[string]string)
vmi.ObjectMeta.Annotations[v1.WorkloadUpdateMigrationAbortionAnnotation] = ""
}
vmiSource.Add(vmi)
return vmi
}
createMig := func(vmiName string, phase v1.VirtualMachineInstanceMigrationPhase) *v1.VirtualMachineInstanceMigration {
mig := newMigration("test", vmiName, phase)
mig.Annotations = map[string]string{v1.WorkloadUpdateMigrationAnnotation: ""}
migrationFeeder.Add(mig)
return mig
}

BeforeEach(func() {
kv := newKubeVirt(0)
kv.Spec.WorkloadUpdateStrategy.WorkloadUpdateMethods = []v1.WorkloadUpdateMethod{v1.WorkloadUpdateMethodLiveMigrate}
addKubeVirt(kv)
})

DescribeTable("should delete the migration", func(phase v1.VirtualMachineInstanceMigrationPhase) {
vmi := createVM(false, false)
mig := createMig(vmi.Name, phase)

if !mig.IsFinal() {
migrationInterface.EXPECT().Delete(gomock.Any(), mig.Name, metav1.DeleteOptions{}).Return(nil)
}

controller.Execute()
if mig.IsFinal() {
Expect(recorder.Events).To(BeEmpty())
} else {
testutils.ExpectEvent(recorder, SuccessfulChangeAbortionReason)
}
},
Entry("in running phase", v1.MigrationRunning),
Entry("in failed phase", v1.MigrationFailed),
Entry("in succeeded phase", v1.MigrationSucceeded),
)

DescribeTable("should handle", func(hasCond, hasMig bool) {
vmi := createVM(false, hasCond)
if hasCond {
kubeVirtInterface.EXPECT().PatchStatus(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
}
var mig *v1.VirtualMachineInstanceMigration
if hasMig {
mig = createMig(vmi.Name, v1.MigrationRunning)
}
changeAborted := hasMig && !hasCond
if changeAborted {
migrationInterface.EXPECT().Delete(gomock.Any(), mig.Name, metav1.DeleteOptions{}).Return(nil)
}
controller.Execute()
if changeAborted {
testutils.ExpectEvent(recorder, SuccessfulChangeAbortionReason)
} else {
Expect(recorder.Events).To(BeEmpty())
}
},
Entry("a in progress change update", true, true),
Entry("a change abortion", false, true),
Entry("no change in progress", false, false),
)

DescribeTable("should always cancel the migration when the testWorkloadUpdateMigrationAbortion annotation is present", func(hasCond bool) {
vmi := createVM(true, hasCond)
mig := createMig(vmi.Name, v1.MigrationRunning)
migrationInterface.EXPECT().Delete(gomock.Any(), mig.Name, metav1.DeleteOptions{}).Return(nil)
controller.Execute()
testutils.ExpectEvent(recorder, SuccessfulChangeAbortionReason)
},
Entry("with the change condition", true),
Entry("without the change condition", false),
)

It("should return an error if the migration hasn't been deleted", func() {
vmi := createVM(true, false)
mig := createMig(vmi.Name, v1.MigrationRunning)
migrationInterface.EXPECT().Delete(gomock.Any(), mig.Name, metav1.DeleteOptions{}).Return(fmt.Errorf("some error"))

controller.Execute()
testutils.ExpectEvent(recorder, FailedChangeAbortionReason)
})
})

AfterEach(func() {

close(stop)
3 changes: 3 additions & 0 deletions staging/src/kubevirt.io/api/core/v1/types.go
@@ -853,6 +853,9 @@ const (
// This annotation indicates that a migration is the result of an
// automated workload update
WorkloadUpdateMigrationAnnotation string = "kubevirt.io/workloadUpdateMigration"
// This annotation indicates that any migration triggered by an automated
// workload update should be aborted. It should only be used for testing purposes.
WorkloadUpdateMigrationAbortionAnnotation string = "kubevirt.io/testWorkloadUpdateMigrationAbortion"
// This label declares whether a particular node is available for
// scheduling virtual machine instances on it. Used on Node.
NodeSchedulable string = "kubevirt.io/schedulable"
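As a usage sketch (the helper is apimachinery's metav1.SetMetaDataAnnotation; the surrounding test setup is assumed), a test can opt a VMI into the abort behavior like this:

// Mark the VMI so the workload updater cancels any workload-update
// migration targeting it; SetMetaDataAnnotation initializes the
// annotations map when it is nil.
metav1.SetMetaDataAnnotation(&vmi.ObjectMeta, v1.WorkloadUpdateMigrationAbortionAnnotation, "")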
1 change: 1 addition & 0 deletions tests/hotplug/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
"//pkg/libvmi:go_default_library",
"//pkg/pointer:go_default_library",
"//staging/src/kubevirt.io/api/core/v1:go_default_library",
"//staging/src/kubevirt.io/api/migrations/v1alpha1:go_default_library",
"//staging/src/kubevirt.io/client-go/kubecli:go_default_library",
"//tests:go_default_library",
"//tests/console:go_default_library",
82 changes: 81 additions & 1 deletion tests/hotplug/cpu.go
@@ -8,9 +8,10 @@ import (

"kubevirt.io/kubevirt/tests/libnet"

migrationsv1 "kubevirt.io/api/migrations/v1alpha1"

"kubevirt.io/kubevirt/pkg/libvmi"
"kubevirt.io/kubevirt/pkg/pointer"

"kubevirt.io/kubevirt/tests/framework/checks"

"kubevirt.io/kubevirt/tests/libmigration"
@@ -38,6 +39,7 @@ import (
. "kubevirt.io/kubevirt/tests/framework/matcher"
"kubevirt.io/kubevirt/tests/libpod"
"kubevirt.io/kubevirt/tests/libwait"
testsmig "kubevirt.io/kubevirt/tests/migration"
)

var _ = Describe("[sig-compute][Serial]CPU Hotplug", decorators.SigCompute, decorators.SigComputeMigrations, decorators.RequiresTwoSchedulableNodes, decorators.VMLiveUpdateFeaturesGate, Serial, func() {
@@ -260,6 +262,84 @@ var _ = Describe("[sig-compute][Serial]CPU Hotplug", decorators.SigCompute, deco
Expect(reqCpu).To(Equal(expCpu.Value()))
})
})

Context("Abort CPU change", func() {
It("should cancel the automated workload update", func() {
vmi := libvmifact.NewAlpineWithTestTooling(
libnet.WithMasqueradeNetworking()...,
)
vmi.Namespace = testsuite.GetTestNamespace(vmi)
vmi.Spec.Domain.CPU = &v1.CPU{
Sockets: 1,
Cores: 2,
Threads: 1,
MaxSockets: 2,
}
By("Limiting the bandwidth of migrations in the test namespace")
policy := testsmig.PreparePolicyAndVMIWithBandwidthLimitation(vmi, resource.MustParse("1Ki"))
testsmig.CreateMigrationPolicy(virtClient, policy)
Eventually(func() *migrationsv1.MigrationPolicy {
policy, err := virtClient.MigrationPolicy().Get(context.Background(), policy.Name, metav1.GetOptions{})
if err != nil {
return nil
}
return policy
}, 30*time.Second, time.Second).ShouldNot(BeNil())

vm := libvmi.NewVirtualMachine(vmi, libvmi.WithRunning())

vm, err := virtClient.VirtualMachine(vm.Namespace).Create(context.Background(), vm, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
Eventually(ThisVM(vm), 360*time.Second, 1*time.Second).Should(BeReady())
libwait.WaitForSuccessfulVMIStart(vmi)

// Update the CPU number and trigger the workload update
// and migration
By("Enabling the second socket to trigger the migration update")
p, err := patch.New(patch.WithReplace("/spec/template/spec/domain/cpu/sockets", 2)).GeneratePayload()
Expect(err).NotTo(HaveOccurred())
_, err = virtClient.VirtualMachine(vm.Namespace).Patch(context.Background(), vm.Name, types.JSONPatchType, p, k8smetav1.PatchOptions{})
Expect(err).ToNot(HaveOccurred())

Eventually(func() bool {
migrations, err := virtClient.VirtualMachineInstanceMigration(vm.Namespace).List(context.Background(), k8smetav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
for _, mig := range migrations.Items {
if mig.Spec.VMIName == vmi.Name {
return true
}
}
return false
}, 30*time.Second, time.Second).Should(BeTrue())

// Add annotation to cancel the workload update
By("Patching the workload migration abortion annotation")
if vmi.ObjectMeta.Annotations == nil {
vmi.ObjectMeta.Annotations = map[string]string{}
}
vmi.ObjectMeta.Annotations[v1.WorkloadUpdateMigrationAbortionAnnotation] = ""
p, err = patch.New(patch.WithAdd("/metadata/annotations", vmi.ObjectMeta.Annotations)).GeneratePayload()
Expect(err).ToNot(HaveOccurred())
_, err = virtClient.VirtualMachineInstance(vmi.Namespace).Patch(context.Background(), vmi.Name, types.JSONPatchType, p, metav1.PatchOptions{})
Expect(err).ToNot(HaveOccurred())
Eventually(func() bool {
vmi, err = virtClient.VirtualMachineInstance(vm.Namespace).Get(context.Background(), vm.Name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
return metav1.HasAnnotation(vmi.ObjectMeta, v1.WorkloadUpdateMigrationAbortionAnnotation)
}, 30*time.Second, time.Second).Should(BeTrue())

// Wait until the migration is cancelled by the workload
// updater
Eventually(func() bool {
migrations, err := virtClient.VirtualMachineInstanceMigration(vm.Namespace).List(context.Background(), k8smetav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
for _, mig := range migrations.Items {
if mig.Spec.VMIName == vmi.Name {
return true
}
}
return false
}, 30*time.Second, time.Second).Should(BeFalse())

})
})
})

func patchWorkloadUpdateMethodAndRolloutStrategy(kvName string, virtClient kubecli.KubevirtClient, updateStrategy *v1.KubeVirtWorkloadUpdateStrategy, rolloutStrategy *v1.VMRolloutStrategy) {