New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add CSI block volume directory cleanup #87978
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -72,14 +72,15 @@ import ( | |
"os" | ||
"path/filepath" | ||
|
||
"k8s.io/klog" | ||
|
||
"k8s.io/api/core/v1" | ||
v1 "k8s.io/api/core/v1" | ||
storage "k8s.io/api/storage/v1" | ||
meta "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/types" | ||
"k8s.io/client-go/kubernetes" | ||
"k8s.io/klog" | ||
"k8s.io/kubernetes/pkg/util/removeall" | ||
"k8s.io/kubernetes/pkg/volume" | ||
volumetypes "k8s.io/kubernetes/pkg/volume/util/types" | ||
utilstrings "k8s.io/utils/strings" | ||
) | ||
|
||
|
@@ -113,10 +114,16 @@ func (m *csiBlockMapper) getStagingPath() string { | |
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", m.specName) | ||
} | ||
|
||
// getPublishDir returns path to a directory, where the volume is published to each pod. | ||
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName} | ||
func (m *csiBlockMapper) getPublishDir() string { | ||
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", m.specName) | ||
} | ||
|
||
// getPublishPath returns a publish path for a file (on the node) that should be used on NodePublishVolume/NodeUnpublishVolume | ||
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID} | ||
func (m *csiBlockMapper) getPublishPath() string { | ||
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", m.specName, string(m.podUID)) | ||
return filepath.Join(m.getPublishDir(), string(m.podUID)) | ||
} | ||
|
||
// GetPodDeviceMapPath returns pod's device file which will be mapped to a volume | ||
|
@@ -299,6 +306,13 @@ func (m *csiBlockMapper) SetUpDevice() error { | |
// Call NodeStageVolume | ||
_, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment) | ||
if err != nil { | ||
if volumetypes.IsOperationFinishedError(err) { | ||
cleanupErr := m.cleanupOrphanDeviceFiles() | ||
if cleanupErr != nil { | ||
// V(4) for not so serious error | ||
klog.V(4).Infof("Failed to clean up block volume directory %s", cleanupErr) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggest using |
||
} | ||
} | ||
return err | ||
} | ||
|
||
|
@@ -435,6 +449,41 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error | |
return err | ||
} | ||
} | ||
if err = m.cleanupOrphanDeviceFiles(); err != nil { | ||
// V(4) for not so serious error | ||
klog.V(4).Infof("Failed to clean up block volume directory %s", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Clean up any orphan files / directories when a block volume is being unstaged. | ||
// At this point we can be sure that there is no pod using the volume and all | ||
// files are indeed orphaned. | ||
func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error { | ||
// Remove artifacts of NodePublish. | ||
// publishDir: xxx/plugins/kubernetes.io/csi/volumeDevices/publish/<volume name> | ||
// Each PublishVolume() created a subdirectory there. Since everything should be | ||
// already unpublished at this point, the directory should be empty by now. | ||
publishDir := m.getPublishDir() | ||
if err := os.Remove(publishDir); err != nil && !os.IsNotExist(err) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason we used There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
return errors.New(log("failed to remove publish directory [%s]: %v", publishDir, err)) | ||
} | ||
|
||
// Remove artifacts of NodeStage. | ||
// stagingPath: xxx/plugins/kubernetes.io/csi/volumeDevices/staging/<volume name> | ||
stagingPath := m.getStagingPath() | ||
if err := os.Remove(stagingPath); err != nil && !os.IsNotExist(err) { | ||
return errors.New(log("failed to delete volume staging path [%s]: %v", stagingPath, err)) | ||
} | ||
|
||
// Remove everything under xxx/plugins/kubernetes.io/csi/volumeDevices/<volume name>. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like a good use case for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. RemoveAll would remove also stuff that should be removed by GenerateUnmapVolumeFunc and we would not see this bug. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I added There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, I didn't realize the subtle implications with reference to bind mount and GenerateUnmapVolumeFunc. |
||
// At this point it contains only "data/vol_data.json" and empty "dev/". | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is right. I seemed to forget to write down the existence of "data/vol_data.json" under xxx/plugins/kubernetes.io/csi/volumeDevices/ in L56 comments. Should we add a comment on the file, there? |
||
volumeDir := getVolumePluginDir(m.specName, m.plugin.host) | ||
mounter := m.plugin.host.GetMounter(m.plugin.GetPluginName()) | ||
if err := removeall.RemoveAllOneFilesystem(mounter, volumeDir); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ package csi | |
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"os" | ||
"path/filepath" | ||
|
@@ -282,7 +283,7 @@ func TestBlockMapperSetupDevice(t *testing.T) { | |
} | ||
} | ||
|
||
func TestBlockMapperMapPodDevice(t *testing.T) { | ||
func TestBlockMapperSetupDeviceError(t *testing.T) { | ||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() | ||
|
||
plug, tmpDir := newTestPlugin(t, nil) | ||
|
@@ -297,11 +298,59 @@ func TestBlockMapperMapPodDevice(t *testing.T) { | |
nodeName := string(plug.host.GetNodeName()) | ||
|
||
csiMapper.csiClient = setupClient(t, true) | ||
fClient := csiMapper.csiClient.(*fakeCsiDriverClient) | ||
fClient.nodeClient.SetNextError(errors.New("mock final error")) | ||
|
||
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: redundant cast to string for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed |
||
attachment := makeTestAttachment(attachID, nodeName, pvName) | ||
attachment.Status.Attached = true | ||
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{}) | ||
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.Background(), attachment, metav1.CreateOptions{}) | ||
if err != nil { | ||
t.Fatalf("failed to setup VolumeAttachment: %v", err) | ||
} | ||
t.Log("created attachement ", attachID) | ||
|
||
err = csiMapper.SetUpDevice() | ||
if err == nil { | ||
t.Fatal("mapper unexpectedly succeeded") | ||
} | ||
|
||
// Check that all directories have been cleaned | ||
// Check that all metadata / staging / publish directories were deleted | ||
dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) | ||
if _, err := os.Stat(dataDir); err == nil { | ||
t.Errorf("volume publish data directory %s was not deleted", dataDir) | ||
} | ||
devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) | ||
if _, err := os.Stat(devDir); err == nil { | ||
t.Errorf("volume publish device directory %s was not deleted", devDir) | ||
} | ||
stagingPath := csiMapper.getStagingPath() | ||
if _, err := os.Stat(stagingPath); err == nil { | ||
t.Errorf("volume staging path %s was not deleted", stagingPath) | ||
} | ||
} | ||
|
||
func TestBlockMapperMapPodDevice(t *testing.T) { | ||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() | ||
|
||
plug, tmpDir := newTestPlugin(t, nil) | ||
defer os.RemoveAll(tmpDir) | ||
|
||
csiMapper, _, pv, err := prepareBlockMapperTest(plug, "test-pv", t) | ||
if err != nil { | ||
t.Fatalf("Failed to make a new Mapper: %v", err) | ||
} | ||
|
||
pvName := pv.GetName() | ||
nodeName := string(plug.host.GetNodeName()) | ||
|
||
csiMapper.csiClient = setupClient(t, true) | ||
|
||
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), nodeName) | ||
attachment := makeTestAttachment(attachID, nodeName, pvName) | ||
attachment.Status.Attached = true | ||
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.Background(), attachment, metav1.CreateOptions{}) | ||
if err != nil { | ||
t.Fatalf("failed to setup VolumeAttachment: %v", err) | ||
} | ||
|
@@ -430,3 +479,123 @@ func TestBlockMapperTearDownDevice(t *testing.T) { | |
t.Error("csi server may not have received NodeUnstageVolume call") | ||
} | ||
} | ||
|
||
func TestVolumeSetupTeardown(t *testing.T) { | ||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)() | ||
|
||
// Follow volume setup + teardown sequences at top of cs_block.go and set up / clean up one CSI block device. | ||
// Focus on testing that there were no leftover files present after the cleanup. | ||
|
||
plug, tmpDir := newTestPlugin(t, nil) | ||
defer os.RemoveAll(tmpDir) | ||
|
||
csiMapper, spec, pv, err := prepareBlockMapperTest(plug, "test-pv", t) | ||
if err != nil { | ||
t.Fatalf("Failed to make a new Mapper: %v", err) | ||
} | ||
|
||
pvName := pv.GetName() | ||
nodeName := string(plug.host.GetNodeName()) | ||
|
||
csiMapper.csiClient = setupClient(t, true) | ||
|
||
attachID := getAttachmentName(csiMapper.volumeID, string(csiMapper.driverName), string(nodeName)) | ||
attachment := makeTestAttachment(attachID, nodeName, pvName) | ||
attachment.Status.Attached = true | ||
_, err = csiMapper.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{}) | ||
if err != nil { | ||
t.Fatalf("failed to setup VolumeAttachment: %v", err) | ||
} | ||
t.Log("created attachement ", attachID) | ||
|
||
err = csiMapper.SetUpDevice() | ||
if err != nil { | ||
t.Fatalf("mapper failed to SetupDevice: %v", err) | ||
} | ||
// Check if NodeStageVolume staged to the right path | ||
stagingPath := csiMapper.getStagingPath() | ||
svols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() | ||
svol, ok := svols[csiMapper.volumeID] | ||
if !ok { | ||
t.Error("csi server may not have received NodeStageVolume call") | ||
} | ||
if svol.Path != stagingPath { | ||
t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path) | ||
} | ||
|
||
path, err := csiMapper.MapPodDevice() | ||
if err != nil { | ||
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err) | ||
} | ||
pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() | ||
pvol, ok := pvols[csiMapper.volumeID] | ||
if !ok { | ||
t.Error("csi server may not have received NodePublishVolume call") | ||
} | ||
publishPath := csiMapper.getPublishPath() | ||
if pvol.Path != publishPath { | ||
t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path) | ||
} | ||
if path != publishPath { | ||
t.Errorf("csi server expected path %s, but MapPodDevice returned %s", publishPath, path) | ||
} | ||
|
||
unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID) | ||
if err != nil { | ||
t.Fatalf("failed to make a new Unmapper: %v", err) | ||
} | ||
|
||
csiUnmapper := unmapper.(*csiBlockMapper) | ||
csiUnmapper.csiClient = csiMapper.csiClient | ||
|
||
globalMapPath, err := csiUnmapper.GetGlobalMapPath(spec) | ||
if err != nil { | ||
t.Fatalf("unmapper failed to GetGlobalMapPath: %v", err) | ||
} | ||
|
||
err = csiUnmapper.UnmapPodDevice() | ||
if err != nil { | ||
t.Errorf("unmapper failed to call UnmapPodDevice: %v", err) | ||
} | ||
|
||
// GenerateUnmapDeviceFunc uses "" as pod UUID, it is global operation over all pods that used the volume | ||
unmapper, err = plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, "") | ||
if err != nil { | ||
t.Fatalf("failed to make a new Unmapper: %v", err) | ||
} | ||
csiUnmapper = unmapper.(*csiBlockMapper) | ||
csiUnmapper.csiClient = csiMapper.csiClient | ||
|
||
err = csiUnmapper.TearDownDevice(globalMapPath, "/dev/test") | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
pubs := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes() | ||
if _, ok := pubs[csiUnmapper.volumeID]; ok { | ||
t.Error("csi server may not have received NodeUnpublishVolume call") | ||
} | ||
vols := csiUnmapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodeStagedVolumes() | ||
if _, ok := vols[csiUnmapper.volumeID]; ok { | ||
t.Error("csi server may not have received NodeUnstageVolume call") | ||
} | ||
|
||
// Check that all metadata / staging / publish directories were deleted | ||
dataDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) | ||
if _, err := os.Stat(dataDir); err == nil { | ||
t.Errorf("volume publish data directory %s was not deleted", dataDir) | ||
} | ||
devDir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host) | ||
if _, err := os.Stat(devDir); err == nil { | ||
t.Errorf("volume publish device directory %s was not deleted", devDir) | ||
} | ||
if _, err := os.Stat(publishPath); err == nil { | ||
t.Errorf("volume publish path %s was not deleted", publishPath) | ||
} | ||
publishDir := filepath.Dir(publishPath) | ||
if _, err := os.Stat(publishDir); err == nil { | ||
t.Errorf("volume publish parent directory %s was not deleted", publishDir) | ||
} | ||
if _, err := os.Stat(stagingPath); err == nil { | ||
t.Errorf("volume staging path %s was not deleted", stagingPath) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to introduce uncertain device tracking for block volume calls? We can do this in a follow up but we should track that work somewhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we should. Filed #88086
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait is that correct? That is another PR..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, I fixed my previous comment with the right link