Skip to content

Commit

Permalink
Merge pull request #74026 from mkimuram/issue/73773
Browse files Browse the repository at this point in the history
Separate staging/publish and unstaging/unpublish logics for block
  • Loading branch information
k8s-ci-robot committed Nov 15, 2019
2 parents 97d45fe + 4578c6c commit cb2684c
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 54 deletions.
179 changes: 140 additions & 39 deletions pkg/volume/csi/csi_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,55 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

/*
This file defines block volume related methods for CSI driver.
CSI driver is responsible for staging/publishing volumes to their staging/publish paths.
Mapping and unmapping of a device in a publish path to its global map path and its
pod device map path are done by operation_executor through MapBlockVolume/UnmapBlockVolume
(MapBlockVolume and UnmapBlockVolume take care for lock, symlink, and bind mount).
Summary of block volume related CSI driver's methods are as follows:
- GetGlobalMapPath returns a global map path,
- GetPodDeviceMapPath returns a pod device map path and filename,
- SetUpDevice calls CSI's NodeStageVolume and stage a volume to its staging path,
- MapPodDevice calls CSI's NodePublishVolume and publish a volume to its publish path,
- UnmapPodDevice calls CSI's NodeUnpublishVolume and unpublish a volume from its publish path,
- TearDownDevice calls CSI's NodeUnstageVolume and unstage a volume from its staging path.
These methods are called by below sequences:
- operation_executor.MountVolume
- csi.GetGlobalMapPath
- csi.SetupDevice
- NodeStageVolume
- ASW.MarkDeviceAsMounted
- csi.GetPodDeviceMapPath
- csi.MapPodDevice
- NodePublishVolume
- util.MapBlockVolume
- ASW.MarkVolumeAsMounted
- operation_executor.UnmountVolume
- csi.GetPodDeviceMapPath
- util.UnmapBlockVolume
- csi.UnmapPodDevice
- NodeUnpublishVolume
- ASW.MarkVolumeAsUnmounted
- operation_executor.UnmountDevice
- csi.TearDownDevice
- NodeUnstageVolume
- ASW.MarkDeviceAsUnmounted
After successful MountVolume for block volume, directory structure will be like below:
/dev/loopX ... Descriptor lock(Loopback device to mapFile under global map path)
/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/{specName}/dev/ ... Global map path
/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/{specName}/dev/{podUID} ... MapFile(Bind mount to publish Path)
/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/staging/{specName} ... Staging path
/var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID} ... Publish path
/var/lib/kubelet/pods/{podUID}/volumeDevices/kubernetes.io~csi/ ... Pod device map path
/var/lib/kubelet/pods/{podUID}/volumeDevices/kubernetes.io~csi/{specName} ... MapFile(Symlink to publish path)
*/

package csi

import (
Expand Down Expand Up @@ -51,34 +100,31 @@ var _ volume.BlockVolumeMapper = &csiBlockMapper{}
var _ volume.CustomBlockVolumeMapper = &csiBlockMapper{}

// GetGlobalMapPath returns a global map path (on the node) to a device file which will be symlinked to
// Example: plugins/kubernetes.io/csi/volumeDevices/{pvname}/dev
// Example: plugins/kubernetes.io/csi/volumeDevices/{specName}/dev
func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
dir := getVolumeDevicePluginDir(spec.Name(), m.plugin.host)
dir := getVolumeDevicePluginDir(m.specName, m.plugin.host)
klog.V(4).Infof(log("blockMapper.GetGlobalMapPath = %s", dir))
return dir, nil
}

// getStagingPath returns a staging path for a directory (on the node) that should be used on NodeStageVolume/NodeUnstageVolume
// Example: plugins/kubernetes.io/csi/volumeDevices/staging/{pvname}
// Example: plugins/kubernetes.io/csi/volumeDevices/staging/{specName}
func (m *csiBlockMapper) getStagingPath() string {
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(m.specName)
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", sanitizedSpecVolID)
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", m.specName)
}

// getPublishPath returns a publish path for a file (on the node) that should be used on NodePublishVolume/NodeUnpublishVolume
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{pvname}
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID}
func (m *csiBlockMapper) getPublishPath() string {
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(m.specName)
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", sanitizedSpecVolID)
return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", m.specName, string(m.podUID))
}

// GetPodDeviceMapPath returns pod's device file which will be mapped to a volume
// returns: pods/{podUid}/volumeDevices/kubernetes.io~csi, {pvname}
// returns: pods/{podUID}/volumeDevices/kubernetes.io~csi, {specName}
func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) {
path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, utilstrings.EscapeQualifiedName(CSIPluginName))
specName := m.specName
klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, specName))
return path, specName
klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, m.specName))
return path, m.specName
}

