Skip to content

Commit

Permalink
Add list of pods that use a volume to multiattach events
Browse files Browse the repository at this point in the history
So users knows what pods are blocking a volume and can realize their error.
  • Loading branch information
jsafrane committed Jan 24, 2018
1 parent ba09291 commit e46c886
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 5 deletions.
25 changes: 25 additions & 0 deletions pkg/controller/volume/attachdetach/cache/desired_state_of_world.go
Expand Up @@ -105,6 +105,10 @@ type DesiredStateOfWorld interface {
// Mark multiattach error as reported to prevent spamming multiple
// events for same error
SetMultiAttachError(v1.UniqueVolumeName, k8stypes.NodeName)

// GetPodsOnNodes returns list of pods ("namespace/name") that require
// given volume on given nodes.
GetVolumePodsOnNodes(nodes []k8stypes.NodeName, volumeName v1.UniqueVolumeName) []*v1.Pod
}

// VolumeToAttach represents a volume that should be attached to a node.
Expand Down Expand Up @@ -412,3 +416,24 @@ func (dsw *desiredStateOfWorld) GetPodToAdd() map[types.UniquePodName]PodToAdd {
}
return pods
}

func (dsw *desiredStateOfWorld) GetVolumePodsOnNodes(nodes []k8stypes.NodeName, volumeName v1.UniqueVolumeName) []*v1.Pod {
dsw.RLock()
defer dsw.RUnlock()

pods := []*v1.Pod{}
for _, nodeName := range nodes {
node, ok := dsw.nodesManaged[nodeName]
if !ok {
continue
}
volume, ok := node.volumesToAttach[volumeName]
if !ok {
continue
}
for _, pod := range volume.scheduledPods {
pods = append(pods, pod.podObj)
}
}
return pods
}
Expand Up @@ -1032,3 +1032,49 @@ func verifyVolumeToAttach(

t.Fatalf("volumesToAttach (%v) should contain %q/%q. It does not.", volumesToAttach, expectedVolumeName, expectedNodeName)
}

func Test_GetPodsOnNodes(t *testing.T) {
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := NewDesiredStateOfWorld(volumePluginMgr)

// 2 nodes, each with one pod with a different volume
node1Name := k8stypes.NodeName("node1-name")
pod1Name := "pod1-uid"
volume1Name := v1.UniqueVolumeName("volume1-name")
volume1Spec := controllervolumetesting.GetTestVolumeSpec(string(volume1Name), volume1Name)
dsw.AddNode(node1Name, false /*keepTerminatedPodVolumes*/)
generatedVolume1Name, podAddErr := dsw.AddPod(types.UniquePodName(pod1Name), controllervolumetesting.NewPod(pod1Name, pod1Name), volume1Spec, node1Name)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
pod1Name,
podAddErr)
}
node2Name := k8stypes.NodeName("node2-name")
pod2Name := "pod2-uid"
volume2Name := v1.UniqueVolumeName("volume2-name")
volume2Spec := controllervolumetesting.GetTestVolumeSpec(string(volume2Name), volume2Name)
dsw.AddNode(node2Name, false /*keepTerminatedPodVolumes*/)
_, podAddErr = dsw.AddPod(types.UniquePodName(pod2Name), controllervolumetesting.NewPod(pod2Name, pod2Name), volume2Spec, node2Name)
if podAddErr != nil {
t.Fatalf(
"AddPod failed for pod %q. Expected: <no error> Actual: <%v>",
pod2Name,
podAddErr)
}

// Third node without any pod
node3Name := k8stypes.NodeName("node3-name")
dsw.AddNode(node3Name, false /*keepTerminatedPodVolumes*/)

// Act
pods := dsw.GetVolumePodsOnNodes([]k8stypes.NodeName{node1Name, node2Name, node3Name, "non-existing-node"}, generatedVolume1Name)

