Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #29673 #29746

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ type nodeToUpdateStatusFor struct {
}

func (asw *actualStateOfWorld) MarkVolumeAsAttached(
volumeSpec *volume.Spec, nodeName string, devicePath string) error {
_ api.UniqueVolumeName, volumeSpec *volume.Spec, nodeName string, devicePath string) error {
_, err := asw.AddVolumeNode(volumeSpec, nodeName, devicePath)
return err
}
Expand Down
44 changes: 21 additions & 23 deletions pkg/kubelet/volumemanager/cache/actual_state_of_world.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,6 @@ type ActualStateOfWorld interface {
// operationexecutor to interact with it.
operationexecutor.ActualStateOfWorldAttacherUpdater

// AddVolume adds the given volume to the cache indicating the specified
// volume is attached to this node. A unique volume name is generated from
// the volumeSpec and returned on success.
// If a volume with the same generated name already exists, this is a noop.
// If no volume plugin can support the given volumeSpec or more than one
// plugin can support it, an error is returned.
AddVolume(volumeSpec *volume.Spec, devicePath string) (api.UniqueVolumeName, error)

// AddPodToVolume adds the given pod to the given volume in the cache
// indicating the specified volume has been successfully mounted to the
// specified pod.
Expand Down Expand Up @@ -274,9 +266,8 @@ type mountedPod struct {
}

func (asw *actualStateOfWorld) MarkVolumeAsAttached(
volumeSpec *volume.Spec, nodeName string, devicePath string) error {
_, err := asw.AddVolume(volumeSpec, devicePath)
return err
volumeName api.UniqueVolumeName, volumeSpec *volume.Spec, _, devicePath string) error {
return asw.addVolume(volumeName, volumeSpec, devicePath)
}

func (asw *actualStateOfWorld) MarkVolumeAsDetached(
Expand Down Expand Up @@ -315,27 +306,34 @@ func (asw *actualStateOfWorld) MarkDeviceAsUnmounted(
return asw.SetVolumeGloballyMounted(volumeName, false /* globallyMounted */)
}

func (asw *actualStateOfWorld) AddVolume(
volumeSpec *volume.Spec, devicePath string) (api.UniqueVolumeName, error) {
// addVolume adds the given volume to the cache indicating the specified
// volume is attached to this node. If no volume name is supplied, a unique
// volume name is generated from the volumeSpec and returned on success. If a
// volume with the same generated name already exists, this is a noop. If no
// volume plugin can support the given volumeSpec or more than one plugin can
// support it, an error is returned.
func (asw *actualStateOfWorld) addVolume(
volumeName api.UniqueVolumeName, volumeSpec *volume.Spec, devicePath string) error {
asw.Lock()
defer asw.Unlock()

volumePlugin, err := asw.volumePluginMgr.FindPluginBySpec(volumeSpec)
if err != nil || volumePlugin == nil {
return "", fmt.Errorf(
return fmt.Errorf(
"failed to get Plugin from volumeSpec for volume %q err=%v",
volumeSpec.Name(),
err)
}

volumeName, err :=
volumehelper.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
volumeSpec.Name(),
volumePlugin.GetPluginName(),
err)
if len(volumeName) == 0 {
volumeName, err = volumehelper.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
if err != nil {
return fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
volumeSpec.Name(),
volumePlugin.GetPluginName(),
err)
}
}

pluginIsAttachable := false
Expand All @@ -357,7 +355,7 @@ func (asw *actualStateOfWorld) AddVolume(
asw.attachedVolumes[volumeName] = volumeObj
}

return volumeObj.volumeName, nil
return nil
}

func (asw *actualStateOfWorld) AddPodToVolume(
Expand Down
104 changes: 76 additions & 28 deletions pkg/kubelet/volumemanager/cache/actual_state_of_world_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ import (
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)

// Calls AddVolume() once to add volume
var emptyVolumeName = api.UniqueVolumeName("")

// Calls MarkVolumeAsAttached() once to add volume
// Verifies newly added volume exists in GetUnmountedVolumes()
// Verifies newly added volume doesn't exist in GetGloballyMountedVolumes()
func Test_AddVolume_Positive_NewVolume(t *testing.T) {
func Test_MarkVolumeAsAttached_Positive_NewVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
volumePluginMgr, plugin := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Expand All @@ -53,28 +55,29 @@ func Test_AddVolume_Positive_NewVolume(t *testing.T) {
}
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
devicePath := "fake/device/path"
generatedVolumeName, _ := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)

// Act
generatedVolumeName, err := asw.AddVolume(volumeSpec, devicePath)
err := asw.MarkVolumeAsAttached(emptyVolumeName, volumeSpec, "" /* nodeName */, devicePath)

// Assert
if err != nil {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}

verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
verifyVolumeExistsInUnmountedVolumes(t, generatedVolumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, generatedVolumeName, asw)
}

// Calls AddVolume() twice to add the same volume
// Verifies second call doesn't fail
// Calls MarkVolumeAsAttached() once to add volume, specifying a name --
// establishes that the supplied volume name is used to register the volume
// rather than the generated one.
// Verifies newly added volume exists in GetUnmountedVolumes()
// Verifies newly added volume doesn't exist in GetGloballyMountedVolumes()
func Test_AddVolume_Positive_ExistingVolume(t *testing.T) {
func Test_MarkVolumeAsAttached_SuppliedVolumeName_Positive_NewVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
devicePath := "fake/device/path"
asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Expand All @@ -94,19 +97,64 @@ func Test_AddVolume_Positive_ExistingVolume(t *testing.T) {
},
},
}
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
devicePath := "fake/device/path"
volumeName := api.UniqueVolumeName("this-would-never-be-a-volume-name")

// Act
err := asw.MarkVolumeAsAttached(volumeName, volumeSpec, "" /* nodeName */, devicePath)

// Assert
if err != nil {
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}

verifyVolumeExistsAsw(t, volumeName, true /* shouldExist */, asw)
verifyVolumeExistsInUnmountedVolumes(t, volumeName, asw)
verifyVolumeDoesntExistInGloballyMountedVolumes(t, volumeName, asw)
}

