Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add list of pods that use a volume to multiattach events #56288

Merged
merged 1 commit into from Jan 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 25 additions & 0 deletions pkg/controller/volume/attachdetach/cache/desired_state_of_world.go
Expand Up @@ -105,6 +105,10 @@ type DesiredStateOfWorld interface {
// Mark multiattach error as reported to prevent spamming multiple
// events for same error
SetMultiAttachError(v1.UniqueVolumeName, k8stypes.NodeName)

// GetVolumePodsOnNodes returns the list of pods that require the
// given volume on the given nodes.
GetVolumePodsOnNodes(nodes []k8stypes.NodeName, volumeName v1.UniqueVolumeName) []*v1.Pod
}

// VolumeToAttach represents a volume that should be attached to a node.
Expand Down Expand Up @@ -412,3 +416,24 @@ func (dsw *desiredStateOfWorld) GetPodToAdd() map[types.UniquePodName]PodToAdd {
}
return pods
}

// GetVolumePodsOnNodes returns all pods that require the given volume and are
// scheduled to run on any of the given nodes. Nodes that are not managed by
// this desired state of world, or that do not need the volume, are skipped.
func (dsw *desiredStateOfWorld) GetVolumePodsOnNodes(nodes []k8stypes.NodeName, volumeName v1.UniqueVolumeName) []*v1.Pod {
	dsw.RLock()
	defer dsw.RUnlock()

	// Return an empty (non-nil) slice when nothing matches.
	pods := []*v1.Pod{}
	for _, nodeName := range nodes {
		if node, ok := dsw.nodesManaged[nodeName]; ok {
			if volume, ok := node.volumesToAttach[volumeName]; ok {
				for _, scheduledPod := range volume.scheduledPods {
					pods = append(pods, scheduledPod.podObj)
				}
			}
		}
	}
	return pods
}
Expand Up @@ -1032,3 +1032,49 @@ func verifyVolumeToAttach(

t.Fatalf("volumesToAttach (%v) should contain %q/%q. It does not.", volumesToAttach, expectedVolumeName, expectedNodeName)
}

// Test_GetPodsOnNodes verifies that GetVolumePodsOnNodes returns only the
// pods that use the requested volume on the requested nodes, silently
// skipping nodes that are unknown or have no pod that needs the volume.
func Test_GetPodsOnNodes(t *testing.T) {
	volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
	dsw := NewDesiredStateOfWorld(volumePluginMgr)

	// Arrange: two nodes, each with a single pod using a distinct volume.
	node1Name := k8stypes.NodeName("node1-name")
	pod1Name := "pod1-uid"
	volume1Name := v1.UniqueVolumeName("volume1-name")
	volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
	dsw.AddNode(node1Name, false /*keepTerminatedPodVolumes*/)
	generatedVolume1Name, err := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volume1Spec, node1Name)
	if err != nil {
		t.Fatalf("AddPod failed for pod %q. Expected: <no error> Actual: <%v>", pod1Name, err)
	}

	node2Name := k8stypes.NodeName("node2-name")
	pod2Name := "pod2-uid"
	volume2Name := v1.UniqueVolumeName("volume2-name")
	volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name)
	dsw.AddNode(node2Name, false /*keepTerminatedPodVolumes*/)
	if _, err = dsw.AddPod(types.UniquePodName(pod2Name), controllervolumetesting.NewPod(pod2Name, pod2Name), volume2Spec, node2Name); err != nil {
		t.Fatalf("AddPod failed for pod %q. Expected: <no error> Actual: <%v>", pod2Name, err)
	}

	// A third node that has no pods at all.
	node3Name := k8stypes.NodeName("node3-name")
	dsw.AddNode(node3Name, false /*keepTerminatedPodVolumes*/)

	// Act: ask for volume1 on all nodes, including one that does not exist.
	pods := dsw.GetVolumePodsOnNodes([]k8stypes.NodeName{node1Name, node2Name, node3Name, "non-existing-node"}, generatedVolume1Name)

	// Assert: only pod1 (on node1) uses volume1.
	if len(pods) != 1 {
		t.Fatalf("Expected 1 pod, got %d", len(pods))
	}
	if pods[0].Name != pod1Name {
		t.Errorf("Expected pod %s/%s, got %s", pod1Name, pod1Name, pods[0].Name)
	}
}
2 changes: 2 additions & 0 deletions pkg/controller/volume/attachdetach/reconciler/BUILD
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/volume/util/operationexecutor:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
Expand All @@ -34,6 +35,7 @@ go_test(
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/controller/volume/attachdetach/testing:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/types:go_default_library",
Expand Down
81 changes: 76 additions & 5 deletions pkg/controller/volume/attachdetach/reconciler/reconciler.go
Expand Up @@ -21,10 +21,12 @@ package reconciler

import (
"fmt"
"strings"
"time"

"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
Expand Down Expand Up @@ -269,12 +271,8 @@ func (rc *reconciler) attachDesiredVolumes() {
nodes := rc.actualStateOfWorld.GetNodesForVolume(volumeToAttach.VolumeName)
if len(nodes) > 0 {
if !volumeToAttach.MultiAttachErrorReported {
simpleMsg, detailedMsg := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another")
for _, pod := range volumeToAttach.ScheduledPods {
rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
}
rc.reportMultiAttachError(volumeToAttach, nodes)
rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName)
glog.Warningf(detailedMsg)
}
continue
}
Expand All @@ -292,5 +290,78 @@ func (rc *reconciler) attachDesiredVolumes() {
glog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
}
}
}

