Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for new hotplug attachment pod to be ready #10275

Merged
merged 4 commits into from Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
62 changes: 35 additions & 27 deletions pkg/virt-controller/watch/vmi.go
Expand Up @@ -516,11 +516,7 @@ func (c *VMIController) hasOwnerVM(vmi *virtv1.VirtualMachineInstance) bool {
}

ownerVM := obj.(*virtv1.VirtualMachine)
if controllerRef.UID == ownerVM.UID {
return true
}

return false
return controllerRef.UID == ownerVM.UID
}

func (c *VMIController) updateStatus(vmi *virtv1.VirtualMachineInstance, pod *k8sv1.Pod, dataVolumes []*cdiv1.DataVolume, syncErr syncError) error {
Expand Down Expand Up @@ -1816,15 +1812,27 @@ func (c *VMIController) waitForFirstConsumerTemporaryPods(vmi *virtv1.VirtualMac
}

func (c *VMIController) needsHandleHotplug(hotplugVolumes []*virtv1.Volume, hotplugAttachmentPods []*k8sv1.Pod) bool {
if len(hotplugAttachmentPods) > 1 {
return true
}
// Determine if the ready volumes have changed compared to the current pod
if len(hotplugAttachmentPods) == 1 && c.podVolumesMatchesReadyVolumes(hotplugAttachmentPods[0], hotplugVolumes) {
return false
}
return len(hotplugVolumes) > 0 || len(hotplugAttachmentPods) > 0
}

func (c *VMIController) getActiveAndOldAttachmentPods(readyHotplugVolumes []*virtv1.Volume, hotplugAttachmentPods []*k8sv1.Pod) (*k8sv1.Pod, []*k8sv1.Pod) {
var currentPod *k8sv1.Pod
oldPods := make([]*k8sv1.Pod, 0)
for _, attachmentPod := range hotplugAttachmentPods {
if c.podVolumesMatchesReadyVolumes(attachmentPod, hotplugVolumes) {
log.DefaultLogger().Infof("Don't need to handle as we have a matching attachment pod")
return false
if !c.podVolumesMatchesReadyVolumes(attachmentPod, readyHotplugVolumes) {
oldPods = append(oldPods, attachmentPod)
} else {
currentPod = attachmentPod
}
return true
}
return len(hotplugVolumes) > 0
return currentPod, oldPods
}

func (c *VMIController) handleHotplugVolumes(hotplugVolumes []*virtv1.Volume, hotplugAttachmentPods []*k8sv1.Pod, vmi *virtv1.VirtualMachineInstance, virtLauncherPod *k8sv1.Pod, dataVolumes []*cdiv1.DataVolume) syncError {
Expand Down Expand Up @@ -1855,29 +1863,25 @@ func (c *VMIController) handleHotplugVolumes(hotplugVolumes []*virtv1.Volume, ho
readyHotplugVolumes = append(readyHotplugVolumes, volume)
}
// Determine if the ready volumes have changed compared to the current pod
currentPod := make([]*k8sv1.Pod, 0)
oldPods := make([]*k8sv1.Pod, 0)
for _, attachmentPod := range hotplugAttachmentPods {
if !c.podVolumesMatchesReadyVolumes(attachmentPod, readyHotplugVolumes) {
oldPods = append(oldPods, attachmentPod)
} else {
currentPod = append(currentPod, attachmentPod)
}
}
currentPod, oldPods := c.getActiveAndOldAttachmentPods(readyHotplugVolumes, hotplugAttachmentPods)

if len(currentPod) == 0 && len(readyHotplugVolumes) > 0 {
if currentPod == nil && len(readyHotplugVolumes) > 0 {
// ready volumes have changed
// Create new attachment pod that holds all the ready volumes
if err := c.createAttachmentPod(vmi, virtLauncherPod, readyHotplugVolumes); err != nil {
return err
}
}
// Delete old attachment pod
for _, attachmentPod := range oldPods {
if err := c.deleteAttachmentPodForVolume(vmi, attachmentPod); err != nil {
return &syncErrorImpl{fmt.Errorf("Error deleting attachment pod %v", err), FailedDeletePodReason}

if len(readyHotplugVolumes) == 0 || (currentPod != nil && currentPod.Status.Phase == k8sv1.PodRunning) {
// Delete old attachment pod
for _, attachmentPod := range oldPods {
if err := c.deleteAttachmentPodForVolume(vmi, attachmentPod); err != nil {
return &syncErrorImpl{fmt.Errorf("Error deleting attachment pod %v", err), FailedDeletePodReason}
}
}
}

return nil
}

Expand Down Expand Up @@ -2121,6 +2125,9 @@ func (c *VMIController) updateVolumeStatus(vmi *virtv1.VirtualMachineInstance, v
if err != nil {
return err
}

attachmentPod, _ := c.getActiveAndOldAttachmentPods(hotplugVolumes, attachmentPods)

newStatus := make([]virtv1.VolumeStatus, 0)
for i, volume := range vmi.Spec.Volumes {
status := virtv1.VolumeStatus{}
Expand All @@ -2142,7 +2149,6 @@ func (c *VMIController) updateVolumeStatus(vmi *virtv1.VirtualMachineInstance, v
ClaimName: volume.Name,
}
}
attachmentPod := c.findAttachmentPodByVolumeName(volume.Name, attachmentPods)
if attachmentPod == nil {
if !c.volumeReady(status.Phase) {
status.HotplugVolume.AttachPodUID = ""
Expand All @@ -2156,6 +2162,9 @@ func (c *VMIController) updateVolumeStatus(vmi *virtv1.VirtualMachineInstance, v
status.HotplugVolume.AttachPodName = attachmentPod.Name
if len(attachmentPod.Status.ContainerStatuses) == 1 && attachmentPod.Status.ContainerStatuses[0].Ready {
status.HotplugVolume.AttachPodUID = attachmentPod.UID
} else {
// Remove UID of old pod if a new one is available, but not yet ready
status.HotplugVolume.AttachPodUID = ""
}
if c.canMoveToAttachedPhase(status.Phase) {
status.Phase = virtv1.HotplugVolumeAttachedToNode
Expand Down Expand Up @@ -2244,8 +2253,7 @@ func (c *VMIController) getFilesystemOverhead(pvc *k8sv1.PersistentVolumeClaim)
}

func (c *VMIController) canMoveToAttachedPhase(currentPhase virtv1.VolumePhase) bool {
return (currentPhase == "" || currentPhase == virtv1.VolumeBound || currentPhase == virtv1.VolumePending ||
currentPhase == virtv1.HotplugVolumeAttachedToNode)
return (currentPhase == "" || currentPhase == virtv1.VolumeBound || currentPhase == virtv1.VolumePending)
}

func (c *VMIController) findAttachmentPodByVolumeName(volumeName string, attachmentPods []*k8sv1.Pod) *k8sv1.Pod {
Expand Down
52 changes: 52 additions & 0 deletions pkg/virt-controller/watch/vmi_test.go
Expand Up @@ -2700,6 +2700,58 @@ var _ = Describe("VirtualMachineInstance watcher", func() {
[]string{SuccessfulCreatePodReason}),
)

DescribeTable("Should properly calculate if it needs to handle hotplug volumes", func(hotplugVolumes []*virtv1.Volume, attachmentPods []*k8sv1.Pod, match gomegaTypes.GomegaMatcher) {
Expect(controller.needsHandleHotplug(hotplugVolumes, attachmentPods)).To(match)
},
Entry("nil volumes, nil attachmentPods", nil, nil, BeFalse()),
Entry("empty volumes, empty attachmentPods", []*virtv1.Volume{}, []*k8sv1.Pod{}, BeFalse()),
Entry("single volume, empty attachmentPods", []*virtv1.Volume{
{
Name: "test",
},
}, []*k8sv1.Pod{}, BeTrue()),
Entry("no volume, single attachmentPod", []*virtv1.Volume{}, makePods(0), BeTrue()),
Entry("matching volume, single attachmentPod", []*virtv1.Volume{
{
Name: "volume0",
},
}, makePods(0), BeFalse()),
Entry("mismatched volume, single attachmentPod", []*virtv1.Volume{
{
Name: "invalid",
},
}, makePods(0), BeTrue()),
Entry("matching volume, multiple attachmentPods", []*virtv1.Volume{
{
Name: "volume0",
},
}, []*k8sv1.Pod{makePods(0)[0], makePods(1)[0]}, BeTrue()),
)

DescribeTable("Should find active and old pods", func(hotplugVolumes []*virtv1.Volume, attachmentPods []*k8sv1.Pod, expectedActive *k8sv1.Pod, expectedOld []*k8sv1.Pod) {
active, old := controller.getActiveAndOldAttachmentPods(hotplugVolumes, attachmentPods)
Expect(active).To(Equal(expectedActive))
Expect(old).To(ContainElements(expectedOld))
},
Entry("nil volumes, nil attachmentPods", nil, nil, nil, nil),
Entry("empty volumes, empty attachmentPods", []*virtv1.Volume{}, []*k8sv1.Pod{}, nil, []*k8sv1.Pod{}),
Entry("matching volume, single attachmentPod", []*virtv1.Volume{
{
Name: "volume0",
},
}, makePods(0), makePods(0)[0], []*k8sv1.Pod{}),
Entry("matching volume, multiple attachmentPods, first pod matches", []*virtv1.Volume{
{
Name: "volume0",
},
}, []*k8sv1.Pod{makePods(0)[0], makePods(1)[0]}, makePods(0)[0], makePods(1)),
Entry("matching volume, multiple attachmentPods, second pod matches", []*virtv1.Volume{
{
Name: "volume1",
},
}, []*k8sv1.Pod{makePods(0)[0], makePods(1)[0]}, makePods(1)[0], makePods(0)),
)

It("Should get default filesystem overhead if there are multiple CDI instances", func() {
cdi := cdiv1.CDI{
ObjectMeta: metav1.ObjectMeta{
Expand Down
14 changes: 11 additions & 3 deletions pkg/virt-handler/hotplug-disk/mount.go
Expand Up @@ -309,12 +309,16 @@ func (m *volumeMounter) mountHotplugVolume(vmi *v1.VirtualMachineInstance, volum
if m.isBlockVolume(&vmi.Status, volumeName) {
logger.V(4).Infof("Mounting block volume: %s", volumeName)
if err := m.mountBlockHotplugVolume(vmi, volumeName, sourceUID, record); err != nil {
return fmt.Errorf("failed to mount block hotplug volume %s: %v", volumeName, err)
if !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("failed to mount block hotplug volume %s: %v", volumeName, err)
}
}
} else {
logger.V(4).Infof("Mounting file system volume: %s", volumeName)
if err := m.mountFileSystemHotplugVolume(vmi, volumeName, sourceUID, record, mountDirectory); err != nil {
return fmt.Errorf("failed to mount filesystem hotplug volume %s: %v", volumeName, err)
if !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("failed to mount filesystem hotplug volume %s: %v", volumeName, err)
}
}
}
}
Expand Down Expand Up @@ -663,9 +667,13 @@ func (m *volumeMounter) Unmount(vmi *v1.VirtualMachineInstance) error {
var err error
if m.isBlockVolume(&vmi.Status, volumeStatus.Name) {
path, err = safepath.JoinNoFollow(basePath, volumeStatus.Name)
if errors.Is(err, os.ErrNotExist) {
// already unmounted or never mounted
continue
}
} else if m.isDirectoryMounted(&vmi.Status, volumeStatus.Name) {
path, err = m.hotplugDiskManager.GetFileSystemDirectoryTargetPathFromHostView(virtlauncherUID, volumeStatus.Name, false)
if os.IsExist(err) {
if errors.Is(err, os.ErrNotExist) {
// already unmounted or never mounted
continue
}
Expand Down
10 changes: 10 additions & 0 deletions tests/storage/hotplug.go
Expand Up @@ -730,6 +730,16 @@ var _ = SIGDescribe("Hotplug", func() {
for i := range testVolumes {
verifyVolumeNolongerAccessible(vmi, targets[i])
}
By("Verifying there are no sync errors")
events, err := virtClient.CoreV1().Events(vmi.Namespace).List(context.Background(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
for _, event := range events.Items {
if event.InvolvedObject.Kind == "VirtualMachineInstance" && event.InvolvedObject.UID == vmi.UID {
if event.Reason == string(v1.SyncFailed) {
Fail(fmt.Sprintf("Found sync failed event %v", event))
}
}
}
},
Entry("with VMs", addDVVolumeVM, removeVolumeVM, corev1.PersistentVolumeFilesystem, false),
Entry("with VMIs", addDVVolumeVMI, removeVolumeVMI, corev1.PersistentVolumeFilesystem, true),
Expand Down