Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block volumes Support: CRI, volumemanager and operationexecutor changes #51494

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/controller/volume/attachdetach/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/util/io:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
Expand Down
13 changes: 12 additions & 1 deletion pkg/controller/volume/attachdetach/attach_detach_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
Expand Down Expand Up @@ -136,6 +137,7 @@ func NewAttachDetachController(
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "attachdetach-controller"})
blkutil := volumeutil.NewBlockVolumePathHandler()

adc.desiredStateOfWorld = cache.NewDesiredStateOfWorld(&adc.volumePluginMgr)
adc.actualStateOfWorld = cache.NewActualStateOfWorld(&adc.volumePluginMgr)
Expand All @@ -144,7 +146,8 @@ func NewAttachDetachController(
kubeClient,
&adc.volumePluginMgr,
recorder,
false)) // flag for experimental binary check for volume mount
false, // flag for experimental binary check for volume mount
blkutil))
adc.nodeStatusUpdater = statusupdater.NewNodeStatusUpdater(
kubeClient, nodeInformer.Lister(), adc.actualStateOfWorld)

Expand Down Expand Up @@ -515,6 +518,10 @@ func (adc *attachDetachController) GetPluginDir(podUID string) string {
return ""
}

func (adc *attachDetachController) GetVolumeDevicePluginDir(podUID string) string {
return ""
}

func (adc *attachDetachController) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
return ""
}
Expand All @@ -523,6 +530,10 @@ func (adc *attachDetachController) GetPodPluginDir(podUID types.UID, pluginName
return ""
}

func (adc *attachDetachController) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
return ""
}

func (adc *attachDetachController) GetKubeClient() clientset.Interface {
return adc.kubeClient
}
Expand Down
56 changes: 49 additions & 7 deletions pkg/controller/volume/attachdetach/reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ func Test_Run_Positive_DoNothing(t *testing.T) {
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
fakeHandler := volumetesting.NewBlockVolumePathHandler()
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeKubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc())
nsu := statusupdater.NewNodeStatusUpdater(
fakeKubeClient, informerFactory.Core().V1().Nodes().Lister(), asw)
Expand Down Expand Up @@ -80,7 +86,13 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) {
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
fakeHandler := volumetesting.NewBlockVolumePathHandler()
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeKubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
Expand Down Expand Up @@ -126,7 +138,13 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
fakeHandler := volumetesting.NewBlockVolumePathHandler()
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeKubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
Expand Down Expand Up @@ -193,7 +211,13 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
fakeHandler := volumetesting.NewBlockVolumePathHandler()
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeKubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
Expand Down Expand Up @@ -260,7 +284,13 @@ func Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
fakeHandler := volumetesting.NewBlockVolumePathHandler()
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeKubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
Expand Down Expand Up @@ -330,7 +360,13 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteMany(t *testing.
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
fakeHandler := volumetesting.NewBlockVolumePathHandler()
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeKubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
Expand Down Expand Up @@ -416,7 +452,13 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := &record.FakeRecorder{}
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(fakeKubeClient, volumePluginMgr, fakeRecorder, false /* checkNodeCapabilitiesBeforeMount */))
fakeHandler := volumetesting.NewBlockVolumePathHandler()
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeKubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
reconciler := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/volume/expand/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//pkg/util/io:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
Expand Down
13 changes: 12 additions & 1 deletion pkg/controller/volume/expand/expand_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)

Expand Down Expand Up @@ -117,12 +118,14 @@ func NewExpandController(
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "volume_expand"})
blkutil := util.NewBlockVolumePathHandler()

expc.opExecutor = operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
kubeClient,
&expc.volumePluginMgr,
recorder,
false))
false,
blkutil))

expc.resizeMap = cache.NewVolumeResizeMap(expc.kubeClient)

Expand Down Expand Up @@ -203,10 +206,18 @@ func (expc *expandController) GetPluginDir(pluginName string) string {
return ""
}

func (expc *expandController) GetVolumeDevicePluginDir(pluginName string) string {
return ""
}

func (expc *expandController) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
return ""
}

func (expc *expandController) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
return ""
}