// Calls MarkVolumeAsAttached() twice to add the same volume
// Verifies second call doesn't fail
// Verifies newly added volume exists in GetUnmountedVolumes()
// Verifies newly added volume doesn't exist in GetGloballyMountedVolumes()
func Test_MarkVolumeAsAttached_Positive_ExistingVolume(t *testing.T) {
// Arrange
volumePluginMgr, plugin := volumetesting.GetTestVolumePluginMgr(t)
devicePath := "fake/device/path"
asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
UID: "pod1uid",
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "volume-name",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device1",
},
},
},
},
},
}
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
generatedVolumeName, err := asw.AddVolume(volumeSpec, devicePath)
generatedVolumeName, _ := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)

err := asw.MarkVolumeAsAttached(emptyVolumeName, volumeSpec, "" /* nodeName */, devicePath)
if err != nil {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}

// Act
generatedVolumeName, err = asw.AddVolume(volumeSpec, devicePath)
err = asw.MarkVolumeAsAttached(emptyVolumeName, volumeSpec, "" /* nodeName */, devicePath)

// Assert
if err != nil {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}

verifyVolumeExistsAsw(t, generatedVolumeName, true /* shouldExist */, asw)
Expand Down Expand Up @@ -141,14 +189,12 @@ func Test_AddPodToVolume_Positive_ExistingVolumeNewNode(t *testing.T) {
},
},
}

volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
volumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
plugin, volumeSpec)
generatedVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)

generatedVolumeName, err := asw.AddVolume(volumeSpec, devicePath)
err = asw.MarkVolumeAsAttached(emptyVolumeName, volumeSpec, "" /* nodeName */, devicePath)
if err != nil {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
podName := volumehelper.GetUniquePodName(pod)

Expand All @@ -159,7 +205,7 @@ func Test_AddPodToVolume_Positive_ExistingVolumeNewNode(t *testing.T) {

// Act
err = asw.AddPodToVolume(
podName, pod.UID, volumeName, mounter, volumeSpec.Name(), "" /* volumeGidValue */)
podName, pod.UID, generatedVolumeName, mounter, volumeSpec.Name(), "" /* volumeGidValue */)

// Assert
if err != nil {
Expand Down Expand Up @@ -202,12 +248,12 @@ func Test_AddPodToVolume_Positive_ExistingVolumeExistingNode(t *testing.T) {
}

volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
volumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
generatedVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(
plugin, volumeSpec)

generatedVolumeName, err := asw.AddVolume(volumeSpec, devicePath)
err = asw.MarkVolumeAsAttached(emptyVolumeName, volumeSpec, "" /* nodeName */, devicePath)
if err != nil {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}
podName := volumehelper.GetUniquePodName(pod)

Expand All @@ -217,14 +263,14 @@ func Test_AddPodToVolume_Positive_ExistingVolumeExistingNode(t *testing.T) {
}

err = asw.AddPodToVolume(
podName, pod.UID, volumeName, mounter, volumeSpec.Name(), "" /* volumeGidValue */)
podName, pod.UID, generatedVolumeName, mounter, volumeSpec.Name(), "" /* volumeGidValue */)
if err != nil {
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
}

// Act
err = asw.AddPodToVolume(
podName, pod.UID, volumeName, mounter, volumeSpec.Name(), "" /* volumeGidValue */)
podName, pod.UID, generatedVolumeName, mounter, volumeSpec.Name(), "" /* volumeGidValue */)

