Comparing changes
- 5 commits
- 3 files changed
- 0 commit comments
- 2 contributors
Showing with 135 additions and 16 deletions.
- +66 −10 pkg/virt-controller/watch/replicaset.go
- +33 −4 pkg/virt-controller/watch/replicaset_test.go
- +36 −2 tests/replicaset_test.go
pkg/virt-controller/watch/replicaset.go:

```diff
@@ -185,8 +185,6 @@ func (c *VMIReplicaSet) execute(key string) error {
 		return err
 	}
-	vmis = c.filterActiveVMIs(vmis)
-
 	// If any adoptions are attempted, we should first recheck for deletion with
 	// an uncached quorum read sometime after listing VirtualMachines (see kubernetes/kubernetes#42639).
 	canAdoptFunc := controller.RecheckDeletionTimestamp(func() (v1.Object, error) {
@@ -205,24 +203,29 @@ func (c *VMIReplicaSet) execute(key string) error {
 		return err
 	}
 
+	finishedVmis := c.filterFinishedVMIs(vmis)
+	activeVmis := c.filterActiveVMIs(vmis)
+
 	var scaleErr error
 
 	// Scale up or down, if all expected creates and deletes were reported by the listener
 	if needsSync && !rs.Spec.Paused && rs.ObjectMeta.DeletionTimestamp == nil {
-		scaleErr = c.scale(rs, vmis)
+		scaleErr = c.scale(rs, activeVmis)
+		if len(finishedVmis) > 0 && scaleErr == nil {
+			scaleErr = c.cleanFinishedVmis(rs, finishedVmis)
+		}
 	}
 
 	// If the controller is going to be deleted and the orphan finalizer is the next one, release the VMIs. Don't update the status.
 	// TODO: Workaround for https://github.com/kubernetes/kubernetes/issues/56348, remove it once it is fixed
 	if rs.ObjectMeta.DeletionTimestamp != nil && controller.HasFinalizer(rs, v1.FinalizerOrphanDependents) {
-		return c.orphan(cm, rs, vmis)
+		return c.orphan(cm, rs, activeVmis)
 	}
 
 	if scaleErr != nil {
 		logger.Reason(err).Error("Scaling the replicaset failed.")
 	}
 
-	err = c.updateStatus(rs.DeepCopy(), vmis, scaleErr)
+	err = c.updateStatus(rs.DeepCopy(), activeVmis, scaleErr)
 	if err != nil {
 		logger.Reason(err).Error("Updating the replicaset status failed.")
 	}
```
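Read as control flow, the execute() changes order the work so that finished VMIs are only cleaned up after a successful scale, and a scale error is never overwritten by a cleanup result. Below is a minimal, self-contained sketch of that ordering; the closures stand in for the real scale and cleanFinishedVmis methods, and none of the names are KubeVirt API:

```go
package main

import (
	"errors"
	"fmt"
)

// reconcile sketches the ordering in execute(): scale against the active set
// first, then clean up finished VMIs only if scaling raised no error, so a
// scale failure is never masked by the cleanup step.
func reconcile(needsSync, paused, deleting bool, active, finished int,
	scale func(int) error, clean func(int) error) error {
	var scaleErr error
	if needsSync && !paused && !deleting {
		scaleErr = scale(active)
		if finished > 0 && scaleErr == nil {
			scaleErr = clean(finished)
		}
	}
	return scaleErr
}

func main() {
	scaleFail := func(int) error { return errors.New("scale failed") }
	cleanOK := func(int) error { return nil }

	// Cleanup is skipped because scaling failed; the scale error is reported.
	fmt.Println(reconcile(true, false, false, 2, 1, scaleFail, cleanOK))
}
```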
```diff
@@ -258,8 +261,9 @@ func (c *VMIReplicaSet) orphan(cm *controller.VirtualMachineControllerRefManager
 }
 
 func (c *VMIReplicaSet) scale(rs *virtv1.VirtualMachineInstanceReplicaSet, vmis []*virtv1.VirtualMachineInstance) error {
+	log.Log.V(4).Object(rs).Info("Scale")
 	diff := c.calcDiff(rs, vmis)
 
 	rsKey, err := controller.KeyFunc(rs)
 	if err != nil {
 		log.Log.Object(rs).Reason(err).Error("Failed to extract rsKey from replicaset.")
@@ -280,6 +284,7 @@ func (c *VMIReplicaSet) scale(rs *virtv1.VirtualMachineInstanceReplicaSet, vmis
 	wg.Add(abs(diff))
 
 	if diff > 0 {
+		log.Log.V(4).Object(rs).Info("Delete excess VMIs")
 		// We have to delete VMIs; use a very simple selection strategy for now
 		// TODO: Possible deletion order: not yet running VMIs < migrating VMIs < other
 		deleteCandidates := vmis[0:diff]
@@ -302,6 +307,7 @@ func (c *VMIReplicaSet) scale(rs *virtv1.VirtualMachineInstanceReplicaSet, vmis
 		}
 	} else if diff < 0 {
+		log.Log.V(4).Object(rs).Info("Add missing VMIs")
 		// We have to create VMIs
 		c.expectations.ExpectCreations(rsKey, abs(diff))
 		basename := c.getVirtualMachineBaseName(rs)
@@ -341,7 +347,7 @@ func (c *VMIReplicaSet) scale(rs *virtv1.VirtualMachineInstanceReplicaSet, vmis
 
 // filterActiveVMIs takes a list of VMIs and returns all VMIs which are not in a final state
 func (c *VMIReplicaSet) filterActiveVMIs(vmis []*virtv1.VirtualMachineInstance) []*virtv1.VirtualMachineInstance {
 	return filter(vmis, func(vmi *virtv1.VirtualMachineInstance) bool {
-		return !vmi.IsFinal()
+		return !vmi.IsFinal() && vmi.DeletionTimestamp == nil
 	})
 }
@@ -352,6 +358,13 @@ func (c *VMIReplicaSet) filterReadyVMIs(vmis []*virtv1.VirtualMachineInstance) [
 	})
 }
 
+// filterFinishedVMIs takes a list of VMIs and returns all VMIs which are in a final state.
+func (c *VMIReplicaSet) filterFinishedVMIs(vmis []*virtv1.VirtualMachineInstance) []*virtv1.VirtualMachineInstance {
+	return filter(vmis, func(vmi *virtv1.VirtualMachineInstance) bool {
+		return vmi.IsFinal() && vmi.DeletionTimestamp == nil
+	})
+}
+
 func filter(vmis []*virtv1.VirtualMachineInstance, f func(vmi *virtv1.VirtualMachineInstance) bool) []*virtv1.VirtualMachineInstance {
 	filtered := []*virtv1.VirtualMachineInstance{}
 	for _, vmi := range vmis {
@@ -632,9 +645,7 @@ func (c *VMIReplicaSet) removeCondition(rs *virtv1.VirtualMachineInstanceReplica
 }
 
 func (c *VMIReplicaSet) updateStatus(rs *virtv1.VirtualMachineInstanceReplicaSet, vmis []*virtv1.VirtualMachineInstance, scaleErr error) error {
 	diff := c.calcDiff(rs, vmis)
 	readyReplicas := int32(len(c.filterReadyVMIs(vmis)))
 
 	// check if we have reached the equilibrium
```
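The two predicates split one VMI list into disjoint sets, and a finished VMI that already carries a deletion timestamp lands in neither: it is no longer counted for scaling and is not deleted a second time. A self-contained sketch with toy types (the real code works on virtv1.VirtualMachineInstance and its IsFinal() method):

```go
package main

import (
	"fmt"
	"time"
)

// Toy stand-in for virtv1.VirtualMachineInstance; only the fields the
// predicates inspect are modeled here.
type vmi struct {
	name              string
	phase             string     // "Running", "Succeeded", "Failed", ...
	deletionTimestamp *time.Time // nil while no delete is pending
}

// isFinal mirrors what vmi.IsFinal() is assumed to report: a terminal phase.
func (v vmi) isFinal() bool {
	return v.phase == "Succeeded" || v.phase == "Failed"
}

// filter matches the generic helper from the diff.
func filter(vmis []vmi, f func(vmi) bool) []vmi {
	filtered := []vmi{}
	for _, v := range vmis {
		if f(v) {
			filtered = append(filtered, v)
		}
	}
	return filtered
}

func main() {
	now := time.Now()
	vmis := []vmi{
		{name: "a", phase: "Running"},                         // active
		{name: "b", phase: "Failed"},                          // finished: cleanup candidate
		{name: "c", phase: "Failed", deletionTimestamp: &now}, // already being deleted: neither set
	}
	active := filter(vmis, func(v vmi) bool { return !v.isFinal() && v.deletionTimestamp == nil })
	finished := filter(vmis, func(v vmi) bool { return v.isFinal() && v.deletionTimestamp == nil })
	fmt.Println(len(active), len(finished)) // 1 1
}
```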
```diff
@@ -774,3 +785,48 @@ func (c *VMIReplicaSet) resolveControllerRef(namespace string, controllerRef *v1
 	}
 	return rs.(*virtv1.VirtualMachineInstanceReplicaSet)
 }
+
+func (c *VMIReplicaSet) cleanFinishedVmis(rs *virtv1.VirtualMachineInstanceReplicaSet, vmis []*virtv1.VirtualMachineInstance) error {
+	rsKey, err := controller.KeyFunc(rs)
+	if err != nil {
+		log.Log.Object(rs).Reason(err).Error("Failed to extract rsKey from replicaset.")
+		return nil
+	}
+
+	diff := limit(len(vmis), c.burstReplicas)
+
+	// Every delete request can fail; give the channel enough room so it never blocks the goroutines
+	errChan := make(chan error, abs(diff))
+
+	var wg sync.WaitGroup
+	wg.Add(abs(diff))
+
+	log.Log.V(4).Object(rs).Info("Delete finished VMIs")
+	deleteCandidates := vmis[0:diff]
+	c.expectations.ExpectDeletions(rsKey, controller.VirtualMachineKeys(deleteCandidates))
+	for i := 0; i < diff; i++ {
+		go func(idx int) {
+			defer wg.Done()
+			deleteCandidate := vmis[idx]
+			err := c.clientset.VirtualMachineInstance(rs.ObjectMeta.Namespace).Delete(deleteCandidate.ObjectMeta.Name, &v1.DeleteOptions{})
+			// Don't log an error if it is already deleted
+			if err != nil {
+				// We can't observe a delete if it was not accepted by the server
+				c.expectations.DeletionObserved(rsKey, controller.VirtualMachineKey(deleteCandidate))
+				c.recorder.Eventf(rs, k8score.EventTypeWarning, FailedDeleteVirtualMachineReason, "Error deleting finished virtual machine %s: %v", deleteCandidate.ObjectMeta.Name, err)
+				errChan <- err
+				return
+			}
+			c.recorder.Eventf(rs, k8score.EventTypeNormal, SuccessfulDeleteVirtualMachineReason, "Deleted finished virtual machine: %v", deleteCandidate.ObjectMeta.UID)
+		}(i)
+	}
+	wg.Wait()
+
+	select {
+	case err := <-errChan:
+		// Only return the first error that occurred; the others will most likely be the same error
+		return err
+	default:
+	}
+	return nil
+}
```
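cleanFinishedVmis fans the deletes out to one goroutine per candidate. The error channel is buffered with one slot per goroutine, so a failed delete never blocks on send, and after wg.Wait() a single non-blocking receive surfaces at most the first error. The same pattern in isolation, with a toy delete function and no KubeVirt types:

```go
package main

import (
	"fmt"
	"sync"
)

// deleteAll issues one delete per name concurrently. The channel has room
// for every goroutine, so senders never block, and only the first observed
// error is returned (the rest are usually the same failure repeated).
func deleteAll(names []string, del func(string) error) error {
	errChan := make(chan error, len(names))
	var wg sync.WaitGroup
	wg.Add(len(names))
	for _, name := range names {
		go func(n string) {
			defer wg.Done()
			if err := del(n); err != nil {
				errChan <- err
			}
		}(name)
	}
	wg.Wait()
	select {
	case err := <-errChan:
		return err
	default:
		return nil
	}
}

func main() {
	err := deleteAll([]string{"vmi-a", "vmi-b"}, func(n string) error {
		fmt.Println("deleting", n)
		return nil
	})
	fmt.Println("first error:", err)
}
```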
pkg/virt-controller/watch/replicaset_test.go:

```diff
@@ -287,15 +287,16 @@ var _ = Describe("Replicaset", func() {
 		It("should be woken by a stopped VirtualMachineInstance and create a new one", func() {
 			rs, vmi := DefaultReplicaSet(1)
 			rs.Status.Replicas = 1
+			rs.Status.ReadyReplicas = 1
 			vmi.Status.Phase = v1.Running
 
 			rsCopy := rs.DeepCopy()
 			rsCopy.Status.Replicas = 0
+			rsCopy.Status.ReadyReplicas = 0
 
 			addReplicaSet(rs)
 			vmiFeeder.Add(vmi)
-			rsInterface.EXPECT().Update(rsCopy).Times(1)
 
 			// First make sure that we don't have to do anything
 			controller.Execute()
@@ -306,12 +307,13 @@ var _ = Describe("Replicaset", func() {
 			vmiFeeder.Modify(modifiedVMI)
 
 			// Expect the re-creation of the VirtualMachineInstance
+			rsInterface.EXPECT().Update(rsCopy).Times(1)
+			vmiInterface.EXPECT().Delete(vmi.ObjectMeta.Name, gomock.Any()).Return(nil)
 			vmiInterface.EXPECT().Create(gomock.Any()).Return(vmi, nil)
 
 			// Run the controller again
 			controller.Execute()
 
-			testutils.ExpectEvent(recorder, SuccessfulCreateVirtualMachineReason)
+			testutils.ExpectEvents(recorder, SuccessfulDeleteVirtualMachineReason, SuccessfulCreateVirtualMachineReason)
 		})
 
 		It("should be woken by a ready VirtualMachineInstance and update the readyReplicas counter", func() {
@@ -370,6 +372,33 @@ var _ = Describe("Replicaset", func() {
 			testutils.ExpectEvent(recorder, SuccessfulCreateVirtualMachineReason)
 		})
 
+		It("should delete a VirtualMachineInstance in a final state", func() {
+			rs, vmi := DefaultReplicaSet(1)
+			rs.Status.Replicas = 1
+			rs.Status.ReadyReplicas = 1
+			vmi.Status.Phase = v1.Running
+
+			addReplicaSet(rs)
+			vmiFeeder.Add(vmi)
+
+			// First make sure that we don't have to do anything
+			controller.Execute()
+
+			// Move one VirtualMachineInstance to a final state
+			modifiedVMI := vmi.DeepCopy()
+			modifiedVMI.Status.Phase = v1.Failed
+			modifiedVMI.ResourceVersion = "1"
+			vmiFeeder.Modify(modifiedVMI)
+
+			// Expect the deletion of the VirtualMachineInstance
+			vmiInterface.EXPECT().Delete(vmi.ObjectMeta.Name, gomock.Any()).Return(nil)
+
+			// Run the cleanFinishedVmis method
+			controller.cleanFinishedVmis(rs, []*v1.VirtualMachineInstance{modifiedVMI})
+
+			testutils.ExpectEvent(recorder, SuccessfulDeleteVirtualMachineReason)
+		})
+
 		It("should add a fail condition if scaling up fails", func() {
 			rs, vmi := DefaultReplicaSet(3)
```
tests/replicaset_test.go:

```diff
@@ -77,7 +77,7 @@ var _ = Describe("VirtualMachineInstanceReplicaSet", func() {
 			rs, err = virtClient.ReplicaSet(tests.NamespaceTestDefault).Get(name, v12.GetOptions{})
 			Expect(err).ToNot(HaveOccurred())
 			return rs.Status.Replicas
-		}, 60, 1).Should(Equal(int32(scale)))
+		}, 90*time.Second, time.Second).Should(Equal(int32(scale)))
 
 		vmis, err := virtClient.VirtualMachineInstance(tests.NamespaceTestDefault).List(&v12.ListOptions{})
 		Expect(err).ToNot(HaveOccurred())
@@ -179,7 +179,7 @@ var _ = Describe("VirtualMachineInstanceReplicaSet", func() {
 			vmis, err := virtClient.VirtualMachineInstance(newRS.ObjectMeta.Namespace).List(&v12.ListOptions{})
 			Expect(err).ToNot(HaveOccurred())
 			return len(vmis.Items)
-		}, 60*time.Second, 1*time.Second).Should(BeZero())
+		}, 120*time.Second, 1*time.Second).Should(BeZero())
 	})
 
 	It("should remove owner references on the VirtualMachineInstance if it is orphan deleted", func() {
```
```diff
@@ -275,4 +275,38 @@ var _ = Describe("VirtualMachineInstanceReplicaSet", func() {
 			return rs.Status.Replicas
 		}, 10*time.Second, 1*time.Second).Should(Equal(int32(2)))
 	})
 
+	It("should remove the finished VM", func() {
+		By("Creating a new replica set")
+		rs := newReplicaSet()
+		doScale(rs.ObjectMeta.Name, int32(2))
+
+		vmis, err := virtClient.VirtualMachineInstance(tests.NamespaceTestDefault).List(&v12.ListOptions{})
+		Expect(err).ToNot(HaveOccurred())
+		Expect(vmis.Items).ToNot(BeEmpty())
+
+		vmi := vmis.Items[0]
+		pods, err := virtClient.CoreV1().Pods(tests.NamespaceTestDefault).List(tests.UnfinishedVMIPodSelector(&vmi))
+		Expect(err).ToNot(HaveOccurred())
+		Expect(len(pods.Items)).To(Equal(1))
+		pod := pods.Items[0]
+
+		By("Deleting one of the RS VMI pods")
+		err = virtClient.CoreV1().Pods(tests.NamespaceTestDefault).Delete(pod.Name, &v12.DeleteOptions{})
+		Expect(err).ToNot(HaveOccurred())
+
+		By("Checking that the VM disappeared")
+		Eventually(func() bool {
+			_, err := virtClient.VirtualMachineInstance(tests.NamespaceTestDefault).Get(vmi.Name, &v12.GetOptions{})
+			if errors.IsNotFound(err) {
+				return true
+			}
+			return false
+		}, 120*time.Second, time.Second).Should(Equal(true))
+
+		By("Checking the number of RS VMs")
+		vmis, err = virtClient.VirtualMachineInstance(tests.NamespaceTestDefault).List(&v12.ListOptions{})
+		Expect(err).ToNot(HaveOccurred())
+		Expect(len(vmis.Items)).Should(Equal(2))
+	})
 })
```