Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In-Tree to CSI Migration logic for PersistentVolume Attach/Detach #69830

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 12 additions & 2 deletions pkg/controller/.import-restrictions
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,13 @@
"github.com/google/gofuzz",
"github.com/prometheus/client_golang/prometheus",
"github.com/robfig/cron",
"github.com/spf13/pflag"
"github.com/spf13/pflag",
"github.com/stretchr/testify/assert",
"github.com/stretchr/testify/require",
"github.com/google/gofuzz",
"github.com/golang/protobuf/ptypes/wrappers",
"github.com/golang/protobuf/proto",
"github.com/container-storage-interface/spec/lib/go/csi"
]
},
{
Expand Down Expand Up @@ -257,6 +263,8 @@
"AllowedPrefixes": [
"k8s.io/csi-api/pkg/apis/csi/v1alpha1",
"k8s.io/csi-api/pkg/client/clientset/versioned",
"k8s.io/csi-api/pkg/client/listers/csi/v1alpha1",
"k8s.io/csi-api/pkg/client/informers/externalversions",
"k8s.io/heapster/metrics/api/v1/types",
"k8s.io/kube-controller-manager/config/v1alpha1",
"k8s.io/metrics/pkg/apis/custom_metrics/v1beta2",
Expand Down Expand Up @@ -288,7 +296,9 @@
"google.golang.org/api/compute/v0.alpha",
"google.golang.org/api/container/v1",
"google.golang.org/api/compute/v0.beta",
"google.golang.org/api/tpu/v1"
"google.golang.org/api/tpu/v1",
"golang.org/x/net/context",
"google.golang.org/grpc"
]
}
]
Expand Down
46 changes: 37 additions & 9 deletions pkg/volume/csi/csi_attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,20 +385,38 @@ var _ volume.Detacher = &csiAttacher{}
var _ volume.DeviceUnmounter = &csiAttacher{}

func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
// volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
var attachID string
var volID string

if volumeName == "" {
klog.Error(log("detacher.Detach missing value for parameter volumeName"))
return errors.New("missing expected parameter volumeName")
}
parts := strings.Split(volumeName, volNameSep)
if len(parts) != 2 {
klog.Error(log("detacher.Detach insufficient info encoded in volumeName"))
return errors.New("volumeName missing expected data")

if isAttachmentName(volumeName) {
// Detach can also be called with the attach ID as the `volumeName`. This codepath is
// hit only when we have migrated an in-tree volume to CSI and the A/D Controller is shut
// down, the pod with the volume is deleted, and the A/D Controller starts back up in that
// order.
attachID = volumeName

// Vol ID should be the volume handle, except that is not available here.
// It is only used in log messages so in the event that this happens log messages will be
// printing out the attachID instead of the volume handle.
volID = volumeName
} else {
// volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
parts := strings.Split(volumeName, volNameSep)
if len(parts) != 2 {
klog.Error(log("detacher.Detach insufficient info encoded in volumeName"))
return errors.New("volumeName missing expected data")
}

driverName := parts[0]
volID = parts[1]
attachID = getAttachmentName(volID, driverName, string(nodeName))
}

driverName := parts[0]
volID := parts[1]
attachID := getAttachmentName(volID, driverName, string(nodeName))
if err := c.k8s.StorageV1beta1().VolumeAttachments().Delete(attachID, nil); err != nil {
if apierrs.IsNotFound(err) {
// object deleted or never existed, done
Expand Down Expand Up @@ -550,12 +568,22 @@ func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
return nil
}

// getAttachmentName returns csi-<sha252(volName,csiDriverName,NodeName>
// getAttachmentName returns csi-<sha256(volName,csiDriverName,NodeName)>
func getAttachmentName(volName, csiDriverName, nodeName string) string {
result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName)))
return fmt.Sprintf("csi-%x", result)
}