func (expc *expandController) GetPodPluginDir(podUID types.UID, pluginName string) string {
return ""
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/volume/persistentvolume/volume_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (ctrl *PersistentVolumeController) GetPluginDir(pluginName string) string {
return ""
}

func (ctrl *PersistentVolumeController) GetVolumeDevicePluginDir(pluginName string) string {
return ""
}

func (ctrl *PersistentVolumeController) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
return ""
}
Expand All @@ -45,6 +49,10 @@ func (ctrl *PersistentVolumeController) GetPodPluginDir(podUID types.UID, plugin
return ""
}

func (ctrl *PersistentVolumeController) GetPodVolumeDeviceDir(ppodUID types.UID, pluginName string) string {
return ""
}

func (ctrl *PersistentVolumeController) GetKubeClient() clientset.Interface {
return ctrl.kubeClient
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package config
const (
DefaultKubeletPodsDirName = "pods"
DefaultKubeletVolumesDirName = "volumes"
DefaultKubeletVolumeDevicesDirName = "volumeDevices"
DefaultKubeletPluginsDirName = "plugins"
DefaultKubeletContainersDirName = "containers"
DefaultKubeletPluginContainersDirName = "plugin-containers"
Expand Down
5 changes: 5 additions & 0 deletions pkg/kubelet/container/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,14 @@ type RunContainerOptions struct {
type VolumeInfo struct {
// Mounter is the volume's mounter
Mounter volume.Mounter
// BlockVolumeMapper is the Block volume's mapper
BlockVolumeMapper volume.BlockVolumeMapper
// SELinuxLabeled indicates whether this volume has had the
// pod's SELinux label applied to it or not
SELinuxLabeled bool
// Whether the volume permission is set to read-only or not
// This value is passed from volume.spec
ReadOnly bool
}

type VolumeMap map[string]VolumeInfo
Expand Down
2 changes: 2 additions & 0 deletions pkg/kubelet/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const (
FailedMountVolume = "FailedMount"
VolumeResizeFailed = "VolumeResizeFailed"
FailedUnMountVolume = "FailedUnMount"
FailedMapVolume = "FailedMapVolume"
FailedUnmapDevice = "FailedUnmapDevice"
WarnAlreadyMountedVolume = "AlreadyMountedVolume"
SuccessfulDetachVolume = "SuccessfulDetachVolume"
SuccessfulMountVolume = "SuccessfulMountVolume"
Expand Down
28 changes: 28 additions & 0 deletions pkg/kubelet/kubelet_getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ func (kl *Kubelet) getPluginDir(pluginName string) string {
return filepath.Join(kl.getPluginsDir(), pluginName)
}

// getVolumeDevicePluginsDir returns the full path to the directory under which plugin
// directories are created. Plugins can use these directories for data that
// they need to persist. Plugins should create subdirectories under this named
// after their own names.
func (kl *Kubelet) getVolumeDevicePluginsDir() string {
return filepath.Join(kl.getRootDir(), config.DefaultKubeletPluginsDirName)
}

// getVolumeDevicePluginDir returns a data directory name for a given plugin name.
// Plugins can use these directories to store data that they need to persist.
// For per-pod plugin data, see getVolumeDevicePluginsDir.
func (kl *Kubelet) getVolumeDevicePluginDir(pluginName string) string {
return filepath.Join(kl.getVolumeDevicePluginsDir(), pluginName, config.DefaultKubeletVolumeDevicesDirName)
}

// GetPodDir returns the full path to the per-pod data directory for the
// specified pod. This directory may not exist if the pod does not exist.
func (kl *Kubelet) GetPodDir(podUID types.UID) string {
Expand All @@ -90,6 +105,19 @@ func (kl *Kubelet) getPodVolumeDir(podUID types.UID, pluginName string, volumeNa
return filepath.Join(kl.getPodVolumesDir(podUID), pluginName, volumeName)
}

// getPodVolumeDevicesDir returns the full path to the per-pod data directory under
// which volumes are created for the specified pod. This directory may not
// exist if the pod does not exist.
func (kl *Kubelet) getPodVolumeDevicesDir(podUID types.UID) string {
return filepath.Join(kl.getPodDir(podUID), config.DefaultKubeletVolumeDevicesDirName)
}

// getPodVolumeDeviceDir returns the full path to the directory which represents the
// named plugin for specified pod. This directory may not exist if the pod does not exist.
func (kl *Kubelet) getPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
return filepath.Join(kl.getPodVolumeDevicesDir(podUID), pluginName)
}

// getPodPluginsDir returns the full path to the per-pod data directory under
// which plugins may store data for the specified pod. This directory may not
// exist if the pod does not exist.
Expand Down
50 changes: 47 additions & 3 deletions pkg/kubelet/kubelet_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util/format"
utilfile "k8s.io/kubernetes/pkg/util/file"
"k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
volumevalidation "k8s.io/kubernetes/pkg/volume/validation"
"k8s.io/kubernetes/third_party/forked/golang/expansion"
Expand Down Expand Up @@ -91,9 +92,9 @@ func (kl *Kubelet) GetActivePods() []*v1.Pod {
return activePods
}

// makeDevices determines the devices for the given container.
// makeGPUDevices determines the devices for the given container.
// Experimental.
func (kl *Kubelet) makeDevices(pod *v1.Pod, container *v1.Container) ([]kubecontainer.DeviceInfo, error) {
func (kl *Kubelet) makeGPUDevices(pod *v1.Pod, container *v1.Container) ([]kubecontainer.DeviceInfo, error) {
if container.Resources.Limits.NvidiaGPU().IsZero() {
return nil, nil
}
Expand Down Expand Up @@ -128,6 +129,39 @@ func makeAbsolutePath(goos, path string) string {
return "c:\\" + path
}

// makeBlockVolumes maps the raw block devices specified in the path of the container
// Experimental
func (kl *Kubelet) makeBlockVolumes(pod *v1.Pod, container *v1.Container, podVolumes kubecontainer.VolumeMap, blkutil volumeutil.BlockVolumePathHandler) ([]kubecontainer.DeviceInfo, error) {
var devices []kubecontainer.DeviceInfo
for _, device := range container.VolumeDevices {
// check path is absolute
if !filepath.IsAbs(device.DevicePath) {
return nil, fmt.Errorf("error DevicePath `%s` must be an absolute path", device.DevicePath)
}
vol, ok := podVolumes[device.Name]
if !ok || vol.BlockVolumeMapper == nil {
glog.Errorf("Block volume cannot be satisfied for container %q, because the volume is missing or the volume mapper is nil: %+v", container.Name, device)
return nil, fmt.Errorf("cannot find volume %q to pass into container %q", device.Name, container.Name)
}
// Get a symbolic link associated to a block device under pod device path
dirPath, volName := vol.BlockVolumeMapper.GetPodDeviceMapPath()
symlinkPath := path.Join(dirPath, volName)
if islinkExist, checkErr := blkutil.IsSymlinkExist(symlinkPath); checkErr != nil {
return nil, checkErr
} else if islinkExist {
// Check readOnly in PVCVolumeSource and set read only permission if it's true.
permission := "mrw"
if vol.ReadOnly {
permission = "r"
}
glog.V(4).Infof("Device will be attached to container %q. Path on host: %v", container.Name, symlinkPath)
devices = append(devices, kubecontainer.DeviceInfo{PathOnHost: symlinkPath, PathInContainer: device.DevicePath, Permissions: permission})
}
}

return devices, nil
}

// makeMounts determines the mount points for the given container.
func makeMounts(pod *v1.Pod, podDir string, container *v1.Container, hostName, hostDomain, podIP string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.Mount, error) {
// Kubernetes only mounts on /etc/hosts if:
Expand Down Expand Up @@ -416,12 +450,22 @@ func (kl *Kubelet) GenerateRunContainerOptions(pod *v1.Pod, container *v1.Contai

opts.PortMappings = kubecontainer.MakePortMappings(container)
// TODO(random-liu): Move following convert functions into pkg/kubelet/container
devices, err := kl.makeDevices(pod, container)
devices, err := kl.makeGPUDevices(pod, container)
if err != nil {
return nil, err
}
opts.Devices = append(opts.Devices, devices...)

// TODO: remove feature gate check after no longer needed
if utilfeature.DefaultFeatureGate.Enabled(features.BlockVolume) {
blkutil := volumeutil.NewBlockVolumePathHandler()
blkVolumes, err := kl.makeBlockVolumes(pod, container, volumes, blkutil)
if err != nil {
return nil, err
}
opts.Devices = append(opts.Devices, blkVolumes...)
}

mounts, err := makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIP, volumes)
if err != nil {
return nil, err
Expand Down