// stageVolumeForBlock stages a block volume to stagingPath
Expand Down Expand Up @@ -150,7 +196,6 @@ func (m *csiBlockMapper) publishVolumeForBlock(
accessMode v1.PersistentVolumeAccessMode,
csiSource *v1.CSIPersistentVolumeSource,
attachment *storage.VolumeAttachment,
stagingPath string,
) (string, error) {
klog.V(4).Infof(log("blockMapper.publishVolumeForBlock called"))

Expand Down Expand Up @@ -186,7 +231,7 @@ func (m *csiBlockMapper) publishVolumeForBlock(
ctx,
m.volumeID,
m.readOnly,
stagingPath,
m.getStagingPath(),
publishPath,
accessMode,
publishVolumeInfo,
Expand Down Expand Up @@ -252,13 +297,7 @@ func (m *csiBlockMapper) SetUpDevice() error {
}

// Call NodeStageVolume
stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
if err != nil {
return err
}

// Call NodePublishVolume
_, err = m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment, stagingPath)
_, err = m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
if err != nil {
return err
}
Expand All @@ -267,7 +306,59 @@ func (m *csiBlockMapper) SetUpDevice() error {
}

func (m *csiBlockMapper) MapPodDevice() (string, error) {
return m.getPublishPath(), nil
if !m.plugin.blockEnabled {
return "", errors.New("CSIBlockVolume feature not enabled")
}
klog.V(4).Infof(log("blockMapper.MapPodDevice called"))

// Get csiSource from spec
if m.spec == nil {
return "", errors.New(log("blockMapper.MapPodDevice spec is nil"))
}

csiSource, err := getCSISourceFromSpec(m.spec)
if err != nil {
return "", errors.New(log("blockMapper.MapPodDevice failed to get CSI persistent source: %v", err))
}

driverName := csiSource.Driver
skip, err := m.plugin.skipAttach(driverName)
if err != nil {
return "", errors.New(log("blockMapper.MapPodDevice failed to check CSIDriver for %s: %v", driverName, err))
}

var attachment *storage.VolumeAttachment
if !skip {
// Search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
nodeName := string(m.plugin.host.GetNodeName())
attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
attachment, err = m.k8s.StorageV1().VolumeAttachments().Get(attachID, meta.GetOptions{})
if err != nil {
return "", errors.New(log("blockMapper.MapPodDevice failed to get volume attachment [id=%v]: %v", attachID, err))
}
}

//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
accessMode := v1.ReadWriteOnce
if m.spec.PersistentVolume.Spec.AccessModes != nil {
accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
}

ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()

csiClient, err := m.csiClientGetter.Get()
if err != nil {
return "", errors.New(log("blockMapper.MapPodDevice failed to get CSI client: %v", err))
}

// Call NodePublishVolume
publishPath, err := m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
if err != nil {
return "", err
}

return publishPath, nil
}

var _ volume.BlockVolumeUnmapper = &csiBlockMapper{}
Expand Down Expand Up @@ -322,8 +413,6 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
return errors.New("CSIBlockVolume feature not enabled")
}

klog.V(4).Infof(log("unmapper.TearDownDevice(globalMapPath=%s; devicePath=%s)", globalMapPath, devicePath))

ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()

Expand All @@ -332,21 +421,6 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
return errors.New(log("blockMapper.TearDownDevice failed to get CSI client: %v", err))
}