// reportMultiAttachError sends events and logs situation that a volume that
// should be attached to a node is already attached to different node(s).
func (rc *reconciler) reportMultiAttachError(volumeToAttach cache.VolumeToAttach, nodes []types.NodeName) {
// Filter out the current node from list of nodes where the volume is
// attached.
// Some methods need []string, some other needs []NodeName, collect both.
// In theory, these arrays should have always only one element - the
// controller does not allow more than one attachment. But use array just
// in case...
otherNodes := []types.NodeName{}
otherNodesStr := []string{}
for _, node := range nodes {
if node != volumeToAttach.NodeName {
otherNodes = append(otherNodes, node)
otherNodesStr = append(otherNodesStr, string(node))
}
}

// Get list of pods that use the volume on the other nodes.
pods := rc.desiredStateOfWorld.GetVolumePodsOnNodes(otherNodes, volumeToAttach.VolumeName)
Copy link
Member

@gnufied gnufied Jan 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this information be retrieved from desired state of world or actual state of world? It is possible that - multiple pods are requesting same volume on more than 1 node and they are all not getting attached because some other pod(on some other node) is actually using the volume.

In which case - while MultiAttach error is still mostly accurate, we may be printing wrong information back to the user and to the admin.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Problem is that ASW does not keep list of pods for attached volumes. It's reconstructed from node.status when the controller is restarted and we don't put pods there.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the comment below to:

// We did not find any pods that requests the volume. The pod must have been deleted already.

Message sent to user is: "Volume is already exclusively attached to one node and can't be attached to another" and IMO there is not much we could do about it.


if len(pods) == 0 {
// We did not find any pods that requests the volume. The pod must have been deleted already.
simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another")
for _, pod := range volumeToAttach.ScheduledPods {
rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
}
// Log detailed message to system admin
nodeList := strings.Join(otherNodesStr, ", ")
detailedMsg := volumeToAttach.GenerateMsgDetailed("Multi-Attach error", fmt.Sprintf("Volume is already exclusively attached to node %s and can't be attached to another", nodeList))
glog.Warningf(detailedMsg)
return
}

// There are pods that require the volume and run on another node. Typically
// it's user error, e.g. a ReplicaSet uses a PVC and has >1 replicas. Let
// the user know what pods are blocking the volume.
for _, scheduledPod := range volumeToAttach.ScheduledPods {
// Each scheduledPod must get a custom message. They can run in
// different namespaces and user of a namespace should not see names of
// pods in other namespaces.
localPodNames := []string{} // Names of pods in scheduledPods's namespace
otherPods := 0 // Count of pods in other namespaces
for _, pod := range pods {
if pod.Namespace == scheduledPod.Namespace {
localPodNames = append(localPodNames, pod.Name)
} else {
otherPods++
}
}

var msg string
if len(localPodNames) > 0 {
msg = fmt.Sprintf("Volume is already used by pod(s) %s", strings.Join(localPodNames, ", "))
if otherPods > 0 {
msg = fmt.Sprintf("%s and %d pod(s) in different namespaces", msg, otherPods)
}
} else {
// No local pods, there are pods only in different namespaces.
msg = fmt.Sprintf("Volume is already used by %d pod(s) in different namespaces", otherPods)
}
simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", msg)
rc.recorder.Eventf(scheduledPod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
}

// Log all pods for system admin
podNames := []string{}
for _, pod := range pods {
podNames = append(podNames, pod.Namespace+"/"+pod.Name)
}
detailedMsg := volumeToAttach.GenerateMsgDetailed("Multi-Attach error", fmt.Sprintf("Volume is already used by pods %s on node %s", strings.Join(podNames, ", "), strings.Join(otherNodesStr, ", ")))
glog.Warningf(detailedMsg)
}
110 changes: 110 additions & 0 deletions pkg/controller/volume/attachdetach/reconciler/reconciler_test.go
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
stringutil "k8s.io/kubernetes/pkg/util/strings"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/types"
Expand Down Expand Up @@ -531,6 +532,115 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.
waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
}

