Skip to content

Commit

Permalink
Allow daemonsets to use affinity to limit nodes they run on (#990)
Browse files Browse the repository at this point in the history
For daemonsets, node affinity can be set to limit/select the nodes
which the daemonset will run on. Since this is a native k8s feature
we should support it.

However, we use 'node-plugin' as a key in a few places which need
to understand that not every node will be considered.

The upstream logic for this node selection is in the core scheduling
packages and not readily consumable as a module. As a result, the
most reasonable path forward is to implement a small bit of the
desired functionality on our end.

For now, we just care about the RequiredDuringSchedulingIgnoredDuringExecution
field and only for node labels. This allows picking out nodes based on
labels which may satisfy most use cases.

Fixes #988

Signed-off-by: John Schnake <jschnake@vmware.com>
  • Loading branch information
johnSchnake committed Nov 12, 2019
1 parent 73cdb50 commit c5ad70d
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 1 deletion.
69 changes: 69 additions & 0 deletions pkg/plugin/driver/daemonset/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func NewPlugin(dfn manifest.Manifest, namespace, sonobuoyImage, imagePullPolicy,

// ExpectedResults returns the list of results expected for this daemonset.
func (p *Plugin) ExpectedResults(nodes []v1.Node) []plugin.ExpectedResult {
nodes = p.filterByNodeSelector(nodes)
ret := make([]plugin.ExpectedResult, 0, len(nodes))

for _, node := range nodes {
Expand All @@ -82,6 +83,26 @@ func (p *Plugin) ExpectedResults(nodes []v1.Node) []plugin.ExpectedResult {
return ret
}

// filterByNodeSelector will filter the list of nodes to just the ones matching the affinity of the plugin.
func (p *Plugin) filterByNodeSelector(nodes []v1.Node) []v1.Node {
ps := p.Base.Definition.PodSpec
if ps == nil ||
ps.Affinity == nil ||
ps.Affinity.NodeAffinity == nil ||
ps.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
return nodes
}

retNodes := []v1.Node{}
nodeSelector := ps.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
for _, node := range nodes {
if nodeMatchesNodeSelector(&node, nodeSelector) {
retNodes = append(retNodes, node)
}
}
return retNodes
}

func (p *Plugin) createDaemonSetDefinition(hostname string, cert *tls.Certificate, ownerPod *v1.Pod, progressPort string) appsv1.DaemonSet {
ds := appsv1.DaemonSet{}
annotations := map[string]string{
Expand Down Expand Up @@ -222,6 +243,7 @@ func (p *Plugin) findDaemonSet(kubeclient kubernetes.Interface) (*appsv1.DaemonS
// Monitor adheres to plugin.Interface by ensuring the DaemonSet is correctly
// configured and that each pod is running normally.
func (p *Plugin) Monitor(ctx context.Context, kubeclient kubernetes.Interface, availableNodes []v1.Node, resultsCh chan<- *plugin.Result) {
availableNodes = p.filterByNodeSelector(availableNodes)
podsReported := make(map[string]bool)
podsFound := make(map[string]bool, len(availableNodes))
for _, node := range availableNodes {
Expand Down Expand Up @@ -322,6 +344,11 @@ func (p *Plugin) monitorOnce(kubeclient kubernetes.Interface, availableNodes []v
}
}

// The main caller, Monitor, filters the list typically before passing it to us. However,
// the cost of the call is pretty small and this aids in testing to ensure that we do
// not error when the plugin isn't targeting that node.
availableNodes = p.filterByNodeSelector(availableNodes)

// DaemonSets are a bit strange, if node taints are preventing
// scheduling, pods won't even be created (unlike say Jobs,
// which will create the pod and leave it in an unscheduled
Expand Down Expand Up @@ -351,3 +378,45 @@ func makeErrorResultsForNodes(resultType string, errdata map[string]interface{},
}
return results
}

// nodeMatchesNodeSelector checks if a node's labels satisfy a node selector. It is a simplification
// of upstream logic in `k8s.io/kubernetes` which isn't intended for consumption as a module.
// If the nodeSelector has multiple expressions/terms; this method returns the union of the nodes
// satisfying the individual terms.
func nodeMatchesNodeSelector(node *v1.Node, sel *v1.NodeSelector) bool {
for _, term := range sel.NodeSelectorTerms {
// We only support MatchExpressions at this time.
for _, exp := range term.MatchExpressions {
switch exp.Operator {
case v1.NodeSelectorOpExists:
if _, ok := node.Labels[exp.Key]; ok {
return true
}
case v1.NodeSelectorOpDoesNotExist:
if _, ok := node.Labels[exp.Key]; !ok {
return true
}
case v1.NodeSelectorOpIn:
if val, ok := node.Labels[exp.Key]; ok && stringInList(exp.Values, val) {
return true
}
case v1.NodeSelectorOpNotIn:
if val, ok := node.Labels[exp.Key]; !ok || !stringInList(exp.Values, val) {
return true
}
default:
continue
}
}
}
return false
}

func stringInList(list []string, s string) bool {
for _, v := range list {
if v == s {
return true
}
}
return false
}
139 changes: 138 additions & 1 deletion pkg/plugin/driver/daemonset/daemonset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import (
"testing"

"github.com/vmware-tanzu/sonobuoy/pkg/backplane/ca"
"github.com/vmware-tanzu/sonobuoy/pkg/plugin"
"github.com/vmware-tanzu/sonobuoy/pkg/plugin/driver"
"github.com/vmware-tanzu/sonobuoy/pkg/plugin/manifest"

"github.com/kylelemons/godebug/pretty"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -374,6 +376,23 @@ func TestMonitorOnce(t *testing.T) {
// We will need to be able to grab these items repeatedly and tweak the minor details
// so these helpers make the test cases much more readable.
testPlugin := &Plugin{Base: driver.Base{Definition: manifest.Manifest{SonobuoyConfig: manifest.SonobuoyConfig{PluginName: "myPlugin"}}}}
testPluginWithAffinity := &Plugin{Base: driver.Base{Definition: manifest.Manifest{SonobuoyConfig: manifest.SonobuoyConfig{PluginName: "myPlugin"}}}}
testPluginWithAffinity.Base.Definition.PodSpec = &manifest.PodSpec{
PodSpec: corev1.PodSpec{
Affinity: &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{{Key: "foo", Operator: corev1.NodeSelectorOpExists}},
},
},
},
},
},
},
}

validDS := appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"sonobuoy-run": ""}},
}
Expand All @@ -397,7 +416,7 @@ func TestMonitorOnce(t *testing.T) {
default3Nodes := []corev1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "node1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "node2"}},
{ObjectMeta: metav1.ObjectMeta{Name: "node3"}},
{ObjectMeta: metav1.ObjectMeta{Name: "node3", Labels: map[string]string{"foo": "bar"}}},
}