// Assert
if len(pods) != 1 {
t.Fatalf("Expected 1 pod, got %d", len(pods))
}
if pods[0].Name != pod1Name {
t.Errorf("Expected pod %s/%s, got %s", pod1Name, pod1Name, pods[0].Name)
}
}
2 changes: 2 additions & 0 deletions pkg/controller/volume/attachdetach/reconciler/BUILD
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/volume/util/operationexecutor:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
Expand All @@ -34,6 +35,7 @@ go_test(
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/statusupdater:go_default_library",
"//pkg/controller/volume/attachdetach/testing:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume/testing:go_default_library",
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/types:go_default_library",
Expand Down
81 changes: 76 additions & 5 deletions pkg/controller/volume/attachdetach/reconciler/reconciler.go
Expand Up @@ -21,10 +21,12 @@ package reconciler

import (
"fmt"
"strings"
"time"

"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
Expand Down Expand Up @@ -269,12 +271,8 @@ func (rc *reconciler) attachDesiredVolumes() {
nodes := rc.actualStateOfWorld.GetNodesForVolume(volumeToAttach.VolumeName)
if len(nodes) > 0 {
if !volumeToAttach.MultiAttachErrorReported {
simpleMsg, detailedMsg := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another")
for _, pod := range volumeToAttach.ScheduledPods {
rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
}
rc.reportMultiAttachError(volumeToAttach, nodes)
rc.desiredStateOfWorld.SetMultiAttachError(volumeToAttach.VolumeName, volumeToAttach.NodeName)
glog.Warningf(detailedMsg)
}
continue
}
Expand All @@ -292,5 +290,78 @@ func (rc *reconciler) attachDesiredVolumes() {
glog.Errorf(volumeToAttach.GenerateErrorDetailed("attacherDetacher.AttachVolume failed to start", err).Error())
}
}
}

// reportMultiAttachError sends events and logs situation that a volume that
// should be attached to a node is already attached to different node(s).
func (rc *reconciler) reportMultiAttachError(volumeToAttach cache.VolumeToAttach, nodes []types.NodeName) {
// Filter out the current node from list of nodes where the volume is
// attached.
// Some methods need []string, some other needs []NodeName, collect both.
// In theory, these arrays should have always only one element - the
// controller does not allow more than one attachment. But use array just
// in case...
otherNodes := []types.NodeName{}
otherNodesStr := []string{}
for _, node := range nodes {
if node != volumeToAttach.NodeName {
otherNodes = append(otherNodes, node)
otherNodesStr = append(otherNodesStr, string(node))
}
}

// Get list of pods that use the volume on the other nodes.
pods := rc.desiredStateOfWorld.GetVolumePodsOnNodes(otherNodes, volumeToAttach.VolumeName)

if len(pods) == 0 {
// We did not find any pods that requests the volume. The pod must have been deleted already.
simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", "Volume is already exclusively attached to one node and can't be attached to another")
for _, pod := range volumeToAttach.ScheduledPods {
rc.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
}
// Log detailed message to system admin
nodeList := strings.Join(otherNodesStr, ", ")
detailedMsg := volumeToAttach.GenerateMsgDetailed("Multi-Attach error", fmt.Sprintf("Volume is already exclusively attached to node %s and can't be attached to another", nodeList))
glog.Warningf(detailedMsg)
return
}

// There are pods that require the volume and run on another node. Typically
// it's user error, e.g. a ReplicaSet uses a PVC and has >1 replicas. Let
// the user know what pods are blocking the volume.
for _, scheduledPod := range volumeToAttach.ScheduledPods {
// Each scheduledPod must get a custom message. They can run in
// different namespaces and user of a namespace should not see names of
// pods in other namespaces.
localPodNames := []string{} // Names of pods in scheduledPods's namespace
otherPods := 0 // Count of pods in other namespaces
for _, pod := range pods {
if pod.Namespace == scheduledPod.Namespace {
localPodNames = append(localPodNames, pod.Name)
} else {
otherPods++
}
}

var msg string
if len(localPodNames) > 0 {
msg = fmt.Sprintf("Volume is already used by pod(s) %s", strings.Join(localPodNames, ", "))
if otherPods > 0 {
msg = fmt.Sprintf("%s and %d pod(s) in different namespaces", msg, otherPods)
}
} else {
// No local pods, there are pods only in different namespaces.
msg = fmt.Sprintf("Volume is already used by %d pod(s) in different namespaces", otherPods)
}
simpleMsg, _ := volumeToAttach.GenerateMsg("Multi-Attach error", msg)
rc.recorder.Eventf(scheduledPod, v1.EventTypeWarning, kevents.FailedAttachVolume, simpleMsg)
}

// Log all pods for system admin
podNames := []string{}
for _, pod := range pods {
podNames = append(podNames, pod.Namespace+"/"+pod.Name)
}
detailedMsg := volumeToAttach.GenerateMsgDetailed("Multi-Attach error", fmt.Sprintf("Volume is already used by pods %s on node %s", strings.Join(podNames, ", "), strings.Join(otherNodesStr, ", ")))
glog.Warningf(detailedMsg)
}
110 changes: 110 additions & 0 deletions pkg/controller/volume/attachdetach/reconciler/reconciler_test.go
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing"
stringutil "k8s.io/kubernetes/pkg/util/strings"
volumetesting "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
"k8s.io/kubernetes/pkg/volume/util/types"
Expand Down Expand Up @@ -531,6 +532,115 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing.
waitForTotalAttachCallCount(t, 2 /* expectedAttachCallCount */, fakePlugin)
}