// Call NodeUnpublishVolume
publishPath := m.getPublishPath()
if _, err := os.Stat(publishPath); err != nil {
if os.IsNotExist(err) {
klog.V(4).Infof(log("blockMapper.TearDownDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath))
} else {
return err
}
} else {
err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
if err != nil {
return err
}
}

// Call NodeUnstageVolume
stagingPath := m.getStagingPath()
if _, err := os.Stat(stagingPath); err != nil {
Expand All @@ -367,5 +441,32 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error

// UnmapPodDevice unmaps the block device path.
func (m *csiBlockMapper) UnmapPodDevice() error {
if !m.plugin.blockEnabled {
return errors.New("CSIBlockVolume feature not enabled")
}
publishPath := m.getPublishPath()

csiClient, err := m.csiClientGetter.Get()
if err != nil {
return errors.New(log("blockMapper.UnmapPodDevice failed to get CSI client: %v", err))
}

ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel()

// Call NodeUnpublishVolume
if _, err := os.Stat(publishPath); err != nil {
if os.IsNotExist(err) {
klog.V(4).Infof(log("blockMapper.UnmapPodDevice publishPath(%s) has already been deleted, skip calling NodeUnpublishVolume", publishPath))
} else {
return err
}
} else {
err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
if err != nil {
return err
}
}

return nil
}
63 changes: 48 additions & 15 deletions pkg/volume/csi/csi_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,40 @@ func prepareBlockMapperTest(plug *csiPlugin, specVolumeName string, t *testing.T
return csiMapper, spec, pv, nil
}

func prepareBlockUnmapperTest(plug *csiPlugin, specVolumeName string, t *testing.T) (*csiBlockMapper, *volume.Spec, *api.PersistentVolume, error) {
registerFakePlugin(testDriver, "endpoint", []string{"1.0.0"}, t)
pv := makeTestPV(specVolumeName, 10, testDriver, testVol)
spec := volume.NewSpecFromPersistentVolume(pv, pv.Spec.PersistentVolumeSource.CSI.ReadOnly)

// save volume data
dir := getVolumeDeviceDataDir(pv.ObjectMeta.Name, plug.host)
if err := os.MkdirAll(dir, 0755); err != nil && !os.IsNotExist(err) {
t.Errorf("failed to create dir [%s]: %v", dir, err)
}

if err := saveVolumeData(
dir,
volDataFileName,
map[string]string{
volDataKey.specVolID: pv.ObjectMeta.Name,
volDataKey.driverName: testDriver,
volDataKey.volHandle: testVol,
},
); err != nil {
t.Fatalf("failed to save volume data: %v", err)
}

unmapper, err := plug.NewBlockVolumeUnmapper(pv.ObjectMeta.Name, testPodUID)
if err != nil {
t.Fatalf("failed to make a new Unmapper: %v", err)
}

csiUnmapper := unmapper.(*csiBlockMapper)
csiUnmapper.csiClient = setupClient(t, true)

return csiUnmapper, spec, pv, nil
}

func TestBlockMapperGetGlobalMapPath(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIBlockVolume, true)()

Expand Down Expand Up @@ -141,12 +175,12 @@ func TestBlockMapperGetPublishPath(t *testing.T) {
{
name: "simple specName",
specVolumeName: "spec-0",
path: filepath.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/publish/%s", "spec-0")),
path: filepath.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/publish/%s/%s", "spec-0", testPodUID)),
},
{
name: "specName with dots",
specVolumeName: "test.spec.1",
path: filepath.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/publish/%s", "test.spec.1")),
path: filepath.Join(tmpDir, fmt.Sprintf("plugins/kubernetes.io/csi/volumeDevices/publish/%s/%s", "test.spec.1", testPodUID)),
},
}
for _, tc := range testCases {
Expand Down Expand Up @@ -254,18 +288,6 @@ func TestBlockMapperSetupDevice(t *testing.T) {
if svol.Path != stagingPath {
t.Errorf("csi server expected device path %s, got %s", stagingPath, svol.Path)
}

// Check if NodePublishVolume published to the right path
pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
pvol, ok := pvols[csiMapper.volumeID]
if !ok {
t.Error("csi server may not have received NodePublishVolume call")
}

publishPath := csiMapper.getPublishPath()
if pvol.Path != publishPath {
t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path)
}
}

func TestBlockMapperMapPodDevice(t *testing.T) {
Expand Down Expand Up @@ -307,9 +329,20 @@ func TestBlockMapperMapPodDevice(t *testing.T) {
if err != nil {
t.Fatalf("mapper failed to GetGlobalMapPath: %v", err)
}

// Check if NodePublishVolume published to the right path
pvols := csiMapper.csiClient.(*fakeCsiDriverClient).nodeClient.GetNodePublishedVolumes()
pvol, ok := pvols[csiMapper.volumeID]
if !ok {
t.Error("csi server may not have received NodePublishVolume call")
}

publishPath := csiMapper.getPublishPath()
if pvol.Path != publishPath {
t.Errorf("csi server expected path %s, got %s", publishPath, pvol.Path)
}
if path != publishPath {
t.Errorf("path %s and %s doesn't match", path, publishPath)
t.Errorf("csi server expected path %s, but MapPodDevice returned %s", publishPath, path)
}
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/volume/csi/fake/fake_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,20 @@ func (f *NodeClient) GetNodePublishedVolumes() map[string]CSIVolume {
return f.nodePublishedVolumes
}

// AddNodePublishedVolume adds specified volume to nodePublishedVolumes
func (f *NodeClient) AddNodePublishedVolume(volID, deviceMountPath string, volumeContext map[string]string) {
f.nodePublishedVolumes[volID] = CSIVolume{
Path: deviceMountPath,
VolumeContext: volumeContext,
}
}

// GetNodeStagedVolumes returns node staged volumes
func (f *NodeClient) GetNodeStagedVolumes() map[string]CSIVolume {
return f.nodeStagedVolumes
}

// AddNodeStagedVolume adds specified volume to nodeStagedVolumes
func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string, volumeContext map[string]string) {
f.nodeStagedVolumes[volID] = CSIVolume{
Path: deviceMountPath,
Expand Down

0 comments on commit cb2684c

Please sign in to comment.