Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #37093 #40069

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 8 additions & 4 deletions pkg/controller/endpoint/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,13 @@ const (
// An annotation on the Service denoting if the endpoints controller should
// go ahead and create endpoints for unready pods. This annotation is
// currently only used by StatefulSets, where we need the pod to be DNS
// resolvable during initialization. In this situation we create a headless
// service just for the StatefulSet, and clients shouldn't be using this Service
// for anything so unready endpoints don't matter.
// resolvable during initialization and termination. In this situation we
// create a headless Service just for the StatefulSet, and clients shouldn't
// be using this Service for anything so unready endpoints don't matter.
// Endpoints of these Services retain their DNS records and continue
// receiving traffic for the Service from the moment the kubelet starts all
// containers in the pod and marks it "Running", till the kubelet stops all
// containers and deletes the pod from the apiserver.
TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
)

Expand Down Expand Up @@ -402,7 +406,7 @@ func (e *EndpointController) syncService(key string) error {
glog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
continue
}
if pod.DeletionTimestamp != nil {
if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
glog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
continue
}
Expand Down
101 changes: 94 additions & 7 deletions test/e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ var _ = framework.KubeDescribe("Services", func() {
})

It("should create endpoints for unready pods", func() {
serviceName := "never-ready"
serviceName := "tolerate-unready"
ns := f.Namespace.Name

t := NewServerTest(cs, ns, serviceName)
Expand All @@ -1052,22 +1052,49 @@ var _ = framework.KubeDescribe("Services", func() {
}
}()

service := t.BuildServiceSpec()
service.Annotations = map[string]string{endpoint.TolerateUnreadyEndpointsAnnotation: "true"}
t.name = "slow-terminating-unready-pod"
t.image = "gcr.io/google_containers/netexec:1.7"
port := 80
terminateSeconds := int64(600)

service := &api.Service{
ObjectMeta: api.ObjectMeta{
Name: t.ServiceName,
Namespace: t.Namespace,
Annotations: map[string]string{endpoint.TolerateUnreadyEndpointsAnnotation: "true"},
},
Spec: api.ServiceSpec{
Selector: t.Labels,
Ports: []api.ServicePort{{
Name: "http",
Port: int32(port),
TargetPort: intstr.FromInt(port),
}},
},
}
rcSpec := rcByNameContainer(t.name, 1, t.image, t.Labels, api.Container{
Args: []string{fmt.Sprintf("--http-port=%d", port)},
Name: t.name,
Image: t.image,
Ports: []api.ContainerPort{{ContainerPort: int32(80), Protocol: api.ProtocolTCP}},
Ports: []api.ContainerPort{{ContainerPort: int32(port), Protocol: api.ProtocolTCP}},
ReadinessProbe: &api.Probe{
Handler: api.Handler{
Exec: &api.ExecAction{
Command: []string{"/bin/false"},
},
},
},
Lifecycle: &api.Lifecycle{
PreStop: &api.Handler{
Exec: &api.ExecAction{
Command: []string{"/bin/sleep", fmt.Sprintf("%d", terminateSeconds)},
},
},
},
}, nil)
rcSpec.Spec.Template.Spec.TerminationGracePeriodSeconds = &terminateSeconds

By(fmt.Sprintf("createing RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector))
By(fmt.Sprintf("creating RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector))
_, err := t.createRC(rcSpec)
ExpectNoError(err)

Expand All @@ -1079,10 +1106,10 @@ var _ = framework.KubeDescribe("Services", func() {
ExpectNoError(framework.VerifyPods(t.Client, t.Namespace, t.name, false, 1))

svcName := fmt.Sprintf("%v.%v", serviceName, f.Namespace.Name)
By("waiting for endpoints of Service with DNS name " + svcName)
By("Waiting for endpoints of Service with DNS name " + svcName)

execPodName := createExecPodOrFail(f.ClientSet, f.Namespace.Name, "execpod-")
cmd := fmt.Sprintf("wget -qO- %v", svcName)
cmd := fmt.Sprintf("wget -qO- http://%s:%d/", svcName, port)
var stdout string
if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
var err error
Expand All @@ -1095,6 +1122,66 @@ var _ = framework.KubeDescribe("Services", func() {
}); pollErr != nil {
framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
}

By("Scaling down replication controler to zero")
framework.ScaleRC(f.ClientSet, t.Namespace, rcSpec.Name, 0, false)

By("Update service to not tolerate unready services")
_, err = updateService(f.ClientSet, t.Namespace, t.ServiceName, func(s *api.Service) {
s.ObjectMeta.Annotations[endpoint.TolerateUnreadyEndpointsAnnotation] = "false"
})
framework.ExpectNoError(err)

By("Check if pod is unreachable")
cmd = fmt.Sprintf("wget -qO- -T 2 http://%s:%d/; test \"$?\" -eq \"1\"", svcName, port)
if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
var err error
stdout, err = framework.RunHostCmd(f.Namespace.Name, execPodName, cmd)
if err != nil {
framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.name, stdout, err)
return false, nil
}
return true, nil
}); pollErr != nil {
framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
}

By("Update service to tolerate unready services again")
_, err = updateService(f.ClientSet, t.Namespace, t.ServiceName, func(s *api.Service) {
s.ObjectMeta.Annotations[endpoint.TolerateUnreadyEndpointsAnnotation] = "true"
})
framework.ExpectNoError(err)

By("Check if terminating pod is available through service")
cmd = fmt.Sprintf("wget -qO- http://%s:%d/", svcName, port)
if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
var err error
stdout, err = framework.RunHostCmd(f.Namespace.Name, execPodName, cmd)
if err != nil {
framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.name, stdout, err)
return false, nil
}
return true, nil
}); pollErr != nil {
framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
}

By("Remove pods immediately")
label := labels.SelectorFromSet(labels.Set(t.Labels))
options := api.ListOptions{LabelSelector: label}
podClient := t.Client.Core().Pods(f.Namespace.Name)
pods, err := podClient.List(options)
if err != nil {
framework.Logf("warning: error retrieving pods: %s", err)
} else {
for _, pod := range pods.Items {
var gracePeriodSeconds int64 = 0
err := podClient.Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
if err != nil {
framework.Logf("warning: error force deleting pod '%s': %s", pod.Name, err)
}
}
}
})

It("should only allow access from service loadbalancer source ranges [Slow]", func() {
Expand Down