diff --git a/pkg/kubelet/volumemanager/reconciler/reconciler.go b/pkg/kubelet/volumemanager/reconciler/reconciler.go
index 9e67f839b3c3..8b89448dcde8 100644
--- a/pkg/kubelet/volumemanager/reconciler/reconciler.go
+++ b/pkg/kubelet/volumemanager/reconciler/reconciler.go
@@ -353,10 +353,10 @@ type reconstructedVolume struct {
 	volumeSpec          *volumepkg.Spec
 	outerVolumeSpecName string
 	pod                 *v1.Pod
-	attachablePlugin    volumepkg.AttachableVolumePlugin
 	volumeGidValue      string
 	devicePath          string
 	mounter             volumepkg.Mounter
+	deviceMounter       volumepkg.DeviceMounter
 	blockVolumeMapper   volumepkg.BlockVolumeMapper
 }
 
@@ -500,6 +500,7 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
 
 	var volumeMapper volumepkg.BlockVolumeMapper
 	var volumeMounter volumepkg.Mounter
+	var deviceMounter volumepkg.DeviceMounter
 	// Path to the mount or block device to check
 	var checkPath string
 
@@ -537,6 +538,17 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
 				err)
 		}
 		checkPath = volumeMounter.GetPath()
+		if deviceMountablePlugin != nil {
+			deviceMounter, err = deviceMountablePlugin.NewDeviceMounter()
+			if err != nil {
+				return nil, fmt.Errorf("reconstructVolume.NewDeviceMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
+					uniqueVolumeName,
+					volumeSpec.Name(),
+					volume.podName,
+					pod.UID,
+					err)
+			}
+		}
 	}
 
 	// Check existence of mount point for filesystem volume or symbolic link for block volume
@@ -558,7 +570,7 @@ func (rc *reconciler) reconstructVolume(volume podVolume) (*reconstructedVolume,
 		// TODO: in case pod is added back before reconciler starts to unmount, we can update this field from desired state information
 		outerVolumeSpecName: volume.volumeSpecName,
 		pod:                 pod,
-		attachablePlugin:    attachablePlugin,
+		deviceMounter:       deviceMounter,
 		volumeGidValue:      "",
 		// devicePath is updated during updateStates() by checking node status's VolumesAttached data.
 		// TODO: get device path directly from the volume mount path.
@@ -585,19 +597,18 @@ func (rc *reconciler) updateDevicePath(volumesNeedUpdate map[v1.UniqueVolumeName
 	}
 }
 
+// getDeviceMountPath returns device mount path for block volume which
+// implements BlockVolumeMapper or filesystem volume which implements
+// DeviceMounter
 func getDeviceMountPath(volume *reconstructedVolume) (string, error) {
 	if volume.blockVolumeMapper != nil {
 		// for block volume, we return its global map path
 		return volume.blockVolumeMapper.GetGlobalMapPath(volume.volumeSpec)
-	} else if volume.attachablePlugin != nil {
-		// for filesystem volume, we return its device mount path if the plugin implements AttachableVolumePlugin
-		volumeAttacher, err := volume.attachablePlugin.NewAttacher()
-		if volumeAttacher == nil || err != nil {
-			return "", err
-		}
-		return volumeAttacher.GetDeviceMountPath(volume.volumeSpec)
+	} else if volume.deviceMounter != nil {
+		// for filesystem volume, we return its device mount path if the plugin implements DeviceMounter
+		return volume.deviceMounter.GetDeviceMountPath(volume.volumeSpec)
 	} else {
-		return "", fmt.Errorf("blockVolumeMapper or attachablePlugin required")
+		return "", fmt.Errorf("blockVolumeMapper or deviceMounter required")
 	}
 }
 
@@ -628,7 +639,7 @@ func (rc *reconciler) updateStates(volumesNeedUpdate map[v1.UniqueVolumeName]*re
 		}
 		klog.V(4).Infof("Volume: %s (pod UID %s) is marked as mounted and added into the actual state", volume.volumeName, volume.podName)
 		// If the volume has device to mount, we mark its device as mounted.
-		if volume.attachablePlugin != nil || volume.blockVolumeMapper != nil {
+		if volume.deviceMounter != nil || volume.blockVolumeMapper != nil {
 			deviceMountPath, err := getDeviceMountPath(volume)
 			if err != nil {
 				klog.Errorf("Could not find device mount path for volume %s", volume.volumeName)
diff --git a/pkg/volume/local/local.go b/pkg/volume/local/local.go
index 987e1e65216b..ae9083317b27 100644
--- a/pkg/volume/local/local.go
+++ b/pkg/volume/local/local.go
@@ -191,6 +191,34 @@ func (plugin *localVolumePlugin) NewBlockVolumeUnmapper(volName string,
 // TODO: check if no path and no topology constraints are ok
 func (plugin *localVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
 	fs := v1.PersistentVolumeFilesystem
+	// The main purpose of a reconstructed volume is to clean up unused mount
+	// points and directories.
+	// For a filesystem volume with a directory source, there is no global
+	// mount path to clean up, so an empty path is fine.
+	// For a filesystem volume with a block source, we should resolve it to
+	// its device path if the global mount path exists.
+	var path string
+	mounter := plugin.host.GetMounter(plugin.GetPluginName())
+	refs, err := mounter.GetMountRefs(mountPath)
+	if err != nil {
+		return nil, err
+	}
+	baseMountPath := plugin.generateBlockDeviceBaseGlobalPath()
+	for _, ref := range refs {
+		if mount.PathWithinBase(ref, baseMountPath) {
+			// If the global mount for the block device exists, the source is
+			// a block device.
+			// The resolved device path may not be exactly the same as the
+			// path in the local PV object if a symbolic link is used, but it
+			// is the true source and can be used in the reconstructed volume.
+			path, _, err = mount.GetDeviceNameFromMount(mounter, ref)
+			if err != nil {
+				return nil, err
+			}
+			klog.V(4).Infof("local: reconstructing volume %q (pod volume mount: %q) with device %q", volumeName, mountPath, path)
+			break
+		}
+	}
 	localVolume := &v1.PersistentVolume{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: volumeName,
@@ -198,7 +226,7 @@ func (plugin *localVolumePlugin) ConstructVolumeSpec(volumeName, mountPath strin
 		Spec: v1.PersistentVolumeSpec{
 			PersistentVolumeSource: v1.PersistentVolumeSource{
 				Local: &v1.LocalVolumeSource{
-					Path: "",
+					Path: path,
 				},
 			},
 			VolumeMode: &fs,
diff --git a/pkg/volume/local/local_test.go b/pkg/volume/local/local_test.go
index 08cfea559d7f..e81c13ac7b66 100644
--- a/pkg/volume/local/local_test.go
+++ b/pkg/volume/local/local_test.go
@@ -422,44 +422,97 @@ func testFSGroupMount(plug volume.VolumePlugin, pod *v1.Pod, tmpDir string, fsGr
 }
 
 func TestConstructVolumeSpec(t *testing.T) {
-	tmpDir, plug := getPlugin(t)
-	defer os.RemoveAll(tmpDir)
-
-	volPath := filepath.Join(tmpDir, testMountPath)
-	spec, err := plug.ConstructVolumeSpec(testPVName, volPath)
-	if err != nil {
-		t.Errorf("ConstructVolumeSpec() failed: %v", err)
-	}
-	if spec == nil {
-		t.Fatalf("ConstructVolumeSpec() returned nil")
+	tests := []struct {
+		name         string
+		mountPoints  []mount.MountPoint
+		expectedPath string
+	}{
+		{
+			name: "filesystem volume with directory source",
+			mountPoints: []mount.MountPoint{
+				{
+					Device: "/mnt/disk/ssd0",
+					Path:   "pods/poduid/volumes/kubernetes.io~local-volume/pvA",
+				},
+			},
+			expectedPath: "",
+		},
+		{
+			name: "filesystem volume with block source",
+			mountPoints: []mount.MountPoint{
+				{
+					Device: "/dev/loop0",
+					Path:   testMountPath,
+				},
+				{
+					Device: "/dev/loop0",
+					Path:   testBlockFormattingToFSGlobalPath,
+				},
+			},
+			expectedPath: "/dev/loop0",
+		},
 	}
 
-	volName := spec.Name()
-	if volName != testPVName {
-		t.Errorf("Expected volume name %q, got %q", testPVName, volName)
-	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tmpDir, err := utiltesting.MkTmpdir("localVolumeTest")
+			if err != nil {
+				t.Fatalf("can't make a temp dir: %v", err)
+			}
+			defer os.RemoveAll(tmpDir)
+			plug := &localVolumePlugin{
+				host: volumetest.NewFakeVolumeHost(tmpDir, nil, nil),
+			}
+			mounter := plug.host.GetMounter(plug.GetPluginName())
+			fakeMountPoints := []mount.MountPoint{}
+			for _, mp := range tt.mountPoints {
+				fakeMountPoint := mp
+				fakeMountPoint.Path = filepath.Join(tmpDir, mp.Path)
+				fakeMountPoints = append(fakeMountPoints, fakeMountPoint)
+			}
+			mounter.(*mount.FakeMounter).MountPoints = fakeMountPoints
+			volPath := filepath.Join(tmpDir, testMountPath)
+			spec, err := plug.ConstructVolumeSpec(testPVName, volPath)
+			if err != nil {
+				t.Errorf("ConstructVolumeSpec() failed: %v", err)
+			}
+			if spec == nil {
+				t.Fatalf("ConstructVolumeSpec() returned nil")
+			}
 
-	if spec.Volume != nil {
-		t.Errorf("Volume object returned, expected nil")
-	}
+			volName := spec.Name()
+			if volName != testPVName {
+				t.Errorf("Expected volume name %q, got %q", testPVName, volName)
+			}
 
-	pv := spec.PersistentVolume
-	if pv == nil {
-		t.Fatalf("PersistentVolume object nil")
-	}
+			if spec.Volume != nil {
+				t.Errorf("Volume object returned, expected nil")
+			}
 
-	if spec.PersistentVolume.Spec.VolumeMode == nil {
-		t.Fatalf("Volume mode has not been set.")
-	}
+			pv := spec.PersistentVolume
+			if pv == nil {
+				t.Fatalf("PersistentVolume object nil")
+			}
 
-	if *spec.PersistentVolume.Spec.VolumeMode != v1.PersistentVolumeFilesystem {
-		t.Errorf("Unexpected volume mode %q", *spec.PersistentVolume.Spec.VolumeMode)
-	}
+			if spec.PersistentVolume.Spec.VolumeMode == nil {
+				t.Fatalf("Volume mode has not been set.")
+			}
 
-	ls := pv.Spec.PersistentVolumeSource.Local
-	if ls == nil {
-		t.Fatalf("LocalVolumeSource object nil")
+			if *spec.PersistentVolume.Spec.VolumeMode != v1.PersistentVolumeFilesystem {
+				t.Errorf("Unexpected volume mode %q", *spec.PersistentVolume.Spec.VolumeMode)
+			}
+
+			ls := pv.Spec.PersistentVolumeSource.Local
+			if ls == nil {
+				t.Fatalf("LocalVolumeSource object nil")
+			}
+
+			if pv.Spec.PersistentVolumeSource.Local.Path != tt.expectedPath {
+				t.Fatalf("Unexpected path got %q, expected %q", pv.Spec.PersistentVolumeSource.Local.Path, tt.expectedPath)
+			}
+		})
 	}
+
 }
 
 func TestConstructBlockVolumeSpec(t *testing.T) {
diff --git a/test/e2e/storage/testsuites/subpath.go b/test/e2e/storage/testsuites/subpath.go
index d7d1eca86fe9..342374d11593 100644
--- a/test/e2e/storage/testsuites/subpath.go
+++ b/test/e2e/storage/testsuites/subpath.go
@@ -25,8 +25,10 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/rand"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/test/e2e/framework"
+	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
 	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	"k8s.io/kubernetes/test/e2e/framework/volume"
 	"k8s.io/kubernetes/test/e2e/storage/testpatterns"
@@ -36,6 +38,7 @@ import (
 	"time"
 
 	"github.com/onsi/ginkgo"
+	"github.com/onsi/gomega"
 )
 
 var (
@@ -87,6 +90,7 @@ func (s *subPathTestSuite) defineTests(driver TestDriver, pattern testpatterns.T
 		config        *PerTestConfig
 		driverCleanup func()
 
+		hostExec          utils.HostExec
 		resource          *genericVolumeTestResource
 		roVolSource       *v1.VolumeSource
 		pod               *v1.Pod
@@ -116,6 +120,7 @@ func (s *subPathTestSuite) defineTests(driver TestDriver, pattern testpatterns.T
 		l.intreeOps, l.migratedOps = getMigrationVolumeOpCounts(f.ClientSet, driver.GetDriverInfo().InTreePluginName)
 		testVolumeSizeRange := s.getTestSuiteInfo().supportedSizeRange
 		l.resource = createGenericVolumeTestResource(driver, l.config, pattern, testVolumeSizeRange)
+		l.hostExec = utils.NewHostExec(f)
 
 		// Setup subPath test dependent resource
 		volType := pattern.VolType
@@ -174,6 +179,10 @@ func (s *subPathTestSuite) defineTests(driver TestDriver, pattern testpatterns.T
 			l.driverCleanup = nil
 		}
 
+		if l.hostExec != nil {
+			l.hostExec.Cleanup()
+		}
+
 		validateMigrationVolumeOpCounts(f.ClientSet, driver.GetDriverInfo().InTreePluginName, l.intreeOps, l.migratedOps)
 	}
 
@@ -332,7 +341,7 @@ func (s *subPathTestSuite) defineTests(driver TestDriver, pattern testpatterns.T
 		init()
 		defer cleanup()
 
-		testSubpathReconstruction(f, l.pod, false)
+		testSubpathReconstruction(f, l.hostExec, l.pod, false)
 	})
 
 	ginkgo.It("should unmount if pod is force deleted while kubelet is down [Disruptive][Slow][LinuxOnly]", func() {
@@ -344,7 +353,7 @@ func (s *subPathTestSuite) defineTests(driver TestDriver, pattern testpatterns.T
 			framework.Skipf("%s volume type does not support reconstruction, skipping", l.resource.volType)
 		}
 
-		testSubpathReconstruction(f, l.pod, true)
+		testSubpathReconstruction(f, l.hostExec, l.pod, true)
 	})
 
 	ginkgo.It("should support readOnly directory specified in the volumeMount", func() {
@@ -877,9 +886,18 @@ func testPodContainerRestart(f *framework.Framework, pod *v1.Pod) {
 	framework.ExpectNoError(err, "while waiting for container to stabilize")
 }
 
-func testSubpathReconstruction(f *framework.Framework, pod *v1.Pod, forceDelete bool) {
+func testSubpathReconstruction(f *framework.Framework, hostExec utils.HostExec, pod *v1.Pod, forceDelete bool) {
 	// This is mostly copied from TestVolumeUnmountsFromDeletedPodWithForceOption()
+	// Disruptive tests run serially, so we can cache all volume global mount
+	// points and verify after the test that we do not leak any global mount point.
+	nodeList, err := e2enode.GetReadySchedulableNodes(f.ClientSet)
+	framework.ExpectNoError(err, "while listing schedulable nodes")
+	globalMountPointsByNode := make(map[string]sets.String, len(nodeList.Items))
+	for _, node := range nodeList.Items {
+		globalMountPointsByNode[node.Name] = utils.FindVolumeGlobalMountPoints(hostExec, &node)
+	}
+
 	// Change to busybox
 	pod.Spec.Containers[0].Image = volume.GetTestImage(imageutils.GetE2EImage(imageutils.BusyBox))
 	pod.Spec.Containers[0].Command = volume.GenerateScriptCmd("sleep 100000")
 
@@ -893,7 +911,7 @@ func testSubpathReconstruction(f *framework.Framework, pod *v1.Pod, forceDelete
 
 	ginkgo.By(fmt.Sprintf("Creating pod %s", pod.Name))
 	removeUnusedContainers(pod)
-	pod, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(pod)
+	pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(pod)
 	framework.ExpectNoError(err, "while creating pod")
 
 	err = e2epod.WaitForPodRunningInNamespace(f.ClientSet, pod)
@@ -902,7 +920,24 @@ func testSubpathReconstruction(f *framework.Framework, pod *v1.Pod, forceDelete
 	pod, err = f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(pod.Name, metav1.GetOptions{})
 	framework.ExpectNoError(err, "while getting pod")
 
+	var podNode *v1.Node
+	for i := range nodeList.Items {
+		if nodeList.Items[i].Name == pod.Spec.NodeName {
+			podNode = &nodeList.Items[i]
+		}
+	}
+	framework.ExpectNotEqual(podNode, nil, "pod node should exist in schedulable nodes")
+
 	utils.TestVolumeUnmountsFromDeletedPodWithForceOption(f.ClientSet, f, pod, forceDelete, true)
+
+	if podNode != nil {
+		mountPoints := globalMountPointsByNode[podNode.Name]
+		mountPointsAfter := utils.FindVolumeGlobalMountPoints(hostExec, podNode)
+		s1 := mountPointsAfter.Difference(mountPoints)
+		s2 := mountPoints.Difference(mountPointsAfter)
+		gomega.Expect(s1).To(gomega.BeEmpty(), "global mount points leaked: %v", s1)
+		gomega.Expect(s2).To(gomega.BeEmpty(), "global mount points not found: %v", s2)
+	}
 }
 
 func formatVolume(f *framework.Framework, pod *v1.Pod) {
diff --git a/test/e2e/storage/utils/BUILD b/test/e2e/storage/utils/BUILD
index aaeab2731f06..6c761e9a3df3 100644
--- a/test/e2e/storage/utils/BUILD
+++ b/test/e2e/storage/utils/BUILD
@@ -1,9 +1,6 @@
 package(default_visibility = ["//visibility:public"])
 
-load(
-    "@io_bazel_rules_go//go:def.bzl",
-    "go_library",
-)
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
 
 go_library(
     name = "go_default_library",
@@ -23,6 +20,7 @@ go_library(
         "//staging/src/k8s.io/api/storage/v1beta1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
diff --git a/test/e2e/storage/utils/utils.go b/test/e2e/storage/utils/utils.go
index a02aafb45d5c..bb3b1918d197 100644
--- a/test/e2e/storage/utils/utils.go
+++ b/test/e2e/storage/utils/utils.go
@@ -32,6 +32,7 @@ import (
 	rbacv1 "k8s.io/api/rbac/v1"
 	apierrs "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/framework"
@@ -236,6 +237,7 @@ func TestKubeletRestartsAndRestoresMap(c clientset.Interface, f *framework.Frame
 // TestVolumeUnmountsFromDeletedPodWithForceOption tests that a volume unmounts if the client pod was deleted while the kubelet was down.
 // forceDelete is true indicating whether the pod is forcefully deleted.
+// checkSubpath is true indicating whether the subpath should be checked.
 func TestVolumeUnmountsFromDeletedPodWithForceOption(c clientset.Interface, f *framework.Framework, clientPod *v1.Pod, forceDelete bool, checkSubpath bool) {
 	nodeIP, err := framework.GetHostAddress(c, clientPod)
 	framework.ExpectNoError(err)
@@ -647,3 +649,24 @@ func CheckWriteToPath(pod *v1.Pod, volMode v1.PersistentVolumeMode, path string,
 	VerifyExecInPodSucceed(pod, fmt.Sprintf("echo %s | base64 -d | sha256sum", encoded))
 	VerifyExecInPodSucceed(pod, fmt.Sprintf("echo %s | base64 -d | dd of=%s bs=%d count=1", encoded, pathForVolMode, len))
 }
+
+// findMountPoints returns all mount points on the given node under the specified directory.
+func findMountPoints(hostExec HostExec, node *v1.Node, dir string) []string {
+	result, err := hostExec.IssueCommandWithResult(fmt.Sprintf(`find %s -type d -exec mountpoint {} \; | grep 'is a mountpoint$' || true`, dir), node)
+	framework.ExpectNoError(err, "Encountered HostExec error.")
+	var mountPoints []string
+	if err == nil {
+		for _, line := range strings.Split(result, "\n") {
+			if line == "" {
+				continue
+			}
+			mountPoints = append(mountPoints, strings.TrimSuffix(line, " is a mountpoint"))
+		}
+	}
+	return mountPoints
+}
+
+// FindVolumeGlobalMountPoints returns all volume global mount points on the given node.
+func FindVolumeGlobalMountPoints(hostExec HostExec, node *v1.Node) sets.String {
+	return sets.NewString(findMountPoints(hostExec, node, "/var/lib/kubelet/plugins")...)
+}
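
Note for reviewers (not part of the patch): `findMountPoints` must only parse the command output when `IssueCommandWithResult` succeeded, which is why the guard above is `err == nil`. The parsing itself is plain string handling; here is a self-contained sketch of just that loop, with invented mount-point paths:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Simulated output of `find <dir> -type d -exec mountpoint {} \;`
	// piped through `grep 'is a mountpoint$'`; the paths are made up.
	result := "/var/lib/kubelet/plugins/kubernetes.io/csi/pv/pvA/globalmount is a mountpoint\n" +
		"/var/lib/kubelet/plugins/kubernetes.io~local-volume/mounts/pvB is a mountpoint\n"

	var mountPoints []string
	for _, line := range strings.Split(result, "\n") {
		if line == "" {
			continue // the trailing newline produces one empty field
		}
		mountPoints = append(mountPoints, strings.TrimSuffix(line, " is a mountpoint"))
	}
	fmt.Println(mountPoints)
}
```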
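Likewise, the leak check in `testSubpathReconstruction` rests on two one-way differences of `sets.String`: mount points present only after the test are leaks, and mount points present only before have gone missing. A minimal sketch of those semantics, again with made-up paths:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// Global mount points captured before and after the test run.
	before := sets.NewString("/var/lib/kubelet/plugins/a")
	after := sets.NewString("/var/lib/kubelet/plugins/a", "/var/lib/kubelet/plugins/b")

	leaked := after.Difference(before)  // in after but not before => leaked
	missing := before.Difference(after) // in before but not after => missing

	fmt.Println("leaked:", leaked.List())   // [/var/lib/kubelet/plugins/b]
	fmt.Println("missing:", missing.List()) // []
}
```

Checking both directions is deliberate: an asserting test that only looked at `after.Difference(before)` would miss global mounts that the kubelet tore down when it should have left them in place.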