// isAttachmentName returns true if the string given is of the form of an Attach ID
// and false otherwise
func isAttachmentName(unknownString string) bool {
jsafrane marked this conversation as resolved.
Show resolved Hide resolved
// 68 == "csi-" + len(sha256hash)
if strings.HasPrefix(unknownString, "csi-") && len(unknownString) == 68 {
return true
}
return false
}

func makeDeviceMountPath(plugin *csiPlugin, spec *volume.Spec) (string, error) {
if spec == nil {
return "", fmt.Errorf("makeDeviceMountPath failed, spec is nil")
Expand Down
4 changes: 2 additions & 2 deletions pkg/volume/csi/csi_attacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,9 +1004,9 @@ func newTestWatchPlugin(t *testing.T, csiClient *fakecsi.Clientset) (*csiPlugin,
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)

plug, err := plugMgr.FindPluginByName(csiPluginName)
plug, err := plugMgr.FindPluginByName(CSIPluginName)
if err != nil {
t.Fatalf("can't find plugin %v", csiPluginName)
t.Fatalf("can't find plugin %v", CSIPluginName)
}

csiPlug, ok := plug.(*csiPlugin)
Expand Down
6 changes: 3 additions & 3 deletions pkg/volume/csi/csi_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,20 @@ func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
// Example: plugins/kubernetes.io/csi/volumeDevices/staging/{pvname}
func (m *csiBlockMapper) getStagingPath() string {
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(m.specName)
return path.Join(m.plugin.host.GetVolumeDevicePluginDir(csiPluginName), "staging", sanitizedSpecVolID)
return path.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", sanitizedSpecVolID)
}

// getPublishPath returns a publish path for a file (on the node) that should be used on NodePublishVolume/NodeUnpublishVolume
// Example: plugins/kubernetes.io/csi/volumeDevices/publish/{pvname}
func (m *csiBlockMapper) getPublishPath() string {
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(m.specName)
return path.Join(m.plugin.host.GetVolumeDevicePluginDir(csiPluginName), "publish", sanitizedSpecVolID)
return path.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", sanitizedSpecVolID)
}

// GetPodDeviceMapPath returns pod's device file which will be mapped to a volume
// returns: pods/{podUid}/volumeDevices/kubernetes.io~csi, {pvname}
func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) {
path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, utilstrings.EscapeQualifiedName(csiPluginName))
path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, utilstrings.EscapeQualifiedName(CSIPluginName))
specName := m.specName
klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, specName))
return path, specName
Expand Down
2 changes: 1 addition & 1 deletion pkg/volume/csi/csi_mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (c *csiMountMgr) GetPath() string {

func getTargetPath(uid types.UID, specVolumeID string, host volume.VolumeHost) string {
specVolID := utilstrings.EscapeQualifiedName(specVolumeID)
return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(csiPluginName), specVolID)
return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(CSIPluginName), specVolID)
}