// Test_ReportMultiAttachError verifies that reportMultiAttachError emits the
// expected events when a volume required on node1 is already attached to
// other node(s): pod names are disclosed only within the reporting pod's own
// namespace, pods in foreign namespaces appear only as an anonymous count,
// and a generic message is used when no other pod uses the volume any more.
func Test_ReportMultiAttachError(t *testing.T) {
	type nodeWithPods struct {
		name     k8stypes.NodeName
		podNames []string
	}
	tests := []struct {
		name           string
		nodes          []nodeWithPods
		expectedEvents []string
	}{
		{
			"no pods use the volume",
			[]nodeWithPods{
				{"node1", []string{"ns1/pod1"}},
			},
			[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already exclusively attached to one node and can't be attached to another"},
		},
		{
			"pods in the same namespace use the volume",
			[]nodeWithPods{
				{"node1", []string{"ns1/pod1"}},
				{"node2", []string{"ns1/pod2"}},
			},
			[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by pod(s) pod2"},
		},
		{
			"pods in another namespace use the volume",
			[]nodeWithPods{
				{"node1", []string{"ns1/pod1"}},
				{"node2", []string{"ns2/pod2"}},
			},
			[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by 1 pod(s) in different namespaces"},
		},
		{
			"pods both in the same and another namespace use the volume",
			[]nodeWithPods{
				{"node1", []string{"ns1/pod1"}},
				{"node2", []string{"ns2/pod2"}},
				{"node3", []string{"ns1/pod3"}},
			},
			[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by pod(s) pod3 and 1 pod(s) in different namespaces"},
		},
	}

	for _, test := range tests {
		// Arrange
		t.Logf("Test %q starting", test.name)
		volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
		dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
		asw := cache.NewActualStateOfWorld(volumePluginMgr)
		fakeKubeClient := controllervolumetesting.CreateTestClient()
		fakeRecorder := record.NewFakeRecorder(100)
		fakeHandler := volumetesting.NewBlockVolumePathHandler()
		ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
			fakeKubeClient,
			volumePluginMgr,
			fakeRecorder,
			false, /* checkNodeCapabilitiesBeforeMount */
			fakeHandler))
		nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
		rc := NewReconciler(
			reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)

		nodes := []k8stypes.NodeName{}
		for _, n := range test.nodes {
			dsw.AddNode(n.name, false /*keepTerminatedPodVolumes*/)
			nodes = append(nodes, n.name)
			for _, podName := range n.podNames {
				// Every pod requires the same ReadWriteOnce volume, which is
				// what triggers the multi-attach conflict.
				volumeName := v1.UniqueVolumeName("volume-name")
				volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
				volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
				uid := string(n.name) + "-" + podName // unique UID
				namespace, name := stringutil.SplitQualifiedName(podName)
				pod := controllervolumetesting.NewPod(uid, name)
				pod.Namespace = namespace
				_, err := dsw.AddPod(types.UniquePodName(uid), pod, volumeSpec, n.name)
				if err != nil {
					t.Fatalf("Error adding pod %s to DSW: %s", podName, err)
				}
			}
		}
		// Act: report the error for the volume that should attach to node1.
		volumes := dsw.GetVolumesToAttach()
		for _, vol := range volumes {
			if vol.NodeName == "node1" {
				rc.(*reconciler).reportMultiAttachError(vol, nodes)
			}
		}

		// Assert
		close(fakeRecorder.Events)
		index := 0
		for event := range fakeRecorder.Events {
			// FIX: the original condition `len(test.expectedEvents) < index`
			// let index == len(expectedEvents) fall into the else branch and
			// panic with an out-of-range access on the first extra event.
			if index >= len(test.expectedEvents) {
				t.Errorf("Test %q: unexpected event received: %s", test.name, event)
			} else {
				expectedEvent := test.expectedEvents[index]
				if expectedEvent != event {
					t.Errorf("Test %q: event %d: expected %q, got %q", test.name, index, expectedEvent, event)
				}
			}
			index++
		}
		// Report any expected events that never arrived.
		for i := index; i < len(test.expectedEvents); i++ {
			t.Errorf("Test %q: event %d: expected %q, got none", test.name, i, test.expectedEvents[i])
		}
	}
}

func waitForMultiAttachErrorOnNode(
t *testing.T,
attachedNode k8stypes.NodeName,
Expand Down