func Test_ReportMultiAttachError(t *testing.T) {
type nodeWithPods struct {
name k8stypes.NodeName
podNames []string
}
tests := []struct {
name string
nodes []nodeWithPods
expectedEvents []string
}{
{
"no pods use the volume",
[]nodeWithPods{
{"node1", []string{"ns1/pod1"}},
},
[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already exclusively attached to one node and can't be attached to another"},
},
{
"pods in the same namespace use the volume",
[]nodeWithPods{
{"node1", []string{"ns1/pod1"}},
{"node2", []string{"ns1/pod2"}},
},
[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by pod(s) pod2"},
},
{
"pods in anotother namespace use the volume",
[]nodeWithPods{
{"node1", []string{"ns1/pod1"}},
{"node2", []string{"ns2/pod2"}},
},
[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by 1 pod(s) in different namespaces"},
},
{
"pods both in the same and anotother namespace use the volume",
[]nodeWithPods{
{"node1", []string{"ns1/pod1"}},
{"node2", []string{"ns2/pod2"}},
{"node3", []string{"ns1/pod3"}},
},
[]string{"Warning FailedAttachVolume Multi-Attach error for volume \"volume-name\" Volume is already used by pod(s) pod3 and 1 pod(s) in different namespaces"},
},
}

for _, test := range tests {
// Arrange
t.Logf("Test %q starting", test.name)
volumePluginMgr, _ := volumetesting.GetTestVolumePluginMgr(t)
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
asw := cache.NewActualStateOfWorld(volumePluginMgr)
fakeKubeClient := controllervolumetesting.CreateTestClient()
fakeRecorder := record.NewFakeRecorder(100)
fakeHandler := volumetesting.NewBlockVolumePathHandler()
ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeKubeClient,
volumePluginMgr,
fakeRecorder,
false, /* checkNodeCapabilitiesBeforeMount */
fakeHandler))
nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */)
rc := NewReconciler(
reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder)

nodes := []k8stypes.NodeName{}
for _, n := range test.nodes {
dsw.AddNode(n.name, false /*keepTerminatedPodVolumes*/)
nodes = append(nodes, n.name)
for _, podName := range n.podNames {
volumeName := v1.UniqueVolumeName("volume-name")
volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName)
volumeSpec.PersistentVolume.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}
uid := string(n.name) + "-" + podName // unique UID
namespace, name := stringutil.SplitQualifiedName(podName)
pod := controllervolumetesting.NewPod(uid, name)
pod.Namespace = namespace
_, err := dsw.AddPod(types.UniquePodName(uid), pod, volumeSpec, n.name)
if err != nil {
t.Fatalf("Error adding pod %s to DSW: %s", podName, err)
}
}
}
// Act
volumes := dsw.GetVolumesToAttach()
for _, vol := range volumes {
if vol.NodeName == "node1" {
rc.(*reconciler).reportMultiAttachError(vol, nodes)
}
}

// Assert
close(fakeRecorder.Events)
index := 0
for event := range fakeRecorder.Events {
if len(test.expectedEvents) < index {
t.Errorf("Test %q: unexpected event received: %s", test.name, event)
} else {
expectedEvent := test.expectedEvents[index]
if expectedEvent != event {
t.Errorf("Test %q: event %d: expected %q, got %q", test.name, index, expectedEvent, event)
}
}
index++
}
for i := index; i < len(test.expectedEvents); i++ {
t.Errorf("Test %q: event %d: expected %q, got none", test.name, i, test.expectedEvents[i])
}
}
}

func waitForMultiAttachErrorOnNode(
t *testing.T,
attachedNode k8stypes.NodeName,
Expand Down

0 comments on commit e46c886

Please sign in to comment.