// volume.Mounter methods
Expand Down
2 changes: 1 addition & 1 deletion pkg/volume/csi/csi_mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func MounterSetUpTests(t *testing.T, podInfoEnabled bool) {
},
Spec: storage.VolumeAttachmentSpec{
NodeName: "test-node",
Attacher: csiPluginName,
Attacher: CSIPluginName,
Source: storage.VolumeAttachmentSource{
PersistentVolumeName: &pvName,
},
Expand Down
5 changes: 3 additions & 2 deletions pkg/volume/csi/csi_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ import (
)

const (
csiPluginName = "kubernetes.io/csi"
// CSIPluginName is the name of the in-tree CSI Plugin
CSIPluginName = "kubernetes.io/csi"

// TODO (vladimirvivien) implement a more dynamic way to discover
// the unix domain socket path for each installed csi driver.
Expand Down Expand Up @@ -230,7 +231,7 @@ func (p *csiPlugin) Init(host volume.VolumeHost) error {
}

func (p *csiPlugin) GetPluginName() string {
return csiPluginName
return CSIPluginName
}

// GetvolumeName returns a concatenated string of CSIVolumeSource.Driver<volNameSe>CSIVolumeSource.VolumeHandle
Expand Down
4 changes: 2 additions & 2 deletions pkg/volume/csi/csi_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func newTestPlugin(t *testing.T, client *fakeclient.Clientset, csiClient *fakecs
plugMgr := &volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), nil /* prober */, host)

plug, err := plugMgr.FindPluginByName(csiPluginName)
plug, err := plugMgr.FindPluginByName(CSIPluginName)
if err != nil {
t.Fatalf("can't find plugin %v", csiPluginName)
t.Fatalf("can't find plugin %v", CSIPluginName)
}

csiPlug, ok := plug.(*csiPlugin)
Expand Down
6 changes: 3 additions & 3 deletions pkg/volume/csi/csi_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,23 @@ func getReadOnlyFromSpec(spec *volume.Spec) (bool, error) {

// log prepends log string with `kubernetes.io/csi`
func log(msg string, parts ...interface{}) string {
return fmt.Sprintf(fmt.Sprintf("%s: %s", csiPluginName, msg), parts...)
return fmt.Sprintf(fmt.Sprintf("%s: %s", CSIPluginName, msg), parts...)
}

// getVolumeDevicePluginDir returns the path where the CSI plugin keeps the
// symlink for a block device associated with a given specVolumeID.
// path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/dev
func getVolumeDevicePluginDir(specVolID string, host volume.VolumeHost) string {
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID)
return path.Join(host.GetVolumeDevicePluginDir(csiPluginName), sanitizedSpecVolID, "dev")
return path.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID, "dev")
}

// getVolumeDeviceDataDir returns the path where the CSI plugin keeps the
// volume data for a block device associated with a given specVolumeID.
// path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/data
func getVolumeDeviceDataDir(specVolID string, host volume.VolumeHost) string {
sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID)
return path.Join(host.GetVolumeDevicePluginDir(csiPluginName), sanitizedSpecVolID, "data")
return path.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID, "data")
}

// hasReadWriteOnce returns true if modes contains v1.ReadWriteOnce
Expand Down
1 change: 1 addition & 0 deletions pkg/volume/util/operationexecutor/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/kubelet/events:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/csi:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/nestedpendingoperations:go_default_library",
"//pkg/volume/util/types:go_default_library",
Expand Down
86 changes: 71 additions & 15 deletions pkg/volume/util/operationexecutor/operation_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
kevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/csi"
"k8s.io/kubernetes/pkg/volume/util"
volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
Expand Down Expand Up @@ -288,6 +289,9 @@ func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
func (og *operationGenerator) GenerateAttachVolumeFunc(
volumeToAttach VolumeToAttach,
actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
var err error
var attachableVolumePlugin volume.AttachableVolumePlugin

// Get attacher plugin
eventRecorderFunc := func(err *error) {
if *err != nil {
Expand All @@ -297,11 +301,29 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
}
}

attachableVolumePlugin, err :=
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
if err != nil || attachableVolumePlugin == nil {
eventRecorderFunc(&err)
return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err)
originalSpec := volumeToAttach.VolumeSpec
// useCSIPlugin will check both CSIMigration and the plugin specific feature gate
if useCSIPlugin(og.volumePluginMgr, volumeToAttach.VolumeSpec) {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
eventRecorderFunc(&err)
return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginByName failed", err)
}

csiSpec, err := translateSpec(volumeToAttach.VolumeSpec)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.TranslateSpec failed", err)
}

volumeToAttach.VolumeSpec = csiSpec
} else {
attachableVolumePlugin, err =
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
if err != nil || attachableVolumePlugin == nil {
eventRecorderFunc(&err)
return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err)
}
}

volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
Expand All @@ -319,7 +341,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
if derr, ok := attachErr.(*util.DanglingAttachError); ok {
addErr := actualStateOfWorld.MarkVolumeAsAttached(
v1.UniqueVolumeName(""),
volumeToAttach.VolumeSpec,
originalSpec,
derr.CurrentNode,
derr.DevicePath)

Expand All @@ -329,7 +351,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(

} else {
addErr := actualStateOfWorld.MarkVolumeAsUncertain(
v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
v1.UniqueVolumeName(""), originalSpec, volumeToAttach.NodeName)
if addErr != nil {
klog.Errorf("AttachVolume.MarkVolumeAsUncertain fail to add the volume %q to actual state with %s", volumeToAttach.VolumeName, addErr)
}
Expand All @@ -347,7 +369,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(

davidz627 marked this conversation as resolved.
Show resolved Hide resolved
// Update actual state of world
addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath)
v1.UniqueVolumeName(""), originalSpec, volumeToAttach.NodeName, devicePath)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im wondering why do we need to MarkVolumeAsAttached with originalSpec but not migrated spec?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since in desired state of world the desired volume is the original volume, we want to mark the original volume as attached. Otherwise the reconciler will continuously try to attach the object since we "desire the in-tree volume to be attached" and we have "actually attached the CSI volume"

if addVolumeNodeErr != nil {
// On failure, return error. Caller will log and retry.
return volumeToAttach.GenerateError("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
Expand Down Expand Up @@ -378,10 +400,26 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(

if volumeToDetach.VolumeSpec != nil {
// Get attacher plugin
attachableVolumePlugin, err =
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
// useCSIPlugin will check both CSIMigration and the plugin specific feature gate
if useCSIPlugin(og.volumePluginMgr, volumeToDetach.VolumeSpec) {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err)
}

csiSpec, err := translateSpec(volumeToDetach.VolumeSpec)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.TranslateSpec failed", err)
}

volumeToDetach.VolumeSpec = csiSpec
davidz627 marked this conversation as resolved.
Show resolved Hide resolved
} else {
attachableVolumePlugin, err =
og.volumePluginMgr.FindAttachablePluginBySpec(volumeToDetach.VolumeSpec)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
}
}

volumeName, err =
Expand All @@ -397,10 +435,28 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err)
}
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginByName failed", err)

// TODO(dyzz): This case can't distinguish between PV and In-line which is necessary because
// if it was PV it may have been migrated, but the same plugin with in-line may not have been.
// Suggestions welcome...
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the easiest is to mandate that migration of PVs and inline volumes for a plugin must happen at the same time. Otherwise it gets really messy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is hard to do that since CSI In-line volume support is also going alpha this quarter so the dependency there is kind of tight. I guess during Q4 alpha of migration this specific functionality can be broken and fixed in 1.14. It is a non-standard codepath anyway (only hit on A/D controller restart when pod was deleted in meantime).
/cc @saad-ali

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coupling this feature with the CSI inline feature will slow everything down. This case is an edge case. Let's document it as a known issue with the alpha release.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's ok for alpha to develop them separately, but IMO inline volumes should go beta and GA before or together with the migration. It's not full feature if you migrate just the easier part :-).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsafrane we plan to have in-line volumes migrated by Q1 for alpha as well. Ideally they will go Beta together Q2

if csilib.IsMigratableByName(pluginName) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err)
}
// volumeToDetach.VolumeName here is always the in-tree volume name
// therefore a workaround is required. volumeToDetach.DevicePath
// is the attachID which happens to be what volumeName is needed for in Detach.
// Therefore we set volumeName to the attachID. And CSI Detach can detect and use that.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feels its more explicit/simply to do a translation from in-tree volume name to csi volume name in this case. This could be a new method provided by translation library. With this, the magic isAttachment logic in csi plugin is also avoided.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm I think this might be possible:
For example for GCE the volume name will be the PDName, for CSI it will be driverName^volumeHandle^ where the volume handle is the volume ID the driver returns which for GCE contains the project, zone, and PDName.
However I dont know if all volumes will be able to have all the information from the volume name translated from in-tree to CSI and vice-versa...

volumeName = volumeToDetach.DevicePath
} else {
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginByName failed", err)
}
}

}

if pluginName == "" {
Expand Down