// Assert
if err != nil {
Expand Down Expand Up @@ -301,13 +347,13 @@ func Test_AddPodToVolume_Negative_VolumeDoesntExist(t *testing.T) {
asw)
}

// Calls AddVolume() once to add volume
// Calls MarkVolumeAsAttached() once to add volume
// Calls MarkDeviceAsMounted() to mark volume as globally mounted.
// Verifies newly added volume exists in GetUnmountedVolumes()
// Verifies newly added volume exists in GetGloballyMountedVolumes()
func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) {
// Arrange
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
volumePluginMgr, plugin := volumetesting.GetTestVolumePluginMgr(t)
asw := NewActualStateOfWorld("mynode" /* nodeName */, volumePluginMgr)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Expand All @@ -329,9 +375,11 @@ func Test_MarkDeviceAsMounted_Positive_NewVolume(t *testing.T) {
}
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
devicePath := "fake/device/path"
generatedVolumeName, err := asw.AddVolume(volumeSpec, devicePath)
generatedVolumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)

err = asw.MarkVolumeAsAttached(emptyVolumeName, volumeSpec, "" /* nodeName */, devicePath)
if err != nil {
t.Fatalf("AddVolume failed. Expected: <no error> Actual: <%v>", err)
t.Fatalf("MarkVolumeAsAttached failed. Expected: <no error> Actual: <%v>", err)
}

// Act
Expand Down
31 changes: 22 additions & 9 deletions pkg/kubelet/volumemanager/cache/desired_state_of_world.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,22 +183,35 @@ func (dsw *desiredStateOfWorld) AddPodToVolume(
err)
}

volumeName, err :=
volumehelper.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
volumeSpec.Name(),
volumePlugin.GetPluginName(),
err)
var volumeName api.UniqueVolumeName

// The unique volume name used depends on whether the volume is attachable
// or not.
attachable := dsw.isAttachableVolume(volumeSpec)
if attachable {
// For attachable volumes, use the unique volume name as reported by
// the plugin.
volumeName, err =
volumehelper.GetUniqueVolumeNameFromSpec(volumePlugin, volumeSpec)
if err != nil {
return "", fmt.Errorf(
"failed to GetUniqueVolumeNameFromSpec for volumeSpec %q using volume plugin %q err=%v",
volumeSpec.Name(),
volumePlugin.GetPluginName(),
err)
}
} else {
// For non-attachable volumes, generate a unique name based on the pod
// namespace and name and the name of the volume within the pod.
volumeName = volumehelper.GetUniqueVolumeNameForNonAttachableVolume(podName, volumePlugin, outerVolumeSpecName)
}

volumeObj, volumeExists := dsw.volumesToMount[volumeName]
if !volumeExists {
volumeObj = volumeToMount{
volumeName: volumeName,
podsToMount: make(map[types.UniquePodName]podToMount),
pluginIsAttachable: dsw.isAttachableVolume(volumeSpec),
pluginIsAttachable: attachable,
volumeGidValue: volumeGidValue,
reportedInUse: false,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/volume/git_repo/git_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (plugin *gitRepoPlugin) GetPluginName() string {
func (plugin *gitRepoPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
volumeSource, _ := getVolumeSource(spec)
if volumeSource == nil {
return "", fmt.Errorf("Spec does not reference a GCE volume type")
return "", fmt.Errorf("Spec does not reference a Git repo volume type")
}

return fmt.Sprintf(
Expand Down