testCases := []struct {
Expand Down Expand Up @@ -440,6 +459,15 @@ func TestMonitorOnce(t *testing.T) {
"No pod was scheduled on node node2 within 2562047h47m16.854775807s. Check tolerations for plugin myPlugin",
"No pod was scheduled on node node3 within 2562047h47m16.854775807s. Check tolerations for plugin myPlugin",
},
}, {
desc: "Missing pods results in errors for each only if targeting those nodes",
nodes: default3Nodes,
dsPlugin: testPluginWithAffinity,
dsOnServer: &appsv1.DaemonSetList{Items: []appsv1.DaemonSet{validDS}},
podsOnServer: &corev1.PodList{},
expectErrResultMsgs: []string{
"No pod was scheduled on node node3 within 2562047h47m16.854775807s. Check tolerations for plugin myPlugin",
},
}, {
desc: "Failing pod results in error",
nodes: default3Nodes,
Expand Down Expand Up @@ -512,3 +540,112 @@ func TestMonitorOnce(t *testing.T) {
})
}
}

func TestExpectedResults(t *testing.T) {
testNodes := []corev1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "node1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "node2", Labels: map[string]string{"foo": "bar"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "node3", Labels: map[string]string{"foo": "baz"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "node4", Labels: map[string]string{"foo": "bar2"}}},
}

