Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small refactoring of scheduler predicates #34444

Merged
merged 1 commit into from Oct 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 8 additions & 9 deletions test/e2e/common/networking.go
Expand Up @@ -20,7 +20,6 @@ import (
. "github.com/onsi/ginkgo"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/test/e2e/framework"
networking_util "k8s.io/kubernetes/test/utils"
)

var _ = framework.KubeDescribe("Networking", func() {
Expand All @@ -32,30 +31,30 @@ var _ = framework.KubeDescribe("Networking", func() {
// expect exactly one unique hostname. Each of these endpoints reports
// its own hostname.
It("should function for intra-pod communication: http [Conformance]", func() {
config := networking_util.NewCoreNetworkingTestConfig(f)
config := framework.NewCoreNetworkingTestConfig(f)
for _, endpointPod := range config.EndpointPods {
config.DialFromTestContainer("http", endpointPod.Status.PodIP, networking_util.EndpointHttpPort, config.MaxTries, 0, sets.NewString(endpointPod.Name))
config.DialFromTestContainer("http", endpointPod.Status.PodIP, framework.EndpointHttpPort, config.MaxTries, 0, sets.NewString(endpointPod.Name))
}
})

It("should function for intra-pod communication: udp [Conformance]", func() {
config := networking_util.NewCoreNetworkingTestConfig(f)
config := framework.NewCoreNetworkingTestConfig(f)
for _, endpointPod := range config.EndpointPods {
config.DialFromTestContainer("udp", endpointPod.Status.PodIP, networking_util.EndpointUdpPort, config.MaxTries, 0, sets.NewString(endpointPod.Name))
config.DialFromTestContainer("udp", endpointPod.Status.PodIP, framework.EndpointUdpPort, config.MaxTries, 0, sets.NewString(endpointPod.Name))
}
})

It("should function for node-pod communication: http [Conformance]", func() {
config := networking_util.NewCoreNetworkingTestConfig(f)
config := framework.NewCoreNetworkingTestConfig(f)
for _, endpointPod := range config.EndpointPods {
config.DialFromNode("http", endpointPod.Status.PodIP, networking_util.EndpointHttpPort, config.MaxTries, 0, sets.NewString(endpointPod.Name))
config.DialFromNode("http", endpointPod.Status.PodIP, framework.EndpointHttpPort, config.MaxTries, 0, sets.NewString(endpointPod.Name))
}
})

It("should function for node-pod communication: udp [Conformance]", func() {
config := networking_util.NewCoreNetworkingTestConfig(f)
config := framework.NewCoreNetworkingTestConfig(f)
for _, endpointPod := range config.EndpointPods {
config.DialFromNode("udp", endpointPod.Status.PodIP, networking_util.EndpointUdpPort, config.MaxTries, 0, sets.NewString(endpointPod.Name))
config.DialFromNode("udp", endpointPod.Status.PodIP, framework.EndpointUdpPort, config.MaxTries, 0, sets.NewString(endpointPod.Name))
}
})
})
Expand Down
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package utils
package framework

import (
"encoding/json"
Expand All @@ -34,7 +34,6 @@ import (
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/uuid"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework"
)

const (
Expand All @@ -57,15 +56,15 @@ const (
)

// NewNetworkingTestConfig creates and sets up a new test config helper.
func NewNetworkingTestConfig(f *framework.Framework) *NetworkingTestConfig {
func NewNetworkingTestConfig(f *Framework) *NetworkingTestConfig {
config := &NetworkingTestConfig{f: f, Namespace: f.Namespace.Name}
By(fmt.Sprintf("Performing setup for networking test in namespace %v", config.Namespace))
config.setup(getServiceSelector())
return config
}

// NewNetworkingTestNodeE2EConfig creates and sets up a new test config helper for Node E2E.
func NewCoreNetworkingTestConfig(f *framework.Framework) *NetworkingTestConfig {
func NewCoreNetworkingTestConfig(f *Framework) *NetworkingTestConfig {
config := &NetworkingTestConfig{f: f, Namespace: f.Namespace.Name}
By(fmt.Sprintf("Performing setup for networking test in namespace %v", config.Namespace))
config.setupCore(getServiceSelector())
Expand Down Expand Up @@ -94,8 +93,8 @@ type NetworkingTestConfig struct {
// test config. Each invocation of `setup` creates a service with
// 1 pod per node running the netexecImage.
EndpointPods []*api.Pod
f *framework.Framework
podClient *framework.PodClient
f *Framework
podClient *PodClient
// NodePortService is a Service with Type=NodePort spanning over all
// endpointPods.
NodePortService *api.Service
Expand Down Expand Up @@ -134,10 +133,10 @@ func (config *NetworkingTestConfig) diagnoseMissingEndpoints(foundEndpoints sets
if foundEndpoints.Has(e.Name) {
continue
}
framework.Logf("\nOutput of kubectl describe pod %v/%v:\n", e.Namespace, e.Name)
desc, _ := framework.RunKubectl(
Logf("\nOutput of kubectl describe pod %v/%v:\n", e.Namespace, e.Name)
desc, _ := RunKubectl(
"describe", "pod", e.Name, fmt.Sprintf("--namespace=%v", e.Namespace))
framework.Logf(desc)
Logf(desc)
}
}

Expand Down Expand Up @@ -179,11 +178,11 @@ func (config *NetworkingTestConfig) DialFromContainer(protocol, containerIP, tar
// A failure to kubectl exec counts as a try, not a hard fail.
// Also note that we will keep failing for maxTries in tests where
// we confirm unreachability.
framework.Logf("Failed to execute %q: %v, stdout: %q, stderr %q", cmd, err, stdout, stderr)
Logf("Failed to execute %q: %v, stdout: %q, stderr %q", cmd, err, stdout, stderr)
} else {
var output map[string][]string
if err := json.Unmarshal([]byte(stdout), &output); err != nil {
framework.Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
Logf("WARNING: Failed to unmarshal curl response. Cmd %v run in %v, output: %s, err: %v",
cmd, config.HostTestContainerPod.Name, stdout, err)
continue
}
Expand All @@ -195,7 +194,7 @@ func (config *NetworkingTestConfig) DialFromContainer(protocol, containerIP, tar
}
}
}
framework.Logf("Waiting for endpoints: %v", expectedEps.Difference(eps))
Logf("Waiting for endpoints: %v", expectedEps.Difference(eps))

// Check against i+1 so we exit if minTries == maxTries.
if (eps.Equal(expectedEps) || eps.Len() == 0 && expectedEps.Len() == 0) && i+1 >= minTries {
Expand All @@ -204,7 +203,7 @@ func (config *NetworkingTestConfig) DialFromContainer(protocol, containerIP, tar
}

config.diagnoseMissingEndpoints(eps)
framework.Failf("Failed to find expected endpoints:\nTries %d\nCommand %v\nretrieved %v\nexpected %v\n", minTries, cmd, eps, expectedEps)
Failf("Failed to find expected endpoints:\nTries %d\nCommand %v\nretrieved %v\nexpected %v\n", minTries, cmd, eps, expectedEps)
}

// DialFromNode executes a tcp or udp request based on protocol via kubectl exec
Expand Down Expand Up @@ -237,14 +236,14 @@ func (config *NetworkingTestConfig) DialFromNode(protocol, targetIP string, targ
// A failure to exec command counts as a try, not a hard fail.
// Also note that we will keep failing for maxTries in tests where
// we confirm unreachability.
framework.Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", filterCmd, err, stdout, stderr)
Logf("Failed to execute %q: %v, stdout: %q, stderr: %q", filterCmd, err, stdout, stderr)
} else {
trimmed := strings.TrimSpace(stdout)
if trimmed != "" {
eps.Insert(trimmed)
}
}
framework.Logf("Waiting for %+v endpoints, got endpoints %+v", expectedEps.Difference(eps), eps)
Logf("Waiting for %+v endpoints, got endpoints %+v", expectedEps.Difference(eps), eps)

// Check against i+1 so we exit if minTries == maxTries.
if (eps.Equal(expectedEps) || eps.Len() == 0 && expectedEps.Len() == 0) && i+1 >= minTries {
Expand All @@ -253,7 +252,7 @@ func (config *NetworkingTestConfig) DialFromNode(protocol, targetIP string, targ
}

config.diagnoseMissingEndpoints(eps)
framework.Failf("Failed to find expected endpoints:\nTries %d\nCommand %v\nretrieved %v\nexpected %v\n", minTries, cmd, eps, expectedEps)
Failf("Failed to find expected endpoints:\nTries %d\nCommand %v\nretrieved %v\nexpected %v\n", minTries, cmd, eps, expectedEps)
}

// GetSelfURL executes a curl against the given path via kubectl exec into a
Expand All @@ -262,7 +261,7 @@ func (config *NetworkingTestConfig) DialFromNode(protocol, targetIP string, targ
func (config *NetworkingTestConfig) GetSelfURL(path string, expected string) {
cmd := fmt.Sprintf("curl -q -s --connect-timeout 1 http://localhost:10249%s", path)
By(fmt.Sprintf("Getting kube-proxy self URL %s", path))
stdout := framework.RunHostCmdOrDie(config.Namespace, config.HostTestContainerPod.Name, cmd)
stdout := RunHostCmdOrDie(config.Namespace, config.HostTestContainerPod.Name, cmd)
Expect(strings.Contains(stdout, expected)).To(BeTrue())
}

Expand Down Expand Up @@ -380,31 +379,31 @@ func (config *NetworkingTestConfig) DeleteNodePortService() {

func (config *NetworkingTestConfig) createTestPods() {
testContainerPod := config.createTestPodSpec()
hostTestContainerPod := framework.NewHostExecPodSpec(config.Namespace, hostTestPodName)
hostTestContainerPod := NewHostExecPodSpec(config.Namespace, hostTestPodName)

config.createPod(testContainerPod)
config.createPod(hostTestContainerPod)

framework.ExpectNoError(config.f.WaitForPodRunning(testContainerPod.Name))
framework.ExpectNoError(config.f.WaitForPodRunning(hostTestContainerPod.Name))
ExpectNoError(config.f.WaitForPodRunning(testContainerPod.Name))
ExpectNoError(config.f.WaitForPodRunning(hostTestContainerPod.Name))

var err error
config.TestContainerPod, err = config.getPodClient().Get(testContainerPod.Name)
if err != nil {
framework.Failf("Failed to retrieve %s pod: %v", testContainerPod.Name, err)
Failf("Failed to retrieve %s pod: %v", testContainerPod.Name, err)
}

config.HostTestContainerPod, err = config.getPodClient().Get(hostTestContainerPod.Name)
if err != nil {
framework.Failf("Failed to retrieve %s pod: %v", hostTestContainerPod.Name, err)
Failf("Failed to retrieve %s pod: %v", hostTestContainerPod.Name, err)
}
}

func (config *NetworkingTestConfig) createService(serviceSpec *api.Service) *api.Service {
_, err := config.getServiceClient().Create(serviceSpec)
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))

err = framework.WaitForService(config.f.Client, config.Namespace, serviceSpec.Name, true, 5*time.Second, 45*time.Second)
err = WaitForService(config.f.Client, config.Namespace, serviceSpec.Name, true, 5*time.Second, 45*time.Second)
Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("error while waiting for service:%s err: %v", serviceSpec.Name, err))

createdService, err := config.getServiceClient().Get(serviceSpec.Name)
Expand Down Expand Up @@ -432,12 +431,12 @@ func (config *NetworkingTestConfig) setup(selector map[string]string) {
config.setupCore(selector)

By("Getting node addresses")
framework.ExpectNoError(framework.WaitForAllNodesSchedulable(config.f.Client))
nodeList := framework.GetReadySchedulableNodesOrDie(config.f.Client)
config.ExternalAddrs = framework.NodeAddresses(nodeList, api.NodeExternalIP)
ExpectNoError(WaitForAllNodesSchedulable(config.f.Client))
nodeList := GetReadySchedulableNodesOrDie(config.f.Client)
config.ExternalAddrs = NodeAddresses(nodeList, api.NodeExternalIP)
if len(config.ExternalAddrs) < 2 {
// fall back to legacy IPs
config.ExternalAddrs = framework.NodeAddresses(nodeList, api.NodeLegacyHostIP)
config.ExternalAddrs = NodeAddresses(nodeList, api.NodeLegacyHostIP)
}
Expect(len(config.ExternalAddrs)).To(BeNumerically(">=", 2), fmt.Sprintf("At least two nodes necessary with an external or LegacyHostIP"))
config.Nodes = nodeList.Items
Expand Down Expand Up @@ -483,8 +482,8 @@ func shuffleNodes(nodes []api.Node) []api.Node {
}

func (config *NetworkingTestConfig) createNetProxyPods(podName string, selector map[string]string) []*api.Pod {
framework.ExpectNoError(framework.WaitForAllNodesSchedulable(config.f.Client))
nodeList := framework.GetReadySchedulableNodesOrDie(config.f.Client)
ExpectNoError(WaitForAllNodesSchedulable(config.f.Client))
nodeList := GetReadySchedulableNodesOrDie(config.f.Client)

// To make this test work reasonably fast in large clusters,
// we limit the number of NetProxyPods to no more than 100 ones
Expand All @@ -507,9 +506,9 @@ func (config *NetworkingTestConfig) createNetProxyPods(podName string, selector
// wait that all of them are up
runningPods := make([]*api.Pod, 0, len(nodes))
for _, p := range createdPods {
framework.ExpectNoError(config.f.WaitForPodReady(p.Name))
ExpectNoError(config.f.WaitForPodReady(p.Name))
rp, err := config.getPodClient().Get(p.Name)
framework.ExpectNoError(err)
ExpectNoError(err)
runningPods = append(runningPods, rp)
}

Expand All @@ -521,14 +520,14 @@ func (config *NetworkingTestConfig) DeleteNetProxyPod() {
config.getPodClient().Delete(pod.Name, api.NewDeleteOptions(0))
config.EndpointPods = config.EndpointPods[1:]
// wait for pod being deleted.
err := framework.WaitForPodToDisappear(config.f.Client, config.Namespace, pod.Name, labels.Everything(), time.Second, wait.ForeverTestTimeout)
err := WaitForPodToDisappear(config.f.Client, config.Namespace, pod.Name, labels.Everything(), time.Second, wait.ForeverTestTimeout)
if err != nil {
framework.Failf("Failed to delete %s pod: %v", pod.Name, err)
Failf("Failed to delete %s pod: %v", pod.Name, err)
}
// wait for endpoint being removed.
err = framework.WaitForServiceEndpointsNum(config.f.Client, config.Namespace, nodePortServiceName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
err = WaitForServiceEndpointsNum(config.f.Client, config.Namespace, nodePortServiceName, len(config.EndpointPods), time.Second, wait.ForeverTestTimeout)
if err != nil {
framework.Failf("Failed to remove endpoint from service: %s", nodePortServiceName)
Failf("Failed to remove endpoint from service: %s", nodePortServiceName)
}
// wait for kube-proxy to catch up with the pod being deleted.
time.Sleep(5 * time.Second)
Expand All @@ -538,7 +537,7 @@ func (config *NetworkingTestConfig) createPod(pod *api.Pod) *api.Pod {
return config.getPodClient().Create(pod)
}

func (config *NetworkingTestConfig) getPodClient() *framework.PodClient {
func (config *NetworkingTestConfig) getPodClient() *PodClient {
if config.podClient == nil {
config.podClient = config.f.PodClient()
}
Expand Down
54 changes: 8 additions & 46 deletions test/e2e/framework/util.go
Expand Up @@ -79,6 +79,7 @@ import (
utilyaml "k8s.io/kubernetes/pkg/util/yaml"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/watch"
testutil "k8s.io/kubernetes/test/utils"

"github.com/blang/semver"
"golang.org/x/crypto/ssh"
Expand Down Expand Up @@ -3022,64 +3023,25 @@ func WaitForAllNodesSchedulable(c *client.Client) error {
})
}

func AddOrUpdateLabelOnNode(c *client.Client, nodeName string, labelKey string, labelValue string) {
patch := fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}}}`, labelKey, labelValue)
var err error
for attempt := 0; attempt < UpdateRetries; attempt++ {
err = c.Patch(api.MergePatchType).Resource("nodes").Name(nodeName).Body([]byte(patch)).Do().Error()
if err != nil {
if !apierrs.IsConflict(err) {
ExpectNoError(err)
} else {
Logf("Conflict when trying to add a label %v:%v to %v", labelKey, labelValue, nodeName)
}
} else {
break
}
time.Sleep(100 * time.Millisecond)
}
ExpectNoError(err)
func AddOrUpdateLabelOnNode(c clientset.Interface, nodeName string, labelKey, labelValue string) {
ExpectNoError(testutil.AddLabelsToNode(c, nodeName, map[string]string{labelKey: labelValue}))
}

func ExpectNodeHasLabel(c *client.Client, nodeName string, labelKey string, labelValue string) {
func ExpectNodeHasLabel(c clientset.Interface, nodeName string, labelKey string, labelValue string) {
By("verifying the node has the label " + labelKey + " " + labelValue)
node, err := c.Nodes().Get(nodeName)
node, err := c.Core().Nodes().Get(nodeName)
ExpectNoError(err)
Expect(node.Labels[labelKey]).To(Equal(labelValue))
}

// RemoveLabelOffNode is for cleaning up labels temporarily added to node,
// won't fail if target label doesn't exist or has been removed.
func RemoveLabelOffNode(c *client.Client, nodeName string, labelKey string) {
func RemoveLabelOffNode(c clientset.Interface, nodeName string, labelKey string) {
By("removing the label " + labelKey + " off the node " + nodeName)
var nodeUpdated *api.Node
var node *api.Node
var err error
for attempt := 0; attempt < UpdateRetries; attempt++ {
node, err = c.Nodes().Get(nodeName)
ExpectNoError(err)
if node.Labels == nil || len(node.Labels[labelKey]) == 0 {
return
}
delete(node.Labels, labelKey)
nodeUpdated, err = c.Nodes().Update(node)
if err != nil {
if !apierrs.IsConflict(err) {
ExpectNoError(err)
} else {
Logf("Conflict when trying to remove a label %v from %v", labelKey, nodeName)
}
} else {
break
}
time.Sleep(100 * time.Millisecond)
}
ExpectNoError(err)
ExpectNoError(testutil.RemoveLabelOffNode(c, nodeName, []string{labelKey}))

By("verifying the node doesn't have the label " + labelKey)
if nodeUpdated.Labels != nil && len(nodeUpdated.Labels[labelKey]) != 0 {
Failf("Failed removing label " + labelKey + " of the node " + nodeName)
}
ExpectNoError(testutil.VerifyLabelsRemoved(c, nodeName, []string{labelKey}))
}

func AddOrUpdateTaintOnNode(c *client.Client, nodeName string, taint api.Taint) {
Expand Down