Fix bugs in daemonset controller and e2e tests #14550

Merged

2 changes: 1 addition & 1 deletion contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -29,6 +29,7 @@ import (
 	clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/cloudprovider/providers/mesos"
+	"k8s.io/kubernetes/pkg/controller/daemon"
 	kendpoint "k8s.io/kubernetes/pkg/controller/endpoint"
 	"k8s.io/kubernetes/pkg/controller/namespace"
 	"k8s.io/kubernetes/pkg/controller/node"
@@ -47,7 +48,6 @@ import (
 	"github.com/golang/glog"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/spf13/pflag"
-	"k8s.io/kubernetes/pkg/controller/daemon"
 )

 // CMServer is the main context object for the controller manager.
6 changes: 3 additions & 3 deletions pkg/controller/controller_utils.go
@@ -291,12 +291,12 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *api.Pod
 			GenerateName: prefix,
 		},
 	}
-	if len(nodeName) != 0 {
-		pod.Spec.NodeName = nodeName
-	}
 	if err := api.Scheme.Convert(&template.Spec, &pod.Spec); err != nil {
 		return fmt.Errorf("unable to convert pod template: %v", err)
 	}
+	if len(nodeName) != 0 {
+		pod.Spec.NodeName = nodeName
+	}
Member:

What was convert doing here? Was it writing an empty nodeName?

Contributor Author:

Yes. Pods had to be scheduled by the scheduler. Your "does daemon pod exist for node" check returned false until the scheduler was done, so many new daemon pods were created in the meantime.

Member:

But the node controller should not have created a pod without NodeName set.

Contributor Author:

@erictune I'm not sure I understand your comment. Why the node controller? The daemon controller calls this.
 	if labels.Set(pod.Labels).AsSelector().Empty() {
 		return fmt.Errorf("unable to create pods, no labels")
 	}
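The review thread above pins down the root cause: api.Scheme.Convert deep-copies the template spec over pod.Spec, so a NodeName assigned before the conversion was silently wiped out, leaving daemon pods to the scheduler and defeating the controller's "does a daemon pod already exist on this node" check. Below is a minimal, self-contained Go sketch of the ordering hazard — the toy convert merely stands in for api.Scheme.Convert and is not the real implementation:

package main

import "fmt"

type spec struct {
	NodeName string
	Image    string
}

// Toy stand-in for api.Scheme.Convert: it copies src wholesale into dst,
// clobbering any field that was set on dst beforehand.
func convert(src, dst *spec) { *dst = *src }

func main() {
	template := spec{Image: "serve_hostname:1.1"} // pod templates carry no NodeName

	// Buggy order: a NodeName set before the conversion is overwritten.
	var buggy spec
	buggy.NodeName = "node-1"
	convert(&template, &buggy)
	fmt.Printf("buggy: %q\n", buggy.NodeName) // "" — the pod is left for the scheduler

	// Fixed order (as in this PR): convert first, then pin the pod to its node.
	var fixed spec
	convert(&template, &fixed)
	fixed.NodeName = "node-1"
	fmt.Printf("fixed: %q\n", fixed.NodeName) // "node-1" — the daemon pod is pre-bound
}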
132 changes: 75 additions & 57 deletions test/e2e/daemon_set.go
@@ -18,6 +18,8 @@ package e2e

 import (
 	"fmt"
+	"reflect"
+	"strings"
 	"time"

 	"k8s.io/kubernetes/pkg/api"
@@ -35,21 +37,24 @@
 )

 const (
-	updateRetryPeriod = 5 * time.Second
-	updateRetryTimeout = 30 * time.Second
+	updateRetryPeriod    = 5 * time.Second
+	updateRetryTimeout   = 30 * time.Second
+	daemonsetLabelPrefix = "daemonset-"
+	daemonsetNameLabel   = daemonsetLabelPrefix + "name"
+	daemonsetColorLabel  = daemonsetLabelPrefix + "color"
 )

 var _ = Describe("Daemon set", func() {
 	f := &Framework{BaseName: "daemonsets"}

 	BeforeEach(func() {
 		f.beforeEach()
-		err := clearNodeLabels(f.Client)
+		err := clearDaemonSetNodeLabels(f.Client)
 		Expect(err).NotTo(HaveOccurred())
 	})

 	AfterEach(func() {
-		err := clearNodeLabels(f.Client)
+		err := clearDaemonSetNodeLabels(f.Client)
 		Expect(err).NotTo(HaveOccurred())
 		f.afterEach()
 	})
@@ -59,37 +64,74 @@ var _ = Describe("Daemon set", func() {
 	})
 })

-func clearNodeLabels(c *client.Client) error {
+func separateDaemonSetNodeLabels(labels map[string]string) (map[string]string, map[string]string) {
+	daemonSetLabels := map[string]string{}
+	otherLabels := map[string]string{}
+	for k, v := range labels {
+		if strings.HasPrefix(k, daemonsetLabelPrefix) {
+			daemonSetLabels[k] = v
+		} else {
+			otherLabels[k] = v
+		}
+	}
+	return daemonSetLabels, otherLabels
+}
+
+func clearDaemonSetNodeLabels(c *client.Client) error {
 	nodeClient := c.Nodes()
 	nodeList, err := nodeClient.List(labels.Everything(), fields.Everything())
 	if err != nil {
 		return err
 	}
 	for _, node := range nodeList.Items {
-		if len(node.Labels) != 0 {
-			node.Labels = map[string]string{}
-			var newNode *api.Node
-			err = wait.Poll(updateRetryPeriod, updateRetryTimeout, func() (bool, error) {
-				newNode, err = nodeClient.Update(&node)
-				if err == nil {
-					return true, err
-				}
-				if se, ok := err.(*apierrs.StatusError); ok && se.ErrStatus.Reason == unversioned.StatusReasonConflict {
-					Logf("failed to update node due to resource version conflict")
-					return false, nil
-				}
-				return false, err
-			})
-			if err != nil {
-				return err
-			} else if len(newNode.Labels) != 0 {
-				return fmt.Errorf("Could not make node labels nil.")
-			}
-		}
+		_, err := setDaemonSetNodeLabels(c, node.Name, map[string]string{})
+		if err != nil {
+			return err
+		}
 	}
 	return nil
 }

+func setDaemonSetNodeLabels(c *client.Client, nodeName string, labels map[string]string) (*api.Node, error) {
+	nodeClient := c.Nodes()
+	var newNode *api.Node
+	var newLabels map[string]string
+	err := wait.Poll(updateRetryPeriod, updateRetryTimeout, func() (bool, error) {
+		node, err := nodeClient.Get(nodeName)
+		if err != nil {
+			return false, err
+		}
+
+		// remove all labels this test is creating
+		daemonSetLabels, otherLabels := separateDaemonSetNodeLabels(node.Labels)
+		if reflect.DeepEqual(daemonSetLabels, labels) {
+			newNode = node
+			return true, nil
+		}
+		node.Labels = otherLabels
+		for k, v := range labels {
+			node.Labels[k] = v
+		}
+		newNode, err = nodeClient.Update(node)
+		if err == nil {
+			newLabels, _ = separateDaemonSetNodeLabels(newNode.Labels)
+			return true, err
+		}
+		if se, ok := err.(*apierrs.StatusError); ok && se.ErrStatus.Reason == unversioned.StatusReasonConflict {
+			Logf("failed to update node due to resource version conflict")
+			return false, nil
+		}
+		return false, err
+	})
+	if err != nil {
+		return nil, err
+	} else if len(newLabels) != len(labels) {
+		return nil, fmt.Errorf("Could not set daemon set test labels as expected.")
+	}
+
+	return newNode, nil
+}
+
 func checkDaemonPodOnNodes(f *Framework, selector map[string]string, nodeNames []string) func() (bool, error) {
 	return func() (bool, error) {
 		podList, err := f.Client.Pods(f.Namespace.Name).List(labels.Set(selector).AsSelector(), fields.Everything())
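The diff collapses the rest of checkDaemonPodOnNodes at this point. For orientation only, here is a hypothetical sketch (not the PR's actual code) of what such a poll condition has to verify — every expected node runs exactly one daemon pod, and no daemon pod runs anywhere else:

// Hypothetical reconstruction for illustration; the name mirrors the
// truncated function above, but the body is a sketch, not the committed code.
func checkDaemonPodOnNodesSketch(f *Framework, selector map[string]string, nodeNames []string) func() (bool, error) {
	return func() (bool, error) {
		podList, err := f.Client.Pods(f.Namespace.Name).List(labels.Set(selector).AsSelector(), fields.Everything())
		if err != nil {
			return false, nil // treat a transient list error as "not yet" and poll again
		}
		podsPerNode := map[string]int{}
		for _, pod := range podList.Items {
			podsPerNode[pod.Spec.NodeName]++
		}
		for _, name := range nodeNames {
			if podsPerNode[name] != 1 {
				return false, nil // an expected node is missing its daemon pod (or has extras)
			}
		}
		// Succeed only once no daemon pods sit outside the expected node set.
		return len(podsPerNode) == len(nodeNames), nil
	}
}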
@@ -140,7 +182,7 @@ func testDaemonSets(f *Framework) {
 	c := f.Client
 	simpleDSName := "simple-daemon-set"
 	image := "gcr.io/google_containers/serve_hostname:1.1"
-	label := map[string]string{"name": simpleDSName}
+	label := map[string]string{daemonsetNameLabel: simpleDSName}
 	retryTimeout := 1 * time.Minute
 	retryInterval := 5 * time.Second

@@ -195,8 +237,8 @@ func testDaemonSets(f *Framework) {
 	Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to revive")

 	complexDSName := "complex-daemon-set"
-	complexLabel := map[string]string{"name": complexDSName}
-	nodeSelector := map[string]string{"color": "blue"}
+	complexLabel := map[string]string{daemonsetNameLabel: complexDSName}
+	nodeSelector := map[string]string{daemonsetColorLabel: "blue"}
 	Logf("Creating daemon with a node selector %s", complexDSName)
 	_, err = c.DaemonSets(ns).Create(&experimental.DaemonSet{
 		ObjectMeta: api.ObjectMeta{
@@ -231,40 +273,16 @@ func testDaemonSets(f *Framework) {
 	nodeClient := c.Nodes()
 	nodeList, err := nodeClient.List(labels.Everything(), fields.Everything())
 	Expect(len(nodeList.Items)).To(BeNumerically(">", 0))
-	nodeList.Items[0].Labels = nodeSelector
-	var newNode *api.Node
-	err = wait.Poll(updateRetryPeriod, updateRetryTimeout, func() (bool, error) {
-		newNode, err = nodeClient.Update(&nodeList.Items[0])
-		if err == nil {
-			return true, err
-		}
-		if se, ok := err.(*apierrs.StatusError); ok && se.ErrStatus.Reason == unversioned.StatusReasonConflict {
-			Logf("failed to update node due to resource version conflict")
-			return false, nil
-		}
-		return false, err
-	})
-	Expect(err).NotTo(HaveOccurred())
-	Expect(len(newNode.Labels)).To(Equal(1))
+	newNode, err := setDaemonSetNodeLabels(c, nodeList.Items[0].Name, nodeSelector)
+	Expect(err).NotTo(HaveOccurred(), "error setting labels on node")
+	daemonSetLabels, _ := separateDaemonSetNodeLabels(newNode.Labels)
+	Expect(len(daemonSetLabels)).To(Equal(1))
 	err = wait.Poll(retryInterval, retryTimeout, checkDaemonPodOnNodes(f, complexLabel, []string{newNode.Name}))
 	Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pods to be running on new nodes")

 	By("remove the node selector and wait for daemons to be unscheduled")
-	newNode, err = nodeClient.Get(newNode.Name)
-	Expect(err).NotTo(HaveOccurred(), "error getting node")
-	newNode.Labels = map[string]string{}
-	err = wait.Poll(updateRetryPeriod, updateRetryTimeout, func() (bool, error) {
-		newNode, err = nodeClient.Update(newNode)
-		if err == nil {
-			return true, err
-		}
-		if se, ok := err.(*apierrs.StatusError); ok && se.ErrStatus.Reason == unversioned.StatusReasonConflict {
-			Logf("failed to update node due to resource version conflict")
-			return false, nil
-		}
-		return false, err
-	})
-	Expect(err).NotTo(HaveOccurred())
+	_, err = setDaemonSetNodeLabels(c, nodeList.Items[0].Name, map[string]string{})
+	Expect(err).NotTo(HaveOccurred(), "error removing labels on node")
 	Expect(wait.Poll(retryInterval, retryTimeout, checkRunningOnNoNodes(f, complexLabel))).
 		NotTo(HaveOccurred(), "error waiting for daemon pod to not be running on nodes")

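A closing note on the refactor: before this change the test body repeated the same conflict-retry dance at every node mutation; setDaemonSetNodeLabels now centralizes it. The essential pattern is optimistic-concurrency read-modify-write — re-read inside the loop so each Update carries a fresh resourceVersion, retry only on a conflict, and fail fast on any other error. A self-contained toy illustration of that pattern (the store and node types are invented for this example and are not a Kubernetes API):

package main

import (
	"errors"
	"fmt"
)

var errConflict = errors.New("resource version conflict")

// node is a toy object guarded by an optimistic-concurrency version.
type node struct {
	version int
	labels  map[string]string
}

// store stands in for the API server: an update succeeds only when the
// writer's version matches the currently stored one.
type store struct{ current node }

func (s *store) get() node { return s.current }

func (s *store) update(n node) error {
	if n.version != s.current.version {
		return errConflict // someone else wrote between our get and update
	}
	n.version++
	s.current = n
	return nil
}

// updateWithRetry re-reads on every attempt so each update carries a fresh
// version, retries only on conflicts, and surfaces any other error.
func updateWithRetry(s *store, mutate func(*node)) error {
	for attempt := 0; attempt < 5; attempt++ {
		n := s.get()
		mutate(&n)
		err := s.update(n)
		if err == nil {
			return nil
		}
		if !errors.Is(err, errConflict) {
			return err
		}
	}
	return fmt.Errorf("giving up after repeated conflicts")
}

func main() {
	s := &store{current: node{labels: map[string]string{}}}
	err := updateWithRetry(s, func(n *node) { n.labels["daemonset-color"] = "blue" })
	fmt.Println(err, s.current.labels) // <nil> map[daemonset-color:blue]
}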