pluginWithAffinity := func(reqs []corev1.NodeSelectorRequirement) *Plugin {
p := &Plugin{
Base: driver.Base{
Definition: manifest.Manifest{
SonobuoyConfig: manifest.SonobuoyConfig{PluginName: "myPlugin"},
},
},
}
if len(reqs) > 0 {
p.Base.Definition.PodSpec = &manifest.PodSpec{
PodSpec: corev1.PodSpec{
Affinity: &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: reqs,
},
},
},
},
},
},
}
}
return p
}

testCases := []struct {
desc string
p *Plugin
expect []plugin.ExpectedResult
}{
{
desc: "Defaults to all nodes",
expect: []plugin.ExpectedResult{
{NodeName: "node1", ResultType: "myPlugin"},
{NodeName: "node2", ResultType: "myPlugin"},
{NodeName: "node3", ResultType: "myPlugin"},
{NodeName: "node4", ResultType: "myPlugin"},
},
p: pluginWithAffinity(nil),
}, {
desc: "Filters for label exists",
expect: []plugin.ExpectedResult{
{NodeName: "node2", ResultType: "myPlugin"},
{NodeName: "node3", ResultType: "myPlugin"},
{NodeName: "node4", ResultType: "myPlugin"},
},
p: pluginWithAffinity([]corev1.NodeSelectorRequirement{
{Key: "foo", Operator: corev1.NodeSelectorOpExists},
}),
}, {
desc: "Filters for label does not exist",
expect: []plugin.ExpectedResult{
{NodeName: "node1", ResultType: "myPlugin"},
},
p: pluginWithAffinity([]corev1.NodeSelectorRequirement{
{Key: "foo", Operator: corev1.NodeSelectorOpDoesNotExist},
}),
}, {
desc: "Filters for label value in",
expect: []plugin.ExpectedResult{
{NodeName: "node2", ResultType: "myPlugin"},
{NodeName: "node3", ResultType: "myPlugin"},
},
p: pluginWithAffinity([]corev1.NodeSelectorRequirement{
{Key: "foo", Operator: corev1.NodeSelectorOpIn, Values: []string{"bar", "baz"}},
}),
}, {
desc: "Filters for label value not in",
expect: []plugin.ExpectedResult{
{NodeName: "node1", ResultType: "myPlugin"},
{NodeName: "node4", ResultType: "myPlugin"},
},
p: pluginWithAffinity([]corev1.NodeSelectorRequirement{
{Key: "foo", Operator: corev1.NodeSelectorOpNotIn, Values: []string{"bar", "baz"}},
}),
}, {
desc: "Can combine filters as union",
expect: []plugin.ExpectedResult{
{NodeName: "node1", ResultType: "myPlugin"},
{NodeName: "node2", ResultType: "myPlugin"},
{NodeName: "node4", ResultType: "myPlugin"},
},
p: pluginWithAffinity([]corev1.NodeSelectorRequirement{
{Key: "foo", Operator: corev1.NodeSelectorOpNotIn, Values: []string{"bar", "baz"}},
{Key: "foo", Operator: corev1.NodeSelectorOpIn, Values: []string{"bar"}},
}),
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
out := tc.p.ExpectedResults(testNodes)
if diff := pretty.Compare(tc.expect, out); diff != "" {
t.Fatalf("\n\n%s\n", diff)
}
})
}
}

0 comments on commit c5ad70d

Please sign in to comment.