Skip to content

Commit

Permalink
Merge pull request #10275 from awels/wait_for_new_attachment_pod
Browse files Browse the repository at this point in the history
Wait for new hotplug attachment pod to be ready
  • Loading branch information
kubevirt-bot committed Aug 14, 2023
2 parents 3d3c008 + c3c0251 commit d4fdeef
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 30 deletions.
62 changes: 35 additions & 27 deletions pkg/virt-controller/watch/vmi.go
Expand Up @@ -516,11 +516,7 @@ func (c *VMIController) hasOwnerVM(vmi *virtv1.VirtualMachineInstance) bool {
}

ownerVM := obj.(*virtv1.VirtualMachine)
if controllerRef.UID == ownerVM.UID {
return true
}

return false
return controllerRef.UID == ownerVM.UID
}

func (c *VMIController) updateStatus(vmi *virtv1.VirtualMachineInstance, pod *k8sv1.Pod, dataVolumes []*cdiv1.DataVolume, syncErr syncError) error {
Expand Down Expand Up @@ -1816,15 +1812,27 @@ func (c *VMIController) waitForFirstConsumerTemporaryPods(vmi *virtv1.VirtualMac
}

// needsHandleHotplug reports whether the controller must reconcile the
// hotplug attachment pods for the given set of hotplug volumes.
//
// Reconciliation is needed when more than one attachment pod exists (stale
// pods await cleanup), when the single existing pod no longer matches the
// ready volume set, or when volumes exist without any attachment pod.
func (c *VMIController) needsHandleHotplug(hotplugVolumes []*virtv1.Volume, hotplugAttachmentPods []*k8sv1.Pod) bool {
	switch {
	case len(hotplugAttachmentPods) > 1:
		// More than one attachment pod means old ones must be deleted.
		return true
	case len(hotplugAttachmentPods) == 1:
		// A single pod only needs handling if its volumes diverge from
		// the currently ready hotplug volumes.
		return !c.podVolumesMatchesReadyVolumes(hotplugAttachmentPods[0], hotplugVolumes)
	default:
		// No pods at all: handle only if there are volumes to attach.
		return len(hotplugVolumes) > 0
	}
}

func (c *VMIController) getActiveAndOldAttachmentPods(readyHotplugVolumes []*virtv1.Volume, hotplugAttachmentPods []*k8sv1.Pod) (*k8sv1.Pod, []*k8sv1.Pod) {
var currentPod *k8sv1.Pod
oldPods := make([]*k8sv1.Pod, 0)
for _, attachmentPod := range hotplugAttachmentPods {
if c.podVolumesMatchesReadyVolumes(attachmentPod, hotplugVolumes) {
log.DefaultLogger().Infof("Don't need to handle as we have a matching attachment pod")
return false
if !c.podVolumesMatchesReadyVolumes(attachmentPod, readyHotplugVolumes) {
oldPods = append(oldPods, attachmentPod)
} else {
currentPod = attachmentPod
}
return true
}
return len(hotplugVolumes) > 0
return currentPod, oldPods
}

func (c *VMIController) handleHotplugVolumes(hotplugVolumes []*virtv1.Volume, hotplugAttachmentPods []*k8sv1.Pod, vmi *virtv1.VirtualMachineInstance, virtLauncherPod *k8sv1.Pod, dataVolumes []*cdiv1.DataVolume) syncError {
Expand Down Expand Up @@ -1855,29 +1863,25 @@ func (c *VMIController) handleHotplugVolumes(hotplugVolumes []*virtv1.Volume, ho
readyHotplugVolumes = append(readyHotplugVolumes, volume)
}
// Determine if the ready volumes have changed compared to the current pod
currentPod := make([]*k8sv1.Pod, 0)
oldPods := make([]*k8sv1.Pod, 0)
for _, attachmentPod := range hotplugAttachmentPods {
if !c.podVolumesMatchesReadyVolumes(attachmentPod, readyHotplugVolumes) {
oldPods = append(oldPods, attachmentPod)
} else {
currentPod = append(currentPod, attachmentPod)
}
}
currentPod, oldPods := c.getActiveAndOldAttachmentPods(readyHotplugVolumes, hotplugAttachmentPods)

if len(currentPod) == 0 && len(readyHotplugVolumes) > 0 {
if currentPod == nil && len(readyHotplugVolumes) > 0 {
// ready volumes have changed
// Create new attachment pod that holds all the ready volumes
if err := c.createAttachmentPod(vmi, virtLauncherPod, readyHotplugVolumes); err != nil {
return err
}
}
// Delete old attachment pod
for _, attachmentPod := range oldPods {
if err := c.deleteAttachmentPodForVolume(vmi, attachmentPod); err != nil {
return &syncErrorImpl{fmt.Errorf("Error deleting attachment pod %v", err), FailedDeletePodReason}

if len(readyHotplugVolumes) == 0 || (currentPod != nil && currentPod.Status.Phase == k8sv1.PodRunning) {
// Delete old attachment pod
for _, attachmentPod := range oldPods {
if err := c.deleteAttachmentPodForVolume(vmi, attachmentPod); err != nil {
return &syncErrorImpl{fmt.Errorf("Error deleting attachment pod %v", err), FailedDeletePodReason}
}
}
}

return nil
}

Expand Down Expand Up @@ -2121,6 +2125,9 @@ func (c *VMIController) updateVolumeStatus(vmi *virtv1.VirtualMachineInstance, v
if err != nil {
return err
}

attachmentPod, _ := c.getActiveAndOldAttachmentPods(hotplugVolumes, attachmentPods)

newStatus := make([]virtv1.VolumeStatus, 0)
for i, volume := range vmi.Spec.Volumes {
status := virtv1.VolumeStatus{}
Expand All @@ -2142,7 +2149,6 @@ func (c *VMIController) updateVolumeStatus(vmi *virtv1.VirtualMachineInstance, v
ClaimName: volume.Name,
}
}
attachmentPod := c.findAttachmentPodByVolumeName(volume.Name, attachmentPods)
if attachmentPod == nil {
if !c.volumeReady(status.Phase) {
status.HotplugVolume.AttachPodUID = ""
Expand All @@ -2156,6 +2162,9 @@ func (c *VMIController) updateVolumeStatus(vmi *virtv1.VirtualMachineInstance, v
status.HotplugVolume.AttachPodName = attachmentPod.Name
if len(attachmentPod.Status.ContainerStatuses) == 1 && attachmentPod.Status.ContainerStatuses[0].Ready {
status.HotplugVolume.AttachPodUID = attachmentPod.UID
} else {
// Remove UID of old pod if a new one is available, but not yet ready
status.HotplugVolume.AttachPodUID = ""
}
if c.canMoveToAttachedPhase(status.Phase) {
status.Phase = virtv1.HotplugVolumeAttachedToNode
Expand Down Expand Up @@ -2244,8 +2253,7 @@ func (c *VMIController) getFilesystemOverhead(pvc *k8sv1.PersistentVolumeClaim)
}

func (c *VMIController) canMoveToAttachedPhase(currentPhase virtv1.VolumePhase) bool {
return (currentPhase == "" || currentPhase == virtv1.VolumeBound || currentPhase == virtv1.VolumePending ||
currentPhase == virtv1.HotplugVolumeAttachedToNode)
return (currentPhase == "" || currentPhase == virtv1.VolumeBound || currentPhase == virtv1.VolumePending)
}

func (c *VMIController) findAttachmentPodByVolumeName(volumeName string, attachmentPods []*k8sv1.Pod) *k8sv1.Pod {
Expand Down
52 changes: 52 additions & 0 deletions pkg/virt-controller/watch/vmi_test.go
Expand Up @@ -2700,6 +2700,58 @@ var _ = Describe("VirtualMachineInstance watcher", func() {
[]string{SuccessfulCreatePodReason}),
)

DescribeTable("Should properly calculate if it needs to handle hotplug volumes", func(hotplugVolumes []*virtv1.Volume, attachmentPods []*k8sv1.Pod, match gomegaTypes.GomegaMatcher) {
Expect(controller.needsHandleHotplug(hotplugVolumes, attachmentPods)).To(match)
},
Entry("nil volumes, nil attachmentPods", nil, nil, BeFalse()),
Entry("empty volumes, empty attachmentPods", []*virtv1.Volume{}, []*k8sv1.Pod{}, BeFalse()),
Entry("single volume, empty attachmentPods", []*virtv1.Volume{
{
Name: "test",
},
}, []*k8sv1.Pod{}, BeTrue()),
Entry("no volume, single attachmentPod", []*virtv1.Volume{}, makePods(0), BeTrue()),
Entry("matching volume, single attachmentPod", []*virtv1.Volume{
{
Name: "volume0",
},
}, makePods(0), BeFalse()),
Entry("mismatched volume, single attachmentPod", []*virtv1.Volume{
{
Name: "invalid",
},
}, makePods(0), BeTrue()),
Entry("matching volume, multiple attachmentPods", []*virtv1.Volume{
{
Name: "volume0",
},
}, []*k8sv1.Pod{makePods(0)[0], makePods(1)[0]}, BeTrue()),
)

DescribeTable("Should find active and old pods", func(hotplugVolumes []*virtv1.Volume, attachmentPods []*k8sv1.Pod, expectedActive *k8sv1.Pod, expectedOld []*k8sv1.Pod) {
active, old := controller.getActiveAndOldAttachmentPods(hotplugVolumes, attachmentPods)
Expect(active).To(Equal(expectedActive))
Expect(old).To(ContainElements(expectedOld))
},
Entry("nil volumes, nil attachmentPods", nil, nil, nil, nil),
Entry("empty volumes, empty attachmentPods", []*virtv1.Volume{}, []*k8sv1.Pod{}, nil, []*k8sv1.Pod{}),
Entry("matching volume, single attachmentPod", []*virtv1.Volume{
{
Name: "volume0",
},
}, makePods(0), makePods(0)[0], []*k8sv1.Pod{}),
Entry("matching volume, multiple attachmentPods, first pod matches", []*virtv1.Volume{
{
Name: "volume0",
},
}, []*k8sv1.Pod{makePods(0)[0], makePods(1)[0]}, makePods(0)[0], makePods(1)),
Entry("matching volume, multiple attachmentPods, second pod matches", []*virtv1.Volume{
{
Name: "volume1",
},
}, []*k8sv1.Pod{makePods(0)[0], makePods(1)[0]}, makePods(1)[0], makePods(0)),
)

It("Should get default filesystem overhead if there are multiple CDI instances", func() {
cdi := cdiv1.CDI{
ObjectMeta: metav1.ObjectMeta{
Expand Down
14 changes: 11 additions & 3 deletions pkg/virt-handler/hotplug-disk/mount.go
Expand Up @@ -309,12 +309,16 @@ func (m *volumeMounter) mountHotplugVolume(vmi *v1.VirtualMachineInstance, volum
if m.isBlockVolume(&vmi.Status, volumeName) {
logger.V(4).Infof("Mounting block volume: %s", volumeName)
if err := m.mountBlockHotplugVolume(vmi, volumeName, sourceUID, record); err != nil {
return fmt.Errorf("failed to mount block hotplug volume %s: %v", volumeName, err)
if !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("failed to mount block hotplug volume %s: %v", volumeName, err)
}
}
} else {
logger.V(4).Infof("Mounting file system volume: %s", volumeName)
if err := m.mountFileSystemHotplugVolume(vmi, volumeName, sourceUID, record, mountDirectory); err != nil {
return fmt.Errorf("failed to mount filesystem hotplug volume %s: %v", volumeName, err)
if !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("failed to mount filesystem hotplug volume %s: %v", volumeName, err)
}
}
}
}
Expand Down Expand Up @@ -663,9 +667,13 @@ func (m *volumeMounter) Unmount(vmi *v1.VirtualMachineInstance) error {
var err error
if m.isBlockVolume(&vmi.Status, volumeStatus.Name) {
path, err = safepath.JoinNoFollow(basePath, volumeStatus.Name)
if errors.Is(err, os.ErrNotExist) {
// already unmounted or never mounted
continue
}
} else if m.isDirectoryMounted(&vmi.Status, volumeStatus.Name) {
path, err = m.hotplugDiskManager.GetFileSystemDirectoryTargetPathFromHostView(virtlauncherUID, volumeStatus.Name, false)
if os.IsExist(err) {
if errors.Is(err, os.ErrNotExist) {
// already unmounted or never mounted
continue
}
Expand Down
10 changes: 10 additions & 0 deletions tests/storage/hotplug.go
Expand Up @@ -730,6 +730,16 @@ var _ = SIGDescribe("Hotplug", func() {
for i := range testVolumes {
verifyVolumeNolongerAccessible(vmi, targets[i])
}
By("Verifying there are no sync errors")
events, err := virtClient.CoreV1().Events(vmi.Namespace).List(context.Background(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
for _, event := range events.Items {
if event.InvolvedObject.Kind == "VirtualMachineInstance" && event.InvolvedObject.UID == vmi.UID {
if event.Reason == string(v1.SyncFailed) {
Fail(fmt.Sprintf("Found sync failed event %v", event))
}
}
}
},
Entry("with VMs", addDVVolumeVM, removeVolumeVM, corev1.PersistentVolumeFilesystem, false),
Entry("with VMIs", addDVVolumeVMI, removeVolumeVMI, corev1.PersistentVolumeFilesystem, true),
Expand Down

0 comments on commit d4fdeef

Please sign in to comment.